Skip to content

Commit

Permalink
GH-85: more data sources
Browse files Browse the repository at this point in the history
  • Loading branch information
skipor committed Feb 20, 2018
1 parent 5be63fa commit 983295b
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 18 deletions.
2 changes: 1 addition & 1 deletion core/aggregator/jsonlines.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type JSONLineEncoderConfig struct {
// JSONIterConfig is subset of jsoniter.Config that may be useful to configure.
type JSONIterConfig struct {
// MarshalFloatWith6Digits makes float marshalling faster.
MarshalFloatWith6Digits bool
MarshalFloatWith6Digits bool `config:"marshal-float-with-6-digits"`
// SortMapKeys useful, when sample contains map object, and you want to see them in same order.
SortMapKeys bool `config:"sort-map-keys"`
}
Expand Down
28 changes: 25 additions & 3 deletions core/datasource/std.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"bytes"
"io"
"io/ioutil"
"strings"

"github.com/yandex/pandora/core"
"github.com/yandex/pandora/lib/ioutil2"
Expand All @@ -31,14 +32,14 @@ func (b buffer) OpenSource() (wc io.ReadCloser, err error) {
// ioutil.NopCloser if r is not io.Closer.
// NOTE(skipor): such wrapping hides Seek and other methods that can be used.
func NewReader(r io.Reader) core.DataSource {
return &reader{r}
return &readerSource{r}
}

type reader struct {
type readerSource struct {
source io.Reader
}

func (r *reader) OpenSource() (rc io.ReadCloser, err error) {
func (r *readerSource) OpenSource() (rc io.ReadCloser, err error) {
if rc, ok := r.source.(io.ReadCloser); ok {
return rc, nil
}
Expand All @@ -53,4 +54,25 @@ func (r *reader) OpenSource() (rc io.ReadCloser, err error) {
return ioutil.NopCloser(r.source), nil
}

func NewString(s string) core.DataSource {
return &stringSource{Reader: strings.NewReader(s)}
}

type stringSource struct {
*strings.Reader
ioutil2.NopCloser
}

func (s stringSource) OpenSource() (rc io.ReadCloser, err error) {
return s, nil
}

type InlineConfig struct {
Data string `validate:"required"`
}

func NewInline(conf InlineConfig) core.DataSource {
return NewString(conf.Data)
}

// TODO(skipor): InMemory DataSource, that reads all nested source data in open to buffer.
97 changes: 89 additions & 8 deletions core/import/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,36 @@
// license that can be found in the LICENSE file.
// Author: Vladimir Skipor <[email protected]>

package core
package coreimport

import (
"reflect"

"github.com/spf13/afero"
"github.com/yandex/pandora/core/aggregator"
"github.com/yandex/pandora/core/datasink"
"github.com/yandex/pandora/lib/tag"
"go.uber.org/zap"

"github.com/yandex/pandora/core"
"github.com/yandex/pandora/core/aggregator"
"github.com/yandex/pandora/core/aggregator/netsample"
"github.com/yandex/pandora/core/config"
"github.com/yandex/pandora/core/datasink"
"github.com/yandex/pandora/core/datasource"
"github.com/yandex/pandora/core/plugin"
"github.com/yandex/pandora/core/plugin/pluginconfig"
"github.com/yandex/pandora/core/provider"
"github.com/yandex/pandora/core/register"
"github.com/yandex/pandora/core/schedule"
"github.com/yandex/pandora/lib/tag"
)

const (
fileSinkKey = "file"
fileDataKey = "file"
compositeScheduleKey = "composite"
)

func Import(fs afero.Fs) {

register.DataSink(fileSinkKey, func(conf datasink.FileConfig) core.DataSink {
register.DataSink(fileDataKey, func(conf datasink.FileConfig) core.DataSink {
return datasink.NewFile(fs, conf)
})
const (
Expand All @@ -48,11 +50,31 @@ func Import(fs afero.Fs) {
return
})

register.DataSource(fileDataKey, func(conf datasource.FileConfig) core.DataSource {
return datasource.NewFile(fs, conf)
})
const (
stdinSourceKey = "stdin"
)
register.DataSource(stdinSourceKey, datasource.NewStdin)
AddSinkConfigHook(func(str string) (ok bool, pluginType string, _ map[string]interface{}) {
if str != stdinSourceKey {
return
}
return true, stdinSourceKey, nil
})
register.DataSource("inline", datasource.NewInline)

// NOTE(skipor): json provider SHOULD NOT used normally. Register your own, that will return
// type that you need, but untyped map.
RegisterCustomJSONProvider("json", func() core.Ammo { return map[string]interface{}{} })

register.Aggregator("phout", func(conf netsample.PhoutConfig) (core.Aggregator, error) {
a, err := netsample.NewPhout(fs, conf)
return netsample.WrapAggregator(a), err
})
register.Aggregator("jsonlines", aggregator.NewJSONLinesAggregator, aggregator.DefaultJSONLinesAggregatorConfig)
register.Aggregator("json", aggregator.NewJSONLinesAggregator, aggregator.DefaultJSONLinesAggregatorConfig) // TODO(skipor): should be done via alias, but we don't have them yet
register.Aggregator("log", aggregator.NewLog)
register.Aggregator("discard", aggregator.NewDiscard)

Expand Down Expand Up @@ -86,13 +108,70 @@ func isPluginOrFactory(expectedPluginType, actualType reflect.Type) bool {
type PluginConfigStringHook func(str string) (ok bool, pluginType string, conf map[string]interface{})

var (
dataSinkConfigHooks []PluginConfigStringHook
dataSinkConfigHooks []PluginConfigStringHook
dataSourceConfigHooks []PluginConfigStringHook
)

func AddSinkConfigHook(hook PluginConfigStringHook) {
dataSinkConfigHooks = append(dataSinkConfigHooks, hook)
}

func AddSourceConfigHook(hook PluginConfigStringHook) {
dataSourceConfigHooks = append(dataSourceConfigHooks, hook)
}

func RegisterCustomJSONProvider(name string, newAmmo func() core.Ammo) {
register.Provider(name, func(conf provider.JSONProviderConfig) core.Provider {
return provider.NewJSONProvider(newAmmo, conf)
}, provider.DefaultJSONProviderConfig)
}

// sourceStringHook helps to decode string as core.DataSource plugin.
// Try use source hooks and use file as fallback.
func sourceStringHook(f reflect.Type, t reflect.Type, data interface{}) (interface{}, error) {
if f.Kind() != reflect.String {
return data, nil
}
if !isPluginOrFactory(dataSourceType, t) {
return data, nil
}
if tag.Debug {
zap.L().Debug("DataSource string hook triggered")
}
var (
ok bool
pluginType string
conf map[string]interface{}
)
dataStr := data.(string)

for _, hook := range dataSourceConfigHooks {
ok, pluginType, conf = hook(dataStr)
zap.L().Debug("Source hooked", zap.String("plugin", pluginType))
if ok {
break
}
}

if !ok {
zap.L().Debug("Consider source as a file", zap.String("source", dataStr))
pluginType = fileDataKey
conf = map[string]interface{}{
"path": data,
}
}

if conf == nil {
conf = make(map[string]interface{})
}
conf[pluginconfig.PluginNameKey] = pluginType

if tag.Debug {
zap.L().Debug("Hooked DataSource config", zap.Any("config", conf))
}
return conf, nil
}

// sinkStringHook helps to decode string as core.DataSink plugin.
// Try use sink hooks and use file as fallback.
func sinkStringHook(f reflect.Type, t reflect.Type, data interface{}) (interface{}, error) {
Expand All @@ -114,13 +193,15 @@ func sinkStringHook(f reflect.Type, t reflect.Type, data interface{}) (interface

for _, hook := range dataSinkConfigHooks {
ok, pluginType, conf = hook(dataStr)
zap.L().Debug("Sink hooked", zap.String("plugin", pluginType))
if ok {
break
}
}

if !ok {
pluginType = fileSinkKey
zap.L().Debug("Consider sink as a file", zap.String("source", dataStr))
pluginType = fileDataKey
conf = map[string]interface{}{
"path": data,
}
Expand Down
4 changes: 3 additions & 1 deletion core/import/import_suite_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package core
package coreimport

import (
"context"
Expand Down Expand Up @@ -127,6 +127,8 @@ func TestProviderJSONLine(t *testing.T) {
testutil.AssertFileEqual(t, fs, filename, "[0,1,2]\n")
}

// TODO(skipor): test datasources

func testConfig(keyValuePairs ...interface{}) map[string]interface{} {
if len(keyValuePairs)%2 != 0 {
panic("invalid len")
Expand Down
7 changes: 2 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,9 @@ func main() {
// CLI don't know anything about components initially.
// All extpoints constructors and default configurations should be registered, before CLI run.
fs := afero.NewOsFs()
core.Import(fs)

// Components should not write anything to files.
readOnlyFs := afero.NewReadOnlyFs(fs)
coreimport.Import(fs)
phttp.Import(fs)
example.Import()
phttp.Import(readOnlyFs)

cli.Run()
}

0 comments on commit 983295b

Please sign in to comment.