Skip to content

Commit

Permalink
add validation.RateLimited to error catalogue
Browse files Browse the repository at this point in the history
  • Loading branch information
zenador committed Jun 3, 2022
1 parent 8bc28f1 commit fb00149
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 25 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
* The following metric is exposed to tell how many requests have been rejected:
* `cortex_discarded_requests_total`
* [ENHANCEMENT] Store-gateway: Add the experimental ability to run requests in a dedicated OS thread pool. This feature can be configured using `-store-gateway.thread-pool-size` and is disabled by default. Replaces the ability to run index header operations in a dedicated thread pool. #1660 #1812
* [ENHANCEMENT] Improved error messages to make them easier to understand; each now have a unique, global identifier that you can use to look up in the runbooks for more information. #1907 #1919 #1888 #1939
* [ENHANCEMENT] Improved error messages to make them easier to understand; each now have a unique, global identifier that you can use to look up in the runbooks for more information. #1907 #1919 #1888 #1939 #2009
* [ENHANCEMENT] Memberlist KV: incoming messages are now processed on per-key goroutine. This may reduce loss of "maintanance" packets in busy memberlist installations, but use more CPU. New `memberlist_client_received_broadcasts_dropped_total` counter tracks number of dropped per-key messages. #1912
* [ENHANCEMENT] Blocks Storage, Alertmanager, Ruler: add support a prefix to the bucket store (`*_storage.storage_prefix`). This enables using the same bucket for the three components. #1686 #1951
* [BUGFIX] Fix regexp parsing panic for regexp label matchers with start/end quantifiers. #1883
Expand Down
8 changes: 8 additions & 0 deletions docs/sources/operators-guide/mimir-runbooks/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1306,6 +1306,14 @@ Mimir has a limit on the query length.
This limit is applied to partial queries, after they've split (according to time) by the query-frontend. This limit protects the system’s stability from potential abuse or mistakes.
You can configure the limit on a per-tenant basis by using the `-store.max-query-length` option (or `max_query_length` in the runtime configuration).
### err-mimir-request-rate-limited
TBA
### err-mimir-ingestion-rate-limited
TBA
## Mimir routes by path
**Write path**:
Expand Down
12 changes: 6 additions & 6 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,12 +587,12 @@ func (d *Distributor) PushWithCleanup(ctx context.Context, req *mimirpb.WriteReq

now := mtime.Now()
if !d.requestRateLimiter.AllowN(now, userID, 1) {
validation.DiscardedRequests.WithLabelValues(validation.RateLimited, userID).Add(1)
validation.DiscardedRequests.WithLabelValues(validation.ReasonRateLimited, userID).Add(1)

// Return a 429 here to tell the client it is going too fast.
// Client may discard the data or slow down and re-send.
// Prometheus v2.26 added a remote-write option 'retry_on_http_429'.
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "request rate limit (%v) exceeded", d.requestRateLimiter.Limit(now, userID))
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewRequestRateLimitedError(d.requestRateLimiter.Limit(now, userID), d.requestRateLimiter.Burst(now, userID)).Error())
}

d.activeUsers.UpdateUserTimestamp(userID, now)
Expand Down Expand Up @@ -783,13 +783,13 @@ func (d *Distributor) PushWithCleanup(ctx context.Context, req *mimirpb.WriteReq

totalN := validatedSamples + validatedExemplars + len(validatedMetadata)
if !d.ingestionRateLimiter.AllowN(now, userID, totalN) {
validation.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamples))
validation.DiscardedExemplars.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedExemplars))
validation.DiscardedMetadata.WithLabelValues(validation.RateLimited, userID).Add(float64(len(validatedMetadata)))
validation.DiscardedSamples.WithLabelValues(validation.ReasonRateLimited, userID).Add(float64(validatedSamples))
validation.DiscardedExemplars.WithLabelValues(validation.ReasonRateLimited, userID).Add(float64(validatedExemplars))
validation.DiscardedMetadata.WithLabelValues(validation.ReasonRateLimited, userID).Add(float64(len(validatedMetadata)))
// Return a 429 here to tell the client it is going too fast.
// Client may discard the data or slow down and re-send.
// Prometheus v2.26 added a remote-write option 'retry_on_http_429'.
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.ingestionRateLimiter.Limit(now, userID), validatedSamples, len(validatedMetadata))
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewIngestionRateLimitedError(d.ingestionRateLimiter.Limit(now, userID), d.ingestionRateLimiter.Burst(now, userID), validatedSamples, len(validatedMetadata)).Error())
}

// totalN included samples and metadata. Ingester follows this pattern when computing its ingestion rate.
Expand Down
18 changes: 9 additions & 9 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func TestDistributor_Push(t *testing.T) {
happyIngesters: 3,
samples: samplesIn{num: 25, startTimestampMs: 123456789000},
metadata: 5,
expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (20) exceeded while adding 25 samples and 5 metadata"),
expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewIngestionRateLimitedError(20, 20, 25, 5).Error()),
metricNames: []string{lastSeenTimestamp},
expectedMetrics: `
# HELP cortex_distributor_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user.
Expand Down Expand Up @@ -431,7 +431,7 @@ func TestDistributor_PushRequestRateLimiter(t *testing.T) {
pushes: []testPush{
{expectedError: nil},
{expectedError: nil},
{expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "request rate limit (2) exceeded")},
{expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewRequestRateLimitedError(2, 2).Error())},
},
},
"request limit is disabled when set to 0": {
Expand All @@ -452,7 +452,7 @@ func TestDistributor_PushRequestRateLimiter(t *testing.T) {
{expectedError: nil},
{expectedError: nil},
{expectedError: nil},
{expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "request rate limit (1) exceeded")},
{expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewRequestRateLimitedError(1, 3).Error())},
},
},
}
Expand Down Expand Up @@ -512,10 +512,10 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
pushes: []testPush{
{samples: 2, expectedError: nil},
{samples: 1, expectedError: nil},
{samples: 2, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 2 samples and 1 metadata")},
{samples: 2, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewIngestionRateLimitedError(5, 5, 2, 1).Error())},
{samples: 2, expectedError: nil},
{samples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 1 samples and 0 metadata")},
{metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 0 samples and 1 metadata")},
{samples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewIngestionRateLimitedError(5, 5, 1, 0).Error())},
{metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewIngestionRateLimitedError(5, 5, 0, 1).Error())},
},
},
"for each distributor, set an ingestion burst limit.": {
Expand All @@ -525,10 +525,10 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
pushes: []testPush{
{samples: 10, expectedError: nil},
{samples: 5, expectedError: nil},
{samples: 5, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 5 samples and 1 metadata")},
{samples: 5, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewIngestionRateLimitedError(5, 20, 5, 1).Error())},
{samples: 5, expectedError: nil},
{samples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 1 samples and 0 metadata")},
{metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 0 samples and 1 metadata")},
{samples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewIngestionRateLimitedError(5, 20, 1, 0).Error())},
{metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewIngestionRateLimitedError(5, 20, 0, 1).Error())},
},
},
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/util/globalerror/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ const (
MetricMetadataHelpTooLong ID = "help-too-long"
MetricMetadataUnitTooLong ID = "unit-too-long"

MaxQueryLength ID = "max-query-length"
MaxQueryLength ID = "max-query-length"
RequestRateLimited ID = "request-rate-limited"
IngestionRateLimited ID = "ingestion-rate-limited"
)

// Message returns the provided msg, appending the error id.
Expand All @@ -57,3 +59,7 @@ func (id ID) Message(msg string) string {
func (id ID) MessageWithLimitConfig(flag, msg string) string {
return fmt.Sprintf("%s (%s%s). You can adjust the related per-tenant limit by configuring -%s, or by contacting your service administrator.", msg, errPrefix, id, flag)
}

func (id ID) MessageWith2LimitConfig(flag, flag2, msg string) string {
return fmt.Sprintf("%s (%s%s). You can adjust the related per-tenant limits by configuring -%s and -%s, or by contacting your service administrator.", msg, errPrefix, id, flag, flag2)
}
12 changes: 12 additions & 0 deletions pkg/util/validation/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,18 @@ func NewMaxQueryLengthError(actualQueryLen, maxQueryLength time.Duration) LimitE
fmt.Sprintf("the query time range exceeds the limit (query length: %s, limit: %s)", actualQueryLen, maxQueryLength)))
}

func NewRequestRateLimitedError(limit float64, burst int) LimitError {
return LimitError(globalerror.RequestRateLimited.MessageWith2LimitConfig(
requestRateFlag, requestBurstSizeFlag,
fmt.Sprintf("request rate limit (%v) with burst (%d) exceeded", limit, burst)))
}

func NewIngestionRateLimitedError(limit float64, burst, numSamples, numMetadata int) LimitError {
return LimitError(globalerror.IngestionRateLimited.MessageWith2LimitConfig(
ingestionRateFlag, ingestionBurstSizeFlag,
fmt.Sprintf("ingestion rate limit (%v) with burst (%d) exceeded while adding %d samples and %d metadata", limit, burst, numSamples, numMetadata)))
}

// formatLabelSet formats label adapters as a metric name with labels, while preserving
// label order, and keeping duplicates. If there are multiple "__name__" labels, only
// first one is used as metric name, other ones will be included as regular labels.
Expand Down
12 changes: 8 additions & 4 deletions pkg/util/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ const (
maxMetadataLengthFlag = "validation.max-metadata-length"
creationGracePeriodFlag = "validation.create-grace-period"
maxQueryLengthFlag = "store.max-query-length"
requestRateFlag = "distributor.request-rate-limit"
requestBurstSizeFlag = "distributor.request-burst-size"
ingestionRateFlag = "distributor.ingestion-rate-limit"
ingestionBurstSizeFlag = "distributor.ingestion-burst-size"
)

// LimitError are errors that do not comply with the limits specified.
Expand Down Expand Up @@ -147,10 +151,10 @@ type Limits struct {
// RegisterFlags adds the flags required to config this to the given FlagSet
func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&l.IngestionTenantShardSize, "distributor.ingestion-tenant-shard-size", 0, "The tenant's shard size used by shuffle-sharding. Must be set both on ingesters and distributors. 0 disables shuffle sharding.")
f.Float64Var(&l.RequestRate, "distributor.request-rate-limit", 0, "Per-tenant request rate limit in requests per second. 0 to disable.")
f.IntVar(&l.RequestBurstSize, "distributor.request-burst-size", 0, "Per-tenant allowed request burst size. 0 to disable.")
f.Float64Var(&l.IngestionRate, "distributor.ingestion-rate-limit", 10000, "Per-tenant ingestion rate limit in samples per second.")
f.IntVar(&l.IngestionBurstSize, "distributor.ingestion-burst-size", 200000, "Per-tenant allowed ingestion burst size (in number of samples).")
f.Float64Var(&l.RequestRate, requestRateFlag, 0, "Per-tenant request rate limit in requests per second. 0 to disable.")
f.IntVar(&l.RequestBurstSize, requestBurstSizeFlag, 0, "Per-tenant allowed request burst size. 0 to disable.")
f.Float64Var(&l.IngestionRate, ingestionRateFlag, 10000, "Per-tenant ingestion rate limit in samples per second.")
f.IntVar(&l.IngestionBurstSize, ingestionBurstSizeFlag, 200000, "Per-tenant allowed ingestion burst size (in number of samples).")
f.BoolVar(&l.AcceptHASamples, "distributor.ha-tracker.enable-for-all-users", false, "Flag to enable, for all tenants, handling of samples with external labels identifying replicas in an HA Prometheus setup.")
f.StringVar(&l.HAClusterLabel, "distributor.ha-tracker.cluster", "cluster", "Prometheus label to look for in samples to identify a Prometheus HA cluster.")
f.StringVar(&l.HAReplicaLabel, "distributor.ha-tracker.replica", "__replica__", "Prometheus label to look for in samples to identify a Prometheus HA replica.")
Expand Down
8 changes: 4 additions & 4 deletions pkg/util/validation/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@ import (
const (
discardReasonLabel = "reason"

// RateLimited is one of the values for the reason to discard samples.
// Declared here to avoid duplication in ingester and distributor.
RateLimited = "rate_limited"

// Too many HA clusters is one of the reasons for discarding samples.
TooManyHAClusters = "too_many_ha_clusters"

Expand Down Expand Up @@ -59,6 +55,10 @@ var (
reasonMetadataMetricNameTooLong = metricReasonFromErrorID(globalerror.MetricMetadataMetricNameTooLong)
reasonMetadataHelpTooLong = metricReasonFromErrorID(globalerror.MetricMetadataHelpTooLong)
reasonMetadataUnitTooLong = metricReasonFromErrorID(globalerror.MetricMetadataUnitTooLong)

// ReasonRateLimited is one of the values for the reason to discard samples.
// Declared here to avoid duplication in ingester and distributor.
ReasonRateLimited = "rate_limited" // same for request and ingestion which are separate errors, so not using metricReasonFromErrorID with global error
)

func metricReasonFromErrorID(id globalerror.ID) string {
Expand Down

0 comments on commit fb00149

Please sign in to comment.