From 09d60b528f35f0df0c97e752517ddbe8db683f16 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?=
Date: Wed, 2 Sep 2020 17:54:09 +0300
Subject: [PATCH] *: update changelog, e2e tests
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Giedrius Statkevičius
---
 CHANGELOG.md                  |  4 ++++
 cmd/thanos/compact.go         | 38 +++++++++++++++++++++++++++++------
 docs/components/compact.md    |  6 ++++++
 pkg/compact/blocks_cleaner.go |  2 --
 test/e2e/compact_test.go      | 18 +++++++++++------
 5 files changed, 54 insertions(+), 14 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 891d3c8e01..03658986a5 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,6 +11,10 @@ We use *breaking* word for marking changes that are not backward compatible (rel

 ## Unreleased

+### Added
+
+- [#3115](https://github.com/thanos-io/thanos/pull/3115) compact: now deletes partially uploaded blocks and blocks with deletion marks concurrently. It does so on start-up and then every `--compact.cleanup-interval` period (5 minutes by default).
+
 ## [v0.15.0](https://github.com/thanos-io/thanos/releases) - in release process.

 :warning: **WARNING** :warning: Thanos Rule's `/api/v1/rules` endpoint no longer returns the old, deprecated `partial_response_strategy`. The old, deprecated value has been fixed to `WARN` for quite some time. _Please_ use `partialResponseStrategy`.
diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go
index 88fdc53d26..eecbcae1f7 100644
--- a/cmd/thanos/compact.go
+++ b/cmd/thanos/compact.go
@@ -297,6 +297,7 @@ func runCompact(
     }

     var cleanMtx sync.Mutex
+    // TODO(GiedriusS): we could also apply retention policies here but the logic would be a bit more complex.
     cleanPartialMarked := func() error {
         cleanMtx.Lock()
         defer cleanMtx.Unlock()
@@ -306,9 +307,24 @@ func runCompact(
         if err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil {
             return errors.Wrap(err, "error cleaning marked blocks")
         }
+
+        if err := sy.SyncMetas(ctx); err != nil {
+            level.Error(logger).Log("msg", "failed to sync metas", "err", err)
+        }
         return nil
     }

+    // Do it once at the beginning to ensure that it runs at least once before
+    // the main loop.
+    if err := sy.SyncMetas(ctx); err != nil {
+        cancel()
+        return errors.Wrap(err, "syncing metas")
+    }
+    if err := cleanPartialMarked(); err != nil {
+        cancel()
+        return errors.Wrap(err, "cleaning partial and marked blocks")
+    }
+
     compactMainFn := func() error {
         if err := compactor.Compact(ctx); err != nil {
             return errors.Wrap(err, "compaction")
@@ -427,13 +443,20 @@ func runCompact(

         // Periodically remove partial blocks and blocks marked for deletion
         // since one iteration potentially could take a long time.
-        g.Add(func() error {
-            return runutil.Repeat(5*time.Minute, ctx.Done(), func() error {
-                return cleanPartialMarked()
+        if conf.cleanupBlocksInterval > 0 {
+            g.Add(func() error {
+                // Wait the whole period at the beginning because we've executed this on boot.
+                select {
+                case <-time.After(conf.cleanupBlocksInterval):
+                case <-ctx.Done():
+                }
+                return runutil.Repeat(conf.cleanupBlocksInterval, ctx.Done(), func() error {
+                    return cleanPartialMarked()
+                })
+            }, func(error) {
+                cancel()
             })
-        }, func(error) {
-            cancel()
-        })
+        }

         g.Add(func() error {
             iterCtx, iterCancel := context.WithTimeout(ctx, conf.waitInterval)
@@ -477,6 +500,7 @@ type compactConfig struct {
     disableDownsampling            bool
     blockSyncConcurrency           int
     blockViewerSyncBlockInterval   time.Duration
+    cleanupBlocksInterval          time.Duration
     compactionConcurrency          int
     deleteDelay                    model.Duration
     dedupReplicaLabels             []string
@@ -526,6 +550,8 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
         Default("20").IntVar(&cc.blockSyncConcurrency)
     cmd.Flag("block-viewer.global.sync-block-interval", "Repeat interval for syncing the blocks between local and remote view for /global Block Viewer UI.").
         Default("1m").DurationVar(&cc.blockViewerSyncBlockInterval)
+    cmd.Flag("compact.cleanup-interval", "How often we should clean up partially uploaded blocks and blocks with deletion marks in the background when --wait has been enabled. Setting it to \"0s\" disables it - the cleaning will only happen at the end of an iteration.").
+        Default("5m").DurationVar(&cc.cleanupBlocksInterval)
     cmd.Flag("compact.concurrency", "Number of goroutines to use when compacting groups.").
         Default("1").IntVar(&cc.compactionConcurrency)

diff --git a/docs/components/compact.md b/docs/components/compact.md
index 9bf7688caf..4e11815b15 100644
--- a/docs/components/compact.md
+++ b/docs/components/compact.md
@@ -147,6 +147,12 @@ Flags:
                                  Repeat interval for syncing the blocks between
                                  local and remote view for /global Block Viewer
                                  UI.
+      --compact.cleanup-interval=5m
+                                 How often we should clean up partially uploaded
+                                 blocks and blocks with deletion marks in the
+                                 background when --wait has been enabled. Setting
+                                 it to "0s" disables it - the cleaning will only
+                                 happen at the end of an iteration.
       --compact.concurrency=1   Number of goroutines to use when compacting
                                  groups.
       --delete-delay=48h        Time before a block marked for deletion is
diff --git a/pkg/compact/blocks_cleaner.go b/pkg/compact/blocks_cleaner.go
index f171a4ecc3..7381505118 100644
--- a/pkg/compact/blocks_cleaner.go
+++ b/pkg/compact/blocks_cleaner.go
@@ -51,8 +51,6 @@ func (s *BlocksCleaner) DeleteMarkedBlocks(ctx context.Context) error {
             }
             s.blocksCleaned.Inc()
             level.Info(s.logger).Log("msg", "deleted block marked for deletion", "block", deletionMark.ID)
-            // Delete it from the map. It will be repopulated on the next sync if it still exists.
-            delete(deletionMarkMap, deletionMark.ID)
         }
     }

diff --git a/test/e2e/compact_test.go b/test/e2e/compact_test.go
index 560020030b..0a1954348f 100644
--- a/test/e2e/compact_test.go
+++ b/test/e2e/compact_test.go
@@ -480,7 +480,10 @@ func TestCompactWithStoreGateway(t *testing.T) {
         {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "full-replica-overlap-dedup-ready", "replica": "1"}},
     }

-    t.Run("no replica label with overlaps should halt compactor", func(t *testing.T) {
+    // No replica label with overlaps should halt compactor. This test is sequential
+    // because we do not want two Thanos Compact instances deleting the same partially
+    // uploaded blocks and blocks with deletion marks.
+    {
         c, err := e2ethanos.NewCompactor(s.SharedDir(), "expect-to-halt", svcConfig, nil)
         testutil.Ok(t, err)
         testutil.Ok(t, s.StartAndWaitReady(c))
@@ -499,22 +502,25 @@ func TestCompactWithStoreGateway(t *testing.T) {

         // We expect no ops.
         testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_iterations_total"))
-        testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_blocks_cleaned_total"))
         testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_block_cleanup_failures_total"))
         testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_blocks_marked_for_deletion_total"))
-        testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_aborted_partial_uploads_deletion_attempts_total"))
         testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_group_compactions_total"))
         testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_group_vertical_compactions_total"))
         testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(1), "thanos_compact_group_compactions_failures_total"))
         testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(3), "thanos_compact_group_compaction_runs_started_total"))
         testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compact_group_compaction_runs_completed_total"))

+        // However, the blocks have been cleaned because that happens concurrently.
+        testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compactor_blocks_cleaned_total"))
+        testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compactor_aborted_partial_uploads_deletion_attempts_total"))
+
         // Ensure bucket UI.
         ensureGETStatusCode(t, http.StatusOK, "http://"+path.Join(c.HTTPEndpoint(), "global"))
         ensureGETStatusCode(t, http.StatusOK, "http://"+path.Join(c.HTTPEndpoint(), "loaded"))

         testutil.Ok(t, s.Stop(c))
-    })
+    }
+
     t.Run("dedup enabled; compactor should work as expected", func(t *testing.T) {
         // We expect 2x 4-block compaction, 2-block vertical compaction, 2x 3-block compaction.
         c, err := e2ethanos.NewCompactor(s.SharedDir(), "working", svcConfig, nil, "--deduplication.replica-label=replica", "--deduplication.replica-label=rule_replica")
@@ -524,10 +530,10 @@ func TestCompactWithStoreGateway(t *testing.T) {
         // NOTE: We cannot assert on intermediate `thanos_blocks_meta_` metrics as those are gauge and change dynamically due to many
         // compaction groups. Wait for at least first compaction iteration (next is in 5m).
         testutil.Ok(t, c.WaitSumMetrics(e2e.Greater(0), "thanos_compactor_iterations_total"))
-        testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compactor_blocks_cleaned_total"))
+        testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_blocks_cleaned_total"))
         testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_block_cleanup_failures_total"))
         testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2*4+2+2*3), "thanos_compactor_blocks_marked_for_deletion_total"))
-        testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compactor_aborted_partial_uploads_deletion_attempts_total"))
+        testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_aborted_partial_uploads_deletion_attempts_total"))
         testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(5), "thanos_compact_group_compactions_total"))
         testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(3), "thanos_compact_group_vertical_compactions_total"))
         testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_group_compactions_failures_total"))