Skip to content

Commit

Permalink
Call HttpURLConnection.disconnect when closing response, and avoid do…
Browse files Browse the repository at this point in the history
…uble buffering
  • Loading branch information
zhicwu committed Feb 22, 2022
1 parent 105de13 commit db4e219
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,11 @@ private boolean readFully(byte[] b, int off, int len) throws IOException {
}

public ClickHouseLZ4InputStream(InputStream stream) {
super(null);
this(stream, null);
}

public ClickHouseLZ4InputStream(InputStream stream, Runnable afterClose) {
super(afterClose);

this.decompressor = factory.fastDecompressor();
this.stream = ClickHouseChecker.nonNull(stream, "InputStream");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.clickhouse.client.ClickHouseCompression;
import com.clickhouse.client.ClickHouseConfig;
import com.clickhouse.client.ClickHouseCredentials;
import com.clickhouse.client.ClickHouseInputStream;
import com.clickhouse.client.ClickHouseNode;
import com.clickhouse.client.ClickHouseRequest;
import com.clickhouse.client.ClickHouseUtils;
Expand Down Expand Up @@ -203,6 +204,14 @@ protected ClickHouseHttpConnection(ClickHouseNode server, ClickHouseRequest<?> r
this.defaultHeaders = Collections.unmodifiableMap(map);
}

protected void closeQuietly() {
try {
close();
} catch (Exception e) {
// ignore
}
}

protected String getBaseUrl() {
String baseUrl;
int index = url.indexOf('?');
Expand Down Expand Up @@ -236,23 +245,30 @@ protected OutputStream getRequestOutputStream(OutputStream out) throws IOExcepti
return out;
}

protected InputStream getResponseInputStream(InputStream in) throws IOException {
protected ClickHouseInputStream getResponseInputStream(InputStream in) throws IOException {
Runnable afterClose = null;
if (!isReusable()) {
afterClose = this::closeQuietly;
}
ClickHouseInputStream chInput;
if (config.isCompressServerResponse()) {
// TODO support more algorithms
ClickHouseCompression algorithm = config.getCompressAlgorithmForServerResponse();
switch (algorithm) {
case GZIP:
in = new GZIPInputStream(in);
chInput = ClickHouseInputStream.of(new GZIPInputStream(in), config.getMaxBufferSize(), afterClose);
break;
case LZ4:
in = new ClickHouseLZ4InputStream(in);
chInput = new ClickHouseLZ4InputStream(in, afterClose);
break;
default:
throw new UnsupportedOperationException("Unsupported compression algorithm: " + algorithm);
}
} else {
chInput = ClickHouseInputStream.of(in, config.getMaxBufferSize(), afterClose);
}

return in;
return chInput;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.clickhouse.client.http;

import java.io.InputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -39,16 +38,6 @@ private static long getLongValue(Map<String, String> map, String key) {

protected final ClickHouseResponseSummary summary;

protected void closeConnection() {
if (!connection.isReusable()) {
try {
connection.close();
} catch (Exception e) {
// ignore
}
}
}

protected ClickHouseConfig getConfig(ClickHouseRequest<?> request) {
ClickHouseConfig config = request.getConfig();
if (format != null && format != config.getFormat()) {
Expand All @@ -61,14 +50,14 @@ protected ClickHouseConfig getConfig(ClickHouseRequest<?> request) {
return config;
}

public ClickHouseHttpResponse(ClickHouseHttpConnection connection, InputStream input, String serverDisplayName,
String queryId, String summary, ClickHouseFormat format, TimeZone timeZone) {
public ClickHouseHttpResponse(ClickHouseHttpConnection connection, ClickHouseInputStream input,
String serverDisplayName, String queryId, String summary, ClickHouseFormat format, TimeZone timeZone) {
if (connection == null || input == null) {
throw new IllegalArgumentException("Non-null connection and input stream are required");
}

this.connection = connection;
this.input = ClickHouseInputStream.of(input, connection.config.getMaxBufferSize(), this::closeConnection);
this.input = input;

this.serverDisplayName = !ClickHouseChecker.isNullOrEmpty(serverDisplayName) ? serverDisplayName
: connection.server.getHost();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ private void checkResponse(HttpURLConnection conn) throws IOException {
// TODO get exception from response header, for example:
// X-ClickHouse-Exception-Code: 47
StringBuilder builder = new StringBuilder();
try (Reader reader = new BufferedReader(
new InputStreamReader(getResponseInputStream(conn.getErrorStream()), StandardCharsets.UTF_8))) {
try (Reader reader = new InputStreamReader(getResponseInputStream(conn.getErrorStream()),
StandardCharsets.UTF_8)) {
int c = 0;
while ((c = reader.read()) != -1) {
builder.append((char) c);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@ private HttpResponse<InputStream> checkResponse(HttpResponse<InputStream> r) thr
// TODO get exception from response header, for example:
// X-ClickHouse-Exception-Code: 47
StringBuilder builder = new StringBuilder();
try (Reader reader = new BufferedReader(
new InputStreamReader(getResponseInputStream(r.body()), StandardCharsets.UTF_8))) {
try (Reader reader = new InputStreamReader(getResponseInputStream(r.body()), StandardCharsets.UTF_8)) {
int c = 0;
while ((c = reader.read()) != -1) {
builder.append((char) c);
Expand Down Expand Up @@ -278,5 +277,6 @@ public boolean ping(int timeout) {

@Override
public void close() {
// nothing
}
}

0 comments on commit db4e219

Please sign in to comment.