Commit

Add validation.RateLimited and TooManyHAClusters to errors catalogue (#2009)
* add validation.RateLimited to error catalogue

* Add validation.TooManyHAClusters to error catalogue

* update docs

* Apply suggestions from code review

Co-authored-by: Marco Pracucci <marco@pracucci.com>

* improve new MessageWithLimitConfig and add tests

* Apply suggestions from code review

Co-authored-by: Marco Pracucci <marco@pracucci.com>

* Update from changes in code review

Co-authored-by: Marco Pracucci <marco@pracucci.com>
zenador and pracucci authored Jun 8, 2022
1 parent 716d0c6 commit eb812fd
Showing 11 changed files with 139 additions and 35 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -22,7 +22,7 @@
* The following metric is exposed to tell how many requests have been rejected:
* `cortex_discarded_requests_total`
* [ENHANCEMENT] Store-gateway: Add the experimental ability to run requests in a dedicated OS thread pool. This feature can be configured using `-store-gateway.thread-pool-size` and is disabled by default. Replaces the ability to run index header operations in a dedicated thread pool. #1660 #1812
-* [ENHANCEMENT] Improved error messages to make them easier to understand; each now have a unique, global identifier that you can use to look up in the runbooks for more information. #1907 #1919 #1888 #1939 #1984
+* [ENHANCEMENT] Improved error messages to make them easier to understand; each now has a unique, global identifier that you can use to look up in the runbooks for more information. #1907 #1919 #1888 #1939 #1984 #2009
* [ENHANCEMENT] Memberlist KV: incoming messages are now processed on a per-key goroutine. This may reduce loss of "maintenance" packets in busy memberlist installations, but uses more CPU. New `memberlist_client_received_broadcasts_dropped_total` counter tracks the number of dropped per-key messages. #1912
* [ENHANCEMENT] Blocks Storage, Alertmanager, Ruler: add support for a prefix to the bucket store (`*_storage.storage_prefix`). This enables using the same bucket for the three components. #1686 #1951
* [ENHANCEMENT] Upgrade Docker base images to `alpine:3.16.0`. #2028
40 changes: 40 additions & 0 deletions docs/sources/operators-guide/mimir-runbooks/_index.md
@@ -1346,6 +1346,46 @@ Mimir has a limit on the query length.
This limit is applied to partial queries, after they've been split (according to time) by the query-frontend. This limit protects the system’s stability from potential abuse or mistakes.

To configure the limit on a per-tenant basis, use the `-store.max-query-length` option (or `max_query_length` in the runtime configuration).

### err-mimir-tenant-max-request-rate

This error occurs when the rate of write requests per second exceeds the configured limit for this tenant.

How it **works**:

- There is a per-tenant rate limit on write requests per second, and it's applied across all distributors for this tenant.
- The limit is implemented using [token buckets](https://en.wikipedia.org/wiki/Token_bucket).

How to **fix** it:

- Increase the per-tenant limit by using the `-distributor.request-rate-limit` (requests per second) and `-distributor.request-burst-size` (number of requests) options (or `request_rate` and `request_burst_size` in the runtime configuration). The configurable burst represents how many requests can temporarily exceed the limit, in case of short traffic peaks. The configured burst size must be greater than or equal to the configured limit.
### err-mimir-tenant-max-ingestion-rate

This error occurs when the rate of received samples, exemplars and metadata per second exceeds the configured limit for this tenant.

How it **works**:

- There is a per-tenant rate limit on the samples, exemplars and metadata that can be ingested per second, and it's applied across all distributors for this tenant.
- The limit is implemented using [token buckets](https://en.wikipedia.org/wiki/Token_bucket).

How to **fix** it:

- Increase the per-tenant limit by using the `-distributor.ingestion-rate-limit` (samples per second) and `-distributor.ingestion-burst-size` (number of samples) options (or `ingestion_rate` and `ingestion_burst_size` in the runtime configuration). The configurable burst represents how many samples, exemplars and metadata can temporarily exceed the limit, in case of short traffic peaks. The configured burst size must be greater than or equal to the configured limit.
### err-mimir-too-many-ha-clusters

This error occurs when a distributor rejects a write request because the number of [high-availability (HA) clusters]({{< relref "../configuring/configuring-high-availability-deduplication.md" >}}) has hit the configured limit for this tenant.

How it **works**:

- The distributor implements an upper limit on the number of clusters that the HA tracker will keep track of for a single tenant.
- It is triggered when a write request would add a new cluster while the tenant's current number of clusters is already equal to the limit.
- To configure the limit, set the `-distributor.ha-tracker.max-clusters` option (or `ha_max_clusters` in the runtime configuration).

How to **fix** it:

- Increase the per-tenant limit by using the `-distributor.ha-tracker.max-clusters` option (or `ha_max_clusters` in the runtime configuration).
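The cluster-count check described above can be sketched as follows; `haTracker` and `checkCluster` here are hypothetical names for illustration, not the actual `checkReplica` implementation:

```go
package main

import (
	"errors"
	"fmt"
)

var errTooManyClusters = errors.New("too many HA clusters")

// haTracker sketches the per-tenant HA cluster limit: writes for clusters the
// tenant already tracks are accepted, but a write that would add a new
// cluster is rejected once the tenant is already at maxClusters.
type haTracker struct {
	maxClusters int
	clusters    map[string]map[string]struct{} // tenant -> set of cluster names
}

func (t *haTracker) checkCluster(tenant, cluster string) error {
	seen := t.clusters[tenant]
	if _, ok := seen[cluster]; ok {
		return nil // an already-tracked cluster is always accepted
	}
	if len(seen) >= t.maxClusters {
		return errTooManyClusters // a new cluster would exceed the limit
	}
	if seen == nil {
		seen = map[string]struct{}{}
		t.clusters[tenant] = seen
	}
	seen[cluster] = struct{}{}
	return nil
}

func main() {
	tr := &haTracker{maxClusters: 2, clusters: map[string]map[string]struct{}{}}
	fmt.Println(tr.checkCluster("tenant-1", "a")) // <nil>
	fmt.Println(tr.checkCluster("tenant-1", "b")) // <nil>
	fmt.Println(tr.checkCluster("tenant-1", "c")) // too many HA clusters
	fmt.Println(tr.checkCluster("tenant-1", "a")) // <nil>: existing clusters keep working
}
```

With a limit of 2, clusters "a" and "b" are tracked, a third cluster "c" is rejected, and writes for the existing clusters continue to succeed, matching the behaviour exercised by `TestHAClustersLimit`.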
## Mimir routes by path

**Write path**:
18 changes: 9 additions & 9 deletions pkg/distributor/distributor.go
@@ -593,12 +593,12 @@ func (d *Distributor) PushWithCleanup(ctx context.Context, req *mimirpb.WriteReq

now := mtime.Now()
if !d.requestRateLimiter.AllowN(now, userID, 1) {
-validation.DiscardedRequests.WithLabelValues(validation.RateLimited, userID).Add(1)
+validation.DiscardedRequests.WithLabelValues(validation.ReasonRateLimited, userID).Add(1)

// Return a 429 here to tell the client it is going too fast.
// Client may discard the data or slow down and re-send.
// Prometheus v2.26 added a remote-write option 'retry_on_http_429'.
-return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "request rate limit (%v) exceeded", d.requestRateLimiter.Limit(now, userID))
+return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewRequestRateLimitedError(d.requestRateLimiter.Limit(now, userID), d.requestRateLimiter.Burst(now, userID)).Error())
}

d.activeUsers.UpdateUserTimestamp(userID, now)
@@ -643,12 +643,12 @@ func (d *Distributor) PushWithCleanup(ctx context.Context, req *mimirpb.WriteReq
if errors.Is(err, replicasNotMatchError{}) {
// These samples have been deduped.
d.dedupedSamples.WithLabelValues(userID, cluster).Add(float64(numSamples))
-return nil, httpgrpc.Errorf(http.StatusAccepted, "%s", err)
+return nil, httpgrpc.Errorf(http.StatusAccepted, err.Error())
}

if errors.Is(err, tooManyClustersError{}) {
-validation.DiscardedSamples.WithLabelValues(validation.TooManyHAClusters, userID).Add(float64(numSamples))
-return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err)
+validation.DiscardedSamples.WithLabelValues(validation.ReasonTooManyHAClusters, userID).Add(float64(numSamples))
+return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

return nil, err
@@ -789,13 +789,13 @@ func (d *Distributor) PushWithCleanup(ctx context.Context, req *mimirpb.WriteReq

totalN := validatedSamples + validatedExemplars + len(validatedMetadata)
if !d.ingestionRateLimiter.AllowN(now, userID, totalN) {
-validation.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamples))
-validation.DiscardedExemplars.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedExemplars))
-validation.DiscardedMetadata.WithLabelValues(validation.RateLimited, userID).Add(float64(len(validatedMetadata)))
+validation.DiscardedSamples.WithLabelValues(validation.ReasonRateLimited, userID).Add(float64(validatedSamples))
+validation.DiscardedExemplars.WithLabelValues(validation.ReasonRateLimited, userID).Add(float64(validatedExemplars))
+validation.DiscardedMetadata.WithLabelValues(validation.ReasonRateLimited, userID).Add(float64(len(validatedMetadata)))
// Return a 429 here to tell the client it is going too fast.
// Client may discard the data or slow down and re-send.
// Prometheus v2.26 added a remote-write option 'retry_on_http_429'.
-return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.ingestionRateLimiter.Limit(now, userID), validatedSamples, len(validatedMetadata))
+return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewIngestionRateLimitedError(d.ingestionRateLimiter.Limit(now, userID), d.ingestionRateLimiter.Burst(now, userID), validatedSamples, validatedExemplars, len(validatedMetadata)).Error())
}

// totalN included samples and metadata. Ingester follows this pattern when computing its ingestion rate.
18 changes: 9 additions & 9 deletions pkg/distributor/distributor_test.go
@@ -184,7 +184,7 @@ func TestDistributor_Push(t *testing.T) {
happyIngesters: 3,
samples: samplesIn{num: 25, startTimestampMs: 123456789000},
metadata: 5,
-expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (20) exceeded while adding 25 samples and 5 metadata"),
+expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewIngestionRateLimitedError(20, 20, 25, 0, 5).Error()),
metricNames: []string{lastSeenTimestamp},
expectedMetrics: `
# HELP cortex_distributor_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user.
@@ -431,7 +431,7 @@ func TestDistributor_PushRequestRateLimiter(t *testing.T) {
pushes: []testPush{
{expectedError: nil},
{expectedError: nil},
-{expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "request rate limit (2) exceeded")},
+{expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewRequestRateLimitedError(2, 2).Error())},
},
},
"request limit is disabled when set to 0": {
@@ -452,7 +452,7 @@
{expectedError: nil},
{expectedError: nil},
{expectedError: nil},
-{expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "request rate limit (1) exceeded")},
+{expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewRequestRateLimitedError(1, 3).Error())},
},
},
}
@@ -512,10 +512,10 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
pushes: []testPush{
{samples: 2, expectedError: nil},
{samples: 1, expectedError: nil},
-{samples: 2, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 2 samples and 1 metadata")},
+{samples: 2, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewIngestionRateLimitedError(5, 5, 2, 0, 1).Error())},
{samples: 2, expectedError: nil},
-{samples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 1 samples and 0 metadata")},
-{metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 0 samples and 1 metadata")},
+{samples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewIngestionRateLimitedError(5, 5, 1, 0, 0).Error())},
+{metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewIngestionRateLimitedError(5, 5, 0, 0, 1).Error())},
},
},
"for each distributor, set an ingestion burst limit.": {
@@ -525,10 +525,10 @@
pushes: []testPush{
{samples: 10, expectedError: nil},
{samples: 5, expectedError: nil},
-{samples: 5, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 5 samples and 1 metadata")},
+{samples: 5, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewIngestionRateLimitedError(5, 20, 5, 0, 1).Error())},
{samples: 5, expectedError: nil},
-{samples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 1 samples and 0 metadata")},
-{metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 0 samples and 1 metadata")},
+{samples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewIngestionRateLimitedError(5, 20, 1, 0, 0).Error())},
+{metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewIngestionRateLimitedError(5, 20, 0, 0, 1).Error())},
},
},
}
6 changes: 5 additions & 1 deletion pkg/distributor/ha_tracker.go
@@ -27,6 +27,8 @@ import (

"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/globalerror"
"github.com/grafana/mimir/pkg/util/validation"
)

var (
@@ -528,7 +530,9 @@ type tooManyClustersError struct {
}

func (e tooManyClustersError) Error() string {
-return fmt.Sprintf("too many HA clusters (limit: %d)", e.limit)
+return globalerror.TooManyHAClusters.MessageWithLimitConfig(
+	validation.HATrackerMaxClustersFlag,
+	fmt.Sprintf("the write request has been rejected because the maximum number of high-availability (HA) clusters has been reached for this tenant (limit: %d)", e.limit))
}

// Needed for errors.Is to work properly.
4 changes: 2 additions & 2 deletions pkg/distributor/ha_tracker_test.go
@@ -602,7 +602,7 @@ func TestHAClustersLimit(t *testing.T) {
assert.NoError(t, t1.checkReplica(context.Background(), userID, "b", "b1", now))
waitForClustersUpdate(t, 2, t1, userID)

-assert.EqualError(t, t1.checkReplica(context.Background(), userID, "c", "c1", now), "too many HA clusters (limit: 2)")
+assert.EqualError(t, t1.checkReplica(context.Background(), userID, "c", "c1", now), tooManyClustersError{limit: 2}.Error())

// Move time forward, and make sure that checkReplica for existing cluster works fine.
now = now.Add(5 * time.Second) // higher than "update timeout"
@@ -627,7 +627,7 @@
waitForClustersUpdate(t, 2, t1, userID)

// But yet another cluster doesn't.
-assert.EqualError(t, t1.checkReplica(context.Background(), userID, "a", "a2", now), "too many HA clusters (limit: 2)")
+assert.EqualError(t, t1.checkReplica(context.Background(), userID, "a", "a2", now), tooManyClustersError{limit: 2}.Error())

now = now.Add(5 * time.Second)

23 changes: 22 additions & 1 deletion pkg/util/globalerror/errors.go
@@ -4,6 +4,7 @@ package globalerror

import (
"fmt"
"strings"
)

type ID string
@@ -47,7 +48,10 @@ const (
MetricMetadataHelpTooLong ID = "help-too-long"
MetricMetadataUnitTooLong ID = "unit-too-long"

-	MaxQueryLength ID = "max-query-length"
+	MaxQueryLength       ID = "max-query-length"
+	RequestRateLimited   ID = "tenant-max-request-rate"
+	IngestionRateLimited ID = "tenant-max-ingestion-rate"
+	TooManyHAClusters    ID = "too-many-ha-clusters"
)

// Message returns the provided msg, appending the error id.
@@ -60,3 +64,20 @@ func (id ID) Message(msg string) string {
func (id ID) MessageWithLimitConfig(flag, msg string) string {
return fmt.Sprintf("%s (%s%s). You can adjust the related per-tenant limit by configuring -%s, or by contacting your service administrator.", msg, errPrefix, id, flag)
}

func (id ID) MessageWithLimitConfigs(msg, flag string, addFlags ...string) string {
var sb strings.Builder
sb.WriteString("-")
sb.WriteString(flag)
plural := ""
if len(addFlags) > 0 {
plural = "s"
for _, addFlag := range addFlags[:len(addFlags)-1] {
sb.WriteString(", -")
sb.WriteString(addFlag)
}
sb.WriteString(" and -")
sb.WriteString(addFlags[len(addFlags)-1])
}
return fmt.Sprintf("%s (%s%s). You can adjust the related per-tenant limit%s by configuring %s, or by contacting your service administrator.", msg, errPrefix, id, plural, sb.String())
}
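The flag-list formatting in `MessageWithLimitConfigs` is the subtle part: a single flag keeps the singular "limit", while several flags pluralize it and are joined with commas plus a final "and". Isolated as a runnable sketch, with `joinFlags` as a hypothetical helper name:

```go
package main

import (
	"fmt"
	"strings"
)

// joinFlags reproduces the flag-list formatting used by
// MessageWithLimitConfigs: one flag stays singular ("-a"), while several are
// joined with commas and a final "and" ("-a, -b and -c"). The second return
// value is the plural suffix for the word "limit".
func joinFlags(flag string, addFlags ...string) (list string, plural string) {
	var sb strings.Builder
	sb.WriteString("-")
	sb.WriteString(flag)
	if len(addFlags) > 0 {
		plural = "s"
		for _, f := range addFlags[:len(addFlags)-1] {
			sb.WriteString(", -")
			sb.WriteString(f)
		}
		sb.WriteString(" and -")
		sb.WriteString(addFlags[len(addFlags)-1])
	}
	return sb.String(), plural
}

func main() {
	l, p := joinFlags("my-flag1")
	fmt.Printf("limit%s: %s\n", p, l) // limit: -my-flag1
	l, p = joinFlags("my-flag1", "my-flag2", "my-flag3")
	fmt.Printf("limit%s: %s\n", p, l) // limits: -my-flag1, -my-flag2 and -my-flag3
}
```

Wrapped in the surrounding message text, these fragments produce exactly the strings asserted in `TestID_MessageWithLimitConfigs`.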
22 changes: 22 additions & 0 deletions pkg/util/globalerror/errors_test.go
@@ -21,3 +21,25 @@ func TestID_MessageWithLimitConfig(t *testing.T) {
"an error (err-mimir-missing-metric-name). You can adjust the related per-tenant limit by configuring -my-flag, or by contacting your service administrator.",
MissingMetricName.MessageWithLimitConfig("my-flag", "an error"))
}

func TestID_MessageWithLimitConfigs(t *testing.T) {
	for _, tc := range []struct {
		expected string
		actual   string
	}{
		{
			expected: "an error (err-mimir-missing-metric-name). You can adjust the related per-tenant limit by configuring -my-flag1, or by contacting your service administrator.",
			actual:   MissingMetricName.MessageWithLimitConfigs("an error", "my-flag1"),
		},
		{
			expected: "an error (err-mimir-missing-metric-name). You can adjust the related per-tenant limits by configuring -my-flag1 and -my-flag2, or by contacting your service administrator.",
			actual:   MissingMetricName.MessageWithLimitConfigs("an error", "my-flag1", "my-flag2"),
		},
		{
			expected: "an error (err-mimir-missing-metric-name). You can adjust the related per-tenant limits by configuring -my-flag1, -my-flag2 and -my-flag3, or by contacting your service administrator.",
			actual:   MissingMetricName.MessageWithLimitConfigs("an error", "my-flag1", "my-flag2", "my-flag3"),
		},
	} {
		assert.Equal(t, tc.expected, tc.actual)
	}
}
12 changes: 12 additions & 0 deletions pkg/util/validation/errors.go
@@ -275,6 +275,18 @@ func NewMaxQueryLengthError(actualQueryLen, maxQueryLength time.Duration) LimitE
fmt.Sprintf("the query time range exceeds the limit (query length: %s, limit: %s)", actualQueryLen, maxQueryLength)))
}

func NewRequestRateLimitedError(limit float64, burst int) LimitError {
return LimitError(globalerror.RequestRateLimited.MessageWithLimitConfigs(
fmt.Sprintf("the request has been rejected because the tenant exceeded the request rate limit, set to %v req/s with a maximum allowed burst of %d", limit, burst),
requestRateFlag, requestBurstSizeFlag))
}

func NewIngestionRateLimitedError(limit float64, burst, numSamples, numExemplars, numMetadata int) LimitError {
return LimitError(globalerror.IngestionRateLimited.MessageWithLimitConfigs(
fmt.Sprintf("the request has been rejected because the tenant exceeded the ingestion rate limit, set to %v items/s with a maximum allowed burst of %d, while adding %d samples, %d exemplars and %d metadata", limit, burst, numSamples, numExemplars, numMetadata),
ingestionRateFlag, ingestionBurstSizeFlag))
}

// formatLabelSet formats label adapters as a metric name with labels, while preserving
// label order, and keeping duplicates. If there are multiple "__name__" labels, only
// first one is used as metric name, other ones will be included as regular labels.