Skip to content

Commit

Permalink
storegw: Optimized inject label stage of index lookup.
Browse files Browse the repository at this point in the history
Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka committed Dec 10, 2020
1 parent 4abb51a commit 52e5e0b
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 14 deletions.
44 changes: 30 additions & 14 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ func (s *bucketSeriesSet) Err() error {
}

func blockSeries(
extLset map[string]string,
extLset labels.Labels,
indexr *bucketIndexReader,
chunkr *bucketChunkReader,
matchers []*labels.Matcher,
Expand Down Expand Up @@ -743,18 +743,7 @@ func blockSeries(
return nil, nil, errors.Wrap(err, "Lookup labels symbols")
}

for _, l := range lset {
// Skip if the external labels of the block overrule the series' label.
// NOTE(fabxc): maybe move it to a prefixed version to still ensure uniqueness of series?
if extLset[l.Name] != "" {
continue
}
s.lset = append(s.lset, l)
}
for ln, lv := range extLset {
s.lset = append(s.lset, labels.Label{Name: ln, Value: lv})
}
sort.Sort(s.lset)
s.lset = injectLabels(lset, extLset)
res = append(res, s)
}

Expand Down Expand Up @@ -782,6 +771,31 @@ func blockSeries(
return newBucketSeriesSet(res), indexr.stats.merge(chunkr.stats), nil
}

func injectLabels(in labels.Labels, extLset labels.Labels) labels.Labels {
out := make(labels.Labels, 0, len(in)+len(extLset))

// Inject external labels in place.
for len(in) > 0 && len(extLset) > 0 {
d := strings.Compare(in[0].Name, extLset[0].Name)
if d == 0 {
// Duplicate, prefer external labels.
// NOTE(fabxc): Maybe move it to a prefixed version to still ensure uniqueness of series?
out = append(out, extLset[0])
in, extLset = in[1:], extLset[1:]
} else if d < 0 {
out = append(out, in[0])
in = in[1:]
} else if d > 0 {
out = append(out, extLset[0])
extLset = extLset[1:]
}
}
// Append all remaining elements.
out = append(out, in...)
out = append(out, extLset...)
return out
}

func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Aggr) error {
if in.Encoding() == chunkenc.EncXOR {
out.Raw = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: in.Bytes()}
Expand Down Expand Up @@ -943,7 +957,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie

g.Go(func() error {
part, pstats, err := blockSeries(
b.meta.Thanos.Labels,
b.extLset,
indexr,
chunkr,
blockMatchers,
Expand Down Expand Up @@ -1374,6 +1388,7 @@ type bucketBlock struct {
dir string
indexCache storecache.IndexCache
chunkPool pool.BytesPool
extLset labels.Labels

indexHeaderReader indexheader.Reader

Expand Down Expand Up @@ -1410,6 +1425,7 @@ func newBucketBlock(
partitioner: p,
meta: meta,
indexHeaderReader: indexHeadReader,
extLset: labels.FromMap(meta.Thanos.Labels),
}

// Translate the block's labels and inject the block ID as a label
Expand Down
68 changes: 68 additions & 0 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2215,3 +2215,71 @@ func labelNamesFromSeriesSet(series []*storepb.Series) []string {
sort.Strings(labels)
return labels
}

func TestInjectExtLabels(t *testing.T) {
testInjectExtLabels(testutil.NewTB(t))
}

func BenchmarkInjectExtLabels(b *testing.B) {
testInjectExtLabels(testutil.NewTB(b))
}

var x labels.Labels

func testInjectExtLabels(tb testutil.TB) {
in := labels.FromStrings(
"__name__", "subscription_labels",
"_id", "0dfsdfsdsfdsffd1e96-4432-9abe-e33436ea969a",
"account", "1afsdfsddsfsdfsdfsdfsdfs",
"ebs_account", "1asdasdad45",
"email_domain", "asdasddgfkw.example.com",
"endpoint", "metrics",
"external_organization", "dfsdfsdf",
"instance", "10.128.4.231:8080",
"job", "sdd-acct-mngr-metrics",
"managed", "false",
"namespace", "production",
"organization", "dasdadasdasasdasaaFGDSG",
"pod", "sdd-acct-mngr-6669c947c8-xjx7f",
"prometheus", "telemeter-production/telemeter",
"prometheus_replica", "prometheus-telemeter-1",
"risk", "5",
"service", "sdd-acct-mngr-metrics",
"support", "Self-Support", // Should be overwritten.
)
extLset := labels.FromStrings(
"support", "Host-Support",
"replica", "1",
"tenant", "2342",
)
tb.ResetTimer()
for i := 0; i < tb.N(); i++ {
x = injectLabels(in, extLset)

if !tb.IsBenchmark() {
testutil.Equals(tb, labels.FromStrings(
"__name__", "subscription_labels",
"_id", "0dfsdfsdsfdsffd1e96-4432-9abe-e33436ea969a",
"account", "1afsdfsddsfsdfsdfsdfsdfs",
"ebs_account", "1asdasdad45",
"email_domain", "asdasddgfkw.example.com",
"endpoint", "metrics",
"external_organization", "dfsdfsdf",
"instance", "10.128.4.231:8080",
"job", "sdd-acct-mngr-metrics",
"managed", "false",
"namespace", "production",
"organization", "dasdadasdasasdasaaFGDSG",
"pod", "sdd-acct-mngr-6669c947c8-xjx7f",
"prometheus", "telemeter-production/telemeter",
"prometheus_replica", "prometheus-telemeter-1",
"replica", "1",
"risk", "5",
"service", "sdd-acct-mngr-metrics",
"support", "Host-Support",
"tenant", "2342",
), x)
}
}
fmt.Fprint(ioutil.Discard, x)
}

0 comments on commit 52e5e0b

Please sign in to comment.