Skip to content

Commit

Permalink
Merge pull request #375 from aadant/clickhouse_datetime_timezone
Browse files Browse the repository at this point in the history
Fix nullable fields and add clickhouse_datetime_timezone
  • Loading branch information
subkanthi authored Nov 14, 2023
2 parents ec42237 + 28b3855 commit 26fe9cf
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 17 deletions.
27 changes: 14 additions & 13 deletions sink-connector/python/db_load/clickhouse_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ def find_partitioning_options(source):
return partitioning_options


def convert_to_clickhouse_table_regexp(user_name, table_name, source, rmt_delete_support):
def convert_to_clickhouse_table_regexp(user_name, table_name, source, rmt_delete_support, datetime_timezone):

# do we have a table in the source

Expand Down Expand Up @@ -265,23 +265,23 @@ def convert_to_clickhouse_table_regexp(user_name, table_name, source, rmt_delete
return (res, columns)


def convert_to_clickhouse_table(user_name, table_name, source, rmt_delete_support, use_regexp_parser, datetime_timezone=None):
    """Convert the CREATE TABLE statement found in a dump into ClickHouse DDL.

    Args:
        user_name: passed through to the regexp converter.
        table_name: passed through to the regexp converter.
        source: text of the dump that should contain the CREATE TABLE.
        rmt_delete_support: passed through to whichever converter runs.
        use_regexp_parser: when truthy, use the regexp converter directly
            instead of trying the ANTLR converter first.
        datetime_timezone: optional timezone passed through to whichever
            converter runs; defaults to ``None`` so callers written against
            the earlier five-argument signature keep working.

    Returns:
        The result of the converter that ran (callers unpack it as
        ``(table_source, columns)``), or ``('', [])`` when ``source`` holds no
        CREATE TABLE statement.
    """
    # do we have a table in the source
    if not find_create_table(source):
        return ('', [])

    # Honour the caller's explicit request for the regexp converter; without
    # this branch the use_regexp_parser argument would be silently ignored.
    if use_regexp_parser:
        return convert_to_clickhouse_table_regexp(
            user_name, table_name, source, rmt_delete_support, datetime_timezone)

    # the progressive grammar trims the comment
    # NOTE(review): presumably this is why partitioning options are extracted
    # from the raw text up front and handed to the ANTLR converter - confirm.
    partition_options = find_partitioning_options(source)

    try:
        return convert_to_clickhouse_table_antlr(
            source, rmt_delete_support, partition_options, datetime_timezone)
    except Exception as ex:
        # Best-effort fallback: any ANTLR failure is logged and the regexp
        # converter is tried instead.
        logging.info("Use regexp DDL converter")
        logging.info("%s", ex)
        return convert_to_clickhouse_table_regexp(
            user_name, table_name, source, rmt_delete_support, datetime_timezone)


def get_unix_timezone_from_mysql_timezone(timezone):
Expand All @@ -304,10 +304,10 @@ def get_unix_timezone_from_mysql_timezone(timezone):
return tz


def load_schema(args, dry_run=False):
def load_schema(args, dry_run=False, datetime_timezone=None):

if args.mysqlshell:
return load_schema_mysqlshell(args, dry_run=dry_run)
return load_schema_mysqlshell(args, dry_run=dry_run, datetime_timezone=datetime_timezone)

schema_map = {}
# create database
Expand All @@ -334,7 +334,7 @@ def load_schema(args, dry_run=False):
with gzip.open(file, "r") as schema_file:
source = schema_file.read().decode('UTF-8')
logging.info(source)
(table_source, columns) = convert_to_clickhouse_table(db, table, source, args.rmt_delete_support, args.use_regexp_parser)
(table_source, columns) = convert_to_clickhouse_table(db, table, source, args.rmt_delete_support, args.use_regexp_parser, datetime_timezone)
logging.info(table_source)
timezone = find_dump_timezone(source)
logging.info(f"Timezone {timezone}")
Expand All @@ -349,7 +349,7 @@ def load_schema(args, dry_run=False):
return (tz, schema_map)


def load_schema_mysqlshell(args, dry_run=False):
def load_schema_mysqlshell(args, dry_run=False, datetime_timezone=None):

schema_map = {}
# create database
Expand Down Expand Up @@ -380,7 +380,7 @@ def load_schema_mysqlshell(args, dry_run=False):
with open(file, "r") as schema_file:
source = schema_file.read()
logging.info(source)
(table_source, columns) = convert_to_clickhouse_table(db, table, source, args.rmt_delete_support, args.use_regexp_parser)
(table_source, columns) = convert_to_clickhouse_table(db, table, source, args.rmt_delete_support, args.use_regexp_parser, datetime_timezone)
logging.info(table_source)
# timezone = find_dump_timezone(source)
logging.info(f"Timezone {timezone}")
Expand Down Expand Up @@ -562,7 +562,8 @@ def main():
action='store_true', default=False)
parser.add_argument('--rmt_delete_support', help='Use RMT deletes', dest='rmt_delete_support',
action='store_true', default=False)

parser.add_argument('--clickhouse_datetime_timezone',
help='Timezone for CH date times', required=False, default=None)
args = parser.parse_args()
schema = not args.data_only
data = not args.schema_only
Expand All @@ -576,7 +577,7 @@ def main():
'zstd'), "zstd should be in the PATH for util.dumpSchemas load"

if schema:
(timezone, schema_map) = load_schema(args, dry_run=args.dry_run)
(timezone, schema_map) = load_schema(args, dry_run=args.dry_run, datetime_timezone = args.clickhouse_datetime_timezone)
if data:
if timezone is None:
(timezone, schema_map) = load_schema(args, dry_run=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@


class CreateTableMySQLParserListener(MySqlParserListener):
def __init__(self, rmt_delete_support, partition_options):
def __init__(self, rmt_delete_support, partition_options, datetime_timezone=None):
self.buffer = ""
self.columns = ""
self.primary_key = ""
Expand All @@ -16,14 +16,18 @@ def __init__(self, rmt_delete_support, partition_options):
self.rename_list = []
self.rmt_delete_support = rmt_delete_support
self.partition_options = partition_options

self.datatime_timezone = datetime_timezone

def extract_original_text(self, ctx):
    """Return the input text covered by the parse-tree context ``ctx``.

    The text is read from the input stream of the context's first token,
    from that token's ``start`` offset to the ``stop`` offset of the
    context's last token.
    """
    first_token = ctx.start
    char_stream = first_token.getTokenSource().inputStream
    begin = first_token.start
    end = ctx.stop.stop
    return char_stream.getText(begin, end)

def add_timezone(self, dataTypeText):
    """Append the configured timezone as a trailing quoted type argument.

    When ``self.datatime_timezone`` is ``None`` the text is returned
    unchanged. Otherwise the last character of ``dataTypeText`` is dropped
    and ``,'<timezone>')`` is appended, e.g. ``DateTime64(0)`` becomes
    ``DateTime64(0,'UTC')``.
    """
    tz = self.datatime_timezone
    if tz is None:
        return dataTypeText
    # The last character is removed unconditionally; the callers in this
    # class pass text ending in ")", which is re-added after the timezone.
    return "".join((dataTypeText[:-1], ",'", tz, "')"))

def convertDataType(self, dataType):
dataTypeText = self.extract_original_text(dataType)
Expand All @@ -36,8 +40,10 @@ def convertDataType(self, dataType):
if isinstance(dataType, MySqlParser.DimensionDataTypeContext):
if dataType.DATETIME() or dataType.TIMESTAMP():
dataTypeText = 'DateTime64(0)'
dataTypeText = self.add_timezone(dataTypeText)
if dataType.lengthOneDimension():
dataTypeText = 'DateTime64'+dataType.lengthOneDimension().getText()
dataTypeText = self.add_timezone(dataTypeText)
elif dataType.TIME():
dataTypeText = "String"

Expand Down
4 changes: 2 additions & 2 deletions sink-connector/python/db_load/mysql_parser/mysql_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ def syntaxError(self, recognizer, offendingSymbol, line, column, msg, e):
raise Exception(f"Syntax error at line {line} column {column}")


def convert_to_clickhouse_table_antlr(source, rmt_delete_support, partition_options=''):
def convert_to_clickhouse_table_antlr(source, rmt_delete_support, partition_options='', datetime_timezone=None):
columns = []
input_stream = InputStream(source)
lexer = MySqlLexer(input_stream)
stream = CommonTokenStream(lexer)
parser = MySqlParser(stream)
parser.addErrorListener( MyErrorListener() )
tree = parser.sqlStatements()
listener = CreateTableMySQLParserListener(rmt_delete_support, partition_options)
listener = CreateTableMySQLParserListener(rmt_delete_support, partition_options, datetime_timezone=datetime_timezone)
walker = ParseTreeWalker()
walker.walk(listener, tree)
logging.debug(Trees.toStringTree(tree, None, parser))
Expand Down

0 comments on commit 26fe9cf

Please sign in to comment.