diff --git a/output/clickhouse.go b/output/clickhouse.go index 939e41e7..ba012408 100644 --- a/output/clickhouse.go +++ b/output/clickhouse.go @@ -118,9 +118,13 @@ func (c *ClickHouse) writeSeries(rows model.Rows, conn *sql.DB) (err error) { } c.mux.Unlock() if len(seriesRows) != 0 { - if err = writeRows(c.promSerSQL, seriesRows, c.IdxSerID, len(c.Dims), conn); err != nil { + var numBad int + if numBad, err = writeRows(c.promSerSQL, seriesRows, c.IdxSerID, len(c.Dims), conn); err != nil { return } + if numBad != 0 { + statistics.ParseMsgsErrorTotal.WithLabelValues(c.taskCfg.Name).Add(float64(numBad)) + } } return } @@ -143,9 +147,13 @@ func (c *ClickHouse) write(batch *model.Batch, sc *pool.ShardConn, dbVer *int) ( return } } - if err = writeRows(c.prepareSQL, *batch.Rows, 0, numDims, conn); err != nil { + var numBad int + if numBad, err = writeRows(c.prepareSQL, *batch.Rows, 0, numDims, conn); err != nil { return } + if numBad != 0 { + statistics.ParseMsgsErrorTotal.WithLabelValues(c.taskCfg.Name).Add(float64(numBad)) + } statistics.FlushMsgsTotal.WithLabelValues(c.taskCfg.Name).Add(float64(batch.RealSize)) return } diff --git a/output/clickhouse_util.go b/output/clickhouse_util.go index 15656f7c..0662d33a 100644 --- a/output/clickhouse_util.go +++ b/output/clickhouse_util.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/ClickHouse/clickhouse-go" + "github.com/RoaringBitmap/roaring" "github.com/housepower/clickhouse_sinker/model" "github.com/housepower/clickhouse_sinker/pool" "github.com/housepower/clickhouse_sinker/util" @@ -28,9 +29,10 @@ func shouldReconnect(err error, sc *pool.ShardConn) bool { return true } -func writeRows(prepareSQL string, rows model.Rows, idxBegin, idxEnd int, conn *sql.DB) (err error) { +func writeRows(prepareSQL string, rows model.Rows, idxBegin, idxEnd int, conn *sql.DB) (numBad int, err error) { var stmt *sql.Stmt var tx *sql.Tx + var errExec error if tx, err = conn.Begin(); err != nil { err = errors.Wrapf(err, "conn.Begin %s", prepareSQL) return @@ -40,15 +42,48 @@ func writeRows(prepareSQL string, rows model.Rows, idxBegin, idxEnd int, conn *s return } defer stmt.Close() - for _, row := range rows { + var bmBad *roaring.Bitmap + for i, row := range rows { if _, err = stmt.Exec((*row)[idxBegin:idxEnd]...); err != nil { - err = errors.Wrapf(err, "stmt.Exec") - break + if bmBad == nil { + errExec = errors.Wrapf(err, "stmt.Exec") + bmBad = roaring.NewBitmap() + } + bmBad.AddInt(i) } } - if err != nil { + if errExec != nil { + stmt.Close() _ = tx.Rollback() - return err + numBad = int(bmBad.GetCardinality()) + util.Logger.Warn(fmt.Sprintf("writeRows skipped %d rows of %d due to invalid content", numBad, len(rows)), zap.Error(errExec)) + // write rows again, skip bad ones + if tx, err = conn.Begin(); err != nil { + err = errors.Wrapf(err, "conn.Begin %s", prepareSQL) + return + } + if stmt, err = tx.Prepare(prepareSQL); err != nil { + err = errors.Wrapf(err, "tx.Prepare %s", prepareSQL) + return + } + defer stmt.Close() + for i, row := range rows { + if !bmBad.ContainsInt(i) { + if _, err = stmt.Exec((*row)[idxBegin:idxEnd]...); err != nil { + err = errors.Wrapf(err, "stmt.Exec") + break + } + } + } + if err != nil { + _ = tx.Rollback() + return + } + if err = tx.Commit(); err != nil { + err = errors.Wrapf(err, "tx.Commit") + return + } + return } if err = tx.Commit(); err != nil { err = errors.Wrapf(err, "tx.Commit")