Skip to content

Commit

Permalink
Add possibility to delete blocks older than max time
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Hellemun <17928966+OGKevin@users.noreply.github.com>
  • Loading branch information
OGKevin committed Aug 5, 2020
1 parent b3651dd commit bdc6443
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 32 deletions.
2 changes: 2 additions & 0 deletions cmd/thanos/tools_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@ func registerBucketReplicate(m map[string]setupFunc, root *kingpin.CmdClause, na
Default("0000-01-01T00:00:00Z"))
maxTime := model.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to replicate. Thanos Replicate will replicate only metrics, which happened earlier than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("9999-12-31T23:59:59Z"))
deleteOldBlocks := cmd.Flag("delete-old-blocks", "Delete blocks that are older than max-time.").Default("false").Bool()

m[name+" replicate"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
matchers, err := replicate.ParseFlagMatchers(*matcherStrs)
Expand Down Expand Up @@ -471,6 +472,7 @@ func registerBucketReplicate(m map[string]setupFunc, root *kingpin.CmdClause, na
*singleRun,
minTime,
maxTime,
*deleteOldBlocks,
)
}

Expand Down
1 change: 1 addition & 0 deletions docs/components/tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,7 @@ Flags:
duration relative to current time, such as -1d
or 2h45m. Valid duration units are ms, s, m, h,
d, w, y.
--delete-old-blocks Delete blocks that are older than max-time.
```

Expand Down
16 changes: 15 additions & 1 deletion pkg/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,20 @@ func cleanUp(logger log.Logger, bkt objstore.Bucket, id ulid.ULID, err error) er

// MarkForDeletion creates a file which stores information about when the block was marked for deletion.
// It delegates to markForDeletion, recording the current wall-clock time as the
// deletion timestamp for block id in bucket bkt.
func MarkForDeletion(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID, markedForDeletion prometheus.Counter) error {
	return markForDeletion(ctx, logger, bkt, id, time.Now(), markedForDeletion)
}

// MarkForFutureDeletion creates a file which stores information about when the block should be deleted in the future.
//
// futureDeletionTime must lie in the future; an error is returned otherwise so
// callers cannot accidentally schedule an immediate (or past) deletion.
func MarkForFutureDeletion(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID, futureDeletionTime time.Time, markedForDeletion prometheus.Counter) error {
	// Bug fix: the original condition was inverted — it rejected exactly the
	// times that WERE in the future. The deletion time is invalid only when it
	// is not strictly after the current time.
	if !time.Now().Before(futureDeletionTime) {
		return fmt.Errorf("deletion time %s is not in the future", futureDeletionTime.Format(time.RFC3339))
	}

	return markForDeletion(ctx, logger, bkt, id, futureDeletionTime, markedForDeletion)
}

// MarkForDeletion creates a file which stores information about when the block should be marked for deletion.
func markForDeletion(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID, deletionTime time.Time, markedForDeletion prometheus.Counter) error {
deletionMarkFile := path.Join(id.String(), metadata.DeletionMarkFilename)
deletionMarkExists, err := bkt.Exists(ctx, deletionMarkFile)
if err != nil {
Expand All @@ -135,7 +149,7 @@ func MarkForDeletion(ctx context.Context, logger log.Logger, bkt objstore.Bucket

deletionMark, err := json.Marshal(metadata.DeletionMark{
ID: id,
DeletionTime: time.Now().Unix(),
DeletionTime: deletionTime.Unix(),
Version: metadata.DeletionMarkVersion1,
})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/block/metadata/deletionmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ var ErrorDeletionMarkNotFound = errors.New("deletion-mark.json not found")
// or the deletion-mark.json file is not a valid json file.
var ErrorUnmarshalDeletionMark = errors.New("unmarshal deletion-mark.json")

// DeletionMark stores block id and when block was marked for deletion.
// DeletionMark stores block id and when block was marked for deletion.
type DeletionMark struct {
// ID of the tsdb block.
ID ulid.ULID `json:"id"`
Expand Down
19 changes: 18 additions & 1 deletion pkg/replicate/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func RunReplicate(
toObjStoreConfig *extflag.PathOrContent,
singleRun bool,
maxTime, minTime *thanosmodel.TimeOrDurationValue,
deleteOldBlocks bool,
) error {
logger = log.With(logger, "component", "replicate")

Expand Down Expand Up @@ -163,6 +164,14 @@ func RunReplicate(
}, []string{"result"})
replicationRunDuration.WithLabelValues(labelSuccess)
replicationRunDuration.WithLabelValues(labelError)
blocksCleaned := promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_replicate_blocks_cleaned_total",
Help: "Total number of blocks deleted in replicator.",
})
blockCleanupFailures := promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_replicate_block_cleanup_failures_total",
Help: "Failures encountered while deleting blocks in replicator.",
})

fetcher, err := thanosblock.NewMetaFetcher(
logger,
Expand All @@ -186,6 +195,8 @@ func RunReplicate(
metrics := newReplicationMetrics(reg)
ctx, cancel := context.WithCancel(context.Background())

blocksCleaner := compact.NewBlocksCleaner(logger, toBkt, thanosblock.NewIgnoreDeletionMarkFilter(logger, toBkt, time.Hour), time.Hour, blocksCleaned, blockCleanupFailures)

replicateFn := func() error {
timestamp := time.Now()
entropy := ulid.Monotonic(rand.New(rand.NewSource(timestamp.UnixNano())), 0)
Expand All @@ -198,10 +209,16 @@ func RunReplicate(
logger := log.With(logger, "replication-run-id", ulid.String())
level.Info(logger).Log("msg", "running replication attempt")

if err := newReplicationScheme(logger, metrics, blockFilter, fetcher, fromBkt, toBkt, reg).execute(ctx); err != nil {
if err := newReplicationScheme(logger, metrics, blockFilter, fetcher, fromBkt, toBkt, reg, maxTime, deleteOldBlocks).execute(ctx); err != nil {
return errors.Wrap(err, "replication execute")
}

if deleteOldBlocks {
if err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil {
return errors.Wrap(err, "failed to delete old blocks")
}
}

return nil
}

Expand Down
54 changes: 34 additions & 20 deletions pkg/replicate/scheme.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import (
"io/ioutil"
"path"
"sort"
"time"

"github.com/thanos-io/thanos/pkg/model"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
Expand Down Expand Up @@ -114,6 +117,9 @@ type replicationScheme struct {
metrics *replicationMetrics

reg prometheus.Registerer

maxTime *model.TimeOrDurationValue
markBlocksForFutureDeletion bool
}

type replicationMetrics struct {
Expand All @@ -124,6 +130,8 @@ type replicationMetrics struct {
blocksAlreadyReplicated prometheus.Counter
blocksReplicated prometheus.Counter
objectsReplicated prometheus.Counter

blocksMarkedForDeletion prometheus.Counter
}

func newReplicationMetrics(reg prometheus.Registerer) *replicationMetrics {
Expand Down Expand Up @@ -152,31 +160,29 @@ func newReplicationMetrics(reg prometheus.Registerer) *replicationMetrics {
Name: "thanos_replicate_objects_replicated_total",
Help: "Total number of objects replicated.",
}),
blocksMarkedForDeletion: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_replicate_blocks_marked_for_deletion_total",
Help: "Total number of blocks marked for deletion in replicator.",
}),
}
return m
}

func newReplicationScheme(
logger log.Logger,
metrics *replicationMetrics,
blockFilter blockFilterFunc,
fetcher thanosblock.MetadataFetcher,
from objstore.InstrumentedBucketReader,
to objstore.Bucket,
reg prometheus.Registerer,
) *replicationScheme {
func newReplicationScheme(logger log.Logger, metrics *replicationMetrics, blockFilter blockFilterFunc, fetcher thanosblock.MetadataFetcher, from objstore.InstrumentedBucketReader, to objstore.Bucket, reg prometheus.Registerer, maxTime *model.TimeOrDurationValue, markFoFutureDeletion bool) *replicationScheme {
if logger == nil {
logger = log.NewNopLogger()
}

return &replicationScheme{
logger: logger,
blockFilter: blockFilter,
fetcher: fetcher,
fromBkt: from,
toBkt: to,
metrics: metrics,
reg: reg,
logger: logger,
blockFilter: blockFilter,
fetcher: fetcher,
fromBkt: from,
toBkt: to,
metrics: metrics,
reg: reg,
maxTime: maxTime,
markBlocksForFutureDeletion: markFoFutureDeletion,
}
}

Expand Down Expand Up @@ -231,7 +237,7 @@ func (rs *replicationScheme) execute(ctx context.Context) error {
})

for _, b := range availableBlocks {
if err := rs.ensureBlockIsReplicated(ctx, b.BlockMeta.ULID); err != nil {
if err := rs.ensureBlockIsReplicated(ctx, b); err != nil {
return errors.Wrapf(err, "ensure block %v is replicated", b.BlockMeta.ULID.String())
}
}
Expand All @@ -241,8 +247,8 @@ func (rs *replicationScheme) execute(ctx context.Context) error {

// ensureBlockIsReplicated ensures that a block present in the origin bucket is
// present in the target bucket.
func (rs *replicationScheme) ensureBlockIsReplicated(ctx context.Context, id ulid.ULID) error {
blockID := id.String()
func (rs *replicationScheme) ensureBlockIsReplicated(ctx context.Context, meta *metadata.Meta) error {
blockID := meta.ULID.String()
chunksDir := path.Join(blockID, thanosblock.ChunksDirname)
indexFile := path.Join(blockID, thanosblock.IndexFilename)
metaFile := path.Join(blockID, thanosblock.MetaFilename)
Expand Down Expand Up @@ -281,7 +287,7 @@ func (rs *replicationScheme) ensureBlockIsReplicated(ctx context.Context, id uli
// If the origin meta file content and target meta file content is
// equal, we know we have already successfully replicated
// previously.
level.Debug(rs.logger).Log("msg", "skipping block as already replicated", "block_uuid", id.String())
level.Debug(rs.logger).Log("msg", "skipping block as already replicated", "block_uuid", meta.ULID.String())
rs.metrics.blocksAlreadyReplicated.Inc()

return nil
Expand Down Expand Up @@ -309,6 +315,14 @@ func (rs *replicationScheme) ensureBlockIsReplicated(ctx context.Context, id uli
return errors.Wrap(err, "upload meta file")
}

if rs.markBlocksForFutureDeletion {
deletionTime := time.Unix(meta.MaxTime/1000, 0).Add(time.Duration(*rs.maxTime.Dur))
if err := thanosblock.MarkForFutureDeletion(ctx, rs.logger, rs.toBkt, meta.ULID, deletionTime, nil); err != nil {
return errors.Wrap(err, "failed to mark block for future deletion")
}
rs.metrics.blocksMarkedForDeletion.Inc()
}

rs.metrics.blocksReplicated.Inc()

return nil
Expand Down
10 changes: 1 addition & 9 deletions pkg/replicate/scheme_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,15 +315,7 @@ func TestReplicationSchemeAll(t *testing.T) {
fetcher, err := block.NewMetaFetcher(logger, 32, objstore.WithNoopInstr(originBucket), "", nil, nil, nil)
testutil.Ok(t, err)

r := newReplicationScheme(
logger,
newReplicationMetrics(nil),
filter,
fetcher,
objstore.WithNoopInstr(originBucket),
targetBucket,
nil,
)
r := newReplicationScheme(logger, newReplicationMetrics(nil), filter, fetcher, objstore.WithNoopInstr(originBucket), targetBucket, nil, nil, false)

err = r.execute(ctx)
testutil.Ok(t, err)
Expand Down

0 comments on commit bdc6443

Please sign in to comment.