From c63a34d24742d840b0057dcaaeafc3ac0b5fa49e Mon Sep 17 00:00:00 2001 From: Mark Zitnik Date: Wed, 31 Jan 2024 16:55:19 +0200 Subject: [PATCH] Fix decompress bug in NonBlockingPipedOutputStream (#1542) * Fix decompress bug in NonBlockingPipedOutputStream & extra logging * Use StringUtils to make large content * Move test to http client test --------- Co-authored-by: mzitnik --- CHANGELOG.md | 1 + .../client/ClientIntegrationTest.java | 1 + .../stream/AbstractByteArrayOutputStream.java | 4 ++ .../data/stream/Lz4OutputStream.java | 10 ++- .../stream/NonBlockingPipedOutputStream.java | 6 ++ .../client/http/ClickHouseHttpClientTest.java | 66 +++++++++++++++++++ 6 files changed, 87 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 88614def9..b9caf0939 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ ## 0.6.1 ### Bug Fixes +- Fix buffering issue caused by decompress flag not to work when working with HTTP Client. ## 0.6.0 diff --git a/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java b/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java index 2e6c7ff20..c76f856ec 100644 --- a/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java +++ b/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java @@ -44,6 +44,7 @@ import com.clickhouse.data.value.UnsignedShort; import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream; +import org.testcontainers.shaded.org.apache.commons.lang3.StringUtils; import org.testng.Assert; import org.testng.SkipException; import org.testng.annotations.DataProvider; diff --git a/clickhouse-data/src/main/java/com/clickhouse/data/stream/AbstractByteArrayOutputStream.java b/clickhouse-data/src/main/java/com/clickhouse/data/stream/AbstractByteArrayOutputStream.java index 44404ae76..6a3c73acc 100644 --- a/clickhouse-data/src/main/java/com/clickhouse/data/stream/AbstractByteArrayOutputStream.java +++ b/clickhouse-data/src/main/java/com/clickhouse/data/stream/AbstractByteArrayOutputStream.java @@ -6,8 +6,11 @@ import com.clickhouse.data.ClickHouseDataUpdater; import com.clickhouse.data.ClickHousePassThruStream; import com.clickhouse.data.ClickHouseOutputStream; +import com.clickhouse.logging.Logger; +import com.clickhouse.logging.LoggerFactory; public abstract class AbstractByteArrayOutputStream extends ClickHouseOutputStream { + private static final Logger log = LoggerFactory.getLogger(AbstractByteArrayOutputStream.class); protected final byte[] buffer; protected int position; @@ -72,6 +75,7 @@ public ClickHouseOutputStream writeBuffer(ClickHouseByteBuffer buffer) throws IO byte[] b = this.buffer; int limit = b.length; int length = buffer.length(); + log.debug("writeBuffer limit:[{}] length:[{}] position:[{}]", limit, length, position); if (length <= limit - position) { System.arraycopy(buffer.array(), buffer.position(), b, position, length); position += length; diff --git a/clickhouse-data/src/main/java/com/clickhouse/data/stream/Lz4OutputStream.java b/clickhouse-data/src/main/java/com/clickhouse/data/stream/Lz4OutputStream.java index a5c5084af..1f4a92397 100644 --- a/clickhouse-data/src/main/java/com/clickhouse/data/stream/Lz4OutputStream.java +++ b/clickhouse-data/src/main/java/com/clickhouse/data/stream/Lz4OutputStream.java @@ -9,12 +9,14 @@ import com.clickhouse.data.ClickHouseOutputStream; import com.clickhouse.data.ClickHousePassThruStream; +import com.clickhouse.logging.Logger; +import com.clickhouse.logging.LoggerFactory; import net.jpountz.lz4.LZ4Compressor; import net.jpountz.lz4.LZ4Factory; public class Lz4OutputStream extends AbstractByteArrayOutputStream { private static final LZ4Factory factory = LZ4Factory.fastestInstance(); - + private static final Logger log = LoggerFactory.getLogger(Lz4OutputStream.class); private final OutputStream output; private final LZ4Compressor compressor; @@ -22,6 +24,11 @@ public class Lz4OutputStream extends AbstractByteArrayOutputStream { @Override protected void flushBuffer() throws IOException { + log.debug("flushBuffer [{}:{}]", 0, position); + if (position == 0) { + log.debug("flushBuffer: nothing to flush"); + return; + } byte[] block = compressedBlock; block[16] = Lz4InputStream.MAGIC; int compressed = compressor.compress(buffer, 0, position, block, 25); @@ -37,6 +44,7 @@ protected void flushBuffer() throws IOException { @Override protected void flushBuffer(byte[] bytes, int offset, int length) throws IOException { + log.debug("flushBuffer [{}:{}]", offset, length); int maxLen = compressor.maxCompressedLength(length) + 15; byte[] block = maxLen <= compressedBlock.length ? compressedBlock : new byte[maxLen]; block[16] = Lz4InputStream.MAGIC; diff --git a/clickhouse-data/src/main/java/com/clickhouse/data/stream/NonBlockingPipedOutputStream.java b/clickhouse-data/src/main/java/com/clickhouse/data/stream/NonBlockingPipedOutputStream.java index 4c2c083ea..8f617b323 100644 --- a/clickhouse-data/src/main/java/com/clickhouse/data/stream/NonBlockingPipedOutputStream.java +++ b/clickhouse-data/src/main/java/com/clickhouse/data/stream/NonBlockingPipedOutputStream.java @@ -15,6 +15,8 @@ import com.clickhouse.data.ClickHousePipedOutputStream; import com.clickhouse.data.ClickHouseUtils; import com.clickhouse.data.ClickHouseWriter; +import com.clickhouse.logging.Logger; +import com.clickhouse.logging.LoggerFactory; /** * A combination of {@link java.io.PipedOutputStream} and @@ -23,6 +25,9 @@ * reader are on two separate threads. */ public class NonBlockingPipedOutputStream extends ClickHousePipedOutputStream { + + private static final Logger log = LoggerFactory.getLogger(NonBlockingPipedOutputStream.class); + protected final AdaptiveQueue queue; protected final int bufferSize; @@ -176,6 +181,7 @@ public ClickHouseOutputStream writeBytes(byte[] bytes, int offset, int length) t ByteBuffer b = buffer; while (length > 0) { int remain = b.remaining(); + log.debug("writeBytes length:[%d] remain:[%d] offset: [%d]", length, remain, offset); if (length < remain) { b.put(bytes, offset, length); length = 0; diff --git a/clickhouse-http-client/src/test/java/com/clickhouse/client/http/ClickHouseHttpClientTest.java b/clickhouse-http-client/src/test/java/com/clickhouse/client/http/ClickHouseHttpClientTest.java index 55a36d58e..a17816f5c 100644 --- a/clickhouse-http-client/src/test/java/com/clickhouse/client/http/ClickHouseHttpClientTest.java +++ b/clickhouse-http-client/src/test/java/com/clickhouse/client/http/ClickHouseHttpClientTest.java @@ -3,6 +3,9 @@ import java.io.IOException; import java.io.Serializable; import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.stream.LongStream; import com.clickhouse.client.ClickHouseClient; import com.clickhouse.client.ClickHouseConfig; @@ -27,16 +30,20 @@ import com.clickhouse.client.http.config.HttpConnectionProvider; import com.clickhouse.config.ClickHouseOption; import com.clickhouse.data.ClickHouseCompression; +import com.clickhouse.data.ClickHouseDataStreamFactory; import com.clickhouse.data.ClickHouseExternalTable; import com.clickhouse.data.ClickHouseFormat; import com.clickhouse.data.ClickHouseInputStream; +import com.clickhouse.data.ClickHousePipedOutputStream; import com.clickhouse.data.ClickHouseRecord; import com.clickhouse.data.ClickHouseVersion; +import com.clickhouse.data.format.BinaryStreamUtils; import com.clickhouse.data.value.ClickHouseStringValue; import eu.rekawek.toxiproxy.ToxiproxyClient; import org.testcontainers.containers.ToxiproxyContainer; +import org.testcontainers.shaded.org.apache.commons.lang3.StringUtils; import org.testng.Assert; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -469,4 +476,63 @@ public void testProxyConnection() throws ClickHouseException, IOException { } } } + @Test(groups = "integration") + public void testDecompressWithLargeChunk() throws ClickHouseException, IOException, ExecutionException, InterruptedException { + ClickHouseNode server = getServer(); + + String tableName = "test_decompress_with_large_chunk"; + + String tableColumns = String.format("id Int64, raw String"); + sendAndWait(server, "drop table if exists " + tableName, + "create table " + tableName + " (" + tableColumns + ")engine=Memory"); + + long numRows = 1; + String content = StringUtils.repeat("*", 50000); + try { + try (ClickHouseClient client = getClient()) { + ClickHouseRequest.Mutation request = client.read(server) + .write() + .table(tableName) + .decompressClientRequest(true) + //.option(ClickHouseClientOption.USE_BLOCKING_QUEUE, "true") + .format(ClickHouseFormat.RowBinary); + ClickHouseConfig config = request.getConfig(); + CompletableFuture future; + + try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance() + .createPipedOutputStream(config)) { + // start the worker thread which transfer data from the input into ClickHouse + future = request.data(stream.getInputStream()).execute(); + // write bytes into the piped stream + LongStream.range(0, numRows).forEachOrdered( + n -> { + try { + BinaryStreamUtils.writeInt64(stream, n); + BinaryStreamUtils.writeString(stream, content); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + ); + + // We need to close the stream before getting a response + stream.close(); + try (ClickHouseResponse response = future.get()) { + ClickHouseResponseSummary summary = response.getSummary(); + Assert.assertEquals(summary.getWrittenRows(), numRows, "Num of written rows"); + } + } + + } + } catch (Exception e) { + Throwable th = e.getCause(); +// if (th instanceof ClickHouseException) { +// ClickHouseException ce = (ClickHouseException) th; +// Assert.assertEquals(73, ce.getErrorCode(), "It's Code: 73. DB::Exception: Unknown format RowBinaryWithDefaults. a server that not support the format"); +// } else { + Assert.assertTrue(false, e.getMessage()); +// } + } + + } }