From 405145a776325096c570661f8d584310111bca43 Mon Sep 17 00:00:00 2001 From: Vladimir Skipor Date: Thu, 22 Feb 2018 09:29:23 +0300 Subject: [PATCH] GH-85: enlarge buffers for phout aggregator --- core/aggregator/netsample/phout.go | 23 +++++++++++++++++++---- core/aggregator/netsample/phout_test.go | 3 ++- core/coreutil/buffer_size_config.go | 2 +- core/import/import.go | 2 +- 4 files changed, 23 insertions(+), 7 deletions(-) diff --git a/core/aggregator/netsample/phout.go b/core/aggregator/netsample/phout.go index 408532ed2..6be94a31b 100644 --- a/core/aggregator/netsample/phout.go +++ b/core/aggregator/netsample/phout.go @@ -8,15 +8,30 @@ import ( "strconv" "time" + "github.com/c2h5oh/datasize" "github.com/pkg/errors" "github.com/spf13/afero" "github.com/yandex/pandora/core" + "github.com/yandex/pandora/core/coreutil" ) type PhoutConfig struct { - Destination string // Destination file name - Id bool // Print ammo ids if true. + Destination string // Destination file name + Id bool // Print ammo ids if true. + FlushTime time.Duration `config:"flush-time"` + SampleQueueSize int `config:"sample-queue-size"` + Buffer coreutil.BufferSizeConfig `config:",squash"` +} + +func DefaultPhoutConfig() PhoutConfig { + return PhoutConfig{ + FlushTime: time.Second, + SampleQueueSize: 256 * 1024, + Buffer: coreutil.BufferSizeConfig{ + BufferSize: 8 * datasize.MB, + }, + } } func NewPhout(fs afero.Fs, conf PhoutConfig) (a Aggregator, err error) { @@ -31,8 +46,8 @@ func NewPhout(fs afero.Fs, conf PhoutConfig) (a Aggregator, err error) { } a = &phoutAggregator{ config: conf, - sink: make(chan *Sample, 32*1024), - writer: bufio.NewWriterSize(file, 512*1024), + sink: make(chan *Sample, conf.SampleQueueSize), + writer: bufio.NewWriterSize(file, conf.Buffer.BufferSizeOrDefault()), buf: make([]byte, 0, 1024), file: file, } diff --git a/core/aggregator/netsample/phout_test.go b/core/aggregator/netsample/phout_test.go index 6aeb94951..67f1093e4 100644 --- a/core/aggregator/netsample/phout_test.go +++ b/core/aggregator/netsample/phout_test.go @@ -30,7 +30,8 @@ var _ = Describe("Phout", func() { BeforeEach(func() { fs = afero.NewMemMapFs() - conf = PhoutConfig{Destination: fileName} + conf = DefaultPhoutConfig() + conf.Destination = fileName ctx, cancel = context.WithCancel(context.Background()) }) JustBeforeEach(func() { diff --git a/core/coreutil/buffer_size_config.go b/core/coreutil/buffer_size_config.go index a37a134e2..a7fac9293 100644 --- a/core/coreutil/buffer_size_config.go +++ b/core/coreutil/buffer_size_config.go @@ -9,7 +9,7 @@ import ( "github.com/c2h5oh/datasize" ) -const DefaultBufferSize = 256 * 1024 +const DefaultBufferSize = 512 * 1024 const MinimalBufferSize = 4 * 1024 // BufferSizeConfig SHOULD be used to configure buffer size. diff --git a/core/import/import.go b/core/import/import.go index 499015eae..3bf142802 100644 --- a/core/import/import.go +++ b/core/import/import.go @@ -72,7 +72,7 @@ func Import(fs afero.Fs) { register.Aggregator("phout", func(conf netsample.PhoutConfig) (core.Aggregator, error) { a, err := netsample.NewPhout(fs, conf) return netsample.WrapAggregator(a), err - }) + }, netsample.DefaultPhoutConfig) register.Aggregator("jsonlines", aggregator.NewJSONLinesAggregator, aggregator.DefaultJSONLinesAggregatorConfig) register.Aggregator("json", aggregator.NewJSONLinesAggregator, aggregator.DefaultJSONLinesAggregatorConfig) // TODO(skipor): should be done via alias, but we don't have them yet register.Aggregator("log", aggregator.NewLog)