Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compactor: Add ReplicaLabelRemover as MetaFetcher filter to enable offline vertical compaction/deduplication for replicated data #2250

Merged
Merged
2 changes: 1 addition & 1 deletion .github/workflows/e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ jobs:
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}

- name: Run e2e docker-based tests.
run: make test-e2e
run: make test-e2e-ci
6 changes: 3 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ We use *breaking* word for marking changes that are not backward compatible (rel
### Added

- [#2265](https://github.com/thanos-io/thanos/pull/2265) Compactor: Add `--wait-interval` to specify compaction wait interval between consecutive compact runs when `--wait` enabled.
- [#2250](https://github.com/thanos-io/thanos/pull/2250) Compactor: Enable vertical compaction for offline deduplication (Experimental). Uses `--deduplication.replica-label` flag to specify the replica label to deduplicate on (Hidden). Please note that this uses a NAIVE algorithm for merging (no smart replica deduplication, just chaining samples together). This works well for deduplication of blocks with **precisely the same samples** like produced by Receiver replication. We plan to add a smarter algorithm in the following weeks.
kakkoyun marked this conversation as resolved.
Show resolved Hide resolved

### Changed

- [#2136](https://github.com/thanos-io/thanos/pull/2136) store, compact, bucket: schedule block deletion by adding deletion-mark.json. This adds a consistent way for multiple readers and writers to access object storage.
Since there are no consistency guarantees provided by some Object Storage providers, this PR adds a consistent lock-free way of dealing with Object Storage irrespective of the choice of object storage. In order to achieve this co-ordination, blocks are not deleted directly. Instead, blocks are marked for deletion by uploading `deletion-mark.json` file for the block that was chosen to be deleted. This file contains unix time of when the block was marked for deletion.

- [#2136](https://github.com/thanos-io/thanos/pull/2136) *breaking* store, compact, bucket: schedule block deletion by adding deletion-mark.json. This adds a consistent way for multiple readers and writers to access object storage.
kakkoyun marked this conversation as resolved.
Show resolved Hide resolved
Since there are no consistency guarantees provided by some Object Storage providers, this PR adds a consistent lock-free way of dealing with Object Storage irrespective of the choice of object storage. In order to achieve this co-ordination, blocks are not deleted directly. Instead, blocks are marked for deletion by uploading `deletion-mark.json` file for the block that was chosen to be deleted. This file contains unix time of when the block was marked for deletion. If you want to keep existing behavior, you should add `--delete-delay=0s` as a flag.
- [#2090](https://github.com/thanos-io/thanos/issues/2090) *breaking* Downsample command: the `downsample` command has moved as the `thanos bucket` sub-command, and cannot be called via `thanos downsample` any more.

## [v0.11.0](https://github.com/thanos-io/thanos/releases/tag/v0.11.0) - 2020.03.02
Expand Down
17 changes: 16 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,23 @@ test-local:
.PHONY: test-e2e
test-e2e: ## Runs all Thanos e2e docker-based e2e tests from test/e2e. Required access to docker daemon.
test-e2e: docker
@echo ">> cleaning docker environment."
@docker system prune -f --volumes
@echo ">> cleaning e2e test garbage."
@rm -rf ./test/e2e/e2e_integration_test*
@echo ">> running /test/e2e tests."
@go test -v ./test/e2e/...
@go test -failfast -timeout 5m -v ./test/e2e/...

.PHONY: test-e2e-ci
test-e2e-ci: ## Runs all Thanos e2e docker-based e2e tests from test/e2e, using limited resources. Required access to docker daemon.
test-e2e-ci: docker
@echo ">> cleaning docker environment."
@docker system prune -f --volumes
@echo ">> cleaning e2e test garbage."
@rm -rf ./test/e2e/e2e_integration_test*
@echo ">> running /test/e2e tests."
@go clean -testcache
@go test -failfast -parallel 1 -timeout 5m -v ./test/e2e/...
kakkoyun marked this conversation as resolved.
Show resolved Hide resolved

.PHONY: install-deps
install-deps: ## Installs dependencies for integration tests. It installs supported versions of Prometheus and alertmanager to test against in integration tests.
Expand Down
8 changes: 4 additions & 4 deletions cmd/thanos/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func registerBucketVerify(m map[string]setupFunc, root *kingpin.CmdClause, name
issues = append(issues, issueFn)
}

fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg))
fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -189,7 +189,7 @@ func registerBucketLs(m map[string]setupFunc, root *kingpin.CmdClause, name stri
return err
}

fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg))
fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -289,7 +289,7 @@ func registerBucketInspect(m map[string]setupFunc, root *kingpin.CmdClause, name
return err
}

fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg))
fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -458,7 +458,7 @@ func refresh(ctx context.Context, logger log.Logger, bucketUI *ui.Bucket, durati
return errors.Wrap(err, "bucket client")
}

fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg))
fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil)
if err != nil {
return err
}
Expand Down
29 changes: 23 additions & 6 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {
"or compactor is ignoring the deletion because it's compacting the block at the same time.").
Default("48h"))

dedupReplicaLabels := cmd.Flag("deduplication.replica-label", "Label to treat as a replica indicator of blocks that can be deduplicated (repeated flag). This will merge multiple replica blocks into one. This process is irreversible."+
"Experimental. When it is set true, this will given labels from blocks so that vertical compaction could merge blocks."+
"Please note that this uses a NAIVE algorithm for merging (no smart replica deduplication, just chaining samples together)."+
"This works well for deduplication of blocks with **precisely the same samples** like produced by Receiver replication.").
Hidden().Strings()

selectorRelabelConf := regSelectorRelabelFlags(cmd)

m[component.Compact.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
Expand All @@ -157,6 +163,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {
*maxCompactionLevel,
*blockSyncConcurrency,
*compactionConcurrency,
*dedupReplicaLabels,
selectorRelabelConf,
*waitInterval,
)
Expand All @@ -183,6 +190,7 @@ func runCompact(
maxCompactionLevel int,
blockSyncConcurrency int,
concurrency int,
dedupReplicaLabels []string,
selectorRelabelConf *extflag.PathOrContent,
waitInterval time.Duration,
) error {
Expand Down Expand Up @@ -278,17 +286,26 @@ func runCompact(
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, time.Duration(deleteDelay.Seconds()/2)*time.Second)
duplicateBlocksFilter := block.NewDeduplicateFilter()
prometheusRegisterer := extprom.WrapRegistererWithPrefix("thanos_", reg)
metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", prometheusRegisterer,
block.NewLabelShardedMetaFilter(relabelConfig).Filter,
block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer).Filter,
ignoreDeletionMarkFilter.Filter,
duplicateBlocksFilter.Filter,

metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", prometheusRegisterer, []block.MetadataFilter{
block.NewLabelShardedMetaFilter(relabelConfig),
block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer),
ignoreDeletionMarkFilter,
duplicateBlocksFilter,
},
block.NewReplicaLabelRemover(logger, dedupReplicaLabels),
)
if err != nil {
return errors.Wrap(err, "create meta fetcher")
}

sy, err := compact.NewSyncer(logger, reg, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, blockSyncConcurrency, acceptMalformedIndex, false)
enableVerticalCompaction := false
if len(dedupReplicaLabels) > 0 {
enableVerticalCompaction = true
level.Info(logger).Log("msg", "deduplication.replica-label specified, vertical compaction is enabled", "dedupReplicaLabels", strings.Join(dedupReplicaLabels, ","))
}

sy, err := compact.NewSyncer(logger, reg, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, blockSyncConcurrency, acceptMalformedIndex, enableVerticalCompaction)
if err != nil {
return errors.Wrap(err, "create syncer")
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func RunDownsample(
return err
}

metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg))
metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg), nil)
if err != nil {
return errors.Wrap(err, "create meta fetcher")
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/thanos/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestCleanupIndexCacheFolder(t *testing.T) {
Name: metricIndexGenerateName,
Help: metricIndexGenerateHelp,
})
metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil)
metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, nil)
testutil.Ok(t, err)

testutil.Ok(t, genMissingIndexCacheFiles(ctx, logger, reg, bkt, metaFetcher, dir))
Expand Down Expand Up @@ -116,7 +116,7 @@ func TestCleanupDownsampleCacheFolder(t *testing.T) {

metrics := newDownsampleMetrics(prometheus.NewRegistry())
testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.GroupKey(meta.Thanos))))
metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil)
metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, nil)
testutil.Ok(t, err)

testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, metaFetcher, dir))
Expand Down
14 changes: 7 additions & 7 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,13 +236,13 @@ func runStore(

ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, ignoreDeletionMarksDelay)
prometheusRegisterer := extprom.WrapRegistererWithPrefix("thanos_", reg)
metaFetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, dataDir, prometheusRegisterer,
block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime).Filter,
block.NewLabelShardedMetaFilter(relabelConfig).Filter,
block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer).Filter,
ignoreDeletionMarkFilter.Filter,
block.NewDeduplicateFilter().Filter,
)
metaFetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, dataDir, prometheusRegisterer, []block.MetadataFilter{
block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime),
block.NewLabelShardedMetaFilter(relabelConfig),
block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer),
ignoreDeletionMarkFilter,
block.NewDeduplicateFilter(),
})
if err != nil {
return errors.Wrap(err, "meta fetcher")
}
Expand Down
Loading