Skip to content

Commit

Permalink
Merge pull request #896 from Altinity/895-ignore-ddl-based-on-regex
Browse files Browse the repository at this point in the history
895 ignore ddl based on regex
  • Loading branch information
subkanthi authored Nov 22, 2024
2 parents 81b5480 + ae6c568 commit 0978481
Show file tree
Hide file tree
Showing 5 changed files with 185 additions and 3 deletions.
5 changes: 4 additions & 1 deletion sink-connector-lightweight/docker/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ database.connectionTimeZone: "UTC"

# Configuration to override the clickhouse database name for a given MySQL database name. If this configuration is not
# provided, the MySQL database name will be used as the ClickHouse database name.
#clickhouse.database.override.map: "employees:employees2, products:productsnew
clickhouse.database.override.map: "test:ch_test"

# clickhouse.datetime.timezone: This timezone will override the default timezone of ClickHouse server. Timezone columns will be set to this timezone.
#clickhouse.datetime.timezone: "UTC"
Expand All @@ -147,6 +147,9 @@ database.connectionTimeZone: "UTC"
#disable.ddl: If set to true, the connector will ignore DDL events. The default is false.
#disable.ddl: "false"

#ignore.ddl.regex: If set, the connector will ignore DDL events that match the regex.
ignore.ddl.regex: "(?i)(ANALYZE PARTITION).*"

#disable.drop.truncate: If set to true, the connector will ignore drop and truncate events. The default is false.
#disable.drop.truncate: "false"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import io.debezium.relational.history.SchemaHistory;
import io.debezium.storage.jdbc.history.JdbcSchemaHistoryConfig;
import io.debezium.storage.jdbc.offset.JdbcOffsetBackingStoreConfig;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
Expand All @@ -46,6 +48,8 @@
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.text.SimpleDateFormat;
Expand Down Expand Up @@ -92,6 +96,10 @@ public class DebeziumChangeEventCapture {

DebeziumOffsetStorage debeziumOffsetStorage;

@Getter
@Setter
private String lastIgnoredDDL;

public DebeziumChangeEventCapture() {
singleThreadDebeziumEventExecutor = Executors.newFixedThreadPool(1);
this.debeziumOffsetStorage = new DebeziumOffsetStorage();
Expand Down Expand Up @@ -122,7 +130,7 @@ private void performDDLOperation(String DDL, Properties props, SourceRecord sr,
MySQLDDLParserService mySQLDDLParserService = new MySQLDDLParserService(writer, config, databaseName);
mySQLDDLParserService.parseSql(DDL, "", clickHouseQuery, isDropOrTruncate);

if (checkIfDDLNeedsToBeIgnored(props, sr, isDropOrTruncate)) {
if (checkIfDDLNeedsToBeIgnored(DDL,props, sr, isDropOrTruncate)) {
log.info("Ignored Source DB DDL: " + DDL + " Snapshot:" + isSnapshotDDL(sr));
return;
}
Expand Down Expand Up @@ -313,7 +321,7 @@ private boolean isSnapshotDDL(SourceRecord sr) {
* @param sr
* @return
*/
private boolean checkIfDDLNeedsToBeIgnored(Properties props, SourceRecord sr, AtomicBoolean isDropOrTruncate) {
private boolean checkIfDDLNeedsToBeIgnored(String DDL, Properties props, SourceRecord sr, AtomicBoolean isDropOrTruncate) {

String disableDDLProperty = props.getProperty(SinkConnectorLightWeightConfig.DISABLE_DDL);
if (disableDDLProperty != null && disableDDLProperty.equalsIgnoreCase("true")) {
Expand All @@ -329,6 +337,26 @@ private boolean checkIfDDLNeedsToBeIgnored(Properties props, SourceRecord sr, At
enableSnapshotDDLPropertyFlag = true;
}

// Also if the DDL matches the regex, then ignore it.
String ignoreDDLRegexProperty = props.getProperty(SinkConnectorLightWeightConfig.IGNORE_DDL_REGEX);
// The IGNORE_DDL_REGEX will be a list of regex separated by 2 pipe(##).
// Example: ^ALTER TABLE .* ADD COLUMN .*##^ALTER TABLE .* DROP COLUMN .*##^ALTER TABLE .* MODIFY COLUMN .*
// Separate the list.

if(ignoreDDLRegexProperty != null && !ignoreDDLRegexProperty.isEmpty()) {
String[] separatedIgnoreDDLRegexList = ignoreDDLRegexProperty.split("\\|\\|");

for (String regex : separatedIgnoreDDLRegexList) {
Pattern p = Pattern.compile(regex);
Matcher m = p.matcher(DDL);
if (m.find()) {
lastIgnoredDDL = DDL;
log.info("Ignoring DDL: " + DDL + " as it matches the regex: " + regex);
return true;
}
}
}

String disableDropAndTruncateProperty = props.getProperty(SinkConnectorLightWeightConfig.DISABLE_DROP_TRUNCATE);
if(disableDropAndTruncateProperty != null && disableDropAndTruncateProperty.equalsIgnoreCase("true") && isDropOrTruncate.get()== true) {
log.debug("Ignoring Drop or Truncate");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ public class SinkConnectorLightWeightConfig {
public static final String DDL_RETRY = "ddl.retry";


// Add variable to ignore DDL with regex.
public static final String IGNORE_DDL_REGEX = "ignore.ddl.regex";


// Create a Map of all the configuration variables
// with the value as a description of the variable
// Createa a Map of all the configuration variables
Expand All @@ -35,6 +39,7 @@ public class SinkConnectorLightWeightConfig {
configVariables.put(ENABLE_SNAPSHOT_DDL, "Enable snapshot DDL (Enables execution of DDL that are received during snapshot) (Applies only to MySQL)");
configVariables.put(CLI_PORT, "Sink connector Client CLI port");
configVariables.put(DDL_RETRY, "If this configuration is set to true, the sink connector will retry executing DDL after a failure");
configVariables.put(IGNORE_DDL_REGEX, "If this configuration is set, the sink connector will ignore DDL statements that match the regex");
}


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package com.altinity.clickhouse.debezium.embedded.ddl.parser;

import org.apache.log4j.BasicConfigurator;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import com.altinity.clickhouse.debezium.embedded.ITCommon;
import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture;
import com.altinity.clickhouse.debezium.embedded.config.SinkConnectorLightWeightConfig;
import com.altinity.clickhouse.debezium.embedded.parser.SourceRecordParserService;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;
import com.clickhouse.jdbc.ClickHouseConnection;
import org.junit.Assert;
import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;


@Testcontainers
@DisplayName("Integration Test that validates DDL Ignore Regex")
public class DDLIgnoreRegExIT {
protected MySQLContainer mySqlContainer;
static ClickHouseContainer clickHouseContainer;

@BeforeEach
public void startContainers() throws InterruptedException {
mySqlContainer = new MySQLContainer<>(DockerImageName.parse("docker.io/bitnami/mysql:8.0.36")
.asCompatibleSubstituteFor("mysql"))
.withDatabaseName("employees").withUsername("root").withPassword("adminpass")
//.withInitScript("data_types.sql")
.withExtraHost("mysql-server", "0.0.0.0")
.waitingFor(new HttpWaitStrategy().forPort(3306));

BasicConfigurator.configure();
mySqlContainer.start();
// clickHouseContainer.start();
Thread.sleep(15000);
}

static {
clickHouseContainer = new org.testcontainers.clickhouse.ClickHouseContainer(DockerImageName.parse("clickhouse/clickhouse-server:latest")
.asCompatibleSubstituteFor("clickhouse"))
//.withInitScript("init_clickhouse_it.sql")
.withUsername("ch_user")
.withPassword("password")
.withExposedPorts(8123);

clickHouseContainer.start();
}
// Add


@Test
public void testDDLIgnoreRegex() throws Exception {

AtomicReference<DebeziumChangeEventCapture> engine = new AtomicReference<>();

ExecutorService executorService = Executors.newFixedThreadPool(1);

DebeziumChangeEventCapture debeziumChangeEventCapture = new DebeziumChangeEventCapture();
executorService.execute(() -> {
try {

java.util.Properties props = ITCommon.getDebeziumProperties(mySqlContainer, clickHouseContainer);
// Add the ignore DDL regex.
props.put(SinkConnectorLightWeightConfig.IGNORE_DDL_REGEX, "(?i)(ANALYZE PARTITION).*||^CREATE DEFINER.*(?:\\r?\\n.*)*");

engine.set(debeziumChangeEventCapture);
engine.get().setup(props, new SourceRecordParserService()
, false);
} catch (Exception e) {
throw new RuntimeException(e);
}
});

// MySQL DDL
String createTableWPartition = "CREATE TABLE sales ( id INT NOT NULL, sale_date DATE NOT NULL, amount DECIMAL(10, 2), PRIMARY KEY (id, sale_date) ) PARTITION BY RANGE (YEAR(sale_date)) ( PARTITION p2020 VALUES LESS THAN (2021), PARTITION p2021 VALUES LESS THAN (2022), PARTITION p2022 VALUES LESS THAN (2023), PARTITION pfuture VALUES LESS THAN MAXVALUE )";
ITCommon.connectToMySQL(mySqlContainer).createStatement().executeUpdate(createTableWPartition);

// Wait for the DDL to be captured by the engine.
Thread.sleep(10000);


String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), "employees");
ClickHouseConnection connection = BaseDbWriter.createConnection(jdbcUrl, "client_1", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>()));
BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, connection);


// Thread.sleep(5000);
// Run MySQL DDL to run analyze partition.
String analyzePartitionDDL = "alter table sales analyze partition p2022";
ITCommon.connectToMySQL(mySqlContainer).createStatement().executeUpdate(analyzePartitionDDL);
Thread.sleep(10000);

// Validate that the last DDL that was ignored is the one that was ignored.
Assert.assertEquals(debeziumChangeEventCapture.getLastIgnoredDDL(), analyzePartitionDDL);

Thread.sleep(10000);

String createTableAccountDDL = "CREATE TABLE account (\n" +
" id INT AUTO_INCREMENT PRIMARY KEY NOT NULL,\n" +
" account_number VARCHAR(20) NOT NULL,\n" +
" amount DECIMAL(10, 2) NOT NULL\n" +
")";
ITCommon.connectToMySQL(mySqlContainer).createStatement().executeUpdate(createTableAccountDDL);

Thread.sleep(10000);

// Run the CREATE TRIGGER DDL
String createTriggerDDL = "CREATE TRIGGER ins_transaction BEFORE INSERT ON account\n" +
" FOR EACH ROW\n" +
" SET\n" +
" @deposits = @deposits + IF(NEW.amount>0,NEW.amount,0),\n" +
" @withdrawals = @withdrawals + IF(NEW.amount<0,-NEW.amount,0);";

ITCommon.connectToMySQL(mySqlContainer).createStatement().executeUpdate(createTriggerDDL);
Thread.sleep(10000);

// Validate that the last DDL that was ignored is the one that was ignored.
Assert.assertTrue(debeziumChangeEventCapture.getLastIgnoredDDL().contains("CREATE DEFINER"));
if(engine.get() != null) {
engine.get().stop();
}
executorService.shutdown();
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,16 @@ public void testCreateTableWithRangeByColumnsPartition() {
log.info("Create table " + clickHouseQuery);
}

// @Test
// public void testAlterTableWithAnalyzePartition() {

// String alterTableQuery = "alter table std_txn_agg analyze partition p20231229";
// StringBuffer clickHouseQuery = new StringBuffer();
// mySQLDDLParserService.parseSql(alterTableQuery, "Persons", clickHouseQuery);
// Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase("ALTER TABLE employees.std_txn_agg ANALYZE PARTITION p20231229"));
// log.info("Alter table " + clickHouseQuery);
// }

@Test
public void testCreateTableWithParitionRange() {
String createQuery = "create table t(\n" +
Expand Down

0 comments on commit 0978481

Please sign in to comment.