From 8fd01ec121c378b54bbc2b93eb3749826b8d4f56 Mon Sep 17 00:00:00 2001 From: YenchangChan Date: Tue, 2 Jan 2024 16:23:21 +0800 Subject: [PATCH] fix: non-default database support when archive table --- service/clickhouse/archive.go | 16 ++++++++-------- service/clickhouse/archive_hdfs.go | 2 +- service/clickhouse/archive_local.go | 2 +- service/clickhouse/archive_s3.go | 2 +- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/service/clickhouse/archive.go b/service/clickhouse/archive.go index ef667ec9..cf8d6f28 100644 --- a/service/clickhouse/archive.go +++ b/service/clickhouse/archive.go @@ -205,7 +205,7 @@ func (p *ArchiveParams) GetSlots(host, table string) (slots []time.Time, err err var rowsCnt uint64 var compressed uint64 // get size-per-row - if rowsCnt, err = p.SelectUint64(host, fmt.Sprintf("SELECT count() FROM %s", table)); err != nil { + if rowsCnt, err = p.SelectUint64(host, fmt.Sprintf("SELECT count() FROM `%s`.`%s`", p.Database, table)); err != nil { return } if rowsCnt == 0 { @@ -228,14 +228,14 @@ func (p *ArchiveParams) GetSlots(host, table string) (slots []time.Time, err err colName := p.PattInfo[table][0] colType := p.PattInfo[table][1] var totalRowsCnt uint64 - if totalRowsCnt, err = p.SelectUint64(host, fmt.Sprintf("SELECT count() FROM %s WHERE `%s`>=%s AND `%s`<%s", table, colName, formatDate(p.Begin, colType), colName, formatDate(p.End, colType))); err != nil { + if totalRowsCnt, err = p.SelectUint64(host, fmt.Sprintf("SELECT count() FROM `%s`.`%s` WHERE `%s`>=%s AND `%s`<%s", p.Database, table, colName, formatDate(p.Begin, colType), colName, formatDate(p.End, colType))); err != nil { return } tblEstSize := totalRowsCnt * uint64(sizePerRow) - log.Logger.Infof("host %s: totol rows to export: %d, estimated size (in bytes): %d", host, totalRowsCnt, tblEstSize) + log.Logger.Infof("host %s: total rows to export: %d, estimated size (in bytes): %d", host, totalRowsCnt, tblEstSize) atomic.AddUint64(&p.EstSize, tblEstSize) - sqlTmpl3 := "SELECT toStartOfInterval(`%s`, INTERVAL %s) AS slot, count() FROM %s WHERE `%s`>=%s AND `%s`<%s GROUP BY slot ORDER BY slot" + sqlTmpl3 := "SELECT toStartOfInterval(`%s`, INTERVAL %s) AS slot, count() FROM `%s`.`%s` WHERE `%s`>=%s AND `%s`<%s GROUP BY slot ORDER BY slot" var tryIntervals []string if colType == "Date" { // remove 4 hour, 1 hour @@ -246,7 +246,7 @@ func (p *ArchiveParams) GetSlots(host, table string) (slots []time.Time, err err for i, interval := range tryIntervals { slots = slots[:0] var rows1 *common.Rows - query1 := fmt.Sprintf(sqlTmpl3, colName, interval, table, colName, formatDate(p.Begin, colType), colName, formatDate(p.End, colType)) + query1 := fmt.Sprintf(sqlTmpl3, colName, interval, p.Database, table, colName, formatDate(p.Begin, colType), colName, formatDate(p.End, colType)) log.Logger.Infof("host %s: query: %s", host, query1) if rows1, err = conn.Query(query1); err != nil { err = errors.Wrapf(err, "") @@ -285,9 +285,9 @@ func (p *ArchiveParams) ExportSlot(host, table string, seq int, slotBeg, slotEnd p.TmpTables = append(p.TmpTables, tmpTbl) for _, engine := range engines { queries := []string{ - fmt.Sprintf("DROP TABLE IF EXISTS %s", tmpTbl), - fmt.Sprintf("CREATE TABLE %s AS %s ENGINE=%s", tmpTbl, table, engine), - fmt.Sprintf("INSERT INTO %s SELECT * FROM %s WHERE `%s`>=%s AND `%s`<%s", tmpTbl, table, colName, formatTimestamp(slotBeg, colType), colName, formatTimestamp(slotEnd, colType)), + fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", p.Database, tmpTbl), + fmt.Sprintf("CREATE TABLE `%s`.`%s` AS `%s`.`%s` ENGINE=%s", p.Database, tmpTbl, p.Database, table, engine), + fmt.Sprintf("INSERT INTO `%s`.`%s` SELECT * FROM `%s`.`%s` WHERE `%s`>=%s AND `%s`<%s", p.Database, tmpTbl, p.Database, table, colName, formatTimestamp(slotBeg, colType), colName, formatTimestamp(slotEnd, colType)), } conn := common.GetConnection(host) if conn == nil { diff --git a/service/clickhouse/archive_hdfs.go b/service/clickhouse/archive_hdfs.go index dfc4f563..f272e1d1 100644 --- a/service/clickhouse/archive_hdfs.go +++ b/service/clickhouse/archive_hdfs.go @@ -165,7 +165,7 @@ func (t *TargetHdfs) Done(_ string) { for _, host := range t.Hosts { conn := common.GetConnection(host) for _, tmpTbl := range t.TmpTables { - query := fmt.Sprintf("DROP TABLE IF EXISTS %s", tmpTbl) + query := fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", t.Database, tmpTbl) conn.Exec(query) } } diff --git a/service/clickhouse/archive_local.go b/service/clickhouse/archive_local.go index 77c982ca..ea921921 100644 --- a/service/clickhouse/archive_local.go +++ b/service/clickhouse/archive_local.go @@ -183,7 +183,7 @@ func (t *TargetLocal) Done(fp string) { for _, host := range t.Hosts { conn := common.GetConnection(host) for _, tmpTbl := range t.TmpTables { - query := fmt.Sprintf("DROP TABLE IF EXISTS %s", tmpTbl) + query := fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", t.Database, tmpTbl) conn.Exec(query) } } diff --git a/service/clickhouse/archive_s3.go b/service/clickhouse/archive_s3.go index c0d000bf..5d7baafa 100644 --- a/service/clickhouse/archive_s3.go +++ b/service/clickhouse/archive_s3.go @@ -195,7 +195,7 @@ func (t *TargetS3) Done(_ string) { for _, host := range t.Hosts { conn := common.GetConnection(host) for _, tmpTbl := range t.TmpTables { - query := fmt.Sprintf("DROP TABLE IF EXISTS %s", tmpTbl) + query := fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", t.Database, tmpTbl) conn.Exec(query) } }