Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

REPLICATOR: Add TimePartitionMetaFilter to thanos replicate. #2979

Merged
merged 10 commits into from
Nov 3, 2020
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#3312](https://github.com/thanos-io/thanos/pull/3312) s3: add list_objects_version config option for compatibility.
- [#3356](https://github.com/thanos-io/thanos/pull/3356) Query Frontend: Add a flag to disable step alignment middleware for query range.
- [#3378](https://github.com/thanos-io/thanos/pull/3378) Ruler: added the ability to send queries via the HTTP method POST. Helps when alerting/recording rules are extra long because it encodes the actual parameters inside of the body instead of the URI. Thanos Ruler now uses POST by default unless `--query.http-method` is set `GET`.
- [#2979](https://github.com/thanos-io/thanos/pull/2979) Replicator: Add the ability to be albe to replicate blocks within a time frame by passing --min-time and --max-time
OGKevin marked this conversation as resolved.
Show resolved Hide resolved

### Fixed
- [#3257](https://github.com/thanos-io/thanos/pull/3257) Ruler: Prevent Ruler from crashing when using default DNS to lookup hosts that results in "No such hosts" errors.
Expand Down
7 changes: 7 additions & 0 deletions cmd/thanos/tools_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/thanos-io/thanos/pkg/extprom"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/logging"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/prober"
Expand Down Expand Up @@ -441,6 +442,10 @@ func registerBucketReplicate(app extkingpin.AppClause, objStoreConfig *extflag.P
compactions := cmd.Flag("compaction", "Only blocks with these compaction levels will be replicated. Repeated flag.").Default("1", "2", "3", "4").Ints()
matcherStrs := cmd.Flag("matcher", "Only blocks whose external labels exactly match this matcher will be replicated.").PlaceHolder("key=\"value\"").Strings()
singleRun := cmd.Flag("single-run", "Run replication only one time, then exit.").Default("false").Bool()
minTime := model.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to replicate. Thanos Replicate will replicate only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("0000-01-01T00:00:00Z"))
maxTime := model.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to replicate. Thanos Replicate will replicate only metrics, which happened earlier than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("9999-12-31T23:59:59Z"))

cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
matchers, err := replicate.ParseFlagMatchers(*matcherStrs)
Expand All @@ -466,6 +471,8 @@ func registerBucketReplicate(app extkingpin.AppClause, objStoreConfig *extflag.P
objStoreConfig,
toObjStoreConfig,
*singleRun,
minTime,
maxTime,
)
})
}
Expand Down
16 changes: 16 additions & 0 deletions docs/components/tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,22 @@ Flags:
--matcher=key="value" ... Only blocks whose external labels exactly match
this matcher will be replicated.
--single-run Run replication only one time, then exit.
--min-time=0000-01-01T00:00:00Z
Start of time range limit to replicate. Thanos
Replicate will replicate only metrics, which
happened later than this value. Option can be a
constant time in RFC3339 format or time
duration relative to current time, such as -1d
or 2h45m. Valid duration units are ms, s, m, h,
d, w, y.
--max-time=9999-12-31T23:59:59Z
End of time range limit to replicate. Thanos
Replicate will replicate only metrics, which
happened earlier than this value. Option can be
a constant time in RFC3339 format or time
duration relative to current time, such as -1d
or 2h45m. Valid duration units are ms, s, m, h,
d, w, y.

```

Expand Down
13 changes: 12 additions & 1 deletion pkg/replicate/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"strings"
"time"

thanosmodel "github.com/thanos-io/thanos/pkg/model"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/run"
Expand Down Expand Up @@ -80,6 +82,7 @@ func RunReplicate(
fromObjStoreConfig *extflag.PathOrContent,
toObjStoreConfig *extflag.PathOrContent,
singleRun bool,
minTime, maxTime *thanosmodel.TimeOrDurationValue,
) error {
logger = log.With(logger, "component", "replicate")

Expand Down Expand Up @@ -161,7 +164,15 @@ func RunReplicate(
replicationRunDuration.WithLabelValues(labelSuccess)
replicationRunDuration.WithLabelValues(labelError)

fetcher, err := thanosblock.NewMetaFetcher(logger, 32, fromBkt, "", reg, nil, nil)
fetcher, err := thanosblock.NewMetaFetcher(
logger,
32,
fromBkt,
"",
reg,
[]thanosblock.MetadataFilter{thanosblock.NewTimePartitionMetaFilter(*minTime, *maxTime)},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add this only if minTime and maxTime are passed? (:

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't the behaviour be the same? As the max and min are almost infinite. What is the reason for only passing it if its set to non default value?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just no risk ;p

nil,
)
if err != nil {
return errors.Wrapf(err, "create meta fetcher with bucket %v", fromBkt)
}
Expand Down
25 changes: 11 additions & 14 deletions pkg/replicate/scheme.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand Down Expand Up @@ -120,6 +119,8 @@ type replicationMetrics struct {
blocksAlreadyReplicated prometheus.Counter
blocksReplicated prometheus.Counter
objectsReplicated prometheus.Counter

blocksMarkedForDeletion prometheus.Counter
}

func newReplicationMetrics(reg prometheus.Registerer) *replicationMetrics {
Expand All @@ -136,19 +137,15 @@ func newReplicationMetrics(reg prometheus.Registerer) *replicationMetrics {
Name: "thanos_replicate_objects_replicated_total",
Help: "Total number of objects replicated.",
}),
blocksMarkedForDeletion: promauto.With(reg).NewCounter(prometheus.CounterOpts{
OGKevin marked this conversation as resolved.
Show resolved Hide resolved
Name: "thanos_replicate_blocks_marked_for_deletion_total",
Help: "Total number of blocks marked for deletion in compactor.",
}),
}
return m
}

func newReplicationScheme(
logger log.Logger,
metrics *replicationMetrics,
blockFilter blockFilterFunc,
fetcher thanosblock.MetadataFetcher,
from objstore.InstrumentedBucketReader,
to objstore.Bucket,
reg prometheus.Registerer,
) *replicationScheme {
func newReplicationScheme(logger log.Logger, metrics *replicationMetrics, blockFilter blockFilterFunc, fetcher thanosblock.MetadataFetcher, from objstore.InstrumentedBucketReader, to objstore.Bucket, reg prometheus.Registerer) *replicationScheme {
OGKevin marked this conversation as resolved.
Show resolved Hide resolved
if logger == nil {
logger = log.NewNopLogger()
}
Expand Down Expand Up @@ -190,7 +187,7 @@ func (rs *replicationScheme) execute(ctx context.Context) error {
})

for _, b := range availableBlocks {
if err := rs.ensureBlockIsReplicated(ctx, b.BlockMeta.ULID); err != nil {
if err := rs.ensureBlockIsReplicated(ctx, b); err != nil {
return errors.Wrapf(err, "ensure block %v is replicated", b.BlockMeta.ULID.String())
}
}
Expand All @@ -200,8 +197,8 @@ func (rs *replicationScheme) execute(ctx context.Context) error {

// ensureBlockIsReplicated ensures that a block present in the origin bucket is
// present in the target bucket.
func (rs *replicationScheme) ensureBlockIsReplicated(ctx context.Context, id ulid.ULID) error {
blockID := id.String()
func (rs *replicationScheme) ensureBlockIsReplicated(ctx context.Context, meta *metadata.Meta) error {
OGKevin marked this conversation as resolved.
Show resolved Hide resolved
blockID := meta.ULID.String()
chunksDir := path.Join(blockID, thanosblock.ChunksDirname)
indexFile := path.Join(blockID, thanosblock.IndexFilename)
metaFile := path.Join(blockID, thanosblock.MetaFilename)
Expand Down Expand Up @@ -241,7 +238,7 @@ func (rs *replicationScheme) ensureBlockIsReplicated(ctx context.Context, id uli
// If the origin meta file content and target meta file content is
// equal, we know we have already successfully replicated
// previously.
level.Debug(rs.logger).Log("msg", "skipping block as already replicated", "block_uuid", id.String())
level.Debug(rs.logger).Log("msg", "skipping block as already replicated", "block_uuid", meta.ULID.String())
rs.metrics.blocksAlreadyReplicated.Inc()

return nil
Expand Down
10 changes: 1 addition & 9 deletions pkg/replicate/scheme_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,15 +315,7 @@ func TestReplicationSchemeAll(t *testing.T) {
fetcher, err := block.NewMetaFetcher(logger, 32, objstore.WithNoopInstr(originBucket), "", nil, nil, nil)
testutil.Ok(t, err)

r := newReplicationScheme(
logger,
newReplicationMetrics(nil),
filter,
fetcher,
objstore.WithNoopInstr(originBucket),
targetBucket,
nil,
)
r := newReplicationScheme(logger, newReplicationMetrics(nil), filter, fetcher, objstore.WithNoopInstr(originBucket), targetBucket, nil)

err = r.execute(ctx)
testutil.Ok(t, err)
Expand Down