diff --git a/cmd/clickhouse_sinker/main.go b/cmd/clickhouse_sinker/main.go index 55a89181..a48e8d09 100644 --- a/cmd/clickhouse_sinker/main.go +++ b/cmd/clickhouse_sinker/main.go @@ -334,7 +334,7 @@ func (s *Sinker) applyFirstConfig(newCfg *config.Config) (err error) { concurrentParsers = 1 } util.InitGlobalParsingPool(concurrentParsers) - totalConn := pool.GetTotalConn() + totalConn := pool.NumShard() util.InitGlobalWritingPool(totalConn) go s.task.Run(s.ctx) @@ -364,7 +364,7 @@ func (s *Sinker) applyAnotherConfig(newCfg *config.Config) (err error) { // 3. Restart goroutine pools. util.InitGlobalTimerWheel() util.GlobalParsingPool.Restart() - totalConn := pool.GetTotalConn() + totalConn := pool.NumShard() util.GlobalWritingPool.Resize(totalConn) util.GlobalWritingPool.Restart() diff --git a/go.sum b/go.sum index 406990ec..0e80d535 100644 --- a/go.sum +++ b/go.sum @@ -11,10 +11,8 @@ github.com/Shopify/sarama v1.27.2/go.mod h1:g5s5osgELxgM+Md9Qni9rzo7Rbt+vvFQI4bt github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= -github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= -github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4 h1:Hs82Z41s6SdL1CELW+XaDYmOH4hkBN4/N9og/AsOv7E= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/aliyun/alibaba-cloud-sdk-go v1.61.18 h1:zOVTBdCKFd9JbCKz9/nt+FovbjPFmb7mUnp8nH9fQBA= github.com/aliyun/alibaba-cloud-sdk-go v1.61.18/go.mod h1:v8ESoHo4SyHmuB4b1tJqDHxfTGEciD+yhvOU/5s1Rfk= @@ -32,7 +30,6 @@ github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+ github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 h1:F1EaeKL/ta07PY/k9Os/UFtwERei2/XzGemhpGnBKNg= github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58/go.mod h1:EOBUe0h4xcZ5GoxqC5SDxFQ8gwyZPKQoEzownBlhI80= -github.com/creack/pty v1.1.9 h1:uDmaGzcdjhF4i/plgjmEsriH11Y0o7RKapEf/LDaM3w= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -54,18 +51,12 @@ github.com/frankban/quicktest v1.10.2/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P github.com/go-errors/errors v1.0.1 h1:LUHzmkK3GUKUrL/1gfBUxAHzcev3apQlezX/+O7ma6w= github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= -github.com/go-kit/kit v0.9.0 h1:wDJmvq38kDhkVxi50ni9ykkdUr1PKgqKOoi01fa0Mdk= github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= -github.com/go-logfmt/logfmt v0.4.0 h1:MP4Eh7ZCb31lleYCFuwm0oe4/YGak+5l1vA2NOE80nA= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= -github.com/go-sql-driver/mysql v1.4.0 h1:7LxgVwFb2hIQtMm87NdgAVfXjnt4OePseqT1tKx+opk= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= -github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= -github.com/gogo/protobuf v1.1.1 h1:72R+M5VuhED/KujmZVcIquuo8mBgX4oVda//DQb3PXo= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= -github.com/goji/httpauth v0.0.0-20160601135302-2da839ab0f4d h1:lBXNCxVENCipq4D1Is42JVOP4eQjlB8TQ6H69Yx5J9Q= github.com/goji/httpauth v0.0.0-20160601135302-2da839ab0f4d/go.mod h1:nnjvkQ9ptGaCkuDUx6wNykzzlUixGxvkme+H/lnzb+A= github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s= github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= @@ -86,9 +77,7 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.2 h1:X2ev0eStA3AbceY54o37/0PQ/UWqKEiiO2dKL5OPaFM= github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= -github.com/google/renameio v0.1.0 h1:GOZbcHa3HfsPKPlmyPyN2KEohoMXOhdMbHrvbpl2QaA= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= @@ -100,7 +89,6 @@ github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869 h1:IPJ3dvxmJ4uc github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869/go.mod h1:cJ6Cj7dQo+O6GJNiMx+Pa94qKj+TG8ONdKHgMNIyyag= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= -github.com/jmoiron/sqlx v1.2.0 h1:41Ip0zITnmWNR/vHV+S4m+VoUivnWY5E4OJfLZjCJMA= github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks= github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= @@ -110,21 +98,16 @@ github.com/json-iterator/go v1.1.8 h1:QiWkFLKq0T7mpzwOTu6BzNDbfTE8OLrYhVKYMLF46O github.com/json-iterator/go v1.1.8/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= -github.com/julienschmidt/httprouter v1.2.0 h1:TDTW5Yz1mjftljbcKqRcrYhd4XeOoI98t+9HbQbYf7g= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= -github.com/kisielk/gotool v1.0.0 h1:AV2c/EiW3KqPNT9ZKl07ehoAGi4C5/01Cfbblndcapg= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/compress v1.11.0 h1:wJbzvpYMVGG9iTI9VxpnNZfd4DzMPoCWze3GgSqz8yg= github.com/klauspost/compress v1.11.0/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= -github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= -github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515 h1:T+h1c/A9Gawja4Y9mFVWj2vyii2bbUNDw3kt9VxK2EY= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= -github.com/kr/pty v1.1.1 h1:VkoXIwSboBpnk99O/KFauAEILuNHv5DVFKZMBN/gUgw= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -135,9 +118,7 @@ github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f h1:sgU github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f/go.mod h1:UGmTpUd3rjbtfIpwAPrcfmGf/Z1HS95TATB+m57TPB8= github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042 h1:Bvq8AziQ5jFF4BHGAEDSqwPW1NJS3XshxbRCxtjFAZc= github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042/go.mod h1:TPpsiPUEh0zFL1Snz4crhMlBe60PYxRHr5oFF3rRYg0= -github.com/lib/pq v1.0.0 h1:X5PMW56eZitiTeO7tKzZxFCSpbFZJtkMMooicw2us9A= github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= -github.com/mattn/go-sqlite3 v1.9.0 h1:pDRiWfl+++eC2FEFRy6jXmQlvp4Yh3z1MJKg4UeYM/4= github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= @@ -147,7 +128,6 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= -github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223 h1:F9x/1yl3T2AeKLr2AMdilSD8+f9bvMnNN8VS5iDtovc= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/nacos-group/nacos-sdk-go v1.0.1 h1:VNmXGlSS28xOmkO5Nxk5WRp6f1HMosAmG9pDtcnUFcw= github.com/nacos-group/nacos-sdk-go v1.0.1/go.mod h1:hlAPn3UdzlxIlSILAyOXKxjFSvDJ9oLzTJ9hLAK1KzA= @@ -180,12 +160,10 @@ github.com/prometheus/procfs v0.0.8 h1:+fpWZdT24pJBiqJdAwYBjPSk+5YmQzYNPYzQsdzLk github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 h1:MkV+77GLUNo5oJ0jf870itWm3D0Sjh7+Za9gazKc5LQ= github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= -github.com/rogpeppe/go-internal v1.3.0 h1:RR9dF3JtopPvtkroDZuVD7qquD0bnHlKSqaQhgwt8yk= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/segmentio/kafka-go v0.4.8 h1:LO36H2tb7RcCRjsYzT/qf7xE+vRBXgddZDD82e1eiWY= github.com/segmentio/kafka-go v0.4.8/go.mod h1:Inh7PqOsxmfgasV8InZYKVXWsdjcCq2d9tFV75GLbuM= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= -github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= @@ -197,7 +175,6 @@ github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasO github.com/streadway/amqp v0.0.0-20200108173154-1c71cc93ed71 h1:2MR0pKUzlP3SGgj5NYJe/zRYDwOu9ku6YHy+Iw7l5DM= github.com/streadway/amqp v0.0.0-20200108173154-1c71cc93ed71/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= @@ -238,7 +215,6 @@ golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a h1:vclmkQCjlDX5OydZ9wv8rB golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs= golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= -golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e h1:JgcxKXxCjrA2tyDP/aNU9K0Ck5Czfk6C7e2tMw7+bSI= golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= @@ -250,7 +226,6 @@ golang.org/x/net v0.0.0-20200904194848-62affa334b73/go.mod h1:/O7V0waA8r7cgGh81R golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -285,13 +260,11 @@ google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miE google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM= google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= -gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b h1:QRR6H1YWRnHb4Y/HeNFCTJLFVxaq6wH4YuVdsUOr75U= gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/errgo.v2 v2.1.0 h1:0vLT13EuvQ0hNvakwLuFZ/jYrLp5F3kcWHXdRggjCE8= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/ini.v1 v1.42.0 h1:7N3gPTt50s8GuLortA00n8AqRTk75qOP98+mTPpgzRk= gopkg.in/ini.v1 v1.42.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= diff --git a/output/clickhouse.go b/output/clickhouse.go index da633678..eda0358c 100644 --- a/output/clickhouse.go +++ b/output/clickhouse.go @@ -18,7 +18,6 @@ package output import ( "context" "database/sql" - std_errors "errors" "fmt" "io" "math" @@ -28,6 +27,7 @@ import ( "sync" "time" + "github.com/ClickHouse/clickhouse-go" "github.com/housepower/clickhouse_sinker/config" "github.com/housepower/clickhouse_sinker/model" "github.com/housepower/clickhouse_sinker/pool" @@ -40,6 +40,9 @@ import ( var ( selectSQLTemplate = `select name, type, default_kind from system.columns where database = '%s' and table = '%s'` lowCardinalityRegexp = regexp.MustCompile(`LowCardinality\((.+)\)`) + + // refers to src/Common/ErrorCodes.cpp, https://github.com/ClickHouse/ClickHouse/issues/24036 + replicaSpecificErrorCodes = []int32{1000} ) // ClickHouse is an output service consumers from kafka messages @@ -60,7 +63,7 @@ func NewClickHouse(cfg *config.Config) *ClickHouse { // Init the clickhouse intance func (c *ClickHouse) Init() (err error) { chCfg := &c.cfg.Clickhouse - if err = pool.InitConn(chCfg.Hosts, chCfg.Port, chCfg.DB, chCfg.Username, chCfg.Password, chCfg.DsnParams, chCfg.Secure, chCfg.InsecureSkipVerify); err != nil { + if err = pool.InitClusterConn(chCfg.Hosts, chCfg.Port, chCfg.DB, chCfg.Username, chCfg.Password, chCfg.DsnParams, chCfg.Secure, chCfg.InsecureSkipVerify); err != nil { return } return c.initSchema() @@ -76,99 +79,104 @@ func (c *ClickHouse) Send(batch *model.Batch) { } // Write kvs to clickhouse -func (c *ClickHouse) write(batch *model.Batch) error { - var numErr int - var err, tmpErr error +func (c *ClickHouse) write(batch *model.Batch, reconnect bool) (err error) { var stmt *sql.Stmt var tx *sql.Tx if len(*batch.Rows) == 0 { - return nil + return } - conn := pool.GetConn(batch.BatchIdx) + conn := pool.GetShardConn(batch.BatchIdx) + if reconnect { + if err = conn.NextGoodReplica(); err != nil { + return + } + } if tx, err = conn.Begin(); err != nil { - goto ERR + err = errors.Wrapf(err, "conn.Begin") + return } if stmt, err = tx.Prepare(c.prepareSQL); err != nil { - goto ERR + err = errors.Wrapf(err, "tx.Prepare %s", c.prepareSQL) + return } defer stmt.Close() for _, row := range *batch.Rows { - if _, tmpErr = stmt.Exec(*row...); tmpErr != nil { - numErr++ - err = tmpErr + if _, err = stmt.Exec(*row...); err != nil { + err = errors.Wrapf(err, "stmt.Exec") + break } } if err != nil { - util.Logger.Error("stmt.Exec failed", zap.String("task", c.cfg.Task.Name), zap.Int("times", numErr), zap.Error(err)) - goto ERR + _ = tx.Rollback() + return err } - if err = tx.Commit(); err != nil { - goto ERR + err = errors.Wrapf(err, "tx.Commit") + return } statistics.FlushMsgsTotal.WithLabelValues(c.cfg.Task.Name).Add(float64(batch.RealSize)) - return err -ERR: - if shouldReconnect(err) { - _ = conn.ReConnect() - statistics.ClickhouseReconnectTotal.WithLabelValues(c.cfg.Task.Name).Inc() - } - return err + return } func shouldReconnect(err error) bool { - if err == nil { - return false - } - if strings.Contains(err.Error(), "connection refused") || strings.Contains(err.Error(), "bad connection") { - return true + var exp *clickhouse.Exception + if errors.As(err, &exp) { + util.Logger.Error("this is an exception from clickhouse-server", zap.Reflect("exception", exp)) + var replicaSpecific bool + for _, ec := range replicaSpecificErrorCodes { + if ec == exp.Code { + replicaSpecific = true + break + } + } + return replicaSpecific } - util.Logger.Info("this is a permanent error", zap.Error(err)) - return false + return true } // LoopWrite will dead loop to write the records func (c *ClickHouse) loopWrite(batch *model.Batch) { var err error var times int + var reconnect bool for { - if err = c.write(batch); err == nil { + if err = c.write(batch, reconnect); err == nil { if err = batch.Commit(); err == nil { return } - // TODO: kafka_go and sarama commit give different error when context is cancceled. - // How to unify them? - if std_errors.Is(err, context.Canceled) || std_errors.Is(err, io.ErrClosedPipe) { + // Note: kafka_go and sarama commit give different error when context is cancceled. + if errors.Is(err, context.Canceled) || errors.Is(err, io.ErrClosedPipe) { util.Logger.Info("ClickHouse.loopWrite quit due to the context has been cancelled", zap.String("task", c.cfg.Task.Name)) return } util.Logger.Fatal("committing offset failed with permanent error %+v", zap.String("task", c.cfg.Task.Name), zap.Error(err)) } - if std_errors.Is(err, context.Canceled) { + if errors.Is(err, context.Canceled) { util.Logger.Info("ClickHouse.loopWrite quit due to the context has been cancelled", zap.String("task", c.cfg.Task.Name)) return } - util.Logger.Error("flush batch failed", zap.String("task", c.cfg.Task.Name), zap.Int("try", c.cfg.Clickhouse.RetryTimes-times), zap.Error(err)) + util.Logger.Error("flush batch failed", zap.String("task", c.cfg.Task.Name), zap.Int("try", times), zap.Error(err)) statistics.FlushMsgsErrorTotal.WithLabelValues(c.cfg.Task.Name).Add(float64(batch.RealSize)) times++ - if shouldReconnect(err) && (c.cfg.Clickhouse.RetryTimes <= 0 || times < c.cfg.Clickhouse.RetryTimes) { - time.Sleep(10 * time.Second) - } else { + reconnect = shouldReconnect(err) + if reconnect && c.cfg.Clickhouse.RetryTimes > 0 && times >= c.cfg.Clickhouse.RetryTimes { util.Logger.Fatal("ClickHouse.loopWrite failed", zap.String("task", c.cfg.Task.Name)) + } else { + time.Sleep(10 * time.Second) } } } // Stop free clickhouse connections func (c *ClickHouse) Stop() error { - pool.FreeConn() + pool.FreeClusterConn() return nil } func (c *ClickHouse) initSchema() (err error) { if c.cfg.Task.AutoSchema { - conn := pool.GetConn(0) + conn := pool.GetShardConn(0) var rs *sql.Rows if rs, err = conn.Query(fmt.Sprintf(selectSQLTemplate, c.cfg.Clickhouse.DB, c.cfg.Task.TableName)); err != nil { err = errors.Wrapf(err, "") @@ -286,7 +294,7 @@ func (c *ClickHouse) ChangeSchema(newKeys *sync.Map) (err error) { chCfg.Cluster, chCfg.DB, taskCfg.TableName)) } } - conn := pool.GetConn(0) + conn := pool.GetShardConn(0) for _, query := range queries { util.Logger.Info(fmt.Sprintf("executing sql=> %s", query), zap.String("task", taskCfg.Name)) if _, err = conn.Exec(query); err != nil { @@ -300,7 +308,7 @@ func (c *ClickHouse) ChangeSchema(newKeys *sync.Map) (err error) { func (c *ClickHouse) getDistTbls() (distTbls []string, err error) { taskCfg := &c.cfg.Task chCfg := &c.cfg.Clickhouse - conn := pool.GetConn(0) + conn := pool.GetShardConn(0) query := fmt.Sprintf(`SELECT name FROM system.tables WHERE engine='Distributed' AND database='%s' AND match(create_table_query, 'Distributed.*\'%s\',\s*\'%s\'')`, chCfg.DB, chCfg.DB, taskCfg.TableName) util.Logger.Info(fmt.Sprintf("executing sql=> %s", query), zap.String("task", taskCfg.Name)) diff --git a/pool/conn.go b/pool/conn.go index 5958f2c7..79a03a38 100644 --- a/pool/conn.go +++ b/pool/conn.go @@ -23,7 +23,6 @@ import ( "fmt" "net/url" "strconv" - "strings" "sync" "time" @@ -40,34 +39,63 @@ const ( var ( lock sync.Mutex - connections []*Connection + clusterConn []*ShardConn + dsnTmpl string ) -// Connection a datastructure for storing the clickhouse connection -type Connection struct { +// ShardConn a datastructure for storing the clickhouse connection +type ShardConn struct { *sql.DB - dsn string + Dsn string + Replicas []string //ip:port list of replicas + NextRep int //index of next replica } -// ReConnect used for restablishing connection with server -func (c *Connection) ReConnect() error { - sqlDB, err := sql.Open("clickhouse", c.dsn) - if err != nil { - util.Logger.Debug("sql.Open failed", zap.String("dsn", c.dsn), zap.Error(err)) - return err +// NextGoodReplica connects to next good replica +func (c *ShardConn) NextGoodReplica() error { + if c.DB != nil { + if err := health.Health.RemoveReadinessCheck(c.Dsn); err != nil { + util.Logger.Warn("health.Health.RemoveReadinessCheck failed", zap.String("dsn", c.Dsn), zap.Error(err)) + } + c.DB.Close() + c.DB = nil } - setDBParams(sqlDB) - util.Logger.Debug("sql.Open succeeded", zap.String("dsn", c.dsn)) - c.DB = sqlDB - return nil + savedNextRep := c.NextRep + // try all replicas, including the current one + for i := 0; i < len(c.Replicas); i++ { + c.Dsn = fmt.Sprintf(dsnTmpl, c.Replicas[c.NextRep]) + c.NextRep = (c.NextRep + 1) % len(c.Replicas) + sqlDB, err := sql.Open("clickhouse", c.Dsn) + if err != nil { + util.Logger.Warn("sql.Open failed", zap.String("dsn", c.Dsn), zap.Error(err)) + continue + } + setDBParams(sqlDB) + util.Logger.Info("sql.Open succeeded", zap.String("dsn", c.Dsn)) + if err = health.Health.AddReadinessCheck(c.Dsn, healthcheck.DatabasePingCheck(sqlDB, 30*time.Second)); err != nil { + util.Logger.Warn("health.Health.RemoveReadinessCheck failed", zap.String("dsn", c.Dsn), zap.Error(err)) + } + c.DB = sqlDB + return nil + } + err := errors.Errorf("no good replica among replicas %v since %d", c.Replicas, savedNextRep) + return err } -func InitConn(hosts [][]string, port int, db, username, password, dsnParams string, secure, skipVerify bool) (err error) { - var sqlDB *sql.DB +func InitClusterConn(hosts [][]string, port int, db, username, password, dsnParams string, secure, skipVerify bool) (err error) { lock.Lock() defer lock.Unlock() - // Each shard has a *sql.DB which connects to all replicas inside the shard. - // "alt_hosts" tolerates replica single-point-failure. + // Each shard has a *sql.DB which connects to one replica inside the shard. + // "alt_hosts" tolerates replica single-point-failure. However more flexable switching is needed for some cases for example https://github.com/ClickHouse/ClickHouse/issues/24036. + dsnTmpl = "tcp://%s" + fmt.Sprintf("?database=%s&username=%s&password=%s&block_size=%d", + url.QueryEscape(db), url.QueryEscape(username), url.QueryEscape(password), BlockSize) + if dsnParams != "" { + dsnTmpl += "&" + dsnParams + } + if secure { + dsnTmpl += "&secure=true&skip_verify=" + strconv.FormatBool(skipVerify) + } + for _, replicas := range hosts { numReplicas := len(replicas) replicaAddrs := make([]string, numReplicas) @@ -77,28 +105,13 @@ func InitConn(hosts [][]string, port int, db, username, password, dsnParams stri } replicaAddrs[i] = fmt.Sprintf("%s:%d", ip, port) } - dsn := fmt.Sprintf("tcp://%s?database=%s&username=%s&password=%s&block_size=%d", - replicaAddrs[0], url.QueryEscape(db), url.QueryEscape(username), url.QueryEscape(password), BlockSize) - if numReplicas > 1 { - dsn += "&connection_open_strategy=in_order&alt_hosts=" + strings.Join(replicaAddrs[1:numReplicas], ",") - } - if dsnParams != "" { - dsn += "&" + dsnParams + sc := &ShardConn{ + Replicas: replicaAddrs, } - if secure { - dsn += "&secure=true&skip_verify=" + strconv.FormatBool(skipVerify) - } - util.Logger.Debug("sql.Open", zap.String("dsn", dsn)) - if sqlDB, err = sql.Open("clickhouse", dsn); err != nil { - err = errors.Wrapf(err, "") + if err = sc.NextGoodReplica(); err != nil { return } - setDBParams(sqlDB) - if err = health.Health.AddReadinessCheck(dsn, healthcheck.DatabasePingCheck(sqlDB, 30*time.Second)); err != nil { - err = errors.Wrapf(err, "") - return - } - connections = append(connections, &Connection{sqlDB, dsn}) + clusterConn = append(clusterConn, sc) } return } @@ -111,33 +124,35 @@ func setDBParams(sqlDB *sql.DB) { sqlDB.SetConnMaxIdleTime(10 * time.Second) } -func FreeConn() { +func FreeClusterConn() { lock.Lock() defer lock.Unlock() - for _, conn := range connections { - if err := health.Health.RemoveReadinessCheck(conn.dsn); err != nil { - util.Logger.Error("health.Health.RemoveReadinessCheck failed", zap.Error(err)) + for _, sc := range clusterConn { + if sc.DB != nil { + if err := health.Health.RemoveReadinessCheck(sc.Dsn); err != nil { + util.Logger.Error("health.Health.RemoveReadinessCheck failed", zap.String("dsn", sc.Dsn), zap.Error(err)) + } + sc.DB.Close() } - conn.DB.Close() } - connections = []*Connection{} + clusterConn = []*ShardConn{} } -func GetTotalConn() (cnt int) { +func NumShard() (cnt int) { lock.Lock() defer lock.Unlock() - return len(connections) + return len(clusterConn) } -// GetConn select a clickhouse node from the cluster based on batchNum -func GetConn(batchNum int64) (con *Connection) { +// GetShardConn select a clickhouse shard based on batchNum +func GetShardConn(batchNum int64) (con *ShardConn) { lock.Lock() defer lock.Unlock() - con = connections[batchNum%int64(len(connections))] + con = clusterConn[batchNum%int64(len(clusterConn))] return } // CloseAll closed all connection and destroys the pool func CloseAll() { - FreeConn() + FreeClusterConn() } diff --git a/task/sharding.go b/task/sharding.go index 6fb40849..5de656ca 100644 --- a/task/sharding.go +++ b/task/sharding.go @@ -113,7 +113,7 @@ type Sharder struct { func NewSharder(service *Service) (sh *Sharder, err error) { var policy *ShardingPolicy - ckNum := pool.GetTotalConn() + ckNum := pool.NumShard() taskCfg := &service.cfg.Task if policy, err = NewShardingPolicy(taskCfg.ShardingKey, taskCfg.ShardingPolicy, service.clickhouse.Dms, ckNum); err != nil { return