Skip to content

Commit

Permalink
GH-85: fix buffering in json aggregator
Browse files Browse the repository at this point in the history
  • Loading branch information
skipor committed Feb 20, 2018
1 parent 8fb991c commit 6035c25
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 4 deletions.
1 change: 1 addition & 0 deletions core/aggregator/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ HandleLoop:
}
case _ = <-flushTick:
if previousFlushes == flushes {
a.Log.Debug("Flushing")
err = encoder.Flush()
if err != nil {
return
Expand Down
17 changes: 14 additions & 3 deletions core/aggregator/jsonlines.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package aggregator

import (
"bufio"
"io"

"github.com/json-iterator/go"
Expand Down Expand Up @@ -54,16 +55,26 @@ func NewJSONEncoder(w io.Writer, conf JSONLineEncoderConfig) SampleEncoder {
var apiConfig jsoniter.Config
config.Map(&apiConfig, conf.JSONIterConfig)
api := apiConfig.Froze()
stream := jsoniter.NewStream(api, w, conf.BufferSizeOrDefault())
return jsonEncoder{stream}
// NOTE(skipor): internal buffering is not working really. Don't know why
// OPTIMIZE(skipor): don't wrap into buffer, if already ioutil2.ByteWriter
buf := bufio.NewWriterSize(w, conf.BufferSizeOrDefault())
stream := jsoniter.NewStream(api, buf, conf.BufferSizeOrDefault())
return &jsonEncoder{stream, buf}
}

type jsonEncoder struct {
*jsoniter.Stream
buf *bufio.Writer
}

func (e jsonEncoder) Encode(s core.Sample) error {
func (e *jsonEncoder) Encode(s core.Sample) error {
e.WriteVal(s)
e.WriteRaw("\n")
return e.Error
}

func (e *jsonEncoder) Flush() error {
err := e.Stream.Flush()
e.buf.Flush()
return err
}
2 changes: 1 addition & 1 deletion examples/custom_pandora/custom.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pools:
result:
type: json
sink: stderr # Just for interactivity print result to stderr. Usually file user.
sink: stdout # Just for interactivity print result to stdout. Usually file used here.

rps:
- {duration: 2s, type: line, from: 1, to: 5}
Expand Down

0 comments on commit 6035c25

Please sign in to comment.