diff --git a/client-v2/src/main/java/com/clickhouse/client/api/Client.java b/client-v2/src/main/java/com/clickhouse/client/api/Client.java index ec35e64b3..ee4f4678d 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/Client.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/Client.java @@ -71,6 +71,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.StringJoiner; import java.util.TimeZone; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -82,6 +83,7 @@ import java.util.concurrent.TimeoutException; import java.util.function.Supplier; +import static java.time.temporal.ChronoUnit.MILLIS; import static java.time.temporal.ChronoUnit.SECONDS; /** @@ -728,8 +730,31 @@ public Builder setClientNetworkBufferSize(int size) { return this; } + + /** + * Sets list of causes that should be retried on. + * Default {@code [NoHttpResponse, ConnectTimeout, ConnectionRequestTimeout]} + * Use {@link ClientFaultCause#None} to disable retries. + * + * @param causes - list of causes + * @return + */ + public Builder retryOnFailures(ClientFaultCause ...causes) { + StringJoiner joiner = new StringJoiner(VALUES_LIST_DELIMITER); + for (ClientFaultCause cause : causes) { + joiner.add(cause.name()); + } + this.configuration.put("client_retry_on_failures", joiner.toString()); + return this; + } + + public Builder setMaxRetries(int maxRetries) { + this.configuration.put(ClickHouseClientOption.RETRY.getKey(), String.valueOf(maxRetries)); + return this; + } + public Client build() { - this.configuration = setDefaults(this.configuration); + setDefaults(); // check if endpoint are empty. so can not initiate client if (this.endpoints.isEmpty()) { @@ -776,65 +801,71 @@ public Client build() { return new Client(this.endpoints, this.configuration, this.useNewImplementation, this.sharedOperationExecutor); } - private Map setDefaults(Map userConfig) { + private static final int DEFAULT_NETWORK_BUFFER_SIZE = 300_000; + + private void setDefaults() { // set default database name if not specified - if (!userConfig.containsKey("database")) { - userConfig.put("database", (String) ClickHouseDefaults.DATABASE.getDefaultValue()); + if (!configuration.containsKey("database")) { + setDefaultDatabase((String) ClickHouseDefaults.DATABASE.getDefaultValue()); } - if (!userConfig.containsKey(ClickHouseClientOption.MAX_EXECUTION_TIME.getKey())) { - userConfig.put(ClickHouseClientOption.MAX_EXECUTION_TIME.getKey(), - String.valueOf(ClickHouseClientOption.MAX_EXECUTION_TIME.getDefaultValue())); + if (!configuration.containsKey(ClickHouseClientOption.MAX_EXECUTION_TIME.getKey())) { + setExecutionTimeout(0, MILLIS); } - if (!userConfig.containsKey(ClickHouseClientOption.MAX_THREADS_PER_CLIENT.getKey())) { - userConfig.put(ClickHouseClientOption.MAX_THREADS_PER_CLIENT.getKey(), + if (!configuration.containsKey(ClickHouseClientOption.MAX_THREADS_PER_CLIENT.getKey())) { + configuration.put(ClickHouseClientOption.MAX_THREADS_PER_CLIENT.getKey(), String.valueOf(ClickHouseClientOption.MAX_THREADS_PER_CLIENT.getDefaultValue())); } - if (!userConfig.containsKey("compression.lz4.uncompressed_buffer_size")) { - userConfig.put("compression.lz4.uncompressed_buffer_size", - String.valueOf(ClickHouseLZ4OutputStream.UNCOMPRESSED_BUFF_SIZE)); + if (!configuration.containsKey("compression.lz4.uncompressed_buffer_size")) { + setLZ4UncompressedBufferSize(ClickHouseLZ4OutputStream.UNCOMPRESSED_BUFF_SIZE); + } + + if (!configuration.containsKey(ClickHouseClientOption.USE_SERVER_TIME_ZONE.getKey())) { + useServerTimeZone(true); } - if (!userConfig.containsKey(ClickHouseClientOption.USE_SERVER_TIME_ZONE.getKey())) { - userConfig.put(ClickHouseClientOption.USE_SERVER_TIME_ZONE.getKey(), "true"); + if (!configuration.containsKey(ClickHouseClientOption.SERVER_TIME_ZONE.getKey())) { + setServerTimeZone("UTC"); } - if (!userConfig.containsKey(ClickHouseClientOption.SERVER_TIME_ZONE.getKey())) { - userConfig.put(ClickHouseClientOption.SERVER_TIME_ZONE.getKey(), "UTC"); + if (!configuration.containsKey(ClickHouseClientOption.ASYNC.getKey())) { + useAsyncRequests(false); } - if (!userConfig.containsKey(ClickHouseClientOption.ASYNC.getKey())) { - userConfig.put(ClickHouseClientOption.ASYNC.getKey(), "false"); + if (!configuration.containsKey(ClickHouseHttpOption.MAX_OPEN_CONNECTIONS.getKey())) { + setMaxConnections(10); } - if (!userConfig.containsKey(ClickHouseHttpOption.MAX_OPEN_CONNECTIONS.getKey())) { - userConfig.put(ClickHouseHttpOption.MAX_OPEN_CONNECTIONS.getKey(), "10"); + if (!configuration.containsKey("connection_request_timeout")) { + setConnectionRequestTimeout(10, SECONDS); } - if (!userConfig.containsKey("connection_request_timeout")) { - userConfig.put("connection_request_timeout", "10000"); + if (!configuration.containsKey("connection_reuse_strategy")) { + setConnectionReuseStrategy(ConnectionReuseStrategy.FIFO); } - if (!userConfig.containsKey("connection_reuse_strategy")) { - userConfig.put("connection_reuse_strategy", ConnectionReuseStrategy.FIFO.name()); + if (!configuration.containsKey("connection_pool_enabled")) { + enableConnectionPool(true); } - if (!userConfig.containsKey("connection_pool_enabled")) { - userConfig.put("connection_pool_enabled", "true"); + if (!configuration.containsKey("connection_ttl")) { + setConnectionTTL(-1, MILLIS); } - if (!userConfig.containsKey("connection_ttl")) { - userConfig.put("connection_ttl", "-1"); + if (!configuration.containsKey("client_retry_on_failures")) { + retryOnFailures(ClientFaultCause.NoHttpResponse, ClientFaultCause.ConnectTimeout, ClientFaultCause.ConnectionRequestTimeout); } - if (!userConfig.containsKey("client_network_buffer_size")) { - setClientNetworkBufferSize(8192); + if (!configuration.containsKey("client_network_buffer_size")) { + setClientNetworkBufferSize(DEFAULT_NETWORK_BUFFER_SIZE); } - return userConfig; + if (!configuration.containsKey(ClickHouseClientOption.RETRY.getKey())) { + setMaxRetries(3); + } } } @@ -1008,6 +1039,7 @@ public CompletableFuture insert(String tableName, List data, // Selecting some node ClickHouseNode selectedNode = getNextAliveNode(); + ClientException lastException = null; for (int i = 0; i <= maxRetries; i++) { // Execute request try (ClassicHttpResponse httpResponse = @@ -1046,16 +1078,19 @@ public CompletableFuture insert(String tableName, List data, metrics.operationComplete(); metrics.setQueryId(queryId); return new InsertResponse(metrics); - } catch (NoHttpResponseException e) { - LOG.warn("Failed to get response. Retrying.", e); - selectedNode = getNextAliveNode(); - continue; + } catch ( NoHttpResponseException | ConnectionRequestTimeoutException | ConnectTimeoutException e) { + lastException = httpClientHelper.wrapException("Insert request initiation failed", e); + if (httpClientHelper.shouldRetry(e, finalSettings.getAllSettings())) { + LOG.warn("Retrying", e); + selectedNode = getNextAliveNode(); + } else { + throw lastException; + } } catch (IOException e) { - LOG.info("Interrupted while waiting for response."); - throw new ClientException("Failed to get query response", e); + throw new ClientException("Insert request failed", e); } } - throw new ClientException("Failed to get table schema: too many retries"); + throw new ClientException("Insert request failed after retries", lastException); }; return runAsyncOperation(supplier, settings.getAllSettings()); @@ -1075,7 +1110,6 @@ public CompletableFuture insert(String tableName, List data, } globalClientStats.get(operationId).stop(ClientMetrics.OP_SERIALIZATION); - LOG.debug("Total serialization time: {}", globalClientStats.get(operationId).getElapsedTime("serialization")); return insert(tableName, new ByteArrayInputStream(stream.toByteArray()), format, settings); } } @@ -1132,6 +1166,7 @@ public CompletableFuture insert(String tableName, // Selecting some node ClickHouseNode selectedNode = getNextAliveNode(); + ClientException lastException = null; for (int i = 0; i <= maxRetries; i++) { // Execute request try (ClassicHttpResponse httpResponse = @@ -1166,25 +1201,27 @@ public CompletableFuture insert(String tableName, metrics.operationComplete(); metrics.setQueryId(queryId); return new InsertResponse(metrics); - } catch (NoHttpResponseException e) { - if (i < maxRetries) { - try { - data.reset(); - } catch (IOException ioe) { - throw new ClientException("Failed to get response", e); - } - LOG.warn("Failed to get response. Retrying.", e); + } catch ( NoHttpResponseException | ConnectionRequestTimeoutException | ConnectTimeoutException e) { + lastException = httpClientHelper.wrapException("Insert request initiation failed", e); + if (httpClientHelper.shouldRetry(e, finalSettings.getAllSettings())) { + LOG.warn("Retrying", e); selectedNode = getNextAliveNode(); } else { - throw new ClientException("Server did not respond", e); + throw lastException; } - continue; } catch (IOException e) { - LOG.info("Interrupted while waiting for response."); - throw new ClientException("Failed to get query response", e); + throw new ClientException("Insert request failed", e); + } + + if (i < maxRetries) { + try { + data.reset(); + } catch (IOException ioe) { + throw new ClientException("Failed to reset stream before next attempt", ioe); + } } } - throw new ClientException("Failed to insert data: too many retries"); + throw new ClientException("Insert request failed after retries", lastException); }; } else { responseSupplier = () -> { @@ -1211,7 +1248,6 @@ public CompletableFuture insert(String tableName, clickHouseResponse = future.get(); } InsertResponse response = new InsertResponse(clickHouseResponse, clientStats); - LOG.debug("Total insert (InputStream) time: {}", clientStats.getElapsedTime("insert")); return response; } catch (ExecutionException e) { throw new ClientException("Failed to get insert response", e.getCause()); @@ -1300,6 +1336,7 @@ public CompletableFuture query(String sqlQuery, Map { // Selecting some node ClickHouseNode selectedNode = getNextAliveNode(); + ClientException lastException = null; for (int i = 0; i <= maxRetries; i++) { try { ClassicHttpResponse httpResponse = @@ -1325,15 +1362,23 @@ public CompletableFuture query(String sqlQuery, Map request = oldClient.read(getServerNode()); @@ -1610,4 +1655,6 @@ public Set getEndpoints() { private ClickHouseNode getNextAliveNode() { return serverNodes.get(0); } + + public static final String VALUES_LIST_DELIMITER = ","; } diff --git a/client-v2/src/main/java/com/clickhouse/client/api/ClientFaultCause.java b/client-v2/src/main/java/com/clickhouse/client/api/ClientFaultCause.java new file mode 100644 index 000000000..8d66f8136 --- /dev/null +++ b/client-v2/src/main/java/com/clickhouse/client/api/ClientFaultCause.java @@ -0,0 +1,10 @@ +package com.clickhouse.client.api; + +public enum ClientFaultCause { + + None, + + NoHttpResponse, + ConnectTimeout, + ConnectionRequestTimeout, +} diff --git a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java index 903d3c4e3..ceff6b6c4 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java @@ -4,7 +4,9 @@ import com.clickhouse.client.ClickHouseSslContextProvider; import com.clickhouse.client.api.Client; import com.clickhouse.client.api.ClientException; +import com.clickhouse.client.api.ClientFaultCause; import com.clickhouse.client.api.ClientMisconfigurationException; +import com.clickhouse.client.api.ConnectionInitiationException; import com.clickhouse.client.api.ConnectionReuseStrategy; import com.clickhouse.client.api.ServerException; import com.clickhouse.client.api.enums.ProxyType; @@ -12,6 +14,7 @@ import com.clickhouse.client.config.ClickHouseDefaults; import com.clickhouse.client.http.ClickHouseHttpProto; import com.clickhouse.client.http.config.ClickHouseHttpOption; +import org.apache.hc.client5.http.ConnectTimeoutException; import org.apache.hc.client5.http.classic.methods.HttpPost; import org.apache.hc.client5.http.config.ConnectionConfig; import org.apache.hc.client5.http.config.RequestConfig; @@ -61,7 +64,11 @@ import java.net.UnknownHostException; import java.security.NoSuchAlgorithmException; import java.util.Base64; +import java.util.EnumSet; +import java.util.HashSet; import java.util.Map; +import java.util.Set; +import java.util.StringTokenizer; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -78,6 +85,8 @@ public class HttpAPIClientHelper { private String proxyAuthHeaderValue; + private final Set defaultRetryCauses; + public HttpAPIClientHelper(Map configuration) { this.chConfiguration = configuration; this.httpClient = createHttpClient(); @@ -93,6 +102,11 @@ public HttpAPIClientHelper(Map configuration) { boolean usingServerCompression= chConfiguration.getOrDefault(ClickHouseClientOption.COMPRESS.getKey(), "false").equalsIgnoreCase("true"); boolean useHttpCompression = chConfiguration.getOrDefault("client.use_http_compression", "false").equalsIgnoreCase("true"); LOG.info("client compression: {}, server compression: {}, http compression: {}", usingClientCompression, usingServerCompression, useHttpCompression); + + defaultRetryCauses = SerializerUtils.parseEnumList(chConfiguration.get("client_retry_on_failures"), ClientFaultCause.class); + if (defaultRetryCauses.contains(ClientFaultCause.None)) { + defaultRetryCauses.removeIf(c -> c != ClientFaultCause.None); + } } /** @@ -426,4 +440,39 @@ public static T getHeaderVal(Header header, T defaultValue, Function requestSettings) { + Set retryCauses = (Set) + requestSettings.getOrDefault("retry_on_failures", defaultRetryCauses); + + if (retryCauses.contains(ClientFaultCause.None)) { + return false; + } + + if (ex instanceof NoHttpResponseException ) { + return retryCauses.contains(ClientFaultCause.NoHttpResponse); + } + + if (ex instanceof ConnectException) { + return retryCauses.contains(ClientFaultCause.ConnectTimeout); + } + + if (ex instanceof ConnectionRequestTimeoutException) { + return retryCauses.contains(ClientFaultCause.ConnectionRequestTimeout); + } + + return false; + } + + // This method wraps some client specific exceptions into specific ClientException or just ClientException + // ClientException will be also wrapped + public ClientException wrapException(String message, Exception cause) { + if (cause instanceof ConnectionRequestTimeoutException || + cause instanceof ConnectTimeoutException || + cause instanceof ConnectException) { + return new ConnectionInitiationException(message, cause); + } + + return new ClientException(message, cause); + } } diff --git a/client-v2/src/main/java/com/clickhouse/client/api/internal/SerializerUtils.java b/client-v2/src/main/java/com/clickhouse/client/api/internal/SerializerUtils.java index dcb5a801a..c456b1842 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/internal/SerializerUtils.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/internal/SerializerUtils.java @@ -1,5 +1,7 @@ package com.clickhouse.client.api.internal; +import com.clickhouse.client.api.Client; +import com.clickhouse.client.api.ClientFaultCause; import com.clickhouse.data.ClickHouseColumn; import com.clickhouse.data.format.BinaryStreamUtils; import org.slf4j.Logger; @@ -14,9 +16,14 @@ import java.net.Inet6Address; import java.time.LocalDate; import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; +import java.util.StringTokenizer; import java.util.UUID; import static com.clickhouse.data.ClickHouseDataType.*; @@ -212,4 +219,11 @@ public static BigInteger convertToBigInteger(Object value) { } + public static > Set parseEnumList(String value, Class enumType) { + Set values = new HashSet<>(); + for (StringTokenizer causes = new StringTokenizer(value, Client.VALUES_LIST_DELIMITER); causes.hasMoreTokens();) { + values.add(Enum.valueOf(enumType, causes.nextToken())); + } + return values; + } } diff --git a/client-v2/src/test/java/com/clickhouse/client/ClientTests.java b/client-v2/src/test/java/com/clickhouse/client/ClientTests.java index 5fd04f471..7f775b8da 100644 --- a/client-v2/src/test/java/com/clickhouse/client/ClientTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/ClientTests.java @@ -69,27 +69,6 @@ private static Client[] secureClientProvider() throws Exception { }; } - @Test(groups = { "integration" }) - public void testSecureConnection() { - ClickHouseNode secureServer = getSecureServer(ClickHouseProtocol.HTTP); - - try (Client client = new Client.Builder() - .addEndpoint("https://localhost:" + secureServer.getPort()) - .setUsername("default") - .setPassword("") - .setRootCertificate("containers/clickhouse-server/certs/localhost.crt") - .useNewImplementation(System.getProperty("client.tests.useNewImplementation", "false").equals("true")) - .build()) { - - List records = client.queryAll("SELECT timezone()"); - Assert.assertTrue(records.size() > 0); - Assert.assertEquals(records.get(0).getString(1), "UTC"); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail(e.getMessage()); - } - } - @Test public void testRawSettings() { ClickHouseNode node = getServer(ClickHouseProtocol.HTTP); diff --git a/client-v2/src/test/java/com/clickhouse/client/ConnectionManagementTests.java b/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java similarity index 58% rename from client-v2/src/test/java/com/clickhouse/client/ConnectionManagementTests.java rename to client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java index 21918980d..2dbf97a6c 100644 --- a/client-v2/src/test/java/com/clickhouse/client/ConnectionManagementTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java @@ -1,24 +1,33 @@ package com.clickhouse.client; import com.clickhouse.client.api.Client; +import com.clickhouse.client.api.ClientException; +import com.clickhouse.client.api.ClientFaultCause; import com.clickhouse.client.api.ConnectionInitiationException; import com.clickhouse.client.api.ConnectionReuseStrategy; +import com.clickhouse.client.api.enums.Protocol; import com.clickhouse.client.api.enums.ProxyType; +import com.clickhouse.client.api.insert.InsertResponse; import com.clickhouse.client.api.query.GenericRecord; import com.clickhouse.client.api.query.QueryResponse; import com.clickhouse.client.config.ClickHouseClientOption; +import com.clickhouse.data.ClickHouseFormat; import com.github.tomakehurst.wiremock.WireMockServer; import com.github.tomakehurst.wiremock.client.WireMock; +import com.github.tomakehurst.wiremock.common.ConsoleNotifier; import com.github.tomakehurst.wiremock.common.Slf4jNotifier; import com.github.tomakehurst.wiremock.core.WireMockConfiguration; +import com.github.tomakehurst.wiremock.http.Fault; import com.github.tomakehurst.wiremock.http.trafficlistener.WiremockNetworkTrafficListener; import org.apache.hc.core5.http.ConnectionRequestTimeoutException; import org.apache.hc.core5.http.HttpStatus; import org.apache.hc.core5.net.URIBuilder; +import org.testcontainers.utility.ThrowingFunction; import org.testng.Assert; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import java.io.ByteArrayInputStream; import java.net.Socket; import java.nio.ByteBuffer; import java.time.temporal.ChronoUnit; @@ -26,10 +35,18 @@ import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; -public class ConnectionManagementTests extends BaseIntegrationTest{ +import static com.github.tomakehurst.wiremock.stubbing.Scenario.STARTED; +public class HttpTransportTests extends BaseIntegrationTest{ + + static { + System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "DEBUG"); + } @Test(groups = {"integration"},dataProvider = "testConnectionTTLProvider") @SuppressWarnings("java:S2925") @@ -143,6 +160,7 @@ public void testConnectionRequestTimeout() { .addEndpoint("http://localhost:" + serverPort) .setUsername("default") .setPassword(getPassword()) + .retryOnFailures(ClientFaultCause.None) .useNewImplementation(true) .setMaxConnections(1) .setOption(ClickHouseClientOption.ASYNC.getKey(), "true") @@ -186,4 +204,104 @@ public void testConnectionReuseStrategy() { Assert.fail(e.getMessage()); } } + + @Test(groups = { "integration" }) + public void testSecureConnection() { + ClickHouseNode secureServer = getSecureServer(ClickHouseProtocol.HTTP); + + try (Client client = new Client.Builder() + .addEndpoint("https://localhost:" + secureServer.getPort()) + .setUsername("default") + .setPassword("") + .setRootCertificate("containers/clickhouse-server/certs/localhost.crt") + .useNewImplementation(System.getProperty("client.tests.useNewImplementation", "false").equals("true")) + .build()) { + + List records = client.queryAll("SELECT timezone()"); + Assert.assertTrue(records.size() > 0); + Assert.assertEquals(records.get(0).getString(1), "UTC"); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + @Test(groups = { "integration" }, dataProvider = "NoResponseFailureProvider") + public void testInsertAndNoHttpResponseFailure(String body, int maxRetries, ThrowingFunction function, + boolean shouldFail) { + WireMockServer faultyServer = new WireMockServer( WireMockConfiguration + .options().port(9090).notifier(new ConsoleNotifier(false))); + faultyServer.start(); + + // First request gets no response + faultyServer.addStubMapping(WireMock.post(WireMock.anyUrl()) + .withRequestBody(WireMock.equalTo(body)) + .inScenario("Retry") + .whenScenarioStateIs(STARTED) + .willSetStateTo("Failed") + .willReturn(WireMock.aResponse().withFault(Fault.EMPTY_RESPONSE)).build()); + + // Second request gets a response (retry) + faultyServer.addStubMapping(WireMock.post(WireMock.anyUrl()) + .withRequestBody(WireMock.equalTo(body)) + .inScenario("Retry") + .whenScenarioStateIs("Failed") + .willSetStateTo("Done") + .willReturn(WireMock.aResponse() + .withHeader("X-ClickHouse-Summary", + "{ \"read_bytes\": \"10\", \"read_rows\": \"1\"}")).build()); + + Client mockServerClient = new Client.Builder() + .addEndpoint(Protocol.HTTP, "localhost", faultyServer.port(), false) + .setUsername("default") + .setPassword("") + .useNewImplementation(true) // because of the internal differences + .compressClientRequest(false) + .setMaxRetries(maxRetries) + .build(); + + try { + function.apply(mockServerClient); + } catch (ClientException e) { + e.printStackTrace(); + if (!shouldFail) { + Assert.fail("Unexpected exception", e); + } + return; + } catch (Exception e) { + Assert.fail("Unexpected exception", e); + } finally { + faultyServer.stop(); + } + + if (shouldFail) { + Assert.fail("Expected exception"); + } + } + + @DataProvider(name = "NoResponseFailureProvider") + public static Object[][] noResponseFailureProvider() { + + String insertBody = "INSERT INTO table01 FORMAT " + ClickHouseFormat.TSV.name() + " \n1\t2\t3\n"; + ThrowingFunction insertFunction = (client) -> { + InsertResponse insertResponse = client.insert("table01", + new ByteArrayInputStream("1\t2\t3\n".getBytes()), ClickHouseFormat.TSV).get(30, TimeUnit.SECONDS); + insertResponse.close(); + return null; + }; + + String selectBody = "select timezone()"; + ThrowingFunction queryFunction = (client) -> { + QueryResponse response = client.query("select timezone()").get(30, TimeUnit.SECONDS); + response.close(); + return null; + }; + + return new Object[][]{ + {insertBody, 1, insertFunction, false}, + {selectBody, 1, queryFunction, false}, + {insertBody, 0, insertFunction, true}, + {selectBody, 0, queryFunction, true} + }; + } } diff --git a/client-v2/src/test/java/com/clickhouse/client/insert/InsertTests.java b/client-v2/src/test/java/com/clickhouse/client/insert/InsertTests.java index 8f3cabf92..eac611bf6 100644 --- a/client-v2/src/test/java/com/clickhouse/client/insert/InsertTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/insert/InsertTests.java @@ -169,51 +169,4 @@ public void insertRawDataSimple(int numberOfRecords) throws Exception { OperationMetrics metrics = response.getMetrics(); assertEquals((int)response.getWrittenRows(), numberOfRecords ); } - - @Test(groups = { "integration" }, enabled = true) - public void testNoHttpResponseFailure() { - WireMockServer faultyServer = new WireMockServer( WireMockConfiguration - .options().port(9090).notifier(new ConsoleNotifier(false))); - faultyServer.start(); - - byte[] requestBody = ("INSERT INTO table01 FORMAT " + - ClickHouseFormat.TSV.name() + " \n1\t2\t3\n").getBytes(); - - // First request gets no response - faultyServer.addStubMapping(WireMock.post(WireMock.anyUrl()) - .withRequestBody(WireMock.binaryEqualTo(requestBody)) - .inScenario("Retry") - .whenScenarioStateIs(STARTED) - .willSetStateTo("Failed") - .willReturn(WireMock.aResponse().withFault(Fault.EMPTY_RESPONSE)).build()); - - // Second request gets a response (retry) - faultyServer.addStubMapping(WireMock.post(WireMock.anyUrl()) - .withRequestBody(WireMock.binaryEqualTo(requestBody)) - .inScenario("Retry") - .whenScenarioStateIs("Failed") - .willSetStateTo("Done") - .willReturn(WireMock.aResponse() - .withHeader("X-ClickHouse-Summary", - "{ \"read_bytes\": \"10\", \"read_rows\": \"1\"}")).build()); - - Client mockServerClient = new Client.Builder() - .addEndpoint(Protocol.HTTP, "localhost", faultyServer.port(), false) - .setUsername("default") - .setPassword("") - .useNewImplementation(true) -// .useNewImplementation(System.getProperty("client.tests.useNewImplementation", "false").equals("true")) - .compressClientRequest(false) - .setOption(ClickHouseClientOption.RETRY.getKey(), "2") - .build(); - try { - InsertResponse insertResponse = mockServerClient.insert("table01", - new ByteArrayInputStream("1\t2\t3\n".getBytes()), ClickHouseFormat.TSV, settings).get(30, TimeUnit.SECONDS); - insertResponse.close(); - } catch (Exception e) { - Assert.fail("Unexpected exception", e); - } finally { - faultyServer.stop(); - } - } }