Commit 18a3a88

Added missing matcher logic for negative matchers.
Needed to refactor the logic to add "posting groups".

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>
bwplotka committed Feb 15, 2019
1 parent ee214c3 commit 18a3a88
Showing 2 changed files with 211 additions and 62 deletions.
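Context for the change below: the store gateway's matchingLabels previously rejected any matcher that matches the empty string, which is exactly what negative matchers (!= and !~) do, with the error "support for <> != <val> matcher is not implemented". The refactor replaces per-matcher posting lists with "posting groups": each matcher contributes a set of postings keys plus an aggregation function, merge for positive matchers and all-minus-merged for negative ones, and only the aggregated groups are intersected. A minimal self-contained sketch of the two aggregations on toy data (the series IDs are invented; the real code operates on index.Postings iterators, not slices):

package main

import (
	"fmt"
	"sort"
)

// postings is a toy stand-in for tsdb's index.Postings: a sorted list of series IDs.
type postings []uint64

// merge unions posting lists; positive matchers use it, since a series qualifies
// if it carries any one of the matching label values.
func merge(ps []postings) postings {
	seen := map[uint64]bool{}
	for _, p := range ps {
		for _, id := range p {
			seen[id] = true
		}
	}
	out := make(postings, 0, len(seen))
	for id := range seen {
		out = append(out, id)
	}
	sort.Slice(out, func(i, j int) bool { return out[i] < out[j] })
	return out
}

// allWithout subtracts the union of ps[1:] from ps[0]; negative matchers use it,
// with ps[0] holding "all postings" and the rest the values that must not match.
func allWithout(ps []postings) postings {
	drop := map[uint64]bool{}
	for _, p := range ps[1:] {
		for _, id := range p {
			drop[id] = true
		}
	}
	var out postings
	for _, id := range ps[0] {
		if !drop[id] {
			out = append(out, id)
		}
	}
	return out
}

func main() {
	all := postings{1, 2, 3, 4, 5, 6, 7, 8} // the AllPostingsKey list
	a1 := postings{1, 2, 3, 4}              // series with a="1"
	a2 := postings{5, 6, 7, 8}              // series with a="2"

	fmt.Println(allWithout([]postings{all, a2})) // a != "2"   -> [1 2 3 4]
	fmt.Println(merge([]postings{a1, a2}))       // a =~ "1|2" -> [1 2 3 4 5 6 7 8]
}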
171 changes: 109 additions & 62 deletions pkg/store/bucket.go
@@ -1168,7 +1168,7 @@ func (r *bucketIndexReader) lookupSymbol(o uint32) (string, error) {
 	return s, nil
 }
 
-// Postings returns postings in expanded list instead of index.Postings.
+// ExpandedPostings returns postings in expanded list instead of index.Postings.
 // This is because we need to have them buffered anyway to perform efficient lookup
 // on object storage.
 // Found posting IDs (ps) are not strictly required to point to a valid Series, e.g. during
@@ -1178,32 +1178,33 @@ func (r *bucketIndexReader) lookupSymbol(o uint32) (string, error) {
 // chunk where the series contains the matching label-value pair for a given block of data. Postings can be fetched by
 // single label name=value.
 func (r *bucketIndexReader) ExpandedPostings(ms []labels.Matcher) ([]uint64, error) {
-	var postingsToIntersect []index.Postings
+	var postingGroups []*postingGroup
 
 	// NOTE: Derived from tsdb.PostingsForMatchers.
 	for _, m := range ms {
-		matching, err := matchingLabels(r.LabelValues, m)
-		if err != nil {
-			return nil, errors.Wrap(err, "match labels")
-		}
-		if len(matching) == 0 {
+		matchingGroup := toPostingGroup(r.LabelValues, m)
+		if matchingGroup == nil {
 			continue
 		}
 
-		// We need to load all matching postings to tell what postings are intersecting with what.
-		postings, err := r.fetchPostings(matching)
-		if err != nil {
-			return nil, errors.Wrap(err, "get postings")
-		}
-
-		postingsToIntersect = append(postingsToIntersect, postings)
+		// Each group is separate to tell later what postings are intersecting with what.
+		postingGroups = append(postingGroups, matchingGroup)
 	}
 
-	if len(postingsToIntersect) == 0 {
+	if len(postingGroups) == 0 {
 		return nil, nil
 	}
 
-	ps, err := index.ExpandPostings(index.Intersect(postingsToIntersect...))
+	if err := r.fetchPostings(postingGroups); err != nil {
+		return nil, errors.Wrap(err, "get postings")
+	}
+
+	var postings []index.Postings
+	for _, g := range postingGroups {
+		postings = append(postings, g.Postings())
+	}
+
+	ps, err := index.ExpandPostings(index.Intersect(postings...))
 	if err != nil {
 		return nil, errors.Wrap(err, "expand")
 	}
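To make the new flow concrete: a query with matchers a="1" and b!="2" produces one merge group and one all-without group, which are intersected only at the end. A hypothetical sketch, assuming the github.com/prometheus/tsdb/index package this file already uses (the posting lists below are invented):

package main

import (
	"fmt"

	"github.com/prometheus/tsdb/index"
)

func main() {
	// Hypothetical posting lists for one block.
	a1 := index.NewListPostings([]uint64{1, 2, 3, 4})              // a="1"
	b2 := index.NewListPostings([]uint64{2, 4, 6, 8})              // b="2"
	all := index.NewListPostings([]uint64{1, 2, 3, 4, 5, 6, 7, 8}) // AllPostingsKey

	g1 := index.Merge(a1)                      // group for a="1" (merge)
	g2 := index.Without(all, index.Merge(b2)) // group for b!="2" (allWithout)

	ps, err := index.ExpandPostings(index.Intersect(g1, g2))
	if err != nil {
		panic(err)
	}
	fmt.Println(ps) // [1 3]
}

A positive matcher contributes a merge group over the postings of its matching values; a negative matcher contributes an all-minus-merged group, so each matcher can be resolved independently before the final intersection.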
@@ -1219,70 +1220,120 @@ func (r *bucketIndexReader) ExpandedPostings(ms []labels.Matcher) ([]uint64, error) {
 	return ps, nil
 }
 
+type postingGroup struct {
+	keys     labels.Labels
+	postings []index.Postings
+
+	aggregate func(postings []index.Postings) index.Postings
+}
+
+func newPostingGroup(keys labels.Labels, aggr func(postings []index.Postings) index.Postings) *postingGroup {
+	return &postingGroup{
+		keys:      keys,
+		postings:  make([]index.Postings, len(keys)),
+		aggregate: aggr,
+	}
+}
+
+func (p *postingGroup) Fill(i int, posting index.Postings) {
+	p.postings[i] = posting
+}
+
+func (p *postingGroup) Postings() index.Postings {
+	return p.aggregate(p.postings)
+}
+
+func merge(p []index.Postings) index.Postings {
+	return index.Merge(p...)
+}
+
+func allWithout(p []index.Postings) index.Postings {
+	return index.Without(p[0], index.Merge(p[1:]...))
+}

 // NOTE: Derived from tsdb.postingsForMatcher. index.Merge is equivalent to map duplication.
-func matchingLabels(lvalsFn func(name string) []string, m labels.Matcher) (labels.Labels, error) {
+func toPostingGroup(lvalsFn func(name string) []string, m labels.Matcher) *postingGroup {
+	var matchingLabels labels.Labels
+
 	// If the matcher selects an empty value, it selects all the series which don't
 	// have the label name set too. See: https://github.com/prometheus/prometheus/issues/3575
 	// and https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555
 	if m.Matches("") {
-		// We don't support tsdb.postingsForUnsetLabelMatcher.
-		// This is because it requires fetching all postings for index.
-		// This requires additional logic to avoid fetching big bytes range (todo: how big?). See https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555
-		// to what it blocks.
-		return nil, errors.Errorf("support for <> != <val> matcher is not implemented; empty matcher for label name %s", m.Name())
+		allName, allValue := index.AllPostingsKey()
+
+		matchingLabels = append(matchingLabels, labels.Label{Name: allName, Value: allValue})
+		for _, val := range lvalsFn(m.Name()) {
+			if !m.Matches(val) {
+				matchingLabels = append(matchingLabels, labels.Label{Name: m.Name(), Value: val})
+			}
+		}
+
+		if len(matchingLabels) == 1 {
+			// This is known hack to return all series.
+			// Ask for x != <not existing value>. Allow for that as Prometheus does,
+			// even though it is expensive.
+			return newPostingGroup(matchingLabels, merge)
+		}
+
+		return newPostingGroup(matchingLabels, allWithout)
 	}
 
 	// Fast-path for equal matching.
 	if em, ok := m.(*labels.EqualMatcher); ok {
-		return labels.Labels{{Name: em.Name(), Value: em.Value()}}, nil
+		return newPostingGroup(labels.Labels{{Name: em.Name(), Value: em.Value()}}, merge)
 	}
 
-	var matchingLabels labels.Labels
 	for _, val := range lvalsFn(m.Name()) {
 		if m.Matches(val) {
 			matchingLabels = append(matchingLabels, labels.Label{Name: m.Name(), Value: val})
		}
 	}
 
-	return matchingLabels, nil
+	if len(matchingLabels) == 0 {
+		return nil
+	}
+
+	return newPostingGroup(matchingLabels, merge)
 }

 type postingPtr struct {
-	key labels.Label
-	ptr index.Range
+	groupID int
+	keyID   int
+	ptr     index.Range
 }
 
-// fetchPostings returns sorted slice of postings that match the selected labels.
-func (r *bucketIndexReader) fetchPostings(keys labels.Labels) (index.Postings, error) {
-	var (
-		ptrs     []postingPtr
-		postings = make([]index.Postings, 0, len(keys))
-	)
-
-	// TODO(bwplotka): sort postings?
-
-	for _, k := range keys {
-		// Get postings for given key from cache first.
-		if b, ok := r.cache.postings(r.block.meta.ULID, k); ok {
-			r.stats.postingsTouched++
-			r.stats.postingsTouchedSizeSum += len(b)
-
-			_, l, err := r.dec.Postings(b)
-			if err != nil {
-				return nil, errors.Wrap(err, "decode postings")
-			}
-			postings = append(postings, l)
-			continue
-		}
-
-		// Cache miss; save pointer for actual posting in index stored in object store.
-		ptr, ok := r.block.postings[k]
-		if !ok {
-			// Index malformed? Should not happen.
-			continue
-		}
-
-		ptrs = append(ptrs, postingPtr{ptr: ptr, key: k})
+// fetchPostings fill postings requested by posting groups.
+func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error {
+	var ptrs []postingPtr
+
+	// Iterate over all groups and fetch posting from cache.
+	// If we have a miss, mark key to be fetched in `ptrs` slice.
+	// Overlaps are well handled by partitioner, so we don't need to deduplicate keys.
+	for i, g := range groups {
+		for j, key := range g.keys {
+			// Get postings for the given key from cache first.
+			if b, ok := r.cache.postings(r.block.meta.ULID, key); ok {
+				r.stats.postingsTouched++
+				r.stats.postingsTouchedSizeSum += len(b)
+
+				_, l, err := r.dec.Postings(b)
+				if err != nil {
+					return errors.Wrap(err, "decode postings")
+				}
+				g.Fill(j, l)
+				continue
+			}
+
+			// Cache miss; save pointer for actual posting in index stored in object store.
+			ptr, ok := r.block.postings[key]
+			if !ok {
+				// This block does not have any posting for given key.
+				g.Fill(j, index.EmptyPostings())
+				continue
+			}
+
+			ptrs = append(ptrs, postingPtr{ptr: ptr, groupID: i, keyID: j})
+		}
 	}
 
 	sort.Slice(ptrs, func(i, j int) bool {
@@ -1331,8 +1382,8 @@ func (r *bucketIndexReader) fetchPostings(keys labels.Labels) (index.Postings, error) {
 			}
 
 			// Return postings and fill LRU cache.
-			postings = append(postings, fetchedPostings)
-			r.cache.setPostings(r.block.meta.ULID, p.key, c)
+			groups[p.groupID].Fill(p.keyID, fetchedPostings)
+			r.cache.setPostings(r.block.meta.ULID, groups[p.groupID].keys[p.keyID], c)
 
 			// If we just fetched it we still have to update the stats for touched postings.
 			r.stats.postingsTouched++
@@ -1346,11 +1397,7 @@ func (r *bucketIndexReader) fetchPostings(keys labels.Labels) (index.Postings, error) {
 		})
 	}
 
-	if err := g.Run(); err != nil {
-		return nil, err
-	}
-
-	return index.Merge(postings...), nil
+	return g.Run()
 }
 
 func (r *bucketIndexReader) PreloadSeries(ids []uint64) error {
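The rewritten fetchPostings follows a cache-aside, batch-fetch shape: answer every (group, key) pair from the postings cache when possible, record the coordinates of each miss, then fetch the missed index ranges from object storage (sorted and partitioned into a few large reads in the elided code above) and Fill the groups in place. A hypothetical, simplified sketch of that pattern with toy types (strings instead of decoded postings, sequential instead of concurrent fetches):

package main

import "fmt"

type group struct {
	keys   []string
	values []string // stands in for decoded index.Postings
}

func (g *group) fill(i int, v string) { g.values[i] = v }

// fillGroups resolves every (group, key) slot: cache hits fill immediately,
// misses are remembered by coordinate and filled by a second, batched pass.
func fillGroups(groups []*group, cache map[string]string, fetch func(key string) string) {
	type miss struct{ groupID, keyID int }
	var misses []miss

	for i, g := range groups {
		for j, key := range g.keys {
			if v, ok := cache[key]; ok {
				g.fill(j, v) // cache hit
				continue
			}
			misses = append(misses, miss{i, j}) // cache miss: remember coordinates
		}
	}

	// The real code sorts the misses, partitions them into a few large object
	// storage reads and fetches them concurrently; here we fetch one by one.
	for _, m := range misses {
		key := groups[m.groupID].keys[m.keyID]
		v := fetch(key)
		cache[key] = v // also fill the cache for next time
		groups[m.groupID].fill(m.keyID, v)
	}
}

func main() {
	g := &group{keys: []string{`a="1"`, `b="2"`}, values: make([]string, 2)}
	cache := map[string]string{`a="1"`: `postings(a="1")`}

	fillGroups([]*group{g}, cache, func(k string) string { return "fetched(" + k + ")" })
	fmt.Println(g.values) // [postings(a="1") fetched(b="2")]
}

Keeping the (groupID, keyID) coordinates in postingPtr is what lets the batched fetch route each decoded posting list back to the right slot of the right group.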
102 changes: 102 additions & 0 deletions pkg/store/bucket_e2e_test.go
@@ -153,6 +153,74 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
{{Name: "a", Value: "2"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}},
},
},
{
req: &storepb.SeriesRequest{
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_RE, Name: "a", Value: "1"},
},
MinTime: mint,
MaxTime: maxt,
},
expected: [][]storepb.Label{
{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}},
{{Name: "a", Value: "1"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}},
},
},
{
req: &storepb.SeriesRequest{
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_NRE, Name: "a", Value: "2"},
},
MinTime: mint,
MaxTime: maxt,
},
expected: [][]storepb.Label{
{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}},
{{Name: "a", Value: "1"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}},
},
},
{
req: &storepb.SeriesRequest{
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_NRE, Name: "a", Value: "not_existing"},
},
MinTime: mint,
MaxTime: maxt,
},
expected: [][]storepb.Label{
{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}},
{{Name: "a", Value: "1"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}},
{{Name: "a", Value: "2"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "2"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "2"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}},
{{Name: "a", Value: "2"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}},
},
},
{
req: &storepb.SeriesRequest{
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_NRE, Name: "not_existing", Value: "1"},
},
MinTime: mint,
MaxTime: maxt,
},
expected: [][]storepb.Label{
{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "1"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}},
{{Name: "a", Value: "1"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}},
{{Name: "a", Value: "2"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "2"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
{{Name: "a", Value: "2"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}},
{{Name: "a", Value: "2"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}},
},
},
{
req: &storepb.SeriesRequest{
Matchers: []storepb.LabelMatcher{
@@ -191,6 +259,40 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
 				MaxTime: maxt,
 			},
 		},
+		{
+			req: &storepb.SeriesRequest{
+				Matchers: []storepb.LabelMatcher{
+					{Type: storepb.LabelMatcher_NEQ, Name: "a", Value: "2"},
+				},
+				MinTime: mint,
+				MaxTime: maxt,
+			},
+			expected: [][]storepb.Label{
+				{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
+				{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
+				{{Name: "a", Value: "1"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}},
+				{{Name: "a", Value: "1"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}},
+			},
+		},
+		{
+			req: &storepb.SeriesRequest{
+				Matchers: []storepb.LabelMatcher{
+					{Type: storepb.LabelMatcher_NEQ, Name: "a", Value: "not_existing"},
+				},
+				MinTime: mint,
+				MaxTime: maxt,
+			},
+			expected: [][]storepb.Label{
+				{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
+				{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
+				{{Name: "a", Value: "1"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}},
+				{{Name: "a", Value: "1"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}},
+				{{Name: "a", Value: "2"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
+				{{Name: "a", Value: "2"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
+				{{Name: "a", Value: "2"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}},
+				{{Name: "a", Value: "2"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}},
+			},
+		},
 	} {
 		t.Log("Run ", i)
 
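The expectations in the new cases pin down Prometheus' semantics for negative matchers: a != "2" and a !~ "2" select only the four a="1" series, while negating a value or label that does not exist at all (a != "not_existing", not_existing !~ "1") matches every series, because nothing gets excluded from the all-postings list. A hypothetical illustration, again assuming the github.com/prometheus/tsdb/index package (IDs invented):

package main

import (
	"fmt"

	"github.com/prometheus/tsdb/index"
)

func main() {
	all := index.NewListPostings([]uint64{1, 2, 3, 4, 5, 6, 7, 8}) // AllPostingsKey
	a2 := index.NewListPostings([]uint64{5, 6, 7, 8})              // a="2"

	// a != "2": all postings without the series carrying a="2".
	ps, _ := index.ExpandPostings(index.Without(all, index.Merge(a2)))
	fmt.Println(ps) // [1 2 3 4]

	// a != "not_existing": no stored value of "a" fails the matcher, so the group
	// collapses to AllPostingsKey alone and a plain merge returns every series.
	all = index.NewListPostings([]uint64{1, 2, 3, 4, 5, 6, 7, 8})
	ps, _ = index.ExpandPostings(index.Merge(all))
	fmt.Println(ps) // [1 2 3 4 5 6 7 8]
}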
