Skip to content

Commit

Permalink
Fix decompress bug in NonBlockingPipedOutputStream (#1542)
Browse files Browse the repository at this point in the history
* Fix decompress bug in NonBlockingPipedOutputStream & extra logging

* Use StringUtils to make large content

* Move test to http client test

---------

Co-authored-by: mzitnik <[email protected]>
  • Loading branch information
mzitnik and mzitnik authored Jan 31, 2024
1 parent bbd6808 commit c63a34d
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## 0.6.1

### Bug Fixes
- Fix buffering issue caused by decompress flag not to work when working with HTTP Client.

## 0.6.0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import com.clickhouse.data.value.UnsignedShort;

import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
import org.testcontainers.shaded.org.apache.commons.lang3.StringUtils;
import org.testng.Assert;
import org.testng.SkipException;
import org.testng.annotations.DataProvider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@
import com.clickhouse.data.ClickHouseDataUpdater;
import com.clickhouse.data.ClickHousePassThruStream;
import com.clickhouse.data.ClickHouseOutputStream;
import com.clickhouse.logging.Logger;
import com.clickhouse.logging.LoggerFactory;

public abstract class AbstractByteArrayOutputStream extends ClickHouseOutputStream {
private static final Logger log = LoggerFactory.getLogger(AbstractByteArrayOutputStream.class);
protected final byte[] buffer;

protected int position;
Expand Down Expand Up @@ -72,6 +75,7 @@ public ClickHouseOutputStream writeBuffer(ClickHouseByteBuffer buffer) throws IO
byte[] b = this.buffer;
int limit = b.length;
int length = buffer.length();
log.debug("writeBuffer limit:[{}] length:[{}] position:[{}]", limit, length, position);
if (length <= limit - position) {
System.arraycopy(buffer.array(), buffer.position(), b, position, length);
position += length;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,26 @@
import com.clickhouse.data.ClickHouseOutputStream;
import com.clickhouse.data.ClickHousePassThruStream;

import com.clickhouse.logging.Logger;
import com.clickhouse.logging.LoggerFactory;
import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;

public class Lz4OutputStream extends AbstractByteArrayOutputStream {
private static final LZ4Factory factory = LZ4Factory.fastestInstance();

private static final Logger log = LoggerFactory.getLogger(Lz4OutputStream.class);
private final OutputStream output;

private final LZ4Compressor compressor;
private final byte[] compressedBlock;

@Override
protected void flushBuffer() throws IOException {
log.debug("flushBuffer [{}:{}]", 0, position);
if (position == 0) {
log.debug("flushBuffer: nothing to flush");
return;
}
byte[] block = compressedBlock;
block[16] = Lz4InputStream.MAGIC;
int compressed = compressor.compress(buffer, 0, position, block, 25);
Expand All @@ -37,6 +44,7 @@ protected void flushBuffer() throws IOException {

@Override
protected void flushBuffer(byte[] bytes, int offset, int length) throws IOException {
log.debug("flushBuffer [{}:{}]", offset, length);
int maxLen = compressor.maxCompressedLength(length) + 15;
byte[] block = maxLen <= compressedBlock.length ? compressedBlock : new byte[maxLen];
block[16] = Lz4InputStream.MAGIC;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import com.clickhouse.data.ClickHousePipedOutputStream;
import com.clickhouse.data.ClickHouseUtils;
import com.clickhouse.data.ClickHouseWriter;
import com.clickhouse.logging.Logger;
import com.clickhouse.logging.LoggerFactory;

/**
* A combination of {@link java.io.PipedOutputStream} and
Expand All @@ -23,6 +25,9 @@
* reader are on two separate threads.
*/
public class NonBlockingPipedOutputStream extends ClickHousePipedOutputStream {

private static final Logger log = LoggerFactory.getLogger(NonBlockingPipedOutputStream.class);

protected final AdaptiveQueue<ByteBuffer> queue;

protected final int bufferSize;
Expand Down Expand Up @@ -176,6 +181,7 @@ public ClickHouseOutputStream writeBytes(byte[] bytes, int offset, int length) t
ByteBuffer b = buffer;
while (length > 0) {
int remain = b.remaining();
log.debug("writeBytes length:[%d] remain:[%d] offset: [%d]", length, remain, offset);
if (length < remain) {
b.put(bytes, offset, length);
length = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import java.io.IOException;
import java.io.Serializable;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.LongStream;

import com.clickhouse.client.ClickHouseClient;
import com.clickhouse.client.ClickHouseConfig;
Expand All @@ -27,16 +30,20 @@
import com.clickhouse.client.http.config.HttpConnectionProvider;
import com.clickhouse.config.ClickHouseOption;
import com.clickhouse.data.ClickHouseCompression;
import com.clickhouse.data.ClickHouseDataStreamFactory;
import com.clickhouse.data.ClickHouseExternalTable;
import com.clickhouse.data.ClickHouseFormat;
import com.clickhouse.data.ClickHouseInputStream;
import com.clickhouse.data.ClickHousePipedOutputStream;
import com.clickhouse.data.ClickHouseRecord;
import com.clickhouse.data.ClickHouseVersion;
import com.clickhouse.data.format.BinaryStreamUtils;
import com.clickhouse.data.value.ClickHouseStringValue;

import eu.rekawek.toxiproxy.ToxiproxyClient;

import org.testcontainers.containers.ToxiproxyContainer;
import org.testcontainers.shaded.org.apache.commons.lang3.StringUtils;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -469,4 +476,63 @@ public void testProxyConnection() throws ClickHouseException, IOException {
}
}
}
@Test(groups = "integration")
public void testDecompressWithLargeChunk() throws ClickHouseException, IOException, ExecutionException, InterruptedException {
ClickHouseNode server = getServer();

String tableName = "test_decompress_with_large_chunk";

String tableColumns = String.format("id Int64, raw String");
sendAndWait(server, "drop table if exists " + tableName,
"create table " + tableName + " (" + tableColumns + ")engine=Memory");

long numRows = 1;
String content = StringUtils.repeat("*", 50000);
try {
try (ClickHouseClient client = getClient()) {
ClickHouseRequest.Mutation request = client.read(server)
.write()
.table(tableName)
.decompressClientRequest(true)
//.option(ClickHouseClientOption.USE_BLOCKING_QUEUE, "true")
.format(ClickHouseFormat.RowBinary);
ClickHouseConfig config = request.getConfig();
CompletableFuture<ClickHouseResponse> future;

try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance()
.createPipedOutputStream(config)) {
// start the worker thread which transfer data from the input into ClickHouse
future = request.data(stream.getInputStream()).execute();
// write bytes into the piped stream
LongStream.range(0, numRows).forEachOrdered(
n -> {
try {
BinaryStreamUtils.writeInt64(stream, n);
BinaryStreamUtils.writeString(stream, content);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
);

// We need to close the stream before getting a response
stream.close();
try (ClickHouseResponse response = future.get()) {
ClickHouseResponseSummary summary = response.getSummary();
Assert.assertEquals(summary.getWrittenRows(), numRows, "Num of written rows");
}
}

}
} catch (Exception e) {
Throwable th = e.getCause();
// if (th instanceof ClickHouseException) {
// ClickHouseException ce = (ClickHouseException) th;
// Assert.assertEquals(73, ce.getErrorCode(), "It's Code: 73. DB::Exception: Unknown format RowBinaryWithDefaults. a server that not support the format");
// } else {
Assert.assertTrue(false, e.getMessage());
// }
}

}
}

0 comments on commit c63a34d

Please sign in to comment.