store: Implement metadata API limit in stores #7652

Merged: 2 commits, Sep 3, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -25,6 +25,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#7609](https://github.com/thanos-io/thanos/pull/7609) API: Add limit param to metadata APIs (series, label names, label values).
- [#7429](https://github.com/thanos-io/thanos/pull/7429): Reloader: introduce `TolerateEnvVarExpansionErrors` to allow suppressing errors when expanding environment variables in the configuration file. When set, this will ensure that the reloader won't consider the operation to fail when an unset environment variable is encountered. Note that all unset environment variables are left as is, whereas all set environment variables are expanded as usual.
- [#7560](https://github.com/thanos-io/thanos/pull/7560) Query: Added the possibility of filtering rules by rule_name, rule_group or file to HTTP api.
+- [#7652](https://github.com/thanos-io/thanos/pull/7652) Store: Implement metadata API limit in stores.

### Changed

9 changes: 6 additions & 3 deletions pkg/promclient/promclient.go
@@ -734,14 +734,15 @@ func (c *Client) get2xxResultWithGRPCErrors(ctx context.Context, spanName string

// SeriesInGRPC returns the labels from Prometheus series API. It uses gRPC errors.
// NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus.
-func (c *Client) SeriesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64) ([]map[string]string, error) {
+func (c *Client) SeriesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64, limit int) ([]map[string]string, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/series")
q := u.Query()

q.Add("match[]", storepb.PromMatchersToString(matchers...))
q.Add("start", formatTime(timestamp.Time(startTime)))
q.Add("end", formatTime(timestamp.Time(endTime)))
q.Add("limit", strconv.Itoa(limit))
u.RawQuery = q.Encode()

var m struct {
@@ -753,7 +754,7 @@ func (c *Client) SeriesInGRPC(ctx context.Context, base *url.URL, matchers []*la

// LabelNamesInGRPC returns all known label names constrained by the given matchers. It uses gRPC errors.
// NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus.
-func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64) ([]string, error) {
+func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64, limit int) ([]string, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/labels")
q := u.Query()
@@ -763,6 +764,7 @@ func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, matchers [
}
q.Add("start", formatTime(timestamp.Time(startTime)))
q.Add("end", formatTime(timestamp.Time(endTime)))
q.Add("limit", strconv.Itoa(limit))
u.RawQuery = q.Encode()

var m struct {
@@ -773,7 +775,7 @@ func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, matchers [

// LabelValuesInGRPC returns all known label values for a given label name. It uses gRPC errors.
// NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus.
-func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label string, matchers []*labels.Matcher, startTime, endTime int64) ([]string, error) {
+func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label string, matchers []*labels.Matcher, startTime, endTime int64, limit int) ([]string, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/label/", label, "/values")
q := u.Query()
@@ -783,6 +785,7 @@ func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label str
}
q.Add("start", formatTime(timestamp.Time(startTime)))
q.Add("end", formatTime(timestamp.Time(endTime)))
q.Add("limit", strconv.Itoa(limit))
u.RawQuery = q.Encode()

var m struct {
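As a hedged illustration of what these client changes produce on the wire (not part of the diff; the host, matcher, time range, and limit value are invented for the example), the series call now issues a request like the following, assuming a Prometheus server whose metadata APIs accept a `limit` parameter:

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

func main() {
	// Mirror the URL construction in SeriesInGRPC above, with the new
	// limit parameter appended. All concrete values here are illustrative.
	u, _ := url.Parse("http://prometheus:9090/api/v1/series")
	q := u.Query()
	q.Add("match[]", `{j="foo"}`)
	q.Add("start", "2024-09-03T00:00:00.000Z")
	q.Add("end", "2024-09-03T06:00:00.000Z")
	q.Add("limit", strconv.Itoa(100))
	u.RawQuery = q.Encode()

	// Prints the encoded query, e.g.:
	// http://prometheus:9090/api/v1/series?end=...&limit=100&match%5B%5D=%7Bj%3D%22foo%22%7D&start=...
	fmt.Println(u.String())
}
```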
16 changes: 14 additions & 2 deletions pkg/query/querier.go
@@ -331,6 +331,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .
req := storepb.SeriesRequest{
MinTime: hints.Start,
MaxTime: hints.End,
+Limit: int64(hints.Limit),
Matchers: sms,
MaxResolutionWindow: q.maxResolutionMillis,
Aggregates: aggrs,
@@ -373,7 +374,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .
}

// LabelValues returns all potential values for a label name.
-func (q *querier) LabelValues(ctx context.Context, name string, _ *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
+func (q *querier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
span, ctx := tracing.StartSpan(ctx, "querier_label_values")
defer span.Finish()

@@ -384,12 +385,18 @@ func (q *querier) LabelValues(ctx context.Context, name string, _ *storage.Label
if err != nil {
return nil, nil, errors.Wrap(err, "converting prom matchers to storepb matchers")
}

+if hints == nil {
+hints = &storage.LabelHints{}
+}

req := &storepb.LabelValuesRequest{
Label: name,
PartialResponseStrategy: q.partialResponseStrategy,
Start: q.mint,
End: q.maxt,
Matchers: pbMatchers,
+Limit: int64(hints.Limit),
}

if q.isDedupEnabled() {
@@ -411,7 +418,7 @@ func (q *querier) LabelValues(ctx context.Context, name string, _ *storage.Label

// LabelNames returns all the unique label names present in the block in sorted order constrained
// by the given matchers.
-func (q *querier) LabelNames(ctx context.Context, _ *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
+func (q *querier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
span, ctx := tracing.StartSpan(ctx, "querier_label_names")
defer span.Finish()

@@ -423,11 +430,16 @@ func (q *querier) LabelNames(ctx context.Context, _ *storage.LabelHints, matcher
return nil, nil, errors.Wrap(err, "converting prom matchers to storepb matchers")
}

+if hints == nil {
+hints = &storage.LabelHints{}
+}

req := &storepb.LabelNamesRequest{
PartialResponseStrategy: q.partialResponseStrategy,
Start: q.mint,
End: q.maxt,
Matchers: pbMatchers,
+Limit: int64(hints.Limit),
}

if q.isDedupEnabled() {
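For context, a minimal caller sketch (not from the PR; the helper name, matcher, and limit value are assumptions) showing how a limit reaches these methods through Prometheus's storage.LabelHints. Passing nil hints instead falls through the nil guard added above and leaves the limit unset:

```go
import (
	"context"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage"
)

// labelNamesWithLimit is an illustrative helper: q may be the Thanos querier
// above, since it satisfies Prometheus's storage.LabelQuerier interface.
func labelNamesWithLimit(ctx context.Context, q storage.LabelQuerier, limit int) ([]string, error) {
	hints := &storage.LabelHints{Limit: limit}
	matcher := labels.MustNewMatcher(labels.MatchEqual, "j", "foo")
	names, _, err := q.LabelNames(ctx, hints, matcher)
	if err != nil {
		return nil, err
	}
	return names, nil // at most `limit` names when limit > 0
}
```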
38 changes: 33 additions & 5 deletions pkg/store/bucket.go
@@ -1008,6 +1008,7 @@ type blockSeriesClient struct {

mint int64
maxt int64
+seriesLimit int
indexr *bucketIndexReader
chunkr *bucketChunkReader
loadAggregates []storepb.Aggr
@@ -1083,6 +1084,7 @@ func newBlockSeriesClient(

mint: req.MinTime,
maxt: req.MaxTime,
+seriesLimit: int(req.Limit),
indexr: b.indexReader(logger),
chunkr: chunkr,
seriesLimiter: seriesLimiter,
@@ -1162,14 +1164,20 @@ func (b *blockSeriesClient) ExpandPostings(
b.expandedPostings = make([]storage.SeriesRef, 0, len(b.lazyPostings.postings)/2)
b.lazyExpandedPostingsCount.Inc()
} else {
+// If seriesLimit is set, it can be applied here to limit the number of series.
+// Note: this can only be done when postings are not expanded lazily.
+if b.seriesLimit > 0 && len(b.lazyPostings.postings) > b.seriesLimit {
+b.lazyPostings.postings = b.lazyPostings.postings[:b.seriesLimit]
+}

// Apply series limiter eagerly if lazy postings are not enabled.
-if err := seriesLimiter.Reserve(uint64(len(ps.postings))); err != nil {
+if err := seriesLimiter.Reserve(uint64(len(b.lazyPostings.postings))); err != nil {
return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded series limit: %s", err)
}
}

-if b.batchSize > len(ps.postings) {
-b.batchSize = len(ps.postings)
+if b.batchSize > len(b.lazyPostings.postings) {
+b.batchSize = len(b.lazyPostings.postings)
}

b.entries = make([]seriesEntry, 0, b.batchSize)
@@ -1291,6 +1299,11 @@ OUTER:
}

seriesMatched++
+if b.seriesLimit > 0 && seriesMatched > b.seriesLimit {
+// Exit early once the series limit is reached.
+b.hasMorePostings = false
+break
+}
s := seriesEntry{lset: completeLabelset}
if b.skipChunks {
b.entries = append(b.entries, s)
@@ -1694,7 +1707,12 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
tracing.DoInSpan(ctx, "bucket_store_merge_all", func(ctx context.Context) {
begin := time.Now()
set := NewResponseDeduplicator(NewProxyResponseLoserTree(respSets...))
+i := 0
for set.Next() {
+i++
+if req.Limit > 0 && i > int(req.Limit) {
+break
+}
at := set.At()
warn := at.GetWarning()
if warn != "" {
@@ -1945,8 +1963,13 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
return nil, status.Error(codes.Unknown, errors.Wrap(err, "marshal label names response hints").Error())
}

+names := strutil.MergeSlices(sets...)

Contributor: Do we think it is worth it to pass the limit into strutil.MergeSlices and truncate there?

func MergeSlices(a ...[]string) []string {
	if len(a) == 0 {
		return nil
	}
	if len(a) == 1 {
		return a[0]
	}
	l := len(a) / 2
	return mergeTwoStringSlices(MergeSlices(a[:l]...), MergeSlices(a[l:]...))
}

For example, if the limit is < len(a) / 2, we can skip merging the second half of the slices.

Contributor: But this is not a blocker; we can do it in a follow-up PR.

Contributor (Author): Thanks Ben. I can look into it. Since this PR has gotten too big, I prefer to do it in a follow-up PR. :)
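To make the suggestion concrete, here is a minimal sketch under stated assumptions (a hypothetical MergeSlicesWithLimit and its helpers; none of this is in the PR). Because every input slice is sorted, the smallest limit elements of the union can only come from the smallest limit elements of each half, so each intermediate result can be truncated without changing the final answer:

```go
// MergeSlicesWithLimit is a hypothetical limit-aware variant of
// strutil.MergeSlices. Inputs must be sorted; the result is the sorted,
// deduplicated union truncated to limit (limit <= 0 means unlimited).
func MergeSlicesWithLimit(limit int, a ...[]string) []string {
	if len(a) == 0 {
		return nil
	}
	if len(a) == 1 {
		return truncate(a[0], limit)
	}
	l := len(a) / 2
	merged := mergeTwo(
		MergeSlicesWithLimit(limit, a[:l]...),
		MergeSlicesWithLimit(limit, a[l:]...),
	)
	// Truncating here bounds the size of every intermediate slice, so no
	// merge step ever processes more than 2*limit elements.
	return truncate(merged, limit)
}

func truncate(s []string, limit int) []string {
	if limit > 0 && len(s) > limit {
		return s[:limit]
	}
	return s
}

// mergeTwo merges two sorted slices, dropping duplicates across them.
func mergeTwo(a, b []string) []string {
	res := make([]string, 0, len(a)+len(b))
	i, j := 0, 0
	for i < len(a) && j < len(b) {
		switch {
		case a[i] < b[j]:
			res = append(res, a[i])
			i++
		case a[i] > b[j]:
			res = append(res, b[j])
			j++
		default:
			res = append(res, a[i])
			i++
			j++
		}
	}
	res = append(res, a[i:]...)
	return append(res, b[j:]...)
}
```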

+if req.Limit > 0 && len(names) > int(req.Limit) {
+names = names[:req.Limit]
+}

return &storepb.LabelNamesResponse{
-Names: strutil.MergeSlices(sets...),
+Names: names,
Hints: anyHints,
}, nil
}
@@ -2160,8 +2183,13 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
return nil, status.Error(codes.Unknown, errors.Wrap(err, "marshal label values response hints").Error())
}

+vals := strutil.MergeSlices(sets...)
+if req.Limit > 0 && len(vals) > int(req.Limit) {
+vals = vals[:req.Limit]
+}

return &storepb.LabelValuesResponse{
-Values: strutil.MergeSlices(sets...),
+Values: vals,
Hints: anyHints,
}, nil
}
135 changes: 135 additions & 0 deletions pkg/store/bucket_test.go
@@ -3947,3 +3947,138 @@ func (m *compositeBytesLimiterMock) ReserveWithType(num uint64, dataType StoreDa
}
return nil
}

func TestBucketStoreMetadataLimit(t *testing.T) {
tb := testutil.NewTB(t)

tmpDir := t.TempDir()

bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt"))
testutil.Ok(tb, err)
defer func() { testutil.Ok(tb, bkt.Close()) }()

uploadTestBlock(tb, tmpDir, bkt, 30000)

instrBkt := objstore.WithNoopInstr(bkt)
logger := log.NewNopLogger()

// Instantiate a real bucket store we'll use to query the series.
baseBlockIDsFetcher := block.NewConcurrentLister(logger, instrBkt)
fetcher, err := block.NewMetaFetcher(logger, 10, instrBkt, baseBlockIDsFetcher, tmpDir, nil, nil)
testutil.Ok(tb, err)

indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(logger, nil, nil, storecache.InMemoryIndexCacheConfig{})
testutil.Ok(tb, err)

store, err := NewBucketStore(
instrBkt,
fetcher,
tmpDir,
NewChunksLimiterFactory(0),
NewSeriesLimiterFactory(0),
NewBytesLimiterFactory(0),
NewGapBasedPartitioner(PartitionerMaxGapSize),
10,
false,
DefaultPostingOffsetInMemorySampling,
true,
false,
0,
WithLogger(logger),
WithIndexCache(indexCache),
)
testutil.Ok(tb, err)
testutil.Ok(tb, store.SyncBlocks(context.Background()))

seriesTests := map[string]struct {
limit int64
expectedResults int
}{
"series without limit": {
expectedResults: 12000,
},
"series with limit": {
limit: 11000,
expectedResults: 11000,
},
}

for testName, testData := range seriesTests {
t.Run(testName, func(t *testing.T) {
req := &storepb.SeriesRequest{
MinTime: timestamp.FromTime(minTime),
MaxTime: timestamp.FromTime(maxTime),
Limit: testData.limit,
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "j", Value: "foo"},
},
}

srv := newStoreSeriesServer(context.Background())
err = store.Series(req, srv)
testutil.Ok(t, err)
testutil.Assert(t, len(srv.SeriesSet) == testData.expectedResults)
})
}

labelNamesTests := map[string]struct {
limit int64
expectedResults []string
}{
"label names without limit": {
expectedResults: []string{"ext1", "i", "j", "n", "uniq"},
},
"label names with limit": {
limit: 3,
expectedResults: []string{"ext1", "i", "j"},
},
}

for testName, testData := range labelNamesTests {
t.Run(testName, func(t *testing.T) {
req := &storepb.LabelNamesRequest{
Start: timestamp.FromTime(minTime),
End: timestamp.FromTime(maxTime),
Limit: testData.limit,
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "j", Value: "foo"},
},
}

resp, err := store.LabelNames(context.Background(), req)
testutil.Ok(t, err)
testutil.Equals(t, testData.expectedResults, resp.Names)
})
}

labelValuesTests := map[string]struct {
limit int64
expectedResults []string
}{
"label values without limit": {
expectedResults: []string{"bar", "foo"},
},
"label values with limit": {
limit: 1,
expectedResults: []string{"bar"},
},
}

for testName, testData := range labelValuesTests {
t.Run(testName, func(t *testing.T) {
req := &storepb.LabelValuesRequest{
Start: timestamp.FromTime(minTime),
End: timestamp.FromTime(maxTime),
Label: "j",
Limit: testData.limit,
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_RE, Name: "j", Value: "(foo|bar)"},
},
}

resp, err := store.LabelValues(context.Background(), req)
testutil.Ok(t, err)
testutil.Equals(t, testData.expectedResults, resp.Values)
})
}
}