Skip to content

Commit

Permalink
cmd: compact: clean partial / marked blocks concurrently
Browse files Browse the repository at this point in the history
Clean partially uploaded and blocks marked for deletion concurrently
with the whole compaction/downsampling process. One iteration could
potentially take a few days so it should be nice to periodically clean
unneeded blocks in the background. Without this, there are huge spikes
in block storage usage. The spike's size depends on how long it takes to
complete one iteration.

The implementation of this is simple - factored out the deletion part
into a separate function. It is called at the end of an iteration +
concurrently if `--wait` has been specified. Add a mutex to protect from
concurrent runs. Delete blocks from the deletion mark map so that we
wouldn't try to delete same blocks twice or more.

Signed-off-by: Giedrius Statkevičius <giedriuswork@gmail.com>
  • Loading branch information
GiedriusS committed Sep 2, 2020
1 parent e239ef7 commit 2591e95
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 6 deletions.
31 changes: 25 additions & 6 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"path"
"strconv"
"strings"
"sync"
"time"

"github.com/go-kit/kit/log"
Expand Down Expand Up @@ -295,6 +296,19 @@ func runCompact(
level.Info(logger).Log("msg", "retention policy of 1 hour aggregated samples is enabled", "duration", retentionByResolution[compact.ResolutionLevel1h])
}

var cleanMtx sync.Mutex
cleanPartialMarked := func() error {
cleanMtx.Lock()
defer cleanMtx.Unlock()

// No need to resync before partial uploads and delete marked blocks. Last sync should be valid.
compact.BestEffortCleanAbortedPartialUploads(ctx, logger, sy.Partial(), bkt, partialUploadDeleteAttempts, blocksCleaned, blockCleanupFailures)
if err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil {
return errors.Wrap(err, "error cleaning marked blocks")
}
return nil
}

compactMainFn := func() error {
if err := compactor.Compact(ctx); err != nil {
return errors.Wrap(err, "compaction")
Expand Down Expand Up @@ -339,12 +353,7 @@ func runCompact(
return errors.Wrap(err, "retention failed")
}

// No need to resync before partial uploads and delete marked blocks. Last sync should be valid.
compact.BestEffortCleanAbortedPartialUploads(ctx, logger, sy.Partial(), bkt, partialUploadDeleteAttempts, blocksCleaned, blockCleanupFailures)
if err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil {
return errors.Wrap(err, "error cleaning blocks")
}
return nil
return cleanPartialMarked()
}

g.Add(func() error {
Expand Down Expand Up @@ -416,6 +425,16 @@ func runCompact(

srv.Handle("/", r)

// Periodically remove partial blocks and blocks marked for deletion
// since one iteration potentially could take a long time.
g.Add(func() error {
return runutil.Repeat(5*time.Minute, ctx.Done(), func() error {
return cleanPartialMarked()
})
}, func(error) {
cancel()
})

g.Add(func() error {
iterCtx, iterCancel := context.WithTimeout(ctx, conf.waitInterval)
_, _, _ = f.Fetch(iterCtx)
Expand Down
2 changes: 2 additions & 0 deletions pkg/compact/blocks_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ func (s *BlocksCleaner) DeleteMarkedBlocks(ctx context.Context) error {
}
s.blocksCleaned.Inc()
level.Info(s.logger).Log("msg", "deleted block marked for deletion", "block", deletionMark.ID)
// Delete it from the map. It will be repopulated on the next sync if it still exists.
delete(deletionMarkMap, deletionMark.ID)
}
}

Expand Down

0 comments on commit 2591e95

Please sign in to comment.