Skip to content

Commit

Permalink
REPLICATOR: Add TimePartitionMetaFilter to thanos replicate. (#2979)
Browse files Browse the repository at this point in the history
* Add TimePartitionMetaFilter to thanos replicate.

This will allow time based replication.

Signed-off-by: Kevin Hellemun <17928966+OGKevin@users.noreply.github.com>

* Update docs and changelog.

Signed-off-by: Kevin Hellemun <17928966+OGKevin@users.noreply.github.com>

* Fix lint

Signed-off-by: Kevin Hellemun <17928966+OGKevin@users.noreply.github.com>

* Add posibility to delete blocks older than max time

Signed-off-by: Kevin Hellemun <17928966+OGKevin@users.noreply.github.com>

* Fix metric naming.

Signed-off-by: Kevin Hellemun <17928966+OGKevin@users.noreply.github.com>

* Fix order of arguments for max and min time.

Signed-off-by: Kevin Hellemun <17928966+OGKevin@users.noreply.github.com>

* Remove deletion of blocks from replicator.

Signed-off-by: Kevin Hellemun <17928966+OGKevin@users.noreply.github.com>

* Fix import formatting

Signed-off-by: Kevin Hellemun <17928966+OGKevin@users.noreply.github.com>

* Revert unneeded changes due to replicator no longer deleting blocs

Signed-off-by: Kevin Hellemun <17928966+OGKevin@users.noreply.github.com>

* Fix typo

Signed-off-by: Kevin Hellemun <17928966+OGKevin@users.noreply.github.com>
  • Loading branch information
OGKevin authored Nov 3, 2020
1 parent 5abda52 commit 3ce844b
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#3312](https://github.com/thanos-io/thanos/pull/3312) s3: add list_objects_version config option for compatibility.
- [#3356](https://github.com/thanos-io/thanos/pull/3356) Query Frontend: Add a flag to disable step alignment middleware for query range.
- [#3378](https://github.com/thanos-io/thanos/pull/3378) Ruler: added the ability to send queries via the HTTP method POST. Helps when alerting/recording rules are extra long because it encodes the actual parameters inside of the body instead of the URI. Thanos Ruler now uses POST by default unless `--query.http-method` is set `GET`.
- [#2979](https://github.com/thanos-io/thanos/pull/2979) Replicator: Add the ability to replicate blocks within a time frame by passing --min-time and --max-time

### Fixed
- [#3257](https://github.com/thanos-io/thanos/pull/3257) Ruler: Prevent Ruler from crashing when using default DNS to lookup hosts that results in "No such hosts" errors.
Expand Down
7 changes: 7 additions & 0 deletions cmd/thanos/tools_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/thanos-io/thanos/pkg/extprom"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/logging"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/prober"
Expand Down Expand Up @@ -441,6 +442,10 @@ func registerBucketReplicate(app extkingpin.AppClause, objStoreConfig *extflag.P
compactions := cmd.Flag("compaction", "Only blocks with these compaction levels will be replicated. Repeated flag.").Default("1", "2", "3", "4").Ints()
matcherStrs := cmd.Flag("matcher", "Only blocks whose external labels exactly match this matcher will be replicated.").PlaceHolder("key=\"value\"").Strings()
singleRun := cmd.Flag("single-run", "Run replication only one time, then exit.").Default("false").Bool()
minTime := model.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to replicate. Thanos Replicate will replicate only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("0000-01-01T00:00:00Z"))
maxTime := model.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to replicate. Thanos Replicate will replicate only metrics, which happened earlier than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("9999-12-31T23:59:59Z"))

cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
matchers, err := replicate.ParseFlagMatchers(*matcherStrs)
Expand All @@ -466,6 +471,8 @@ func registerBucketReplicate(app extkingpin.AppClause, objStoreConfig *extflag.P
objStoreConfig,
toObjStoreConfig,
*singleRun,
minTime,
maxTime,
)
})
}
Expand Down
16 changes: 16 additions & 0 deletions docs/components/tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,22 @@ Flags:
--matcher=key="value" ... Only blocks whose external labels exactly match
this matcher will be replicated.
--single-run Run replication only one time, then exit.
--min-time=0000-01-01T00:00:00Z
Start of time range limit to replicate. Thanos
Replicate will replicate only metrics, which
happened later than this value. Option can be a
constant time in RFC3339 format or time
duration relative to current time, such as -1d
or 2h45m. Valid duration units are ms, s, m, h,
d, w, y.
--max-time=9999-12-31T23:59:59Z
End of time range limit to replicate. Thanos
Replicate will replicate only metrics, which
happened earlier than this value. Option can be
a constant time in RFC3339 format or time
duration relative to current time, such as -1d
or 2h45m. Valid duration units are ms, s, m, h,
d, w, y.
```

Expand Down
13 changes: 12 additions & 1 deletion pkg/replicate/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"strings"
"time"

thanosmodel "github.com/thanos-io/thanos/pkg/model"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/run"
Expand Down Expand Up @@ -80,6 +82,7 @@ func RunReplicate(
fromObjStoreConfig *extflag.PathOrContent,
toObjStoreConfig *extflag.PathOrContent,
singleRun bool,
minTime, maxTime *thanosmodel.TimeOrDurationValue,
) error {
logger = log.With(logger, "component", "replicate")

Expand Down Expand Up @@ -161,7 +164,15 @@ func RunReplicate(
replicationRunDuration.WithLabelValues(labelSuccess)
replicationRunDuration.WithLabelValues(labelError)

fetcher, err := thanosblock.NewMetaFetcher(logger, 32, fromBkt, "", reg, nil, nil)
fetcher, err := thanosblock.NewMetaFetcher(
logger,
32,
fromBkt,
"",
reg,
[]thanosblock.MetadataFilter{thanosblock.NewTimePartitionMetaFilter(*minTime, *maxTime)},
nil,
)
if err != nil {
return errors.Wrapf(err, "create meta fetcher with bucket %v", fromBkt)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/replicate/scheme.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func (rs *replicationScheme) ensureBlockIsReplicated(ctx context.Context, id uli
// If the origin meta file content and target meta file content is
// equal, we know we have already successfully replicated
// previously.
level.Debug(rs.logger).Log("msg", "skipping block as already replicated", "block_uuid", id.String())
level.Debug(rs.logger).Log("msg", "skipping block as already replicated", "block_uuid", blockID)
rs.metrics.blocksAlreadyReplicated.Inc()

return nil
Expand Down
10 changes: 1 addition & 9 deletions pkg/replicate/scheme_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,15 +315,7 @@ func TestReplicationSchemeAll(t *testing.T) {
fetcher, err := block.NewMetaFetcher(logger, 32, objstore.WithNoopInstr(originBucket), "", nil, nil, nil)
testutil.Ok(t, err)

r := newReplicationScheme(
logger,
newReplicationMetrics(nil),
filter,
fetcher,
objstore.WithNoopInstr(originBucket),
targetBucket,
nil,
)
r := newReplicationScheme(logger, newReplicationMetrics(nil), filter, fetcher, objstore.WithNoopInstr(originBucket), targetBucket, nil)

err = r.execute(ctx)
testutil.Ok(t, err)
Expand Down

0 comments on commit 3ce844b

Please sign in to comment.