Skip to content

Commit

Permalink
Merge pull request #914 from Altinity/906-connector-does-not-create-r…
Browse files Browse the repository at this point in the history
…eplicate_schema_history-table

Added logic to create schema history table on connector startup.
  • Loading branch information
subkanthi authored Nov 15, 2024
2 parents 30faf6b + 9e4b161 commit 525821d
Showing 1 changed file with 33 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,28 @@ private void createDatabaseForDebeziumStorage(ClickHouseSinkConnectorConfig conf
}
}

private void createSchemaHistoryTable(ClickHouseSinkConnectorConfig config, Properties props) {
String createSchemaHistoryTable = props.getProperty(JdbcOffsetBackingStoreConfig.OFFSET_STORAGE_PREFIX + JdbcSchemaHistoryConfig.PROP_TABLE_NAME.name());
if(createSchemaHistoryTable == null || createSchemaHistoryTable.isEmpty() == true) {
log.warn("Skipping creating schema history table as the query was not provided in configuration");
return;
}

DBCredentials dbCredentials = parseDBConfiguration(config);
String jdbcUrl = BaseDbWriter.getConnectionString(dbCredentials.getHostName(), dbCredentials.getPort(),
"system");
ClickHouseConnection conn = BaseDbWriter.createConnection(jdbcUrl, "Client_1",dbCredentials.getUserName(), dbCredentials.getPassword(), config);
BaseDbWriter writer = new BaseDbWriter(dbCredentials.getHostName(), dbCredentials.getPort(),
"system", dbCredentials.getUserName(),
dbCredentials.getPassword(), config, conn);

try {
writer.executeQuery(createSchemaHistoryTable);
} catch(Exception e) {
log.error("Error creating schema history table", e);
}
}

/**
* Function to create view for show_replica_status
* @param config
Expand Down Expand Up @@ -743,7 +765,17 @@ public void connectorStarted() {
isReplicationRunning = true;
log.debug("Connector started");
// Create view.
createViewForShowReplicaStatus(config, props);
try {
createViewForShowReplicaStatus(config, props);
} catch(Exception e) {
log.error("Error creating view for replica status", e);
}

try {
createSchemaHistoryTable(config, props);
} catch(Exception e) {
log.error("Error creating schema history table", e);
}
}

@Override
Expand Down

0 comments on commit 525821d

Please sign in to comment.