From 466d5b0c180643d5afd436620c5fd3dc92368d1c Mon Sep 17 00:00:00 2001 From: Aleksei Semiglazov Date: Tue, 25 Sep 2018 17:52:03 +0100 Subject: [PATCH] compact: add backoff to the retry to upload/download buckets Add backoff retries to each single object storage request, except the Range and Iter methods. The error handler splits errors into network (net.Error) errors and others, and retries the request against the object storage for the former. Fixes #318 --- Gopkg.lock | 9 + cmd/thanos/bucket.go | 6 +- cmd/thanos/compact.go | 15 +- cmd/thanos/downsample.go | 25 +-- cmd/thanos/query.go | 7 +- cmd/thanos/rule.go | 10 +- cmd/thanos/sidecar.go | 19 +- cmd/thanos/store.go | 7 +- docs/components/compact.md | 4 +- pkg/block/block.go | 35 +-- pkg/cluster/cluster.go | 4 +- pkg/cluster/cluster_test.go | 5 +- pkg/compact/compact.go | 2 +- pkg/compact/retention.go | 2 +- pkg/objstore/metric_bucket.go | 203 ++++++++++++++++++ pkg/objstore/objstore.go | 339 ++++++++++++------------------ pkg/reloader/reloader.go | 3 +- pkg/runutil/runutil.go | 32 ++- pkg/shipper/shipper.go | 2 +- pkg/store/bucket.go | 22 ++ pkg/store/bucket_e2e_test.go | 5 +- pkg/verifier/index_issue.go | 2 +- pkg/verifier/overlapped_blocks.go | 2 +- test/e2e/query_test.go | 5 +- test/e2e/rule_test.go | 3 +- 25 files changed, 471 insertions(+), 297 deletions(-) create mode 100644 pkg/objstore/metric_bucket.go diff --git a/Gopkg.lock b/Gopkg.lock index 511d81c3d54..7b8dd0bd483 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -71,6 +71,14 @@ pruneopts = "" revision = "3a771d992973f24aa725d07868b467d1ddfceafb" +[[projects]] + digest = "1:f619cb9b07aebe5416262cdd8b86082e8d5bdc5264cb3b615ff858df0b645f97" + name = "github.com/cenkalti/backoff" + packages = ["."] + pruneopts = "" + revision = "2ea60e5f094469f9e65adb9cd103795b73ae743e" + version = "v2.0.0" + [[projects]] digest = "1:1660bb2e30cca08494f29b5593e387c6090fbe8936970ba947185b0ca000aec0" name = "github.com/cespare/xxhash" @@ -710,6 +718,7 @@ "github.com/NYTimes/gziphandler", "github.com/armon/go-metrics", "github.com/armon/go-metrics/prometheus", + "github.com/cenkalti/backoff", "github.com/fortytw2/leaktest", "github.com/fsnotify/fsnotify", "github.com/go-kit/kit/log", diff --git a/cmd/thanos/bucket.go b/cmd/thanos/bucket.go index a51a4d14dc7..06b054797ce 100644 --- a/cmd/thanos/bucket.go +++ b/cmd/thanos/bucket.go @@ -147,7 +147,7 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name strin } case "wide": printBlock = func(id ulid.ULID) error { - m, err := block.DownloadMeta(ctx, logger, bkt, id) + m, err := block.GetMeta(ctx, logger, bkt, id) if err != nil { return err } @@ -167,7 +167,7 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name strin enc.SetIndent("", "\t") printBlock = func(id ulid.ULID) error { - m, err := block.DownloadMeta(ctx, logger, bkt, id) + m, err := block.GetMeta(ctx, logger, bkt, id) if err != nil { return err } @@ -179,7 +179,7 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name strin return errors.Wrap(err, "invalid template") } printBlock = func(id ulid.ULID) error { - m, err := block.DownloadMeta(ctx, logger, bkt, id) + m, err := block.GetMeta(ctx, logger, bkt, id) if err != nil { return err } diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index 42f5a1a344c..5ba572da92c 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -6,6 +6,7 @@ import ( "path" "time" + "github.com/cenkalti/backoff" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level"
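The Gopkg.lock hunk above pins the new dependency, github.com/cenkalti/backoff v2.0.0, which every retry added in this patch builds on. A minimal sketch — illustration only, not code from this change — of how that library's NewExponentialBackOff, WithContext and Retry compose around a flaky operation (`op` and `attempt` are placeholder names):

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/cenkalti/backoff"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	attempt := 0
	// Operation that fails a couple of times before succeeding.
	op := func() error {
		attempt++
		if attempt < 3 {
			return fmt.Errorf("transient failure on attempt %d", attempt)
		}
		return nil
	}

	// Exponential backoff (500ms initial interval by default), abandoned once ctx is done.
	b := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
	if err := backoff.Retry(op, b); err != nil {
		fmt.Println("gave up:", err)
		return
	}
	fmt.Println("succeeded after", attempt, "attempts")
}
```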
"github.com/improbable-eng/thanos/pkg/compact" @@ -41,8 +42,8 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri retention5m := modelDuration(cmd.Flag("retention.resolution-5m", "How long to retain samples of resolution 1 (5 minutes) in bucket. 0d - disables this retention").Default("0d")) retention1h := modelDuration(cmd.Flag("retention.resolution-1h", "How long to retain samples of resolution 2 (1 hour) in bucket. 0d - disables this retention").Default("0d")) - wait := cmd.Flag("wait", "Do not exit after all compactions have been processed and wait for new work."). - Short('w').Bool() + repeat := cmd.Flag("repeat", "Do not exit after all compactions have been processed and repeat after N minutes for new work."). + Short('r').Default("5m").Duration() // TODO(bplotka): Remove this flag once https://github.com/improbable-eng/thanos/issues/297 is fixed. disableDownsampling := cmd.Flag("debug.disable-downsampling", "Disables downsampling. This is not recommended "+ @@ -56,7 +57,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri *bucketConfFile, time.Duration(*syncDelay), *haltOnError, - *wait, + *repeat, map[compact.ResolutionLevel]time.Duration{ compact.ResolutionLevelRaw: time.Duration(*retentionRaw), compact.ResolutionLevel5m: time.Duration(*retention5m), @@ -77,7 +78,7 @@ func runCompact( bucketConfFile string, syncDelay time.Duration, haltOnError bool, - wait bool, + repeat time.Duration, retentionByResolution map[compact.ResolutionLevel]time.Duration, component string, disableDownsampling bool, @@ -178,12 +179,12 @@ func runCompact( g.Add(func() error { defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client") - if !wait { + if repeat == 0 { return f() } - // --wait=true is specified. - return runutil.Repeat(5*time.Minute, ctx.Done(), func() error { + // --repeat is specified. 
+ return runutil.Repeat(backoff.NewConstantBackOff(repeat), ctx.Done(), func() error { err := f() if err == nil { return nil diff --git a/cmd/thanos/downsample.go b/cmd/thanos/downsample.go index a6e92a696f0..06356cf6b55 100644 --- a/cmd/thanos/downsample.go +++ b/cmd/thanos/downsample.go @@ -2,12 +2,11 @@ package main import ( "context" - "encoding/json" "os" - "path" "path/filepath" "time" + "github.com/improbable-eng/thanos/pkg/store" "github.com/prometheus/tsdb/chunkenc" "github.com/go-kit/kit/log" @@ -102,28 +101,8 @@ func downsampleBucket( if err := os.MkdirAll(dir, 0777); err != nil { return errors.Wrap(err, "create dir") } - var metas []*block.Meta - err := bkt.Iter(ctx, "", func(name string) error { - id, ok := block.IsBlockDir(name) - if !ok { - return nil - } - - rc, err := bkt.Get(ctx, path.Join(id.String(), block.MetaFilename)) - if err != nil { - return errors.Wrapf(err, "get meta for block %s", id) - } - defer runutil.CloseWithLogOnErr(logger, rc, "block reader") - - var m block.Meta - if err := json.NewDecoder(rc).Decode(&m); err != nil { - return errors.Wrap(err, "decode meta") - } - metas = append(metas, &m) - - return nil - }) + metas, err := store.GetMetas(ctx, logger, bkt) if err != nil { return errors.Wrap(err, "retrieve bucket block metas") } diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index a692b960e07..09bd6105d12 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -11,6 +11,7 @@ import ( "net/http" "time" + "github.com/cenkalti/backoff" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/grpc-ecosystem/go-grpc-middleware" @@ -35,6 +36,10 @@ import ( "gopkg.in/alecthomas/kingpin.v2" ) +const ( + DefaultStorageSyncInterval = 5 * time.Second +) + // registerQuery registers a query command. func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string) { cmd := app.Command(name, "query node exposing PromQL enabled Query API with data retrieved from multiple store nodes") @@ -255,7 +260,7 @@ func runQuery( { ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { - return runutil.Repeat(5*time.Second, ctx.Done(), func() error { + return runutil.Repeat(backoff.NewConstantBackOff(DefaultStorageSyncInterval), ctx.Done(), func() error { stores.Update(ctx) return nil }) diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 0319caea1b8..89be6ae4f96 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -19,6 +19,7 @@ import ( "syscall" "time" + "github.com/cenkalti/backoff" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/improbable-eng/thanos/pkg/alert" @@ -47,6 +48,11 @@ import ( "gopkg.in/alecthomas/kingpin.v2" ) +const ( + AlertManagerUpdateInterval = 30 * time.Second + RuleSyncInterval = 30 * time.Second +) + // registerRule registers a rule command. 
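The call sites above (compact, query, rule) now hand runutil.Repeat a backoff.BackOff instead of a bare time.Duration; with backoff.NewConstantBackOff the behaviour remains a fixed-interval loop. A small illustration, outside this patch, of what a ConstantBackOff yields — NextBackOff always returns the configured interval and never returns backoff.Stop, so a Repeat driven by it only ends when its stop channel closes:

```go
package main

import (
	"fmt"
	"time"

	"github.com/cenkalti/backoff"
)

func main() {
	b := backoff.NewConstantBackOff(5 * time.Second)

	// A constant backoff always proposes the same delay between iterations.
	for i := 0; i < 3; i++ {
		fmt.Println("next wait:", b.NextBackOff()) // always 5s
	}
}
```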
func registerRule(m map[string]setupFunc, app *kingpin.Application, name string) { cmd := app.Command(name, "ruler evaluating Prometheus rules against given Query nodes, exposing Store API and storing old blocks in bucket") @@ -292,7 +298,7 @@ func runRule( ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { - return runutil.Repeat(30*time.Second, ctx.Done(), func() error { + return runutil.Repeat(backoff.NewConstantBackOff(AlertManagerUpdateInterval), ctx.Done(), func() error { if err := alertmgrs.update(ctx); err != nil { level.Warn(logger).Log("msg", "refreshing Alertmanagers failed", "err", err) } @@ -445,7 +451,7 @@ func runRule( g.Add(func() error { defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client") - return runutil.Repeat(30*time.Second, ctx.Done(), func() error { + return runutil.Repeat(backoff.NewConstantBackOff(RuleSyncInterval), ctx.Done(), func() error { s.Sync(ctx) minTime, _, err := s.Timestamps() diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index c9d24f7cae4..c6251e98446 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -11,6 +11,7 @@ import ( "sync" "time" + "github.com/cenkalti/backoff" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/improbable-eng/thanos/pkg/block" @@ -31,6 +32,12 @@ import ( yaml "gopkg.in/yaml.v2" ) +const ( + PrometheusReloadInterval = 30 * time.Second + PrometheusHealthCheckInterval = 2 * time.Second + ShipperSyncInterval = 30 * time.Second +) + func registerSidecar(m map[string]setupFunc, app *kingpin.Application, name string) { cmd := app.Command(name, "sidecar for Prometheus server") @@ -127,7 +134,7 @@ func runSidecar( g.Add(func() error { // Blocking query of external labels before joining as a Source Peer into gossip. // We retry infinitely until we reach and fetch labels from our Prometheus. - err := runutil.Retry(2*time.Second, ctx.Done(), func() error { + err := runutil.Retry(backoff.NewConstantBackOff(PrometheusHealthCheckInterval), ctx.Done(), func() error { if err := metadata.UpdateLabels(ctx, logger); err != nil { level.Warn(logger).Log( "msg", "failed to fetch initial external labels. Is Prometheus running? Retrying", @@ -161,7 +168,7 @@ func runSidecar( // Periodically query the Prometheus config. We use this as a heartbeat as well as for updating // the external labels we apply. - return runutil.Repeat(30*time.Second, ctx.Done(), func() error { + return runutil.Repeat(backoff.NewConstantBackOff(PrometheusReloadInterval), ctx.Done(), func() error { iterCtx, iterCancel := context.WithTimeout(context.Background(), 5*time.Second) defer iterCancel() @@ -201,10 +208,10 @@ func runSidecar( } logger := log.With(logger, "component", "sidecar") - var client http.Client + var c http.Client promStore, err := store.NewPrometheusStore( - logger, &client, promURL, metadata.Labels, metadata.Timestamps) + logger, &c, promURL, metadata.Labels, metadata.Timestamps) if err != nil { return errors.Wrap(err, "create Prometheus store") } @@ -228,7 +235,7 @@ func runSidecar( var uploads = true // The background shipper continuously scans the data directory and uploads - // new blocks to Google Cloud Storage or an S3-compatible storage service. + // new blocks to object storage service. 
bkt, err := client.NewBucket(logger, bucketConfFile, reg, component) if err != nil && err != client.ErrNotFound { return err @@ -253,7 +260,7 @@ func runSidecar( g.Add(func() error { defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client") - return runutil.Repeat(30*time.Second, ctx.Done(), func() error { + return runutil.Repeat(backoff.NewConstantBackOff(ShipperSyncInterval), ctx.Done(), func() error { s.Sync(ctx) minTime, _, err := s.Timestamps() diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 3f6dd29f094..d4f9554f6a3 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -6,6 +6,7 @@ import ( "net" "time" + "github.com/cenkalti/backoff" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/improbable-eng/thanos/pkg/cluster" @@ -21,6 +22,10 @@ import ( "gopkg.in/alecthomas/kingpin.v2" ) +const ( + StoreSyncInterval = 3 * time.Minute +) + // registerStore registers a store command. func registerStore(m map[string]setupFunc, app *kingpin.Application, name string) { cmd := app.Command(name, "store node giving access to blocks in a bucket provider. Now supported GCS / S3.") @@ -120,7 +125,7 @@ func runStore( g.Add(func() error { defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client") - err := runutil.Repeat(3*time.Minute, ctx.Done(), func() error { + err := runutil.Repeat(backoff.NewConstantBackOff(StoreSyncInterval), ctx.Done(), func() error { if err := bs.SyncBlocks(ctx); err != nil { level.Warn(logger).Log("msg", "syncing blocks failed", "err", err) } diff --git a/docs/components/compact.md b/docs/components/compact.md index df738f747f2..eca249c30a0 100644 --- a/docs/components/compact.md +++ b/docs/components/compact.md @@ -60,7 +60,7 @@ Flags: --retention.resolution-1h=0d How long to retain samples of resolution 2 (1 hour) in bucket. 0d - disables this retention - -w, --wait Do not exit after all compactions have been processed - and wait for new work. + -r, --repeat=5m Do not exit after all compactions have been processed + and repeat after N minutes for new work. ``` diff --git a/pkg/block/block.go b/pkg/block/block.go index 118b7ea96c8..c47f222b44c 100644 --- a/pkg/block/block.go +++ b/pkg/block/block.go @@ -5,13 +5,13 @@ package block import ( "context" "encoding/json" + "fmt" "io/ioutil" "os" "path" "path/filepath" - "fmt" - + "github.com/cenkalti/backoff" "github.com/go-kit/kit/log" "github.com/improbable-eng/thanos/pkg/objstore" "github.com/improbable-eng/thanos/pkg/runutil" @@ -71,8 +71,8 @@ type ThanosDownsampleMeta struct { // WriteMetaFile writes the given meta into /meta.json. func WriteMetaFile(logger log.Logger, dir string, meta *Meta) error { // Make any changes to the file appear atomic. - path := filepath.Join(dir, MetaFilename) - tmp := path + ".tmp" + pathMetaFilename := filepath.Join(dir, MetaFilename) + tmp := pathMetaFilename + ".tmp" f, err := os.Create(tmp) if err != nil { @@ -89,7 +89,7 @@ func WriteMetaFile(logger log.Logger, dir string, meta *Meta) error { if err := f.Close(); err != nil { return err } - return renameFile(logger, tmp, path) + return renameFile(logger, tmp, pathMetaFilename) } // ReadMetaFile reads the given meta from /meta.json. @@ -209,23 +209,30 @@ func cleanUp(bkt objstore.Bucket, id ulid.ULID, err error) error { return err } -// Delete removes directory that is mean to be block directory. +// Delete removes directory that means to be block directory. // NOTE: Prefer this method instead of objstore.Delete to avoid deleting empty dir (whole bucket) by mistake. 
func Delete(ctx context.Context, bucket objstore.Bucket, id ulid.ULID) error { + // Delete directory. + del := func() error { + return objstore.DeleteDir(ctx, bucket, id.String()) + } + // Retry uploading operations on the network failures, see closer to default backoff configs. + if err := backoff.Retry(del, backoff.WithContext(backoff.NewExponentialBackOff(), ctx)); err != nil { + return errors.Wrapf(err, "delete block dir %s", id.String()) + } return objstore.DeleteDir(ctx, bucket, id.String()) } -// DownloadMeta downloads only meta file from bucket by block ID. -func DownloadMeta(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) (Meta, error) { - rc, err := bkt.Get(ctx, path.Join(id.String(), MetaFilename)) +// GetMeta downloads only meta file from bucket by block ID. +func GetMeta(ctx context.Context, logger log.Logger, bucket objstore.Bucket, id ulid.ULID) (Meta, error) { + var m Meta + data, err := objstore.GetFile(ctx, logger, bucket, path.Join(id.String(), MetaFilename)) if err != nil { - return Meta{}, errors.Wrapf(err, "meta.json bkt get for %s", id.String()) + return m, errors.Wrapf(err, "get meta for block %s", id) } - defer runutil.CloseWithLogOnErr(logger, rc, "download meta bucket client") - var m Meta - if err := json.NewDecoder(rc).Decode(&m); err != nil { - return Meta{}, errors.Wrapf(err, "decode meta.json for block %s", id.String()) + if err := json.Unmarshal(data, &m); err != nil { + return m, errors.Wrapf(err, "decode meta.json for block %s", id.String()) } return m, nil } diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index ba1ecb1aead..92869f49444 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -10,6 +10,7 @@ import ( "sync" "time" + "github.com/cenkalti/backoff" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/hashicorp/memberlist" @@ -45,6 +46,7 @@ type Peer struct { const ( DefaultRefreshInterval = model.Duration(60 * time.Second) + PeerSyncInterval = 2 * time.Second // Peer's network types. These are used as a predefined peer configurations for a specified network type. LocalNetworkPeerType = "local" @@ -462,7 +464,7 @@ func resolvePeers(ctx context.Context, peers []string, myAddress string, res net retryCtx, cancel := context.WithCancel(ctx) defer cancel() - err := runutil.Retry(2*time.Second, retryCtx.Done(), func() error { + err := runutil.Retry(backoff.NewConstantBackOff(PeerSyncInterval), retryCtx.Done(), func() error { if lookupErrSpotted { // We need to invoke cancel in next run of retry when lookupErrSpotted to preserve LookupIPAddr error. cancel() diff --git a/pkg/cluster/cluster_test.go b/pkg/cluster/cluster_test.go index 1f9b2e3d520..3fb3e49fdb3 100644 --- a/pkg/cluster/cluster_test.go +++ b/pkg/cluster/cluster_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "github.com/cenkalti/backoff" "github.com/fortytw2/leaktest" "github.com/go-kit/kit/log" "github.com/hashicorp/go-sockaddr" @@ -91,7 +92,7 @@ func TestPeers_PropagatingState(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - testutil.Ok(t, runutil.Retry(1*time.Second, ctx.Done(), func() error { + testutil.Ok(t, runutil.Retry(backoff.NewConstantBackOff(1*time.Second), ctx.Done(), func() error { if len(peer1.data.Data()) > 1 { return nil } @@ -126,7 +127,7 @@ func TestPeers_PropagatingState(t *testing.T) { // Check if peer2 got the updated meta about peer1. 
ctx2, cancel2 := context.WithTimeout(context.Background(), 10*time.Second) defer cancel2() - testutil.Ok(t, runutil.Retry(1*time.Second, ctx2.Done(), func() error { + testutil.Ok(t, runutil.Retry(backoff.NewConstantBackOff(1*time.Second), ctx2.Done(), func() error { for _, st := range peer2.PeerStates(PeerTypeSource) { if st.StoreAPIAddr != "sidecar-address:1" { continue diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 09810d59435..15c8525a0f2 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -173,7 +173,7 @@ func (c *Syncer) syncMetas(ctx context.Context) error { level.Debug(c.logger).Log("msg", "download meta", "block", id) - meta, err := block.DownloadMeta(ctx, c.logger, c.bkt, id) + meta, err := block.GetMeta(ctx, c.logger, c.bkt, id) if err != nil { return errors.Wrapf(err, "downloading meta.json for %s", id) } diff --git a/pkg/compact/retention.go b/pkg/compact/retention.go index a4348570bbb..c2b1fb5a86a 100644 --- a/pkg/compact/retention.go +++ b/pkg/compact/retention.go @@ -20,7 +20,7 @@ func ApplyRetentionPolicyByResolution(ctx context.Context, logger log.Logger, bk if !ok { return nil } - m, err := block.DownloadMeta(ctx, logger, bkt, id) + m, err := block.GetMeta(ctx, logger, bkt, id) if err != nil { return errors.Wrap(err, "download metadata") } diff --git a/pkg/objstore/metric_bucket.go b/pkg/objstore/metric_bucket.go new file mode 100644 index 00000000000..c1df4e3c7ef --- /dev/null +++ b/pkg/objstore/metric_bucket.go @@ -0,0 +1,203 @@ +package objstore + +import ( + "context" + "io" + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +// BucketWithMetrics takes a bucket and registers metrics with the given registry for +// operations run against the bucket. +func BucketWithMetrics(name string, b Bucket, r prometheus.Registerer) Bucket { + bkt := &metricBucket{ + bkt: b, + + ops: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_objstore_bucket_operations_total", + Help: "Total number of operations against a bucket.", + ConstLabels: prometheus.Labels{"bucket": name}, + }, []string{"operation"}), + + opsFailures: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_objstore_bucket_operation_failures_total", + Help: "Total number of operations against a bucket that failed.", + ConstLabels: prometheus.Labels{"bucket": name}, + }, []string{"operation"}), + + opsDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: "thanos_objstore_bucket_operation_duration_seconds", + Help: "Duration of operations against the bucket", + ConstLabels: prometheus.Labels{"bucket": name}, + Buckets: []float64{0.005, 0.01, 0.02, 0.04, 0.08, 0.15, 0.3, 0.6, 1, 1.5, 2.5, 5, 10, 20, 30}, + }, []string{"operation"}), + lastSuccessfullUploadTime: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "thanos_objstore_bucket_last_successful_upload_time", + Help: "Second timestamp of the last successful upload to the bucket.", + ConstLabels: prometheus.Labels{"bucket": name}}), + } + if r != nil { + r.MustRegister(bkt.ops, bkt.opsFailures, bkt.opsDuration, bkt.lastSuccessfullUploadTime) + } + return bkt +} + +type metricBucket struct { + bkt Bucket + + ops *prometheus.CounterVec + opsFailures *prometheus.CounterVec + opsDuration *prometheus.HistogramVec + lastSuccessfullUploadTime prometheus.Gauge +} + +func (b *metricBucket) Iter(ctx context.Context, dir string, f func(name string) error) error { + const op = "iter" + + err := b.bkt.Iter(ctx, dir, f) + if err != nil { + b.opsFailures.WithLabelValues(op).Inc() + } + 
b.ops.WithLabelValues(op).Inc() + + return err +} + +func (b *metricBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { + const op = "get" + b.ops.WithLabelValues(op).Inc() + + rc, err := b.bkt.Get(ctx, name) + if err != nil { + b.opsFailures.WithLabelValues(op).Inc() + return nil, err + } + rc = newTimingReadCloser( + rc, + op, + b.opsDuration, + b.opsFailures, + ) + + return rc, nil +} + +func (b *metricBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { + const op = "get_range" + b.ops.WithLabelValues(op).Inc() + + rc, err := b.bkt.GetRange(ctx, name, off, length) + if err != nil { + b.opsFailures.WithLabelValues(op).Inc() + return nil, err + } + rc = newTimingReadCloser( + rc, + op, + b.opsDuration, + b.opsFailures, + ) + + return rc, nil +} + +func (b *metricBucket) Exists(ctx context.Context, name string) (bool, error) { + const op = "exists" + start := time.Now() + + ok, err := b.bkt.Exists(ctx, name) + if err != nil { + b.opsFailures.WithLabelValues(op).Inc() + } + b.ops.WithLabelValues(op).Inc() + b.opsDuration.WithLabelValues(op).Observe(time.Since(start).Seconds()) + + return ok, err +} + +func (b *metricBucket) Upload(ctx context.Context, name string, r io.Reader) error { + const op = "upload" + start := time.Now() + + err := b.bkt.Upload(ctx, name, r) + if err != nil { + b.opsFailures.WithLabelValues(op).Inc() + } else { + //TODO: Use SetToCurrentTime() once we update the Prometheus client_golang + b.lastSuccessfullUploadTime.Set(float64(time.Now().UnixNano()) / 1e9) + } + b.ops.WithLabelValues(op).Inc() + b.opsDuration.WithLabelValues(op).Observe(time.Since(start).Seconds()) + + return err +} + +func (b *metricBucket) Delete(ctx context.Context, name string) error { + const op = "delete" + start := time.Now() + + err := b.bkt.Delete(ctx, name) + if err != nil { + b.opsFailures.WithLabelValues(op).Inc() + } + b.ops.WithLabelValues(op).Inc() + b.opsDuration.WithLabelValues(op).Observe(time.Since(start).Seconds()) + + return err +} + +func (b *metricBucket) IsObjNotFoundErr(err error) bool { + return b.bkt.IsObjNotFoundErr(err) +} + +func (b *metricBucket) Close() error { + return b.bkt.Close() +} + +func (b *metricBucket) Name() string { + return b.bkt.Name() +} + +type timingReadCloser struct { + io.ReadCloser + + ok bool + start time.Time + op string + duration *prometheus.HistogramVec + failed *prometheus.CounterVec +} + +func newTimingReadCloser(rc io.ReadCloser, op string, dur *prometheus.HistogramVec, failed *prometheus.CounterVec) *timingReadCloser { + // Initialize the metrics with 0. 
+ dur.WithLabelValues(op) + failed.WithLabelValues(op) + return &timingReadCloser{ + ReadCloser: rc, + ok: true, + start: time.Now(), + op: op, + duration: dur, + failed: failed, + } +} + +func (rc *timingReadCloser) Close() error { + err := rc.ReadCloser.Close() + rc.duration.WithLabelValues(rc.op).Observe(time.Since(rc.start).Seconds()) + if rc.ok && err != nil { + rc.failed.WithLabelValues(rc.op).Inc() + rc.ok = false + } + return err +} + +func (rc *timingReadCloser) Read(b []byte) (n int, err error) { + n, err = rc.ReadCloser.Read(b) + if rc.ok && err != nil && err != io.EOF { + rc.failed.WithLabelValues(rc.op).Inc() + rc.ok = false + } + return n, err +} diff --git a/pkg/objstore/objstore.go b/pkg/objstore/objstore.go index 81ae6fbb8fb..51578235137 100644 --- a/pkg/objstore/objstore.go +++ b/pkg/objstore/objstore.go @@ -1,18 +1,20 @@ package objstore import ( + "bytes" "context" "io" + "net" "os" "path/filepath" "strings" "time" + "github.com/cenkalti/backoff" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/improbable-eng/thanos/pkg/runutil" "github.com/pkg/errors" - "github.com/prometheus/client_golang/prometheus" ) // Bucket provides read and write access to an object storage bucket. @@ -82,7 +84,19 @@ func UploadFile(ctx context.Context, logger log.Logger, bkt Bucket, src, dst str } defer runutil.CloseWithLogOnErr(logger, r, "close file %s", src) - if err := bkt.Upload(ctx, dst, r); err != nil { + errorNotifier := func(err error, d time.Duration) { + level.Warn(logger).Log("msg", "unsuccessful attempt uploading", "src", src, "dst", dst, "err", err, "next", d.String()) + } + + do := func() error { + if err := bkt.Upload(ctx, dst, r); err != nil { + return handleErrors(errors.Wrapf(err, "upload file %s", dst)) + } + return nil + } + + // TODO: should we clean an uploaded file before retry? + if err := backoff.RetryNotify(do, getCustomBackoff(ctx), errorNotifier); err != nil { return errors.Wrapf(err, "upload file %s as %s", src, dst) } return nil @@ -92,16 +106,61 @@ func UploadFile(ctx context.Context, logger log.Logger, bkt Bucket, src, dst str const DirDelim = "/" // DeleteDir removes all objects prefixed with dir from the bucket. -func DeleteDir(ctx context.Context, bkt Bucket, dir string) error { - return bkt.Iter(ctx, dir, func(name string) error { +func DeleteDir(ctx context.Context, bucket Bucket, dir string) error { + // Delete directory. + do := func() error { + if err := deleteDir(ctx, bucket, dir); err != nil { + return handleErrors(errors.Wrapf(err, "delete block dir %s", dir)) + } + return nil + } + + if err := backoff.Retry(do, getCustomBackoff(ctx)); err != nil { + return errors.Wrapf(err, "delete block dir %s", dir) + } + return nil +} + +// deleteDir is an internal function to delete directory recursively without retries. +func deleteDir(ctx context.Context, bucket Bucket, dir string) error { + return bucket.Iter(ctx, dir, func(name string) error { // If we hit a directory, call DeleteDir recursively. if strings.HasSuffix(name, DirDelim) { - return DeleteDir(ctx, bkt, name) + return deleteDir(ctx, bucket, name) } - return bkt.Delete(ctx, name) + return bucket.Delete(ctx, name) }) } +// GetFile downloads file from bucket and returns bytes. 
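The new pkg/objstore/metric_bucket.go above is a move of the instrumentation wrapper out of objstore.go (the matching deletion appears further down); the BucketWithMetrics(name, bucket, registerer) entry point is unchanged. A sketch of how a component might wire it up — the bucket label "example" and the helper name `instrument` are illustrative, not from this patch:

```go
package example

import (
	"github.com/improbable-eng/thanos/pkg/objstore"
	"github.com/prometheus/client_golang/prometheus"
)

// instrument wraps any objstore.Bucket with the metrics defined in
// pkg/objstore/metric_bucket.go; the "example" bucket label is arbitrary.
func instrument(bkt objstore.Bucket, reg *prometheus.Registry) objstore.Bucket {
	// Registers thanos_objstore_bucket_operations_total,
	// thanos_objstore_bucket_operation_failures_total,
	// thanos_objstore_bucket_operation_duration_seconds and
	// thanos_objstore_bucket_last_successful_upload_time on reg.
	return objstore.BucketWithMetrics("example", bkt, reg)
}
```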
+func GetFile(ctx context.Context, logger log.Logger, bucket BucketReader, src string) ([]byte, error) { + buf := bytes.NewBuffer(nil) + + errorNotifier := func(err error, d time.Duration) { + level.Warn(logger).Log("msg", "unsuccessful attempt to download", "err", err, "next", d.String()) + } + + do := func() error { + rc, err := bucket.Get(ctx, src) + if err != nil { + return handleErrors(errors.Wrapf(err, "download file %s from bucket", src)) + } + defer runutil.CloseWithLogOnErr(logger, rc, "download file") + + if _, err := io.Copy(buf, rc); err != nil { + return handleErrors(errors.Wrap(err, "download file")) + } + + return nil + } + + if err := backoff.RetryNotify(do, getCustomBackoff(ctx), errorNotifier); err != nil { + return nil, errors.Wrapf(err, "download file %s", src) + } + + return buf.Bytes(), nil +} + // DownloadFile downloads the src file from the bucket to dst. If dst is an existing // directory, a file with the same name as the source is created in dst. // If destination file is already existing, download file will overwrite it. @@ -114,12 +173,6 @@ func DownloadFile(ctx context.Context, logger log.Logger, bkt BucketReader, src, return err } - rc, err := bkt.Get(ctx, src) - if err != nil { - return errors.Wrap(err, "get file") - } - defer runutil.CloseWithLogOnErr(logger, rc, "download block's file reader") - f, err := os.Create(dst) if err != nil { return errors.Wrap(err, "create file") @@ -133,232 +186,104 @@ func DownloadFile(ctx context.Context, logger log.Logger, bkt BucketReader, src, } } }() - if _, err = io.Copy(f, rc); err != nil { - return errors.Wrap(err, "copy object to file") - } - return nil -} -// DownloadDir downloads all object found in the directory into the local directory. -func DownloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, src, dst string) error { - if err := os.MkdirAll(dst, 0777); err != nil { - return errors.Wrap(err, "create dir") + errorNotifier := func(err error, d time.Duration) { + level.Warn(logger).Log("msg", "unsuccessful attempt to download", "err", err, "next", d.String()) } - var downloadedFiles []string - if err := bkt.Iter(ctx, src, func(name string) error { - if strings.HasSuffix(name, DirDelim) { - return DownloadDir(ctx, logger, bkt, name, filepath.Join(dst, filepath.Base(name))) - } - if err := DownloadFile(ctx, logger, bkt, name, dst); err != nil { - return err + do := func() error { + rc, err := bkt.Get(ctx, src) + if err != nil { + return handleErrors(errors.Wrap(err, "get file")) } + defer runutil.CloseWithLogOnErr(logger, rc, "download block's file reader") - downloadedFiles = append(downloadedFiles, dst) - return nil - }); err != nil { - // Best-effort cleanup if the download failed. - for _, f := range downloadedFiles { - if rerr := os.Remove(f); rerr != nil { - level.Warn(logger).Log("msg", "failed to remove file on partial dir download error", "file", f, "err", rerr) - } + if _, err = io.Copy(f, rc); err != nil { + return handleErrors(errors.Wrap(err, "copy object to file")) } - return err - } - - return nil -} - -// BucketWithMetrics takes a bucket and registers metrics with the given registry for -// operations run against the bucket. 
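GetFile and DownloadFile above follow the same shape as UploadFile: the operation is wrapped in backoff.RetryNotify so each failed attempt is reported together with the delay before the next one. A standalone sketch of that pattern with a stand-in download operation (not taken from the patch):

```go
package main

import (
	"fmt"
	"time"

	"github.com/cenkalti/backoff"
)

func main() {
	attempt := 0
	download := func() error {
		attempt++
		if attempt < 3 {
			return fmt.Errorf("connection reset (attempt %d)", attempt)
		}
		return nil
	}

	// Called before each sleep with the error and the upcoming delay,
	// mirroring the errorNotifier closures used in the helpers above.
	notify := func(err error, next time.Duration) {
		fmt.Printf("unsuccessful attempt: %v, retrying in %s\n", err, next)
	}

	if err := backoff.RetryNotify(download, backoff.NewExponentialBackOff(), notify); err != nil {
		fmt.Println("giving up:", err)
	}
}
```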
-func BucketWithMetrics(name string, b Bucket, r prometheus.Registerer) Bucket { - bkt := &metricBucket{ - bkt: b, - - ops: prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: "thanos_objstore_bucket_operations_total", - Help: "Total number of operations against a bucket.", - ConstLabels: prometheus.Labels{"bucket": name}, - }, []string{"operation"}), - - opsFailures: prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: "thanos_objstore_bucket_operation_failures_total", - Help: "Total number of operations against a bucket that failed.", - ConstLabels: prometheus.Labels{"bucket": name}, - }, []string{"operation"}), - - opsDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Name: "thanos_objstore_bucket_operation_duration_seconds", - Help: "Duration of operations against the bucket", - ConstLabels: prometheus.Labels{"bucket": name}, - Buckets: []float64{0.005, 0.01, 0.02, 0.04, 0.08, 0.15, 0.3, 0.6, 1, 1.5, 2.5, 5, 10, 20, 30}, - }, []string{"operation"}), - lastSuccessfullUploadTime: prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "thanos_objstore_bucket_last_successful_upload_time", - Help: "Second timestamp of the last successful upload to the bucket.", - ConstLabels: prometheus.Labels{"bucket": name}}), - } - if r != nil { - r.MustRegister(bkt.ops, bkt.opsFailures, bkt.opsDuration, bkt.lastSuccessfullUploadTime) + return nil } - return bkt -} - -type metricBucket struct { - bkt Bucket - - ops *prometheus.CounterVec - opsFailures *prometheus.CounterVec - opsDuration *prometheus.HistogramVec - lastSuccessfullUploadTime prometheus.Gauge -} - -func (b *metricBucket) Iter(ctx context.Context, dir string, f func(name string) error) error { - const op = "iter" - - err := b.bkt.Iter(ctx, dir, f) + err = backoff.RetryNotify(do, getCustomBackoff(ctx), errorNotifier) if err != nil { - b.opsFailures.WithLabelValues(op).Inc() + return errors.Wrapf(err, "failed to download file %s", src) } - b.ops.WithLabelValues(op).Inc() - - return err + return nil } -func (b *metricBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { - const op = "get" - b.ops.WithLabelValues(op).Inc() - - rc, err := b.bkt.Get(ctx, name) - if err != nil { - b.opsFailures.WithLabelValues(op).Inc() - return nil, err - } - rc = newTimingReadCloser( - rc, - op, - b.opsDuration, - b.opsFailures, +// DownloadDir downloads all object found in the directory into the local directory. +func DownloadDir(ctx context.Context, logger log.Logger, bucket BucketReader, src, dst string) error { + var ( + err error + downloadedFiles []string ) - - return rc, nil -} - -func (b *metricBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { - const op = "get_range" - b.ops.WithLabelValues(op).Inc() - - rc, err := b.bkt.GetRange(ctx, name, off, length) + err = os.MkdirAll(dst, 0777) if err != nil { - b.opsFailures.WithLabelValues(op).Inc() - return nil, err + return errors.Wrap(err, "create dir") } - rc = newTimingReadCloser( - rc, - op, - b.opsDuration, - b.opsFailures, - ) - return rc, nil -} + defer func() { + if err != nil { + // Best-effort cleanup if the download failed. + for _, f := range downloadedFiles { + if rErr := os.Remove(f); rErr != nil { + level.Warn(logger).Log("msg", "failed to remove file on partial dir download error", "file", f, "err", rErr) + } + } + } + }() -func (b *metricBucket) Exists(ctx context.Context, name string) (bool, error) { - const op = "exists" - start := time.Now() + // TODO(xjewer): handle iterations, what's the best effort? 
+ err = bucket.Iter(ctx, src, func(name string) error { + if strings.HasSuffix(name, DirDelim) { + return DownloadDir(ctx, logger, bucket, name, filepath.Join(dst, filepath.Base(name))) + } + if err := DownloadFile(ctx, logger, bucket, name, dst); err != nil { + return err + } - ok, err := b.bkt.Exists(ctx, name) - if err != nil { - b.opsFailures.WithLabelValues(op).Inc() - } - b.ops.WithLabelValues(op).Inc() - b.opsDuration.WithLabelValues(op).Observe(time.Since(start).Seconds()) + downloadedFiles = append(downloadedFiles, dst) + return nil + }) - return ok, err + return nil } -func (b *metricBucket) Upload(ctx context.Context, name string, r io.Reader) error { - const op = "upload" - start := time.Now() +// Exists checks if file exists with backoff retries. +func Exists(ctx context.Context, logger log.Logger, bucket BucketReader, src string) (bool, error) { + var ok bool - err := b.bkt.Upload(ctx, name, r) - if err != nil { - b.opsFailures.WithLabelValues(op).Inc() - } else { - //TODO: Use SetToCurrentTime() once we update the Prometheus client_golang - b.lastSuccessfullUploadTime.Set(float64(time.Now().UnixNano()) / 1e9) + errorNotifier := func(err error, d time.Duration) { + level.Warn(logger).Log("msg", "unsuccessful attempt getting the file/dir", "src", src, "err", err, "next", d.String()) } - b.ops.WithLabelValues(op).Inc() - b.opsDuration.WithLabelValues(op).Observe(time.Since(start).Seconds()) - - return err -} -func (b *metricBucket) Delete(ctx context.Context, name string) error { - const op = "delete" - start := time.Now() + do := func() error { + var err error + ok, err = bucket.Exists(ctx, src) + if err != nil { + return handleErrors(errors.Wrap(err, "check exists")) + } + return nil + } - err := b.bkt.Delete(ctx, name) + err := backoff.RetryNotify(do, getCustomBackoff(ctx), errorNotifier) if err != nil { - b.opsFailures.WithLabelValues(op).Inc() + return ok, errors.Wrapf(err, "failed to check file %s", src) } - b.ops.WithLabelValues(op).Inc() - b.opsDuration.WithLabelValues(op).Observe(time.Since(start).Seconds()) - return err + return ok, nil } -func (b *metricBucket) IsObjNotFoundErr(err error) bool { - return b.bkt.IsObjNotFoundErr(err) -} - -func (b *metricBucket) Close() error { - return b.bkt.Close() -} - -func (b *metricBucket) Name() string { - return b.bkt.Name() -} - -type timingReadCloser struct { - io.ReadCloser - - ok bool - start time.Time - op string - duration *prometheus.HistogramVec - failed *prometheus.CounterVec -} - -func newTimingReadCloser(rc io.ReadCloser, op string, dur *prometheus.HistogramVec, failed *prometheus.CounterVec) *timingReadCloser { - // Initialize the metrics with 0. - dur.WithLabelValues(op) - failed.WithLabelValues(op) - return &timingReadCloser{ - ReadCloser: rc, - ok: true, - start: time.Now(), - op: op, - duration: dur, - failed: failed, - } -} - -func (rc *timingReadCloser) Close() error { - err := rc.ReadCloser.Close() - rc.duration.WithLabelValues(rc.op).Observe(time.Since(rc.start).Seconds()) - if rc.ok && err != nil { - rc.failed.WithLabelValues(rc.op).Inc() - rc.ok = false +// handleErrors handles net error and wraps in permanent otherwise. 
+func handleErrors(err error) error { + cErr := errors.Cause(err) + if _, ok := cErr.(net.Error); !ok { + return backoff.Permanent(errors.Wrap(err, "permanent error")) } return err } -func (rc *timingReadCloser) Read(b []byte) (n int, err error) { - n, err = rc.ReadCloser.Read(b) - if rc.ok && err != nil && err != io.EOF { - rc.failed.WithLabelValues(rc.op).Inc() - rc.ok = false - } - return n, err +// getCustomBackoff returns custom backoff to be used to retry operations. +func getCustomBackoff(ctx context.Context) backoff.BackOff { + // TODO(xjewer): customize backoff with the data from context + return backoff.WithContext(backoff.NewExponentialBackOff(), ctx) } diff --git a/pkg/reloader/reloader.go b/pkg/reloader/reloader.go index 34e266e6f38..bf3d5eac28a 100644 --- a/pkg/reloader/reloader.go +++ b/pkg/reloader/reloader.go @@ -18,6 +18,7 @@ import ( "strings" "time" + "github.com/cenkalti/backoff" "github.com/fsnotify/fsnotify" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" @@ -179,7 +180,7 @@ func (r *Reloader) apply(ctx context.Context) error { // Retry trigger reload until it succeeded or next tick is near. retryCtx, cancel := context.WithTimeout(ctx, r.ruleInterval) - err := runutil.RetryWithLog(r.logger, r.retryInterval, retryCtx.Done(), func() error { + err := runutil.RetryWithLog(r.logger, backoff.NewConstantBackOff(r.retryInterval), retryCtx.Done(), func() error { if err := r.triggerReload(ctx); err != nil { return errors.Wrap(err, "trigger reload") } diff --git a/pkg/runutil/runutil.go b/pkg/runutil/runutil.go index e10732cf3ff..92629c314a9 100644 --- a/pkg/runutil/runutil.go +++ b/pkg/runutil/runutil.go @@ -1,23 +1,21 @@ package runutil import ( - "os" - "time" - - "io" - "fmt" + "io" + "os" + "github.com/cenkalti/backoff" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/pkg/errors" ) // Repeat executes f every interval seconds until stopc is closed. -// It executes f once right after being called. -func Repeat(interval time.Duration, stopc <-chan struct{}, f func() error) error { - tick := time.NewTicker(interval) - defer tick.Stop() +// It executes f right after being called. +func Repeat(b backoff.BackOff, stopc <-chan struct{}, f func() error) error { + ticker := backoff.NewTicker(b) + defer ticker.Stop() for { if err := f(); err != nil { @@ -26,20 +24,20 @@ func Repeat(interval time.Duration, stopc <-chan struct{}, f func() error) error select { case <-stopc: return nil - case <-tick.C: + case <-ticker.C: } } } -// Retry executes f every interval seconds until timeout or no error is returned from f. -func Retry(interval time.Duration, stopc <-chan struct{}, f func() error) error { - return RetryWithLog(log.NewNopLogger(), interval, stopc, f) +// Retry executes f regarding backoff intervals until timeout or no error is returned from f. +func Retry(b backoff.BackOff, stopc <-chan struct{}, f func() error) error { + return RetryWithLog(log.NewNopLogger(), b, stopc, f) } // RetryWithLog executes f every interval seconds until timeout or no error is returned from f. It logs an error on each f error. 
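handleErrors above is the piece that decides what is worth retrying: any error whose cause is not a net.Error is wrapped with backoff.Permanent, which makes backoff.Retry return immediately instead of backing off. A self-contained illustration of that behaviour; `classify` is a simplified stand-in for handleErrors:

```go
package main

import (
	"errors"
	"fmt"
	"net"

	"github.com/cenkalti/backoff"
)

// classify mirrors handleErrors: network errors stay retryable,
// everything else is marked permanent so backoff.Retry stops at once.
func classify(err error) error {
	if _, ok := err.(net.Error); ok {
		return err // retryable
	}
	return backoff.Permanent(err)
}

func main() {
	op := func() error {
		// A non-network failure, e.g. a malformed object name.
		return classify(errors.New("object does not exist"))
	}

	// Retry returns after the first attempt because the error is permanent.
	err := backoff.Retry(op, backoff.NewExponentialBackOff())
	fmt.Println("stopped with:", err)
}
```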
-func RetryWithLog(logger log.Logger, interval time.Duration, stopc <-chan struct{}, f func() error) error { - tick := time.NewTicker(interval) - defer tick.Stop() +func RetryWithLog(logger log.Logger, b backoff.BackOff, stopc <-chan struct{}, f func() error) error { + ticker := backoff.NewTicker(b) + defer ticker.Stop() var err error for { @@ -50,7 +48,7 @@ func RetryWithLog(logger log.Logger, interval time.Duration, stopc <-chan struct select { case <-stopc: return err - case <-tick.C: + case <-ticker.C: } } } diff --git a/pkg/shipper/shipper.go b/pkg/shipper/shipper.go index 5c1df9a9b7e..2c8bc359101 100644 --- a/pkg/shipper/shipper.go +++ b/pkg/shipper/shipper.go @@ -190,7 +190,7 @@ func (s *Shipper) sync(ctx context.Context, meta *block.Meta) (err error) { } // Check against bucket if the meta file for this block exists. - ok, err := s.bucket.Exists(ctx, path.Join(meta.ULID.String(), block.MetaFilename)) + ok, err := objstore.Exists(ctx, s.logger, s.bucket, path.Join(meta.ULID.String(), block.MetaFilename)) if err != nil { return errors.Wrap(err, "check exists") } diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index b8516b0e4e9..ae97611ea1b 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1627,3 +1627,25 @@ func (s queryStats) merge(o *queryStats) *queryStats { return &s } + +// GetMetas gets all bucket metas +func GetMetas(ctx context.Context, logger log.Logger, bkt objstore.Bucket) ([]*block.Meta, error) { + var metas []*block.Meta + + err := bkt.Iter(ctx, "", func(name string) error { + id, ok := block.IsBlockDir(name) + if !ok { + return nil + } + + m, err := block.GetMeta(ctx, logger, bkt, id) + if err != nil { + return errors.Wrap(err, "decode meta") + } + + metas = append(metas, &m) + return nil + }) + + return metas, err +} diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index 53ecc3f3477..74c9f0907a9 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/cenkalti/backoff" "github.com/go-kit/kit/log" "github.com/improbable-eng/thanos/pkg/block" "github.com/improbable-eng/thanos/pkg/objstore" @@ -82,7 +83,7 @@ func TestBucketStore_e2e(t *testing.T) { testutil.Ok(t, err) go func() { - if err := runutil.Repeat(100*time.Millisecond, ctx.Done(), func() error { + if err := runutil.Repeat(backoff.NewConstantBackOff(100*time.Millisecond), ctx.Done(), func() error { return store.SyncBlocks(ctx) }); err != nil && errors.Cause(err) != context.Canceled { t.Error(err) @@ -92,7 +93,7 @@ func TestBucketStore_e2e(t *testing.T) { ctx, _ = context.WithTimeout(ctx, 30*time.Second) - err = runutil.Retry(100*time.Millisecond, ctx.Done(), func() error { + err = runutil.Retry(backoff.NewConstantBackOff(100*time.Millisecond), ctx.Done(), func() error { if store.numBlocks() < 6 { return errors.New("not all blocks loaded") } diff --git a/pkg/verifier/index_issue.go b/pkg/verifier/index_issue.go index 54a20703d45..83a699b4740 100644 --- a/pkg/verifier/index_issue.go +++ b/pkg/verifier/index_issue.go @@ -50,7 +50,7 @@ func IndexIssue(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bac return errors.Wrapf(err, "download index file %s", path.Join(id.String(), block.IndexFilename)) } - meta, err := block.DownloadMeta(ctx, logger, bkt, id) + meta, err := block.GetMeta(ctx, logger, bkt, id) if err != nil { return errors.Wrapf(err, "download meta file %s", id) } diff --git a/pkg/verifier/overlapped_blocks.go b/pkg/verifier/overlapped_blocks.go index 675442ace74..6d324bff40c 
100644 --- a/pkg/verifier/overlapped_blocks.go +++ b/pkg/verifier/overlapped_blocks.go @@ -52,7 +52,7 @@ func fetchOverlaps(ctx context.Context, logger log.Logger, bkt objstore.Bucket) return nil } - m, err := block.DownloadMeta(ctx, logger, bkt, id) + m, err := block.GetMeta(ctx, logger, bkt, id) if err != nil { return err } diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index c43e1b74ac5..ffbca0002e4 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -11,6 +11,7 @@ import ( "testing" "time" + "github.com/cenkalti/backoff" "github.com/improbable-eng/thanos/pkg/runutil" "github.com/improbable-eng/thanos/pkg/testutil" "github.com/pkg/errors" @@ -90,7 +91,7 @@ scrape_configs: ) // Try query without deduplication. - err = runutil.Retry(time.Second, ctx.Done(), func() error { + err = runutil.Retry(backoff.NewConstantBackOff(time.Second), ctx.Done(), func() error { select { case criticalErr = <-exit: t.Errorf("Some process exited unexpectedly: %v", err) @@ -135,7 +136,7 @@ scrape_configs: }, res[2].Metric) // Try query with deduplication. - err = runutil.Retry(time.Second, ctx.Done(), func() error { + err = runutil.Retry(backoff.NewConstantBackOff(time.Second), ctx.Done(), func() error { select { case criticalErr = <-exit: t.Errorf("Some process exited unexpectedly: %v", err) diff --git a/test/e2e/rule_test.go b/test/e2e/rule_test.go index c4e3fb8ca47..3cbfc5b72b0 100644 --- a/test/e2e/rule_test.go +++ b/test/e2e/rule_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "github.com/cenkalti/backoff" "github.com/improbable-eng/thanos/pkg/runutil" "github.com/improbable-eng/thanos/pkg/testutil" "github.com/pkg/errors" @@ -86,7 +87,7 @@ groups: "replica": "2", }, } - err = runutil.Retry(5*time.Second, ctx.Done(), func() error { + err = runutil.Retry(backoff.NewConstantBackOff(5*time.Second), ctx.Done(), func() error { select { case err := <-exit: t.Errorf("Some process exited unexpectedly: %v", err)
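Because runutil.Retry and runutil.Repeat now take any backoff.BackOff rather than a fixed time.Duration, callers are no longer limited to the constant intervals used throughout this patch. A hypothetical caller — not part of this change — could pass a bounded exponential policy instead:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/cenkalti/backoff"
	"github.com/improbable-eng/thanos/pkg/runutil"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	// Exponential policy: waits capped at 10s, attempts abandoned by the
	// policy after 30s, instead of a constant interval.
	b := backoff.NewExponentialBackOff()
	b.MaxInterval = 10 * time.Second
	b.MaxElapsedTime = 30 * time.Second

	blocksLoaded := 0
	err := runutil.Retry(b, ctx.Done(), func() error {
		blocksLoaded++ // stand-in for real work such as syncing blocks
		if blocksLoaded < 3 {
			return errors.New("not all blocks loaded")
		}
		return nil
	})
	fmt.Println("retry finished:", err)
}
```

Accepting the interface rather than a duration is what lets the same helpers back both the constant-interval loops in the commands and the exponential, context-bounded retries in pkg/objstore.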