Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ingester: Support for disallowing Push API for ingest storage #7503

Merged
merged 8 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -615,10 +615,10 @@ check-doc-validator: ## Check documentation using doc-validator tool

.PHONY: reference-help
reference-help: ## Generates the reference help documentation.
reference-help: cmd/mimir/mimir
reference-help: cmd/mimir/mimir tools/config-inspector/config-inspector
@(./cmd/mimir/mimir -h || true) > cmd/mimir/help.txt.tmpl
@(./cmd/mimir/mimir -help-all || true) > cmd/mimir/help-all.txt.tmpl
@(go run ./tools/config-inspector || true) > cmd/mimir/config-descriptor.json
@(./tools/config-inspector/config-inspector || true) > cmd/mimir/config-descriptor.json
narqo marked this conversation as resolved.
Show resolved Hide resolved

clean-white-noise: ## Clean the white noise in the markdown files.
@find . -path ./.pkg -prune -o -path ./.cache -prune -o -path "*/vendor/*" -prune -or -type f -name "*.md" -print | \
Expand Down
3 changes: 0 additions & 3 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package api

import (
"context"
"flag"
"net/http"
"path"
Expand All @@ -32,7 +31,6 @@ import (
frontendv2 "github.com/grafana/mimir/pkg/frontend/v2"
"github.com/grafana/mimir/pkg/frontend/v2/frontendv2pb"
"github.com/grafana/mimir/pkg/ingester/client"
"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/querier"
querierapi "github.com/grafana/mimir/pkg/querier/api"
"github.com/grafana/mimir/pkg/querier/tenantfederation"
Expand Down Expand Up @@ -281,7 +279,6 @@ type Ingester interface {
ShutdownHandler(http.ResponseWriter, *http.Request)
PrepareShutdownHandler(http.ResponseWriter, *http.Request)
PreparePartitionDownscaleHandler(http.ResponseWriter, *http.Request)
PushWithCleanup(context.Context, *mimirpb.WriteRequest, func()) error
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method PushWithCleanup() shouldn't be required by the api.Ingester interface. Thus, I want to remove it here. IMHO, this makes the code a tiny bit simpler.

UserRegistryHandler(http.ResponseWriter, *http.Request)
TenantsHandler(http.ResponseWriter, *http.Request)
TenantTSDBHandler(http.ResponseWriter, *http.Request)
Expand Down
7 changes: 4 additions & 3 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6293,9 +6293,10 @@ func newMockIngesterPusherAdapter(ingester *mockIngester) *mockIngesterPusherAda
}
}

// Push implements ingest.Pusher.
func (c *mockIngesterPusherAdapter) Push(ctx context.Context, req *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error) {
return c.ingester.Push(ctx, req)
// PushToStorage implements ingest.Pusher.
func (c *mockIngesterPusherAdapter) PushToStorage(ctx context.Context, req *mimirpb.WriteRequest) error {
_, err := c.ingester.Push(ctx, req)
return err
}

// noopIngester is a mocked ingester which does nothing.
Expand Down
21 changes: 18 additions & 3 deletions pkg/ingester/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ import (

const (
integerUnavailableMsgFormat = "ingester is unavailable (current state: %s)"
tooBusyErrorMsg = "the ingester is currently too busy to process queries, try again later"
ingesterTooBusyMsg = "ingester is currently too busy to process queries, try again later"
ingesterPushGrpcDisabledMsg = "ingester is configured with Push gRPC method disabled"
)

var (
tooBusyError = ingesterTooBusyError{}
errTooBusy = ingesterTooBusyError{}
errPushGrpcDisabled = newErrorWithStatus(ingesterPushGrpcDisabledError{}, codes.Unimplemented)
)

// errorWithStatus is used for wrapping errors returned by ingester.
Expand Down Expand Up @@ -515,7 +517,7 @@ var _ ingesterError = tsdbUnavailableError{}
type ingesterTooBusyError struct{}

func (e ingesterTooBusyError) Error() string {
return tooBusyErrorMsg
return ingesterTooBusyMsg
}

func (e ingesterTooBusyError) errorCause() mimirpb.ErrorCause {
Expand All @@ -525,6 +527,19 @@ func (e ingesterTooBusyError) errorCause() mimirpb.ErrorCause {
// Ensure that ingesterTooBusyError is an ingesterError.
var _ ingesterError = ingesterTooBusyError{}

// ingesterPushGrpcDisabledError is the error type used to signal that the
// ingester has been configured with its Push gRPC method disabled.
type ingesterPushGrpcDisabledError struct{}

// Error implements the error interface and returns the fixed
// ingesterPushGrpcDisabledMsg message.
func (e ingesterPushGrpcDisabledError) Error() string {
return ingesterPushGrpcDisabledMsg
}

// errorCause reports mimirpb.METHOD_NOT_ALLOWED as the cause of this error.
func (e ingesterPushGrpcDisabledError) errorCause() mimirpb.ErrorCause {
return mimirpb.METHOD_NOT_ALLOWED
}

// Ensure that ingesterPushGrpcDisabledError is an ingesterError.
var _ ingesterError = ingesterPushGrpcDisabledError{}

type ingesterErrSamplers struct {
sampleTimestampTooOld *log.Sampler
sampleTimestampTooOldOOOEnabled *log.Sampler
Expand Down
34 changes: 17 additions & 17 deletions pkg/ingester/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,12 +235,12 @@ func TestNewTSDBIngestExemplarErr(t *testing.T) {
}

func TestTooBusyError(t *testing.T) {
require.Error(t, tooBusyError)
require.Equal(t, "the ingester is currently too busy to process queries, try again later", tooBusyError.Error())
checkIngesterError(t, tooBusyError, mimirpb.TOO_BUSY, false)
require.Error(t, errTooBusy)
require.Equal(t, "ingester is currently too busy to process queries, try again later", errTooBusy.Error())
checkIngesterError(t, errTooBusy, mimirpb.TOO_BUSY, false)

wrappedErr := wrapOrAnnotateWithUser(tooBusyError, userID)
require.ErrorIs(t, wrappedErr, tooBusyError)
wrappedErr := wrapOrAnnotateWithUser(errTooBusy, userID)
require.ErrorIs(t, wrappedErr, errTooBusy)
var anotherIngesterTooBusyErr ingesterTooBusyError
require.ErrorAs(t, wrappedErr, &anotherIngesterTooBusyErr)
checkIngesterError(t, wrappedErr, mimirpb.TOO_BUSY, false)
Expand Down Expand Up @@ -731,16 +731,16 @@ func TestMapReadErrorToErrorWithStatus(t *testing.T) {
expectedMessage: fmt.Sprintf("wrapped: %s", newUnavailableError(services.Stopping).Error()),
expectedDetails: &mimirpb.ErrorDetails{Cause: mimirpb.SERVICE_UNAVAILABLE},
},
"tooBusyError gets translated into an errorWithStatus ResourceExhausted error with details": {
err: tooBusyError,
"errTooBusy gets translated into an errorWithStatus ResourceExhausted error with details": {
err: errTooBusy,
expectedCode: codes.ResourceExhausted,
expectedMessage: tooBusyErrorMsg,
expectedMessage: ingesterTooBusyMsg,
expectedDetails: &mimirpb.ErrorDetails{Cause: mimirpb.TOO_BUSY},
},
"a wrapped tooBusyError gets translated into an errorWithStatus ResourceExhausted error with details": {
err: fmt.Errorf("wrapped: %w", tooBusyError),
"a wrapped errTooBusy gets translated into an errorWithStatus ResourceExhausted error with details": {
err: fmt.Errorf("wrapped: %w", errTooBusy),
expectedCode: codes.ResourceExhausted,
expectedMessage: fmt.Sprintf("wrapped: %s", tooBusyErrorMsg),
expectedMessage: fmt.Sprintf("wrapped: %s", ingesterTooBusyMsg),
expectedDetails: &mimirpb.ErrorDetails{Cause: mimirpb.TOO_BUSY},
},
}
Expand Down Expand Up @@ -776,13 +776,13 @@ func TestMapReadErrorToErrorWithHTTPOrGRPCStatus(t *testing.T) {
err: fmt.Errorf("wrapped: %w", newUnavailableError(services.Stopping)),
expectedTranslation: newErrorWithStatus(fmt.Errorf("wrapped: %w", newUnavailableError(services.Stopping)), codes.Unavailable),
},
"tooBusyError gets translated into an errorWithHTTPStatus with status code 503": {
err: tooBusyError,
expectedTranslation: newErrorWithHTTPStatus(tooBusyError, http.StatusServiceUnavailable),
"errTooBusy gets translated into an errorWithHTTPStatus with status code 503": {
err: errTooBusy,
expectedTranslation: newErrorWithHTTPStatus(errTooBusy, http.StatusServiceUnavailable),
},
"a wrapped tooBusyError gets translated into an errorWithStatus with status code 503": {
err: fmt.Errorf("wrapped: %w", tooBusyError),
expectedTranslation: newErrorWithHTTPStatus(fmt.Errorf("wrapped: %w", tooBusyError), http.StatusServiceUnavailable),
"a wrapped errTooBusy gets translated into an errorWithStatus with status code 503": {
err: fmt.Errorf("wrapped: %w", errTooBusy),
expectedTranslation: newErrorWithHTTPStatus(fmt.Errorf("wrapped: %w", errTooBusy), http.StatusServiceUnavailable),
},
}
for name, tc := range testCases {
Expand Down
29 changes: 22 additions & 7 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ type Config struct {
UpdateIngesterOwnedSeries bool `yaml:"track_ingester_owned_series" category:"experimental"`
OwnedSeriesUpdateInterval time.Duration `yaml:"owned_series_update_interval" category:"experimental"`

PushGrpcMethodEnabled bool `yaml:"push_grpc_method_enabled" category:"experimental" doc:"hidden"`

// This config is dynamically injected because defined outside the ingester config.
IngestStorageConfig ingest.Config `yaml:"-"`
}
Expand All @@ -230,6 +232,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
f.BoolVar(&cfg.UseIngesterOwnedSeriesForLimits, "ingester.use-ingester-owned-series-for-limits", false, "When enabled, only series currently owned by ingester according to the ring are used when checking user per-tenant series limit.")
f.BoolVar(&cfg.UpdateIngesterOwnedSeries, "ingester.track-ingester-owned-series", false, "This option enables tracking of ingester-owned series based on ring state, even if -ingester.use-ingester-owned-series-for-limits is disabled.")
f.DurationVar(&cfg.OwnedSeriesUpdateInterval, "ingester.owned-series-update-interval", 15*time.Second, "How often to check for ring changes and possibly recompute owned series as a result of detected change.")
f.BoolVar(&cfg.PushGrpcMethodEnabled, "ingester.push-grpc-method-enabled", true, "Enables Push gRPC method on ingester. Can be only disabled when using ingest-storage to make sure ingesters only receive data from Kafka.")

// The ingester.return-only-grpc-errors flag has been deprecated.
// According to the migration plan (https://github.com/grafana/mimir/issues/6008#issuecomment-1854320098)
Expand Down Expand Up @@ -3487,14 +3490,26 @@ func (i *Ingester) checkAvailable() error {
return newUnavailableError(s)
}

// Push implements client.IngesterServer
func (i *Ingester) Push(ctx context.Context, req *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error) {
// PushToStorage implements ingest.Pusher interface for ingestion via ingest-storage.
func (i *Ingester) PushToStorage(ctx context.Context, req *mimirpb.WriteRequest) error {
err := i.PushWithCleanup(ctx, req, func() { mimirpb.ReuseSlice(req.Timeseries) })
if err == nil {
return &mimirpb.WriteResponse{}, nil
if err != nil {
return i.mapPushErrorToErrorWithStatus(err)
}
return nil
}

// Push implements client.IngesterServer, which is registered into gRPC server.
func (i *Ingester) Push(ctx context.Context, req *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error) {
if !i.cfg.PushGrpcMethodEnabled {
return nil, errPushGrpcDisabled
}

err := i.PushToStorage(ctx, req)
if err != nil {
return nil, err
}
handledErr := i.mapPushErrorToErrorWithStatus(err)
return nil, handledErr
return &mimirpb.WriteResponse{}, err
}

func (i *Ingester) mapPushErrorToErrorWithStatus(err error) error {
Expand Down Expand Up @@ -3724,7 +3739,7 @@ func (i *Ingester) checkReadOverloaded() error {
}

i.metrics.utilizationLimitedRequests.WithLabelValues(reason).Inc()
return tooBusyError
return errTooBusy
}

type utilizationBasedLimiter interface {
Expand Down
5 changes: 0 additions & 5 deletions pkg/ingester/ingester_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,6 @@ func (i *ActivityTrackerWrapper) Push(ctx context.Context, request *mimirpb.Writ
return i.ing.Push(ctx, request)
}

func (i *ActivityTrackerWrapper) PushWithCleanup(ctx context.Context, r *mimirpb.WriteRequest, cleanUp func()) error {
// No tracking in PushWithCleanup
return i.ing.PushWithCleanup(ctx, r, cleanUp)
}

func (i *ActivityTrackerWrapper) QueryStream(request *client.QueryRequest, server client.Ingester_QueryStreamServer) error {
ix := i.tracker.Insert(func() string {
return requestActivity(server.Context(), "Ingester/QueryStream", request)
Expand Down
3 changes: 3 additions & 0 deletions pkg/ingester/ingester_ingest_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,9 @@ func TestIngester_ShouldNotCreatePartitionIfThereIsShutdownMarker(t *testing.T)
func createTestIngesterWithIngestStorage(t testing.TB, ingesterCfg *Config, overrides *validation.Overrides, reg prometheus.Registerer) (*Ingester, *kfake.Cluster, *ring.PartitionRingWatcher) {
defaultIngesterConfig := defaultIngesterTestConfig(t)

// Always disable the gRPC Push API when testing ingest storage.
ingesterCfg.PushGrpcMethodEnabled = false

ingesterCfg.IngestStorageConfig.Enabled = true
ingesterCfg.IngestStorageConfig.KafkaConfig.Topic = "mimir"
ingesterCfg.IngestStorageConfig.KafkaConfig.LastProducedOffsetPollInterval = 100 * time.Millisecond
Expand Down
41 changes: 33 additions & 8 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2537,7 +2537,7 @@ func Test_Ingester_LabelNames(t *testing.T) {
stat, ok := grpcutil.ErrorToStatus(err)
require.True(t, ok)
require.Equal(t, codes.ResourceExhausted, stat.Code())
require.Equal(t, tooBusyErrorMsg, stat.Message())
require.Equal(t, ingesterTooBusyMsg, stat.Message())
verifyUtilizationLimitedRequestsMetric(t, registry)
})
}
Expand Down Expand Up @@ -2601,7 +2601,7 @@ func Test_Ingester_LabelValues(t *testing.T) {
stat, ok := grpcutil.ErrorToStatus(err)
require.True(t, ok)
require.Equal(t, codes.ResourceExhausted, stat.Code())
require.Equal(t, tooBusyErrorMsg, stat.Message())
require.Equal(t, ingesterTooBusyMsg, stat.Message())
verifyUtilizationLimitedRequestsMetric(t, registry)
})
}
Expand Down Expand Up @@ -2802,7 +2802,7 @@ func TestIngester_LabelNamesAndValues(t *testing.T) {
stat, ok := grpcutil.ErrorToStatus(err)
require.True(t, ok)
require.Equal(t, codes.ResourceExhausted, stat.Code())
require.Equal(t, tooBusyErrorMsg, stat.Message())
require.Equal(t, ingesterTooBusyMsg, stat.Message())
verifyUtilizationLimitedRequestsMetric(t, registry)
})
}
Expand Down Expand Up @@ -2922,7 +2922,7 @@ func TestIngester_LabelValuesCardinality(t *testing.T) {
stat, ok := grpcutil.ErrorToStatus(err)
require.True(t, ok)
require.Equal(t, codes.ResourceExhausted, stat.Code())
require.Equal(t, tooBusyErrorMsg, stat.Message())
require.Equal(t, ingesterTooBusyMsg, stat.Message())
verifyUtilizationLimitedRequestsMetric(t, registry)
})
}
Expand Down Expand Up @@ -3414,7 +3414,7 @@ func Test_Ingester_MetricsForLabelMatchers(t *testing.T) {
stat, ok := grpcutil.ErrorToStatus(err)
require.True(t, ok)
require.Equal(t, codes.ResourceExhausted, stat.Code())
require.Equal(t, tooBusyErrorMsg, stat.Message())
require.Equal(t, ingesterTooBusyMsg, stat.Message())
verifyUtilizationLimitedRequestsMetric(t, registry)
})
}
Expand Down Expand Up @@ -3812,7 +3812,7 @@ func TestIngester_QueryStream(t *testing.T) {
stat, ok := grpcutil.ErrorToStatus(err)
require.True(t, ok)
require.Equal(t, codes.ResourceExhausted, stat.Code())
require.Equal(t, tooBusyErrorMsg, stat.Message())
require.Equal(t, ingesterTooBusyMsg, stat.Message())
verifyUtilizationLimitedRequestsMetric(t, registry)
})
}
Expand Down Expand Up @@ -4271,7 +4271,7 @@ func TestIngester_QueryExemplars(t *testing.T) {
stat, ok := grpcutil.ErrorToStatus(err)
require.True(t, ok)
require.Equal(t, codes.ResourceExhausted, stat.Code())
require.Equal(t, tooBusyErrorMsg, stat.Message())
require.Equal(t, ingesterTooBusyMsg, stat.Message())
verifyUtilizationLimitedRequestsMetric(t, registry)
})
}
Expand Down Expand Up @@ -5606,7 +5606,7 @@ func Test_Ingester_UserStats(t *testing.T) {
stat, ok := grpcutil.ErrorToStatus(err)
require.True(t, ok)
require.Equal(t, codes.ResourceExhausted, stat.Code())
require.Equal(t, tooBusyErrorMsg, stat.Message())
require.Equal(t, ingesterTooBusyMsg, stat.Message())
verifyUtilizationLimitedRequestsMetric(t, registry)
})
}
Expand Down Expand Up @@ -6748,6 +6748,31 @@ func TestIngester_PushInstanceLimitsWithCircuitBreaker_LimitInflightRequestsUsin
}
}

// TestIngester_PushGrpcMethod_Disabled verifies that calling Push on an
// ingester whose config has PushGrpcMethodEnabled set to false fails with
// errPushGrpcDisabled.
func TestIngester_PushGrpcMethod_Disabled(t *testing.T) {
cfg := defaultIngesterTestConfig(t)
// Start from the default test config and turn off only the Push gRPC method.
cfg.PushGrpcMethodEnabled = false

registry := prometheus.NewRegistry()

i, err := prepareIngesterWithBlocksStorage(t, cfg, nil, registry)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

// Wait until the ingester is healthy
test.Poll(t, 100*time.Millisecond, 1, func() any {
return i.lifecycler.HealthyInstancesCount()
})

// Build a single-series write request for tenant "test".
ctx := user.InjectOrgID(context.Background(), "test")
req := writeRequestSingleSeries(
labels.FromStrings(labels.MetricName, "foo", "l", "1"),
[]mimirpb.Sample{{TimestampMs: 1_000, Value: 1}},
)
// The response is ignored; only the returned error is asserted.
_, err = i.Push(ctx, req)
require.ErrorIs(t, err, errPushGrpcDisabled)
}

func TestIngester_instanceLimitsMetrics(t *testing.T) {
reg := prometheus.NewRegistry()

Expand Down
9 changes: 7 additions & 2 deletions pkg/mimir/mimir.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,13 @@ func (c *Config) Validate(log log.Logger) error {
if err := c.IngestStorage.Validate(); err != nil {
return errors.Wrap(err, "invalid ingest storage config")
}
if c.isAnyModuleEnabled(Ingester, Write, All) && c.IngestStorage.Enabled && !c.Ingester.DeprecatedReturnOnlyGRPCErrors {
return errors.New("to use ingest storage (-ingest-storage.enabled) also enable -ingester.return-only-grpc-errors")
if c.isAnyModuleEnabled(Ingester, Write, All) {
if c.IngestStorage.Enabled && !c.Ingester.DeprecatedReturnOnlyGRPCErrors {
return errors.New("to use ingest storage (-ingest-storage.enabled) also enable -ingester.return-only-grpc-errors")
}
if !c.IngestStorage.Enabled && !c.Ingester.PushGrpcMethodEnabled {
return errors.New("cannot disable Push gRPC method in ingester, while ingest storage (-ingest-storage.enabled) is not enabled")
}
}
if err := c.BlocksStorage.Validate(c.Ingester.ActiveSeriesMetrics); err != nil {
return errors.Wrap(err, "invalid TSDB config")
Expand Down
12 changes: 12 additions & 0 deletions pkg/mimir/mimir_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,18 @@ func TestConfigValidation(t *testing.T) {
},
expectAnyError: true,
},
{
name: "should fails if push api disabled in ingester, and the ingester isn't running with ingest storage",
getTestConfig: func() *Config {
cfg := newDefaultConfig()
_ = cfg.Target.Set("ingester")
cfg.Ingester.PushGrpcMethodEnabled = false
cfg.IngestStorage.Enabled = false

return cfg
},
expectAnyError: true,
},
} {
t.Run(tc.name, func(t *testing.T) {
err := tc.getTestConfig().Validate(log.NewNopLogger())
Expand Down
Loading
Loading