Skip to content

Commit

Permalink
GH-85: enlarge buffers for phout aggregator
Browse files Browse the repository at this point in the history
  • Loading branch information
skipor committed Feb 22, 2018
1 parent 6035c25 commit 405145a
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 7 deletions.
23 changes: 19 additions & 4 deletions core/aggregator/netsample/phout.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,30 @@ import (
"strconv"
"time"

"github.com/c2h5oh/datasize"
"github.com/pkg/errors"
"github.com/spf13/afero"

"github.com/yandex/pandora/core"
"github.com/yandex/pandora/core/coreutil"
)

type PhoutConfig struct {
Destination string // Destination file name
Id bool // Print ammo ids if true.
Destination string // Destination file name
Id bool // Print ammo ids if true.
FlushTime time.Duration `config:"flush-time"`
SampleQueueSize int `config:"sample-queue-size"`
Buffer coreutil.BufferSizeConfig `config:",squash"`
}

func DefaultPhoutConfig() PhoutConfig {
return PhoutConfig{
FlushTime: time.Second,
SampleQueueSize: 256 * 1024,
Buffer: coreutil.BufferSizeConfig{
BufferSize: 8 * datasize.MB,
},
}
}

func NewPhout(fs afero.Fs, conf PhoutConfig) (a Aggregator, err error) {
Expand All @@ -31,8 +46,8 @@ func NewPhout(fs afero.Fs, conf PhoutConfig) (a Aggregator, err error) {
}
a = &phoutAggregator{
config: conf,
sink: make(chan *Sample, 32*1024),
writer: bufio.NewWriterSize(file, 512*1024),
sink: make(chan *Sample, conf.SampleQueueSize),
writer: bufio.NewWriterSize(file, conf.Buffer.BufferSizeOrDefault()),
buf: make([]byte, 0, 1024),
file: file,
}
Expand Down
3 changes: 2 additions & 1 deletion core/aggregator/netsample/phout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ var _ = Describe("Phout", func() {

BeforeEach(func() {
fs = afero.NewMemMapFs()
conf = PhoutConfig{Destination: fileName}
conf = DefaultPhoutConfig()
conf.Destination = fileName
ctx, cancel = context.WithCancel(context.Background())
})
JustBeforeEach(func() {
Expand Down
2 changes: 1 addition & 1 deletion core/coreutil/buffer_size_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/c2h5oh/datasize"
)

const DefaultBufferSize = 256 * 1024
const DefaultBufferSize = 512 * 1024
const MinimalBufferSize = 4 * 1024

// BufferSizeConfig SHOULD be used to configure buffer size.
Expand Down
2 changes: 1 addition & 1 deletion core/import/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func Import(fs afero.Fs) {
register.Aggregator("phout", func(conf netsample.PhoutConfig) (core.Aggregator, error) {
a, err := netsample.NewPhout(fs, conf)
return netsample.WrapAggregator(a), err
})
}, netsample.DefaultPhoutConfig)
register.Aggregator("jsonlines", aggregator.NewJSONLinesAggregator, aggregator.DefaultJSONLinesAggregatorConfig)
register.Aggregator("json", aggregator.NewJSONLinesAggregator, aggregator.DefaultJSONLinesAggregatorConfig) // TODO(skipor): should be done via alias, but we don't have them yet
register.Aggregator("log", aggregator.NewLog)
Expand Down

0 comments on commit 405145a

Please sign in to comment.