From 4d9c6ea16eca856621388c88aead5f9025276110 Mon Sep 17 00:00:00 2001 From: Martin Dickson Date: Wed, 17 Apr 2019 15:25:12 +0100 Subject: [PATCH 01/17] compactor removes malformed blocks after delay --- pkg/compact/compact.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 150451185d..448054ba6e 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -31,6 +31,8 @@ const ( ResolutionLevelRaw = ResolutionLevel(downsample.ResLevel0) ResolutionLevel5m = ResolutionLevel(downsample.ResLevel1) ResolutionLevel1h = ResolutionLevel(downsample.ResLevel2) + + consistencyDelay = time.Duration(30 * time.Minute) ) var blockTooFreshSentinelError = errors.New("Block too fresh") @@ -149,7 +151,8 @@ func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket } // SyncMetas synchronizes all meta files from blocks in the bucket into -// the memory. +// the memory. It removes any partial blocks older than consistencyDelay +// from the bucket. 
func (c *Syncer) SyncMetas(ctx context.Context) error { c.mtx.Lock() defer c.mtx.Unlock() @@ -193,6 +196,11 @@ func (c *Syncer) syncMetas(ctx context.Context) error { if err == blockTooFreshSentinelError { continue } + if c.bkt.IsObjNotFoundErr(err) { + if ulid.Now()-id.Time() < uint64(consistencyDelay/time.Millisecond) { + err = c.bkt.Delete(workCtx, id.String()) + } + } if err != nil { errChan <- err return From f04c1dcb20524f76a62a52e87c61200b7d130ac8 Mon Sep 17 00:00:00 2001 From: Martin Dickson Date: Thu, 18 Apr 2019 17:04:40 +0100 Subject: [PATCH 02/17] compactor removes malformed blocks after delay --- pkg/compact/compact.go | 32 +++++++++++++++++++++++++++----- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 448054ba6e..13a16ec2a8 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -196,12 +196,10 @@ func (c *Syncer) syncMetas(ctx context.Context) error { if err == blockTooFreshSentinelError { continue } - if c.bkt.IsObjNotFoundErr(err) { - if ulid.Now()-id.Time() < uint64(consistencyDelay/time.Millisecond) { - err = c.bkt.Delete(workCtx, id.String()) - } - } if err != nil { + if removed := c.removeIfMalformed(workCtx, id); removed { + continue + } errChan <- err return } @@ -279,6 +277,30 @@ func (c *Syncer) downloadMeta(ctx context.Context, id ulid.ULID) (*metadata.Meta return &meta, nil } +func (c *Syncer) removeIfMalformed(ctx context.Context, id ulid.ULID) bool { + exists, err := block.MetaExists(ctx, c.logger, c.bkt, id) + if err != nil { + level.Warn(c.logger).Log("msg", "failed to check if block is malformed", "block", id) + return false + } + if exists { + // Meta exists, block is not malformed. + return false + } + + if ulid.Now()-id.Time() <= uint64(consistencyDelay/time.Millisecond) { + // Consistency delay has not expired, so can't say block is malformed yet. 
+ return false + } + + if err := block.Delete(ctx, c.bkt, id); err != nil { + level.Warn(c.logger).Log("msg", "failed to delete malformed block", "block", id) + } + level.Info(c.logger).Log("msg", "deleted malformed block", "block", id) + + return true +} + // GroupKey returns a unique identifier for the group the block belongs to. It considers // the downsampling resolution and the block's labels. func GroupKey(meta metadata.Meta) string { From 4a932af15d0e16651cdd32463483379c915b0687 Mon Sep 17 00:00:00 2001 From: Martin Dickson Date: Thu, 18 Apr 2019 17:13:38 +0100 Subject: [PATCH 03/17] include missing file --- pkg/block/block.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pkg/block/block.go b/pkg/block/block.go index 5c1033a7d8..2726cee63a 100644 --- a/pkg/block/block.go +++ b/pkg/block/block.go @@ -145,3 +145,12 @@ func IsBlockDir(path string) (id ulid.ULID, ok bool) { id, err := ulid.Parse(filepath.Base(path)) return id, err == nil } + +// MetaExists checks whether the meta file exists for the provided block +func MetaExists(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) (bool, error) { + exists, err := bkt.Exists(ctx, path.Join(id.String(), MetaFilename)) + if err != nil { + return false, errors.Wrapf(err, "meta.json bkt get for %s", id.String()) + } + return exists, nil +} From 58dfb04f5ee460d46948b2896b396cbe9afe94d4 Mon Sep 17 00:00:00 2001 From: Martin Dickson Date: Wed, 24 Apr 2019 14:28:37 +0100 Subject: [PATCH 04/17] reuse existing freshness check --- pkg/compact/compact.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 13a16ec2a8..ffcf0d63dc 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -256,6 +256,10 @@ func (c *Syncer) downloadMeta(ctx context.Context, id ulid.ULID) (*metadata.Meta meta, err := block.DownloadMeta(ctx, c.logger, c.bkt, id) if err != nil { + if ulid.Now()-id.Time() < 
uint64(c.syncDelay/time.Millisecond) { + level.Debug(c.logger).Log("msg", "block is too fresh for now", "block", id) + return nil, blockTooFreshSentinelError + } return nil, errors.Wrapf(err, "downloading meta.json for %s", id) } @@ -277,6 +281,9 @@ func (c *Syncer) downloadMeta(ctx context.Context, id ulid.ULID) (*metadata.Meta return &meta, nil } +// removeIfMalformed removes a block from the bucket if that block does not have a meta file. +// It is the responsibility of the caller to ensure that enough time has passed to ensure that +// the enough time has passed for the block to become consistent func (c *Syncer) removeIfMalformed(ctx context.Context, id ulid.ULID) bool { exists, err := block.MetaExists(ctx, c.logger, c.bkt, id) if err != nil { @@ -288,11 +295,6 @@ func (c *Syncer) removeIfMalformed(ctx context.Context, id ulid.ULID) bool { return false } - if ulid.Now()-id.Time() <= uint64(consistencyDelay/time.Millisecond) { - // Consistency delay has not expired, so can't say block is malformed yet. - return false - } - if err := block.Delete(ctx, c.bkt, id); err != nil { level.Warn(c.logger).Log("msg", "failed to delete malformed block", "block", id) } From e55e4ac276b1cee351c1f7dea1a1c1e2de88c1b7 Mon Sep 17 00:00:00 2001 From: Martin Dickson Date: Wed, 24 Apr 2019 14:29:46 +0100 Subject: [PATCH 05/17] fix comment --- pkg/block/block.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/block/block.go b/pkg/block/block.go index 2726cee63a..3cba0ae35b 100644 --- a/pkg/block/block.go +++ b/pkg/block/block.go @@ -146,7 +146,7 @@ func IsBlockDir(path string) (id ulid.ULID, ok bool) { return id, err == nil } -// MetaExists checks whether the meta file exists for the provided block +// MetaExists checks whether the meta file exists for the provided block. 
func MetaExists(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) (bool, error) { exists, err := bkt.Exists(ctx, path.Join(id.String(), MetaFilename)) if err != nil { From a1d7c24369a5dd2f45fa0c231faf5f56b9fcaa64 Mon Sep 17 00:00:00 2001 From: Martin Dickson Date: Wed, 24 Apr 2019 14:31:21 +0100 Subject: [PATCH 06/17] remove unused var --- pkg/compact/compact.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index ffcf0d63dc..60fb882278 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -31,8 +31,6 @@ const ( ResolutionLevelRaw = ResolutionLevel(downsample.ResLevel0) ResolutionLevel5m = ResolutionLevel(downsample.ResLevel1) ResolutionLevel1h = ResolutionLevel(downsample.ResLevel2) - - consistencyDelay = time.Duration(30 * time.Minute) ) var blockTooFreshSentinelError = errors.New("Block too fresh") From 7d40d91f382301c46cf54a5fd187be90622efb6b Mon Sep 17 00:00:00 2001 From: Martin Dickson Date: Wed, 24 Apr 2019 14:32:03 +0100 Subject: [PATCH 07/17] fix comment --- pkg/compact/compact.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 60fb882278..906fce2bfa 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -149,7 +149,7 @@ func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket } // SyncMetas synchronizes all meta files from blocks in the bucket into -// the memory. It removes any partial blocks older than consistencyDelay +// the memory. It removes any partial blocks older than syncDelay // from the bucket. 
func (c *Syncer) SyncMetas(ctx context.Context) error { c.mtx.Lock() From 87940867ebfe64f6cb3aad4dd44e29310ba3b192 Mon Sep 17 00:00:00 2001 From: Martin Dickson Date: Wed, 24 Apr 2019 14:42:28 +0100 Subject: [PATCH 08/17] syncDelay -> consistencyDelay --- pkg/compact/compact.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 906fce2bfa..70143270f3 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -41,7 +41,7 @@ type Syncer struct { logger log.Logger reg prometheus.Registerer bkt objstore.Bucket - syncDelay time.Duration + consistencyDelay time.Duration mtx sync.Mutex blocks map[ulid.ULID]*metadata.Meta blocksMtx sync.Mutex @@ -132,14 +132,14 @@ func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics { // NewSyncer returns a new Syncer for the given Bucket and directory. // Blocks must be at least as old as the sync delay for being considered. -func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, syncDelay time.Duration, blockSyncConcurrency int, acceptMalformedIndex bool) (*Syncer, error) { +func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, consistencyDelay time.Duration, blockSyncConcurrency int, acceptMalformedIndex bool) (*Syncer, error) { if logger == nil { logger = log.NewNopLogger() } return &Syncer{ logger: logger, reg: reg, - syncDelay: syncDelay, + consistencyDelay: consistencyDelay, blocks: map[ulid.ULID]*metadata.Meta{}, bkt: bkt, metrics: newSyncerMetrics(reg), @@ -149,7 +149,7 @@ func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket } // SyncMetas synchronizes all meta files from blocks in the bucket into -// the memory. It removes any partial blocks older than syncDelay +// the memory. It removes any partial blocks older than consistencyDelay // from the bucket. 
func (c *Syncer) SyncMetas(ctx context.Context) error { c.mtx.Lock() @@ -254,7 +254,7 @@ func (c *Syncer) downloadMeta(ctx context.Context, id ulid.ULID) (*metadata.Meta meta, err := block.DownloadMeta(ctx, c.logger, c.bkt, id) if err != nil { - if ulid.Now()-id.Time() < uint64(c.syncDelay/time.Millisecond) { + if ulid.Now()-id.Time() < uint64(c.consistencyDelay/time.Millisecond) { level.Debug(c.logger).Log("msg", "block is too fresh for now", "block", id) return nil, blockTooFreshSentinelError } @@ -267,7 +267,7 @@ func (c *Syncer) downloadMeta(ctx context.Context, id ulid.ULID) (*metadata.Meta // - compactor created blocks // NOTE: It is not safe to miss "old" block (even that it is newly created) in sync step. Compactor needs to aware of ALL old blocks. // TODO(bplotka): https://github.com/improbable-eng/thanos/issues/377 - if ulid.Now()-id.Time() < uint64(c.syncDelay/time.Millisecond) && + if ulid.Now()-id.Time() < uint64(c.consistencyDelay/time.Millisecond) && meta.Thanos.Source != metadata.BucketRepairSource && meta.Thanos.Source != metadata.CompactorSource && meta.Thanos.Source != metadata.CompactorRepairSource { From c33205a295627af5750708ef61949c75b3ed539f Mon Sep 17 00:00:00 2001 From: Martin Dickson Date: Wed, 24 Apr 2019 14:44:33 +0100 Subject: [PATCH 09/17] fix comment --- pkg/compact/compact.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 70143270f3..7ec5acff93 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -280,8 +280,7 @@ func (c *Syncer) downloadMeta(ctx context.Context, id ulid.ULID) (*metadata.Meta } // removeIfMalformed removes a block from the bucket if that block does not have a meta file. 
-// It is the responsibility of the caller to ensure that enough time has passed to ensure that -// the enough time has passed for the block to become consistent +// It is the responsibility of the caller to ensure that enough time has passed for the block to become consistent func (c *Syncer) removeIfMalformed(ctx context.Context, id ulid.ULID) bool { exists, err := block.MetaExists(ctx, c.logger, c.bkt, id) if err != nil { From 16324bcb606ad1c3306c9caf9eefb960b8daaa51 Mon Sep 17 00:00:00 2001 From: Martin Dickson Date: Wed, 24 Apr 2019 14:48:50 +0100 Subject: [PATCH 10/17] update flag description --- cmd/thanos/compact.go | 2 +- docs/components/compact.md | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index bec9746c55..36b5c0ea95 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -82,7 +82,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri objStoreConfig := regCommonObjStoreFlags(cmd, "", true) - syncDelay := modelDuration(cmd.Flag("sync-delay", "Minimum age of fresh (non-compacted) blocks before they are being processed."). + syncDelay := modelDuration(cmd.Flag("sync-delay", "Minimum age of fresh (non-compacted) blocks before they are being processed. Blocks older than this which are malformed will be removed."). Default("30m")) retentionRaw := modelDuration(cmd.Flag("retention.resolution-raw", "How long to retain raw samples in bucket. 0d - disables this retention").Default("0d")) diff --git a/docs/components/compact.md b/docs/components/compact.md index 373c6d94b9..b7134d14c4 100644 --- a/docs/components/compact.md +++ b/docs/components/compact.md @@ -61,7 +61,8 @@ Flags: Alternative to 'objstore.config-file' flag. Object store configuration in YAML. --sync-delay=30m Minimum age of fresh (non-compacted) blocks - before they are being processed. + before they are being processed. Blocks older + than this which are malformed will be removed. 
--retention.resolution-raw=0d How long to retain raw samples in bucket. 0d - disables this retention From a44441a450fe4dbee629e262957aed27a9a8a738 Mon Sep 17 00:00:00 2001 From: Martin Dickson Date: Thu, 25 Apr 2019 18:01:26 +0100 Subject: [PATCH 11/17] address cr --- pkg/block/block.go | 9 --------- pkg/compact/compact.go | 20 +++++++++++++------- 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/pkg/block/block.go b/pkg/block/block.go index 3cba0ae35b..5c1033a7d8 100644 --- a/pkg/block/block.go +++ b/pkg/block/block.go @@ -145,12 +145,3 @@ func IsBlockDir(path string) (id ulid.ULID, ok bool) { id, err := ulid.Parse(filepath.Base(path)) return id, err == nil } - -// MetaExists checks whether the meta file exists for the provided block. -func MetaExists(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) (bool, error) { - exists, err := bkt.Exists(ctx, path.Join(id.String(), MetaFilename)) - if err != nil { - return false, errors.Wrapf(err, "meta.json bkt get for %s", id.String()) - } - return exists, nil -} diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 7ec5acff93..3525fe16ed 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -5,6 +5,7 @@ import ( "fmt" "io/ioutil" "os" + "path" "path/filepath" "sort" "strings" @@ -195,7 +196,7 @@ func (c *Syncer) syncMetas(ctx context.Context) error { continue } if err != nil { - if removed := c.removeIfMalformed(workCtx, id); removed { + if removed := c.removeIfMetaMalformed(workCtx, id); removed { continue } errChan <- err @@ -280,20 +281,25 @@ func (c *Syncer) downloadMeta(ctx context.Context, id ulid.ULID) (*metadata.Meta } // removeIfMalformed removes a block from the bucket if that block does not have a meta file. 
-// It is the responsibility of the caller to ensure that enough time has passed for the block to become consistent -func (c *Syncer) removeIfMalformed(ctx context.Context, id ulid.ULID) bool { - exists, err := block.MetaExists(ctx, c.logger, c.bkt, id) +// It is the responsibility of the caller to ensure that enough time has passed for the block to become consistent. +func (c *Syncer) removeIfMetaMalformed(ctx context.Context, id ulid.ULID) bool { + metaExists, err := c.bkt.Exists(ctx, path.Join(id.String(), block.MetaFilename)) if err != nil { - level.Warn(c.logger).Log("msg", "failed to check if block is malformed", "block", id) + level.Warn(c.logger).Log("msg", "failed to check meta exists for block", "block", id, "err", err) return false } - if exists { + if err != nil { + level.Warn(c.logger).Log("msg", "failed to check if block is malformed", "block", id, "err", err) + return false + } + if metaExists { // Meta exists, block is not malformed. return false } if err := block.Delete(ctx, c.bkt, id); err != nil { - level.Warn(c.logger).Log("msg", "failed to delete malformed block", "block", id) + level.Warn(c.logger).Log("msg", "failed to delete malformed block", "block", id, "err", err) + return false } level.Info(c.logger).Log("msg", "deleted malformed block", "block", id) From 2a3800c1901477abc02eb46b27a6339a7ef36750 Mon Sep 17 00:00:00 2001 From: Martin Dickson Date: Thu, 25 Apr 2019 18:12:36 +0100 Subject: [PATCH 12/17] fix duplicate error handling --- cmd/thanos/compact.go | 8 ++++---- docs/components/compact.md | 2 +- pkg/compact/compact.go | 4 ---- 3 files changed, 5 insertions(+), 9 deletions(-) diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index 36b5c0ea95..50b680ba5b 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -82,7 +82,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri objStoreConfig := regCommonObjStoreFlags(cmd, "", true) - syncDelay := modelDuration(cmd.Flag("sync-delay", 
"Minimum age of fresh (non-compacted) blocks before they are being processed. Blocks older than this which are malformed will be removed."). + consistencyDelay := modelDuration(cmd.Flag("consistency-delay", "Minimum age of fresh (non-compacted) blocks before they are being processed. Blocks older than this which are malformed will be removed."). Default("30m")) retentionRaw := modelDuration(cmd.Flag("retention.resolution-raw", "How long to retain raw samples in bucket. 0d - disables this retention").Default("0d")) @@ -114,7 +114,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri *httpAddr, *dataDir, objStoreConfig, - time.Duration(*syncDelay), + time.Duration(*consistencyDelay), *haltOnError, *acceptMalformedIndex, *wait, @@ -140,7 +140,7 @@ func runCompact( httpBindAddr string, dataDir string, objStoreConfig *pathOrContent, - syncDelay time.Duration, + consistencyDelay time.Duration, haltOnError bool, acceptMalformedIndex bool, wait bool, @@ -182,7 +182,7 @@ func runCompact( } }() - sy, err := compact.NewSyncer(logger, reg, bkt, syncDelay, + sy, err := compact.NewSyncer(logger, reg, bkt, consistencyDelay, blockSyncConcurrency, acceptMalformedIndex) if err != nil { return errors.Wrap(err, "create syncer") diff --git a/docs/components/compact.md b/docs/components/compact.md index b7134d14c4..304bf830a9 100644 --- a/docs/components/compact.md +++ b/docs/components/compact.md @@ -60,7 +60,7 @@ Flags: --objstore.config= Alternative to 'objstore.config-file' flag. Object store configuration in YAML. - --sync-delay=30m Minimum age of fresh (non-compacted) blocks + --consistency-delay=30m Minimum age of fresh (non-compacted) blocks before they are being processed. Blocks older than this which are malformed will be removed. 
--retention.resolution-raw=0d diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 3525fe16ed..b4d613cce8 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -288,10 +288,6 @@ func (c *Syncer) removeIfMetaMalformed(ctx context.Context, id ulid.ULID) bool { level.Warn(c.logger).Log("msg", "failed to check meta exists for block", "block", id, "err", err) return false } - if err != nil { - level.Warn(c.logger).Log("msg", "failed to check if block is malformed", "block", id, "err", err) - return false - } if metaExists { // Meta exists, block is not malformed. return false From b94f24ffea8ef50c12df56bf8101fe738ad306bf Mon Sep 17 00:00:00 2001 From: Martin Dickson Date: Fri, 26 Apr 2019 16:38:38 +0100 Subject: [PATCH 13/17] minimum value for --consistency-delay --- cmd/thanos/compact.go | 2 +- docs/components/compact.md | 2 ++ pkg/compact/compact.go | 6 ++++++ 3 files changed, 9 insertions(+), 1 deletion(-) diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index 50b680ba5b..96d2542625 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -82,7 +82,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri objStoreConfig := regCommonObjStoreFlags(cmd, "", true) - consistencyDelay := modelDuration(cmd.Flag("consistency-delay", "Minimum age of fresh (non-compacted) blocks before they are being processed. Blocks older than this which are malformed will be removed."). + consistencyDelay := modelDuration(cmd.Flag("consistency-delay", fmt.Sprintf("Minimum age of fresh (non-compacted) blocks before they are being processed. Blocks older than this which are malformed will be removed. The smallest possible value of this flag is %s.", compact.MinimumConsistencyDelay)). Default("30m")) retentionRaw := modelDuration(cmd.Flag("retention.resolution-raw", "How long to retain raw samples in bucket. 
0d - disables this retention").Default("0d")) diff --git a/docs/components/compact.md b/docs/components/compact.md index 304bf830a9..dde59b5150 100644 --- a/docs/components/compact.md +++ b/docs/components/compact.md @@ -63,6 +63,8 @@ Flags: --consistency-delay=30m Minimum age of fresh (non-compacted) blocks before they are being processed. Blocks older than this which are malformed will be removed. + The smallest possible value of this flag is + 10m0s. --retention.resolution-raw=0d How long to retain raw samples in bucket. 0d - disables this retention diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index b4d613cce8..4f9e43a9fc 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -32,6 +32,8 @@ const ( ResolutionLevelRaw = ResolutionLevel(downsample.ResLevel0) ResolutionLevel5m = ResolutionLevel(downsample.ResLevel1) ResolutionLevel1h = ResolutionLevel(downsample.ResLevel2) + + MinimumConsistencyDelay = time.Duration(10 * time.Minute) ) var blockTooFreshSentinelError = errors.New("Block too fresh") @@ -134,6 +136,10 @@ func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics { // NewSyncer returns a new Syncer for the given Bucket and directory. // Blocks must be at least as old as the sync delay for being considered. 
func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, consistencyDelay time.Duration, blockSyncConcurrency int, acceptMalformedIndex bool) (*Syncer, error) { + if consistencyDelay < MinimumConsistencyDelay { + return nil, errors.New(fmt.Sprintf("invalid consistency delay, must be at least %s", MinimumConsistencyDelay)) + } + if logger == nil { logger = log.NewNopLogger() } From c7a114f86ae2603a1942893086e0bec63465d819 Mon Sep 17 00:00:00 2001 From: Martin Dickson Date: Thu, 2 May 2019 11:44:31 +0100 Subject: [PATCH 14/17] update --- cmd/thanos/compact.go | 2 +- pkg/compact/compact.go | 21 +++++++++++---------- pkg/compact/compact_e2e_test.go | 24 ++++++++++++++++++++++++ 3 files changed, 36 insertions(+), 11 deletions(-) diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index 96d2542625..f125e5d578 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -82,7 +82,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri objStoreConfig := regCommonObjStoreFlags(cmd, "", true) - consistencyDelay := modelDuration(cmd.Flag("consistency-delay", fmt.Sprintf("Minimum age of fresh (non-compacted) blocks before they are being processed. Blocks older than this which are malformed will be removed. The smallest possible value of this flag is %s.", compact.MinimumConsistencyDelay)). + consistencyDelay := modelDuration(cmd.Flag("consistency-delay", fmt.Sprintf("Minimum age of fresh (non-compacted) blocks before they are being processed. Malformed blocks older than the maximum of consistency-delay and %s will be removed.", compact.MinimumConsistencyDelay)). Default("30m")) retentionRaw := modelDuration(cmd.Flag("retention.resolution-raw", "How long to retain raw samples in bucket. 
0d - disables this retention").Default("0d")) diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 4f9e43a9fc..5f39e2d4b0 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -136,10 +136,6 @@ func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics { // NewSyncer returns a new Syncer for the given Bucket and directory. // Blocks must be at least as old as the sync delay for being considered. func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, consistencyDelay time.Duration, blockSyncConcurrency int, acceptMalformedIndex bool) (*Syncer, error) { - if consistencyDelay < MinimumConsistencyDelay { - return nil, errors.New(fmt.Sprintf("invalid consistency delay, must be at least %s", MinimumConsistencyDelay)) - } - if logger == nil { logger = log.NewNopLogger() } @@ -156,8 +152,8 @@ func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket } // SyncMetas synchronizes all meta files from blocks in the bucket into -// the memory. It removes any partial blocks older than consistencyDelay -// from the bucket. +// the memory. It removes any partial blocks older than the max of +// consistencyDelay and minimumRemoveAge from the bucket. func (c *Syncer) SyncMetas(ctx context.Context) error { c.mtx.Lock() defer c.mtx.Unlock() @@ -202,7 +198,7 @@ func (c *Syncer) syncMetas(ctx context.Context) error { continue } if err != nil { - if removed := c.removeIfMetaMalformed(workCtx, id); removed { + if removedOrIgnored := c.removeIfMetaMalformed(workCtx, id); removedOrIgnored { continue } errChan <- err @@ -286,9 +282,9 @@ func (c *Syncer) downloadMeta(ctx context.Context, id ulid.ULID) (*metadata.Meta return &meta, nil } -// removeIfMalformed removes a block from the bucket if that block does not have a meta file. -// It is the responsibility of the caller to ensure that enough time has passed for the block to become consistent. 
-func (c *Syncer) removeIfMetaMalformed(ctx context.Context, id ulid.ULID) bool { +// removeIfMalformed removes a block from the bucket if that block does not have a meta file. It ignores blocks that +// are younger than minimumRemoveAge. +func (c *Syncer) removeIfMetaMalformed(ctx context.Context, id ulid.ULID) (removedOrIgnored bool) { metaExists, err := c.bkt.Exists(ctx, path.Join(id.String(), block.MetaFilename)) if err != nil { level.Warn(c.logger).Log("msg", "failed to check meta exists for block", "block", id, "err", err) @@ -299,6 +295,11 @@ func (c *Syncer) removeIfMetaMalformed(ctx context.Context, id ulid.ULID) bool { return false } + if ulid.Now()-id.Time() <= uint64(MinimumConsistencyDelay/time.Millisecond) { + // Minimum delay has not expired, should ignore for now + return true + } + if err := block.Delete(ctx, c.bkt, id); err != nil { level.Warn(c.logger).Log("msg", "failed to delete malformed block", "block", id, "err", err) return false diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go index f58862c0ac..5fa4e3611d 100644 --- a/pkg/compact/compact_e2e_test.go +++ b/pkg/compact/compact_e2e_test.go @@ -75,6 +75,30 @@ func TestSyncer_SyncMetas_e2e(t *testing.T) { } +func TestSyncer_SyncMetas_HandlesMalformedBlocks(t *testing.T) { + objtesting.ForeachStore(t, func(t testing.TB, bkt objstore.Bucket) { + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) + defer cancel() + + sy, err := NewSyncer(nil, nil, bkt, 0, 1, false) + testutil.Ok(t, err) + + // Generate 1 block which is older than minimumRemovalAge which has chunk data but no meta. Compactor should delete it. 
+ id, err := ulid.New(uint64(time.Now().Add(-time.Hour).Unix()*1000), nil) + testutil.Ok(t, err) + + var fakeChunk bytes.Buffer + fakeChunk.Write([]byte{0,1,2,3}) + testutil.Ok(t, bkt.Upload(ctx, path.Join(id.String(), "chunks", "000001"), &fakeChunk)) + + testutil.Ok(t, sy.SyncMetas(ctx)) + + exists, err := bkt.Exists(ctx, id.String()) + testutil.Ok(t, err) + testutil.Equals(t, false, exists) + }) +} + func TestSyncer_GarbageCollect_e2e(t *testing.T) { objtesting.ForeachStore(t, func(t testing.TB, bkt objstore.Bucket) { ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) From f75782d1db715dc77c5bbb406a7ced7ed993b3b1 Mon Sep 17 00:00:00 2001 From: Martin Dickson Date: Thu, 2 May 2019 14:15:03 +0100 Subject: [PATCH 15/17] docs --- cmd/thanos/compact.go | 2 +- docs/components/compact.md | 7 +++---- pkg/compact/compact.go | 4 ++-- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index f125e5d578..365d211992 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -82,7 +82,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri objStoreConfig := regCommonObjStoreFlags(cmd, "", true) - consistencyDelay := modelDuration(cmd.Flag("consistency-delay", fmt.Sprintf("Minimum age of fresh (non-compacted) blocks before they are being processed. Malformed blocks older than the maximum of consistency-delay and %s will be removed.", compact.MinimumConsistencyDelay)). + consistencyDelay := modelDuration(cmd.Flag("consistency-delay", fmt.Sprintf("Minimum age of fresh (non-compacted) blocks before they are being processed. Malformed blocks older than the maximum of consistency-delay and %s will be removed.", compact.MinimumAgeForRemoval)). Default("30m")) retentionRaw := modelDuration(cmd.Flag("retention.resolution-raw", "How long to retain raw samples in bucket. 
0d - disables this retention").Default("0d")) diff --git a/docs/components/compact.md b/docs/components/compact.md index dde59b5150..439adc520b 100644 --- a/docs/components/compact.md +++ b/docs/components/compact.md @@ -61,10 +61,9 @@ Flags: Alternative to 'objstore.config-file' flag. Object store configuration in YAML. --consistency-delay=30m Minimum age of fresh (non-compacted) blocks - before they are being processed. Blocks older - than this which are malformed will be removed. - The smallest possible value of this flag is - 10m0s. + before they are being processed. Malformed blocks + older than the maximum of consistency-delay and + 30m0s will be removed. --retention.resolution-raw=0d How long to retain raw samples in bucket. 0d - disables this retention diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 5f39e2d4b0..21837df4ff 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -33,7 +33,7 @@ const ( ResolutionLevel5m = ResolutionLevel(downsample.ResLevel1) ResolutionLevel1h = ResolutionLevel(downsample.ResLevel2) - MinimumConsistencyDelay = time.Duration(10 * time.Minute) + MinimumAgeForRemoval = time.Duration(30 * time.Minute) ) var blockTooFreshSentinelError = errors.New("Block too fresh") @@ -295,7 +295,7 @@ func (c *Syncer) removeIfMetaMalformed(ctx context.Context, id ulid.ULID) (remov return false } - if ulid.Now()-id.Time() <= uint64(MinimumConsistencyDelay/time.Millisecond) { + if ulid.Now()-id.Time() <= uint64(MinimumAgeForRemoval/time.Millisecond) { // Minimum delay has not expired, should ignore for now return true } From 738ea86ac3c30a0f6302eeea290d4f1d7b999a85 Mon Sep 17 00:00:00 2001 From: Martin Dickson Date: Thu, 2 May 2019 14:21:34 +0100 Subject: [PATCH 16/17] add test case --- pkg/compact/compact.go | 6 +++--- pkg/compact/compact_e2e_test.go | 21 ++++++++++++++++----- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 21837df4ff..c9c21cdeb0 
100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -153,7 +153,7 @@ func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket // SyncMetas synchronizes all meta files from blocks in the bucket into // the memory. It removes any partial blocks older than the max of -// consistencyDelay and minimumRemoveAge from the bucket. +// consistencyDelay and MinimumAgeForRemoval from the bucket. func (c *Syncer) SyncMetas(ctx context.Context) error { c.mtx.Lock() defer c.mtx.Unlock() @@ -283,7 +283,7 @@ func (c *Syncer) downloadMeta(ctx context.Context, id ulid.ULID) (*metadata.Meta } // removeIfMalformed removes a block from the bucket if that block does not have a meta file. It ignores blocks that -// are younger than minimumRemoveAge. +// are younger than MinimumAgeForRemoval. func (c *Syncer) removeIfMetaMalformed(ctx context.Context, id ulid.ULID) (removedOrIgnored bool) { metaExists, err := c.bkt.Exists(ctx, path.Join(id.String(), block.MetaFilename)) if err != nil { @@ -296,7 +296,7 @@ func (c *Syncer) removeIfMetaMalformed(ctx context.Context, id ulid.ULID) (remov } if ulid.Now()-id.Time() <= uint64(MinimumAgeForRemoval/time.Millisecond) { - // Minimum delay has not expired, should ignore for now + // Minimum delay has not expired, ignore for now return true } diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go index 5fa4e3611d..4b9512fd71 100644 --- a/pkg/compact/compact_e2e_test.go +++ b/pkg/compact/compact_e2e_test.go @@ -80,22 +80,33 @@ func TestSyncer_SyncMetas_HandlesMalformedBlocks(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) defer cancel() - sy, err := NewSyncer(nil, nil, bkt, 0, 1, false) + sy, err := NewSyncer(nil, nil, bkt, 10*time.Second, 1, false) testutil.Ok(t, err) - // Generate 1 block which is older than minimumRemovalAge which has chunk data but no meta. Compactor should delete it. 
- id, err := ulid.New(uint64(time.Now().Add(-time.Hour).Unix()*1000), nil) + // Generate 1 block which is older than MinimumAgeForRemoval which has chunk data but no meta. Compactor should delete it. + shouldDeleteId, err := ulid.New(uint64(time.Now().Add(-time.Hour).Unix()*1000), nil) testutil.Ok(t, err) var fakeChunk bytes.Buffer fakeChunk.Write([]byte{0,1,2,3}) - testutil.Ok(t, bkt.Upload(ctx, path.Join(id.String(), "chunks", "000001"), &fakeChunk)) + testutil.Ok(t, bkt.Upload(ctx, path.Join(shouldDeleteId.String(), "chunks", "000001"), &fakeChunk)) + + // Generate 1 block which is older than consistencyDelay but younger than MinimumAgeForRemoval, and which has chunk + // data but no meta. Compactor should ignore it. + shouldIgnoreId, err := ulid.New(uint64(time.Now().Unix()*1000), nil) + testutil.Ok(t, err) + + testutil.Ok(t, bkt.Upload(ctx, path.Join(shouldIgnoreId.String(), "chunks", "000001"), &fakeChunk)) testutil.Ok(t, sy.SyncMetas(ctx)) - exists, err := bkt.Exists(ctx, id.String()) + exists, err := bkt.Exists(ctx, shouldDeleteId.String()) testutil.Ok(t, err) testutil.Equals(t, false, exists) + + exists, err = bkt.Exists(ctx, shouldIgnoreId.String()) + testutil.Ok(t, err) + testutil.Equals(t, true, exists) }) } From 58febd3c2a7f2306ed3b9ebf2007adfa2d63fda1 Mon Sep 17 00:00:00 2001 From: Martin Dickson Date: Thu, 2 May 2019 14:42:56 +0100 Subject: [PATCH 17/17] move test to inmem bucket --- pkg/compact/compact_e2e_test.go | 35 ----------------------------- pkg/compact/compact_test.go | 40 +++++++++++++++++++++++++++++++++ 2 files changed, 40 insertions(+), 35 deletions(-) diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go index 4b9512fd71..f58862c0ac 100644 --- a/pkg/compact/compact_e2e_test.go +++ b/pkg/compact/compact_e2e_test.go @@ -75,41 +75,6 @@ func TestSyncer_SyncMetas_e2e(t *testing.T) { } -func TestSyncer_SyncMetas_HandlesMalformedBlocks(t *testing.T) { - objtesting.ForeachStore(t, func(t testing.TB, bkt objstore.Bucket) 
{ - ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) - defer cancel() - - sy, err := NewSyncer(nil, nil, bkt, 10*time.Second, 1, false) - testutil.Ok(t, err) - - // Generate 1 block which is older than MinimumAgeForRemoval which has chunk data but no meta. Compactor should delete it. - shouldDeleteId, err := ulid.New(uint64(time.Now().Add(-time.Hour).Unix()*1000), nil) - testutil.Ok(t, err) - - var fakeChunk bytes.Buffer - fakeChunk.Write([]byte{0,1,2,3}) - testutil.Ok(t, bkt.Upload(ctx, path.Join(shouldDeleteId.String(), "chunks", "000001"), &fakeChunk)) - - // Generate 1 block which is older than consistencyDelay but younger than MinimumAgeForRemoval, and which has chunk - // data but no meta. Compactor should ignore it. - shouldIgnoreId, err := ulid.New(uint64(time.Now().Unix()*1000), nil) - testutil.Ok(t, err) - - testutil.Ok(t, bkt.Upload(ctx, path.Join(shouldIgnoreId.String(), "chunks", "000001"), &fakeChunk)) - - testutil.Ok(t, sy.SyncMetas(ctx)) - - exists, err := bkt.Exists(ctx, shouldDeleteId.String()) - testutil.Ok(t, err) - testutil.Equals(t, false, exists) - - exists, err = bkt.Exists(ctx, shouldIgnoreId.String()) - testutil.Ok(t, err) - testutil.Equals(t, true, exists) - }) -} - func TestSyncer_GarbageCollect_e2e(t *testing.T) { objtesting.ForeachStore(t, func(t testing.TB, bkt objstore.Bucket) { ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) diff --git a/pkg/compact/compact_test.go b/pkg/compact/compact_test.go index f175fcce93..a5fbe5d790 100644 --- a/pkg/compact/compact_test.go +++ b/pkg/compact/compact_test.go @@ -1,7 +1,13 @@ package compact import ( + "bytes" + "context" + "github.com/improbable-eng/thanos/pkg/objstore/inmem" + "github.com/oklog/ulid" + "path" "testing" + "time" "github.com/improbable-eng/thanos/pkg/testutil" "github.com/pkg/errors" @@ -37,3 +43,37 @@ func TestRetryError(t *testing.T) { err = errors.Wrap(retry(errors.Wrap(halt(errors.New("test")), "something")), "something2") 
testutil.Assert(t, IsHaltError(err), "not a halt error. Retry should not hide halt error") } + +func TestSyncer_SyncMetas_HandlesMalformedBlocks(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + bkt := inmem.NewBucket() + sy, err := NewSyncer(nil, nil, bkt, 10*time.Second, 1, false) + testutil.Ok(t, err) + + // Generate 1 block which is older than MinimumAgeForRemoval which has chunk data but no meta. Compactor should delete it. + shouldDeleteId, err := ulid.New(uint64(time.Now().Add(-time.Hour).Unix()*1000), nil) + testutil.Ok(t, err) + + var fakeChunk bytes.Buffer + fakeChunk.Write([]byte{0,1,2,3}) + testutil.Ok(t, bkt.Upload(ctx, path.Join(shouldDeleteId.String(), "chunks", "000001"), &fakeChunk)) + + // Generate 1 block which is older than consistencyDelay but younger than MinimumAgeForRemoval, and which has chunk + // data but no meta. Compactor should ignore it. + shouldIgnoreId, err := ulid.New(uint64(time.Now().Unix()*1000), nil) + testutil.Ok(t, err) + + testutil.Ok(t, bkt.Upload(ctx, path.Join(shouldIgnoreId.String(), "chunks", "000001"), &fakeChunk)) + + testutil.Ok(t, sy.SyncMetas(ctx)) + + exists, err := bkt.Exists(ctx, path.Join(shouldDeleteId.String(), "chunks", "000001")) + testutil.Ok(t, err) + testutil.Equals(t, false, exists) + + exists, err = bkt.Exists(ctx, path.Join(shouldIgnoreId.String(), "chunks", "000001")) + testutil.Ok(t, err) + testutil.Equals(t, true, exists) +}