Skip to content

Commit

Permalink
Added logic to CREATE on Cluster when replicated mode is enabled
Browse files Browse the repository at this point in the history
  • Loading branch information
subkanthi committed Oct 23, 2024
1 parent b6bad48 commit 21bd1d6
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,13 @@ public void enterCreateDatabase(MySqlParser.CreateDatabaseContext createDatabase
} else {
this.query.append(String.format(Constants.CREATE_DATABASE, databaseName));
}

boolean isReplicatedReplacingMergeTree = config.getBoolean(ClickHouseSinkConnectorConfigVariables
.AUTO_CREATE_TABLES_REPLICATED.toString());
if(isReplicatedReplacingMergeTree) {
this.query.append(" ON CLUSTER `{cluster}`");
}

}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,21 @@ public void testCreateDatabase() {
Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase("create database if not exists test_ddl"));
}

@Test
public void testCreateDatabaseReplicated() {
StringBuffer clickHouseQuery = new StringBuffer();

HashMap<String, String> map = new HashMap<>();
map.put(ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_TABLES_REPLICATED.toString(), "true");
ClickHouseSinkConnectorConfig config = new ClickHouseSinkConnectorConfig(map);

MySQLDDLParserService mySQLDDLParserService = new MySQLDDLParserService(config, "test");
String sql = "create database if not exists repl_test_ddl";
mySQLDDLParserService.parseSql(sql, "table1", clickHouseQuery);

Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase("create database if not exists repl_test_ddl on cluster `{cluster}`"));
}

@Test
public void testDropColumn() {
StringBuffer clickHouseQuery = new StringBuffer();
Expand Down

0 comments on commit 21bd1d6

Please sign in to comment.