compact: add backoff to the retries for bucket uploads/downloads
Add a backoff retry for individual object storage query requests, except the Range and Iter methods.
The error handler splits errors into net/http errors and others, and retries the request against the object storage only for the former.

Fixes thanos-io#318
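
For orientation, here is a minimal sketch of the behaviour described above, not the actual implementation: a request is retried with exponential backoff from github.com/cenkalti/backoff only while the failure looks like a network error. The helper names (isNetError, doWithRetry) and the MaxElapsedTime bound are assumptions made for this sketch.

```go
// Sketch only: the error classification below is an assumption based on the
// commit message, not code taken from this commit.
package objstoreretry

import (
	"context"
	"net"
	"time"

	"github.com/cenkalti/backoff"
	"github.com/pkg/errors"
)

// isNetError reports whether err (or its wrapped cause) is a network-level error.
func isNetError(err error) bool {
	_, ok := errors.Cause(err).(net.Error)
	return ok
}

// doWithRetry runs a single object storage request, retrying it with
// exponential backoff while the failure looks like a network error and
// returning any other error immediately.
func doWithRetry(ctx context.Context, op func() error) error {
	wrapped := func() error {
		err := op()
		if err == nil || isNetError(err) {
			return err // nil ends the loop; a network error is retried.
		}
		return backoff.Permanent(err) // any other error stops the retries.
	}
	b := backoff.NewExponentialBackOff()
	b.MaxElapsedTime = 2 * time.Minute // assumed bound, not taken from the commit.
	return backoff.Retry(wrapped, backoff.WithContext(b, ctx))
}
```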
xjewer committed Sep 26, 2018
1 parent 1e1f9e5 commit 466d5b0
Showing 25 changed files with 471 additions and 297 deletions.
9 changes: 9 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default.

6 changes: 3 additions & 3 deletions cmd/thanos/bucket.go
@@ -147,7 +147,7 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name strin
}
case "wide":
printBlock = func(id ulid.ULID) error {
m, err := block.DownloadMeta(ctx, logger, bkt, id)
m, err := block.GetMeta(ctx, logger, bkt, id)
if err != nil {
return err
}
@@ -167,7 +167,7 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name strin
enc.SetIndent("", "\t")

printBlock = func(id ulid.ULID) error {
m, err := block.DownloadMeta(ctx, logger, bkt, id)
m, err := block.GetMeta(ctx, logger, bkt, id)
if err != nil {
return err
}
@@ -179,7 +179,7 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name strin
return errors.Wrap(err, "invalid template")
}
printBlock = func(id ulid.ULID) error {
m, err := block.DownloadMeta(ctx, logger, bkt, id)
m, err := block.GetMeta(ctx, logger, bkt, id)
if err != nil {
return err
}
15 changes: 8 additions & 7 deletions cmd/thanos/compact.go
@@ -6,6 +6,7 @@ import (
"path"
"time"

"github.com/cenkalti/backoff"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/compact"
@@ -41,8 +42,8 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
retention5m := modelDuration(cmd.Flag("retention.resolution-5m", "How long to retain samples of resolution 1 (5 minutes) in bucket. 0d - disables this retention").Default("0d"))
retention1h := modelDuration(cmd.Flag("retention.resolution-1h", "How long to retain samples of resolution 2 (1 hour) in bucket. 0d - disables this retention").Default("0d"))

wait := cmd.Flag("wait", "Do not exit after all compactions have been processed and wait for new work.").
Short('w').Bool()
repeat := cmd.Flag("repeat", "Do not exit after all compactions have been processed, but repeat after the given interval to check for new work.").
Short('r').Default("5m").Duration()

// TODO(bplotka): Remove this flag once https://github.com/improbable-eng/thanos/issues/297 is fixed.
disableDownsampling := cmd.Flag("debug.disable-downsampling", "Disables downsampling. This is not recommended "+
@@ -56,7 +57,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
*bucketConfFile,
time.Duration(*syncDelay),
*haltOnError,
*wait,
*repeat,
map[compact.ResolutionLevel]time.Duration{
compact.ResolutionLevelRaw: time.Duration(*retentionRaw),
compact.ResolutionLevel5m: time.Duration(*retention5m),
@@ -77,7 +78,7 @@ func runCompact(
bucketConfFile string,
syncDelay time.Duration,
haltOnError bool,
wait bool,
repeat time.Duration,
retentionByResolution map[compact.ResolutionLevel]time.Duration,
component string,
disableDownsampling bool,
@@ -178,12 +179,12 @@ func runCompact(
g.Add(func() error {
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")

if !wait {
if repeat == 0 {
return f()
}

// --wait=true is specified.
return runutil.Repeat(5*time.Minute, ctx.Done(), func() error {
// --repeat is set to a non-zero interval.
return runutil.Repeat(backoff.NewConstantBackOff(repeat), ctx.Done(), func() error {
err := f()
if err == nil {
return nil
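
The hunk above (and the matching hunks in the other commands below) now passes a backoff.BackOff to runutil.Repeat where a fixed time.Duration used to go. The pkg/runutil diff itself is not rendered on this page; the following is a rough sketch of the assumed new shape of the helper, under that assumption only.

```go
// Assumed shape of the updated helper; the actual pkg/runutil change is not
// rendered on this page.
package runutil

import (
	"time"

	"github.com/cenkalti/backoff"
)

// Repeat calls f, waits for the next interval produced by b, and repeats
// until stopc is closed, f returns an error, or the BackOff signals Stop.
func Repeat(b backoff.BackOff, stopc <-chan struct{}, f func() error) error {
	for {
		if err := f(); err != nil {
			return err
		}
		next := b.NextBackOff()
		if next == backoff.Stop {
			return nil
		}
		select {
		case <-time.After(next):
		case <-stopc:
			return nil
		}
	}
}
```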
25 changes: 2 additions & 23 deletions cmd/thanos/downsample.go
@@ -2,12 +2,11 @@ package main

import (
"context"
"encoding/json"
"os"
"path"
"path/filepath"
"time"

"github.com/improbable-eng/thanos/pkg/store"
"github.com/prometheus/tsdb/chunkenc"

"github.com/go-kit/kit/log"
@@ -102,28 +101,8 @@ func downsampleBucket(
if err := os.MkdirAll(dir, 0777); err != nil {
return errors.Wrap(err, "create dir")
}
var metas []*block.Meta

err := bkt.Iter(ctx, "", func(name string) error {
id, ok := block.IsBlockDir(name)
if !ok {
return nil
}

rc, err := bkt.Get(ctx, path.Join(id.String(), block.MetaFilename))
if err != nil {
return errors.Wrapf(err, "get meta for block %s", id)
}
defer runutil.CloseWithLogOnErr(logger, rc, "block reader")

var m block.Meta
if err := json.NewDecoder(rc).Decode(&m); err != nil {
return errors.Wrap(err, "decode meta")
}
metas = append(metas, &m)

return nil
})
metas, err := store.GetMetas(ctx, logger, bkt)
if err != nil {
return errors.Wrap(err, "retrieve bucket block metas")
}
7 changes: 6 additions & 1 deletion cmd/thanos/query.go
@@ -11,6 +11,7 @@ import (
"net/http"
"time"

"github.com/cenkalti/backoff"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/grpc-ecosystem/go-grpc-middleware"
@@ -35,6 +36,10 @@ import (
"gopkg.in/alecthomas/kingpin.v2"
)

const (
DefaultStorageSyncInterval = 5 * time.Second
)

// registerQuery registers a query command.
func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string) {
cmd := app.Command(name, "query node exposing PromQL enabled Query API with data retrieved from multiple store nodes")
@@ -255,7 +260,7 @@ func runQuery(
{
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
return runutil.Repeat(5*time.Second, ctx.Done(), func() error {
return runutil.Repeat(backoff.NewConstantBackOff(DefaultStorageSyncInterval), ctx.Done(), func() error {
stores.Update(ctx)
return nil
})
10 changes: 8 additions & 2 deletions cmd/thanos/rule.go
@@ -19,6 +19,7 @@ import (
"syscall"
"time"

"github.com/cenkalti/backoff"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/alert"
@@ -47,6 +48,11 @@ import (
"gopkg.in/alecthomas/kingpin.v2"
)

const (
AlertManagerUpdateInterval = 30 * time.Second
RuleSyncInterval = 30 * time.Second
)

// registerRule registers a rule command.
func registerRule(m map[string]setupFunc, app *kingpin.Application, name string) {
cmd := app.Command(name, "ruler evaluating Prometheus rules against given Query nodes, exposing Store API and storing old blocks in bucket")
@@ -292,7 +298,7 @@ func runRule(
ctx, cancel := context.WithCancel(context.Background())

g.Add(func() error {
return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
return runutil.Repeat(backoff.NewConstantBackOff(AlertManagerUpdateInterval), ctx.Done(), func() error {
if err := alertmgrs.update(ctx); err != nil {
level.Warn(logger).Log("msg", "refreshing Alertmanagers failed", "err", err)
}
@@ -445,7 +451,7 @@ func runRule(
g.Add(func() error {
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")

return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
return runutil.Repeat(backoff.NewConstantBackOff(RuleSyncInterval), ctx.Done(), func() error {
s.Sync(ctx)

minTime, _, err := s.Timestamps()
19 changes: 13 additions & 6 deletions cmd/thanos/sidecar.go
@@ -11,6 +11,7 @@ import (
"sync"
"time"

"github.com/cenkalti/backoff"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/block"
@@ -31,6 +32,12 @@ import (
yaml "gopkg.in/yaml.v2"
)

const (
PrometheusReloadInterval = 30 * time.Second
PrometheusHealthCheckInterval = 2 * time.Second
ShipperSyncInterval = 30 * time.Second
)

func registerSidecar(m map[string]setupFunc, app *kingpin.Application, name string) {
cmd := app.Command(name, "sidecar for Prometheus server")

@@ -127,7 +134,7 @@ func runSidecar(
g.Add(func() error {
// Blocking query of external labels before joining as a Source Peer into gossip.
// We retry infinitely until we reach and fetch labels from our Prometheus.
err := runutil.Retry(2*time.Second, ctx.Done(), func() error {
err := runutil.Retry(backoff.NewConstantBackOff(PrometheusHealthCheckInterval), ctx.Done(), func() error {
if err := metadata.UpdateLabels(ctx, logger); err != nil {
level.Warn(logger).Log(
"msg", "failed to fetch initial external labels. Is Prometheus running? Retrying",
@@ -161,7 +168,7 @@ func runSidecar(

// Periodically query the Prometheus config. We use this as a heartbeat as well as for updating
// the external labels we apply.
return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
return runutil.Repeat(backoff.NewConstantBackOff(PrometheusReloadInterval), ctx.Done(), func() error {
iterCtx, iterCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer iterCancel()

@@ -201,10 +208,10 @@ func runSidecar(
}
logger := log.With(logger, "component", "sidecar")

var client http.Client
var c http.Client

promStore, err := store.NewPrometheusStore(
logger, &client, promURL, metadata.Labels, metadata.Timestamps)
logger, &c, promURL, metadata.Labels, metadata.Timestamps)
if err != nil {
return errors.Wrap(err, "create Prometheus store")
}
@@ -228,7 +235,7 @@ func runSidecar(
var uploads = true

// The background shipper continuously scans the data directory and uploads
// new blocks to Google Cloud Storage or an S3-compatible storage service.
// new blocks to the configured object storage service.
bkt, err := client.NewBucket(logger, bucketConfFile, reg, component)
if err != nil && err != client.ErrNotFound {
return err
@@ -253,7 +260,7 @@ func runSidecar(
g.Add(func() error {
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")

return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
return runutil.Repeat(backoff.NewConstantBackOff(ShipperSyncInterval), ctx.Done(), func() error {
s.Sync(ctx)

minTime, _, err := s.Timestamps()
7 changes: 6 additions & 1 deletion cmd/thanos/store.go
@@ -6,6 +6,7 @@ import (
"net"
"time"

"github.com/cenkalti/backoff"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/cluster"
@@ -21,6 +22,10 @@ import (
"gopkg.in/alecthomas/kingpin.v2"
)

const (
StoreSyncInterval = 3 * time.Minute
)

// registerStore registers a store command.
func registerStore(m map[string]setupFunc, app *kingpin.Application, name string) {
cmd := app.Command(name, "store node giving access to blocks in a bucket provider. Now supported GCS / S3.")
@@ -120,7 +125,7 @@ func runStore(
g.Add(func() error {
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")

err := runutil.Repeat(3*time.Minute, ctx.Done(), func() error {
err := runutil.Repeat(backoff.NewConstantBackOff(StoreSyncInterval), ctx.Done(), func() error {
if err := bs.SyncBlocks(ctx); err != nil {
level.Warn(logger).Log("msg", "syncing blocks failed", "err", err)
}
4 changes: 2 additions & 2 deletions docs/components/compact.md
@@ -60,7 +60,7 @@ Flags:
--retention.resolution-1h=0d
How long to retain samples of resolution 2 (1 hour)
in bucket. 0d - disables this retention
-w, --wait Do not exit after all compactions have been processed
and wait for new work.
-r, --repeat=5m Do not exit after all compactions have been processed,
but repeat after the given interval to check for new work.

```
35 changes: 21 additions & 14 deletions pkg/block/block.go
@@ -5,13 +5,13 @@ package block
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path"
"path/filepath"

"fmt"

"github.com/cenkalti/backoff"
"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/objstore"
"github.com/improbable-eng/thanos/pkg/runutil"
@@ -71,8 +71,8 @@ type ThanosDownsampleMeta struct {
// WriteMetaFile writes the given meta into <dir>/meta.json.
func WriteMetaFile(logger log.Logger, dir string, meta *Meta) error {
// Make any changes to the file appear atomic.
path := filepath.Join(dir, MetaFilename)
tmp := path + ".tmp"
pathMetaFilename := filepath.Join(dir, MetaFilename)
tmp := pathMetaFilename + ".tmp"

f, err := os.Create(tmp)
if err != nil {
@@ -89,7 +89,7 @@ func WriteMetaFile(logger log.Logger, dir string, meta *Meta) error {
if err := f.Close(); err != nil {
return err
}
return renameFile(logger, tmp, path)
return renameFile(logger, tmp, pathMetaFilename)
}

// ReadMetaFile reads the given meta from <dir>/meta.json.
@@ -209,23 +209,30 @@ func cleanUp(bkt objstore.Bucket, id ulid.ULID, err error) error {
return err
}

// Delete removes directory that is mean to be block directory.
// Delete removes the directory that is meant to be a block directory.
// NOTE: Prefer this method instead of objstore.Delete to avoid deleting empty dir (whole bucket) by mistake.
func Delete(ctx context.Context, bucket objstore.Bucket, id ulid.ULID) error {
// Delete directory.
del := func() error {
return objstore.DeleteDir(ctx, bucket, id.String())
}
// Retry the delete with exponential backoff on network failures; see the backoff package for the default schedule.
if err := backoff.Retry(del, backoff.WithContext(backoff.NewExponentialBackOff(), ctx)); err != nil {
return errors.Wrapf(err, "delete block dir %s", id.String())
}
return nil
return objstore.DeleteDir(ctx, bucket, id.String())
}

// DownloadMeta downloads only meta file from bucket by block ID.
func DownloadMeta(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) (Meta, error) {
rc, err := bkt.Get(ctx, path.Join(id.String(), MetaFilename))
// GetMeta downloads only the meta.json file from the bucket for the given block ID.
func GetMeta(ctx context.Context, logger log.Logger, bucket objstore.Bucket, id ulid.ULID) (Meta, error) {
var m Meta
data, err := objstore.GetFile(ctx, logger, bucket, path.Join(id.String(), MetaFilename))
if err != nil {
return Meta{}, errors.Wrapf(err, "meta.json bkt get for %s", id.String())
return m, errors.Wrapf(err, "get meta for block %s", id)
}
defer runutil.CloseWithLogOnErr(logger, rc, "download meta bucket client")

var m Meta
if err := json.NewDecoder(rc).Decode(&m); err != nil {
return Meta{}, errors.Wrapf(err, "decode meta.json for block %s", id.String())
if err := json.Unmarshal(data, &m); err != nil {
return m, errors.Wrapf(err, "decode meta.json for block %s", id.String())
}
return m, nil
}
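
GetMeta above delegates to a new objstore.GetFile helper whose diff is not rendered on this page. As a sketch only, here is an assumed shape for such a helper: the narrow getter interface is introduced purely to keep the example self-contained, and retrying every failure (rather than only net/http errors, as the commit message describes) is a simplification.

```go
// Sketch only: an assumed implementation of the GetFile helper called by
// GetMeta; the real one lives in pkg/objstore, which is not shown here.
package objstore

import (
	"context"
	"io"
	"io/ioutil"

	"github.com/cenkalti/backoff"
	"github.com/go-kit/kit/log"
	"github.com/improbable-eng/thanos/pkg/runutil"
	"github.com/pkg/errors"
)

// getter is the narrow slice of the Bucket interface this sketch needs.
type getter interface {
	Get(ctx context.Context, name string) (io.ReadCloser, error)
}

// GetFile fetches the whole object into memory, retrying the request with
// exponential backoff. For brevity this sketch retries every failure; the
// commit message says the real error handler retries only net/http errors.
func GetFile(ctx context.Context, logger log.Logger, bkt getter, name string) ([]byte, error) {
	var data []byte
	get := func() error {
		rc, err := bkt.Get(ctx, name)
		if err != nil {
			return err
		}
		defer runutil.CloseWithLogOnErr(logger, rc, "get file reader")
		data, err = ioutil.ReadAll(rc)
		return err
	}
	if err := backoff.Retry(get, backoff.WithContext(backoff.NewExponentialBackOff(), ctx)); err != nil {
		return nil, errors.Wrapf(err, "get file %s", name)
	}
	return data, nil
}
```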