Skip to content

Commit

Permalink
fix: non-default database support when archive table
Browse files Browse the repository at this point in the history
  • Loading branch information
YenchangChan committed Jan 4, 2024
1 parent fd2ce46 commit 8fd01ec
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 11 deletions.
16 changes: 8 additions & 8 deletions service/clickhouse/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func (p *ArchiveParams) GetSlots(host, table string) (slots []time.Time, err err
var rowsCnt uint64
var compressed uint64
// get size-per-row
if rowsCnt, err = p.SelectUint64(host, fmt.Sprintf("SELECT count() FROM %s", table)); err != nil {
if rowsCnt, err = p.SelectUint64(host, fmt.Sprintf("SELECT count() FROM `%s`.`%s`", p.Database, table)); err != nil {
return
}
if rowsCnt == 0 {
Expand All @@ -228,14 +228,14 @@ func (p *ArchiveParams) GetSlots(host, table string) (slots []time.Time, err err
colName := p.PattInfo[table][0]
colType := p.PattInfo[table][1]
var totalRowsCnt uint64
if totalRowsCnt, err = p.SelectUint64(host, fmt.Sprintf("SELECT count() FROM %s WHERE `%s`>=%s AND `%s`<%s", table, colName, formatDate(p.Begin, colType), colName, formatDate(p.End, colType))); err != nil {
if totalRowsCnt, err = p.SelectUint64(host, fmt.Sprintf("SELECT count() FROM `%s`.`%s` WHERE `%s`>=%s AND `%s`<%s", p.Database, table, colName, formatDate(p.Begin, colType), colName, formatDate(p.End, colType))); err != nil {
return
}
tblEstSize := totalRowsCnt * uint64(sizePerRow)
log.Logger.Infof("host %s: totol rows to export: %d, estimated size (in bytes): %d", host, totalRowsCnt, tblEstSize)
log.Logger.Infof("host %s: total rows to export: %d, estimated size (in bytes): %d", host, totalRowsCnt, tblEstSize)
atomic.AddUint64(&p.EstSize, tblEstSize)

sqlTmpl3 := "SELECT toStartOfInterval(`%s`, INTERVAL %s) AS slot, count() FROM %s WHERE `%s`>=%s AND `%s`<%s GROUP BY slot ORDER BY slot"
sqlTmpl3 := "SELECT toStartOfInterval(`%s`, INTERVAL %s) AS slot, count() FROM `%s`.`%s` WHERE `%s`>=%s AND `%s`<%s GROUP BY slot ORDER BY slot"
var tryIntervals []string
if colType == "Date" {
// remove 4 hour, 1 hour
Expand All @@ -246,7 +246,7 @@ func (p *ArchiveParams) GetSlots(host, table string) (slots []time.Time, err err
for i, interval := range tryIntervals {
slots = slots[:0]
var rows1 *common.Rows
query1 := fmt.Sprintf(sqlTmpl3, colName, interval, table, colName, formatDate(p.Begin, colType), colName, formatDate(p.End, colType))
query1 := fmt.Sprintf(sqlTmpl3, colName, interval, p.Database, table, colName, formatDate(p.Begin, colType), colName, formatDate(p.End, colType))
log.Logger.Infof("host %s: query: %s", host, query1)
if rows1, err = conn.Query(query1); err != nil {
err = errors.Wrapf(err, "")
Expand Down Expand Up @@ -285,9 +285,9 @@ func (p *ArchiveParams) ExportSlot(host, table string, seq int, slotBeg, slotEnd
p.TmpTables = append(p.TmpTables, tmpTbl)
for _, engine := range engines {
queries := []string{
fmt.Sprintf("DROP TABLE IF EXISTS %s", tmpTbl),
fmt.Sprintf("CREATE TABLE %s AS %s ENGINE=%s", tmpTbl, table, engine),
fmt.Sprintf("INSERT INTO %s SELECT * FROM %s WHERE `%s`>=%s AND `%s`<%s", tmpTbl, table, colName, formatTimestamp(slotBeg, colType), colName, formatTimestamp(slotEnd, colType)),
fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", p.Database, tmpTbl),
fmt.Sprintf("CREATE TABLE `%s`.`%s` AS `%s`.`%s` ENGINE=%s", p.Database, tmpTbl, p.Database, table, engine),
fmt.Sprintf("INSERT INTO `%s`.`%s` SELECT * FROM `%s`.`%s` WHERE `%s`>=%s AND `%s`<%s", p.Database, tmpTbl, p.Database, table, colName, formatTimestamp(slotBeg, colType), colName, formatTimestamp(slotEnd, colType)),
}
conn := common.GetConnection(host)
if conn == nil {
Expand Down
2 changes: 1 addition & 1 deletion service/clickhouse/archive_hdfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (t *TargetHdfs) Done(_ string) {
for _, host := range t.Hosts {
conn := common.GetConnection(host)
for _, tmpTbl := range t.TmpTables {
query := fmt.Sprintf("DROP TABLE IF EXISTS %s", tmpTbl)
query := fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", t.Database, tmpTbl)
conn.Exec(query)
}
}
Expand Down
2 changes: 1 addition & 1 deletion service/clickhouse/archive_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (t *TargetLocal) Done(fp string) {
for _, host := range t.Hosts {
conn := common.GetConnection(host)
for _, tmpTbl := range t.TmpTables {
query := fmt.Sprintf("DROP TABLE IF EXISTS %s", tmpTbl)
query := fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", t.Database, tmpTbl)
conn.Exec(query)
}
}
Expand Down
2 changes: 1 addition & 1 deletion service/clickhouse/archive_s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func (t *TargetS3) Done(_ string) {
for _, host := range t.Hosts {
conn := common.GetConnection(host)
for _, tmpTbl := range t.TmpTables {
query := fmt.Sprintf("DROP TABLE IF EXISTS %s", tmpTbl)
query := fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", t.Database, tmpTbl)
conn.Exec(query)
}
}
Expand Down

0 comments on commit 8fd01ec

Please sign in to comment.