From d2367de16a5623e422f802b1e89826051e3cc6f2 Mon Sep 17 00:00:00 2001
From: Charles Korn
Date: Mon, 18 Nov 2024 12:57:03 +1100
Subject: [PATCH] Don't hold labels from store-gateways in two forms, and don't convert them multiple times (#9914)

* Don't hold labels from store-gateways in two forms
* Don't retain labels longer than needed
* Don't convert mimirpb.LabelAdapters to labels.Labels multiple times
* Add changelog entry
---
 CHANGELOG.md                           |  1 +
 pkg/distributor/distributor.go         |  2 +-
 pkg/distributor/query.go               | 10 +++++----
 pkg/querier/block_streaming.go         | 13 ++++++-----
 pkg/querier/block_streaming_test.go    |  5 +----
 pkg/querier/blocks_store_queryable.go  | 30 +++++++++++++++-----------
 pkg/util/limiter/query_limiter.go      |  6 +++---
 pkg/util/limiter/query_limiter_test.go | 17 +++++++--------
 8 files changed, 46 insertions(+), 38 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index a1389c5a2bf..c4adf5db7f2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -65,6 +65,7 @@
 * [ENHANCEMENT] PromQL: make `sort_by_label` stable. #9879
 * [ENHANCEMENT] Distributor: Initialize ha_tracker cache before ha_tracker and distributor reach running state and begin serving writes. #9826
 * [ENHANCEMENT] Ingester: `-ingest-storage.kafka.max-buffered-bytes` to limit the memory for buffered records when using concurrent fetching. #9892
+* [ENHANCEMENT] Querier: improve performance and memory consumption of queries that select many series. #9914
 * [BUGFIX] Fix issue where functions such as `rate()` over native histograms could return incorrect values if a float stale marker was present in the selected range. #9508
 * [BUGFIX] Fix issue where negation of native histograms (eg. `-some_native_histogram_series`) did nothing. #9508
 * [BUGFIX] Fix issue where `metric might not be a counter, name does not end in _total/_sum/_count/_bucket` annotation would be emitted even if `rate` or `increase` did not have enough samples to compute a result. #9508
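The recurring pattern in this patch is to convert the wire-format label representation (`mimirpb.LabelAdapter`) into `labels.Labels` once, then reuse that single value everywhere it is needed. A minimal, self-contained sketch of the idea follows; `labelAdapter`, `toLabels` and the sample label values are illustrative stand-ins, not the actual Mimir types or conversion code:

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
)

// labelAdapter is a simplified stand-in for mimirpb.LabelAdapter,
// the label pair type used on the gRPC wire.
type labelAdapter struct{ Name, Value string }

// toLabels converts the wire form into labels.Labels exactly once.
func toLabels(adapters []labelAdapter) labels.Labels {
	b := labels.NewScratchBuilder(len(adapters))
	for _, a := range adapters {
		b.Add(a.Name, a.Value)
	}
	b.Sort()
	return b.Labels()
}

func main() {
	wire := []labelAdapter{{Name: "__name__", Value: "up"}, {Name: "job", Value: "node"}}

	// Convert once, then reuse the converted value for every consumer
	// (limiter fingerprint, result batches, series sets) instead of
	// converting again at each call site.
	ls := toLabels(wire)
	fmt.Println(ls.Hash(), ls.String())
}
```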
diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go
index 7ca90a4185e..fe475d5ed09 100644
--- a/pkg/distributor/distributor.go
+++ b/pkg/distributor/distributor.go
@@ -2557,7 +2557,7 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through
 	result := make([]labels.Labels, 0, len(metrics))
 	for _, m := range metrics {
-		if err := queryLimiter.AddSeries(mimirpb.FromLabelsToLabelAdapters(m)); err != nil {
+		if err := queryLimiter.AddSeries(m); err != nil {
 			return nil, err
 		}
 		result = append(result, m)

diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go
index 73610f6e089..ee381656840 100644
--- a/pkg/distributor/query.go
+++ b/pkg/distributor/query.go
@@ -268,7 +268,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
 		if len(resp.Timeseries) > 0 {
 			for _, series := range resp.Timeseries {
-				if limitErr := queryLimiter.AddSeries(series.Labels); limitErr != nil {
+				if limitErr := queryLimiter.AddSeries(mimirpb.FromLabelAdaptersToLabels(series.Labels)); limitErr != nil {
 					return ingesterQueryResult{}, limitErr
 				}
 			}
@@ -285,7 +285,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
 			}
 
 			for _, series := range resp.Chunkseries {
-				if err := queryLimiter.AddSeries(series.Labels); err != nil {
+				if err := queryLimiter.AddSeries(mimirpb.FromLabelAdaptersToLabels(series.Labels)); err != nil {
 					return ingesterQueryResult{}, err
 				}
 			}
@@ -300,7 +300,9 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
 			streamingSeriesCount += len(resp.StreamingSeries)
 
 			for _, s := range resp.StreamingSeries {
-				if err := queryLimiter.AddSeries(s.Labels); err != nil {
+				l := mimirpb.FromLabelAdaptersToLabels(s.Labels)
+
+				if err := queryLimiter.AddSeries(l); err != nil {
 					return ingesterQueryResult{}, err
 				}
@@ -313,7 +315,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
 					return ingesterQueryResult{}, err
 				}
 
-				labelsBatch = append(labelsBatch, mimirpb.FromLabelAdaptersToLabels(s.Labels))
+				labelsBatch = append(labelsBatch, l)
 			}
 
 			streamingSeriesBatches = append(streamingSeriesBatches, labelsBatch)
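In the queryIngesterStream hunks above, the converted value `l` now feeds both the limiter check and `labelsBatch`, where previously each consumer triggered its own adapter-to-labels conversion. A runnable sketch of that shape; `addSeries` and the map-based dedup below are a simplification of what `QueryLimiter` does, not the real implementation:

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
)

// addSeries mimics the new QueryLimiter.AddSeries contract: it accepts
// labels.Labels directly, so the caller decides when conversion happens.
func addSeries(seen map[uint64]struct{}, ls labels.Labels) error {
	seen[ls.Hash()] = struct{}{}
	return nil
}

func main() {
	seen := map[uint64]struct{}{}
	var batch []labels.Labels

	for _, ls := range []labels.Labels{
		labels.FromStrings("__name__", "up", "job", "a"),
		labels.FromStrings("__name__", "up", "job", "b"),
	} {
		// One converted value serves both consumers.
		if err := addSeries(seen, ls); err != nil {
			panic(err)
		}
		batch = append(batch, ls)
	}

	fmt.Println(len(seen), len(batch))
}
```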
diff --git a/pkg/querier/block_streaming.go b/pkg/querier/block_streaming.go
index 927c5b2d449..3d21084f9ff 100644
--- a/pkg/querier/block_streaming.go
+++ b/pkg/querier/block_streaming.go
@@ -17,7 +17,6 @@ import (
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
 	"github.com/prometheus/prometheus/util/annotations"
 
-	"github.com/grafana/mimir/pkg/mimirpb"
 	"github.com/grafana/mimir/pkg/querier/stats"
 	"github.com/grafana/mimir/pkg/storage/series"
 	"github.com/grafana/mimir/pkg/storegateway/storegatewaypb"
@@ -31,7 +30,7 @@ import (
 // Implementation of storage.SeriesSet, based on individual responses from store client.
 type blockStreamingQuerierSeriesSet struct {
-	series       []*storepb.StreamingSeries
+	series       []labels.Labels
 	streamReader chunkStreamReader
 
 	// next response to process
@@ -55,18 +54,22 @@ func (bqss *blockStreamingQuerierSeriesSet) Next() bool {
 		return false
 	}
 
-	currLabels := bqss.series[bqss.nextSeriesIndex].Labels
+	currLabels := bqss.series[bqss.nextSeriesIndex]
 
 	seriesIdxStart := bqss.nextSeriesIndex // First series in this group. We might merge with more below.
 	bqss.nextSeriesIndex++
 
 	// Chunks may come in multiple responses, but as soon as the response has chunks for a new series,
 	// we can stop searching. Series are sorted. See documentation for StoreClient.Series call for details.
 	// The actual merging of chunks happens in the Iterator() call where chunks are fetched.
-	for bqss.nextSeriesIndex < len(bqss.series) && mimirpb.CompareLabelAdapters(currLabels, bqss.series[bqss.nextSeriesIndex].Labels) == 0 {
+	for bqss.nextSeriesIndex < len(bqss.series) && labels.Equal(currLabels, bqss.series[bqss.nextSeriesIndex]) {
 		bqss.nextSeriesIndex++
 	}
 
-	bqss.currSeries = newBlockStreamingQuerierSeries(mimirpb.FromLabelAdaptersToLabels(currLabels), seriesIdxStart, bqss.nextSeriesIndex-1, bqss.streamReader, bqss.chunkInfo, bqss.nextSeriesIndex >= len(bqss.series), bqss.remoteAddress)
+	bqss.currSeries = newBlockStreamingQuerierSeries(currLabels, seriesIdxStart, bqss.nextSeriesIndex-1, bqss.streamReader, bqss.chunkInfo, bqss.nextSeriesIndex >= len(bqss.series), bqss.remoteAddress)
+
+	// Clear any labels we no longer need, to allow them to be garbage collected when they're no longer needed elsewhere.
+	clear(bqss.series[seriesIdxStart : bqss.nextSeriesIndex-1])
+
 	return true
 }

diff --git a/pkg/querier/block_streaming_test.go b/pkg/querier/block_streaming_test.go
index 501fa8f7c43..48b62329aa9 100644
--- a/pkg/querier/block_streaming_test.go
+++ b/pkg/querier/block_streaming_test.go
@@ -19,7 +19,6 @@ import (
 	"go.uber.org/atomic"
 	"google.golang.org/grpc/metadata"
 
-	"github.com/grafana/mimir/pkg/mimirpb"
 	"github.com/grafana/mimir/pkg/querier/stats"
 	"github.com/grafana/mimir/pkg/storegateway/storepb"
 	"github.com/grafana/mimir/pkg/util/limiter"
@@ -166,9 +165,7 @@ func TestBlockStreamingQuerierSeriesSet(t *testing.T) {
 		t.Run(name, func(t *testing.T) {
 			ss := &blockStreamingQuerierSeriesSet{streamReader: &mockChunkStreamer{series: c.input, causeError: c.errorChunkStreamer}}
 			for _, s := range c.input {
-				ss.series = append(ss.series, &storepb.StreamingSeries{
-					Labels: mimirpb.FromLabelsToLabelAdapters(s.lbls),
-				})
+				ss.series = append(ss.series, s.lbls)
 			}
 			idx := 0
 			var it chunkenc.Iterator
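The `Next` method above does two things worth calling out: it groups adjacent entries with equal label sets (the input is sorted, so duplicates are adjacent), and it then zeroes the consumed slots so the label data can be garbage collected while the slice itself stays alive. A self-contained sketch of those two steps, with invented slice contents:

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
)

func main() {
	series := []labels.Labels{
		labels.FromStrings("job", "a"),
		labels.FromStrings("job", "a"), // sorted input: duplicates are adjacent
		labels.FromStrings("job", "b"),
	}

	// Group adjacent equal label sets, as Next does before building the
	// merged series.
	start, next := 0, 1
	for next < len(series) && labels.Equal(series[start], series[next]) {
		next++
	}

	// clear (Go 1.21+) zeroes the consumed slots, mirroring the range used
	// in the patch; the zeroed entries no longer pin their label data.
	clear(series[start : next-1])

	fmt.Println(series, "group size:", next-start)
}
```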
diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go
index 9ce696b3553..a6f17182594 100644
--- a/pkg/querier/blocks_store_queryable.go
+++ b/pkg/querier/blocks_store_queryable.go
@@ -780,9 +780,9 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor
 				return err
 			}
 
-			// A storegateway client will only fill either of mySeries or myStreamingSeries, and not both.
+			// A storegateway client will only fill either of mySeries or myStreamingSeriesLabels, and not both.
 			mySeries := []*storepb.Series(nil)
-			myStreamingSeries := []*storepb.StreamingSeries(nil)
+			myStreamingSeriesLabels := []labels.Labels(nil)
 			var myWarnings annotations.Annotations
 			myQueriedBlocks := []ulid.ULID(nil)
 			indexBytesFetched := uint64(0)
@@ -813,7 +813,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor
 					mySeries = append(mySeries, s)
 
 					// Add series fingerprint to query limiter; will return error if we are over the limit
-					if err := queryLimiter.AddSeries(s.Labels); err != nil {
+					if err := queryLimiter.AddSeries(mimirpb.FromLabelAdaptersToLabels(s.Labels)); err != nil {
 						return err
 					}
@@ -853,16 +853,22 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor
 				}
 
 				if ss := resp.GetStreamingSeries(); ss != nil {
+					myStreamingSeriesLabels = slices.Grow(myStreamingSeriesLabels, len(ss.Series))
+
 					for _, s := range ss.Series {
 						// Add series fingerprint to query limiter; will return error if we are over the limit
-						if limitErr := queryLimiter.AddSeries(s.Labels); limitErr != nil {
+						l := mimirpb.FromLabelAdaptersToLabels(s.Labels)
+
+						if limitErr := queryLimiter.AddSeries(l); limitErr != nil {
 							return limitErr
 						}
+
+						myStreamingSeriesLabels = append(myStreamingSeriesLabels, l)
 					}
-					myStreamingSeries = append(myStreamingSeries, ss.Series...)
+
 					if ss.IsEndOfSeriesStream {
 						// If we aren't expecting any series from this stream, close it now.
-						if len(myStreamingSeries) == 0 {
+						if len(myStreamingSeriesLabels) == 0 {
 							util.CloseAndExhaust[*storepb.SeriesResponse](stream) //nolint:errcheck
 						}
@@ -904,13 +910,13 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor
 						chunkInfo.EndSeries(i == len(mySeries)-1)
 					}
 				}
-			} else if len(myStreamingSeries) > 0 {
+			} else if len(myStreamingSeriesLabels) > 0 {
 				// FetchedChunks and FetchedChunkBytes are added by the SeriesChunksStreamReader.
-				reqStats.AddFetchedSeries(uint64(len(myStreamingSeries)))
-				streamReader = newStoreGatewayStreamReader(reqCtx, stream, len(myStreamingSeries), queryLimiter, reqStats, q.metrics, q.logger)
+				reqStats.AddFetchedSeries(uint64(len(myStreamingSeriesLabels)))
+				streamReader = newStoreGatewayStreamReader(reqCtx, stream, len(myStreamingSeriesLabels), queryLimiter, reqStats, q.metrics, q.logger)
 				level.Debug(log).Log("msg", "received streaming series from store-gateway",
 					"instance", c.RemoteAddress(),
-					"fetched series", len(myStreamingSeries),
+					"fetched series", len(myStreamingSeriesLabels),
 					"fetched index bytes", indexBytesFetched,
 					"requested blocks", strings.Join(convertULIDsToString(blockIDs), " "),
 					"queried blocks", strings.Join(convertULIDsToString(myQueriedBlocks), " "))
@@ -925,12 +931,12 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor
 			mtx.Lock()
 			if len(mySeries) > 0 {
 				seriesSets = append(seriesSets, &blockQuerierSeriesSet{series: mySeries})
-			} else if len(myStreamingSeries) > 0 {
+			} else if len(myStreamingSeriesLabels) > 0 {
 				if chunkInfo != nil {
 					chunkInfo.SetMsg("store-gateway streaming")
 				}
 				seriesSets = append(seriesSets, &blockStreamingQuerierSeriesSet{
-					series:        myStreamingSeries,
+					series:        myStreamingSeriesLabels,
 					streamReader:  streamReader,
 					chunkInfo:     chunkInfo,
 					remoteAddress: c.RemoteAddress(),
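One detail in the streaming-series hunk above is the `slices.Grow` call: when a response declares how many series it carries, the destination slice is grown once up front rather than reallocating repeatedly inside the append loop. A small sketch of the effect, with invented contents:

```go
package main

import (
	"fmt"
	"slices"

	"github.com/prometheus/prometheus/model/labels"
)

func main() {
	var collected []labels.Labels

	// Pretend a store-gateway response announced three series.
	incoming := []labels.Labels{
		labels.FromStrings("job", "a"),
		labels.FromStrings("job", "b"),
		labels.FromStrings("job", "c"),
	}

	// Grow once so the append loop below never reallocates per element.
	collected = slices.Grow(collected, len(incoming))
	for _, ls := range incoming {
		collected = append(collected, ls)
	}

	fmt.Println(len(collected), cap(collected) >= len(incoming))
}
```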
diff --git a/pkg/util/limiter/query_limiter.go b/pkg/util/limiter/query_limiter.go
index fcd61b88869..0a7333f7721 100644
--- a/pkg/util/limiter/query_limiter.go
+++ b/pkg/util/limiter/query_limiter.go
@@ -9,9 +9,9 @@ import (
 	"context"
 	"sync"
 
+	"github.com/prometheus/prometheus/model/labels"
 	"go.uber.org/atomic"
 
-	"github.com/grafana/mimir/pkg/mimirpb"
 	"github.com/grafana/mimir/pkg/querier/stats"
 	"github.com/grafana/mimir/pkg/util/validation"
 )
@@ -74,12 +74,12 @@ func QueryLimiterFromContextWithFallback(ctx context.Context) *QueryLimiter {
 }
 
 // AddSeries adds the input series and returns an error if the limit is reached.
-func (ql *QueryLimiter) AddSeries(seriesLabels []mimirpb.LabelAdapter) validation.LimitError {
+func (ql *QueryLimiter) AddSeries(seriesLabels labels.Labels) validation.LimitError {
 	// If the max series is unlimited just return without managing map
 	if ql.maxSeriesPerQuery == 0 {
 		return nil
 	}
-	fingerprint := mimirpb.FromLabelAdaptersToLabels(seriesLabels).Hash()
+	fingerprint := seriesLabels.Hash()
 
 	ql.uniqueSeriesMx.Lock()
 	defer ql.uniqueSeriesMx.Unlock()
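With the new signature, `AddSeries` fingerprints the `labels.Labels` it receives instead of first materializing a label set from adapters. A stripped-down model of the limiter's unique-series bookkeeping; `miniLimiter` is illustrative only, and the real `QueryLimiter` also tracks chunks, chunk bytes and estimated memory:

```go
package main

import (
	"errors"
	"fmt"
	"sync"

	"github.com/prometheus/prometheus/model/labels"
)

// miniLimiter models just the unique-series accounting of QueryLimiter.
type miniLimiter struct {
	mx        sync.Mutex
	series    map[uint64]struct{}
	maxSeries int
}

func (l *miniLimiter) AddSeries(ls labels.Labels) error {
	// Fingerprint the labels directly; no intermediate adapter slice.
	fp := ls.Hash()

	l.mx.Lock()
	defer l.mx.Unlock()

	l.series[fp] = struct{}{}
	if len(l.series) > l.maxSeries {
		return errors.New("the query exceeded the maximum number of series")
	}
	return nil
}

func main() {
	l := &miniLimiter{series: map[uint64]struct{}{}, maxSeries: 1}

	fmt.Println(l.AddSeries(labels.FromStrings("job", "a"))) // <nil>
	fmt.Println(l.AddSeries(labels.FromStrings("job", "a"))) // <nil>: same fingerprint, not double counted
	fmt.Println(l.AddSeries(labels.FromStrings("job", "b"))) // error: over the limit
}
```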
diff --git a/pkg/util/limiter/query_limiter_test.go b/pkg/util/limiter/query_limiter_test.go
index 0d8041e4e4d..73179122241 100644
--- a/pkg/util/limiter/query_limiter_test.go
+++ b/pkg/util/limiter/query_limiter_test.go
@@ -16,7 +16,6 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
-	"github.com/grafana/mimir/pkg/mimirpb"
 	"github.com/grafana/mimir/pkg/querier/stats"
 )
@@ -37,15 +36,15 @@ func TestQueryLimiter_AddSeries_ShouldReturnNoErrorOnLimitNotExceeded(t *testing
 		reg     = prometheus.NewPedanticRegistry()
 		limiter = NewQueryLimiter(100, 0, 0, 0, stats.NewQueryMetrics(reg))
 	)
-	err := limiter.AddSeries(mimirpb.FromLabelsToLabelAdapters(series1))
+	err := limiter.AddSeries(series1)
 	assert.NoError(t, err)
-	err = limiter.AddSeries(mimirpb.FromLabelsToLabelAdapters(series2))
+	err = limiter.AddSeries(series2)
 	assert.NoError(t, err)
 	assert.Equal(t, 2, limiter.uniqueSeriesCount())
 	assertRejectedQueriesMetricValue(t, reg, 0, 0, 0, 0)
 
 	// Re-add previous series to make sure it's not double counted
-	err = limiter.AddSeries(mimirpb.FromLabelsToLabelAdapters(series1))
+	err = limiter.AddSeries(series1)
 	assert.NoError(t, err)
 	assert.Equal(t, 2, limiter.uniqueSeriesCount())
 	assertRejectedQueriesMetricValue(t, reg, 0, 0, 0, 0)
@@ -72,21 +71,21 @@ func TestQueryLimiter_AddSeries_ShouldReturnErrorOnLimitExceeded(t *testing.T) {
 		reg     = prometheus.NewPedanticRegistry()
 		limiter = NewQueryLimiter(1, 0, 0, 0, stats.NewQueryMetrics(reg))
 	)
-	err := limiter.AddSeries(mimirpb.FromLabelsToLabelAdapters(series1))
+	err := limiter.AddSeries(series1)
 	require.NoError(t, err)
 	assertRejectedQueriesMetricValue(t, reg, 0, 0, 0, 0)
 
-	err = limiter.AddSeries(mimirpb.FromLabelsToLabelAdapters(series2))
+	err = limiter.AddSeries(series2)
 	require.Error(t, err)
 	assertRejectedQueriesMetricValue(t, reg, 1, 0, 0, 0)
 
 	// Add the same series again and ensure that we don't increment the failed queries metric again.
-	err = limiter.AddSeries(mimirpb.FromLabelsToLabelAdapters(series2))
+	err = limiter.AddSeries(series2)
 	require.Error(t, err)
 	assertRejectedQueriesMetricValue(t, reg, 1, 0, 0, 0)
 
 	// Add another series and ensure that we don't increment the failed queries metric again.
-	err = limiter.AddSeries(mimirpb.FromLabelsToLabelAdapters(series3))
+	err = limiter.AddSeries(series3)
 	require.Error(t, err)
 	assertRejectedQueriesMetricValue(t, reg, 1, 0, 0, 0)
 }
@@ -188,7 +187,7 @@ func BenchmarkQueryLimiter_AddSeries(b *testing.B) {
 	reg := prometheus.NewPedanticRegistry()
 	limiter := NewQueryLimiter(b.N+1, 0, 0, 0, stats.NewQueryMetrics(reg))
 	for _, s := range series {
-		err := limiter.AddSeries(mimirpb.FromLabelsToLabelAdapters(s))
+		err := limiter.AddSeries(s)
 		assert.NoError(b, err)
 	}
 }
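The benchmark hunk above exercises the new call path end to end. For intuition about why dropping the round trip matters, here is a hedged micro-benchmark sketch contrasting hashing an existing label set against rebuilding it before every hash; the rebuild merely stands in for the old adapter conversion and is not the actual Mimir code:

```go
package main

import (
	"testing"

	"github.com/prometheus/prometheus/model/labels"
)

// sink prevents the compiler from eliding the hash computation.
var sink uint64

// BenchmarkHashDirect models the new path: the labels already exist in
// their final form and are hashed as-is.
func BenchmarkHashDirect(b *testing.B) {
	ls := labels.FromStrings("__name__", "up", "instance", "host:9100", "job", "node")
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		sink = ls.Hash()
	}
}

// BenchmarkHashRebuild models the old path: the label set is rebuilt from
// another representation before each hash, paying an allocation every time.
func BenchmarkHashRebuild(b *testing.B) {
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		ls := labels.FromStrings("__name__", "up", "instance", "host:9100", "job", "node")
		sink = ls.Hash()
	}
}
```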