Skip to content

Commit

Permalink
Merge pull request #787 from zhicwu/enhance-jdbc
Browse files Browse the repository at this point in the history
Fix minor issues found after 0.3.2 pre-release
  • Loading branch information
zhicwu authored Dec 30, 2021
2 parents 08058fe + 70358b1 commit 1795272
Show file tree
Hide file tree
Showing 13 changed files with 395 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public final class ClickHouseColumn implements Serializable {
private static final String KEYWORD_NULLABLE = "Nullable";
private static final String KEYWORD_LOW_CARDINALITY = "LowCardinality";
private static final String KEYWORD_AGGREGATE_FUNCTION = ClickHouseDataType.AggregateFunction.name();
private static final String KEYWORD_SIMPLE_AGGREGATE_FUNCTION = ClickHouseDataType.SimpleAggregateFunction.name();
private static final String KEYWORD_ARRAY = ClickHouseDataType.Array.name();
private static final String KEYWORD_TUPLE = ClickHouseDataType.Tuple.name();
private static final String KEYWORD_MAP = ClickHouseDataType.Map.name();
Expand Down Expand Up @@ -140,8 +141,10 @@ protected static int readColumn(String args, int startIndex, int len, String nam
brackets++;
}

if (args.startsWith(KEYWORD_AGGREGATE_FUNCTION, i)) {
int index = args.indexOf('(', i + KEYWORD_AGGREGATE_FUNCTION.length());
String matchedKeyword;
if (args.startsWith((matchedKeyword = KEYWORD_AGGREGATE_FUNCTION), i)
|| args.startsWith((matchedKeyword = KEYWORD_SIMPLE_AGGREGATE_FUNCTION), i)) {
int index = args.indexOf('(', i + matchedKeyword.length());
if (index < i) {
throw new IllegalArgumentException("Missing function parameters");
}
Expand All @@ -160,8 +163,8 @@ protected static int readColumn(String args, int startIndex, int len, String nam
nestedColumns.add(ClickHouseColumn.of("", p));
}
}
column = new ClickHouseColumn(ClickHouseDataType.AggregateFunction, name, args.substring(startIndex, i),
nullable, lowCardinality, params, nestedColumns);
column = new ClickHouseColumn(ClickHouseDataType.valueOf(matchedKeyword), name,
args.substring(startIndex, i), nullable, lowCardinality, params, nestedColumns);
column.aggFuncType = aggFunc;
} else if (args.startsWith(KEYWORD_ARRAY, i)) {
int index = args.indexOf('(', i + KEYWORD_ARRAY.length());
Expand Down Expand Up @@ -395,7 +398,9 @@ private ClickHouseColumn(ClickHouseDataType dataType, String columnName, String
}

public boolean isAggregateFunction() {
// || dataType == ClickHouseDataType.SimpleAggregateFunction;
return dataType == ClickHouseDataType.AggregateFunction;

}

public boolean isArray() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public enum ClickHouseDataType {
"NATIONAL CHARACTER", "NATIONAL CHARACTER LARGE OBJECT", "NATIONAL CHARACTER VARYING", "NCHAR",
"NCHAR LARGE OBJECT", "NCHAR VARYING", "NVARCHAR", "TEXT", "TINYBLOB", "TINYTEXT", "VARCHAR", "VARCHAR2"),
AggregateFunction(String.class, true, true, false, 0, 0, 0, 0, 0), // implementation-defined intermediate state
SimpleAggregateFunction(String.class, true, true, false, 0, 0, 0, 0, 0),
Array(Object.class, true, true, false, 0, 0, 0, 0, 0), Map(Map.class, true, true, false, 0, 0, 0, 0, 0),
Nested(Object.class, true, true, false, 0, 0, 0, 0, 0), Tuple(List.class, true, true, false, 0, 0, 0, 0, 0),
Point(Object.class, false, true, true, 33, 0, 0, 0, 0), // same as Tuple(Float64, Float64)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.TimeZone;
Expand Down Expand Up @@ -126,40 +125,41 @@ public ClickHouseDateTimeValue copy(boolean deep) {

@Override
public byte asByte() {
return isNullOrEmpty() ? (byte) 0 : (byte) getValue().toEpochSecond(ZoneOffset.UTC);
return isNullOrEmpty() ? (byte) 0 : (byte) getValue().atZone(tz.toZoneId()).toEpochSecond();
}

@Override
public short asShort() {
return isNullOrEmpty() ? (short) 0 : (short) getValue().toEpochSecond(ZoneOffset.UTC);
return isNullOrEmpty() ? (short) 0 : (short) getValue().atZone(tz.toZoneId()).toEpochSecond();
}

@Override
public int asInteger() {
return isNullOrEmpty() ? 0 : (int) getValue().toEpochSecond(ZoneOffset.UTC);
return isNullOrEmpty() ? 0 : (int) getValue().atZone(tz.toZoneId()).toEpochSecond();
}

@Override
public long asLong() {
return isNullOrEmpty() ? 0L : getValue().toEpochSecond(ZoneOffset.UTC);
return isNullOrEmpty() ? 0L : getValue().atZone(tz.toZoneId()).toEpochSecond();
}

@Override
public float asFloat() {
return isNullOrEmpty() ? 0F
: getValue().toEpochSecond(ZoneOffset.UTC) + getValue().getNano() / ClickHouseValues.NANOS.floatValue();
: getValue().atZone(tz.toZoneId()).toEpochSecond()
+ getValue().getNano() / ClickHouseValues.NANOS.floatValue();
}

@Override
public double asDouble() {
return isNullOrEmpty() ? 0D
: getValue().toEpochSecond(ZoneOffset.UTC)
: getValue().atZone(tz.toZoneId()).toEpochSecond()
+ getValue().getNano() / ClickHouseValues.NANOS.doubleValue();
}

@Override
public BigInteger asBigInteger() {
return isNullOrEmpty() ? null : BigInteger.valueOf(getValue().toEpochSecond(ZoneOffset.UTC));
return isNullOrEmpty() ? null : BigInteger.valueOf(getValue().atZone(tz.toZoneId()).toEpochSecond());
}

@Override
Expand All @@ -168,7 +168,7 @@ public BigDecimal asBigDecimal(int scale) {
BigDecimal v = null;
if (value != null) {
int nanoSeconds = value.getNano();
v = new BigDecimal(BigInteger.valueOf(value.toEpochSecond(ZoneOffset.UTC)), scale);
v = new BigDecimal(BigInteger.valueOf(value.atZone(tz.toZoneId()).toEpochSecond()), scale);
if (scale != 0 && nanoSeconds != 0) {
v = v.add(BigDecimal.valueOf(nanoSeconds).divide(ClickHouseValues.NANOS).setScale(scale,
RoundingMode.HALF_UP));
Expand Down
15 changes: 15 additions & 0 deletions clickhouse-jdbc/legacy.xml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,21 @@
<artifactId>testng</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mariadb.jdbc</groupId>
<artifactId>mariadb-java-client</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -568,9 +568,8 @@ public PreparedStatement prepareStatement(String sql, int resultSetType, int res
if (parsedStmt.hasTempTable()) {
// non-insert queries using temp table
ps = new TableBasedPreparedStatement(this,
clientRequest.write().query(parsedStmt.getSQL(), newQueryId()),
parsedStmt.getTempTables(), resultSetType,
resultSetConcurrency, resultSetHoldability);
clientRequest.write().query(parsedStmt.getSQL(), newQueryId()), parsedStmt,
resultSetType, resultSetConcurrency, resultSetHoldability);
} else if (parsedStmt.getStatementType() == StatementType.INSERT) {
if (!ClickHouseChecker.isNullOrBlank(parsedStmt.getInput())) {
// insert query using input function
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.Map.Entry;

import com.clickhouse.client.ClickHouseChecker;
import com.clickhouse.client.ClickHouseClient;
import com.clickhouse.client.ClickHouseConfig;
import com.clickhouse.client.ClickHouseFormat;
import com.clickhouse.client.ClickHouseRequest;
Expand All @@ -30,6 +31,7 @@
import com.clickhouse.jdbc.SqlExceptionUtils;
import com.clickhouse.jdbc.JdbcWrapper;
import com.clickhouse.jdbc.parser.ClickHouseSqlStatement;
import com.clickhouse.jdbc.parser.StatementType;

public class ClickHouseStatementImpl extends JdbcWrapper implements ClickHouseStatement {
private static final Logger log = LoggerFactory.getLogger(ClickHouseStatementImpl.class);
Expand All @@ -50,7 +52,7 @@ public class ClickHouseStatementImpl extends JdbcWrapper implements ClickHouseSt
private int maxFieldSize;
private int maxRows;
private boolean poolable;
private String queryId;
private volatile String queryId;
private int queryTimeout;

private ClickHouseResultSet currentResult;
Expand Down Expand Up @@ -149,6 +151,7 @@ protected int executeInsert(String sql, InputStream input) throws SQLException {
try (ClickHouseResponse resp = request.write().query(sql, queryId = connection.newQueryId())
.format(ClickHouseFormat.RowBinary).data(input).execute()
.get()) {
updateResult(new ClickHouseSqlStatement(sql, StatementType.INSERT), resp);
summary = resp.getSummary();
} catch (InterruptedException e) {
log.error("can not close stream: %s", e.getMessage());
Expand Down Expand Up @@ -197,6 +200,9 @@ protected ResultSet updateResult(ClickHouseSqlStatement stmt, ClickHouseResponse
rs = currentResult;
} else {
currentUpdateCount = response.getSummary().getUpdateCount();
if (currentUpdateCount <= 0) {
currentUpdateCount = 1;
}
response.close();
}

Expand Down Expand Up @@ -362,11 +368,19 @@ public void setQueryTimeout(int seconds) throws SQLException {

@Override
public void cancel() throws SQLException {
if (this.queryId == null || isClosed()) {
final String qid;
if ((qid = this.queryId) == null || isClosed()) {
return;
}

executeQuery(String.format("KILL QUERY WHERE query_id='%s'", queryId));
ClickHouseClient.send(request.getServer(), String.format("KILL QUERY WHERE query_id='%s'", qid))
.whenComplete((summary, exception) -> {
if (exception != null) {
log.warn("Failed to kill query [%s] due to: %s", qid, exception.getMessage());
} else if (summary != null) {
log.debug("Killed query [%s]", qid);
}
});
}

@Override
Expand Down Expand Up @@ -500,8 +514,10 @@ public int[] executeBatch() throws SQLException {
int len = batchStmts.size();
int[] results = new int[len];
for (int i = 0; i < len; i++) {
try (ClickHouseResponse r = executeStatement(batchStmts.get(i), null, null, null)) {
results[i] = (int) r.getSummary().getWrittenRows();
ClickHouseSqlStatement s = batchStmts.get(i);
try (ClickHouseResponse r = executeStatement(s, null, null, null)) {
updateResult(s, r);
results[i] = currentUpdateCount <= 0 ? 0 : currentUpdateCount;
} catch (Exception e) {
results[i] = EXECUTE_FAILED;
log.error("Faled to execute task %d of %d", i + 1, len, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ public int executeUpdate() throws SQLException {
ensureParams();

addBatch();
return executeBatch()[0];
int row = getUpdateCount();
return row > 0 ? row : 0;
}

@Override
Expand Down
Loading

0 comments on commit 1795272

Please sign in to comment.