diff --git a/.github/workflows/e2e.yaml b/.github/workflows/e2e.yaml index 1cc20d0dc2..d3af411328 100644 --- a/.github/workflows/e2e.yaml +++ b/.github/workflows/e2e.yaml @@ -25,4 +25,4 @@ jobs: key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }} - name: Run e2e docker-based tests. - run: make test-e2e + run: make test-e2e-ci diff --git a/CHANGELOG.md b/CHANGELOG.md index c62c7f4a44..1185e4f3d4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,12 +21,12 @@ We use *breaking* word for marking changes that are not backward compatible (rel ### Added - [#2265](https://github.com/thanos-io/thanos/pull/2265) Compactor: Add `--wait-interval` to specify compaction wait interval between consecutive compact runs when `--wait` enabled. +- [#2250](https://github.com/thanos-io/thanos/pull/2250) Compactor: Enable vertical compaction for offline deduplication (Experimental). Uses `--deduplication.replica-label` flag to specify the replica label to deduplicate on (Hidden). Please note that this uses a NAIVE algorithm for merging (no smart replica deduplication, just chaining samples together). This works well for deduplication of blocks with **precisely the same samples** like produced by Receiver replication. We plan to add a smarter algorithm in the following weeks. ### Changed -- [#2136](https://github.com/thanos-io/thanos/pull/2136) store, compact, bucket: schedule block deletion by adding deletion-mark.json. This adds a consistent way for multiple readers and writers to access object storage. -Since there are no consistency guarantees provided by some Object Storage providers, this PR adds a consistent lock-free way of dealing with Object Storage irrespective of the choice of object storage. In order to achieve this co-ordination, blocks are not deleted directly. Instead, blocks are marked for deletion by uploading `deletion-mark.json` file for the block that was chosen to be deleted. This file contains unix time of when the block was marked for deletion. - +- [#2136](https://github.com/thanos-io/thanos/pull/2136) *breaking* store, compact, bucket: schedule block deletion by adding deletion-mark.json. This adds a consistent way for multiple readers and writers to access object storage. +Since there are no consistency guarantees provided by some Object Storage providers, this PR adds a consistent lock-free way of dealing with Object Storage irrespective of the choice of object storage. In order to achieve this co-ordination, blocks are not deleted directly. Instead, blocks are marked for deletion by uploading `deletion-mark.json` file for the block that was chosen to be deleted. This file contains unix time of when the block was marked for deletion. If you want to keep existing behavior, you should add `--delete-delay=0s` as a flag. - [#2090](https://github.com/thanos-io/thanos/issues/2090) *breaking* Downsample command: the `downsample` command has moved as the `thanos bucket` sub-command, and cannot be called via `thanos downsample` any more. - [#2294](https://github.com/thanos-io/thanos/pull/2294) store: optimizations for fetching postings. Queries using `=~".*"` matchers or negation matchers (`!=...` or `!~...`) benefit the most. diff --git a/Makefile b/Makefile index ca5193f454..47c81e6335 100644 --- a/Makefile +++ b/Makefile @@ -245,8 +245,23 @@ test-local: .PHONY: test-e2e test-e2e: ## Runs all Thanos e2e docker-based e2e tests from test/e2e. Required access to docker daemon. test-e2e: docker + @echo ">> cleaning docker environment." 
+ @docker system prune -f --volumes + @echo ">> cleaning e2e test garbage." + @rm -rf ./test/e2e/e2e_integration_test* @echo ">> running /test/e2e tests." - @go test -v ./test/e2e/... + @go test -failfast -timeout 5m -v ./test/e2e/... + +.PHONY: test-e2e-ci +test-e2e-ci: ## Runs all Thanos e2e docker-based e2e tests from test/e2e, using limited resources. Required access to docker daemon. +test-e2e-ci: docker + @echo ">> cleaning docker environment." + @docker system prune -f --volumes + @echo ">> cleaning e2e test garbage." + @rm -rf ./test/e2e/e2e_integration_test* + @echo ">> running /test/e2e tests." + @go clean -testcache + @go test -failfast -parallel 1 -timeout 5m -v ./test/e2e/... .PHONY: install-deps install-deps: ## Installs dependencies for integration tests. It installs supported versions of Prometheus and alertmanager to test against in integration tests. diff --git a/cmd/thanos/bucket.go b/cmd/thanos/bucket.go index 9eaf8d2ce5..2f321ef9ac 100644 --- a/cmd/thanos/bucket.go +++ b/cmd/thanos/bucket.go @@ -140,7 +140,7 @@ func registerBucketVerify(m map[string]setupFunc, root *kingpin.CmdClause, name issues = append(issues, issueFn) } - fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg)) + fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil) if err != nil { return err } @@ -189,7 +189,7 @@ func registerBucketLs(m map[string]setupFunc, root *kingpin.CmdClause, name stri return err } - fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg)) + fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil) if err != nil { return err } @@ -289,7 +289,7 @@ func registerBucketInspect(m map[string]setupFunc, root *kingpin.CmdClause, name return err } - fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg)) + fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil) if err != nil { return err } @@ -458,7 +458,7 @@ func refresh(ctx context.Context, logger log.Logger, bucketUI *ui.Bucket, durati return errors.Wrap(err, "bucket client") } - fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg)) + fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil) if err != nil { return err } diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index e926b6fa4e..6fe4ddbd18 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -133,6 +133,12 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) { "or compactor is ignoring the deletion because it's compacting the block at the same time."). Default("48h")) + dedupReplicaLabels := cmd.Flag("deduplication.replica-label", "Label to treat as a replica indicator of blocks that can be deduplicated (repeated flag). This will merge multiple replica blocks into one. This process is irreversible."+ + "Experimental. 
When it is set, this will strip the given labels from blocks so that vertical compaction can merge blocks."+ "Please note that this uses a NAIVE algorithm for merging (no smart replica deduplication, just chaining samples together)."+ "This works well for deduplication of blocks with **precisely the same samples** like produced by Receiver replication."). Hidden().Strings() + selectorRelabelConf := regSelectorRelabelFlags(cmd) m[component.Compact.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error { @@ -157,6 +163,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) { *maxCompactionLevel, *blockSyncConcurrency, *compactionConcurrency, + *dedupReplicaLabels, selectorRelabelConf, *waitInterval, ) @@ -183,6 +190,7 @@ func runCompact( maxCompactionLevel int, blockSyncConcurrency int, concurrency int, + dedupReplicaLabels []string, selectorRelabelConf *extflag.PathOrContent, waitInterval time.Duration, ) error { @@ -278,17 +286,26 @@ func runCompact( ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, time.Duration(deleteDelay.Seconds()/2)*time.Second) duplicateBlocksFilter := block.NewDeduplicateFilter() prometheusRegisterer := extprom.WrapRegistererWithPrefix("thanos_", reg) - metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", prometheusRegisterer, - block.NewLabelShardedMetaFilter(relabelConfig).Filter, - block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer).Filter, - ignoreDeletionMarkFilter.Filter, - duplicateBlocksFilter.Filter, + + metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", prometheusRegisterer, []block.MetadataFilter{ + block.NewLabelShardedMetaFilter(relabelConfig), + block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer), + ignoreDeletionMarkFilter, + duplicateBlocksFilter, + }, + block.NewReplicaLabelRemover(logger, dedupReplicaLabels), ) if err != nil { return errors.Wrap(err, "create meta fetcher") } - sy, err := compact.NewSyncer(logger, reg, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, blockSyncConcurrency, acceptMalformedIndex, false) + enableVerticalCompaction := false + if len(dedupReplicaLabels) > 0 { + enableVerticalCompaction = true + level.Info(logger).Log("msg", "deduplication.replica-label specified, vertical compaction is enabled", "dedupReplicaLabels", strings.Join(dedupReplicaLabels, ",")) + } + + sy, err := compact.NewSyncer(logger, reg, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, blockSyncConcurrency, acceptMalformedIndex, enableVerticalCompaction) if err != nil { return errors.Wrap(err, "create syncer") } diff --git a/cmd/thanos/downsample.go b/cmd/thanos/downsample.go index 293ef59adf..beb45b95a9 100644 --- a/cmd/thanos/downsample.go +++ b/cmd/thanos/downsample.go @@ -72,7 +72,7 @@ func RunDownsample( return err } - metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg)) + metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg), nil) if err != nil { return errors.Wrap(err, "create meta fetcher") } diff --git a/cmd/thanos/main_test.go b/cmd/thanos/main_test.go index d44844d768..58e340db3d 100644 --- a/cmd/thanos/main_test.go +++ b/cmd/thanos/main_test.go @@ -76,7 +76,7 @@ func TestCleanupIndexCacheFolder(t *testing.T) { Name: 
metricIndexGenerateName, Help: metricIndexGenerateHelp, }) - metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil) + metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, nil) testutil.Ok(t, err) testutil.Ok(t, genMissingIndexCacheFiles(ctx, logger, reg, bkt, metaFetcher, dir)) @@ -116,7 +116,7 @@ func TestCleanupDownsampleCacheFolder(t *testing.T) { metrics := newDownsampleMetrics(prometheus.NewRegistry()) testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.GroupKey(meta.Thanos)))) - metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil) + metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, nil) testutil.Ok(t, err) testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, metaFetcher, dir)) diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 7a4f88a038..8031cd84e8 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -236,13 +236,13 @@ func runStore( ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, ignoreDeletionMarksDelay) prometheusRegisterer := extprom.WrapRegistererWithPrefix("thanos_", reg) - metaFetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, dataDir, prometheusRegisterer, - block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime).Filter, - block.NewLabelShardedMetaFilter(relabelConfig).Filter, - block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer).Filter, - ignoreDeletionMarkFilter.Filter, - block.NewDeduplicateFilter().Filter, - ) + metaFetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, dataDir, prometheusRegisterer, []block.MetadataFilter{ + block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime), + block.NewLabelShardedMetaFilter(relabelConfig), + block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer), + ignoreDeletionMarkFilter, + block.NewDeduplicateFilter(), + }) if err != nil { return errors.Wrap(err, "meta fetcher") } diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go index 2ab188bd0f..a2a5be2723 100644 --- a/pkg/block/fetcher.go +++ b/pkg/block/fetcher.go @@ -32,54 +32,67 @@ import ( "github.com/thanos-io/thanos/pkg/runutil" ) -type syncMetrics struct { +type fetcherMetrics struct { syncs prometheus.Counter syncFailures prometheus.Counter syncDuration prometheus.Histogram - synced *extprom.TxGaugeVec + synced *extprom.TxGaugeVec + modified *extprom.TxGaugeVec +} + +func (s *fetcherMetrics) submit() { + s.synced.Submit() + s.modified.Submit() +} + +func (s *fetcherMetrics) resetTx() { + s.synced.ResetTx() + s.modified.ResetTx() } const ( - syncMetricSubSys = "blocks_meta" + fetcherSubSys = "blocks_meta" corruptedMeta = "corrupted-meta-json" noMeta = "no-meta-json" loadedMeta = "loaded" failedMeta = "failed" - // Filter's label values. + // Synced label values. labelExcludedMeta = "label-excluded" timeExcludedMeta = "time-excluded" tooFreshMeta = "too-fresh" duplicateMeta = "duplicate" - // Blocks that are marked for deletion can be loaded as well. This is done to make sure that we load blocks that are meant to be deleted, // but don't have a replacement block yet. markedForDeletionMeta = "marked-for-deletion" + + // Modified label values. 
+ replicaRemovedMeta = "replica-label-removed" ) -func newSyncMetrics(reg prometheus.Registerer) *syncMetrics { - var m syncMetrics +func newFetcherMetrics(reg prometheus.Registerer) *fetcherMetrics { + var m fetcherMetrics m.syncs = promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Subsystem: syncMetricSubSys, + Subsystem: fetcherSubSys, Name: "syncs_total", Help: "Total blocks metadata synchronization attempts", }) m.syncFailures = promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Subsystem: syncMetricSubSys, + Subsystem: fetcherSubSys, Name: "sync_failures_total", Help: "Total blocks metadata synchronization failures", }) m.syncDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ - Subsystem: syncMetricSubSys, + Subsystem: fetcherSubSys, Name: "sync_duration_seconds", Help: "Duration of the blocks metadata synchronization in seconds", Buckets: []float64{0.01, 1, 10, 100, 1000}, }) m.synced = extprom.NewTxGaugeVec(reg, prometheus.GaugeOpts{ - Subsystem: syncMetricSubSys, + Subsystem: fetcherSubSys, Name: "synced", Help: "Number of block metadata synced", }, @@ -94,6 +107,14 @@ func newSyncMetrics(reg prometheus.Registerer) *syncMetrics { []string{duplicateMeta}, []string{markedForDeletionMeta}, ) + m.modified = extprom.NewTxGaugeVec(reg, prometheus.GaugeOpts{ + Subsystem: fetcherSubSys, + Name: "modified", + Help: "Number of blocks that their metadata modified", + }, + []string{"modified"}, + []string{replicaRemovedMeta}, + ) return &m } @@ -101,11 +122,13 @@ type MetadataFetcher interface { Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata.Meta, partial map[ulid.ULID]error, err error) } -type GaugeLabeled interface { - WithLabelValues(lvs ...string) prometheus.Gauge +type MetadataFilter interface { + Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec, incompleteView bool) error } -type MetaFetcherFilter func(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, incompleteView bool) error +type MetadataModifier interface { + Modify(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, modified *extprom.TxGaugeVec, incompleteView bool) error +} // MetaFetcher is a struct that synchronizes filtered metadata of all block in the object storage with the local state. // Not go-routine safe. @@ -116,15 +139,16 @@ type MetaFetcher struct { // Optional local directory to cache meta.json files. cacheDir string - metrics *syncMetrics + metrics *fetcherMetrics - filters []MetaFetcherFilter + filters []MetadataFilter + modifiers []MetadataModifier cached map[ulid.ULID]*metadata.Meta } // NewMetaFetcher constructs MetaFetcher. -func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.BucketReader, dir string, r prometheus.Registerer, filters ...MetaFetcherFilter) (*MetaFetcher, error) { +func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.BucketReader, dir string, r prometheus.Registerer, filters []MetadataFilter, modifiers ...MetadataModifier) (*MetaFetcher, error) { if logger == nil { logger = log.NewNopLogger() } @@ -142,8 +166,9 @@ func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.BucketReade concurrency: concurrency, bkt: bkt, cacheDir: cacheDir, - metrics: newSyncMetrics(r), + metrics: newFetcherMetrics(r), filters: filters, + modifiers: modifiers, cached: map[ulid.ULID]*metadata.Meta{}, }, nil } @@ -254,7 +279,7 @@ func (s *MetaFetcher) Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata. 
metaErrs tsdberrors.MultiError ) - s.metrics.synced.ResetTx() + s.metrics.resetTx() for i := 0; i < s.concurrency; i++ { wg.Add(1) @@ -351,13 +376,20 @@ func (s *MetaFetcher) Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata. for _, f := range s.filters { // NOTE: filter can update synced metric accordingly to the reason of the exclude. - if err := f(ctx, metas, s.metrics.synced, incompleteView); err != nil { + if err := f.Filter(ctx, metas, s.metrics.synced, incompleteView); err != nil { return nil, nil, errors.Wrap(err, "filter metas") } } + for _, m := range s.modifiers { + // NOTE: modifier can update modified metric accordingly to the reason of the modification. + if err := m.Modify(ctx, metas, s.metrics.modified, incompleteView); err != nil { + return nil, nil, errors.Wrap(err, "modify metas") + } + } + s.metrics.synced.WithLabelValues(loadedMeta).Set(float64(len(metas))) - s.metrics.synced.Submit() + s.metrics.submit() if incompleteView { return metas, partial, errors.Wrap(metaErrs, "incomplete view") @@ -367,7 +399,7 @@ func (s *MetaFetcher) Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata. return metas, partial, nil } -var _ MetaFetcherFilter = (&TimePartitionMetaFilter{}).Filter +var _ MetadataFilter = &TimePartitionMetaFilter{} // TimePartitionMetaFilter is a MetaFetcher filter that filters out blocks that are outside of specified time range. // Not go-routine safe. @@ -381,7 +413,7 @@ func NewTimePartitionMetaFilter(MinTime, MaxTime model.TimeOrDurationValue) *Tim } // Filter filters out blocks that are outside of specified time range. -func (f *TimePartitionMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) error { +func (f *TimePartitionMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec, _ bool) error { for id, m := range metas { if m.MaxTime >= f.minTime.PrometheusTimestamp() && m.MinTime <= f.maxTime.PrometheusTimestamp() { continue @@ -392,7 +424,7 @@ func (f *TimePartitionMetaFilter) Filter(_ context.Context, metas map[ulid.ULID] return nil } -var _ MetaFetcherFilter = (&LabelShardedMetaFilter{}).Filter +var _ MetadataFilter = &LabelShardedMetaFilter{} // LabelShardedMetaFilter represents struct that allows sharding. // Not go-routine safe. @@ -409,7 +441,7 @@ func NewLabelShardedMetaFilter(relabelConfig []*relabel.Config) *LabelShardedMet const blockIDLabel = "__block_id" // Filter filters out blocks that have no labels after relabelling of each block external (Thanos) labels. -func (f *LabelShardedMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) error { +func (f *LabelShardedMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec, _ bool) error { var lbls labels.Labels for id, m := range metas { lbls = lbls[:0] @@ -439,7 +471,7 @@ func NewDeduplicateFilter() *DeduplicateFilter { // Filter filters out duplicate blocks that can be formed // from two or more overlapping blocks that fully submatches the source blocks of the older blocks. 
-func (f *DeduplicateFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) error { +func (f *DeduplicateFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec, _ bool) error { var wg sync.WaitGroup metasByResolution := make(map[int64][]*metadata.Meta) @@ -465,7 +497,7 @@ func NewDeduplicateFilter() *DeduplicateFilter { return nil } -func (f *DeduplicateFilter) filterForResolution(root *Node, metaSlice []*metadata.Meta, metas map[ulid.ULID]*metadata.Meta, res int64, synced GaugeLabeled) { +func (f *DeduplicateFilter) filterForResolution(root *Node, metaSlice []*metadata.Meta, metas map[ulid.ULID]*metadata.Meta, res int64, synced *extprom.TxGaugeVec) { sort.Slice(metaSlice, func(i, j int) bool { ilen := len(metaSlice[i].Compaction.Sources) jlen := len(metaSlice[j].Compaction.Sources) @@ -540,6 +572,34 @@ func contains(s1 []ulid.ULID, s2 []ulid.ULID) bool { return true } +// ReplicaLabelRemover is a MetaFetcher modifier that modifies external labels of existing blocks: it removes the given replica labels from the metadata of blocks that have them. +type ReplicaLabelRemover struct { + logger log.Logger + + replicaLabels []string +} + +// NewReplicaLabelRemover creates a ReplicaLabelRemover. +func NewReplicaLabelRemover(logger log.Logger, replicaLabels []string) *ReplicaLabelRemover { + return &ReplicaLabelRemover{logger: logger, replicaLabels: replicaLabels} +} + +// Modify modifies external labels of existing blocks: it removes the given replica labels from the metadata of blocks that have them. +func (r *ReplicaLabelRemover) Modify(_ context.Context, metas map[ulid.ULID]*metadata.Meta, modified *extprom.TxGaugeVec, view bool) error { + for u, meta := range metas { + labels := meta.Thanos.Labels + for _, replicaLabel := range r.replicaLabels { + if _, exists := labels[replicaLabel]; exists { + level.Debug(r.logger).Log("msg", "replica label removed", "label", replicaLabel) + delete(labels, replicaLabel) + modified.WithLabelValues(replicaRemovedMeta).Inc() + } + } + metas[u].Thanos.Labels = labels + } + return nil +} + // ConsistencyDelayMetaFilter is a MetaFetcher filter that filters out blocks that are created before a specified consistency delay. // Not go-routine safe. type ConsistencyDelayMetaFilter struct { @@ -566,7 +626,7 @@ func NewConsistencyDelayMetaFilter(logger log.Logger, consistencyDelay time.Dura } // Filter filters out blocks that filters blocks that have are created before a specified consistency delay. -func (f *ConsistencyDelayMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) error { +func (f *ConsistencyDelayMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec, _ bool) error { for id, meta := range metas { // TODO(khyatisoneji): Remove the checks about Thanos Source // by implementing delete delay to fetch metas. @@ -612,7 +672,7 @@ func (f *IgnoreDeletionMarkFilter) DeletionMarkBlocks() map[ulid.ULID]*metadata. // Filter filters out blocks that are marked for deletion after a given delay. // It also returns the blocks that can be deleted since they were uploaded delay duration before current time. 
-func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) error { +func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec, _ bool) error { f.deletionMarkMap = make(map[ulid.ULID]*metadata.DeletionMark) for id := range metas { diff --git a/pkg/block/fetcher_test.go b/pkg/block/fetcher_test.go index 3c181886dd..99920b8e5f 100644 --- a/pkg/block/fetcher_test.go +++ b/pkg/block/fetcher_test.go @@ -22,7 +22,6 @@ import ( "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" promtest "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/pkg/relabel" "github.com/prometheus/prometheus/tsdb" @@ -35,6 +34,25 @@ import ( "gopkg.in/yaml.v2" ) +func newTestFetcherMetrics() *fetcherMetrics { + return &fetcherMetrics{ + synced: extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{}, []string{"state"}), + modified: extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{}, []string{"modified"}), + } +} + +type ulidFilter struct { + ulidToDelete *ulid.ULID +} + +func (f *ulidFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec, incompleteView bool) error { + if _, ok := metas[*f.ulidToDelete]; ok { + synced.WithLabelValues("filtered").Inc() + delete(metas, *f.ulidToDelete) + } + return nil +} + func ULID(i int) ulid.ULID { return ulid.MustNew(uint64(i), nil) } func ULIDs(is ...int) []ulid.ULID { @@ -57,12 +75,8 @@ func TestMetaFetcher_Fetch(t *testing.T) { var ulidToDelete ulid.ULID r := prometheus.NewRegistry() - f, err := NewMetaFetcher(log.NewNopLogger(), 20, bkt, dir, r, func(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) error { - if _, ok := metas[ulidToDelete]; ok { - synced.WithLabelValues("filtered").Inc() - delete(metas, ulidToDelete) - } - return nil + f, err := NewMetaFetcher(log.NewNopLogger(), 20, bkt, dir, r, []MetadataFilter{ + &ulidFilter{ulidToDelete: &ulidToDelete}, }) testutil.Ok(t, err) @@ -337,9 +351,10 @@ func TestLabelShardedMetaFilter_Filter_Basic(t *testing.T) { ULID(6): input[ULID(6)], } - synced := promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"state"}) - testutil.Ok(t, f.Filter(ctx, input, synced, false)) - testutil.Equals(t, 3.0, promtest.ToFloat64(synced.WithLabelValues(labelExcludedMeta))) + m := newTestFetcherMetrics() + testutil.Ok(t, f.Filter(ctx, input, m.synced, false)) + + testutil.Equals(t, 3.0, promtest.ToFloat64(m.synced.WithLabelValues(labelExcludedMeta))) testutil.Equals(t, expected, input) } @@ -434,10 +449,11 @@ func TestLabelShardedMetaFilter_Filter_Hashmod(t *testing.T) { } deleted := len(input) - len(expected) - synced := promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"state"}) - testutil.Ok(t, f.Filter(ctx, input, synced, false)) + m := newTestFetcherMetrics() + testutil.Ok(t, f.Filter(ctx, input, m.synced, false)) + testutil.Equals(t, expected, input) - testutil.Equals(t, float64(deleted), promtest.ToFloat64(synced.WithLabelValues(labelExcludedMeta))) + testutil.Equals(t, float64(deleted), promtest.ToFloat64(m.synced.WithLabelValues(labelExcludedMeta))) }) @@ -497,9 +513,10 @@ func TestTimePartitionMetaFilter_Filter(t *testing.T) { ULID(4): input[ULID(4)], } - synced := promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"state"}) - testutil.Ok(t, f.Filter(ctx, 
input, synced, false)) - testutil.Equals(t, 2.0, promtest.ToFloat64(synced.WithLabelValues(timeExcludedMeta))) + m := newTestFetcherMetrics() + testutil.Ok(t, f.Filter(ctx, input, m.synced, false)) + + testutil.Equals(t, 2.0, promtest.ToFloat64(m.synced.WithLabelValues(timeExcludedMeta))) testutil.Equals(t, expected, input) } @@ -830,7 +847,7 @@ func TestDeduplicateFilter_Filter(t *testing.T) { } { f := NewDeduplicateFilter() if ok := t.Run(tcase.name, func(t *testing.T) { - synced := promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"state"}) + m := newTestFetcherMetrics() metas := make(map[ulid.ULID]*metadata.Meta) inputLen := len(tcase.input) for id, metaInfo := range tcase.input { @@ -848,15 +865,65 @@ func TestDeduplicateFilter_Filter(t *testing.T) { }, } } - testutil.Ok(t, f.Filter(ctx, metas, synced, false)) + testutil.Ok(t, f.Filter(ctx, metas, m.synced, false)) compareSliceWithMapKeys(t, metas, tcase.expected) - testutil.Equals(t, float64(inputLen-len(tcase.expected)), promtest.ToFloat64(synced.WithLabelValues(duplicateMeta))) + testutil.Equals(t, float64(inputLen-len(tcase.expected)), promtest.ToFloat64(m.synced.WithLabelValues(duplicateMeta))) }); !ok { return } } } +func TestReplicaLabelRemover_Modify(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) + defer cancel() + rm := NewReplicaLabelRemover(log.NewNopLogger(), []string{"replica", "rule_replica"}) + + for _, tcase := range []struct { + name string + input map[ulid.ULID]*metadata.Meta + expected map[ulid.ULID]*metadata.Meta + modified float64 + }{ + { + name: "without replica labels", + input: map[ulid.ULID]*metadata.Meta{ + ULID(1): {Thanos: metadata.Thanos{Labels: map[string]string{"message": "something"}}}, + ULID(2): {Thanos: metadata.Thanos{Labels: map[string]string{"message": "something"}}}, + ULID(3): {Thanos: metadata.Thanos{Labels: map[string]string{"message": "something1"}}}, + }, + expected: map[ulid.ULID]*metadata.Meta{ + ULID(1): {Thanos: metadata.Thanos{Labels: map[string]string{"message": "something"}}}, + ULID(2): {Thanos: metadata.Thanos{Labels: map[string]string{"message": "something"}}}, + ULID(3): {Thanos: metadata.Thanos{Labels: map[string]string{"message": "something1"}}}, + }, + modified: 0, + }, + { + name: "with replica labels", + input: map[ulid.ULID]*metadata.Meta{ + ULID(1): {Thanos: metadata.Thanos{Labels: map[string]string{"message": "something"}}}, + ULID(2): {Thanos: metadata.Thanos{Labels: map[string]string{"replica": "cluster1", "message": "something"}}}, + ULID(3): {Thanos: metadata.Thanos{Labels: map[string]string{"replica": "cluster1", "rule_replica": "rule1", "message": "something"}}}, + ULID(4): {Thanos: metadata.Thanos{Labels: map[string]string{"replica": "cluster1", "rule_replica": "rule1"}}}, + }, + expected: map[ulid.ULID]*metadata.Meta{ + ULID(1): {Thanos: metadata.Thanos{Labels: map[string]string{"message": "something"}}}, + ULID(2): {Thanos: metadata.Thanos{Labels: map[string]string{"message": "something"}}}, + ULID(3): {Thanos: metadata.Thanos{Labels: map[string]string{"message": "something"}}}, + ULID(4): {Thanos: metadata.Thanos{Labels: map[string]string{}}}, + }, + modified: 5.0, + }, + } { + m := newTestFetcherMetrics() + testutil.Ok(t, rm.Modify(ctx, tcase.input, m.modified, false)) + + testutil.Equals(t, tcase.modified, promtest.ToFloat64(m.modified.WithLabelValues(replicaRemovedMeta))) + testutil.Equals(t, tcase.expected, tcase.input) + } +} + func compareSliceWithMapKeys(tb testing.TB, m 
map[ulid.ULID]*metadata.Meta, s []ulid.ULID) { _, file, line, _ := runtime.Caller(1) matching := true @@ -945,7 +1012,7 @@ func TestConsistencyDelayMetaFilter_Filter_0(t *testing.T) { } t.Run("consistency 0 (turned off)", func(t *testing.T) { - synced := promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"state"}) + m := newTestFetcherMetrics() expected := map[ulid.ULID]*metadata.Meta{} // Copy all. for _, id := range u.created { @@ -956,13 +1023,13 @@ func TestConsistencyDelayMetaFilter_Filter_0(t *testing.T) { f := NewConsistencyDelayMetaFilter(nil, 0*time.Second, reg) testutil.Equals(t, map[string]float64{"consistency_delay_seconds": 0.0}, extprom.CurrentGaugeValuesFor(t, reg, "consistency_delay_seconds")) - testutil.Ok(t, f.Filter(ctx, input, synced, false)) - testutil.Equals(t, 0.0, promtest.ToFloat64(synced.WithLabelValues(tooFreshMeta))) + testutil.Ok(t, f.Filter(ctx, input, m.synced, false)) + testutil.Equals(t, 0.0, promtest.ToFloat64(m.synced.WithLabelValues(tooFreshMeta))) testutil.Equals(t, expected, input) }) t.Run("consistency 30m.", func(t *testing.T) { - synced := promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"state"}) + m := newTestFetcherMetrics() expected := map[ulid.ULID]*metadata.Meta{} // Only certain sources and those with 30m or more age go through. for i, id := range u.created { @@ -981,8 +1048,8 @@ func TestConsistencyDelayMetaFilter_Filter_0(t *testing.T) { f := NewConsistencyDelayMetaFilter(nil, 30*time.Minute, reg) testutil.Equals(t, map[string]float64{"consistency_delay_seconds": (30 * time.Minute).Seconds()}, extprom.CurrentGaugeValuesFor(t, reg, "consistency_delay_seconds")) - testutil.Ok(t, f.Filter(ctx, input, synced, false)) - testutil.Equals(t, float64(len(u.created)-len(expected)), promtest.ToFloat64(synced.WithLabelValues(tooFreshMeta))) + testutil.Ok(t, f.Filter(ctx, input, m.synced, false)) + testutil.Equals(t, float64(len(u.created)-len(expected)), promtest.ToFloat64(m.synced.WithLabelValues(tooFreshMeta))) testutil.Equals(t, expected, input) }) } @@ -1033,9 +1100,9 @@ func TestIgnoreDeletionMarkFilter_Filter(t *testing.T) { ULID(4): {}, } - synced := promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"state"}) - testutil.Ok(t, f.Filter(ctx, input, synced, false)) - testutil.Equals(t, 1.0, promtest.ToFloat64(synced.WithLabelValues(markedForDeletionMeta))) + m := newTestFetcherMetrics() + testutil.Ok(t, f.Filter(ctx, input, m.synced, false)) + testutil.Equals(t, 1.0, promtest.ToFloat64(m.synced.WithLabelValues(markedForDeletionMeta))) testutil.Equals(t, expected, input) }) } diff --git a/pkg/compact/clean_test.go b/pkg/compact/clean_test.go index 74cfbaaf5a..4507b3f5a5 100644 --- a/pkg/compact/clean_test.go +++ b/pkg/compact/clean_test.go @@ -29,7 +29,7 @@ func TestBestEffortCleanAbortedPartialUploads(t *testing.T) { bkt := inmem.NewBucket() logger := log.NewNopLogger() - metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil) + metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, nil) testutil.Ok(t, err) // 1. No meta, old block, should be removed. 
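Note on the fetcher API used throughout this patch: NewMetaFetcher now takes the filters as a []block.MetadataFilter slice and any metadata modifiers (such as the new ReplicaLabelRemover) as trailing variadic arguments, while callers that need neither simply pass nil for the slice, as the updated call sites in downsample.go and the tests above show. A minimal sketch of the new call shape follows; the in-memory bucket, the import paths, and the "replica" label name are illustrative assumptions, not part of this patch.

package example

import (
	"context"

	"github.com/go-kit/kit/log"
	"github.com/thanos-io/thanos/pkg/block"
	"github.com/thanos-io/thanos/pkg/objstore/inmem" // assumed path; any objstore bucket reader works.
)

func fetchDeduplicatedMetas(ctx context.Context) error {
	logger := log.NewNopLogger()
	bkt := inmem.NewBucket() // illustrative bucket for the sketch.

	// Filters go in the slice; modifiers such as ReplicaLabelRemover are passed last.
	fetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", nil, []block.MetadataFilter{
		block.NewDeduplicateFilter(),
	},
		block.NewReplicaLabelRemover(logger, []string{"replica"}), // assumed replica label name.
	)
	if err != nil {
		return err
	}

	// Fetch returns the filtered metas plus any partially loaded blocks.
	metas, partial, err := fetcher.Fetch(ctx)
	if err != nil {
		return err
	}
	_, _ = metas, partial // the returned metas already have the replica labels stripped.
	return nil
}

With a ReplicaLabelRemover configured, blocks that differ only in their replica labels end up with identical external label sets, which is what allows the compactor to place them in one group and vertically compact them.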
diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go index 5520a9a848..52e4147623 100644 --- a/pkg/compact/compact_e2e_test.go +++ b/pkg/compact/compact_e2e_test.go @@ -90,9 +90,9 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) { } duplicateBlocksFilter := block.NewDeduplicateFilter() - metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, - duplicateBlocksFilter.Filter, - ) + metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, []block.MetadataFilter{ + duplicateBlocksFilter, + }) testutil.Ok(t, err) blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) @@ -178,10 +178,10 @@ func TestGroup_Compact_e2e(t *testing.T) { ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, 48*time.Hour) duplicateBlocksFilter := block.NewDeduplicateFilter() - metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, - ignoreDeletionMarkFilter.Filter, - duplicateBlocksFilter.Filter, - ) + metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, []block.MetadataFilter{ + ignoreDeletionMarkFilter, + duplicateBlocksFilter, + }) testutil.Ok(t, err) blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) diff --git a/pkg/compact/retention_test.go b/pkg/compact/retention_test.go index 5c9813c117..980b883839 100644 --- a/pkg/compact/retention_test.go +++ b/pkg/compact/retention_test.go @@ -245,7 +245,7 @@ func TestApplyRetentionPolicyByResolution(t *testing.T) { uploadMockBlock(t, bkt, b.id, b.minTime, b.maxTime, int64(b.resolution)) } - metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", nil) + metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", nil, nil) testutil.Ok(t, err) blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) diff --git a/pkg/replicate/replicator.go b/pkg/replicate/replicator.go index 1e674fa6a4..d419681f85 100644 --- a/pkg/replicate/replicator.go +++ b/pkg/replicate/replicator.go @@ -150,7 +150,7 @@ func RunReplicate( Help: "The Duration of replication runs split by success and error.", }, []string{"result"}) - fetcher, err := thanosblock.NewMetaFetcher(logger, 32, fromBkt, "", reg) + fetcher, err := thanosblock.NewMetaFetcher(logger, 32, fromBkt, "", reg, nil) if err != nil { return errors.Wrapf(err, "create meta fetcher with bucket %v", fromBkt) } diff --git a/pkg/replicate/scheme_test.go b/pkg/replicate/scheme_test.go index 9aaf9cff05..6b441a572b 100644 --- a/pkg/replicate/scheme_test.go +++ b/pkg/replicate/scheme_test.go @@ -313,7 +313,7 @@ func TestReplicationSchemeAll(t *testing.T) { } filter := NewBlockFilter(logger, selector, compact.ResolutionLevelRaw, 1).Filter - fetcher, err := block.NewMetaFetcher(logger, 32, originBucket, "", nil) + fetcher, err := block.NewMetaFetcher(logger, 32, originBucket, "", nil, nil) testutil.Ok(t, err) r := newReplicationScheme( diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index 6bdc6a5950..821cd70d27 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -147,10 +147,10 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m maxTime: maxTime, } - metaFetcher, err := block.NewMetaFetcher(s.logger, 20, bkt, dir, nil, - block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime).Filter, - block.NewLabelShardedMetaFilter(relabelConfig).Filter, - ) + metaFetcher, err := block.NewMetaFetcher(s.logger, 20, bkt, dir, nil, []block.MetadataFilter{ + 
block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime), + block.NewLabelShardedMetaFilter(relabelConfig), + }) testutil.Ok(t, err) store, err := NewBucketStore( diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index a95372ec8f..75cf446643 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -710,10 +710,10 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul testutil.Ok(t, yaml.Unmarshal([]byte(sc.relabel), &relabelConf)) rec := &recorder{Bucket: bkt} - metaFetcher, err := block.NewMetaFetcher(logger, 20, bkt, dir, nil, - block.NewTimePartitionMetaFilter(allowAllFilterConf.MinTime, allowAllFilterConf.MaxTime).Filter, - block.NewLabelShardedMetaFilter(relabelConf).Filter, - ) + metaFetcher, err := block.NewMetaFetcher(logger, 20, bkt, dir, nil, []block.MetadataFilter{ + block.NewTimePartitionMetaFilter(allowAllFilterConf.MinTime, allowAllFilterConf.MaxTime), + block.NewLabelShardedMetaFilter(relabelConf), + }) testutil.Ok(t, err) bucketStore, err := NewBucketStore( diff --git a/test/e2e/compact_test.go b/test/e2e/compact_test.go new file mode 100644 index 0000000000..a906117905 --- /dev/null +++ b/test/e2e/compact_test.go @@ -0,0 +1,559 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package e2e_test + +import ( + "context" + "os" + "path" + "path/filepath" + "strconv" + "testing" + "time" + + "github.com/cortexproject/cortex/integration/e2e" + e2edb "github.com/cortexproject/cortex/integration/e2e/db" + "github.com/go-kit/kit/log" + "github.com/oklog/ulid" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/timestamp" + "github.com/prometheus/prometheus/tsdb" + "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/objstore" + "github.com/thanos-io/thanos/pkg/objstore/client" + "github.com/thanos-io/thanos/pkg/objstore/s3" + "github.com/thanos-io/thanos/pkg/promclient" + "github.com/thanos-io/thanos/pkg/testutil" + "github.com/thanos-io/thanos/pkg/testutil/e2eutil" + "github.com/thanos-io/thanos/test/e2e/e2ethanos" +) + +func TestCompact(t *testing.T) { + t.Parallel() + + l := log.NewLogfmtLogger(os.Stdout) + + // blockDesc describes a recipe to generate blocks from the given series and external labels. 
+ type blockDesc struct { + series []labels.Labels + extLset labels.Labels + mint int64 + maxt int64 + samplesPerSeries int + } + + type retention struct { + resRaw string + res5m string + res1h string + } + + delay := 30 * time.Minute + now := time.Now() + + for i, tcase := range []struct { + name string + blocks []blockDesc + replicaLabels []string + downsamplingEnabled bool + retention *retention + query string + + expected []model.Metric + expectNumModBlocks float64 + expectNumBlocks uint64 + expectedStats tsdb.BlockStats + }{ + { + name: "(full) vertically overlapping blocks with replica labels", + blocks: []blockDesc{ + { + series: []labels.Labels{labels.FromStrings("a", "1", "b", "2")}, + extLset: labels.FromStrings("ext1", "value1", "replica", "1"), + mint: timestamp.FromTime(now), + maxt: timestamp.FromTime(now.Add(2 * time.Hour)), + samplesPerSeries: 120, + }, + { + series: []labels.Labels{labels.FromStrings("a", "1", "b", "2")}, + extLset: labels.FromStrings("ext1", "value1", "replica", "2"), + mint: timestamp.FromTime(now), + maxt: timestamp.FromTime(now.Add(2 * time.Hour)), + samplesPerSeries: 120, + }, + { + series: []labels.Labels{labels.FromStrings("a", "1", "b", "2")}, + extLset: labels.FromStrings("ext1", "value1", "rule_replica", "1"), + mint: timestamp.FromTime(now), + maxt: timestamp.FromTime(now.Add(2 * time.Hour)), + samplesPerSeries: 120, + }, + }, + replicaLabels: []string{"replica", "rule_replica"}, + downsamplingEnabled: true, + query: "{a=\"1\"}", + + expected: []model.Metric{ + { + "a": "1", + "b": "2", + "ext1": "value1", + }, + }, + expectNumModBlocks: 3, + expectNumBlocks: 1, + expectedStats: tsdb.BlockStats{ + NumChunks: 2, + NumSeries: 1, + NumSamples: 120, + }, + }, + { + name: "(full) vertically overlapping blocks without replica labels", + blocks: []blockDesc{ + { + series: []labels.Labels{labels.FromStrings("a", "1", "b", "2")}, + extLset: labels.FromStrings("ext1", "value1", "replica", "1"), + mint: timestamp.FromTime(now), + maxt: timestamp.FromTime(now.Add(2 * time.Hour)), + samplesPerSeries: 12, + }, + { + series: []labels.Labels{labels.FromStrings("a", "1", "b", "2")}, + extLset: labels.FromStrings("ext1", "value1", "replica", "2"), + mint: timestamp.FromTime(now), + maxt: timestamp.FromTime(now.Add(2 * time.Hour)), + samplesPerSeries: 12, + }, + }, + downsamplingEnabled: true, + query: "{a=\"1\"}", + + expected: []model.Metric{ + { + "a": "1", + "b": "2", + "ext1": "value1", + "replica": "1", + }, + { + "a": "1", + "b": "2", + "ext1": "value1", + "replica": "2", + }, + }, + expectNumModBlocks: 0, + expectNumBlocks: 2, + expectedStats: tsdb.BlockStats{ + NumChunks: 2, + NumSeries: 2, + NumSamples: 24, + }, + }, + { + name: "(full) vertically overlapping blocks with replica labels downsampling disabled", + blocks: []blockDesc{ + { + series: []labels.Labels{labels.FromStrings("a", "1", "b", "2")}, + extLset: labels.FromStrings("ext1", "value1", "ext2", "value2", "replica", "1"), + mint: timestamp.FromTime(now), + maxt: timestamp.FromTime(now.Add(2 * time.Hour)), + samplesPerSeries: 120, + }, + { + series: []labels.Labels{labels.FromStrings("a", "1", "b", "2")}, + extLset: labels.FromStrings("ext2", "value2", "ext1", "value1", "replica", "2"), + mint: timestamp.FromTime(now), + maxt: timestamp.FromTime(now.Add(2 * time.Hour)), + samplesPerSeries: 120, + }, + { + series: []labels.Labels{labels.FromStrings("a", "1", "b", "2")}, + extLset: labels.FromStrings("ext1", "value1", "rule_replica", "1", "ext2", "value2"), + mint: timestamp.FromTime(now), + 
maxt: timestamp.FromTime(now.Add(2 * time.Hour)), + samplesPerSeries: 120, + }, + }, + replicaLabels: []string{"replica", "rule_replica"}, + downsamplingEnabled: false, + query: "{a=\"1\"}", + + expected: []model.Metric{ + { + "a": "1", + "b": "2", + "ext1": "value1", + "ext2": "value2", + }, + }, + expectNumModBlocks: 3, + expectNumBlocks: 1, + expectedStats: tsdb.BlockStats{ + NumChunks: 2, + NumSeries: 1, + NumSamples: 120, + }, + }, + { + name: "(full) vertically overlapping blocks with replica labels, downsampling disabled and extra blocks", + blocks: []blockDesc{ + { + series: []labels.Labels{labels.FromStrings("a", "1", "b", "2")}, + extLset: labels.FromStrings("ext1", "value1", "ext2", "value2", "replica", "1"), + mint: timestamp.FromTime(now), + maxt: timestamp.FromTime(now.Add(2 * time.Hour)), + samplesPerSeries: 120, + }, + { + series: []labels.Labels{labels.FromStrings("a", "1", "b", "2")}, + extLset: labels.FromStrings("ext2", "value2", "ext1", "value1", "replica", "2"), + mint: timestamp.FromTime(now), + maxt: timestamp.FromTime(now.Add(2 * time.Hour)), + samplesPerSeries: 120, + }, + { + series: []labels.Labels{labels.FromStrings("a", "1", "b", "2")}, + extLset: labels.FromStrings("ext1", "value1", "rule_replica", "1", "ext2", "value2"), + mint: timestamp.FromTime(now), + maxt: timestamp.FromTime(now.Add(2 * time.Hour)), + samplesPerSeries: 120, + }, + { + series: []labels.Labels{labels.FromStrings("c", "1", "d", "2")}, + extLset: labels.FromStrings("ext1", "value1", "ext2", "value2"), + mint: timestamp.FromTime(now), + maxt: timestamp.FromTime(now.Add(2 * time.Hour)), + samplesPerSeries: 120, + }, + { + series: []labels.Labels{labels.FromStrings("c", "1", "d", "2")}, + extLset: labels.FromStrings("ext3", "value3"), + mint: timestamp.FromTime(now), + maxt: timestamp.FromTime(now.Add(2 * time.Hour)), + samplesPerSeries: 120, + }, + }, + replicaLabels: []string{"replica", "rule_replica"}, + downsamplingEnabled: false, + query: "{a=\"1\"}", + + expected: []model.Metric{ + { + "a": "1", + "b": "2", + "ext1": "value1", + "ext2": "value2", + }, + }, + expectNumModBlocks: 3, + expectNumBlocks: 2, + expectedStats: tsdb.BlockStats{ + NumChunks: 6, + NumSeries: 3, + NumSamples: 360, + }, + }, + { + name: "(partial) vertically overlapping blocks with replica labels", + blocks: []blockDesc{ + { + series: []labels.Labels{labels.FromStrings("a", "1", "b", "2")}, + extLset: labels.FromStrings("ext1", "value1", "ext2", "value2", "replica", "1"), + mint: timestamp.FromTime(now), + maxt: timestamp.FromTime(now.Add(2 * time.Hour)), + samplesPerSeries: 119, + }, + { + series: []labels.Labels{labels.FromStrings("a", "1", "b", "2")}, + extLset: labels.FromStrings("ext2", "value2", "ext1", "value1", "replica", "2"), + mint: timestamp.FromTime(now), + maxt: timestamp.FromTime(now.Add(1 * time.Hour)), + samplesPerSeries: 59, + }, + }, + replicaLabels: []string{"replica"}, + downsamplingEnabled: true, + query: "{a=\"1\"}", + + expected: []model.Metric{ + { + "a": "1", + "b": "2", + "ext1": "value1", + "ext2": "value2", + }, + }, + expectNumModBlocks: 2, + expectNumBlocks: 1, + expectedStats: tsdb.BlockStats{ + NumChunks: 2, + NumSeries: 1, + NumSamples: 119, + }, + }, + { + name: "(shifted) vertically overlapping blocks with replica labels", + blocks: []blockDesc{ + { + series: []labels.Labels{labels.FromStrings("a", "1", "b", "2")}, + extLset: labels.FromStrings("ext1", "value1", "replica", "1"), + mint: timestamp.FromTime(now.Add(30 * time.Minute)), + maxt: timestamp.FromTime(now.Add(150 * 
time.Minute)), + samplesPerSeries: 119, + }, + { + series: []labels.Labels{labels.FromStrings("a", "1", "b", "2")}, + extLset: labels.FromStrings("ext1", "value1", "replica", "2"), + mint: timestamp.FromTime(now), + maxt: timestamp.FromTime(now.Add(120 * time.Minute)), + samplesPerSeries: 119, + }, + }, + replicaLabels: []string{"replica"}, + downsamplingEnabled: true, + query: "{a=\"1\"}", + + expected: []model.Metric{ + { + "a": "1", + "b": "2", + "ext1": "value1", + }, + }, + expectNumModBlocks: 2, + expectNumBlocks: 1, + expectedStats: tsdb.BlockStats{ + NumChunks: 2, + NumSeries: 1, + NumSamples: 149, + }, + }, + { + name: "(full) vertically overlapping blocks with replica labels retention specified", + blocks: []blockDesc{ + { + series: []labels.Labels{labels.FromStrings("a", "1", "b", "2")}, + extLset: labels.FromStrings("ext1", "value1", "replica", "1"), + mint: timestamp.FromTime(now), + maxt: timestamp.FromTime(now.Add(2 * time.Hour)), + samplesPerSeries: 120, + }, + { + series: []labels.Labels{labels.FromStrings("a", "1", "b", "2")}, + extLset: labels.FromStrings("ext1", "value1", "replica", "2"), + mint: timestamp.FromTime(now), + maxt: timestamp.FromTime(now.Add(2 * time.Hour)), + samplesPerSeries: 120, + }, + { + series: []labels.Labels{labels.FromStrings("a", "1", "b", "2")}, + extLset: labels.FromStrings("ext1", "value1", "rule_replica", "1"), + mint: timestamp.FromTime(now), + maxt: timestamp.FromTime(now.Add(2 * time.Hour)), + samplesPerSeries: 120, + }, + }, + replicaLabels: []string{"replica", "rule_replica"}, + downsamplingEnabled: true, + retention: &retention{ + resRaw: "0d", + res5m: "5m", + res1h: "5m", + }, + query: "{a=\"1\"}", + + expected: []model.Metric{ + { + "a": "1", + "b": "2", + "ext1": "value1", + }, + }, + expectNumModBlocks: 3, + expectNumBlocks: 1, + expectedStats: tsdb.BlockStats{ + NumChunks: 2, + NumSeries: 1, + NumSamples: 120, + }, + }, + { + name: "(full) vertically overlapping blocks without replica labels", + blocks: []blockDesc{ + { + series: []labels.Labels{labels.FromStrings("a", "1", "b", "2")}, + extLset: labels.FromStrings("ext1", "value1"), + mint: timestamp.FromTime(now), + maxt: timestamp.FromTime(now.Add(2 * time.Hour)), + samplesPerSeries: 2, + }, { + series: []labels.Labels{labels.FromStrings("a", "1", "b", "2")}, + extLset: labels.FromStrings("ext1", "value2"), + mint: timestamp.FromTime(now), + maxt: timestamp.FromTime(now.Add(2 * time.Hour)), + samplesPerSeries: 2, + }, + }, + replicaLabels: []string{"replica"}, + downsamplingEnabled: true, + query: "{a=\"1\"}", + + expected: []model.Metric{ + { + "a": "1", + "b": "2", + "ext1": "value1", + }, + { + "a": "1", + "b": "2", + "ext1": "value2", + }, + }, + expectNumModBlocks: 0, + expectNumBlocks: 2, + expectedStats: tsdb.BlockStats{ + NumChunks: 2, + NumSeries: 2, + NumSamples: 4, + }, + }, + } { + i := i + tcase := tcase + t.Run(tcase.name, func(t *testing.T) { + s, err := e2e.NewScenario("e2e_test_compact_" + strconv.Itoa(i)) + testutil.Ok(t, err) + defer s.Close() // TODO(kakkoyun): Change with t.CleanUp after go 1.14 update. + + dir := filepath.Join(s.SharedDir(), "tmp_"+strconv.Itoa(i)) + testutil.Ok(t, os.MkdirAll(filepath.Join(s.SharedDir(), dir), os.ModePerm)) + + bucket := "thanos_" + strconv.Itoa(i) + + // TODO(kakkoyun): Move to shared minio to improve test speed. 
+ m := e2edb.NewMinio(8080+i, bucket) + testutil.Ok(t, s.StartAndWaitReady(m)) + + bkt, err := s3.NewBucketWithConfig(l, s3.Config{ + Bucket: bucket, + AccessKey: e2edb.MinioAccessKey, + SecretKey: e2edb.MinioSecretKey, + Endpoint: m.HTTPEndpoint(), // We need separate client config, when connecting to minio from outside. + Insecure: true, + }, "test-feed") + testutil.Ok(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) + defer cancel() // TODO(kakkoyun): Change with t.CleanUp after go 1.14 update. + + var rawBlockIds []ulid.ULID + for _, b := range tcase.blocks { + id, err := e2eutil.CreateBlockWithBlockDelay(ctx, dir, b.series, b.samplesPerSeries, b.mint, b.maxt, delay, b.extLset, 0) + testutil.Ok(t, err) + testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id.String()), id.String())) + rawBlockIds = append(rawBlockIds, id) + } + + dedupFlags := make([]string, 0, len(tcase.replicaLabels)) + for _, l := range tcase.replicaLabels { + dedupFlags = append(dedupFlags, "--deduplication.replica-label="+l) + } + + retenFlags := make([]string, 0, 3) + if tcase.retention != nil { + retenFlags = append(retenFlags, "--retention.resolution-raw="+tcase.retention.resRaw) + retenFlags = append(retenFlags, "--retention.resolution-5m="+tcase.retention.res5m) + retenFlags = append(retenFlags, "--retention.resolution-1h="+tcase.retention.res1h) + } + + cmpt, err := e2ethanos.NewCompactor(s.SharedDir(), strconv.Itoa(i), client.BucketConfig{ + Type: client.S3, + Config: s3.Config{ + Bucket: bucket, + AccessKey: e2edb.MinioAccessKey, + SecretKey: e2edb.MinioSecretKey, + Endpoint: m.NetworkHTTPEndpoint(), + Insecure: true, + }, + }, + nil, // relabel configs. + tcase.downsamplingEnabled, + append(dedupFlags, retenFlags...)..., + ) + testutil.Ok(t, err) + testutil.Ok(t, s.StartAndWaitReady(cmpt)) + testutil.Ok(t, cmpt.WaitSumMetrics(e2e.Equals(float64(len(rawBlockIds))), "thanos_blocks_meta_synced")) + testutil.Ok(t, cmpt.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_sync_failures_total")) + testutil.Ok(t, cmpt.WaitSumMetrics(e2e.Equals(tcase.expectNumModBlocks), "thanos_blocks_meta_modified")) + + str, err := e2ethanos.NewStoreGW(s.SharedDir(), "compact_"+strconv.Itoa(i), client.BucketConfig{ + Type: client.S3, + Config: s3.Config{ + Bucket: bucket, + AccessKey: e2edb.MinioAccessKey, + SecretKey: e2edb.MinioSecretKey, + Endpoint: m.NetworkHTTPEndpoint(), + Insecure: true, + }, + }) + testutil.Ok(t, err) + testutil.Ok(t, s.StartAndWaitReady(str)) + testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64(tcase.expectNumBlocks)), "thanos_blocks_meta_synced")) + testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_sync_failures_total")) + + q, err := e2ethanos.NewQuerier(s.SharedDir(), "compact_"+strconv.Itoa(i), []string{str.GRPCNetworkEndpoint()}, nil) + testutil.Ok(t, err) + testutil.Ok(t, s.StartAndWaitReady(q)) + + ctx, cancel = context.WithTimeout(context.Background(), 3*time.Minute) + defer cancel() + + queryAndAssert(t, ctx, q.HTTPEndpoint(), + tcase.query, + promclient.QueryOptions{ + Deduplicate: false, // This should be false, so that we can be sure deduplication was offline. 
+ }, + tcase.expected, + ) + + var ( + actualNumBlocks uint64 + actual tsdb.BlockStats + sources []ulid.ULID + ) + testutil.Ok(t, bkt.Iter(ctx, "", func(n string) error { + id, ok := block.IsBlockDir(n) + if !ok { + return nil + } + + actualNumBlocks += 1 + + meta, err := block.DownloadMeta(ctx, l, bkt, id) + if err != nil { + return err + } + + actual.NumChunks += meta.Stats.NumChunks + actual.NumSeries += meta.Stats.NumSeries + actual.NumSamples += meta.Stats.NumSamples + sources = append(sources, meta.Compaction.Sources...) + return nil + })) + + // Make sure only necessary amount of blocks fetched from store, to observe affects of offline deduplication. + testutil.Equals(t, tcase.expectNumBlocks, actualNumBlocks) + if len(rawBlockIds) < int(tcase.expectNumBlocks) { // check sources only if compacted. + testutil.Equals(t, rawBlockIds, sources) + } + testutil.Equals(t, tcase.expectedStats.NumChunks, actual.NumChunks) + testutil.Equals(t, tcase.expectedStats.NumSeries, actual.NumSeries) + testutil.Equals(t, tcase.expectedStats.NumSamples, actual.NumSamples) + }) + } +} diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index afd20b6edc..49887505ba 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -10,8 +10,10 @@ import ( "os" "path/filepath" "strconv" + "time" "github.com/cortexproject/cortex/integration/e2e" + "github.com/cortexproject/cortex/pkg/util" "github.com/pkg/errors" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/discovery/targetgroup" @@ -25,6 +27,11 @@ import ( const logLevel = "info" +var defaultBackoffConfig = util.BackoffConfig{ + MinBackoff: 300 * time.Millisecond, + MaxBackoff: 15 * time.Second, +} + // TODO(bwplotka): Run against multiple? func DefaultPrometheusImage() string { return "quay.io/prometheus/prometheus:v2.16.0" @@ -69,6 +76,7 @@ func NewPrometheus(sharedDir string, name string, config, promImage string) (*e2 9090, ) prom.SetUser("root") + prom.SetBackoff(defaultBackoffConfig) return prom, container, nil } @@ -78,6 +86,7 @@ func NewPrometheusWithSidecar(sharedDir string, netName string, name string, con if err != nil { return nil, nil, err } + prom.SetBackoff(defaultBackoffConfig) sidecar := NewService( fmt.Sprintf("sidecar-%s", name), @@ -95,6 +104,8 @@ func NewPrometheusWithSidecar(sharedDir string, netName string, name string, con 80, 9091, ) + sidecar.SetBackoff(defaultBackoffConfig) + return prom, sidecar, nil } @@ -109,6 +120,7 @@ func NewQuerier(sharedDir string, name string, storeAddresses []string, fileSDSt "--query.replica-label": replicaLabel, "--store.sd-dns-interval": "5s", "--log.level": logLevel, + "--query.max-concurrent": "1", "--store.sd-interval": "5s", }) for _, addr := range storeAddresses { @@ -139,14 +151,17 @@ func NewQuerier(sharedDir string, name string, storeAddresses []string, fileSDSt args = append(args, "--store.sd-files="+filepath.Join(container, "filesd.yaml")) } - return NewService( + querier := NewService( fmt.Sprintf("querier-%v", name), DefaultImage(), e2e.NewCommand("query", args...), e2e.NewReadinessProbe(80, "/-/ready", 200), 80, 9091, - ), nil + ) + querier.SetBackoff(defaultBackoffConfig) + + return querier, nil } func RemoteWriteEndpoint(addr string) string { return fmt.Sprintf("http://%s/api/v1/receive", addr) } @@ -171,7 +186,7 @@ func NewReceiver(sharedDir string, networkName string, name string, replicationF return nil, errors.Wrap(err, "creating receive config") } - return NewService( + receiver := NewService( 
fmt.Sprintf("receive-%v", name), DefaultImage(), // TODO(bwplotka): BuildArgs should be interface. @@ -193,7 +208,10 @@ func NewReceiver(sharedDir string, networkName string, name string, replicationF 80, 9091, 81, - ), nil + ) + receiver.SetBackoff(defaultBackoffConfig) + + return receiver, nil } func NewRuler(sharedDir string, name string, ruleSubDir string, amCfg []alert.AlertmanagerConfig, queryCfg []query.Config) (*Service, error) { @@ -215,7 +233,7 @@ func NewRuler(sharedDir string, name string, ruleSubDir string, amCfg []alert.Al return nil, errors.Wrapf(err, "generate query file: %v", queryCfg) } - return NewService( + ruler := NewService( fmt.Sprintf("rule-%v", name), DefaultImage(), e2e.NewCommand("rule", e2e.BuildArgs(map[string]string{ @@ -226,7 +244,7 @@ func NewRuler(sharedDir string, name string, ruleSubDir string, amCfg []alert.Al "--label": fmt.Sprintf(`replica="%s"`, name), "--data-dir": container, "--rule-file": filepath.Join(e2e.ContainerSharedDir, ruleSubDir, "*.yaml"), - "--eval-interval": "1s", + "--eval-interval": "3s", "--alertmanagers.config": string(amCfgBytes), "--alertmanagers.sd-dns-interval": "1s", "--log.level": logLevel, @@ -237,7 +255,10 @@ func NewRuler(sharedDir string, name string, ruleSubDir string, amCfg []alert.Al e2e.NewReadinessProbe(80, "/-/ready", 200), 80, 9091, - ), nil + ) + ruler.SetBackoff(defaultBackoffConfig) + + return ruler, nil } func NewAlertmanager(sharedDir string, name string) (*e2e.HTTPService, error) { @@ -263,15 +284,19 @@ receivers: fmt.Sprintf("alertmanager-%v", name), DefaultAlertmanagerImage(), e2e.NewCommandWithoutEntrypoint("/bin/alertmanager", e2e.BuildArgs(map[string]string{ - "--config.file": filepath.Join(container, "config.yaml"), - "--web.listen-address": "0.0.0.0:80", - "--log.level": logLevel, - "--storage.path": container, + "--config.file": filepath.Join(container, "config.yaml"), + "--web.listen-address": "0.0.0.0:80", + "--log.level": logLevel, + "--storage.path": container, + "--web.get-concurrency": "1", + "--web.timeout": "2m", })...), e2e.NewReadinessProbe(80, "/-/ready", 200), 80, ) s.SetUser("root") + s.SetBackoff(defaultBackoffConfig) + return s, nil } @@ -292,7 +317,7 @@ func NewStoreGW(sharedDir string, name string, bucketConfig client.BucketConfig, return nil, errors.Wrapf(err, "generate store relabel file: %v", relabelConfig) } - return NewService( + store := NewService( fmt.Sprintf("store-gw-%v", name), DefaultImage(), e2e.NewCommand("store", append(e2e.BuildArgs(map[string]string{ @@ -304,12 +329,61 @@ func NewStoreGW(sharedDir string, name string, bucketConfig client.BucketConfig, "--data-dir": container, "--objstore.config": string(bktConfigBytes), // Accelerated sync time for quicker test (3m by default). 
- "--sync-block-duration": "1s", - "--selector.relabel-config": string(relabelConfigBytes), - "--consistency-delay": "30m", + "--sync-block-duration": "3s", + "--block-sync-concurrency": "1", + "--store.grpc.series-max-concurrency": "1", + "--selector.relabel-config": string(relabelConfigBytes), + "--consistency-delay": "30m", }), "--experimental.enable-index-header")...), e2e.NewReadinessProbe(80, "/-/ready", 200), 80, 9091, - ), nil + ) + store.SetBackoff(defaultBackoffConfig) + + return store, nil +} + +func NewCompactor(sharedDir string, name string, bucketConfig client.BucketConfig, relabelConfig []relabel.Config, downsamplingEnabled bool, extArgs ...string) (*e2e.HTTPService, error) { + dir := filepath.Join(sharedDir, "data", "compact", name) + container := filepath.Join(e2e.ContainerSharedDir, "data", "compact", name) + + if err := os.MkdirAll(dir, 0777); err != nil { + return nil, errors.Wrap(err, "create compact dir") + } + + bktConfigBytes, err := yaml.Marshal(bucketConfig) + if err != nil { + return nil, errors.Wrapf(err, "generate compact config file: %v", bucketConfig) + } + + relabelConfigBytes, err := yaml.Marshal(relabelConfig) + if err != nil { + return nil, errors.Wrapf(err, "generate compact relabel file: %v", relabelConfig) + } + + if !downsamplingEnabled { + extArgs = append(extArgs, "--downsampling.disable") + } + + compactor := e2e.NewHTTPService( + fmt.Sprintf("compact-%s", name), + DefaultImage(), + e2e.NewCommand("compact", append(e2e.BuildArgs(map[string]string{ + "--debug.name": fmt.Sprintf("compact-%s", name), + "--log.level": logLevel, + "--data-dir": container, + "--objstore.config": string(bktConfigBytes), + "--http-address": ":80", + "--delete-delay": "0s", + "--block-sync-concurrency": "1", + "--selector.relabel-config": string(relabelConfigBytes), + "--wait": "", + }), extArgs...)...), + e2e.NewReadinessProbe(80, "/-/ready", 200), + 80, + ) + compactor.SetBackoff(defaultBackoffConfig) + + return compactor, nil } diff --git a/test/e2e/rule_test.go b/test/e2e/rule_test.go index 5e41c5f941..a6d7d845ad 100644 --- a/test/e2e/rule_test.go +++ b/test/e2e/rule_test.go @@ -179,8 +179,6 @@ func (m *mockAlertmanager) ServeHTTP(resp http.ResponseWriter, req *http.Request func TestRule_AlertmanagerHTTPClient(t *testing.T) { t.Skip("TODO: Allow HTTP ports from binaries running on host to be accessible.") - t.Parallel() - s, err := e2e.NewScenario("e2e_test_rule_am_http_client") testutil.Ok(t, err) defer s.Close() @@ -312,7 +310,7 @@ func TestRule(t *testing.T) { { // FileSD which will be used to register discover dynamically q. Files: []string{filepath.Join(e2e.ContainerSharedDir, queryTargetsSubDir, "*.yaml")}, - RefreshInterval: model.Duration(time.Hour), + RefreshInterval: model.Duration(time.Second), }, }, Scheme: "http", @@ -435,7 +433,7 @@ func TestRule(t *testing.T) { testutil.Ok(t, r.WaitSumMetrics(e2e.Equals(1), "thanos_ruler_alertmanagers_dns_provider_results")) }) - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) defer cancel() queryAndAssert(t, ctx, q.HTTPEndpoint(), "ALERTS", promclient.QueryOptions{