From cb385087b741d8966577038f685338b2c0f23cea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20P=C5=82otka?= Date: Fri, 15 Feb 2019 20:33:14 +0000 Subject: [PATCH] Added missing matcher logic for negative matchers. (#839) Needed to refactor logic to add "posting groups". Signed-off-by: Bartek Plotka --- pkg/store/bucket.go | 171 ++++++++++++++++++++++------------- pkg/store/bucket_e2e_test.go | 102 +++++++++++++++++++++ 2 files changed, 211 insertions(+), 62 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index db2a280c21..91fb304eb8 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1168,7 +1168,7 @@ func (r *bucketIndexReader) lookupSymbol(o uint32) (string, error) { return s, nil } -// Postings returns postings in expanded list instead of index.Postings. +// ExpandedPostings returns postings in expanded list instead of index.Postings. // This is because we need to have them buffered anyway to perform efficient lookup // on object storage. // Found posting IDs (ps) are not strictly required to point to a valid Series, e.g. during @@ -1178,32 +1178,33 @@ func (r *bucketIndexReader) lookupSymbol(o uint32) (string, error) { // chunk where the series contains the matching label-value pair for a given block of data. Postings can be fetched by // single label name=value. func (r *bucketIndexReader) ExpandedPostings(ms []labels.Matcher) ([]uint64, error) { - var postingsToIntersect []index.Postings + var postingGroups []*postingGroup // NOTE: Derived from tsdb.PostingsForMatchers. for _, m := range ms { - matching, err := matchingLabels(r.LabelValues, m) - if err != nil { - return nil, errors.Wrap(err, "match labels") - } - if len(matching) == 0 { + matchingGroup := toPostingGroup(r.LabelValues, m) + if matchingGroup == nil { continue } - // We need to load all matching postings to tell what postings are intersecting with what. - postings, err := r.fetchPostings(matching) - if err != nil { - return nil, errors.Wrap(err, "get postings") - } - - postingsToIntersect = append(postingsToIntersect, postings) + // Each group is separate to tell later what postings are intersecting with what. + postingGroups = append(postingGroups, matchingGroup) } - if len(postingsToIntersect) == 0 { + if len(postingGroups) == 0 { return nil, nil } - ps, err := index.ExpandPostings(index.Intersect(postingsToIntersect...)) + if err := r.fetchPostings(postingGroups); err != nil { + return nil, errors.Wrap(err, "get postings") + } + + var postings []index.Postings + for _, g := range postingGroups { + postings = append(postings, g.Postings()) + } + + ps, err := index.ExpandPostings(index.Intersect(postings...)) if err != nil { return nil, errors.Wrap(err, "expand") } @@ -1219,70 +1220,120 @@ func (r *bucketIndexReader) ExpandedPostings(ms []labels.Matcher) ([]uint64, err return ps, nil } +type postingGroup struct { + keys labels.Labels + postings []index.Postings + + aggregate func(postings []index.Postings) index.Postings +} + +func newPostingGroup(keys labels.Labels, aggr func(postings []index.Postings) index.Postings) *postingGroup { + return &postingGroup{ + keys: keys, + postings: make([]index.Postings, len(keys)), + aggregate: aggr, + } +} + +func (p *postingGroup) Fill(i int, posting index.Postings) { + p.postings[i] = posting +} + +func (p *postingGroup) Postings() index.Postings { + return p.aggregate(p.postings) +} + +func merge(p []index.Postings) index.Postings { + return index.Merge(p...) +} + +func allWithout(p []index.Postings) index.Postings { + return index.Without(p[0], index.Merge(p[1:]...)) +} + // NOTE: Derived from tsdb.postingsForMatcher. index.Merge is equivalent to map duplication. -func matchingLabels(lvalsFn func(name string) []string, m labels.Matcher) (labels.Labels, error) { +func toPostingGroup(lvalsFn func(name string) []string, m labels.Matcher) *postingGroup { + var matchingLabels labels.Labels + // If the matcher selects an empty value, it selects all the series which don't // have the label name set too. See: https://github.com/prometheus/prometheus/issues/3575 // and https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555 if m.Matches("") { - // We don't support tsdb.postingsForUnsetLabelMatcher. - // This is because it requires fetching all postings for index. - // This requires additional logic to avoid fetching big bytes range (todo: how big?). See https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555 - // to what it blocks. - return nil, errors.Errorf("support for <> != matcher is not implemented; empty matcher for label name %s", m.Name()) + allName, allValue := index.AllPostingsKey() + + matchingLabels = append(matchingLabels, labels.Label{Name: allName, Value: allValue}) + for _, val := range lvalsFn(m.Name()) { + if !m.Matches(val) { + matchingLabels = append(matchingLabels, labels.Label{Name: m.Name(), Value: val}) + } + } + + if len(matchingLabels) == 1 { + // This is known hack to return all series. + // Ask for x != . Allow for that as Prometheus does, + // even though it is expensive. + return newPostingGroup(matchingLabels, merge) + } + + return newPostingGroup(matchingLabels, allWithout) } // Fast-path for equal matching. if em, ok := m.(*labels.EqualMatcher); ok { - return labels.Labels{{Name: em.Name(), Value: em.Value()}}, nil + return newPostingGroup(labels.Labels{{Name: em.Name(), Value: em.Value()}}, merge) } - var matchingLabels labels.Labels for _, val := range lvalsFn(m.Name()) { if m.Matches(val) { matchingLabels = append(matchingLabels, labels.Label{Name: m.Name(), Value: val}) } } - return matchingLabels, nil -} + if len(matchingLabels) == 0 { + return nil + } -type postingPtr struct { - key labels.Label - ptr index.Range + return newPostingGroup(matchingLabels, merge) } -// fetchPostings returns sorted slice of postings that match the selected labels. -func (r *bucketIndexReader) fetchPostings(keys labels.Labels) (index.Postings, error) { - var ( - ptrs []postingPtr - postings = make([]index.Postings, 0, len(keys)) - ) - - // TODO(bwplotka): sort postings? +type postingPtr struct { + groupID int + keyID int + ptr index.Range +} + +// fetchPostings fill postings requested by posting groups. +func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error { + var ptrs []postingPtr + + // Iterate over all groups and fetch posting from cache. + // If we have a miss, mark key to be fetched in `ptrs` slice. + // Overlaps are well handled by partitioner, so we don't need to deduplicate keys. + for i, g := range groups { + for j, key := range g.keys { + // Get postings for the given key from cache first. + if b, ok := r.cache.postings(r.block.meta.ULID, key); ok { + r.stats.postingsTouched++ + r.stats.postingsTouchedSizeSum += len(b) - for _, k := range keys { - // Get postings for given key from cache first. - if b, ok := r.cache.postings(r.block.meta.ULID, k); ok { - r.stats.postingsTouched++ - r.stats.postingsTouchedSizeSum += len(b) + _, l, err := r.dec.Postings(b) + if err != nil { + return errors.Wrap(err, "decode postings") + } + g.Fill(j, l) + continue + } - _, l, err := r.dec.Postings(b) - if err != nil { - return nil, errors.Wrap(err, "decode postings") + // Cache miss; save pointer for actual posting in index stored in object store. + ptr, ok := r.block.postings[key] + if !ok { + // This block does not have any posting for given key. + g.Fill(j, index.EmptyPostings()) + continue } - postings = append(postings, l) - continue - } - // Cache miss; save pointer for actual posting in index stored in object store. - ptr, ok := r.block.postings[k] - if !ok { - // Index malformed? Should not happen. - continue + ptrs = append(ptrs, postingPtr{ptr: ptr, groupID: i, keyID: j}) } - - ptrs = append(ptrs, postingPtr{ptr: ptr, key: k}) } sort.Slice(ptrs, func(i, j int) bool { @@ -1331,8 +1382,8 @@ func (r *bucketIndexReader) fetchPostings(keys labels.Labels) (index.Postings, e } // Return postings and fill LRU cache. - postings = append(postings, fetchedPostings) - r.cache.setPostings(r.block.meta.ULID, p.key, c) + groups[p.groupID].Fill(p.keyID, fetchedPostings) + r.cache.setPostings(r.block.meta.ULID, groups[p.groupID].keys[p.keyID], c) // If we just fetched it we still have to update the stats for touched postings. r.stats.postingsTouched++ @@ -1346,11 +1397,7 @@ func (r *bucketIndexReader) fetchPostings(keys labels.Labels) (index.Postings, e }) } - if err := g.Run(); err != nil { - return nil, err - } - - return index.Merge(postings...), nil + return g.Run() } func (r *bucketIndexReader) PreloadSeries(ids []uint64) error { diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index ca67956644..d900cea9b0 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -153,6 +153,74 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) { {{Name: "a", Value: "2"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}}, }, }, + { + req: &storepb.SeriesRequest{ + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_RE, Name: "a", Value: "1"}, + }, + MinTime: mint, + MaxTime: maxt, + }, + expected: [][]storepb.Label{ + {{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}}, + {{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}}, + {{Name: "a", Value: "1"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}}, + {{Name: "a", Value: "1"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}}, + }, + }, + { + req: &storepb.SeriesRequest{ + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_NRE, Name: "a", Value: "2"}, + }, + MinTime: mint, + MaxTime: maxt, + }, + expected: [][]storepb.Label{ + {{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}}, + {{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}}, + {{Name: "a", Value: "1"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}}, + {{Name: "a", Value: "1"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}}, + }, + }, + { + req: &storepb.SeriesRequest{ + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_NRE, Name: "a", Value: "not_existing"}, + }, + MinTime: mint, + MaxTime: maxt, + }, + expected: [][]storepb.Label{ + {{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}}, + {{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}}, + {{Name: "a", Value: "1"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}}, + {{Name: "a", Value: "1"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}}, + {{Name: "a", Value: "2"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}}, + {{Name: "a", Value: "2"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}}, + {{Name: "a", Value: "2"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}}, + {{Name: "a", Value: "2"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}}, + }, + }, + { + req: &storepb.SeriesRequest{ + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_NRE, Name: "not_existing", Value: "1"}, + }, + MinTime: mint, + MaxTime: maxt, + }, + expected: [][]storepb.Label{ + {{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}}, + {{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}}, + {{Name: "a", Value: "1"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}}, + {{Name: "a", Value: "1"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}}, + {{Name: "a", Value: "2"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}}, + {{Name: "a", Value: "2"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}}, + {{Name: "a", Value: "2"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}}, + {{Name: "a", Value: "2"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}}, + }, + }, { req: &storepb.SeriesRequest{ Matchers: []storepb.LabelMatcher{ @@ -191,6 +259,40 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) { MaxTime: maxt, }, }, + { + req: &storepb.SeriesRequest{ + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_NEQ, Name: "a", Value: "2"}, + }, + MinTime: mint, + MaxTime: maxt, + }, + expected: [][]storepb.Label{ + {{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}}, + {{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}}, + {{Name: "a", Value: "1"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}}, + {{Name: "a", Value: "1"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}}, + }, + }, + { + req: &storepb.SeriesRequest{ + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_NEQ, Name: "a", Value: "not_existing"}, + }, + MinTime: mint, + MaxTime: maxt, + }, + expected: [][]storepb.Label{ + {{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}}, + {{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}}, + {{Name: "a", Value: "1"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}}, + {{Name: "a", Value: "1"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}}, + {{Name: "a", Value: "2"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}}, + {{Name: "a", Value: "2"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}}, + {{Name: "a", Value: "2"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}}, + {{Name: "a", Value: "2"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}}, + }, + }, } { t.Log("Run ", i)