From c3e8ed9aedf94819f00064492048ce8031fb5d48 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Sun, 10 Dec 2023 05:20:28 +0000 Subject: [PATCH 1/5] add series limit that is applied when streaming using block series client Signed-off-by: Ben Ye --- cmd/thanos/store.go | 5 +++++ pkg/store/bucket.go | 41 ++++++++++++++++++++++++++++++++++------ pkg/store/bucket_test.go | 1 + pkg/store/limiter.go | 2 +- 4 files changed, 42 insertions(+), 7 deletions(-) diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 9191b77ce8..56e178f614 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -89,6 +89,7 @@ type storeConfig struct { lazyIndexReaderEnabled bool lazyIndexReaderIdleTimeout time.Duration lazyExpandedPostingsEnabled bool + streamingSeriesLimit uint64 } func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) { @@ -186,6 +187,9 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("store.enable-lazy-expanded-postings", "If true, Store Gateway will estimate postings size and try to lazily expand postings if it downloads less data than expanding all postings."). Default("false").BoolVar(&sc.lazyExpandedPostingsEnabled) + cmd.Flag("store.streaming-series-limit", "The maximum series allowed to match when streaming series. The Series/LabelNames/LabelValues call fails if this limit is exceeded. 0 means no limit."). + Hidden().Default("0").Uint64Var(&sc.streamingSeriesLimit) + cmd.Flag("web.disable", "Disable Block Viewer UI.").Default("false").BoolVar(&sc.disableWeb) cmd.Flag("web.external-prefix", "Static prefix for all HTML links and redirect URLs in the bucket web UI interface. Actual endpoints are still served on / or the web.route-prefix. This allows thanos bucket web UI to be served behind a reverse proxy that strips a URL sub-path."). @@ -388,6 +392,7 @@ func runStore( return conf.estimatedMaxChunkSize }), store.WithLazyExpandedPostings(conf.lazyExpandedPostingsEnabled), + store.WithStreamingSeriesLimiterFactory(store.NewSeriesLimiterFactory(conf.streamingSeriesLimit)), } if conf.debugLogging { diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index da81fab93a..d8c0c5590d 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -390,6 +390,9 @@ type BucketStore struct { // seriesLimiterFactory creates a new limiter used to limit the number of touched series by each Series() call, // or LabelName and LabelValues calls when used with matchers. seriesLimiterFactory SeriesLimiterFactory + // streamingSeriesLimiterFactory creates a series limiter but applies series limit for actual matched series + // rather than touched series when stremaing series. Can be useful for vertical sharding or lazy postings scenario. + streamingSeriesLimiterFactory SeriesLimiterFactory // bytesLimiterFactory creates a new limiter used to limit the amount of bytes fetched/touched by each Series() call. bytesLimiterFactory BytesLimiterFactory @@ -531,6 +534,14 @@ func WithDontResort(true bool) BucketStoreOption { } } +func WithStreamingSeriesLimiterFactory(factory SeriesLimiterFactory) BucketStoreOption { + return func(s *BucketStore) { + if true { + s.streamingSeriesLimiterFactory = factory + } + } +} + // NewBucketStore creates a new bucket backed store that implements the store API against // an object store bucket. It is optimized to work against high latency backends. 
func NewBucketStore( @@ -952,8 +963,10 @@ type blockSeriesClient struct { indexr *bucketIndexReader chunkr *bucketChunkReader loadAggregates []storepb.Aggr - chunksLimiter ChunksLimiter - bytesLimiter BytesLimiter + + streamingSeriesLimiter SeriesLimiter + chunksLimiter ChunksLimiter + bytesLimiter BytesLimiter lazyExpandedPostingEnabled bool lazyExpandedPostingsCount prometheus.Counter @@ -986,7 +999,8 @@ func newBlockSeriesClient( logger log.Logger, b *bucketBlock, req *storepb.SeriesRequest, - limiter ChunksLimiter, + streamingSeriesLimiter SeriesLimiter, + chunksLimiter ChunksLimiter, bytesLimiter BytesLimiter, blockMatchers []*labels.Matcher, shardMatcher *storepb.ShardMatcher, @@ -1022,7 +1036,8 @@ func newBlockSeriesClient( maxt: req.MaxTime, indexr: b.indexReader(), chunkr: chunkr, - chunksLimiter: limiter, + streamingSeriesLimiter: streamingSeriesLimiter, + chunksLimiter: chunksLimiter, bytesLimiter: bytesLimiter, skipChunks: req.SkipChunks, seriesFetchDurationSum: seriesFetchDurationSum, @@ -1169,6 +1184,7 @@ func (b *blockSeriesClient) nextBatch(tenant string) error { return errors.Wrap(err, "preload series") } + seriesMatched := 0 b.entries = b.entries[:0] OUTER: for i := 0; i < len(postingsBatch); i++ { @@ -1209,6 +1225,7 @@ OUTER: continue } + seriesMatched++ s := seriesEntry{lset: completeLabelset} if b.skipChunks { b.entries = append(b.entries, s) @@ -1238,6 +1255,11 @@ OUTER: b.entries = append(b.entries, s) } + // Apply series limit before fetching chunks, for actual series matched. + if err := b.streamingSeriesLimiter.Reserve(uint64(seriesMatched)); err != nil { + return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded series limit: %s", err) + } + if !b.skipChunks { if err := b.chunkr.load(b.ctx, b.entries, b.loadAggregates, b.calculateChunkHash, b.bytesLimiter, b.tenant); err != nil { return errors.Wrap(err, "load chunks") @@ -1405,8 +1427,10 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store g, gctx = errgroup.WithContext(ctx) resHints = &hintspb.SeriesResponseHints{} reqBlockMatchers []*labels.Matcher - chunksLimiter = s.chunksLimiterFactory(s.metrics.queriesDropped.WithLabelValues("chunks", tenant)) - seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series", tenant)) + + chunksLimiter = s.chunksLimiterFactory(s.metrics.queriesDropped.WithLabelValues("chunks", tenant)) + seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series", tenant)) + streamingSeriesLimiter = s.streamingSeriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series", tenant)) queryStatsEnabled = false ) @@ -1464,6 +1488,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store s.logger, blk, req, + streamingSeriesLimiter, chunksLimiter, bytesLimiter, sortedBlockMatchers, @@ -1701,6 +1726,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq var mtx sync.Mutex var sets [][]string var seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series", tenant)) + var streamingSeriesLimiter = s.streamingSeriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series", tenant)) var bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes", tenant)) for _, b := range s.blocks { @@ -1764,6 +1790,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq s.logger, b, seriesReq, + streamingSeriesLimiter, nil, bytesLimiter, reqSeriesMatchersNoExtLabels, @@ 
-1901,6 +1928,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR var mtx sync.Mutex var sets [][]string var seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series", tenant)) + var streamingSeriesLimiter = s.streamingSeriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series", tenant)) var bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes", tenant)) for _, b := range s.blocks { @@ -1967,6 +1995,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR s.logger, b, seriesReq, + streamingSeriesLimiter, nil, bytesLimiter, reqSeriesMatchersNoExtLabels, diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 829a842ee5..e866735d30 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -2777,6 +2777,7 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet nil, blk, req, + seriesLimiter, chunksLimiter, NewBytesLimiterFactory(0)(nil), matchers, diff --git a/pkg/store/limiter.go b/pkg/store/limiter.go index f564e11443..cc92b28eb1 100644 --- a/pkg/store/limiter.go +++ b/pkg/store/limiter.go @@ -107,7 +107,7 @@ type SeriesSelectLimits struct { } func (l *SeriesSelectLimits) RegisterFlags(cmd extkingpin.FlagClause) { - cmd.Flag("store.limits.request-series", "The maximum series allowed for a single Series request. The Series call fails if this limit is exceeded. 0 means no limit.").Default("0").Uint64Var(&l.SeriesPerRequest) + cmd.Flag("store.limits.request-series", "The maximum series allowed for a single Series request to touch. The Series call fails if this limit is exceeded. 0 means no limit.").Default("0").Uint64Var(&l.SeriesPerRequest) cmd.Flag("store.limits.request-samples", "The maximum samples allowed for a single Series request, The Series call fails if this limit is exceeded. 0 means no limit. NOTE: For efficiency the limit is internally implemented as 'chunks limit' considering each chunk contains a maximum of 120 samples.").Default("0").Uint64Var(&l.SamplesPerRequest) } From 290800be29888e7f64217a16727d718f7b4a2a96 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Sun, 10 Dec 2023 05:50:48 +0000 Subject: [PATCH 2/5] changelog Signed-off-by: Ben Ye --- CHANGELOG.md | 1 + pkg/store/limiter.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 10eb9b8435..e0b3e2ff1d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#6925](https://github.com/thanos-io/thanos/pull/6925) Store Gateway: Support float native histogram. - [#6954](https://github.com/thanos-io/thanos/pull/6954) Index Cache: Support tracing for fetch APIs. - [#6943](https://github.com/thanos-io/thanos/pull/6943) Ruler: Added `keep_firing_for` field in alerting rule. +- [#6972](https://github.com/thanos-io/thanos/pull/6972) Store Gateway: Added `--store.streaming-series-limit` to apply series limit when streaming series to apply for series actually matched. ### Changed diff --git a/pkg/store/limiter.go b/pkg/store/limiter.go index cc92b28eb1..f564e11443 100644 --- a/pkg/store/limiter.go +++ b/pkg/store/limiter.go @@ -107,7 +107,7 @@ type SeriesSelectLimits struct { } func (l *SeriesSelectLimits) RegisterFlags(cmd extkingpin.FlagClause) { - cmd.Flag("store.limits.request-series", "The maximum series allowed for a single Series request to touch. The Series call fails if this limit is exceeded. 
0 means no limit.").Default("0").Uint64Var(&l.SeriesPerRequest) + cmd.Flag("store.limits.request-series", "The maximum series allowed for a single Series request. The Series call fails if this limit is exceeded. 0 means no limit.").Default("0").Uint64Var(&l.SeriesPerRequest) cmd.Flag("store.limits.request-samples", "The maximum samples allowed for a single Series request, The Series call fails if this limit is exceeded. 0 means no limit. NOTE: For efficiency the limit is internally implemented as 'chunks limit' considering each chunk contains a maximum of 120 samples.").Default("0").Uint64Var(&l.SamplesPerRequest) } From e693f7c05b8293a77fec3e8f1b574054734a5dda Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Mon, 11 Dec 2023 08:02:01 +0000 Subject: [PATCH 3/5] add unit tests Signed-off-by: Ben Ye --- pkg/store/bucket.go | 2 + pkg/store/bucket_test.go | 152 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 154 insertions(+) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index d8c0c5590d..a052db3f47 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -587,6 +587,8 @@ func NewBucketStore( sortingStrategy: sortingStrategyStore, } + streamingSeriesLimiter := NewLimiter(0, prometheus.NewCounter(prometheus.CounterOpts{})) + s.streamingSeriesLimiterFactory = func(failedCounter prometheus.Counter) SeriesLimiter { return streamingSeriesLimiter } for _, option := range options { option(s) } diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index e866735d30..c1f7002e8f 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -3650,3 +3650,155 @@ func TestQueryStatsMerge(t *testing.T) { s.merge(o) testutil.Equals(t, e, s) } + +func TestBucketStoreStreamingSeriesLimit(t *testing.T) { + logger := log.NewNopLogger() + tmpDir := t.TempDir() + bktDir := filepath.Join(tmpDir, "bkt") + auxDir := filepath.Join(tmpDir, "aux") + metaDir := filepath.Join(tmpDir, "meta") + extLset := labels.FromStrings("region", "eu-west") + + testutil.Ok(t, os.MkdirAll(metaDir, os.ModePerm)) + testutil.Ok(t, os.MkdirAll(auxDir, os.ModePerm)) + + bkt, err := filesystem.NewBucket(bktDir) + testutil.Ok(t, err) + t.Cleanup(func() { testutil.Ok(t, bkt.Close()) }) + + headOpts := tsdb.DefaultHeadOptions() + headOpts.ChunkDirRoot = tmpDir + headOpts.ChunkRange = 1000 + h, err := tsdb.NewHead(nil, nil, nil, nil, headOpts, nil) + testutil.Ok(t, err) + t.Cleanup(func() { testutil.Ok(t, h.Close()) }) + + app := h.Appender(context.Background()) + _, err = app.Append(0, labels.FromStrings("z", "1"), 0, 1) + testutil.Ok(t, err) + _, err = app.Append(0, labels.FromStrings("z", "2"), 0, 1) + testutil.Ok(t, err) + _, err = app.Append(0, labels.FromStrings("z", "3"), 0, 1) + testutil.Ok(t, err) + _, err = app.Append(0, labels.FromStrings("z", "4"), 0, 1) + testutil.Ok(t, err) + _, err = app.Append(0, labels.FromStrings("z", "5"), 0, 1) + testutil.Ok(t, err) + _, err = app.Append(0, labels.FromStrings("z", "6"), 0, 1) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + + id := createBlockFromHead(t, auxDir, h) + + auxBlockDir := filepath.Join(auxDir, id.String()) + _, err = metadata.InjectThanos(log.NewNopLogger(), auxBlockDir, metadata.Thanos{ + Labels: extLset.Map(), + Downsample: metadata.ThanosDownsample{Resolution: 0}, + Source: metadata.TestSource, + }, nil) + testutil.Ok(t, err) + testutil.Ok(t, block.Upload(context.Background(), logger, bkt, auxBlockDir, metadata.NoneFunc)) + + chunkPool, err := NewDefaultChunkBytesPool(2e5) + testutil.Ok(t, err) + + insBkt := objstore.WithNoopInstr(bkt) 
+ baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt) + metaFetcher, err := block.NewMetaFetcher(logger, 20, insBkt, baseBlockIDsFetcher, metaDir, nil, []block.MetadataFilter{ + block.NewTimePartitionMetaFilter(allowAllFilterConf.MinTime, allowAllFilterConf.MaxTime), + }) + testutil.Ok(t, err) + + bucketStore, err := NewBucketStore( + objstore.WithNoopInstr(bkt), + metaFetcher, + "", + NewChunksLimiterFactory(10e6), + NewSeriesLimiterFactory(10e6), + NewBytesLimiterFactory(10e6), + NewGapBasedPartitioner(PartitionerMaxGapSize), + 20, + true, + DefaultPostingOffsetInMemorySampling, + false, + false, + 1*time.Minute, + WithChunkPool(chunkPool), + WithFilterConfig(allowAllFilterConf), + ) + testutil.Ok(t, err) + t.Cleanup(func() { testutil.Ok(t, bucketStore.Close()) }) + + testutil.Ok(t, bucketStore.SyncBlocks(context.Background())) + + // Vertical sharding enabled and only 4 out of 6 series will be matched. + req := &storepb.SeriesRequest{ + MinTime: timestamp.FromTime(minTime), + MaxTime: timestamp.FromTime(maxTime), + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_NEQ, Name: "z", Value: ""}, + }, + ShardInfo: &storepb.ShardInfo{ + TotalShards: 3, + ShardIndex: 2, + By: true, + Labels: []string{"z"}, + }, + } + srv := newStoreSeriesServer(context.Background()) + testutil.Ok(t, bucketStore.Series(req, srv)) + testutil.Equals(t, 4, len(srv.SeriesSet)) + + // Create another bucket store with a smaller series limiter. 5 < 6 so it hits series limit. + bucketStore2, err := NewBucketStore( + objstore.WithNoopInstr(bkt), + metaFetcher, + "", + NewChunksLimiterFactory(10e6), + NewSeriesLimiterFactory(5), + NewBytesLimiterFactory(10e6), + NewGapBasedPartitioner(PartitionerMaxGapSize), + 20, + true, + DefaultPostingOffsetInMemorySampling, + false, + false, + 1*time.Minute, + WithChunkPool(chunkPool), + WithFilterConfig(allowAllFilterConf), + ) + testutil.Ok(t, err) + t.Cleanup(func() { testutil.Ok(t, bucketStore2.Close()) }) + + testutil.Ok(t, bucketStore2.SyncBlocks(context.Background())) + srv2 := newStoreSeriesServer(context.Background()) + testutil.NotOk(t, bucketStore2.Series(req, srv2)) + + // Create another bucket store with a streaming series limiter set to 5. + // 5 is larger than the actual series matched number 4 so it won't hit series limit. 
+ bucketStore3, err := NewBucketStore( + objstore.WithNoopInstr(bkt), + metaFetcher, + "", + NewChunksLimiterFactory(10e6), + NewSeriesLimiterFactory(0), + NewBytesLimiterFactory(10e6), + NewGapBasedPartitioner(PartitionerMaxGapSize), + 20, + true, + DefaultPostingOffsetInMemorySampling, + false, + false, + 1*time.Minute, + WithChunkPool(chunkPool), + WithFilterConfig(allowAllFilterConf), + WithStreamingSeriesLimiterFactory(NewSeriesLimiterFactory(5)), + ) + testutil.Ok(t, err) + t.Cleanup(func() { testutil.Ok(t, bucketStore3.Close()) }) + + testutil.Ok(t, bucketStore3.SyncBlocks(context.Background())) + srv3 := newStoreSeriesServer(context.Background()) + testutil.Ok(t, bucketStore3.Series(req, srv3)) + testutil.Equals(t, 4, len(srv3.SeriesSet)) +} From 59c3cd08d7a559cf8e5f5ac8ae4adac24161ce83 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Tue, 12 Dec 2023 09:14:52 +0000 Subject: [PATCH 4/5] address comments Signed-off-by: Ben Ye --- CHANGELOG.md | 2 +- cmd/thanos/store.go | 5 --- pkg/store/bucket.go | 63 ++++++++++++----------------- pkg/store/bucket_test.go | 85 ++++++++-------------------------------- 4 files changed, 42 insertions(+), 113 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e0b3e2ff1d..2f6a09133a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,7 +21,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#6925](https://github.com/thanos-io/thanos/pull/6925) Store Gateway: Support float native histogram. - [#6954](https://github.com/thanos-io/thanos/pull/6954) Index Cache: Support tracing for fetch APIs. - [#6943](https://github.com/thanos-io/thanos/pull/6943) Ruler: Added `keep_firing_for` field in alerting rule. -- [#6972](https://github.com/thanos-io/thanos/pull/6972) Store Gateway: Added `--store.streaming-series-limit` to apply series limit when streaming series to apply for series actually matched. +- [#6972](https://github.com/thanos-io/thanos/pull/6972) Store Gateway: Apply series limit when streaming series for series actually matched if lazy postings is enabled. ### Changed diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 56e178f614..9191b77ce8 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -89,7 +89,6 @@ type storeConfig struct { lazyIndexReaderEnabled bool lazyIndexReaderIdleTimeout time.Duration lazyExpandedPostingsEnabled bool - streamingSeriesLimit uint64 } func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) { @@ -187,9 +186,6 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("store.enable-lazy-expanded-postings", "If true, Store Gateway will estimate postings size and try to lazily expand postings if it downloads less data than expanding all postings."). Default("false").BoolVar(&sc.lazyExpandedPostingsEnabled) - cmd.Flag("store.streaming-series-limit", "The maximum series allowed to match when streaming series. The Series/LabelNames/LabelValues call fails if this limit is exceeded. 0 means no limit."). - Hidden().Default("0").Uint64Var(&sc.streamingSeriesLimit) - cmd.Flag("web.disable", "Disable Block Viewer UI.").Default("false").BoolVar(&sc.disableWeb) cmd.Flag("web.external-prefix", "Static prefix for all HTML links and redirect URLs in the bucket web UI interface. Actual endpoints are still served on / or the web.route-prefix. This allows thanos bucket web UI to be served behind a reverse proxy that strips a URL sub-path."). 
@@ -392,7 +388,6 @@ func runStore( return conf.estimatedMaxChunkSize }), store.WithLazyExpandedPostings(conf.lazyExpandedPostingsEnabled), - store.WithStreamingSeriesLimiterFactory(store.NewSeriesLimiterFactory(conf.streamingSeriesLimit)), } if conf.debugLogging { diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index a052db3f47..3ebd6f06a4 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -390,9 +390,6 @@ type BucketStore struct { // seriesLimiterFactory creates a new limiter used to limit the number of touched series by each Series() call, // or LabelName and LabelValues calls when used with matchers. seriesLimiterFactory SeriesLimiterFactory - // streamingSeriesLimiterFactory creates a series limiter but applies series limit for actual matched series - // rather than touched series when stremaing series. Can be useful for vertical sharding or lazy postings scenario. - streamingSeriesLimiterFactory SeriesLimiterFactory // bytesLimiterFactory creates a new limiter used to limit the amount of bytes fetched/touched by each Series() call. bytesLimiterFactory BytesLimiterFactory @@ -534,14 +531,6 @@ func WithDontResort(true bool) BucketStoreOption { } } -func WithStreamingSeriesLimiterFactory(factory SeriesLimiterFactory) BucketStoreOption { - return func(s *BucketStore) { - if true { - s.streamingSeriesLimiterFactory = factory - } - } -} - // NewBucketStore creates a new bucket backed store that implements the store API against // an object store bucket. It is optimized to work against high latency backends. func NewBucketStore( @@ -587,8 +576,6 @@ func NewBucketStore( sortingStrategy: sortingStrategyStore, } - streamingSeriesLimiter := NewLimiter(0, prometheus.NewCounter(prometheus.CounterOpts{})) - s.streamingSeriesLimiterFactory = func(failedCounter prometheus.Counter) SeriesLimiter { return streamingSeriesLimiter } for _, option := range options { option(s) } @@ -966,9 +953,9 @@ type blockSeriesClient struct { chunkr *bucketChunkReader loadAggregates []storepb.Aggr - streamingSeriesLimiter SeriesLimiter - chunksLimiter ChunksLimiter - bytesLimiter BytesLimiter + seriesLimiter SeriesLimiter + chunksLimiter ChunksLimiter + bytesLimiter BytesLimiter lazyExpandedPostingEnabled bool lazyExpandedPostingsCount prometheus.Counter @@ -1001,7 +988,7 @@ func newBlockSeriesClient( logger log.Logger, b *bucketBlock, req *storepb.SeriesRequest, - streamingSeriesLimiter SeriesLimiter, + seriesLimiter SeriesLimiter, chunksLimiter ChunksLimiter, bytesLimiter BytesLimiter, blockMatchers []*labels.Matcher, @@ -1038,7 +1025,7 @@ func newBlockSeriesClient( maxt: req.MaxTime, indexr: b.indexReader(), chunkr: chunkr, - streamingSeriesLimiter: streamingSeriesLimiter, + seriesLimiter: seriesLimiter, chunksLimiter: chunksLimiter, bytesLimiter: bytesLimiter, skipChunks: req.SkipChunks, @@ -1108,20 +1095,21 @@ func (b *blockSeriesClient) ExpandPostings( } b.lazyPostings = ps - // If lazy expanded posting enabled, it is possible to fetch more series - // so easier to hit the series limit. - if err := seriesLimiter.Reserve(uint64(len(ps.postings))); err != nil { - return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded series limit: %s", err) - } - - if b.batchSize > len(ps.postings) { - b.batchSize = len(ps.postings) - } if b.lazyPostings.lazyExpanded() { // Assume lazy expansion could cut actual expanded postings length to 50%. 
b.expandedPostings = make([]storage.SeriesRef, 0, len(b.lazyPostings.postings)/2) b.lazyExpandedPostingsCount.Inc() + } else { + // Apply series limiter eagerly if lazy postings is not enabled. + if err := seriesLimiter.Reserve(uint64(len(ps.postings))); err != nil { + return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded series limit: %s", err) + } + } + + if b.batchSize > len(ps.postings) { + b.batchSize = len(ps.postings) } + b.entries = make([]seriesEntry, 0, b.batchSize) return nil } @@ -1257,9 +1245,11 @@ OUTER: b.entries = append(b.entries, s) } - // Apply series limit before fetching chunks, for actual series matched. - if err := b.streamingSeriesLimiter.Reserve(uint64(seriesMatched)); err != nil { - return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded series limit: %s", err) + if b.lazyPostings.lazyExpanded() { + // Apply series limit before fetching chunks, for actual series matched. + if err := b.seriesLimiter.Reserve(uint64(seriesMatched)); err != nil { + return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded series limit: %s", err) + } } if !b.skipChunks { @@ -1430,9 +1420,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store resHints = &hintspb.SeriesResponseHints{} reqBlockMatchers []*labels.Matcher - chunksLimiter = s.chunksLimiterFactory(s.metrics.queriesDropped.WithLabelValues("chunks", tenant)) - seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series", tenant)) - streamingSeriesLimiter = s.streamingSeriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series", tenant)) + chunksLimiter = s.chunksLimiterFactory(s.metrics.queriesDropped.WithLabelValues("chunks", tenant)) + seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series", tenant)) queryStatsEnabled = false ) @@ -1490,7 +1479,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store s.logger, blk, req, - streamingSeriesLimiter, + seriesLimiter, chunksLimiter, bytesLimiter, sortedBlockMatchers, @@ -1728,7 +1717,6 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq var mtx sync.Mutex var sets [][]string var seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series", tenant)) - var streamingSeriesLimiter = s.streamingSeriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series", tenant)) var bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes", tenant)) for _, b := range s.blocks { @@ -1792,7 +1780,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq s.logger, b, seriesReq, - streamingSeriesLimiter, + seriesLimiter, nil, bytesLimiter, reqSeriesMatchersNoExtLabels, @@ -1930,7 +1918,6 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR var mtx sync.Mutex var sets [][]string var seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series", tenant)) - var streamingSeriesLimiter = s.streamingSeriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series", tenant)) var bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes", tenant)) for _, b := range s.blocks { @@ -1997,7 +1984,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR s.logger, b, seriesReq, - streamingSeriesLimiter, + seriesLimiter, nil, bytesLimiter, reqSeriesMatchersNoExtLabels, diff --git a/pkg/store/bucket_test.go
index c1f7002e8f..e80bc2ada9 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -3674,17 +3674,17 @@ func TestBucketStoreStreamingSeriesLimit(t *testing.T) { t.Cleanup(func() { testutil.Ok(t, h.Close()) }) app := h.Appender(context.Background()) - _, err = app.Append(0, labels.FromStrings("z", "1"), 0, 1) + _, err = app.Append(0, labels.FromStrings("a", "1", "z", "1"), 0, 1) testutil.Ok(t, err) - _, err = app.Append(0, labels.FromStrings("z", "2"), 0, 1) + _, err = app.Append(0, labels.FromStrings("a", "1", "z", "2"), 0, 1) testutil.Ok(t, err) - _, err = app.Append(0, labels.FromStrings("z", "3"), 0, 1) + _, err = app.Append(0, labels.FromStrings("a", "1", "z", "3"), 0, 1) testutil.Ok(t, err) - _, err = app.Append(0, labels.FromStrings("z", "4"), 0, 1) + _, err = app.Append(0, labels.FromStrings("a", "1", "z", "4"), 0, 1) testutil.Ok(t, err) - _, err = app.Append(0, labels.FromStrings("z", "5"), 0, 1) + _, err = app.Append(0, labels.FromStrings("a", "1", "z", "5"), 0, 1) testutil.Ok(t, err) - _, err = app.Append(0, labels.FromStrings("z", "6"), 0, 1) + _, err = app.Append(0, labels.FromStrings("a", "1", "z", "6"), 0, 1) testutil.Ok(t, err) testutil.Ok(t, app.Commit()) @@ -3709,12 +3709,14 @@ func TestBucketStoreStreamingSeriesLimit(t *testing.T) { }) testutil.Ok(t, err) + // Set series limit to 1. Only pass if series limiter applies + // for lazy postings only. bucketStore, err := NewBucketStore( objstore.WithNoopInstr(bkt), metaFetcher, "", NewChunksLimiterFactory(10e6), - NewSeriesLimiterFactory(10e6), + NewSeriesLimiterFactory(2), NewBytesLimiterFactory(10e6), NewGapBasedPartitioner(PartitionerMaxGapSize), 20, @@ -3725,80 +3727,25 @@ func TestBucketStoreStreamingSeriesLimit(t *testing.T) { 1*time.Minute, WithChunkPool(chunkPool), WithFilterConfig(allowAllFilterConf), + WithLazyExpandedPostings(true), + WithBlockEstimatedMaxSeriesFunc(func(_ metadata.Meta) uint64 { + return 1 + }), ) testutil.Ok(t, err) t.Cleanup(func() { testutil.Ok(t, bucketStore.Close()) }) testutil.Ok(t, bucketStore.SyncBlocks(context.Background())) - // Vertical sharding enabled and only 4 out of 6 series will be matched. req := &storepb.SeriesRequest{ MinTime: timestamp.FromTime(minTime), MaxTime: timestamp.FromTime(maxTime), Matchers: []storepb.LabelMatcher{ - {Type: storepb.LabelMatcher_NEQ, Name: "z", Value: ""}, - }, - ShardInfo: &storepb.ShardInfo{ - TotalShards: 3, - ShardIndex: 2, - By: true, - Labels: []string{"z"}, + {Type: storepb.LabelMatcher_EQ, Name: "a", Value: "1"}, + {Type: storepb.LabelMatcher_RE, Name: "z", Value: "1|2"}, }, } srv := newStoreSeriesServer(context.Background()) testutil.Ok(t, bucketStore.Series(req, srv)) - testutil.Equals(t, 4, len(srv.SeriesSet)) - - // Create another bucket store with a smaller series limiter. 5 < 6 so it hits series limit. - bucketStore2, err := NewBucketStore( - objstore.WithNoopInstr(bkt), - metaFetcher, - "", - NewChunksLimiterFactory(10e6), - NewSeriesLimiterFactory(5), - NewBytesLimiterFactory(10e6), - NewGapBasedPartitioner(PartitionerMaxGapSize), - 20, - true, - DefaultPostingOffsetInMemorySampling, - false, - false, - 1*time.Minute, - WithChunkPool(chunkPool), - WithFilterConfig(allowAllFilterConf), - ) - testutil.Ok(t, err) - t.Cleanup(func() { testutil.Ok(t, bucketStore2.Close()) }) - - testutil.Ok(t, bucketStore2.SyncBlocks(context.Background())) - srv2 := newStoreSeriesServer(context.Background()) - testutil.NotOk(t, bucketStore2.Series(req, srv2)) - - // Create another bucket store with a streaming series limiter set to 5. 
- // 5 is larger than the actual series matched number 4 so it won't hit series limit. - bucketStore3, err := NewBucketStore( - objstore.WithNoopInstr(bkt), - metaFetcher, - "", - NewChunksLimiterFactory(10e6), - NewSeriesLimiterFactory(0), - NewBytesLimiterFactory(10e6), - NewGapBasedPartitioner(PartitionerMaxGapSize), - 20, - true, - DefaultPostingOffsetInMemorySampling, - false, - false, - 1*time.Minute, - WithChunkPool(chunkPool), - WithFilterConfig(allowAllFilterConf), - WithStreamingSeriesLimiterFactory(NewSeriesLimiterFactory(5)), - ) - testutil.Ok(t, err) - t.Cleanup(func() { testutil.Ok(t, bucketStore3.Close()) }) - - testutil.Ok(t, bucketStore3.SyncBlocks(context.Background())) - srv3 := newStoreSeriesServer(context.Background()) - testutil.Ok(t, bucketStore3.Series(req, srv3)) - testutil.Equals(t, 4, len(srv3.SeriesSet)) + testutil.Equals(t, 2, len(srv.SeriesSet)) } From db4c52cad5e9a401103cc30af9913be2cf1bd91d Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Tue, 12 Dec 2023 09:19:16 +0000 Subject: [PATCH 5/5] fix comment Signed-off-by: Ben Ye --- pkg/store/bucket_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index e80bc2ada9..87659f5450 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -3709,7 +3709,7 @@ func TestBucketStoreStreamingSeriesLimit(t *testing.T) { }) testutil.Ok(t, err) - // Set series limit to 1. Only pass if series limiter applies + // Set series limit to 2. Only pass if series limiter applies // for lazy postings only. bucketStore, err := NewBucketStore( objstore.WithNoopInstr(bkt),
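Taken together, the final state of this series applies the series limiter at two points: eagerly on all expanded postings when lazy expanded postings are disabled, and per batch on the series actually matched (just before their chunks are fetched) when they are enabled. The standalone Go sketch below illustrates that behavior under simplified assumptions; the seriesLimiter type and the expandPostings/nextBatch helpers are illustrative stand-ins for the SeriesLimiter, ExpandPostings and nextBatch code touched above, not the real Thanos implementation.

package main

import (
	"errors"
	"fmt"
)

// seriesLimiter is a simplified stand-in for a SeriesLimiter: Reserve
// accumulates a running count and fails once the configured limit is
// exceeded (0 means no limit).
type seriesLimiter struct {
	limit    uint64
	reserved uint64
}

func (l *seriesLimiter) Reserve(n uint64) error {
	l.reserved += n
	if l.limit > 0 && l.reserved > l.limit {
		return errors.New("exceeded series limit")
	}
	return nil
}

// expandPostings mirrors the eager path: without lazy expanded postings the
// limit is charged up front for every posting that will be touched.
func expandPostings(lazyPostings bool, postings uint64, l *seriesLimiter) error {
	if !lazyPostings {
		return l.Reserve(postings)
	}
	return nil
}

// nextBatch mirrors the streaming path: with lazy expanded postings only the
// series that actually matched in the current batch are charged, before any
// chunks are fetched for them.
func nextBatch(lazyPostings bool, seriesMatched uint64, l *seriesLimiter) error {
	if lazyPostings {
		return l.Reserve(seriesMatched)
	}
	return nil
}

func main() {
	// Roughly the numbers from TestBucketStoreStreamingSeriesLimit: limit 2,
	// 6 postings touched, only 2 series actually matched.
	lazy := &seriesLimiter{limit: 2}
	fmt.Println(expandPostings(true, 6, lazy)) // <nil>
	fmt.Println(nextBatch(true, 2, lazy))      // <nil>

	// The eager path charges all 6 touched postings and fails immediately.
	eager := &seriesLimiter{limit: 2}
	fmt.Println(expandPostings(false, 6, eager)) // exceeded series limit
}

With these numbers the lazy path admits the request while the eager path rejects it, which is the difference the new test exercises with a series limit of 2.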