From eeb4150abf7b693fde02757b51a6647c7d71182c Mon Sep 17 00:00:00 2001 From: Kevin Hellemun <17928966+OGKevin@users.noreply.github.com> Date: Tue, 4 Aug 2020 14:36:21 +0200 Subject: [PATCH 01/10] Add TimePartitionMetaFilter to thanos replicate. This will allow time based replication. Signed-off-by: Kevin Hellemun <17928966+OGKevin@users.noreply.github.com> --- cmd/thanos/tools_bucket.go | 8 ++++++++ pkg/replicate/replicator.go | 12 +++++++++++- 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/cmd/thanos/tools_bucket.go b/cmd/thanos/tools_bucket.go index 4e7c6b575b..8496d94fe2 100644 --- a/cmd/thanos/tools_bucket.go +++ b/cmd/thanos/tools_bucket.go @@ -7,6 +7,7 @@ import ( "context" "encoding/json" "fmt" + "github.com/thanos-io/thanos/pkg/model" "os" "sort" "strconv" @@ -441,6 +442,11 @@ func registerBucketReplicate(app extkingpin.AppClause, objStoreConfig *extflag.P compactions := cmd.Flag("compaction", "Only blocks with these compaction levels will be replicated. Repeated flag.").Default("1", "2", "3", "4").Ints() matcherStrs := cmd.Flag("matcher", "Only blocks whose external labels exactly match this matcher will be replicated.").PlaceHolder("key=\"value\"").Strings() singleRun := cmd.Flag("single-run", "Run replication only one time, then exit.").Default("false").Bool() + minTime := model.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to replicate. Thanos Replicate will replicate only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y."). + Default("0000-01-01T00:00:00Z")) + maxTime := model.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to replicate. Thanos Replicate will replicate only metrics, which happened earlier than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y."). + Default("9999-12-31T23:59:59Z")) + cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error { matchers, err := replicate.ParseFlagMatchers(*matcherStrs) @@ -466,6 +472,8 @@ func registerBucketReplicate(app extkingpin.AppClause, objStoreConfig *extflag.P objStoreConfig, toObjStoreConfig, *singleRun, + minTime, + maxTime, ) }) } diff --git a/pkg/replicate/replicator.go b/pkg/replicate/replicator.go index c76a02afbc..b887a3126d 100644 --- a/pkg/replicate/replicator.go +++ b/pkg/replicate/replicator.go @@ -5,6 +5,7 @@ package replicate import ( "context" + thanosmodel "github.com/thanos-io/thanos/pkg/model" "math/rand" "strconv" "strings" @@ -80,6 +81,7 @@ func RunReplicate( fromObjStoreConfig *extflag.PathOrContent, toObjStoreConfig *extflag.PathOrContent, singleRun bool, + maxTime, minTime *thanosmodel.TimeOrDurationValue, ) error { logger = log.With(logger, "component", "replicate") @@ -161,7 +163,15 @@ func RunReplicate( replicationRunDuration.WithLabelValues(labelSuccess) replicationRunDuration.WithLabelValues(labelError) - fetcher, err := thanosblock.NewMetaFetcher(logger, 32, fromBkt, "", reg, nil, nil) + fetcher, err := thanosblock.NewMetaFetcher( + logger, + 32, + fromBkt, + "", + reg, + []thanosblock.MetadataFilter{thanosblock.NewTimePartitionMetaFilter(*minTime, *maxTime)}, + nil, + ) if err != nil { return errors.Wrapf(err, "create meta fetcher with bucket %v", fromBkt) } From 1066cc86f95a9ed96f07f970fed91e4ca75f36fe Mon Sep 17 00:00:00 2001 From: Kevin Hellemun <17928966+OGKevin@users.noreply.github.com> Date: Tue, 4 Aug 2020 14:48:23 +0200 Subject: [PATCH 02/10] Update docs and changelog. Signed-off-by: Kevin Hellemun <17928966+OGKevin@users.noreply.github.com> --- docs/components/tools.md | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/docs/components/tools.md b/docs/components/tools.md index 19638c29c2..2179f9d552 100644 --- a/docs/components/tools.md +++ b/docs/components/tools.md @@ -472,6 +472,22 @@ Flags: --matcher=key="value" ... Only blocks whose external labels exactly match this matcher will be replicated. --single-run Run replication only one time, then exit. + --min-time=0000-01-01T00:00:00Z + Start of time range limit to replicate. Thanos + Replicate will replicate only metrics, which + happened later than this value. Option can be a + constant time in RFC3339 format or time + duration relative to current time, such as -1d + or 2h45m. Valid duration units are ms, s, m, h, + d, w, y. + --max-time=9999-12-31T23:59:59Z + End of time range limit to replicate. Thanos + Replicate will replicate only metrics, which + happened earlier than this value. Option can be + a constant time in RFC3339 format or time + duration relative to current time, such as -1d + or 2h45m. Valid duration units are ms, s, m, h, + d, w, y. ``` From c8f6b6e559362c3b9ebf8fb3526e61700cc679f8 Mon Sep 17 00:00:00 2001 From: Kevin Hellemun <17928966+OGKevin@users.noreply.github.com> Date: Tue, 4 Aug 2020 15:10:40 +0200 Subject: [PATCH 03/10] Fix lint Signed-off-by: Kevin Hellemun <17928966+OGKevin@users.noreply.github.com> --- pkg/replicate/replicator.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/replicate/replicator.go b/pkg/replicate/replicator.go index b887a3126d..e0e9ca6c45 100644 --- a/pkg/replicate/replicator.go +++ b/pkg/replicate/replicator.go @@ -5,12 +5,13 @@ package replicate import ( "context" - thanosmodel "github.com/thanos-io/thanos/pkg/model" "math/rand" "strconv" "strings" "time" + thanosmodel "github.com/thanos-io/thanos/pkg/model" + "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/oklog/run" From abbf93a2118b8931063ac7c1a85792ec54c481ff Mon Sep 17 00:00:00 2001 From: Kevin Hellemun <17928966+OGKevin@users.noreply.github.com> Date: Wed, 5 Aug 2020 13:19:10 +0200 Subject: [PATCH 04/10] Add posibility to delete blocks older than max time Signed-off-by: Kevin Hellemun <17928966+OGKevin@users.noreply.github.com> --- cmd/thanos/tools_bucket.go | 2 ++ docs/components/tools.md | 1 + pkg/block/block.go | 16 ++++++++++- pkg/replicate/replicator.go | 19 ++++++++++++- pkg/replicate/scheme.go | 54 +++++++++++++++++++++++------------- pkg/replicate/scheme_test.go | 10 +------ 6 files changed, 71 insertions(+), 31 deletions(-) diff --git a/cmd/thanos/tools_bucket.go b/cmd/thanos/tools_bucket.go index 8496d94fe2..57496a9fa0 100644 --- a/cmd/thanos/tools_bucket.go +++ b/cmd/thanos/tools_bucket.go @@ -446,6 +446,7 @@ func registerBucketReplicate(app extkingpin.AppClause, objStoreConfig *extflag.P Default("0000-01-01T00:00:00Z")) maxTime := model.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to replicate. Thanos Replicate will replicate only metrics, which happened earlier than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y."). Default("9999-12-31T23:59:59Z")) + deleteOldBlocks := cmd.Flag("delete-old-blocks", "Delete blocks that are older then max-time.").Default("false").Bool() cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error { @@ -474,6 +475,7 @@ func registerBucketReplicate(app extkingpin.AppClause, objStoreConfig *extflag.P *singleRun, minTime, maxTime, + *deleteOldBlocks, ) }) } diff --git a/docs/components/tools.md b/docs/components/tools.md index 2179f9d552..c3f6310e5b 100644 --- a/docs/components/tools.md +++ b/docs/components/tools.md @@ -488,6 +488,7 @@ Flags: duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y. + --delete-old-blocks Delete blocks that are older then max-time. ``` diff --git a/pkg/block/block.go b/pkg/block/block.go index 550ebc351f..5d41610a2d 100644 --- a/pkg/block/block.go +++ b/pkg/block/block.go @@ -123,6 +123,20 @@ func cleanUp(logger log.Logger, bkt objstore.Bucket, id ulid.ULID, err error) er // MarkForDeletion creates a file which stores information about when the block was marked for deletion. func MarkForDeletion(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID, markedForDeletion prometheus.Counter) error { + return markForDeletion(ctx, logger, bkt, id, time.Now(), markedForDeletion) +} + +// MarkForFutureDeletion creates a file which stores information about when the block should be deleted in the future. +func MarkForFutureDeletion(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID, futureDeletionTime time.Time, markedForDeletion prometheus.Counter) error { + if time.Now().Before(futureDeletionTime) { + return errors.New(fmt.Sprintf("deletion time %s is not in the future", futureDeletionTime.Format(time.RFC3339))) + } + + return markForDeletion(ctx, logger, bkt, id, futureDeletionTime, markedForDeletion) +} + +// MarkForDeletion creates a file which stores information about when the block should be marked for deletion. +func markForDeletion(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID, deletionTime time.Time, markedForDeletion prometheus.Counter) error { deletionMarkFile := path.Join(id.String(), metadata.DeletionMarkFilename) deletionMarkExists, err := bkt.Exists(ctx, deletionMarkFile) if err != nil { @@ -135,7 +149,7 @@ func MarkForDeletion(ctx context.Context, logger log.Logger, bkt objstore.Bucket deletionMark, err := json.Marshal(metadata.DeletionMark{ ID: id, - DeletionTime: time.Now().Unix(), + DeletionTime: deletionTime.Unix(), Version: metadata.DeletionMarkVersion1, }) if err != nil { diff --git a/pkg/replicate/replicator.go b/pkg/replicate/replicator.go index e0e9ca6c45..69f0646ece 100644 --- a/pkg/replicate/replicator.go +++ b/pkg/replicate/replicator.go @@ -83,6 +83,7 @@ func RunReplicate( toObjStoreConfig *extflag.PathOrContent, singleRun bool, maxTime, minTime *thanosmodel.TimeOrDurationValue, + deleteOldBlocks bool, ) error { logger = log.With(logger, "component", "replicate") @@ -163,6 +164,14 @@ func RunReplicate( }, []string{"result"}) replicationRunDuration.WithLabelValues(labelSuccess) replicationRunDuration.WithLabelValues(labelError) + blocksCleaned := promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_replicate_blocks_cleaned_total", + Help: "Total number of blocks deleted in replicator.", + }) + blockCleanupFailures := promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_replicate_block_cleanup_failures_total", + Help: "Failures encountered while deleting blocks in replicator.", + }) fetcher, err := thanosblock.NewMetaFetcher( logger, @@ -186,6 +195,8 @@ func RunReplicate( metrics := newReplicationMetrics(reg) ctx, cancel := context.WithCancel(context.Background()) + blocksCleaner := compact.NewBlocksCleaner(logger, toBkt, thanosblock.NewIgnoreDeletionMarkFilter(logger, toBkt, time.Hour), time.Hour, blocksCleaned, blockCleanupFailures) + replicateFn := func() error { timestamp := time.Now() entropy := ulid.Monotonic(rand.New(rand.NewSource(timestamp.UnixNano())), 0) @@ -198,10 +209,16 @@ func RunReplicate( logger := log.With(logger, "replication-run-id", ulid.String()) level.Info(logger).Log("msg", "running replication attempt") - if err := newReplicationScheme(logger, metrics, blockFilter, fetcher, fromBkt, toBkt, reg).execute(ctx); err != nil { + if err := newReplicationScheme(logger, metrics, blockFilter, fetcher, fromBkt, toBkt, reg, maxTime, deleteOldBlocks).execute(ctx); err != nil { return errors.Wrap(err, "replication execute") } + if deleteOldBlocks { + if err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil { + return errors.Wrap(err, "failed to delete old blocks") + } + } + return nil } diff --git a/pkg/replicate/scheme.go b/pkg/replicate/scheme.go index 893c89865c..a21557907b 100644 --- a/pkg/replicate/scheme.go +++ b/pkg/replicate/scheme.go @@ -11,6 +11,9 @@ import ( "io/ioutil" "path" "sort" + "time" + + "github.com/thanos-io/thanos/pkg/model" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" @@ -114,12 +117,17 @@ type replicationScheme struct { metrics *replicationMetrics reg prometheus.Registerer + + maxTime *model.TimeOrDurationValue + markBlocksForFutureDeletion bool } type replicationMetrics struct { blocksAlreadyReplicated prometheus.Counter blocksReplicated prometheus.Counter objectsReplicated prometheus.Counter + + blocksMarkedForDeletion prometheus.Counter } func newReplicationMetrics(reg prometheus.Registerer) *replicationMetrics { @@ -136,31 +144,29 @@ func newReplicationMetrics(reg prometheus.Registerer) *replicationMetrics { Name: "thanos_replicate_objects_replicated_total", Help: "Total number of objects replicated.", }), + blocksMarkedForDeletion: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_replicate _blocks_marked_for_deletion_total", + Help: "Total number of blocks marked for deletion in compactor.", + }), } return m } -func newReplicationScheme( - logger log.Logger, - metrics *replicationMetrics, - blockFilter blockFilterFunc, - fetcher thanosblock.MetadataFetcher, - from objstore.InstrumentedBucketReader, - to objstore.Bucket, - reg prometheus.Registerer, -) *replicationScheme { +func newReplicationScheme(logger log.Logger, metrics *replicationMetrics, blockFilter blockFilterFunc, fetcher thanosblock.MetadataFetcher, from objstore.InstrumentedBucketReader, to objstore.Bucket, reg prometheus.Registerer, maxTime *model.TimeOrDurationValue, markFoFutureDeletion bool) *replicationScheme { if logger == nil { logger = log.NewNopLogger() } return &replicationScheme{ - logger: logger, - blockFilter: blockFilter, - fetcher: fetcher, - fromBkt: from, - toBkt: to, - metrics: metrics, - reg: reg, + logger: logger, + blockFilter: blockFilter, + fetcher: fetcher, + fromBkt: from, + toBkt: to, + metrics: metrics, + reg: reg, + maxTime: maxTime, + markBlocksForFutureDeletion: markFoFutureDeletion, } } @@ -190,7 +196,7 @@ func (rs *replicationScheme) execute(ctx context.Context) error { }) for _, b := range availableBlocks { - if err := rs.ensureBlockIsReplicated(ctx, b.BlockMeta.ULID); err != nil { + if err := rs.ensureBlockIsReplicated(ctx, b); err != nil { return errors.Wrapf(err, "ensure block %v is replicated", b.BlockMeta.ULID.String()) } } @@ -200,8 +206,8 @@ func (rs *replicationScheme) execute(ctx context.Context) error { // ensureBlockIsReplicated ensures that a block present in the origin bucket is // present in the target bucket. -func (rs *replicationScheme) ensureBlockIsReplicated(ctx context.Context, id ulid.ULID) error { - blockID := id.String() +func (rs *replicationScheme) ensureBlockIsReplicated(ctx context.Context, meta *metadata.Meta) error { + blockID := meta.ULID.String() chunksDir := path.Join(blockID, thanosblock.ChunksDirname) indexFile := path.Join(blockID, thanosblock.IndexFilename) metaFile := path.Join(blockID, thanosblock.MetaFilename) @@ -241,7 +247,7 @@ func (rs *replicationScheme) ensureBlockIsReplicated(ctx context.Context, id uli // If the origin meta file content and target meta file content is // equal, we know we have already successfully replicated // previously. - level.Debug(rs.logger).Log("msg", "skipping block as already replicated", "block_uuid", id.String()) + level.Debug(rs.logger).Log("msg", "skipping block as already replicated", "block_uuid", meta.ULID.String()) rs.metrics.blocksAlreadyReplicated.Inc() return nil @@ -269,6 +275,14 @@ func (rs *replicationScheme) ensureBlockIsReplicated(ctx context.Context, id uli return errors.Wrap(err, "upload meta file") } + if rs.markBlocksForFutureDeletion { + deletionTime := time.Unix(meta.MaxTime/1000, 0).Add(time.Duration(*rs.maxTime.Dur)) + if err := thanosblock.MarkForFutureDeletion(ctx, rs.logger, rs.toBkt, meta.ULID, deletionTime, nil); err != nil { + return errors.Wrap(err, "failed to mark block for future deletion") + } + rs.metrics.blocksMarkedForDeletion.Inc() + } + rs.metrics.blocksReplicated.Inc() return nil diff --git a/pkg/replicate/scheme_test.go b/pkg/replicate/scheme_test.go index e8686f90ef..924dd2699f 100644 --- a/pkg/replicate/scheme_test.go +++ b/pkg/replicate/scheme_test.go @@ -315,15 +315,7 @@ func TestReplicationSchemeAll(t *testing.T) { fetcher, err := block.NewMetaFetcher(logger, 32, objstore.WithNoopInstr(originBucket), "", nil, nil, nil) testutil.Ok(t, err) - r := newReplicationScheme( - logger, - newReplicationMetrics(nil), - filter, - fetcher, - objstore.WithNoopInstr(originBucket), - targetBucket, - nil, - ) + r := newReplicationScheme(logger, newReplicationMetrics(nil), filter, fetcher, objstore.WithNoopInstr(originBucket), targetBucket, nil, nil, false) err = r.execute(ctx) testutil.Ok(t, err) From 9318d909f04c26a6cdd224bd79aee474b95ec2f1 Mon Sep 17 00:00:00 2001 From: Kevin Hellemun <17928966+OGKevin@users.noreply.github.com> Date: Thu, 6 Aug 2020 11:08:11 +0200 Subject: [PATCH 05/10] Fix metric naming. Signed-off-by: Kevin Hellemun <17928966+OGKevin@users.noreply.github.com> --- pkg/replicate/scheme.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/replicate/scheme.go b/pkg/replicate/scheme.go index a21557907b..0e8fa5061c 100644 --- a/pkg/replicate/scheme.go +++ b/pkg/replicate/scheme.go @@ -145,7 +145,7 @@ func newReplicationMetrics(reg prometheus.Registerer) *replicationMetrics { Help: "Total number of objects replicated.", }), blocksMarkedForDeletion: promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "thanos_replicate _blocks_marked_for_deletion_total", + Name: "thanos_replicate_blocks_marked_for_deletion_total", Help: "Total number of blocks marked for deletion in compactor.", }), } From c18d06901958241db946e3e609a5ba934e1ad0d4 Mon Sep 17 00:00:00 2001 From: Kevin Hellemun <17928966+OGKevin@users.noreply.github.com> Date: Thu, 6 Aug 2020 14:22:10 +0200 Subject: [PATCH 06/10] Fix order of arguments for max and min time. Signed-off-by: Kevin Hellemun <17928966+OGKevin@users.noreply.github.com> --- pkg/replicate/replicator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/replicate/replicator.go b/pkg/replicate/replicator.go index 69f0646ece..29ce3487f8 100644 --- a/pkg/replicate/replicator.go +++ b/pkg/replicate/replicator.go @@ -82,7 +82,7 @@ func RunReplicate( fromObjStoreConfig *extflag.PathOrContent, toObjStoreConfig *extflag.PathOrContent, singleRun bool, - maxTime, minTime *thanosmodel.TimeOrDurationValue, + minTime, maxTime *thanosmodel.TimeOrDurationValue, deleteOldBlocks bool, ) error { logger = log.With(logger, "component", "replicate") From a66b40cf2fc7b8dc8c0c3cef320f3e394437133b Mon Sep 17 00:00:00 2001 From: Kevin Hellemun <17928966+ogkevin@users.noreply.github.com> Date: Sat, 31 Oct 2020 12:52:49 +0100 Subject: [PATCH 07/10] Remove deletion of blocks from replicator. Signed-off-by: Kevin Hellemun <17928966+OGKevin@users.noreply.github.com> --- CHANGELOG.md | 1 + cmd/thanos/tools_bucket.go | 6 ++---- docs/components/tools.md | 1 - pkg/block/block.go | 16 +--------------- pkg/replicate/replicator.go | 19 +------------------ pkg/replicate/scheme.go | 33 ++++++++------------------------- pkg/replicate/scheme_test.go | 2 +- 7 files changed, 14 insertions(+), 64 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4364f7868c..a6bc99c484 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#3312](https://github.com/thanos-io/thanos/pull/3312) s3: add list_objects_version config option for compatibility. - [#3356](https://github.com/thanos-io/thanos/pull/3356) Query Frontend: Add a flag to disable step alignment middleware for query range. - [#3378](https://github.com/thanos-io/thanos/pull/3378) Ruler: added the ability to send queries via the HTTP method POST. Helps when alerting/recording rules are extra long because it encodes the actual parameters inside of the body instead of the URI. Thanos Ruler now uses POST by default unless `--query.http-method` is set `GET`. +- [#2979](https://github.com/thanos-io/thanos/pull/2979) Replicator: Add the ability to be albe to replicate blocks within a time frame by passing --min-time and --max-time ### Fixed - [#3257](https://github.com/thanos-io/thanos/pull/3257) Ruler: Prevent Ruler from crashing when using default DNS to lookup hosts that results in "No such hosts" errors. diff --git a/cmd/thanos/tools_bucket.go b/cmd/thanos/tools_bucket.go index 57496a9fa0..15e46c64ea 100644 --- a/cmd/thanos/tools_bucket.go +++ b/cmd/thanos/tools_bucket.go @@ -7,7 +7,6 @@ import ( "context" "encoding/json" "fmt" - "github.com/thanos-io/thanos/pkg/model" "os" "sort" "strconv" @@ -15,6 +14,8 @@ import ( "text/template" "time" + "github.com/thanos-io/thanos/pkg/model" + "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/oklog/run" @@ -446,8 +447,6 @@ func registerBucketReplicate(app extkingpin.AppClause, objStoreConfig *extflag.P Default("0000-01-01T00:00:00Z")) maxTime := model.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to replicate. Thanos Replicate will replicate only metrics, which happened earlier than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y."). Default("9999-12-31T23:59:59Z")) - deleteOldBlocks := cmd.Flag("delete-old-blocks", "Delete blocks that are older then max-time.").Default("false").Bool() - cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error { matchers, err := replicate.ParseFlagMatchers(*matcherStrs) @@ -475,7 +474,6 @@ func registerBucketReplicate(app extkingpin.AppClause, objStoreConfig *extflag.P *singleRun, minTime, maxTime, - *deleteOldBlocks, ) }) } diff --git a/docs/components/tools.md b/docs/components/tools.md index c3f6310e5b..2179f9d552 100644 --- a/docs/components/tools.md +++ b/docs/components/tools.md @@ -488,7 +488,6 @@ Flags: duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y. - --delete-old-blocks Delete blocks that are older then max-time. ``` diff --git a/pkg/block/block.go b/pkg/block/block.go index 5d41610a2d..550ebc351f 100644 --- a/pkg/block/block.go +++ b/pkg/block/block.go @@ -123,20 +123,6 @@ func cleanUp(logger log.Logger, bkt objstore.Bucket, id ulid.ULID, err error) er // MarkForDeletion creates a file which stores information about when the block was marked for deletion. func MarkForDeletion(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID, markedForDeletion prometheus.Counter) error { - return markForDeletion(ctx, logger, bkt, id, time.Now(), markedForDeletion) -} - -// MarkForFutureDeletion creates a file which stores information about when the block should be deleted in the future. -func MarkForFutureDeletion(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID, futureDeletionTime time.Time, markedForDeletion prometheus.Counter) error { - if time.Now().Before(futureDeletionTime) { - return errors.New(fmt.Sprintf("deletion time %s is not in the future", futureDeletionTime.Format(time.RFC3339))) - } - - return markForDeletion(ctx, logger, bkt, id, futureDeletionTime, markedForDeletion) -} - -// MarkForDeletion creates a file which stores information about when the block should be marked for deletion. -func markForDeletion(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID, deletionTime time.Time, markedForDeletion prometheus.Counter) error { deletionMarkFile := path.Join(id.String(), metadata.DeletionMarkFilename) deletionMarkExists, err := bkt.Exists(ctx, deletionMarkFile) if err != nil { @@ -149,7 +135,7 @@ func markForDeletion(ctx context.Context, logger log.Logger, bkt objstore.Bucket deletionMark, err := json.Marshal(metadata.DeletionMark{ ID: id, - DeletionTime: deletionTime.Unix(), + DeletionTime: time.Now().Unix(), Version: metadata.DeletionMarkVersion1, }) if err != nil { diff --git a/pkg/replicate/replicator.go b/pkg/replicate/replicator.go index 29ce3487f8..b186c5e5d3 100644 --- a/pkg/replicate/replicator.go +++ b/pkg/replicate/replicator.go @@ -83,7 +83,6 @@ func RunReplicate( toObjStoreConfig *extflag.PathOrContent, singleRun bool, minTime, maxTime *thanosmodel.TimeOrDurationValue, - deleteOldBlocks bool, ) error { logger = log.With(logger, "component", "replicate") @@ -164,14 +163,6 @@ func RunReplicate( }, []string{"result"}) replicationRunDuration.WithLabelValues(labelSuccess) replicationRunDuration.WithLabelValues(labelError) - blocksCleaned := promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "thanos_replicate_blocks_cleaned_total", - Help: "Total number of blocks deleted in replicator.", - }) - blockCleanupFailures := promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "thanos_replicate_block_cleanup_failures_total", - Help: "Failures encountered while deleting blocks in replicator.", - }) fetcher, err := thanosblock.NewMetaFetcher( logger, @@ -195,8 +186,6 @@ func RunReplicate( metrics := newReplicationMetrics(reg) ctx, cancel := context.WithCancel(context.Background()) - blocksCleaner := compact.NewBlocksCleaner(logger, toBkt, thanosblock.NewIgnoreDeletionMarkFilter(logger, toBkt, time.Hour), time.Hour, blocksCleaned, blockCleanupFailures) - replicateFn := func() error { timestamp := time.Now() entropy := ulid.Monotonic(rand.New(rand.NewSource(timestamp.UnixNano())), 0) @@ -209,16 +198,10 @@ func RunReplicate( logger := log.With(logger, "replication-run-id", ulid.String()) level.Info(logger).Log("msg", "running replication attempt") - if err := newReplicationScheme(logger, metrics, blockFilter, fetcher, fromBkt, toBkt, reg, maxTime, deleteOldBlocks).execute(ctx); err != nil { + if err := newReplicationScheme(logger, metrics, blockFilter, fetcher, fromBkt, toBkt, reg).execute(ctx); err != nil { return errors.Wrap(err, "replication execute") } - if deleteOldBlocks { - if err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil { - return errors.Wrap(err, "failed to delete old blocks") - } - } - return nil } diff --git a/pkg/replicate/scheme.go b/pkg/replicate/scheme.go index 0e8fa5061c..20d7d7684e 100644 --- a/pkg/replicate/scheme.go +++ b/pkg/replicate/scheme.go @@ -11,13 +11,9 @@ import ( "io/ioutil" "path" "sort" - "time" - - "github.com/thanos-io/thanos/pkg/model" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" - "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -117,9 +113,6 @@ type replicationScheme struct { metrics *replicationMetrics reg prometheus.Registerer - - maxTime *model.TimeOrDurationValue - markBlocksForFutureDeletion bool } type replicationMetrics struct { @@ -152,21 +145,19 @@ func newReplicationMetrics(reg prometheus.Registerer) *replicationMetrics { return m } -func newReplicationScheme(logger log.Logger, metrics *replicationMetrics, blockFilter blockFilterFunc, fetcher thanosblock.MetadataFetcher, from objstore.InstrumentedBucketReader, to objstore.Bucket, reg prometheus.Registerer, maxTime *model.TimeOrDurationValue, markFoFutureDeletion bool) *replicationScheme { +func newReplicationScheme(logger log.Logger, metrics *replicationMetrics, blockFilter blockFilterFunc, fetcher thanosblock.MetadataFetcher, from objstore.InstrumentedBucketReader, to objstore.Bucket, reg prometheus.Registerer) *replicationScheme { if logger == nil { logger = log.NewNopLogger() } return &replicationScheme{ - logger: logger, - blockFilter: blockFilter, - fetcher: fetcher, - fromBkt: from, - toBkt: to, - metrics: metrics, - reg: reg, - maxTime: maxTime, - markBlocksForFutureDeletion: markFoFutureDeletion, + logger: logger, + blockFilter: blockFilter, + fetcher: fetcher, + fromBkt: from, + toBkt: to, + metrics: metrics, + reg: reg, } } @@ -275,14 +266,6 @@ func (rs *replicationScheme) ensureBlockIsReplicated(ctx context.Context, meta * return errors.Wrap(err, "upload meta file") } - if rs.markBlocksForFutureDeletion { - deletionTime := time.Unix(meta.MaxTime/1000, 0).Add(time.Duration(*rs.maxTime.Dur)) - if err := thanosblock.MarkForFutureDeletion(ctx, rs.logger, rs.toBkt, meta.ULID, deletionTime, nil); err != nil { - return errors.Wrap(err, "failed to mark block for future deletion") - } - rs.metrics.blocksMarkedForDeletion.Inc() - } - rs.metrics.blocksReplicated.Inc() return nil diff --git a/pkg/replicate/scheme_test.go b/pkg/replicate/scheme_test.go index 924dd2699f..ef02b97065 100644 --- a/pkg/replicate/scheme_test.go +++ b/pkg/replicate/scheme_test.go @@ -315,7 +315,7 @@ func TestReplicationSchemeAll(t *testing.T) { fetcher, err := block.NewMetaFetcher(logger, 32, objstore.WithNoopInstr(originBucket), "", nil, nil, nil) testutil.Ok(t, err) - r := newReplicationScheme(logger, newReplicationMetrics(nil), filter, fetcher, objstore.WithNoopInstr(originBucket), targetBucket, nil, nil, false) + r := newReplicationScheme(logger, newReplicationMetrics(nil), filter, fetcher, objstore.WithNoopInstr(originBucket), targetBucket, nil) err = r.execute(ctx) testutil.Ok(t, err) From 7c7e09a8bc0274b4de4159195ec39b7e6a30683b Mon Sep 17 00:00:00 2001 From: Kevin Hellemun <17928966+OGKevin@users.noreply.github.com> Date: Sat, 31 Oct 2020 13:02:02 +0100 Subject: [PATCH 08/10] Fix import formatting Signed-off-by: Kevin Hellemun <17928966+OGKevin@users.noreply.github.com> --- cmd/thanos/tools_bucket.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cmd/thanos/tools_bucket.go b/cmd/thanos/tools_bucket.go index 15e46c64ea..a2e944a7cf 100644 --- a/cmd/thanos/tools_bucket.go +++ b/cmd/thanos/tools_bucket.go @@ -14,8 +14,6 @@ import ( "text/template" "time" - "github.com/thanos-io/thanos/pkg/model" - "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/oklog/run" @@ -38,6 +36,7 @@ import ( "github.com/thanos-io/thanos/pkg/extprom" extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" "github.com/thanos-io/thanos/pkg/logging" + "github.com/thanos-io/thanos/pkg/model" "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/objstore/client" "github.com/thanos-io/thanos/pkg/prober" From 431021a173be9738a086f2399a44cc6db5994862 Mon Sep 17 00:00:00 2001 From: Kevin Hellemun <17928966+OGKevin@users.noreply.github.com> Date: Mon, 2 Nov 2020 20:12:55 +0100 Subject: [PATCH 09/10] Revert unneeded changes due to replicator no longer deleting blocs Signed-off-by: Kevin Hellemun <17928966+OGKevin@users.noreply.github.com> --- CHANGELOG.md | 2 +- pkg/replicate/scheme.go | 25 ++++++++++++++----------- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a6bc99c484..74fddfeae3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,7 +22,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#3312](https://github.com/thanos-io/thanos/pull/3312) s3: add list_objects_version config option for compatibility. - [#3356](https://github.com/thanos-io/thanos/pull/3356) Query Frontend: Add a flag to disable step alignment middleware for query range. - [#3378](https://github.com/thanos-io/thanos/pull/3378) Ruler: added the ability to send queries via the HTTP method POST. Helps when alerting/recording rules are extra long because it encodes the actual parameters inside of the body instead of the URI. Thanos Ruler now uses POST by default unless `--query.http-method` is set `GET`. -- [#2979](https://github.com/thanos-io/thanos/pull/2979) Replicator: Add the ability to be albe to replicate blocks within a time frame by passing --min-time and --max-time +- [#2979](https://github.com/thanos-io/thanos/pull/2979) Replicator: Add the ability to replicate blocks within a time frame by passing --min-time and --max-tim ### Fixed - [#3257](https://github.com/thanos-io/thanos/pull/3257) Ruler: Prevent Ruler from crashing when using default DNS to lookup hosts that results in "No such hosts" errors. diff --git a/pkg/replicate/scheme.go b/pkg/replicate/scheme.go index 20d7d7684e..7e1de26552 100644 --- a/pkg/replicate/scheme.go +++ b/pkg/replicate/scheme.go @@ -14,6 +14,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -119,8 +120,6 @@ type replicationMetrics struct { blocksAlreadyReplicated prometheus.Counter blocksReplicated prometheus.Counter objectsReplicated prometheus.Counter - - blocksMarkedForDeletion prometheus.Counter } func newReplicationMetrics(reg prometheus.Registerer) *replicationMetrics { @@ -137,15 +136,19 @@ func newReplicationMetrics(reg prometheus.Registerer) *replicationMetrics { Name: "thanos_replicate_objects_replicated_total", Help: "Total number of objects replicated.", }), - blocksMarkedForDeletion: promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "thanos_replicate_blocks_marked_for_deletion_total", - Help: "Total number of blocks marked for deletion in compactor.", - }), } return m } -func newReplicationScheme(logger log.Logger, metrics *replicationMetrics, blockFilter blockFilterFunc, fetcher thanosblock.MetadataFetcher, from objstore.InstrumentedBucketReader, to objstore.Bucket, reg prometheus.Registerer) *replicationScheme { +func newReplicationScheme( + logger log.Logger, + metrics *replicationMetrics, + blockFilter blockFilterFunc, + fetcher thanosblock.MetadataFetcher, + from objstore.InstrumentedBucketReader, + to objstore.Bucket, + reg prometheus.Registerer, +) *replicationScheme { if logger == nil { logger = log.NewNopLogger() } @@ -187,7 +190,7 @@ func (rs *replicationScheme) execute(ctx context.Context) error { }) for _, b := range availableBlocks { - if err := rs.ensureBlockIsReplicated(ctx, b); err != nil { + if err := rs.ensureBlockIsReplicated(ctx, b.BlockMeta.ULID); err != nil { return errors.Wrapf(err, "ensure block %v is replicated", b.BlockMeta.ULID.String()) } } @@ -197,8 +200,8 @@ func (rs *replicationScheme) execute(ctx context.Context) error { // ensureBlockIsReplicated ensures that a block present in the origin bucket is // present in the target bucket. -func (rs *replicationScheme) ensureBlockIsReplicated(ctx context.Context, meta *metadata.Meta) error { - blockID := meta.ULID.String() +func (rs *replicationScheme) ensureBlockIsReplicated(ctx context.Context, id ulid.ULID) error { + blockID := id.String() chunksDir := path.Join(blockID, thanosblock.ChunksDirname) indexFile := path.Join(blockID, thanosblock.IndexFilename) metaFile := path.Join(blockID, thanosblock.MetaFilename) @@ -238,7 +241,7 @@ func (rs *replicationScheme) ensureBlockIsReplicated(ctx context.Context, meta * // If the origin meta file content and target meta file content is // equal, we know we have already successfully replicated // previously. - level.Debug(rs.logger).Log("msg", "skipping block as already replicated", "block_uuid", meta.ULID.String()) + level.Debug(rs.logger).Log("msg", "skipping block as already replicated", "block_uuid", blockID) rs.metrics.blocksAlreadyReplicated.Inc() return nil From 47073a6aba8134565c8259189f157864500db359 Mon Sep 17 00:00:00 2001 From: Kevin Hellemun <17928966+OGKevin@users.noreply.github.com> Date: Tue, 3 Nov 2020 09:50:51 +0100 Subject: [PATCH 10/10] Fix typo Signed-off-by: Kevin Hellemun <17928966+OGKevin@users.noreply.github.com> --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 74fddfeae3..edd58d23db 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,7 +22,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#3312](https://github.com/thanos-io/thanos/pull/3312) s3: add list_objects_version config option for compatibility. - [#3356](https://github.com/thanos-io/thanos/pull/3356) Query Frontend: Add a flag to disable step alignment middleware for query range. - [#3378](https://github.com/thanos-io/thanos/pull/3378) Ruler: added the ability to send queries via the HTTP method POST. Helps when alerting/recording rules are extra long because it encodes the actual parameters inside of the body instead of the URI. Thanos Ruler now uses POST by default unless `--query.http-method` is set `GET`. -- [#2979](https://github.com/thanos-io/thanos/pull/2979) Replicator: Add the ability to replicate blocks within a time frame by passing --min-time and --max-tim +- [#2979](https://github.com/thanos-io/thanos/pull/2979) Replicator: Add the ability to replicate blocks within a time frame by passing --min-time and --max-time ### Fixed - [#3257](https://github.com/thanos-io/thanos/pull/3257) Ruler: Prevent Ruler from crashing when using default DNS to lookup hosts that results in "No such hosts" errors.