Skip to content

Commit

Permalink
store: Implement metadata API limit in stores (thanos-io#7652)
Browse files Browse the repository at this point in the history
* Store: Implement metadata API limit in stores

Signed-off-by: 🌲 Harry 🌊 John 🏔 <johrry@amazon.com>

* Apply seriesLimit in nextBatch

Signed-off-by: 🌲 Harry 🌊 John 🏔 <johrry@amazon.com>

---------

Signed-off-by: 🌲 Harry 🌊 John 🏔 <johrry@amazon.com>
  • Loading branch information
harry671003 authored and jnyi committed Oct 16, 2024
1 parent 7a79f58 commit e52720b
Show file tree
Hide file tree
Showing 14 changed files with 638 additions and 171 deletions.
9 changes: 6 additions & 3 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -734,14 +734,15 @@ func (c *Client) get2xxResultWithGRPCErrors(ctx context.Context, spanName string

// SeriesInGRPC returns the labels from Prometheus series API. It uses gRPC errors.
// NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus.
func (c *Client) SeriesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64) ([]map[string]string, error) {
func (c *Client) SeriesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64, limit int) ([]map[string]string, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/series")
q := u.Query()

q.Add("match[]", storepb.PromMatchersToString(matchers...))
q.Add("start", formatTime(timestamp.Time(startTime)))
q.Add("end", formatTime(timestamp.Time(endTime)))
q.Add("limit", strconv.Itoa(limit))
u.RawQuery = q.Encode()

var m struct {
Expand All @@ -753,7 +754,7 @@ func (c *Client) SeriesInGRPC(ctx context.Context, base *url.URL, matchers []*la

// LabelNamesInGRPC returns all known label names constrained by the given matchers. It uses gRPC errors.
// NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus.
func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64) ([]string, error) {
func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, matchers []*labels.Matcher, startTime, endTime int64, limit int) ([]string, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/labels")
q := u.Query()
Expand All @@ -763,6 +764,7 @@ func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, matchers [
}
q.Add("start", formatTime(timestamp.Time(startTime)))
q.Add("end", formatTime(timestamp.Time(endTime)))
q.Add("limit", strconv.Itoa(limit))
u.RawQuery = q.Encode()

var m struct {
Expand All @@ -773,7 +775,7 @@ func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, matchers [

// LabelValuesInGRPC returns all known label values for a given label name. It uses gRPC errors.
// NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus.
func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label string, matchers []*labels.Matcher, startTime, endTime int64) ([]string, error) {
func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label string, matchers []*labels.Matcher, startTime, endTime int64, limit int) ([]string, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/label/", label, "/values")
q := u.Query()
Expand All @@ -783,6 +785,7 @@ func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label str
}
q.Add("start", formatTime(timestamp.Time(startTime)))
q.Add("end", formatTime(timestamp.Time(endTime)))
q.Add("limit", strconv.Itoa(limit))
u.RawQuery = q.Encode()

var m struct {
Expand Down
16 changes: 14 additions & 2 deletions pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .
req := storepb.SeriesRequest{
MinTime: hints.Start,
MaxTime: hints.End,
Limit: int64(hints.Limit),
Matchers: sms,
MaxResolutionWindow: q.maxResolutionMillis,
Aggregates: aggrs,
Expand Down Expand Up @@ -446,7 +447,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .
}

// LabelValues returns all potential values for a label name.
func (q *querier) LabelValues(ctx context.Context, name string, _ *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
func (q *querier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
span, ctx := tracing.StartSpan(ctx, "querier_label_values")
defer span.Finish()

Expand All @@ -457,12 +458,18 @@ func (q *querier) LabelValues(ctx context.Context, name string, _ *storage.Label
if err != nil {
return nil, nil, errors.Wrap(err, "converting prom matchers to storepb matchers")
}

if hints == nil {
hints = &storage.LabelHints{}
}

req := &storepb.LabelValuesRequest{
Label: name,
PartialResponseStrategy: q.partialResponseStrategy,
Start: q.mint,
End: q.maxt,
Matchers: pbMatchers,
Limit: int64(hints.Limit),
}

if q.isDedupEnabled() {
Expand All @@ -484,7 +491,7 @@ func (q *querier) LabelValues(ctx context.Context, name string, _ *storage.Label

// LabelNames returns all the unique label names present in the block in sorted order constrained
// by the given matchers.
func (q *querier) LabelNames(ctx context.Context, _ *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
func (q *querier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
span, ctx := tracing.StartSpan(ctx, "querier_label_names")
defer span.Finish()

Expand All @@ -496,11 +503,16 @@ func (q *querier) LabelNames(ctx context.Context, _ *storage.LabelHints, matcher
return nil, nil, errors.Wrap(err, "converting prom matchers to storepb matchers")
}

if hints == nil {
hints = &storage.LabelHints{}
}

req := &storepb.LabelNamesRequest{
PartialResponseStrategy: q.partialResponseStrategy,
Start: q.mint,
End: q.maxt,
Matchers: pbMatchers,
Limit: int64(hints.Limit),
}

if q.isDedupEnabled() {
Expand Down
38 changes: 33 additions & 5 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1018,6 +1018,7 @@ type blockSeriesClient struct {

mint int64
maxt int64
seriesLimit int
indexr *bucketIndexReader
chunkr *bucketChunkReader
loadAggregates []storepb.Aggr
Expand Down Expand Up @@ -1093,6 +1094,7 @@ func newBlockSeriesClient(

mint: req.MinTime,
maxt: req.MaxTime,
seriesLimit: int(req.Limit),
indexr: b.indexReader(logger),
chunkr: chunkr,
seriesLimiter: seriesLimiter,
Expand Down Expand Up @@ -1172,14 +1174,20 @@ func (b *blockSeriesClient) ExpandPostings(
b.expandedPostings = make([]storage.SeriesRef, 0, len(b.lazyPostings.postings)/2)
b.lazyExpandedPostingsCount.Inc()
} else {
// If seriesLimit is set, it can be applied here to limit the amount of series.
// Note: This can only be done when postings are not expanded lazily.
if b.seriesLimit > 0 && len(b.lazyPostings.postings) > b.seriesLimit {
b.lazyPostings.postings = b.lazyPostings.postings[:b.seriesLimit]
}

// Apply series limiter eargerly if lazy postings not enabled.
if err := seriesLimiter.Reserve(uint64(len(ps.postings))); err != nil {
if err := seriesLimiter.Reserve(uint64(len(b.lazyPostings.postings))); err != nil {
return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded series limit: %s", err)
}
}

if b.batchSize > len(ps.postings) {
b.batchSize = len(ps.postings)
if b.batchSize > len(b.lazyPostings.postings) {
b.batchSize = len(b.lazyPostings.postings)
}

b.entries = make([]seriesEntry, 0, b.batchSize)
Expand Down Expand Up @@ -1301,6 +1309,11 @@ OUTER:
}

seriesMatched++
if b.seriesLimit > 0 && seriesMatched > b.seriesLimit {
// Exit early if seriesLimit is set.
b.hasMorePostings = false
break
}
s := seriesEntry{lset: completeLabelset}
if b.skipChunks {
b.entries = append(b.entries, s)
Expand Down Expand Up @@ -1704,7 +1717,12 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
tracing.DoInSpan(ctx, "bucket_store_merge_all", func(ctx context.Context) {
begin := time.Now()
set := NewResponseDeduplicator(NewProxyResponseLoserTree(respSets...))
i := 0
for set.Next() {
i++
if req.Limit > 0 && i > int(req.Limit) {
break
}
at := set.At()
warn := at.GetWarning()
if warn != "" {
Expand Down Expand Up @@ -1955,8 +1973,13 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
return nil, status.Error(codes.Unknown, errors.Wrap(err, "marshal label names response hints").Error())
}

names := strutil.MergeSlices(sets...)
if req.Limit > 0 && len(names) > int(req.Limit) {
names = names[:req.Limit]
}

return &storepb.LabelNamesResponse{
Names: strutil.MergeSlices(sets...),
Names: names,
Hints: anyHints,
}, nil
}
Expand Down Expand Up @@ -2170,8 +2193,13 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
return nil, status.Error(codes.Unknown, errors.Wrap(err, "marshal label values response hints").Error())
}

vals := strutil.MergeSlices(sets...)
if req.Limit > 0 && len(vals) > int(req.Limit) {
vals = vals[:req.Limit]
}

return &storepb.LabelValuesResponse{
Values: strutil.MergeSlices(sets...),
Values: vals,
Hints: anyHints,
}, nil
}
Expand Down
135 changes: 135 additions & 0 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3947,3 +3947,138 @@ func (m *compositeBytesLimiterMock) ReserveWithType(num uint64, dataType StoreDa
}
return nil
}

func TestBucketStoreMetadataLimit(t *testing.T) {
tb := testutil.NewTB(t)

tmpDir := t.TempDir()

bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt"))
testutil.Ok(tb, err)
defer func() { testutil.Ok(tb, bkt.Close()) }()

uploadTestBlock(tb, tmpDir, bkt, 30000)

instrBkt := objstore.WithNoopInstr(bkt)
logger := log.NewNopLogger()

// Instance a real bucket store we'll use to query the series.
baseBlockIDsFetcher := block.NewConcurrentLister(logger, instrBkt)
fetcher, err := block.NewMetaFetcher(logger, 10, instrBkt, baseBlockIDsFetcher, tmpDir, nil, nil)
testutil.Ok(tb, err)

indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(logger, nil, nil, storecache.InMemoryIndexCacheConfig{})
testutil.Ok(tb, err)

store, err := NewBucketStore(
instrBkt,
fetcher,
tmpDir,
NewChunksLimiterFactory(0),
NewSeriesLimiterFactory(0),
NewBytesLimiterFactory(0),
NewGapBasedPartitioner(PartitionerMaxGapSize),
10,
false,
DefaultPostingOffsetInMemorySampling,
true,
false,
0,
WithLogger(logger),
WithIndexCache(indexCache),
)
testutil.Ok(tb, err)
testutil.Ok(tb, store.SyncBlocks(context.Background()))

seriesTests := map[string]struct {
limit int64
expectedResults int
}{
"series without limit": {
expectedResults: 12000,
},
"series with limit": {
limit: 11000,
expectedResults: 11000,
},
}

for testName, testData := range seriesTests {
t.Run(testName, func(t *testing.T) {
req := &storepb.SeriesRequest{
MinTime: timestamp.FromTime(minTime),
MaxTime: timestamp.FromTime(maxTime),
Limit: testData.limit,
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "j", Value: "foo"},
},
}

srv := newStoreSeriesServer(context.Background())
err = store.Series(req, srv)
testutil.Ok(t, err)
testutil.Assert(t, len(srv.SeriesSet) == testData.expectedResults)
})
}

labelNamesTests := map[string]struct {
limit int64
expectedResults []string
}{
"label names without limit": {
expectedResults: []string{"ext1", "i", "j", "n", "uniq"},
},
"label names with limit": {
limit: 3,
expectedResults: []string{"ext1", "i", "j"},
},
}

for testName, testData := range labelNamesTests {
t.Run(testName, func(t *testing.T) {
req := &storepb.LabelNamesRequest{
Start: timestamp.FromTime(minTime),
End: timestamp.FromTime(maxTime),
Limit: testData.limit,
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "j", Value: "foo"},
},
}

resp, err := store.LabelNames(context.Background(), req)
testutil.Ok(t, err)
testutil.Equals(t, testData.expectedResults, resp.Names)
})
}

labelValuesTests := map[string]struct {
limit int64
expectedResults []string
}{
"label values without limit": {
expectedResults: []string{"bar", "foo"},
},
"label values with limit": {
limit: 1,
expectedResults: []string{"bar"},
},
}

for testName, testData := range labelValuesTests {
t.Run(testName, func(t *testing.T) {
req := &storepb.LabelValuesRequest{
Start: timestamp.FromTime(minTime),
End: timestamp.FromTime(maxTime),
Label: "j",
Limit: testData.limit,
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_RE, Name: "j", Value: "(foo|bar)"},
},
}

resp, err := store.LabelValues(context.Background(), req)
testutil.Ok(t, err)
testutil.Equals(t, testData.expectedResults, resp.Values)
})
}
}
12 changes: 6 additions & 6 deletions pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Sto

if r.SkipChunks {
finalExtLset := rmLabels(extLset.Copy(), extLsetToRemove)
labelMaps, err := p.client.SeriesInGRPC(s.Context(), p.base, matchers, r.MinTime, r.MaxTime)
labelMaps, err := p.client.SeriesInGRPC(s.Context(), p.base, matchers, r.MinTime, r.MaxTime, int(r.Limit))
if err != nil {
return err
}
Expand Down Expand Up @@ -571,12 +571,12 @@ func (p *PrometheusStore) LabelNames(ctx context.Context, r *storepb.LabelNamesR

var lbls []string
if len(matchers) == 0 || p.labelCallsSupportMatchers() {
lbls, err = p.client.LabelNamesInGRPC(ctx, p.base, matchers, r.Start, r.End)
lbls, err = p.client.LabelNamesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit))
if err != nil {
return nil, err
}
} else {
sers, err := p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End)
sers, err := p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -642,7 +642,7 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue
if len(matchers) == 0 {
return &storepb.LabelValuesResponse{Values: []string{val}}, nil
}
sers, err = p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End)
sers, err = p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit))
if err != nil {
return nil, err
}
Expand All @@ -653,12 +653,12 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue
}

if len(matchers) == 0 || p.labelCallsSupportMatchers() {
vals, err = p.client.LabelValuesInGRPC(ctx, p.base, r.Label, matchers, r.Start, r.End)
vals, err = p.client.LabelValuesInGRPC(ctx, p.base, r.Label, matchers, r.Start, r.End, int(r.Limit))
if err != nil {
return nil, err
}
} else {
sers, err = p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End)
sers, err = p.client.SeriesInGRPC(ctx, p.base, matchers, r.Start, r.End, int(r.Limit))
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit e52720b

Please sign in to comment.