Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add validation.RateLimited and TooManyHAClusters to errors catalogue #2009

Merged
merged 7 commits into from
Jun 8, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
* `cortex_discarded_requests_total`
* [CHANGE] Blocks uploaded by ingester no longer contain `__org_id__` label. Compactor now ignores this label and will compact blocks with and without this label together. `mimirconvert` tool will remove the label from blocks as "unknown" label. #1972
* [ENHANCEMENT] Store-gateway: Add the experimental ability to run requests in a dedicated OS thread pool. This feature can be configured using `-store-gateway.thread-pool-size` and is disabled by default. Replaces the ability to run index header operations in a dedicated thread pool. #1660 #1812
* [ENHANCEMENT] Improved error messages to make them easier to understand; each now have a unique, global identifier that you can use to look up in the runbooks for more information. #1907 #1919 #1888 #1939 #1984
* [ENHANCEMENT] Improved error messages to make them easier to understand; each now have a unique, global identifier that you can use to look up in the runbooks for more information. #1907 #1919 #1888 #1939 #1984 #2009
* [ENHANCEMENT] Memberlist KV: incoming messages are now processed on per-key goroutine. This may reduce loss of "maintanance" packets in busy memberlist installations, but use more CPU. New `memberlist_client_received_broadcasts_dropped_total` counter tracks number of dropped per-key messages. #1912
* [ENHANCEMENT] Blocks Storage, Alertmanager, Ruler: add support a prefix to the bucket store (`*_storage.storage_prefix`). This enables using the same bucket for the three components. #1686 #1951
* [BUGFIX] Fix regexp parsing panic for regexp label matchers with start/end quantifiers. #1883
Expand Down
40 changes: 40 additions & 0 deletions docs/sources/operators-guide/mimir-runbooks/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1345,6 +1345,46 @@ Mimir has a limit on the query length.
This limit is applied to partial queries, after they've split (according to time) by the query-frontend. This limit protects the system’s stability from potential abuse or mistakes.
To configure the limit on a per-tenant basis, use the `-store.max-query-length` option (or `max_query_length` in the runtime configuration).

### err-mimir-request-rate-limited

This error occurs when the rate of requests per second is exceeded for this tenant.
zenador marked this conversation as resolved.
Show resolved Hide resolved

How it **works**:

- There is a per-tenant rate limit on the requests per second, and it's applied across all distributors for this tenant.
zenador marked this conversation as resolved.
Show resolved Hide resolved
- The limit is implemented using [token buckets](https://en.wikipedia.org/wiki/Token_bucket).

How to **fix** it:

- Increase the per-tenant limit by using the `-distributor.request-rate-limit` (requests per second) and `-distributor.request-burst-size` (number of requests) options.
zenador marked this conversation as resolved.
Show resolved Hide resolved

### err-mimir-ingestion-rate-limited

This error occurs when the rate of received samples, exemplars and metadata per second is exceeded for this tenant.

How it **works**:

- There is a per-tenant rate limit on the samples, exemplars and metadata that can be ingested per second, and it's applied across all distributors for this tenant.
- The limit is implemented using [token buckets](https://en.wikipedia.org/wiki/Token_bucket).

How to **fix** it:

- Increase the per-tenant limit by using the `-distributor.ingestion-rate-limit` (samples per second) and `-distributor.ingestion-burst-size` (number of samples) options.
zenador marked this conversation as resolved.
Show resolved Hide resolved

### err-mimir-too-many-ha-clusters

This error occurs when a distributor rejects a write request because the number of high-availability (HA) clusters has hit the configured limit for this tenant.
zenador marked this conversation as resolved.
Show resolved Hide resolved

How it **works**:

- The distributor implements an upper limit on the number of clusters that the HA tracker will keep track of for a single tenant.
- It is triggered when the write request would add a new cluster while the number the tenant currently has is already equal to the limit.
- To configure the limit, set the `-distributor.ha-tracker.max-clusters` option.
zenador marked this conversation as resolved.
Show resolved Hide resolved

How to **fix** it:

- Increase the per-tenant limit by using the `-distributor.ha-tracker.max-clusters` option.
zenador marked this conversation as resolved.
Show resolved Hide resolved

## Mimir routes by path

**Write path**:
Expand Down
18 changes: 9 additions & 9 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,12 +593,12 @@ func (d *Distributor) PushWithCleanup(ctx context.Context, req *mimirpb.WriteReq

now := mtime.Now()
if !d.requestRateLimiter.AllowN(now, userID, 1) {
validation.DiscardedRequests.WithLabelValues(validation.RateLimited, userID).Add(1)
validation.DiscardedRequests.WithLabelValues(validation.ReasonRateLimited, userID).Add(1)

// Return a 429 here to tell the client it is going too fast.
// Client may discard the data or slow down and re-send.
// Prometheus v2.26 added a remote-write option 'retry_on_http_429'.
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "request rate limit (%v) exceeded", d.requestRateLimiter.Limit(now, userID))
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewRequestRateLimitedError(d.requestRateLimiter.Limit(now, userID), d.requestRateLimiter.Burst(now, userID)).Error())
}

d.activeUsers.UpdateUserTimestamp(userID, now)
Expand Down Expand Up @@ -643,12 +643,12 @@ func (d *Distributor) PushWithCleanup(ctx context.Context, req *mimirpb.WriteReq
if errors.Is(err, replicasNotMatchError{}) {
// These samples have been deduped.
d.dedupedSamples.WithLabelValues(userID, cluster).Add(float64(numSamples))
return nil, httpgrpc.Errorf(http.StatusAccepted, "%s", err)
return nil, httpgrpc.Errorf(http.StatusAccepted, err.Error())
}

if errors.Is(err, tooManyClustersError{}) {
validation.DiscardedSamples.WithLabelValues(validation.TooManyHAClusters, userID).Add(float64(numSamples))
return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err)
validation.DiscardedSamples.WithLabelValues(validation.ReasonTooManyHAClusters, userID).Add(float64(numSamples))
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

return nil, err
Expand Down Expand Up @@ -789,13 +789,13 @@ func (d *Distributor) PushWithCleanup(ctx context.Context, req *mimirpb.WriteReq

totalN := validatedSamples + validatedExemplars + len(validatedMetadata)
if !d.ingestionRateLimiter.AllowN(now, userID, totalN) {
validation.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamples))
validation.DiscardedExemplars.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedExemplars))
validation.DiscardedMetadata.WithLabelValues(validation.RateLimited, userID).Add(float64(len(validatedMetadata)))
validation.DiscardedSamples.WithLabelValues(validation.ReasonRateLimited, userID).Add(float64(validatedSamples))
validation.DiscardedExemplars.WithLabelValues(validation.ReasonRateLimited, userID).Add(float64(validatedExemplars))
validation.DiscardedMetadata.WithLabelValues(validation.ReasonRateLimited, userID).Add(float64(len(validatedMetadata)))
// Return a 429 here to tell the client it is going too fast.
// Client may discard the data or slow down and re-send.
// Prometheus v2.26 added a remote-write option 'retry_on_http_429'.
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.ingestionRateLimiter.Limit(now, userID), validatedSamples, len(validatedMetadata))
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewIngestionRateLimitedError(d.ingestionRateLimiter.Limit(now, userID), d.ingestionRateLimiter.Burst(now, userID), validatedSamples, validatedExemplars, len(validatedMetadata)).Error())
}

// totalN included samples and metadata. Ingester follows this pattern when computing its ingestion rate.
Expand Down
18 changes: 9 additions & 9 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func TestDistributor_Push(t *testing.T) {
happyIngesters: 3,
samples: samplesIn{num: 25, startTimestampMs: 123456789000},
metadata: 5,
expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (20) exceeded while adding 25 samples and 5 metadata"),
expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewIngestionRateLimitedError(20, 20, 25, 0, 5).Error()),
metricNames: []string{lastSeenTimestamp},
expectedMetrics: `
# HELP cortex_distributor_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user.
Expand Down Expand Up @@ -431,7 +431,7 @@ func TestDistributor_PushRequestRateLimiter(t *testing.T) {
pushes: []testPush{
{expectedError: nil},
{expectedError: nil},
{expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "request rate limit (2) exceeded")},
{expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewRequestRateLimitedError(2, 2).Error())},
},
},
"request limit is disabled when set to 0": {
Expand All @@ -452,7 +452,7 @@ func TestDistributor_PushRequestRateLimiter(t *testing.T) {
{expectedError: nil},
{expectedError: nil},
{expectedError: nil},
{expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "request rate limit (1) exceeded")},
{expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewRequestRateLimitedError(1, 3).Error())},
},
},
}
Expand Down Expand Up @@ -512,10 +512,10 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
pushes: []testPush{
{samples: 2, expectedError: nil},
{samples: 1, expectedError: nil},
{samples: 2, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 2 samples and 1 metadata")},
{samples: 2, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewIngestionRateLimitedError(5, 5, 2, 0, 1).Error())},
{samples: 2, expectedError: nil},
{samples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 1 samples and 0 metadata")},
{metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 0 samples and 1 metadata")},
{samples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewIngestionRateLimitedError(5, 5, 1, 0, 0).Error())},
{metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewIngestionRateLimitedError(5, 5, 0, 0, 1).Error())},
},
},
"for each distributor, set an ingestion burst limit.": {
Expand All @@ -525,10 +525,10 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
pushes: []testPush{
{samples: 10, expectedError: nil},
{samples: 5, expectedError: nil},
{samples: 5, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 5 samples and 1 metadata")},
{samples: 5, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewIngestionRateLimitedError(5, 20, 5, 0, 1).Error())},
{samples: 5, expectedError: nil},
{samples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 1 samples and 0 metadata")},
{metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 0 samples and 1 metadata")},
{samples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewIngestionRateLimitedError(5, 20, 1, 0, 0).Error())},
{metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewIngestionRateLimitedError(5, 20, 0, 0, 1).Error())},
},
},
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/distributor/ha_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (

"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/globalerror"
"github.com/grafana/mimir/pkg/util/validation"
)

var (
Expand Down Expand Up @@ -528,7 +530,9 @@ type tooManyClustersError struct {
}

func (e tooManyClustersError) Error() string {
return fmt.Sprintf("too many HA clusters (limit: %d)", e.limit)
return globalerror.TooManyHAClusters.MessageWithLimitConfig(
validation.HATrackerMaxClustersFlag,
fmt.Sprintf("the write request has been rejected because the maximum number of high-availability (HA) clusters has been reached for this tenant (limit: %d)", e.limit))
}

// Needed for errors.Is to work properly.
Expand Down
4 changes: 2 additions & 2 deletions pkg/distributor/ha_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ func TestHAClustersLimit(t *testing.T) {
assert.NoError(t, t1.checkReplica(context.Background(), userID, "b", "b1", now))
waitForClustersUpdate(t, 2, t1, userID)

assert.EqualError(t, t1.checkReplica(context.Background(), userID, "c", "c1", now), "too many HA clusters (limit: 2)")
assert.EqualError(t, t1.checkReplica(context.Background(), userID, "c", "c1", now), tooManyClustersError{limit: 2}.Error())

// Move time forward, and make sure that checkReplica for existing cluster works fine.
now = now.Add(5 * time.Second) // higher than "update timeout"
Expand All @@ -627,7 +627,7 @@ func TestHAClustersLimit(t *testing.T) {
waitForClustersUpdate(t, 2, t1, userID)

// But yet another cluster doesn't.
assert.EqualError(t, t1.checkReplica(context.Background(), userID, "a", "a2", now), "too many HA clusters (limit: 2)")
assert.EqualError(t, t1.checkReplica(context.Background(), userID, "a", "a2", now), tooManyClustersError{limit: 2}.Error())

now = now.Add(5 * time.Second)

Expand Down
23 changes: 22 additions & 1 deletion pkg/util/globalerror/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package globalerror

import (
"fmt"
"strings"
)

type ID string
Expand Down Expand Up @@ -47,7 +48,10 @@ const (
MetricMetadataHelpTooLong ID = "help-too-long"
MetricMetadataUnitTooLong ID = "unit-too-long"

MaxQueryLength ID = "max-query-length"
MaxQueryLength ID = "max-query-length"
RequestRateLimited ID = "request-rate-limited"
IngestionRateLimited ID = "ingestion-rate-limited"
zenador marked this conversation as resolved.
Show resolved Hide resolved
TooManyHAClusters ID = "too-many-ha-clusters"
)

// Message returns the provided msg, appending the error id.
Expand All @@ -60,3 +64,20 @@ func (id ID) Message(msg string) string {
func (id ID) MessageWithLimitConfig(flag, msg string) string {
return fmt.Sprintf("%s (%s%s). You can adjust the related per-tenant limit by configuring -%s, or by contacting your service administrator.", msg, errPrefix, id, flag)
}

func (id ID) MessageWithLimitConfigs(msg, flag string, addFlags ...string) string {
zenador marked this conversation as resolved.
Show resolved Hide resolved
var sb strings.Builder
sb.WriteString("-")
sb.WriteString(flag)
plural := ""
if len(addFlags) > 0 {
plural = "s"
for _, addFlag := range addFlags[:len(addFlags)-1] {
sb.WriteString(", -")
sb.WriteString(addFlag)
}
sb.WriteString(" and -")
sb.WriteString(addFlags[len(addFlags)-1])
}
return fmt.Sprintf("%s (%s%s). You can adjust the related per-tenant limit%s by configuring %s, or by contacting your service administrator.", msg, errPrefix, id, plural, sb.String())
}
22 changes: 22 additions & 0 deletions pkg/util/globalerror/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,25 @@ func TestID_MessageWithLimitConfig(t *testing.T) {
"an error (err-mimir-missing-metric-name). You can adjust the related per-tenant limit by configuring -my-flag, or by contacting your service administrator.",
MissingMetricName.MessageWithLimitConfig("my-flag", "an error"))
}

func TestID_MessageWithLimitConfigs(t *testing.T) {
for _, tc := range []struct {
actual string
expected string
}{
{
actual: "an error (err-mimir-missing-metric-name). You can adjust the related per-tenant limit by configuring -my-flag1, or by contacting your service administrator.",
expected: MissingMetricName.MessageWithLimitConfigs("an error", "my-flag1"),
},
{
actual: "an error (err-mimir-missing-metric-name). You can adjust the related per-tenant limits by configuring -my-flag1 and -my-flag2, or by contacting your service administrator.",
expected: MissingMetricName.MessageWithLimitConfigs("an error", "my-flag1", "my-flag2"),
},
{
actual: "an error (err-mimir-missing-metric-name). You can adjust the related per-tenant limits by configuring -my-flag1, -my-flag2 and -my-flag3, or by contacting your service administrator.",
expected: MissingMetricName.MessageWithLimitConfigs("an error", "my-flag1", "my-flag2", "my-flag3"),
},
} {
assert.Equal(t, tc.actual, tc.expected)
}
}
12 changes: 12 additions & 0 deletions pkg/util/validation/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,18 @@ func NewMaxQueryLengthError(actualQueryLen, maxQueryLength time.Duration) LimitE
fmt.Sprintf("the query time range exceeds the limit (query length: %s, limit: %s)", actualQueryLen, maxQueryLength)))
}

func NewRequestRateLimitedError(limit float64, burst int) LimitError {
return LimitError(globalerror.RequestRateLimited.MessageWithLimitConfigs(
fmt.Sprintf("the request has been rejected because the tenant exceeded the request rate limit, set to %v req/s with a maximum allowed burst of %d", limit, burst),
requestRateFlag, requestBurstSizeFlag))
}

func NewIngestionRateLimitedError(limit float64, burst, numSamples, numExemplars, numMetadata int) LimitError {
return LimitError(globalerror.IngestionRateLimited.MessageWithLimitConfigs(
fmt.Sprintf("the request has been rejected because the tenant exceeded the ingestion rate limit, set to %v items/s with a maximum allowed burst of %d, while adding %d samples, %d exemplars and %d metadata", limit, burst, numSamples, numExemplars, numMetadata),
ingestionRateFlag, ingestionBurstSizeFlag))
}

// formatLabelSet formats label adapters as a metric name with labels, while preserving
// label order, and keeping duplicates. If there are multiple "__name__" labels, only
// first one is used as metric name, other ones will be included as regular labels.
Expand Down
Loading