Skip to content

Commit

Permalink
*: update changelog, e2e tests
Browse files Browse the repository at this point in the history
Signed-off-by: Giedrius Statkevičius <giedriuswork@gmail.com>
  • Loading branch information
GiedriusS committed Sep 3, 2020
1 parent 2591e95 commit 1fd6a5f
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 44 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ We use *breaking* word for marking changes that are not backward compatible (rel

## Unreleased

### Added

- [#3115](https://github.com/thanos-io/thanos/pull/3115) compact: now deletes partially uploaded and blocks with deletion marks concurrently. It does that at the beginning and then every `--compact.cleanup-interval` time period. By default it is 5 minutes.

## [v0.15.0](https://github.com/thanos-io/thanos/releases) - in release process.

:warning: **WARNING** :warning: Thanos Rule's `/api/v1/rules` endpoint no longer returns the old, deprecated `partial_response_strategy`. The old, deprecated value has been fixed to `WARN` for quite some time. _Please_ use `partialResponseStrategy`.
Expand Down
22 changes: 16 additions & 6 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ func runCompact(
}

var cleanMtx sync.Mutex
// TODO(GiedriusS): we could also apply retention policies here but the logic would be a bit more complex.
cleanPartialMarked := func() error {
cleanMtx.Lock()
defer cleanMtx.Unlock()
Expand All @@ -306,6 +307,10 @@ func runCompact(
if err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil {
return errors.Wrap(err, "error cleaning marked blocks")
}

if err := sy.SyncMetas(ctx); err != nil {
level.Error(logger).Log("msg", "failed to sync metas", "err", err)
}
return nil
}

Expand Down Expand Up @@ -427,13 +432,15 @@ func runCompact(

// Periodically remove partial blocks and blocks marked for deletion
// since one iteration potentially could take a long time.
g.Add(func() error {
return runutil.Repeat(5*time.Minute, ctx.Done(), func() error {
return cleanPartialMarked()
if conf.cleanupBlocksInterval > 0 {
g.Add(func() error {
return runutil.Repeat(conf.cleanupBlocksInterval, ctx.Done(), func() error {
return cleanPartialMarked()
})
}, func(error) {
cancel()
})
}, func(error) {
cancel()
})
}

g.Add(func() error {
iterCtx, iterCancel := context.WithTimeout(ctx, conf.waitInterval)
Expand Down Expand Up @@ -477,6 +484,7 @@ type compactConfig struct {
disableDownsampling bool
blockSyncConcurrency int
blockViewerSyncBlockInterval time.Duration
cleanupBlocksInterval time.Duration
compactionConcurrency int
deleteDelay model.Duration
dedupReplicaLabels []string
Expand Down Expand Up @@ -526,6 +534,8 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
Default("20").IntVar(&cc.blockSyncConcurrency)
cmd.Flag("block-viewer.global.sync-block-interval", "Repeat interval for syncing the blocks between local and remote view for /global Block Viewer UI.").
Default("1m").DurationVar(&cc.blockViewerSyncBlockInterval)
cmd.Flag("compact.cleanup-interval", "How often we should clean up partially uploaded blocks and blocks with deletion mark in the background when --wait has been enabled. Setting it to \"0s\" disables it - the cleaning will only happen at the end of an iteration.").
Default("5m").DurationVar(&cc.cleanupBlocksInterval)

cmd.Flag("compact.concurrency", "Number of goroutines to use when compacting groups.").
Default("1").IntVar(&cc.compactionConcurrency)
Expand Down
6 changes: 6 additions & 0 deletions docs/components/compact.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,12 @@ Flags:
Repeat interval for syncing the blocks between
local and remote view for /global Block Viewer
UI.
--compact.cleanup-interval=5m
How often we should clean up partially uploaded
blocks and blocks with deletion mark in the
background when --wait has been enabled. Setting
it to "0s" disables it - the cleaning will only
happen at the end of an iteration.
--compact.concurrency=1 Number of goroutines to use when compacting
groups.
--delete-delay=48h Time before a block marked for deletion is
Expand Down
2 changes: 0 additions & 2 deletions pkg/compact/blocks_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ func (s *BlocksCleaner) DeleteMarkedBlocks(ctx context.Context) error {
}
s.blocksCleaned.Inc()
level.Info(s.logger).Log("msg", "deleted block marked for deletion", "block", deletionMark.ID)
// Delete it from the map. It will be repopulated on the next sync if it still exists.
delete(deletionMarkMap, deletionMark.ID)
}
}

Expand Down
77 changes: 41 additions & 36 deletions test/e2e/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,41 +480,46 @@ func TestCompactWithStoreGateway(t *testing.T) {
{Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "full-replica-overlap-dedup-ready", "replica": "1"}},
}

t.Run("no replica label with overlaps should halt compactor", func(t *testing.T) {
c, err := e2ethanos.NewCompactor(s.SharedDir(), "expect-to-halt", svcConfig, nil)
testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(c))
testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64(len(rawBlockIDs)+7)), "thanos_blocks_meta_synced"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_sync_failures_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_modified"))

// Expect compactor halted.
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(1), "thanos_compactor_halted"))

// The compact directory is still there.
dataDir := filepath.Join(s.SharedDir(), "data", "compact", "expect-to-halt")
empty, err := isEmptyDir(dataDir)
testutil.Ok(t, err)
testutil.Equals(t, false, empty, "directory %s should not be empty", dataDir)

// We expect no ops.
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_iterations_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_blocks_cleaned_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_block_cleanup_failures_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_blocks_marked_for_deletion_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_aborted_partial_uploads_deletion_attempts_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_group_compactions_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_group_vertical_compactions_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(1), "thanos_compact_group_compactions_failures_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(3), "thanos_compact_group_compaction_runs_started_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compact_group_compaction_runs_completed_total"))

// Ensure bucket UI.
ensureGETStatusCode(t, http.StatusOK, "http://"+path.Join(c.HTTPEndpoint(), "global"))
ensureGETStatusCode(t, http.StatusOK, "http://"+path.Join(c.HTTPEndpoint(), "loaded"))

testutil.Ok(t, s.Stop(c))
t.Run("negative tests", func(t *testing.T) {
t.Run("no replica label with overlaps should halt compactor", func(t *testing.T) {
c, err := e2ethanos.NewCompactor(s.SharedDir(), "expect-to-halt", svcConfig, nil)
testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(c))
testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64(len(rawBlockIDs)+7)), "thanos_blocks_meta_synced"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_sync_failures_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_modified"))

// Expect compactor halted.
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(1), "thanos_compactor_halted"))

// The compact directory is still there.
dataDir := filepath.Join(s.SharedDir(), "data", "compact", "expect-to-halt")
empty, err := isEmptyDir(dataDir)
testutil.Ok(t, err)
testutil.Equals(t, false, empty, "directory %s should not be empty", dataDir)

// We expect no ops.
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_iterations_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_block_cleanup_failures_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_blocks_marked_for_deletion_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_group_compactions_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_group_vertical_compactions_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(1), "thanos_compact_group_compactions_failures_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(3), "thanos_compact_group_compaction_runs_started_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compact_group_compaction_runs_completed_total"))

// However, the blocks have been cleaned because that happens concurrently.
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_blocks_cleaned_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compactor_aborted_partial_uploads_deletion_attempts_total"))

// Ensure bucket UI.
ensureGETStatusCode(t, http.StatusOK, "http://"+path.Join(c.HTTPEndpoint(), "global"))
ensureGETStatusCode(t, http.StatusOK, "http://"+path.Join(c.HTTPEndpoint(), "loaded"))

testutil.Ok(t, s.Stop(c))
})
})

t.Run("dedup enabled; compactor should work as expected", func(t *testing.T) {
// We expect 2x 4-block compaction, 2-block vertical compaction, 2x 3-block compaction.
c, err := e2ethanos.NewCompactor(s.SharedDir(), "working", svcConfig, nil, "--deduplication.replica-label=replica", "--deduplication.replica-label=rule_replica")
Expand All @@ -524,10 +529,10 @@ func TestCompactWithStoreGateway(t *testing.T) {
// NOTE: We cannot assert on intermediate `thanos_blocks_meta_` metrics as those are gauge and change dynamically due to many
// compaction groups. Wait for at least first compaction iteration (next is in 5m).
testutil.Ok(t, c.WaitSumMetrics(e2e.Greater(0), "thanos_compactor_iterations_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compactor_blocks_cleaned_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_blocks_cleaned_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_block_cleanup_failures_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2*4+2+2*3), "thanos_compactor_blocks_marked_for_deletion_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compactor_aborted_partial_uploads_deletion_attempts_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_aborted_partial_uploads_deletion_attempts_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(5), "thanos_compact_group_compactions_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(3), "thanos_compact_group_vertical_compactions_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_group_compactions_failures_total"))
Expand Down

0 comments on commit 1fd6a5f

Please sign in to comment.