Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

REPLICATOR: Add TimePartitionMetaFilter to thanos replicate. #2979

Merged
merged 10 commits into from
Nov 3, 2020
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#3312](https://github.com/thanos-io/thanos/pull/3312) s3: add list_objects_version config option for compatibility.
- [#3356](https://github.com/thanos-io/thanos/pull/3356) Query Frontend: Add a flag to disable step alignment middleware for query range.
- [#3378](https://github.com/thanos-io/thanos/pull/3378) Ruler: added the ability to send queries via the HTTP method POST. Helps when alerting/recording rules are extra long because it encodes the actual parameters inside of the body instead of the URI. Thanos Ruler now uses POST by default unless `--query.http-method` is set `GET`.
- [#2979](https://github.com/thanos-io/thanos/pull/2979) Replicator: Add the ability to be albe to replicate blocks within a time frame by passing --min-time and --max-time
- [#2979](https://github.com/thanos-io/thanos/pull/2979) Replicator: Add the ability to replicate blocks within a time frame by passing --min-time and --max-tim
OGKevin marked this conversation as resolved.
Show resolved Hide resolved

### Fixed
- [#3257](https://github.com/thanos-io/thanos/pull/3257) Ruler: Prevent Ruler from crashing when using default DNS to lookup hosts that results in "No such hosts" errors.
Expand Down
25 changes: 14 additions & 11 deletions pkg/replicate/scheme.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand Down Expand Up @@ -119,8 +120,6 @@ type replicationMetrics struct {
blocksAlreadyReplicated prometheus.Counter
blocksReplicated prometheus.Counter
objectsReplicated prometheus.Counter

blocksMarkedForDeletion prometheus.Counter
}

func newReplicationMetrics(reg prometheus.Registerer) *replicationMetrics {
Expand All @@ -137,15 +136,19 @@ func newReplicationMetrics(reg prometheus.Registerer) *replicationMetrics {
Name: "thanos_replicate_objects_replicated_total",
Help: "Total number of objects replicated.",
}),
blocksMarkedForDeletion: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_replicate_blocks_marked_for_deletion_total",
Help: "Total number of blocks marked for deletion in compactor.",
}),
}
return m
}

func newReplicationScheme(logger log.Logger, metrics *replicationMetrics, blockFilter blockFilterFunc, fetcher thanosblock.MetadataFetcher, from objstore.InstrumentedBucketReader, to objstore.Bucket, reg prometheus.Registerer) *replicationScheme {
func newReplicationScheme(
logger log.Logger,
metrics *replicationMetrics,
blockFilter blockFilterFunc,
fetcher thanosblock.MetadataFetcher,
from objstore.InstrumentedBucketReader,
to objstore.Bucket,
reg prometheus.Registerer,
) *replicationScheme {
if logger == nil {
logger = log.NewNopLogger()
}
Expand Down Expand Up @@ -187,7 +190,7 @@ func (rs *replicationScheme) execute(ctx context.Context) error {
})

for _, b := range availableBlocks {
if err := rs.ensureBlockIsReplicated(ctx, b); err != nil {
if err := rs.ensureBlockIsReplicated(ctx, b.BlockMeta.ULID); err != nil {
return errors.Wrapf(err, "ensure block %v is replicated", b.BlockMeta.ULID.String())
}
}
Expand All @@ -197,8 +200,8 @@ func (rs *replicationScheme) execute(ctx context.Context) error {

// ensureBlockIsReplicated ensures that a block present in the origin bucket is
// present in the target bucket.
func (rs *replicationScheme) ensureBlockIsReplicated(ctx context.Context, meta *metadata.Meta) error {
blockID := meta.ULID.String()
func (rs *replicationScheme) ensureBlockIsReplicated(ctx context.Context, id ulid.ULID) error {
blockID := id.String()
chunksDir := path.Join(blockID, thanosblock.ChunksDirname)
indexFile := path.Join(blockID, thanosblock.IndexFilename)
metaFile := path.Join(blockID, thanosblock.MetaFilename)
Expand Down Expand Up @@ -238,7 +241,7 @@ func (rs *replicationScheme) ensureBlockIsReplicated(ctx context.Context, meta *
// If the origin meta file content and target meta file content is
// equal, we know we have already successfully replicated
// previously.
level.Debug(rs.logger).Log("msg", "skipping block as already replicated", "block_uuid", meta.ULID.String())
level.Debug(rs.logger).Log("msg", "skipping block as already replicated", "block_uuid", blockID)
rs.metrics.blocksAlreadyReplicated.Inc()

return nil
Expand Down