store: Implement metadata API limit in stores (#7652)
* Store: Implement metadata API limit in stores

Signed-off-by: 🌲 Harry 🌊 John 🏔 <johrry@amazon.com>

* Apply seriesLimit in nextBatch

Signed-off-by: 🌲 Harry 🌊 John 🏔 <johrry@amazon.com>

---------

Signed-off-by: 🌲 Harry 🌊 John 🏔 <johrry@amazon.com>
harry671003 authored Sep 3, 2024
1 parent 295d8a9 commit 2c488dc
Showing 15 changed files with 639 additions and 171 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -25,6 +25,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#7609](https://github.com/thanos-io/thanos/pull/7609) API: Add limit param to metadata APIs (series, label names, label values).
- [#7429](https://github.com/thanos-io/thanos/pull/7429): Reloader: introduce `TolerateEnvVarExpansionErrors` to allow suppressing errors when expanding environment variables in the configuration file. When set, this will ensure that the reloader won't consider the operation to fail when an unset environment variable is encountered. Note that all unset environment variables are left as is, whereas all set environment variables are expanded as usual.
- [#7560](https://github.com/thanos-io/thanos/pull/7560) Query: Added the possibility of filtering rules by rule_name, rule_group or file to HTTP api.
+ - [#7652](https://github.com/thanos-io/thanos/pull/7652) Store: Implement metadata API limit in stores.

### Changed

9 changes: 6 additions & 3 deletions pkg/promclient/promclient.go
@@ -734,14 +734,15 @@ func (c *Client) get2xxResultWithGRPCErrors(ctx context.Context, spanName string

// SeriesInGRPC returns the labels from Prometheus series API. It uses gRPC errors.
// NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus.
- func (c *Client) SeriesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64) ([]map[string]string, error) {
+ func (c *Client) SeriesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64, limit int) ([]map[string]string, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/series")
q := u.Query()

q.Add("match[]", storepb.PromMatchersToString(matchers...))
q.Add("start", formatTime(timestamp.Time(startTime)))
q.Add("end", formatTime(timestamp.Time(endTime)))
q.Add("limit", strconv.Itoa(limit))
u.RawQuery = q.Encode()

var m struct {
@@ -753,7 +754,7 @@ func (c *Client) SeriesInGRPC(ctx context.Context, base *url.URL, matchers []*la

// LabelNamesInGRPC returns all known label names constrained by the given matchers. It uses gRPC errors.
// NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus.
- func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64) ([]string, error) {
+ func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64, limit int) ([]string, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/labels")
q := u.Query()
@@ -763,6 +764,7 @@ func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, matchers [
}
q.Add("start", formatTime(timestamp.Time(startTime)))
q.Add("end", formatTime(timestamp.Time(endTime)))
q.Add("limit", strconv.Itoa(limit))
u.RawQuery = q.Encode()

var m struct {
@@ -773,7 +775,7 @@ func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, matchers [

// LabelValuesInGRPC returns all known label values for a given label name. It uses gRPC errors.
// NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus.
- func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label string, matchers []*labels.Matcher, startTime, endTime int64) ([]string, error) {
+ func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label string, matchers []*labels.Matcher, startTime, endTime int64, limit int) ([]string, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/label/", label, "/values")
q := u.Query()
@@ -783,6 +785,7 @@ func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label str
}
q.Add("start", formatTime(timestamp.Time(startTime)))
q.Add("end", formatTime(timestamp.Time(endTime)))
q.Add("limit", strconv.Itoa(limit))
u.RawQuery = q.Encode()

var m struct {
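
For illustration, a minimal standalone sketch of the URL-building pattern that all three methods now share. The helper name `buildSeriesURL` is hypothetical, not part of the Thanos client, and a limit of 0 is assumed to be treated as "no limit" on the Prometheus side:

```go
package main

import (
	"fmt"
	"net/url"
	"path"
	"strconv"
)

// buildSeriesURL mirrors the change above: the caller-supplied limit is
// forwarded verbatim to Prometheus as the `limit` query parameter on
// /api/v1/series, alongside the existing match[]/start/end parameters.
func buildSeriesURL(base *url.URL, matcher, start, end string, limit int) string {
	u := *base
	u.Path = path.Join(u.Path, "/api/v1/series")
	q := u.Query()
	q.Add("match[]", matcher)
	q.Add("start", start)
	q.Add("end", end)
	q.Add("limit", strconv.Itoa(limit)) // 0 is assumed to disable the limit server-side
	u.RawQuery = q.Encode()
	return u.String()
}

func main() {
	base, _ := url.Parse("http://localhost:9090")
	fmt.Println(buildSeriesURL(base, `up{job="prometheus"}`, "2024-09-03T00:00:00Z", "2024-09-03T01:00:00Z", 100))
}
```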
16 changes: 14 additions & 2 deletions pkg/query/querier.go
@@ -331,6 +331,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .
req := storepb.SeriesRequest{
MinTime: hints.Start,
MaxTime: hints.End,
+ Limit: int64(hints.Limit),
Matchers: sms,
MaxResolutionWindow: q.maxResolutionMillis,
Aggregates: aggrs,
@@ -373,7 +374,7 @@
}

// LabelValues returns all potential values for a label name.
- func (q *querier) LabelValues(ctx context.Context, name string, _ *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
+ func (q *querier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
span, ctx := tracing.StartSpan(ctx, "querier_label_values")
defer span.Finish()

@@ -384,12 +385,18 @@ func (q *querier) LabelValues(ctx context.Context, name string, _ *storage.Label
if err != nil {
return nil, nil, errors.Wrap(err, "converting prom matchers to storepb matchers")
}

+ if hints == nil {
+ hints = &storage.LabelHints{}
+ }

req := &storepb.LabelValuesRequest{
Label: name,
PartialResponseStrategy: q.partialResponseStrategy,
Start: q.mint,
End: q.maxt,
Matchers: pbMatchers,
+ Limit: int64(hints.Limit),
}

if q.isDedupEnabled() {
@@ -411,7 +418,7 @@ func (q *querier) LabelValues(ctx context.Context, name string, _ *storage.Label

// LabelNames returns all the unique label names present in the block in sorted order constrained
// by the given matchers.
- func (q *querier) LabelNames(ctx context.Context, _ *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
+ func (q *querier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
span, ctx := tracing.StartSpan(ctx, "querier_label_names")
defer span.Finish()

@@ -423,11 +430,16 @@
return nil, nil, errors.Wrap(err, "converting prom matchers to storepb matchers")
}

+ if hints == nil {
+ hints = &storage.LabelHints{}
+ }

req := &storepb.LabelNamesRequest{
PartialResponseStrategy: q.partialResponseStrategy,
Start: q.mint,
End: q.maxt,
Matchers: pbMatchers,
+ Limit: int64(hints.Limit),
}

if q.isDedupEnabled() {
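
A minimal sketch of the defensive nil-guard both label methods now use. The `storage.LabelHints` type is from `github.com/prometheus/prometheus/storage`; the helper name `limitFromHints` is illustrative and not part of the diff:

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/storage"
)

// limitFromHints reads the limit defensively: the storage.Querier interface
// allows callers to pass nil hints, so a zero-value fallback is substituted
// before the limit (0 meaning "no limit") is copied into the gRPC request.
func limitFromHints(hints *storage.LabelHints) int64 {
	if hints == nil {
		hints = &storage.LabelHints{}
	}
	return int64(hints.Limit)
}

func main() {
	fmt.Println(limitFromHints(nil))                           // 0 -> unlimited
	fmt.Println(limitFromHints(&storage.LabelHints{Limit: 5})) // 5
}
```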
38 changes: 33 additions & 5 deletions pkg/store/bucket.go
@@ -1008,6 +1008,7 @@ type blockSeriesClient struct {

mint int64
maxt int64
+ seriesLimit int
indexr *bucketIndexReader
chunkr *bucketChunkReader
loadAggregates []storepb.Aggr
@@ -1083,6 +1084,7 @@ func newBlockSeriesClient(

mint: req.MinTime,
maxt: req.MaxTime,
+ seriesLimit: int(req.Limit),
indexr: b.indexReader(logger),
chunkr: chunkr,
seriesLimiter: seriesLimiter,
@@ -1162,14 +1164,20 @@ func (b *blockSeriesClient) ExpandPostings(
b.expandedPostings = make([]storage.SeriesRef, 0, len(b.lazyPostings.postings)/2)
b.lazyExpandedPostingsCount.Inc()
} else {
+ // If seriesLimit is set, it can be applied here to limit the number of series.
+ // Note: This can only be done when postings are not expanded lazily.
+ if b.seriesLimit > 0 && len(b.lazyPostings.postings) > b.seriesLimit {
+ b.lazyPostings.postings = b.lazyPostings.postings[:b.seriesLimit]
+ }

// Apply series limiter eagerly if lazy postings are not enabled.
- if err := seriesLimiter.Reserve(uint64(len(ps.postings))); err != nil {
+ if err := seriesLimiter.Reserve(uint64(len(b.lazyPostings.postings))); err != nil {
return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded series limit: %s", err)
}
}

- if b.batchSize > len(ps.postings) {
- b.batchSize = len(ps.postings)
+ if b.batchSize > len(b.lazyPostings.postings) {
+ b.batchSize = len(b.lazyPostings.postings)
}

b.entries = make([]seriesEntry, 0, b.batchSize)
@@ -1291,6 +1299,11 @@ OUTER:
}

seriesMatched++
+ if b.seriesLimit > 0 && seriesMatched > b.seriesLimit {
+ // Exit early once the series limit is reached.
+ b.hasMorePostings = false
+ break
+ }
s := seriesEntry{lset: completeLabelset}
if b.skipChunks {
b.entries = append(b.entries, s)
@@ -1694,7 +1707,12 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
tracing.DoInSpan(ctx, "bucket_store_merge_all", func(ctx context.Context) {
begin := time.Now()
set := NewResponseDeduplicator(NewProxyResponseLoserTree(respSets...))
+ i := 0
for set.Next() {
+ i++
+ if req.Limit > 0 && i > int(req.Limit) {
+ break
+ }
at := set.At()
warn := at.GetWarning()
if warn != "" {
@@ -1945,8 +1963,13 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
return nil, status.Error(codes.Unknown, errors.Wrap(err, "marshal label names response hints").Error())
}

+ names := strutil.MergeSlices(sets...)
+ if req.Limit > 0 && len(names) > int(req.Limit) {
+ names = names[:req.Limit]
+ }

return &storepb.LabelNamesResponse{
- Names: strutil.MergeSlices(sets...),
+ Names: names,
Hints: anyHints,
}, nil
}
@@ -2160,8 +2183,13 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
return nil, status.Error(codes.Unknown, errors.Wrap(err, "marshal label values response hints").Error())
}

+ vals := strutil.MergeSlices(sets...)
+ if req.Limit > 0 && len(vals) > int(req.Limit) {
+ vals = vals[:req.Limit]
+ }

return &storepb.LabelValuesResponse{
- Values: strutil.MergeSlices(sets...),
+ Values: vals,
Hints: anyHints,
}, nil
}
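
Both label endpoints above merge the per-block result sets first and truncate second, which keeps the truncated output sorted and deterministic no matter how values are spread across blocks. A minimal stdlib-only sketch of that merge-then-truncate pattern, with a sort-and-dedup loop standing in for Thanos' `strutil.MergeSlices` (the name `mergeWithLimit` is illustrative):

```go
package main

import (
	"fmt"
	"sort"
)

// mergeWithLimit merges several result sets into one sorted, deduplicated
// slice and only then applies the limit; limit <= 0 disables truncation,
// matching how req.Limit is interpreted in the handlers above.
func mergeWithLimit(limit int, sets ...[]string) []string {
	seen := map[string]struct{}{}
	merged := []string{}
	for _, set := range sets {
		for _, v := range set {
			if _, ok := seen[v]; !ok {
				seen[v] = struct{}{}
				merged = append(merged, v)
			}
		}
	}
	sort.Strings(merged)
	if limit > 0 && len(merged) > limit {
		merged = merged[:limit]
	}
	return merged
}

func main() {
	fmt.Println(mergeWithLimit(3, []string{"b", "a"}, []string{"c", "a", "d"}))
	// Output: [a b c]
}
```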
135 changes: 135 additions & 0 deletions pkg/store/bucket_test.go
@@ -3947,3 +3947,138 @@ func (m *compositeBytesLimiterMock) ReserveWithType(num uint64, dataType StoreDa
}
return nil
}

func TestBucketStoreMetadataLimit(t *testing.T) {
tb := testutil.NewTB(t)

tmpDir := t.TempDir()

bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt"))
testutil.Ok(tb, err)
defer func() { testutil.Ok(tb, bkt.Close()) }()

uploadTestBlock(tb, tmpDir, bkt, 30000)

instrBkt := objstore.WithNoopInstr(bkt)
logger := log.NewNopLogger()

// Instantiate a real bucket store we'll use to query the series.
baseBlockIDsFetcher := block.NewConcurrentLister(logger, instrBkt)
fetcher, err := block.NewMetaFetcher(logger, 10, instrBkt, baseBlockIDsFetcher, tmpDir, nil, nil)
testutil.Ok(tb, err)

indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(logger, nil, nil, storecache.InMemoryIndexCacheConfig{})
testutil.Ok(tb, err)

store, err := NewBucketStore(
instrBkt,
fetcher,
tmpDir,
NewChunksLimiterFactory(0),
NewSeriesLimiterFactory(0),
NewBytesLimiterFactory(0),
NewGapBasedPartitioner(PartitionerMaxGapSize),
10,
false,
DefaultPostingOffsetInMemorySampling,
true,
false,
0,
WithLogger(logger),
WithIndexCache(indexCache),
)
testutil.Ok(tb, err)
testutil.Ok(tb, store.SyncBlocks(context.Background()))

seriesTests := map[string]struct {
limit int64
expectedResults int
}{
"series without limit": {
expectedResults: 12000,
},
"series with limit": {
limit: 11000,
expectedResults: 11000,
},
}

for testName, testData := range seriesTests {
t.Run(testName, func(t *testing.T) {
req := &storepb.SeriesRequest{
MinTime: timestamp.FromTime(minTime),
MaxTime: timestamp.FromTime(maxTime),
Limit: testData.limit,
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "j", Value: "foo"},
},
}

srv := newStoreSeriesServer(context.Background())
err = store.Series(req, srv)
testutil.Ok(t, err)
testutil.Assert(t, len(srv.SeriesSet) == testData.expectedResults)
})
}

labelNamesTests := map[string]struct {
limit int64
expectedResults []string
}{
"label names without limit": {
expectedResults: []string{"ext1", "i", "j", "n", "uniq"},
},
"label names with limit": {
limit: 3,
expectedResults: []string{"ext1", "i", "j"},
},
}

for testName, testData := range labelNamesTests {
t.Run(testName, func(t *testing.T) {
req := &storepb.LabelNamesRequest{
Start: timestamp.FromTime(minTime),
End: timestamp.FromTime(maxTime),
Limit: testData.limit,
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "j", Value: "foo"},
},
}

resp, err := store.LabelNames(context.Background(), req)
testutil.Ok(t, err)
testutil.Equals(t, testData.expectedResults, resp.Names)
})
}

labelValuesTests := map[string]struct {
limit int64
expectedResults []string
}{
"label values without limit": {
expectedResults: []string{"bar", "foo"},
},
"label values with limit": {
limit: 1,
expectedResults: []string{"bar"},
},
}

for testName, testData := range labelValuesTests {
t.Run(testName, func(t *testing.T) {
req := &storepb.LabelValuesRequest{
Start: timestamp.FromTime(minTime),
End: timestamp.FromTime(maxTime),
Label: "j",
Limit: testData.limit,
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_RE, Name: "j", Value: "(foo|bar)"},
},
}

resp, err := store.LabelValues(context.Background(), req)
testutil.Ok(t, err)
testutil.Equals(t, testData.expectedResults, resp.Values)
})
}
}