Skip to content

Commit

Permalink
Merge pull request #540 from zhicwu/failed-to-respond
Browse files Browse the repository at this point in the history
Retry for NoHttpResponseException
  • Loading branch information
zhicwu authored Feb 12, 2021
2 parents f6a0d54 + f90acb0 commit 41f3e34
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public enum ClickHouseConnectionSettings implements DriverPropertyCreator {
+ " ClickHouse rejects request execution if its time exceeds max_execution_time"),


@Deprecated
KEEP_ALIVE_TIMEOUT("keepAliveTimeout", 30 * 1000, ""),

/**
Expand All @@ -35,6 +36,7 @@ public enum ClickHouseConnectionSettings implements DriverPropertyCreator {
TIME_TO_LIVE_MILLIS("timeToLiveMillis", 60 * 1000, ""),
DEFAULT_MAX_PER_ROUTE("defaultMaxPerRoute", 500, ""),
MAX_TOTAL("maxTotal", 10000, ""),
MAX_RETRIES("maxRetries", 3, "Maximum retries(default to 3) for idempotent operation. Set 0 to disable retry."),

/**
* additional
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public class ClickHouseProperties {
private int timeToLiveMillis;
private int defaultMaxPerRoute;
private int maxTotal;
private int maxRetries;
private String host;
private int port;
private boolean usePathAsDb;
Expand Down Expand Up @@ -113,6 +114,7 @@ public ClickHouseProperties(Properties info) {
this.timeToLiveMillis = (Integer)getSetting(info, ClickHouseConnectionSettings.TIME_TO_LIVE_MILLIS);
this.defaultMaxPerRoute = (Integer)getSetting(info, ClickHouseConnectionSettings.DEFAULT_MAX_PER_ROUTE);
this.maxTotal = (Integer)getSetting(info, ClickHouseConnectionSettings.MAX_TOTAL);
this.maxRetries = (Integer)getSetting(info, ClickHouseConnectionSettings.MAX_RETRIES);
this.maxCompressBufferSize = (Integer) getSetting(info, ClickHouseConnectionSettings.MAX_COMPRESS_BUFFER_SIZE);
this.ssl = (Boolean) getSetting(info, ClickHouseConnectionSettings.SSL);
this.sslRootCertificate = (String) getSetting(info, ClickHouseConnectionSettings.SSL_ROOT_CERTIFICATE);
Expand Down Expand Up @@ -179,6 +181,7 @@ public Properties asProperties() {
ret.put(ClickHouseConnectionSettings.TIME_TO_LIVE_MILLIS.getKey(), String.valueOf(timeToLiveMillis));
ret.put(ClickHouseConnectionSettings.DEFAULT_MAX_PER_ROUTE.getKey(), String.valueOf(defaultMaxPerRoute));
ret.put(ClickHouseConnectionSettings.MAX_TOTAL.getKey(), String.valueOf(maxTotal));
ret.put(ClickHouseConnectionSettings.MAX_RETRIES.getKey(), String.valueOf(maxRetries));
ret.put(ClickHouseConnectionSettings.MAX_COMPRESS_BUFFER_SIZE.getKey(), String.valueOf(maxCompressBufferSize));
ret.put(ClickHouseConnectionSettings.SSL.getKey(), String.valueOf(ssl));
ret.put(ClickHouseConnectionSettings.SSL_ROOT_CERTIFICATE.getKey(), String.valueOf(sslRootCertificate));
Expand Down Expand Up @@ -248,6 +251,7 @@ public ClickHouseProperties(ClickHouseProperties properties) {
setTimeToLiveMillis(properties.timeToLiveMillis);
setDefaultMaxPerRoute(properties.defaultMaxPerRoute);
setMaxTotal(properties.maxTotal);
setMaxRetries(properties.maxRetries);
setMaxCompressBufferSize(properties.maxCompressBufferSize);
setSsl(properties.ssl);
setSslRootCertificate(properties.sslRootCertificate);
Expand Down Expand Up @@ -594,6 +598,14 @@ public void setMaxTotal(int maxTotal) {
this.maxTotal = maxTotal;
}

public int getMaxRetries() {
return maxRetries;
}

public void setMaxRetries(int maxRetries) {
this.maxRetries = maxRetries;
}

public int getMaxCompressBufferSize() {
return maxCompressBufferSize;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,19 @@

import org.apache.http.ConnectionReuseStrategy;
import org.apache.http.Header;
import org.apache.http.HeaderElement;
import org.apache.http.HeaderElementIterator;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.NoHttpResponseException;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.AuthCache;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.HttpRequestRetryHandler;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.config.ConnectionConfig;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.ConnectionKeepAliveStrategy;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.socket.PlainConnectionSocketFactory;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
Expand All @@ -50,11 +49,10 @@
import org.apache.http.impl.client.BasicAuthCache;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.message.BasicHeader;
import org.apache.http.message.BasicHeaderElementIterator;
import org.apache.http.protocol.HTTP;
import org.apache.http.protocol.HttpContext;

import ru.yandex.clickhouse.settings.ClickHouseProperties;
Expand All @@ -71,16 +69,32 @@ public ClickHouseHttpClientBuilder(ClickHouseProperties properties) {
public CloseableHttpClient buildClient() throws Exception {
return HttpClientBuilder.create()
.setConnectionManager(getConnectionManager())
.setRetryHandler(getRequestRetryHandler())
.setConnectionReuseStrategy(getConnectionReuseStrategy())
.setDefaultConnectionConfig(getConnectionConfig())
.setDefaultRequestConfig(getRequestConfig())
.setDefaultHeaders(getDefaultHeaders())
.setDefaultCredentialsProvider(getDefaultCredentialsProvider())
.disableContentCompression() // gzip здесь ни к чему. Используется lz4 при compress=1
.disableContentCompression() // gzip is not needed. Use lz4 when compress=1
.disableRedirectHandling()
.build();
}

private HttpRequestRetryHandler getRequestRetryHandler() {
final int maxRetries = properties.getMaxRetries();
return new DefaultHttpRequestRetryHandler(maxRetries, false) {
@Override
public boolean retryRequest(IOException exception, int executionCount, HttpContext context) {
if (executionCount > maxRetries || context == null
|| !Boolean.TRUE.equals(context.getAttribute("is_idempotent"))) {
return false;
}

return (exception instanceof NoHttpResponseException) || super.retryRequest(exception, executionCount, context);
}
};
}

public static HttpClientContext createClientContext(ClickHouseProperties props) {
if (props == null
|| !isConfigurationValidForAuth(props))
Expand Down Expand Up @@ -155,29 +169,6 @@ private Collection<Header> getDefaultHeaders() {
return headers;
}

private ConnectionKeepAliveStrategy createKeepAliveStrategy() {
return new ConnectionKeepAliveStrategy() {
@Override
public long getKeepAliveDuration(HttpResponse httpResponse, HttpContext httpContext) {
// in case of errors keep-alive not always works. close connection just in case
if (httpResponse.getStatusLine().getStatusCode() != HttpURLConnection.HTTP_OK) {
return -1;
}
HeaderElementIterator it = new BasicHeaderElementIterator(
httpResponse.headerIterator(HTTP.CONN_DIRECTIVE));
while (it.hasNext()) {
HeaderElement he = it.nextElement();
String param = he.getName();
//String value = he.getValue();
if (param != null && param.equalsIgnoreCase(HTTP.CONN_KEEP_ALIVE)) {
return properties.getKeepAliveTimeout();
}
}
return -1;
}
};
}

private SSLContext getSSLContext()
throws CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException, KeyManagementException {
SSLContext ctx = SSLContext.getInstance("TLS");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package ru.yandex.clickhouse.util;

import org.apache.http.HttpHost;
import org.apache.http.NoHttpResponseException;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.conn.HttpHostConnectException;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.protocol.BasicHttpContext;
import org.apache.http.protocol.HttpContext;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -141,7 +145,72 @@ private static Object[][] provideAuthUserPasswordTestData() {
null, null, "baz", "Basic ZGVmYXVsdDpiYXo=" // default:baz
},
};
}

private static WireMockServer newServer() {
WireMockServer server = new WireMockServer(
WireMockConfiguration.wireMockConfig().dynamicPort());
server.start();
server.stubFor(WireMock.post(WireMock.urlPathMatching("/*"))
.willReturn(WireMock.aResponse().withStatus(200).withHeader("Connection", "Keep-Alive")
.withHeader("Content-Type", "text/plain; charset=UTF-8")
.withHeader("Transfer-Encoding", "chunked").withHeader("Keep-Alive", "timeout=3")
.withBody("OK.........................").withFixedDelay(2)));
return server;
}

private static void shutDownServerWithDelay(final WireMockServer server, final long delayMs) {
new Thread() {
public void run() {
try {
Thread.sleep(delayMs);
} catch (InterruptedException e) {
e.printStackTrace();
}

server.shutdownServer();
server.stop();
}
}.start();
}

// @Test(dependsOnMethods = { "testWithRetry" }, expectedExceptions = { NoHttpResponseException.class })
public void testWithoutRetry() throws Exception {
final WireMockServer server = newServer();

ClickHouseProperties props = new ClickHouseProperties();
props.setMaxRetries(0);
ClickHouseHttpClientBuilder builder = new ClickHouseHttpClientBuilder(props);
CloseableHttpClient client = builder.buildClient();
HttpPost post = new HttpPost("http://localhost:" + server.port() + "/?db=system&query=select%201");

shutDownServerWithDelay(server, 100);

try {
client.execute(post);
} finally {
client.close();
}
}

// @Test(expectedExceptions = { HttpHostConnectException.class })
public void testWithRetry() throws Exception {
final WireMockServer server = newServer();

ClickHouseProperties props = new ClickHouseProperties();
// props.setMaxRetries(3);
ClickHouseHttpClientBuilder builder = new ClickHouseHttpClientBuilder(props);
CloseableHttpClient client = builder.buildClient();
HttpContext context = new BasicHttpContext();
context.setAttribute("is_idempotent", Boolean.TRUE);
HttpPost post = new HttpPost("http://localhost:" + server.port() + "/?db=system&query=select%202");

shutDownServerWithDelay(server, 100);

try {
client.execute(post, context);
} finally {
client.close();
}
}
}

0 comments on commit 41f3e34

Please sign in to comment.