Merge pull request #338 from aadant/fixing_temporal_types
clickhouse_loader.py : fixing temporal and binary data types
aadant authored Nov 3, 2023
2 parents 6b61c6e + f335905 commit e203792
Showing 4 changed files with 55 additions and 27 deletions.
9 changes: 6 additions & 3 deletions sink-connector/python/db_compare/clickhouse_table_checksum.py
@@ -153,7 +153,7 @@ def get_table_checksum_query(table):
excluded_columns = "','".join(args.exclude_columns)
excluded_columns = [f'{column}' for column in excluded_columns.split(',')]
#excluded_columns = "'"+excluded_columns+"'"
#logging.info(f"Excluded columns, {excluded_columns}")
logging.info(f"Excluded columns, {excluded_columns}")
excluded_columns_str = ','.join((f"'{col}'" for col in excluded_columns))
checksum_query="select name, type, if(match(type,'Nullable'),1,0) is_nullable, numeric_scale from system.columns where database='" + args.clickhouse_database+"' and table = '"+table+"' and name not in ("+ excluded_columns_str +") order by position"

@@ -197,6 +197,8 @@ def get_table_checksum_query(table):
# requires this function : CREATE OR REPLACE FUNCTION format_decimal AS (x, scale) -> if(locate(toString(x),'.')>0,concat(toString(x),repeat('0',toUInt8(scale-(length(toString(x))-locate(toString(x),'.'))))),concat(toString(x),'.',repeat('0',toUInt8(scale))))
select += "format_decimal("+column_name + \
","+str(numeric_scale)+")"
elif "DateTime64(0)" == data_type:
select += f"toString({column_name})"
elif "DateTime" in data_type:
select += f"trim(TRAILING '.' from (trim(TRAILING '0' FROM toString({column_name}))))"
else:
@@ -255,7 +257,8 @@ def select_table_statements(table, query, select_query, order_by, external_colum
where = args.where

# skip deleted rows
where+= f" and {args.sign_column} > 0 "
if args.sign_column != '':
where+= f" and {args.sign_column} > 0 "

sql = """ select
count(*) as "cnt",
@@ -380,7 +383,7 @@ def main():
action='store_true', default=False)
# TODO change this to standard MaterializedMySQL columns https://github.com/Altinity/clickhouse-sink-connector/issues/78
parser.add_argument('--exclude_columns', help='columns exclude',
nargs='*', default=['_sign,_version'])
nargs='*', default=['_sign,_version,is_deleted'])
parser.add_argument('--threads', type=int,
help='number of parallel threads', default=1)

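For context on the branch added above: a DateTime64(0) column renders with no fractional part, so a plain toString() already matches the MySQL side, while other DateTime types still need their trailing zeros and dot trimmed, and the Decimal branch relies on the format_decimal UDF quoted in the comment. Below is a minimal sketch of that per-column logic; checksum_expression is a hypothetical helper name, not the project's actual function.

# Hypothetical helper mirroring the per-column expression logic in
# get_table_checksum_query (clickhouse_table_checksum.py); names and the
# calling convention are illustrative, not the project's exact API.
def checksum_expression(column_name, data_type, numeric_scale=None):
    if "Decimal" in data_type:
        # relies on the format_decimal SQL UDF shown in the diff comment
        return f"format_decimal({column_name},{numeric_scale})"
    if data_type == "DateTime64(0)":
        # no sub-second digits, so a plain toString() already lines up
        return f"toString({column_name})"
    if "DateTime" in data_type:
        # strip trailing zeros and the dot so '...00.000' and '...00' compare equal
        return f"trim(TRAILING '.' from (trim(TRAILING '0' FROM toString({column_name}))))"
    return column_name


if __name__ == "__main__":
    print(checksum_expression("created_at", "DateTime64(0)"))
    print(checksum_expression("updated_at", "DateTime64(3)"))
    print(checksum_expression("price", "Decimal(10, 2)", numeric_scale=2))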
39 changes: 22 additions & 17 deletions sink-connector/python/db_compare/mysql_table_checksum.py
@@ -59,7 +59,7 @@ def compute_checksum(table, statements, conn):
conn.close()


def get_table_checksum_query(table, conn):
def get_table_checksum_query(table, conn, binary_encoding):

(rowset, rowcount) = execute_mysql(conn, "select COLUMN_NAME as column_name, column_type as data_type, IS_NULLABLE as is_nullable from information_schema.columns where table_schema='" +
args.mysql_database+"' and table_name = '"+table+"' order by ordinal_position")
@@ -68,6 +68,8 @@ def get_table_checksum_query(table, conn):
nullables = []
data_types = {}
first_column = True
min_date_value = args.min_date_value
max_date_value = args.max_date_value
for row in rowset:
column_name = '`'+row['column_name']+'`'
data_type = row['data_type']
@@ -78,29 +80,26 @@ def get_table_checksum_query(table, conn):

if is_nullable == 'YES':
nullables.append(column_name)

if 'datetime' == data_type or 'datetime(1)'== data_type or 'datetime(2)' == data_type or 'datetime(3)' == data_type:
# CH datetime range is not the same as MySQL https://clickhouse.com/docs/en/sql-reference/data-types/datetime64/
select += f"case when {column_name} > substr('2283-11-11 23:59:59.999', 1, length({column_name})) then TRIM(TRAILING '0' FROM CAST('2283-11-11 23:59:59.999' AS datetime(3))) else case when {column_name} <= '1925-01-01 00:00:00' then TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM CAST('1925-01-01 00:00:00.000' AS datetime(3)))) else TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM {column_name})) end end"
select += f"case when {column_name} > substr('{max_date_value} 23:59:59.999', 1, length({column_name})) then substr(TRIM(TRAILING '0' FROM CAST('{max_date_value} 23:59:59.999' AS datetime(3))),1,length({column_name})) else case when {column_name} <= '{min_date_value} 00:00:00' then TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM CAST('{min_date_value} 00:00:00.000' AS datetime(3)))) else TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM {column_name})) end end"
elif 'datetime(4)' == data_type or 'datetime(5)' == data_type or 'datetime(6)' == data_type:
# CH datetime range is not the same as MySQL https://clickhouse.com/docs/en/sql-reference/data-types/datetime64/
select += f"case when {column_name} > substr('2283-11-11 23:59:59.999999', 1, length({column_name})) then TRIM(TRAILING '0' FROM CAST('2283-11-11 23:59:59.999999' AS datetime(6))) else case when {column_name} <= '1925-01-01 00:00:00' then TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM CAST('1925-01-01 00:00:00.000000' AS datetime(6)))) else TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM {column_name})) end end"
# CH datetime range is not the same as MySQL https://clickhouse.com/docs/en/sql-reference/data-types/datetime64/
select += f"case when {column_name} > substr('{max_date_value} 23:59:59.999999', 1, length({column_name})) then substr(TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM CAST('{max_date_value} 23:59:59.999999' AS datetime(6)))),1,length({column_name})) else case when {column_name} <= '{min_date_value} 00:00:00' then TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM CAST('{min_date_value} 00:00:00.000000' AS datetime(6)))) else TRIM(TRAILING '.' FROM TRIM(TRAILING '0' FROM {column_name})) end end"
elif 'time' == data_type or 'time(1)' == data_type or 'time(2)' == data_type or 'time(3)' == data_type or 'time(4)' == data_type or 'time(5)' == data_type or 'time(6)' == data_type:
select += f"cast({column_name} as time(6))"
select += f"substr(cast({column_name} as time(6)),1,length({column_name}))"
elif 'timestamp' == data_type or 'timestamp(1)' == data_type or 'timestamp(2)' == data_type or 'timestamp(3)' == data_type or 'timestamp(4)' == data_type or 'timestamp(5)' == data_type or 'timestamp(6)' == data_type:
select += f"TRIM(TRAILING '.' from (TRIM(TRAILING '0' from cast({column_name} as char))))"
#elif 'datetime' in data_type:
# # CH datetime range is not the same as MySQL https://clickhouse.com/docs/en/sql-reference/data-types/datetime/
# select += f"case when {column_name} >='2283-11-11' then CAST('2283-11-11' AS {data_type}) else case when {column_name} <= '1970-01-01' then CAST('1925-01-01 00:00:00' AS {data_type}) else {column_name} end end"*/
# elif "float" in data_type:
# select += f"CAST({column_name} as DECIMAL(64,8))"
select += f"substr(TRIM(TRAILING '.' from (TRIM(TRAILING '0' from cast({column_name} as char)))),1,length({column_name}))"
else:
if 'date' == data_type:
if 'date' == data_type: # Date are converted to Date32 in CH
# CH date range is not the same as MySQL https://clickhouse.com/docs/en/sql-reference/data-types/date
select += f"case when {column_name} >='2283-11-11' then CAST('2283-11-11' AS {data_type}) else case when {column_name} <= '1925-01-01' then CAST('1925-01-01' AS {data_type}) else {column_name} end end"
select += f"case when {column_name} >='{max_date_value}' then CAST('{max_date_value}' AS {data_type}) else case when {column_name} <= '{min_date_value}' then CAST('{min_date_value}' AS {data_type}) else {column_name} end end"
else:
if is_binary_datatype(data_type):
select += "lower(hex(cast("+column_name+"as binary)))"
binary_encode = "lower(hex(cast("+column_name+"as binary)))"
if binary_encoding == 'base64':
binary_encode = "replace(to_base64(cast("+column_name+" as binary)),'\\n','')"
select += binary_encode
else:
select += column_name + ""
first_column = False
@@ -204,7 +203,7 @@ def calculate_sql_checksum(table):
statements = []

(query, select_query, distributed_by,
external_table_types) = get_table_checksum_query(table, conn)
external_table_types) = get_table_checksum_query(table, conn, args.binary_encoding)
statements = select_table_statements(
table, query, select_query, distributed_by, external_table_types)
compute_checksum(table, statements, conn)
@@ -239,7 +238,7 @@ def record_factory(*args, **kwargs):

def main():

parser = argparse.ArgumentParser(description='''
parser = argparse.ArgumentParser(description='''Compute a ClickHouse compatible checksum.
''')
# Required
parser.add_argument('--mysql_host', help='MySQL host', required=True)
@@ -261,6 +260,12 @@ def main():
help='Output the raw format to a file called out.txt', required=False)
parser.add_argument(
'--debug_limit', help='Limit the debug output in lines', required=False)
parser.add_argument(
'--binary_encoding', help='either hex or base64 to encode MySQL binary content', default='hex', required=False)
parser.add_argument(
'--min_date_value', help='Minimum Date32/DateTime64 date', default='1900-01-01', required=False)
parser.add_argument(
'--max_date_value', help='Maximum Date32/Datetime64 date', default='2299-12-31', required=False)
parser.add_argument('--debug', dest='debug',
action='store_true', default=False)
parser.add_argument('--exclude_columns', help='columns exclude',
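The MySQL-side script now takes --binary_encoding (hex or base64) and clamps out-of-range temporal values to the configurable --min_date_value/--max_date_value bounds instead of hard-coded dates. The sketch below covers only the binary branch, with binary_checksum_expression as an invented name; the newline stripping matters because MySQL's TO_BASE64() wraps its output every 76 characters, while ClickHouse's base64 encoding does not.

# Illustrative helper (not the project's exact code) showing how the
# --binary_encoding flag switches the MySQL expression used for binary
# columns so both databases hash the same textual representation.
def binary_checksum_expression(column_name, binary_encoding="hex"):
    if binary_encoding == "base64":
        # TO_BASE64() inserts a newline every 76 characters; strip them so
        # the value lines up with ClickHouse's unwrapped base64 output.
        return f"replace(to_base64(cast({column_name} as binary)),'\\n','')"
    return f"lower(hex(cast({column_name} as binary)))"


if __name__ == "__main__":
    print(binary_checksum_expression("`payload`"))
    print(binary_checksum_expression("`payload`", binary_encoding="base64"))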
14 changes: 11 additions & 3 deletions sink-connector/python/db_load/clickhouse_loader.py
@@ -487,10 +487,18 @@ def load_data_mysqlshell(args, timezone, schema_map, dry_run=False):
if structure != "":
structure += ", "
structure +=" "+column_name + " "
if column['nullable'] == True:
structure +=" Nullable(String)"
datatype = column['datatype']
mysql_datetype = column['mysql_datatype']
if 'timestamp' in mysql_datetype.lower():
if column['nullable'] == True:
structure +=f" Nullable({datatype})"
else:
structure +=f" {datatype}"
else:
structure +=" String"
if column['nullable'] == True:
structure +=" Nullable(String)"
else:
structure +=" String"

cmd = f"""export TZ={timezone}; zstd -d --stdout {data_file} | clickhouse-client --use_client_time_zone 1 --throw_if_no_data_to_insert=0 -h {clickhouse_host} --query="INSERT INTO {ch_schema}.{table_name}({columns}) SELECT {transformed_columns} FROM input('{structure}') FORMAT TSV" -u{args.clickhouse_user} --password '{password}' -mn """
futures.append(executor.submit(execute_load, cmd))
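In the loader, columns whose MySQL type is a timestamp now keep their translated ClickHouse type in the input() structure, so clickhouse-client parses them under the session time zone, while every other column is still read as String and converted by the INSERT ... SELECT expressions. A self-contained sketch of that rule, assuming the columnMap shape produced by the DDL listener in the next file:

# Sketch of the input() structure rule in load_data_mysqlshell; the column
# dictionaries mimic the columnMap entries logged by the DDL listener
# ('column_name', 'datatype', 'mysql_datatype', 'nullable').
def input_structure(columns):
    parts = []
    for column in columns:
        if "timestamp" in column["mysql_datatype"].lower():
            # keep the ClickHouse type so the client applies the TZ setting
            base = column["datatype"]
        else:
            # everything else is fed in as text and cast by the SELECT list
            base = "String"
        if column["nullable"]:
            base = f"Nullable({base})"
        parts.append(f"{column['column_name']} {base}")
    return ", ".join(parts)


if __name__ == "__main__":
    cols = [
        {"column_name": "id", "datatype": "Int32",
         "mysql_datatype": "int", "nullable": False},
        {"column_name": "updated_at", "datatype": "DateTime64(0)",
         "mysql_datatype": "timestamp", "nullable": True},
    ]
    print(input_structure(cols))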
@@ -57,6 +57,7 @@ def translateColumnDefinition(self, column_name, columnDefinition):
# data type modifier (NULL / NOT NULL / PRIMARY KEY)
notNull = False
notSymbol = True
nullable = True
for child in columnDefinition.getChildren():
if child.getRuleIndex() == MySqlParser.RULE_columnConstraint:

@@ -65,19 +66,28 @@ def translateColumnDefinition(self, column_name, columnDefinition):
if nullNotNull:
text = self.extract_original_text(child)
column_buffer += " " + text
if "NULL" == text:
nullable = True
notNull = True
continue

if nullNotNull.NOT():
notSymbol = True
if nullNotNull.NULL_LITERAL() and notSymbol:
if (nullNotNull.NULL_LITERAL() or nullNotNull.NULL_SPEC_LITERAL()) and notSymbol:
notNull = True

nullable = False
else:
notNull = False
nullable = True

if isinstance(child, MySqlParser.PrimaryKeyColumnConstraintContext) and child.PRIMARY():
self.primary_key = column_name
# column without nullable info are default nullable in MySQL, while they are not null in ClickHouse
if not notNull:
column_buffer += " NULL"
nullable = True

return (column_buffer, dataType, not notNull)
return (column_buffer, dataType, nullable)


def exitColumnDeclaration(self, ctx):
@@ -90,14 +100,16 @@ def exitColumnDeclaration(self, ctx):

# columns have an identifier and a column definition
columnDefinition = ctx.columnDefinition()
dataType = columnDefinition.dataType()
originalDataTypeText = self.extract_original_text(dataType)

(columnDefinition_buffer, dataType, nullable) = self.translateColumnDefinition(column_name, columnDefinition)

column_buffer += columnDefinition_buffer

self.columns.append(column_buffer)
dataTypeText = self.convertDataType(dataType)
columnMap = {'column_name': column_name, 'datatype': dataTypeText, 'nullable': nullable}
columnMap = {'column_name': column_name, 'datatype': dataTypeText, 'nullable': nullable, 'mysql_datatype':originalDataTypeText}
logging.info(str(columnMap))
self.columns_map.append(columnMap)

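The listener change tracks nullability explicitly: a MySQL column declared with no NULL/NOT NULL modifier is nullable by default, whereas ClickHouse defaults to NOT NULL, so the translator appends NULL and now returns the resolved flag (plus the original MySQL type text) in columnMap. A toy illustration of that default rule, using an invented helper rather than the ANTLR listener itself:

# Toy illustration (not the ANTLR listener) of the nullability rule applied
# above: absence of a constraint means nullable in MySQL, so the generated
# ClickHouse DDL must say NULL explicitly.
def clickhouse_nullability(mysql_constraint: str):
    """Return (clickhouse_modifier, nullable) for '', 'NULL' or 'NOT NULL'."""
    if mysql_constraint.strip().upper() == "NOT NULL":
        return "NOT NULL", False
    return "NULL", True


if __name__ == "__main__":
    for constraint in ["", "NULL", "NOT NULL"]:
        print(repr(constraint), "->", clickhouse_nullability(constraint))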
