Skip to content

Commit

Permalink
Merge pull request #877 from Altinity/database_override_it_rrmt
Browse files Browse the repository at this point in the history
Database override it rrmt
  • Loading branch information
subkanthi authored Oct 22, 2024
2 parents 4c91874 + fb27eab commit 57c59fe
Show file tree
Hide file tree
Showing 52 changed files with 403 additions and 174 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<maven.compiler.target>17</maven.compiler.target>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<version.debezium>2.5.0.Beta1</version.debezium>
<version.debezium>2.7.2.Final</version.debezium>
<version.junit>5.9.1</version.junit>
<version.checkstyle.plugin>3.1.1</version.checkstyle.plugin>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
Expand Down
4 changes: 2 additions & 2 deletions sink-connector-lightweight/dependency-reduced-pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mongodb</artifactId>
<version>2.7.0.Beta2</version>
<version>2.7.2.Final</version>
<scope>test</scope>
<exclusions>
<exclusion>
Expand Down Expand Up @@ -326,7 +326,7 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<version.checkstyle.plugin>3.1.1</version.checkstyle.plugin>
<maven.compiler.target>17</maven.compiler.target>
<version.debezium>2.7.0.Beta2</version.debezium>
<version.debezium>2.7.2.Final</version.debezium>
<quarkus.platform.group-id>io.quarkus.platform</quarkus.platform.group-id>
</properties>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,7 @@ public static void main(String[] args) throws Exception {

setupMonitoringThread(new ClickHouseSinkConnectorConfig(PropertiesHelper.toMap(props)), props);

embeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class),
injector.getInstance(DDLParserService.class), props, false);
embeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), props, false);

try {
DebeziumEmbeddedRestApi.startRestApi(props, injector, debeziumChangeEventCapture, userProperties);
Expand Down Expand Up @@ -141,8 +140,7 @@ public static CompletableFuture<String> startDebeziumEventLoop(Injector injector

Thread.sleep(500);
// embeddedApplication = new ClickHouseDebeziumEmbeddedApplication();
embeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class),
injector.getInstance(DDLParserService.class), props, true);
embeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class), props, true);
return null;
});

Expand All @@ -151,15 +149,15 @@ public static CompletableFuture<String> startDebeziumEventLoop(Injector injector


public static void start(DebeziumRecordParserService recordParserService,
DDLParserService ddlParserService, Properties props, boolean forceStart) throws Exception {
Properties props, boolean forceStart) throws Exception {

if(forceStart == true) {
// Reload the configuration file.
log.info(String.format("******* Reloading configuration file (%s) from disk ******", configurationFile));
loadPropertiesFile(configurationFile);
}
debeziumChangeEventCapture = new DebeziumChangeEventCapture();
debeziumChangeEventCapture.setup(props, recordParserService, ddlParserService, forceStart);
debeziumChangeEventCapture.setup(props, recordParserService, forceStart);
}

public static void stop() throws IOException {
Expand Down Expand Up @@ -210,8 +208,7 @@ public void run() {
log.info("******* Restarting Event Loop ********");
debeziumChangeEventCapture.stop();
Thread.sleep(3000);
start(injector.getInstance(DebeziumRecordParserService.class),
injector.getInstance(DDLParserService.class), props, true);
start(injector.getInstance(DebeziumRecordParserService.class), props, true);
} catch (IOException e) {
log.error("**** ERROR: Restarting Event Loop ****", e);
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public static void startRestApi(Properties props, Injector injector,
try {
debeziumChangeEventCapture.deleteSchemaHistory(config, finalProps1);
} catch (Exception e) {
log.error("Client - Error deleting offsets", e);
log.error("Client - Error deleting schema history", e);
ctx.result(e.toString());
ctx.status(HttpStatus.INTERNAL_SERVER_ERROR);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.spi.OffsetCommitPolicy;
import io.debezium.relational.history.SchemaHistory;
import io.debezium.storage.jdbc.history.JdbcSchemaHistoryConfig;
import io.debezium.storage.jdbc.offset.JdbcOffsetBackingStoreConfig;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.connect.data.Field;
Expand Down Expand Up @@ -164,10 +166,20 @@ private void performDDLOperation(String DDL, Properties props, SourceRecord sr,
updateMetrics(DDL, writer);
}

/**
* Function to get the database name from the SourceRecord.
* If the database name is not present in the SourceRecord, then
* the database name is set to "system".
* Also if a database is overridden in the configuration, then
* the database name is set to the overridden database name.
* @param sr
* @return
*/
private String getDatabaseName(SourceRecord sr) {
if (sr != null && sr.key() instanceof Struct) {
String recordDbName = (String) ((Struct) sr.key()).get("databaseName");
if (recordDbName != null && !recordDbName.isEmpty()) {

return recordDbName;
}
}
Expand Down Expand Up @@ -411,8 +423,8 @@ private Pair<String, String> getDebeziumOffsetStorageDatabaseName(Properties pro
* @return
*/
private Pair<String, String> getDebeziumSchemaHistoryDatabaseName(Properties props) {
String tableName = props.getProperty(JdbcOffsetBackingStoreConfig.OFFSET_STORAGE_PREFIX +
JdbcOffsetBackingStoreConfig.PROP_TABLE_NAME.name());
String tableName = props.getProperty(SchemaHistory.CONFIGURATION_FIELD_PREFIX_STRING +
JdbcSchemaHistoryConfig.PROP_TABLE_NAME.name());
return splitTableName(tableName);
}

Expand Down Expand Up @@ -605,10 +617,11 @@ public void deleteSchemaHistory(ClickHouseSinkConnectorConfig config, Properties
databaseName, dbCredentials.getUserName(),
dbCredentials.getPassword(), config, this.conn);

// Get topic.prefix from config
String topicPrefix = config.getString(CommonConnectorConfig.TOPIC_PREFIX.name());
new DebeziumOffsetStorage().deleteSchemaHistoryTable(topicPrefix, tableNameDatabaseName.getRight() + "."
+ tableNameDatabaseName.getLeft(),writer);
// Get topic.prefix from properies
String topicPrefix = props.getProperty(CommonConnectorConfig.TOPIC_PREFIX.name());
// String topicPrefix = config.getString(CommonConnectorConfig.TOPIC_PREFIX.name());
// Jdbc adds the database name to the table name, so we need to remove it
new DebeziumOffsetStorage().deleteSchemaHistoryTable(topicPrefix, tableName, writer);

}
/**
Expand Down Expand Up @@ -770,7 +783,7 @@ public void connectorStopped() {
* @param debeziumRecordParserService
*/
public void setup(Properties props, DebeziumRecordParserService debeziumRecordParserService,
DDLParserService ddlParserService, boolean forceStart) throws IOException, ClassNotFoundException {
boolean forceStart) throws IOException, ClassNotFoundException {

// Check if max queue size was defined by the user.
if(props.getProperty(ClickHouseSinkConnectorConfigVariables.MAX_QUEUE_SIZE.toString()) != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;

import com.clickhouse.logging.Logger;
import com.clickhouse.logging.LoggerFactory;
import io.debezium.storage.jdbc.offset.JdbcOffsetBackingStoreConfig;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
Expand Down Expand Up @@ -32,6 +34,7 @@ public class DebeziumOffsetStorage {
public static final String SOURCE_PASSWORD = "source_password";


private static final Logger log = LoggerFactory.getLogger(DebeziumOffsetStorage.class);

public String getOffsetKey(Properties props) {
String connectorName = props.getProperty("name");
Expand Down Expand Up @@ -62,7 +65,8 @@ public void deleteSchemaHistoryTable(String offsetKey,
BaseDbWriter writer) throws SQLException {


String debeziumStorageStatusQuery = String.format("delete from %s where JSONExtractRaw(JSONExtractRaw(history_data,'source'), 'server')='\"%s\"" , tableName, offsetKey);
String debeziumStorageStatusQuery = String.format("delete from `%s` where JSONExtractRaw(JSONExtractRaw(history_data,'source'), 'server')='%s'" , tableName, offsetKey);
log.info("Deleting schema history table query: " + debeziumStorageStatusQuery);
writer.executeQuery(debeziumStorageStatusQuery);
}
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,8 @@ public MySqlDDLParserListenerImpl(BaseDbWriter writer, StringBuffer transformedQ
} catch(Exception e) {
log.error("enterCreateDatabase: Error parsing source to destination database map:" + e.toString());
}
// databaseName might contain backticks. Remove them.
if(databaseName.contains("`")) {
databaseName = databaseName.replace("`", "");
}

if(sourceToDestinationMap.containsKey(databaseName)) {
this.databaseName = sourceToDestinationMap.get(databaseName);
} else {
this.databaseName = databaseName;
}
this.databaseName = overrideDatabaseName(databaseName);

this.query = transformedQuery;
this.tableName = tableName;
Expand All @@ -74,6 +66,23 @@ public MySqlDDLParserListenerImpl(BaseDbWriter writer, StringBuffer transformedQ
this.userProvidedTimeZone = parseTimeZone();
}

/**
* Function to override the database name.
* @param databaseName
* @return
*/
private String overrideDatabaseName(String databaseName) {

// databaseName might contain backticks. Remove them.
if(databaseName.contains("`")) {
databaseName = databaseName.replace("`", "");
}

if(sourceToDestinationMap.containsKey(databaseName)) {
return sourceToDestinationMap.get(databaseName);
}
return databaseName;
}

public ZoneId parseTimeZone() {
String userProvidedTimeZone = config.getString(ClickHouseSinkConnectorConfigVariables
Expand Down Expand Up @@ -102,25 +111,9 @@ public void enterCreateDatabase(MySqlParser.CreateDatabaseContext createDatabase

String databaseName = tree.getText();
if(!databaseName.isEmpty()) {
// Check if the database is overridden
Map<String, String> sourceToDestinationMap = new HashMap<>();

try {
if (this.config.getString(ClickHouseSinkConnectorConfigVariables.CLICKHOUSE_DATABASE_OVERRIDE_MAP.toString()) != null)
sourceToDestinationMap = Utils.parseSourceToDestinationDatabaseMap(this.config.
getString(ClickHouseSinkConnectorConfigVariables.CLICKHOUSE_DATABASE_OVERRIDE_MAP.toString()));
} catch(Exception e) {
log.error("enterCreateDatabase: Error parsing source to destination database map:" + e.toString());
}
// databaseName might contain backticks. Remove them.
if(databaseName.contains("`")) {
databaseName = databaseName.replace("`", "");
}
if(sourceToDestinationMap.containsKey(databaseName)) {
this.query.append(String.format(Constants.CREATE_DATABASE, sourceToDestinationMap.get(databaseName)));
} else {
this.query.append(String.format(Constants.CREATE_DATABASE, databaseName));
}

String overrideDatabaseName = overrideDatabaseName(tree.getText());
this.query.append(String.format(Constants.CREATE_DATABASE, overrideDatabaseName));
}
}
}
Expand Down Expand Up @@ -219,7 +212,10 @@ private Set<String> parseCreateTable(MySqlParser.CreateTableContext ctx, StringB
this.tableName = tree.getText();
// If tableName already includes the database name don't include database name in this.query
if(tableName.contains(".")) {
this.query.append(tableName);
// split tableName into databaseName and tableName
String[] tableNameSplit = tableName.split("\\.");
this.query.append(this.databaseName).append(".").append(tableNameSplit[1]);
//this.query.append(tableName);
} else
this.query.append(databaseName).append(".").append(tree.getText());

Expand Down Expand Up @@ -736,8 +732,12 @@ public void enterRenameTable(MySqlParser.RenameTableContext renameTableContext)
originalTableName = renameTableContextChildren.get(0).getText();
newTableName = renameTableContextChildren.get(2).getText();
// If the table name already includes the database name dont include it in the query.
if(originalTableName.contains(".")) {
this.query.append(originalTableName).append(" to ").append(newTableName);
if(originalTableName.contains(".") && newTableName.contains(".")) {
// Split database and table name.
String[] databaseAndTableNameArray = originalTableName.split("\\.");
String[] newDatabaseAndTableNameArray = newTableName.split("\\.");
this.query.append(this.databaseName).append(".").append(databaseAndTableNameArray[1]).append(" to ").
append(this.databaseName).append(".").append(newDatabaseAndTableNameArray[1]);
} else
this.query.append(databaseName).append(".").append(originalTableName).append(" to ").
append(databaseName).append(".").append(newTableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,7 @@ public void testDecoderBufsPlugin() throws Exception {
try {

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(getProperties(), new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()),
"employees"), false);
engine.get().setup(getProperties(), new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@ public void testPgOutputPlugin() throws Exception {
try {

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(getProperties(), new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "system"), false);
engine.get().setup(getProperties(), new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ static public Properties getDebeziumProperties(MySQLContainer mySqlContainer, Cl
defaultProps.setProperty("clickhouse.server.port", String.valueOf(clickHouseContainer.getFirstMappedPort()));
defaultProps.setProperty("clickhouse.server.user", clickHouseContainer.getUsername());
defaultProps.setProperty("clickhouse.server.password", clickHouseContainer.getPassword());
//defaultProps.setProperty("ddl.retry", "true");

defaultProps.setProperty("offset.storage.jdbc.url", String.format("jdbc:clickhouse://%s:%s",
clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@ public void testMultipleDatabases() throws Exception {
try {

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(props, new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "test_db"),false);
engine.get().setup(props, new SourceRecordParserService(),false);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,7 @@ public void testMySQLGeneratedColumns() throws Exception {
Properties props = getDebeziumProperties(mySqlContainer, clickHouseContainer);

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(props, new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()),
"employees"), false);
engine.get().setup(props, new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ public void testMultipleDatabases() throws Exception {
try {

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(props, new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "test_db"),false);
engine.get().setup(props, new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,7 @@ public void testAutoCreateTable(String clickHouseServerVersion) throws Exception
try {

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(ITCommon.getDebeziumPropertiesForSchemaOnly(mySqlContainer, clickHouseContainer), new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "datatypes"),false);
engine.get().setup(ITCommon.getDebeziumPropertiesForSchemaOnly(mySqlContainer, clickHouseContainer), new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,7 @@ public void testDecoderBufsPlugin() throws Exception {
try {

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(getProperties(), new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "employees"), false);
engine.get().setup(getProperties(), new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,7 @@ public void testDecoderBufsPlugin() throws Exception {
try {

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(getProperties(), new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>()), "employees"), false);
engine.get().setup(getProperties(), new SourceRecordParserService(), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Loading

0 comments on commit 57c59fe

Please sign in to comment.