Skip to content
This repository has been archived by the owner on Dec 23, 2022. It is now read-only.

Commit

Permalink
Adding single run option; Making it closer functionally to shipper.
Browse files Browse the repository at this point in the history
Related: #7

* Changed important log lines to Info
* Added check if ext lset are set for Thanos.
* Added single-run option.

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka committed Oct 28, 2019
1 parent 1f8a062 commit cb6f353
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 31 deletions.
50 changes: 33 additions & 17 deletions replicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"fmt"
"math/rand"
"time"

Expand All @@ -21,16 +22,21 @@ import (

const replicateComponent = "replicate"

// TODO(bwplotka): Consider moving to Thanos. Consider adding --labels to add to meta.json if empty to match shipper
// logic: https://github.com/observatorium/thanos-replicate/issues/7.
func registerReplicate(m map[string]setupFunc, app *kingpin.Application, name string) {
cmd := app.Command(name, "Runs replication as a long running daemon.")

httpMetricsBindAddr := regHTTPAddrFlag(cmd)

// TODO(bwplotka): Add support for local filesystem bucket implementation.
fromObjStoreConfig := regCommonObjStoreFlags(cmd, "from", false)
toObjStoreConfig := regCommonObjStoreFlags(cmd, "to", false)

matcherStrs := cmd.Flag("matcher", "Only blocks whose labels match this matcher will be replicated.").PlaceHolder("key=\"value\"").Strings()

singleRun := cmd.Flag("single-run", "Run replication only one time, then exit.").Default("false").Bool()

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
matchers, err := parseFlagMatchers(*matcherStrs)
if err != nil {
Expand All @@ -43,9 +49,10 @@ func registerReplicate(m map[string]setupFunc, app *kingpin.Application, name st
reg,
tracer,
*httpMetricsBindAddr,
labels.Selector(matchers),
matchers,
fromObjStoreConfig,
toObjStoreConfig,
*singleRun,
)
}
}
Expand All @@ -59,6 +66,7 @@ func runReplicate(
labelSelector labels.Selector,
fromObjStoreConfig *extflag.PathOrContent,
toObjStoreConfig *extflag.PathOrContent,
singleRun bool,
) error {
logger = log.With(logger, "component", "replicate")

Expand Down Expand Up @@ -104,35 +112,43 @@ func runReplicate(
}, []string{"result"})
reg.MustRegister(replicationRunCounter)

blockFilter := NewBlockFilter(logger, labelSelector).Filter
blockFilter := NewNonCompactedBlockFilter(logger, labelSelector).Filter
metrics := newReplicationMetrics(reg)
ctx, cancel := context.WithCancel(context.Background())

replicateFn := func() error {
timestamp := time.Now()
entropy := ulid.Monotonic(rand.New(rand.NewSource(timestamp.UnixNano())), 0)
ulid, err := ulid.New(ulid.Timestamp(timestamp), entropy)
if err != nil {
return errors.Wrap(err, "generate replication run-id")
}
logger := log.With(logger, "replication-run-id", ulid.String())

level.Info(logger).Log("msg", "running replication attempt")
if err = newReplicationScheme(logger, metrics, blockFilter, fromBkt, toBkt).execute(ctx); err != nil {
return fmt.Errorf("replication execute: %w", err)
}
return nil
}

g.Add(func() error {
defer runutil.CloseWithLogOnErr(logger, fromBkt, "from bucket client")
defer runutil.CloseWithLogOnErr(logger, toBkt, "to bucket client")

return runutil.Repeat(time.Minute, ctx.Done(), func() error {
timestamp := time.Now()
entropy := ulid.Monotonic(rand.New(rand.NewSource(timestamp.UnixNano())), 0)
ulid, err := ulid.New(ulid.Timestamp(timestamp), entropy)
if err != nil {
return errors.Wrap(err, "generate replication run-id")
}
logger := log.With(logger, "replication-run-id", ulid.String())
if singleRun {
return replicateFn()
}

level.Info(logger).Log("msg", "running replication attempt")
err = newReplicationScheme(logger, metrics, blockFilter, fromBkt, toBkt).execute(ctx)
if err != nil {
level.Error(logger).Log("msg", "running replicaton failed", "err", err)
return runutil.Repeat(time.Minute, ctx.Done(), func() error {
if err := replicateFn(); err != nil {
level.Error(logger).Log("msg", "running replication failed", "err", err)
replicationRunCounter.WithLabelValues("error").Inc()
// No matter the error we want to repeat indefinitely.
return nil
}

replicationRunCounter.WithLabelValues("success").Inc()
level.Info(logger).Log("msg", "ran replication successfully")

// No matter the error we want to repeat indefinitely.
return nil
})
}, func(error) {
Expand Down
33 changes: 20 additions & 13 deletions scheme.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,25 @@ import (
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/runutil"
)

type BlockFilter struct {
// NonCompactedBlockFilter is block filter that filters out compacted and unselected blocks.
type NonCompactedBlockFilter struct {
logger log.Logger
labelSelector labels.Selector
}

func NewBlockFilter(logger log.Logger, labelSelector labels.Selector) *BlockFilter {
return &BlockFilter{
// NewNonCompactedBlockFilter returns block filter.
func NewNonCompactedBlockFilter(logger log.Logger, labelSelector labels.Selector) *NonCompactedBlockFilter {
return &NonCompactedBlockFilter{
labelSelector: labelSelector,
logger: logger,
}
}

func (bf *BlockFilter) Filter(b *metadata.Meta) bool {
// Filter return true if block is non-compacted and matches selector.
func (bf *NonCompactedBlockFilter) Filter(b *metadata.Meta) bool {
blockLabels := labels.FromMap(b.Thanos.Labels)

labelMatch := bf.labelSelector.Matches(blockLabels)
Expand Down Expand Up @@ -139,7 +143,7 @@ func (rs *replicationScheme) execute(ctx context.Context) error {

level.Debug(rs.logger).Log("msg", "scanning blocks available blocks for replication")

err := rs.fromBkt.Iter(ctx, "", func(name string) error {
if err := rs.fromBkt.Iter(ctx, "", func(name string) error {
rs.metrics.originIterations.Inc()

id, ok := thanosblock.IsBlockDir(name)
Expand All @@ -161,14 +165,18 @@ func (rs *replicationScheme) execute(ctx context.Context) error {
return fmt.Errorf("load meta for block %v from origin bucket: %w", id.String(), err)
}

if len(meta.Thanos.Labels) == 0 {
// TODO(bwplotka): Allow injecting custom labels as shipper does.
level.Info(rs.logger).Log("msg", "block meta without Thanos external labels set. This is not allowed. Skipping.", "block_uuid", id.String())
return nil
}

level.Debug(rs.logger).Log("msg", "adding block to available blocks", "block_uuid", id.String())

availableBlocks = append(availableBlocks, meta)

return nil
})

if err != nil {
}); err != nil {
return fmt.Errorf("iterate over origin bucket: %w", err)
}

Expand Down Expand Up @@ -211,11 +219,11 @@ func (rs *replicationScheme) ensureBlockIsReplicated(ctx context.Context, id uli
return fmt.Errorf("get meta file from origin bucket: %w", err)
}

defer originMetaFile.Close()
defer runutil.CloseWithLogOnErr(rs.logger, originMetaFile, "close original meta file")

targetMetaFile, err := rs.toBkt.Get(ctx, metaFile)
if targetMetaFile != nil {
defer targetMetaFile.Close()
defer runutil.CloseWithLogOnErr(rs.logger, targetMetaFile, "close target meta file")
}

if err != nil && !rs.toBkt.IsObjNotFoundErr(err) && err != io.EOF {
Expand Down Expand Up @@ -294,12 +302,11 @@ func (rs *replicationScheme) ensureObjectReplicated(ctx context.Context, objectN

defer r.Close()

err = rs.toBkt.Upload(ctx, objectName, r)
if err != nil {
if err = rs.toBkt.Upload(ctx, objectName, r); err != nil {
return fmt.Errorf("upload %v to target bucket: %w", objectName, err)
}

level.Debug(rs.logger).Log("msg", "object replicated", "object", objectName)
level.Info(rs.logger).Log("msg", "object replicated", "object", objectName)
rs.metrics.objectsReplicated.Inc()

return nil
Expand Down
2 changes: 1 addition & 1 deletion scheme_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func TestReplicationSchemeAll(t *testing.T) {
if c.selector != nil {
selector = c.selector
}
filter := NewBlockFilter(logger, selector).Filter
filter := NewNonCompactedBlockFilter(logger, selector).Filter

r := newReplicationScheme(
logger,
Expand Down

0 comments on commit cb6f353

Please sign in to comment.