From cf112d0bb1059785bafbd9d90a87401caa25c73e Mon Sep 17 00:00:00 2001
From: Selfeer
Date: Fri, 17 May 2024 01:45:44 +0400
Subject: [PATCH] add alters

---
 .../tests/integration/helpers/common.py       |   2 +-
 .../tests/integration/regression.py           |   1 +
 .../integration/tests/multiple_databases.py   | 204 +++++++++++++++---
 .../tests/integration/tests/steps/sql.py      |   8 +-
 4 files changed, 188 insertions(+), 27 deletions(-)

diff --git a/sink-connector/tests/integration/helpers/common.py b/sink-connector/tests/integration/helpers/common.py
index dd480c55c..ff214b0de 100644
--- a/sink-connector/tests/integration/helpers/common.py
+++ b/sink-connector/tests/integration/helpers/common.py
@@ -590,7 +590,7 @@ def set_envs_on_node(self, envs, node=None):
                 node.command(f"unset {key}", exitcode=0)
 
 
-from helpers.cluster import Cluster
+from integration.helpers.cluster import Cluster
 
 
 @TestStep(Given)
diff --git a/sink-connector/tests/integration/regression.py b/sink-connector/tests/integration/regression.py
index 41e0ac7f7..8c268e05c 100755
--- a/sink-connector/tests/integration/regression.py
+++ b/sink-connector/tests/integration/regression.py
@@ -76,6 +76,7 @@ def regression(
         "schemaregistry": ("schemaregistry",),
         "sink": ("sink",),
         "zookeeper": ("zookeeper",),
+        "kafka": ("kafka",),
     }
 
     self.context.clickhouse_version = clickhouse_version
diff --git a/sink-connector/tests/integration/tests/multiple_databases.py b/sink-connector/tests/integration/tests/multiple_databases.py
index 9054b146a..fb33c073f 100644
--- a/sink-connector/tests/integration/tests/multiple_databases.py
+++ b/sink-connector/tests/integration/tests/multiple_databases.py
@@ -5,6 +5,7 @@
 from integration.tests.steps.clickhouse import (
     validate_data_in_clickhouse_table,
     check_if_table_was_created,
+    check_column,
 )
 
 
@@ -89,30 +90,6 @@ def create_databases(self, databases=None):
         create_source_and_destination_databases(database_name=database_name)
 
 
-@TestOutline
-def create_tables_on_multiple_databases(self, databases=None, validate_values=True):
-    """Create tables on multiple databases."""
-    if databases is None:
-        databases = []
-
-    with By("creating databases from a list"):
-        create_databases(databases=databases)
-
-    with And("creating tables with data on multiple databases"):
-        with Pool(4) as pool:
-            for database_name in databases:
-                Scenario(
-                    test=create_table_and_insert_values,
-                    parallel=True,
-                    executor=pool,
-                )(
-                    table_name=f"table_{getuid()}",
-                    database_name=database_name,
-                    validate_values=validate_values,
-                )
-            join()
-
-
 @TestStep(Given)
 def insert_on_database1(self, table_name):
     """Create a table on the database_1."""
@@ -223,6 +200,156 @@ def insert_on_all_databases_except_database_4(self):
         )
 
 
+@TestScenario
+def add_column_on_a_database(self, database):
+    """Check that the column is added on the table when we add a column on a database."""
+    table_name = f"table_{getuid()}"
+    column = "new_col"
+
+    with Given(f"creating connection between kafka and sink connector"):
+        init_sink_connector(topics=f"SERVER5432.{database}.{table_name}")
+
+    with And("I create a table on multiple databases"):
+        create_table_and_insert_values(table_name=table_name, database_name=database)
+
+    with When("I add a column on the table"):
+        add_column(table_name=table_name, database=database, column_name=column)
+
+    with And("I insert values in the new column"):
+        insert_values(
+            table_name=table_name,
+            values="'test_name'",
+            database=database,
+            columns=f"({column})",
+        )
+
+    with Then("I check that the column was added on the table"):
+        check_column(table_name=table_name, database=database, column_name=column)
+
+
+@TestScenario
+def rename_column_on_a_database(self, database):
+    """Check that the column is renamed on the table when we rename a column on a database."""
+    table_name = f"table_{getuid()}"
+    column = "col1"
+    new_column = "new_col_renamed"
+
+    with Given(f"creating connection between kafka and sink connector"):
+        init_sink_connector(topics=f"SERVER5432.{database}.{table_name}")
+
+    with And("I create a table on multiple databases"):
+        create_table_and_insert_values(table_name=table_name, database_name=database)
+
+    with When("I rename a column on the table"):
+        rename_column(
+            table_name=table_name,
+            database=database,
+            column_name=column,
+            new_column_name=new_column,
+        )
+
+    with Then("I check that the column was renamed on the table"):
+        check_column(table_name=table_name, database=database, column_name=new_column)
+
+
+@TestScenario
+def change_column_on_a_database(self, database):
+    """Check that the column is changed on the table when we change a column on a database."""
+    table_name = f"table_{getuid()}"
+    column = "col1"
+    new_column = "new_col"
+    new_column_type = "varchar(255)"
+
+    with Given(f"creating connection between kafka and sink connector"):
+        init_sink_connector(topics=f"SERVER5432.{database}.{table_name}")
+
+    with And("I create a table on multiple databases"):
+        create_table_and_insert_values(table_name=table_name, database_name=database)
+
+    with When("I change a column on the table"):
+        change_column(
+            table_name=table_name,
+            database=database,
+            column_name=column,
+            new_column_name=new_column,
+            new_column_type=new_column_type,
+        )
+
+    with Then("I check that the column was changed on the table"):
+        check_column(table_name=table_name, database=database, column_name=new_column)
+
+
+@TestScenario
+def modify_column_on_a_database(self, database):
+    """Check that the column is modified on the table when we modify a column on a database."""
+    table_name = f"table_{getuid()}"
+    column = "col1"
+    new_column_type = "varchar(255)"
+
+    with Given(f"creating connection between kafka and sink connector"):
+        init_sink_connector(topics=f"SERVER5432.{database}.{table_name}")
+
+    with And("I create a table on multiple databases"):
+        create_table_and_insert_values(table_name=table_name, database_name=database)
+
+    with When("I modify a column on the table"):
+        modify_column(
+            table_name=table_name,
+            database=database,
+            column_name=column,
+            new_column_type=new_column_type,
+        )
+
+    with Then("I check that the column was modified on the table"):
+        check_column(
+            table_name=table_name,
+            database=database,
+            column_name=column,
+            column_type=new_column_type,
+        )
+
+
+@TestScenario
+def drop_column_on_a_database(self, database):
+    """Check that the column is dropped from the table when we drop a column on a database."""
+    table_name = f"table_{getuid()}"
+    column = "col1"
+
+    with Given(f"creating connection between kafka and sink connector"):
+        init_sink_connector(topics=f"SERVER5432.{database}.{table_name}")
+
+    with And("I create a table on multiple databases"):
+        create_table_and_insert_values(table_name=table_name, database_name=database)
+
+    with When("I drop a column on the table"):
+        drop_column(table_name=table_name, database=database, column_name=column)
+
+    with Then("I check that the column was dropped from the table"):
+        check_column(table_name=table_name, database=database, column_name="")
+
+
+@TestScenario
+@Requirements(
+    RQ_SRS_030_ClickHouse_MySQLToClickHouseReplication_PrimaryKey_Simple("1.0")
+)
+def add_primary_key_on_a_database(self, database):
+    """Check that the primary key is added to the table when we add a primary key on a database."""
+    table_name = f"table_{getuid()}"
+    column = "col1"
+
+    with Given(f"creating connection between kafka and sink connector"):
+        init_sink_connector(topics=f"SERVER5432.{database}.{table_name}")
+
+    with And("I create a table on multiple databases"):
+        create_table_and_insert_values(table_name=table_name, database_name=database)
+
+    with When("I add a primary key on the table"):
+        add_primary_key(table_name=table_name, database=database, column_name=column)
+
+    with Then("I check that the primary key was added to the table"):
+        check_column(table_name=table_name, database=database, column_name=column)
+
+
 @TestSuite
 def inserts(self):
     """Check that the inserts are correctly replicated on different number of databases.
@@ -238,6 +365,34 @@ def inserts(self):
     Scenario(run=insert_on_all_databases)
 
 
+@TestSuite
+def alters(self):
+    """Check that the tables are replicated when we alter them on different databases.
+
+    Combinations:
+        - ADD COLUMN
+        - RENAME COLUMN
+        - CHANGE COLUMN
+        - MODIFY COLUMN
+        - DROP COLUMN
+        - ADD PRIMARY KEY
+    """
+    databases = self.context.list_of_databases
+
+    alter_statements = [
+        add_column_on_a_database,
+        rename_column_on_a_database,
+        change_column_on_a_database,
+        modify_column_on_a_database,
+        drop_column_on_a_database,
+        add_primary_key_on_a_database,
+    ]
+
+    for database in databases:
+        for alter_statement in alter_statements:
+            Scenario(test=alter_statement)(database=database)
+
+
 @TestFeature
 @Name("multiple databases")
 def module(
@@ -280,6 +435,7 @@ def module(
         create_databases(databases=self.context.list_of_databases)
 
     Feature(run=inserts)
+    Feature(run=alters)
 
     # with Pool(parallel_cases) as executor:
     #     Feature(run=inserts, parallel=True, executor=executor)
diff --git a/sink-connector/tests/integration/tests/steps/sql.py b/sink-connector/tests/integration/tests/steps/sql.py
index 10f9e745c..1f61b4de9 100644
--- a/sink-connector/tests/integration/tests/steps/sql.py
+++ b/sink-connector/tests/integration/tests/steps/sql.py
@@ -201,8 +201,12 @@ def create_mysql_to_clickhouse_replicated_table(
 
 
 @TestStep(When)
-def insert_values(self, table_name, values, database=None, node=None):
+def insert_values(self, table_name, values, database=None, node=None, columns=None):
     """Insert values into MySQL table"""
+
+    if columns is None:
+        columns = ""
+
     if database is None:
         database = "test"
 
@@ -210,7 +214,7 @@ def insert_values(self, table_name, values, database=None, node=None):
         node = self.context.cluster.node("mysql-master")
 
     with By(f"inserting values into {table_name}"):
-        node.query(f"INSERT INTO {database}.{table_name} VALUES ({values})")
+        node.query(f"INSERT INTO {database}.{table_name} {columns} VALUES ({values})")
 
 
 @TestStep(Given)