From 57d54b2fc36743765ea9977d300750ec265ccb2b Mon Sep 17 00:00:00 2001 From: Vladimir Skipor Date: Sun, 18 Feb 2018 19:56:08 +0300 Subject: [PATCH] GH-85: JSON provider --- Gopkg.lock | 197 ++++++++++++++---- Gopkg.toml | 47 ++++- cli/cli.go | 3 + core/aggregate/jsonlines.go | 5 +- core/aggregate/reporter.go | 2 +- core/core.go | 10 + core/coreutil/ammo.go | 24 +++ .../buffer_size_config.go | 2 +- .../buffer_size_config_test.go | 2 +- core/coreutil/doc.go | 1 + core/datasink/file.go | 2 + core/datasource/file.go | 3 +- core/datasource/std.go | 12 +- core/engine/engine.go | 10 +- core/provider/chunk_decoder.go | 72 +++++++ core/provider/decoder.go | 113 ++++++++++ core/provider/json.go | 69 ++++++ core/provider/json_test.go | 137 ++++++++++++ core/provider/provider_suite_test.go | 7 +- core/provider/queue.go | 52 +++++ lib/ioutil2/reader.go | 50 +++++ 21 files changed, 766 insertions(+), 54 deletions(-) create mode 100644 core/coreutil/ammo.go rename core/{aggregate => coreutil}/buffer_size_config.go (97%) rename core/{aggregate => coreutil}/buffer_size_config_test.go (97%) create mode 100644 core/provider/chunk_decoder.go create mode 100644 core/provider/decoder.go create mode 100644 core/provider/json.go create mode 100644 core/provider/json_test.go create mode 100644 core/provider/queue.go create mode 100644 lib/ioutil2/reader.go diff --git a/Gopkg.lock b/Gopkg.lock index a49846db0..5154b6078 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -40,8 +40,8 @@ [[projects]] name = "github.com/fsnotify/fsnotify" packages = ["."] - revision = "629574ca2a5df945712d3079857300b5e4da0236" - version = "v1.4.2" + revision = "c2828203cd70a50dcccfb2761f8b1f8ceef9a8e9" + version = "v1.4.7" [[projects]] name = "github.com/ghodss/yaml" @@ -51,7 +51,10 @@ [[projects]] name = "github.com/go-playground/locales" - packages = [".","currency"] + packages = [ + ".", + "currency" + ] revision = "e4cbcb5d0652150d40ad0646651076b6bd2be4f6" version = "v0.11.2" @@ -61,42 +64,109 @@ revision = "b32fa301c9fe55953584134cb6853a13c87ec0a1" version = "v0.16.0" +[[projects]] + branch = "master" + name = "github.com/hashicorp/errwrap" + packages = ["."] + revision = "7554cd9344cec97297fa6649b055a8c98c2a1e55" + +[[projects]] + branch = "master" + name = "github.com/hashicorp/go-multierror" + packages = ["."] + revision = "b7773ae218740a7be65057fc60b366a49b538a44" + [[projects]] branch = "master" name = "github.com/hashicorp/hcl" - packages = [".","hcl/ast","hcl/parser","hcl/scanner","hcl/strconv","hcl/token","json/parser","json/scanner","json/token"] + packages = [ + ".", + "hcl/ast", + "hcl/parser", + "hcl/scanner", + "hcl/strconv", + "hcl/token", + "json/parser", + "json/scanner", + "json/token" + ] revision = "23c074d0eceb2b8a5bfdbb271ab780cde70f05a8" [[projects]] - name = "github.com/magiconair/properties" + name = "github.com/json-iterator/go" packages = ["."] - revision = "d419a98cdbed11a922bf76f257b7c4be79b50e73" - version = "v1.7.4" + revision = "e7c7f3b33712573affdcc7a107218e7926b9a05b" + version = "1.0.6" + +[[projects]] + name = "github.com/magiconair/properties" + packages = [ + ".", + "assert" + ] + revision = "c3beff4c2358b44d0493c7dda585e7db7ff28ae6" + version = "v1.7.6" [[projects]] branch = "master" name = "github.com/mitchellh/mapstructure" packages = ["."] - revision = "e88fb6b4946b282e0d5196ac35b09d256e09e9d2" - source = "https://github.com/skipor/mapstructure" + revision = "a4e142e9c047c904fa2f1e144d9a84e6133024bc" [[projects]] name = "github.com/onsi/ginkgo" - packages = [".","config","extensions/table","internal/codelocation","internal/containernode","internal/failer","internal/leafnodes","internal/remote","internal/spec","internal/spec_iterator","internal/specrunner","internal/suite","internal/testingtproxy","internal/writer","reporters","reporters/stenographer","reporters/stenographer/support/go-colorable","reporters/stenographer/support/go-isatty","types"] + packages = [ + ".", + "config", + "extensions/table", + "internal/codelocation", + "internal/containernode", + "internal/failer", + "internal/leafnodes", + "internal/remote", + "internal/spec", + "internal/spec_iterator", + "internal/specrunner", + "internal/suite", + "internal/testingtproxy", + "internal/writer", + "reporters", + "reporters/stenographer", + "reporters/stenographer/support/go-colorable", + "reporters/stenographer/support/go-isatty", + "types" + ] revision = "9eda700730cba42af70d53180f9dcce9266bc2bc" version = "v1.4.0" [[projects]] name = "github.com/onsi/gomega" - packages = [".","format","gbytes","gexec","gstruct","gstruct/errors","internal/assertion","internal/asyncassertion","internal/oraclematcher","internal/testingtsupport","matchers","matchers/support/goraph/bipartitegraph","matchers/support/goraph/edge","matchers/support/goraph/node","matchers/support/goraph/util","types"] - revision = "c893efa28eb45626cdaa76c9f653b62488858837" - version = "v1.2.0" + packages = [ + ".", + "format", + "gbytes", + "gexec", + "gstruct", + "gstruct/errors", + "internal/assertion", + "internal/asyncassertion", + "internal/oraclematcher", + "internal/testingtsupport", + "matchers", + "matchers/support/goraph/bipartitegraph", + "matchers/support/goraph/edge", + "matchers/support/goraph/node", + "matchers/support/goraph/util", + "types" + ] + revision = "003f63b7f4cff3fc95357005358af2de0f5fe152" + version = "v1.3.0" [[projects]] name = "github.com/pelletier/go-toml" packages = ["."] - revision = "16398bac157da96aa88f98a2df640c7f32af1da2" - version = "v1.0.1" + revision = "acdc4509485b587f5e675510c4f2c63e90ff68a8" + version = "v1.1.0" [[projects]] name = "github.com/pkg/errors" @@ -113,26 +183,32 @@ [[projects]] branch = "master" name = "github.com/pquerna/ffjson" - packages = ["fflib/v1","fflib/v1/internal"] + packages = [ + "fflib/v1", + "fflib/v1/internal" + ] revision = "d49c2bc1aa135aad0c6f4fc2056623ec78f5d5ac" [[projects]] name = "github.com/spf13/afero" - packages = [".","mem"] - revision = "ec3a3111d1e1bdff38a61e09d5a5f5e974905611" - version = "v1.0.1" + packages = [ + ".", + "mem" + ] + revision = "bb8f1927f2a9d3ab41c9340aa034f6b803f4359c" + version = "v1.0.2" [[projects]] name = "github.com/spf13/cast" packages = ["."] - revision = "acbeb36b902d72a7a4c18e8f3241075e7ab763e4" - version = "v1.1.0" + revision = "8965335b8c7107321228e3e3702cab9832751bac" + version = "v1.2.0" [[projects]] branch = "master" name = "github.com/spf13/jwalterweatherman" packages = ["."] - revision = "12bd96e66386c1960ab0f74ced1362f66f552f7b" + revision = "7c0cea34c8ece3fbeb2b27ab9b59511d360fb394" [[projects]] name = "github.com/spf13/pflag" @@ -147,16 +223,20 @@ version = "v1.0.0" [[projects]] - branch = "master" name = "github.com/stretchr/objx" packages = ["."] - revision = "1a9d0bb9f541897e62256577b352fdbc1fb4fd94" + revision = "facf9a85c22f48d2f52f2380e4efce1768749a89" + version = "v0.1" [[projects]] name = "github.com/stretchr/testify" - packages = ["assert","mock","require"] - revision = "b91bfb9ebec76498946beb6af7c0230c7cc7ba6c" - version = "v1.2.0" + packages = [ + "assert", + "mock", + "require" + ] + revision = "12b6f73e6084dad08a7c6e575284b177ecafbc71" + version = "v1.2.1" [[projects]] name = "github.com/uber-go/atomic" @@ -178,43 +258,86 @@ [[projects]] name = "go.uber.org/zap" - packages = [".","buffer","internal/bufferpool","internal/color","internal/exit","zapcore"] + packages = [ + ".", + "buffer", + "internal/bufferpool", + "internal/color", + "internal/exit", + "zapcore", + "zaptest/observer" + ] revision = "35aad584952c3e7020db7b839f6b102de6271f89" version = "v1.7.1" [[projects]] branch = "master" name = "golang.org/x/net" - packages = ["html","html/atom","html/charset","http2","http2/hpack","idna","lex/httplex"] - revision = "d866cfc389cec985d6fda2859936a575a55a3ab6" + packages = [ + "html", + "html/atom", + "html/charset", + "http2", + "http2/hpack", + "idna", + "lex/httplex" + ] + revision = "136a25c244d3019482a795d728110278d6ba09a4" [[projects]] branch = "master" name = "golang.org/x/sys" packages = ["unix"] - revision = "83801418e1b59fb1880e363299581ee543af32ca" + revision = "37707fdb30a5b38865cfb95e5aab41707daec7fd" [[projects]] branch = "master" name = "golang.org/x/text" - packages = ["collate","collate/build","encoding","encoding/charmap","encoding/htmlindex","encoding/internal","encoding/internal/identifier","encoding/japanese","encoding/korean","encoding/simplifiedchinese","encoding/traditionalchinese","encoding/unicode","internal/colltab","internal/gen","internal/tag","internal/triegen","internal/ucd","internal/utf8internal","language","runes","secure/bidirule","transform","unicode/bidi","unicode/cldr","unicode/norm","unicode/rangetable"] - revision = "e19ae1496984b1c655b8044a65c0300a3c878dd3" + packages = [ + "collate", + "collate/build", + "encoding", + "encoding/charmap", + "encoding/htmlindex", + "encoding/internal", + "encoding/internal/identifier", + "encoding/japanese", + "encoding/korean", + "encoding/simplifiedchinese", + "encoding/traditionalchinese", + "encoding/unicode", + "internal/colltab", + "internal/gen", + "internal/tag", + "internal/triegen", + "internal/ucd", + "internal/utf8internal", + "language", + "runes", + "secure/bidirule", + "transform", + "unicode/bidi", + "unicode/cldr", + "unicode/norm", + "unicode/rangetable" + ] + revision = "4e4a3210bb54bb31f6ab2cdca2edcc0b50c420c1" [[projects]] name = "gopkg.in/bluesuncorp/validator.v9" packages = ["."] - revision = "b1f51f36f1c98cc97f777d6fc9d4b05eaa0cabb5" - version = "v9.9.1" + revision = "535f85221eb67ab842438877b22cac6274380cf2" + version = "v9.10.0" [[projects]] branch = "v2" name = "gopkg.in/yaml.v2" packages = ["."] - revision = "287cf08546ab5e7e37d55a84f7ed3fd1db036de5" + revision = "d670f9405373e636a5a2765eea47fac0c9bc91a4" [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "a41684e385a44450860551bf478c3a9a1dfd6128a7050bfd8dbcb642bdb79970" + inputs-digest = "b192bac007a93578bd17b7b68880608ecdfc55a766e29215dfc4c656e805d483" solver-name = "gps-cdcl" solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml index a77274403..d712e7242 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -1,4 +1,3 @@ - # Gopkg.toml example # # Refer to https://github.com/golang/dep/blob/master/docs/Gopkg.toml.md @@ -17,63 +16,103 @@ # source = "github.com/myfork/project2" # # [[override]] -# name = "github.com/x/y" -# version = "2.4.0" +# name = "github.com/x/y" +# version = "2.4.0" +# +# [prune] +# non-go = false +# go-tests = true +# unused-packages = true [[constraint]] name = "github.com/asaskevich/govalidator" + version = "8.0.0" [[constraint]] + branch = "master" name = "github.com/c2h5oh/datasize" [[constraint]] + branch = "master" name = "github.com/facebookgo/stack" [[constraint]] + branch = "master" name = "github.com/facebookgo/stackerr" [[constraint]] name = "github.com/fatih/structs" + version = "1.0.0" [[constraint]] name = "github.com/ghodss/yaml" + version = "1.0.0" [[constraint]] + branch = "master" + name = "github.com/hashicorp/go-multierror" + +[[constraint]] + name = "github.com/json-iterator/go" + version = "1.0.6" + +[[constraint]] + name = "github.com/magiconair/properties" + version = "1.7.6" + +[[constraint]] + branch = "master" name = "github.com/mitchellh/mapstructure" [[constraint]] name = "github.com/onsi/ginkgo" + version = "1.4.0" [[constraint]] name = "github.com/onsi/gomega" + version = "1.3.0" [[constraint]] name = "github.com/pkg/errors" + version = "0.8.0" [[constraint]] + branch = "master" name = "github.com/pquerna/ffjson" [[constraint]] name = "github.com/spf13/afero" + version = "1.0.2" [[constraint]] name = "github.com/spf13/viper" + version = "1.0.0" [[constraint]] name = "github.com/stretchr/testify" + version = "1.2.1" [[constraint]] name = "github.com/uber-go/atomic" + version = "1.3.0" [[constraint]] name = "go.uber.org/atomic" + version = "1.3.1" [[constraint]] name = "go.uber.org/zap" + version = "1.7.1" [[constraint]] + branch = "master" name = "golang.org/x/net" [[constraint]] - name = "gopkg.in/go-playground/validator.v8" + name = "gopkg.in/bluesuncorp/validator.v9" + version = "9.10.0" + +[prune] + go-tests = true + unused-packages = true diff --git a/cli/cli.go b/cli/cli.go index 6b8bed50a..ad316dabc 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -37,6 +37,9 @@ type logConfig struct { File string `config:"file"` } +// TODO(skipor): log sampling with WARN when first message is dropped, and WARN at finish with all +// filtered out entries num. Message is filtered out when zapcore.CoreEnable returns true but +// zapcore.Core.Check return nil. func newLogger(conf logConfig) *zap.Logger { zapConf := zap.NewDevelopmentConfig() zapConf.OutputPaths = []string{conf.File} diff --git a/core/aggregate/jsonlines.go b/core/aggregate/jsonlines.go index f574bc054..c04d45bff 100644 --- a/core/aggregate/jsonlines.go +++ b/core/aggregate/jsonlines.go @@ -13,6 +13,7 @@ import ( "github.com/yandex/pandora/core" "github.com/yandex/pandora/core/config" + "github.com/yandex/pandora/core/coreutil" ) type JSONLineAggregatorConfig struct { @@ -21,8 +22,8 @@ type JSONLineAggregatorConfig struct { } type JSONLineEncoderConfig struct { - JSONIterConfig `config:",squash"` - BufferSizeConfig `config:",squash"` + JSONIterConfig `config:",squash"` + coreutil.BufferSizeConfig `config:",squash"` } // JSONIterConfig is subset of jsoniter.Config that may be useful to configure. diff --git a/core/aggregate/reporter.go b/core/aggregate/reporter.go index 2d40af294..f345a50d6 100644 --- a/core/aggregate/reporter.go +++ b/core/aggregate/reporter.go @@ -18,7 +18,7 @@ import ( type ReporterConfig struct { // SampleQueueSize is number maximum number of unhandled samples. // On queue overflow, samples are dropped. - SampleQueueSize int `config:"sample-buff-size" validate:"min=1"` + SampleQueueSize int `config:"sample-queue-size" validate:"min=1"` } const ( diff --git a/core/core.go b/core/core.go index 8c441f412..07164e034 100644 --- a/core/core.go +++ b/core/core.go @@ -27,6 +27,15 @@ import ( // Information common for all shoots SHOULD be passed via Provider configuration. type Ammo interface{} +// ResettableAmmo is ammo that can be efficiently reset before reuse. +// Generic Provider (Provider that accepts undefined type of Ammo) SHOULD Reset Ammo before reuse +// if it implements ResettableAmmo. +// Ammo that is not going to be used with generic Providers don't need to implement ResettableAmmo. +type ResettableAmmo interface { + Ammo + Reset() +} + //go:generate mockery -name=Provider -case=underscore -outpkg=coremock // Provider is routine that generates ammo for Instance shoots. @@ -108,6 +117,7 @@ type Sample interface{} // BorrowedSample is Sample that was borrowed from pool, and SHOULD be returned by Aggregator, // after it will handle Sample. type BorrowedSample interface { + Sample Return() } diff --git a/core/coreutil/ammo.go b/core/coreutil/ammo.go new file mode 100644 index 000000000..a2ce8a667 --- /dev/null +++ b/core/coreutil/ammo.go @@ -0,0 +1,24 @@ +// Copyright (c) 2018 Yandex LLC. All rights reserved. +// Use of this source code is governed by a MPL 2.0 +// license that can be found in the LICENSE file. +// Author: Vladimir Skipor + +package coreutil + +import ( + "reflect" + + "github.com/yandex/pandora/core" +) + +// ResetReusedAmmo sets to zero any ammo. +// Used by core.Provider implementations that accepts generic type, and need to clean reused ammo +// before fill with fresh data. +func ResetReusedAmmo(ammo core.Ammo) { + if resettable, ok := ammo.(core.ResettableAmmo); ok { + resettable.Reset() + return + } + elem := reflect.ValueOf(ammo).Elem() + elem.Set(reflect.Zero(elem.Type())) +} diff --git a/core/aggregate/buffer_size_config.go b/core/coreutil/buffer_size_config.go similarity index 97% rename from core/aggregate/buffer_size_config.go rename to core/coreutil/buffer_size_config.go index a19647782..a37a134e2 100644 --- a/core/aggregate/buffer_size_config.go +++ b/core/coreutil/buffer_size_config.go @@ -3,7 +3,7 @@ // license that can be found in the LICENSE file. // Author: Vladimir Skipor -package aggregate +package coreutil import ( "github.com/c2h5oh/datasize" diff --git a/core/aggregate/buffer_size_config_test.go b/core/coreutil/buffer_size_config_test.go similarity index 97% rename from core/aggregate/buffer_size_config_test.go rename to core/coreutil/buffer_size_config_test.go index 53cd51265..13223be73 100644 --- a/core/aggregate/buffer_size_config_test.go +++ b/core/coreutil/buffer_size_config_test.go @@ -3,7 +3,7 @@ // license that can be found in the LICENSE file. // Author: Vladimir Skipor -package aggregate +package coreutil import ( "testing" diff --git a/core/coreutil/doc.go b/core/coreutil/doc.go index 42fe6a700..6ee897e82 100644 --- a/core/coreutil/doc.go +++ b/core/coreutil/doc.go @@ -5,4 +5,5 @@ // package coreutil provides utilities for core interfaces, that can be useful // not only for engine, but other core importers too. +// coreutil MUST NOT depend on any core subpackage, because they may import coreutil. package coreutil diff --git a/core/datasink/file.go b/core/datasink/file.go index b925c1221..7ea859b48 100644 --- a/core/datasink/file.go +++ b/core/datasink/file.go @@ -14,6 +14,8 @@ import ( "github.com/yandex/pandora/core" ) +// TODO(skipor): gzip on flag + type FileConfig struct { Path string `config:"path" validate:"required"` } diff --git a/core/datasource/file.go b/core/datasource/file.go index cefefa94f..ab6f4ce1b 100644 --- a/core/datasource/file.go +++ b/core/datasource/file.go @@ -14,7 +14,8 @@ import ( "github.com/yandex/pandora/core" ) -// TODO(skipor): auto unzip +// TODO(skipor): auto unzip with option to turn this behaviour off. + type FileConfig struct { Path string `config:"path" validate:"required"` } diff --git a/core/datasource/std.go b/core/datasource/std.go index db6ef39c1..bd39128ef 100644 --- a/core/datasource/std.go +++ b/core/datasource/std.go @@ -29,7 +29,7 @@ func (b buffer) OpenSource() (wc io.ReadCloser, err error) { // NewReader returns dummy core.DataSource that returns it on OpenSource call, wrapping it // ioutil.NopCloser if r is not io.Closer. -// NONE(skipor): such wrapping hide +// NOTE(skipor): such wrapping hides Seek and other methods that can be used. func NewReader(r io.Reader) core.DataSource { return &reader{r} } @@ -42,5 +42,15 @@ func (r *reader) OpenSource() (rc io.ReadCloser, err error) { if rc, ok := r.source.(io.ReadCloser); ok { return rc, nil } + // Need to add io.Closer, but don't want to hide seeker. + rs, ok := r.source.(io.ReadSeeker) + if ok { + return &struct { + io.ReadSeeker + ioutil2.NopCloser + }{ReadSeeker: rs}, nil + } return ioutil.NopCloser(r.source), nil } + +// TODO(skipor): InMemory DataSource, that reads all nested source data in open to buffer. diff --git a/core/engine/engine.go b/core/engine/engine.go index c1f22ca14..2e2ef4a47 100644 --- a/core/engine/engine.go +++ b/core/engine/engine.go @@ -56,7 +56,7 @@ type Engine struct { // Run runs all instance pools. Run blocks until fail happen, or all pools // subroutines are successfully finished. -// Ctx will be ancestor to Contexts passed to Provider, Gun and Aggregator. +// Ctx will be ancestor to Contexts passed to AmmoQueue, Gun and Aggregator. // That's ctx cancel cancels shooting and it's Context values can be used for communication between plugins. func (e *Engine) Run(ctx context.Context) error { ctx, cancel := context.WithCancel(ctx) @@ -124,10 +124,10 @@ type instancePool struct { // Run start instance pool. Run blocks until fail happen, or all instances finish. // What's going on: -// Provider and Aggregator are started in separate goroutines. +// AmmoQueue and Aggregator are started in separate goroutines. // Instances create due to schedule is started in separate goroutine. // Every new instance started in separate goroutine. -// When all instances are finished, Aggregator and Provider contexts are canceled, +// When all instances are finished, Aggregator and AmmoQueue contexts are canceled, // and their execution results are awaited. // If error happen or Run context has been canceled, Run returns non-nil error immediately, // remaining results awaiting goroutine in background, that will call onWaitDone callback, @@ -243,7 +243,7 @@ type runAwaitHandle struct { func (p *instancePool) newAwaitRunHandle(runHandle *poolAsyncRunHandle) (*runAwaitHandle, <-chan error) { awaitErr := make(chan error) - const resultsToWait = 4 // Provider, Aggregator, instance start, instance run. + const resultsToWait = 4 // AmmoQueue, Aggregator, instance start, instance run. awaitHandle := &runAwaitHandle{ log: p.log, poolAsyncRunHandle: *runHandle, @@ -261,7 +261,7 @@ func (ah *runAwaitHandle) awaitRun() { ah.providerErr = nil // TODO(skipor): not wait for provider, to return success result? ah.toWait-- - ah.log.Debug("Provider awaited", zap.Error(err)) + ah.log.Debug("AmmoQueue awaited", zap.Error(err)) if errutil.IsNotCtxError(ah.runCtx, err) { ah.onErrAwaited(errors.WithMessage(err, "provider failed")) } diff --git a/core/provider/chunk_decoder.go b/core/provider/chunk_decoder.go new file mode 100644 index 000000000..0904b4497 --- /dev/null +++ b/core/provider/chunk_decoder.go @@ -0,0 +1,72 @@ +// Copyright (c) 2018 Yandex LLC. All rights reserved. +// Use of this source code is governed by a MPL 2.0 +// license that can be found in the LICENSE file. +// Author: Vladimir Skipor + +package provider + +import ( + "bufio" + "fmt" + + "github.com/pkg/errors" + + "github.com/yandex/pandora/core" +) + +var ErrNoAmmoDecoded = fmt.Errorf("no ammo has been decoded from chunk") + +// ChunkAmmoDecoder accept data chunks that can contain encoded ammo or some meta information that +// changes ChunkAmmoDecoder state and affects next ammo decoding. +// For example, chunks are lines that contains HTTP URI to be transformed to http.Request or +// HTTP header to be added to next decoded http.Requests. +type ChunkAmmoDecoder interface { + // DecodeChunk accepts chunk of data, than decode it to ammo or change ChunkAmmoDecoder internal + // state. + // Returns nil on when ammo was successfully decoded. + // ErrNoAmmoDecoded MAY be returned, to indicate that chunk was accepted, but ammo were not + // decoded. + // Returns other non nil error, on chunk decode fail. + // Panics if ammo type is not supported. + DecodeChunk(chunk []byte, ammo core.Ammo) error +} + +func NewScanDecoder(scanner Scanner, decoder ChunkAmmoDecoder) *ScanAmmoDecoder { + return &ScanAmmoDecoder{scanner: scanner, decoder: decoder} +} + +// Scanner is interface of bufio.Scanner like scanners. +type Scanner interface { + Scan() bool + Bytes() []byte + Err() error +} + +var _ Scanner = &bufio.Scanner{} + +type ScanAmmoDecoder struct { + chunkCounter int + scanner Scanner + decoder ChunkAmmoDecoder +} + +var _ AmmoDecoder = &ScanAmmoDecoder{} + +func (d *ScanAmmoDecoder) Decode(ammo core.Ammo) error { + for { + if !d.scanner.Scan() { + return d.scanner.Err() + } + chunk := d.scanner.Bytes() + err := d.decoder.DecodeChunk(chunk, ammo) + if err == ErrNoAmmoDecoded { + continue + } + if err != nil { + return errors.Wrapf(err, "chunk %v decode failed", d.chunkCounter) + } + d.chunkCounter++ + return nil + } + +} diff --git a/core/provider/decoder.go b/core/provider/decoder.go new file mode 100644 index 000000000..8edb5e3eb --- /dev/null +++ b/core/provider/decoder.go @@ -0,0 +1,113 @@ +// Copyright (c) 2018 Yandex LLC. All rights reserved. +// Use of this source code is governed by a MPL 2.0 +// license that can be found in the LICENSE file. +// Author: Vladimir Skipor + +package provider + +import ( + "context" + "fmt" + "io" + + "github.com/pkg/errors" + "go.uber.org/zap" + + "github.com/yandex/pandora/core" + "github.com/yandex/pandora/lib/errutil" + "github.com/yandex/pandora/lib/ioutil2" +) + +type NewAmmoDecoder func(deps core.ProviderDeps, source io.Reader) (AmmoDecoder, error) + +// TODO(skipo): test decoder that fills ammo with random data + +// AmmoEncoder MAY support only concrete type of ammo. +// AmmoDecoder SHOULD NOT be used after first decode fail. +type AmmoDecoder interface { + // Decode fills passed ammo with data. + // Returns non nil error on fail. + // Panics if ammo type is not supported. + Decode(ammo core.Ammo) error +} + +// TODO(skipor): test + +func NewDecodeProvider(newAmmo func() core.Ammo, newDecoder NewAmmoDecoder, conf DecodeProviderConfig) *DecodeProvider { + return &DecodeProvider{ + AmmoQueue: *NewAmmoQueue(newAmmo, conf.Queue), + newDecoder: newDecoder, + conf: conf, + } +} + +type DecodeProviderConfig struct { + Queue AmmoQueueConfig `config:",squash"` + Source core.DataSource `config:"source" validate:"required"` + // Limit limits total num of ammo. Unlimited if zero. + Limit int `validate:"min=0"` + // Passes limits ammo file passes. Unlimited if zero. + Passes int `validate:"min=0"` +} + +func DefaultDecodeProviderConfig() DecodeProviderConfig { + return DecodeProviderConfig{ + Queue: DefaultAmmoQueueConfig(), + } +} + +type DecodeProvider struct { + AmmoQueue + conf DecodeProviderConfig + newDecoder NewAmmoDecoder + core.ProviderDeps +} + +var _ core.Provider = &DecodeProvider{} + +func (p *DecodeProvider) Run(ctx context.Context, deps core.ProviderDeps) (err error) { + p.ProviderDeps = deps + defer close(p.OutQueue) + source, err := p.conf.Source.OpenSource() + if err != nil { + return errors.WithMessage(err, "data source open failed") + } + defer func() { + errutil.Join(err, errors.Wrap(source.Close(), "data source close failed")) + }() + + // Problem: can't use decoder after io.EOF, because decoder is invalidated. But decoder recreation + // is not efficient, when we have short data source. + // Now problem solved by using MultiPassReader, but in such case decoder don't know real input + // position, so can't put this important information in decode error. + // TODO(skipor): Let's add optional Reset(io.Reader) method, that will allow efficient Decoder reset after every pass. + multipassReader := ioutil2.NewMultiPassReader(source, p.conf.Passes) + if source == multipassReader { + p.Log.Info("Ammo data source can't sought, so will be read only once") + } + decoder, err := p.newDecoder(deps, multipassReader) + + if err != nil { + return errors.WithMessage(err, "decoder construction failed") + } + var ammoNum int + for ; p.conf.Limit <= 0 || ammoNum < p.conf.Limit; ammoNum++ { + ammo := p.InputPool.Get() + err = decoder.Decode(ammo) + if err == io.EOF { + p.Log.Info("Ammo finished", zap.Int("decoded", ammoNum)) + return nil + } + if err != nil { + return errors.WithMessage(err, fmt.Sprintf("ammo #%v decode failed", ammoNum)) + } + select { + case p.OutQueue <- ammo: + case <-ctx.Done(): + p.Log.Debug("Provider run context is Done", zap.Int("decoded", ammoNum+1)) + return nil + } + } + p.Log.Info("Ammo limit is reached", zap.Int("decoded", ammoNum)) + return nil +} diff --git a/core/provider/json.go b/core/provider/json.go new file mode 100644 index 000000000..1472750a6 --- /dev/null +++ b/core/provider/json.go @@ -0,0 +1,69 @@ +// Copyright (c) 2018 Yandex LLC. All rights reserved. +// Use of this source code is governed by a MPL 2.0 +// license that can be found in the LICENSE file. +// Author: Vladimir Skipor + +package provider + +import ( + "io" + + "github.com/json-iterator/go" + + "github.com/yandex/pandora/core" + "github.com/yandex/pandora/core/coreutil" + "github.com/yandex/pandora/lib/ioutil2" +) + +func NewJSONProvider(newAmmo func() core.Ammo, conf JSONProviderConfig) core.Provider { + var newDecoder NewAmmoDecoder = func(deps core.ProviderDeps, source io.Reader) (AmmoDecoder, error) { + return NewJSONAmmoDecoder(source, conf.Buffer.BufferSizeOrDefault()), nil + } + return NewDecodeProvider(newAmmo, newDecoder, conf.Decode) +} + +type JSONProviderConfig struct { + Decode DecodeProviderConfig `config:",squash"` + Buffer coreutil.BufferSizeConfig `config:",squash"` +} + +func DefaultJSONProviderConfig() JSONProviderConfig { + return JSONProviderConfig{Decode: DefaultDecodeProviderConfig()} +} + +func NewJSONAmmoDecoder(r io.Reader, buffSize int) AmmoDecoder { + var readError error + // HACK(skipor): jsoniter.Iterator don't handle read errors well, but jsoniter.Decoder don't allow to set buffer size. + var errTrackingReader ioutil2.ReaderFunc = func(p []byte) (n int, err error) { + n, err = r.Read(p) + if n > 0 { + // Need to suppress error, to distinguish parse error in last chunk and read error. + return n, nil + } + if err != nil { + readError = err + } + return n, err + } + return &JSONAmmoDecoder{ + iter: jsoniter.Parse(jsoniter.ConfigFastest, errTrackingReader, buffSize), + readErrorPtr: &readError, + } +} + +type JSONAmmoDecoder struct { + iter *jsoniter.Iterator + readErrorPtr *error +} + +func (d *JSONAmmoDecoder) Decode(ammo core.Ammo) error { + coreutil.ResetReusedAmmo(ammo) + d.iter.ReadVal(ammo) + if d.iter.Error != nil { + if *d.readErrorPtr != nil { + return *d.readErrorPtr + } + return d.iter.Error + } + return nil +} diff --git a/core/provider/json_test.go b/core/provider/json_test.go new file mode 100644 index 000000000..9a9ec5a1e --- /dev/null +++ b/core/provider/json_test.go @@ -0,0 +1,137 @@ +// Copyright (c) 2018 Yandex LLC. All rights reserved. +// Use of this source code is governed by a MPL 2.0 +// license that can be found in the LICENSE file. +// Author: Vladimir Skipor + +package provider + +import ( + "context" + "io" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/yandex/pandora/core" + "github.com/yandex/pandora/core/datasource" +) + +type testJSONAmmo struct { + Id string + Data string +} + +// TODO(skipor): test this in decode provider, not json +func TestDecodeProviderPasses(t *testing.T) { + input := strings.NewReader(` {"data":"first"} `) + conf := DefaultJSONProviderConfig() + conf.Decode.Source = datasource.NewReader(input) + conf.Decode.Passes = 3 + newAmmo := func() core.Ammo { + return &testJSONAmmo{} + } + provider := NewJSONProvider(newAmmo, conf) + err := provider.Run(context.Background(), testDeps()) + require.NoError(t, err) + + expected := func(data string) *testJSONAmmo { + return &testJSONAmmo{Data: data} + } + ammo, ok := provider.Acquire() + require.True(t, ok) + assert.Equal(t, expected("first"), ammo) + + ammo, ok = provider.Acquire() + require.True(t, ok) + assert.Equal(t, expected("first"), ammo) + + ammo, ok = provider.Acquire() + require.True(t, ok) + assert.Equal(t, expected("first"), ammo) + + _, ok = provider.Acquire() + assert.False(t, ok) +} + +func TestDecodeProvider(t *testing.T) { + input := strings.NewReader(` {"data":"first"} +{"data":"second"} `) + conf := DefaultJSONProviderConfig() + conf.Decode.Source = datasource.NewReader(input) + conf.Decode.Limit = 3 + newAmmo := func() core.Ammo { + return &testJSONAmmo{} + } + provider := NewJSONProvider(newAmmo, conf) + err := provider.Run(context.Background(), testDeps()) + require.NoError(t, err) + + expected := func(data string) *testJSONAmmo { + return &testJSONAmmo{Data: data} + } + ammo, ok := provider.Acquire() + require.True(t, ok) + assert.Equal(t, expected("first"), ammo) + + ammo, ok = provider.Acquire() + require.True(t, ok) + assert.Equal(t, expected("second"), ammo) + + ammo, ok = provider.Acquire() + require.True(t, ok) + assert.Equal(t, expected("first"), ammo) + + _, ok = provider.Acquire() + assert.False(t, ok) +} + +func TestDecoderSimple(t *testing.T) { + var val struct { + Data string + } + input := strings.NewReader(`{"data":"first"}`) + decoder := NewJSONAmmoDecoder(input, 512) + err := decoder.Decode(&val) + require.NoError(t, err) + assert.Equal(t, "first", val.Data) + + err = decoder.Decode(&val) + require.Equal(t, io.EOF, err) +} + +func TestDecoderWhitespaces(t *testing.T) { + var val struct { + Data string + } + input := strings.NewReader(` {"data":"first"} + {"data":"second"} {"data":"third"} `) + decoder := NewJSONAmmoDecoder(input, 512) + err := decoder.Decode(&val) + require.NoError(t, err) + assert.Equal(t, "first", val.Data) + + err = decoder.Decode(&val) + require.NoError(t, err) + assert.Equal(t, "second", val.Data) + + err = decoder.Decode(&val) + require.NoError(t, err) + assert.Equal(t, "third", val.Data) + + err = decoder.Decode(&val) + require.Equal(t, io.EOF, err) +} + +func TestDecoderReset(t *testing.T) { + val := testJSONAmmo{ + Id: "id", + } + input := strings.NewReader(`{"data":"first"}`) + decoder := NewJSONAmmoDecoder(input, 512) + err := decoder.Decode(&val) + require.NoError(t, err) + assert.Equal(t, "first", val.Data) + assert.Zero(t, val.Id) +} diff --git a/core/provider/provider_suite_test.go b/core/provider/provider_suite_test.go index 73cbe3960..4273bdb81 100644 --- a/core/provider/provider_suite_test.go +++ b/core/provider/provider_suite_test.go @@ -3,9 +3,14 @@ package provider import ( "testing" + "github.com/yandex/pandora/core" "github.com/yandex/pandora/lib/testutil" ) func TestProvider(t *testing.T) { - testutil.RunSuite(t, "Provider Suite") + testutil.RunSuite(t, "AmmoQueue Suite") +} + +func testDeps() core.ProviderDeps { + return core.ProviderDeps{testutil.NewLogger()} } diff --git a/core/provider/queue.go b/core/provider/queue.go new file mode 100644 index 000000000..ef208d5d6 --- /dev/null +++ b/core/provider/queue.go @@ -0,0 +1,52 @@ +// Copyright (c) 2018 Yandex LLC. All rights reserved. +// Use of this source code is governed by a MPL 2.0 +// license that can be found in the LICENSE file. +// Author: Vladimir Skipor + +package provider + +import ( + "sync" + + "github.com/yandex/pandora/core" +) + +type AmmoQueueConfig struct { + // AmmoQueueSize is number maximum number of ready but not acquired ammo. + // On queue overflow, ammo decode is stopped. + AmmoQueueSize int `config:"ammo-queue-size" validate:"min=1"` +} + +const ( + shootsPerSecondUpperBound = 128 * 1024 + DefaultAmmoQueueSize = shootsPerSecondUpperBound / 16 +) + +func DefaultAmmoQueueConfig() AmmoQueueConfig { + return AmmoQueueConfig{ + AmmoQueueSize: DefaultAmmoQueueSize, + } +} + +func NewAmmoQueue(newAmmo func() core.Ammo, conf AmmoQueueConfig) *AmmoQueue { + return &AmmoQueue{ + OutQueue: make(chan core.Ammo, conf.AmmoQueueSize), + InputPool: sync.Pool{New: func() interface{} { + return newAmmo() + }}, + } +} + +type AmmoQueue struct { + OutQueue chan core.Ammo + InputPool sync.Pool +} + +func (p *AmmoQueue) Acquire() (core.Ammo, bool) { + ammo, ok := <-p.OutQueue + return ammo, ok +} + +func (p *AmmoQueue) Release(a core.Ammo) { + p.InputPool.Put(a) +} diff --git a/lib/ioutil2/reader.go b/lib/ioutil2/reader.go new file mode 100644 index 000000000..261e93de7 --- /dev/null +++ b/lib/ioutil2/reader.go @@ -0,0 +1,50 @@ +// Copyright (c) 2018 Yandex LLC. All rights reserved. +// Use of this source code is governed by a MPL 2.0 +// license that can be found in the LICENSE file. +// Author: Vladimir Skipor + +package ioutil2 + +import "io" + +type ReaderWrapper interface { + Unwrap() io.Reader +} + +// TODO(skipor): test + +func NewMultiPassReader(r io.Reader, passes int) io.Reader { + if passes == 1 { + return r + } + rs, isSeakable := r.(io.ReadSeeker) + if !isSeakable { + return r + } + return &MultiPassReader{rs: rs, passesLimit: passes} +} + +type MultiPassReader struct { + rs io.ReadSeeker + passesCount int + passesLimit int +} + +func (r *MultiPassReader) Read(p []byte) (n int, err error) { + n, err = r.rs.Read(p) + if err == io.EOF { + r.passesCount++ + if r.passesLimit <= 0 || r.passesCount < r.passesLimit { + _, err = r.rs.Seek(0, io.SeekStart) + } + } + return +} + +func (r *MultiPassReader) PassesLeft() int { + return r.PassesLeft() +} + +func (r *MultiPassReader) Unwrap() io.Reader { + return r.rs +}