Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store,Compactor: Thanos sharding #1583

Merged
merged 13 commits into from
Oct 9, 2019
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#1534](https://github.com/thanos-io/thanos/pull/1534) Thanos Query Added `/-/ready` and `/-/healthy` endpoints.
- [#1533](https://github.com/thanos-io/thanos/pull/1533) Thanos inspect now supports the timeout flag.
- [#1362](https://github.com/thanos-io/thanos/pull/1362) Optional `replicaLabels` param for `/query` and `/query_range` querier endpoints. When provided overwrite the `query.replica-label` cli flags.
- [#1583](https://github.com/thanos-io/thanos/pull/1583) Thanos sharding:
- Add relabel config (`--selector.relabel-config-file` and `selector.relabel-config`) into Thanos Store and Compact components.
- For store gateway, advertise labels from "approved" blocks.
- Selecting blocks to serve depends on the result of block labels relabeling.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be part of the last sentence.


### Changed

Expand Down
16 changes: 15 additions & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {
compactionConcurrency := cmd.Flag("compact.concurrency", "Number of goroutines to use when compacting groups.").
Default("1").Int()

selectorRelabelConf := regSelectorRelabelFlags(cmd)

m[component.Compact.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
return runCompact(g, logger, reg,
*httpAddr,
Expand All @@ -131,6 +133,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {
*maxCompactionLevel,
*blockSyncConcurrency,
*compactionConcurrency,
selectorRelabelConf,
)
}
}
Expand All @@ -153,6 +156,7 @@ func runCompact(
maxCompactionLevel int,
blockSyncConcurrency int,
concurrency int,
selectorRelabelConf *extflag.PathOrContent,
) error {
halted := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "thanos_compactor_halted",
Expand Down Expand Up @@ -185,6 +189,16 @@ func runCompact(
return err
}

relabelContentYaml, err := selectorRelabelConf.Content()
if err != nil {
return errors.Wrap(err, "get content of relabel configuration")
}

relabelConfig, err := parseRelabelConfig(relabelContentYaml)
if err != nil {
return err
}

// Ensure we close up everything properly.
defer func() {
if err != nil {
Expand All @@ -193,7 +207,7 @@ func runCompact(
}()

sy, err := compact.NewSyncer(logger, reg, bkt, consistencyDelay,
blockSyncConcurrency, acceptMalformedIndex)
blockSyncConcurrency, acceptMalformedIndex, relabelConfig)
if err != nil {
return errors.Wrap(err, "create syncer")
}
Expand Down
9 changes: 9 additions & 0 deletions cmd/thanos/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,12 @@ func regCommonTracingFlags(app *kingpin.Application) *extflag.PathOrContent {
false,
)
}

func regSelectorRelabelFlags(cmd *kingpin.CmdClause) *extflag.PathOrContent {
return extflag.RegisterPathOrContent(
cmd,
"selector.relabel-config",
"YAML file that contains relabeling configuration that allows selecting blocks. It follows native Prometheus relabel-config syntax. See format details: https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config ",
false,
)
}
26 changes: 26 additions & 0 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore/client"
Expand All @@ -21,6 +22,7 @@ import (
"github.com/thanos-io/thanos/pkg/store"
storecache "github.com/thanos-io/thanos/pkg/store/cache"
"gopkg.in/alecthomas/kingpin.v2"
yaml "gopkg.in/yaml.v2"
)

// registerStore registers a store command.
Expand Down Expand Up @@ -58,6 +60,8 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
maxTime := model.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to serve. Thanos Store serves only blocks, which happened eariler than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("9999-12-31T23:59:59Z"))

selectorRelabelConf := regSelectorRelabelFlags(cmd)

m[component.Store.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error {
if minTime.PrometheusTimestamp() > maxTime.PrometheusTimestamp() {
return errors.Errorf("invalid argument: --min-time '%s' can't be greater than --max-time '%s'",
Expand Down Expand Up @@ -87,6 +91,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
MinTime: *minTime,
MaxTime: *maxTime,
},
selectorRelabelConf,
)
}
}
Expand All @@ -113,6 +118,7 @@ func runStore(
syncInterval time.Duration,
blockSyncConcurrency int,
filterConf *store.FilterConfig,
selectorRelabelConf *extflag.PathOrContent,
) error {
statusProber := prober.NewProber(component, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))

Expand All @@ -126,6 +132,16 @@ func runStore(
return errors.Wrap(err, "create bucket client")
}

relabelContentYaml, err := selectorRelabelConf.Content()
if err != nil {
return errors.Wrap(err, "get content of relabel configuration")
}

relabelConfig, err := parseRelabelConfig(relabelContentYaml)
if err != nil {
return err
}

// Ensure we close up everything properly.
defer func() {
if err != nil {
Expand Down Expand Up @@ -156,6 +172,7 @@ func runStore(
verbose,
blockSyncConcurrency,
filterConf,
relabelConfig,
)
if err != nil {
return errors.Wrap(err, "create object storage store")
Expand Down Expand Up @@ -211,3 +228,12 @@ func runStore(
level.Info(logger).Log("msg", "starting store node")
return nil
}

func parseRelabelConfig(contentYaml []byte) ([]*relabel.Config, error) {
var relabelConfig []*relabel.Config
if err := yaml.Unmarshal(contentYaml, &relabelConfig); err != nil {
return nil, errors.Wrap(err, "parsing relabel configuration")
}

return relabelConfig, nil
}
13 changes: 13 additions & 0 deletions docs/components/compact.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,5 +120,18 @@ Flags:
metadata from object storage.
--compact.concurrency=1 Number of goroutines to use when compacting
groups.
--selector.relabel-config-file=<file-path>
Path to YAML file that contains relabeling
configuration that allows selecting blocks. It
follows native Prometheus relabel-config syntax.
See format details:
https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config
--selector.relabel-config=<content>
Alternative to 'selector.relabel-config-file'
flag (lower priority). Content of YAML file that
contains relabeling configuration that allows
selecting blocks. It follows native Prometheus
relabel-config syntax. See format details:
https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config

```
14 changes: 14 additions & 0 deletions docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,20 @@ Flags:
RFC3339 format or time duration relative to
current time, such as -1d or 2h45m. Valid
duration units are ms, s, m, h, d, w, y.
--selector.relabel-config-file=<file-path>
Path to YAML file that contains relabeling
configuration that allows selecting blocks. It
follows native Prometheus relabel-config
syntax. See format details:
https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config
--selector.relabel-config=<content>
Alternative to 'selector.relabel-config-file'
flag (lower priority). Content of YAML file
that contains relabeling configuration that
allows selecting blocks. It follows native
Prometheus relabel-config syntax. See format
details:
https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config

```

Expand Down
16 changes: 15 additions & 1 deletion pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
promlables "github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/prometheus/prometheus/tsdb"
terrors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/tsdb/labels"
Expand Down Expand Up @@ -50,6 +52,7 @@ type Syncer struct {
blockSyncConcurrency int
metrics *syncerMetrics
acceptMalformedIndex bool
relabelConfig []*relabel.Config
}

type syncerMetrics struct {
Expand Down Expand Up @@ -131,7 +134,7 @@ func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics {

// NewSyncer returns a new Syncer for the given Bucket and directory.
// Blocks must be at least as old as the sync delay for being considered.
func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, consistencyDelay time.Duration, blockSyncConcurrency int, acceptMalformedIndex bool) (*Syncer, error) {
func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, consistencyDelay time.Duration, blockSyncConcurrency int, acceptMalformedIndex bool, relabelConfig []*relabel.Config) (*Syncer, error) {
if logger == nil {
logger = log.NewNopLogger()
}
Expand All @@ -144,6 +147,7 @@ func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket
metrics: newSyncerMetrics(reg),
blockSyncConcurrency: blockSyncConcurrency,
acceptMalformedIndex: acceptMalformedIndex,
relabelConfig: relabelConfig,
}, nil
}

Expand Down Expand Up @@ -208,6 +212,7 @@ func (c *Syncer) syncMetas(ctx context.Context) error {
if err == blockTooFreshSentinelError {
continue
}

if err != nil {
if removedOrIgnored := c.removeIfMetaMalformed(workCtx, id); removedOrIgnored {
continue
Expand All @@ -216,6 +221,15 @@ func (c *Syncer) syncMetas(ctx context.Context) error {
return
}

// Check for block labels by relabeling.
// If output is empty, the block will be dropped.
lset := promlables.FromMap(meta.Thanos.Labels)
processedLabels := relabel.Process(lset, c.relabelConfig...)
if processedLabels == nil {
level.Debug(c.logger).Log("msg", "dropping block(drop in relabeling)", "block", id)
continue
}

c.blocksMtx.Lock()
c.blocks[id] = meta
c.blocksMtx.Unlock()
Expand Down
93 changes: 90 additions & 3 deletions pkg/compact/compact_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/prometheus/prometheus/tsdb/labels"
Expand All @@ -25,14 +26,16 @@ import (
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/objtesting"
"github.com/thanos-io/thanos/pkg/testutil"
"gopkg.in/yaml.v2"
)

func TestSyncer_SyncMetas_e2e(t *testing.T) {
objtesting.ForeachStore(t, func(t testing.TB, bkt objstore.Bucket) {
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
defer cancel()

sy, err := NewSyncer(nil, nil, bkt, 0, 1, false)
relabelConfig := make([]*relabel.Config, 0)
sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, relabelConfig)
testutil.Ok(t, err)

// Generate 15 blocks. Initially the first 10 are synced into memory and only the last
Expand Down Expand Up @@ -72,7 +75,6 @@ func TestSyncer_SyncMetas_e2e(t *testing.T) {
testutil.Ok(t, err)
testutil.Equals(t, ids[5:], groups[0].IDs())
})

}

func TestSyncer_GarbageCollect_e2e(t *testing.T) {
Expand All @@ -85,6 +87,8 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
var metas []*metadata.Meta
var ids []ulid.ULID

relabelConfig := make([]*relabel.Config, 0)

for i := 0; i < 10; i++ {
var m metadata.Meta

Expand Down Expand Up @@ -134,7 +138,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
}

// Do one initial synchronization with the bucket.
sy, err := NewSyncer(nil, nil, bkt, 0, 1, false)
sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, relabelConfig)
testutil.Ok(t, err)
testutil.Ok(t, sy.SyncMetas(ctx))

Expand Down Expand Up @@ -354,3 +358,86 @@ func createEmptyBlock(dir string, mint int64, maxt int64, extLset labels.Labels,

return uid, nil
}

func TestSyncer_SyncMetasFilter_e2e(t *testing.T) {
var err error

relabelContentYaml := `
- action: drop
regex: "A"
source_labels:
- cluster
`
var relabelConfig []*relabel.Config
err = yaml.Unmarshal([]byte(relabelContentYaml), &relabelConfig)
testutil.Ok(t, err)

extLsets := []labels.Labels{{{Name: "cluster", Value: "A"}}, {{Name: "cluster", Value: "B"}}}

objtesting.ForeachStore(t, func(t testing.TB, bkt objstore.Bucket) {
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
defer cancel()

sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, relabelConfig)
testutil.Ok(t, err)

var ids []ulid.ULID
var metas []*metadata.Meta

for i := 0; i < 16; i++ {
id, err := ulid.New(uint64(i), nil)
testutil.Ok(t, err)

var meta metadata.Meta
meta.Version = 1
meta.ULID = id
meta.Thanos = metadata.Thanos{
Labels: extLsets[i%2].Map(),
}

ids = append(ids, id)
metas = append(metas, &meta)
}
for _, m := range metas[:10] {
var buf bytes.Buffer
testutil.Ok(t, json.NewEncoder(&buf).Encode(&m))
testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), metadata.MetaFilename), &buf))
}

testutil.Ok(t, sy.SyncMetas(ctx))

groups, err := sy.Groups()
testutil.Ok(t, err)
var evenIds []ulid.ULID
for i := 0; i < 10; i++ {
if i%2 != 0 {
evenIds = append(evenIds, ids[i])
}
}
testutil.Equals(t, evenIds, groups[0].IDs())

// Upload last 6 blocks.
for _, m := range metas[10:] {
var buf bytes.Buffer
testutil.Ok(t, json.NewEncoder(&buf).Encode(&m))
testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), metadata.MetaFilename), &buf))
}

// Delete first 4 blocks.
for _, m := range metas[:4] {
testutil.Ok(t, block.Delete(ctx, log.NewNopLogger(), bkt, m.ULID))
}

testutil.Ok(t, sy.SyncMetas(ctx))

groups, err = sy.Groups()
testutil.Ok(t, err)
evenIds = make([]ulid.ULID, 0)
for i := 4; i < 16; i++ {
if i%2 != 0 {
evenIds = append(evenIds, ids[i])
}
}
testutil.Equals(t, evenIds, groups[0].IDs())
})
}
Loading