Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Implement spans exporting for ClickHouse storage in Jaeger V2 #4941

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions cmd/jaeger/internal/exporters/storageexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
config *Config
logger *zap.Logger
traceWriter spanstore.Writer
clickhouse bool
// Separate traces exporting function for ClickHouse storage.
// This is temporary until we have v2 storage API.
chExportTraces func(ctx context.Context, td ptrace.Traces) error
}

func newExporter(config *Config, otel component.TelemetrySettings) *storageExporter {
Expand All @@ -29,13 +33,18 @@
}

func (exp *storageExporter) start(_ context.Context, host component.Host) error {
f, err := jaegerstorage.GetStorageFactoryV2(exp.config.TraceStorage, host)
clickhouse, f, err := jaegerstorage.GetStorageFactoryV2(exp.config.TraceStorage, host)
if err != nil {
return fmt.Errorf("cannot find storage factory: %w", err)
}

if exp.traceWriter, err = f.CreateTraceWriter(); err != nil {
return fmt.Errorf("cannot create trace writer: %w", err)
if clickhouse {
exp.clickhouse = clickhouse
exp.chExportTraces = f.ChExportSpans

Check warning on line 43 in cmd/jaeger/internal/exporters/storageexporter/exporter.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/exporters/storageexporter/exporter.go#L42-L43

Added lines #L42 - L43 were not covered by tests
} else {
if exp.traceWriter, err = f.CreateTraceWriter(); err != nil {
return fmt.Errorf("cannot create trace writer: %w", err)
}
}

return nil
Expand All @@ -47,5 +56,9 @@
}

func (exp *storageExporter) pushTraces(ctx context.Context, td ptrace.Traces) error {
if exp.clickhouse {
return exp.chExportTraces(ctx, td)

Check warning on line 60 in cmd/jaeger/internal/exporters/storageexporter/exporter.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/exporters/storageexporter/exporter.go#L60

Added line #L60 was not covered by tests
}

return exp.traceWriter.WriteTraces(ctx, td)
}
3 changes: 2 additions & 1 deletion cmd/jaeger/internal/exporters/storageexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ func createTracesExporter(ctx context.Context, set exporter.Settings, config com
// Disable Timeout/RetryOnFailure and SendingQueue
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
exporterhelper.WithRetry(configretry.BackOffConfig{Enabled: false}),
exporterhelper.WithQueue(exporterhelper.QueueSettings{Enabled: false}),
// Enable queue settings for Clickhouse only
exporterhelper.WithQueue(exporterhelper.QueueSettings{Enabled: ex.clickhouse}),
exporterhelper.WithStart(ex.start),
exporterhelper.WithShutdown(ex.close),
)
Expand Down
7 changes: 2 additions & 5 deletions cmd/jaeger/internal/extension/jaegerstorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config"
badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
"github.com/jaegertracing/jaeger/plugin/storage/clickhouse"
grpcCfg "github.com/jaegertracing/jaeger/plugin/storage/grpc"
)

Expand All @@ -22,16 +23,12 @@ type Config struct {
Opensearch map[string]esCfg.Configuration `mapstructure:"opensearch"`
Elasticsearch map[string]esCfg.Configuration `mapstructure:"elasticsearch"`
Cassandra map[string]cassandra.Options `mapstructure:"cassandra"`
ClickHouse map[string]clickhouse.Config `mapstructure:"clickhouse"`
// TODO add other storage types here
// TODO how will this work with 3rd party storage implementations?
// Option: instead of looking for specific name, check interface.
}

type MemoryStorage struct {
Name string `mapstructure:"name"`
memoryCfg.Configuration
}

func (cfg *Config) Validate() error {
emptyCfg := createDefaultConfig().(*Config)
if reflect.DeepEqual(*cfg, *emptyCfg) {
Expand Down
51 changes: 36 additions & 15 deletions cmd/jaeger/internal/extension/jaegerstorage/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin/storage/badger"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
ch "github.com/jaegertracing/jaeger/plugin/storage/clickhouse"
"github.com/jaegertracing/jaeger/plugin/storage/es"
"github.com/jaegertracing/jaeger/plugin/storage/grpc"
"github.com/jaegertracing/jaeger/plugin/storage/memory"
Expand Down Expand Up @@ -64,13 +65,19 @@
return f, nil
}

func GetStorageFactoryV2(name string, host component.Host) (spanstore.Factory, error) {
func GetStorageFactoryV2(name string, host component.Host) (bool, spanstore.Factory, error) {
f, err := GetStorageFactory(name, host)
if err != nil {
return nil, err
return false, nil, err
}

return factoryadapter.NewFactory(f), nil
var clickhouse bool
switch f.(type) {

Check failure on line 75 in cmd/jaeger/internal/extension/jaegerstorage/extension.go

View workflow job for this annotation

GitHub Actions / lint

singleCaseSwitch: should rewrite switch statement to if statement (gocritic)
case *ch.Factory:
clickhouse = true

Check warning on line 77 in cmd/jaeger/internal/extension/jaegerstorage/extension.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/extension.go#L76-L77

Added lines #L76 - L77 were not covered by tests
}

return clickhouse, factoryadapter.NewFactory(f), nil
}

func newStorageExt(config *Config, otel component.TelemetrySettings) *storageExt {
Expand All @@ -82,24 +89,31 @@
}

type starter[Config any, Factory storage.Factory] struct {
ext *storageExt
storageKind string
cfg map[string]Config
builder func(Config, metrics.Factory, *zap.Logger) (Factory, error)
ext *storageExt
storageKind string
cfg map[string]Config
builder func(Config, metrics.Factory, *zap.Logger) (Factory, error)
clickhouseBuilder func(context.Context, Config, *zap.Logger) Factory
}

func (s *starter[Config, Factory]) build(_ context.Context, _ component.Host) error {
func (s *starter[Config, Factory]) build(ctx context.Context, _ component.Host) error {
for name, cfg := range s.cfg {
if _, ok := s.ext.factories[name]; ok {
return fmt.Errorf("duplicate %s storage name %s", s.storageKind, name)
}
factory, err := s.builder(
cfg,
metrics.NullFactory,
s.ext.logger.With(zap.String("storage_name", name)),
)
if err != nil {
return fmt.Errorf("failed to initialize %s storage %s: %w", s.storageKind, name, err)
var factory Factory
if s.clickhouseBuilder != nil {
factory = s.clickhouseBuilder(ctx, cfg, s.ext.logger.With(zap.String("storage_name", name)))

Check warning on line 106 in cmd/jaeger/internal/extension/jaegerstorage/extension.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/extension.go#L106

Added line #L106 was not covered by tests
} else {
var err error
factory, err = s.builder(
cfg,
metrics.NullFactory,
s.ext.logger.With(zap.String("storage_name", name)),
)
if err != nil {
return fmt.Errorf("failed to initialize %s storage %s: %w", s.storageKind, name, err)
}
}
s.ext.factories[name] = factory
}
Expand Down Expand Up @@ -150,6 +164,12 @@
cfg: s.config.Cassandra,
builder: cassandra.NewFactoryWithConfig,
}
clickhouseStarter := &starter[ch.Config, *ch.Factory]{
ext: s,
storageKind: "clickhouse",
cfg: s.config.ClickHouse,
clickhouseBuilder: ch.NewFactory,
}

builders := []func(ctx context.Context, host component.Host) error{
memStarter.build,
Expand All @@ -158,6 +178,7 @@
esStarter.build,
osStarter.build,
cassandraStarter.build,
clickhouseStarter.build,
// TODO add support for other backends
}
for _, builder := range builders {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func TestStorageFactoryBadShutdownError(t *testing.T) {

func TestStorageFactoryV2Error(t *testing.T) {
host := componenttest.NewNopHost()
_, err := GetStorageFactoryV2("something", host)
_, _, err := GetStorageFactoryV2("something", host)
require.ErrorContains(t, err, "cannot find extension")
}

Expand All @@ -127,7 +127,7 @@ func TestStorageExtension(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, f)

f2, err := GetStorageFactoryV2(name, host)
_, f2, err := GetStorageFactoryV2(name, host)
require.NoError(t, err)
require.NotNil(t, f2)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
"context"
"io"

"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/plugin/storage/clickhouse"
storage_v1 "github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage_v2/spanstore"
)
Expand All @@ -26,6 +29,15 @@
panic("not implemented")
}

func (f *Factory) ChExportSpans(ctx context.Context, td ptrace.Traces) error {
switch t := f.ss.(type) {

Check failure on line 33 in cmd/jaeger/internal/extension/jaegerstorage/factoryadapter/factory.go

View workflow job for this annotation

GitHub Actions / lint

singleCaseSwitch: should rewrite switch statement to if statement (gocritic)
case *clickhouse.Factory:
return t.ChExportSpans(ctx, td)

Check warning on line 35 in cmd/jaeger/internal/extension/jaegerstorage/factoryadapter/factory.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/factoryadapter/factory.go#L32-L35

Added lines #L32 - L35 were not covered by tests
}

return nil

Check warning on line 38 in cmd/jaeger/internal/extension/jaegerstorage/factoryadapter/factory.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/factoryadapter/factory.go#L38

Added line #L38 was not covered by tests
}

// Close implements spanstore.Factory.
func (f *Factory) Close(_ context.Context) error {
if closer, ok := f.ss.(io.Closer); ok {
Expand Down
53 changes: 53 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
service:
extensions: [jaeger_storage, jaeger_query]
pipelines:
traces:
receivers: [otlp, jaeger, zipkin]
processors: [batch]
exporters: [jaeger_storage_exporter]

extensions:
# health_check:
# pprof:
# endpoint: 0.0.0.0:1777
# zpages:
# endpoint: 0.0.0.0:55679

jaeger_query:
trace_storage: ch_store
ui_config: ./cmd/jaeger/config-ui.json

jaeger_storage:
memory:
memstore:
max_traces: 100000
memstore_archive:
max_traces: 100000
clickhouse:
ch_store:
endpoint: tcp://127.0.0.1:9000?dial_timeout=10s&compress=lz4
spans_table_name: jaeger_spans

receivers:
otlp:
protocols:
grpc:
endpoint: 127.0.0.1:4317
http:
endpoint: 127.0.0.1:4318

jaeger:
protocols:
grpc:
thrift_binary:
thrift_compact:
thrift_http:

zipkin:

processors:
batch:

exporters:
jaeger_storage_exporter:
trace_storage: ch_store
10 changes: 9 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -88,20 +88,25 @@ require (
)

require (
github.com/ClickHouse/ch-go v0.58.2 // indirect
github.com/ClickHouse/clickhouse-go/v2 v2.15.0
github.com/IBM/sarama v1.43.2 // indirect
github.com/andybalholm/brotli v1.0.6 // indirect
github.com/aws/aws-sdk-go v1.53.11 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dgraph-io/ristretto v0.1.1 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/eapache/go-resiliency v1.6.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/elastic/elastic-transport-go/v8 v8.6.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-faster/city v1.0.1 // indirect
github.com/go-faster/errors v0.6.1 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
Expand Down Expand Up @@ -156,6 +161,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.103.0 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/openzipkin/zipkin-go v0.4.3 // indirect
github.com/paulmach/orb v0.10.0 // indirect
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
Expand All @@ -170,8 +176,10 @@ require (
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/sagikazarmark/locafero v0.4.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/shirou/gopsutil/v4 v4.24.5 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/shopspring/decimal v1.3.1 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.6.0 // indirect
Expand Down
Loading
Loading