Add storage to the local-blocks processor and flush RF1 blocks
mapno committed May 20, 2024
1 parent cf76d22 commit 5a36e23
Showing 10 changed files with 131 additions and 44 deletions.
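
Taken together, this change hands the backend store to the metrics-generator and threads it down to the local-blocks processor: `generator.New` gains a store argument, `newInstance` and `localblocks.New` gain a `tempodb.Writer`, and the processor gets a new flush loop that uploads completed (RF1, i.e. replication factor 1) blocks with `WriteBlock`. The sketch below illustrates that constructor threading only; the `Writer` interface and all type names are simplified stand-ins, not the repository's definitions.

```go
package main

import (
	"context"
	"fmt"
)

// Writer is a stand-in for the single method of tempodb.Writer that the new
// flush path uses.
type Writer interface {
	WriteBlock(ctx context.Context, blockID string) error
}

type noopWriter struct{}

func (noopWriter) WriteBlock(_ context.Context, blockID string) error {
	fmt.Println("would upload block", blockID)
	return nil
}

// Processor stands in for the local-blocks processor, Instance for the
// per-tenant generator instance, and Generator for the generator service.
type Processor struct{ writer Writer }

func NewProcessor(w Writer) *Processor { return &Processor{writer: w} }

type Instance struct{ Processor *Processor }

func NewInstance(w Writer) *Instance { return &Instance{Processor: NewProcessor(w)} }

type Generator struct{ store Writer }

func NewGenerator(store Writer) *Generator { return &Generator{store: store} }

func (g *Generator) CreateInstance() *Instance { return NewInstance(g.store) }

func main() {
	// App wiring: the store built by the Store module is passed to the
	// generator, which forwards it to every per-tenant instance and processor.
	g := NewGenerator(noopWriter{})
	inst := g.CreateInstance()
	_ = inst.Processor.writer.WriteBlock(context.Background(), "example-block")
}
```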
4 changes: 2 additions & 2 deletions cmd/tempo/app/modules.go
@@ -263,7 +263,7 @@ func (t *App) initIngester() (services.Service, error) {

func (t *App) initGenerator() (services.Service, error) {
t.cfg.Generator.Ring.ListenPort = t.cfg.Server.GRPCListenPort
genSvc, err := generator.New(&t.cfg.Generator, t.Overrides, prometheus.DefaultRegisterer, log.Logger)
genSvc, err := generator.New(&t.cfg.Generator, t.Overrides, prometheus.DefaultRegisterer, t.store, log.Logger)
if errors.Is(err, generator.ErrUnconfigured) && t.cfg.Target != MetricsGenerator { // just warn if we're not running the metrics-generator
level.Warn(log.Logger).Log("msg", "metrics-generator is not configured.", "err", err)
return services.NewIdleService(nil, nil), nil
@@ -576,7 +576,7 @@ func (t *App) setupModuleManager() error {
QueryFrontend: {Common, Store, OverridesAPI},
Distributor: {Common, IngesterRing, MetricsGeneratorRing},
Ingester: {Common, Store, MemberlistKV},
MetricsGenerator: {Common, MemberlistKV},
MetricsGenerator: {Common, Store, MemberlistKV},
Querier: {Common, Store, IngesterRing, MetricsGeneratorRing, SecondaryIngesterRing},
Compactor: {Common, Store, MemberlistKV},
// composite targets
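Besides the new `generator.New` argument, the functional change in modules.go is the dependency list: `MetricsGenerator` now depends on `Store`, so the module manager initializes the backend store before `initGenerator` runs and `t.store` is available to pass in. A rough sketch of how such a dependency map drives start order (the resolution logic below is illustrative, not Tempo's module manager):

```go
package main

import "fmt"

// deps mirrors the entries touched in the diff: MetricsGenerator now lists
// Store as a dependency.
var deps = map[string][]string{
	"Common":           nil,
	"Store":            nil,
	"MemberlistKV":     nil,
	"MetricsGenerator": {"Common", "Store", "MemberlistKV"},
}

// start initializes dependencies depth-first before the target itself.
func start(name string, started map[string]bool) {
	if started[name] {
		return
	}
	for _, d := range deps[name] {
		start(d, started)
	}
	started[name] = true
	fmt.Println("started", name)
}

func main() {
	// Store is started before MetricsGenerator, so the store handle exists
	// when the generator is constructed.
	start("MetricsGenerator", map[string]bool{})
}
```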
15 changes: 10 additions & 5 deletions modules/generator/generator.go
@@ -19,6 +19,7 @@ import (
"go.uber.org/atomic"

"github.com/grafana/tempo/modules/generator/storage"
objStorage "github.com/grafana/tempo/modules/storage"
"github.com/grafana/tempo/pkg/tempopb"
tempodb_wal "github.com/grafana/tempo/tempodb/wal"
)
@@ -52,6 +53,8 @@ type Generator struct {
subservices *services.Manager
subservicesWatcher *services.FailureWatcher

store objStorage.Store

// When set to true, the generator will refuse incoming pushes
// and will flush any remaining metrics.
readOnly atomic.Bool
@@ -61,7 +64,7 @@
}

// New makes a new Generator.
func New(cfg *Config, overrides metricsGeneratorOverrides, reg prometheus.Registerer, logger log.Logger) (*Generator, error) {
func New(cfg *Config, overrides metricsGeneratorOverrides, reg prometheus.Registerer, store objStorage.Store, logger log.Logger) (*Generator, error) {
if cfg.Storage.Path == "" {
return nil, ErrUnconfigured
}
@@ -77,6 +80,8 @@ func New(cfg *Config, overrides metricsGeneratorOverrides, reg prometheus.Regist

instances: map[string]*instance{},

store: store,

reg: reg,
logger: logger,
}
@@ -264,20 +269,20 @@ func (g *Generator) createInstance(id string) (*instance, error) {

tracesWAL, err = tempodb_wal.New(&tracesWALCfg)
if err != nil {
wal.Close()
_ = wal.Close()
return nil, err
}
}

inst, err := newInstance(g.cfg, id, g.overrides, wal, reg, g.logger, tracesWAL)
inst, err := newInstance(g.cfg, id, g.overrides, wal, reg, g.logger, tracesWAL, g.store)
if err != nil {
wal.Close()
_ = wal.Close()
return nil, err
}

err = g.reg.Register(reg)
if err != nil {
wal.Close()
inst.shutdown()
return nil, err
}

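In `createInstance`, the error paths change in two ways: `Close` errors on the WAL are now explicitly discarded with `_ =`, and once `newInstance` has succeeded, a later registration failure is cleaned up with `inst.shutdown()` rather than by closing the WAL directly. A small sketch of that ownership-transfer pattern, with illustrative names rather than the repository's (the assumption that shutdown releases what the instance owns is mine):

```go
package main

import (
	"errors"
	"fmt"
)

type fakeWAL struct{ closed bool }

func (w *fakeWAL) Close() error { w.closed = true; return nil }

type fakeInstance struct{ wal *fakeWAL }

// shutdown releases everything the instance owns (here, just the WAL).
func (i *fakeInstance) shutdown() { _ = i.wal.Close() }

func createInstance(register func() error) (*fakeInstance, error) {
	w := &fakeWAL{}

	inst := &fakeInstance{wal: w}

	// Before newInstance succeeds, failures close the WAL directly with
	// _ = w.Close(); afterwards, cleanup goes through the instance.
	if err := register(); err != nil {
		inst.shutdown()
		return nil, err
	}
	return inst, nil
}

func main() {
	_, err := createInstance(func() error { return errors.New("registration failed") })
	fmt.Println(err)
}
```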
2 changes: 1 addition & 1 deletion modules/generator/generator_test.go
@@ -55,7 +55,7 @@ overrides:
generatorConfig.Storage.Path = t.TempDir()
generatorConfig.Ring.KVStore.Store = "inmemory"
generatorConfig.Processor.SpanMetrics.RegisterFlagsAndApplyDefaults("", nil)
g, err := New(generatorConfig, o, prometheus.NewRegistry(), newTestLogger(t))
g, err := New(generatorConfig, o, prometheus.NewRegistry(), nil, newTestLogger(t))
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), g))

7 changes: 5 additions & 2 deletions modules/generator/instance.go
@@ -10,6 +10,7 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/tempo/tempodb"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"golang.org/x/exp/maps"
@@ -75,6 +76,7 @@ type instance struct {
wal storage.Storage

traceWAL *wal.WAL
writer tempodb.Writer

// processorsMtx protects the processors map, not the processors itself
processorsMtx sync.RWMutex
@@ -88,7 +90,7 @@ type instance struct {
logger log.Logger
}

func newInstance(cfg *Config, instanceID string, overrides metricsGeneratorOverrides, wal storage.Storage, reg prometheus.Registerer, logger log.Logger, traceWAL *wal.WAL) (*instance, error) {
func newInstance(cfg *Config, instanceID string, overrides metricsGeneratorOverrides, wal storage.Storage, reg prometheus.Registerer, logger log.Logger, traceWAL *wal.WAL, writer tempodb.Writer) (*instance, error) {
logger = log.With(logger, "tenant", instanceID)

i := &instance{
@@ -99,6 +101,7 @@ func newInstance(cfg *Config, instanceID string, overrides metricsGeneratorOverr
registry: registry.New(&cfg.Registry, overrides, instanceID, wal, logger),
wal: wal,
traceWAL: traceWAL,
writer: writer,

processors: make(map[string]processor.Processor),

@@ -296,7 +299,7 @@ func (i *instance) addProcessor(processorName string, cfg ProcessorConfig) error
case servicegraphs.Name:
newProcessor = servicegraphs.New(cfg.ServiceGraphs, i.instanceID, i.registry, i.logger)
case localblocks.Name:
p, err := localblocks.New(cfg.LocalBlocks, i.instanceID, i.traceWAL, i.overrides)
p, err := localblocks.New(cfg.LocalBlocks, i.instanceID, i.traceWAL, i.writer, i.overrides)
if err != nil {
return err
}
8 changes: 4 additions & 4 deletions modules/generator/instance_test.go
@@ -35,10 +35,10 @@ func Test_instance_concurrency(t *testing.T) {
servicegraphs.Name: {},
}

instance1, err := newInstance(&Config{}, "test", overrides, &noopStorage{}, prometheus.DefaultRegisterer, log.NewNopLogger(), nil)
instance1, err := newInstance(&Config{}, "test", overrides, &noopStorage{}, prometheus.DefaultRegisterer, log.NewNopLogger(), nil, nil)
assert.NoError(t, err)

instance2, err := newInstance(&Config{}, "test", overrides, &noopStorage{}, prometheus.DefaultRegisterer, log.NewNopLogger(), nil)
instance2, err := newInstance(&Config{}, "test", overrides, &noopStorage{}, prometheus.DefaultRegisterer, log.NewNopLogger(), nil, nil)
assert.NoError(t, err)

end := make(chan struct{})
@@ -89,7 +89,7 @@ func Test_instance_updateProcessors(t *testing.T) {
logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stdout))
overrides := mockOverrides{}

instance, err := newInstance(&cfg, "test", &overrides, &noopStorage{}, prometheus.DefaultRegisterer, logger, nil)
instance, err := newInstance(&cfg, "test", &overrides, &noopStorage{}, prometheus.DefaultRegisterer, logger, nil, nil)
assert.NoError(t, err)

// stop the update goroutine
@@ -293,7 +293,7 @@ func Test_instanceQueryRangeTraceQLToProto(t *testing.T) {
logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stdout))
overrides := mockOverrides{}

instance, err := newInstance(&cfg, "test", &overrides, &noopStorage{}, prometheus.DefaultRegisterer, logger, nil)
instance, err := newInstance(&cfg, "test", &overrides, &noopStorage{}, prometheus.DefaultRegisterer, logger, nil, nil)
assert.NoError(t, err)

req := &tempopb.QueryRangeRequest{
74 changes: 65 additions & 9 deletions modules/generator/processor/localblocks/processor.go
@@ -12,6 +12,8 @@ import (
"github.com/go-kit/log/level"
"github.com/golang/groupcache/lru"
"github.com/google/uuid"
"github.com/grafana/tempo/modules/ingester"
"github.com/grafana/tempo/tempodb"
"github.com/opentracing/opentracing-go"
"go.uber.org/atomic"

@@ -52,17 +54,19 @@ type Processor struct {
blocksMtx sync.RWMutex
headBlock common.WALBlock
walBlocks map[uuid.UUID]common.WALBlock
completeBlocks map[uuid.UUID]common.BackendBlock
completeBlocks map[uuid.UUID]*ingester.LocalBlock
lastCutTime time.Time

liveTracesMtx sync.Mutex
liveTraces *liveTraces
traceSizes *traceSizes

writer tempodb.Writer
}

var _ gen.Processor = (*Processor)(nil)

func New(cfg Config, tenant string, wal *wal.WAL, overrides ProcessorOverrides) (p *Processor, err error) {
func New(cfg Config, tenant string, wal *wal.WAL, writer tempodb.Writer, overrides ProcessorOverrides) (p *Processor, err error) {
if wal == nil {
return nil, errors.New("local blocks processor requires traces wal")
}
@@ -82,23 +86,25 @@ func New(cfg Config, tenant string, wal *wal.WAL, overrides ProcessorOverrides)
overrides: overrides,
enc: enc,
walBlocks: map[uuid.UUID]common.WALBlock{},
completeBlocks: map[uuid.UUID]common.BackendBlock{},
completeBlocks: map[uuid.UUID]*ingester.LocalBlock{},
liveTraces: newLiveTraces(),
traceSizes: newTraceSizes(),
closeCh: make(chan struct{}),
wg: sync.WaitGroup{},
cache: lru.New(100),
writer: writer,
}

err = p.reloadBlocks()
if err != nil {
return nil, fmt.Errorf("replaying blocks: %w", err)
}

p.wg.Add(4)
p.wg.Add(5)
go p.cutLoop()
go p.completeLoop()
go p.flushLoop()
go p.deleteLoop()
go p.completeLoop()
go p.metricLoop()

return p, nil
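
The constructor replays existing blocks with `reloadBlocks` before any background work starts, and the WaitGroup count goes from 4 to 5: the old `flushLoop` is renamed `cutLoop` (see the hunk below) and a dedicated new `flushLoop` joins the cut, complete, delete, and metric loops. The start/stop pattern those loops share looks roughly like this (illustrative names, not the processor's actual code):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type proc struct {
	wg      sync.WaitGroup
	closeCh chan struct{}
}

// loop runs work on a ticker until closeCh is closed.
func (p *proc) loop(every time.Duration, work func()) {
	defer p.wg.Done()
	t := time.NewTicker(every)
	defer t.Stop()
	for {
		select {
		case <-t.C:
			work()
		case <-p.closeCh:
			return
		}
	}
}

func main() {
	p := &proc{closeCh: make(chan struct{})}
	p.wg.Add(2) // must match the number of loops started below
	go p.loop(10*time.Millisecond, func() { fmt.Println("cut idle traces") })
	go p.loop(10*time.Millisecond, func() { fmt.Println("flush one complete block") })

	time.Sleep(50 * time.Millisecond)
	close(p.closeCh) // shutdown: signal every loop, then wait for them
	p.wg.Wait()
}
```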
@@ -173,7 +179,7 @@ func (p *Processor) Shutdown(context.Context) {
}
}

func (p *Processor) flushLoop() {
func (p *Processor) cutLoop() {
defer p.wg.Done()

flushTicker := time.NewTicker(p.Cfg.FlushCheckPeriod)
@@ -238,6 +244,26 @@ func (p *Processor) completeLoop() {
}
}

func (p *Processor) flushLoop() {
defer p.wg.Done()

ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()

for {
select {
case <-ticker.C:
err := p.flushBlock()
if err != nil {
level.Error(log.WithUserID(p.tenant, log.Logger)).Log("msg", "local blocks processor failed to flush a block", "err", err)
}

case <-p.closeCh:
return
}
}
}

func (p *Processor) metricLoop() {
defer p.wg.Done()

@@ -299,7 +325,7 @@ func (p *Processor) completeBlock() error {
p.blocksMtx.Lock()
defer p.blocksMtx.Unlock()

p.completeBlocks[newMeta.BlockID] = newBlock
p.completeBlocks[newMeta.BlockID] = ingester.NewLocalBlock(ctx, newBlock, p.wal.LocalBackend())

err = b.Clear()
if err != nil {
@@ -310,6 +336,27 @@
return nil
}

func (p *Processor) flushBlock() error {
var firstCompleteBlock *ingester.LocalBlock
p.blocksMtx.RLock()
for _, e := range p.completeBlocks {
firstCompleteBlock = e
break
}
p.blocksMtx.RUnlock()

if firstCompleteBlock == nil {
return nil
}

ctx := context.Background()
err := p.writer.WriteBlock(ctx, firstCompleteBlock)
if err != nil {
return err
}
return nil
}
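
`flushBlock` takes the blocks read-lock, picks an arbitrary entry from `completeBlocks` (Go map iteration order is unspecified, so "first" just means "any one"), and hands it to the backend via `writer.WriteBlock`; if nothing is complete yet it returns nil. A self-contained sketch of that shape, with simplified stand-ins for `*ingester.LocalBlock` and `tempodb.Writer`:

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

type localBlock struct{ id string }

type writer interface {
	WriteBlock(ctx context.Context, b *localBlock) error
}

type printWriter struct{}

func (printWriter) WriteBlock(_ context.Context, b *localBlock) error {
	fmt.Println("uploading block", b.id)
	return nil
}

type processor struct {
	blocksMtx      sync.RWMutex
	completeBlocks map[string]*localBlock
	writer         writer
}

func (p *processor) flushBlock() error {
	// Pick any one complete block under the read lock.
	p.blocksMtx.RLock()
	var b *localBlock
	for _, e := range p.completeBlocks {
		b = e
		break
	}
	p.blocksMtx.RUnlock()

	if b == nil {
		return nil // nothing to flush yet
	}
	return p.writer.WriteBlock(context.Background(), b)
}

func main() {
	p := &processor{
		completeBlocks: map[string]*localBlock{"a": {id: "a"}},
		writer:         printWriter{},
	}
	_ = p.flushBlock()
}
```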

func (p *Processor) GetMetrics(ctx context.Context, req *tempopb.SpanMetricsRequest) (*tempopb.SpanMetricsResponse, error) {
p.blocksMtx.RLock()
defer p.blocksMtx.RUnlock()
@@ -558,13 +605,21 @@ func (p *Processor) deleteOldBlocks() (err error) {
}

for id, b := range p.completeBlocks {
if b.BlockMeta().EndTime.Before(before) {
flushedTime := b.FlushedTime()
if flushedTime.IsZero() {
continue
}

if flushedTime.Add(p.Cfg.CompleteBlockTimeout).Before(time.Now()) {
err = p.wal.LocalBackend().ClearBlock(id, p.tenant)
if err != nil {
return err
}
delete(p.completeBlocks, id)
}

//if b.BlockMeta().EndTime.Before(before) {
//}
}

return
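
`deleteOldBlocks` no longer keys off the block's `EndTime` (that check is left commented out above): a complete block is kept until it has been flushed, and is only cleared from the local backend once `CompleteBlockTimeout` has elapsed since the flush. The predicate in isolation, with an arbitrary stand-in value for `CompleteBlockTimeout`:

```go
package main

import (
	"fmt"
	"time"
)

const completeBlockTimeout = time.Hour // stand-in for Cfg.CompleteBlockTimeout

// shouldDelete mirrors the new retention check: never delete an unflushed
// block, delete a flushed one only after the timeout has passed.
func shouldDelete(flushedTime, now time.Time) bool {
	if flushedTime.IsZero() {
		return false
	}
	return flushedTime.Add(completeBlockTimeout).Before(now)
}

func main() {
	now := time.Now()
	fmt.Println(shouldDelete(time.Time{}, now))              // false: not flushed yet
	fmt.Println(shouldDelete(now.Add(-30*time.Minute), now)) // false: flushed recently
	fmt.Println(shouldDelete(now.Add(-2*time.Hour), now))    // true: flushed long ago
}
```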
@@ -738,7 +793,8 @@ func (p *Processor) reloadBlocks() error {
return err
}

p.completeBlocks[id] = blk
lb := ingester.NewLocalBlock(ctx, blk, l)
p.completeBlocks[id] = lb
}

return nil
27 changes: 25 additions & 2 deletions modules/generator/processor/localblocks/processor_test.go
@@ -6,6 +6,8 @@ import (
"testing"
"time"

"github.com/grafana/tempo/tempodb"

"github.com/grafana/tempo/pkg/tempopb"
v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1"
"github.com/grafana/tempo/pkg/util/test"
@@ -32,6 +34,22 @@ func (m *mockOverrides) UnsafeQueryHints(string) bool {
return false
}

var _ tempodb.Writer = (*mockWriter)(nil)

type mockWriter struct{}

func (m *mockWriter) WriteBlock(context.Context, tempodb.WriteableBlock) error { return nil }

func (m *mockWriter) CompleteBlock(context.Context, common.WALBlock) (common.BackendBlock, error) {
return nil, nil
}

func (m *mockWriter) CompleteBlockWithBackend(context.Context, common.WALBlock, backend.Reader, backend.Writer) (common.BackendBlock, error) {
return nil, nil
}

func (m *mockWriter) WAL() *wal.WAL { return nil }

func TestProcessorDoesNotRace(t *testing.T) {
wal, err := wal.New(&wal.Config{
Filepath: t.TempDir(),
@@ -59,7 +77,7 @@ func TestProcessorDoesNotRace(t *testing.T) {
overrides = &mockOverrides{}
)

p, err := New(cfg, tenant, wal, overrides)
p, err := New(cfg, tenant, wal, &mockWriter{}, overrides)
require.NoError(t, err)

var (
@@ -112,6 +130,11 @@ func TestProcessorDoesNotRace(t *testing.T) {
require.NoError(t, err, "completing block")
})

go concurrent(func() {
err := p.flushBlock()
require.NoError(t, err, "flushing blocks")
})

go concurrent(func() {
err := p.deleteOldBlocks()
require.NoError(t, err, "deleting old blocks")
@@ -176,7 +199,7 @@ func TestReplicationFactor(t *testing.T) {
FilterServerSpans: false,
}

p, err := New(cfg, "fake", wal, &mockOverrides{})
p, err := New(cfg, "fake", wal, &mockWriter{}, &mockOverrides{})
require.NoError(t, err)

tr := test.MakeTrace(10, []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
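TestProcessorDoesNotRace above gains one more goroutine that calls `flushBlock` alongside the existing operations (completing and deleting blocks among them), so `go test -race` now exercises the new writer path and the `blocksMtx` usage in `flushBlock`. A minimal version of the run-until-the-test-ends helper pattern it relies on (illustrative, not the test's actual `concurrent` helper):

```go
package main

import (
	"sync"
	"time"
)

// concurrentUntil repeatedly runs f until end is closed, tracking the
// goroutine in wg so the caller can wait for a clean exit.
func concurrentUntil(end <-chan struct{}, wg *sync.WaitGroup, f func()) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case <-end:
				return
			default:
				f()
			}
		}
	}()
}

func main() {
	var (
		end = make(chan struct{})
		wg  sync.WaitGroup
		mtx sync.RWMutex
		n   int
	)
	concurrentUntil(end, &wg, func() { mtx.Lock(); n++; mtx.Unlock() })
	concurrentUntil(end, &wg, func() { mtx.RLock(); _ = n; mtx.RUnlock() })

	time.Sleep(10 * time.Millisecond)
	close(end)
	wg.Wait()
}
```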