Skip to content

Commit

Permalink
Add retry to upload/download bucket operations
Browse files Browse the repository at this point in the history
Add backoff retry for a single object storage request, except Range and Iter.
The error handler distinguishes net/http errors from other errors, and retries the request to the object storage for the former.

Fixes thanos-io#318
  • Loading branch information
xjewer committed Oct 10, 2018
1 parent f0fbe0b commit 21db8dc
Show file tree
Hide file tree
Showing 20 changed files with 440 additions and 272 deletions.
9 changes: 9 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions cmd/thanos/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name strin
}
case "wide":
printBlock = func(id ulid.ULID) error {
m, err := block.DownloadMeta(ctx, logger, bkt, id)
m, err := block.GetMeta(ctx, logger, bkt, id)
if err != nil {
return err
}
Expand All @@ -180,7 +180,7 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name strin
enc.SetIndent("", "\t")

printBlock = func(id ulid.ULID) error {
m, err := block.DownloadMeta(ctx, logger, bkt, id)
m, err := block.GetMeta(ctx, logger, bkt, id)
if err != nil {
return err
}
Expand All @@ -192,7 +192,7 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name strin
return errors.Wrap(err, "invalid template")
}
printBlock = func(id ulid.ULID) error {
m, err := block.DownloadMeta(ctx, logger, bkt, id)
m, err := block.GetMeta(ctx, logger, bkt, id)
if err != nil {
return err
}
Expand Down
7 changes: 6 additions & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ import (
"gopkg.in/alecthomas/kingpin.v2"
)

const (
	// DefaultRepeatInterval is the interval to wait before the next data sync
	// when the compactor runs with --wait.
	DefaultRepeatInterval = 5 * time.Minute
)

func registerCompact(m map[string]setupFunc, app *kingpin.Application, name string) {
cmd := app.Command(name, "continuously compacts blocks in an object store bucket")

Expand Down Expand Up @@ -187,7 +192,7 @@ func runCompact(
}

// --wait=true is specified.
return runutil.Repeat(5*time.Minute, ctx.Done(), func() error {
return runutil.Repeat(DefaultRepeatInterval, ctx.Done(), func() error {
err := f()
if err == nil {
return nil
Expand Down
25 changes: 2 additions & 23 deletions cmd/thanos/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ package main

import (
"context"
"encoding/json"
"os"
"path"
"path/filepath"
"time"

"github.com/improbable-eng/thanos/pkg/store"
"github.com/prometheus/tsdb/chunkenc"

"github.com/go-kit/kit/log"
Expand Down Expand Up @@ -105,28 +104,8 @@ func downsampleBucket(
if err := os.MkdirAll(dir, 0777); err != nil {
return errors.Wrap(err, "create dir")
}
var metas []*block.Meta

err := bkt.Iter(ctx, "", func(name string) error {
id, ok := block.IsBlockDir(name)
if !ok {
return nil
}

rc, err := bkt.Get(ctx, path.Join(id.String(), block.MetaFilename))
if err != nil {
return errors.Wrapf(err, "get meta for block %s", id)
}
defer runutil.CloseWithLogOnErr(logger, rc, "block reader")

var m block.Meta
if err := json.NewDecoder(rc).Decode(&m); err != nil {
return errors.Wrap(err, "decode meta")
}
metas = append(metas, &m)

return nil
})
metas, err := store.GetMetas(ctx, logger, bkt)
if err != nil {
return errors.Wrap(err, "retrieve bucket block metas")
}
Expand Down
6 changes: 5 additions & 1 deletion cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ import (
"gopkg.in/alecthomas/kingpin.v2"
)

const (
	// DefaultStorageSyncInterval is the interval between consecutive store node updates.
	DefaultStorageSyncInterval = 5 * time.Second
)

// registerQuery registers a query command.
func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string) {
cmd := app.Command(name, "query node exposing PromQL enabled Query API with data retrieved from multiple store nodes")
Expand Down Expand Up @@ -262,7 +266,7 @@ func runQuery(
{
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
return runutil.Repeat(5*time.Second, ctx.Done(), func() error {
return runutil.Repeat(DefaultStorageSyncInterval, ctx.Done(), func() error {
stores.Update(ctx)
return nil
})
Expand Down
9 changes: 7 additions & 2 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ import (
"gopkg.in/alecthomas/kingpin.v2"
)

const (
	// AlertManagerUpdateInterval is the interval between Alertmanager address refreshes.
	AlertManagerUpdateInterval = 30 * time.Second
	// RuleSyncInterval is the interval between consecutive shipper sync runs.
	RuleSyncInterval = 30 * time.Second
)

// registerRule registers a rule command.
func registerRule(m map[string]setupFunc, app *kingpin.Application, name string) {
cmd := app.Command(name, "ruler evaluating Prometheus rules against given Query nodes, exposing Store API and storing old blocks in bucket")
Expand Down Expand Up @@ -291,7 +296,7 @@ func runRule(
ctx, cancel := context.WithCancel(context.Background())

g.Add(func() error {
return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
return runutil.Repeat(AlertManagerUpdateInterval, ctx.Done(), func() error {
if err := alertmgrs.update(ctx); err != nil {
level.Warn(logger).Log("msg", "refreshing Alertmanagers failed", "err", err)
}
Expand Down Expand Up @@ -448,7 +453,7 @@ func runRule(
g.Add(func() error {
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")

return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
return runutil.Repeat(RuleSyncInterval, ctx.Done(), func() error {
s.Sync(ctx)

minTime, _, err := s.Timestamps()
Expand Down
20 changes: 13 additions & 7 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,13 @@ import (
"github.com/prometheus/tsdb/labels"
"google.golang.org/grpc"
"gopkg.in/alecthomas/kingpin.v2"
yaml "gopkg.in/yaml.v2"
"gopkg.in/yaml.v2"
)

const (
	// PrometheusReloadInterval is the interval between periodic Prometheus config
	// queries, used as a heartbeat as well as for updating external labels.
	PrometheusReloadInterval = 30 * time.Second
	// PrometheusHealthCheckInterval is the retry interval for the blocking initial
	// fetch of external labels from Prometheus.
	PrometheusHealthCheckInterval = 2 * time.Second
	// ShipperSyncInterval is the interval between consecutive shipper sync runs.
	ShipperSyncInterval = 30 * time.Second
)

func registerSidecar(m map[string]setupFunc, app *kingpin.Application, name string) {
Expand Down Expand Up @@ -126,7 +132,7 @@ func runSidecar(
g.Add(func() error {
// Blocking query of external labels before joining as a Source Peer into gossip.
// We retry infinitely until we reach and fetch labels from our Prometheus.
err := runutil.Retry(2*time.Second, ctx.Done(), func() error {
err := runutil.Retry(PrometheusHealthCheckInterval, ctx.Done(), func() error {
if err := metadata.UpdateLabels(ctx, logger); err != nil {
level.Warn(logger).Log(
"msg", "failed to fetch initial external labels. Is Prometheus running? Retrying",
Expand Down Expand Up @@ -160,7 +166,7 @@ func runSidecar(

// Periodically query the Prometheus config. We use this as a heartbeat as well as for updating
// the external labels we apply.
return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
return runutil.Repeat(PrometheusReloadInterval, ctx.Done(), func() error {
iterCtx, iterCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer iterCancel()

Expand Down Expand Up @@ -200,10 +206,10 @@ func runSidecar(
}
logger := log.With(logger, "component", "sidecar")

var client http.Client
var c http.Client

promStore, err := store.NewPrometheusStore(
logger, &client, promURL, metadata.Labels, metadata.Timestamps)
logger, &c, promURL, metadata.Labels, metadata.Timestamps)
if err != nil {
return errors.Wrap(err, "create Prometheus store")
}
Expand Down Expand Up @@ -232,7 +238,7 @@ func runSidecar(
}

// The background shipper continuously scans the data directory and uploads
// new blocks to Google Cloud Storage or an S3-compatible storage service.
// new blocks to object storage service.
bkt, err := client.NewBucket(logger, bucketConfig, reg, component)
if err != nil && err != client.ErrNotFound {
return err
Expand All @@ -257,7 +263,7 @@ func runSidecar(
g.Add(func() error {
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")

return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
return runutil.Repeat(ShipperSyncInterval, ctx.Done(), func() error {
s.Sync(ctx)

minTime, _, err := s.Timestamps()
Expand Down
6 changes: 5 additions & 1 deletion cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ import (
"gopkg.in/alecthomas/kingpin.v2"
)

const (
	// StoreSyncInterval is the interval between consecutive block sync runs.
	StoreSyncInterval = 3 * time.Minute
)

// registerStore registers a store command.
func registerStore(m map[string]setupFunc, app *kingpin.Application, name string) {
cmd := app.Command(name, "store node giving access to blocks in a bucket provider. Now supported GCS / S3.")
Expand Down Expand Up @@ -124,7 +128,7 @@ func runStore(
g.Add(func() error {
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")

err := runutil.Repeat(3*time.Minute, ctx.Done(), func() error {
err := runutil.Repeat(StoreSyncInterval, ctx.Done(), func() error {
if err := bs.SyncBlocks(ctx); err != nil {
level.Warn(logger).Log("msg", "syncing blocks failed", "err", err)
}
Expand Down
4 changes: 2 additions & 2 deletions docs/components/compact.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ Flags:
--retention.resolution-1h=0d
How long to retain samples of resolution 2 (1 hour)
in bucket. 0d - disables this retention
-w, --wait Do not exit after all compactions have been processed
and wait for new work.
-r, --repeat=5m Do not exit after all compactions have been processed
and repeat after N minutes for new work.

```
35 changes: 21 additions & 14 deletions pkg/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ package block
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path"
"path/filepath"

"fmt"

"github.com/cenkalti/backoff"
"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/objstore"
"github.com/improbable-eng/thanos/pkg/runutil"
Expand Down Expand Up @@ -71,8 +71,8 @@ type ThanosDownsampleMeta struct {
// WriteMetaFile writes the given meta into <dir>/meta.json.
func WriteMetaFile(logger log.Logger, dir string, meta *Meta) error {
// Make any changes to the file appear atomic.
path := filepath.Join(dir, MetaFilename)
tmp := path + ".tmp"
pathMetaFilename := filepath.Join(dir, MetaFilename)
tmp := pathMetaFilename + ".tmp"

f, err := os.Create(tmp)
if err != nil {
Expand All @@ -89,7 +89,7 @@ func WriteMetaFile(logger log.Logger, dir string, meta *Meta) error {
if err := f.Close(); err != nil {
return err
}
return renameFile(logger, tmp, path)
return renameFile(logger, tmp, pathMetaFilename)
}

// ReadMetaFile reads the given meta from <dir>/meta.json.
Expand Down Expand Up @@ -209,23 +209,30 @@ func cleanUp(bkt objstore.Bucket, id ulid.ULID, err error) error {
return err
}

// Delete removes directory that is mean to be block directory.
// Delete removes the directory that is meant to be a block directory.
// NOTE: Prefer this method instead of objstore.Delete to avoid deleting empty dir (whole bucket) by mistake.
func Delete(ctx context.Context, bucket objstore.Bucket, id ulid.ULID) error {
	// The delete operation, wrapped so it can be retried as a unit.
	del := func() error {
		return objstore.DeleteDir(ctx, bucket, id.String())
	}
	// Retry the delete with exponential backoff; the backoff stops when ctx is cancelled.
	if err := backoff.Retry(del, backoff.WithContext(backoff.NewExponentialBackOff(), ctx)); err != nil {
		return errors.Wrapf(err, "delete block dir %s", id.String())
	}
	// BUG FIX: the previous body issued a second, unconditional DeleteDir here even
	// after the retried delete had already succeeded. Success means we are done.
	return nil
}

// DownloadMeta downloads only meta file from bucket by block ID.
func DownloadMeta(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) (Meta, error) {
rc, err := bkt.Get(ctx, path.Join(id.String(), MetaFilename))
// GetMeta downloads only meta file from bucket by block ID.
func GetMeta(ctx context.Context, logger log.Logger, bucket objstore.Bucket, id ulid.ULID) (Meta, error) {
var m Meta
data, err := objstore.GetFile(ctx, logger, bucket, path.Join(id.String(), MetaFilename))
if err != nil {
return Meta{}, errors.Wrapf(err, "meta.json bkt get for %s", id.String())
return m, errors.Wrapf(err, "get meta for block %s", id)
}
defer runutil.CloseWithLogOnErr(logger, rc, "download meta bucket client")

var m Meta
if err := json.NewDecoder(rc).Decode(&m); err != nil {
return Meta{}, errors.Wrapf(err, "decode meta.json for block %s", id.String())
if err := json.Unmarshal(data, &m); err != nil {
return m, errors.Wrapf(err, "decode meta.json for block %s", id.String())
}
return m, nil
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type Peer struct {

const (
DefaultRefreshInterval = model.Duration(60 * time.Second)
PeerSyncInterval = 2 * time.Second

// Peer's network types. These are used as a predefined peer configurations for a specified network type.
LocalNetworkPeerType = "local"
Expand Down Expand Up @@ -462,7 +463,7 @@ func resolvePeers(ctx context.Context, peers []string, myAddress string, res net
retryCtx, cancel := context.WithCancel(ctx)
defer cancel()

err := runutil.Retry(2*time.Second, retryCtx.Done(), func() error {
err := runutil.Retry(PeerSyncInterval, retryCtx.Done(), func() error {
if lookupErrSpotted {
// We need to invoke cancel in next run of retry when lookupErrSpotted to preserve LookupIPAddr error.
cancel()
Expand Down
2 changes: 1 addition & 1 deletion pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (c *Syncer) syncMetas(ctx context.Context) error {

level.Debug(c.logger).Log("msg", "download meta", "block", id)

meta, err := block.DownloadMeta(ctx, c.logger, c.bkt, id)
meta, err := block.GetMeta(ctx, c.logger, c.bkt, id)
if err != nil {
return errors.Wrapf(err, "downloading meta.json for %s", id)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/compact/retention.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func ApplyRetentionPolicyByResolution(ctx context.Context, logger log.Logger, bk
if !ok {
return nil
}
m, err := block.DownloadMeta(ctx, logger, bkt, id)
m, err := block.GetMeta(ctx, logger, bkt, id)
if err != nil {
return errors.Wrap(err, "download metadata")
}
Expand Down
Loading

0 comments on commit 21db8dc

Please sign in to comment.