
Commit

Added missing matcher logic for negative matchers. (#839)
Needed to refactor the logic to add "posting groups".

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>
bwplotka authored Feb 15, 2019
1 parent ee214c3 commit cb38508
Showing 2 changed files with 211 additions and 62 deletions.
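
The idea behind the "posting groups" refactor can be shown with a minimal, self-contained Go sketch (the set type and helpers below are illustrative stand-ins for the index.Postings machinery, not the committed API): a positive matcher contributes the union of postings across its matching values, a negative matcher contributes all postings minus the union of postings for the values it rejects, and the per-group results are then intersected.

package main

import (
	"fmt"
	"sort"
)

// set is an illustrative stand-in for a postings list (series IDs).
type set map[uint64]struct{}

// union merges several posting sets (the role played by index.Merge).
func union(ss ...set) set {
	out := set{}
	for _, s := range ss {
		for id := range s {
			out[id] = struct{}{}
		}
	}
	return out
}

// without drops every ID in drop from all (the role played by index.Without).
func without(all, drop set) set {
	out := set{}
	for id := range all {
		if _, ok := drop[id]; !ok {
			out[id] = struct{}{}
		}
	}
	return out
}

// sorted returns the IDs of a set in ascending order, for printing.
func sorted(s set) []uint64 {
	var out []uint64
	for id := range s {
		out = append(out, id)
	}
	sort.Slice(out, func(i, j int) bool { return out[i] < out[j] })
	return out
}

func main() {
	// Postings per label-value pair, keyed as "name=value".
	idx := map[string]set{
		"a=1": {1: {}, 2: {}},
		"a=2": {3: {}, 4: {}},
	}
	all := union(idx["a=1"], idx["a=2"]) // stand-in for the AllPostingsKey entry

	// Positive matcher a="1": one group, keys [a=1], aggregated by merging.
	fmt.Println(sorted(union(idx["a=1"]))) // [1 2]

	// Negative matcher a!="2": one group, keys [ALL, a=2], aggregated as
	// "all postings without the rejected values".
	fmt.Println(sorted(without(all, union(idx["a=2"])))) // [1 2]
}
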
171 changes: 109 additions & 62 deletions pkg/store/bucket.go
@@ -1168,7 +1168,7 @@ func (r *bucketIndexReader) lookupSymbol(o uint32) (string, error) {
return s, nil
}

// Postings returns postings in expanded list instead of index.Postings.
// ExpandedPostings returns postings in expanded list instead of index.Postings.
// This is because we need to have them buffered anyway to perform efficient lookup
// on object storage.
// Found posting IDs (ps) are not strictly required to point to a valid Series, e.g. during
@@ -1178,32 +1178,33 @@ func (r *bucketIndexReader) lookupSymbol(o uint32) (string, error) {
// chunk where the series contains the matching label-value pair for a given block of data. Postings can be fetched by
// single label name=value.
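// In short: ExpandedPostings builds one postingGroup per matcher, fetches all
// groups' keys in a single fetchPostings pass, aggregates each group into one
// index.Postings, and intersects the per-group results.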
func (r *bucketIndexReader) ExpandedPostings(ms []labels.Matcher) ([]uint64, error) {
var postingsToIntersect []index.Postings
var postingGroups []*postingGroup

// NOTE: Derived from tsdb.PostingsForMatchers.
for _, m := range ms {
matching, err := matchingLabels(r.LabelValues, m)
if err != nil {
return nil, errors.Wrap(err, "match labels")
}
if len(matching) == 0 {
matchingGroup := toPostingGroup(r.LabelValues, m)
if matchingGroup == nil {
continue
}

// We need to load all matching postings to tell what postings are intersecting with what.
postings, err := r.fetchPostings(matching)
if err != nil {
return nil, errors.Wrap(err, "get postings")
}

postingsToIntersect = append(postingsToIntersect, postings)
// Each group is kept separate so that we can later tell which postings intersect with which.
postingGroups = append(postingGroups, matchingGroup)
}

if len(postingsToIntersect) == 0 {
if len(postingGroups) == 0 {
return nil, nil
}

ps, err := index.ExpandPostings(index.Intersect(postingsToIntersect...))
if err := r.fetchPostings(postingGroups); err != nil {
return nil, errors.Wrap(err, "get postings")
}

var postings []index.Postings
for _, g := range postingGroups {
postings = append(postings, g.Postings())
}

ps, err := index.ExpandPostings(index.Intersect(postings...))
if err != nil {
return nil, errors.Wrap(err, "expand")
}
@@ -1219,70 +1220,120 @@ func (r *bucketIndexReader) ExpandedPostings(ms []labels.Matcher) ([]uint64, error) {
return ps, nil
}

type postingGroup struct {
keys labels.Labels
postings []index.Postings

aggregate func(postings []index.Postings) index.Postings
}

func newPostingGroup(keys labels.Labels, aggr func(postings []index.Postings) index.Postings) *postingGroup {
return &postingGroup{
keys: keys,
postings: make([]index.Postings, len(keys)),
aggregate: aggr,
}
}

func (p *postingGroup) Fill(i int, posting index.Postings) {
p.postings[i] = posting
}

func (p *postingGroup) Postings() index.Postings {
return p.aggregate(p.postings)
}

func merge(p []index.Postings) index.Postings {
return index.Merge(p...)
}

func allWithout(p []index.Postings) index.Postings {
return index.Without(p[0], index.Merge(p[1:]...))
}
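
// Illustration of how the two aggregates are used (the behavior is visible in
// toPostingGroup below): a positive matcher such as a="1" yields a group with
// keys=[a="1"] aggregated by merge, while a negative matcher such as a!="2"
// yields keys=[AllPostingsKey, a="2"] aggregated by allWithout, i.e. all
// postings minus the union of postings for every value the matcher rejects.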

// NOTE: Derived from tsdb.postingsForMatcher. index.Merge is equivalent to map duplication.
func matchingLabels(lvalsFn func(name string) []string, m labels.Matcher) (labels.Labels, error) {
func toPostingGroup(lvalsFn func(name string) []string, m labels.Matcher) *postingGroup {
var matchingLabels labels.Labels

// If the matcher selects an empty value, it selects all the series which don't
// have the label name set too. See: https://github.com/prometheus/prometheus/issues/3575
// and https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555
if m.Matches("") {
// We don't support tsdb.postingsForUnsetLabelMatcher.
// This is because it requires fetching all postings for index.
// This requires additional logic to avoid fetching big bytes range (todo: how big?). See https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555
// to what it blocks.
return nil, errors.Errorf("support for <> != <val> matcher is not implemented; empty matcher for label name %s", m.Name())
allName, allValue := index.AllPostingsKey()

matchingLabels = append(matchingLabels, labels.Label{Name: allName, Value: allValue})
for _, val := range lvalsFn(m.Name()) {
if !m.Matches(val) {
matchingLabels = append(matchingLabels, labels.Label{Name: m.Name(), Value: val})
}
}

if len(matchingLabels) == 1 {
// This is a known hack to return all series.
// Ask for x != <not existing value>. Allow that, as Prometheus does,
// even though it is expensive.
return newPostingGroup(matchingLabels, merge)
}

return newPostingGroup(matchingLabels, allWithout)
}
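
// Note: negative matchers (e.g. a != "2" or a !~ "2") match the empty value,
// and an unset label reads as empty, so they take the m.Matches("") branch
// above and correctly also select series that do not have the label at all.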

// Fast-path for equal matching.
if em, ok := m.(*labels.EqualMatcher); ok {
return labels.Labels{{Name: em.Name(), Value: em.Value()}}, nil
return newPostingGroup(labels.Labels{{Name: em.Name(), Value: em.Value()}}, merge)
}

var matchingLabels labels.Labels
for _, val := range lvalsFn(m.Name()) {
if m.Matches(val) {
matchingLabels = append(matchingLabels, labels.Label{Name: m.Name(), Value: val})
}
}

return matchingLabels, nil
}
if len(matchingLabels) == 0 {
return nil
}

type postingPtr struct {
key labels.Label
ptr index.Range
return newPostingGroup(matchingLabels, merge)
}

// fetchPostings returns sorted slice of postings that match the selected labels.
func (r *bucketIndexReader) fetchPostings(keys labels.Labels) (index.Postings, error) {
var (
ptrs []postingPtr
postings = make([]index.Postings, 0, len(keys))
)

// TODO(bwplotka): sort postings?
type postingPtr struct {
groupID int
keyID int
ptr index.Range
}

// fetchPostings fills the postings requested by the posting groups.
func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error {
var ptrs []postingPtr

// Iterate over all groups and fetch postings from the cache.
// On a miss, mark the key to be fetched in the `ptrs` slice.
// Overlaps are handled well by the partitioner, so we don't need to deduplicate keys.
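// The fetch is thus two-phase: cache hits fill their group slot immediately in
// the loop below, while the collected misses are later sorted, partitioned into
// larger ranges, fetched from object storage in bulk, filled into their groups,
// and written back to the cache.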
for i, g := range groups {
for j, key := range g.keys {
// Get postings for the given key from cache first.
if b, ok := r.cache.postings(r.block.meta.ULID, key); ok {
r.stats.postingsTouched++
r.stats.postingsTouchedSizeSum += len(b)

for _, k := range keys {
// Get postings for given key from cache first.
if b, ok := r.cache.postings(r.block.meta.ULID, k); ok {
r.stats.postingsTouched++
r.stats.postingsTouchedSizeSum += len(b)
_, l, err := r.dec.Postings(b)
if err != nil {
return errors.Wrap(err, "decode postings")
}
g.Fill(j, l)
continue
}

_, l, err := r.dec.Postings(b)
if err != nil {
return nil, errors.Wrap(err, "decode postings")
// Cache miss; save the pointer to the actual postings in the index stored in object storage.
ptr, ok := r.block.postings[key]
if !ok {
// This block does not have any postings for the given key.
g.Fill(j, index.EmptyPostings())
continue
}
postings = append(postings, l)
continue
}

// Cache miss; save pointer for actual posting in index stored in object store.
ptr, ok := r.block.postings[k]
if !ok {
// Index malformed? Should not happen.
continue
ptrs = append(ptrs, postingPtr{ptr: ptr, groupID: i, keyID: j})
}

ptrs = append(ptrs, postingPtr{ptr: ptr, key: k})
}

sort.Slice(ptrs, func(i, j int) bool {
@@ -1331,8 +1382,8 @@ func (r *bucketIndexReader) fetchPostings(keys labels.Labels) (index.Postings, error) {
}

// Return postings and fill LRU cache.
postings = append(postings, fetchedPostings)
r.cache.setPostings(r.block.meta.ULID, p.key, c)
groups[p.groupID].Fill(p.keyID, fetchedPostings)
r.cache.setPostings(r.block.meta.ULID, groups[p.groupID].keys[p.keyID], c)

// If we just fetched it we still have to update the stats for touched postings.
r.stats.postingsTouched++
@@ -1346,11 +1397,7 @@ func (r *bucketIndexReader) fetchPostings(keys labels.Labels) (index.Postings, error) {
})
}

if err := g.Run(); err != nil {
return nil, err
}

return index.Merge(postings...), nil
return g.Run()
}

func (r *bucketIndexReader) PreloadSeries(ids []uint64) error {
102 changes: 102 additions & 0 deletions pkg/store/bucket_e2e_test.go
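The new e2e cases below exercise the added negative-matcher logic: != and !~ matchers against an existing value return the complement of that value's series within the block, while negating a value or a label name that does not exist selects all eight series, as Prometheus does.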
@@ -153,6 +153,74 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
{{Name: "a", Value: "2"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}},
},
},
{
req: &storepb.SeriesRequest{
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_RE, Name: "a", Value: "1"},
},
MinTime: mint,
MaxTime: maxt,
},
expected: [][]storepb.Label{
{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}},
{{Name: "a", Value: "1"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}},
},
},
{
req: &storepb.SeriesRequest{
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_NRE, Name: "a", Value: "2"},
},
MinTime: mint,
MaxTime: maxt,
},
expected: [][]storepb.Label{
{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}},
{{Name: "a", Value: "1"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}},
},
},
{
req: &storepb.SeriesRequest{
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_NRE, Name: "a", Value: "not_existing"},
},
MinTime: mint,
MaxTime: maxt,
},
expected: [][]storepb.Label{
{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}},
{{Name: "a", Value: "1"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}},
{{Name: "a", Value: "2"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "2"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "2"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}},
{{Name: "a", Value: "2"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}},
},
},
{
req: &storepb.SeriesRequest{
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_NRE, Name: "not_existing", Value: "1"},
},
MinTime: mint,
MaxTime: maxt,
},
expected: [][]storepb.Label{
{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}},
{{Name: "a", Value: "1"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}},
{{Name: "a", Value: "2"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "2"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "2"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}},
{{Name: "a", Value: "2"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}},
},
},
{
req: &storepb.SeriesRequest{
Matchers: []storepb.LabelMatcher{
@@ -191,6 +259,40 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
MaxTime: maxt,
},
},
{
req: &storepb.SeriesRequest{
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_NEQ, Name: "a", Value: "2"},
},
MinTime: mint,
MaxTime: maxt,
},
expected: [][]storepb.Label{
{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}},
{{Name: "a", Value: "1"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}},
},
},
{
req: &storepb.SeriesRequest{
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_NEQ, Name: "a", Value: "not_existing"},
},
MinTime: mint,
MaxTime: maxt,
},
expected: [][]storepb.Label{
{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}},
{{Name: "a", Value: "1"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}},
{{Name: "a", Value: "2"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "2"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "2"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}},
{{Name: "a", Value: "2"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}},
},
},
} {
t.Log("Run ", i)

