From 6fdf02a87f22d96a0ca5c12d60eac82d2ea5541f Mon Sep 17 00:00:00 2001 From: Zhichun Wu Date: Mon, 21 Feb 2022 15:44:14 +0800 Subject: [PATCH 1/6] Change max_result_rows from int to long --- .../main/java/com/clickhouse/client/ClickHouseConfig.java | 6 +++--- .../clickhouse/client/config/ClickHouseClientOption.java | 2 +- .../clickhouse/client/http/ClickHouseHttpConnection.java | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseConfig.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseConfig.java index bf40d6305..ca9ed0142 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseConfig.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseConfig.java @@ -115,7 +115,7 @@ protected static final Object mergeMetricRegistry(List list) { private final int maxExecutionTime; private final int maxQueuedBuffers; private final int maxQueuedRequests; - private final int maxResultRows; + private final long maxResultRows; private final int maxThreads; private final boolean retry; private final boolean reuseValueWrapper; @@ -193,7 +193,7 @@ public ClickHouseConfig(Map options, ClickHouseC this.maxExecutionTime = (int) getOption(ClickHouseClientOption.MAX_EXECUTION_TIME); this.maxQueuedBuffers = (int) getOption(ClickHouseClientOption.MAX_QUEUED_BUFFERS); this.maxQueuedRequests = (int) getOption(ClickHouseClientOption.MAX_QUEUED_REQUESTS); - this.maxResultRows = (int) getOption(ClickHouseClientOption.MAX_RESULT_ROWS); + this.maxResultRows = (long) getOption(ClickHouseClientOption.MAX_RESULT_ROWS); this.maxThreads = (int) getOption(ClickHouseClientOption.MAX_THREADS_PER_CLIENT); this.retry = (boolean) getOption(ClickHouseClientOption.RETRY); this.reuseValueWrapper = (boolean) getOption(ClickHouseClientOption.REUSE_VALUE_WRAPPER); @@ -291,7 +291,7 @@ public int getMaxQueuedRequests() { return maxQueuedRequests; } - public int getMaxResultRows() { + public long getMaxResultRows() { return maxResultRows; } diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/config/ClickHouseClientOption.java b/clickhouse-client/src/main/java/com/clickhouse/client/config/ClickHouseClientOption.java index c1f2ef9ce..43c7875bc 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/config/ClickHouseClientOption.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/config/ClickHouseClientOption.java @@ -98,7 +98,7 @@ public enum ClickHouseClientOption implements ClickHouseOption { /** * Maximum rows allowed in the result. */ - MAX_RESULT_ROWS("max_result_rows", 0, + MAX_RESULT_ROWS("max_result_rows", 0L, "Limit on the number of rows in the result." + "Also checked for subqueries, and on remote servers when running parts of a distributed query."), /** diff --git a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpConnection.java b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpConnection.java index b2191bd5c..df2c34d19 100644 --- a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpConnection.java +++ b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpConnection.java @@ -76,8 +76,8 @@ static String buildQueryParams(ClickHouseRequest request) { appendQueryParameter(builder, settingKey, String.valueOf(config.getMaxExecutionTime())); } settingKey = "max_result_rows"; - if (config.getMaxResultRows() > 0 && !settings.containsKey(settingKey)) { - appendQueryParameter(builder, settingKey, String.valueOf(config.getMaxExecutionTime())); + if (config.getMaxResultRows() > 0L && !settings.containsKey(settingKey)) { + appendQueryParameter(builder, settingKey, String.valueOf(config.getMaxResultRows())); appendQueryParameter(builder, "result_overflow_mode", "break"); } settingKey = "log_comment"; From 8ff5cdf36ac3cd26c3e071b9d547b6285ad2409c Mon Sep 17 00:00:00 2001 From: Zhichun Wu Date: Mon, 21 Feb 2022 15:48:07 +0800 Subject: [PATCH 2/6] Add reusable empty ClickHouseResponse which is stateless --- .../clickhouse/client/ClickHouseResponse.java | 37 +++++++++++++++++++ .../client/data/ClickHouseSimpleResponse.java | 1 + 2 files changed, 38 insertions(+) diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseResponse.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseResponse.java index c9467c9bc..4bedc1871 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseResponse.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseResponse.java @@ -5,6 +5,7 @@ import java.io.OutputStream; import java.io.Serializable; import java.io.UncheckedIOException; +import java.util.Collections; import java.util.List; import java.util.NoSuchElementException; import java.util.Spliterator; @@ -28,6 +29,42 @@ * */ public interface ClickHouseResponse extends AutoCloseable, Serializable { + /** + * Empty response that can never be closed. + */ + static final ClickHouseResponse EMPTY = new ClickHouseResponse() { + @Override + public List getColumns() { + return Collections.emptyList(); + } + + @Override + public ClickHouseResponseSummary getSummary() { + return ClickHouseResponseSummary.EMPTY; + } + + @Override + public InputStream getInputStream() { + return null; + } + + @Override + public Iterable records() { + return Collections.emptyList(); + } + + @Override + public void close() { + // do nothing + } + + @Override + public boolean isClosed() { + // ensure the instance is "stateless" + return false; + } + }; + /** * Gets list of columns. * diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseSimpleResponse.java b/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseSimpleResponse.java index 81c70a2a1..999a9e4eb 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseSimpleResponse.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseSimpleResponse.java @@ -18,6 +18,7 @@ * A simple response built on top of two lists: columns and records. */ public class ClickHouseSimpleResponse implements ClickHouseResponse { + @Deprecated public static final ClickHouseSimpleResponse EMPTY = new ClickHouseSimpleResponse(Collections.emptyList(), new ClickHouseValue[0][], ClickHouseResponseSummary.EMPTY); From ab09d64633281c1f24f5d7f650b5cb0246532a7e Mon Sep 17 00:00:00 2001 From: Zhichun Wu Date: Mon, 21 Feb 2022 22:07:28 +0800 Subject: [PATCH 3/6] Enhance exception handling along with largeMaxRows and largeUpdateCount support --- .../jdbc/ClickHousePreparedStatement.java | 11 - .../clickhouse/jdbc/SqlExceptionUtils.java | 48 ++++ .../internal/AbstractPreparedStatement.java | 153 ++++++++++++ .../internal/ClickHouseConnectionImpl.java | 2 +- .../internal/ClickHouseStatementImpl.java | 141 +++++++---- .../internal/InputBasedPreparedStatement.java | 119 +++++---- .../internal/SqlBasedPreparedStatement.java | 104 ++++---- .../internal/TableBasedPreparedStatement.java | 103 ++++---- .../jdbc/ClickHousePreparedStatementTest.java | 233 +++++++++++++++++- .../jdbc/ClickHouseStatementTest.java | 219 ++++++++++++++-- 10 files changed, 921 insertions(+), 212 deletions(-) create mode 100644 clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/AbstractPreparedStatement.java diff --git a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/ClickHousePreparedStatement.java b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/ClickHousePreparedStatement.java index b899b64cc..2cb2d9c73 100644 --- a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/ClickHousePreparedStatement.java +++ b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/ClickHousePreparedStatement.java @@ -68,17 +68,6 @@ default void setObject(int parameterIndex, Object x, int targetSqlType) throws S setObject(parameterIndex, x, targetSqlType, 0); } - @Override - default boolean execute() throws SQLException { - return executeQuery() != null; - } - - @Override - default void addBatch(String sql) throws SQLException { - throw SqlExceptionUtils - .unsupportedError("addBatch(String) cannot be called in PreparedStatement or CallableStatement!"); - } - @Override default void setCharacterStream(int parameterIndex, Reader reader, int length) throws SQLException { String s = null; diff --git a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/SqlExceptionUtils.java b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/SqlExceptionUtils.java index 10f7a7a98..da5b824be 100644 --- a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/SqlExceptionUtils.java +++ b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/SqlExceptionUtils.java @@ -1,6 +1,7 @@ package com.clickhouse.jdbc; import java.net.ConnectException; +import java.sql.BatchUpdateException; import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; @@ -76,6 +77,53 @@ public static SQLException handle(Throwable e) { return new SQLException(cause); } + public static BatchUpdateException batchUpdateError(Throwable e, long[] updateCounts) { + if (e == null) { + return new BatchUpdateException("Something went wrong when performing batch update", SQL_STATE_CLIENT_ERROR, + 0, updateCounts, null); + } else if (e instanceof BatchUpdateException) { + return (BatchUpdateException) e; + } else if (e instanceof ClickHouseException) { + return batchUpdateError(e, updateCounts); + } else if (e instanceof SQLException) { + SQLException sqlExp = (SQLException) e; + return new BatchUpdateException(sqlExp.getMessage(), sqlExp.getSQLState(), sqlExp.getErrorCode(), + updateCounts, null); + } + + Throwable cause = e.getCause(); + if (e instanceof BatchUpdateException) { + return (BatchUpdateException) e; + } else if (cause instanceof ClickHouseException) { + return batchUpdateError(cause, updateCounts); + } else if (e instanceof SQLException) { + SQLException sqlExp = (SQLException) e; + return new BatchUpdateException(sqlExp.getMessage(), sqlExp.getSQLState(), sqlExp.getErrorCode(), + updateCounts, null); + } else if (cause == null) { + cause = e; + } + + return new BatchUpdateException("Unexpected error", SQL_STATE_SQL_ERROR, 0, updateCounts, cause); + } + + public static SQLException emptyBatchError() { + return clientError("Please call addBatch method at least once before batch execution"); + } + + public static BatchUpdateException queryInBatchError(int[] updateCounts) { + return new BatchUpdateException("Query is not allow in batch update", SQL_STATE_CLIENT_ERROR, updateCounts); + } + + public static BatchUpdateException queryInBatchError(long[] updateCounts) { + return new BatchUpdateException("Query is not allow in batch update", SQL_STATE_CLIENT_ERROR, 0, updateCounts, + null); + } + + public static SQLException undeterminedExecutionError() { + return clientError("Please either call clearBatch() to clean up context first, or use executeBatch() instead"); + } + public static SQLException forCancellation(Exception e) { Throwable cause = e.getCause(); if (cause == null) { diff --git a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/AbstractPreparedStatement.java b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/AbstractPreparedStatement.java new file mode 100644 index 000000000..5fe5cfaa9 --- /dev/null +++ b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/AbstractPreparedStatement.java @@ -0,0 +1,153 @@ +package com.clickhouse.jdbc.internal; + +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; + +import com.clickhouse.client.ClickHouseRequest; +import com.clickhouse.jdbc.SqlExceptionUtils; + +public abstract class AbstractPreparedStatement extends ClickHouseStatementImpl implements PreparedStatement { + protected AbstractPreparedStatement(ClickHouseConnectionImpl connection, ClickHouseRequest request, + int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { + super(connection, request, resultSetType, resultSetConcurrency, resultSetHoldability); + } + + protected abstract long[] executeAny(boolean asBatch) throws SQLException; + + @Override + public final void addBatch(String sql) throws SQLException { + ensureOpen(); + + throw SqlExceptionUtils + .unsupportedError( + "addBatch(String) cannot be called in PreparedStatement or CallableStatement!"); + } + + @Override + public final boolean execute(String sql) throws SQLException { + ensureOpen(); + + throw SqlExceptionUtils + .unsupportedError( + "execute(String) cannot be called in PreparedStatement or CallableStatement!"); + } + + @Override + public final boolean execute(String sql, int autoGeneratedKeys) throws SQLException { + ensureOpen(); + + throw SqlExceptionUtils + .unsupportedError( + "execute(String, int) cannot be called in PreparedStatement or CallableStatement!"); + } + + @Override + public final boolean execute(String sql, int[] columnIndexes) throws SQLException { + ensureOpen(); + + throw SqlExceptionUtils + .unsupportedError( + "execute(String, int[]) cannot be called in PreparedStatement or CallableStatement!"); + } + + @Override + public final boolean execute(String sql, String[] columnNames) throws SQLException { + ensureOpen(); + + throw SqlExceptionUtils + .unsupportedError( + "execute(String, String[]) cannot be called in PreparedStatement or CallableStatement!"); + } + + @Override + public long[] executeLargeBatch() throws SQLException { + return executeAny(true); + } + + @Override + public final long executeLargeUpdate(String sql) throws SQLException { + ensureOpen(); + + throw SqlExceptionUtils + .unsupportedError( + "executeLargeUpdate(String) cannot be called in PreparedStatement or CallableStatement!"); + } + + @Override + public final long executeLargeUpdate(String sql, int autoGeneratedKeys) throws SQLException { + ensureOpen(); + + throw SqlExceptionUtils + .unsupportedError( + "executeLargeUpdate(String, int) cannot be called in PreparedStatement or CallableStatement!"); + } + + @Override + public final long executeLargeUpdate(String sql, int[] columnIndexes) throws SQLException { + ensureOpen(); + + throw SqlExceptionUtils + .unsupportedError( + "executeLargeUpdate(String, int[]) cannot be called in PreparedStatement or CallableStatement!"); + } + + @Override + public final long executeLargeUpdate(String sql, String[] columnNames) throws SQLException { + ensureOpen(); + + throw SqlExceptionUtils + .unsupportedError( + "executeLargeUpdate(String, String[]) cannot be called in PreparedStatement or CallableStatement!"); + } + + @Override + public final ResultSet executeQuery(String sql) throws SQLException { + ensureOpen(); + + throw SqlExceptionUtils + .unsupportedError( + "executeQuery(String) cannot be called in PreparedStatement or CallableStatement!"); + } + + @Override + public final int executeUpdate() throws SQLException { + return (int) executeLargeUpdate(); + } + + @Override + public final int executeUpdate(String sql) throws SQLException { + ensureOpen(); + + throw SqlExceptionUtils + .unsupportedError( + "executeUpate(String) cannot be called in PreparedStatement or CallableStatement!"); + } + + @Override + public final int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException { + ensureOpen(); + + throw SqlExceptionUtils + .unsupportedError( + "executeUpdate(String, int) cannot be called in PreparedStatement or CallableStatement!"); + } + + @Override + public final int executeUpdate(String sql, int[] columnIndexes) throws SQLException { + ensureOpen(); + + throw SqlExceptionUtils + .unsupportedError( + "executeUpdate(String, int[]) cannot be called in PreparedStatement or CallableStatement!"); + } + + @Override + public final int executeUpdate(String sql, String[] columnNames) throws SQLException { + ensureOpen(); + + throw SqlExceptionUtils + .unsupportedError( + "executeUpdate(String, String[]) cannot be called in PreparedStatement or CallableStatement!"); + } +} diff --git a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/ClickHouseConnectionImpl.java b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/ClickHouseConnectionImpl.java index b75f72357..92d726d42 100644 --- a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/ClickHouseConnectionImpl.java +++ b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/ClickHouseConnectionImpl.java @@ -608,7 +608,7 @@ public PreparedStatement prepareStatement(String sql, int resultSetType, int res clientRequest.write().query(parsedStmt.getSQL(), newQueryId()), ClickHouseColumn.parse(parsedStmt.getInput()), resultSetType, resultSetConcurrency, resultSetHoldability); - } else if (!parsedStmt.containsKeyword("SELECT") && + } else if (!parsedStmt.containsKeyword("SELECT") && !parsedStmt.hasValues() && (!parsedStmt.hasFormat() || clientRequest.getFormat().name().equals(parsedStmt.getFormat()))) { ps = new InputBasedPreparedStatement(this, clientRequest.write().query(parsedStmt.getSQL(), newQueryId()), diff --git a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/ClickHouseStatementImpl.java b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/ClickHouseStatementImpl.java index 3ce069c90..d1a314ac4 100644 --- a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/ClickHouseStatementImpl.java +++ b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/ClickHouseStatementImpl.java @@ -7,6 +7,7 @@ import java.sql.SQLWarning; import java.sql.Statement; import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.UUID; @@ -22,7 +23,6 @@ import com.clickhouse.client.config.ClickHouseClientOption; import com.clickhouse.client.config.ClickHouseOption; import com.clickhouse.client.data.ClickHouseExternalTable; -import com.clickhouse.client.data.ClickHouseSimpleResponse; import com.clickhouse.client.logging.Logger; import com.clickhouse.client.logging.LoggerFactory; import com.clickhouse.jdbc.ClickHouseConnection; @@ -43,6 +43,8 @@ public class ClickHouseStatementImpl extends JdbcWrapper implements ClickHouseSt private final int resultSetConcurrency; private final int resultSetHoldability; + private final List batchStmts; + private boolean closed; private boolean closeOnCompletion; @@ -50,16 +52,15 @@ public class ClickHouseStatementImpl extends JdbcWrapper implements ClickHouseSt private boolean escapeScan; private int fetchSize; private int maxFieldSize; - private int maxRows; + private long maxRows; private boolean poolable; private volatile String queryId; private int queryTimeout; private ClickHouseResultSet currentResult; - private int currentUpdateCount; + private long currentUpdateCount; protected ClickHouseSqlStatement[] parsedStmts; - protected List batchStmts; private ClickHouseResponse getLastResponse(Map options, List tables, Map settings) throws SQLException { @@ -149,9 +150,8 @@ protected ClickHouseResponse executeStatement(ClickHouseSqlStatement stmt, protected int executeInsert(String sql, InputStream input) throws SQLException { ClickHouseResponseSummary summary = null; try (ClickHouseResponse resp = request.write().query(sql, queryId = connection.newQueryId()) - .format(ClickHouseFormat.RowBinary).data(input).execute() - .get()) { - updateResult(new ClickHouseSqlStatement(sql, StatementType.INSERT), resp); + .format(ClickHouseFormat.RowBinary).data(input).execute().get(); + ResultSet rs = updateResult(new ClickHouseSqlStatement(sql, StatementType.INSERT), resp)) { summary = resp.getSummary(); } catch (InterruptedException e) { log.error("can not close stream: %s", e.getMessage()); @@ -191,22 +191,28 @@ protected ClickHouseSqlStatement parseSqlStatements(String sql) { return getLastStatement(); } + protected ClickHouseResultSet newEmptyResultSet() throws SQLException { + return new ClickHouseResultSet("", "", this, ClickHouseResponse.EMPTY); + } + protected ResultSet updateResult(ClickHouseSqlStatement stmt, ClickHouseResponse response) throws SQLException { ResultSet rs = null; if (stmt.isQuery() || !response.getColumns().isEmpty()) { - currentUpdateCount = -1; + currentUpdateCount = -1L; currentResult = new ClickHouseResultSet(stmt.getDatabaseOrDefault(getConnection().getCurrentDatabase()), stmt.getTable(), this, response); rs = currentResult; } else { - currentUpdateCount = response.getSummary().getUpdateCount(); - if (currentUpdateCount <= 0) { - currentUpdateCount = 1; + currentUpdateCount = response.getSummary().getWrittenRows(); + // FIXME apparently this is not always true + if (currentUpdateCount <= 0L) { + currentUpdateCount = 1L; } + currentResult = null; response.close(); } - return rs; + return rs == null ? newEmptyResultSet() : rs; } protected ClickHouseStatementImpl(ClickHouseConnectionImpl connection, ClickHouseRequest request, @@ -228,29 +234,34 @@ protected ClickHouseStatementImpl(ClickHouseConnectionImpl connection, ClickHous this.fetchSize = connection.getJdbcConfig().getFetchSize(); this.maxFieldSize = 0; - this.maxRows = 0; + this.maxRows = 0L; this.poolable = false; this.queryId = null; this.queryTimeout = 0; this.currentResult = null; - this.currentUpdateCount = -1; + this.currentUpdateCount = -1L; - this.batchStmts = new ArrayList<>(); + this.batchStmts = new LinkedList<>(); ClickHouseConfig c = request.getConfig(); - setMaxRows(c.getMaxResultRows()); + setLargeMaxRows(c.getMaxResultRows()); setQueryTimeout(c.getMaxExecutionTime()); } + @Override + public boolean execute(String sql) throws SQLException { + executeQuery(sql); + return currentResult != null; + } + @Override public ResultSet executeQuery(String sql) throws SQLException { ensureOpen(); - // forcibly disable extremes for ResultSet queries - // additionalDBParams = importAdditionalDBParameters(additionalDBParams); - // FIXME respect the value set in additionalDBParams? - // additionalDBParams.put(ClickHouseQueryParam.EXTREMES, "0"); + if (!batchStmts.isEmpty()) { + throw SqlExceptionUtils.undeterminedExecutionError(); + } parseSqlStatements(sql); @@ -268,8 +279,11 @@ public ResultSet executeQuery(String sql) throws SQLException { } @Override - public int executeUpdate(String sql) throws SQLException { + public long executeLargeUpdate(String sql) throws SQLException { ensureOpen(); + if (!batchStmts.isEmpty()) { + throw SqlExceptionUtils.undeterminedExecutionError(); + } parseSqlStatements(sql); @@ -280,7 +294,12 @@ public int executeUpdate(String sql) throws SQLException { throw SqlExceptionUtils.handle(e); } - return summary != null ? (int) summary.getWrittenRows() : 1; + return summary != null ? summary.getWrittenRows() : 1L; + } + + @Override + public int executeUpdate(String sql) throws SQLException { + return (int) executeLargeUpdate(sql); } @Override @@ -310,21 +329,26 @@ public void setMaxFieldSize(int max) throws SQLException { } @Override - public int getMaxRows() throws SQLException { + public long getLargeMaxRows() throws SQLException { ensureOpen(); return maxRows; } @Override - public void setMaxRows(int max) throws SQLException { - if (max < 0) { + public int getMaxRows() throws SQLException { + return (int) getLargeMaxRows(); + } + + @Override + public void setLargeMaxRows(long max) throws SQLException { + if (max < 0L) { throw SqlExceptionUtils.clientError("Max rows cannot be set to negative number"); } ensureOpen(); if (this.maxRows != max) { - if (max == 0) { + if (max == 0L) { request.removeSetting("max_result_rows"); request.removeSetting("result_overflow_mode"); } else { @@ -335,6 +359,11 @@ public void setMaxRows(int max) throws SQLException { } } + @Override + public void setMaxRows(int max) throws SQLException { + setLargeMaxRows(max); + } + @Override public void setEscapeProcessing(boolean enable) throws SQLException { ensureOpen(); @@ -402,13 +431,6 @@ public void setCursorName(String name) throws SQLException { cursorName = name; } - @Override - public boolean execute(String sql) throws SQLException { - // currentResult is stored here. InputString and currentResult will be closed on - // this.close() - return executeQuery(sql) != null; - } - @Override public ResultSet getResultSet() throws SQLException { ensureOpen(); @@ -417,12 +439,17 @@ public ResultSet getResultSet() throws SQLException { } @Override - public int getUpdateCount() throws SQLException { + public long getLargeUpdateCount() throws SQLException { ensureOpen(); return currentUpdateCount; } + @Override + public int getUpdateCount() throws SQLException { + return (int) getLargeUpdateCount(); + } + @Override public boolean getMoreResults() throws SQLException { ensureOpen(); @@ -431,7 +458,7 @@ public boolean getMoreResults() throws SQLException { currentResult.close(); currentResult = null; } - currentUpdateCount = -1; + currentUpdateCount = -1L; return false; } @@ -504,29 +531,46 @@ public void addBatch(String sql) throws SQLException { public void clearBatch() throws SQLException { ensureOpen(); - this.batchStmts = new ArrayList<>(); + this.batchStmts.clear(); } @Override public int[] executeBatch() throws SQLException { + long[] largeUpdateCounts = executeLargeBatch(); + + int len = largeUpdateCounts.length; + int[] results = new int[len]; + for (int i = 0; i < len; i++) { + results[i] = (int) largeUpdateCounts[i]; + } + return results; + } + + @Override + public long[] executeLargeBatch() throws SQLException { ensureOpen(); + if (batchStmts.isEmpty()) { + throw SqlExceptionUtils.emptyBatchError(); + } boolean continueOnError = getConnection().getJdbcConfig().isContinueBatchOnError(); - int len = batchStmts.size(); - int[] results = new int[len]; + long[] results = new long[batchStmts.size()]; try { - for (int i = 0; i < len; i++) { - ClickHouseSqlStatement s = batchStmts.get(i); - try (ClickHouseResponse r = executeStatement(s, null, null, null)) { - updateResult(s, r); - results[i] = currentUpdateCount <= 0 ? 0 : currentUpdateCount; + int i = 0; + for (ClickHouseSqlStatement s : batchStmts) { + try (ClickHouseResponse r = executeStatement(s, null, null, null); ResultSet rs = updateResult(s, r)) { + if (currentResult != null) { + throw SqlExceptionUtils.queryInBatchError(results); + } + results[i] = currentUpdateCount <= 0L ? 0L : currentUpdateCount; } catch (Exception e) { + results[i] = EXECUTE_FAILED; if (!continueOnError) { - throw SqlExceptionUtils.handle(e); + throw SqlExceptionUtils.batchUpdateError(e, results); } - - results[i] = EXECUTE_FAILED; - log.error("Faled to execute task %d of %d", i + 1, len, e); + log.error("Faled to execute task %d of %d", i + 1, batchStmts.size(), e); + } finally { + i++; } } } finally { @@ -559,8 +603,7 @@ public boolean getMoreResults(int current) throws SQLException { public ResultSet getGeneratedKeys() throws SQLException { ensureOpen(); - return new ClickHouseResultSet(request.getConfig().getDatabase(), "unknown", this, - ClickHouseSimpleResponse.EMPTY); + return new ClickHouseResultSet(request.getConfig().getDatabase(), "unknown", this, ClickHouseResponse.EMPTY); } @Override diff --git a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/InputBasedPreparedStatement.java b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/InputBasedPreparedStatement.java index 9fb3ba02c..f6e9d35bc 100644 --- a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/InputBasedPreparedStatement.java +++ b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/InputBasedPreparedStatement.java @@ -2,7 +2,6 @@ import java.io.IOException; import java.math.BigDecimal; -import java.nio.charset.StandardCharsets; import java.sql.Array; import java.sql.Date; import java.sql.ParameterMetaData; @@ -33,7 +32,7 @@ import com.clickhouse.jdbc.ClickHousePreparedStatement; import com.clickhouse.jdbc.SqlExceptionUtils; -public class InputBasedPreparedStatement extends ClickHouseStatementImpl implements ClickHousePreparedStatement { +public class InputBasedPreparedStatement extends AbstractPreparedStatement implements ClickHousePreparedStatement { private static final Logger log = LoggerFactory.getLogger(InputBasedPreparedStatement.class); private final Calendar defaultCalendar; @@ -88,6 +87,56 @@ protected void ensureParams() throws SQLException { } } + @Override + protected long[] executeAny(boolean asBatch) throws SQLException { + ensureOpen(); + boolean continueOnError = false; + if (asBatch) { + if (counter < 1) { + throw SqlExceptionUtils.emptyBatchError(); + } + continueOnError = getConnection().getJdbcConfig().isContinueBatchOnError(); + } else { + if (counter != 0) { + throw SqlExceptionUtils.undeterminedExecutionError(); + } + addBatch(); + } + + long[] results = new long[counter]; + long rows = 0; + try { + stream.close(); + rows = executeInsert(getRequest().getStatements(false).get(0), stream.getInput()); + if (asBatch && getResultSet() != null) { + throw SqlExceptionUtils.queryInBatchError(results); + } + // FIXME grpc and tcp by default can provides accurate result + Arrays.fill(results, 1); + } catch (Exception e) { + // just a wild guess... + if (rows < 1) { + results[0] = EXECUTE_FAILED; + } else { + if (rows >= counter) { + rows = counter; + } + for (int i = 0, len = (int) rows - 1; i < len; i++) { + results[i] = 1; + } + results[(int) rows] = EXECUTE_FAILED; + } + if (!continueOnError) { + throw SqlExceptionUtils.batchUpdateError(e, results); + } + log.error("Failed to execute batch insert of %d records", counter + 1, e); + } finally { + clearBatch(); + } + + return results; + } + protected int toArrayIndex(int parameterIndex) throws SQLException { if (parameterIndex < 1 || parameterIndex > values.length) { throw SqlExceptionUtils.clientError(ClickHouseUtils @@ -99,16 +148,32 @@ protected int toArrayIndex(int parameterIndex) throws SQLException { @Override public ResultSet executeQuery() throws SQLException { - throw SqlExceptionUtils.clientError("Input function can be only used for insertion not query"); + ensureParams(); + // log.warn("Input function can be only used for insertion not query"); + if (executeAny(false)[0] == EXECUTE_FAILED) { + throw new SQLException("Query failed", SqlExceptionUtils.SQL_STATE_SQL_ERROR); + } + + ResultSet rs = getResultSet(); + if (rs != null) { // should not happen + try { + rs.close(); + } catch (Exception e) { + // ignore + } + } + return newEmptyResultSet(); } @Override - public int executeUpdate() throws SQLException { + public long executeLargeUpdate() throws SQLException { ensureParams(); - addBatch(); - int row = getUpdateCount(); - return row > 0 ? row : 0; + if (executeAny(false)[0] == EXECUTE_FAILED) { + throw new SQLException("Update failed", SqlExceptionUtils.SQL_STATE_SQL_ERROR); + } + long row = getLargeUpdateCount(); + return row > 0L ? row : 0L; } @Override @@ -214,19 +279,12 @@ public void setObject(int parameterIndex, Object x) throws SQLException { public boolean execute() throws SQLException { ensureParams(); - addBatch(); - executeBatch(); + if (executeAny(false)[0] == EXECUTE_FAILED) { + throw new SQLException("Execution failed", SqlExceptionUtils.SQL_STATE_SQL_ERROR); + } return false; } - @Override - public void addBatch(String sql) throws SQLException { - ensureOpen(); - - throw SqlExceptionUtils - .unsupportedError("addBatch(String) cannot be called in PreparedStatement or CallableStatement!"); - } - @Override public void addBatch() throws SQLException { ensureOpen(); @@ -249,33 +307,6 @@ public void addBatch() throws SQLException { clearParameters(); } - @Override - public int[] executeBatch() throws SQLException { - ensureOpen(); - - boolean continueOnError = getConnection().getJdbcConfig().isContinueBatchOnError(); - int[] results = new int[counter]; - int result = 0; - try { - stream.close(); - result = executeInsert(getRequest().getStatements(false).get(0), stream.getInput()); - } catch (Exception e) { - if (!continueOnError) { - throw SqlExceptionUtils.handle(e); - } - - result = EXECUTE_FAILED; - log.error("Failed to execute batch insert of %d records", counter + 1, e); - } finally { - clearBatch(); - } - - // FIXME grpc and tcp by default can provides accurate result - Arrays.fill(results, 1); - - return results; - } - @Override public void clearBatch() throws SQLException { ensureOpen(); diff --git a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/SqlBasedPreparedStatement.java b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/SqlBasedPreparedStatement.java index 0492b5f2a..047599cc7 100644 --- a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/SqlBasedPreparedStatement.java +++ b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/SqlBasedPreparedStatement.java @@ -38,7 +38,7 @@ import com.clickhouse.jdbc.SqlExceptionUtils; import com.clickhouse.jdbc.parser.ClickHouseSqlStatement; -public class SqlBasedPreparedStatement extends ClickHouseStatementImpl implements ClickHousePreparedStatement { +public class SqlBasedPreparedStatement extends AbstractPreparedStatement implements ClickHousePreparedStatement { private static final Logger log = LoggerFactory.getLogger(SqlBasedPreparedStatement.class); private final Calendar defaultCalendar; @@ -116,38 +116,64 @@ protected void ensureParams() throws SQLException { } } - protected int[] executeBatch(boolean keepLastResponse) throws SQLException { + @Override + protected long[] executeAny(boolean asBatch) throws SQLException { ensureOpen(); + boolean continueOnError = false; + if (asBatch) { + if (counter < 1) { + throw SqlExceptionUtils.emptyBatchError(); + } + continueOnError = getConnection().getJdbcConfig().isContinueBatchOnError(); + } else { + if (counter != 0) { + throw SqlExceptionUtils.undeterminedExecutionError(); + } + addBatch(); + } - boolean continueOnError = getConnection().getJdbcConfig().isContinueBatchOnError(); - int[] results = new int[counter]; + long[] results = new long[counter]; ClickHouseResponse r = null; if (builder.length() > 0) { // insert ... values - int result = 0; + long rows = 0L; try { r = executeStatement(builder.toString(), null, null, null); updateResult(parsedStmt, r); - long rows = r.getSummary().getWrittenRows(); - if (rows > 0 && rows != counter) { - log.warn("Expect %d rows being inserted but got %d", counter, rows); + if (asBatch && getResultSet() != null) { + throw SqlExceptionUtils.queryInBatchError(results); } - - result = 1; + rows = r.getSummary().getWrittenRows(); + // no effective rows for update and delete, and the number for insertion is not + // accurate as well + // if (rows > 0L && rows != counter) { + // log.warn("Expect %d rows being inserted but only got %d", counter, rows); + // } + // FIXME needs to enhance http client before getting back to this + Arrays.fill(results, 1); } catch (Exception e) { + // just a wild guess... + if (rows < 1) { + results[0] = EXECUTE_FAILED; + } else { + if (rows >= counter) { + rows = counter; + } + for (int i = 0, len = (int) rows - 1; i < len; i++) { + results[i] = 1; + } + results[(int) rows] = EXECUTE_FAILED; + } + if (!continueOnError) { - throw SqlExceptionUtils.handle(e); + throw SqlExceptionUtils.batchUpdateError(e, results); } - // actually we don't know which ones failed - result = EXECUTE_FAILED; log.error("Failed to execute batch insertion of %d records", counter, e); } finally { - if (!keepLastResponse && r != null) { + if (asBatch && r != null) { r.close(); } clearBatch(); } - - Arrays.fill(results, result); } else { int index = 0; try { @@ -157,20 +183,21 @@ protected int[] executeBatch(boolean keepLastResponse) throws SQLException { try { r = executeStatement(builder.toString(), null, null, null); updateResult(parsedStmt, r); + if (asBatch && getResultSet() != null) { + throw SqlExceptionUtils.queryInBatchError(results); + } int count = getUpdateCount(); results[index] = count > 0 ? count : 0; } catch (Exception e) { + results[index] = EXECUTE_FAILED; if (!continueOnError) { - throw SqlExceptionUtils.handle(e); + throw SqlExceptionUtils.batchUpdateError(e, results); } - results[index] = EXECUTE_FAILED; log.error("Failed to execute batch insert at %d of %d", index + 1, counter, e); } finally { index++; - if (!keepLastResponse || index < counter || results[index - 1] == EXECUTE_FAILED) { - if (r != null) { - r.close(); - } + if (asBatch && r != null) { + r.close(); } } } @@ -195,28 +222,21 @@ protected int toArrayIndex(int parameterIndex) throws SQLException { public ResultSet executeQuery() throws SQLException { ensureParams(); - addBatch(); - int[] results = executeBatch(true); - for (int i = 0; i < results.length; i++) { - if (results[i] == EXECUTE_FAILED) { - throw new SQLException("Query failed", SqlExceptionUtils.SQL_STATE_SQL_ERROR); - } + if (executeAny(false)[0] == EXECUTE_FAILED) { + throw new SQLException("Query failed", SqlExceptionUtils.SQL_STATE_SQL_ERROR); } - return getResultSet(); + ResultSet rs = getResultSet(); + return rs == null ? newEmptyResultSet() : rs; } @Override - public int executeUpdate() throws SQLException { + public long executeLargeUpdate() throws SQLException { ensureParams(); - addBatch(); - int[] results = executeBatch(false); - for (int i = 0; i < results.length; i++) { - if (results[i] == EXECUTE_FAILED) { - throw new SQLException("Update failed", SqlExceptionUtils.SQL_STATE_SQL_ERROR); - } + if (executeAny(false)[0] == EXECUTE_FAILED) { + throw new SQLException("Update failed", SqlExceptionUtils.SQL_STATE_SQL_ERROR); } - return getUpdateCount(); + return getLargeUpdateCount(); } @Override @@ -376,8 +396,9 @@ public void setObject(int parameterIndex, Object x) throws SQLException { public boolean execute() throws SQLException { ensureParams(); - addBatch(); - executeBatch(true); + if (executeAny(false)[0] == EXECUTE_FAILED) { + throw new SQLException("Execution failed", SqlExceptionUtils.SQL_STATE_SQL_ERROR); + } return getResultSet() != null; } @@ -413,11 +434,6 @@ public void addBatch() throws SQLException { clearParameters(); } - @Override - public int[] executeBatch() throws SQLException { - return executeBatch(false); - } - @Override public void clearBatch() throws SQLException { ensureOpen(); diff --git a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/TableBasedPreparedStatement.java b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/TableBasedPreparedStatement.java index a2aa9c13f..edbd5d21a 100644 --- a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/TableBasedPreparedStatement.java +++ b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/TableBasedPreparedStatement.java @@ -26,7 +26,7 @@ import com.clickhouse.jdbc.SqlExceptionUtils; import com.clickhouse.jdbc.parser.ClickHouseSqlStatement; -public class TableBasedPreparedStatement extends ClickHouseStatementImpl implements ClickHousePreparedStatement { +public class TableBasedPreparedStatement extends AbstractPreparedStatement implements ClickHousePreparedStatement { private static final Logger log = LoggerFactory.getLogger(TableBasedPreparedStatement.class); private static final String ERROR_SET_TABLE = "Please use setObject(ClickHouseExternalTable) method instead"; @@ -69,6 +69,50 @@ protected void ensureParams() throws SQLException { } } + @Override + public long[] executeAny(boolean asBatch) throws SQLException { + ensureOpen(); + boolean continueOnError = false; + if (asBatch) { + if (batch.isEmpty()) { + throw SqlExceptionUtils.emptyBatchError(); + } + continueOnError = getConnection().getJdbcConfig().isContinueBatchOnError(); + } else { + if (!batch.isEmpty()) { + throw SqlExceptionUtils.undeterminedExecutionError(); + } + addBatch(); + } + + long[] results = new long[batch.size()]; + int index = 0; + try { + String sql = getSql(); + for (List list : batch) { + try (ClickHouseResponse r = executeStatement(sql, null, list, null); + ResultSet rs = updateResult(parsedStmt, r)) { + if (asBatch && getResultSet() != null) { + throw SqlExceptionUtils.queryInBatchError(results); + } + long rows = getLargeUpdateCount(); + results[index] = rows > 0L ? rows : 0L; + } catch (Exception e) { + results[index] = EXECUTE_FAILED; + if (!continueOnError) { + throw SqlExceptionUtils.batchUpdateError(e, results); + } + log.error("Failed to execute batch insert at %d of %d", index + 1, batch.size(), e); + } + index++; + } + } finally { + clearBatch(); + } + + return results; + } + protected String getSql() { // why? because request can be modified so it might not always same as // parsedStmt.getSQL() @@ -87,18 +131,24 @@ protected int toArrayIndex(int parameterIndex) throws SQLException { @Override public ResultSet executeQuery() throws SQLException { ensureParams(); + if (!batch.isEmpty()) { + throw SqlExceptionUtils.undeterminedExecutionError(); + } ClickHouseSqlStatement stmt = new ClickHouseSqlStatement(getSql()); return updateResult(parsedStmt, executeStatement(stmt, null, Arrays.asList(values), null)); } @Override - public int executeUpdate() throws SQLException { + public long executeLargeUpdate() throws SQLException { ensureParams(); + if (!batch.isEmpty()) { + throw SqlExceptionUtils.undeterminedExecutionError(); + } try (ClickHouseResponse r = executeStatement(getSql(), null, Arrays.asList(values), null)) { updateResult(parsedStmt, r); - return getUpdateCount(); + return getLargeUpdateCount(); } } @@ -171,18 +221,13 @@ public void setObject(int parameterIndex, Object x) throws SQLException { @Override public boolean execute() throws SQLException { ensureParams(); + if (!batch.isEmpty()) { + throw SqlExceptionUtils.undeterminedExecutionError(); + } ClickHouseSqlStatement stmt = new ClickHouseSqlStatement(getSql()); - ClickHouseResponse r = executeStatement(stmt, null, Arrays.asList(values), null); - return updateResult(parsedStmt, r) != null; - } - - @Override - public void addBatch(String sql) throws SQLException { - ensureOpen(); - - throw SqlExceptionUtils - .unsupportedError("addBatch(String) cannot be called in PreparedStatement or CallableStatement!"); + updateResult(parsedStmt, executeStatement(stmt, null, Arrays.asList(values), null)); + return getResultSet() != null; } @Override @@ -198,38 +243,6 @@ public void addBatch() throws SQLException { clearParameters(); } - @Override - public int[] executeBatch() throws SQLException { - ensureOpen(); - - boolean continueOnError = getConnection().getJdbcConfig().isContinueBatchOnError(); - int len = batch.size(); - int[] results = new int[len]; - int counter = 0; - try { - String sql = getSql(); - for (List list : batch) { - try (ClickHouseResponse r = executeStatement(sql, null, list, null)) { - updateResult(parsedStmt, r); - int rows = getUpdateCount(); - results[counter] = rows > 0 ? rows : 0; - } catch (Exception e) { - if (!continueOnError) { - throw SqlExceptionUtils.handle(e); - } - - results[counter] = EXECUTE_FAILED; - log.error("Failed to execute batch insert at %d of %d", counter + 1, len, e); - } - counter++; - } - } finally { - clearBatch(); - } - - return results; - } - @Override public void clearBatch() throws SQLException { ensureOpen(); diff --git a/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHousePreparedStatementTest.java b/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHousePreparedStatementTest.java index 5221226d2..ec9602eb1 100644 --- a/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHousePreparedStatementTest.java +++ b/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHousePreparedStatementTest.java @@ -3,6 +3,8 @@ import java.io.ByteArrayInputStream; import java.net.Inet4Address; import java.net.Inet6Address; +import java.sql.BatchUpdateException; +import java.sql.Connection; import java.sql.Date; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -22,6 +24,8 @@ import com.clickhouse.client.config.ClickHouseClientOption; import com.clickhouse.client.data.ClickHouseBitmap; import com.clickhouse.client.data.ClickHouseExternalTable; +import com.clickhouse.jdbc.internal.InputBasedPreparedStatement; +import com.clickhouse.jdbc.internal.SqlBasedPreparedStatement; import org.testng.Assert; import org.testng.annotations.DataProvider; @@ -35,6 +39,51 @@ private Object[][] getTypedParameters() { LocalDateTime.of(2021, 11, 2, 2, 3, 4) } } }; } + @DataProvider(name = "statementAndParams") + private Object[][] getStatementAndParameters() { + return new Object[][] { + // ddl + new Object[] { "ddl", "drop table if exists non_existing_table", SqlBasedPreparedStatement.class, false, + null, false }, + // query + new Object[] { "select1", "select 1", SqlBasedPreparedStatement.class, true, null, false }, + new Object[] { "select_param", "select ?", SqlBasedPreparedStatement.class, true, new String[] { "1" }, + false }, + // mutation + new Object[] { "insert_static", "insert into $table values(1)", + SqlBasedPreparedStatement.class, false, null, + false }, + new Object[] { "insert_table", "insert into $table", InputBasedPreparedStatement.class, false, + new String[] { "2" }, true }, + new Object[] { "insert_param", "insert into $table values(?)", SqlBasedPreparedStatement.class, + false, + new String[] { "3" }, true }, + new Object[] { "insert_input", "insert into $table select s from input('s String')", + InputBasedPreparedStatement.class, false, new String[] { "4" }, true }, + }; + } + + private void setParameters(PreparedStatement ps, String[] params) throws SQLException { + if (params != null) { + for (int i = 0; i < params.length; i++) { + ps.setString(i + 1, params[i]); + } + } + } + + private void checkTable(Statement stmt, String query, String[] results) throws SQLException { + if (results == null) { + return; + } + try (ResultSet rs = stmt.executeQuery(query)) { + for (int i = 0; i < results.length; i++) { + Assert.assertTrue(rs.next(), "Should have next row"); + Assert.assertEquals(rs.getString(1), results[i]); + } + Assert.assertFalse(rs.next(), "Should not have next row"); + } + } + @Test(groups = "integration") public void testReadWriteBinaryString() throws SQLException { Properties props = new Properties(); @@ -237,6 +286,34 @@ public void testReadWriteDateTime() throws SQLException { } } + @Test(groups = "integration") + public void testReadWriteDateTimeWithNanos() throws SQLException { + try (ClickHouseConnection conn = newConnection(new Properties()); + Statement stmt = conn.createStatement()) { + stmt.execute("drop table if exists test_read_write_datetime_nanos;" + + "CREATE TABLE test_read_write_datetime_nanos (id UUID, date DateTime64(3)) ENGINE = MergeTree() ORDER BY (id, date)"); + UUID id = UUID.randomUUID(); + long value = 1617359745321000L; + Instant i = Instant.ofEpochMilli(value / 1000L); + LocalDateTime dt = LocalDateTime.ofInstant(i, conn.getServerTimeZone().toZoneId()); + try (PreparedStatement ps = conn + .prepareStatement("insert into test_read_write_datetime_nanos values(?,?)")) { + ps.setObject(1, id); + ps.setObject(2, dt); + // below works too but too slow + // ps.setTimestamp(2, new Timestamp(value / 1000L)); + ps.executeUpdate(); + } + + ResultSet rs = stmt.executeQuery("select * from test_read_write_datetime_nanos"); + Assert.assertTrue(rs.next()); + Assert.assertEquals(rs.getObject(1), id); + Assert.assertEquals(rs.getObject(2), dt); + // rs.getString(2) will return "2021-04-02 03:35:45.321" + Assert.assertFalse(rs.next()); + } + } + @Test(groups = "integration") public void testReadWriteDateTimeWithClientTimeZone() throws SQLException { Properties props = new Properties(); @@ -478,8 +555,160 @@ public void testBatchQuery() throws SQLException { stmt.addBatch(); stmt.setInt(1, 2); stmt.addBatch(); - int[] results = stmt.executeBatch(); - Assert.assertEquals(results, new int[] { 0, 0 }); + Assert.assertThrows(BatchUpdateException.class, () -> stmt.executeBatch()); + } + } + + @Test(dataProvider = "statementAndParams", groups = "integration") + public void testExecuteWithOrWithoutParameters(String tableSuffix, String query, Class clazz, + boolean hasResultSet, String[] params, boolean checkTable) throws SQLException { + String tableName = "test_execute_ps_" + tableSuffix; + query = query.replace("$table", tableName); + Properties props = new Properties(); + try (Connection conn = newConnection(props); Statement stmt = conn.createStatement()) { + Assert.assertFalse(stmt.execute("drop table if exists " + tableName + + "; create table " + tableName + "(s String)engine=Memory"), "Should not have result set"); + + try (PreparedStatement ps = conn.prepareStatement(query)) { + Assert.assertEquals(ps.getClass(), clazz); + + // executeQuery + setParameters(ps, params); + Assert.assertNotNull(ps.executeQuery(), "executeQuery should never return null result set"); + if (hasResultSet) { + Assert.assertNotNull(ps.getResultSet(), "Should have result set"); + Assert.assertEquals(ps.getUpdateCount(), -1); + Assert.assertEquals(ps.getLargeUpdateCount(), -1L); + } else { + Assert.assertNull(ps.getResultSet(), "Should not have result set"); + Assert.assertTrue(ps.getUpdateCount() >= 0, "Should have update count"); + Assert.assertTrue(ps.getLargeUpdateCount() >= 0L, "Should have update count"); + } + if (checkTable) + checkTable(stmt, "select * from " + tableName, params); + + // execute + Assert.assertFalse(stmt.execute("truncate table " + tableName), "Should not have result set"); + setParameters(ps, params); + if (hasResultSet) { + Assert.assertTrue(ps.execute(), "Should have result set"); + Assert.assertNotNull(ps.getResultSet(), "Should have result set"); + Assert.assertEquals(ps.getUpdateCount(), -1); + Assert.assertEquals(ps.getLargeUpdateCount(), -1L); + } else { + Assert.assertFalse(ps.execute(), "Should not have result set"); + Assert.assertNull(ps.getResultSet(), "Should not have result set"); + Assert.assertTrue(ps.getUpdateCount() >= 0, "Should have update count"); + Assert.assertTrue(ps.getLargeUpdateCount() >= 0L, "Should have update count"); + } + if (checkTable) + checkTable(stmt, "select * from " + tableName, params); + + // executeLargeUpdate + Assert.assertFalse(stmt.execute("truncate table " + tableName), "Should not have result set"); + setParameters(ps, params); + Assert.assertEquals(ps.executeLargeUpdate(), ps.getLargeUpdateCount()); + if (hasResultSet) { + Assert.assertNotNull(ps.getResultSet(), "Should have result set"); + Assert.assertEquals(ps.getUpdateCount(), -1); + Assert.assertEquals(ps.getLargeUpdateCount(), -1L); + } else { + Assert.assertNull(ps.getResultSet(), "Should not have result set"); + Assert.assertTrue(ps.getUpdateCount() >= 0, "Should have update count"); + Assert.assertTrue(ps.getLargeUpdateCount() >= 0L, "Should have update count"); + } + if (checkTable) + checkTable(stmt, "select * from " + tableName, params); + + // executeUpdate + Assert.assertFalse(stmt.execute("truncate table " + tableName), "Should not have result set"); + setParameters(ps, params); + Assert.assertEquals(ps.executeUpdate(), ps.getUpdateCount()); + if (hasResultSet) { + Assert.assertNotNull(ps.getResultSet(), "Should have result set"); + Assert.assertEquals(ps.getUpdateCount(), -1); + Assert.assertEquals(ps.getLargeUpdateCount(), -1L); + } else { + Assert.assertNull(ps.getResultSet(), "Should not have result set"); + Assert.assertTrue(ps.getUpdateCount() >= 0, "Should have update count"); + Assert.assertTrue(ps.getLargeUpdateCount() >= 0L, "Should have update count"); + } + if (checkTable) + checkTable(stmt, "select * from " + tableName, params); + } + + // executeLargeBatch + Assert.assertFalse(stmt.execute("truncate table " + tableName), "Should not have result set"); + try (PreparedStatement ps = conn.prepareStatement(query)) { + Assert.assertEquals(ps.getClass(), clazz); + setParameters(ps, params); + ps.addBatch(); + Assert.assertThrows(SQLException.class, () -> ps.execute()); + Assert.assertThrows(SQLException.class, () -> ps.executeQuery()); + Assert.assertThrows(SQLException.class, () -> ps.executeUpdate()); + if (hasResultSet) { + Assert.assertThrows(SQLException.class, () -> ps.executeLargeBatch()); + } else { + Assert.assertEquals(ps.executeLargeBatch(), new long[] { 1L }); + } + if (checkTable) + checkTable(stmt, "select * from " + tableName, params); + } + + // executeBatch + Assert.assertFalse(stmt.execute("truncate table " + tableName), "Should not have result set"); + try (PreparedStatement ps = conn.prepareStatement(query)) { + Assert.assertEquals(ps.getClass(), clazz); + setParameters(ps, params); + ps.addBatch(); + Assert.assertThrows(SQLException.class, () -> ps.execute()); + Assert.assertThrows(SQLException.class, () -> ps.executeQuery()); + Assert.assertThrows(SQLException.class, () -> ps.executeUpdate()); + if (hasResultSet) { + Assert.assertThrows(SQLException.class, () -> ps.executeBatch()); + } else { + Assert.assertEquals(ps.executeBatch(), new int[] { 1 }); + } + if (checkTable) + checkTable(stmt, "select * from " + tableName, params); + } + } + + props.setProperty(JdbcConfig.PROP_CONTINUE_BATCH, "true"); + try (Connection conn = newConnection(props); + Statement stmt = conn.createStatement(); + PreparedStatement ps = conn.prepareStatement(query)) { + Assert.assertEquals(ps.getClass(), clazz); + + // executeLargeBatch + Assert.assertFalse(stmt.execute("truncate table " + tableName), "Should not have result set"); + setParameters(ps, params); + ps.addBatch(); + Assert.assertThrows(SQLException.class, () -> ps.execute()); + Assert.assertThrows(SQLException.class, () -> ps.executeQuery()); + Assert.assertThrows(SQLException.class, () -> ps.executeUpdate()); + if (hasResultSet) { + Assert.assertEquals(ps.executeLargeBatch(), new long[] { Statement.EXECUTE_FAILED }); + } else { + Assert.assertEquals(ps.executeLargeBatch(), new long[] { 1L }); + } + if (checkTable) + checkTable(stmt, "select * from " + tableName, params); + + // executeBatch + Assert.assertFalse(stmt.execute("truncate table " + tableName), "Should not have result set"); + setParameters(ps, params); + ps.addBatch(); + Assert.assertThrows(SQLException.class, () -> ps.execute()); + Assert.assertThrows(SQLException.class, () -> ps.executeQuery()); + Assert.assertThrows(SQLException.class, () -> ps.executeUpdate()); + if (hasResultSet) { + Assert.assertEquals(ps.executeBatch(), new int[] { Statement.EXECUTE_FAILED }); + } else { + Assert.assertEquals(ps.executeBatch(), new int[] { 1 }); + } + if (checkTable) + checkTable(stmt, "select * from " + tableName, params); } } diff --git a/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHouseStatementTest.java b/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHouseStatementTest.java index c09f3a156..83fef9807 100644 --- a/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHouseStatementTest.java +++ b/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHouseStatementTest.java @@ -1,6 +1,7 @@ package com.clickhouse.jdbc; import java.sql.Array; +import java.sql.BatchUpdateException; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -96,8 +97,8 @@ public void testLogComment() throws SQLException { String sql = "-- select something " + uuid + "\nselect 12345"; stmt.execute(sql + "; system flush logs;"); ResultSet rs = stmt.executeQuery( - "select distinct query, type from system.query_log where log_comment = 'select something " + uuid - + "'"); + "select distinct query from system.query_log where type = 'QueryStart' and log_comment = 'select something " + + uuid + "'"); Assert.assertTrue(rs.next()); Assert.assertEquals(rs.getString(1), sql); Assert.assertFalse(rs.next()); @@ -122,21 +123,22 @@ public void testMutation() throws SQLException { Assert.assertEquals(conn.createStatement().executeUpdate("update test_mutation set b = 22 where b = 1"), 0); Assert.assertThrows(SQLException.class, - () -> stmt.executeUpdate("update non_exist_table set value=1 where key=1")); + () -> stmt.executeUpdate("update non_existing_table set value=1 where key=1")); - stmt.addBatch("select 1"); - stmt.addBatch("select * from non_exist_table"); - stmt.addBatch("select 2"); + stmt.addBatch("insert into test_mutation values('1',1)"); + stmt.addBatch("drop table non_existing_table"); + stmt.addBatch("insert into test_mutation values('2',2)"); Assert.assertThrows(SQLException.class, () -> stmt.executeBatch()); } props.setProperty(JdbcConfig.PROP_CONTINUE_BATCH, "true"); try (ClickHouseConnection conn = newConnection(props); ClickHouseStatement stmt = conn.createStatement()) { - stmt.addBatch("select 1"); - stmt.addBatch("select * from non_exist_table"); stmt.addBatch("insert into test_mutation values('a',1)"); + stmt.addBatch("drop table non_existing_table"); + stmt.addBatch("insert into test_mutation values('b',2)"); stmt.addBatch("select 2"); - Assert.assertEquals(stmt.executeBatch(), new int[] { 0, ClickHouseStatement.EXECUTE_FAILED, 1, 0 }); + Assert.assertEquals(stmt.executeBatch(), + new int[] { 1, Statement.EXECUTE_FAILED, 1, Statement.EXECUTE_FAILED }); } } @@ -174,7 +176,8 @@ public void testAsyncInsert() throws SQLException { + "INSERT INTO test_async_insert VALUES(1, 'a'); " + "select * from test_async_insert"); ResultSet rs = stmt.getResultSet(); - Assert.assertFalse(rs.next()); + Assert.assertFalse(rs.next(), + "Server was probably busy at that time, so the row was inserted before your query"); } } @@ -205,6 +208,187 @@ public void testCancelQuery() throws Exception { } } + @Test(groups = "integration") + public void testExecute() throws SQLException { + try (Connection conn = newConnection(new Properties()); + Statement stmt = conn.createStatement()) { + // ddl + Assert.assertFalse(stmt.execute("drop table if exists non_existing_table"), "Should have no result set"); + Assert.assertEquals(stmt.getResultSet(), null); + Assert.assertTrue(stmt.getUpdateCount() >= 0, "Should have update count"); + // query + Assert.assertTrue(stmt.execute("select 1"), "Should have result set"); + ResultSet rs = stmt.getResultSet(); + Assert.assertTrue(rs.next(), "Should have one record"); + Assert.assertEquals(rs.getInt(1), 1); + Assert.assertFalse(rs.next(), "Should have only one record"); + // mixed usage + stmt.addBatch("drop table if exists non_existing_table"); + Assert.assertThrows(SQLException.class, () -> stmt.executeQuery("drop table if exists non_existing_table")); + Assert.assertThrows(SQLException.class, () -> stmt.executeQuery("select 2")); + stmt.clearBatch(); + Assert.assertFalse(stmt.execute("drop table if exists non_existing_table"), "Should have no result set"); + Assert.assertEquals(stmt.getResultSet(), null); + Assert.assertTrue(stmt.getUpdateCount() >= 0, "Should have update count"); + Assert.assertTrue(stmt.execute("select 2"), "Should have result set"); + rs = stmt.getResultSet(); + Assert.assertTrue(rs.next(), "Should have one record"); + Assert.assertEquals(rs.getInt(1), 2); + Assert.assertFalse(rs.next(), "Should have only one record"); + } + } + + @Test(groups = "integration") + public void testExecuteBatch() throws SQLException { + Properties props = new Properties(); + try (Connection conn = newConnection(props); Statement stmt = conn.createStatement()) { + Assert.assertThrows(SQLException.class, () -> stmt.executeBatch()); + stmt.addBatch("select 1"); + // mixed usage + Assert.assertThrows(SQLException.class, () -> stmt.execute("select 2")); + Assert.assertThrows(SQLException.class, () -> stmt.executeQuery("select 2")); + Assert.assertThrows(SQLException.class, + () -> stmt.executeLargeUpdate("drop table if exists non_existing_table")); + Assert.assertThrows(SQLException.class, + () -> stmt.executeUpdate("drop table if exists non_existing_table")); + // query in batch + Assert.assertThrows(BatchUpdateException.class, () -> stmt.executeBatch()); + stmt.addBatch("select 1"); + Assert.assertThrows(BatchUpdateException.class, () -> stmt.executeLargeBatch()); + + Assert.assertFalse(stmt.execute("drop table if exists test_execute_batch; " + + "create table test_execute_batch(a Int32, b String)engine=Memory"), "Should not have result set"); + stmt.addBatch("insert into test_execute_batch values(1,'1')"); + stmt.addBatch("insert into test_execute_batch values(2,'2')"); + stmt.addBatch("insert into test_execute_batch values(3,'3')"); + Assert.assertEquals(stmt.executeBatch(), new int[] { 1, 1, 1 }); + + Assert.assertFalse(stmt.execute("truncate table test_execute_batch"), "Should not have result set"); + stmt.addBatch("insert into test_execute_batch values(1,'1')"); + stmt.addBatch("insert into test_execute_batch values(2,'2')"); + stmt.addBatch("insert into test_execute_batch values(3,'3')"); + Assert.assertEquals(stmt.executeLargeBatch(), new long[] { 1L, 1L, 1L }); + + try (ResultSet rs = stmt.executeQuery("select * from test_execute_batch order by a")) { + int count = 0; + while (rs.next()) { + count++; + Assert.assertEquals(rs.getInt(1), count); + Assert.assertEquals(rs.getString(2), String.valueOf(count)); + } + Assert.assertEquals(count, 3); + } + + Assert.assertFalse(stmt.execute("truncate table test_execute_batch"), "Should not have result set"); + stmt.addBatch("insert into test_execute_batch values(1,'1')"); + stmt.addBatch("drop table non_existing_table"); + stmt.addBatch("insert into test_execute_batch values(2,'2')"); + Assert.assertThrows(BatchUpdateException.class, () -> stmt.executeBatch()); + + Assert.assertFalse(stmt.execute("truncate table test_execute_batch"), "Should not have result set"); + stmt.addBatch("insert into test_execute_batch values(1,'1')"); + stmt.addBatch("drop table non_existing_table"); + stmt.addBatch("insert into test_execute_batch values(2,'2')"); + Assert.assertThrows(BatchUpdateException.class, () -> stmt.executeLargeBatch()); + } + + props.setProperty(JdbcConfig.PROP_CONTINUE_BATCH, "true"); + try (Connection conn = newConnection(props); Statement stmt = conn.createStatement()) { + Assert.assertFalse(stmt.execute("truncate table test_execute_batch"), "Should not have result set"); + stmt.addBatch("insert into test_execute_batch values(1,'1')"); + stmt.addBatch("drop table non_existing_table"); + stmt.addBatch("insert into test_execute_batch values(2,'2')"); + stmt.addBatch("drop table non_existing_table"); + Assert.assertEquals(stmt.executeBatch(), + new int[] { 1, Statement.EXECUTE_FAILED, 1, Statement.EXECUTE_FAILED }); + + Assert.assertFalse(stmt.execute("truncate table test_execute_batch"), "Should not have result set"); + stmt.addBatch("insert into test_execute_batch values(1,'1')"); + stmt.addBatch("drop table non_existing_table"); + stmt.addBatch("insert into test_execute_batch values(2,'2')"); + stmt.addBatch("drop table non_existing_table"); + Assert.assertEquals(stmt.executeLargeBatch(), + new long[] { 1L, Statement.EXECUTE_FAILED, 1L, Statement.EXECUTE_FAILED }); + try (ResultSet rs = stmt.executeQuery("select * from test_execute_batch order by a")) { + int count = 0; + while (rs.next()) { + count++; + Assert.assertEquals(rs.getInt(1), count); + Assert.assertEquals(rs.getString(2), String.valueOf(count)); + } + Assert.assertEquals(count, 2); + } + } + } + + @Test(groups = "integration") + public void testExecuteQuery() throws SQLException { + try (Connection conn = newConnection(new Properties()); + Statement stmt = conn.createStatement()) { + ResultSet rs = stmt.executeQuery("select 1"); + Assert.assertTrue(rs == stmt.getResultSet(), "Should be the exact same result set"); + Assert.assertEquals(stmt.getUpdateCount(), -1); + Assert.assertTrue(rs.next(), "Should have one record"); + Assert.assertEquals(rs.getInt(1), 1); + Assert.assertFalse(rs.next(), "Should have only one record"); + + stmt.addBatch("select 1"); + Assert.assertThrows(SQLException.class, () -> stmt.executeQuery("select 2")); + stmt.clearBatch(); + rs = stmt.executeQuery("select 2"); + Assert.assertTrue(rs == stmt.getResultSet(), "Should be the exact same result set"); + Assert.assertEquals(stmt.getUpdateCount(), -1); + Assert.assertTrue(rs.next(), "Should have one record"); + Assert.assertEquals(rs.getInt(1), 2); + Assert.assertFalse(rs.next(), "Should have only one record"); + + // never return null result set + rs = stmt.executeQuery("drop table if exists non_existing_table"); + Assert.assertNotNull(rs, "Should never be null"); + Assert.assertNull(stmt.getResultSet(), "Should be null"); + Assert.assertEquals(stmt.getUpdateCount(), 1); + Assert.assertFalse(rs.next(), "Should has no row"); + } + } + + @Test(groups = "integration") + public void testExecuteUpdate() throws SQLException { + try (Connection conn = newConnection(new Properties()); + Statement stmt = conn.createStatement()) { + Assert.assertFalse(stmt.execute("drop table if exists test_execute_query; " + + "create table test_execute_query(a Int32, b String)engine=Memory"), "Should not have result set"); + + Assert.assertTrue(stmt.executeUpdate("insert into test_execute_query values(1,'1')") >= 0, + "Should return value greater than or equal to zero"); + Assert.assertNull(stmt.getResultSet(), "Should have no result set"); + Assert.assertEquals(stmt.getUpdateCount(), 1); + Assert.assertEquals(stmt.getLargeUpdateCount(), 1L); + Assert.assertTrue(stmt.executeLargeUpdate("insert into test_execute_query values(1,'1')") >= 0L, + "Should return value greater than or equal to zero"); + Assert.assertNull(stmt.getResultSet(), "Should have no result set"); + Assert.assertEquals(stmt.getUpdateCount(), 1); + Assert.assertEquals(stmt.getLargeUpdateCount(), 1L); + + stmt.addBatch("select 1"); + Assert.assertThrows(SQLException.class, + () -> stmt.executeUpdate("insert into test_execute_query values(1,'1')")); + Assert.assertThrows(SQLException.class, + () -> stmt.executeLargeUpdate("insert into test_execute_query values(1,'1')")); + stmt.clearBatch(); + + Assert.assertTrue(stmt.executeUpdate("insert into test_execute_query values(2,'2')") >= 0, + "Should return value greater than or equal to zero"); + Assert.assertNull(stmt.getResultSet(), "Should have no result set"); + Assert.assertEquals(stmt.getUpdateCount(), 1); + Assert.assertEquals(stmt.getLargeUpdateCount(), 1L); + Assert.assertTrue(stmt.executeLargeUpdate("insert into test_execute_query values(2,'2')") >= 0, + "Should return value greater than or equal to zero"); + Assert.assertNull(stmt.getResultSet(), "Should have no result set"); + Assert.assertEquals(stmt.getUpdateCount(), 1); + Assert.assertEquals(stmt.getLargeUpdateCount(), 1L); + } + } + @Test(groups = "integration") public void testSimpleAggregateFunction() throws SQLException { try (ClickHouseConnection conn = newConnection(new Properties()); @@ -248,12 +432,14 @@ public void testWrapperObject() throws SQLException { } @Test(groups = "integration") - public void testQuery() throws SQLException { + public void testQuerySystemLog() throws SQLException { try (ClickHouseConnection conn = newConnection(new Properties())) { ClickHouseStatement stmt = conn.createStatement(); stmt.setMaxRows(10); + stmt.setLargeMaxRows(11L); ResultSet rs = stmt.executeQuery("select * from numbers(100)"); + int rows = 0; try (ResultSet colRs = conn.getMetaData().getColumns(null, "system", "query_log", "")) { while (colRs.next()) { continue; @@ -261,15 +447,16 @@ public void testQuery() throws SQLException { } while (rs.next()) { - continue; + rows++; } + Assert.assertEquals(rows, 11); // batch query - stmt.addBatch("select 1"); - stmt.addBatch("select 2"); - stmt.addBatch("select 3"); + stmt.addBatch("drop table if exists non_existing_table1"); + stmt.addBatch("drop table if exists non_existing_table2"); + stmt.addBatch("drop table if exists non_existing_table3"); int[] results = stmt.executeBatch(); - Assert.assertEquals(results, new int[] { 0, 0, 0 }); + Assert.assertEquals(results, new int[] { 1, 1, 1 }); } } From fd4b47f72bc40e5b2c0c443f9c6c1e7d895079aa Mon Sep 17 00:00:00 2001 From: Zhichun Wu Date: Mon, 21 Feb 2022 22:48:02 +0800 Subject: [PATCH 4/6] Port IO classes from 0.3.3 --- .../client/ClickHouseByteBuffer.java | 158 ++++++ .../client/ClickHouseInputStream.java | 523 +++++++++++++----- .../client/ClickHouseOutputStream.java | 324 +++++++++++ .../client/data/ClickHouseLZ4InputStream.java | 95 ++-- .../client/data/ClickHousePipedStream.java | 3 +- .../client/ClickHouseByteBufferTest.java | 57 ++ .../client/ClickHouseInputStreamTest.java | 6 +- .../client/ClickHouseOutputStreamTest.java | 65 +++ .../data/ClickHousePipedStreamTest.java | 8 +- .../client/http/ClickHouseHttpClient.java | 5 +- .../client/http/ClickHouseHttpConnection.java | 7 +- .../client/http/ClickHouseHttpResponse.java | 92 +-- .../http/ClickHouseResponseHandler.java | 5 +- 13 files changed, 1075 insertions(+), 273 deletions(-) create mode 100644 clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseByteBuffer.java create mode 100644 clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseOutputStream.java create mode 100644 clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseByteBufferTest.java create mode 100644 clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseOutputStreamTest.java diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseByteBuffer.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseByteBuffer.java new file mode 100644 index 000000000..69b7c2d4c --- /dev/null +++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseByteBuffer.java @@ -0,0 +1,158 @@ +package com.clickhouse.client; + +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.util.Arrays; + +/** + * Lite version of {@link java.nio.ByteBuffer}. + */ +public class ClickHouseByteBuffer implements Serializable { + private static final long serialVersionUID = -8178041799873465082L; + + /** + * Empty byte array. + */ + public static final byte[] EMPTY_BYTES = new byte[0]; + + /** + * Empty and read-only byte buffer. + */ + public static final ByteBuffer EMPTY_BUFFER = ByteBuffer.wrap(EMPTY_BYTES).asReadOnlyBuffer(); + + /** + * Creates an empty byte buffer. + * + * @return empty byte buffer + */ + public static ClickHouseByteBuffer newInstance() { + return new ClickHouseByteBuffer(EMPTY_BYTES, 0, 0); + } + + /** + * Wraps given byte array as byte buffer. + * + * @param bytes byte array + * @return byte buffer + */ + public static ClickHouseByteBuffer of(byte[] bytes) { + return bytes == null || bytes.length == 0 ? newInstance() : new ClickHouseByteBuffer(bytes, 0, bytes.length); + } + + /** + * Wraps given byte array as byte buffer. + * + * @param bytes byte array + * @param offset offset + * @param length length + * @return byte buffer + */ + public static ClickHouseByteBuffer of(byte[] bytes, int offset, int length) { + if (bytes == null || bytes.length == 0 || length == 0) { + return newInstance(); + } else { + validate(bytes, offset, length); + } + + return new ClickHouseByteBuffer(bytes, offset, length); + } + + static void validate(byte[] bytes, int offset, int length) { + int len = ClickHouseChecker.nonNull(bytes, "Byte array").length; + if (ClickHouseChecker.between(offset, "Offset", 0, len) + + ClickHouseChecker.between(length, "Length", 0, len) > len) { + throw new IllegalArgumentException( + ClickHouseUtils.format("Offset(%d) plus length(%d) should not greater than %d", offset, length, + len)); + } + } + + protected byte[] array; + protected int position; + protected int length; + + protected ClickHouseByteBuffer(byte[] bytes, int offset, int length) { + this.array = bytes; + this.position = offset; + this.length = length; + } + + public boolean isEmpty() { + return length < 1; + } + + public ClickHouseByteBuffer reset() { + array = EMPTY_BYTES; + position = 0; + length = 0; + return this; + } + + public ClickHouseByteBuffer update(byte[] bytes) { + if (bytes == null || bytes.length == 0) { + reset(); + } else { + array = bytes; + position = 0; + length = bytes.length; + } + + return this; + } + + public ClickHouseByteBuffer update(byte[] bytes, int offset, int length) { + if (bytes == null || bytes.length == 0 || length == 0) { + return reset(); + } else { + validate(bytes, offset, length); + } + + this.array = bytes; + this.position = offset; + this.length = length; + return this; + } + + public byte[] array() { + return array; + } + + public int position() { + return position; + } + + public int length() { + return length; + } + + public int limit() { + return position + length; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = prime + Arrays.hashCode(array); + result = prime * result + position; + result = prime * result + length; + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } else if (obj == null || getClass() != obj.getClass()) { + return false; + } + + ClickHouseByteBuffer other = (ClickHouseByteBuffer) obj; + return Arrays.equals(array, other.array) && position == other.position && length == other.length; + } + + @Override + public String toString() { + return new StringBuilder().append(getClass().getSimpleName()).append("array=").append(array) + .append(", position=").append(position).append(", length=").append(length).append(')').toString(); + } +} diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseInputStream.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseInputStream.java index 2e07935b4..7c09681d9 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseInputStream.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseInputStream.java @@ -19,11 +19,17 @@ public abstract class ClickHouseInputStream extends InputStream { /** * Empty byte array. */ - public static final byte[] EMPTY_BYTES = new byte[0]; + @Deprecated + public static final byte[] EMPTY_BYTES = ClickHouseByteBuffer.EMPTY_BYTES; /** * Empty and read-only byte buffer. */ - public static final ByteBuffer EMPTY_BUFFER = ByteBuffer.wrap(EMPTY_BYTES).asReadOnlyBuffer(); + @Deprecated + public static final ByteBuffer EMPTY_BUFFER = ClickHouseByteBuffer.EMPTY_BUFFER; + + static final int MIN_BUFFER_SIZE = 1; + static final int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 8; + static final String INCOMPLETE_READ_ERROR = "Reached end of input stream after reading %d of %d bytes"; static final class BlockingInputStream extends ClickHouseInputStream { private final BlockingQueue queue; @@ -31,24 +37,24 @@ static final class BlockingInputStream extends ClickHouseInputStream { // too much to maintain a 2-level buffer for reading? private ByteBuffer buffer; - private boolean closed; - BlockingInputStream(BlockingQueue queue, int timeout) { + BlockingInputStream(BlockingQueue queue, int timeout, Runnable afterClose) { + super(afterClose); + this.queue = ClickHouseChecker.nonNull(queue, "Queue"); - this.timeout = timeout; + this.timeout = timeout > 0 ? timeout : 0; this.buffer = null; - this.closed = false; } private void ensureOpen() throws IOException { if (closed) { throw new IOException( - ClickHouseUtils.format("Blocking stream(queue: %d, buffer: %d) has been closed", + ClickHouseUtils.format("Blocking input stream(queue: %d, buffer: %d) has been closed", queue.size(), buffer != null ? buffer.remaining() : 0)); } - if (buffer == null || (buffer != EMPTY_BUFFER && !buffer.hasRemaining())) { + if (buffer == null || (buffer != ClickHouseByteBuffer.EMPTY_BUFFER && !buffer.hasRemaining())) { updateBuffer(); } } @@ -73,43 +79,69 @@ private int updateBuffer() throws IOException { @Override public int available() throws IOException { - if (closed || buffer == EMPTY_BUFFER) { + if (closed || buffer == ClickHouseByteBuffer.EMPTY_BUFFER) { return 0; } return (buffer == null || !buffer.hasRemaining()) ? updateBuffer() : buffer.remaining(); } - @Override - public boolean isClosed() { - return closed; - } - @Override public void close() throws IOException { // it's caller's responsiblity to consume all data in the queue, which will // unblock writer - closed = true; buffer = null; + super.close(); } @Override - public byte readByte() throws IOException { + public int peek() throws IOException { ensureOpen(); - if (buffer == EMPTY_BUFFER) { - close(); - throw new EOFException(); + if (buffer == ClickHouseByteBuffer.EMPTY_BUFFER) { + return -1; } - return buffer.get(); + int b = 0xFF & buffer.get(); + ((Buffer) buffer).position(buffer.position() - 1); + return b; + } + + @Override + public long pipe(ClickHouseOutputStream output) throws IOException { + long count = 0L; + if (output == null || output.isClosed()) { + return count; + } + + ensureOpen(); + + while (buffer != ClickHouseByteBuffer.EMPTY_BUFFER) { + int remain = buffer.remaining(); + if (remain > 0) { + if (buffer.hasArray()) { + byte[] bytes = buffer.array(); + int pos = buffer.position(); + output.write(bytes, pos, remain); + ((Buffer) buffer).position(pos + remain); + } else { + byte[] bytes = new byte[remain]; + buffer.get(bytes); + output.write(bytes); + } + count += remain; + } + updateBuffer(); + } + + return count; } @Override public int read() throws IOException { ensureOpen(); - if (buffer == EMPTY_BUFFER) { + if (buffer == ClickHouseByteBuffer.EMPTY_BUFFER) { return -1; } @@ -120,20 +152,19 @@ public int read() throws IOException { public int read(byte[] b, int off, int len) throws IOException { ensureOpen(); - int counter = 0; + int offset = off; while (len > 0) { - if (buffer == EMPTY_BUFFER) { - return counter > 0 ? counter : -1; + if (buffer == ClickHouseByteBuffer.EMPTY_BUFFER) { + return off > offset ? off - offset : -1; } int remain = buffer.remaining(); if (remain >= len) { buffer.get(b, off, len); - counter += len; + off += len; len = 0; } else { buffer.get(b, off, remain); - counter += remain; off += remain; len -= remain; @@ -141,28 +172,75 @@ public int read(byte[] b, int off, int len) throws IOException { } } - return counter; + return off - offset; + } + + @Override + public ClickHouseByteBuffer read(int len) throws IOException { + if (len <= 0) { + return byteBuffer.reset(); + } + + ensureOpen(); + + if (buffer == ClickHouseByteBuffer.EMPTY_BUFFER) { + closeQuietly(); + throw new EOFException(); + } else if (buffer.remaining() >= len && buffer.hasArray()) { + int position = buffer.position(); + byteBuffer.update(buffer.array(), position, len); + ((Buffer) buffer).position(position + len); + } else { + byteBuffer.update(readBytes(len)); + } + return byteBuffer; } @Override - public String readString(int byteLength, Charset charset) throws IOException { + public byte readByte() throws IOException { ensureOpen(); - if (byteLength < 1) { - return ""; + if (buffer == ClickHouseByteBuffer.EMPTY_BUFFER) { + closeQuietly(); + throw new EOFException(); } - if (charset == null) { - charset = StandardCharsets.UTF_8; + return buffer.get(); + } + + @Override + public byte[] readBytes(int length) throws IOException { + if (length < 1) { + return ClickHouseByteBuffer.EMPTY_BYTES; } - if (!buffer.isReadOnly() && byteLength > 8 && buffer.remaining() > byteLength) { - int pos = buffer.position(); - ((Buffer) buffer).position(pos + byteLength); - return charset.decode(ByteBuffer.wrap(buffer.array(), pos, byteLength)).toString(); + ensureOpen(); + + byte[] bytes = new byte[length]; + int offset = 0; + int len = length; + while (len > 0) { + if (buffer == ClickHouseByteBuffer.EMPTY_BUFFER) { + closeQuietly(); + throw offset == 0 ? new EOFException() + : new IOException(ClickHouseUtils.format(INCOMPLETE_READ_ERROR, offset, length)); + } + + int remain = buffer.remaining(); + if (remain >= len) { + buffer.get(bytes, offset, len); + offset += len; + len = 0; + } else { + buffer.get(bytes, offset, remain); + offset += remain; + len -= remain; + + updateBuffer(); + } } - return new String(readBytes(byteLength), charset); + return bytes; } @Override @@ -172,7 +250,7 @@ public long skip(long n) throws IOException { // peforms better but this is a bit tricky if (n == Long.MAX_VALUE) { long counter = buffer.remaining(); - while (buffer != EMPTY_BUFFER && buffer.limit() > 0) { + while (buffer != ClickHouseByteBuffer.EMPTY_BUFFER && buffer.limit() > 0) { counter += buffer.limit(); updateBuffer(); } @@ -190,68 +268,106 @@ static final class WrappedInputStream extends ClickHouseInputStream { private int position; private int limit; - private boolean closed; - WrappedInputStream(InputStream input, int bufferSize) { - in = ClickHouseChecker.nonNull(input, "InputStream"); - buffer = new byte[bufferSize]; + WrappedInputStream(InputStream input, int bufferSize, Runnable afterClose) { + super(afterClose); + + this.in = ClickHouseChecker.nonNull(input, "InputStream"); + this.buffer = new byte[ClickHouseChecker.between(bufferSize, "BufferSize", MIN_BUFFER_SIZE, + MAX_BUFFER_SIZE)]; + position = 0; limit = 0; - closed = false; } private void ensureOpen() throws IOException { if (closed) { - throw new IOException(ClickHouseUtils.format("Wrapped stream(%s) has been closed", in)); + throw new IOException(ClickHouseUtils.format("Wrapped input stream(%s) has been closed", in)); } } - private int updateBuffer() throws IOException { + /** + * Updates internal buffer backed by byte array. + * + * @return true if buffer has at least one byte available for read; false if + * input stream has been closed or reached end of stream + * @throws IOException when failed to read data from input stream + */ + private boolean updateBuffer() throws IOException { if (closed) { - return -1; + return false; } + byte[] buf = buffer; + int len = buf.length; + int offset = 0; + if (position > 0 && (offset = limit - position) > 0) { + for (int i = 0; i < offset; i++) { + buf[i] = buf[position + i]; + } + } + + while (offset < len) { + int read = in.read(buf, offset, len - offset); + if (read == -1) { + break; + } else { + offset += read; + } + } + + limit = offset; position = 0; - int count = in.read(buffer); - limit = count > 0 ? count : 0; - return count; + return limit > position; } @Override public int available() throws IOException { - return !closed && (position < limit || updateBuffer() > 0) ? limit - position : 0; + return limit > position || updateBuffer() ? limit - position : 0; } @Override - public byte readByte() throws IOException { - if (position >= limit && updateBuffer() < 0) { - try { - close(); - } catch (IOException e) { - // ignore - } - throw new EOFException(); + public void close() throws IOException { + if (closed) { + return; } - return buffer[position++]; + try { + in.close(); + } finally { + position = 0; + limit = 0; + super.close(); + } } @Override - public boolean isClosed() { - return closed; + public int peek() throws IOException { + return limit > position || updateBuffer() ? 0xFF & buffer[position] : -1; } @Override - public void close() throws IOException { - if (!closed) { - try { - in.close(); - } finally { - closed = true; - position = 0; - limit = 0; - } + public long pipe(ClickHouseOutputStream output) throws IOException { + long count = 0L; + if (output == null || output.isClosed()) { + return count; + } + + ensureOpen(); + + int remain = limit - position; + if (remain > 0) { + output.write(buffer, position, remain); + count += remain; + position = limit; } + + while ((remain = in.read(buffer)) != -1) { + output.write(buffer, 0, remain); + count += remain; + } + + return count; } @Override @@ -259,7 +375,7 @@ public int read() throws IOException { ensureOpen(); int value = -1; - if (position < limit || updateBuffer() > 0) { + if (position < limit || updateBuffer()) { value = 0xFF & buffer[position++]; } return value; @@ -267,22 +383,43 @@ public int read() throws IOException { @Override public int read(byte[] b, int off, int len) throws IOException { - if (position >= limit && updateBuffer() < 0) { - return -1; + if ((len | off | b.length) < 0 || len > b.length - off) { + throw new IndexOutOfBoundsException(); + } else if (off == b.length) { + throw new IOException("Nothing to read"); + } else if (buffer == b) { + // in case b is the byte array return from ClickHouseByteBuffer.array() + throw new IllegalArgumentException( + "Please pass a different byte array instead of internal buffer for reading"); + } else if (position + len <= limit) { + System.arraycopy(buffer, position, b, off, len); + position += len; + return len; + } else if (len <= buffer.length) { + if (!updateBuffer()) { + return -1; + } + System.arraycopy(buffer, 0, b, off, limit); + position = limit; + return limit; } ensureOpen(); int counter = 0; - while (counter < len) { - int size = Math.min(limit - position, len - counter); - System.arraycopy(buffer, position, b, off, size); - position += size; - off += size; - counter += size; + int remain = limit - position; + if (remain > 0) { + System.arraycopy(buffer, position, b, off, remain); + counter += remain; + off += remain; + } - if (position >= limit && updateBuffer() < 0) { + while (counter < len) { + int read = in.read(b, off, len - off); + if (read == -1) { break; + } else { + off += read; } } @@ -290,56 +427,79 @@ public int read(byte[] b, int off, int len) throws IOException { } @Override - public byte[] readBytes(int length) throws IOException { - if (length <= 0) { - return EMPTY_BYTES; + public ClickHouseByteBuffer read(int len) throws IOException { + if (len <= 0) { + return byteBuffer.reset(); } ensureOpen(); - byte[] bytes = new byte[length]; - int counter = 0; - while (counter < length) { - if (position >= limit && updateBuffer() < 0) { - try { - close(); - } catch (IOException e) { - // ignore - } - throw counter == 0 ? new EOFException() - : new IOException(ClickHouseUtils - .format("Reached end of input stream after reading %d of %d bytes", counter, - bytes.length)); - } - - int size = Math.min(limit - position, length - counter); - System.arraycopy(buffer, position, bytes, counter, size); - position += size; - counter += size; + if (position >= limit && !updateBuffer()) { + closeQuietly(); + throw new EOFException(); } - return bytes; + int newLimit = position + len; + if (limit >= newLimit) { + byteBuffer.update(buffer, position, len); + position = newLimit; + } else { + byteBuffer.update(readBytes(len)); + } + return byteBuffer; } @Override - public String readString(int byteLength, Charset charset) throws IOException { - ensureOpen(); - - if (byteLength < 1) { - return ""; + public byte readByte() throws IOException { + if (position < limit || updateBuffer()) { + return buffer[position++]; + } else { + closeQuietly(); + throw new EOFException(); } + } - if (charset == null) { - charset = StandardCharsets.UTF_8; + @Override + public byte[] readBytes(int length) throws IOException { + if (length < 1) { + return ClickHouseByteBuffer.EMPTY_BYTES; + } else if (position + length <= limit) { + byte[] bytes = new byte[length]; + System.arraycopy(buffer, position, bytes, 0, length); + position += length; + return bytes; + } else if (length <= buffer.length) { + if (!updateBuffer()) { + closeQuietly(); + throw new EOFException( + ClickHouseUtils.format("Failed to read %d bytes due to end of stream", length)); + } else if (length > limit) { + throw new EOFException(ClickHouseUtils.format("Reached end of stream after reading %d bytes of %d", + limit, length)); + } + byte[] bytes = new byte[length]; + System.arraycopy(buffer, position, bytes, 0, length); + position += length; + return bytes; } - if (limit - position > byteLength) { - int offset = position; - position += byteLength; - return new String(buffer, offset, byteLength, charset); - } + ensureOpen(); - return new String(readBytes(byteLength), charset); + byte[] bytes = new byte[length]; + int counter = 0; + while (counter < length) { + if (limit > position || updateBuffer()) { + int size = Math.min(limit - position, length - counter); + System.arraycopy(buffer, position, bytes, counter, size); + position += size; + counter += size; + } else { + closeQuietly(); + throw counter == 0 ? new EOFException() + : new IOException(ClickHouseUtils.format(INCOMPLETE_READ_ERROR, counter, bytes.length)); + } + } + return bytes; } @Override @@ -348,9 +508,7 @@ public long skip(long n) throws IOException { long counter = 0L; while (n > 0L) { - if (position >= limit && updateBuffer() < 0) { - break; - } else { + if (limit > position || updateBuffer()) { int remain = limit - position; if (n > remain) { n -= remain; @@ -361,6 +519,8 @@ public long skip(long n) throws IOException { position += n; n = 0L; } + } else { + break; } } @@ -376,7 +536,20 @@ public long skip(long n) throws IOException { * @return wrapped input */ public static ClickHouseInputStream of(BlockingQueue queue, int timeout) { - return new BlockingInputStream(queue, timeout); + return new BlockingInputStream(queue, timeout, null); + } + + /** + * Wraps the given blocking queue. + * + * @param queue non-null blocking queue + * @param timeout read timeout in milliseconds + * @param afterClose custom handler will be invoked right after closing the + * input stream + * @return wrapped input + */ + public static ClickHouseInputStream of(BlockingQueue queue, int timeout, Runnable afterClose) { + return new BlockingInputStream(queue, timeout, afterClose); } /** @@ -387,7 +560,7 @@ public static ClickHouseInputStream of(BlockingQueue queue, int time * {@link ClickHouseInputStream} */ public static ClickHouseInputStream of(InputStream input) { - return of(input, (int) ClickHouseClientOption.MAX_BUFFER_SIZE.getDefaultValue()); + return of(input, (int) ClickHouseClientOption.MAX_BUFFER_SIZE.getDefaultValue(), null); } /** @@ -400,10 +573,65 @@ public static ClickHouseInputStream of(InputStream input) { * {@link ClickHouseInputStream} */ public static ClickHouseInputStream of(InputStream input, int bufferSize) { + return of(input, bufferSize, null); + } + + /** + * Wraps the given input stream. + * + * @param input non-null input stream + * @param bufferSize buffer size which is always greater than zero(usually 4096 + * or larger) + * @param afterClose custom handler will be invoked right after closing the + * input stream + * @return wrapped input, or the same input if it's instance of + * {@link ClickHouseInputStream} + */ + public static ClickHouseInputStream of(InputStream input, int bufferSize, Runnable afterClose) { return input instanceof ClickHouseInputStream ? (ClickHouseInputStream) input - : new WrappedInputStream(input, bufferSize); + : new WrappedInputStream(input, bufferSize, afterClose); } + protected final Runnable afterClose; + protected final ClickHouseByteBuffer byteBuffer; + + protected boolean closed; + + protected ClickHouseInputStream(Runnable afterClose) { + this.afterClose = afterClose; + this.byteBuffer = ClickHouseByteBuffer.newInstance(); + + this.closed = false; + } + + protected void closeQuietly() { + try { + close(); + } catch (IOException e) { + // ignore + } + } + + /** + * Peeks one byte. It's similar as {@link #read()} except it never changes + * cursor. + * + * @return the next byte of data, or -1 if the end of the stream is reached + * @throws IOException when failed to read value from input stream or reached + * end of the stream + */ + public abstract int peek() throws IOException; + + /** + * Reads all bytes and write into given output stream. + * + * @param output non-null output stream + * @return bytes being written into output stream + * @throws IOException when failed to read value from input stream or reached + * end of the stream + */ + public abstract long pipe(ClickHouseOutputStream output) throws IOException; + /** * Reads an unsigned byte from the input stream. Unlike {@link #read()}, it will * throw {@link IOException} if the input stream has been closed. @@ -430,7 +658,7 @@ public int readUnsignedByte() throws IOException { /** * Reads {@code length} bytes from the input stream. It behaves in the same - * way as {@link java.io.DataInput#readFully(byte[])}, and it will throw + * way as {@link java.io.DataInput#readFully(byte[])}, except it will throw * {@link IOException} when the input stream has been closed. * * @param length number of bytes to read @@ -440,31 +668,31 @@ public int readUnsignedByte() throws IOException { */ public byte[] readBytes(int length) throws IOException { if (length <= 0) { - return EMPTY_BYTES; + return ClickHouseByteBuffer.EMPTY_BYTES; + } else if (closed) { + throw new IOException("Stream has been closed"); } byte[] bytes = new byte[length]; - - for (int l = length, c = 0, n = 0; l > 0; l -= n) { - n = read(bytes, c, l); - if (n != -1) { - c += n; + int offset = 0; + while (offset < length) { + int read = read(bytes, offset, length - offset); + if (read == -1) { + closeQuietly(); + throw offset == 0 ? new EOFException() + : new IOException(ClickHouseUtils.format(INCOMPLETE_READ_ERROR, offset, length)); } else { - try { - close(); - } catch (IOException e) { - // ignore - } - - throw c == 0 ? new EOFException() - : new IOException(ClickHouseUtils - .format("Reached end of input stream after reading %d of %d bytes", c, length)); + offset += read; } } return bytes; } + public ClickHouseByteBuffer read(int len) throws IOException { + return len <= 0 ? byteBuffer.reset() : byteBuffer.update(readBytes(len)); + } + /** * Reads string from the input stream. {@link #readVarInt()} will be called * automatically to understand byte length of the string. @@ -493,7 +721,8 @@ public String readString(int byteLength, Charset charset) throws IOException { return ""; } - return new String(readBytes(byteLength), charset != null ? charset : StandardCharsets.UTF_8); + ClickHouseByteBuffer buf = read(byteLength); + return new String(buf.array, buf.position, buf.length, charset != null ? charset : StandardCharsets.UTF_8); } /** @@ -574,5 +803,19 @@ public int readVarInt() throws IOException { * * @return true if the input stream has been closed; false otherwise */ - public abstract boolean isClosed(); + public boolean isClosed() { + return closed; + } + + @Override + public void close() throws IOException { + if (!closed) { + closed = true; + // don't want to hold the last byte array reference for too long + byteBuffer.reset(); + if (afterClose != null) { + afterClose.run(); + } + } + } } diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseOutputStream.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseOutputStream.java new file mode 100644 index 000000000..7314d0775 --- /dev/null +++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseOutputStream.java @@ -0,0 +1,324 @@ +package com.clickhouse.client; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; + +import com.clickhouse.client.config.ClickHouseClientOption; + +public abstract class ClickHouseOutputStream extends OutputStream { + static class WrappedOutputStream extends ClickHouseOutputStream { + private final byte[] buffer; + private final OutputStream out; + + private int count; + + private void flushBuffer() throws IOException { + if (count > 0) { + out.write(buffer, 0, count); + count = 0; + } + } + + protected WrappedOutputStream(OutputStream out, int bufferSize, Runnable afterClose) { + super(afterClose); + + this.buffer = new byte[bufferSize <= 0 ? 8192 : bufferSize]; + this.out = ClickHouseChecker.nonNull(out, "OutputStream"); + + this.count = 0; + } + + protected void ensureOpen() throws IOException { + if (closed) { + throw new IOException("Cannot operate on a closed output stream"); + } + } + + @Override + public void close() throws IOException { + if (closed) { + return; + } + + try { + flushBuffer(); + out.close(); + } finally { + super.close(); + } + } + + @Override + public void flush() throws IOException { + ensureOpen(); + + flushBuffer(); + out.flush(); + } + + @Override + public ClickHouseOutputStream writeByte(byte b) throws IOException { + ensureOpen(); + + if (count >= buffer.length) { + flushBuffer(); + } + buffer[count++] = b; + return this; + } + + @Override + public ClickHouseOutputStream writeBytes(byte[] bytes, int offset, int length) throws IOException { + ensureOpen(); + + int len = buffer.length; + if (length >= len) { + flushBuffer(); + out.write(bytes, offset, length); + } else { + if (length > len - count) { + flushBuffer(); + } + System.arraycopy(bytes, offset, buffer, count, length); + count += length; + } + return this; + } + } + + /** + * Wraps the given output stream. + * + * @param output non-null output stream + * @return wrapped output, or the same output if it's instance of + * {@link ClickHouseOutputStream} + */ + public static ClickHouseOutputStream of(OutputStream output) { + return of(output, (int) ClickHouseClientOption.MAX_BUFFER_SIZE.getDefaultValue()); + } + + /** + * Wraps the given output stream. + * + * @param output non-null output stream + * @param bufferSize buffer size which is always greater than zero(usually 8192 + * or larger) + * @return wrapped output, or the same output if it's instance of + * {@link ClickHouseOutputStream} + */ + public static ClickHouseOutputStream of(OutputStream output, int bufferSize) { + return of(output, bufferSize, null); + } + + /** + * Wraps the given output stream. + * + * @param output non-null output stream + * @param bufferSize buffer size which is always greater than zero(usually 8192 + * or larger) + * @param afterClose custom handler will be invoked right after closing the + * output stream + * @return wrapped output, or the same output if it's instance of + * {@link ClickHouseOutputStream} + */ + public static ClickHouseOutputStream of(OutputStream output, int bufferSize, Runnable afterClose) { + return output instanceof ClickHouseOutputStream ? (ClickHouseOutputStream) output + : new WrappedOutputStream(output, bufferSize, afterClose); + } + + protected final Runnable afterClose; + + protected boolean closed; + + protected ClickHouseOutputStream(Runnable afterClose) { + this.afterClose = afterClose; + this.closed = false; + } + + @Override + public final void write(int b) throws IOException { + writeByte((byte) (0xFF & b)); + } + + @Override + public final void write(byte[] b) throws IOException { + writeBytes(b, 0, b.length); + } + + @Override + public final void write(byte[] b, int off, int len) throws IOException { + writeBytes(b, off, len); + } + + /** + * Checks if the output stream has been closed or not. + * + * @return true if the output stream has been closed; false otherwise + */ + public boolean isClosed() { + return closed; + } + + @Override + public void close() throws IOException { + if (!closed) { + closed = true; + if (afterClose != null) { + afterClose.run(); + } + } + } + + /** + * Writes a single byte into output stream. + * + * @param b byte to write + * @return current output stream + * @throws IOException when failed to write value into output stream, not able + * to sent all bytes, or opereate on a closed stream + */ + public abstract ClickHouseOutputStream writeByte(byte b) throws IOException; + + /** + * Writes bytes into output stream. + * + * @param buffer non-null byte buffer + * @param offset relative offset of the byte buffer + * @param length bytes to write + * @return current output stream + * @throws IOException when failed to write value into output stream, not able + * to sent all bytes, or opereate on a closed stream + */ + public ClickHouseOutputStream writeBytes(ByteBuffer buffer, int offset, int length) throws IOException { + if (buffer == null || offset < 0 || length < 0) { + throw new IllegalArgumentException("Non-null ByteBuffer and positive offset and length are required"); + } + + byte[] bytes = new byte[length]; + // read-only ByteBuffer won't allow us to call its array() method for unwrapping + buffer.get(bytes, offset, length); + return writeBytes(bytes, 0, length); + } + + /** + * Writes bytes into output stream. + * + * @param bytes non-null byte array + * @param offset offset of the byte array + * @param length bytes to write + * @return current output stream + * @throws IOException when failed to write value into output stream, not able + * to sent all bytes, or opereate on a closed stream + */ + public abstract ClickHouseOutputStream writeBytes(byte[] bytes, int offset, int length) throws IOException; + + /** + * Writes bytes into output stream. + * + * @param buffer wrapped byte array with offset and limit + * @return current output stream + * @throws IOException when failed to write value into output stream, not able + * to sent all bytes, or opereate on a closed stream + */ + public ClickHouseOutputStream writeBytes(ClickHouseByteBuffer buffer) throws IOException { + if (buffer == null || buffer.isEmpty()) { + return this; + } + + return writeBytes(buffer.array(), buffer.position(), buffer.limit() - buffer.position()); + } + + /** + * Writes string into the output stream. Nothing will happen when {@code value} + * is + * null or empty. + * + * @param value string to write + * @param charset charset, null is treated as {@link StandardCharsets#UTF_8} + * @return this output stream + * @throws IOException when failed to write value into output stream, not able + * to sent all bytes, or opereate on a closed stream + */ + public ClickHouseOutputStream writeString(String value, Charset charset) throws IOException { + if (value == null || value.isEmpty()) { + return writeByte((byte) 0); + } else { + byte[] bytes = value.getBytes(charset != null ? charset : StandardCharsets.UTF_8); + int len = bytes.length; + writeVarInt(len); + return writeBytes(bytes, 0, len); + } + } + + /** + * Writes ascii string into output stream. {@link #writeVarInt(int)} will be + * called + * automatically before writing the string. + * + * @param value ascii string to write + * @return this output stream + * @throws IOException when failed to write value into output stream, not able + * to sent all bytes, or opereate on a closed stream + */ + public ClickHouseOutputStream writeAsciiString(String value) throws IOException { + return writeString(value, StandardCharsets.US_ASCII); + } + + /** + * Writes unicode string into output stream. {@link #writeVarInt(int)} will be + * called + * automatically before writing the string. + * + * @param value unicode string to write + * @return this output stream + * @throws IOException when failed to write value into output stream, not able + * to sent all bytes, or opereate on a closed stream + */ + public ClickHouseOutputStream writeUnicodeString(String value) throws IOException { + return writeString(value, StandardCharsets.UTF_8); + } + + /** + * Writes varint into output stream. + * + * @param value varint + * @return this output stream + * @throws IOException when failed to write value into output stream, not able + * to sent all bytes, or opereate on a closed stream + */ + public ClickHouseOutputStream writeVarInt(int value) throws IOException { + return writeUnsignedVarInt(value); + } + + /** + * Writes varint into output stream. + * + * @param value varint + * @return this output stream + * @throws IOException when failed to write value into output stream, not able + * to sent all bytes, or opereate on a closed stream + */ + public ClickHouseOutputStream writeUnsignedVarInt(long value) throws IOException { + // https://github.com/ClickHouse/ClickHouse/blob/abe314feecd1647d7c2b952a25da7abf5c19f352/src/IO/VarInt.h#L187 + int i = 0; + for (; i < 9; i++) { + byte b = (byte) (value & 0x7F); + + if (value > 0x7F) { + b |= 0x80; + } + + value >>= 7; + writeByte(b); + + if (value == 0) { + break; + } + } + + return this; + } +} diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseLZ4InputStream.java b/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseLZ4InputStream.java index 0cee8bfed..b54cebaae 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseLZ4InputStream.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseLZ4InputStream.java @@ -3,11 +3,11 @@ import java.io.EOFException; import java.io.IOException; import java.io.InputStream; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; +import com.clickhouse.client.ClickHouseByteBuffer; import com.clickhouse.client.ClickHouseChecker; import com.clickhouse.client.ClickHouseInputStream; +import com.clickhouse.client.ClickHouseOutputStream; import com.clickhouse.client.ClickHouseUtils; import net.jpountz.lz4.LZ4Factory; @@ -28,7 +28,6 @@ public class ClickHouseLZ4InputStream extends ClickHouseInputStream { private byte[] currentBlock; private int position; - private boolean closed; private boolean checkNext() throws IOException { if (!closed && position >= currentBlock.length) { @@ -43,7 +42,7 @@ private byte[] readNextBlock() throws IOException { // checksum(16 bytes) + 1 magic byte + header(8 bytes) if (!readFully(header, 0, HEADER_LENGTH)) { - return EMPTY_BYTES; + return ClickHouseByteBuffer.EMPTY_BYTES; } else if (header[16] != MAGIC) { // 1 byte - 0x82 (shows this is LZ4) throw new IOException( @@ -91,23 +90,48 @@ private boolean readFully(byte[] b, int off, int len) throws IOException { } public ClickHouseLZ4InputStream(InputStream stream) { + super(null); + this.decompressor = factory.fastDecompressor(); this.stream = ClickHouseChecker.nonNull(stream, "InputStream"); this.header = new byte[HEADER_LENGTH]; - this.currentBlock = EMPTY_BYTES; + this.currentBlock = ClickHouseByteBuffer.EMPTY_BYTES; this.position = 0; this.closed = false; } + @Override + public int peek() throws IOException { + return checkNext() ? 0xFF & currentBlock[position] : -1; + } + + @Override + public long pipe(ClickHouseOutputStream output) throws IOException { + long count = 0L; + if (output == null || output.isClosed()) { + return count; + } + + int remain = currentBlock.length - position; + if (remain > 0) { + output.write(currentBlock, position, remain); + position = currentBlock.length; + count += remain; + } + + while (checkNext()) { + output.write(currentBlock); + count += currentBlock.length; + } + + return count; + } + @Override public byte readByte() throws IOException { if (!checkNext()) { - try { - close(); - } catch (IOException e) { - // ignore - } + closeQuietly(); throw new EOFException(); } @@ -163,42 +187,33 @@ public int read(byte[] b, int off, int len) throws IOException { } @Override - public void close() throws IOException { - try { - stream.close(); - } finally { - closed = true; - } - } + public ClickHouseByteBuffer read(int len) throws IOException { + if (len <= 0) { + byteBuffer.reset(); + } else { + if (!checkNext()) { + throw new EOFException(); + } - @Override - public boolean isClosed() { - return closed; + int newLimit = position + len; + if (currentBlock.length >= newLimit) { + byteBuffer.update(currentBlock, position, len); + position = newLimit; + } else { + byteBuffer.update(readBytes(len)); + } + } + return byteBuffer; } @Override - public String readString(int byteLength, Charset charset) throws IOException { - if (byteLength < 1) { - return ""; - } else if (!checkNext()) { + public void close() throws IOException { + if (!closed) { try { - close(); - } catch (IOException e) { - // ignore + stream.close(); + } finally { + super.close(); } - throw new EOFException(); - } - - if (charset == null) { - charset = StandardCharsets.UTF_8; } - - if (currentBlock.length - position > byteLength) { - int offset = position; - position += byteLength; - return new String(currentBlock, offset, byteLength, charset); - } - - return new String(readBytes(byteLength), charset); } } diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHousePipedStream.java b/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHousePipedStream.java index 86f468812..2496d0f55 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHousePipedStream.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHousePipedStream.java @@ -9,6 +9,7 @@ import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; +import com.clickhouse.client.ClickHouseByteBuffer; import com.clickhouse.client.ClickHouseInputStream; import com.clickhouse.client.ClickHouseUtils; @@ -81,7 +82,7 @@ public void close() throws IOException { flush(); - buffer = ClickHouseInputStream.EMPTY_BUFFER; + buffer = ClickHouseByteBuffer.EMPTY_BUFFER; try { if (timeout > 0) { if (!queue.offer(buffer, timeout, TimeUnit.MILLISECONDS)) { diff --git a/clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseByteBufferTest.java b/clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseByteBufferTest.java new file mode 100644 index 000000000..67fcbba7e --- /dev/null +++ b/clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseByteBufferTest.java @@ -0,0 +1,57 @@ +package com.clickhouse.client; + +import org.testng.Assert; +import org.testng.annotations.Test; + +public class ClickHouseByteBufferTest { + @Test(groups = { "unit" }) + public void testEmptyArray() { + Assert.assertEquals(ClickHouseByteBuffer.of(null), ClickHouseByteBuffer.newInstance()); + Assert.assertEquals(ClickHouseByteBuffer.of(null, -1, -1), ClickHouseByteBuffer.newInstance()); + Assert.assertEquals(ClickHouseByteBuffer.of(new byte[0]), ClickHouseByteBuffer.newInstance()); + Assert.assertEquals(ClickHouseByteBuffer.of(new byte[0], -1, -1), ClickHouseByteBuffer.newInstance()); + Assert.assertEquals(ClickHouseByteBuffer.of(new byte[] { 1, 2, 3 }, 0, 0), ClickHouseByteBuffer.newInstance()); + Assert.assertEquals(ClickHouseByteBuffer.of(new byte[] { 1, 2, 3 }, -1, 0), ClickHouseByteBuffer.newInstance()); + + Assert.assertEquals(ClickHouseByteBuffer.of(new byte[] { 1, 2, 3 }).update(null), + ClickHouseByteBuffer.newInstance()); + Assert.assertEquals(ClickHouseByteBuffer.of(new byte[] { 1, 2, 3 }).update(null, -1, -1), + ClickHouseByteBuffer.newInstance()); + } + + @Test(groups = { "unit" }) + public void testInvalidValue() { + Assert.assertThrows(IllegalArgumentException.class, + () -> ClickHouseByteBuffer.of(new byte[] { 1, 2, 3 }, -1, -1)); + Assert.assertThrows(IllegalArgumentException.class, + () -> ClickHouseByteBuffer.of(new byte[] { 1, 2, 3 }, 0, -1)); + Assert.assertThrows(IllegalArgumentException.class, + () -> ClickHouseByteBuffer.of(new byte[] { 1, 2, 3 }, 3, 1)); + } + + @Test(groups = { "unit" }) + public void testNewInstance() { + ClickHouseByteBuffer buf1 = ClickHouseByteBuffer.newInstance(); + Assert.assertEquals(buf1.array(), ClickHouseByteBuffer.EMPTY_BYTES); + Assert.assertEquals(buf1.position(), 0); + Assert.assertEquals(buf1.length(), 0); + Assert.assertEquals(buf1.limit(), 0); + + ClickHouseByteBuffer buf2 = ClickHouseByteBuffer.newInstance(); + Assert.assertEquals(buf1.array(), ClickHouseByteBuffer.EMPTY_BYTES); + Assert.assertEquals(buf1.position(), 0); + Assert.assertEquals(buf1.length(), 0); + Assert.assertEquals(buf1.limit(), 0); + + Assert.assertFalse(buf1 == buf2, "Should be different instances"); + Assert.assertEquals(buf1, buf2); + } + + @Test(groups = { "unit" }) + public void testUpdate() { + Assert.assertEquals(ClickHouseByteBuffer.of(new byte[] { 1, 2, 3 }, 1, 2).reset(), + ClickHouseByteBuffer.newInstance()); + Assert.assertEquals(ClickHouseByteBuffer.newInstance().update(new byte[] { 1, 2, 3 }, 1, 2), + ClickHouseByteBuffer.of(new byte[] { 1, 2, 3 }, 1, 2)); + } +} diff --git a/clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseInputStreamTest.java b/clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseInputStreamTest.java index bdc91f32f..f41d39b3a 100644 --- a/clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseInputStreamTest.java +++ b/clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseInputStreamTest.java @@ -105,7 +105,7 @@ public void testNullOrEmptyBlockingInput() throws IOException { Assert.assertThrows(IOException.class, () -> empty.readBytes(1)); Assert.assertEquals(empty.isClosed(), false); - queue.offer(ClickHouseInputStream.EMPTY_BUFFER); + queue.offer(ClickHouseByteBuffer.EMPTY_BUFFER); Assert.assertEquals(empty.available(), 0); Assert.assertEquals(empty.read(), -1); Assert.assertEquals(empty.read(), -1); @@ -132,7 +132,7 @@ public void testBlockingInput() throws IOException { queue.offer(ByteBuffer.wrap(bytes)); i += bytes.length - 1; } - queue.offer(ClickHouseInputStream.EMPTY_BUFFER); + queue.offer(ClickHouseByteBuffer.EMPTY_BUFFER); ClickHouseInputStream in = ClickHouseInputStream.of(queue, 100); for (int i = 0; i < values.length; i++) { @@ -166,7 +166,7 @@ public void testBlockingInputAsync() throws IOException { queue.offer(ByteBuffer.wrap(bytes)); i += bytes.length - 1; } - queue.offer(ClickHouseInputStream.EMPTY_BUFFER); + queue.offer(ClickHouseByteBuffer.EMPTY_BUFFER); }).start(); ClickHouseInputStream in = ClickHouseInputStream.of(queue, 0); for (int i = 0; i < values.length; i++) { diff --git a/clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseOutputStreamTest.java b/clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseOutputStreamTest.java new file mode 100644 index 000000000..69322f72d --- /dev/null +++ b/clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseOutputStreamTest.java @@ -0,0 +1,65 @@ +package com.clickhouse.client; + +import java.io.BufferedOutputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +import org.testng.Assert; +import org.testng.annotations.Test; + +public class ClickHouseOutputStreamTest { + @Test(groups = { "unit" }) + public void testWriteString() throws IOException { + ByteArrayOutputStream inner = new ByteArrayOutputStream(); + ClickHouseOutputStream out = ClickHouseOutputStream.of(inner); + out.writeAsciiString(null); + out.flush(); + Assert.assertEquals(inner.toByteArray(), new byte[1]); + out.writeAsciiString(""); + out.flush(); + Assert.assertEquals(inner.toByteArray(), new byte[2]); + out.writeUnicodeString(null); + out.flush(); + Assert.assertEquals(inner.toByteArray(), new byte[3]); + out.writeUnicodeString(""); + out.flush(); + Assert.assertEquals(inner.toByteArray(), new byte[4]); + + inner = new ByteArrayOutputStream(); + out = ClickHouseOutputStream.of(inner); + out.writeAsciiString("12"); + out.flush(); + Assert.assertEquals(inner.toByteArray(), new byte[] { 2, 0x31, 0x32 }); + + inner = new ByteArrayOutputStream(); + out = ClickHouseOutputStream.of(inner); + out.writeUnicodeString("壹贰"); + out.flush(); + Assert.assertEquals(inner.toByteArray(), new byte[] { 6, -27, -93, -71, -24, -76, -80 }); + } + + @Test(groups = { "unit" }) + public void testNullOrClosedOutput() throws IOException { + Assert.assertThrows(IllegalArgumentException.class, () -> ClickHouseOutputStream.of(null)); + ByteArrayOutputStream inner = new ByteArrayOutputStream(); + OutputStream out = new BufferedOutputStream(inner); + ClickHouseOutputStream empty = ClickHouseOutputStream.of(out); + Assert.assertEquals(inner.toByteArray(), new byte[0]); + Assert.assertEquals(empty.isClosed(), false); + empty.writeByte((byte) 1); + empty.writeBytes(new byte[] { 1, 2, 3 }, 1, 2); + empty.writeBytes(ByteBuffer.wrap(new byte[] { 4, 5, 6 }).asReadOnlyBuffer(), 0, 3); + empty.flush(); + Assert.assertEquals(inner.toByteArray(), new byte[] { 1, 2, 3, 4, 5, 6 }); + out.close(); + Assert.assertEquals(empty.isClosed(), false); + empty.close(); + Assert.assertEquals(empty.isClosed(), true); + Assert.assertThrows(IOException.class, () -> empty.flush()); + empty.close(); + Assert.assertEquals(empty.isClosed(), true); + Assert.assertThrows(IOException.class, () -> empty.write(1)); + } +} diff --git a/clickhouse-client/src/test/java/com/clickhouse/client/data/ClickHousePipedStreamTest.java b/clickhouse-client/src/test/java/com/clickhouse/client/data/ClickHousePipedStreamTest.java index 66cc6bf73..235c8d009 100644 --- a/clickhouse-client/src/test/java/com/clickhouse/client/data/ClickHousePipedStreamTest.java +++ b/clickhouse-client/src/test/java/com/clickhouse/client/data/ClickHousePipedStreamTest.java @@ -9,7 +9,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import com.clickhouse.client.ClickHouseInputStream; +import com.clickhouse.client.ClickHouseByteBuffer; import org.testng.Assert; import org.testng.annotations.Test; @@ -50,7 +50,7 @@ public void testRead() throws Exception { } stream.queue.clear(); - stream.queue.put(ClickHouseInputStream.EMPTY_BUFFER); + stream.queue.put(ClickHouseByteBuffer.EMPTY_BUFFER); Assert.assertEquals(stream.queue.size(), 1); try (InputStream in = stream.getInput()) { Assert.assertEquals(in.read(), -1); @@ -58,7 +58,7 @@ public void testRead() throws Exception { stream.queue.put((ByteBuffer) ((Buffer) buf).rewind()); // stream.queue.put(buf); - stream.queue.put(ClickHouseInputStream.EMPTY_BUFFER); + stream.queue.put(ClickHouseByteBuffer.EMPTY_BUFFER); Assert.assertEquals(stream.queue.size(), 2); try (InputStream in = stream.getInput()) { Assert.assertEquals(in.read(), 3); @@ -122,7 +122,7 @@ public void testReadBytes() throws Exception { buf = ByteBuffer.allocate(2).put(new byte[] { (byte) 3, (byte) 4 }); stream.queue.put((ByteBuffer) ((Buffer) buf).rewind()); - stream.queue.put(ClickHouseInputStream.EMPTY_BUFFER); + stream.queue.put(ClickHouseByteBuffer.EMPTY_BUFFER); Assert.assertEquals(stream.queue.size(), 2); try (InputStream in = stream.getInput()) { Assert.assertEquals(in.read(bytes, 0, 3), 2); diff --git a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpClient.java b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpClient.java index 02c4ac00c..9d97d5f35 100644 --- a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpClient.java +++ b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpClient.java @@ -87,9 +87,8 @@ protected ClickHouseResponse postRequest(ClickHouseRequest sealedRequest) thr log.debug("Query: %s", sql); ClickHouseHttpResponse httpResponse = conn.post(sql, sealedRequest.getInputStream().orElse(null), sealedRequest.getExternalTables(), null); - return ClickHouseStreamResponse.of(httpResponse.getConfig(sealedRequest), httpResponse, - sealedRequest.getSettings(), null, - httpResponse.summary); + return ClickHouseStreamResponse.of(httpResponse.getConfig(sealedRequest), httpResponse.getInputStream(), + sealedRequest.getSettings(), null, httpResponse.summary); } @Override diff --git a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpConnection.java b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpConnection.java index df2c34d19..3e033884e 100644 --- a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpConnection.java +++ b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpConnection.java @@ -20,7 +20,6 @@ import com.clickhouse.client.ClickHouseCompression; import com.clickhouse.client.ClickHouseConfig; import com.clickhouse.client.ClickHouseCredentials; -import com.clickhouse.client.ClickHouseInputStream; import com.clickhouse.client.ClickHouseNode; import com.clickhouse.client.ClickHouseRequest; import com.clickhouse.client.ClickHouseUtils; @@ -237,13 +236,13 @@ protected OutputStream getRequestOutputStream(OutputStream out) throws IOExcepti return out; } - protected ClickHouseInputStream getResponseInputStream(InputStream in) throws IOException { + protected InputStream getResponseInputStream(InputStream in) throws IOException { if (config.isCompressServerResponse()) { // TODO support more algorithms ClickHouseCompression algorithm = config.getCompressAlgorithmForServerResponse(); switch (algorithm) { case GZIP: - in = ClickHouseInputStream.of(new GZIPInputStream(in)); + in = new GZIPInputStream(in); break; case LZ4: in = new ClickHouseLZ4InputStream(in); @@ -253,7 +252,7 @@ protected ClickHouseInputStream getResponseInputStream(InputStream in) throws IO } } - return ClickHouseInputStream.of(in, config.getMaxBufferSize()); + return in; } /** diff --git a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpResponse.java b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpResponse.java index 5abbc1259..31d247868 100644 --- a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpResponse.java +++ b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpResponse.java @@ -1,8 +1,7 @@ package com.clickhouse.client.http; -import java.io.IOException; +import java.io.InputStream; import java.io.Serializable; -import java.nio.charset.Charset; import java.util.HashMap; import java.util.Map; import java.util.TimeZone; @@ -17,7 +16,7 @@ import com.clickhouse.client.config.ClickHouseClientOption; import com.clickhouse.client.config.ClickHouseOption; -public class ClickHouseHttpResponse extends ClickHouseInputStream { +public class ClickHouseHttpResponse { private static long getLongValue(Map map, String key) { String value = map.get(key); if (value != null) { @@ -40,7 +39,15 @@ private static long getLongValue(Map map, String key) { protected final ClickHouseResponseSummary summary; - private boolean closed; + protected void closeConnection() { + if (!connection.isReusable()) { + try { + connection.close(); + } catch (Exception e) { + // ignore + } + } + } protected ClickHouseConfig getConfig(ClickHouseRequest request) { ClickHouseConfig config = request.getConfig(); @@ -54,14 +61,14 @@ protected ClickHouseConfig getConfig(ClickHouseRequest request) { return config; } - public ClickHouseHttpResponse(ClickHouseHttpConnection connection, ClickHouseInputStream input, - String serverDisplayName, String queryId, String summary, ClickHouseFormat format, TimeZone timeZone) { + public ClickHouseHttpResponse(ClickHouseHttpConnection connection, InputStream input, String serverDisplayName, + String queryId, String summary, ClickHouseFormat format, TimeZone timeZone) { if (connection == null || input == null) { throw new IllegalArgumentException("Non-null connection and input stream are required"); } this.connection = connection; - this.input = input; + this.input = ClickHouseInputStream.of(input, connection.config.getMaxBufferSize(), this::closeConnection); this.serverDisplayName = !ClickHouseChecker.isNullOrEmpty(serverDisplayName) ? serverDisplayName : connection.server.getHost(); @@ -78,76 +85,9 @@ public ClickHouseHttpResponse(ClickHouseHttpConnection connection, ClickHouseInp this.format = format != null ? format : connection.config.getFormat(); this.timeZone = timeZone != null ? timeZone : connection.config.getServerTimeZone(); - - closed = false; - } - - @Override - public byte readByte() throws IOException { - return input.readByte(); - } - - @Override - public int read() throws IOException { - return input.read(); - } - - @Override - public int available() throws IOException { - return input.available(); - } - - @Override - public boolean isClosed() { - return closed; - } - - @Override - public void close() throws IOException { - IOException error = null; - - try { - input.close(); - } catch (IOException e) { - error = e; - } - closed = true; - - if (!connection.isReusable()) { - try { - connection.close(); - } catch (Exception e) { - // ignore - } - } - - if (error != null) { - throw error; - } - } - - @Override - public boolean markSupported() { - return false; - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - return input.read(b, off, len); - } - - @Override - public long skip(long n) throws IOException { - return input.skip(n); - } - - @Override - public byte[] readBytes(int length) throws IOException { - return input.readBytes(length); } - @Override - public String readString(int byteLength, Charset charset) throws IOException { - return input.readString(byteLength, charset); + public ClickHouseInputStream getInputStream() { + return input; } } diff --git a/clickhouse-http-client/src/main/java11/com/clickhouse/client/http/ClickHouseResponseHandler.java b/clickhouse-http-client/src/main/java11/com/clickhouse/client/http/ClickHouseResponseHandler.java index 6c46b4b23..e739d3250 100644 --- a/clickhouse-http-client/src/main/java11/com/clickhouse/client/http/ClickHouseResponseHandler.java +++ b/clickhouse-http-client/src/main/java11/com/clickhouse/client/http/ClickHouseResponseHandler.java @@ -13,11 +13,12 @@ import java.util.concurrent.Flow.Subscription; import java.util.concurrent.atomic.AtomicBoolean; +import com.clickhouse.client.ClickHouseByteBuffer; import com.clickhouse.client.ClickHouseInputStream; public class ClickHouseResponseHandler implements BodySubscriber { // An immutable ByteBuffer sentinel to mark that the last byte was received. - private static final List LAST_LIST = List.of(ClickHouseInputStream.EMPTY_BUFFER); + private static final List LAST_LIST = List.of(ClickHouseByteBuffer.EMPTY_BUFFER); private final BlockingQueue buffers; private final ClickHouseInputStream in; @@ -72,7 +73,7 @@ public void onNext(List item) { @Override public void onError(Throwable throwable) { - buffers.offer(ClickHouseInputStream.EMPTY_BUFFER); + buffers.offer(ClickHouseByteBuffer.EMPTY_BUFFER); } @Override From 105de1330beb818a4712aab632a4c7d0f2b8fc60 Mon Sep 17 00:00:00 2001 From: Zhichun Wu Date: Mon, 21 Feb 2022 23:56:40 +0800 Subject: [PATCH 5/6] Validate JDK 11 HttpClient-based impementation as well --- .github/workflows/build.yml | 3 ++- .../test/java/com/clickhouse/jdbc/JdbcIntegrationTest.java | 7 ++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 15e8fd27d..c0151718d 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -34,7 +34,8 @@ jobs: matrix: # most recent LTS releases as well as latest stable builds clickhouse: ["21.3", "21.8", "latest"] - protocol: ["http", "grpc"] + # http2 here represents http protocol + JDK HttpClient(http_connection_provider=HTTP_CLIENT) + protocol: ["http", "http2", "grpc"] exclude: - clickhouse: "21.3" protocol: grpc diff --git a/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/JdbcIntegrationTest.java b/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/JdbcIntegrationTest.java index 700e95fc9..4b0f3b855 100644 --- a/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/JdbcIntegrationTest.java +++ b/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/JdbcIntegrationTest.java @@ -10,13 +10,15 @@ import com.clickhouse.client.BaseIntegrationTest; import com.clickhouse.client.ClickHouseNode; import com.clickhouse.client.ClickHouseProtocol; +import com.clickhouse.client.http.config.ClickHouseHttpOption; public abstract class JdbcIntegrationTest extends BaseIntegrationTest { private static final String CLASS_PREFIX = "ClickHouse"; private static final String CLASS_SUFFIX = "Test"; + protected static final String CUSTOM_PROTOCOL_NAME = System.getProperty("protocol", "http").toUpperCase(); protected static final ClickHouseProtocol DEFAULT_PROTOCOL = ClickHouseProtocol - .valueOf(System.getProperty("protocol", "http").toUpperCase()); + .valueOf(CUSTOM_PROTOCOL_NAME.startsWith("HTTP") ? "HTTP" : CUSTOM_PROTOCOL_NAME); protected final String dbName; @@ -48,6 +50,9 @@ protected String buildJdbcUrl(ClickHouseProtocol protocol, String prefix, String builder.append(url); } + if ("HTTP2".equals(CUSTOM_PROTOCOL_NAME)) { + builder.append('?').append(ClickHouseHttpOption.CONNECTION_PROVIDER.getKey()).append("=HTTP_CLIENT"); + } return builder.toString(); } From db4e219c2f99d0c70fbbe42a7ad35ded7c7c384b Mon Sep 17 00:00:00 2001 From: Zhichun Wu Date: Tue, 22 Feb 2022 09:48:30 +0800 Subject: [PATCH 6/6] Call HttpURLConnection.disconnect when closing response, and avoid double buffering --- .../client/data/ClickHouseLZ4InputStream.java | 6 ++++- .../client/http/ClickHouseHttpConnection.java | 24 +++++++++++++++---- .../client/http/ClickHouseHttpResponse.java | 17 +++---------- .../client/http/HttpUrlConnectionImpl.java | 4 ++-- .../client/http/HttpClientConnectionImpl.java | 4 ++-- 5 files changed, 32 insertions(+), 23 deletions(-) diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseLZ4InputStream.java b/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseLZ4InputStream.java index b54cebaae..bcb57aa6b 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseLZ4InputStream.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseLZ4InputStream.java @@ -90,7 +90,11 @@ private boolean readFully(byte[] b, int off, int len) throws IOException { } public ClickHouseLZ4InputStream(InputStream stream) { - super(null); + this(stream, null); + } + + public ClickHouseLZ4InputStream(InputStream stream, Runnable afterClose) { + super(afterClose); this.decompressor = factory.fastDecompressor(); this.stream = ClickHouseChecker.nonNull(stream, "InputStream"); diff --git a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpConnection.java b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpConnection.java index 3e033884e..da0ddc85d 100644 --- a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpConnection.java +++ b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpConnection.java @@ -20,6 +20,7 @@ import com.clickhouse.client.ClickHouseCompression; import com.clickhouse.client.ClickHouseConfig; import com.clickhouse.client.ClickHouseCredentials; +import com.clickhouse.client.ClickHouseInputStream; import com.clickhouse.client.ClickHouseNode; import com.clickhouse.client.ClickHouseRequest; import com.clickhouse.client.ClickHouseUtils; @@ -203,6 +204,14 @@ protected ClickHouseHttpConnection(ClickHouseNode server, ClickHouseRequest r this.defaultHeaders = Collections.unmodifiableMap(map); } + protected void closeQuietly() { + try { + close(); + } catch (Exception e) { + // ignore + } + } + protected String getBaseUrl() { String baseUrl; int index = url.indexOf('?'); @@ -236,23 +245,30 @@ protected OutputStream getRequestOutputStream(OutputStream out) throws IOExcepti return out; } - protected InputStream getResponseInputStream(InputStream in) throws IOException { + protected ClickHouseInputStream getResponseInputStream(InputStream in) throws IOException { + Runnable afterClose = null; + if (!isReusable()) { + afterClose = this::closeQuietly; + } + ClickHouseInputStream chInput; if (config.isCompressServerResponse()) { // TODO support more algorithms ClickHouseCompression algorithm = config.getCompressAlgorithmForServerResponse(); switch (algorithm) { case GZIP: - in = new GZIPInputStream(in); + chInput = ClickHouseInputStream.of(new GZIPInputStream(in), config.getMaxBufferSize(), afterClose); break; case LZ4: - in = new ClickHouseLZ4InputStream(in); + chInput = new ClickHouseLZ4InputStream(in, afterClose); break; default: throw new UnsupportedOperationException("Unsupported compression algorithm: " + algorithm); } + } else { + chInput = ClickHouseInputStream.of(in, config.getMaxBufferSize(), afterClose); } - return in; + return chInput; } /** diff --git a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpResponse.java b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpResponse.java index 31d247868..3c200460f 100644 --- a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpResponse.java +++ b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpResponse.java @@ -1,6 +1,5 @@ package com.clickhouse.client.http; -import java.io.InputStream; import java.io.Serializable; import java.util.HashMap; import java.util.Map; @@ -39,16 +38,6 @@ private static long getLongValue(Map map, String key) { protected final ClickHouseResponseSummary summary; - protected void closeConnection() { - if (!connection.isReusable()) { - try { - connection.close(); - } catch (Exception e) { - // ignore - } - } - } - protected ClickHouseConfig getConfig(ClickHouseRequest request) { ClickHouseConfig config = request.getConfig(); if (format != null && format != config.getFormat()) { @@ -61,14 +50,14 @@ protected ClickHouseConfig getConfig(ClickHouseRequest request) { return config; } - public ClickHouseHttpResponse(ClickHouseHttpConnection connection, InputStream input, String serverDisplayName, - String queryId, String summary, ClickHouseFormat format, TimeZone timeZone) { + public ClickHouseHttpResponse(ClickHouseHttpConnection connection, ClickHouseInputStream input, + String serverDisplayName, String queryId, String summary, ClickHouseFormat format, TimeZone timeZone) { if (connection == null || input == null) { throw new IllegalArgumentException("Non-null connection and input stream are required"); } this.connection = connection; - this.input = ClickHouseInputStream.of(input, connection.config.getMaxBufferSize(), this::closeConnection); + this.input = input; this.serverDisplayName = !ClickHouseChecker.isNullOrEmpty(serverDisplayName) ? serverDisplayName : connection.server.getHost(); diff --git a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/HttpUrlConnectionImpl.java b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/HttpUrlConnectionImpl.java index 587e27de2..2eadc788b 100644 --- a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/HttpUrlConnectionImpl.java +++ b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/HttpUrlConnectionImpl.java @@ -117,8 +117,8 @@ private void checkResponse(HttpURLConnection conn) throws IOException { // TODO get exception from response header, for example: // X-ClickHouse-Exception-Code: 47 StringBuilder builder = new StringBuilder(); - try (Reader reader = new BufferedReader( - new InputStreamReader(getResponseInputStream(conn.getErrorStream()), StandardCharsets.UTF_8))) { + try (Reader reader = new InputStreamReader(getResponseInputStream(conn.getErrorStream()), + StandardCharsets.UTF_8)) { int c = 0; while ((c = reader.read()) != -1) { builder.append((char) c); diff --git a/clickhouse-http-client/src/main/java11/com/clickhouse/client/http/HttpClientConnectionImpl.java b/clickhouse-http-client/src/main/java11/com/clickhouse/client/http/HttpClientConnectionImpl.java index 8658e985c..8b6bfb4f4 100644 --- a/clickhouse-http-client/src/main/java11/com/clickhouse/client/http/HttpClientConnectionImpl.java +++ b/clickhouse-http-client/src/main/java11/com/clickhouse/client/http/HttpClientConnectionImpl.java @@ -76,8 +76,7 @@ private HttpResponse checkResponse(HttpResponse r) thr // TODO get exception from response header, for example: // X-ClickHouse-Exception-Code: 47 StringBuilder builder = new StringBuilder(); - try (Reader reader = new BufferedReader( - new InputStreamReader(getResponseInputStream(r.body()), StandardCharsets.UTF_8))) { + try (Reader reader = new InputStreamReader(getResponseInputStream(r.body()), StandardCharsets.UTF_8)) { int c = 0; while ((c = reader.read()) != -1) { builder.append((char) c); @@ -278,5 +277,6 @@ public boolean ping(int timeout) { @Override public void close() { + // nothing } }