Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't hold labels from store-gateways in two forms, and don't convert them multiple times #9914

Merged
merged 4 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
* [ENHANCEMENT] PromQL: make `sort_by_label` stable. #9879
* [ENHANCEMENT] Distributor: Initialize ha_tracker cache before ha_tracker and distributor reach running state and begin serving writes. #9826
* [ENHANCEMENT] Ingester: `-ingest-storage.kafka.max-buffered-bytes` to limit the memory for buffered records when using concurrent fetching. #9892
* [ENHANCEMENT] Querier: improve performance and memory consumption of queries that select many series. #9914
* [BUGFIX] Fix issue where functions such as `rate()` over native histograms could return incorrect values if a float stale marker was present in the selected range. #9508
* [BUGFIX] Fix issue where negation of native histograms (e.g. `-some_native_histogram_series`) did nothing. #9508
* [BUGFIX] Fix issue where `metric might not be a counter, name does not end in _total/_sum/_count/_bucket` annotation would be emitted even if `rate` or `increase` did not have enough samples to compute a result. #9508
Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2557,7 +2557,7 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through

result := make([]labels.Labels, 0, len(metrics))
for _, m := range metrics {
if err := queryLimiter.AddSeries(mimirpb.FromLabelsToLabelAdapters(m)); err != nil {
if err := queryLimiter.AddSeries(m); err != nil {
return nil, err
}
result = append(result, m)
Expand Down
10 changes: 6 additions & 4 deletions pkg/distributor/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [

if len(resp.Timeseries) > 0 {
for _, series := range resp.Timeseries {
if limitErr := queryLimiter.AddSeries(series.Labels); limitErr != nil {
if limitErr := queryLimiter.AddSeries(mimirpb.FromLabelAdaptersToLabels(series.Labels)); limitErr != nil {
return ingesterQueryResult{}, limitErr
}
}
Expand All @@ -285,7 +285,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
}

for _, series := range resp.Chunkseries {
if err := queryLimiter.AddSeries(series.Labels); err != nil {
if err := queryLimiter.AddSeries(mimirpb.FromLabelAdaptersToLabels(series.Labels)); err != nil {
return ingesterQueryResult{}, err
}
}
Expand All @@ -300,7 +300,9 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
streamingSeriesCount += len(resp.StreamingSeries)

for _, s := range resp.StreamingSeries {
if err := queryLimiter.AddSeries(s.Labels); err != nil {
l := mimirpb.FromLabelAdaptersToLabels(s.Labels)

if err := queryLimiter.AddSeries(l); err != nil {
return ingesterQueryResult{}, err
}

Expand All @@ -313,7 +315,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
return ingesterQueryResult{}, err
}

labelsBatch = append(labelsBatch, mimirpb.FromLabelAdaptersToLabels(s.Labels))
labelsBatch = append(labelsBatch, l)
}

streamingSeriesBatches = append(streamingSeriesBatches, labelsBatch)
Expand Down
13 changes: 8 additions & 5 deletions pkg/querier/block_streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/util/annotations"

"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/querier/stats"
"github.com/grafana/mimir/pkg/storage/series"
"github.com/grafana/mimir/pkg/storegateway/storegatewaypb"
Expand All @@ -31,7 +30,7 @@ import (

// Implementation of storage.SeriesSet, based on individual responses from store client.
type blockStreamingQuerierSeriesSet struct {
series []*storepb.StreamingSeries
series []labels.Labels
streamReader chunkStreamReader

// next response to process
Expand All @@ -55,18 +54,22 @@ func (bqss *blockStreamingQuerierSeriesSet) Next() bool {
return false
}

currLabels := bqss.series[bqss.nextSeriesIndex].Labels
currLabels := bqss.series[bqss.nextSeriesIndex]
seriesIdxStart := bqss.nextSeriesIndex // First series in this group. We might merge with more below.
bqss.nextSeriesIndex++

// Chunks may come in multiple responses, but as soon as the response has chunks for a new series,
// we can stop searching. Series are sorted. See documentation for StoreClient.Series call for details.
// The actual merging of chunks happens in the Iterator() call where chunks are fetched.
for bqss.nextSeriesIndex < len(bqss.series) && mimirpb.CompareLabelAdapters(currLabels, bqss.series[bqss.nextSeriesIndex].Labels) == 0 {
for bqss.nextSeriesIndex < len(bqss.series) && labels.Equal(currLabels, bqss.series[bqss.nextSeriesIndex]) {
bqss.nextSeriesIndex++
}

bqss.currSeries = newBlockStreamingQuerierSeries(mimirpb.FromLabelAdaptersToLabels(currLabels), seriesIdxStart, bqss.nextSeriesIndex-1, bqss.streamReader, bqss.chunkInfo, bqss.nextSeriesIndex >= len(bqss.series), bqss.remoteAddress)
bqss.currSeries = newBlockStreamingQuerierSeries(currLabels, seriesIdxStart, bqss.nextSeriesIndex-1, bqss.streamReader, bqss.chunkInfo, bqss.nextSeriesIndex >= len(bqss.series), bqss.remoteAddress)

// Clear any labels we no longer need, to allow them to be garbage collected when they're no longer needed elsewhere.
clear(bqss.series[seriesIdxStart : bqss.nextSeriesIndex-1])

return true
}

Expand Down
5 changes: 1 addition & 4 deletions pkg/querier/block_streaming_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"go.uber.org/atomic"
"google.golang.org/grpc/metadata"

"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/querier/stats"
"github.com/grafana/mimir/pkg/storegateway/storepb"
"github.com/grafana/mimir/pkg/util/limiter"
Expand Down Expand Up @@ -166,9 +165,7 @@ func TestBlockStreamingQuerierSeriesSet(t *testing.T) {
t.Run(name, func(t *testing.T) {
ss := &blockStreamingQuerierSeriesSet{streamReader: &mockChunkStreamer{series: c.input, causeError: c.errorChunkStreamer}}
for _, s := range c.input {
ss.series = append(ss.series, &storepb.StreamingSeries{
Labels: mimirpb.FromLabelsToLabelAdapters(s.lbls),
})
ss.series = append(ss.series, s.lbls)
}
idx := 0
var it chunkenc.Iterator
Expand Down
30 changes: 18 additions & 12 deletions pkg/querier/blocks_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -780,9 +780,9 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor
return err
}

// A storegateway client will only fill either of mySeries or myStreamingSeries, and not both.
// A storegateway client will only fill either of mySeries or myStreamingSeriesLabels, and not both.
mySeries := []*storepb.Series(nil)
myStreamingSeries := []*storepb.StreamingSeries(nil)
myStreamingSeriesLabels := []labels.Labels(nil)
var myWarnings annotations.Annotations
myQueriedBlocks := []ulid.ULID(nil)
indexBytesFetched := uint64(0)
Expand Down Expand Up @@ -813,7 +813,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor
mySeries = append(mySeries, s)

// Add series fingerprint to query limiter; will return error if we are over the limit
if err := queryLimiter.AddSeries(s.Labels); err != nil {
if err := queryLimiter.AddSeries(mimirpb.FromLabelAdaptersToLabels(s.Labels)); err != nil {
return err
}

Expand Down Expand Up @@ -853,16 +853,22 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor
}

if ss := resp.GetStreamingSeries(); ss != nil {
myStreamingSeriesLabels = slices.Grow(myStreamingSeriesLabels, len(ss.Series))

for _, s := range ss.Series {
// Add series fingerprint to query limiter; will return error if we are over the limit
if limitErr := queryLimiter.AddSeries(s.Labels); limitErr != nil {
l := mimirpb.FromLabelAdaptersToLabels(s.Labels)

if limitErr := queryLimiter.AddSeries(l); limitErr != nil {
return limitErr
}

myStreamingSeriesLabels = append(myStreamingSeriesLabels, l)
}
myStreamingSeries = append(myStreamingSeries, ss.Series...)

if ss.IsEndOfSeriesStream {
// If we aren't expecting any series from this stream, close it now.
if len(myStreamingSeries) == 0 {
if len(myStreamingSeriesLabels) == 0 {
util.CloseAndExhaust[*storepb.SeriesResponse](stream) //nolint:errcheck
}

Expand Down Expand Up @@ -904,13 +910,13 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor
chunkInfo.EndSeries(i == len(mySeries)-1)
}
}
} else if len(myStreamingSeries) > 0 {
} else if len(myStreamingSeriesLabels) > 0 {
// FetchedChunks and FetchedChunkBytes are added by the SeriesChunksStreamReader.
reqStats.AddFetchedSeries(uint64(len(myStreamingSeries)))
streamReader = newStoreGatewayStreamReader(reqCtx, stream, len(myStreamingSeries), queryLimiter, reqStats, q.metrics, q.logger)
reqStats.AddFetchedSeries(uint64(len(myStreamingSeriesLabels)))
streamReader = newStoreGatewayStreamReader(reqCtx, stream, len(myStreamingSeriesLabels), queryLimiter, reqStats, q.metrics, q.logger)
level.Debug(log).Log("msg", "received streaming series from store-gateway",
"instance", c.RemoteAddress(),
"fetched series", len(myStreamingSeries),
"fetched series", len(myStreamingSeriesLabels),
"fetched index bytes", indexBytesFetched,
"requested blocks", strings.Join(convertULIDsToString(blockIDs), " "),
"queried blocks", strings.Join(convertULIDsToString(myQueriedBlocks), " "))
Expand All @@ -925,12 +931,12 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor
mtx.Lock()
if len(mySeries) > 0 {
seriesSets = append(seriesSets, &blockQuerierSeriesSet{series: mySeries})
} else if len(myStreamingSeries) > 0 {
} else if len(myStreamingSeriesLabels) > 0 {
if chunkInfo != nil {
chunkInfo.SetMsg("store-gateway streaming")
}
seriesSets = append(seriesSets, &blockStreamingQuerierSeriesSet{
series: myStreamingSeries,
series: myStreamingSeriesLabels,
streamReader: streamReader,
chunkInfo: chunkInfo,
remoteAddress: c.RemoteAddress(),
Expand Down
6 changes: 3 additions & 3 deletions pkg/util/limiter/query_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (
"context"
"sync"

"github.com/prometheus/prometheus/model/labels"
"go.uber.org/atomic"

"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/querier/stats"
"github.com/grafana/mimir/pkg/util/validation"
)
Expand Down Expand Up @@ -74,12 +74,12 @@ func QueryLimiterFromContextWithFallback(ctx context.Context) *QueryLimiter {
}

// AddSeries adds the input series and returns an error if the limit is reached.
func (ql *QueryLimiter) AddSeries(seriesLabels []mimirpb.LabelAdapter) validation.LimitError {
func (ql *QueryLimiter) AddSeries(seriesLabels labels.Labels) validation.LimitError {
// If the max series is unlimited just return without managing map
if ql.maxSeriesPerQuery == 0 {
return nil
}
fingerprint := mimirpb.FromLabelAdaptersToLabels(seriesLabels).Hash()
fingerprint := seriesLabels.Hash()

ql.uniqueSeriesMx.Lock()
defer ql.uniqueSeriesMx.Unlock()
Expand Down
17 changes: 8 additions & 9 deletions pkg/util/limiter/query_limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/querier/stats"
)

Expand All @@ -37,15 +36,15 @@ func TestQueryLimiter_AddSeries_ShouldReturnNoErrorOnLimitNotExceeded(t *testing
reg = prometheus.NewPedanticRegistry()
limiter = NewQueryLimiter(100, 0, 0, 0, stats.NewQueryMetrics(reg))
)
err := limiter.AddSeries(mimirpb.FromLabelsToLabelAdapters(series1))
err := limiter.AddSeries(series1)
assert.NoError(t, err)
err = limiter.AddSeries(mimirpb.FromLabelsToLabelAdapters(series2))
err = limiter.AddSeries(series2)
assert.NoError(t, err)
assert.Equal(t, 2, limiter.uniqueSeriesCount())
assertRejectedQueriesMetricValue(t, reg, 0, 0, 0, 0)

// Re-add previous series to make sure it's not double counted
err = limiter.AddSeries(mimirpb.FromLabelsToLabelAdapters(series1))
err = limiter.AddSeries(series1)
assert.NoError(t, err)
assert.Equal(t, 2, limiter.uniqueSeriesCount())
assertRejectedQueriesMetricValue(t, reg, 0, 0, 0, 0)
Expand All @@ -72,21 +71,21 @@ func TestQueryLimiter_AddSeries_ShouldReturnErrorOnLimitExceeded(t *testing.T) {
reg = prometheus.NewPedanticRegistry()
limiter = NewQueryLimiter(1, 0, 0, 0, stats.NewQueryMetrics(reg))
)
err := limiter.AddSeries(mimirpb.FromLabelsToLabelAdapters(series1))
err := limiter.AddSeries(series1)
require.NoError(t, err)
assertRejectedQueriesMetricValue(t, reg, 0, 0, 0, 0)

err = limiter.AddSeries(mimirpb.FromLabelsToLabelAdapters(series2))
err = limiter.AddSeries(series2)
require.Error(t, err)
assertRejectedQueriesMetricValue(t, reg, 1, 0, 0, 0)

// Add the same series again and ensure that we don't increment the failed queries metric again.
err = limiter.AddSeries(mimirpb.FromLabelsToLabelAdapters(series2))
err = limiter.AddSeries(series2)
require.Error(t, err)
assertRejectedQueriesMetricValue(t, reg, 1, 0, 0, 0)

// Add another series and ensure that we don't increment the failed queries metric again.
err = limiter.AddSeries(mimirpb.FromLabelsToLabelAdapters(series3))
err = limiter.AddSeries(series3)
require.Error(t, err)
assertRejectedQueriesMetricValue(t, reg, 1, 0, 0, 0)
}
Expand Down Expand Up @@ -188,7 +187,7 @@ func BenchmarkQueryLimiter_AddSeries(b *testing.B) {
reg := prometheus.NewPedanticRegistry()
limiter := NewQueryLimiter(b.N+1, 0, 0, 0, stats.NewQueryMetrics(reg))
for _, s := range series {
err := limiter.AddSeries(mimirpb.FromLabelsToLabelAdapters(s))
err := limiter.AddSeries(s)
assert.NoError(b, err)
}
}
Expand Down
Loading