Skip to content

Commit

Permalink
Add support in RowBinaryWithDefaults (#1508)
Browse files Browse the repository at this point in the history
* Add support in RowBinaryWithDefaults.

* Skip unsuported test

* Skip unsuported test

* Change PREFERRED_LTS_VERSION version

* Do not fail the test when format is not supported by server

---------

Co-authored-by: mzitnik <[email protected]>
  • Loading branch information
mzitnik and mzitnik authored Dec 31, 2023
1 parent aa3870e commit fb65981
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ concurrency:
cancel-in-progress: true

env:
PREFERRED_LTS_VERSION: "23.3"
PREFERRED_LTS_VERSION: "23.7"

jobs:
compile:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.clickhouse.data.ClickHouseCompression;

import java.io.IOException;
import java.util.concurrent.ExecutionException;

import org.testcontainers.containers.GenericContainer;
import org.testng.Assert;
Expand Down Expand Up @@ -183,4 +184,9 @@ public void testTransactionSnapshot() throws ClickHouseException {
public void testTransactionTimeout() throws ClickHouseException {
throw new SkipException("Skip due to session is not supported");
}
@Test(groups = { "integration" })
@Override
public void testRowBinaryWithDefaults() throws ClickHouseException, IOException, ExecutionException, InterruptedException {
throw new SkipException("Skip due to supported");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

Expand Down Expand Up @@ -2455,4 +2456,62 @@ public void testImplicitTransaction() throws ClickHouseException {
checkRowCount(tableName, 2);
}
}
@Test(groups = "integration")
public void testRowBinaryWithDefaults() throws ClickHouseException, IOException, ExecutionException, InterruptedException {
ClickHouseNode server = getServer();

String tableName = "test_row_binary_with_defaults";

String tableColumns = String.format("id Int64, updated_at DateTime DEFAULT now(), updated_at_date Date DEFAULT toDate(updated_at)");
sendAndWait(server, "drop table if exists " + tableName,
"create table " + tableName + " (" + tableColumns + ")engine=Memory");

long numRows = 1000;

try {
try (ClickHouseClient client = getClient()) {
ClickHouseRequest.Mutation request = client.read(server)
.write()
.table(tableName)
.format(ClickHouseFormat.RowBinaryWithDefaults);
ClickHouseConfig config = request.getConfig();
CompletableFuture<ClickHouseResponse> future;

try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance()
.createPipedOutputStream(config)) {
// start the worker thread which transfer data from the input into ClickHouse
future = request.data(stream.getInputStream()).execute();
// write bytes into the piped stream
LongStream.range(0, numRows).forEachOrdered(
n -> {
try {
BinaryStreamUtils.writeNonNull(stream);
BinaryStreamUtils.writeInt64(stream, n);
BinaryStreamUtils.writeNull(stream); // When using the default
BinaryStreamUtils.writeNull(stream); // When using the default
} catch (IOException e) {
throw new RuntimeException(e);
}
}
);

// We need to close the stream before getting a response
stream.close();
try (ClickHouseResponse response = future.get()) {
ClickHouseResponseSummary summary = response.getSummary();
Assert.assertEquals(summary.getWrittenRows(), numRows, "Num of written rows");
}
}

}
} catch (Exception e) {
Throwable th = e.getCause();
if (th instanceof ClickHouseException) {
ClickHouseException ce = (ClickHouseException) th;
Assert.assertEquals(73, ce.getErrorCode(), "It's Code: 73. DB::Exception: Unknown format RowBinaryWithDefaults. a server that not support the format");
} else {
Assert.assertTrue(false, e.getMessage());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public ClickHouseDataProcessor getProcessor(ClickHouseDataConfig config, ClickHo
throws IOException {
ClickHouseFormat format = ClickHouseChecker.nonNull(config, ClickHouseDataConfig.TYPE_NAME).getFormat();
ClickHouseDataProcessor processor = null;
if (ClickHouseFormat.RowBinary == format || ClickHouseFormat.RowBinaryWithNamesAndTypes == format) {
if (ClickHouseFormat.RowBinary == format || ClickHouseFormat.RowBinaryWithNamesAndTypes == format || ClickHouseFormat.RowBinaryWithDefaults == format) {
processor = new ClickHouseRowBinaryProcessor(config, input, output, columns, settings);
} else if (format.isText()) {
processor = new ClickHouseTabSeparatedProcessor(config, input, output, columns, settings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ public enum ClickHouseFormat {
RowBinary(true, true, true, false, true), // https://clickhouse.com/docs/en/interfaces/formats/#rowbinary
RowBinaryWithNames(true, true, true, true, true, RowBinary), // https://clickhouse.com/docs/en/interfaces/formats/#rowbinarywithnames
RowBinaryWithNamesAndTypes(true, true, true, true, true, RowBinary), // https://clickhouse.com/docs/en/interfaces/formats/#rowbinarywithnamesandtypes
RowBinaryWithDefaults(true, true, true, true, true, RowBinary), // https://clickhouse.com/docs/en/interfaces/formats#rowbinarywithdefaults
TabSeparated(true, true, false, false, true), // https://clickhouse.com/docs/en/interfaces/formats/#tabseparated
TabSeparatedRaw(true, true, false, false, true), // https://clickhouse.com/docs/en/interfaces/formats/#tabseparatedraw
TabSeparatedRawWithNames(true, true, false, true, true, TabSeparatedRaw), // https://clickhouse.com/docs/en/interfaces/formats/#tabseparatedrawwithnames
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.concurrent.ExecutionException;

import com.clickhouse.client.ClickHouseClient;
import com.clickhouse.client.ClickHouseException;
Expand Down Expand Up @@ -197,4 +198,11 @@ public void testTransactionSnapshot() throws ClickHouseException {
public void testTransactionTimeout() throws ClickHouseException {
throw new SkipException("Skip due to session is not supported");
}

@Test(groups = { "integration" })
@Override
public void testRowBinaryWithDefaults() throws ClickHouseException, IOException, ExecutionException, InterruptedException {
throw new SkipException("Skip due to supported");
}

}

0 comments on commit fb65981

Please sign in to comment.