Skip to content

Commit

Permalink
Peel off block-builder changes not ready for this PR.
Browse files Browse the repository at this point in the history
  • Loading branch information
seizethedave committed Dec 2, 2024
1 parent 63313b0 commit 509e9a8
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 30 deletions.
6 changes: 0 additions & 6 deletions development/mimir-ingest-storage/config/mimir.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,6 @@ ingester:
min_partition_owners_duration: 10s
delete_inactive_partition_after: 1m

block_builder:
scheduler_config:
address: mimir-block-builder-scheduler-1:9095
update_interval: 5s
max_update_age: 30m

blocks_storage:
s3:
bucket_name: mimir-blocks
Expand Down
25 changes: 1 addition & 24 deletions pkg/blockbuilder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/grpcclient"

"github.com/grafana/mimir/pkg/storage/ingest"
"github.com/grafana/mimir/pkg/storage/tsdb"
Expand All @@ -27,22 +26,13 @@ type Config struct {
ConsumeIntervalBuffer time.Duration `yaml:"consume_interval_buffer"`
LookbackOnNoCommit time.Duration `yaml:"lookback_on_no_commit" category:"advanced"`

SchedulerConfig SchedulerConfig `yaml:"scheduler_config" doc:"description=Configures block-builder-scheduler RPC communications."`

ApplyMaxGlobalSeriesPerUserBelow int `yaml:"apply_max_global_series_per_user_below" category:"experimental"`

// Config parameters defined outside the block-builder config and are injected dynamically.
Kafka ingest.KafkaConfig `yaml:"-"`
BlocksStorage tsdb.BlocksStorageConfig `yaml:"-"`
}

type SchedulerConfig struct {
Address string `yaml:"address"`
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=Configures the gRPC client used to communicate between the block-builders and block-builder-schedulers."`
UpdateInterval time.Duration `yaml:"update_interval" doc:"description=Interval between scheduler updates."`
MaxUpdateAge time.Duration `yaml:"max_update_age" doc:"description=Maximum age of jobs to continue sending to the scheduler."`
}

func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
hostname, err := os.Hostname()
if err != nil {
Expand All @@ -58,24 +48,11 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
f.DurationVar(&cfg.ConsumeIntervalBuffer, "block-builder.consume-interval-buffer", 15*time.Minute, "Extra buffer between subsequent consumption cycles. To avoid small blocks the block-builder consumes until the last hour boundary of the consumption interval, plus the buffer.")
f.DurationVar(&cfg.LookbackOnNoCommit, "block-builder.lookback-on-no-commit", 12*time.Hour, "How much of the historical records to look back when there is no kafka commit for a partition.")
f.IntVar(&cfg.ApplyMaxGlobalSeriesPerUserBelow, "block-builder.apply-max-global-series-per-user-below", 0, "Apply the global series limit per partition if the global series limit for the user is <= this given value. 0 means limits are disabled. If a user's limit is more than the given value, then the limits are not applied as well.")

cfg.SchedulerConfig.GRPCClientConfig.RegisterFlags(f)
}

func (cfg *SchedulerConfig) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.Address, "block-builder.scheduler.address", "", "GRPC listen address of the block-builder-scheduler service.")
f.DurationVar(&cfg.UpdateInterval, "block-builder.scheduler.update-interval", 20*time.Second, "Interval between scheduler updates.")
f.DurationVar(&cfg.MaxUpdateAge, "block-builder.scheduler.max-update-age", 30*time.Minute, "Maximum age of jobs to continue sending to the scheduler.")
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("block-builder.scheduler.grpc-client-config", f)
}

func (cfg *Config) Validate() error {
if err := cfg.Kafka.Validate(); err != nil {
return fmt.Errorf("kafka: %w", err)
}

if err := cfg.SchedulerConfig.GRPCClientConfig.Validate(); err != nil {
return fmt.Errorf("scheduler grpc config: %w", err)
return err
}

if len(cfg.PartitionAssignment) == 0 {
Expand Down

0 comments on commit 509e9a8

Please sign in to comment.