From 21bd1d6e784c426fc8da94414e83190f1f531db1 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Wed, 23 Oct 2024 15:59:42 -0400 Subject: [PATCH] Added logic to CREATE on Cluster when replicated mode is enabled --- .../ddl/parser/MySqlDDLParserListenerImpl.java | 7 +++++++ .../parser/MySqlDDLParserListenerImplTest.java | 15 +++++++++++++++ 2 files changed, 22 insertions(+) diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImpl.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImpl.java index b930f4a0c..8c2f85c33 100644 --- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImpl.java +++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImpl.java @@ -121,6 +121,13 @@ public void enterCreateDatabase(MySqlParser.CreateDatabaseContext createDatabase } else { this.query.append(String.format(Constants.CREATE_DATABASE, databaseName)); } + + boolean isReplicatedReplacingMergeTree = config.getBoolean(ClickHouseSinkConnectorConfigVariables + .AUTO_CREATE_TABLES_REPLICATED.toString()); + if(isReplicatedReplacingMergeTree) { + this.query.append(" ON CLUSTER `{cluster}`"); + } + } } } diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImplTest.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImplTest.java index eb688a0f1..4cc43f87d 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImplTest.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImplTest.java @@ -642,6 +642,21 @@ public void testCreateDatabase() { Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase("create database if not exists test_ddl")); } + @Test + public void testCreateDatabaseReplicated() { + StringBuffer clickHouseQuery = new StringBuffer(); + + HashMap map = new HashMap<>(); + map.put(ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_TABLES_REPLICATED.toString(), "true"); + ClickHouseSinkConnectorConfig config = new ClickHouseSinkConnectorConfig(map); + + MySQLDDLParserService mySQLDDLParserService = new MySQLDDLParserService(config, "test"); + String sql = "create database if not exists repl_test_ddl"; + mySQLDDLParserService.parseSql(sql, "table1", clickHouseQuery); + + Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase("create database if not exists repl_test_ddl on cluster `{cluster}`")); + } + @Test public void testDropColumn() { StringBuffer clickHouseQuery = new StringBuffer();