From 98f2c455f27e426bb298aad105b8a4e12bae5a23 Mon Sep 17 00:00:00 2001 From: Zhichun Wu Date: Thu, 30 Dec 2021 16:57:27 +0800 Subject: [PATCH 1/4] Fix minor issues after 0.3.2 pre-release --- .../clickhouse/client/ClickHouseColumn.java | 13 +- .../clickhouse/client/ClickHouseDataType.java | 1 + clickhouse-jdbc/legacy.xml | 15 ++ .../internal/ClickHouseConnectionImpl.java | 5 +- .../internal/ClickHouseStatementImpl.java | 26 +++- .../internal/InputBasedPreparedStatement.java | 3 +- .../internal/SqlBasedPreparedStatement.java | 145 +++++++++++------- .../internal/TableBasedPreparedStatement.java | 43 ++++-- .../jdbc/ClickHousePreparedStatementTest.java | 121 ++++++++++++--- .../jdbc/ClickHouseStatementTest.java | 115 +++++++++++++- 10 files changed, 372 insertions(+), 115 deletions(-) diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseColumn.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseColumn.java index f50a3ea11..ba3e875e1 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseColumn.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseColumn.java @@ -19,6 +19,7 @@ public final class ClickHouseColumn implements Serializable { private static final String KEYWORD_NULLABLE = "Nullable"; private static final String KEYWORD_LOW_CARDINALITY = "LowCardinality"; private static final String KEYWORD_AGGREGATE_FUNCTION = ClickHouseDataType.AggregateFunction.name(); + private static final String KEYWORD_SIMPLE_AGGREGATE_FUNCTION = ClickHouseDataType.SimpleAggregateFunction.name(); private static final String KEYWORD_ARRAY = ClickHouseDataType.Array.name(); private static final String KEYWORD_TUPLE = ClickHouseDataType.Tuple.name(); private static final String KEYWORD_MAP = ClickHouseDataType.Map.name(); @@ -140,8 +141,10 @@ protected static int readColumn(String args, int startIndex, int len, String nam brackets++; } - if (args.startsWith(KEYWORD_AGGREGATE_FUNCTION, i)) { - int index = args.indexOf('(', i + KEYWORD_AGGREGATE_FUNCTION.length()); + String matchedKeyword; + if (args.startsWith((matchedKeyword = KEYWORD_AGGREGATE_FUNCTION), i) + || args.startsWith((matchedKeyword = KEYWORD_SIMPLE_AGGREGATE_FUNCTION), i)) { + int index = args.indexOf('(', i + matchedKeyword.length()); if (index < i) { throw new IllegalArgumentException("Missing function parameters"); } @@ -160,8 +163,8 @@ protected static int readColumn(String args, int startIndex, int len, String nam nestedColumns.add(ClickHouseColumn.of("", p)); } } - column = new ClickHouseColumn(ClickHouseDataType.AggregateFunction, name, args.substring(startIndex, i), - nullable, lowCardinality, params, nestedColumns); + column = new ClickHouseColumn(ClickHouseDataType.valueOf(matchedKeyword), name, + args.substring(startIndex, i), nullable, lowCardinality, params, nestedColumns); column.aggFuncType = aggFunc; } else if (args.startsWith(KEYWORD_ARRAY, i)) { int index = args.indexOf('(', i + KEYWORD_ARRAY.length()); @@ -395,7 +398,9 @@ private ClickHouseColumn(ClickHouseDataType dataType, String columnName, String } public boolean isAggregateFunction() { + // || dataType == ClickHouseDataType.SimpleAggregateFunction; return dataType == ClickHouseDataType.AggregateFunction; + } public boolean isArray() { diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseDataType.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseDataType.java index e34c94981..1c23f285a 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseDataType.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseDataType.java @@ -76,6 +76,7 @@ public enum ClickHouseDataType { "NATIONAL CHARACTER", "NATIONAL CHARACTER LARGE OBJECT", "NATIONAL CHARACTER VARYING", "NCHAR", "NCHAR LARGE OBJECT", "NCHAR VARYING", "NVARCHAR", "TEXT", "TINYBLOB", "TINYTEXT", "VARCHAR", "VARCHAR2"), AggregateFunction(String.class, true, true, false, 0, 0, 0, 0, 0), // implementation-defined intermediate state + SimpleAggregateFunction(String.class, true, true, false, 0, 0, 0, 0, 0), Array(Object.class, true, true, false, 0, 0, 0, 0, 0), Map(Map.class, true, true, false, 0, 0, 0, 0, 0), Nested(Object.class, true, true, false, 0, 0, 0, 0, 0), Tuple(List.class, true, true, false, 0, 0, 0, 0, 0), Point(Object.class, false, true, true, 33, 0, 0, 0, 0), // same as Tuple(Float64, Float64) diff --git a/clickhouse-jdbc/legacy.xml b/clickhouse-jdbc/legacy.xml index b8a8a5415..c6b986a24 100644 --- a/clickhouse-jdbc/legacy.xml +++ b/clickhouse-jdbc/legacy.xml @@ -121,6 +121,21 @@ testng test + + mysql + mysql-connector-java + test + + + org.mariadb.jdbc + mariadb-java-client + test + + + org.postgresql + postgresql + test + diff --git a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/ClickHouseConnectionImpl.java b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/ClickHouseConnectionImpl.java index 073a37c01..4f62170ac 100644 --- a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/ClickHouseConnectionImpl.java +++ b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/ClickHouseConnectionImpl.java @@ -568,9 +568,8 @@ public PreparedStatement prepareStatement(String sql, int resultSetType, int res if (parsedStmt.hasTempTable()) { // non-insert queries using temp table ps = new TableBasedPreparedStatement(this, - clientRequest.write().query(parsedStmt.getSQL(), newQueryId()), - parsedStmt.getTempTables(), resultSetType, - resultSetConcurrency, resultSetHoldability); + clientRequest.write().query(parsedStmt.getSQL(), newQueryId()), parsedStmt, + resultSetType, resultSetConcurrency, resultSetHoldability); } else if (parsedStmt.getStatementType() == StatementType.INSERT) { if (!ClickHouseChecker.isNullOrBlank(parsedStmt.getInput())) { // insert query using input function diff --git a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/ClickHouseStatementImpl.java b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/ClickHouseStatementImpl.java index 7691dd37c..20f5a0647 100644 --- a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/ClickHouseStatementImpl.java +++ b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/ClickHouseStatementImpl.java @@ -13,6 +13,7 @@ import java.util.Map.Entry; import com.clickhouse.client.ClickHouseChecker; +import com.clickhouse.client.ClickHouseClient; import com.clickhouse.client.ClickHouseConfig; import com.clickhouse.client.ClickHouseFormat; import com.clickhouse.client.ClickHouseRequest; @@ -30,6 +31,7 @@ import com.clickhouse.jdbc.SqlExceptionUtils; import com.clickhouse.jdbc.JdbcWrapper; import com.clickhouse.jdbc.parser.ClickHouseSqlStatement; +import com.clickhouse.jdbc.parser.StatementType; public class ClickHouseStatementImpl extends JdbcWrapper implements ClickHouseStatement { private static final Logger log = LoggerFactory.getLogger(ClickHouseStatementImpl.class); @@ -50,7 +52,7 @@ public class ClickHouseStatementImpl extends JdbcWrapper implements ClickHouseSt private int maxFieldSize; private int maxRows; private boolean poolable; - private String queryId; + private volatile String queryId; private int queryTimeout; private ClickHouseResultSet currentResult; @@ -149,6 +151,7 @@ protected int executeInsert(String sql, InputStream input) throws SQLException { try (ClickHouseResponse resp = request.write().query(sql, queryId = connection.newQueryId()) .format(ClickHouseFormat.RowBinary).data(input).execute() .get()) { + updateResult(new ClickHouseSqlStatement(sql, StatementType.INSERT), resp); summary = resp.getSummary(); } catch (InterruptedException e) { log.error("can not close stream: %s", e.getMessage()); @@ -197,6 +200,9 @@ protected ResultSet updateResult(ClickHouseSqlStatement stmt, ClickHouseResponse rs = currentResult; } else { currentUpdateCount = response.getSummary().getUpdateCount(); + if (currentUpdateCount <= 0) { + currentUpdateCount = 1; + } response.close(); } @@ -362,11 +368,19 @@ public void setQueryTimeout(int seconds) throws SQLException { @Override public void cancel() throws SQLException { - if (this.queryId == null || isClosed()) { + final String qid; + if ((qid = this.queryId) == null || isClosed()) { return; } - executeQuery(String.format("KILL QUERY WHERE query_id='%s'", queryId)); + ClickHouseClient.send(request.getServer(), String.format("KILL QUERY WHERE query_id='%s'", qid)) + .whenComplete((summary, exception) -> { + if (exception != null) { + log.warn("Failed to kill query [%s] due to: %s", qid, exception.getMessage()); + } else if (summary != null) { + log.debug("Killed query [%s]", qid); + } + }); } @Override @@ -500,8 +514,10 @@ public int[] executeBatch() throws SQLException { int len = batchStmts.size(); int[] results = new int[len]; for (int i = 0; i < len; i++) { - try (ClickHouseResponse r = executeStatement(batchStmts.get(i), null, null, null)) { - results[i] = (int) r.getSummary().getWrittenRows(); + ClickHouseSqlStatement s = batchStmts.get(i); + try (ClickHouseResponse r = executeStatement(s, null, null, null)) { + updateResult(s, r); + results[i] = currentUpdateCount <= 0 ? 0 : currentUpdateCount; } catch (Exception e) { results[i] = EXECUTE_FAILED; log.error("Faled to execute task %d of %d", i + 1, len, e); diff --git a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/InputBasedPreparedStatement.java b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/InputBasedPreparedStatement.java index e011d65a4..423dd51df 100644 --- a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/InputBasedPreparedStatement.java +++ b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/InputBasedPreparedStatement.java @@ -107,7 +107,8 @@ public int executeUpdate() throws SQLException { ensureParams(); addBatch(); - return executeBatch()[0]; + int row = getUpdateCount(); + return row > 0 ? row : 0; } @Override diff --git a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/SqlBasedPreparedStatement.java b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/SqlBasedPreparedStatement.java index 27e37f352..84066e8e5 100644 --- a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/SqlBasedPreparedStatement.java +++ b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/SqlBasedPreparedStatement.java @@ -116,6 +116,72 @@ protected void ensureParams() throws SQLException { } } + protected int[] executeBatch(boolean keepLastResponse) throws SQLException { + ensureOpen(); + + boolean continueOnError = getConnection().getJdbcConfig().isContinueBatchOnError(); + int[] results = new int[counter]; + ClickHouseResponse r = null; + if (builder.length() > 0) { // insert ... values + int result = 0; + try { + r = executeStatement(builder.toString(), null, null, null); + updateResult(parsedStmt, r); + long rows = r.getSummary().getWrittenRows(); + if (rows > 0 && rows != counter) { + log.warn("Expect %d rows being inserted but got %d", counter, rows); + } + + result = 1; + } catch (Exception e) { + if (!continueOnError) { + throw SqlExceptionUtils.handle(e); + } + // actually we don't know which ones failed + result = EXECUTE_FAILED; + log.error("Failed to execute batch insertion of %d records", counter, e); + } finally { + if (!keepLastResponse && r != null) { + r.close(); + } + clearBatch(); + } + + Arrays.fill(results, result); + } else { + int index = 0; + try { + for (String[] params : batch) { + builder.setLength(0); + preparedQuery.apply(builder, params); + try { + r = executeStatement(builder.toString(), null, null, null); + updateResult(parsedStmt, r); + int count = getUpdateCount(); + results[index] = count > 0 ? count : 0; + } catch (Exception e) { + if (!continueOnError) { + throw SqlExceptionUtils.handle(e); + } + results[index] = EXECUTE_FAILED; + log.error("Failed to execute batch insert at %d of %d", index + 1, counter, e); + } finally { + index++; + if (!keepLastResponse || index < counter || results[index - 1] == EXECUTE_FAILED) { + if (r != null) { + r.close(); + } + } + } + } + } finally { + clearBatch(); + } + } + + return results; + } + protected int toArrayIndex(int parameterIndex) throws SQLException { if (parameterIndex < 1 || parameterIndex > templates.length) { throw SqlExceptionUtils.clientError(ClickHouseUtils @@ -129,19 +195,28 @@ protected int toArrayIndex(int parameterIndex) throws SQLException { public ResultSet executeQuery() throws SQLException { ensureParams(); - // FIXME ResultSet should never be null - StringBuilder builder = new StringBuilder(); - preparedQuery.apply(builder, values); - return executeQuery(builder.toString()); + addBatch(); + int[] results = executeBatch(true); + for (int i = 0; i < results.length; i++) { + if (results[i] == EXECUTE_FAILED) { + throw new SQLException("Query failed", SqlExceptionUtils.SQL_STATE_SQL_ERROR); + } + } + return getResultSet(); } @Override public int executeUpdate() throws SQLException { ensureParams(); - StringBuilder builder = new StringBuilder(); - preparedQuery.apply(builder, values); - return executeUpdate(builder.toString()); + addBatch(); + int[] results = executeBatch(false); + for (int i = 0; i < results.length; i++) { + if (results[i] == EXECUTE_FAILED) { + throw new SQLException("Update failed", SqlExceptionUtils.SQL_STATE_SQL_ERROR); + } + } + return getUpdateCount(); } @Override @@ -303,9 +378,9 @@ public void setObject(int parameterIndex, Object x) throws SQLException { public boolean execute() throws SQLException { ensureParams(); - StringBuilder builder = new StringBuilder(); - preparedQuery.apply(builder, values); - return execute(builder.toString()); + addBatch(); + executeBatch(true); + return getResultSet() != null; } @Override @@ -342,55 +417,7 @@ public void addBatch() throws SQLException { @Override public int[] executeBatch() throws SQLException { - ensureOpen(); - - boolean continueOnError = getConnection().getJdbcConfig().isContinueBatchOnError(); - int[] results = new int[counter]; - if (builder.length() > 0) { // insert ... values - int result = 0; - try (ClickHouseResponse r = executeStatement(builder.toString(), null, null, null)) { - long rows = r.getSummary().getWrittenRows(); - if (rows > 0 && rows != counter) { - log.warn("Expect %d rows being inserted but got %d", counter, rows); - } - - result = 1; - } catch (Exception e) { - if (!continueOnError) { - throw SqlExceptionUtils.handle(e); - } - // actually we don't know which ones failed - result = EXECUTE_FAILED; - log.error("Failed to execute batch insertion of %d records", counter, e); - } finally { - clearBatch(); - } - - Arrays.fill(results, result); - } else { - int index = 0; - StringBuilder builder = new StringBuilder(); - try { - for (String[] params : batch) { - builder.setLength(0); - preparedQuery.apply(builder, params); - try (ClickHouseResponse r = executeStatement(builder.toString(), null, null, null)) { - results[index] = (int) r.getSummary().getWrittenRows(); - } catch (Exception e) { - if (!continueOnError) { - throw SqlExceptionUtils.handle(e); - } - results[index] = EXECUTE_FAILED; - log.error("Failed to execute batch insert at %d of %d", index + 1, counter, e); - } - index++; - } - } finally { - clearBatch(); - } - } - - return results; + return executeBatch(false); } @Override diff --git a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/TableBasedPreparedStatement.java b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/TableBasedPreparedStatement.java index 8bd4d0411..a2aa9c13f 100644 --- a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/TableBasedPreparedStatement.java +++ b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/TableBasedPreparedStatement.java @@ -11,10 +11,10 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Calendar; -import java.util.Collection; import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.Set; import com.clickhouse.client.ClickHouseRequest; import com.clickhouse.client.ClickHouseResponse; @@ -31,23 +31,27 @@ public class TableBasedPreparedStatement extends ClickHouseStatementImpl impleme private static final String ERROR_SET_TABLE = "Please use setObject(ClickHouseExternalTable) method instead"; + private final ClickHouseSqlStatement parsedStmt; private final List tables; private final ClickHouseExternalTable[] values; private final List> batch; protected TableBasedPreparedStatement(ClickHouseConnectionImpl connection, ClickHouseRequest request, - Collection tables, int resultSetType, int resultSetConcurrency, int resultSetHoldability) + ClickHouseSqlStatement parsedStmt, int resultSetType, int resultSetConcurrency, + int resultSetHoldability) throws SQLException { super(connection, request, resultSetType, resultSetConcurrency, resultSetHoldability); - if (tables == null) { + Set set = parsedStmt != null ? parsedStmt.getTempTables() : null; + if (set == null) { throw SqlExceptionUtils.clientError("Non-null table list is required"); } - int size = tables.size(); + this.parsedStmt = parsedStmt; + int size = set.size(); this.tables = new ArrayList<>(size); - this.tables.addAll(tables); + this.tables.addAll(set); values = new ClickHouseExternalTable[size]; batch = new LinkedList<>(); } @@ -65,6 +69,12 @@ protected void ensureParams() throws SQLException { } } + protected String getSql() { + // why? because request can be modified so it might not always same as + // parsedStmt.getSQL() + return getRequest().getStatements(false).get(0); + } + protected int toArrayIndex(int parameterIndex) throws SQLException { if (parameterIndex < 1 || parameterIndex > values.length) { throw SqlExceptionUtils.clientError(ClickHouseUtils @@ -78,17 +88,17 @@ protected int toArrayIndex(int parameterIndex) throws SQLException { public ResultSet executeQuery() throws SQLException { ensureParams(); - ClickHouseSqlStatement stmt = new ClickHouseSqlStatement(getRequest().getStatements(false).get(0)); - return updateResult(stmt, executeStatement(stmt, null, Arrays.asList(values), null)); + ClickHouseSqlStatement stmt = new ClickHouseSqlStatement(getSql()); + return updateResult(parsedStmt, executeStatement(stmt, null, Arrays.asList(values), null)); } @Override public int executeUpdate() throws SQLException { ensureParams(); - try (ClickHouseResponse r = executeStatement(getRequest().getStatements(false).get(0), null, - Arrays.asList(values), null)) { - return (int) r.getSummary().getWrittenRows(); + try (ClickHouseResponse r = executeStatement(getSql(), null, Arrays.asList(values), null)) { + updateResult(parsedStmt, r); + return getUpdateCount(); } } @@ -162,9 +172,9 @@ public void setObject(int parameterIndex, Object x) throws SQLException { public boolean execute() throws SQLException { ensureParams(); - ClickHouseSqlStatement stmt = new ClickHouseSqlStatement(getRequest().getStatements(false).get(0)); + ClickHouseSqlStatement stmt = new ClickHouseSqlStatement(getSql()); ClickHouseResponse r = executeStatement(stmt, null, Arrays.asList(values), null); - return updateResult(stmt, r) != null; + return updateResult(parsedStmt, r) != null; } @Override @@ -197,11 +207,12 @@ public int[] executeBatch() throws SQLException { int[] results = new int[len]; int counter = 0; try { + String sql = getSql(); for (List list : batch) { - try (ClickHouseResponse r = executeStatement(getRequest().getStatements(false).get(0), null, list, - null)) { - int rows = (int) r.getSummary().getWrittenRows(); - results[counter] = rows > 0 ? rows : 1; + try (ClickHouseResponse r = executeStatement(sql, null, list, null)) { + updateResult(parsedStmt, r); + int rows = getUpdateCount(); + results[counter] = rows > 0 ? rows : 0; } catch (Exception e) { if (!continueOnError) { throw SqlExceptionUtils.handle(e); diff --git a/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHousePreparedStatementTest.java b/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHousePreparedStatementTest.java index 24269ef73..9cc8c4621 100644 --- a/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHousePreparedStatementTest.java +++ b/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHousePreparedStatementTest.java @@ -1,6 +1,8 @@ package com.clickhouse.jdbc; import java.io.ByteArrayInputStream; +import java.net.Inet4Address; +import java.net.Inet6Address; import java.sql.Date; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -13,8 +15,10 @@ import java.time.ZoneOffset; import java.util.Properties; import java.util.TimeZone; +import java.util.UUID; import com.clickhouse.client.ClickHouseFormat; +import com.clickhouse.client.config.ClickHouseClientOption; import com.clickhouse.client.data.ClickHouseBitmap; import com.clickhouse.client.data.ClickHouseExternalTable; @@ -74,7 +78,7 @@ public void testReadWriteDate() throws SQLException { @Test(groups = "integration") public void testReadWriteDateWithClientTimeZone() throws SQLException { Properties props = new Properties(); - props.setProperty("use_server_time_zone_for_date", "true"); + props.setProperty(ClickHouseClientOption.USE_SERVER_TIME_ZONE_FOR_DATES.getKey(), "false"); try (ClickHouseConnection conn = newConnection(props); Statement s = conn.createStatement(); PreparedStatement stmt = conn @@ -160,29 +164,28 @@ public void testReadWriteDateTime() throws SQLException { @Test(groups = "integration") public void testReadWriteDateTimeWithClientTimeZone() throws SQLException { Properties props = new Properties(); - props.setProperty("use_server_time_zone_for_date", "false"); + props.setProperty(ClickHouseClientOption.USE_SERVER_TIME_ZONE.getKey(), "false"); LocalDateTime dt = LocalDateTime.of(2021, 3, 25, 8, 50, 56); Timestamp x = Timestamp.valueOf(dt); try (ClickHouseConnection conn = newConnection(props); - PreparedStatement stmt = conn - .prepareStatement("insert into test_read_write_datetime_cz values(?,?,?)")) { - conn.createStatement().execute("drop table if exists test_read_write_datetime_cz;" + Statement s = conn.createStatement()) { + s.execute("drop table if exists test_read_write_datetime_cz;" + "create table test_read_write_datetime_cz(id Int32, d1 DateTime32, d2 DateTime64(3))engine=Memory"); - stmt.setInt(1, 1); - stmt.setObject(2, dt); - stmt.setObject(3, dt); - stmt.addBatch(); - stmt.setInt(1, 2); - stmt.setTimestamp(2, x); - stmt.setTimestamp(3, x); - stmt.addBatch(); - int[] results = stmt.executeBatch(); - Assert.assertEquals(results, new int[] { 1, 1 }); + try (PreparedStatement stmt = conn + .prepareStatement("insert into test_read_write_datetime_cz")) { + stmt.setInt(1, 1); + stmt.setObject(2, dt); + stmt.setObject(3, dt); + stmt.addBatch(); + stmt.setInt(1, 2); + stmt.setTimestamp(2, x); + stmt.setTimestamp(3, x); + stmt.addBatch(); + int[] results = stmt.executeBatch(); + Assert.assertEquals(results, new int[] { 1, 1 }); + } - LocalDateTime dx = dt.atZone(TimeZone.getDefault().toZoneId()) - .withZoneSameInstant(conn.getServerTimeZone().toZoneId()).toLocalDateTime(); - Timestamp xx = Timestamp.valueOf(dx); - ResultSet rs = conn.createStatement().executeQuery("select * from test_read_write_datetime_cz order by id"); + ResultSet rs = s.executeQuery("select * from test_read_write_datetime_cz order by id"); Assert.assertTrue(rs.next()); Assert.assertEquals(rs.getInt(1), 1); Assert.assertEquals(rs.getObject(2), dt); @@ -191,14 +194,51 @@ public void testReadWriteDateTimeWithClientTimeZone() throws SQLException { Assert.assertEquals(rs.getTimestamp(3), x); Assert.assertTrue(rs.next()); Assert.assertEquals(rs.getInt(1), 2); - Assert.assertEquals(rs.getObject(2), dx); - Assert.assertEquals(rs.getTimestamp(2), xx); - Assert.assertEquals(rs.getObject(3), dx); - Assert.assertEquals(rs.getTimestamp(3), xx); + Assert.assertEquals(rs.getObject(2), dt); + Assert.assertEquals(rs.getTimestamp(2), x); + Assert.assertEquals(rs.getObject(3), dt); + Assert.assertEquals(rs.getTimestamp(3), x); Assert.assertFalse(rs.next()); } } + @Test(groups = "integration") + public void testInsertQueryDateTime64() throws SQLException { + try (ClickHouseConnection conn = newConnection(new Properties()); + ClickHouseStatement s = conn.createStatement();) { + s.execute("drop table if exists test_issue_612;" + + "CREATE TABLE IF NOT EXISTS test_issue_612 (id UUID, date DateTime64(6)) ENGINE = MergeTree() ORDER BY (id, date)"); + UUID id = UUID.randomUUID(); + long value = 1617359745321000L; + try (PreparedStatement ps = conn.prepareStatement("insert into test_issue_612 values(?,?)")) { + ps.setLong(2, value); + ps.setObject(1, id); + ps.execute(); + ps.setObject(1, UUID.randomUUID()); + ps.setString(2, "2021-09-01 00:00:00.123456"); + ps.executeUpdate(); + } + + try (PreparedStatement ps = conn.prepareStatement("select * from test_issue_612 where id = ?")) { + ps.setObject(1, id); + ResultSet rs = ps.executeQuery(); + Assert.assertTrue(rs.next()); + Assert.assertEquals(rs.getObject(1), id); + Assert.assertEquals(rs.getObject(2), LocalDateTime.of(2021, 4, 2, 10, 35, 45, 321000000)); + Assert.assertEquals(rs.getLong(2), 1617359745L); + Assert.assertFalse(rs.next()); + } + + try (PreparedStatement ps = conn.prepareStatement("select * from test_issue_612 where id != ?")) { + ps.setObject(1, id); + ResultSet rs = ps.executeQuery(); + Assert.assertTrue(rs.next()); + Assert.assertEquals(rs.getObject(2), LocalDateTime.of(2021, 9, 1, 0, 0, 0, 123456000)); + Assert.assertFalse(rs.next()); + } + } + } + @Test(groups = "integration") public void testBatchInsert() throws SQLException { try (ClickHouseConnection conn = newConnection(new Properties()); @@ -401,6 +441,41 @@ public void testArrayParameter(String t, Object v) throws SQLException { } } + @Test(groups = "integration") + public void testInsertWithFunction() throws Exception { + try (ClickHouseConnection conn = newConnection(new Properties()); + Statement s = conn.createStatement(); + PreparedStatement stmt = conn.prepareStatement( + "insert into test_issue_315(id, src, dst) values (?,IPv4ToIPv6(toIPv4(?)),IPv4ToIPv6(toIPv4(?)))")) { + s.execute("drop table if exists test_issue_315;" + + "create table test_issue_315(id Int32, src IPv6, dst IPv6)engine=Memory"); + + stmt.setObject(1, 1); + stmt.setString(2, "127.0.0.1"); + stmt.setString(3, "127.0.0.2"); + Assert.assertEquals(stmt.executeUpdate(), 1); + + // omitted '(id, src, dst)' in the query for simplicity + try (PreparedStatement ps = conn.prepareStatement("insert into test_issue_315")) { + stmt.setObject(1, 2); + stmt.setObject(2, Inet4Address.getByName("127.0.0.2")); + stmt.setObject(3, Inet6Address.getByName("::1")); + Assert.assertEquals(stmt.executeUpdate(), 1); + } + + ResultSet rs = s.executeQuery("select * from test_issue_315 order by id"); + Assert.assertTrue(rs.next()); + Assert.assertEquals(rs.getInt(1), 1); + Assert.assertEquals(rs.getString(2), "0:0:0:0:0:ffff:7f00:1"); + Assert.assertEquals(rs.getString(3), "0:0:0:0:0:ffff:7f00:2"); + Assert.assertTrue(rs.next()); + Assert.assertEquals(rs.getInt(1), 2); + Assert.assertEquals(rs.getString(2), "0:0:0:0:0:ffff:7f00:2"); + Assert.assertEquals(rs.getString(3), "0:0:0:0:0:ffff:0:0"); + Assert.assertFalse(rs.next()); + } + } + @Test(groups = "integration") public void testQueryWithNamedParameter() throws SQLException { Properties props = new Properties(); diff --git a/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHouseStatementTest.java b/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHouseStatementTest.java index effbc5d2a..1ad4d971b 100644 --- a/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHouseStatementTest.java +++ b/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHouseStatementTest.java @@ -1,10 +1,12 @@ package com.clickhouse.jdbc; +import java.sql.Array; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.sql.Struct; import java.sql.Time; import java.sql.Timestamp; import java.time.Instant; @@ -18,16 +20,21 @@ import java.util.Calendar; import java.util.Collections; import java.util.GregorianCalendar; +import java.util.List; import java.util.Locale; import java.util.Properties; import java.util.TimeZone; import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import com.clickhouse.client.ClickHouseClient; import com.clickhouse.client.ClickHouseDataType; import com.clickhouse.client.ClickHouseParameterizedQuery; import com.clickhouse.client.ClickHouseValues; import com.clickhouse.client.config.ClickHouseClientOption; import com.clickhouse.client.data.ClickHouseDateTimeValue; +import com.clickhouse.client.http.config.ClickHouseHttpOption; import org.testng.Assert; import org.testng.annotations.Test; @@ -113,6 +120,103 @@ public void testMutation() throws SQLException { } } + @Test(groups = "integration") + public void testAsyncInsert() throws SQLException { + Properties props = new Properties(); + props.setProperty(ClickHouseHttpOption.CUSTOM_PARAMS.getKey(), "async_insert=1,wait_for_async_insert=1"); + try (ClickHouseConnection conn = newConnection(props); + ClickHouseStatement stmt = conn.createStatement();) { + stmt.execute("drop table if exists test_async_insert; " + + "create table test_async_insert(id UInt32, s String) ENGINE = Memory; " + + "INSERT INTO test_async_insert VALUES(1, 'a'); " + + "select * from test_async_insert"); + ResultSet rs = stmt.getResultSet(); + Assert.assertTrue(rs.next()); + Assert.assertEquals(rs.getInt(1), 1); + Assert.assertEquals(rs.getString(2), "a"); + Assert.assertFalse(rs.next()); + } + + props.setProperty(ClickHouseHttpOption.CUSTOM_PARAMS.getKey(), "async_insert=1,wait_for_async_insert=0"); + try (ClickHouseConnection conn = newConnection(props); + ClickHouseStatement stmt = conn.createStatement();) { + stmt.execute("truncate table test_async_insert; " + + "INSERT INTO test_async_insert VALUES(1, 'a'); " + + "select * from test_async_insert"); + ResultSet rs = stmt.getResultSet(); + Assert.assertFalse(rs.next()); + } + } + + @Test(groups = "integration") + public void testCancelQuery() throws Exception { + try (ClickHouseConnection conn = newConnection(new Properties()); + ClickHouseStatement stmt = conn.createStatement();) { + CountDownLatch c = new CountDownLatch(1); + ClickHouseClient.submit(() -> stmt.executeQuery("select * from numbers(100000000)")).whenComplete( + (rs, e) -> { + int index = 0; + + try { + while (rs.next()) { + if (index++ < 1) { + c.countDown(); + } + } + } catch (SQLException ex) { + // ignore + } + }); + try { + c.await(5, TimeUnit.SECONDS); + } finally { + stmt.cancel(); + } + } + } + + @Test(groups = "integration") + public void testSimpleAggregateFunction() throws SQLException { + try (ClickHouseConnection conn = newConnection(new Properties()); + ClickHouseStatement stmt = conn.createStatement();) { + stmt.execute("drop table if exists test_simple_agg_func; " + + "CREATE TABLE test_simple_agg_func (x SimpleAggregateFunction(max, UInt64)) ENGINE=AggregatingMergeTree ORDER BY tuple(); " + + "INSERT INTO test_simple_agg_func VALUES(1)"); + + ResultSet rs = stmt.executeQuery("select * from test_simple_agg_func"); + // sorry, not supported at this point + Assert.assertThrows(IllegalArgumentException.class, () -> rs.next()); + } + } + + @Test(groups = "integration") + public void testWrapperObject() throws SQLException { + String sql = "SELECT CAST('[(''a'',''b'')]' AS Array(Tuple(String, String))), ('a', 'b')"; + List expectedTuple = Arrays.asList("a", "b"); + Object expectedArray = new List[] { expectedTuple }; + Properties props = new Properties(); + try (ClickHouseConnection conn = newConnection(props); + ClickHouseStatement stmt = conn.createStatement();) { + ResultSet rs = stmt.executeQuery(sql); + Assert.assertTrue(rs.next()); + Assert.assertEquals(rs.getArray(1).getArray(), expectedArray); + Assert.assertEquals(rs.getObject(1), expectedArray); + Assert.assertEquals(rs.getObject(2), expectedTuple); + Assert.assertFalse(rs.next()); + } + + props.setProperty(JdbcConfig.PROP_WRAPPER_OBJ, "true"); + try (ClickHouseConnection conn = newConnection(props); + ClickHouseStatement stmt = conn.createStatement();) { + ResultSet rs = stmt.executeQuery(sql); + Assert.assertTrue(rs.next()); + Assert.assertEquals(rs.getArray(1).getArray(), expectedArray); + Assert.assertEquals(((Array) rs.getObject(1)).getArray(), expectedArray); + Assert.assertEquals(((Struct) rs.getObject(2)).getAttributes(), expectedTuple.toArray(new String[0])); + Assert.assertFalse(rs.next()); + } + } + @Test(groups = "integration") public void testQuery() throws SQLException { try (ClickHouseConnection conn = newConnection(new Properties())) { @@ -129,6 +233,13 @@ public void testQuery() throws SQLException { while (rs.next()) { continue; } + + // batch query + stmt.addBatch("select 1"); + stmt.addBatch("select 2"); + stmt.addBatch("select 3"); + int[] results = stmt.executeBatch(); + Assert.assertEquals(results, new int[] { 0, 0, 0 }); } } @@ -155,10 +266,8 @@ public void testMultiStatementQuery() throws SQLException { @Test(groups = "integration") public void testTimestamp() throws SQLException { Properties props = new Properties(); - TimeZone serverTimeZone = TimeZone.getDefault(); try (ClickHouseConnection conn = newConnection(props); ClickHouseStatement stmt = conn.createStatement()) { - serverTimeZone = conn.getServerTimeZone(); ResultSet rs = stmt.executeQuery("select now(), now('Asia/Chongqing')"); Assert.assertTrue(rs.next()); LocalDateTime dt1 = (LocalDateTime) rs.getObject(1); @@ -177,8 +286,6 @@ public void testTimestamp() throws SQLException { + "toUInt32(toDateTime('2021-03-25 08:50:56', 'Asia/Chongqing'))"; props.setProperty("use_time_zone", tz); props.setProperty("use_server_time_zone", "false"); - props.setProperty("time_zone_for_date", "CLIENT"); - LocalDateTime dt = LocalDateTime.ofInstant(Instant.ofEpochSecond(1616633456L), serverTimeZone.toZoneId()); try (ClickHouseConnection conn = newConnection(props); ClickHouseStatement stmt = conn.createStatement()) { ResultSet rs = stmt.executeQuery(sql); From e70408246ab78cae8f513b07a654735492121e42 Mon Sep 17 00:00:00 2001 From: Zhichun Wu Date: Thu, 30 Dec 2021 17:13:28 +0800 Subject: [PATCH 2/4] Skip async_insert test before 21.12 --- .../java/com/clickhouse/jdbc/ClickHouseStatementTest.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHouseStatementTest.java b/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHouseStatementTest.java index 1ad4d971b..677107ce8 100644 --- a/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHouseStatementTest.java +++ b/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHouseStatementTest.java @@ -123,6 +123,12 @@ public void testMutation() throws SQLException { @Test(groups = "integration") public void testAsyncInsert() throws SQLException { Properties props = new Properties(); + try (ClickHouseConnection conn = newConnection(props)) { + if (conn.getServerVersion().check("(,21.12)")) { + return; + } + } + props.setProperty(ClickHouseHttpOption.CUSTOM_PARAMS.getKey(), "async_insert=1,wait_for_async_insert=1"); try (ClickHouseConnection conn = newConnection(props); ClickHouseStatement stmt = conn.createStatement();) { From 27d034f1653cd1221f89f228ccb4547b03b6810e Mon Sep 17 00:00:00 2001 From: Zhichun Wu Date: Thu, 30 Dec 2021 17:17:44 +0800 Subject: [PATCH 3/4] Enable #770 unit tests in CI --- .../yandex/clickhouse/response/ClickHouseResultSetTest.java | 6 +++--- .../response/parser/ClickHouseValueParserTest.java | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/clickhouse-jdbc/src/test/java/ru/yandex/clickhouse/response/ClickHouseResultSetTest.java b/clickhouse-jdbc/src/test/java/ru/yandex/clickhouse/response/ClickHouseResultSetTest.java index 0e03dd46c..378ccd7d1 100644 --- a/clickhouse-jdbc/src/test/java/ru/yandex/clickhouse/response/ClickHouseResultSetTest.java +++ b/clickhouse-jdbc/src/test/java/ru/yandex/clickhouse/response/ClickHouseResultSetTest.java @@ -167,7 +167,7 @@ public void withTotals() throws Exception { assertEquals(70511139L, rs.getLong(2)); } - @Test + @Test(groups = "unit") public void withTotalsAndEmptyStrings() throws Exception { String response = "SiteName\tCountry\n" + "String\tString\n" + @@ -203,7 +203,7 @@ public void withTotalsAndEmptyStrings() throws Exception { rs.getTotals(); assertEquals("", rs.getString(1)); - assertEquals(70511139L, rs.getLong(2)); + assertEquals(0L, rs.getLong(2)); } @Test(groups = "unit") @@ -267,7 +267,7 @@ public void withTotalsSingleIntColumn() throws Exception { assertEquals(0L, rs.getLong(1)); } - @Test + @Test(groups = "unit") public void withTotalsSingleFloatColumn() throws Exception { String response = "Code\n" diff --git a/clickhouse-jdbc/src/test/java/ru/yandex/clickhouse/response/parser/ClickHouseValueParserTest.java b/clickhouse-jdbc/src/test/java/ru/yandex/clickhouse/response/parser/ClickHouseValueParserTest.java index 3c40e031d..beee501d4 100644 --- a/clickhouse-jdbc/src/test/java/ru/yandex/clickhouse/response/parser/ClickHouseValueParserTest.java +++ b/clickhouse-jdbc/src/test/java/ru/yandex/clickhouse/response/parser/ClickHouseValueParserTest.java @@ -195,7 +195,7 @@ public void testParseBoolean() throws SQLException { assertFalse(ClickHouseValueParser.parseBoolean(ByteFragment.fromString(" true"), columnInfo)); } - @Test (dataProvider = "float_test_data") + @Test (dataProvider = "float_test_data", groups = "unit") public void testParseFloat(String byteFragmentString, Float expectedValue) throws SQLException { ClickHouseColumnInfo columnInfo = ClickHouseColumnInfo.parse("Float32", "columnName", null); float floatDelta = 0.001f; @@ -210,7 +210,7 @@ public void testParseFloat(String byteFragmentString, Float expectedValue) throw } } - @Test (dataProvider = "double_test_data") + @Test (dataProvider = "double_test_data", groups = "unit") public void testParseDouble(String byteFragmentString, Double expectedValue) throws SQLException { ClickHouseColumnInfo columnInfo = ClickHouseColumnInfo.parse("Float64", "columnName", null); double doubleDelta = 0.001; @@ -241,7 +241,7 @@ public void testGetParserFloat(String byteFragmentString, Float expectedValue) t } } - @Test (dataProvider = "double_test_data") + @Test (dataProvider = "double_test_data", groups = "unit") public void testGetParserDouble(String byteFragmentString, Double expectedValue) throws SQLException { ClickHouseColumnInfo columnInfo = ClickHouseColumnInfo.parse("Float64", "columnName", null); double doubleDelta = 0.001d; From 70358b1bf9ad343e71a8614904f457977b1ceb5d Mon Sep 17 00:00:00 2001 From: Zhichun Wu Date: Thu, 30 Dec 2021 17:48:46 +0800 Subject: [PATCH 4/4] Fix time zone check failure --- .../client/data/ClickHouseDateTimeValue.java | 18 +++++++++--------- .../jdbc/ClickHousePreparedStatementTest.java | 6 ++++-- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseDateTimeValue.java b/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseDateTimeValue.java index 7b0cbbe0e..a0f02cb99 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseDateTimeValue.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseDateTimeValue.java @@ -10,7 +10,6 @@ import java.time.LocalDateTime; import java.time.LocalTime; import java.time.OffsetDateTime; -import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.util.TimeZone; @@ -126,40 +125,41 @@ public ClickHouseDateTimeValue copy(boolean deep) { @Override public byte asByte() { - return isNullOrEmpty() ? (byte) 0 : (byte) getValue().toEpochSecond(ZoneOffset.UTC); + return isNullOrEmpty() ? (byte) 0 : (byte) getValue().atZone(tz.toZoneId()).toEpochSecond(); } @Override public short asShort() { - return isNullOrEmpty() ? (short) 0 : (short) getValue().toEpochSecond(ZoneOffset.UTC); + return isNullOrEmpty() ? (short) 0 : (short) getValue().atZone(tz.toZoneId()).toEpochSecond(); } @Override public int asInteger() { - return isNullOrEmpty() ? 0 : (int) getValue().toEpochSecond(ZoneOffset.UTC); + return isNullOrEmpty() ? 0 : (int) getValue().atZone(tz.toZoneId()).toEpochSecond(); } @Override public long asLong() { - return isNullOrEmpty() ? 0L : getValue().toEpochSecond(ZoneOffset.UTC); + return isNullOrEmpty() ? 0L : getValue().atZone(tz.toZoneId()).toEpochSecond(); } @Override public float asFloat() { return isNullOrEmpty() ? 0F - : getValue().toEpochSecond(ZoneOffset.UTC) + getValue().getNano() / ClickHouseValues.NANOS.floatValue(); + : getValue().atZone(tz.toZoneId()).toEpochSecond() + + getValue().getNano() / ClickHouseValues.NANOS.floatValue(); } @Override public double asDouble() { return isNullOrEmpty() ? 0D - : getValue().toEpochSecond(ZoneOffset.UTC) + : getValue().atZone(tz.toZoneId()).toEpochSecond() + getValue().getNano() / ClickHouseValues.NANOS.doubleValue(); } @Override public BigInteger asBigInteger() { - return isNullOrEmpty() ? null : BigInteger.valueOf(getValue().toEpochSecond(ZoneOffset.UTC)); + return isNullOrEmpty() ? null : BigInteger.valueOf(getValue().atZone(tz.toZoneId()).toEpochSecond()); } @Override @@ -168,7 +168,7 @@ public BigDecimal asBigDecimal(int scale) { BigDecimal v = null; if (value != null) { int nanoSeconds = value.getNano(); - v = new BigDecimal(BigInteger.valueOf(value.toEpochSecond(ZoneOffset.UTC)), scale); + v = new BigDecimal(BigInteger.valueOf(value.atZone(tz.toZoneId()).toEpochSecond()), scale); if (scale != 0 && nanoSeconds != 0) { v = v.add(BigDecimal.valueOf(nanoSeconds).divide(ClickHouseValues.NANOS).setScale(scale, RoundingMode.HALF_UP)); diff --git a/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHousePreparedStatementTest.java b/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHousePreparedStatementTest.java index 9cc8c4621..30334ec6b 100644 --- a/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHousePreparedStatementTest.java +++ b/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHousePreparedStatementTest.java @@ -210,6 +210,8 @@ public void testInsertQueryDateTime64() throws SQLException { + "CREATE TABLE IF NOT EXISTS test_issue_612 (id UUID, date DateTime64(6)) ENGINE = MergeTree() ORDER BY (id, date)"); UUID id = UUID.randomUUID(); long value = 1617359745321000L; + Instant i = Instant.ofEpochMilli(value / 1000L); + LocalDateTime dt = LocalDateTime.ofInstant(i, conn.getServerTimeZone().toZoneId()); try (PreparedStatement ps = conn.prepareStatement("insert into test_issue_612 values(?,?)")) { ps.setLong(2, value); ps.setObject(1, id); @@ -224,8 +226,8 @@ public void testInsertQueryDateTime64() throws SQLException { ResultSet rs = ps.executeQuery(); Assert.assertTrue(rs.next()); Assert.assertEquals(rs.getObject(1), id); - Assert.assertEquals(rs.getObject(2), LocalDateTime.of(2021, 4, 2, 10, 35, 45, 321000000)); - Assert.assertEquals(rs.getLong(2), 1617359745L); + Assert.assertEquals(rs.getObject(2), dt); + Assert.assertEquals(rs.getLong(2), dt.atZone(conn.getServerTimeZone().toZoneId()).toEpochSecond()); Assert.assertFalse(rs.next()); }