Skip to content

Commit

Permalink
Improved helper method instead.
Browse files Browse the repository at this point in the history
Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka committed Dec 11, 2020
1 parent ace88ff commit 0e372f9
Show file tree
Hide file tree
Showing 13 changed files with 150 additions and 155 deletions.
2 changes: 2 additions & 0 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/url"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -701,6 +702,7 @@ func parseFlagLabels(s []string) (labels.Labels, error) {
}
lset = append(lset, labels.Label{Name: parts[0], Value: val})
}
sort.Sort(lset)
return lset, nil
}

Expand Down
7 changes: 5 additions & 2 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func IsWALDirAccessible(dir string) error {
return nil
}

// ExternalLabels returns external labels from /api/v1/status/config Prometheus endpoint.
// ExternalLabels returns sorted external labels from /api/v1/status/config Prometheus endpoint.
// Note that configuration can be hot reloadable on Prometheus, so this config might change in runtime.
func (c *Client) ExternalLabels(ctx context.Context, base *url.URL) (labels.Labels, error) {
u := *base
Expand Down Expand Up @@ -181,7 +181,10 @@ func (c *Client) ExternalLabels(ctx context.Context, base *url.URL) (labels.Labe
if err := yaml.Unmarshal([]byte(d.Data.YAML), &cfg); err != nil {
return nil, errors.Wrapf(err, "parse Prometheus config: %v", d.Data.YAML)
}
return labels.FromMap(cfg.Global.ExternalLabels), nil

lset := labels.FromMap(cfg.Global.ExternalLabels)
sort.Sort(lset)
return lset, nil
}

type Flags struct {
Expand Down
9 changes: 6 additions & 3 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"
Expand All @@ -43,6 +44,8 @@ type MultiTSDB struct {
allowOutOfOrderUpload bool
}

// NewMultiTSDB creates new MultiTSDB.
// NOTE: Passed labels has to be sorted by name.
func NewMultiTSDB(
dataDir string,
l log.Logger,
Expand Down Expand Up @@ -262,7 +265,7 @@ func (t *MultiTSDB) TSDBStores() map[string]storepb.StoreServer {

func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant) error {
reg := prometheus.WrapRegistererWith(prometheus.Labels{"tenant": tenantID}, t.reg)
lbls := append(t.labels, labels.Label{Name: t.tenantLabelName, Value: tenantID})
lset := labelpb.ExtendSortedLabels(t.labels, labels.FromStrings(t.tenantLabelName, tenantID))
dataDir := t.defaultTenantDataDir(tenantID)

level.Info(logger).Log("msg", "opening TSDB")
Expand All @@ -286,13 +289,13 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
reg,
dataDir,
t.bucket,
func() labels.Labels { return lbls },
func() labels.Labels { return lset },
metadata.ReceiveSource,
false,
t.allowOutOfOrderUpload,
)
}
tenant.set(store.NewTSDBStore(logger, reg, s, component.Receive, lbls), s, ship)
tenant.set(store.NewTSDBStore(logger, reg, s, component.Receive, lset), s, ship)
level.Info(logger).Log("msg", "TSDB is now ready")
return nil
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/rules/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,11 @@ func (p *Prometheus) Rules(r *rulespb.RulesRequest, s rulespb.Rules_RulesServer)
return nil
}

// extLset has to be sorted.
func enrichRulesWithExtLabels(groups []*rulespb.RuleGroup, extLset labels.Labels) {
for _, g := range groups {
for _, r := range g.Rules {
r.SetLabels(labelpb.ExtendLabels(r.GetLabels(), extLset))
r.SetLabels(labelpb.ExtendSortedLabels(r.GetLabels(), extLset))
}
}
}
43 changes: 9 additions & 34 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -743,7 +743,7 @@ func blockSeries(
return nil, nil, errors.Wrap(err, "Lookup labels symbols")
}

s.lset = injectLabels(lset, extLset)
s.lset = labelpb.ExtendSortedLabels(lset, extLset)
res = append(res, s)
}

Expand Down Expand Up @@ -771,31 +771,6 @@ func blockSeries(
return newBucketSeriesSet(res), indexr.stats.merge(chunkr.stats), nil
}

func injectLabels(in labels.Labels, extLset labels.Labels) labels.Labels {
out := make(labels.Labels, 0, len(in)+len(extLset))

// Inject external labels in place.
for len(in) > 0 && len(extLset) > 0 {
d := strings.Compare(in[0].Name, extLset[0].Name)
if d == 0 {
// Duplicate, prefer external labels.
// NOTE(fabxc): Maybe move it to a prefixed version to still ensure uniqueness of series?
out = append(out, extLset[0])
in, extLset = in[1:], extLset[1:]
} else if d < 0 {
out = append(out, in[0])
in = in[1:]
} else if d > 0 {
out = append(out, extLset[0])
extLset = extLset[1:]
}
}
// Append all remaining elements.
out = append(out, in...)
out = append(out, extLset...)
return out
}

func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Aggr) error {
if in.Encoding() == chunkenc.EncXOR {
out.Raw = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: in.Bytes()}
Expand Down Expand Up @@ -1426,14 +1401,14 @@ func newBucketBlock(
meta: meta,
indexHeaderReader: indexHeadReader,
extLset: labels.FromMap(meta.Thanos.Labels),
}

// Translate the block's labels and inject the block ID as a label
// to allow to match blocks also by ID.
b.relabelLabels = append(labels.FromMap(meta.Thanos.Labels), labels.Label{
Name: block.BlockIDLabel,
Value: meta.ULID.String(),
})
// Translate the block's labels and inject the block ID as a label
// to allow to match blocks also by ID.
relabelLabels: append(labels.FromMap(meta.Thanos.Labels), labels.Label{
Name: block.BlockIDLabel,
Value: meta.ULID.String(),
}),
}
sort.Sort(b.extLset)
sort.Sort(b.relabelLabels)

// Get object handles for all chunk files (segment files) from meta.json, if available.
Expand Down
69 changes: 1 addition & 68 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1257,6 +1257,7 @@ func benchBucketSeries(t testutil.TB, skipChunk bool, samplesPerSeries, totalSer
partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
chunkObjs: []string{filepath.Join(id.String(), "chunks", "000001")},
chunkPool: chunkPool,
extLset: extLset,
}
blocks = append(blocks, b)
}
Expand Down Expand Up @@ -2215,71 +2216,3 @@ func labelNamesFromSeriesSet(series []*storepb.Series) []string {
sort.Strings(labels)
return labels
}

func TestInjectExtLabels(t *testing.T) {
testInjectExtLabels(testutil.NewTB(t))
}

func BenchmarkInjectExtLabels(b *testing.B) {
testInjectExtLabels(testutil.NewTB(b))
}

var x labels.Labels

func testInjectExtLabels(tb testutil.TB) {
in := labels.FromStrings(
"__name__", "subscription_labels",
"_id", "0dfsdfsdsfdsffd1e96-4432-9abe-e33436ea969a",
"account", "1afsdfsddsfsdfsdfsdfsdfs",
"ebs_account", "1asdasdad45",
"email_domain", "asdasddgfkw.example.com",
"endpoint", "metrics",
"external_organization", "dfsdfsdf",
"instance", "10.128.4.231:8080",
"job", "sdd-acct-mngr-metrics",
"managed", "false",
"namespace", "production",
"organization", "dasdadasdasasdasaaFGDSG",
"pod", "sdd-acct-mngr-6669c947c8-xjx7f",
"prometheus", "telemeter-production/telemeter",
"prometheus_replica", "prometheus-telemeter-1",
"risk", "5",
"service", "sdd-acct-mngr-metrics",
"support", "Self-Support", // Should be overwritten.
)
extLset := labels.FromStrings(
"support", "Host-Support",
"replica", "1",
"tenant", "2342",
)
tb.ResetTimer()
for i := 0; i < tb.N(); i++ {
x = injectLabels(in, extLset)

if !tb.IsBenchmark() {
testutil.Equals(tb, labels.FromStrings(
"__name__", "subscription_labels",
"_id", "0dfsdfsdsfdsffd1e96-4432-9abe-e33436ea969a",
"account", "1afsdfsddsfsdfsdfsdfsdfs",
"ebs_account", "1asdasdad45",
"email_domain", "asdasddgfkw.example.com",
"endpoint", "metrics",
"external_organization", "dfsdfsdf",
"instance", "10.128.4.231:8080",
"job", "sdd-acct-mngr-metrics",
"managed", "false",
"namespace", "production",
"organization", "dasdadasdasasdasaaFGDSG",
"pod", "sdd-acct-mngr-6669c947c8-xjx7f",
"prometheus", "telemeter-production/telemeter",
"prometheus_replica", "prometheus-telemeter-1",
"replica", "1",
"risk", "5",
"service", "sdd-acct-mngr-metrics",
"support", "Host-Support",
"tenant", "2342",
), x)
}
}
fmt.Fprint(ioutil.Discard, x)
}
38 changes: 23 additions & 15 deletions pkg/store/labelpb/label.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,27 +235,35 @@ func (m *ZLabel) Compare(other ZLabel) int {
return strings.Compare(m.Value, other.Value)
}

// ExtendLabels extend given labels by extend in labels format.
// ExtendSortedLabels extend given labels by extend in labels format.
// The type conversion is done safely, which means we don't modify extend labels underlying array.
//
// In case of existing labels already present in given label set, it will be overwritten by external one.
func ExtendLabels(lset labels.Labels, extend labels.Labels) labels.Labels {
overwritten := map[string]struct{}{}
for i, l := range lset {
if v := extend.Get(l.Name); v != "" {
lset[i].Value = v
overwritten[l.Name] = struct{}{}
// NOTE: Labels and extend has to be sorted.
func ExtendSortedLabels(lset labels.Labels, extend labels.Labels) labels.Labels {
ret := make(labels.Labels, 0, len(lset)+len(extend))

// Inject external labels in place.
for len(lset) > 0 && len(extend) > 0 {
d := strings.Compare(lset[0].Name, extend[0].Name)
if d == 0 {
// Duplicate, prefer external labels.
// NOTE(fabxc): Maybe move it to a prefixed version to still ensure uniqueness of series?
ret = append(ret, extend[0])
lset, extend = lset[1:], extend[1:]
} else if d < 0 {
ret = append(ret, lset[0])
lset = lset[1:]
} else if d > 0 {
ret = append(ret, extend[0])
extend = extend[1:]
}
}

for _, l := range extend {
if _, ok := overwritten[l.Name]; ok {
continue
}
lset = append(lset, l)
}
sort.Sort(lset)
return lset
// Append all remaining elements.
ret = append(ret, lset...)
ret = append(ret, extend...)
return ret
}

func PromLabelSetsToString(lsets []labels.Labels) string {
Expand Down
73 changes: 70 additions & 3 deletions pkg/store/labelpb/label_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package labelpb

import (
"fmt"
ioutil "io/ioutil"
"reflect"
"sort"
"testing"
Expand Down Expand Up @@ -52,13 +53,79 @@ func TestLabelMarshal_Unmarshal(t *testing.T) {

func TestExtendLabels(t *testing.T) {
testutil.Equals(t, labels.Labels{{Name: "a", Value: "1"}, {Name: "replica", Value: "01"}, {Name: "xb", Value: "2"}},
ExtendLabels(labels.Labels{{Name: "xb", Value: "2"}, {Name: "a", Value: "1"}}, labels.FromStrings("replica", "01")))
ExtendSortedLabels(labels.Labels{{Name: "a", Value: "1"}, {Name: "xb", Value: "2"}}, labels.FromStrings("replica", "01")))

testutil.Equals(t, labels.Labels{{Name: "replica", Value: "01"}},
ExtendLabels(labels.Labels{}, labels.FromStrings("replica", "01")))
ExtendSortedLabels(labels.Labels{}, labels.FromStrings("replica", "01")))

testutil.Equals(t, labels.Labels{{Name: "a", Value: "1"}, {Name: "replica", Value: "01"}, {Name: "xb", Value: "2"}},
ExtendLabels(labels.Labels{{Name: "xb", Value: "2"}, {Name: "replica", Value: "NOT01"}, {Name: "a", Value: "1"}}, labels.FromStrings("replica", "01")))
ExtendSortedLabels(labels.Labels{{Name: "a", Value: "1"}, {Name: "replica", Value: "NOT01"}, {Name: "xb", Value: "2"}}, labels.FromStrings("replica", "01")))

testInjectExtLabels(testutil.NewTB(t))
}

func BenchmarkExtendLabels(b *testing.B) {
testInjectExtLabels(testutil.NewTB(b))
}

var x labels.Labels

func testInjectExtLabels(tb testutil.TB) {
in := labels.FromStrings(
"__name__", "subscription_labels",
"_id", "0dfsdfsdsfdsffd1e96-4432-9abe-e33436ea969a",
"account", "1afsdfsddsfsdfsdfsdfsdfs",
"ebs_account", "1asdasdad45",
"email_domain", "asdasddgfkw.example.com",
"endpoint", "metrics",
"external_organization", "dfsdfsdf",
"instance", "10.128.4.231:8080",
"job", "sdd-acct-mngr-metrics",
"managed", "false",
"namespace", "production",
"organization", "dasdadasdasasdasaaFGDSG",
"pod", "sdd-acct-mngr-6669c947c8-xjx7f",
"prometheus", "telemeter-production/telemeter",
"prometheus_replica", "prometheus-telemeter-1",
"risk", "5",
"service", "sdd-acct-mngr-metrics",
"support", "Self-Support", // Should be overwritten.
)
extLset := labels.FromStrings(
"support", "Host-Support",
"replica", "1",
"tenant", "2342",
)
tb.ResetTimer()
for i := 0; i < tb.N(); i++ {
x = ExtendSortedLabels(in, extLset)

if !tb.IsBenchmark() {
testutil.Equals(tb, labels.FromStrings(
"__name__", "subscription_labels",
"_id", "0dfsdfsdsfdsffd1e96-4432-9abe-e33436ea969a",
"account", "1afsdfsddsfsdfsdfsdfsdfs",
"ebs_account", "1asdasdad45",
"email_domain", "asdasddgfkw.example.com",
"endpoint", "metrics",
"external_organization", "dfsdfsdf",
"instance", "10.128.4.231:8080",
"job", "sdd-acct-mngr-metrics",
"managed", "false",
"namespace", "production",
"organization", "dasdadasdasasdasaaFGDSG",
"pod", "sdd-acct-mngr-6669c947c8-xjx7f",
"prometheus", "telemeter-production/telemeter",
"prometheus_replica", "prometheus-telemeter-1",
"replica", "1",
"risk", "5",
"service", "sdd-acct-mngr-metrics",
"support", "Host-Support",
"tenant", "2342",
), x)
}
}
fmt.Fprint(ioutil.Discard, x)
}

var (
Expand Down
Loading

0 comments on commit 0e372f9

Please sign in to comment.