Skip to content

Commit

Permalink
Merge pull request #1768 from ClickHouse/clientv2_retries
Browse files Browse the repository at this point in the history
[client-v2] Retries on failures
  • Loading branch information
chernser authored Aug 31, 2024
2 parents 584e4ed + b81bde2 commit 12c921d
Show file tree
Hide file tree
Showing 7 changed files with 296 additions and 126 deletions.
161 changes: 104 additions & 57 deletions client-v2/src/main/java/com/clickhouse/client/api/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.StringJoiner;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
Expand All @@ -82,6 +83,7 @@
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

import static java.time.temporal.ChronoUnit.MILLIS;
import static java.time.temporal.ChronoUnit.SECONDS;

/**
Expand Down Expand Up @@ -728,8 +730,31 @@ public Builder setClientNetworkBufferSize(int size) {
return this;
}


/**
 * Configures which fault causes lead to a retry of a failed request.
 * The names of the given causes are joined with {@code VALUES_LIST_DELIMITER}
 * and stored under the {@code client_retry_on_failures} configuration key.
 * Default {@code [NoHttpResponse, ConnectTimeout, ConnectionRequestTimeout]}.
 * Use {@link ClientFaultCause#None} to disable retries.
 *
 * @param causes - causes that should be retried on
 * @return this builder
 */
public Builder retryOnFailures(ClientFaultCause ...causes) {
    StringBuilder causeNames = new StringBuilder();
    for (int idx = 0; idx < causes.length; idx++) {
        // delimiter goes between names only, never before the first or after the last one
        if (idx > 0) {
            causeNames.append(VALUES_LIST_DELIMITER);
        }
        causeNames.append(causes[idx].name());
    }
    this.configuration.put("client_retry_on_failures", causeNames.toString());
    return this;
}

/**
 * Stores the retry limit in the client configuration under the key of
 * {@link ClickHouseClientOption#RETRY}. The value is kept as a decimal string.
 *
 * @param maxRetries - retry limit to store
 * @return this builder
 */
public Builder setMaxRetries(int maxRetries) {
    final String retryKey = ClickHouseClientOption.RETRY.getKey();
    this.configuration.put(retryKey, Integer.toString(maxRetries));
    return this;
}

public Client build() {
this.configuration = setDefaults(this.configuration);
setDefaults();

// check whether any endpoints are configured; without them the client cannot be initialized
if (this.endpoints.isEmpty()) {
Expand Down Expand Up @@ -776,65 +801,71 @@ public Client build() {
return new Client(this.endpoints, this.configuration, this.useNewImplementation, this.sharedOperationExecutor);
}

private Map<String, String> setDefaults(Map<String, String> userConfig) {
private static final int DEFAULT_NETWORK_BUFFER_SIZE = 300_000;

private void setDefaults() {

// set default database name if not specified
if (!userConfig.containsKey("database")) {
userConfig.put("database", (String) ClickHouseDefaults.DATABASE.getDefaultValue());
if (!configuration.containsKey("database")) {
setDefaultDatabase((String) ClickHouseDefaults.DATABASE.getDefaultValue());
}

if (!userConfig.containsKey(ClickHouseClientOption.MAX_EXECUTION_TIME.getKey())) {
userConfig.put(ClickHouseClientOption.MAX_EXECUTION_TIME.getKey(),
String.valueOf(ClickHouseClientOption.MAX_EXECUTION_TIME.getDefaultValue()));
if (!configuration.containsKey(ClickHouseClientOption.MAX_EXECUTION_TIME.getKey())) {
setExecutionTimeout(0, MILLIS);
}

if (!userConfig.containsKey(ClickHouseClientOption.MAX_THREADS_PER_CLIENT.getKey())) {
userConfig.put(ClickHouseClientOption.MAX_THREADS_PER_CLIENT.getKey(),
if (!configuration.containsKey(ClickHouseClientOption.MAX_THREADS_PER_CLIENT.getKey())) {
configuration.put(ClickHouseClientOption.MAX_THREADS_PER_CLIENT.getKey(),
String.valueOf(ClickHouseClientOption.MAX_THREADS_PER_CLIENT.getDefaultValue()));
}

if (!userConfig.containsKey("compression.lz4.uncompressed_buffer_size")) {
userConfig.put("compression.lz4.uncompressed_buffer_size",
String.valueOf(ClickHouseLZ4OutputStream.UNCOMPRESSED_BUFF_SIZE));
if (!configuration.containsKey("compression.lz4.uncompressed_buffer_size")) {
setLZ4UncompressedBufferSize(ClickHouseLZ4OutputStream.UNCOMPRESSED_BUFF_SIZE);
}

if (!configuration.containsKey(ClickHouseClientOption.USE_SERVER_TIME_ZONE.getKey())) {
useServerTimeZone(true);
}

if (!userConfig.containsKey(ClickHouseClientOption.USE_SERVER_TIME_ZONE.getKey())) {
userConfig.put(ClickHouseClientOption.USE_SERVER_TIME_ZONE.getKey(), "true");
if (!configuration.containsKey(ClickHouseClientOption.SERVER_TIME_ZONE.getKey())) {
setServerTimeZone("UTC");
}

if (!userConfig.containsKey(ClickHouseClientOption.SERVER_TIME_ZONE.getKey())) {
userConfig.put(ClickHouseClientOption.SERVER_TIME_ZONE.getKey(), "UTC");
if (!configuration.containsKey(ClickHouseClientOption.ASYNC.getKey())) {
useAsyncRequests(false);
}

if (!userConfig.containsKey(ClickHouseClientOption.ASYNC.getKey())) {
userConfig.put(ClickHouseClientOption.ASYNC.getKey(), "false");
if (!configuration.containsKey(ClickHouseHttpOption.MAX_OPEN_CONNECTIONS.getKey())) {
setMaxConnections(10);
}

if (!userConfig.containsKey(ClickHouseHttpOption.MAX_OPEN_CONNECTIONS.getKey())) {
userConfig.put(ClickHouseHttpOption.MAX_OPEN_CONNECTIONS.getKey(), "10");
if (!configuration.containsKey("connection_request_timeout")) {
setConnectionRequestTimeout(10, SECONDS);
}

if (!userConfig.containsKey("connection_request_timeout")) {
userConfig.put("connection_request_timeout", "10000");
if (!configuration.containsKey("connection_reuse_strategy")) {
setConnectionReuseStrategy(ConnectionReuseStrategy.FIFO);
}

if (!userConfig.containsKey("connection_reuse_strategy")) {
userConfig.put("connection_reuse_strategy", ConnectionReuseStrategy.FIFO.name());
if (!configuration.containsKey("connection_pool_enabled")) {
enableConnectionPool(true);
}

if (!userConfig.containsKey("connection_pool_enabled")) {
userConfig.put("connection_pool_enabled", "true");
if (!configuration.containsKey("connection_ttl")) {
setConnectionTTL(-1, MILLIS);
}

if (!userConfig.containsKey("connection_ttl")) {
userConfig.put("connection_ttl", "-1");
if (!configuration.containsKey("client_retry_on_failures")) {
retryOnFailures(ClientFaultCause.NoHttpResponse, ClientFaultCause.ConnectTimeout, ClientFaultCause.ConnectionRequestTimeout);
}

if (!userConfig.containsKey("client_network_buffer_size")) {
setClientNetworkBufferSize(8192);
if (!configuration.containsKey("client_network_buffer_size")) {
setClientNetworkBufferSize(DEFAULT_NETWORK_BUFFER_SIZE);
}

return userConfig;
if (!configuration.containsKey(ClickHouseClientOption.RETRY.getKey())) {
setMaxRetries(3);
}
}
}

Expand Down Expand Up @@ -1008,6 +1039,7 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
// Selecting some node
ClickHouseNode selectedNode = getNextAliveNode();

ClientException lastException = null;
for (int i = 0; i <= maxRetries; i++) {
// Execute request
try (ClassicHttpResponse httpResponse =
Expand Down Expand Up @@ -1046,16 +1078,19 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
metrics.operationComplete();
metrics.setQueryId(queryId);
return new InsertResponse(metrics);
} catch (NoHttpResponseException e) {
LOG.warn("Failed to get response. Retrying.", e);
selectedNode = getNextAliveNode();
continue;
} catch ( NoHttpResponseException | ConnectionRequestTimeoutException | ConnectTimeoutException e) {
lastException = httpClientHelper.wrapException("Insert request initiation failed", e);
if (httpClientHelper.shouldRetry(e, finalSettings.getAllSettings())) {
LOG.warn("Retrying", e);
selectedNode = getNextAliveNode();
} else {
throw lastException;
}
} catch (IOException e) {
LOG.info("Interrupted while waiting for response.");
throw new ClientException("Failed to get query response", e);
throw new ClientException("Insert request failed", e);
}
}
throw new ClientException("Failed to get table schema: too many retries");
throw new ClientException("Insert request failed after retries", lastException);
};

return runAsyncOperation(supplier, settings.getAllSettings());
Expand All @@ -1075,7 +1110,6 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
}

globalClientStats.get(operationId).stop(ClientMetrics.OP_SERIALIZATION);
LOG.debug("Total serialization time: {}", globalClientStats.get(operationId).getElapsedTime("serialization"));
return insert(tableName, new ByteArrayInputStream(stream.toByteArray()), format, settings);
}
}
Expand Down Expand Up @@ -1132,6 +1166,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,
// Selecting some node
ClickHouseNode selectedNode = getNextAliveNode();

ClientException lastException = null;
for (int i = 0; i <= maxRetries; i++) {
// Execute request
try (ClassicHttpResponse httpResponse =
Expand Down Expand Up @@ -1166,25 +1201,27 @@ public CompletableFuture<InsertResponse> insert(String tableName,
metrics.operationComplete();
metrics.setQueryId(queryId);
return new InsertResponse(metrics);
} catch (NoHttpResponseException e) {
if (i < maxRetries) {
try {
data.reset();
} catch (IOException ioe) {
throw new ClientException("Failed to get response", e);
}
LOG.warn("Failed to get response. Retrying.", e);
} catch ( NoHttpResponseException | ConnectionRequestTimeoutException | ConnectTimeoutException e) {
lastException = httpClientHelper.wrapException("Insert request initiation failed", e);
if (httpClientHelper.shouldRetry(e, finalSettings.getAllSettings())) {
LOG.warn("Retrying", e);
selectedNode = getNextAliveNode();
} else {
throw new ClientException("Server did not respond", e);
throw lastException;
}
continue;
} catch (IOException e) {
LOG.info("Interrupted while waiting for response.");
throw new ClientException("Failed to get query response", e);
throw new ClientException("Insert request failed", e);
}

if (i < maxRetries) {
try {
data.reset();
} catch (IOException ioe) {
throw new ClientException("Failed to reset stream before next attempt", ioe);
}
}
}
throw new ClientException("Failed to insert data: too many retries");
throw new ClientException("Insert request failed after retries", lastException);
};
} else {
responseSupplier = () -> {
Expand All @@ -1211,7 +1248,6 @@ public CompletableFuture<InsertResponse> insert(String tableName,
clickHouseResponse = future.get();
}
InsertResponse response = new InsertResponse(clickHouseResponse, clientStats);
LOG.debug("Total insert (InputStream) time: {}", clientStats.getElapsedTime("insert"));
return response;
} catch (ExecutionException e) {
throw new ClientException("Failed to get insert response", e.getCause());
Expand Down Expand Up @@ -1300,6 +1336,7 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
responseSupplier = () -> {
// Selecting some node
ClickHouseNode selectedNode = getNextAliveNode();
ClientException lastException = null;
for (int i = 0; i <= maxRetries; i++) {
try {
ClassicHttpResponse httpResponse =
Expand All @@ -1325,15 +1362,23 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
metrics.operationComplete();

return new QueryResponse(httpResponse, finalSettings.getFormat(), finalSettings, metrics);

} catch ( NoHttpResponseException | ConnectionRequestTimeoutException | ConnectTimeoutException e) {
lastException = httpClientHelper.wrapException("Query request initiation failed", e);
if (httpClientHelper.shouldRetry(e, finalSettings.getAllSettings())) {
LOG.warn("Retrying.", e);
selectedNode = getNextAliveNode();
} else {
throw lastException;
}
} catch (ClientException e) {
throw e;
} catch (ConnectionRequestTimeoutException | ConnectTimeoutException e) {
throw new ConnectionInitiationException("Failed to get connection", e);
} catch (Exception e) {
throw new ClientException("Failed to execute query", e);
throw new ClientException("Query request failed", e);
}
}
throw new ClientException("Failed to get table schema: too many retries");

throw new ClientException("Query request failed after retries", lastException);
};
} else {
ClickHouseRequest<?> request = oldClient.read(getServerNode());
Expand Down Expand Up @@ -1610,4 +1655,6 @@ public Set<String> getEndpoints() {
/**
 * Picks the node to send the next request to.
 * As written, this always returns the first entry of {@code serverNodes};
 * no liveness check or rotation between nodes happens here.
 *
 * @return first configured server node
 */
private ClickHouseNode getNextAliveNode() {
    final int firstNodeIndex = 0;
    return serverNodes.get(firstNodeIndex);
}

public static final String VALUES_LIST_DELIMITER = ",";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.clickhouse.client.api;

/**
 * Causes of client-side request failures that can be selected for retrying.
 * Constants are referenced by name when stored in client configuration.
 */
public enum ClientFaultCause {

    /** Marker value: when present in the configured causes, nothing is retried. */
    None,

    /** No HTTP response was received for the request. */
    NoHttpResponse,

    /** Establishing a connection to the server failed. */
    ConnectTimeout,

    /** Timed out while requesting a connection. */
    ConnectionRequestTimeout;
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@
import com.clickhouse.client.ClickHouseSslContextProvider;
import com.clickhouse.client.api.Client;
import com.clickhouse.client.api.ClientException;
import com.clickhouse.client.api.ClientFaultCause;
import com.clickhouse.client.api.ClientMisconfigurationException;
import com.clickhouse.client.api.ConnectionInitiationException;
import com.clickhouse.client.api.ConnectionReuseStrategy;
import com.clickhouse.client.api.ServerException;
import com.clickhouse.client.api.enums.ProxyType;
import com.clickhouse.client.config.ClickHouseClientOption;
import com.clickhouse.client.config.ClickHouseDefaults;
import com.clickhouse.client.http.ClickHouseHttpProto;
import com.clickhouse.client.http.config.ClickHouseHttpOption;
import org.apache.hc.client5.http.ConnectTimeoutException;
import org.apache.hc.client5.http.classic.methods.HttpPost;
import org.apache.hc.client5.http.config.ConnectionConfig;
import org.apache.hc.client5.http.config.RequestConfig;
Expand Down Expand Up @@ -61,7 +64,11 @@
import java.net.UnknownHostException;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

Expand All @@ -78,6 +85,8 @@ public class HttpAPIClientHelper {

private String proxyAuthHeaderValue;

private final Set<ClientFaultCause> defaultRetryCauses;

public HttpAPIClientHelper(Map<String, String> configuration) {
this.chConfiguration = configuration;
this.httpClient = createHttpClient();
Expand All @@ -93,6 +102,11 @@ public HttpAPIClientHelper(Map<String, String> configuration) {
boolean usingServerCompression= chConfiguration.getOrDefault(ClickHouseClientOption.COMPRESS.getKey(), "false").equalsIgnoreCase("true");
boolean useHttpCompression = chConfiguration.getOrDefault("client.use_http_compression", "false").equalsIgnoreCase("true");
LOG.info("client compression: {}, server compression: {}, http compression: {}", usingClientCompression, usingServerCompression, useHttpCompression);

defaultRetryCauses = SerializerUtils.parseEnumList(chConfiguration.get("client_retry_on_failures"), ClientFaultCause.class);
if (defaultRetryCauses.contains(ClientFaultCause.None)) {
defaultRetryCauses.removeIf(c -> c != ClientFaultCause.None);
}
}

/**
Expand Down Expand Up @@ -426,4 +440,39 @@ public static <T> T getHeaderVal(Header header, T defaultValue, Function<String,

return converter.apply(header.getValue());
}

/**
 * Decides whether a request that failed with the given exception should be attempted again.
 * The set of retryable causes is read from {@code requestSettings} under the
 * {@code retry_on_failures} key; when absent, causes parsed from client configuration are used.
 *
 * @param ex - exception the request failed with
 * @param requestSettings - settings of the failed request; a {@code retry_on_failures} entry,
 *                          if present, is expected to be a {@code Set<ClientFaultCause>}
 * @return {@code true} if the exception maps to a cause that is configured for retrying,
 *         {@code false} otherwise or when {@link ClientFaultCause#None} is configured
 */
public boolean shouldRetry(Exception ex, Map<String, Object> requestSettings) {
    // per-request value takes precedence over client-level defaults
    @SuppressWarnings("unchecked")
    Set<ClientFaultCause> retryCauses = (Set<ClientFaultCause>)
            requestSettings.getOrDefault("retry_on_failures", defaultRetryCauses);

    // None switches retries off regardless of other configured causes
    if (retryCauses.contains(ClientFaultCause.None)) {
        return false;
    }

    if (ex instanceof NoHttpResponseException) {
        return retryCauses.contains(ClientFaultCause.NoHttpResponse);
    }

    // ConnectTimeoutException is not a subtype of java.net.ConnectException, so it must be
    // tested explicitly; otherwise connect timeouts would never be retried.
    if (ex instanceof ConnectTimeoutException || ex instanceof ConnectException) {
        return retryCauses.contains(ClientFaultCause.ConnectTimeout);
    }

    if (ex instanceof ConnectionRequestTimeoutException) {
        return retryCauses.contains(ClientFaultCause.ConnectionRequestTimeout);
    }

    // any other failure is not retryable
    return false;
}

/**
 * Converts a low-level failure into a client exception.
 * Connection request timeouts, connect timeouts and connect errors become a
 * {@link ConnectionInitiationException}; everything else, including an exception
 * that already is a {@link ClientException}, is wrapped into a plain {@link ClientException}.
 *
 * @param message - message for the resulting exception
 * @param cause - original failure, kept as the cause
 * @return exception wrapping {@code cause}
 */
public ClientException wrapException(String message, Exception cause) {
    boolean connectionInitiationFailure = cause instanceof ConnectionRequestTimeoutException
            || cause instanceof ConnectTimeoutException
            || cause instanceof ConnectException;

    return connectionInitiationFailure
            ? new ConnectionInitiationException(message, cause)
            : new ClientException(message, cause);
}
}
Loading

0 comments on commit 12c921d

Please sign in to comment.