Add validation.RateLimited and TooManyHAClusters to errors catalogue #2009

Merged · 7 commits · Jun 8, 2022
Changes from 2 commits
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -22,7 +22,7 @@
* `cortex_discarded_requests_total`
* [CHANGE] Blocks uploaded by ingester no longer contain `__org_id__` label. Compactor now ignores this label and will compact blocks with and without this label together. `mimirconvert` tool will remove the label from blocks as "unknown" label. #1972
* [ENHANCEMENT] Store-gateway: Add the experimental ability to run requests in a dedicated OS thread pool. This feature can be configured using `-store-gateway.thread-pool-size` and is disabled by default. Replaces the ability to run index header operations in a dedicated thread pool. #1660 #1812
* [ENHANCEMENT] Improved error messages to make them easier to understand; each now have a unique, global identifier that you can use to look up in the runbooks for more information. #1907 #1919 #1888 #1939 #1984
* [ENHANCEMENT] Improved error messages to make them easier to understand; each now have a unique, global identifier that you can use to look up in the runbooks for more information. #1907 #1919 #1888 #1939 #1984 #2009
* [ENHANCEMENT] Memberlist KV: incoming messages are now processed on a per-key goroutine. This may reduce loss of "maintenance" packets in busy memberlist installations, but uses more CPU. The new `memberlist_client_received_broadcasts_dropped_total` counter tracks the number of dropped per-key messages. #1912
* [ENHANCEMENT] Blocks Storage, Alertmanager, Ruler: add support for a prefix to the bucket store (`*_storage.storage_prefix`). This enables using the same bucket for the three components. #1686 #1951
* [BUGFIX] Fix regexp parsing panic for regexp label matchers with start/end quantifiers. #1883
12 changes: 12 additions & 0 deletions docs/sources/operators-guide/mimir-runbooks/_index.md
@@ -1345,6 +1345,18 @@ Mimir has a limit on the query length.
This limit is applied to partial queries, after they've been split (according to time) by the query-frontend. This limit protects the system’s stability from potential abuse or mistakes.
To configure the limit on a per-tenant basis, use the `-store.max-query-length` option (or `max_query_length` in the runtime configuration).

### err-mimir-request-rate-limited

TBA

### err-mimir-ingestion-rate-limited

TBA

### err-mimir-too-many-ha-clusters

TBA

## Mimir routes by path

**Write path**:
18 changes: 9 additions & 9 deletions pkg/distributor/distributor.go
@@ -593,12 +593,12 @@ func (d *Distributor) PushWithCleanup(ctx context.Context, req *mimirpb.WriteReq

now := mtime.Now()
if !d.requestRateLimiter.AllowN(now, userID, 1) {
validation.DiscardedRequests.WithLabelValues(validation.RateLimited, userID).Add(1)
validation.DiscardedRequests.WithLabelValues(validation.ReasonRateLimited, userID).Add(1)

// Return a 429 here to tell the client it is going too fast.
// Client may discard the data or slow down and re-send.
// Prometheus v2.26 added a remote-write option 'retry_on_http_429'.
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "request rate limit (%v) exceeded", d.requestRateLimiter.Limit(now, userID))
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewRequestRateLimitedError(d.requestRateLimiter.Limit(now, userID), d.requestRateLimiter.Burst(now, userID)).Error())
}

d.activeUsers.UpdateUserTimestamp(userID, now)
@@ -643,12 +643,12 @@ func (d *Distributor) PushWithCleanup(ctx context.Context, req *mimirpb.WriteReq
if errors.Is(err, replicasNotMatchError{}) {
// These samples have been deduped.
d.dedupedSamples.WithLabelValues(userID, cluster).Add(float64(numSamples))
return nil, httpgrpc.Errorf(http.StatusAccepted, "%s", err)
return nil, httpgrpc.Errorf(http.StatusAccepted, err.Error())
}

if errors.Is(err, tooManyClustersError{}) {
validation.DiscardedSamples.WithLabelValues(validation.TooManyHAClusters, userID).Add(float64(numSamples))
return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err)
validation.DiscardedSamples.WithLabelValues(validation.ReasonTooManyHAClusters, userID).Add(float64(numSamples))
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

return nil, err
@@ -789,13 +789,13 @@ func (d *Distributor) PushWithCleanup(ctx context.Context, req *mimirpb.WriteReq

totalN := validatedSamples + validatedExemplars + len(validatedMetadata)
if !d.ingestionRateLimiter.AllowN(now, userID, totalN) {
validation.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamples))
validation.DiscardedExemplars.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedExemplars))
validation.DiscardedMetadata.WithLabelValues(validation.RateLimited, userID).Add(float64(len(validatedMetadata)))
validation.DiscardedSamples.WithLabelValues(validation.ReasonRateLimited, userID).Add(float64(validatedSamples))
validation.DiscardedExemplars.WithLabelValues(validation.ReasonRateLimited, userID).Add(float64(validatedExemplars))
validation.DiscardedMetadata.WithLabelValues(validation.ReasonRateLimited, userID).Add(float64(len(validatedMetadata)))
// Return a 429 here to tell the client it is going too fast.
// Client may discard the data or slow down and re-send.
// Prometheus v2.26 added a remote-write option 'retry_on_http_429'.
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.ingestionRateLimiter.Limit(now, userID), validatedSamples, len(validatedMetadata))
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewIngestionRateLimitedError(d.ingestionRateLimiter.Limit(now, userID), d.ingestionRateLimiter.Burst(now, userID), validatedSamples, validatedExemplars, len(validatedMetadata)).Error())
}

// totalN included samples and metadata. Ingester follows this pattern when computing its ingestion rate.
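For reference, the 429 body returned by the distributor after this change should read roughly as below. This is a sketch reconstructed from the format strings visible in this diff (`validation.NewRequestRateLimitedError` composed via `globalerror.MessageWithLimitConfigs`), not code from the PR; the `requestRateLimitedMessage` helper and the limit/burst values are made up for illustration.

```go
package main

import "fmt"

// requestRateLimitedMessage is a hypothetical stand-in that mirrors the text
// assembled by validation.NewRequestRateLimitedError: base message, global
// error ID, and the two related per-tenant flags.
func requestRateLimitedMessage(limit float64, burst int) string {
	return fmt.Sprintf(
		"request rate limit (%v) with burst (%d) exceeded "+
			"(err-mimir-request-rate-limited). You can adjust the related "+
			"per-tenant limits by configuring -distributor.request-rate-limit "+
			"and -distributor.request-burst-size, or by contacting your "+
			"service administrator.",
		limit, burst)
}

func main() {
	// 2 and 2 match the first rate-limiter test case in distributor_test.go below.
	fmt.Println(requestRateLimitedMessage(2, 2))
}
```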
18 changes: 9 additions & 9 deletions pkg/distributor/distributor_test.go
@@ -184,7 +184,7 @@ func TestDistributor_Push(t *testing.T) {
happyIngesters: 3,
samples: samplesIn{num: 25, startTimestampMs: 123456789000},
metadata: 5,
expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (20) exceeded while adding 25 samples and 5 metadata"),
expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewIngestionRateLimitedError(20, 20, 25, 0, 5).Error()),
metricNames: []string{lastSeenTimestamp},
expectedMetrics: `
# HELP cortex_distributor_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user.
@@ -431,7 +431,7 @@ func TestDistributor_PushRequestRateLimiter(t *testing.T) {
pushes: []testPush{
{expectedError: nil},
{expectedError: nil},
{expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "request rate limit (2) exceeded")},
{expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewRequestRateLimitedError(2, 2).Error())},
},
},
"request limit is disabled when set to 0": {
@@ -452,7 +452,7 @@
{expectedError: nil},
{expectedError: nil},
{expectedError: nil},
{expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "request rate limit (1) exceeded")},
{expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewRequestRateLimitedError(1, 3).Error())},
},
},
}
@@ -512,10 +512,10 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
pushes: []testPush{
{samples: 2, expectedError: nil},
{samples: 1, expectedError: nil},
{samples: 2, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 2 samples and 1 metadata")},
{samples: 2, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewIngestionRateLimitedError(5, 5, 2, 0, 1).Error())},
{samples: 2, expectedError: nil},
{samples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 1 samples and 0 metadata")},
{metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 0 samples and 1 metadata")},
{samples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewIngestionRateLimitedError(5, 5, 1, 0, 0).Error())},
{metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewIngestionRateLimitedError(5, 5, 0, 0, 1).Error())},
},
},
"for each distributor, set an ingestion burst limit.": {
@@ -525,10 +525,10 @@
pushes: []testPush{
{samples: 10, expectedError: nil},
{samples: 5, expectedError: nil},
{samples: 5, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 5 samples and 1 metadata")},
{samples: 5, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewIngestionRateLimitedError(5, 20, 5, 0, 1).Error())},
{samples: 5, expectedError: nil},
{samples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 1 samples and 0 metadata")},
{metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 0 samples and 1 metadata")},
{samples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewIngestionRateLimitedError(5, 20, 1, 0, 0).Error())},
{metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewIngestionRateLimitedError(5, 20, 0, 0, 1).Error())},
},
},
}
6 changes: 5 additions & 1 deletion pkg/distributor/ha_tracker.go
@@ -27,6 +27,8 @@ import (

"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/globalerror"
"github.com/grafana/mimir/pkg/util/validation"
)

var (
@@ -528,7 +530,9 @@ type tooManyClustersError struct {
}

func (e tooManyClustersError) Error() string {
return fmt.Sprintf("too many HA clusters (limit: %d)", e.limit)
return globalerror.TooManyHAClusters.MessageWithLimitConfig(
validation.HATrackerMaxClustersFlag,
fmt.Sprintf("too many HA clusters (limit: %d)", e.limit))
}

// Needed for errors.Is to work properly.
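As a sanity check on the new `Error()` text, the string produced for a limit of 2 should read roughly as below; this sketch only restates the single-flag `MessageWithLimitConfig` format shown elsewhere in this diff, and `tooManyHAClustersMessage` is an illustrative stand-in rather than the real method.

```go
package main

import "fmt"

// tooManyHAClustersMessage approximates tooManyClustersError.Error() after
// this change: the original message wrapped with the global error ID and the
// HA tracker max-clusters flag.
func tooManyHAClustersMessage(limit int) string {
	return fmt.Sprintf(
		"too many HA clusters (limit: %d) (err-mimir-too-many-ha-clusters). "+
			"You can adjust the related per-tenant limit by configuring "+
			"-distributor.ha-tracker.max-clusters, or by contacting your "+
			"service administrator.",
		limit)
}

func main() {
	fmt.Println(tooManyHAClustersMessage(2)) // 2 matches the limit used in TestHAClustersLimit below
}
```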
4 changes: 2 additions & 2 deletions pkg/distributor/ha_tracker_test.go
@@ -602,7 +602,7 @@ func TestHAClustersLimit(t *testing.T) {
assert.NoError(t, t1.checkReplica(context.Background(), userID, "b", "b1", now))
waitForClustersUpdate(t, 2, t1, userID)

assert.EqualError(t, t1.checkReplica(context.Background(), userID, "c", "c1", now), "too many HA clusters (limit: 2)")
assert.EqualError(t, t1.checkReplica(context.Background(), userID, "c", "c1", now), tooManyClustersError{limit: 2}.Error())

// Move time forward, and make sure that checkReplica for existing cluster works fine.
now = now.Add(5 * time.Second) // higher than "update timeout"
@@ -627,7 +627,7 @@
waitForClustersUpdate(t, 2, t1, userID)

// But yet another cluster doesn't.
assert.EqualError(t, t1.checkReplica(context.Background(), userID, "a", "a2", now), "too many HA clusters (limit: 2)")
assert.EqualError(t, t1.checkReplica(context.Background(), userID, "a", "a2", now), tooManyClustersError{limit: 2}.Error())

now = now.Add(5 * time.Second)

21 changes: 20 additions & 1 deletion pkg/util/globalerror/errors.go
@@ -4,6 +4,7 @@ package globalerror

import (
"fmt"
"strings"
)

type ID string
@@ -47,7 +48,10 @@ const (
MetricMetadataHelpTooLong ID = "help-too-long"
MetricMetadataUnitTooLong ID = "unit-too-long"

MaxQueryLength ID = "max-query-length"
MaxQueryLength ID = "max-query-length"
RequestRateLimited ID = "request-rate-limited"
IngestionRateLimited ID = "ingestion-rate-limited"
TooManyHAClusters ID = "too-many-ha-clusters"
)

// Message returns the provided msg, appending the error id.
@@ -60,3 +64,18 @@ func (id ID) Message(msg string) string {
func (id ID) MessageWithLimitConfig(flag, msg string) string {
return fmt.Sprintf("%s (%s%s). You can adjust the related per-tenant limit by configuring -%s, or by contacting your service administrator.", msg, errPrefix, id, flag)
}

func (id ID) MessageWithLimitConfigs(msg, flag string, addFlags ...string) string {
var sb strings.Builder
sb.WriteString("-")
sb.WriteString(flag)
if len(addFlags) > 0 {
for _, addFlag := range addFlags[:len(addFlags)-1] {
sb.WriteString(", -")
sb.WriteString(addFlag)
}
sb.WriteString(" and -")
sb.WriteString(addFlags[len(addFlags)-1])
}
return fmt.Sprintf("%s (%s%s). You can adjust the related per-tenant limits by configuring %s, or by contacting your service administrator.", msg, errPrefix, id, sb.String())
}
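To make the flag-joining behaviour of `MessageWithLimitConfigs` concrete, here is a standalone copy of just the `strings.Builder` logic with example output for one, two, and three flags; it does not call the real package and uses placeholder flag names.

```go
package main

import (
	"fmt"
	"strings"
)

// joinFlags mirrors the builder logic in MessageWithLimitConfigs: one flag
// renders as "-a", two as "-a and -b", three as "-a, -b and -c".
func joinFlags(flag string, addFlags ...string) string {
	var sb strings.Builder
	sb.WriteString("-")
	sb.WriteString(flag)
	if len(addFlags) > 0 {
		for _, addFlag := range addFlags[:len(addFlags)-1] {
			sb.WriteString(", -")
			sb.WriteString(addFlag)
		}
		sb.WriteString(" and -")
		sb.WriteString(addFlags[len(addFlags)-1])
	}
	return sb.String()
}

func main() {
	fmt.Println(joinFlags("a"))           // -a
	fmt.Println(joinFlags("a", "b"))      // -a and -b
	fmt.Println(joinFlags("a", "b", "c")) // -a, -b and -c
}
```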
12 changes: 12 additions & 0 deletions pkg/util/validation/errors.go
@@ -275,6 +275,18 @@ func NewMaxQueryLengthError(actualQueryLen, maxQueryLength time.Duration) LimitE
fmt.Sprintf("the query time range exceeds the limit (query length: %s, limit: %s)", actualQueryLen, maxQueryLength)))
}

func NewRequestRateLimitedError(limit float64, burst int) LimitError {
return LimitError(globalerror.RequestRateLimited.MessageWithLimitConfigs(
fmt.Sprintf("request rate limit (%v) with burst (%d) exceeded", limit, burst),
requestRateFlag, requestBurstSizeFlag))
}

func NewIngestionRateLimitedError(limit float64, burst, numSamples, numExemplars, numMetadata int) LimitError {
return LimitError(globalerror.IngestionRateLimited.MessageWithLimitConfigs(
fmt.Sprintf("ingestion rate limit (%v) with burst (%d) exceeded while adding %d samples, %d exemplars and %d metadata", limit, burst, numSamples, numExemplars, numMetadata),
ingestionRateFlag, ingestionBurstSizeFlag))
}

// formatLabelSet formats label adapters as a metric name with labels, while preserving
// label order, and keeping duplicates. If there are multiple "__name__" labels, only
// first one is used as metric name, other ones will be included as regular labels.
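For the ingestion case, an example-style sketch (not part of this PR) of the full message for the burst test case used in `distributor_test.go` (limit 5, burst 20, 5 samples, 0 exemplars, 1 metadata); the expected output is reconstructed from the format strings above and assumes the `err-mimir-` prefix used by the runbook anchors.

```go
package validation_test

import (
	"fmt"

	"github.com/grafana/mimir/pkg/util/validation"
)

// ExampleNewIngestionRateLimitedError prints the message for one of the test
// cases in this PR. Illustrative only; the Output comment below is what the
// format strings in this diff should produce.
func ExampleNewIngestionRateLimitedError() {
	fmt.Println(validation.NewIngestionRateLimitedError(5, 20, 5, 0, 1).Error())
	// Output: ingestion rate limit (5) with burst (20) exceeded while adding 5 samples, 0 exemplars and 1 metadata (err-mimir-ingestion-rate-limited). You can adjust the related per-tenant limits by configuring -distributor.ingestion-rate-limit and -distributor.ingestion-burst-size, or by contacting your service administrator.
}
```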
15 changes: 10 additions & 5 deletions pkg/util/validation/limits.go
@@ -35,6 +35,11 @@ const (
maxMetadataLengthFlag = "validation.max-metadata-length"
creationGracePeriodFlag = "validation.create-grace-period"
maxQueryLengthFlag = "store.max-query-length"
requestRateFlag = "distributor.request-rate-limit"
requestBurstSizeFlag = "distributor.request-burst-size"
ingestionRateFlag = "distributor.ingestion-rate-limit"
ingestionBurstSizeFlag = "distributor.ingestion-burst-size"
HATrackerMaxClustersFlag = "distributor.ha-tracker.max-clusters"
)

// LimitError are errors that do not comply with the limits specified.
@@ -147,14 +152,14 @@ type Limits struct {
// RegisterFlags adds the flags required to config this to the given FlagSet
func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&l.IngestionTenantShardSize, "distributor.ingestion-tenant-shard-size", 0, "The tenant's shard size used by shuffle-sharding. Must be set both on ingesters and distributors. 0 disables shuffle sharding.")
f.Float64Var(&l.RequestRate, "distributor.request-rate-limit", 0, "Per-tenant request rate limit in requests per second. 0 to disable.")
f.IntVar(&l.RequestBurstSize, "distributor.request-burst-size", 0, "Per-tenant allowed request burst size. 0 to disable.")
f.Float64Var(&l.IngestionRate, "distributor.ingestion-rate-limit", 10000, "Per-tenant ingestion rate limit in samples per second.")
f.IntVar(&l.IngestionBurstSize, "distributor.ingestion-burst-size", 200000, "Per-tenant allowed ingestion burst size (in number of samples).")
f.Float64Var(&l.RequestRate, requestRateFlag, 0, "Per-tenant request rate limit in requests per second. 0 to disable.")
f.IntVar(&l.RequestBurstSize, requestBurstSizeFlag, 0, "Per-tenant allowed request burst size. 0 to disable.")
f.Float64Var(&l.IngestionRate, ingestionRateFlag, 10000, "Per-tenant ingestion rate limit in samples per second.")
f.IntVar(&l.IngestionBurstSize, ingestionBurstSizeFlag, 200000, "Per-tenant allowed ingestion burst size (in number of samples).")
f.BoolVar(&l.AcceptHASamples, "distributor.ha-tracker.enable-for-all-users", false, "Flag to enable, for all tenants, handling of samples with external labels identifying replicas in an HA Prometheus setup.")
f.StringVar(&l.HAClusterLabel, "distributor.ha-tracker.cluster", "cluster", "Prometheus label to look for in samples to identify a Prometheus HA cluster.")
f.StringVar(&l.HAReplicaLabel, "distributor.ha-tracker.replica", "__replica__", "Prometheus label to look for in samples to identify a Prometheus HA replica.")
f.IntVar(&l.HAMaxClusters, "distributor.ha-tracker.max-clusters", 0, "Maximum number of clusters that HA tracker will keep track of for a single tenant. 0 to disable the limit.")
f.IntVar(&l.HAMaxClusters, HATrackerMaxClustersFlag, 0, "Maximum number of clusters that HA tracker will keep track of for a single tenant. 0 to disable the limit.")
f.Var(&l.DropLabels, "distributor.drop-label", "This flag can be used to specify label names that to drop during sample ingestion within the distributor and can be repeated in order to drop multiple labels.")
f.IntVar(&l.MaxLabelNameLength, maxLabelNameLengthFlag, 1024, "Maximum length accepted for label names")
f.IntVar(&l.MaxLabelValueLength, maxLabelValueLengthFlag, 2048, "Maximum length accepted for label value. This setting also applies to the metric name")
14 changes: 7 additions & 7 deletions pkg/util/validation/validate.go
@@ -24,13 +24,6 @@ import (
const (
discardReasonLabel = "reason"

// RateLimited is one of the values for the reason to discard samples.
// Declared here to avoid duplication in ingester and distributor.
RateLimited = "rate_limited"

// Too many HA clusters is one of the reasons for discarding samples.
TooManyHAClusters = "too_many_ha_clusters"

// The combined length of the label names and values of an Exemplar's LabelSet MUST NOT exceed 128 UTF-8 characters
// https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#exemplars
ExemplarMaxLabelSetLength = 128
@@ -59,6 +52,13 @@ var (
reasonMetadataMetricNameTooLong = metricReasonFromErrorID(globalerror.MetricMetadataMetricNameTooLong)
reasonMetadataHelpTooLong = metricReasonFromErrorID(globalerror.MetricMetadataHelpTooLong)
reasonMetadataUnitTooLong = metricReasonFromErrorID(globalerror.MetricMetadataUnitTooLong)

// ReasonRateLimited is one of the values for the reason to discard samples.
// Declared here to avoid duplication in ingester and distributor.
ReasonRateLimited = "rate_limited" // same for request and ingestion which are separate errors, so not using metricReasonFromErrorID with global error

// ReasonTooManyHAClusters is one of the reasons for discarding samples.
ReasonTooManyHAClusters = metricReasonFromErrorID(globalerror.TooManyHAClusters)
)

func metricReasonFromErrorID(id globalerror.ID) string {
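The body of `metricReasonFromErrorID` is cut off in the diff above. Presumably it just maps a global error ID to a Prometheus-friendly reason label (dashes to underscores), which would keep `ReasonTooManyHAClusters` equal to the previous literal `too_many_ha_clusters`. A guessed sketch, not code from this diff:

```go
package validation

import (
	"strings"

	"github.com/grafana/mimir/pkg/util/globalerror"
)

// Guessed body for the helper whose definition is truncated above: convert an
// error ID such as "too-many-ha-clusters" into the metric reason label
// "too_many_ha_clusters", so existing discard-reason series keep their values.
func metricReasonFromErrorID(id globalerror.ID) string {
	return strings.ReplaceAll(string(id), "-", "_")
}
```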