From 6035c25fa9509313a230ad3a5d0625b1aa7b9e92 Mon Sep 17 00:00:00 2001 From: Vladimir Skipor Date: Tue, 20 Feb 2018 09:11:32 +0300 Subject: [PATCH] GH-85: fix buffering in json aggregator --- core/aggregator/encoder.go | 1 + core/aggregator/jsonlines.go | 17 ++++++++++++++--- examples/custom_pandora/custom.yaml | 2 +- 3 files changed, 16 insertions(+), 4 deletions(-) diff --git a/core/aggregator/encoder.go b/core/aggregator/encoder.go index 361eb733f..80860745a 100644 --- a/core/aggregator/encoder.go +++ b/core/aggregator/encoder.go @@ -127,6 +127,7 @@ HandleLoop: } case _ = <-flushTick: if previousFlushes == flushes { + a.Log.Debug("Flushing") err = encoder.Flush() if err != nil { return diff --git a/core/aggregator/jsonlines.go b/core/aggregator/jsonlines.go index cc613c817..3d11b5c32 100644 --- a/core/aggregator/jsonlines.go +++ b/core/aggregator/jsonlines.go @@ -6,6 +6,7 @@ package aggregator import ( + "bufio" "io" "github.com/json-iterator/go" @@ -54,16 +55,26 @@ func NewJSONEncoder(w io.Writer, conf JSONLineEncoderConfig) SampleEncoder { var apiConfig jsoniter.Config config.Map(&apiConfig, conf.JSONIterConfig) api := apiConfig.Froze() - stream := jsoniter.NewStream(api, w, conf.BufferSizeOrDefault()) - return jsonEncoder{stream} + // NOTE(skipor): internal buffering is not working really. Don't know why + // OPTIMIZE(skipor): don't wrap into buffer, if already ioutil2.ByteWriter + buf := bufio.NewWriterSize(w, conf.BufferSizeOrDefault()) + stream := jsoniter.NewStream(api, buf, conf.BufferSizeOrDefault()) + return &jsonEncoder{stream, buf} } type jsonEncoder struct { *jsoniter.Stream + buf *bufio.Writer } -func (e jsonEncoder) Encode(s core.Sample) error { +func (e *jsonEncoder) Encode(s core.Sample) error { e.WriteVal(s) e.WriteRaw("\n") return e.Error } + +func (e *jsonEncoder) Flush() error { + err := e.Stream.Flush() + e.buf.Flush() + return err +} diff --git a/examples/custom_pandora/custom.yaml b/examples/custom_pandora/custom.yaml index b109629cd..ab6b0fd6f 100644 --- a/examples/custom_pandora/custom.yaml +++ b/examples/custom_pandora/custom.yaml @@ -13,7 +13,7 @@ pools: result: type: json - sink: stderr # Just for interactivity print result to stderr. Usually file user. + sink: stdout # Just for interactivity print result to stdout. Usually file used here. rps: - {duration: 2s, type: line, from: 1, to: 5}