diff --git a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImpl.java b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImpl.java index d068b8b6b..f8d70b531 100644 --- a/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImpl.java +++ b/sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImpl.java @@ -111,6 +111,32 @@ public void enterCreateDatabase(MySqlParser.CreateDatabaseContext createDatabase String databaseName = tree.getText(); if(!databaseName.isEmpty()) { + // Check if the database is overridden + Map sourceToDestinationMap = new HashMap<>(); + + try { + if (this.config.getString(ClickHouseSinkConnectorConfigVariables.CLICKHOUSE_DATABASE_OVERRIDE_MAP.toString()) != null) + sourceToDestinationMap = Utils.parseSourceToDestinationDatabaseMap(this.config. + getString(ClickHouseSinkConnectorConfigVariables.CLICKHOUSE_DATABASE_OVERRIDE_MAP.toString())); + } catch(Exception e) { + log.error("enterCreateDatabase: Error parsing source to destination database map:" + e.toString()); + } + // databaseName might contain backticks. Remove them. + if(databaseName.contains("`")) { + databaseName = databaseName.replace("`", ""); + } + if(sourceToDestinationMap.containsKey(databaseName)) { + this.query.append(String.format(Constants.CREATE_DATABASE, sourceToDestinationMap.get(databaseName))); + } else { + this.query.append(String.format(Constants.CREATE_DATABASE, databaseName)); + } + + boolean isReplicatedReplacingMergeTree = config.getBoolean(ClickHouseSinkConnectorConfigVariables + .AUTO_CREATE_TABLES_REPLICATED.toString()); + if(isReplicatedReplacingMergeTree) { + this.query.append(" ON CLUSTER `{cluster}`"); + } + String overrideDatabaseName = overrideDatabaseName(tree.getText()); this.query.append(String.format(Constants.CREATE_DATABASE, overrideDatabaseName)); diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImplTest.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImplTest.java index e4d479774..7e65322d9 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImplTest.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySqlDDLParserListenerImplTest.java @@ -677,6 +677,21 @@ public void testCreateDatabase() { Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase("create database if not exists test_ddl")); } + @Test + public void testCreateDatabaseReplicated() { + StringBuffer clickHouseQuery = new StringBuffer(); + + HashMap map = new HashMap<>(); + map.put(ClickHouseSinkConnectorConfigVariables.AUTO_CREATE_TABLES_REPLICATED.toString(), "true"); + ClickHouseSinkConnectorConfig config = new ClickHouseSinkConnectorConfig(map); + + MySQLDDLParserService mySQLDDLParserService = new MySQLDDLParserService(config, "test"); + String sql = "create database if not exists repl_test_ddl"; + mySQLDDLParserService.parseSql(sql, "table1", clickHouseQuery); + + Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase("create database if not exists repl_test_ddl on cluster `{cluster}`")); + } + @Test public void testDropColumn() { StringBuffer clickHouseQuery = new StringBuffer();