Skip to content

Commit

Permalink
Enhance parser to recognize idempotent operations
Browse files Browse the repository at this point in the history
  • Loading branch information
zhicwu committed Feb 13, 2021
1 parent 4c51d65 commit 91c0059
Show file tree
Hide file tree
Showing 9 changed files with 348 additions and 151 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.http.entity.AbstractHttpEntity;
import org.apache.http.impl.client.CloseableHttpClient;

import ru.yandex.clickhouse.jdbc.parser.ClickHouseSqlParser;
import ru.yandex.clickhouse.jdbc.parser.ClickHouseSqlStatement;
import ru.yandex.clickhouse.jdbc.parser.StatementType;
import ru.yandex.clickhouse.response.ClickHouseResponse;
Expand All @@ -55,7 +54,6 @@ public class ClickHousePreparedStatementImpl extends ClickHouseStatementImpl imp

private final TimeZone dateTimeZone;
private final TimeZone dateTimeTimeZone;
private final ClickHouseSqlStatement parsedSql;
private final String sql;
private final List<String> sqlParts;
private final ClickHousePreparedStatementParameter[] binds;
Expand All @@ -68,9 +66,11 @@ public ClickHousePreparedStatementImpl(CloseableHttpClient client,
TimeZone serverTimeZone, int resultSetType) throws SQLException
{
super(client, connection, properties, resultSetType);
this.parsedSql = ClickHouseSqlParser.parseSingleStatement(sql, properties);
parseSingleStatement(sql);

this.sql = sql;
PreparedStatementParser parser = PreparedStatementParser.parse(sql);
PreparedStatementParser parser = PreparedStatementParser.parse(sql,
parsedStmt.getEndPosition(ClickHouseSqlStatement.KEYWORD_VALUES));
this.parameterList = parser.getParameters();
this.insertBatchMode = parser.isValuesMode();
this.sqlParts = parser.getParts();
Expand Down Expand Up @@ -352,8 +352,8 @@ public int[] executeBatch() throws SQLException {
@Override
public int[] executeBatch(Map<ClickHouseQueryParam, String> additionalDBParams) throws SQLException {
int valuePosition = -1;
if (parsedSql.getStatementType() == StatementType.INSERT && parsedSql.hasValues()) {
valuePosition = parsedSql.getStartPosition(ClickHouseSqlStatement.KEYWORD_VALUES);
if (parsedStmt.getStatementType() == StatementType.INSERT && parsedStmt.hasValues()) {
valuePosition = parsedStmt.getStartPosition(ClickHouseSqlStatement.KEYWORD_VALUES);
} else {
Matcher matcher = VALUES.matcher(sql);
if (matcher.find()) {
Expand Down Expand Up @@ -442,7 +442,6 @@ public ResultSetMetaData getMetaData() throws SQLException {
return currentResult.getMetaData();
}

ClickHouseSqlStatement parsedStmt = ClickHouseSqlParser.parseSingleStatement(sql, properties);
if (!parsedStmt.isQuery() || (!parsedStmt.isRecognized() && !isSelect(sql))) {
return null;
}
Expand Down
126 changes: 98 additions & 28 deletions src/main/java/ru/yandex/clickhouse/ClickHouseStatementImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ public class ClickHouseStatementImpl extends ConfigurableApi<ClickHouseStatement

private volatile String queryId;

protected ClickHouseSqlStatement parsedStmt;

/**
* Current database name may be changed by {@link java.sql.Connection#setCatalog(String)}
* between creation of this object and query execution, but javadoc does not allow
Expand All @@ -101,6 +103,43 @@ public class ClickHouseStatementImpl extends ConfigurableApi<ClickHouseStatement
@Deprecated
private static final String databaseKeyword = "CREATE DATABASE";

/**
 * Parses {@code sql} and stores the outcome in {@link #parsedStmt}.
 *
 * <p>When the parser yields exactly one statement, that statement is kept. For any
 * other count the input is wrapped as a single {@link StatementType#UNKNOWN}
 * statement instead of being rejected.
 *
 * <p>The {@code "is_idempotent"} attribute of {@code httpContext} is then set or
 * removed to mirror what the resulting statement reports.
 *
 * @param sql SQL text to parse
 * @throws SQLException declared for callers; not raised directly by this method
 */
@Deprecated
protected void parseSingleStatement(String sql) throws SQLException {
    // Reset before parsing so a stale statement is never left in place if parse() throws.
    this.parsedStmt = null;

    final ClickHouseSqlStatement[] candidates = ClickHouseSqlParser.parse(sql, properties);
    // Anything other than exactly one statement is tolerated and treated as UNKNOWN.
    this.parsedStmt = candidates.length == 1
            ? candidates[0]
            : new ClickHouseSqlStatement(sql, StatementType.UNKNOWN);

    final String idempotentKey = "is_idempotent";
    if (!this.parsedStmt.isIdemponent()) {
        httpContext.removeAttribute(idempotentKey);
        return;
    }
    httpContext.setAttribute(idempotentKey, Boolean.TRUE);
}

/**
 * Parses {@code sql} via {@link #parseSingleStatement(String)} and, when the result is
 * a query without a FORMAT, replaces {@link #parsedStmt} with a copy whose SQL has
 * {@code "\nFORMAT <preferredFormat>;"} appended.
 *
 * <p>Non-query statements and queries that already carry a format are left as parsed.
 *
 * @param sql             SQL text to parse
 * @param preferredFormat format to append when the query does not declare one
 * @throws SQLException propagated from {@link #parseSingleStatement(String)}
 */
@Deprecated
private void parseSingleStatement(String sql, ClickHouseFormat preferredFormat) throws SQLException {
    parseSingleStatement(sql);

    // Nothing to rewrite unless this is a query that lacks an explicit format.
    if (!parsedStmt.isQuery() || parsedStmt.hasFormat()) {
        return;
    }

    final String formatName = preferredFormat.name();

    // Copy the keyword positions and record FORMAT at the length of the caller's input SQL.
    final Map<String, Integer> keywordPositions = new HashMap<>(parsedStmt.getPositions());
    keywordPositions.put(ClickHouseSqlStatement.KEYWORD_FORMAT, sql.length());

    final StringBuilder rewritten = new StringBuilder(parsedStmt.getSQL());
    rewritten.append("\nFORMAT ").append(formatName).append(';');

    parsedStmt = new ClickHouseSqlStatement(
            rewritten.toString(),
            parsedStmt.getStatementType(),
            parsedStmt.getCluster(),
            parsedStmt.getDatabase(),
            parsedStmt.getTable(),
            formatName,
            parsedStmt.getOutfile(),
            parsedStmt.getParameters(),
            keywordPositions);
}

public ClickHouseStatementImpl(CloseableHttpClient client, ClickHouseConnection connection,
ClickHouseProperties properties, int resultSetType) {
super(null);
Expand Down Expand Up @@ -141,21 +180,23 @@ public ResultSet executeQuery(String sql,
}
additionalDBParams.put(ClickHouseQueryParam.EXTREMES, "0");

InputStream is = getInputStream(sql, additionalDBParams, externalData, additionalRequestParams);
parseSingleStatement(sql, ClickHouseFormat.TabSeparatedWithNamesAndTypes);
if (!parsedStmt.isRecognized() && isSelect(sql)) {
Map<String, Integer> positions = new HashMap<>();
String dbName = extractDBName(sql);
String tableName = extractTableName(sql);
if (extractWithTotals(sql)) {
positions.put(ClickHouseSqlStatement.KEYWORD_TOTALS, 1);
}
parsedStmt = new ClickHouseSqlStatement(sql, StatementType.SELECT,
null, dbName, tableName, null, null, null, positions);
// httpContext.setAttribute("is_idempotent", Boolean.TRUE);
}

ClickHouseSqlStatement parsedStmt = ClickHouseSqlParser.parseSingleStatement(sql, properties);
InputStream is = getInputStream(sql, additionalDBParams, externalData, additionalRequestParams);

try {
if (parsedStmt.isQuery() || (!parsedStmt.isRecognized() && isSelect(sql))) {
if (!parsedStmt.isRecognized()) {
Map<String, Integer> positions = new HashMap<>();
String dbName = extractDBName(sql);
String tableName = extractTableName(sql);
if (extractWithTotals(sql)) {
positions.put(ClickHouseSqlStatement.KEYWORD_TOTALS, 1);
}
parsedStmt = new ClickHouseSqlStatement(sql, StatementType.SELECT,
null, dbName, tableName, null, null, positions);
}
if (parsedStmt.isQuery()) {
currentUpdateCount = -1;
currentResult = createResultSet(properties.isCompress()
? new ClickHouseLZ4Stream(is) : is, properties.getBufferSize(),
Expand Down Expand Up @@ -193,8 +234,15 @@ public ClickHouseResponse executeQueryClickhouseResponse(String sql, Map<ClickHo
public ClickHouseResponse executeQueryClickhouseResponse(String sql,
Map<ClickHouseQueryParam, String> additionalDBParams,
Map<String, String> additionalRequestParams) throws SQLException {
parseSingleStatement(sql, ClickHouseFormat.JSONCompact);
if (parsedStmt.isRecognized()) {
sql = parsedStmt.getSQL();
} else {
sql = addFormatIfAbsent(sql, ClickHouseFormat.JSONCompact);
}

InputStream is = getInputStream(
addFormatIfAbsent(sql, properties, ClickHouseFormat.JSONCompact),
sql,
additionalDBParams,
null,
additionalRequestParams
Expand Down Expand Up @@ -223,15 +271,27 @@ public ClickHouseRowBinaryInputStream executeQueryClickhouseRowBinaryStream(Stri

@Override
public ClickHouseRowBinaryInputStream executeQueryClickhouseRowBinaryStream(String sql, Map<ClickHouseQueryParam, String> additionalDBParams, Map<String, String> additionalRequestParams) throws SQLException {
parseSingleStatement(sql, ClickHouseFormat.RowBinary);
if (parsedStmt.isRecognized()) {
sql = parsedStmt.getSQL();
} else {
sql = addFormatIfAbsent(sql, ClickHouseFormat.RowBinary);
if (isSelect(sql)) {
parsedStmt = new ClickHouseSqlStatement(sql, StatementType.SELECT);
// httpContext.setAttribute("is_idempotent", Boolean.TRUE);
} else {
parsedStmt = new ClickHouseSqlStatement(sql, StatementType.UNKNOWN);
}
}

InputStream is = getInputStream(
addFormatIfAbsent(sql, properties, ClickHouseFormat.RowBinary),
sql,
additionalDBParams,
null,
additionalRequestParams
);
ClickHouseSqlStatement parsedStmt = ClickHouseSqlParser.parseSingleStatement(sql, properties);
try {
if (parsedStmt.isQuery() || (!parsedStmt.isRecognized() && isSelect(sql))) {
if (parsedStmt.isQuery()) {
currentUpdateCount = -1;
currentRowBinaryResult = new ClickHouseRowBinaryInputStream(properties.isCompress()
? new ClickHouseLZ4Stream(is) : is, getConnection().getTimeZone(), properties);
Expand All @@ -249,6 +309,8 @@ public ClickHouseRowBinaryInputStream executeQueryClickhouseRowBinaryStream(Stri

@Override
public int executeUpdate(String sql) throws SQLException {
parseSingleStatement(sql, ClickHouseFormat.TabSeparatedWithNamesAndTypes);

InputStream is = null;
try {
is = getInputStream(sql, null, null, null);
Expand Down Expand Up @@ -490,24 +552,20 @@ public ClickHouseResponseSummary getResponseSummary() {

@Deprecated
static String clickhousifySql(String sql) {
return clickhousifySql(sql, null);
}

static String clickhousifySql(String sql, ClickHouseProperties properties) {
return addFormatIfAbsent(sql, properties, ClickHouseFormat.TabSeparatedWithNamesAndTypes);
return addFormatIfAbsent(sql, ClickHouseFormat.TabSeparatedWithNamesAndTypes);
}

/**
* Adds a FORMAT clause using the given format if the SQL does not already specify one.
* Only SELECT queries are modified; other statements are returned trimmed and otherwise untouched.
*/
private static String addFormatIfAbsent(final String sql, ClickHouseProperties properties, ClickHouseFormat format) {
@Deprecated
private static String addFormatIfAbsent(final String sql, ClickHouseFormat format) {
String cleanSQL = sql.trim();
ClickHouseSqlStatement parsedStmt = ClickHouseSqlParser.parseSingleStatement(cleanSQL, properties);
if (!parsedStmt.isQuery() || (!parsedStmt.isRecognized() && !isSelect(cleanSQL))) {
if (!isSelect(cleanSQL)) {
return cleanSQL;
}
if (parsedStmt.hasFormat() || (!parsedStmt.isRecognized() && ClickHouseFormat.containsFormat(cleanSQL))) {
if (ClickHouseFormat.containsFormat(cleanSQL)) {
return cleanSQL;
}
StringBuilder sb = new StringBuilder();
Expand Down Expand Up @@ -542,6 +600,7 @@ static boolean isSelect(String sql) {
return false;
}

@Deprecated
private String extractTableName(String sql) {
String s = extractDBAndTableName(sql);
if (s.contains(".")) {
Expand All @@ -551,6 +610,7 @@ private String extractTableName(String sql) {
}
}

@Deprecated
private String extractDBName(String sql) {
String s = extractDBAndTableName(sql);
if (s.contains(".")) {
Expand All @@ -560,6 +620,7 @@ private String extractDBName(String sql) {
}
}

@Deprecated
private String extractDBAndTableName(String sql) {
if (Utils.startsWithIgnoreCase(sql, "select")) {
String withoutStrings = Utils.retainUnquoted(sql, '\'');
Expand All @@ -582,6 +643,7 @@ private String extractDBAndTableName(String sql) {
return "system.unknown";
}

@Deprecated
private boolean extractWithTotals(String sql) {
if (Utils.startsWithIgnoreCase(sql, "select")) {
String withoutStrings = Utils.retainUnquoted(sql, '\'');
Expand All @@ -596,15 +658,23 @@ private InputStream getInputStream(
List<ClickHouseExternalData> externalData,
Map<String, String> additionalRequestParams
) throws ClickHouseException {
sql = clickhousifySql(sql, properties);
boolean ignoreDatabase = false;
if (parsedStmt.isRecognized()) {
sql = parsedStmt.getSQL();
// TODO consider more scenarios like drop, show etc.
ignoreDatabase = parsedStmt.getStatementType() == StatementType.CREATE
&& parsedStmt.containsKeyword(ClickHouseSqlStatement.KEYWORD_DATABASE);
} else {
sql = clickhousifySql(sql);
ignoreDatabase = sql.trim().regionMatches(true, 0, databaseKeyword, 0, databaseKeyword.length());
}
log.debug("Executing SQL: {}", sql);

additionalClickHouseDBParams = addQueryIdTo(
additionalClickHouseDBParams == null
? new EnumMap<ClickHouseQueryParam, String>(ClickHouseQueryParam.class)
: additionalClickHouseDBParams);

boolean ignoreDatabase = sql.trim().regionMatches(true, 0, databaseKeyword, 0, databaseKeyword.length());
URI uri;
if (externalData == null || externalData.isEmpty()) {
uri = buildRequestUri(
Expand Down
24 changes: 8 additions & 16 deletions src/main/java/ru/yandex/clickhouse/PreparedStatementParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import ru.yandex.clickhouse.jdbc.parser.ClickHouseSqlParser;
import ru.yandex.clickhouse.jdbc.parser.ClickHouseSqlStatement;
import ru.yandex.clickhouse.jdbc.parser.StatementType;
import ru.yandex.clickhouse.settings.ClickHouseProperties;
import ru.yandex.clickhouse.util.apache.StringUtils;

/**
Expand All @@ -36,15 +32,16 @@ private PreparedStatementParser() {

@Deprecated
static PreparedStatementParser parse(String sql) {
return parse(sql, null);
return parse(sql, -1);
}

static PreparedStatementParser parse(String sql, ClickHouseProperties properties) {
@Deprecated
static PreparedStatementParser parse(String sql, int valuesEndPosition) {
if (StringUtils.isBlank(sql)) {
throw new IllegalArgumentException("SQL may not be blank");
}
PreparedStatementParser parser = new PreparedStatementParser();
parser.parseSQL(sql, properties);
parser.parseSQL(sql, valuesEndPosition);
return parser;
}

Expand All @@ -66,7 +63,7 @@ private void reset() {
valuesMode = false;
}

private void parseSQL(String sql, ClickHouseProperties properties) {
private void parseSQL(String sql, int valuesEndPosition) {
reset();
List<String> currentParamList = new ArrayList<String>();
boolean afterBackSlash = false;
Expand All @@ -75,15 +72,10 @@ private void parseSQL(String sql, ClickHouseProperties properties) {
boolean inSingleLineComment = false;
boolean inMultiLineComment = false;
boolean whiteSpace = false;
ClickHouseSqlStatement parsedSql = ClickHouseSqlParser.parseSingleStatement(sql, properties);
int endPosition = 0;
if (parsedSql.getStatementType() == StatementType.INSERT) {
endPosition = parsedSql.getEndPosition(ClickHouseSqlStatement.KEYWORD_VALUES) - 1;
if (endPosition > 0) {
valuesMode = true;
} else {
endPosition = 0;
}
if (valuesEndPosition > 0) {
valuesMode = true;
endPosition = valuesEndPosition;
} else {
Matcher matcher = VALUES.matcher(sql);
if (matcher.find()) {
Expand Down
Loading

0 comments on commit 91c0059

Please sign in to comment.