Skip to content

Commit

Permalink
Reverted addition of deletion mark for partial uploads.
Browse files Browse the repository at this point in the history
Fixes #2459 (quick fix).

This keeps the logic from the 0.11.0 which was good enough.

Some improvement for future: #2470

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka committed Apr 20, 2020
1 parent 7c5bea6 commit 8792747
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 15 deletions.
2 changes: 1 addition & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ func runCompact(
}

// No need to resync before partial uploads and delete marked blocks. Last sync should be valid.
compact.BestEffortCleanAbortedPartialUploads(ctx, logger, sy.Partial(), bkt, partialUploadDeleteAttempts, blocksMarkedForDeletion)
compact.BestEffortCleanAbortedPartialUploads(ctx, logger, sy.Partial(), bkt, partialUploadDeleteAttempts)
if err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil {
return errors.Wrap(err, "error cleaning blocks")
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/compact/clean.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ func BestEffortCleanAbortedPartialUploads(
partial map[ulid.ULID]error,
bkt objstore.Bucket,
deleteAttempts prometheus.Counter,
blocksMarkedForDeletion prometheus.Counter,
) {
level.Info(logger).Log("msg", "started cleaning of aborted partial uploads")

Expand All @@ -45,8 +44,11 @@ func BestEffortCleanAbortedPartialUploads(

deleteAttempts.Inc()
level.Info(logger).Log("msg", "found partially uploaded block; marking for deletion", "block", id)
if err := block.MarkForDeletion(ctx, logger, bkt, id, blocksMarkedForDeletion); err != nil {
level.Warn(logger).Log("msg", "failed to delete aborted partial upload; skipping", "block", id, "thresholdAge", PartialUploadThresholdAge, "err", err)
// We don't gather any information about deletion marks for partial blocks, so let's simply remove it. We waited
// long PartialUploadThresholdAge already.
// TODO(bwplotka): Fix some edge cases: https://github.com/thanos-io/thanos/issues/2470
if err := block.Delete(ctx, logger, bkt, id); err != nil {
level.Warn(logger).Log("msg", "failed to delete aborted partial upload; will retry in next iteration", "block", id, "thresholdAge", PartialUploadThresholdAge, "err", err)
return
}
level.Info(logger).Log("msg", "deleted aborted partial upload", "block", id, "thresholdAge", PartialUploadThresholdAge)
Expand Down
10 changes: 2 additions & 8 deletions pkg/compact/clean_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,22 +59,16 @@ func TestBestEffortCleanAbortedPartialUploads(t *testing.T) {
testutil.Ok(t, bkt.Upload(ctx, path.Join(shouldIgnoreID2.String(), "chunks", "000001"), &fakeChunk))

deleteAttempts := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{})

_, partial, err := metaFetcher.Fetch(ctx)
testutil.Ok(t, err)

BestEffortCleanAbortedPartialUploads(ctx, logger, partial, bkt, deleteAttempts, blocksMarkedForDeletion)
BestEffortCleanAbortedPartialUploads(ctx, logger, partial, bkt, deleteAttempts)
testutil.Equals(t, 1.0, promtest.ToFloat64(deleteAttempts))

exists, err := bkt.Exists(ctx, path.Join(shouldDeleteID.String(), "chunks", "000001"))
testutil.Ok(t, err)
testutil.Equals(t, true, exists)

exists, err = bkt.Exists(ctx, path.Join(shouldDeleteID.String(), metadata.DeletionMarkFilename))
testutil.Ok(t, err)
testutil.Equals(t, true, exists)
testutil.Equals(t, 1.0, promtest.ToFloat64(blocksMarkedForDeletion))
testutil.Equals(t, false, exists)

exists, err = bkt.Exists(ctx, path.Join(shouldIgnoreID1.String(), "chunks", "000001"))
testutil.Ok(t, err)
Expand Down
7 changes: 4 additions & 3 deletions test/e2e/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,11 +504,11 @@ func TestCompactWithStoreGateway(t *testing.T) {
// NOTE: We cannot assert on intermediate `thanos_blocks_meta_` metrics as those are gauge and change dynamically due to many
// compaction groups. Wait for at least first compaction iteration (next is in 5m).
testutil.Ok(t, c.WaitSumMetrics(e2e.Greater(0), "thanos_compactor_iterations_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_blocks_cleaned_total")) // This should be 1 [BUG no 1]. TODO: fix.
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(1), "thanos_compactor_blocks_cleaned_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2*4+2+2*3+1), "thanos_compactor_blocks_marked_for_deletion_total")) // 17.
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2), "thanos_compactor_aborted_partial_uploads_deletion_attempts_total")) // We should have 1.
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(5), "thanos_compact_group_compactions_total"))
// TODO(bwplotka): This is confusing, should be either normal compaction or vertical not both (?)
// TODO(bwplotka): This is confusing, should be either normal compaction or vertical not both (https://github.com/thanos-io/thanos/issues/2469).
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(3), "thanos_compact_group_vertical_compactions_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_group_compactions_failures_total"))
testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(12), "thanos_compact_group_compaction_runs_started_total"))
Expand All @@ -519,7 +519,8 @@ func TestCompactWithStoreGateway(t *testing.T) {

testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64(
len(rawBlockIDs)+7+
5, // 5 compactions, 5 newly added blocks.
5+ // 5 compactions, 5 newly added blocks.
-1, // Partial block removed.
)), "thanos_blocks_meta_synced"))
testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_sync_failures_total"))

Expand Down

0 comments on commit 8792747

Please sign in to comment.