Skip to content

Commit

Permalink
Added integration test for testing data types in schema only mode
Browse files Browse the repository at this point in the history
  • Loading branch information
subkanthi committed Nov 21, 2023
1 parent 8ede79d commit 1d5dbb5
Show file tree
Hide file tree
Showing 9 changed files with 748 additions and 31 deletions.
2 changes: 1 addition & 1 deletion sink-connector-lightweight/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@
<dependency>
<groupId>com.altinity</groupId>
<artifactId>clickhouse-kafka-sink-connector</artifactId>
<version>0.0.4</version>
<version>0.0.5</version>
<scope>compile</scope>
</dependency>
</dependencies>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
package com.altinity.clickhouse.debezium.embedded.ddl.parser;

import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture;
import com.altinity.clickhouse.debezium.embedded.common.PropertiesHelper;
import com.altinity.clickhouse.debezium.embedded.config.ConfigLoader;
import com.altinity.clickhouse.debezium.embedded.parser.SourceRecordParserService;
import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;
import org.apache.log4j.BasicConfigurator;
import org.junit.Assert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.MountableFile;

import java.sql.ResultSet;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

@Testcontainers
@DisplayName("Test that tests replication of data types and validates datetime, date limits when the timezone is set to America/Chicago in ClickHouse")
public class DateTimeWithTimeZoneIT {
protected MySQLContainer mySqlContainer;

@Container
public static ClickHouseContainer clickHouseContainer = new ClickHouseContainer(DockerImageName.parse("clickhouse/clickhouse-server:latest")
.asCompatibleSubstituteFor("clickhouse"))
.withInitScript("init_clickhouse_it.sql")
.withCopyFileToContainer(MountableFile.forClasspathResource("config.xml"), "/etc/clickhouse-server/config.d/config.xml")
.withUsername("ch_user")
.withPassword("password")
.withExposedPorts(8123);


@BeforeEach
public void startContainers() throws InterruptedException {
mySqlContainer = new MySQLContainer<>(DockerImageName.parse("docker.io/bitnami/mysql:latest")
.asCompatibleSubstituteFor("mysql"))
.withDatabaseName("employees").withUsername("root").withPassword("adminpass")
.withInitScript("datetime.sql")
.withExtraHost("mysql-server", "0.0.0.0")
.waitingFor(new HttpWaitStrategy().forPort(3306));

BasicConfigurator.configure();
mySqlContainer.start();
clickHouseContainer.start();
Thread.sleep(15000);
}

@Test
public void testCreateTable() throws Exception {
AtomicReference<DebeziumChangeEventCapture> engine = new AtomicReference<>();

ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.execute(() -> {
try {

Properties props = getDebeziumProperties();
props.setProperty("database.include.list", "datatypes");
props.setProperty("clickhouse.server.database", "datatypes");

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(getDebeziumProperties(), new SourceRecordParserService(),
new MySQLDDLParserService());
} catch (Exception e) {
throw new RuntimeException(e);
}
});

Thread.sleep(30000);


BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null);


writer.getConnection().close();
//Thread.sleep(10000);

writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null);

// Validate temporal_types_DATETIME data.
ResultSet dateTimeResult = writer.executeQueryWithResultSet("select * from temporal_types_DATETIME");

while(dateTimeResult.next()) {
System.out.println("DATE TIME");
System.out.println(dateTimeResult.getTimestamp("Minimum_Value").toString());
System.out.println(dateTimeResult.getTimestamp("Mid_Value").toString());
System.out.println(dateTimeResult.getTimestamp("Maximum_Value").toString());

// Assert.assertTrue(dateTimeResult.getTimestamp("Minimum_Value").toString().equalsIgnoreCase("1925-01-01 00:00:00.0"));
// Assert.assertTrue(dateTimeResult.getTimestamp("Mid_Value").toString().equalsIgnoreCase("2022-09-29 01:47:46.0"));
// Assert.assertTrue(dateTimeResult.getTimestamp("Maximum_Value").toString().equalsIgnoreCase("2283-11-11 23:59:59.999"));
}

// DATETIME1
ResultSet dateTimeResult1 = writer.executeQueryWithResultSet("select * from temporal_types_DATETIME1");
while(dateTimeResult1.next()) {
System.out.println("DATE TIME 1");
System.out.println(dateTimeResult1.getTimestamp("Minimum_Value").toString());
System.out.println(dateTimeResult1.getTimestamp("Mid_Value").toString());
System.out.println(dateTimeResult1.getTimestamp("Maximum_Value").toString());

// Assert.assertTrue(dateTimeResult1.getTimestamp("Minimum_Value").toString().equalsIgnoreCase("1925-01-01 00:00:00.0"));
// Assert.assertTrue(dateTimeResult1.getTimestamp("Mid_Value").toString().equalsIgnoreCase("2022-09-29 01:48:25.1"));
// Assert.assertTrue(dateTimeResult1.getTimestamp("Maximum_Value").toString().equalsIgnoreCase("2283-11-11 23:59:59.999"));
}

// DATETIME2
ResultSet dateTimeResult2 = writer.executeQueryWithResultSet("select * from temporal_types_DATETIME2");
while(dateTimeResult2.next()) {
System.out.println("DATE TIME 2");
System.out.println(dateTimeResult2.getTimestamp("Minimum_Value").toString());
System.out.println(dateTimeResult2.getTimestamp("Mid_Value").toString());
System.out.println(dateTimeResult2.getTimestamp("Maximum_Value").toString());

// Assert.assertTrue(dateTimeResult2.getTimestamp("Minimum_Value").toString().equalsIgnoreCase("1925-01-01 00:00:00.0"));
// Assert.assertTrue(dateTimeResult2.getTimestamp("Mid_Value").toString().equalsIgnoreCase("2022-09-29 01:49:05.12"));
// Assert.assertTrue(dateTimeResult2.getTimestamp("Maximum_Value").toString().equalsIgnoreCase("2283-11-11 23:59:59.999"));
}

// DATETIME3
ResultSet dateTimeResult3 = writer.executeQueryWithResultSet("select * from employees.temporal_types_DATETIME3");
while(dateTimeResult3.next()) {
System.out.println("DATE TIME 3");

System.out.println(dateTimeResult3.getTimestamp("Mid_Value").toString());
System.out.println(dateTimeResult3.getTimestamp("Maximum_Value").toString());
System.out.println(dateTimeResult3.getTimestamp("Minimum_Value").toString());

// Assert.assertTrue(dateTimeResult3.getTimestamp("Mid_Value").toString().equalsIgnoreCase("2022-09-29 01:49:22.123"));
// Assert.assertTrue(dateTimeResult3.getTimestamp("Maximum_Value").toString().equalsIgnoreCase("2283-11-11 23:59:59.999"));
// Assert.assertTrue(dateTimeResult3.getTimestamp("Minimum_Value").toString().equalsIgnoreCase("1925-01-01 00:00:00.0"));
}


// DATETIME4
ResultSet dateTimeResult4 = writer.executeQueryWithResultSet("select * from employees.temporal_types_DATETIME4");
while(dateTimeResult4.next()) {
System.out.println("DATE TIME 4");

System.out.println(dateTimeResult4.getTimestamp("Mid_Value").toString());
System.out.println(dateTimeResult4.getTimestamp("Maximum_Value").toString());
System.out.println(dateTimeResult4.getTimestamp("Minimum_Value").toString());

// Assert.assertTrue(dateTimeResult4.getTimestamp("Mid_Value").toString().equalsIgnoreCase("2022-09-29 01:50:12.123"));
// Assert.assertTrue(dateTimeResult4.getTimestamp("Maximum_Value").toString().equalsIgnoreCase("2283-11-11 23:59:59.999"));
// Assert.assertTrue(dateTimeResult4.getTimestamp("Minimum_Value").toString().equalsIgnoreCase("1925-01-01 00:00:00.0"));

}


// DATETIME5
ResultSet dateTimeResult5 = writer.executeQueryWithResultSet("select * from employees.temporal_types_DATETIME5");
while(dateTimeResult5.next()) {
System.out.println("DATE TIME 5");

System.out.println(dateTimeResult5.getTimestamp("Mid_Value").toString());
System.out.println(dateTimeResult5.getTimestamp("Maximum_Value").toString());
System.out.println(dateTimeResult5.getTimestamp("Minimum_Value").toString());

// Assert.assertTrue(dateTimeResult5.getTimestamp("Mid_Value").toString().equalsIgnoreCase("2022-09-29 01:50:28.123"));
// Assert.assertTrue(dateTimeResult5.getTimestamp("Maximum_Value").toString().equalsIgnoreCase("2283-11-11 23:59:59.999"));
// Assert.assertTrue(dateTimeResult5.getTimestamp("Minimum_Value").toString().equalsIgnoreCase("1925-01-01 00:00:00.0"));

}

// DATETIME6
ResultSet dateTimeResult6 = writer.executeQueryWithResultSet("select * from employees.temporal_types_DATETIME6");
while(dateTimeResult6.next()) {
System.out.println("DATE TIME 6");

System.out.println(dateTimeResult6.getTimestamp("Mid_Value").toString());
System.out.println(dateTimeResult6.getTimestamp("Maximum_Value").toString());
System.out.println(dateTimeResult6.getTimestamp("Minimum_Value").toString());

//Assert.assertTrue(dateTimeResult6.getTimestamp("Mid_Value").toString().equalsIgnoreCase("2022-09-29 01:50:56.123"));
// Assert.assertTrue(dateTimeResult6.getTimestamp("Maximum_Value").toString().equalsIgnoreCase("2283-11-11 23:59:59.999"));
// Assert.assertTrue(dateTimeResult6.getTimestamp("Minimum_Value").toString().equalsIgnoreCase("1925-01-01 00:00:00.0"));

}

if(engine.get() != null) {
engine.get().stop();
}
// Files.deleteIfExists(tmpFilePath);
executorService.shutdown();

writer.getConnection().close();
}

protected Properties getDebeziumProperties() throws Exception {

// Start the debezium embedded application.

Properties defaultProps = new Properties();
Properties defaultProperties = PropertiesHelper.getProperties("config.properties");

defaultProps.putAll(defaultProperties);
Properties fileProps = new ConfigLoader().load("config.yml");
defaultProps.putAll(fileProps);

defaultProps.setProperty("database.hostname", mySqlContainer.getHost());
defaultProps.setProperty("database.port", String.valueOf(mySqlContainer.getFirstMappedPort()));
defaultProps.setProperty("database.user", "root");
defaultProps.setProperty("database.password", "adminpass");

defaultProps.setProperty("clickhouse.server.url", clickHouseContainer.getHost());
defaultProps.setProperty("clickhouse.server.port", String.valueOf(clickHouseContainer.getFirstMappedPort()));
defaultProps.setProperty("clickhouse.server.user", clickHouseContainer.getUsername());
defaultProps.setProperty("clickhouse.server.password", clickHouseContainer.getPassword());
defaultProps.setProperty("clickhouse.server.database", "employees");

defaultProps.setProperty("offset.storage.jdbc.url", String.format("jdbc:clickhouse://%s:%s",
clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort()));

defaultProps.setProperty("schema.history.internal.jdbc.url", String.format("jdbc:clickhouse://%s:%s",
clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort()));

defaultProps.setProperty("offset.storage.jdbc.url", String.format("jdbc:clickhouse://%s:%s",
clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort()));

defaultProps.setProperty("schema.history.internal.jdbc.url", String.format("jdbc:clickhouse://%s:%s",
clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort()));


return defaultProps;

}
}
Loading

0 comments on commit 1d5dbb5

Please sign in to comment.