diff --git a/CHANGELOG.md b/CHANGELOG.md index 4364f7868c..edd58d23db 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#3312](https://github.com/thanos-io/thanos/pull/3312) s3: add list_objects_version config option for compatibility. - [#3356](https://github.com/thanos-io/thanos/pull/3356) Query Frontend: Add a flag to disable step alignment middleware for query range. - [#3378](https://github.com/thanos-io/thanos/pull/3378) Ruler: added the ability to send queries via the HTTP method POST. Helps when alerting/recording rules are extra long because it encodes the actual parameters inside of the body instead of the URI. Thanos Ruler now uses POST by default unless `--query.http-method` is set `GET`. +- [#2979](https://github.com/thanos-io/thanos/pull/2979) Replicator: Add the ability to replicate blocks within a time frame by passing --min-time and --max-time ### Fixed - [#3257](https://github.com/thanos-io/thanos/pull/3257) Ruler: Prevent Ruler from crashing when using default DNS to lookup hosts that results in "No such hosts" errors. diff --git a/cmd/thanos/tools_bucket.go b/cmd/thanos/tools_bucket.go index 4e7c6b575b..a2e944a7cf 100644 --- a/cmd/thanos/tools_bucket.go +++ b/cmd/thanos/tools_bucket.go @@ -36,6 +36,7 @@ import ( "github.com/thanos-io/thanos/pkg/extprom" extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" "github.com/thanos-io/thanos/pkg/logging" + "github.com/thanos-io/thanos/pkg/model" "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/objstore/client" "github.com/thanos-io/thanos/pkg/prober" @@ -441,6 +442,10 @@ func registerBucketReplicate(app extkingpin.AppClause, objStoreConfig *extflag.P compactions := cmd.Flag("compaction", "Only blocks with these compaction levels will be replicated. Repeated flag.").Default("1", "2", "3", "4").Ints() matcherStrs := cmd.Flag("matcher", "Only blocks whose external labels exactly match this matcher will be replicated.").PlaceHolder("key=\"value\"").Strings() singleRun := cmd.Flag("single-run", "Run replication only one time, then exit.").Default("false").Bool() + minTime := model.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to replicate. Thanos Replicate will replicate only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y."). + Default("0000-01-01T00:00:00Z")) + maxTime := model.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to replicate. Thanos Replicate will replicate only metrics, which happened earlier than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y."). + Default("9999-12-31T23:59:59Z")) cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error { matchers, err := replicate.ParseFlagMatchers(*matcherStrs) @@ -466,6 +471,8 @@ func registerBucketReplicate(app extkingpin.AppClause, objStoreConfig *extflag.P objStoreConfig, toObjStoreConfig, *singleRun, + minTime, + maxTime, ) }) } diff --git a/docs/components/tools.md b/docs/components/tools.md index 19638c29c2..2179f9d552 100644 --- a/docs/components/tools.md +++ b/docs/components/tools.md @@ -472,6 +472,22 @@ Flags: --matcher=key="value" ... Only blocks whose external labels exactly match this matcher will be replicated. --single-run Run replication only one time, then exit. + --min-time=0000-01-01T00:00:00Z + Start of time range limit to replicate. Thanos + Replicate will replicate only metrics, which + happened later than this value. Option can be a + constant time in RFC3339 format or time + duration relative to current time, such as -1d + or 2h45m. Valid duration units are ms, s, m, h, + d, w, y. + --max-time=9999-12-31T23:59:59Z + End of time range limit to replicate. Thanos + Replicate will replicate only metrics, which + happened earlier than this value. Option can be + a constant time in RFC3339 format or time + duration relative to current time, such as -1d + or 2h45m. Valid duration units are ms, s, m, h, + d, w, y. ``` diff --git a/pkg/replicate/replicator.go b/pkg/replicate/replicator.go index c76a02afbc..b186c5e5d3 100644 --- a/pkg/replicate/replicator.go +++ b/pkg/replicate/replicator.go @@ -10,6 +10,8 @@ import ( "strings" "time" + thanosmodel "github.com/thanos-io/thanos/pkg/model" + "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/oklog/run" @@ -80,6 +82,7 @@ func RunReplicate( fromObjStoreConfig *extflag.PathOrContent, toObjStoreConfig *extflag.PathOrContent, singleRun bool, + minTime, maxTime *thanosmodel.TimeOrDurationValue, ) error { logger = log.With(logger, "component", "replicate") @@ -161,7 +164,15 @@ func RunReplicate( replicationRunDuration.WithLabelValues(labelSuccess) replicationRunDuration.WithLabelValues(labelError) - fetcher, err := thanosblock.NewMetaFetcher(logger, 32, fromBkt, "", reg, nil, nil) + fetcher, err := thanosblock.NewMetaFetcher( + logger, + 32, + fromBkt, + "", + reg, + []thanosblock.MetadataFilter{thanosblock.NewTimePartitionMetaFilter(*minTime, *maxTime)}, + nil, + ) if err != nil { return errors.Wrapf(err, "create meta fetcher with bucket %v", fromBkt) } diff --git a/pkg/replicate/scheme.go b/pkg/replicate/scheme.go index 893c89865c..7e1de26552 100644 --- a/pkg/replicate/scheme.go +++ b/pkg/replicate/scheme.go @@ -241,7 +241,7 @@ func (rs *replicationScheme) ensureBlockIsReplicated(ctx context.Context, id uli // If the origin meta file content and target meta file content is // equal, we know we have already successfully replicated // previously. - level.Debug(rs.logger).Log("msg", "skipping block as already replicated", "block_uuid", id.String()) + level.Debug(rs.logger).Log("msg", "skipping block as already replicated", "block_uuid", blockID) rs.metrics.blocksAlreadyReplicated.Inc() return nil diff --git a/pkg/replicate/scheme_test.go b/pkg/replicate/scheme_test.go index e8686f90ef..ef02b97065 100644 --- a/pkg/replicate/scheme_test.go +++ b/pkg/replicate/scheme_test.go @@ -315,15 +315,7 @@ func TestReplicationSchemeAll(t *testing.T) { fetcher, err := block.NewMetaFetcher(logger, 32, objstore.WithNoopInstr(originBucket), "", nil, nil, nil) testutil.Ok(t, err) - r := newReplicationScheme( - logger, - newReplicationMetrics(nil), - filter, - fetcher, - objstore.WithNoopInstr(originBucket), - targetBucket, - nil, - ) + r := newReplicationScheme(logger, newReplicationMetrics(nil), filter, fetcher, objstore.WithNoopInstr(originBucket), targetBucket, nil) err = r.execute(ctx) testutil.Ok(t, err)