Add max-chunks-bytes-per-query limiter #4216

Merged May 27, 2021 (10 commits).
Changes from 3 commits.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -8,6 +8,7 @@
- `-alertmanager.receivers-firewall.block.private-addresses` renamed to `-alertmanager.receivers-firewall-block-private-addresses`
* [CHANGE] Change default value of `-server.grpc.keepalive.min-time-between-pings` to `10s` and `-server.grpc.keepalive.ping-without-stream-allowed` to `true`. #4168
* [FEATURE] Querier: Added new `-querier.max-fetched-series-per-query` flag. When Cortex is running with blocks storage, the max series per query limit is enforced in the querier and applies to unique series received from ingesters and store-gateway (long-term storage). #4179
* [FEATURE] Querier: Added new `-querier.max-chunk-bytes-per-query` flag. When Cortex is running with blocks storage, the max chunk bytes limit is enforced in the querier and counts the size, in bytes, of chunks returned from ingesters and blocks storage. #4216
* [FEATURE] Alertmanager: Added rate-limits to notifiers. Rate limits used by all integrations can be configured using `-alertmanager.notification-rate-limit`, while per-integration rate limits can be specified via `-alertmanager.notification-rate-limit-per-integration` parameter. Both shared and per-integration limits can be overwritten using overrides mechanism. These limits are applied on individual (per-tenant) alertmanagers. Rate-limited notifications are failed notifications. It is possible to monitor rate-limited notifications via new `cortex_alertmanager_notification_rate_limited_total` metric. #4135 #4163
* [ENHANCEMENT] Alertmanager: introduced new metrics to monitor operation when using `-alertmanager.sharding-enabled`: #4149
* `cortex_alertmanager_state_fetch_replica_state_total`
2 changes: 1 addition & 1 deletion pkg/distributor/distributor_test.go
@@ -951,7 +951,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxSeriesPerQueryLimitIsReac

limits := &validation.Limits{}
flagext.DefaultValues(limits)
ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(maxSeriesLimit))
ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(maxSeriesLimit, 0))
// Prepare distributors.
ds, _, r, _ := prepare(t, prepConfig{
numIngesters: 3,
6 changes: 6 additions & 0 deletions pkg/distributor/query.go
@@ -232,10 +232,16 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, userID string, re
return nil, validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, util.LabelMatchersToString(matchers), chunksLimit))
}
}

for _, series := range resp.Chunkseries {
if limitErr := queryLimiter.AddSeries(series.Labels); limitErr != nil {
return nil, limitErr
}
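// Each chunk's Size() is its protobuf-encoded size in bytes, which counts against the per-query chunk bytes limit.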
for _, chunk := range series.Chunks {
if chunkBytesLimitErr := queryLimiter.AddChunkBytes(chunk.Size()); chunkBytesLimitErr != nil {
return nil, chunkBytesLimitErr
}
}
}
for _, series := range resp.Timeseries {
if limitErr := queryLimiter.AddSeries(series.Labels); limitErr != nil {
7 changes: 7 additions & 0 deletions pkg/querier/blocks_store_queryable.go
@@ -56,6 +56,7 @@ const (
var (
errNoStoreGatewayAddress = errors.New("no store-gateway address configured")
errMaxChunksPerQueryLimit = "the query hit the max number of chunks limit while fetching chunks from store-gateways for %s (limit: %d)"
errMaxChunkBytesHit = "The query hit the max chunk bytes limit (limit: %d)"
)

// BlocksStoreSet is the interface used to get the clients to query series on a set of blocks.
@@ -626,6 +627,12 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
return validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, util.LabelMatchersToString(matchers), maxChunksLimit))
}
}

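// As in the distributor, each returned chunk's encoded size counts against the per-query limit.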
for _, c := range s.Chunks {
if chunkBytesLimitErr := queryLimiter.AddChunkBytes(c.Size()); chunkBytesLimitErr != nil {
return chunkBytesLimitErr
}
}
}

if w := resp.GetWarning(); w != "" {
22 changes: 20 additions & 2 deletions pkg/querier/blocks_store_queryable_test.go
@@ -51,7 +51,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) {
metricNameLabel = labels.Label{Name: labels.MetricName, Value: metricName}
series1Label = labels.Label{Name: "series", Value: "1"}
series2Label = labels.Label{Name: "series", Value: "2"}
noOpQueryLimiter = limiter.NewQueryLimiter(0)
noOpQueryLimiter = limiter.NewQueryLimiter(0, 0)
)

type valueResult struct {
@@ -507,9 +507,27 @@ func TestBlocksStoreQuerier_Select(t *testing.T) {
},
},
limits: &blocksStoreLimitsMock{},
queryLimiter: limiter.NewQueryLimiter(1),
queryLimiter: limiter.NewQueryLimiter(1, 0),
expectedErr: validation.LimitError(fmt.Sprintf("The query hit the max number of series limit (limit: %d)", 1)),
},
"max chunk bytes per query limit hit while fetching chunks": {
finderResult: bucketindex.Blocks{
{ID: block1},
{ID: block2},
},
storeSetResponses: []interface{}{
map[BlocksStoreClient][]ulid.ULID{
&storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{
mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 1),
mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT+1, 2),
mockHintsResponse(block1, block2),
}}: {block1, block2},
},
},
limits: &blocksStoreLimitsMock{maxChunksPerQuery: 1},
queryLimiter: limiter.NewQueryLimiter(0, 8),
expectedErr: validation.LimitError(fmt.Sprintf(errMaxChunkBytesHit, 8)),
},
}

for testName, testData := range tests {
2 changes: 1 addition & 1 deletion pkg/querier/querier.go
@@ -224,7 +224,7 @@ func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter,
return nil, err
}

ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(limits.MaxFetchedSeriesPerQuery(userID)))
ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(limits.MaxFetchedSeriesPerQuery(userID), limits.MaxChunkBytesPerQuery(userID)))

mint, maxt, err = validateQueryTimeRange(ctx, userID, mint, maxt, limits, cfg.MaxQueryIntoFuture)
if err == errEmptyTimeRange {
30 changes: 24 additions & 6 deletions pkg/util/limiter/query_limiter.go
@@ -6,6 +6,7 @@ import (
"sync"

"github.com/prometheus/common/model"
"go.uber.org/atomic"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/ingester/client"
@@ -15,25 +16,32 @@
type queryLimiterCtxKey struct{}

var (
ctxKey = &queryLimiterCtxKey{}
errMaxSeriesHit = "The query hit the max number of series limit (limit: %d)"
ctxKey = &queryLimiterCtxKey{}
errMaxSeriesHit = "The query hit the max number of series limit (limit: %d)"
errMaxChunkBytesHit = "The query hit the max chunk bytes limit (limit: %d)"
)

type QueryLimiter struct {
uniqueSeriesMx sync.Mutex
uniqueSeries map[model.Fingerprint]struct{}

maxSeriesPerQuery int
chunkBytesCount *atomic.Int32

maxSeriesPerQuery int
maxChunkBytesPerQuery int
Review thread on the counter and limit types:

Contributor Author: This limits us to 2GB (2^31 - 1 bytes) per query. Is it worth making this an unsigned int, which is about 4GB (2^32 bytes) per query, or a 64-bit number?

Contributor: int64 please. 4GB is not that much. We may have use cases setting higher limits.

Contributor: On 64-bit systems, int is 64-bit, so this is fine. Note that Cortex officially doesn't support 32-bit systems.

Contributor: I would be explicit like we do everywhere else.

Contributor Author: Should we also pass in an int64 at the config/limit.go level? Or is leaving NewQueryLimiter(int, int) and casting the maxChunkBytes value to an int64 ok?

Contributor (replying to "I would be explicit like we do everywhere else"): I don't think we're explicit "everywhere else". I think it would make sense to use int here simply because we cannot fit more than the max of int into memory anyway (this applies to both 32-bit and 64-bit platforms).

Contributor: To your question Tyler, if you go with the int64 route, you will need to "extend" that everywhere to avoid losing precision somewhere (i.e. in NewQueryLimiter too).

Contributor: Ok. Let's not block on this and keep int.
}

// NewQueryLimiter makes a new per-query limiter. Each query limiter
// is configured using the `maxSeriesPerQuery` and `maxChunkBytesPerQuery` limits.
func NewQueryLimiter(maxSeriesPerQuery int) *QueryLimiter {
func NewQueryLimiter(maxSeriesPerQuery int, maxChunkBytesPerQuery int) *QueryLimiter {
return &QueryLimiter{
uniqueSeriesMx: sync.Mutex{},
uniqueSeries: map[model.Fingerprint]struct{}{},

maxSeriesPerQuery: maxSeriesPerQuery,
chunkBytesCount: atomic.NewInt32(0),

maxSeriesPerQuery: maxSeriesPerQuery,
maxChunkBytesPerQuery: maxChunkBytesPerQuery,
}
}

Expand All @@ -47,7 +55,7 @@ func QueryLimiterFromContextWithFallback(ctx context.Context) *QueryLimiter {
ql, ok := ctx.Value(ctxKey).(*QueryLimiter)
if !ok {
// If there's no limiter return a new unlimited limiter as a fallback
ql = NewQueryLimiter(0)
ql = NewQueryLimiter(0, 0)
}
return ql
}
@@ -77,3 +85,13 @@ func (ql *QueryLimiter) uniqueSeriesCount() int {
defer ql.uniqueSeriesMx.Unlock()
return len(ql.uniqueSeries)
}

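// AddChunkBytes adds the given number of chunk bytes to the running per-query total
// and returns a limit error once the total exceeds maxChunkBytesPerQuery.
// A limit of 0 disables the check.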
func (ql *QueryLimiter) AddChunkBytes(bytes int) error {
if ql.maxChunkBytesPerQuery == 0 {
return nil
}
if ql.chunkBytesCount.Add(int32(bytes)) > int32(ql.maxChunkBytesPerQuery) {
return validation.LimitError(fmt.Sprintf(errMaxChunkBytesHit, ql.maxChunkBytesPerQuery))
}
return nil
}
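AddChunkBytes uses an add-then-compare pattern on an atomic counter instead of a mutex. A minimal, self-contained sketch of the same idea, with illustrative names and the standard library's sync/atomic in place of go.uber.org/atomic:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// byteLimiter is an illustrative stand-in for QueryLimiter's chunk-bytes accounting.
type byteLimiter struct {
	count int32 // running total of chunk bytes, updated atomically
	limit int32 // 0 disables the limit
}

func (l *byteLimiter) add(n int) error {
	if l.limit == 0 {
		return nil
	}
	// Add first, then compare: concurrent fetchers may overshoot the limit
	// slightly before the error propagates, but the check itself is race-free.
	if atomic.AddInt32(&l.count, int32(n)) > l.limit {
		return fmt.Errorf("the query hit the max chunk bytes limit (limit: %d)", l.limit)
	}
	return nil
}

func main() {
	l := &byteLimiter{limit: 100}
	fmt.Println(l.add(100)) // <nil>: the limit is inclusive
	fmt.Println(l.add(1))   // error: running total 101 exceeds the limit
}
```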
15 changes: 12 additions & 3 deletions pkg/util/limiter/query_limiter_test.go
@@ -25,7 +25,7 @@ func TestQueryLimiter_AddSeries_ShouldReturnNoErrorOnLimitNotExceeded(t *testing
labels.MetricName: metricName + "_2",
"series2": "1",
})
limiter = NewQueryLimiter(100)
limiter = NewQueryLimiter(100, 0)
)
err := limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(series1))
assert.NoError(t, err)
@@ -53,14 +53,23 @@ func TestQueryLimiter_AddSeriers_ShouldReturnErrorOnLimitExceeded(t *testing.T)
labels.MetricName: metricName + "_2",
"series2": "1",
})
limiter = NewQueryLimiter(1)
limiter = NewQueryLimiter(1, 0)
)
err := limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(series1))
require.NoError(t, err)
err = limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(series2))
require.Error(t, err)
}

func TestQueryLimiter_AddChunkBytes(t *testing.T) {
var limiter = NewQueryLimiter(0, 100)

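// The limit is inclusive: exactly maxChunkBytesPerQuery bytes succeeds, one more byte fails.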
err := limiter.AddChunkBytes(100)
require.NoError(t, err)
err = limiter.AddChunkBytes(1)
require.Error(t, err)
}

func BenchmarkQueryLimiter_AddSeries(b *testing.B) {
const (
metricName = "test_metric"
@@ -75,7 +84,7 @@
}
b.ResetTimer()

limiter := NewQueryLimiter(b.N + 1)
limiter := NewQueryLimiter(b.N+1, 0)
for _, s := range series {
err := limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(s))
assert.NoError(b, err)
8 changes: 8 additions & 0 deletions pkg/util/validation/limits.go
@@ -75,6 +75,7 @@ type Limits struct {
MaxChunksPerQueryFromStore int `yaml:"max_chunks_per_query" json:"max_chunks_per_query"` // TODO Remove in Cortex 1.12.
MaxChunksPerQuery int `yaml:"max_fetched_chunks_per_query" json:"max_fetched_chunks_per_query"`
MaxFetchedSeriesPerQuery int `yaml:"max_fetched_series_per_query" json:"max_fetched_series_per_query"`
MaxChunkBytesPerQuery int `yaml:"max_chunk_bytes_per_query" json:"max_chunk_bytes_per_query"`
MaxQueryLookback model.Duration `yaml:"max_query_lookback" json:"max_query_lookback"`
MaxQueryLength model.Duration `yaml:"max_query_length" json:"max_query_length"`
MaxQueryParallelism int `yaml:"max_query_parallelism" json:"max_query_parallelism"`
@@ -147,6 +148,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&l.MaxChunksPerQueryFromStore, "store.query-chunk-limit", 2e6, "Deprecated. Use -querier.max-fetched-chunks-per-query CLI flag and its respective YAML config option instead. Maximum number of chunks that can be fetched in a single query. This limit is enforced when fetching chunks from the long-term storage only. When running the Cortex chunks storage, this limit is enforced in the querier and ruler, while when running the Cortex blocks storage this limit is enforced in the querier, ruler and store-gateway. 0 to disable.")
f.IntVar(&l.MaxChunksPerQuery, "querier.max-fetched-chunks-per-query", 0, "Maximum number of chunks that can be fetched in a single query from ingesters and long-term storage: the total number of actual fetched chunks could be 2x the limit, being independently applied when querying ingesters and long-term storage. This limit is enforced in the ingester (if chunks streaming is enabled), querier, ruler and store-gateway. Takes precedence over the deprecated -store.query-chunk-limit. 0 to disable.")
f.IntVar(&l.MaxFetchedSeriesPerQuery, "querier.max-fetched-series-per-query", 0, "The maximum number of unique series for which a query can fetch samples from each ingesters and blocks storage. This limit is enforced in the querier only when running Cortex with blocks storage. 0 to disable")
f.IntVar(&l.MaxChunkBytesPerQuery, "querier.max-chunk-bytes-per-query", 0, "The maximum size of all chunks in bytes that a query can fetch from each ingester and blocks storage. This limit is enforced in the querier only when running Cortex with blocks storage. 0 to disable.")
f.Var(&l.MaxQueryLength, "store.max-query-length", "Limit the query time range (end - start time). This limit is enforced in the query-frontend (on the received query), in the querier (on the query possibly split by the query-frontend) and in the chunks storage. 0 to disable.")
f.Var(&l.MaxQueryLookback, "querier.max-query-lookback", "Limit how long back data (series and metadata) can be queried, up until <lookback> duration ago. This limit is enforced in the query-frontend, querier and ruler. If the requested time range is outside the allowed range, the request will not fail but will be manipulated to only query data within the allowed time range. 0 to disable.")
f.IntVar(&l.MaxQueryParallelism, "querier.max-query-parallelism", 14, "Maximum number of split queries will be scheduled in parallel by the frontend.")
@@ -394,6 +396,12 @@ func (o *Overrides) MaxFetchedSeriesPerQuery(userID string) int {
return o.getOverridesForUser(userID).MaxFetchedSeriesPerQuery
}

// MaxChunkBytesPerQuery returns the maximum number of chunk bytes allowed per query when
// fetching chunks from ingesters and blocks storage.
func (o *Overrides) MaxChunkBytesPerQuery(userID string) int {
return o.getOverridesForUser(userID).MaxChunkBytesPerQuery
}

// MaxQueryLookback returns the max lookback period of queries.
func (o *Overrides) MaxQueryLookback(userID string) time.Duration {
return time.Duration(o.getOverridesForUser(userID).MaxQueryLookback)
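End to end, the new limit flows from the per-tenant Limits into a per-query QueryLimiter carried on the request context. A sketch of that flow against this PR's API (the limit values are illustrative):

```go
package main

import (
	"context"
	"fmt"

	"github.com/cortexproject/cortex/pkg/util/limiter"
)

func main() {
	// NewQueryLimiter(maxSeriesPerQuery, maxChunkBytesPerQuery); 0 disables a limit.
	// In the querier the second argument comes from limits.MaxChunkBytesPerQuery(userID).
	ql := limiter.NewQueryLimiter(0, 64)
	ctx := limiter.AddQueryLimiterToContext(context.Background(), ql)

	// The distributor and the blocks store querier retrieve the limiter from the
	// context and account every chunk they receive against it.
	got := limiter.QueryLimiterFromContextWithFallback(ctx)
	fmt.Println(got.AddChunkBytes(32)) // <nil>
	fmt.Println(got.AddChunkBytes(33)) // limit error: running total 65 > 64
}
```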