From e12c99f4e7d5456c6dda531d863f2301d6d8973f Mon Sep 17 00:00:00 2001 From: Sergey Chernov Date: Sat, 10 Aug 2024 00:34:05 -0700 Subject: [PATCH 1/4] exceptions --- .../main/java/com/clickhouse/client/api/Client.java | 11 ++++++++--- .../client/api/internal/HttpAPIClientHelper.java | 7 ++----- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/client-v2/src/main/java/com/clickhouse/client/api/Client.java b/client-v2/src/main/java/com/clickhouse/client/api/Client.java index bec981067..035873147 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/Client.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/Client.java @@ -46,6 +46,7 @@ import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream; import org.apache.hc.core5.concurrent.DefaultThreadFactory; import org.apache.hc.core5.http.ClassicHttpResponse; +import org.apache.hc.core5.http.ConnectionRequestTimeoutException; import org.apache.hc.core5.http.HttpStatus; import org.apache.hc.core5.http.NoHttpResponseException; import org.slf4j.Logger; @@ -892,7 +893,7 @@ public CompletableFuture insert(String tableName, List data, metrics.operationComplete(); metrics.setQueryId(queryId); return new InsertResponse(metrics); - } catch (NoHttpResponseException e) { + } catch (NoHttpResponseException | ConnectionRequestTimeoutException e) { LOG.warn("Failed to get response. Retrying.", e); selectedNode = getNextAliveNode(); continue; @@ -1010,12 +1011,12 @@ public CompletableFuture insert(String tableName, metrics.operationComplete(); metrics.setQueryId(queryId); return new InsertResponse(metrics); - } catch (NoHttpResponseException e) { + } catch (NoHttpResponseException | ConnectionRequestTimeoutException e) { if (i < maxRetries) { try { data.reset(); } catch (IOException ioe) { - throw new ClientException("Failed to get response", e); + throw new ClientException("Failed to reset stream for retry", e); } LOG.warn("Failed to get response. Retrying.", e); selectedNode = getNextAliveNode(); @@ -1173,6 +1174,10 @@ public CompletableFuture query(String sqlQuery, Map Date: Wed, 21 Aug 2024 13:58:31 -0700 Subject: [PATCH 2/4] added configuration parameters and aligned logic --- .../com/clickhouse/client/api/Client.java | 159 +++++++++++------- .../client/api/ClientFaultCause.java | 10 ++ .../api/internal/HttpAPIClientHelper.java | 49 ++++++ .../client/api/internal/SerializerUtils.java | 14 ++ .../com/clickhouse/client/ClientTests.java | 21 --- ...mentTests.java => HttpTransportTests.java} | 82 ++++++++- .../clickhouse/client/insert/InsertTests.java | 47 ------ 7 files changed, 254 insertions(+), 128 deletions(-) create mode 100644 client-v2/src/main/java/com/clickhouse/client/api/ClientFaultCause.java rename client-v2/src/test/java/com/clickhouse/client/{ConnectionManagementTests.java => HttpTransportTests.java} (65%) diff --git a/client-v2/src/main/java/com/clickhouse/client/api/Client.java b/client-v2/src/main/java/com/clickhouse/client/api/Client.java index 9bc8afe30..cd1fd0e9d 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/Client.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/Client.java @@ -70,6 +70,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.StringJoiner; import java.util.TimeZone; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -81,6 +82,7 @@ import java.util.concurrent.TimeoutException; import java.util.function.Supplier; +import static java.time.temporal.ChronoUnit.MILLIS; import static java.time.temporal.ChronoUnit.SECONDS; /** @@ -714,8 +716,31 @@ public Builder setSharedOperationExecutor(ExecutorService executorService) { return this; } + + /** + * Sets list of causes that should be retried on. + * Default {@code [NoHttpResponse, ConnectTimeout, ConnectionRequestTimeout]} + * Use {@link ClientFaultCause#None} to disable retries. + * + * @param causes - list of causes + * @return + */ + public Builder retryOnFailures(ClientFaultCause ...causes) { + StringJoiner joiner = new StringJoiner(VALUES_LIST_DELIMITER); + for (ClientFaultCause cause : causes) { + joiner.add(cause.name()); + } + this.configuration.put("client_retry_on_failures", joiner.toString()); + return this; + } + + public Builder setMaxRetries(int maxRetries) { + this.configuration.put(ClickHouseClientOption.RETRY.getKey(), String.valueOf(maxRetries)); + return this; + } + public Client build() { - this.configuration = setDefaults(this.configuration); + setDefaults(); // check if endpoint are empty. so can not initiate client if (this.endpoints.isEmpty()) { @@ -762,61 +787,65 @@ public Client build() { return new Client(this.endpoints, this.configuration, this.useNewImplementation, this.sharedOperationExecutor); } - private Map setDefaults(Map userConfig) { + private void setDefaults() { // set default database name if not specified - if (!userConfig.containsKey("database")) { - userConfig.put("database", (String) ClickHouseDefaults.DATABASE.getDefaultValue()); + if (!configuration.containsKey("database")) { + setDefaultDatabase((String) ClickHouseDefaults.DATABASE.getDefaultValue()); } - if (!userConfig.containsKey(ClickHouseClientOption.MAX_EXECUTION_TIME.getKey())) { - userConfig.put(ClickHouseClientOption.MAX_EXECUTION_TIME.getKey(), - String.valueOf(ClickHouseClientOption.MAX_EXECUTION_TIME.getDefaultValue())); + if (!configuration.containsKey(ClickHouseClientOption.MAX_EXECUTION_TIME.getKey())) { + setExecutionTimeout(0, MILLIS); } - if (!userConfig.containsKey(ClickHouseClientOption.MAX_THREADS_PER_CLIENT.getKey())) { - userConfig.put(ClickHouseClientOption.MAX_THREADS_PER_CLIENT.getKey(), + if (!configuration.containsKey(ClickHouseClientOption.MAX_THREADS_PER_CLIENT.getKey())) { + configuration.put(ClickHouseClientOption.MAX_THREADS_PER_CLIENT.getKey(), String.valueOf(ClickHouseClientOption.MAX_THREADS_PER_CLIENT.getDefaultValue())); } - if (!userConfig.containsKey("compression.lz4.uncompressed_buffer_size")) { - userConfig.put("compression.lz4.uncompressed_buffer_size", - String.valueOf(ClickHouseLZ4OutputStream.UNCOMPRESSED_BUFF_SIZE)); + if (!configuration.containsKey("compression.lz4.uncompressed_buffer_size")) { + setLZ4UncompressedBufferSize(ClickHouseLZ4OutputStream.UNCOMPRESSED_BUFF_SIZE); + } + + if (!configuration.containsKey(ClickHouseClientOption.USE_SERVER_TIME_ZONE.getKey())) { + useServerTimeZone(true); } - if (!userConfig.containsKey(ClickHouseClientOption.USE_SERVER_TIME_ZONE.getKey())) { - userConfig.put(ClickHouseClientOption.USE_SERVER_TIME_ZONE.getKey(), "true"); + if (!configuration.containsKey(ClickHouseClientOption.SERVER_TIME_ZONE.getKey())) { + setServerTimeZone("UTC"); } - if (!userConfig.containsKey(ClickHouseClientOption.SERVER_TIME_ZONE.getKey())) { - userConfig.put(ClickHouseClientOption.SERVER_TIME_ZONE.getKey(), "UTC"); + if (!configuration.containsKey(ClickHouseClientOption.ASYNC.getKey())) { + useAsyncRequests(false); } - if (!userConfig.containsKey(ClickHouseClientOption.ASYNC.getKey())) { - userConfig.put(ClickHouseClientOption.ASYNC.getKey(), "false"); + if (!configuration.containsKey(ClickHouseHttpOption.MAX_OPEN_CONNECTIONS.getKey())) { + setMaxConnections(10); } - if (!userConfig.containsKey(ClickHouseHttpOption.MAX_OPEN_CONNECTIONS.getKey())) { - userConfig.put(ClickHouseHttpOption.MAX_OPEN_CONNECTIONS.getKey(), "10"); + if (!configuration.containsKey("connection_request_timeout")) { + setConnectionRequestTimeout(10, SECONDS); } - if (!userConfig.containsKey("connection_request_timeout")) { - userConfig.put("connection_request_timeout", "10000"); + if (!configuration.containsKey("connection_reuse_strategy")) { + setConnectionReuseStrategy(ConnectionReuseStrategy.FIFO); } - if (!userConfig.containsKey("connection_reuse_strategy")) { - userConfig.put("connection_reuse_strategy", ConnectionReuseStrategy.FIFO.name()); + if (!configuration.containsKey("connection_pool_enabled")) { + enableConnectionPool(true); } - if (!userConfig.containsKey("connection_pool_enabled")) { - userConfig.put("connection_pool_enabled", "true"); + if (!configuration.containsKey("connection_ttl")) { + setConnectionTTL(-1, MILLIS); } - if (!userConfig.containsKey("connection_ttl")) { - userConfig.put("connection_ttl", "-1"); + if (!configuration.containsKey("client_retry_on_failures")) { + retryOnFailures(ClientFaultCause.NoHttpResponse, ClientFaultCause.ConnectTimeout, ClientFaultCause.ConnectionRequestTimeout); } - return userConfig; + if (!configuration.containsKey(ClickHouseClientOption.RETRY.getKey())) { + setMaxRetries(3); + } } } @@ -990,6 +1019,7 @@ public CompletableFuture insert(String tableName, List data, // Selecting some node ClickHouseNode selectedNode = getNextAliveNode(); + ClientException lastException = null; for (int i = 0; i <= maxRetries; i++) { // Execute request try (ClassicHttpResponse httpResponse = @@ -1028,16 +1058,19 @@ public CompletableFuture insert(String tableName, List data, metrics.operationComplete(); metrics.setQueryId(queryId); return new InsertResponse(metrics); - } catch (NoHttpResponseException | ConnectionRequestTimeoutException e) { - LOG.warn("Failed to get response. Retrying.", e); - selectedNode = getNextAliveNode(); - continue; + } catch ( NoHttpResponseException | ConnectionRequestTimeoutException | ConnectTimeoutException e) { + lastException = httpClientHelper.wrapException("Insert request initiation failed", e); + if (httpClientHelper.shouldRetry(e, finalSettings.getAllSettings())) { + LOG.warn("Retrying", e); + selectedNode = getNextAliveNode(); + } else { + throw lastException; + } } catch (IOException e) { - LOG.info("Interrupted while waiting for response."); - throw new ClientException("Failed to get query response", e); + throw new ClientException("Insert request failed", e); } } - throw new ClientException("Failed to get table schema: too many retries"); + throw new ClientException("Insert request failed after retries", lastException); }; return runAsyncOperation(supplier, settings.getAllSettings()); @@ -1057,7 +1090,6 @@ public CompletableFuture insert(String tableName, List data, } globalClientStats.get(operationId).stop(ClientMetrics.OP_SERIALIZATION); - LOG.debug("Total serialization time: {}", globalClientStats.get(operationId).getElapsedTime("serialization")); return insert(tableName, new ByteArrayInputStream(stream.toByteArray()), format, settings); } } @@ -1114,6 +1146,7 @@ public CompletableFuture insert(String tableName, // Selecting some node ClickHouseNode selectedNode = getNextAliveNode(); + ClientException lastException = null; for (int i = 0; i <= maxRetries; i++) { // Execute request try (ClassicHttpResponse httpResponse = @@ -1148,25 +1181,27 @@ public CompletableFuture insert(String tableName, metrics.operationComplete(); metrics.setQueryId(queryId); return new InsertResponse(metrics); - } catch (NoHttpResponseException | ConnectionRequestTimeoutException e) { - if (i < maxRetries) { - try { - data.reset(); - } catch (IOException ioe) { - throw new ClientException("Failed to reset stream for retry", e); - } - LOG.warn("Failed to get response. Retrying.", e); + } catch ( NoHttpResponseException | ConnectionRequestTimeoutException | ConnectTimeoutException e) { + lastException = httpClientHelper.wrapException("Insert request initiation failed", e); + if (httpClientHelper.shouldRetry(e, finalSettings.getAllSettings())) { + LOG.warn("Retrying", e); selectedNode = getNextAliveNode(); } else { - throw new ClientException("Server did not respond", e); + throw lastException; } - continue; } catch (IOException e) { - LOG.info("Interrupted while waiting for response."); - throw new ClientException("Failed to get query response", e); + throw new ClientException("Insert request failed", e); + } + + if (i < maxRetries) { + try { + data.reset(); + } catch (IOException ioe) { + throw new ClientException("Failed to reset stream before next attempt", ioe); + } } } - throw new ClientException("Failed to insert data: too many retries"); + throw new ClientException("Insert request failed after retries", lastException); }; } else { responseSupplier = () -> { @@ -1193,7 +1228,6 @@ public CompletableFuture insert(String tableName, clickHouseResponse = future.get(); } InsertResponse response = new InsertResponse(clickHouseResponse, clientStats); - LOG.debug("Total insert (InputStream) time: {}", clientStats.getElapsedTime("insert")); return response; } catch (ExecutionException e) { throw new ClientException("Failed to get insert response", e.getCause()); @@ -1282,6 +1316,7 @@ public CompletableFuture query(String sqlQuery, Map { // Selecting some node ClickHouseNode selectedNode = getNextAliveNode(); + ClientException lastException = null; for (int i = 0; i <= maxRetries; i++) { try { ClassicHttpResponse httpResponse = @@ -1307,19 +1342,23 @@ public CompletableFuture query(String sqlQuery, Map request = oldClient.read(getServerNode()); @@ -1595,4 +1634,6 @@ public Set getEndpoints() { private ClickHouseNode getNextAliveNode() { return serverNodes.get(0); } + + public static final String VALUES_LIST_DELIMITER = ","; } diff --git a/client-v2/src/main/java/com/clickhouse/client/api/ClientFaultCause.java b/client-v2/src/main/java/com/clickhouse/client/api/ClientFaultCause.java new file mode 100644 index 000000000..8d66f8136 --- /dev/null +++ b/client-v2/src/main/java/com/clickhouse/client/api/ClientFaultCause.java @@ -0,0 +1,10 @@ +package com.clickhouse.client.api; + +public enum ClientFaultCause { + + None, + + NoHttpResponse, + ConnectTimeout, + ConnectionRequestTimeout, +} diff --git a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java index 4da7edf28..b7705102f 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java @@ -4,7 +4,9 @@ import com.clickhouse.client.ClickHouseSslContextProvider; import com.clickhouse.client.api.Client; import com.clickhouse.client.api.ClientException; +import com.clickhouse.client.api.ClientFaultCause; import com.clickhouse.client.api.ClientMisconfigurationException; +import com.clickhouse.client.api.ConnectionInitiationException; import com.clickhouse.client.api.ConnectionReuseStrategy; import com.clickhouse.client.api.ServerException; import com.clickhouse.client.api.enums.ProxyType; @@ -12,6 +14,7 @@ import com.clickhouse.client.config.ClickHouseDefaults; import com.clickhouse.client.http.ClickHouseHttpProto; import com.clickhouse.client.http.config.ClickHouseHttpOption; +import org.apache.hc.client5.http.ConnectTimeoutException; import org.apache.hc.client5.http.classic.methods.HttpPost; import org.apache.hc.client5.http.config.ConnectionConfig; import org.apache.hc.client5.http.config.RequestConfig; @@ -57,7 +60,11 @@ import java.net.UnknownHostException; import java.security.NoSuchAlgorithmException; import java.util.Base64; +import java.util.EnumSet; +import java.util.HashSet; import java.util.Map; +import java.util.Set; +import java.util.StringTokenizer; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -74,6 +81,8 @@ public class HttpAPIClientHelper { private String proxyAuthHeaderValue; + private final Set defaultRetryCauses; + public HttpAPIClientHelper(Map configuration) { this.chConfiguration = configuration; this.httpClient = createHttpClient(); @@ -89,6 +98,11 @@ public HttpAPIClientHelper(Map configuration) { boolean usingServerCompression= chConfiguration.getOrDefault(ClickHouseClientOption.COMPRESS.getKey(), "false").equalsIgnoreCase("true"); boolean useHttpCompression = chConfiguration.getOrDefault("client.use_http_compression", "false").equalsIgnoreCase("true"); LOG.info("client compression: {}, server compression: {}, http compression: {}", usingClientCompression, usingServerCompression, useHttpCompression); + + defaultRetryCauses = SerializerUtils.parseEnumList(chConfiguration.get("client_retry_on_failures"), ClientFaultCause.class); + if (defaultRetryCauses.contains(ClientFaultCause.None)) { + defaultRetryCauses.removeIf(c -> c != ClientFaultCause.None); + } } /** @@ -411,4 +425,39 @@ public static T getHeaderVal(Header header, T defaultValue, Function requestSettings) { + Set retryCauses = (Set) + requestSettings.getOrDefault("retry_on_failures", defaultRetryCauses); + + if (retryCauses.contains(ClientFaultCause.None)) { + return false; + } + + if (ex instanceof NoHttpResponseException ) { + return retryCauses.contains(ClientFaultCause.NoHttpResponse); + } + + if (ex instanceof ConnectException) { + return retryCauses.contains(ClientFaultCause.ConnectTimeout); + } + + if (ex instanceof ConnectionRequestTimeoutException) { + return retryCauses.contains(ClientFaultCause.ConnectionRequestTimeout); + } + + return false; + } + + // This method wraps some client specific exceptions into specific ClientException or just ClientException + // ClientException will be also wrapped + public ClientException wrapException(String message, Exception cause) { + if (cause instanceof ConnectionRequestTimeoutException || + cause instanceof ConnectTimeoutException || + cause instanceof ConnectException) { + return new ConnectionInitiationException(message, cause); + } + + return new ClientException(message, cause); + } } diff --git a/client-v2/src/main/java/com/clickhouse/client/api/internal/SerializerUtils.java b/client-v2/src/main/java/com/clickhouse/client/api/internal/SerializerUtils.java index dcb5a801a..c456b1842 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/internal/SerializerUtils.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/internal/SerializerUtils.java @@ -1,5 +1,7 @@ package com.clickhouse.client.api.internal; +import com.clickhouse.client.api.Client; +import com.clickhouse.client.api.ClientFaultCause; import com.clickhouse.data.ClickHouseColumn; import com.clickhouse.data.format.BinaryStreamUtils; import org.slf4j.Logger; @@ -14,9 +16,14 @@ import java.net.Inet6Address; import java.time.LocalDate; import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; +import java.util.StringTokenizer; import java.util.UUID; import static com.clickhouse.data.ClickHouseDataType.*; @@ -212,4 +219,11 @@ public static BigInteger convertToBigInteger(Object value) { } + public static > Set parseEnumList(String value, Class enumType) { + Set values = new HashSet<>(); + for (StringTokenizer causes = new StringTokenizer(value, Client.VALUES_LIST_DELIMITER); causes.hasMoreTokens();) { + values.add(Enum.valueOf(enumType, causes.nextToken())); + } + return values; + } } diff --git a/client-v2/src/test/java/com/clickhouse/client/ClientTests.java b/client-v2/src/test/java/com/clickhouse/client/ClientTests.java index b0f303c2c..06a532e65 100644 --- a/client-v2/src/test/java/com/clickhouse/client/ClientTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/ClientTests.java @@ -66,27 +66,6 @@ private static Client[] secureClientProvider() throws Exception { }; } - @Test(groups = { "integration" }) - public void testSecureConnection() { - ClickHouseNode secureServer = getSecureServer(ClickHouseProtocol.HTTP); - - try (Client client = new Client.Builder() - .addEndpoint("https://localhost:" + secureServer.getPort()) - .setUsername("default") - .setPassword("") - .setRootCertificate("containers/clickhouse-server/certs/localhost.crt") - .useNewImplementation(System.getProperty("client.tests.useNewImplementation", "false").equals("true")) - .build()) { - - List records = client.queryAll("SELECT timezone()"); - Assert.assertTrue(records.size() > 0); - Assert.assertEquals(records.get(0).getString(1), "UTC"); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail(e.getMessage()); - } - } - @Test public void testRawSettings() { ClickHouseNode node = getServer(ClickHouseProtocol.HTTP); diff --git a/client-v2/src/test/java/com/clickhouse/client/ConnectionManagementTests.java b/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java similarity index 65% rename from client-v2/src/test/java/com/clickhouse/client/ConnectionManagementTests.java rename to client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java index 21918980d..92d1a4b61 100644 --- a/client-v2/src/test/java/com/clickhouse/client/ConnectionManagementTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java @@ -1,16 +1,22 @@ package com.clickhouse.client; import com.clickhouse.client.api.Client; +import com.clickhouse.client.api.ClientFaultCause; import com.clickhouse.client.api.ConnectionInitiationException; import com.clickhouse.client.api.ConnectionReuseStrategy; +import com.clickhouse.client.api.enums.Protocol; import com.clickhouse.client.api.enums.ProxyType; +import com.clickhouse.client.api.insert.InsertResponse; import com.clickhouse.client.api.query.GenericRecord; import com.clickhouse.client.api.query.QueryResponse; import com.clickhouse.client.config.ClickHouseClientOption; +import com.clickhouse.data.ClickHouseFormat; import com.github.tomakehurst.wiremock.WireMockServer; import com.github.tomakehurst.wiremock.client.WireMock; +import com.github.tomakehurst.wiremock.common.ConsoleNotifier; import com.github.tomakehurst.wiremock.common.Slf4jNotifier; import com.github.tomakehurst.wiremock.core.WireMockConfiguration; +import com.github.tomakehurst.wiremock.http.Fault; import com.github.tomakehurst.wiremock.http.trafficlistener.WiremockNetworkTrafficListener; import org.apache.hc.core5.http.ConnectionRequestTimeoutException; import org.apache.hc.core5.http.HttpStatus; @@ -19,6 +25,7 @@ import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import java.io.ByteArrayInputStream; import java.net.Socket; import java.nio.ByteBuffer; import java.time.temporal.ChronoUnit; @@ -26,9 +33,12 @@ import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -public class ConnectionManagementTests extends BaseIntegrationTest{ +import static com.github.tomakehurst.wiremock.stubbing.Scenario.STARTED; + +public class HttpTransportTests extends BaseIntegrationTest{ @Test(groups = {"integration"},dataProvider = "testConnectionTTLProvider") @@ -143,6 +153,7 @@ public void testConnectionRequestTimeout() { .addEndpoint("http://localhost:" + serverPort) .setUsername("default") .setPassword(getPassword()) + .retryOnFailures(ClientFaultCause.None) .useNewImplementation(true) .setMaxConnections(1) .setOption(ClickHouseClientOption.ASYNC.getKey(), "true") @@ -186,4 +197,73 @@ public void testConnectionReuseStrategy() { Assert.fail(e.getMessage()); } } + + @Test(groups = { "integration" }) + public void testSecureConnection() { + ClickHouseNode secureServer = getSecureServer(ClickHouseProtocol.HTTP); + + try (Client client = new Client.Builder() + .addEndpoint("https://localhost:" + secureServer.getPort()) + .setUsername("default") + .setPassword("") + .setRootCertificate("containers/clickhouse-server/certs/localhost.crt") + .useNewImplementation(System.getProperty("client.tests.useNewImplementation", "false").equals("true")) + .build()) { + + List records = client.queryAll("SELECT timezone()"); + Assert.assertTrue(records.size() > 0); + Assert.assertEquals(records.get(0).getString(1), "UTC"); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + @Test(groups = { "integration" }, enabled = true) + public void testNoHttpResponseFailure() { + WireMockServer faultyServer = new WireMockServer( WireMockConfiguration + .options().port(9090).notifier(new ConsoleNotifier(false))); + faultyServer.start(); + + byte[] requestBody = ("INSERT INTO table01 FORMAT " + + ClickHouseFormat.TSV.name() + " \n1\t2\t3\n").getBytes(); + + // First request gets no response + faultyServer.addStubMapping(WireMock.post(WireMock.anyUrl()) + .withRequestBody(WireMock.binaryEqualTo(requestBody)) + .inScenario("Retry") + .whenScenarioStateIs(STARTED) + .willSetStateTo("Failed") + .willReturn(WireMock.aResponse().withFault(Fault.EMPTY_RESPONSE)).build()); + + // Second request gets a response (retry) + faultyServer.addStubMapping(WireMock.post(WireMock.anyUrl()) + .withRequestBody(WireMock.binaryEqualTo(requestBody)) + .inScenario("Retry") + .whenScenarioStateIs("Failed") + .willSetStateTo("Done") + .willReturn(WireMock.aResponse() + .withHeader("X-ClickHouse-Summary", + "{ \"read_bytes\": \"10\", \"read_rows\": \"1\"}")).build()); + + Client mockServerClient = new Client.Builder() + .addEndpoint(Protocol.HTTP, "localhost", faultyServer.port(), false) + .setUsername("default") + .setPassword("") + .useNewImplementation(true) +// .useNewImplementation(System.getProperty("client.tests.useNewImplementation", "false").equals("true")) + .compressClientRequest(false) + .setOption(ClickHouseClientOption.RETRY.getKey(), "2") + .build(); + + try { + InsertResponse insertResponse = mockServerClient.insert("table01", + new ByteArrayInputStream("1\t2\t3\n".getBytes()), ClickHouseFormat.TSV).get(30, TimeUnit.SECONDS); + insertResponse.close(); + } catch (Exception e) { + Assert.fail("Unexpected exception", e); + } finally { + faultyServer.stop(); + } + } } diff --git a/client-v2/src/test/java/com/clickhouse/client/insert/InsertTests.java b/client-v2/src/test/java/com/clickhouse/client/insert/InsertTests.java index f1bed9c48..2bf869672 100644 --- a/client-v2/src/test/java/com/clickhouse/client/insert/InsertTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/insert/InsertTests.java @@ -155,51 +155,4 @@ public void insertRawData() throws Exception { List records = client.queryAll("SELECT * FROM " + tableName); assertEquals(records.size(), 1000); } - - @Test(groups = { "integration" }, enabled = true) - public void testNoHttpResponseFailure() { - WireMockServer faultyServer = new WireMockServer( WireMockConfiguration - .options().port(9090).notifier(new ConsoleNotifier(false))); - faultyServer.start(); - - byte[] requestBody = ("INSERT INTO table01 FORMAT " + - ClickHouseFormat.TSV.name() + " \n1\t2\t3\n").getBytes(); - - // First request gets no response - faultyServer.addStubMapping(WireMock.post(WireMock.anyUrl()) - .withRequestBody(WireMock.binaryEqualTo(requestBody)) - .inScenario("Retry") - .whenScenarioStateIs(STARTED) - .willSetStateTo("Failed") - .willReturn(WireMock.aResponse().withFault(Fault.EMPTY_RESPONSE)).build()); - - // Second request gets a response (retry) - faultyServer.addStubMapping(WireMock.post(WireMock.anyUrl()) - .withRequestBody(WireMock.binaryEqualTo(requestBody)) - .inScenario("Retry") - .whenScenarioStateIs("Failed") - .willSetStateTo("Done") - .willReturn(WireMock.aResponse() - .withHeader("X-ClickHouse-Summary", - "{ \"read_bytes\": \"10\", \"read_rows\": \"1\"}")).build()); - - Client mockServerClient = new Client.Builder() - .addEndpoint(Protocol.HTTP, "localhost", faultyServer.port(), false) - .setUsername("default") - .setPassword("") - .useNewImplementation(true) -// .useNewImplementation(System.getProperty("client.tests.useNewImplementation", "false").equals("true")) - .compressClientRequest(false) - .setOption(ClickHouseClientOption.RETRY.getKey(), "2") - .build(); - try { - InsertResponse insertResponse = mockServerClient.insert("table01", - new ByteArrayInputStream("1\t2\t3\n".getBytes()), ClickHouseFormat.TSV, settings).get(30, TimeUnit.SECONDS); - insertResponse.close(); - } catch (Exception e) { - Assert.fail("Unexpected exception", e); - } finally { - faultyServer.stop(); - } - } } From 4e23300f11f63d75821c9315bb0a1d336763d343 Mon Sep 17 00:00:00 2001 From: Sergey Chernov Date: Tue, 27 Aug 2024 23:53:58 -0700 Subject: [PATCH 3/4] Added tests for query --- .../clickhouse/client/HttpTransportTests.java | 64 +++++++++++++++---- .../clickhouse/client/insert/InsertTests.java | 47 -------------- 2 files changed, 51 insertions(+), 60 deletions(-) diff --git a/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java b/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java index 92d1a4b61..2dbf97a6c 100644 --- a/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java @@ -1,6 +1,7 @@ package com.clickhouse.client; import com.clickhouse.client.api.Client; +import com.clickhouse.client.api.ClientException; import com.clickhouse.client.api.ClientFaultCause; import com.clickhouse.client.api.ConnectionInitiationException; import com.clickhouse.client.api.ConnectionReuseStrategy; @@ -21,6 +22,7 @@ import org.apache.hc.core5.http.ConnectionRequestTimeoutException; import org.apache.hc.core5.http.HttpStatus; import org.apache.hc.core5.net.URIBuilder; +import org.testcontainers.utility.ThrowingFunction; import org.testng.Assert; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -33,13 +35,18 @@ import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import static com.github.tomakehurst.wiremock.stubbing.Scenario.STARTED; public class HttpTransportTests extends BaseIntegrationTest{ + static { + System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "DEBUG"); + } @Test(groups = {"integration"},dataProvider = "testConnectionTTLProvider") @SuppressWarnings("java:S2925") @@ -219,18 +226,16 @@ public void testSecureConnection() { } } - @Test(groups = { "integration" }, enabled = true) - public void testNoHttpResponseFailure() { + @Test(groups = { "integration" }, dataProvider = "NoResponseFailureProvider") + public void testInsertAndNoHttpResponseFailure(String body, int maxRetries, ThrowingFunction function, + boolean shouldFail) { WireMockServer faultyServer = new WireMockServer( WireMockConfiguration .options().port(9090).notifier(new ConsoleNotifier(false))); faultyServer.start(); - byte[] requestBody = ("INSERT INTO table01 FORMAT " + - ClickHouseFormat.TSV.name() + " \n1\t2\t3\n").getBytes(); - // First request gets no response faultyServer.addStubMapping(WireMock.post(WireMock.anyUrl()) - .withRequestBody(WireMock.binaryEqualTo(requestBody)) + .withRequestBody(WireMock.equalTo(body)) .inScenario("Retry") .whenScenarioStateIs(STARTED) .willSetStateTo("Failed") @@ -238,7 +243,7 @@ public void testNoHttpResponseFailure() { // Second request gets a response (retry) faultyServer.addStubMapping(WireMock.post(WireMock.anyUrl()) - .withRequestBody(WireMock.binaryEqualTo(requestBody)) + .withRequestBody(WireMock.equalTo(body)) .inScenario("Retry") .whenScenarioStateIs("Failed") .willSetStateTo("Done") @@ -250,20 +255,53 @@ public void testNoHttpResponseFailure() { .addEndpoint(Protocol.HTTP, "localhost", faultyServer.port(), false) .setUsername("default") .setPassword("") - .useNewImplementation(true) -// .useNewImplementation(System.getProperty("client.tests.useNewImplementation", "false").equals("true")) + .useNewImplementation(true) // because of the internal differences .compressClientRequest(false) - .setOption(ClickHouseClientOption.RETRY.getKey(), "2") + .setMaxRetries(maxRetries) .build(); try { - InsertResponse insertResponse = mockServerClient.insert("table01", - new ByteArrayInputStream("1\t2\t3\n".getBytes()), ClickHouseFormat.TSV).get(30, TimeUnit.SECONDS); - insertResponse.close(); + function.apply(mockServerClient); + } catch (ClientException e) { + e.printStackTrace(); + if (!shouldFail) { + Assert.fail("Unexpected exception", e); + } + return; } catch (Exception e) { Assert.fail("Unexpected exception", e); } finally { faultyServer.stop(); } + + if (shouldFail) { + Assert.fail("Expected exception"); + } + } + + @DataProvider(name = "NoResponseFailureProvider") + public static Object[][] noResponseFailureProvider() { + + String insertBody = "INSERT INTO table01 FORMAT " + ClickHouseFormat.TSV.name() + " \n1\t2\t3\n"; + ThrowingFunction insertFunction = (client) -> { + InsertResponse insertResponse = client.insert("table01", + new ByteArrayInputStream("1\t2\t3\n".getBytes()), ClickHouseFormat.TSV).get(30, TimeUnit.SECONDS); + insertResponse.close(); + return null; + }; + + String selectBody = "select timezone()"; + ThrowingFunction queryFunction = (client) -> { + QueryResponse response = client.query("select timezone()").get(30, TimeUnit.SECONDS); + response.close(); + return null; + }; + + return new Object[][]{ + {insertBody, 1, insertFunction, false}, + {selectBody, 1, queryFunction, false}, + {insertBody, 0, insertFunction, true}, + {selectBody, 0, queryFunction, true} + }; } } diff --git a/client-v2/src/test/java/com/clickhouse/client/insert/InsertTests.java b/client-v2/src/test/java/com/clickhouse/client/insert/InsertTests.java index 8f3cabf92..eac611bf6 100644 --- a/client-v2/src/test/java/com/clickhouse/client/insert/InsertTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/insert/InsertTests.java @@ -169,51 +169,4 @@ public void insertRawDataSimple(int numberOfRecords) throws Exception { OperationMetrics metrics = response.getMetrics(); assertEquals((int)response.getWrittenRows(), numberOfRecords ); } - - @Test(groups = { "integration" }, enabled = true) - public void testNoHttpResponseFailure() { - WireMockServer faultyServer = new WireMockServer( WireMockConfiguration - .options().port(9090).notifier(new ConsoleNotifier(false))); - faultyServer.start(); - - byte[] requestBody = ("INSERT INTO table01 FORMAT " + - ClickHouseFormat.TSV.name() + " \n1\t2\t3\n").getBytes(); - - // First request gets no response - faultyServer.addStubMapping(WireMock.post(WireMock.anyUrl()) - .withRequestBody(WireMock.binaryEqualTo(requestBody)) - .inScenario("Retry") - .whenScenarioStateIs(STARTED) - .willSetStateTo("Failed") - .willReturn(WireMock.aResponse().withFault(Fault.EMPTY_RESPONSE)).build()); - - // Second request gets a response (retry) - faultyServer.addStubMapping(WireMock.post(WireMock.anyUrl()) - .withRequestBody(WireMock.binaryEqualTo(requestBody)) - .inScenario("Retry") - .whenScenarioStateIs("Failed") - .willSetStateTo("Done") - .willReturn(WireMock.aResponse() - .withHeader("X-ClickHouse-Summary", - "{ \"read_bytes\": \"10\", \"read_rows\": \"1\"}")).build()); - - Client mockServerClient = new Client.Builder() - .addEndpoint(Protocol.HTTP, "localhost", faultyServer.port(), false) - .setUsername("default") - .setPassword("") - .useNewImplementation(true) -// .useNewImplementation(System.getProperty("client.tests.useNewImplementation", "false").equals("true")) - .compressClientRequest(false) - .setOption(ClickHouseClientOption.RETRY.getKey(), "2") - .build(); - try { - InsertResponse insertResponse = mockServerClient.insert("table01", - new ByteArrayInputStream("1\t2\t3\n".getBytes()), ClickHouseFormat.TSV, settings).get(30, TimeUnit.SECONDS); - insertResponse.close(); - } catch (Exception e) { - Assert.fail("Unexpected exception", e); - } finally { - faultyServer.stop(); - } - } } From b81bde22e1c6f088a73dc02da89b7ad4d3de371a Mon Sep 17 00:00:00 2001 From: Sergey Chernov Date: Fri, 30 Aug 2024 19:42:00 -0700 Subject: [PATCH 4/4] made constant for buffer size --- client-v2/src/main/java/com/clickhouse/client/api/Client.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/client-v2/src/main/java/com/clickhouse/client/api/Client.java b/client-v2/src/main/java/com/clickhouse/client/api/Client.java index a358da2c5..ee4f4678d 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/Client.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/Client.java @@ -801,6 +801,8 @@ public Client build() { return new Client(this.endpoints, this.configuration, this.useNewImplementation, this.sharedOperationExecutor); } + private static final int DEFAULT_NETWORK_BUFFER_SIZE = 300_000; + private void setDefaults() { // set default database name if not specified @@ -858,7 +860,7 @@ private void setDefaults() { } if (!configuration.containsKey("client_network_buffer_size")) { - setClientNetworkBufferSize(8192); + setClientNetworkBufferSize(DEFAULT_NETWORK_BUFFER_SIZE); } if (!configuration.containsKey(ClickHouseClientOption.RETRY.getKey())) {