diff --git a/README.md b/README.md index 578997c13..6c94082a1 100644 --- a/README.md +++ b/README.md @@ -21,32 +21,32 @@ Note: in general, the new driver(v0.3.2) is a few times faster with less memory ## Features -| Category | Feature | Supported | Remark | -| ------------- | ------------------------------------------------------------ | ------------------ | -------------------------------------------------------------------------------------------------------------------------------------------------- | -| Protocol | [HTTP](https://clickhouse.com/docs/en/interfaces/http/) | :white_check_mark: | recommended, defaults to `java.net.HttpURLConnection` and can be changed to `java.net.http.HttpClient`(faster but less stable) | -| | [gRPC](https://clickhouse.com/docs/en/interfaces/grpc/) | :white_check_mark: | still experimental, known to has [issue](https://github.com/ClickHouse/ClickHouse/issues/28671#issuecomment-1087049993) when using LZ4 compression | -| | [TCP/Native](https://clickhouse.com/docs/en/interfaces/tcp/) | :x: | will be available in 0.3.3 | -| Compatibility | Server < 20.7 | :x: | use 0.3.1-patch(or 0.2.6 if you're stuck with JDK 7) | -| | Server >= 20.7 | :white_check_mark: | use 0.3.2 or above. All [active releases](https://github.com/ClickHouse/ClickHouse/pulls?q=is%3Aopen+is%3Apr+label%3Arelease) are supported. | -| Data Type | AggregatedFunction | :x: | limited to `groupBitmap` | -| | Array(\*) | :white_check_mark: | | -| | Bool | :white_check_mark: | | -| | Date\* | :white_check_mark: | | -| | DateTime\* | :white_check_mark: | | -| | Decimal\* | :white_check_mark: | `SET output_format_decimal_trailing_zeros=1` in 21.9+ for consistency | -| | Enum\* | :white_check_mark: | can be treated as both string and integer | -| | Geo Types | :white_check_mark: | Point, Ring, Polygon, and MultiPolygon | -| | Int\*, UInt\* | :white_check_mark: | UInt64 is mapped to `long` | -| | IPv\* | :white_check_mark: | | -| | Map(\*) | :white_check_mark: | | -| | Nested(\*) | :white_check_mark: | | -| | Object('JSON') | :white_check_mark: | | -| | SimpleAggregateFunction | :white_check_mark: | | -| | \*String | :white_check_mark: | | -| | Tuple(\*) | :white_check_mark: | | -| | UUID | :white_check_mark: | | -| Format | RowBinary | :white_check_mark: | `RowBinaryWithNamesAndTypes` for query and `RowBinary` for insertion | -| | TabSeparated | :white_check_mark: | Does not support as many data types as RowBinary | +| Category | Feature | Supported | Remark | +| ------------- | ------------------------------------------------------------ | ------------------ | -------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| Protocol | [HTTP](https://clickhouse.com/docs/en/interfaces/http/) | :white_check_mark: | recommended, defaults to `java.net.HttpURLConnection` and can be changed to `java.net.http.HttpClient`(faster but less stable) | +| | [gRPC](https://clickhouse.com/docs/en/interfaces/grpc/) | :white_check_mark: | still experimental, works with 22.3+, known to has [issue](https://github.com/ClickHouse/ClickHouse/issues/28671#issuecomment-1087049993) when using LZ4 compression | +| | [TCP/Native](https://clickhouse.com/docs/en/interfaces/tcp/) | :x: | will be available in 0.3.3 | +| Compatibility | Server < 20.7 | :x: | use 0.3.1-patch(or 0.2.6 if you're stuck with JDK 7) | +| | Server >= 20.7 | :white_check_mark: | use 0.3.2 or above. All [active releases](https://github.com/ClickHouse/ClickHouse/pulls?q=is%3Aopen+is%3Apr+label%3Arelease) are supported. | +| Data Type | AggregatedFunction | :x: | limited to `groupBitmap` | +| | Array(\*) | :white_check_mark: | | +| | Bool | :white_check_mark: | | +| | Date\* | :white_check_mark: | | +| | DateTime\* | :white_check_mark: | | +| | Decimal\* | :white_check_mark: | `SET output_format_decimal_trailing_zeros=1` in 21.9+ for consistency | +| | Enum\* | :white_check_mark: | can be treated as both string and integer | +| | Geo Types | :white_check_mark: | Point, Ring, Polygon, and MultiPolygon | +| | Int\*, UInt\* | :white_check_mark: | UInt64 is mapped to `long` | +| | IPv\* | :white_check_mark: | | +| | Map(\*) | :white_check_mark: | | +| | Nested(\*) | :white_check_mark: | | +| | Object('JSON') | :white_check_mark: | | +| | SimpleAggregateFunction | :white_check_mark: | | +| | \*String | :white_check_mark: | | +| | Tuple(\*) | :white_check_mark: | | +| | UUID | :white_check_mark: | | +| Format | RowBinary | :white_check_mark: | `RowBinaryWithNamesAndTypes` for query and `RowBinary` for insertion | +| | TabSeparated | :white_check_mark: | Does not support as many data types as RowBinary | ## Configuration @@ -228,7 +228,7 @@ In the case you prefer to test against an existing server, please follow instruc - add below two configuration files to the existing server and expose all ports for external access - [ports.xml](../../blob/master/clickhouse-client/src/test/resources/containers/clickhouse-server/config.d/ports.xml) - enable all ports - and [users.xml](../../blob/master/clickhouse-client/src/test/resources/containers/clickhouse-server/users.d/users.xml) - accounts used for integration test - Note: you may need to change root element from `clickhouse` to `yandex` when testing old version of ClickHouse. + Note: you may need to change root element from `clickhouse` to `yandex` when testing old version of ClickHouse. - put `test.properties` under either `~/.clickhouse` or `src/test/resources` of your project, with content like below: ```properties clickhouseServer=x.x.x.x diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseRequest.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseRequest.java index b792b71c1..058df11e8 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseRequest.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseRequest.java @@ -43,8 +43,6 @@ protected Mutation(ClickHouseRequest request, boolean sealed) { this.options.putAll(request.options); this.settings.putAll(request.settings); - - this.sessionId = request.sessionId; } @Override @@ -90,13 +88,13 @@ public Mutation data(String file, ClickHouseCompression compression) { final ClickHouseRequest self = this; final String fileName = ClickHouseChecker.nonEmpty(file, "File"); - this.input = ClickHouseDeferredValue.of(() -> { + this.input = changeProperty(PROP_DATA, this.input, ClickHouseDeferredValue.of(() -> { try { return ClickHouseInputStream.of(new FileInputStream(fileName), 123, compression); } catch (FileNotFoundException e) { throw new IllegalArgumentException(e); } - }); + })); return this; } @@ -119,7 +117,8 @@ public Mutation data(InputStream input) { public Mutation data(ClickHouseInputStream input) { checkSealed(); - this.input = ClickHouseDeferredValue.of(input, ClickHouseInputStream.class); + this.input = changeProperty(PROP_DATA, this.input, + ClickHouseDeferredValue.of(input, ClickHouseInputStream.class)); return this; } @@ -133,7 +132,7 @@ public Mutation data(ClickHouseInputStream input) { public Mutation data(ClickHouseDeferredValue input) { checkSealed(); - this.input = input; + this.input = changeProperty(PROP_DATA, this.input, input); return this; } @@ -162,16 +161,7 @@ public ClickHouseResponse sendAndWait() throws ClickHouseException { @Override public Mutation table(String table, String queryId) { checkSealed(); - - this.queryId = queryId; - - String sql = "INSERT INTO " + ClickHouseChecker.nonBlank(table, "table"); - if (!sql.equals(this.sql)) { - this.sql = sql; - this.preparedQuery = null; - resetCache(); - } - + super.query("INSERT INTO " + ClickHouseChecker.nonBlank(table, "table"), queryId); return this; } @@ -190,7 +180,6 @@ public Mutation seal() { req.input = input; req.queryId = queryId; - req.sessionId = sessionId; req.sql = sql; req.preparedQuery = preparedQuery; @@ -202,6 +191,11 @@ public Mutation seal() { private static final long serialVersionUID = 4990313525960702287L; + static final String PROP_DATA = "data"; + static final String PROP_PREPARED_QUERY = "preparedQuery"; + static final String PROP_QUERY = "query"; + static final String PROP_QUERY_ID = "queryId"; + private final boolean sealed; private transient ClickHouseClient client; @@ -216,7 +210,6 @@ public Mutation seal() { protected transient ClickHouseDeferredValue input; protected String queryId; - protected String sessionId; protected String sql; protected ClickHouseParameterizedQuery preparedQuery; @@ -245,6 +238,13 @@ protected ClickHouseRequest(ClickHouseClient client, Function(); } + protected T changeProperty(String property, T oldValue, T newValue) { + if (changeListener != null && !Objects.equals(oldValue, newValue)) { + changeListener.propertyChanged(this, property, oldValue, newValue); + } + return newValue; + } + protected void checkSealed() { if (sealed) { throw new IllegalStateException("Sealed request is immutable"); @@ -291,7 +291,6 @@ public ClickHouseRequest copy() { req.namedParameters.putAll(namedParameters); req.input = input; req.queryId = queryId; - req.sessionId = sessionId; req.sql = sql; req.preparedQuery = preparedQuery; return req; @@ -412,7 +411,8 @@ public Optional getQueryId() { */ public ClickHouseParameterizedQuery getPreparedQuery() { if (preparedQuery == null) { - preparedQuery = ClickHouseParameterizedQuery.of(getConfig(), getQuery()); + preparedQuery = changeProperty(PROP_PREPARED_QUERY, preparedQuery, + ClickHouseParameterizedQuery.of(getConfig(), getQuery())); } return preparedQuery; @@ -433,6 +433,7 @@ public Map getSettings() { * @return session id */ public Optional getSessionId() { + String sessionId = (String) getConfig().getOption(ClickHouseClientOption.SESSION_ID); return ClickHouseChecker.isNullOrEmpty(sessionId) ? Optional.empty() : Optional.of(sessionId); } @@ -681,12 +682,7 @@ public SelfT external(Collection tables) { @SuppressWarnings("unchecked") public SelfT format(ClickHouseFormat format) { checkSealed(); - - if (format == null) { - removeOption(ClickHouseClientOption.FORMAT); - } else { - option(ClickHouseClientOption.FORMAT, format); - } + option(ClickHouseClientOption.FORMAT, format); return (SelfT) this; } @@ -739,11 +735,7 @@ public SelfT options(Map options) { m.putAll(this.options); if (options != null) { for (Entry e : options.entrySet()) { - if (e.getValue() == null) { - removeOption(e.getKey()); - } else { - option(e.getKey(), e.getValue()); - } + option(e.getKey(), e.getValue()); m.remove(e.getKey()); } } @@ -1078,13 +1070,13 @@ public SelfT query(String sql) { public SelfT query(ClickHouseParameterizedQuery query, String queryId) { checkSealed(); - if (!ClickHouseChecker.nonNull(query, "query").equals(this.preparedQuery)) { - this.preparedQuery = query; - this.sql = query.getOriginalQuery(); + if (!ClickHouseChecker.nonNull(query, PROP_QUERY).equals(this.preparedQuery)) { + this.preparedQuery = changeProperty(PROP_PREPARED_QUERY, this.preparedQuery, query); + this.sql = changeProperty(PROP_QUERY, this.sql, query.getOriginalQuery()); resetCache(); } - this.queryId = queryId; + this.queryId = changeProperty(PROP_QUERY_ID, this.queryId, queryId); return (SelfT) this; } @@ -1092,21 +1084,21 @@ public SelfT query(ClickHouseParameterizedQuery query, String queryId) { /** * Sets query and optinally query id. * - * @param sql non-empty query + * @param query non-empty query * @param queryId query id, null means no query id * @return the request itself */ @SuppressWarnings("unchecked") - public SelfT query(String sql, String queryId) { + public SelfT query(String query, String queryId) { checkSealed(); - if (!ClickHouseChecker.nonBlank(sql, "sql").equals(this.sql)) { - this.sql = sql; - this.preparedQuery = null; + if (!ClickHouseChecker.nonBlank(query, PROP_QUERY).equals(this.sql)) { + this.sql = changeProperty(PROP_QUERY, this.sql, query); + this.preparedQuery = changeProperty(PROP_PREPARED_QUERY, this.preparedQuery, null); resetCache(); } - this.queryId = queryId; + this.queryId = changeProperty(PROP_QUERY_ID, this.queryId, queryId); return (SelfT) this; } @@ -1121,12 +1113,9 @@ public SelfT query(String sql, String queryId) { public SelfT clearSession() { checkSealed(); + removeOption(ClickHouseClientOption.SESSION_ID); removeOption(ClickHouseClientOption.SESSION_CHECK); removeOption(ClickHouseClientOption.SESSION_TIMEOUT); - if (this.sessionId != null) { - this.sessionId = null; - resetCache(); - } return (SelfT) this; } @@ -1178,12 +1167,9 @@ public SelfT session(String sessionId, Integer timeout) { public SelfT session(String sessionId, Boolean check, Integer timeout) { checkSealed(); + option(ClickHouseClientOption.SESSION_ID, sessionId); option(ClickHouseClientOption.SESSION_CHECK, check); option(ClickHouseClientOption.SESSION_TIMEOUT, timeout); - if (!Objects.equals(this.sessionId, sessionId)) { - this.sessionId = sessionId; - resetCache(); - } return (SelfT) this; } @@ -1263,12 +1249,7 @@ public SelfT table(String table, String queryId) { @SuppressWarnings("unchecked") public SelfT use(String database) { checkSealed(); - - if (database == null) { - removeOption(ClickHouseClientOption.DATABASE); - } else { - option(ClickHouseClientOption.DATABASE, database); - } + option(ClickHouseClientOption.DATABASE, database); return (SelfT) this; } @@ -1384,11 +1365,10 @@ public SelfT reset() { } this.namedParameters.clear(); - this.input = null; - this.sql = null; - this.preparedQuery = null; - this.queryId = null; - this.sessionId = null; + this.input = changeProperty(PROP_DATA, this.input, null); + this.sql = changeProperty(PROP_QUERY, this.sql, null); + this.preparedQuery = changeProperty(PROP_PREPARED_QUERY, this.preparedQuery, null); + this.queryId = changeProperty(PROP_QUERY_ID, this.queryId, null); resetCache(); @@ -1414,7 +1394,6 @@ public ClickHouseRequest seal() { req.input = input; req.queryId = queryId; - req.sessionId = sessionId; req.sql = sql; req.preparedQuery = preparedQuery; } diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/config/ClickHouseClientOption.java b/clickhouse-client/src/main/java/com/clickhouse/client/config/ClickHouseClientOption.java index 616d21640..a4a7f600b 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/config/ClickHouseClientOption.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/config/ClickHouseClientOption.java @@ -141,6 +141,10 @@ public enum ClickHouseClientOption implements ClickHouseOption { * Server version. */ SERVER_VERSION("server_version", "", "Server version."), + /** + * Session id. + */ + SESSION_ID("session_id", "", "Session id"), /** * Whether to check if session id is validate. */ diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/config/ClickHouseConfigChangeListener.java b/clickhouse-client/src/main/java/com/clickhouse/client/config/ClickHouseConfigChangeListener.java index 7b5f79633..faac212bb 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/config/ClickHouseConfigChangeListener.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/config/ClickHouseConfigChangeListener.java @@ -19,26 +19,25 @@ default void optionChanged(T source, ClickHouseOption option, Serializable oldVa } /** - * Triggered when ClickHouse setting(declared on client-side) was changed. - * Removing a setting is same as reseting its value to {@code null}. + * Triggered when property of {@code source} was changed. * * @param source source of the event - * @param setting the changed setting, which should never be null + * @param property name of the changed property, which should never be null * @param oldValue old option value, which could be null * @param newValue new option value, which could be null */ - default void settingChanged(T source, String setting, Serializable oldValue, Serializable newValue) { + default void propertyChanged(T source, String property, Object oldValue, Object newValue) { } /** - * Triggered when property of {@code source} was changed. + * Triggered when ClickHouse setting(declared on client-side) was changed. + * Removing a setting is same as reseting its value to {@code null}. * * @param source source of the event - * @param property name of the changed property, which should never be null + * @param setting the changed setting, which should never be null * @param oldValue old option value, which could be null * @param newValue new option value, which could be null */ - // default void propertyChanged(T source, String property, Object oldValue, - // Object newValue) { - // } + default void settingChanged(T source, String setting, Serializable oldValue, Serializable newValue) { + } } diff --git a/clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseRequestTest.java b/clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseRequestTest.java index 003b666bc..11d936d4c 100644 --- a/clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseRequestTest.java +++ b/clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseRequestTest.java @@ -8,6 +8,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Properties; import java.util.UUID; @@ -73,8 +74,9 @@ public void testBuild() { @Test(groups = { "unit" }) public void testChangeListener() { + final ClickHouseConfig config = new ClickHouseConfig(); final List changedOptions = new ArrayList<>(); - // final List changedProperties = new ArrayList<>(); + final List changedProperties = new ArrayList<>(); final List changedSettings = new ArrayList<>(); ClickHouseConfigChangeListener> listener = new ClickHouseConfigChangeListener>() { @Override @@ -83,21 +85,30 @@ public void optionChanged(ClickHouseRequest source, ClickHouseOption option, changedOptions.add(new Object[] { source, option, oldValue, newValue }); } + @Override + public void propertyChanged(ClickHouseRequest source, String property, Object oldValue, + Object newValue) { + changedProperties.add(new Object[] { source, property, oldValue, newValue }); + } + @Override public void settingChanged(ClickHouseRequest source, String setting, Serializable oldValue, Serializable newValue) { changedSettings.add(new Object[] { source, setting, oldValue, newValue }); } }; + final ClickHouseParameterizedQuery select3 = ClickHouseParameterizedQuery.of(config, "select 3"); ClickHouseRequest request = ClickHouseClient.newInstance().connect(ClickHouseNode.builder().build()); request.setChangeListener(listener); Assert.assertTrue(changedOptions.isEmpty(), "Should have no option changed"); + Assert.assertTrue(changedProperties.isEmpty(), "Should have no property changed"); Assert.assertTrue(changedSettings.isEmpty(), "Should have no setting changed"); request.option(ClickHouseClientOption.ASYNC, false); request.format(ClickHouseFormat.Arrow); request.option(ClickHouseClientOption.FORMAT, ClickHouseFormat.Avro); request.removeOption(ClickHouseClientOption.BUFFER_SIZE); request.removeOption(ClickHouseClientOption.ASYNC); + request.query("select 1").query("select 2", "id=2").query(select3); request.reset(); request.format(ClickHouseFormat.TSV); Assert.assertEquals(changedOptions.toArray(new Object[0]), @@ -108,6 +119,14 @@ public void settingChanged(ClickHouseRequest source, String setting, Serializ ClickHouseFormat.Avro }, new Object[] { request, ClickHouseClientOption.ASYNC, false, null }, new Object[] { request, ClickHouseClientOption.FORMAT, ClickHouseFormat.Avro, null } }); + Assert.assertEquals(changedProperties.toArray(new Object[0]), new Object[][] { + { request, ClickHouseRequest.PROP_QUERY, null, "select 1" }, + { request, ClickHouseRequest.PROP_QUERY, "select 1", "select 2" }, + { request, ClickHouseRequest.PROP_QUERY_ID, null, "id=2" }, + { request, ClickHouseRequest.PROP_PREPARED_QUERY, null, select3 }, + { request, ClickHouseRequest.PROP_QUERY, "select 2", "select 3" }, + { request, ClickHouseRequest.PROP_QUERY_ID, "id=2", null }, + }); changedOptions.clear(); request.setChangeListener(listener); @@ -152,7 +171,7 @@ public void testCopy() { Assert.assertEquals(copy.namedParameters, request.namedParameters); Assert.assertEquals(copy.options, request.options); Assert.assertEquals(copy.queryId, request.queryId); - Assert.assertEquals(copy.sessionId, request.sessionId); + Assert.assertEquals(copy.getSessionId(), request.getSessionId()); Assert.assertEquals(copy.sql, request.sql); Assert.assertEquals(copy.getPreparedQuery(), request.getPreparedQuery()); @@ -162,7 +181,7 @@ public void testCopy() { Assert.assertTrue(copy.namedParameters.isEmpty(), "Named parameters should be empty"); Assert.assertEquals(copy.options, request.options); Assert.assertNull(copy.queryId, "Query ID should be null"); - Assert.assertEquals(copy.sessionId, request.sessionId); + Assert.assertEquals(copy.getSessionId(), request.getSessionId()); Assert.assertNull(copy.sql, "SQL should be null"); ClickHouseRequest newCopy = copy; @@ -178,7 +197,7 @@ public void testCopy() { Assert.assertEquals(copy.namedParameters, request.namedParameters); Assert.assertEquals(copy.options, request.options); Assert.assertEquals(copy.queryId, request.queryId); - Assert.assertEquals(copy.sessionId, request.sessionId); + Assert.assertEquals(copy.getSessionId(), request.getSessionId()); Assert.assertEquals(copy.sql, request.sql); Assert.assertEquals(copy.getPreparedQuery(), request.getPreparedQuery()); } @@ -283,7 +302,7 @@ public void testSeal() { Assert.assertEquals(sealed.namedParameters, request.namedParameters); Assert.assertEquals(sealed.options, request.options); Assert.assertEquals(sealed.queryId, request.queryId); - Assert.assertEquals(sealed.sessionId, request.sessionId); + Assert.assertEquals(sealed.getSessionId(), request.getSessionId()); Assert.assertEquals(sealed.sql, request.sql); Assert.assertEquals(sealed.getPreparedQuery(), request.getPreparedQuery()); @@ -295,7 +314,7 @@ public void testSession() { String sessionId = UUID.randomUUID().toString(); ClickHouseRequest request = ClickHouseClient.newInstance().connect(ClickHouseNode.builder().build()); Assert.assertEquals(request.getSessionId().isPresent(), false); - Assert.assertEquals(request.sessionId, null); + Assert.assertEquals(request.getSessionId(), Optional.empty()); Assert.assertEquals(request.getConfig().isSessionCheck(), false); Assert.assertEquals(request.getConfig().getSessionTimeout(), 0); @@ -319,7 +338,7 @@ public void testSession() { Assert.assertEquals(sealedRequest.getConfig().isSessionCheck(), true); Assert.assertEquals(sealedRequest.getConfig().getSessionTimeout(), 10); Assert.assertEquals(request.getSessionId().isPresent(), false); - Assert.assertEquals(request.sessionId, null); + Assert.assertEquals(request.getSessionId(), Optional.empty()); Assert.assertEquals(request.getConfig().isSessionCheck(), false); Assert.assertEquals(request.getConfig().getSessionTimeout(), 0); } diff --git a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/ClickHouseStatementImpl.java b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/ClickHouseStatementImpl.java index 35f959388..36218750b 100644 --- a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/ClickHouseStatementImpl.java +++ b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/ClickHouseStatementImpl.java @@ -262,12 +262,14 @@ protected ClickHouseStatementImpl(ClickHouseConnectionImpl connection, ClickHous @Override public void optionChanged(ClickHouseRequest source, ClickHouseOption option, Serializable oldValue, Serializable newValue) { - if (source != request || option != ClickHouseClientOption.FORMAT) { + if (source != request) { return; } - this.deserializer = ClickHouseDataStreamFactory.getInstance().getDeserializer(request.getFormat()); - this.serializer = ClickHouseDataStreamFactory.getInstance().getSerializer(request.getInputFormat()); + if (option == ClickHouseClientOption.FORMAT) { + this.deserializer = ClickHouseDataStreamFactory.getInstance().getDeserializer(request.getFormat()); + this.serializer = ClickHouseDataStreamFactory.getInstance().getSerializer(request.getInputFormat()); + } } @Override