diff --git a/core/aggregator/jsonlines.go b/core/aggregator/jsonlines.go index 27f1998bc..cc613c817 100644 --- a/core/aggregator/jsonlines.go +++ b/core/aggregator/jsonlines.go @@ -29,7 +29,7 @@ type JSONLineEncoderConfig struct { // JSONIterConfig is subset of jsoniter.Config that may be useful to configure. type JSONIterConfig struct { // MarshalFloatWith6Digits makes float marshalling faster. - MarshalFloatWith6Digits bool + MarshalFloatWith6Digits bool `config:"marshal-float-with-6-digits"` // SortMapKeys useful, when sample contains map object, and you want to see them in same order. SortMapKeys bool `config:"sort-map-keys"` } diff --git a/core/datasource/std.go b/core/datasource/std.go index bd39128ef..a9f0aa222 100644 --- a/core/datasource/std.go +++ b/core/datasource/std.go @@ -9,6 +9,7 @@ import ( "bytes" "io" "io/ioutil" + "strings" "github.com/yandex/pandora/core" "github.com/yandex/pandora/lib/ioutil2" @@ -31,14 +32,14 @@ func (b buffer) OpenSource() (wc io.ReadCloser, err error) { // ioutil.NopCloser if r is not io.Closer. // NOTE(skipor): such wrapping hides Seek and other methods that can be used. func NewReader(r io.Reader) core.DataSource { - return &reader{r} + return &readerSource{r} } -type reader struct { +type readerSource struct { source io.Reader } -func (r *reader) OpenSource() (rc io.ReadCloser, err error) { +func (r *readerSource) OpenSource() (rc io.ReadCloser, err error) { if rc, ok := r.source.(io.ReadCloser); ok { return rc, nil } @@ -53,4 +54,25 @@ func (r *reader) OpenSource() (rc io.ReadCloser, err error) { return ioutil.NopCloser(r.source), nil } +func NewString(s string) core.DataSource { + return &stringSource{Reader: strings.NewReader(s)} +} + +type stringSource struct { + *strings.Reader + ioutil2.NopCloser +} + +func (s stringSource) OpenSource() (rc io.ReadCloser, err error) { + return s, nil +} + +type InlineConfig struct { + Data string `validate:"required"` +} + +func NewInline(conf InlineConfig) core.DataSource { + return NewString(conf.Data) +} + // TODO(skipor): InMemory DataSource, that reads all nested source data in open to buffer. diff --git a/core/import/import.go b/core/import/import.go index 0d6dc1e0c..499015eae 100644 --- a/core/import/import.go +++ b/core/import/import.go @@ -3,34 +3,36 @@ // license that can be found in the LICENSE file. // Author: Vladimir Skipor -package core +package coreimport import ( "reflect" "github.com/spf13/afero" - "github.com/yandex/pandora/core/aggregator" - "github.com/yandex/pandora/core/datasink" - "github.com/yandex/pandora/lib/tag" "go.uber.org/zap" "github.com/yandex/pandora/core" + "github.com/yandex/pandora/core/aggregator" "github.com/yandex/pandora/core/aggregator/netsample" "github.com/yandex/pandora/core/config" + "github.com/yandex/pandora/core/datasink" + "github.com/yandex/pandora/core/datasource" "github.com/yandex/pandora/core/plugin" "github.com/yandex/pandora/core/plugin/pluginconfig" + "github.com/yandex/pandora/core/provider" "github.com/yandex/pandora/core/register" "github.com/yandex/pandora/core/schedule" + "github.com/yandex/pandora/lib/tag" ) const ( - fileSinkKey = "file" + fileDataKey = "file" compositeScheduleKey = "composite" ) func Import(fs afero.Fs) { - register.DataSink(fileSinkKey, func(conf datasink.FileConfig) core.DataSink { + register.DataSink(fileDataKey, func(conf datasink.FileConfig) core.DataSink { return datasink.NewFile(fs, conf) }) const ( @@ -48,11 +50,31 @@ func Import(fs afero.Fs) { return }) + register.DataSource(fileDataKey, func(conf datasource.FileConfig) core.DataSource { + return datasource.NewFile(fs, conf) + }) + const ( + stdinSourceKey = "stdin" + ) + register.DataSource(stdinSourceKey, datasource.NewStdin) + AddSinkConfigHook(func(str string) (ok bool, pluginType string, _ map[string]interface{}) { + if str != stdinSourceKey { + return + } + return true, stdinSourceKey, nil + }) + register.DataSource("inline", datasource.NewInline) + + // NOTE(skipor): json provider SHOULD NOT used normally. Register your own, that will return + // type that you need, but untyped map. + RegisterCustomJSONProvider("json", func() core.Ammo { return map[string]interface{}{} }) + register.Aggregator("phout", func(conf netsample.PhoutConfig) (core.Aggregator, error) { a, err := netsample.NewPhout(fs, conf) return netsample.WrapAggregator(a), err }) register.Aggregator("jsonlines", aggregator.NewJSONLinesAggregator, aggregator.DefaultJSONLinesAggregatorConfig) + register.Aggregator("json", aggregator.NewJSONLinesAggregator, aggregator.DefaultJSONLinesAggregatorConfig) // TODO(skipor): should be done via alias, but we don't have them yet register.Aggregator("log", aggregator.NewLog) register.Aggregator("discard", aggregator.NewDiscard) @@ -86,13 +108,70 @@ func isPluginOrFactory(expectedPluginType, actualType reflect.Type) bool { type PluginConfigStringHook func(str string) (ok bool, pluginType string, conf map[string]interface{}) var ( - dataSinkConfigHooks []PluginConfigStringHook + dataSinkConfigHooks []PluginConfigStringHook + dataSourceConfigHooks []PluginConfigStringHook ) func AddSinkConfigHook(hook PluginConfigStringHook) { dataSinkConfigHooks = append(dataSinkConfigHooks, hook) } +func AddSourceConfigHook(hook PluginConfigStringHook) { + dataSourceConfigHooks = append(dataSourceConfigHooks, hook) +} + +func RegisterCustomJSONProvider(name string, newAmmo func() core.Ammo) { + register.Provider(name, func(conf provider.JSONProviderConfig) core.Provider { + return provider.NewJSONProvider(newAmmo, conf) + }, provider.DefaultJSONProviderConfig) +} + +// sourceStringHook helps to decode string as core.DataSource plugin. +// Try use source hooks and use file as fallback. +func sourceStringHook(f reflect.Type, t reflect.Type, data interface{}) (interface{}, error) { + if f.Kind() != reflect.String { + return data, nil + } + if !isPluginOrFactory(dataSourceType, t) { + return data, nil + } + if tag.Debug { + zap.L().Debug("DataSource string hook triggered") + } + var ( + ok bool + pluginType string + conf map[string]interface{} + ) + dataStr := data.(string) + + for _, hook := range dataSourceConfigHooks { + ok, pluginType, conf = hook(dataStr) + zap.L().Debug("Source hooked", zap.String("plugin", pluginType)) + if ok { + break + } + } + + if !ok { + zap.L().Debug("Consider source as a file", zap.String("source", dataStr)) + pluginType = fileDataKey + conf = map[string]interface{}{ + "path": data, + } + } + + if conf == nil { + conf = make(map[string]interface{}) + } + conf[pluginconfig.PluginNameKey] = pluginType + + if tag.Debug { + zap.L().Debug("Hooked DataSource config", zap.Any("config", conf)) + } + return conf, nil +} + // sinkStringHook helps to decode string as core.DataSink plugin. // Try use sink hooks and use file as fallback. func sinkStringHook(f reflect.Type, t reflect.Type, data interface{}) (interface{}, error) { @@ -114,13 +193,15 @@ func sinkStringHook(f reflect.Type, t reflect.Type, data interface{}) (interface for _, hook := range dataSinkConfigHooks { ok, pluginType, conf = hook(dataStr) + zap.L().Debug("Sink hooked", zap.String("plugin", pluginType)) if ok { break } } if !ok { - pluginType = fileSinkKey + zap.L().Debug("Consider sink as a file", zap.String("source", dataStr)) + pluginType = fileDataKey conf = map[string]interface{}{ "path": data, } diff --git a/core/import/import_suite_test.go b/core/import/import_suite_test.go index a21a61c32..a39a2163d 100644 --- a/core/import/import_suite_test.go +++ b/core/import/import_suite_test.go @@ -1,4 +1,4 @@ -package core +package coreimport import ( "context" @@ -127,6 +127,8 @@ func TestProviderJSONLine(t *testing.T) { testutil.AssertFileEqual(t, fs, filename, "[0,1,2]\n") } +// TODO(skipor): test datasources + func testConfig(keyValuePairs ...interface{}) map[string]interface{} { if len(keyValuePairs)%2 != 0 { panic("invalid len") diff --git a/main.go b/main.go index 52831a9b1..d59a3aadb 100644 --- a/main.go +++ b/main.go @@ -18,12 +18,9 @@ func main() { // CLI don't know anything about components initially. // All extpoints constructors and default configurations should be registered, before CLI run. fs := afero.NewOsFs() - core.Import(fs) - - // Components should not write anything to files. - readOnlyFs := afero.NewReadOnlyFs(fs) + coreimport.Import(fs) + phttp.Import(fs) example.Import() - phttp.Import(readOnlyFs) cli.Run() }