Skip to content

Commit

Permalink
replaced roaring64 with map
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzhichang committed Jul 13, 2022
1 parent 53f4e1c commit 4257d32
Showing 1 changed file with 7 additions and 7 deletions.
14 changes: 7 additions & 7 deletions output/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/RoaringBitmap/roaring/roaring64"
"github.com/housepower/clickhouse_sinker/config"
"github.com/housepower/clickhouse_sinker/model"
"github.com/housepower/clickhouse_sinker/pool"
Expand Down Expand Up @@ -65,7 +64,7 @@ type ClickHouse struct {
distMetricTbls []string
distSeriesTbls []string

bmSeries *roaring64.Bitmap
bmSeries map[uint64]bool
numFlying int32
mux sync.Mutex
taskDone *sync.Cond
Expand Down Expand Up @@ -117,9 +116,10 @@ func (c *ClickHouse) writeSeries(rows model.Rows, conn clickhouse.Conn) (err err
var seriesRows model.Rows
c.mux.Lock()
for _, row := range rows {
mgmtID := (*row)[c.IdxSerID+1].(int64)
if c.bmSeries.CheckedAdd(uint64(mgmtID)) {
mgmtID := uint64((*row)[c.IdxSerID+1].(int64))
if _, found := c.bmSeries[mgmtID]; !found {
seriesRows = append(seriesRows, row)
c.bmSeries[mgmtID] = true
}
}
c.mux.Unlock()
Expand Down Expand Up @@ -208,7 +208,7 @@ func (c *ClickHouse) initBmSeries(conn clickhouse.Conn) (err error) {
if c.cfg.Clickhouse.Cluster != "" {
tbl = c.distSeriesTbls[0]
}
c.bmSeries = roaring64.New()
c.bmSeries = make(map[uint64]bool)
if !c.taskCfg.LoadSeriesAtStartup {
util.Logger.Info(fmt.Sprintf("skipped loading series from %v", tbl), zap.String("task", c.taskCfg.Name))
return
Expand All @@ -227,9 +227,9 @@ func (c *ClickHouse) initBmSeries(conn clickhouse.Conn) (err error) {
err = errors.Wrapf(err, "")
return err
}
c.bmSeries.Add(mgmtID)
c.bmSeries[mgmtID] = true
}
util.Logger.Info(fmt.Sprintf("loaded %d series from %v", c.bmSeries.GetCardinality(), tbl), zap.String("task", c.taskCfg.Name))
util.Logger.Info(fmt.Sprintf("loaded %d series from %v", len(c.bmSeries), tbl), zap.String("task", c.taskCfg.Name))
return
}

Expand Down

0 comments on commit 4257d32

Please sign in to comment.