Skip to content

Commit

Permalink
skip bad rows
Browse files: browse the repository at this point in the history
  • Loading branch information
yuzhichang committed Nov 4, 2021
1 parent f7e85b9 commit 8690bd9
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 8 deletions.
12 changes: 10 additions & 2 deletions output/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,13 @@ func (c *ClickHouse) writeSeries(rows model.Rows, conn *sql.DB) (err error) {
}
c.mux.Unlock()
if len(seriesRows) != 0 {
if err = writeRows(c.promSerSQL, seriesRows, c.IdxSerID, len(c.Dims), conn); err != nil {
var numBad int
if numBad, err = writeRows(c.promSerSQL, seriesRows, c.IdxSerID, len(c.Dims), conn); err != nil {
return
}
if numBad != 0 {
statistics.ParseMsgsErrorTotal.WithLabelValues(c.taskCfg.Name).Add(float64(numBad))
}
}
return
}
Expand All @@ -143,9 +147,13 @@ func (c *ClickHouse) write(batch *model.Batch, sc *pool.ShardConn, dbVer *int) (
return
}
}
if err = writeRows(c.prepareSQL, *batch.Rows, 0, numDims, conn); err != nil {
var numBad int
if numBad, err = writeRows(c.prepareSQL, *batch.Rows, 0, numDims, conn); err != nil {
return
}
if numBad != 0 {
statistics.ParseMsgsErrorTotal.WithLabelValues(c.taskCfg.Name).Add(float64(numBad))
}
statistics.FlushMsgsTotal.WithLabelValues(c.taskCfg.Name).Add(float64(batch.RealSize))
return
}
Expand Down
47 changes: 41 additions & 6 deletions output/clickhouse_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"

"github.com/ClickHouse/clickhouse-go"
"github.com/RoaringBitmap/roaring"
"github.com/housepower/clickhouse_sinker/model"
"github.com/housepower/clickhouse_sinker/pool"
"github.com/housepower/clickhouse_sinker/util"
Expand All @@ -28,9 +29,10 @@ func shouldReconnect(err error, sc *pool.ShardConn) bool {
return true
}

func writeRows(prepareSQL string, rows model.Rows, idxBegin, idxEnd int, conn *sql.DB) (err error) {
func writeRows(prepareSQL string, rows model.Rows, idxBegin, idxEnd int, conn *sql.DB) (numBad int, err error) {
var stmt *sql.Stmt
var tx *sql.Tx
var errExec error
if tx, err = conn.Begin(); err != nil {
err = errors.Wrapf(err, "conn.Begin %s", prepareSQL)
return
Expand All @@ -40,15 +42,48 @@ func writeRows(prepareSQL string, rows model.Rows, idxBegin, idxEnd int, conn *s
return
}
defer stmt.Close()
for _, row := range rows {
var bmBad *roaring.Bitmap
for i, row := range rows {
if _, err = stmt.Exec((*row)[idxBegin:idxEnd]...); err != nil {
err = errors.Wrapf(err, "stmt.Exec")
break
if bmBad == nil {
errExec = errors.Wrapf(err, "stmt.Exec")
bmBad = roaring.NewBitmap()
}
bmBad.AddInt(i)
}
}
if err != nil {
if errExec != nil {
stmt.Close()
_ = tx.Rollback()
return err
numBad = int(bmBad.GetCardinality())
util.Logger.Warn(fmt.Sprintf("writeRows skipped %d rows of %d due to invalid content", numBad, len(rows)), zap.Error(errExec))
// write rows again, skip bad ones
if tx, err = conn.Begin(); err != nil {
err = errors.Wrapf(err, "conn.Begin %s", prepareSQL)
return
}
if stmt, err = tx.Prepare(prepareSQL); err != nil {
err = errors.Wrapf(err, "tx.Prepare %s", prepareSQL)
return
}
defer stmt.Close()
for i, row := range rows {
if !bmBad.ContainsInt(i) {
if _, err = stmt.Exec((*row)[idxBegin:idxEnd]...); err != nil {
err = errors.Wrapf(err, "stmt.Exec")
break
}
}
}
if err != nil {
_ = tx.Rollback()
return
}
if err = tx.Commit(); err != nil {
err = errors.Wrapf(err, "tx.Commit")
return
}
return
}
if err = tx.Commit(); err != nil {
err = errors.Wrapf(err, "tx.Commit")
Expand Down

0 comments on commit 8690bd9

Please sign in to comment.