Skip to content

Commit

Permalink
Merge pull request #969 from zhicwu/lz4
Browse files Browse the repository at this point in the history
minor changes to enhance lz4 support and cli-client
  • Loading branch information
zhicwu authored Jun 26, 2022
2 parents 90e80ff + 6f82de2 commit a9b4dab
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ static Process startProcess(ClickHouseNode server, ClickHouseRequest<?> request)
ClickHouseChecker.isNullOrBlank(hostDir) ? System.getProperty("java.io.tmpdir") : hostDir);
String containerDir = (String) config.getOption(ClickHouseCommandLineOption.CLI_CONTAINER_DIRECTORY);
if (ClickHouseChecker.isNullOrBlank(containerDir)) {
containerDir = "/data/";
containerDir = "/tmp/";
} else {
containerDir = ClickHouseUtils.normalizeDirectory(containerDir);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ public enum ClickHouseCommandLineOption implements ClickHouseOption {
/**
* Work directory inside container, only works running in docker mode(when
* {@link #CLICKHOUSE_CLI_PATH} is not available). Empty value is treated as
* '/data'.
* '/tmp'.
*/
CLI_CONTAINER_DIRECTORY("cli_container_directory", "",
"Work directory inside container, empty value is treated as '/data'"),
"Work directory inside container, empty value is treated as '/tmp'"),
/**
* Command-line work directory. Empty value is treated as system temporary
* directory(e.g. {@code System.getProperty("java.io.tmpdir")}). When running in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ public class Lz4InputStream extends AbstractByteArrayInputStream {
private final InputStream stream;
private final byte[] header;

private byte[] compressedBlock;

private boolean readFully(byte[] b, int off, int len) throws IOException {
int n = 0;
while (n < len) {
Expand Down Expand Up @@ -61,7 +63,8 @@ protected int updateBuffer() throws IOException {
// 4 bytes - size of uncompressed data
int uncompressedSize = BinaryStreamUtils.toInt32(header, 21);
int offset = 9;
byte[] block = new byte[compressedSizeWithHeader];
final byte[] block = compressedBlock.length >= compressedSizeWithHeader ? compressedBlock
: (compressedBlock = new byte[compressedSizeWithHeader]);
block[0] = header[16];
BinaryStreamUtils.setInt32(block, 1, compressedSizeWithHeader);
BinaryStreamUtils.setInt32(block, 5, uncompressedSize);
Expand All @@ -70,17 +73,17 @@ protected int updateBuffer() throws IOException {
throw new IOException(ClickHouseUtils.format(ERROR_INCOMPLETE_READ, 0, compressedSizeWithHeader - offset));
}

long[] real = ClickHouseCityHash.cityHash128(block, 0, block.length);
long[] real = ClickHouseCityHash.cityHash128(block, 0, compressedSizeWithHeader);
if (real[0] != BinaryStreamUtils.toInt64(header, 0) || real[1] != BinaryStreamUtils.toInt64(header, 8)) {
throw new IOException("Checksum doesn't match: corrupted data.");
}

buffer = new byte[uncompressedSize];
decompressor.decompress(block, offset, buffer, 0, uncompressedSize);
final byte[] buf = buffer.length >= uncompressedSize ? buffer : (buffer = new byte[uncompressedSize]);
decompressor.decompress(block, offset, buf, 0, uncompressedSize);
if (copyTo != null) {
copyTo.write(buffer);
copyTo.write(buf);
}
return limit = buffer.length;
return limit = uncompressedSize;
}

public Lz4InputStream(InputStream stream) {
Expand All @@ -93,6 +96,8 @@ public Lz4InputStream(ClickHouseFile file, InputStream stream, Runnable postClos
this.decompressor = factory.fastDecompressor();
this.stream = ClickHouseChecker.nonNull(stream, "InputStream");
this.header = new byte[HEADER_LENGTH];

this.compressedBlock = ClickHouseByteBuffer.EMPTY_BYTES;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,23 @@ public class Lz4OutputStream extends AbstractByteArrayOutputStream {

@Override
protected void flushBuffer() throws IOException {
int compressed = compressor.compress(buffer, 0, position, compressedBlock, 25);
byte[] block = compressedBlock;
block[16] = Lz4InputStream.MAGIC;
int compressed = compressor.compress(buffer, 0, position, block, 25);
int compressedSizeWithHeader = compressed + 9;
BinaryStreamUtils.setInt32(compressedBlock, 17, compressedSizeWithHeader); // compressed size with header
BinaryStreamUtils.setInt32(compressedBlock, 21, position); // uncompressed size
long[] hash = ClickHouseCityHash.cityHash128(compressedBlock, 16, compressedSizeWithHeader);
BinaryStreamUtils.setInt64(compressedBlock, 0, hash[0]);
BinaryStreamUtils.setInt64(compressedBlock, 8, hash[1]);
output.write(compressedBlock, 0, compressed + 25);
BinaryStreamUtils.setInt32(block, 17, compressedSizeWithHeader); // compressed size with header
BinaryStreamUtils.setInt32(block, 21, position); // uncompressed size
long[] hash = ClickHouseCityHash.cityHash128(block, 16, compressedSizeWithHeader);
BinaryStreamUtils.setInt64(block, 0, hash[0]);
BinaryStreamUtils.setInt64(block, 8, hash[1]);
output.write(block, 0, compressed + 25);
position = 0;
}

@Override
protected void flushBuffer(byte[] bytes, int offset, int length) throws IOException {
int maxLen = compressor.maxCompressedLength(length) + 15;
byte[] block = maxLen < compressedBlock.length ? compressedBlock : new byte[maxLen];
byte[] block = maxLen <= compressedBlock.length ? compressedBlock : new byte[maxLen];
block[16] = Lz4InputStream.MAGIC;

int compressed = compressor.compress(bytes, offset, length, block, 25);
Expand All @@ -61,7 +63,6 @@ public Lz4OutputStream(ClickHouseFile file, OutputStream stream, int maxCompress
compressor = factory.fastCompressor();
// reserve the first 9 bytes for calculating checksum
compressedBlock = new byte[compressor.maxCompressedLength(maxCompressBlockSize) + 15];
compressedBlock[16] = Lz4InputStream.MAGIC;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,18 +205,28 @@ public void testOpenCloseClient() throws Exception {

@Test(dataProvider = "compressionMatrix", groups = { "integration" })
public void testCompression(ClickHouseFormat format, ClickHouseBufferingMode bufferingMode,
boolean compressRequest, boolean compressResponse) throws ClickHouseException {
boolean compressRequest, boolean compressResponse) throws Exception {
ClickHouseNode server = getServer();
String uuid = UUID.randomUUID().toString();
ClickHouseClient.send(server, "create table if not exists test_compress_decompress(id UUID)engine=Memory")
.get();
try (ClickHouseClient client = getClient()) {
ClickHouseRequest<?> request = client.connect(server)
.format(format)
.option(ClickHouseClientOption.RESPONSE_BUFFERING, bufferingMode)
.compressServerResponse(compressResponse)
.decompressClientRequest(compressRequest);
// start with insert
try (ClickHouseResponse resp = request
.query("insert into test_compress_decompress values(:uuid)").params(ClickHouseStringValue.of(uuid))
.executeAndWait()) {
Assert.assertNotNull(resp);
}

boolean hasResult = false;
try (ClickHouseResponse resp = request
.query("select :uuid").params(ClickHouseStringValue.of(uuid)).executeAndWait()) {
.query("select id from test_compress_decompress where id = :uuid")
.params(ClickHouseStringValue.of(uuid)).executeAndWait()) {
Assert.assertEquals(resp.firstRecord().getValue(0).asString(), uuid);
hasResult = true;
}
Expand Down Expand Up @@ -966,7 +976,6 @@ public void testCustomWriter() throws Exception {

@Test(groups = { "integration" })
public void testDumpAndLoadFile() throws Exception {
// super.testLoadRawData();
ClickHouseNode server = getServer();
ClickHouseClient.send(server, "drop table if exists test_dump_load_file",
"create table test_dump_load_file(a UInt64, b Nullable(String)) engine=MergeTree() order by tuple()")
Expand Down

0 comments on commit a9b4dab

Please sign in to comment.