Skip to content

Commit

Permalink
Merge pull request #2046 from ClickHouse/fix-connection-leak
Browse files Browse the repository at this point in the history
Adjusting cancel and ensure close
  • Loading branch information
Paultagoras authored Dec 22, 2024
2 parents 753c989 + 0888d5c commit 5b3917f
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 23 deletions.
40 changes: 26 additions & 14 deletions jdbc-v2/src/main/java/com/clickhouse/jdbc/ResultSetImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.io.Reader;
import java.math.BigDecimal;
import java.net.MalformedURLException;
import java.net.SocketException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.sql.*;
Expand Down Expand Up @@ -68,23 +69,34 @@ public boolean next() throws SQLException {
@Override
public void close() throws SQLException {
closed = true;
if (reader != null) {
try {
reader.close();
} catch (Exception e) {
throw ExceptionUtils.toSqlState(e);
}

reader = null;
Exception e = null;
try {
if (reader != null) {
try {
reader.close();
} catch (Exception re) {
log.debug("Error closing reader", re);
e = re;
} finally {
reader = null;
}
}
} finally {
if (response != null) {
try {
response.close();
} catch (Exception re) {
log.debug("Error closing response", re);
e = re;
} finally {
response = null;
}
}
}

if (response != null) {
try {
response.close();
} catch (Exception e) {
throw ExceptionUtils.toSqlState(e);
}
response = null;
if (e != null) {
throw ExceptionUtils.toSqlState(e);
}
}

Expand Down
38 changes: 29 additions & 9 deletions jdbc-v2/src/main/java/com/clickhouse/jdbc/StatementImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

public class StatementImpl implements Statement, JdbcV2Wrapper {
Expand All @@ -34,7 +35,7 @@ public class StatementImpl implements Statement, JdbcV2Wrapper {
private OperationMetrics metrics;
private List<String> batch;
private String lastSql;
private String lastQueryId;
private volatile String lastQueryId;
private String schema;
private int maxRows;

Expand Down Expand Up @@ -146,6 +147,14 @@ public ResultSetImpl executeQuery(String sql, QuerySettings settings) throws SQL
checkClosed();
QuerySettings mergedSettings = QuerySettings.merge(connection.getDefaultQuerySettings(), settings);

if (mergedSettings.getQueryId() != null) {
lastQueryId = mergedSettings.getQueryId();
} else {
lastQueryId = UUID.randomUUID().toString();
mergedSettings.setQueryId(lastQueryId);
}
LOG.debug("Query ID: {}", lastQueryId);

try {
lastSql = parseJdbcEscapeSyntax(sql);
QueryResponse response;
Expand All @@ -169,7 +178,6 @@ public ResultSetImpl executeQuery(String sql, QuerySettings settings) throws SQL
}
currentResultSet = new ResultSetImpl(this, response, reader);
metrics = response.getMetrics();
lastQueryId = response.getQueryId();
} catch (Exception e) {
throw ExceptionUtils.toSqlState(e);
}
Expand All @@ -193,6 +201,13 @@ public int executeUpdate(String sql, QuerySettings settings) throws SQLException

QuerySettings mergedSettings = QuerySettings.merge(connection.getDefaultQuerySettings(), settings);

if (mergedSettings.getQueryId() != null) {
lastQueryId = mergedSettings.getQueryId();
} else {
lastQueryId = UUID.randomUUID().toString();
mergedSettings.setQueryId(lastQueryId);
}

lastSql = parseJdbcEscapeSyntax(sql);
int updateCount = 0;
try (QueryResponse response = queryTimeout == 0 ? connection.client.query(lastSql, mergedSettings).get()
Expand All @@ -212,8 +227,13 @@ public int executeUpdate(String sql, QuerySettings settings) throws SQLException
public void close() throws SQLException {
closed = true;
if (currentResultSet != null) {
currentResultSet.close();
currentResultSet = null;
try {
currentResultSet.close();
} catch (Exception e) {
LOG.debug("Failed to close current result set", e);
} finally {
currentResultSet = null;
}
}
}

Expand Down Expand Up @@ -267,10 +287,10 @@ public void cancel() throws SQLException {
return;
}

try {
connection.client.query(String.format("KILL QUERY%sWHERE query_id = '%s'",
connection.onCluster ? " ON CLUSTER " + connection.cluster + " " : " ",
lastQueryId), connection.getDefaultQuerySettings()).get();
try (QueryResponse response = connection.client.query(String.format("KILL QUERY%sWHERE query_id = '%s'",
connection.onCluster ? " ON CLUSTER " + connection.cluster + " " : " ",
lastQueryId), connection.getDefaultQuerySettings()).get()){
LOG.debug("Query {} was killed by {}", lastQueryId, response.getQueryId());
} catch (Exception e) {
throw new SQLException(e);
}
Expand Down Expand Up @@ -298,7 +318,7 @@ public boolean execute(String sql) throws SQLException {
return execute(sql, new QuerySettings().setDatabase(schema));
}

private boolean execute(String sql, QuerySettings settings) throws SQLException {
public boolean execute(String sql, QuerySettings settings) throws SQLException {
checkClosed();
StatementType type = parseStatementType(sql);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,12 @@ public static SQLException toSqlState(String message, Exception cause) {

return new SQLException(exceptionMessage, SQL_STATE_CLIENT_ERROR, cause);//Default
}

public static Throwable getRootCause(Throwable throwable) {
Throwable cause = throwable;
while (cause.getCause() != null) {
cause = cause.getCause();
}
return cause;
}
}
36 changes: 36 additions & 0 deletions jdbc-v2/src/test/java/com/clickhouse/jdbc/StatementTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

import com.clickhouse.client.api.ClientConfigProperties;
import com.clickhouse.client.api.query.GenericRecord;
import com.clickhouse.client.api.query.QuerySettings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import com.clickhouse.data.ClickHouseVersion;
import org.apache.commons.lang3.RandomStringUtils;
import org.testng.annotations.Test;
Expand All @@ -18,6 +22,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.UUID;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
Expand All @@ -27,6 +32,8 @@


public class StatementTest extends JdbcIntegrationTest {
private static final Logger log = LoggerFactory.getLogger(StatementTest.class);

@Test(groups = { "integration" })
public void testExecuteQuerySimpleNumbers() throws Exception {
try (Connection conn = getJdbcConnection()) {
Expand Down Expand Up @@ -492,4 +499,33 @@ public void testConnectionExhaustion() throws Exception {
}
}
}

@Test(groups = { "integration" })
public void testConcurrentCancel() throws Exception {
int maxNumConnections = 3;

try (Connection conn = getJdbcConnection()) {
try (StatementImpl stmt = (StatementImpl) conn.createStatement()) {
stmt.executeQuery("SELECT number FROM system.numbers LIMIT 10000000000");
stmt.cancel();
}
for (int i = 0; i < maxNumConnections; i++) {
try (StatementImpl stmt = (StatementImpl) conn.createStatement()) {
final int threadNum = i;
log.info("Starting thread {}", threadNum);
new Thread(() -> {
try {
ResultSet rs = stmt.executeQuery("SELECT number FROM system.numbers LIMIT 10000000000");
rs.next();
log.info(rs.getObject(1).toString());
} catch (SQLException e) {
log.error("Error in thread {}", threadNum, e);
}
}).start();

stmt.cancel();
}
}
}
}
}

0 comments on commit 5b3917f

Please sign in to comment.