compactor: Add ReplicaLabelRemover as MetaFetcher filter to enable offline vertical compaction/deduplication for replicated data #2250

Merged
2 changes: 1 addition & 1 deletion .github/workflows/e2e.yaml
@@ -25,4 +25,4 @@ jobs:
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}

- name: Run e2e docker-based tests.
run: make test-e2e
run: make test-e2e-ci
6 changes: 3 additions & 3 deletions CHANGELOG.md
@@ -20,12 +20,12 @@ We use *breaking* word for marking changes that are not backward compatible (rel
### Added

- [#2265](https://github.com/thanos-io/thanos/pull/2265) Compactor: Add `--wait-interval` to specify compaction wait interval between consecutive compact runs when `--wait` enabled.
- [#2250](https://github.com/thanos-io/thanos/pull/2250) Compactor: Enable vertical compaction for offline deduplication (Experimental). Use the `--deduplication.replica-label` flag to specify the replica label on which to deduplicate (Hidden). Please note that this uses a NAIVE algorithm for merging (no smart replica deduplication, just chaining samples together). This works well for deduplication of blocks with **precisely the same samples**, such as those produced by Receiver replication. We plan to add a smarter algorithm in the following weeks.

### Changed

- [#2136](https://github.com/thanos-io/thanos/pull/2136) store, compact, bucket: schedule block deletion by adding deletion-mark.json. This adds a consistent way for multiple readers and writers to access object storage.
Since there are no consistency guarantees provided by some Object Storage providers, this PR adds a consistent lock-free way of dealing with Object Storage irrespective of the choice of object storage. In order to achieve this co-ordination, blocks are not deleted directly. Instead, blocks are marked for deletion by uploading `deletion-mark.json` file for the block that was chosen to be deleted. This file contains unix time of when the block was marked for deletion.

- [#2136](https://github.com/thanos-io/thanos/pull/2136) *breaking* store, compact, bucket: schedule block deletion by adding deletion-mark.json. This adds a consistent way for multiple readers and writers to access object storage.
Since there are no consistency guarantees provided by some Object Storage providers, this PR adds a consistent lock-free way of dealing with Object Storage irrespective of the choice of object storage. In order to achieve this co-ordination, blocks are not deleted directly. Instead, blocks are marked for deletion by uploading `deletion-mark.json` file for the block that was chosen to be deleted. This file contains unix time of when the block was marked for deletion. If you want to keep existing behavior, you should add `--delete-delay=0s` as a flag.
- [#2090](https://github.com/thanos-io/thanos/issues/2090) *breaking* Downsample command: the `downsample` command has moved as the `thanos bucket` sub-command, and cannot be called via `thanos downsample` any more.

## [v0.11.0](https://github.com/thanos-io/thanos/releases/tag/v0.11.0) - 2020.03.02
18 changes: 16 additions & 2 deletions Makefile
@@ -244,9 +244,23 @@ test-local:

.PHONY: test-e2e
test-e2e: ## Runs all Thanos docker-based e2e tests from test/e2e. Requires access to the docker daemon.
test-e2e: docker
test-e2e: docker-multi-stage
@echo ">> cleaning docker environment."
@docker system prune -f --volumes
@echo ">> cleaning e2e test garbage."
@rm -rf ./test/e2e/e2e_integration_test*
@echo ">> running /test/e2e tests."
@go test -v ./test/e2e/...
@go test -failfast -timeout 5m -v ./test/e2e/...

.PHONY: test-e2e-ci
test-e2e-ci: ## Runs all Thanos docker-based e2e tests from test/e2e using limited resources. Requires access to the docker daemon.
test-e2e-ci: docker
@echo ">> cleaning docker environment."
@docker system prune -f --volumes
@echo ">> cleaning e2e test garbage."
@rm -rf ./test/e2e/e2e_integration_test*
@echo ">> running /test/e2e tests."
@go test -failfast -parallel 1 -timeout 5m -v ./test/e2e/...

.PHONY: install-deps
install-deps: ## Installs dependencies for integration tests. It installs supported versions of Prometheus and alertmanager to test against in integration tests.
18 changes: 17 additions & 1 deletion cmd/thanos/compact.go
@@ -133,6 +133,12 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {
"or compactor is ignoring the deletion because it's compacting the block at the same time.").
Default("48h"))

dedupReplicaLabels := cmd.Flag("deduplication.replica-label", "Label to treat as a replica indicator of blocks that can be deduplicated (repeated flag). This will merge multiple replica blocks into one. This process is irreversible."+
" Experimental. When one or more labels are set, the given labels are removed from the blocks' metadata so that vertical compaction can merge them."+
" Please note that this uses a NAIVE algorithm for merging (no smart replica deduplication, just chaining samples together)."+
" This works well for deduplication of blocks with **precisely the same samples**, such as those produced by Receiver replication.").
Hidden().Strings()

selectorRelabelConf := regSelectorRelabelFlags(cmd)

m[component.Compact.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
@@ -157,6 +163,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {
*maxCompactionLevel,
*blockSyncConcurrency,
*compactionConcurrency,
*dedupReplicaLabels,
selectorRelabelConf,
*waitInterval,
)
@@ -183,6 +190,7 @@ func runCompact(
maxCompactionLevel int,
blockSyncConcurrency int,
concurrency int,
dedupReplicaLabels []string,
selectorRelabelConf *extflag.PathOrContent,
waitInterval time.Duration,
) error {
@@ -278,17 +286,25 @@ func runCompact(
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, time.Duration(deleteDelay.Seconds()/2)*time.Second)
duplicateBlocksFilter := block.NewDeduplicateFilter()
prometheusRegisterer := extprom.WrapRegistererWithPrefix("thanos_", reg)

metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", prometheusRegisterer,
block.NewLabelShardedMetaFilter(relabelConfig).Filter,
block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer).Filter,
ignoreDeletionMarkFilter.Filter,
duplicateBlocksFilter.Filter,
block.NewReplicaLabelRemover(logger, dedupReplicaLabels).Modify,
)
if err != nil {
return errors.Wrap(err, "create meta fetcher")
}

sy, err := compact.NewSyncer(logger, reg, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, blockSyncConcurrency, acceptMalformedIndex, false)
enableVerticalCompaction := false
if len(dedupReplicaLabels) > 0 {
enableVerticalCompaction = true
level.Info(logger).Log("msg", "deduplication.replica-label specified, vertical compaction is enabled", "dedupReplicaLabels", strings.Join(dedupReplicaLabels, ","))
}

sy, err := compact.NewSyncer(logger, reg, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, blockSyncConcurrency, acceptMalformedIndex, enableVerticalCompaction)
if err != nil {
return errors.Wrap(err, "create syncer")
}
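To make the effect of the new flag concrete, here is a small self-contained sketch (editor's illustration, not part of this diff; the helper name `stripReplicaLabels` and the label values are made up) of what removing a replica label does to two replicated blocks: once the label is stripped, their external label sets become identical, so the compactor groups them together and, with vertical compaction enabled, merges the overlapping blocks by naively chaining samples.

```go
package main

import "fmt"

// stripReplicaLabels mimics what ReplicaLabelRemover does to a block's
// external labels: every configured replica label is simply deleted.
func stripReplicaLabels(labels map[string]string, replicaLabels []string) map[string]string {
	for _, rl := range replicaLabels {
		delete(labels, rl) // no-op if the label is absent
	}
	return labels
}

func main() {
	// Two hypothetical blocks produced by two replicas writing the same data.
	blockA := map[string]string{"cluster": "eu1", "replica": "0"}
	blockB := map[string]string{"cluster": "eu1", "replica": "1"}

	// Equivalent of passing --deduplication.replica-label=replica.
	dedup := []string{"replica"}

	fmt.Println(stripReplicaLabels(blockA, dedup)) // map[cluster:eu1]
	fmt.Println(stripReplicaLabels(blockB, dedup)) // map[cluster:eu1]
	// With identical external labels, both blocks fall into the same
	// compaction group and vertical compaction can merge them.
}
```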
109 changes: 77 additions & 32 deletions pkg/block/fetcher.go
@@ -32,54 +32,67 @@ import (
"github.com/thanos-io/thanos/pkg/runutil"
)

type syncMetrics struct {
type fetcherMetrics struct {
syncs prometheus.Counter
syncFailures prometheus.Counter
syncDuration prometheus.Histogram

synced *extprom.TxGaugeVec
synced *extprom.TxGaugeVec
modified *extprom.TxGaugeVec
}

func (s *fetcherMetrics) submit() {
s.synced.Submit()
s.modified.Submit()
}

func (s *fetcherMetrics) resetTx() {
s.synced.ResetTx()
s.modified.ResetTx()
}

const (
syncMetricSubSys = "blocks_meta"
fetcherSubSys = "blocks_meta"

corruptedMeta = "corrupted-meta-json"
noMeta = "no-meta-json"
loadedMeta = "loaded"
failedMeta = "failed"

// Filter's label values.
// Fetcher's synced label values.
labelExcludedMeta = "label-excluded"
timeExcludedMeta = "time-excluded"
tooFreshMeta = "too-fresh"
duplicateMeta = "duplicate"

// Blocks that are marked for deletion can be loaded as well. This is done to make sure that we load blocks that are meant to be deleted,
// but don't have a replacement block yet.
markedForDeletionMeta = "marked-for-deletion"

// Fetcher's modified label values.
replicaRemovedMeta = "replica-label-removed"
)

func newSyncMetrics(reg prometheus.Registerer) *syncMetrics {
var m syncMetrics
func newFetcherMetrics(reg prometheus.Registerer) *fetcherMetrics {
var m fetcherMetrics

m.syncs = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Subsystem: syncMetricSubSys,
Subsystem: fetcherSubSys,
Name: "syncs_total",
Help: "Total blocks metadata synchronization attempts",
})
m.syncFailures = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Subsystem: syncMetricSubSys,
Subsystem: fetcherSubSys,
Name: "sync_failures_total",
Help: "Total blocks metadata synchronization failures",
})
m.syncDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Subsystem: syncMetricSubSys,
Subsystem: fetcherSubSys,
Name: "sync_duration_seconds",
Help: "Duration of the blocks metadata synchronization in seconds",
Buckets: []float64{0.01, 1, 10, 100, 1000},
})
m.synced = extprom.NewTxGaugeVec(reg, prometheus.GaugeOpts{
Subsystem: syncMetricSubSys,
Subsystem: fetcherSubSys,
Name: "synced",
Help: "Number of block metadata synced",
},
@@ -94,18 +107,22 @@ func newSyncMetrics(reg prometheus.Registerer) *syncMetrics {
[]string{duplicateMeta},
[]string{markedForDeletionMeta},
)
m.modified = extprom.NewTxGaugeVec(reg, prometheus.GaugeOpts{
Subsystem: fetcherSubSys,
Name: "modified",
Help: "Number of block metadata that modified",
},
[]string{"modified"},
[]string{replicaRemovedMeta},
)
return &m
}

type MetadataFetcher interface {
Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata.Meta, partial map[ulid.ULID]error, err error)
}

type GaugeLabeled interface {
WithLabelValues(lvs ...string) prometheus.Gauge
}

type MetaFetcherFilter func(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, incompleteView bool) error
type MetaFetcherFilter func(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, metrics *fetcherMetrics, incompleteView bool) error
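// Editor's sketch (illustration only, not part of this change): a minimal
// in-package filter written against the new signature. It drops blocks that
// carry a hypothetical external label tenant="test" and accounts for them
// under the existing label-excluded reason on the synced metric.
func dropTestTenantFilter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, metrics *fetcherMetrics, _ bool) error {
	for id, m := range metas {
		if m.Thanos.Labels["tenant"] == "test" {
			metrics.synced.WithLabelValues(labelExcludedMeta).Inc()
			delete(metas, id)
		}
	}
	return nil
}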

// MetaFetcher is a struct that synchronizes filtered metadata of all blocks in the object storage with the local state.
// Not go-routine safe.
@@ -116,7 +133,7 @@ type MetaFetcher struct {

// Optional local directory to cache meta.json files.
cacheDir string
metrics *syncMetrics
metrics *fetcherMetrics

filters []MetaFetcherFilter

@@ -142,7 +159,7 @@ func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.BucketReade
concurrency: concurrency,
bkt: bkt,
cacheDir: cacheDir,
metrics: newSyncMetrics(r),
metrics: newFetcherMetrics(r),
filters: filters,
cached: map[ulid.ULID]*metadata.Meta{},
}, nil
@@ -254,7 +271,7 @@ func (s *MetaFetcher) Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata.
metaErrs tsdberrors.MultiError
)

s.metrics.synced.ResetTx()
s.metrics.resetTx()

for i := 0; i < s.concurrency; i++ {
wg.Add(1)
@@ -351,13 +368,13 @@ func (s *MetaFetcher) Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata.

for _, f := range s.filters {
// NOTE: filters can update the synced metric according to the reason for the exclusion.
if err := f(ctx, metas, s.metrics.synced, incompleteView); err != nil {
if err := f(ctx, metas, s.metrics, incompleteView); err != nil {
return nil, nil, errors.Wrap(err, "filter metas")
}
}

s.metrics.synced.WithLabelValues(loadedMeta).Set(float64(len(metas)))
s.metrics.synced.Submit()
s.metrics.submit()

if incompleteView {
return metas, partial, errors.Wrap(metaErrs, "incomplete view")
@@ -381,12 +398,12 @@ func NewTimePartitionMetaFilter(MinTime, MaxTime model.TimeOrDurationValue) *Tim
}

// Filter filters out blocks that are outside of specified time range.
func (f *TimePartitionMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) error {
func (f *TimePartitionMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, metrics *fetcherMetrics, _ bool) error {
for id, m := range metas {
if m.MaxTime >= f.minTime.PrometheusTimestamp() && m.MinTime <= f.maxTime.PrometheusTimestamp() {
continue
}
synced.WithLabelValues(timeExcludedMeta).Inc()
metrics.synced.WithLabelValues(timeExcludedMeta).Inc()
delete(metas, id)
}
return nil
@@ -409,7 +426,7 @@ func NewLabelShardedMetaFilter(relabelConfig []*relabel.Config) *LabelShardedMet
const blockIDLabel = "__block_id"

// Filter filters out blocks that have no labels after relabelling of each block external (Thanos) labels.
func (f *LabelShardedMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) error {
func (f *LabelShardedMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, metrics *fetcherMetrics, _ bool) error {
var lbls labels.Labels
for id, m := range metas {
lbls = lbls[:0]
@@ -419,7 +436,7 @@ func (f *LabelShardedMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*
}

if processedLabels := relabel.Process(lbls, f.relabelConfig...); len(processedLabels) == 0 {
synced.WithLabelValues(labelExcludedMeta).Inc()
metrics.synced.WithLabelValues(labelExcludedMeta).Inc()
delete(metas, id)
}
}
@@ -439,7 +456,7 @@ func NewDeduplicateFilter() *DeduplicateFilter {

// Filter filters out duplicate blocks that can be formed
// from two or more overlapping blocks that fully submatches the source blocks of the older blocks.
func (f *DeduplicateFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) error {
func (f *DeduplicateFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, metrics *fetcherMetrics, _ bool) error {
var wg sync.WaitGroup

metasByResolution := make(map[int64][]*metadata.Meta)
@@ -456,7 +473,7 @@ func (f *DeduplicateFilter) Filter(_ context.Context, metas map[ulid.ULID]*metad
BlockMeta: tsdb.BlockMeta{
ULID: ulid.MustNew(uint64(0), nil),
},
}), metasByResolution[res], metas, res, synced)
}), metasByResolution[res], metas, res, metrics.synced)
}(res)
}

@@ -465,7 +482,7 @@ func (f *DeduplicateFilter) Filter(_ context.Context, metas map[ulid.ULID]*metad
return nil
}

func (f *DeduplicateFilter) filterForResolution(root *Node, metaSlice []*metadata.Meta, metas map[ulid.ULID]*metadata.Meta, res int64, synced GaugeLabeled) {
func (f *DeduplicateFilter) filterForResolution(root *Node, metaSlice []*metadata.Meta, metas map[ulid.ULID]*metadata.Meta, res int64, synced *extprom.TxGaugeVec) {
sort.Slice(metaSlice, func(i, j int) bool {
ilen := len(metaSlice[i].Compaction.Sources)
jlen := len(metaSlice[j].Compaction.Sources)
@@ -540,6 +557,34 @@ func contains(s1 []ulid.ULID, s2 []ulid.ULID) bool {
return true
}

// ReplicaLabelRemover is a MetaFetcher modifier that modifies the external labels of existing blocks: it removes the given replica labels from the metadata of blocks that have them.
type ReplicaLabelRemover struct {
logger log.Logger

replicaLabels []string
}

// NewReplicaLabelRemover creates a ReplicaLabelRemover.
func NewReplicaLabelRemover(logger log.Logger, replicaLabels []string) *ReplicaLabelRemover {
return &ReplicaLabelRemover{logger: logger, replicaLabels: replicaLabels}
}

// Modify modifies the external labels of existing blocks: it removes the given replica labels from the metadata of blocks that have them.
func (r *ReplicaLabelRemover) Modify(_ context.Context, metas map[ulid.ULID]*metadata.Meta, metrics *fetcherMetrics, view bool) error {
for u, meta := range metas {
labels := meta.Thanos.Labels
for _, replicaLabel := range r.replicaLabels {
if _, exists := labels[replicaLabel]; exists {
level.Debug(r.logger).Log("msg", "replica label removed", "label", replicaLabel)
delete(labels, replicaLabel)
metrics.modified.WithLabelValues(replicaRemovedMeta).Inc()
}
}
metas[u].Thanos.Labels = labels
}
return nil
}

// ConsistencyDelayMetaFilter is a MetaFetcher filter that filters out blocks that are created before a specified consistency delay.
// Not go-routine safe.
type ConsistencyDelayMetaFilter struct {
@@ -566,7 +611,7 @@ func NewConsistencyDelayMetaFilter(logger log.Logger, consistencyDelay time.Dura
}

// Filter filters out blocks that are created before a specified consistency delay.
func (f *ConsistencyDelayMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) error {
func (f *ConsistencyDelayMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, metrics *fetcherMetrics, _ bool) error {
for id, meta := range metas {
// TODO(khyatisoneji): Remove the checks about Thanos Source
// by implementing delete delay to fetch metas.
@@ -577,7 +622,7 @@ func (f *ConsistencyDelayMetaFilter) Filter(_ context.Context, metas map[ulid.UL
meta.Thanos.Source != metadata.CompactorRepairSource {

level.Debug(f.logger).Log("msg", "block is too fresh for now", "block", id)
synced.WithLabelValues(tooFreshMeta).Inc()
metrics.synced.WithLabelValues(tooFreshMeta).Inc()
delete(metas, id)
}
}
@@ -612,7 +657,7 @@ func (f *IgnoreDeletionMarkFilter) DeletionMarkBlocks() map[ulid.ULID]*metadata.

// Filter filters out blocks that are marked for deletion after a given delay.
// It also returns the blocks that can be deleted since they were uploaded delay duration before current time.
func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) error {
func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, m *fetcherMetrics, _ bool) error {
f.deletionMarkMap = make(map[ulid.ULID]*metadata.DeletionMark)

for id := range metas {
@@ -629,7 +674,7 @@ func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.UL
}
f.deletionMarkMap[id] = deletionMark
if time.Since(time.Unix(deletionMark.DeletionTime, 0)).Seconds() > f.delay.Seconds() {
synced.WithLabelValues(markedForDeletionMeta).Inc()
m.synced.WithLabelValues(markedForDeletionMeta).Inc()
delete(metas, id)
}
}
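As a quick sanity check of the new modifier, here is a sketch of an in-package test exercising `ReplicaLabelRemover.Modify`. It is an editor's illustration under the assumptions of this PR's package layout and dependencies (go-kit log, oklog/ulid, the `metadata.Meta`/`Thanos.Labels` fields used above), not part of the diff; a real test would sit next to `fetcher_test.go`.

```go
package block

import (
	"context"
	"testing"

	"github.com/go-kit/kit/log"
	"github.com/oklog/ulid"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/thanos-io/thanos/pkg/block/metadata"
)

func TestReplicaLabelRemoverSketch(t *testing.T) {
	id := ulid.MustNew(0, nil)
	metas := map[ulid.ULID]*metadata.Meta{
		id: {Thanos: metadata.Thanos{Labels: map[string]string{"cluster": "eu1", "replica": "0"}}},
	}

	r := NewReplicaLabelRemover(log.NewNopLogger(), []string{"replica"})
	m := newFetcherMetrics(prometheus.NewRegistry())

	if err := r.Modify(context.Background(), metas, m, false); err != nil {
		t.Fatal(err)
	}
	if _, ok := metas[id].Thanos.Labels["replica"]; ok {
		t.Fatal("expected replica label to be removed")
	}
	if got := metas[id].Thanos.Labels["cluster"]; got != "eu1" {
		t.Fatalf("unexpected cluster label: %q", got)
	}
}
```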