Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storegw: Optimized inject label stage of index lookup. #3568

Merged
merged 3 commits into from
Dec 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
ARG SHA="0c38f63cbe19e40123668a48c36466ef72b195e723cbfcbe01e9657a5f14cec6"
# By default we pin to amd64 sha. Use make docker to automatically adjust for arm64 versions.
ARG SHA="fca3819d670cdaee0d785499fda202ea01c0640ca0803d26ae6dbf2a1c8c041c"
FROM quay.io/prometheus/busybox@sha256:${SHA}
LABEL maintainer="The Thanos Authors"

Expand Down
3 changes: 2 additions & 1 deletion Dockerfile.multi-stage
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ RUN git update-index --refresh; make build

# -----------------------------------------------------------------------------

ARG SHA="0c38f63cbe19e40123668a48c36466ef72b195e723cbfcbe01e9657a5f14cec6"
# By default we pin to amd64 sha. Use make docker to automatically adjust for arm64 versions.
ARG SHA="fca3819d670cdaee0d785499fda202ea01c0640ca0803d26ae6dbf2a1c8c041c"
FROM quay.io/prometheus/busybox@sha256:${SHA}
LABEL maintainer="The Thanos Authors"

Expand Down
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ DOCKER_CI_TAG ?= test
SHA=''
arch = $(shell uname -m)
# Run `DOCKER_CLI_EXPERIMENTAL=enabled docker manifest inspect quay.io/prometheus/busybox:latest` to get SHA
# Update at 2020.07.06
# Update at 2020.12.11
ifeq ($(arch), x86_64)
# amd64
SHA="248b7ec76e03e6b4fbb796fc3cdd2f91dad45546a6d7dee61c322475e0e8a08f"
SHA="fca3819d670cdaee0d785499fda202ea01c0640ca0803d26ae6dbf2a1c8c041c"
else ifeq ($(arch), armv8)
# arm64
SHA="69508e8fdc516eacbacc0379c03c971e3043706cc8211e6bddb35d903edc3628"
SHA="5478a46f1eb37ebe414c399766f8088bc8353345602053485dd429b9a87097e5"
else
echo >&2 "only support amd64 or arm64 arch" && exit 1
endif
Expand Down
2 changes: 2 additions & 0 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/url"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -701,6 +702,7 @@ func parseFlagLabels(s []string) (labels.Labels, error) {
}
lset = append(lset, labels.Label{Name: parts[0], Value: val})
}
sort.Sort(lset)
return lset, nil
}

Expand Down
7 changes: 5 additions & 2 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func IsWALDirAccessible(dir string) error {
return nil
}

// ExternalLabels returns external labels from /api/v1/status/config Prometheus endpoint.
// ExternalLabels returns sorted external labels from /api/v1/status/config Prometheus endpoint.
// Note that configuration can be hot reloadable on Prometheus, so this config might change in runtime.
func (c *Client) ExternalLabels(ctx context.Context, base *url.URL) (labels.Labels, error) {
u := *base
Expand Down Expand Up @@ -181,7 +181,10 @@ func (c *Client) ExternalLabels(ctx context.Context, base *url.URL) (labels.Labe
if err := yaml.Unmarshal([]byte(d.Data.YAML), &cfg); err != nil {
return nil, errors.Wrapf(err, "parse Prometheus config: %v", d.Data.YAML)
}
return labels.FromMap(cfg.Global.ExternalLabels), nil

lset := labels.FromMap(cfg.Global.ExternalLabels)
sort.Sort(lset)
return lset, nil
}

type Flags struct {
Expand Down
9 changes: 6 additions & 3 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"
Expand All @@ -43,6 +44,8 @@ type MultiTSDB struct {
allowOutOfOrderUpload bool
}

// NewMultiTSDB creates new MultiTSDB.
// NOTE: Passed labels have to be sorted by name.
func NewMultiTSDB(
dataDir string,
l log.Logger,
Expand Down Expand Up @@ -262,7 +265,7 @@ func (t *MultiTSDB) TSDBStores() map[string]storepb.StoreServer {

func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant) error {
reg := prometheus.WrapRegistererWith(prometheus.Labels{"tenant": tenantID}, t.reg)
lbls := append(t.labels, labels.Label{Name: t.tenantLabelName, Value: tenantID})
lset := labelpb.ExtendSortedLabels(t.labels, labels.FromStrings(t.tenantLabelName, tenantID))
dataDir := t.defaultTenantDataDir(tenantID)

level.Info(logger).Log("msg", "opening TSDB")
Expand All @@ -286,13 +289,13 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
reg,
dataDir,
t.bucket,
func() labels.Labels { return lbls },
func() labels.Labels { return lset },
metadata.ReceiveSource,
false,
t.allowOutOfOrderUpload,
)
}
tenant.set(store.NewTSDBStore(logger, reg, s, component.Receive, lbls), s, ship)
tenant.set(store.NewTSDBStore(logger, reg, s, component.Receive, lset), s, ship)
level.Info(logger).Log("msg", "TSDB is now ready")
return nil
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/rules/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,11 @@ func (p *Prometheus) Rules(r *rulespb.RulesRequest, s rulespb.Rules_RulesServer)
return nil
}

// extLset has to be sorted.
func enrichRulesWithExtLabels(groups []*rulespb.RuleGroup, extLset labels.Labels) {
for _, g := range groups {
for _, r := range g.Rules {
r.SetLabels(labelpb.ExtendLabels(r.GetLabels(), extLset))
r.SetLabels(labelpb.ExtendSortedLabels(r.GetLabels(), extLset))
}
}
}
37 changes: 14 additions & 23 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ func (s *bucketSeriesSet) Err() error {
}

func blockSeries(
extLset map[string]string,
extLset labels.Labels,
indexr *bucketIndexReader,
chunkr *bucketChunkReader,
matchers []*labels.Matcher,
Expand Down Expand Up @@ -718,7 +718,7 @@ func blockSeries(
continue
}

s := seriesEntry{lset: make(labels.Labels, 0, len(symbolizedLset)+len(extLset))}
s := seriesEntry{}
if !req.SkipChunks {
// Schedule loading chunks.
s.refs = make([]uint64, 0, len(chks))
Expand All @@ -743,18 +743,7 @@ func blockSeries(
return nil, nil, errors.Wrap(err, "Lookup labels symbols")
}

for _, l := range lset {
// Skip if the external labels of the block overrule the series' label.
// NOTE(fabxc): maybe move it to a prefixed version to still ensure uniqueness of series?
if extLset[l.Name] != "" {
continue
}
s.lset = append(s.lset, l)
}
for ln, lv := range extLset {
s.lset = append(s.lset, labels.Label{Name: ln, Value: lv})
}
sort.Sort(s.lset)
s.lset = labelpb.ExtendSortedLabels(lset, extLset)
res = append(res, s)
}

Expand Down Expand Up @@ -943,7 +932,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie

g.Go(func() error {
part, pstats, err := blockSeries(
b.meta.Thanos.Labels,
b.extLset,
indexr,
chunkr,
blockMatchers,
Expand Down Expand Up @@ -1374,6 +1363,7 @@ type bucketBlock struct {
dir string
indexCache storecache.IndexCache
chunkPool pool.BytesPool
extLset labels.Labels

indexHeaderReader indexheader.Reader

Expand Down Expand Up @@ -1410,14 +1400,15 @@ func newBucketBlock(
partitioner: p,
meta: meta,
indexHeaderReader: indexHeadReader,
}

// Translate the block's labels and inject the block ID as a label
// to allow to match blocks also by ID.
b.relabelLabels = append(labels.FromMap(meta.Thanos.Labels), labels.Label{
Name: block.BlockIDLabel,
Value: meta.ULID.String(),
})
extLset: labels.FromMap(meta.Thanos.Labels),
// Translate the block's labels and inject the block ID as a label
// to allow to match blocks also by ID.
relabelLabels: append(labels.FromMap(meta.Thanos.Labels), labels.Label{
Name: block.BlockIDLabel,
Value: meta.ULID.String(),
}),
}
sort.Sort(b.extLset)
sort.Sort(b.relabelLabels)

// Get object handles for all chunk files (segment files) from meta.json, if available.
Expand Down
1 change: 1 addition & 0 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1257,6 +1257,7 @@ func benchBucketSeries(t testutil.TB, skipChunk bool, samplesPerSeries, totalSer
partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
chunkObjs: []string{filepath.Join(id.String(), "chunks", "000001")},
chunkPool: chunkPool,
extLset: extLset,
}
blocks = append(blocks, b)
}
Expand Down
38 changes: 23 additions & 15 deletions pkg/store/labelpb/label.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,27 +235,35 @@ func (m *ZLabel) Compare(other ZLabel) int {
return strings.Compare(m.Value, other.Value)
}

// ExtendLabels extend given labels by extend in labels format.
// ExtendSortedLabels extends given labels by extend, returning the result in labels format.
// The type conversion is done safely, which means we don't modify the extend labels' underlying array.
//
// In case of existing labels already present in given label set, it will be overwritten by external one.
func ExtendLabels(lset labels.Labels, extend labels.Labels) labels.Labels {
overwritten := map[string]struct{}{}
for i, l := range lset {
if v := extend.Get(l.Name); v != "" {
lset[i].Value = v
overwritten[l.Name] = struct{}{}
// NOTE: Both lset and extend have to be sorted by name.
func ExtendSortedLabels(lset labels.Labels, extend labels.Labels) labels.Labels {
ret := make(labels.Labels, 0, len(lset)+len(extend))

// Inject external labels in place.
for len(lset) > 0 && len(extend) > 0 {
d := strings.Compare(lset[0].Name, extend[0].Name)
if d == 0 {
// Duplicate, prefer external labels.
// NOTE(fabxc): Maybe move it to a prefixed version to still ensure uniqueness of series?
ret = append(ret, extend[0])
lset, extend = lset[1:], extend[1:]
} else if d < 0 {
ret = append(ret, lset[0])
lset = lset[1:]
} else if d > 0 {
ret = append(ret, extend[0])
extend = extend[1:]
}
}

for _, l := range extend {
if _, ok := overwritten[l.Name]; ok {
continue
}
lset = append(lset, l)
}
sort.Sort(lset)
return lset
// Append all remaining elements.
ret = append(ret, lset...)
ret = append(ret, extend...)
return ret
}

func PromLabelSetsToString(lsets []labels.Labels) string {
Expand Down
73 changes: 70 additions & 3 deletions pkg/store/labelpb/label_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package labelpb

import (
"fmt"
ioutil "io/ioutil"
"reflect"
"sort"
"testing"
Expand Down Expand Up @@ -52,13 +53,79 @@ func TestLabelMarshal_Unmarshal(t *testing.T) {

func TestExtendLabels(t *testing.T) {
testutil.Equals(t, labels.Labels{{Name: "a", Value: "1"}, {Name: "replica", Value: "01"}, {Name: "xb", Value: "2"}},
ExtendLabels(labels.Labels{{Name: "xb", Value: "2"}, {Name: "a", Value: "1"}}, labels.FromStrings("replica", "01")))
ExtendSortedLabels(labels.Labels{{Name: "a", Value: "1"}, {Name: "xb", Value: "2"}}, labels.FromStrings("replica", "01")))

testutil.Equals(t, labels.Labels{{Name: "replica", Value: "01"}},
ExtendLabels(labels.Labels{}, labels.FromStrings("replica", "01")))
ExtendSortedLabels(labels.Labels{}, labels.FromStrings("replica", "01")))

testutil.Equals(t, labels.Labels{{Name: "a", Value: "1"}, {Name: "replica", Value: "01"}, {Name: "xb", Value: "2"}},
ExtendLabels(labels.Labels{{Name: "xb", Value: "2"}, {Name: "replica", Value: "NOT01"}, {Name: "a", Value: "1"}}, labels.FromStrings("replica", "01")))
ExtendSortedLabels(labels.Labels{{Name: "a", Value: "1"}, {Name: "replica", Value: "NOT01"}, {Name: "xb", Value: "2"}}, labels.FromStrings("replica", "01")))

testInjectExtLabels(testutil.NewTB(t))
}

func BenchmarkExtendLabels(b *testing.B) {
testInjectExtLabels(testutil.NewTB(b))
}

var x labels.Labels

// testInjectExtLabels is the shared driver for the ExtendSortedLabels test and
// benchmark. It merges a fixed external label set into a fixed series label
// set tb.N() times; when tb is not a benchmark, each result is also compared
// against the expected merged label set.
func testInjectExtLabels(tb testutil.TB) {
	series := labels.FromStrings(
		"__name__", "subscription_labels",
		"_id", "0dfsdfsdsfdsffd1e96-4432-9abe-e33436ea969a",
		"account", "1afsdfsddsfsdfsdfsdfsdfs",
		"ebs_account", "1asdasdad45",
		"email_domain", "asdasddgfkw.example.com",
		"endpoint", "metrics",
		"external_organization", "dfsdfsdf",
		"instance", "10.128.4.231:8080",
		"job", "sdd-acct-mngr-metrics",
		"managed", "false",
		"namespace", "production",
		"organization", "dasdadasdasasdasaaFGDSG",
		"pod", "sdd-acct-mngr-6669c947c8-xjx7f",
		"prometheus", "telemeter-production/telemeter",
		"prometheus_replica", "prometheus-telemeter-1",
		"risk", "5",
		"service", "sdd-acct-mngr-metrics",
		"support", "Self-Support", // Expected to be replaced by the external "support" value.
	)
	external := labels.FromStrings(
		"support", "Host-Support",
		"replica", "1",
		"tenant", "2342",
	)

	tb.ResetTimer()
	for i := 0; i < tb.N(); i++ {
		// Store into the package-level variable x, as the original does.
		x = ExtendSortedLabels(series, external)

		// Benchmarks only measure the merge; correctness is asserted in test mode.
		if tb.IsBenchmark() {
			continue
		}

		testutil.Equals(tb, labels.FromStrings(
			"__name__", "subscription_labels",
			"_id", "0dfsdfsdsfdsffd1e96-4432-9abe-e33436ea969a",
			"account", "1afsdfsddsfsdfsdfsdfsdfs",
			"ebs_account", "1asdasdad45",
			"email_domain", "asdasddgfkw.example.com",
			"endpoint", "metrics",
			"external_organization", "dfsdfsdf",
			"instance", "10.128.4.231:8080",
			"job", "sdd-acct-mngr-metrics",
			"managed", "false",
			"namespace", "production",
			"organization", "dasdadasdasasdasaaFGDSG",
			"pod", "sdd-acct-mngr-6669c947c8-xjx7f",
			"prometheus", "telemeter-production/telemeter",
			"prometheus_replica", "prometheus-telemeter-1",
			"replica", "1",
			"risk", "5",
			"service", "sdd-acct-mngr-metrics",
			"support", "Host-Support",
			"tenant", "2342",
		), x)
	}
	fmt.Fprint(ioutil.Discard, x)
}

var (
Expand Down
Loading