From 286c23cc6bc77bbcc20cd34df50003395a6ef7c8 Mon Sep 17 00:00:00 2001 From: Vladimir Varankin Date: Wed, 25 Sep 2024 20:12:36 +0200 Subject: [PATCH] Clean up deprecated features for 2.14 (#9407) * ingester, distributor: remove deprecated limit-inflight-requests-using-grpc-method-limiter Signed-off-by: Vladimir Varankin * querier: remove deprecated -querier.max-query-into-future Signed-off-by: Vladimir Varankin * update CHANGELOG Signed-off-by: Vladimir Varankin * rebuild assets Signed-off-by: Vladimir Varankin --------- Signed-off-by: Vladimir Varankin --- CHANGELOG.md | 2 + cmd/mimir/config-descriptor.json | 33 -- cmd/mimir/help-all.txt.tmpl | 6 - cmd/mimir/main_test.go | 6 +- .../configuration-parameters/index.md | 14 - pkg/distributor/distributor.go | 6 +- pkg/ingester/ingester.go | 5 +- pkg/ingester/ingester_test.go | 390 +++++++----------- pkg/mimir/modules.go | 26 +- pkg/querier/querier.go | 31 +- pkg/querier/querier_test.go | 66 ++- 11 files changed, 212 insertions(+), 373 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9db8b06f0b0..449c45fd4d5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,8 @@ * [CHANGE] Querier: allow wrapping errors with context errors only when the former actually correspond to `context.Canceled` and `context.DeadlineExceeded`. #9175 * [CHANGE] Query-scheduler: Remove the experimental `-query-scheduler.use-multi-algorithm-query-queue` flag. The new multi-algorithm tree queue is always used for the scheduler. #9210 * [CHANGE] Distributor: reject incoming requests until the distributor service has started. #9317 +* [CHANGE] Ingester, Distributor: Remove deprecated `-ingester.limit-inflight-requests-using-grpc-method-limiter` and `-distributor.limit-inflight-requests-using-grpc-method-limiter`. The feature was deprecated and enabled by default in Mimir 2.12. #9407 +* [CHANGE] Querier: Remove deprecated `-querier.max-query-into-future`. The feature was deprecated in Mimir 2.12. #9407 * [FEATURE] Alertmanager: Added `-alertmanager.log-parsing-label-matchers` to control logging when parsing label matchers. This flag is intended to be used with `-alertmanager.utf8-strict-mode-enabled` to validate UTF-8 strict mode is working as intended. The default value is `false`. #9173 * [FEATURE] Alertmanager: Added `-alertmanager.utf8-migration-logging-enabled` to enable logging of tenant configurations that are incompatible with UTF-8 strict mode. The default value is `false`. #9174 * [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. 
#8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995 #9008 #9017 #9018 #9019 #9120 #9121 #9136 #9139 #9140 #9145 #9191 #9192 #9194 #9196 #9201 #9212 #9225 #9260 #9272 #9277 #9278 #9280 #9281 #9342 #9343 #9367 #9368 #9371 diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index 4f77e1e8c86..4e77e051f3c 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -1680,17 +1680,6 @@ "fieldType": "boolean", "fieldCategory": "experimental" }, - { - "kind": "field", - "name": "limit_inflight_requests_using_grpc_method_limiter", - "required": false, - "desc": "When enabled, in-flight write requests limit is checked as soon as the gRPC request is received, before the request is decoded and parsed.", - "fieldValue": null, - "fieldDefaultValue": true, - "fieldFlag": "distributor.limit-inflight-requests-using-grpc-method-limiter", - "fieldType": "boolean", - "fieldCategory": "deprecated" - }, { "kind": "field", "name": "reusable_ingester_push_workers", @@ -1734,17 +1723,6 @@ "fieldType": "duration", "fieldCategory": "advanced" }, - { - "kind": "field", - "name": "max_query_into_future", - "required": false, - "desc": "Maximum duration into the future you can query. 0 to disable.", - "fieldValue": null, - "fieldDefaultValue": 600000000000, - "fieldFlag": "querier.max-query-into-future", - "fieldType": "duration", - "fieldCategory": "deprecated" - }, { "kind": "block", "name": "store_gateway_client", @@ -3425,17 +3403,6 @@ "fieldType": "boolean", "fieldCategory": "experimental" }, - { - "kind": "field", - "name": "limit_inflight_requests_using_grpc_method_limiter", - "required": false, - "desc": "When enabled, in-flight write requests limit is checked as soon as the gRPC request is received, before the request is decoded and parsed.", - "fieldValue": null, - "fieldDefaultValue": true, - "fieldFlag": "ingester.limit-inflight-requests-using-grpc-method-limiter", - "fieldType": "boolean", - "fieldCategory": "deprecated" - }, { "kind": "field", "name": "error_sample_rate", diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index d33ac244065..9b549261512 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -1227,8 +1227,6 @@ Usage of ./cmd/mimir/mimir: The sum of the request sizes in bytes of inflight push requests that this distributor can handle. This limit is per-distributor, not per-tenant. Additional requests will be rejected. 0 = unlimited. -distributor.instance-limits.max-ingestion-rate float Max ingestion rate (samples/sec) that this distributor will accept. This limit is per-distributor, not per-tenant. Additional push requests will be rejected. Current ingestion rate is computed as exponentially weighted moving average, updated every second. 0 = unlimited. - -distributor.limit-inflight-requests-using-grpc-method-limiter - [deprecated] When enabled, in-flight write requests limit is checked as soon as the gRPC request is received, before the request is decoded and parsed. (default true) -distributor.max-exemplars-per-series-per-request int [experimental] Maximum number of exemplars per series per request. 0 to disable limit in request. The exceeding exemplars are dropped. -distributor.max-otlp-request-size int @@ -1455,8 +1453,6 @@ Usage of ./cmd/mimir/mimir: Max series that this ingester can hold (across all tenants). Requests to create additional series will be rejected. 0 = unlimited. 
-ingester.instance-limits.max-tenants int Max tenants that this ingester can hold. Requests from additional tenants will be rejected. 0 = unlimited. - -ingester.limit-inflight-requests-using-grpc-method-limiter - [deprecated] When enabled, in-flight write requests limit is checked as soon as the gRPC request is received, before the request is decoded and parsed. (default true) -ingester.log-utilization-based-limiter-cpu-samples [experimental] Enable logging of utilization based limiter CPU samples. -ingester.max-global-exemplars-per-user int @@ -1917,8 +1913,6 @@ Usage of ./cmd/mimir/mimir: Maximum number of outstanding requests per tenant per frontend; requests beyond this error with HTTP 429. (default 100) -querier.max-partial-query-length duration Limit the time range for partial queries at the querier level. - -querier.max-query-into-future duration - [deprecated] Maximum duration into the future you can query. 0 to disable. (default 10m0s) -querier.max-query-lookback duration Limit how long back data (series and metadata) can be queried, up until duration ago. This limit is enforced in the query-frontend, querier and ruler for instant, range and remote read queries. For metadata queries like series, label names, label values queries the limit is enforced in the querier and ruler. If the requested time range is outside the allowed range, the request will not fail but will be manipulated to only query data within the allowed time range. 0 to disable. -querier.max-query-parallelism int diff --git a/cmd/mimir/main_test.go b/cmd/mimir/main_test.go index 4b863f2be89..4de3d9f0e22 100644 --- a/cmd/mimir/main_test.go +++ b/cmd/mimir/main_test.go @@ -104,12 +104,12 @@ func TestFlagParsing(t *testing.T) { defaults := mimir.Config{} flagext.DefaultValues(&defaults) - require.NotZero(t, defaults.Querier.MaxQueryIntoFuture, - "This test asserts that mimir.Config.Querier.MaxQueryIntoFuture default value is not zero. "+ + require.NotZero(t, defaults.Querier.QueryStoreAfter, + "This test asserts that mimir.Config.Querier.QueryStoreAfter default value is not zero. "+ "If it's zero, this test is useless. Please change it to use a config value with a non-zero default.", ) - require.Equal(t, cfg.Querier.MaxQueryIntoFuture, defaults.Querier.MaxQueryIntoFuture, + require.Equal(t, cfg.Querier.QueryStoreAfter, defaults.Querier.QueryStoreAfter, "YAML parser has set the [entire] Querier config to zero values by specifying an empty node."+ "If this happens again, check git history on how this was checked with previous YAML parser implementation.") }, diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index 3ae87a15d31..0c8e1141a49 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -948,11 +948,6 @@ instance_limits: # CLI flag: -distributor.write-requests-buffer-pooling-enabled [write_requests_buffer_pooling_enabled: | default = true] -# (deprecated) When enabled, in-flight write requests limit is checked as soon -# as the gRPC request is received, before the request is decoded and parsed. -# CLI flag: -distributor.limit-inflight-requests-using-grpc-method-limiter -[limit_inflight_requests_using_grpc_method_limiter: | default = true] - # (advanced) Number of pre-allocated workers used to forward push requests to # the ingesters. If 0, no workers will be used and a new goroutine will be # spawned for each ingester push request. 
If not enough workers available, new @@ -1251,11 +1246,6 @@ instance_limits: # CLI flag: -ingester.log-utilization-based-limiter-cpu-samples [log_utilization_based_limiter_cpu_samples: | default = false] -# (deprecated) When enabled, in-flight write requests limit is checked as soon -# as the gRPC request is received, before the request is decoded and parsed. -# CLI flag: -ingester.limit-inflight-requests-using-grpc-method-limiter -[limit_inflight_requests_using_grpc_method_limiter: | default = true] - # (advanced) Each error will be logged once in this many times. Use 0 to log all # of them. # CLI flag: -ingester.error-sample-rate @@ -1365,10 +1355,6 @@ The `querier` block configures the querier. # CLI flag: -querier.query-store-after [query_store_after: | default = 12h] -# (deprecated) Maximum duration into the future you can query. 0 to disable. -# CLI flag: -querier.max-query-into-future -[max_query_into_future: | default = 10m] - store_gateway_client: # (advanced) Enable TLS for gRPC client connecting to store-gateway. # CLI flag: -querier.store-gateway-client.tls-enabled diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 013a949820c..59d9161147b 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -221,9 +221,8 @@ type Config struct { // These functions will only receive samples that don't get dropped by HA deduplication. PushWrappers []PushWrapper `yaml:"-"` - WriteRequestsBufferPoolingEnabled bool `yaml:"write_requests_buffer_pooling_enabled" category:"experimental"` - LimitInflightRequestsUsingGrpcMethodLimiter bool `yaml:"limit_inflight_requests_using_grpc_method_limiter" category:"deprecated"` // TODO Remove the configuration option in Mimir 2.14, keeping the same behavior as if it's enabled - ReusableIngesterPushWorkers int `yaml:"reusable_ingester_push_workers" category:"advanced"` + WriteRequestsBufferPoolingEnabled bool `yaml:"write_requests_buffer_pooling_enabled" category:"experimental"` + ReusableIngesterPushWorkers int `yaml:"reusable_ingester_push_workers" category:"advanced"` // DirectOTLPTranslationEnabled allows reverting to the older way of translating from OTLP write requests via Prometheus, in case of problems. DirectOTLPTranslationEnabled bool `yaml:"direct_otlp_translation_enabled" category:"experimental"` @@ -244,7 +243,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) { f.IntVar(&cfg.MaxRequestPoolBufferSize, "distributor.max-request-pool-buffer-size", 0, "Max size of the pooled buffers used for marshaling write requests. If 0, no max size is enforced.") f.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.") f.BoolVar(&cfg.WriteRequestsBufferPoolingEnabled, "distributor.write-requests-buffer-pooling-enabled", true, "Enable pooling of buffers used for marshaling write requests.") - f.BoolVar(&cfg.LimitInflightRequestsUsingGrpcMethodLimiter, "distributor.limit-inflight-requests-using-grpc-method-limiter", true, "When enabled, in-flight write requests limit is checked as soon as the gRPC request is received, before the request is decoded and parsed.") f.IntVar(&cfg.ReusableIngesterPushWorkers, "distributor.reusable-ingester-push-workers", 2000, "Number of pre-allocated workers used to forward push requests to the ingesters. If 0, no workers will be used and a new goroutine will be spawned for each ingester push request. If not enough workers available, new goroutine will be spawned. 
(Note: this is a performance optimization, not a limiting feature.)") f.BoolVar(&cfg.DirectOTLPTranslationEnabled, "distributor.direct-otlp-translation-enabled", true, "When enabled, OTLP write requests are directly translated to Mimir equivalents, for optimum performance.") diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 14fffdb761e..93462209303 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -199,8 +199,6 @@ type Config struct { ReadPathMemoryUtilizationLimit uint64 `yaml:"read_path_memory_utilization_limit" category:"experimental"` LogUtilizationBasedLimiterCPUSamples bool `yaml:"log_utilization_based_limiter_cpu_samples" category:"experimental"` - LimitInflightRequestsUsingGrpcMethodLimiter bool `yaml:"limit_inflight_requests_using_grpc_method_limiter" category:"deprecated"` // TODO Remove the configuration option in Mimir 2.14, keeping the same behavior as if it's enabled. - ErrorSampleRate int64 `yaml:"error_sample_rate" json:"error_sample_rate" category:"advanced"` UseIngesterOwnedSeriesForLimits bool `yaml:"use_ingester_owned_series_for_limits" category:"experimental"` @@ -236,7 +234,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) { f.Float64Var(&cfg.ReadPathCPUUtilizationLimit, "ingester.read-path-cpu-utilization-limit", 0, "CPU utilization limit, as CPU cores, for CPU/memory utilization based read request limiting. Use 0 to disable it.") f.Uint64Var(&cfg.ReadPathMemoryUtilizationLimit, "ingester.read-path-memory-utilization-limit", 0, "Memory limit, in bytes, for CPU/memory utilization based read request limiting. Use 0 to disable it.") f.BoolVar(&cfg.LogUtilizationBasedLimiterCPUSamples, "ingester.log-utilization-based-limiter-cpu-samples", false, "Enable logging of utilization based limiter CPU samples.") - f.BoolVar(&cfg.LimitInflightRequestsUsingGrpcMethodLimiter, "ingester.limit-inflight-requests-using-grpc-method-limiter", true, "When enabled, in-flight write requests limit is checked as soon as the gRPC request is received, before the request is decoded and parsed.") f.Int64Var(&cfg.ErrorSampleRate, "ingester.error-sample-rate", 10, "Each error will be logged once in this many times. Use 0 to log all of them.") f.BoolVar(&cfg.UseIngesterOwnedSeriesForLimits, "ingester.use-ingester-owned-series-for-limits", false, "When enabled, only series currently owned by ingester according to the ring are used when checking user per-tenant series limit.") f.BoolVar(&cfg.UpdateIngesterOwnedSeries, "ingester.track-ingester-owned-series", false, "This option enables tracking of ingester-owned series based on ring state, even if -ingester.use-ingester-owned-series-for-limits is disabled.") @@ -1088,7 +1085,7 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, req *mimirpb.WriteReques // Only start/finish request here when the request comes NOT from grpc handlers (i.e., from ingest.Store). // NOTE: request coming from grpc handler may end up calling start multiple times during its lifetime (e.g., when migrating to ingest storage). // startPushRequest handles this. 
- if i.cfg.IngestStorageConfig.Enabled || !i.cfg.LimitInflightRequestsUsingGrpcMethodLimiter { + if i.cfg.IngestStorageConfig.Enabled { reqSize := int64(req.Size()) var ( shouldFinish bool diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 737b0693af1..7b43b9af992 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -4612,50 +4612,39 @@ func TestIngester_LabelNames_ShouldNotCreateTSDBIfDoesNotExists(t *testing.T) { } func TestIngester_Push_ShouldNotCreateTSDBIngesterServiceIsNotInRunningState(t *testing.T) { - for _, grpcLimitEnabled := range []bool{false, true} { - t.Run(fmt.Sprintf("gRPC limit enabled: %t", grpcLimitEnabled), func(t *testing.T) { - cfg := defaultIngesterTestConfig(t) - cfg.LimitInflightRequestsUsingGrpcMethodLimiter = grpcLimitEnabled + cfg := defaultIngesterTestConfig(t) - // Configure the lifecycler to not immediately leave the ring, to make sure - // the ingester service will stay in Stopping state for longer. - cfg.IngesterRing.FinalSleep = 5 * time.Second + // Configure the lifecycler to not immediately leave the ring, to make sure + // the ingester service will stay in Stopping state for longer. + cfg.IngesterRing.FinalSleep = 5 * time.Second - i, err := prepareIngesterWithBlocksStorage(t, cfg, nil, nil) - require.NoError(t, err) + i, err := prepareIngesterWithBlocksStorage(t, cfg, nil, nil) + require.NoError(t, err) - // Start the ingester and then stop it. - require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) - i.StopAsync() + // Start the ingester and then stop it. + require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) + i.StopAsync() - // Wait until the ingester service switches to Stopping state. - require.Eventually(t, func() bool { - return i.State() == services.Stopping - }, time.Second, 10*time.Millisecond) + // Wait until the ingester service switches to Stopping state. + require.Eventually(t, func() bool { + return i.State() == services.Stopping + }, time.Second, 10*time.Millisecond) - // Mock request - userID := "test" - ctx := user.InjectOrgID(context.Background(), userID) - req, _, _, _ := mockWriteRequest(t, labels.FromStrings(labels.MetricName, "test"), 0, 0) - - var res *mimirpb.WriteResponse + // Mock request + userID := "test" + ctx := user.InjectOrgID(context.Background(), userID) + req, _, _, _ := mockWriteRequest(t, labels.FromStrings(labels.MetricName, "test"), 0, 0) - if grpcLimitEnabled { - res, err = pushWithSimulatedGRPCHandler(ctx, i, req) - } else { - res, err = i.Push(ctx, req) - } + res, err := pushWithSimulatedGRPCHandler(ctx, i, req) - assert.EqualError(t, err, newUnavailableError(services.Stopping).Error()) - assert.Nil(t, res) + assert.EqualError(t, err, newUnavailableError(services.Stopping).Error()) + assert.Nil(t, res) - // Check if the TSDB has been created - assert.Nil(t, i.getTSDB(userID)) + // Check if the TSDB has been created + assert.Nil(t, i.getTSDB(userID)) - // Wait until terminated. - require.NoError(t, i.AwaitTerminated(context.Background())) - }) - } + // Wait until terminated. 
+ require.NoError(t, i.AwaitTerminated(context.Background())) } func Test_Ingester_MetricsForLabelMatchers(t *testing.T) { @@ -7883,87 +7872,75 @@ func TestIngester_PushInstanceLimits(t *testing.T) { for testName, testData := range tests { t.Run(testName, func(t *testing.T) { - for _, grpcLimiterEnabled := range []bool{false, true} { - t.Run(fmt.Sprintf("with gRPC limiter: %t", grpcLimiterEnabled), func(t *testing.T) { + // Create a mocked ingester + cfg := defaultIngesterTestConfig(t) + cfg.InstanceLimitsFn = func() *InstanceLimits { + return &testData.limits + } - // Create a mocked ingester - cfg := defaultIngesterTestConfig(t) - cfg.LimitInflightRequestsUsingGrpcMethodLimiter = grpcLimiterEnabled - cfg.InstanceLimitsFn = func() *InstanceLimits { - return &testData.limits - } + i, err := prepareIngesterWithBlocksStorage(t, cfg, nil, nil) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck - i, err := prepareIngesterWithBlocksStorage(t, cfg, nil, nil) - require.NoError(t, err) - require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) - defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + // Wait until the ingester is healthy + test.Poll(t, 100*time.Millisecond, 1, func() interface{} { + return i.lifecycler.HealthyInstancesCount() + }) - // Wait until the ingester is healthy - test.Poll(t, 100*time.Millisecond, 1, func() interface{} { - return i.lifecycler.HealthyInstancesCount() - }) + // Iterate through users in sorted order (by username). + uids := []string{} + totalPushes := 0 + for uid, requests := range testData.reqs { + uids = append(uids, uid) + totalPushes += len(requests) + } + slices.Sort(uids) - // Iterate through users in sorted order (by username). - uids := []string{} - totalPushes := 0 - for uid, requests := range testData.reqs { - uids = append(uids, uid) - totalPushes += len(requests) - } - slices.Sort(uids) + pushIdx := 0 + for _, uid := range uids { + ctx := user.InjectOrgID(context.Background(), uid) - pushIdx := 0 - for _, uid := range uids { - ctx := user.InjectOrgID(context.Background(), uid) + for _, origReq := range testData.reqs[uid] { + pushIdx++ - for _, origReq := range testData.reqs[uid] { - pushIdx++ + // Clone the request so that it's safe to be sent multiple times. + reqData, marshalErr := origReq.Marshal() + require.NoError(t, marshalErr) + req := &mimirpb.WriteRequest{} + require.NoError(t, req.Unmarshal(reqData)) - // Clone the request so that it's safe to be sent multiple times. - reqData, marshalErr := origReq.Marshal() - require.NoError(t, marshalErr) - req := &mimirpb.WriteRequest{} - require.NoError(t, req.Unmarshal(reqData)) + // We simulate the sequence of calls done by the gRPC handler. + _, err := pushWithSimulatedGRPCHandler(ctx, i, req) - var err error + if pushIdx < totalPushes { + require.NoError(t, err) + } else { + // Last push may expect error. + if testData.expectedErr != nil { + assert.ErrorIs(t, err, testData.expectedErr) - // If gRPC limiter is enabled we simulate the sequence of calls done by the gRPC handler. - if grpcLimiterEnabled { - _, err = pushWithSimulatedGRPCHandler(ctx, i, req) - } else { - _, err = i.Push(ctx, req) + if testData.expectedOptionalLoggingErr { + var optional middleware.OptionalLogging + assert.ErrorAs(t, err, &optional) } - if pushIdx < totalPushes { - require.NoError(t, err) - } else { - // Last push may expect error. 
- if testData.expectedErr != nil { - assert.ErrorIs(t, err, testData.expectedErr) - - if testData.expectedOptionalLoggingErr { - var optional middleware.OptionalLogging - assert.ErrorAs(t, err, &optional) - } - - if testData.expectedGRPCErr { - s, ok := grpcutil.ErrorToStatus(err) - require.True(t, ok, "expected to be able to convert to gRPC status") - assert.Equal(t, codes.Unavailable, s.Code()) - } - } else { - assert.NoError(t, err) - } + if testData.expectedGRPCErr { + s, ok := grpcutil.ErrorToStatus(err) + require.True(t, ok, "expected to be able to convert to gRPC status") + assert.Equal(t, codes.Unavailable, s.Code()) } - - // imitate time ticking between each push - i.ingestionRate.Tick() - - rate := testutil.ToFloat64(i.metrics.ingestionRate) - require.NotZero(t, rate) + } else { + assert.NoError(t, err) } } - }) + + // imitate time ticking between each push + i.ingestionRate.Tick() + + rate := testutil.ToFloat64(i.metrics.ingestionRate) + require.NotZero(t, rate) + } } }) } @@ -8038,31 +8015,27 @@ func TestIngester_instanceLimitsMetrics(t *testing.T) { } func TestIngester_inflightPushRequests(t *testing.T) { - for _, grpcLimitEnabled := range []bool{false, true} { - t.Run(fmt.Sprintf("gRPC limit enabled: %t", grpcLimitEnabled), func(t *testing.T) { - limits := InstanceLimits{MaxInflightPushRequests: 1} + t.Run("with classic ingester", func(t *testing.T) { + limits := InstanceLimits{MaxInflightPushRequests: 1} - cfg := defaultIngesterTestConfig(t) - cfg.LimitInflightRequestsUsingGrpcMethodLimiter = grpcLimitEnabled - cfg.InstanceLimitsFn = func() *InstanceLimits { return &limits } + cfg := defaultIngesterTestConfig(t) + cfg.InstanceLimitsFn = func() *InstanceLimits { return &limits } - // Create a mocked ingester - reg := prometheus.NewPedanticRegistry() - i, err := prepareIngesterWithBlocksStorage(t, cfg, nil, reg) - require.NoError(t, err) + // Create a mocked ingester + reg := prometheus.NewPedanticRegistry() + i, err := prepareIngesterWithBlocksStorage(t, cfg, nil, reg) + require.NoError(t, err) - testIngesterInflightPushRequests(t, i, reg, grpcLimitEnabled) - }) - } + testIngesterInflightPushRequests(t, i, reg) + }) - t.Run("gRPC limit enabled with ingest storage enabled", func(t *testing.T) { + t.Run("with ingest storage enabled", func(t *testing.T) { limits := InstanceLimits{MaxInflightPushRequests: 1} overrides, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) require.NoError(t, err) cfg := defaultIngesterTestConfig(t) - cfg.LimitInflightRequestsUsingGrpcMethodLimiter = true cfg.InstanceLimitsFn = func() *InstanceLimits { return &limits } reg := prometheus.NewPedanticRegistry() @@ -8071,11 +8044,11 @@ func TestIngester_inflightPushRequests(t *testing.T) { // Re-enable push gRPC method to simulate migration period, when ingester can receive requests from gRPC i.cfg.PushGrpcMethodEnabled = true - testIngesterInflightPushRequests(t, i, reg, cfg.LimitInflightRequestsUsingGrpcMethodLimiter) + testIngesterInflightPushRequests(t, i, reg) }) } -func testIngesterInflightPushRequests(t *testing.T, i *Ingester, reg prometheus.Gatherer, grpcLimitEnabled bool) { +func testIngesterInflightPushRequests(t *testing.T, i *Ingester, reg prometheus.Gatherer) { require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) t.Cleanup(func() { services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck @@ -8099,14 +8072,7 @@ func testIngesterInflightPushRequests(t *testing.T, i *Ingester, reg prometheus. // Signal that we're going to do the real push now. 
close(startCh) - var err error - - if grpcLimitEnabled { - _, err = pushWithSimulatedGRPCHandler(ctx, i, req) - } else { - _, err = i.Push(ctx, req) - } - + _, err := pushWithSimulatedGRPCHandler(ctx, i, req) return err }) @@ -8124,23 +8090,8 @@ func testIngesterInflightPushRequests(t *testing.T, i *Ingester, reg prometheus. return i.inflightPushRequests.Load() }) - if grpcLimitEnabled { - _, err := pushWithSimulatedGRPCHandler(ctx, i, req) - require.ErrorIs(t, err, errMaxInflightRequestsReached) - } else { - _, err := i.Push(ctx, req) - require.ErrorIs(t, err, errMaxInflightRequestsReached) - - var optional middleware.OptionalLogging - require.ErrorAs(t, err, &optional) - - shouldLog, _ := optional.ShouldLog(ctx) - require.False(t, shouldLog, "expected not to log via .ShouldLog()") - - s, ok := grpcutil.ErrorToStatus(err) - require.True(t, ok, "expected to be able to convert to gRPC status") - require.Equal(t, codes.Unavailable, s.Code()) - } + _, err := pushWithSimulatedGRPCHandler(ctx, i, req) + require.ErrorIs(t, err, errMaxInflightRequestsReached) return nil }) @@ -8163,117 +8114,94 @@ func testIngesterInflightPushRequests(t *testing.T, i *Ingester, reg prometheus. } func TestIngester_inflightPushRequestsBytes(t *testing.T) { - for _, grpcLimitEnabled := range []bool{false, true} { - t.Run(fmt.Sprintf("gRPC limit enabled: %t", grpcLimitEnabled), func(t *testing.T) { - var limitsMx sync.Mutex - limits := InstanceLimits{MaxInflightPushRequestsBytes: 0} + var limitsMx sync.Mutex + limits := InstanceLimits{MaxInflightPushRequestsBytes: 0} - // Create a mocked ingester - cfg := defaultIngesterTestConfig(t) - cfg.LimitInflightRequestsUsingGrpcMethodLimiter = grpcLimitEnabled - cfg.InstanceLimitsFn = func() *InstanceLimits { - limitsMx.Lock() - defer limitsMx.Unlock() + // Create a mocked ingester + cfg := defaultIngesterTestConfig(t) + cfg.InstanceLimitsFn = func() *InstanceLimits { + limitsMx.Lock() + defer limitsMx.Unlock() - // Make a copy - il := limits - return &il - } + // Make a copy + il := limits + return &il + } - reg := prometheus.NewPedanticRegistry() - i, err := prepareIngesterWithBlocksStorage(t, cfg, nil, reg) - require.NoError(t, err) - require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) - defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + reg := prometheus.NewPedanticRegistry() + i, err := prepareIngesterWithBlocksStorage(t, cfg, nil, reg) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck - // Wait until the ingester is healthy - test.Poll(t, 100*time.Millisecond, 1, func() interface{} { - return i.lifecycler.HealthyInstancesCount() - }) + // Wait until the ingester is healthy + test.Poll(t, 100*time.Millisecond, 1, func() interface{} { + return i.lifecycler.HealthyInstancesCount() + }) - ctx := user.InjectOrgID(context.Background(), "test") + ctx := user.InjectOrgID(context.Background(), "test") - startCh := make(chan int) + startCh := make(chan int) - const targetRequestDuration = time.Second + const targetRequestDuration = time.Second - g, ctx := errgroup.WithContext(ctx) - g.Go(func() error { - req := prepareRequestForTargetRequestDuration(ctx, t, i, targetRequestDuration) + g, ctx := errgroup.WithContext(ctx) + g.Go(func() error { + req := prepareRequestForTargetRequestDuration(ctx, t, i, targetRequestDuration) - // Update instance limits. Set limit to EXACTLY the request size. 
- limitsMx.Lock() - limits.MaxInflightPushRequestsBytes = int64(req.Size()) - limitsMx.Unlock() + // Update instance limits. Set limit to EXACTLY the request size. + limitsMx.Lock() + limits.MaxInflightPushRequestsBytes = int64(req.Size()) + limitsMx.Unlock() - // Signal that we're going to do the real push now. - startCh <- req.Size() - close(startCh) + // Signal that we're going to do the real push now. + startCh <- req.Size() + close(startCh) - var err error - if grpcLimitEnabled { - _, err = pushWithSimulatedGRPCHandler(ctx, i, req) - } else { - _, err = i.Push(ctx, req) - } - return err - }) + _, err := pushWithSimulatedGRPCHandler(ctx, i, req) + return err + }) - g.Go(func() error { - req := generateSamplesForLabel(labels.FromStrings(labels.MetricName, "testcase1"), 1, 1024) + g.Go(func() error { + req := generateSamplesForLabel(labels.FromStrings(labels.MetricName, "testcase1"), 1, 1024) - var requestSize int - select { - case <-ctx.Done(): - // failed to setup - case requestSize = <-startCh: - // we can start the test. - } + var requestSize int + select { + case <-ctx.Done(): + // failed to setup + case requestSize = <-startCh: + // we can start the test. + } - test.Poll(t, targetRequestDuration/3, int64(1), func() interface{} { - return i.inflightPushRequests.Load() - }) + test.Poll(t, targetRequestDuration/3, int64(1), func() interface{} { + return i.inflightPushRequests.Load() + }) - require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(` + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(` # HELP cortex_ingester_inflight_push_requests_bytes Total sum of inflight push request sizes in ingester in bytes. # TYPE cortex_ingester_inflight_push_requests_bytes gauge cortex_ingester_inflight_push_requests_bytes %d `, requestSize)), "cortex_ingester_inflight_push_requests_bytes")) - // Starting push request fails - _, err = i.StartPushRequest(ctx, 100) - require.ErrorIs(t, err, errMaxInflightRequestsBytesReached) - - // Starting push request with unknown size fails - _, err = i.StartPushRequest(ctx, 0) - require.ErrorIs(t, err, errMaxInflightRequestsBytesReached) - - // Sending push request fails - if grpcLimitEnabled { - _, err := pushWithSimulatedGRPCHandler(ctx, i, req) - require.ErrorIs(t, err, errMaxInflightRequestsBytesReached) - } else { - _, err := i.Push(ctx, req) - require.ErrorIs(t, err, errMaxInflightRequestsBytesReached) + // Starting push request fails + _, err = i.StartPushRequest(ctx, 100) + require.ErrorIs(t, err, errMaxInflightRequestsBytesReached) - var optional middleware.OptionalLogging - require.ErrorAs(t, err, &optional) + // Starting push request with unknown size fails + _, err = i.StartPushRequest(ctx, 0) + require.ErrorIs(t, err, errMaxInflightRequestsBytesReached) - shouldLog, _ := optional.ShouldLog(ctx) - require.False(t, shouldLog, "expected not to log via .ShouldLog()") - - s, ok := grpcutil.ErrorToStatus(err) - require.True(t, ok, "expected to be able to convert to gRPC status") - require.Equal(t, codes.Unavailable, s.Code()) - } + // Sending push request fails + _, err := pushWithSimulatedGRPCHandler(ctx, i, req) + require.ErrorIs(t, err, errMaxInflightRequestsBytesReached) - return nil - }) + return nil + }) - require.NoError(t, g.Wait()) + require.NoError(t, g.Wait()) - // Ensure the rejected request has been tracked in a metric. - require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + // Ensure the rejected request has been tracked in a metric. 
+ require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` # HELP cortex_ingester_instance_rejected_requests_total Requests rejected for hitting per-instance limits # TYPE cortex_ingester_instance_rejected_requests_total counter cortex_ingester_instance_rejected_requests_total{reason="ingester_max_inflight_push_requests"} 0 @@ -8282,8 +8210,6 @@ func TestIngester_inflightPushRequestsBytes(t *testing.T) { cortex_ingester_instance_rejected_requests_total{reason="ingester_max_series"} 0 cortex_ingester_instance_rejected_requests_total{reason="ingester_max_tenants"} 0 `), "cortex_ingester_instance_rejected_requests_total")) - }) - } } func prepareRequestForTargetRequestDuration(ctx context.Context, t *testing.T, i *Ingester, targetRequestDuration time.Duration) *mimirpb.WriteRequest { diff --git a/pkg/mimir/modules.go b/pkg/mimir/modules.go index 6f92103496c..d5d90b8648c 100644 --- a/pkg/mimir/modules.go +++ b/pkg/mimir/modules.go @@ -292,26 +292,20 @@ func (t *Mimir) initServer() (services.Service, error) { // t.Ingester or t.Distributor will be available. There's no race condition here, because gRPC server (service returned by this method, ie. initServer) // is started only after t.Ingester and t.Distributor are set in initIngester or initDistributorService. - var ingFn func() pushReceiver - if t.Cfg.Ingester.LimitInflightRequestsUsingGrpcMethodLimiter { - ingFn = func() pushReceiver { - // Return explicit nil, if there's no ingester. We don't want to return typed-nil as interface value. - if t.Ingester == nil { - return nil - } - return t.Ingester + ingFn := func() pushReceiver { + // Return explicit nil if there's no ingester. We don't want to return typed-nil as interface value. + if t.Ingester == nil { + return nil } + return t.Ingester } - var distFn func() pushReceiver - if t.Cfg.Distributor.LimitInflightRequestsUsingGrpcMethodLimiter { - distFn = func() pushReceiver { - // Return explicit nil, if there's no distributor. We don't want to return typed-nil as interface value. - if t.Distributor == nil { - return nil - } - return t.Distributor + distFn := func() pushReceiver { + // Return explicit nil if there's no distributor. We don't want to return typed-nil as interface value. + if t.Distributor == nil { + return nil } + return t.Distributor } // Installing this allows us to reject push requests received via gRPC early -- before they are fully read into memory. diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 795e6b9bb24..94bba4de56b 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -44,8 +44,6 @@ import ( type Config struct { // QueryStoreAfter the time after which queries should also be sent to the store and not just ingesters. QueryStoreAfter time.Duration `yaml:"query_store_after" category:"advanced"` - // Deprecated in Mimir 2.12, remove in Mimir 2.14 - MaxQueryIntoFuture time.Duration `yaml:"max_query_into_future" category:"deprecated"` StoreGatewayClient ClientConfig `yaml:"store_gateway_client"` @@ -74,7 +72,6 @@ const ( func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.StoreGatewayClient.RegisterFlagsWithPrefix("querier.store-gateway-client", f) - f.DurationVar(&cfg.MaxQueryIntoFuture, "querier.max-query-into-future", 10*time.Minute, "Maximum duration into the future you can query. 0 to disable.") f.DurationVar(&cfg.QueryStoreAfter, queryStoreAfterFlag, 12*time.Hour, "The time after which a metric should be queried from storage and not just ingesters. 0 means all queries are sent to store. 
If this option is enabled, the time range of the query sent to the store-gateway will be manipulated to ensure the query end is not more recent than 'now - query-store-after'.") f.BoolVar(&cfg.ShuffleShardingIngestersEnabled, "querier.shuffle-sharding-ingesters-enabled", true, fmt.Sprintf("Fetch in-memory series from the minimum set of required ingesters, selecting only ingesters which may have received series since -%s. If this setting is false or -%s is '0', queriers always query all ingesters (ingesters shuffle sharding on read path is disabled).", validation.QueryIngestersWithinFlag, validation.QueryIngestersWithinFlag)) f.StringVar(&cfg.PreferAvailabilityZone, "querier.prefer-availability-zone", "", "Preferred availability zone to query ingesters from when using the ingest storage.") @@ -218,15 +215,14 @@ func newQueryable( ) storage.Queryable { return storage.QueryableFunc(func(minT, maxT int64) (storage.Querier, error) { return multiQuerier{ - distributor: distributor, - blockStore: blockStore, - queryMetrics: queryMetrics, - cfg: cfg, - minT: minT, - maxT: maxT, - maxQueryIntoFuture: cfg.MaxQueryIntoFuture, - limits: limits, - logger: logger, + distributor: distributor, + blockStore: blockStore, + queryMetrics: queryMetrics, + cfg: cfg, + minT: minT, + maxT: maxT, + limits: limits, + logger: logger, }, nil }) @@ -240,8 +236,7 @@ type multiQuerier struct { cfg Config minT, maxT int64 - maxQueryIntoFuture time.Duration - limits *validation.Overrides + limits *validation.Overrides logger log.Logger } @@ -262,7 +257,7 @@ func (mq multiQuerier) getQueriers(ctx context.Context) (context.Context, []stor mq.queryMetrics, )) - mq.minT, mq.maxT, err = validateQueryTimeRange(tenantID, mq.minT, mq.maxT, now.UnixMilli(), mq.limits, mq.cfg.MaxQueryIntoFuture, spanlogger.FromContext(ctx, mq.logger)) + mq.minT, mq.maxT, err = validateQueryTimeRange(tenantID, mq.minT, mq.maxT, now.UnixMilli(), mq.limits, spanlogger.FromContext(ctx, mq.logger)) if err != nil { return nil, nil, err } @@ -332,7 +327,7 @@ func (mq multiQuerier) Select(ctx context.Context, _ bool, sp *storage.SelectHin // Validate query time range. Even if the time range has already been validated when we created // the querier, we need to check it again here because the time range specified in hints may be // different. 
- startMs, endMs, err := validateQueryTimeRange(userID, sp.Start, sp.End, now.UnixMilli(), mq.limits, mq.maxQueryIntoFuture, spanLog) + startMs, endMs, err := validateQueryTimeRange(userID, sp.Start, sp.End, now.UnixMilli(), mq.limits, spanLog) if errors.Is(err, errEmptyTimeRange) { return storage.NoopSeriesSet() } else if err != nil { @@ -586,9 +581,7 @@ func (s *sliceSeriesSet) Warnings() annotations.Annotations { return nil } -func validateQueryTimeRange(userID string, startMs, endMs, now int64, limits *validation.Overrides, maxQueryIntoFuture time.Duration, spanLog *spanlogger.SpanLogger) (int64, int64, error) { - endMs = clampMaxTime(spanLog, endMs, now, maxQueryIntoFuture, "max query into future") - +func validateQueryTimeRange(userID string, startMs, endMs, now int64, limits *validation.Overrides, spanLog *spanlogger.SpanLogger) (int64, int64, error) { maxQueryLookback := limits.MaxQueryLookback(userID) startMs = clampMinTime(spanLog, startMs, now, -maxQueryLookback, "max query lookback") diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index 27aedb68b40..17667195e11 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -642,45 +642,34 @@ func TestQuerier_QueryIngestersWithinConfig(t *testing.T) { } } -func TestQuerier_ValidateQueryTimeRange_MaxQueryIntoFuture(t *testing.T) { +func TestQuerier_ValidateQueryTimeRange(t *testing.T) { const engineLookbackDelta = 5 * time.Minute now := time.Now() tests := map[string]struct { - maxQueryIntoFuture time.Duration - queryStartTime time.Time - queryEndTime time.Time - expectedSkipped bool - expectedStartTime time.Time - expectedEndTime time.Time + queryStartTime time.Time + queryEndTime time.Time + expectedStartTime time.Time + expectedEndTime time.Time }{ "should manipulate query if end time is after the limit": { - maxQueryIntoFuture: 10 * time.Minute, - queryStartTime: now.Add(-5 * time.Hour), - queryEndTime: now.Add(1 * time.Hour), - expectedStartTime: now.Add(-5 * time.Hour).Add(-engineLookbackDelta), - expectedEndTime: now.Add(10 * time.Minute), + queryStartTime: now.Add(-5 * time.Hour), + queryEndTime: now.Add(1 * time.Hour), + expectedStartTime: now.Add(-5 * time.Hour).Add(-engineLookbackDelta), + expectedEndTime: now.Add(1 * time.Hour), }, - "should not manipulate query if end time is far in the future but limit is disabled": { - maxQueryIntoFuture: 0, - queryStartTime: now.Add(-5 * time.Hour), - queryEndTime: now.Add(100 * time.Hour), - expectedStartTime: now.Add(-5 * time.Hour).Add(-engineLookbackDelta), - expectedEndTime: now.Add(100 * time.Hour), + "should not manipulate query if end time is far in the future": { + queryStartTime: now.Add(-5 * time.Hour), + queryEndTime: now.Add(100 * time.Hour), + expectedStartTime: now.Add(-5 * time.Hour).Add(-engineLookbackDelta), + expectedEndTime: now.Add(100 * time.Hour), }, - "should not manipulate query if end time is in the future but below the limit": { - maxQueryIntoFuture: 10 * time.Minute, - queryStartTime: now.Add(-100 * time.Minute), - queryEndTime: now.Add(5 * time.Minute), - expectedStartTime: now.Add(-100 * time.Minute).Add(-engineLookbackDelta), - expectedEndTime: now.Add(5 * time.Minute), - }, - "should skip executing a query outside the allowed time range": { - maxQueryIntoFuture: 10 * time.Minute, - queryStartTime: now.Add(50 * time.Minute), - queryEndTime: now.Add(60 * time.Minute), - expectedSkipped: true, + "should manipulate query if start time is far in the future": { + queryStartTime: now.Add(50 * time.Minute), + 
queryEndTime: now.Add(60 * time.Minute), + expectedStartTime: now.Add(50 * time.Minute).Add(-engineLookbackDelta), + expectedEndTime: now.Add(60 * time.Minute), }, } @@ -695,7 +684,6 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryIntoFuture(t *testing.T) { flagext.DefaultValues(&cfg) for name, c := range tests { - cfg.MaxQueryIntoFuture = c.maxQueryIntoFuture t.Run(name, func(t *testing.T) { // We don't need to query any data for this test, so an empty store is fine. distributor := &mockDistributor{} @@ -718,16 +706,11 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryIntoFuture(t *testing.T) { _, err = r.Matrix() require.Nil(t, err) - if !c.expectedSkipped { - // Assert on the time range of the actual executed query (5s delta). - delta := float64(5000) - require.Len(t, distributor.Calls, 1) - assert.InDelta(t, util.TimeToMillis(c.expectedStartTime), int64(distributor.Calls[0].Arguments.Get(2).(model.Time)), delta) - assert.InDelta(t, util.TimeToMillis(c.expectedEndTime), int64(distributor.Calls[0].Arguments.Get(3).(model.Time)), delta) - } else { - // Ensure no query has been executed (because skipped). - assert.Len(t, distributor.Calls, 0) - } + // Assert on the time range of the actual executed query (5s delta). + delta := float64(5000) + require.Len(t, distributor.Calls, 1) + assert.InDelta(t, util.TimeToMillis(c.expectedStartTime), int64(distributor.Calls[0].Arguments.Get(2).(model.Time)), delta) + assert.InDelta(t, util.TimeToMillis(c.expectedEndTime), int64(distributor.Calls[0].Arguments.Get(3).(model.Time)), delta) }) } } @@ -1080,7 +1063,6 @@ func TestQuerier_ValidateQueryTimeRange_MaxLabelsQueryRange(t *testing.T) { var cfg Config flagext.DefaultValues(&cfg) - cfg.MaxQueryIntoFuture = 0 limits := defaultLimitsConfig() limits.MaxQueryLookback = model.Duration(thirtyDays * 2)
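
Note on the `initServer` hunk in pkg/mimir/modules.go above: the retained comment "Return explicit nil if there's no ingester. We don't want to return typed-nil as interface value." refers to a standard Go pitfall. Below is a minimal, self-contained sketch of that pitfall; it is not part of the patch, the `pushReceiver` and `ingester` names merely echo the patch, and everything else is illustrative.

    package main

    import "fmt"

    type pushReceiver interface{ Push() }

    type ingester struct{}

    func (i *ingester) Push() {}

    func main() {
        var ing *ingester // a typed nil pointer

        // Assigning a typed nil pointer to an interface produces a NON-nil
        // interface value: the interface carries the *ingester type
        // descriptor even though the pointer inside it is nil.
        var r pushReceiver = ing
        fmt.Println(r == nil) // false

        // Returning an explicit untyped nil keeps the interface itself nil,
        // which is what the ingFn/distFn closures in initServer do.
        get := func() pushReceiver {
            if ing == nil {
                return nil
            }
            return ing
        }
        fmt.Println(get() == nil) // true
    }

An `r == nil` check downstream (here, the gRPC-level limiter deciding whether a push receiver is available) would otherwise treat a typed-nil receiver as live and call methods on it.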