From 9d18d7a4248d267bbf5ad57c831fbf7a252d4695 Mon Sep 17 00:00:00 2001 From: YenchangChan Date: Fri, 24 May 2024 13:16:29 +0800 Subject: [PATCH] fix: clickhouse not support alter_sync before v23.3 --- common/ck.go | 7 ++++ service/clickhouse/clickhouse_service.go | 26 +++++++------- service/cron/clickhouse.go | 44 +++++++++++++++--------- 3 files changed, 47 insertions(+), 30 deletions(-) diff --git a/common/ck.go b/common/ck.go index 968b1bba..a17f1e0f 100644 --- a/common/ck.go +++ b/common/ck.go @@ -391,3 +391,10 @@ func Execute(conf *model.CKManClickHouseConfig, sql string) error { return lastErr } + +func WithAlterSync(version string) string { + if CompareClickHouseVersion(version, "23.3") >= 0 { + return "SETTINGS alter_sync = 0" + } + return "" +} diff --git a/service/clickhouse/clickhouse_service.go b/service/clickhouse/clickhouse_service.go index 47eb3592..ad8a88d6 100644 --- a/service/clickhouse/clickhouse_service.go +++ b/service/clickhouse/clickhouse_service.go @@ -439,11 +439,11 @@ func (ck *CkService) AlterTable(params *model.AlterCkTableParams) error { for _, value := range params.Add { add := "" if value.After != "" { - add = fmt.Sprintf("ALTER TABLE `%s`.`%s` ON CLUSTER `%s` ADD COLUMN IF NOT EXISTS `%s` %s %s AFTER `%s` SETTINGS alter_sync = 0", - params.DB, local, params.Cluster, value.Name, value.Type, strings.Join(value.Options, " "), value.After) + add = fmt.Sprintf("ALTER TABLE `%s`.`%s` ON CLUSTER `%s` ADD COLUMN IF NOT EXISTS `%s` %s %s AFTER `%s` %s", + params.DB, local, params.Cluster, value.Name, value.Type, strings.Join(value.Options, " "), value.After, common.WithAlterSync(ck.Config.Version)) } else { - add = fmt.Sprintf("ALTER TABLE `%s`.`%s` ON CLUSTER `%s` ADD COLUMN IF NOT EXISTS `%s` %s %s SETTINGS alter_sync = 0", - params.DB, local, params.Cluster, value.Name, value.Type, strings.Join(value.Options, " ")) + add = fmt.Sprintf("ALTER TABLE `%s`.`%s` ON CLUSTER `%s` ADD COLUMN IF NOT EXISTS `%s` %s %s %s", + params.DB, local, params.Cluster, value.Name, value.Type, strings.Join(value.Options, " "), common.WithAlterSync(ck.Config.Version)) } log.Logger.Debugf(add) if err := ck.Conn.Exec(add); err != nil { @@ -461,8 +461,8 @@ func (ck *CkService) AlterTable(params *model.AlterCkTableParams) error { rows.Close() } - modify := fmt.Sprintf("ALTER TABLE `%s`.`%s` ON CLUSTER `%s` MODIFY COLUMN IF EXISTS `%s` %s %s SETTINGS alter_sync = 0", - params.DB, local, params.Cluster, value.Name, value.Type, strings.Join(value.Options, " ")) + modify := fmt.Sprintf("ALTER TABLE `%s`.`%s` ON CLUSTER `%s` MODIFY COLUMN IF EXISTS `%s` %s %s %s", + params.DB, local, params.Cluster, value.Name, value.Type, strings.Join(value.Options, " "), common.WithAlterSync(ck.Config.Version)) log.Logger.Debugf(modify) if err := ck.Conn.Exec(modify); err != nil { return errors.Wrap(err, "") @@ -471,8 +471,8 @@ func (ck *CkService) AlterTable(params *model.AlterCkTableParams) error { // delete column for _, value := range params.Drop { - drop := fmt.Sprintf("ALTER TABLE `%s`.`%s` ON CLUSTER `%s` DROP COLUMN IF EXISTS `%s` SETTINGS alter_sync = 0", - params.DB, local, params.Cluster, value) + drop := fmt.Sprintf("ALTER TABLE `%s`.`%s` ON CLUSTER `%s` DROP COLUMN IF EXISTS `%s` %s", + params.DB, local, params.Cluster, value, common.WithAlterSync(ck.Config.Version)) log.Logger.Debugf(drop) if err := ck.Conn.Exec(drop); err != nil { return errors.Wrap(err, "") @@ -484,8 +484,8 @@ func (ck *CkService) AlterTable(params *model.AlterCkTableParams) error { if value.From == "" || value.To == "" { return errors.Errorf("form %s or to %s must not be empty", value.From, value.To) } - rename := fmt.Sprintf("ALTER TABLE `%s`.`%s` ON CLUSTER `%s` RENAME COLUMN IF EXISTS `%s` TO `%s` SETTINGS alter_sync = 0", - params.DB, local, params.Cluster, value.From, value.To) + rename := fmt.Sprintf("ALTER TABLE `%s`.`%s` ON CLUSTER `%s` RENAME COLUMN IF EXISTS `%s` TO `%s` %s", + params.DB, local, params.Cluster, value.From, value.To, common.WithAlterSync(ck.Config.Version)) log.Logger.Debugf(rename) if err := ck.Conn.Exec(rename); err != nil { @@ -590,14 +590,14 @@ func (ck *CkService) AlterTableTTL(req *model.AlterTblsTTLReq) error { if req.TTLType != "" { if req.TTLType == model.TTLTypeModify { if req.TTLExpr != "" { - ttl := fmt.Sprintf("ALTER TABLE `%s`.`%s` ON CLUSTER `%s` MODIFY TTL %s SETTINGS alter_sync = 0", table.Database, local, ck.Config.Cluster, req.TTLExpr) + ttl := fmt.Sprintf("ALTER TABLE `%s`.`%s` ON CLUSTER `%s` MODIFY TTL %s %s", table.Database, local, ck.Config.Cluster, req.TTLExpr, common.WithAlterSync(ck.Config.Version)) log.Logger.Debugf(ttl) if err := ck.Conn.Exec(ttl); err != nil { return errors.Wrap(err, "") } } } else if req.TTLType == model.TTLTypeRemove { - ttl := fmt.Sprintf("ALTER TABLE `%s`.`%s` ON CLUSTER `%s` REMOVE TTL SETTINGS alter_sync = 0", table.Database, local, ck.Config.Cluster) + ttl := fmt.Sprintf("ALTER TABLE `%s`.`%s` ON CLUSTER `%s` REMOVE TTL %s", table.Database, local, ck.Config.Cluster, common.WithAlterSync(ck.Config.Version)) log.Logger.Debugf(ttl) if err := ck.Conn.Exec(ttl); err != nil { return errors.Wrap(err, "") @@ -1987,7 +1987,7 @@ func MoveExceptToOthers(conf *model.CKManClickHouseConfig, except, target, datab if err != nil { return err } - query = fmt.Sprintf("TRUNCATE TABLE `%s`.`%s` SETTINGS alter_sync = 0", database, table) + query = fmt.Sprintf("TRUNCATE TABLE `%s`.`%s` %s", database, table, common.WithAlterSync(conf.Version)) log.Logger.Debugf("[%s] %s", except, query) conn = common.GetConnection(except) err = conn.Exec(query) diff --git a/service/cron/clickhouse.go b/service/cron/clickhouse.go index ca3a894c..2ebbec1f 100644 --- a/service/cron/clickhouse.go +++ b/service/cron/clickhouse.go @@ -161,17 +161,17 @@ func syncLogicbyTable(clusters []string, database, localTable string) error { columnsExpr := strings.Join(columns, ",") // local table onCluster := fmt.Sprintf("ON CLUSTER `%s`", cluster) - if err = alterTable(ckService.Conn, database, localTable, onCluster, columnsExpr); err != nil { + if err = alterTable(ckService.Conn, database, localTable, onCluster, columnsExpr, ckService.Config.Version); err != nil { return err } // distributed table - if err = alterTable(ckService.Conn, database, common.ClickHouseDistributedTablePrefix+localTable, onCluster, columnsExpr); err != nil { + if err = alterTable(ckService.Conn, database, common.ClickHouseDistributedTablePrefix+localTable, onCluster, columnsExpr, ckService.Config.Version); err != nil { return err } // logic table - if err = alterTable(ckService.Conn, database, common.ClickHouseDistTableOnLogicPrefix+localTable, onCluster, columnsExpr); err != nil { + if err = alterTable(ckService.Conn, database, common.ClickHouseDistTableOnLogicPrefix+localTable, onCluster, columnsExpr, ckService.Config.Version); err != nil { return err } @@ -233,13 +233,15 @@ func SyncDistSchema() error { if err != nil { continue } - query := `SELECT + query := fmt.Sprintf(`SELECT database, name, (extractAllGroups(engine_full, '(Distributed\\(\')(.*)\',\\s+\'(.*)\',\\s+\'(.*)\'(.*)')[1])[2] AS cluster, (extractAllGroups(engine_full, '(Distributed\\(\')(.*)\',\\s+\'(.*)\',\\s+\'(.*)\'(.*)')[1])[4] AS local FROM system.tables -WHERE match(engine, 'Distributed') AND (database NOT IN ('system', 'information_schema', 'INFORMATION_SCHEMA'))` +WHERE match(engine, 'Distributed') AND (database NOT IN ('system', 'information_schema', 'INFORMATION_SCHEMA')) +AND cluster = '%s'`, conf.Cluster) + log.Logger.Debugf("[%s]%s", conf.Cluster, query) rows, err := ckService.Conn.Query(query) if err != nil { continue @@ -293,7 +295,7 @@ func syncDistTable(distTable, localTable, database string, conf model.CKManClick if needAlter { log.Logger.Debugf("need alter table, table %s.%s have different columns on cluster %s", database, localTable, conf.Cluster) for host, cols := range tableLists { - if err := syncSchema(dbLists[host], allCols, cols, database, localTable, ""); err != nil { + if err := syncSchema(dbLists[host], allCols, cols, database, localTable, "", conf.Version); err != nil { return err } } @@ -310,8 +312,8 @@ func syncDistTable(distTable, localTable, database string, conf model.CKManClick return errors.Wrap(err, host) } onCluster := fmt.Sprintf("ON CLUSTER %s", conf.Cluster) - if err = syncSchema(conn, allCols, distCols, database, distTable, onCluster); err != nil { - return errors.Wrap(err, host) + if err = syncSchema(conn, allCols, distCols, database, distTable, onCluster, conf.Version); err != nil { + return errors.Wrap(err, "dist table") } logicTable := common.ClickHouseDistTableOnLogicPrefix + localTable @@ -328,8 +330,16 @@ func syncDistTable(distTable, localTable, database string, conf model.CKManClick return errors.Wrap(err, host) } - if err = syncSchema(conn, allCols, logicCols, database, logicTable, onCluster); err != nil { - return errors.Wrap(err, host) + if err = syncSchema(conn, allCols, logicCols, database, logicTable, onCluster, conf.Version); err != nil { + err = common.ClikHouseExceptionDecode(err) + var exception *client.Exception + if errors.As(err, &exception) { + // 逻辑表不存在没关系,不报错 + if exception.Code == 60 { + continue + } + } + return errors.Wrap(err, "logic table") } } @@ -347,11 +357,11 @@ func initCKConns(conf model.CKManClickHouseConfig) (err error) { return } -func alterTable(conn *common.Conn, database, table, onCluster, col string) error { +func alterTable(conn *common.Conn, database, table, onCluster, col, version string) error { query := fmt.Sprintf("ALTER TABLE `%s`.`%s` %s %s", database, table, onCluster, col) if onCluster != "" { - query += "SETTINGS alter_sync = 0" + query += " " + common.WithAlterSync(version) } log.Logger.Debug(query) return conn.Exec(query) @@ -375,20 +385,20 @@ func getColumns(conn *common.Conn, database, table string) (common.Map, error) { return tblMap, nil } -func syncSchema(conn *common.Conn, allCols, cols common.Map, database, table, oncluster string) error { +func syncSchema(conn *common.Conn, allCols, cols common.Map, database, table, oncluster, version string) error { needAdds := allCols.Difference(cols).(common.Map) var columns []string for k, v := range needAdds { columns = append(columns, fmt.Sprintf("ADD COLUMN IF NOT EXISTS `%s` %s ", k, v)) } - // 当前节点是全量的列, 无需更新 - if len(columns) == 0 { + // 当前节点是全量的列, 无需更新, columns 和allCols相等, 说明有表不存在,也不管了 + if len(columns) == 0 || len(columns) == len(allCols) { return nil } // local table - if err := alterTable(conn, database, table, oncluster, strings.Join(columns, ",")); err != nil { - return err + if err := alterTable(conn, database, table, oncluster, strings.Join(columns, ","), version); err != nil { + return errors.Wrapf(err, table) } return nil }