Skip to content

Commit

Permalink
Merge pull request #366 from Altinity/ignore_alias_materialized_columns
Browse files Browse the repository at this point in the history
Added additional test cases for version check and MergeTree tables, Add backticks to column names(fix for names with spaces))
  • Loading branch information
subkanthi authored Nov 14, 2023
2 parents 1e4cdf1 + 5100879 commit ec42237
Show file tree
Hide file tree
Showing 29 changed files with 335 additions and 134 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/testflow-sink-connector-lightweight.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ on:
description: "sink connector version"
required: true
type: string
default: 2023-11-03
default: 2023-11-09

env:
# SINK_CONNECTOR_VERSION: "${{ inputs.sink_version }}"
Expand Down
Binary file modified sink-connector-client/sink-connector-client
Binary file not shown.
11 changes: 2 additions & 9 deletions sink-connector-lightweight/docker/config_postgres.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,7 @@ offset.storage.jdbc.password: "root"
offset.storage.jdbc.offset.table.ddl: "CREATE TABLE if not exists %s
(
`id` String,
`offset_key` String,
`offset_val` String,
`record_insert_ts` DateTime,
`record_insert_seq` UInt64,
`_version` UInt64 MATERIALIZED toUnixTimestamp64Nano(now64(9))
)
ENGINE = ReplacingMergeTree(_version)
ORDER BY id
@@ -35,13 +37,14 @@ ORDER BY id
SETTINGS index_granularity = 8198"
offset.storage.jdbc.offset.table.delete: "delete from %s where 1=1"
schema.history.internal: "io.debezium.storage.jdbc.history.JdbcSchemaHistory"
Expand All @@ -44,4 +37,4 @@ schema.history.internal.jdbc.schema.history.table.ddl: "CREATE TABLE if not exis
schema.history.internal.jdbc.schema.history.table.name: "altinity_sink_connector.replicate_schema_history"
enable.snapshot.ddl: "true"
auto.create.tables: "true"
database.dbname: "public"
database.dbname: "public"
2 changes: 1 addition & 1 deletion sink-connector-lightweight/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@
<dependency>
<groupId>com.altinity</groupId>
<artifactId>clickhouse-kafka-sink-connector</artifactId>
<version>0.0.1</version>
<version>0.0.4</version>
<scope>compile</scope>
</dependency>
</dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,9 +254,11 @@ private boolean checkIfDDLNeedsToBeIgnored(Properties props, SourceRecord sr, At
enableSnapshotDDLPropertyFlag = true;
}

// if(isDropOrTruncate.get()== false) {
// return false;
// }
String disableDropAndTruncateProperty = props.getProperty(SinkConnectorLightWeightConfig.DISABLE_DROP_TRUNCATE);
if(disableDropAndTruncateProperty != null && disableDropAndTruncateProperty.equalsIgnoreCase("true") && isDropOrTruncate.get()== true) {
log.debug("Ignoring Drop or Truncate");
return true;
}
if(isSnapshotDDL== true && enableSnapshotDDLPropertyFlag == false) {
// User wants to ignore snapshot
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ public class SinkConnectorLightWeightConfig {
// By default DDL is true, this flag is used to disable ddl.
public static final String DISABLE_DDL = "disable.ddl";

public static final String DISABLE_DROP_TRUNCATE = "disable.drop.truncate";


// Enable execution of snapshot ddl.
public static final String ENABLE_SNAPSHOT_DDL = "enable.snapshot.ddl";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.altinity.clickhouse.debezium.embedded;

import com.altinity.clickhouse.debezium.embedded.ddl.parser.ClickHouseDebeziumEmbeddedDDLBaseIT;
import com.altinity.clickhouse.debezium.embedded.ddl.parser.DDLBaseIT;
import org.testcontainers.junit.jupiter.Testcontainers;

@Testcontainers
public class CommandLineIT extends ClickHouseDebeziumEmbeddedDDLBaseIT {
public class CommandLineIT extends DDLBaseIT {
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package com.altinity.clickhouse.debezium.embedded.cdc;

import com.altinity.clickhouse.debezium.embedded.ddl.parser.ClickHouseDebeziumEmbeddedDDLBaseIT;
import com.altinity.clickhouse.debezium.embedded.ddl.parser.DDLBaseIT;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;
import com.altinity.clickhouse.sink.connector.model.DBCredentials;
import com.google.common.collect.Maps;
import io.debezium.storage.jdbc.offset.JdbcOffsetBackingStoreConfig;
import org.json.simple.parser.ParseException;
import static org.junit.Assert.assertTrue;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
Expand All @@ -17,7 +16,7 @@
import java.util.Properties;

@Testcontainers
public class DebeziumChangeEventCaptureIT extends ClickHouseDebeziumEmbeddedDDLBaseIT {
public class DebeziumChangeEventCaptureIT extends DDLBaseIT {

private static final Logger log = LoggerFactory.getLogger(DebeziumChangeEventCaptureIT.class);
@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import java.util.concurrent.atomic.AtomicReference;

@Testcontainers
public class ClickHouseDebeziumEmbeddedDDLAddColumnIT extends ClickHouseDebeziumEmbeddedDDLBaseIT {
public class AddColumnIT extends DDLBaseIT {

@BeforeEach
public void startContainers() throws InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

import java.sql.ResultSet;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

@Testcontainers
public class ClickHouseDebeziumEmbeddedDDLCreateTableIT extends ClickHouseDebeziumEmbeddedDDLBaseIT {
public class CreateTableDataTypesIT extends DDLBaseIT {

@BeforeEach
public void startContainers() throws InterruptedException {
Expand Down Expand Up @@ -56,7 +57,7 @@ public void testCreateTable() throws Exception {
}
});

Thread.sleep(20000);
Thread.sleep(30000);


BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
Expand Down Expand Up @@ -86,11 +87,63 @@ public void testCreateTable() throws Exception {
Assert.assertTrue(timestampTable.get("Mid_Value").equalsIgnoreCase("DateTime64(3)"));
Assert.assertTrue(timestampTable.get("Maximum_Value").equalsIgnoreCase("DateTime64(3)"));
Assert.assertTrue(timestampTable.get("Null_Value").equalsIgnoreCase("Nullable(DateTime64(3))"));

writer.getConnection().close();
//Thread.sleep(10000);

writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null);
// Validate temporal_types_DATE data.
ResultSet dateResult = writer.executeQueryWithResultSet("select * from temporal_types_DATE");

while(dateResult.next()) {
Assert.assertTrue(dateResult.getDate("Minimum_Value").toString().equalsIgnoreCase("1925-01-01"));
Assert.assertTrue(dateResult.getDate("Mid_Value").toString().equalsIgnoreCase("2022-09-29"));
Assert.assertTrue(dateResult.getDate("Maximum_Value").toString().equalsIgnoreCase("2283-11-11"));
}
// Validate temporal_types_DATETIME data.
ResultSet dateTimeResult = writer.executeQueryWithResultSet("select * from temporal_types_DATETIME");

while(dateTimeResult.next()) {
System.out.println(dateTimeResult.getTimestamp("Mid_Value").toString());
System.out.println(dateTimeResult.getTimestamp("Maximum_Value").toString());

Assert.assertTrue(dateTimeResult.getTimestamp("Minimum_Value").toString().equalsIgnoreCase("1925-01-01 00:00:00.0"));
Assert.assertTrue(dateTimeResult.getTimestamp("Mid_Value").toString().equalsIgnoreCase("2022-09-29 01:47:46.0"));
Assert.assertTrue(dateTimeResult.getTimestamp("Maximum_Value").toString().equalsIgnoreCase("2283-11-11 23:59:59.999"));
}

// // DATETIME1
// ResultSet dateTimeResult1 = writer.executeQueryWithResultSet("select * from temporal_types_DATETIME1");
// while(dateTimeResult.next()) {
// Assert.assertTrue(dateTimeResult1.getTimestamp("Minimum_Value").toString().equalsIgnoreCase("1925-01-01 00:00:00.000"));
// Assert.assertTrue(dateTimeResult1.getTimestamp("Mid_Value").toString().equalsIgnoreCase("2022-09-29 01:47:46.000"));
// Assert.assertTrue(dateTimeResult1.getTimestamp("Maximum_Value").toString().equalsIgnoreCase("2283-11-11 23:59:59.999"));
// }
//
// // DATETIME2
// ResultSet dateTimeResult2 = writer.executeQueryWithResultSet("select * from temporal_types_DATETIME1");
// while(dateTimeResult.next()) {
// Assert.assertTrue(dateTimeResult2.getTimestamp("Minimum_Value").toString().equalsIgnoreCase("1925-01-01 00:00:00.000"));
// Assert.assertTrue(dateTimeResult2.getTimestamp("Mid_Value").toString().equalsIgnoreCase("2022-09-29 01:47:46.000"));
// Assert.assertTrue(dateTimeResult2.getTimestamp("Maximum_Value").toString().equalsIgnoreCase("2283-11-11 23:59:59.999"));
// }
//
// // DATETIME3
// ResultSet dateTimeResult3 = writer.executeQueryWithResultSet("select * from employees.temporal_types_DATETIME1");
// while(dateTimeResult.next()) {
// Assert.assertTrue(dateTimeResult3.getTimestamp("Minimum_Value").toString().equalsIgnoreCase("1925-01-01 00:00:00.000"));
// Assert.assertTrue(dateTimeResult3.getTimestamp("Mid_Value").toString().equalsIgnoreCase("2022-09-29 01:47:46.000"));
// Assert.assertTrue(dateTimeResult3.getTimestamp("Maximum_Value").toString().equalsIgnoreCase("2283-11-11 23:59:59.999"));
// }


if(engine.get() != null) {
engine.get().stop();
}
// Files.deleteIfExists(tmpFilePath);
executorService.shutdown();

writer.getConnection().close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import java.util.Properties;

@Testcontainers
public class ClickHouseDebeziumEmbeddedDDLBaseIT {
public class DDLBaseIT {
protected MySQLContainer mySqlContainer;

@Container
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import java.util.concurrent.atomic.AtomicReference;

@Testcontainers
public class ClickHouseDebeziumEmbeddedDDLChangeColumnIT extends ClickHouseDebeziumEmbeddedDDLBaseIT {
public class DDLChangeColumnIT extends DDLBaseIT {

@BeforeEach
public void startContainers() throws InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import java.util.concurrent.atomic.AtomicReference;

@Testcontainers
public class ClickHouseDebeziumEmbeddedDDLModifyColumnIT extends ClickHouseDebeziumEmbeddedDDLBaseIT {
public class DDLModifyColumnIT extends DDLBaseIT {

@BeforeEach
public void startContainers() throws InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package com.altinity.clickhouse.debezium.embedded.ddl.parser;

import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture;
import com.altinity.clickhouse.debezium.embedded.common.PropertiesHelper;
import com.altinity.clickhouse.debezium.embedded.config.ConfigLoader;
import com.altinity.clickhouse.debezium.embedded.parser.SourceRecordParserService;
import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;
import org.apache.log4j.BasicConfigurator;
import org.junit.Assert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

import java.sql.Connection;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

@Testcontainers
public class DDLSchemaOnlyIT extends DDLBaseIT {

@BeforeEach
public void startContainers() throws InterruptedException {
mySqlContainer = new MySQLContainer<>(DockerImageName.parse("docker.io/bitnami/mysql:latest")
.asCompatibleSubstituteFor("mysql"))
.withDatabaseName("employees").withUsername("root").withPassword("adminpass")
.withInitScript("alter_ddl_add_column.sql")
.withExtraHost("mysql-server", "0.0.0.0")
.waitingFor(new HttpWaitStrategy().forPort(3306));

BasicConfigurator.configure();
mySqlContainer.start();
clickHouseContainer.start();
Thread.sleep(15000);
}

@Override
protected Properties getDebeziumProperties() throws Exception {

// Start the debezium embedded application.

Properties defaultProps = new Properties();
Properties defaultProperties = PropertiesHelper.getProperties("config.properties");

defaultProps.putAll(defaultProperties);
Properties fileProps = new ConfigLoader().load("config.yml");
defaultProps.putAll(fileProps);

// **** OVERRIDE set to schema only
defaultProps.setProperty("snapshot.mode", "schema_only");
defaultProps.setProperty("disable.drop.truncate", "true");

defaultProps.setProperty("database.hostname", mySqlContainer.getHost());
defaultProps.setProperty("database.port", String.valueOf(mySqlContainer.getFirstMappedPort()));
defaultProps.setProperty("database.user", "root");
defaultProps.setProperty("database.password", "adminpass");

defaultProps.setProperty("clickhouse.server.url", clickHouseContainer.getHost());
defaultProps.setProperty("clickhouse.server.port", String.valueOf(clickHouseContainer.getFirstMappedPort()));
defaultProps.setProperty("clickhouse.server.user", clickHouseContainer.getUsername());
defaultProps.setProperty("clickhouse.server.password", clickHouseContainer.getPassword());
defaultProps.setProperty("clickhouse.server.database", "employees");

defaultProps.setProperty("offset.storage.jdbc.url", String.format("jdbc:clickhouse://%s:%s",
clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort()));

defaultProps.setProperty("schema.history.internal.jdbc.url", String.format("jdbc:clickhouse://%s:%s",
clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort()));

defaultProps.setProperty("offset.storage.jdbc.url", String.format("jdbc:clickhouse://%s:%s",
clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort()));

defaultProps.setProperty("schema.history.internal.jdbc.url", String.format("jdbc:clickhouse://%s:%s",
clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort()));


return defaultProps;

}
@Test
public void testSchemaOnlyMode() throws Exception {

AtomicReference<DebeziumChangeEventCapture> engine = new AtomicReference<>();

ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.execute(() -> {
try {

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(getDebeziumProperties(), new SourceRecordParserService(),
new MySQLDDLParserService());
} catch (Exception e) {
throw new RuntimeException(e);
}
});

Thread.sleep(40000);//

Connection conn = connectToMySQL();
// alter table ship_class change column class_name class_name_new int;
// alter table ship_class change column tonange tonange_new decimal(10,10);

conn.prepareStatement("insert into dt values('2008-01-01 00:00:01', 'this is a test', 11, 2)").execute();


Thread.sleep(10000);


BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null);

String result = writer.executeQuery("select timestamp from employees.dt");

System.out.println(result);
Assert.assertTrue(result.equalsIgnoreCase("2008-01-01 00:00:01"));
// Validate all ship_class columns.
// Assert.assertTrue(shipClassColumns.get("ship_spec").equalsIgnoreCase("Nullable(String)"));


if(engine.get() != null) {
engine.get().stop();
}
// Files.deleteIfExists(tmpFilePath);
executorService.shutdown();



}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

@Testcontainers

public class ClickHouseDebeziumEmbeddedEmployeesDBIT extends ClickHouseDebeziumEmbeddedDDLBaseIT {
public class EmployeesDBIT extends DDLBaseIT {


@BeforeEach
Expand Down
Loading

0 comments on commit ec42237

Please sign in to comment.