Skip to content

Commit

Permalink
add alters
Browse files Browse the repository at this point in the history
  • Loading branch information
Selfeer committed May 16, 2024
1 parent 6315bf6 commit cf112d0
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 27 deletions.
2 changes: 1 addition & 1 deletion sink-connector/tests/integration/helpers/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ def set_envs_on_node(self, envs, node=None):
node.command(f"unset {key}", exitcode=0)


from helpers.cluster import Cluster
from integration.helpers.cluster import Cluster


@TestStep(Given)
Expand Down
1 change: 1 addition & 0 deletions sink-connector/tests/integration/regression.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ def regression(
"schemaregistry": ("schemaregistry",),
"sink": ("sink",),
"zookeeper": ("zookeeper",),
"kafka": ("kafka",),
}

self.context.clickhouse_version = clickhouse_version
Expand Down
204 changes: 180 additions & 24 deletions sink-connector/tests/integration/tests/multiple_databases.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from integration.tests.steps.clickhouse import (
validate_data_in_clickhouse_table,
check_if_table_was_created,
check_column,
)


Expand Down Expand Up @@ -89,30 +90,6 @@ def create_databases(self, databases=None):
create_source_and_destination_databases(database_name=database_name)


@TestOutline
def create_tables_on_multiple_databases(self, databases=None, validate_values=True):
    """Create tables on multiple databases.

    For every database in *databases*, a uniquely named table is created and
    populated in parallel (four workers).
    """
    databases = [] if databases is None else databases

    with By("creating databases from a list"):
        create_databases(databases=databases)

    with And("creating tables with data on multiple databases"):
        with Pool(4) as pool:
            for target_database in databases:
                scenario = Scenario(
                    test=create_table_and_insert_values,
                    parallel=True,
                    executor=pool,
                )
                scenario(
                    table_name=f"table_{getuid()}",
                    database_name=target_database,
                    validate_values=validate_values,
                )
            # Wait for all parallel table-creation scenarios to finish.
            join()


@TestStep(Given)
def insert_on_database1(self, table_name):
"""Create a table on the database_1."""
Expand Down Expand Up @@ -223,6 +200,156 @@ def insert_on_all_databases_except_database_4(self):
)


@TestScenario
def add_column_on_a_database(self, database):
    """Check that the column is added on the table when we add a column on a database.

    Args:
        database: name of the MySQL database the table is created on.
    """
    table_name = f"table_{getuid()}"
    column = "new_col"

    with Given("creating connection between kafka and sink connector"):
        init_sink_connector(topics=f"SERVER5432.{database}.{table_name}")

    with And("I create a table on the database"):
        create_table_and_insert_values(table_name=table_name, database_name=database)

    with When("I add a column on the table"):
        add_column(table_name=table_name, database=database, column_name=column)

    with And("I insert values in the new column"):
        insert_values(
            table_name=table_name,
            values="'test_name'",
            database=database,
            columns=f"({column})",
        )

    with Then("I check that the column was added on the table"):
        check_column(table_name=table_name, database=database, column_name=column)


@TestScenario
def rename_column_on_a_database(self, database):
    """Check that the column is renamed on the table when we rename a column on a database.

    Args:
        database: name of the MySQL database the table is created on.
    """
    table_name = f"table_{getuid()}"
    column = "col1"
    new_column = "new_col_renamed"

    with Given("creating connection between kafka and sink connector"):
        init_sink_connector(topics=f"SERVER5432.{database}.{table_name}")

    with And("I create a table on the database"):
        create_table_and_insert_values(table_name=table_name, database_name=database)

    with When("I rename a column on the table"):
        rename_column(
            table_name=table_name,
            database=database,
            column_name=column,
            new_column_name=new_column,
        )

    with Then("I check that the column was renamed on the table"):
        check_column(table_name=table_name, database=database, column_name=new_column)


@TestScenario
def change_column_on_a_database(self, database):
    """Check that the column is changed on the table when we change a column on a database.

    CHANGE COLUMN renames the column and alters its type in one statement.

    Args:
        database: name of the MySQL database the table is created on.
    """
    table_name = f"table_{getuid()}"
    column = "col1"
    new_column = "new_col"
    new_column_type = "varchar(255)"

    with Given("creating connection between kafka and sink connector"):
        init_sink_connector(topics=f"SERVER5432.{database}.{table_name}")

    with And("I create a table on the database"):
        create_table_and_insert_values(table_name=table_name, database_name=database)

    with When("I change a column on the table"):
        change_column(
            table_name=table_name,
            database=database,
            column_name=column,
            new_column_name=new_column,
            new_column_type=new_column_type,
        )

    with Then("I check that the column was changed on the table"):
        check_column(table_name=table_name, database=database, column_name=new_column)


@TestScenario
def modify_column_on_a_database(self, database):
    """Check that the column is modified on the table when we modify a column on a database.

    MODIFY COLUMN changes the column's type while keeping its name.

    Args:
        database: name of the MySQL database the table is created on.
    """
    table_name = f"table_{getuid()}"
    column = "col1"
    new_column_type = "varchar(255)"

    with Given("creating connection between kafka and sink connector"):
        init_sink_connector(topics=f"SERVER5432.{database}.{table_name}")

    with And("I create a table on the database"):
        create_table_and_insert_values(table_name=table_name, database_name=database)

    with When("I modify a column on the table"):
        modify_column(
            table_name=table_name,
            database=database,
            column_name=column,
            new_column_type=new_column_type,
        )

    with Then("I check that the column was modified on the table"):
        check_column(
            table_name=table_name,
            database=database,
            column_name=column,
            column_type=new_column_type,
        )


@TestScenario
def drop_column_on_a_database(self, database):
    """Check that the column is dropped from the table when we drop a column on a database.

    Args:
        database: name of the MySQL database the table is created on.
    """
    table_name = f"table_{getuid()}"
    column = "col1"

    with Given("creating connection between kafka and sink connector"):
        init_sink_connector(topics=f"SERVER5432.{database}.{table_name}")

    with And("I create a table on the database"):
        create_table_and_insert_values(table_name=table_name, database_name=database)

    with When("I drop a column on the table"):
        drop_column(table_name=table_name, database=database, column_name=column)

    with Then("I check that the column was dropped from the table"):
        # NOTE(review): column_name="" presumably makes check_column verify the
        # dropped column is absent — confirm against check_column's implementation.
        check_column(table_name=table_name, database=database, column_name="")


@TestScenario
@Requirements(
    RQ_SRS_030_ClickHouse_MySQLToClickHouseReplication_PrimaryKey_Simple("1.0")
)
def add_primary_key_on_a_database(self, database):
    """Check that the primary key is added to the table when we add a primary key on a database.

    Args:
        database: name of the MySQL database the table is created on.
    """
    table_name = f"table_{getuid()}"
    column = "col1"

    with Given("creating connection between kafka and sink connector"):
        init_sink_connector(topics=f"SERVER5432.{database}.{table_name}")

    with And("I create a table on the database"):
        create_table_and_insert_values(table_name=table_name, database_name=database)

    with When("I add a primary key on the table"):
        add_primary_key(table_name=table_name, database=database, column_name=column)

    with Then("I check that the primary key was added to the table"):
        # NOTE(review): check_column only verifies the column exists — confirm it
        # also validates the key, or that this is the intended level of checking.
        check_column(table_name=table_name, database=database, column_name=column)


@TestSuite
def inserts(self):
"""Check that the inserts are correctly replicated on different number of databases.
Expand All @@ -238,6 +365,34 @@ def inserts(self):
Scenario(run=insert_on_all_databases)


@TestSuite
def alters(self):
    """Check that the tables are replicated when we alter them on different databases.
    Combinations:
    - ADD COLUMN
    - RENAME COLUMN
    - CHANGE COLUMN
    - MODIFY COLUMN
    - DROP COLUMN
    - ADD PRIMARY KEY
    """
    scenarios = (
        add_column_on_a_database,
        rename_column_on_a_database,
        change_column_on_a_database,
        modify_column_on_a_database,
        drop_column_on_a_database,
        add_primary_key_on_a_database,
    )

    # Run every ALTER scenario against every configured database, one at a time.
    for target_database in self.context.list_of_databases:
        for scenario in scenarios:
            Scenario(test=scenario)(database=target_database)


@TestFeature
@Name("multiple databases")
def module(
Expand Down Expand Up @@ -280,6 +435,7 @@ def module(
create_databases(databases=self.context.list_of_databases)

Feature(run=inserts)
Feature(run=alters)

# with Pool(parallel_cases) as executor:
# Feature(run=inserts, parallel=True, executor=executor)
Expand Down
8 changes: 6 additions & 2 deletions sink-connector/tests/integration/tests/steps/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,16 +201,20 @@ def create_mysql_to_clickhouse_replicated_table(


@TestStep(When)
def insert_values(self, table_name, values, database=None, node=None, columns=None):
    """Insert values into MySQL table.

    Args:
        table_name: name of the table to insert into.
        values: comma-separated values for the VALUES clause (without the
            surrounding parentheses — they are added here).
        database: database containing the table; defaults to "test".
        node: cluster node to run the query on; defaults to the
            "mysql-master" node from the cluster context.
        columns: optional explicit column list, written with parentheses,
            e.g. "(col1, col2)"; when omitted, all columns are targeted.
    """
    if columns is None:
        columns = ""

    if database is None:
        database = "test"

    if node is None:
        node = self.context.cluster.node("mysql-master")

    with By(f"inserting values into {table_name}"):
        node.query(f"INSERT INTO {database}.{table_name} {columns} VALUES ({values})")


@TestStep(Given)
Expand Down

0 comments on commit cf112d0

Please sign in to comment.