Merge branch 'develop' into 2.1.0
subkanthi authored May 16, 2024
2 parents 9e41644 + 4b80b72 commit 883d273
Showing 13 changed files with 439 additions and 55 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -62,6 +62,7 @@ First two are good tutorials on MySQL and PostgreSQL respectively.

### Development

* [Development](doc/development.md)
* [Testing](doc/TESTING.md)

## Roadmap
29 changes: 29 additions & 0 deletions doc/development.md
@@ -0,0 +1,29 @@
### Build the Sink Connector from source


Requirements:
- Java JDK 17 (https://openjdk.java.net/projects/jdk/17/)
- Maven (`mvn`) (https://maven.apache.org/download.cgi)
- Docker and Docker Compose


1. Clone the ClickHouse Sink connector repository:
```bash
git clone git@github.com:Altinity/clickhouse-sink-connector.git
```

2. Build the ClickHouse Sink Connector library:
This builds the sink connector library that the lightweight connector depends on (referenced as `<sink-connector-library-version>0.0.8</sink-connector-library-version>`).

```bash
cd sink-connector
mvn install -DskipTests=true
```

3. Build the lightweight ClickHouse Sink Connector:
```bash
cd ../sink-connector-lightweight
mvn install -DskipTests=true
```

The JAR file will be created in the `target` directory.
101 changes: 99 additions & 2 deletions sink-connector/python/db/mysql.py
@@ -3,6 +3,9 @@
import warnings
import os
import configparser
import pymysql
import pymysql as mysql
import pandas as pd

binary_datatypes = ('blob', 'varbinary', 'point', 'geometry', 'bit', 'binary', 'linestring',
'geomcollection', 'multilinestring', 'multipolygon', 'multipoint', 'polygon')
@@ -58,13 +61,26 @@ def get_partitions_from_regex(conn, mysql_database, include_tables_regex, exclud
if include_partitions_regex is not None:
include_regex_clause = f"and partition_name rlike '{include_partitions_regex}'"

strCommand = f"select TABLE_SCHEMA as table_schema, TABLE_NAME as table_name, PARTITION_NAME as partition_name from information_schema.partitions where table_schema = '{mysql_database}' {include_regex_clause} and (table_schema, table_name) IN ({table_sql}) order by 1,2,3"
strCommand = f"select TABLE_SCHEMA as table_schema, TABLE_NAME as table_name, PARTITION_NAME as partition_name, PARTITION_EXPRESSION as partition_expression from information_schema.partitions where table_schema = '{mysql_database}' {include_regex_clause} and (table_schema, table_name) IN ({table_sql}) order by 1,2,3"
(rowset, rowcount) = execute_mysql(conn, strCommand)
x = rowset

return x


def mysql_execute_df(conn, sql):
logging.debug(sql)
cursor = conn.connection.cursor()
try:
cursor.execute( sql )
names = [ x[0] for x in cursor.description]
rows = cursor.fetchall()
return pd.DataFrame( rows, columns=names)
finally:
if cursor is not None:
cursor.close()
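
As a usage sketch for the new `mysql_execute_df` helper: it only needs an object whose `.connection` attribute is a live pymysql connection, and it returns a pandas DataFrame. The wrapper class, credentials, and query below are illustrative assumptions, not part of this module:

```python
import pymysql

class DemoConn:
    """Minimal stand-in: mysql_execute_df calls conn.connection.cursor()."""
    def __init__(self, **kwargs):
        self.connection = pymysql.connect(**kwargs)

conn = DemoConn(host="127.0.0.1", user="root", password="root", database="test")  # made-up credentials
df = mysql_execute_df(conn, "select 1 as one, 'a' as letter")
print(df)  # one-row DataFrame with columns `one` and `letter`
conn.connection.close()
```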


def execute_mysql(conn, strSql):
"""
# -- =======================================================================
@@ -96,4 +112,85 @@ def resolve_credentials_from_config(config_file):
mysql_user = config['client']['user']
mysql_password = config['client']['password']
logging.debug(f"mysql_user {mysql_user} mysql_password ****")
return (mysql_user, mysql_password)
return (mysql_user, mysql_password)


def estimate_table_count(conn, mysql_table, where, pk, min_pk, max_pk):
sql = f"explain select * from {mysql_table} where {where} and {pk} between {min_pk} and {max_pk}"
df = mysql_execute_df(conn, sql)
head = df.head(1)
count = int(head['rows'].iloc[0])
return count


def get_min_max_pk_value(conn, mysql_table, pk, where):
sql = f"select min({pk}) as min_pk, max({pk}) as max_pk from {mysql_table} where {where}"
df = mysql_execute_df(conn, sql)
head = df.head(1)
if head['max_pk'].isnull().values.any():
return (None, None)
min_pk = int(head['min_pk'].iloc[0])
max_pk = int(head['max_pk'].iloc[0])
return (min_pk, max_pk)


def mysql_columns(conn, mysql_database, mysql_table, pk, ignore_pk=True):
if ignore_pk:
not_equal_pk = f"and column_name <> '{pk}'"
else:
not_equal_pk = ""
sql = f"select column_name as COLUMN_NAME from information_schema.columns where table_schema='{mysql_database}' and table_name = '{mysql_table}' {not_equal_pk} order by ORDINAL_POSITION"
df = mysql_execute_df(conn, sql)
logging.debug('Columns \n' + df.to_string(index=False))
list = df['COLUMN_NAME'].to_list()
return list


def mysql_pk_columns(conn, mysql_database, mysql_table, is_integer=True):
where_integer = ""
if is_integer:
where_integer = " and data_type like '%int%'"
sql = f"select column_name as COLUMN_NAME from information_schema.columns where table_schema='{mysql_database}' and table_name = '{mysql_table}' and column_key='PRI' {where_integer} order by ORDINAL_POSITION"
df = mysql_execute_df(conn, sql)
logging.debug('PK columns \n' + df.to_string(index=False))
list = df['COLUMN_NAME'].to_list()
return list


def divide_table_into_even_chunks(conn, mysql_table, chunk_size, pk, where):
if not pk:
yield {}
return
(min_pk_value, max_pk_value) = get_min_max_pk_value(conn, mysql_table, pk, where)
if min_pk_value is None:
logging.debug(f"No data in {mysql_table}")
return
table_rowcount = estimate_table_count(conn, mysql_table, where, pk, min_pk_value, max_pk_value)
lower_bound = int(min_pk_value)

nb_chunks = int(table_rowcount / chunk_size)+1
logging.debug(f"nb_chunks = {nb_chunks}")
logging.debug(f"lower_bound {lower_bound} max_pk_value {max_pk_value}")
chunk_step = int((max_pk_value - min_pk_value) / nb_chunks)+1

upper_bound = lower_bound - 1
logging.debug(f"lower_bound {lower_bound} max_pk_value {max_pk_value} chunk_step {chunk_step}")

while lower_bound <= max_pk_value:
lower_bound = upper_bound + 1
upper_bound = lower_bound + chunk_step
chunk = {}
chunk['min_pk'] = lower_bound
chunk['max_pk'] = upper_bound

# we need to make sure there is data in the chunk, as CH wants data on insert in a pipe
# we can use that to find a better minimum
sql = f"select {pk} from {mysql_table} where {where} and {pk} between {lower_bound} and {upper_bound} order by {pk} limit 1"
df = mysql_execute_df(conn, sql)
index = df.index
number_of_rows = len(index)
if number_of_rows > 0 :
head = df.head(1)
chunk['min_pk'] = int(head[pk].iloc[0])
yield chunk
logging.debug(f"Estimated table row count {table_rowcount}")
22 changes: 11 additions & 11 deletions sink-connector/python/db_compare/clickhouse_table_checksum.py
@@ -126,7 +126,7 @@ def get_table_checksum_query(conn, table):
nullables.append(column_name)
select += " case when "+column_name+" is null then '' else "
if first_column:
select += " '#' || "
select += " "

if not first_column:
select += "'#'||"
@@ -142,7 +142,7 @@ def get_table_checksum_query(conn, table):
elif "Decimal" in data_type:
# custom function due to https://github.com/ClickHouse/ClickHouse/issues/30934
# requires this function : CREATE OR REPLACE FUNCTION format_decimal AS (x, scale) -> if(locate(toString(x),'.')>0,concat(toString(x),repeat('0',toUInt8(scale-(length(toString(x))-locate(toString(x),'.'))))),concat(toString(x),'.',repeat('0',toUInt8(scale))))
select += "format_decimal("+column_name + \
select += "toDecimalString("+column_name + \
","+str(numeric_scale)+")"
elif "DateTime64(0" in data_type:
select += f"toString({column_name})"
@@ -195,15 +195,15 @@ def get_table_checksum_query(conn, table):
return (query, select, order_by_columns, external_column_types)


def select_table_statements(table, query, select_query, order_by, external_column_types):
def select_table_statements(table, query, select_query, order_by, external_column_types, _where):
statements = []
external_table_name = args.clickhouse_database+"."+table
limit = ""
if args.debug_limit:
limit = " limit "+args.debug_limit
where = "1=1"
if args.where:
where = args.where
if _where:
where = _where

# skip deleted rows
if args.sign_column != '':
@@ -227,7 +227,7 @@ def select_table_statements(table, query, select_query, order_by, external_colum
) as t""".format(select_query=select_query, schema=args.clickhouse_database, table=table, where=where, order_by=order_by, limit=limit)

if args.debug_output:
sql = """select {select_query} as "hash" from {schema}.{table} final where {where} order by {order_by} {limit}""".format(
sql = """select {select_query} as "hash" from {schema}.{table} final where {where} {limit} settings do_not_merge_across_partitions_select_final=1""".format(
select_query=select_query, schema=args.clickhouse_database, table=table, where=where, order_by=order_by, limit=limit)
statements.append(sql)
return statements
@@ -246,7 +246,7 @@ def get_tables_from_regex(conn):
return x


def calculate_checksum(table, clickhouse_user, clickhouse_password):
def calculate_checksum(table, clickhouse_user, clickhouse_password, where):
if args.ignore_tables_regex:
rex_ignore_tables = re.compile(args.ignore_tables_regex, re.IGNORECASE)
if rex_ignore_tables.match(table):
@@ -262,8 +262,8 @@ def calculate_checksum(table, clickhouse_user, clickhouse_password):

# we need to count the values in CH first
sql = "select count(*) cnt from "+args.clickhouse_database+"."+table
if args.where:
sql = sql + " where " + args.where
if where:
sql = sql + " where " + where

conn = get_connection(clickhouse_user, clickhouse_password)
(rowset, rowcount) = execute_sql(conn, sql)
@@ -277,7 +277,7 @@ def calculate_checksum(table, clickhouse_user, clickhouse_password):
(query, select_query, distributed_by,
external_table_types) = get_table_checksum_query(conn, table)
statements = select_table_statements(
table, query, select_query, distributed_by, external_table_types)
table, query, select_query, distributed_by, external_table_types, where)
compute_checksum(table, clickhouse_user, clickhouse_password, statements)


@@ -378,7 +378,7 @@ def main():
with concurrent.futures.ThreadPoolExecutor(max_workers=args.threads) as executor:
futures = []
for table in tables:
futures.append(executor.submit(calculate_checksum, table[0], clickhouse_user, clickhouse_password))
futures.append(executor.submit(calculate_checksum, table[0], clickhouse_user, clickhouse_password, args.where))
for future in concurrent.futures.as_completed(futures):
if future.exception() is not None:
raise future.exception()
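
A hedged sketch of what the signature changes above accomplish: the `--where` predicate now travels as an explicit argument from `main()` through `calculate_checksum` into `select_table_statements`, rather than each helper reading the global `args.where`. The database, table, and predicate below are made up; the snippet only shows the count query the new parameter produces:

```python
clickhouse_database = "benchmark"          # hypothetical values
table = "orders"
where = "updated_at >= '2024-05-01'"       # what main() would now pass down from --where

# Same construction as in calculate_checksum(), with the explicit parameter:
sql = "select count(*) cnt from " + clickhouse_database + "." + table
if where:
    sql = sql + " where " + where
print(sql)  # select count(*) cnt from benchmark.orders where updated_at >= '2024-05-01'
```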