Skip to content

Commit

Permalink
Merge pull request #90 from yandex/feature/85
Browse files Browse the repository at this point in the history
DataSink, DataSource. Fast abstract JSON Lines aggregator
  • Loading branch information
skipor authored Jan 23, 2018
2 parents 945aa01 + 6655e38 commit 6a51797
Show file tree
Hide file tree
Showing 47 changed files with 2,035 additions and 217 deletions.
2 changes: 0 additions & 2 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@

[[constraint]]
name = "github.com/mitchellh/mapstructure"
# Use patched version with https://github.com/mitchellh/mapstructure/issues/70 fix.
source = "https://github.com/skipor/mapstructure"

[[constraint]]
name = "github.com/onsi/ginkgo"
Expand Down
3 changes: 1 addition & 2 deletions components/phttp/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,5 @@
// license that can be found in the LICENSE file.
// Author: Vladimir Skipor <[email protected]>

// package phttp (pandora http) contains pandora extension points for HTTP
// related protocols.
// Package phttp (pandora http) contains pandora extension points for HTTP related protocols.
package phttp
30 changes: 30 additions & 0 deletions core/aggregate/buffer_size_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright (c) 2018 Yandex LLC. All rights reserved.
// Use of this source code is governed by a MPL 2.0
// license that can be found in the LICENSE file.
// Author: Vladimir Skipor <[email protected]>

package aggregate

import (
"github.com/c2h5oh/datasize"
)

// Buffer size bounds (in bytes) applied by BufferSizeOrDefault.
const DefaultBufferSize = 256 * 1024
const MinimalBufferSize = 4 * 1024

// BufferSizeConfig SHOULD be used to configure buffer size.
// That makes buffer size configuration consistent among all Aggregators.
type BufferSizeConfig struct {
	// BufferSize is the configured size; zero means "use DefaultBufferSize".
	BufferSize datasize.ByteSize `config:"buffer-size"`
}

// BufferSizeOrDefault returns the effective buffer size in bytes:
// DefaultBufferSize when unset (zero), and never less than
// MinimalBufferSize otherwise.
func (conf BufferSizeConfig) BufferSizeOrDefault() int {
	switch size := int(conf.BufferSize); {
	case size == 0:
		return DefaultBufferSize
	case size <= MinimalBufferSize:
		return MinimalBufferSize
	default:
		return size
	}
}
23 changes: 23 additions & 0 deletions core/aggregate/buffer_size_config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright (c) 2018 Yandex LLC. All rights reserved.
// Use of this source code is governed by a MPL 2.0
// license that can be found in the LICENSE file.
// Author: Vladimir Skipor <[email protected]>

package aggregate

import (
"testing"

"github.com/c2h5oh/datasize"
"github.com/magiconair/properties/assert"
)

// TestBufferSizeConfig_BufferSizeOrDefault checks default substitution,
// clamping to the minimum, and pass-through of large values.
func TestBufferSizeConfig_BufferSizeOrDefault(t *testing.T) {
	sizeFor := func(b datasize.ByteSize) int {
		return BufferSizeConfig{BufferSize: b}.BufferSizeOrDefault()
	}
	assert.Equal(t, DefaultBufferSize, sizeFor(0))
	assert.Equal(t, MinimalBufferSize, sizeFor(1))
	const huge = DefaultBufferSize * DefaultBufferSize
	assert.Equal(t, huge, sizeFor(huge))
}
5 changes: 4 additions & 1 deletion core/aggregate/discard.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"

"github.com/yandex/pandora/core"
"github.com/yandex/pandora/core/coreutil"
)

// NewDiscard returns Aggregator that just throws reported ammo away.
Expand All @@ -23,4 +24,6 @@ func (discard) Run(ctx context.Context, _ core.AggregatorDeps) error {
return nil
}

func (discard) Report(core.Sample) {}
// Report throws the sample away, but first returns it to its pool if it
// was borrowed, so pooled samples are not leaked by the discard aggregator.
func (discard) Report(s core.Sample) {
	coreutil.ReturnSampleIfBorrowed(s)
}
161 changes: 161 additions & 0 deletions core/aggregate/encoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
// Copyright (c) 2018 Yandex LLC. All rights reserved.
// Use of this source code is governed by a MPL 2.0
// license that can be found in the LICENSE file.
// Author: Vladimir Skipor <[email protected]>

package aggregate

import (
"context"
"io"
"time"

"github.com/pkg/errors"

"github.com/yandex/pandora/core"
"github.com/yandex/pandora/core/coreutil"
"github.com/yandex/pandora/lib/errutil"
)

// NewSampleEncoder is a factory of SampleEncoder bound to w.
// onFlush is invoked by the encoder on every flush to w.
type NewSampleEncoder func(w io.Writer, onFlush func()) SampleEncoder

//go:generate mockery -name=SampleEncoder -case=underscore -outpkg=aggregatemock

// SampleEncoder is efficient, buffered encoder of samples.
// SampleEncoder MAY support only concrete type of sample.
// MAY also implement SampleEncodeCloser.
type SampleEncoder interface {
	// Encode writes the sample into the internal buffer.
	// SampleEncoder SHOULD panic, if passed sample type is not supported.
	Encode(s core.Sample) error
	// Flush flushes internal buffer to wrapped io.Writer.
	Flush() error
	// Optional. Close MUST be called, if io.Closer is implemented.
	// io.Closer
}

//go:generate mockery -name=SampleEncodeCloser -case=underscore -outpkg=aggregatemock

// SampleEncodeCloser is SampleEncoder that REQUIRES a Close call to finish encoding.
type SampleEncodeCloser interface {
	SampleEncoder
	io.Closer
}

// EncoderAggregatorConfig configures an encoder-based Aggregator
// (see NewEncoderAggregator).
type EncoderAggregatorConfig struct {
	// Sink receives the encoded samples.
	Sink core.DataSink `config:"sink" validate:"required"`
	// BufferSize is presumably a sink write buffer size — confirm against the
	// concrete encoder/sink implementation.
	BufferSize int `config:"buffer-size"`
	// FlushInterval is the period of idle-encoder flushes; a value <= 0
	// disables periodic flushing (see Run).
	FlushInterval time.Duration `config:"flush-interval"`
	ReporterConfig ReporterConfig `config:",squash"`
}

// NewDefaultEncoderAggregatorConfig returns a config with sane defaults:
// a one-second flush interval and default reporter settings.
func NewDefaultEncoderAggregatorConfig() EncoderAggregatorConfig {
	var conf EncoderAggregatorConfig
	conf.FlushInterval = time.Second
	conf.ReporterConfig = NewDefaultReporterConfig()
	return conf
}

// NewEncoderAggregator returns an Aggregator that uses SampleEncoder to
// marshal samples to core.DataSink.
// Handles encoder flushing and sample dropping on queue overflow.
func NewEncoderAggregator(
	newEncoder NewSampleEncoder,
	conf EncoderAggregatorConfig,
) core.Aggregator {
	// NOTE(review): the original body contained a second, unreachable copy of
	// this return statement; it has been removed.
	return &dataSinkAggregator{
		Reporter:   *NewReporter(conf.ReporterConfig),
		newEncoder: newEncoder,
		conf:       conf,
	}
}

// dataSinkAggregator encodes reported samples into a core.DataSink.
// Reporter is embedded for sample queueing and drop accounting;
// AggregatorDeps is assigned in Run.
type dataSinkAggregator struct {
	Reporter
	core.AggregatorDeps

	newEncoder NewSampleEncoder // Creates the encoder over the opened sink.
	conf       EncoderAggregatorConfig
}

// Run opens the configured sink, encodes incoming samples into it, and
// periodically flushes the encoder when it has been idle, until ctx is done;
// then it drains already-queued samples and returns. Errors from sink close,
// the final encoder flush/close, and dropped samples are joined into err.
func (a *dataSinkAggregator) Run(ctx context.Context, deps core.AggregatorDeps) (err error) {
	a.AggregatorDeps = deps

	sink, err := a.conf.Sink.OpenSink()
	if err != nil {
		return
	}
	defer func() {
		closeErr := sink.Close()
		err = errutil.Join(err, closeErr)
		err = errutil.Join(err, a.DroppedErr())
	}()

	var flushes int
	encoder := a.newEncoder(sink, func() {
		flushes++
	})
	defer func() {
		// Close finishes encoding when the encoder supports it;
		// otherwise fall back to a final flush.
		if encoder, ok := encoder.(io.Closer); ok {
			closeErr := encoder.Close()
			err = errutil.Join(err, errors.WithMessage(closeErr, "encoder close failed"))
			return
		}
		flushErr := encoder.Flush()
		err = errutil.Join(err, errors.WithMessage(flushErr, "final flush failed"))
	}()

	var flushTick <-chan time.Time
	if a.conf.FlushInterval > 0 {
		flushTicker := time.NewTicker(a.conf.FlushInterval)
		flushTick = flushTicker.C
		defer flushTicker.Stop()
	}

	var previousFlushes int
HandleLoop:
	for {
		select {
		case sample := <-a.Incomming:
			err = a.handleSample(encoder, sample)
			if err != nil {
				return
			}
		case <-flushTick: // was `case _ = <-flushTick:` — blank assignment is redundant (staticcheck S1005)
			// Flush only if nothing else flushed the encoder since the last
			// tick, so a busy encoder is not flushed needlessly.
			if previousFlushes == flushes {
				err = encoder.Flush()
				if err != nil {
					return
				}
			}
			previousFlushes = flushes
		case <-ctx.Done():
			break HandleLoop // Still need to handle all queued samples.
		}
	}

	// Drain samples queued before cancellation; return on first empty read.
	for {
		select {
		case sample := <-a.Incomming:
			err = a.handleSample(encoder, sample)
			if err != nil {
				return
			}
		default:
			return nil
		}
	}
}

// handleSample encodes one sample and, on success, returns it to its pool
// if it was borrowed.
func (a *dataSinkAggregator) handleSample(enc SampleEncoder, sample core.Sample) error {
	if err := enc.Encode(sample); err != nil {
		return errors.WithMessage(err, "sample encode failed")
	}
	coreutil.ReturnSampleIfBorrowed(sample)
	return nil
}
Loading

0 comments on commit 6a51797

Please sign in to comment.