From 3a763bef3d8b2bd653318949fd93d75fe60fa16b Mon Sep 17 00:00:00 2001 From: Martin Dickson Date: Fri, 3 May 2019 00:10:45 +0100 Subject: [PATCH] Compactor: remove malformed blocks after delay (#1053) * compactor removes malformed blocks after delay * compactor removes malformed blocks after delay * include missing file * reuse existing freshness check * fix comment * remove unused var * fix comment * syncDelay -> consistencyDelay * fix comment * update flag description * address cr * fix dupliacte error handling * minimum value for --consistency-delay * update * docs * add test case * move test to inmem bucket --- cmd/thanos/compact.go | 8 +++---- docs/components/compact.md | 6 +++-- pkg/compact/compact.go | 48 +++++++++++++++++++++++++++++++++---- pkg/compact/compact_test.go | 40 +++++++++++++++++++++++++++++++ 4 files changed, 91 insertions(+), 11 deletions(-) diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index bec9746c55..365d211992 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -82,7 +82,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri objStoreConfig := regCommonObjStoreFlags(cmd, "", true) - syncDelay := modelDuration(cmd.Flag("sync-delay", "Minimum age of fresh (non-compacted) blocks before they are being processed."). + consistencyDelay := modelDuration(cmd.Flag("consistency-delay", fmt.Sprintf("Minimum age of fresh (non-compacted) blocks before they are being processed. Malformed blocks older than the maximum of consistency-delay and %s will be removed.", compact.MinimumAgeForRemoval)). Default("30m")) retentionRaw := modelDuration(cmd.Flag("retention.resolution-raw", "How long to retain raw samples in bucket. 0d - disables this retention").Default("0d")) @@ -114,7 +114,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri *httpAddr, *dataDir, objStoreConfig, - time.Duration(*syncDelay), + time.Duration(*consistencyDelay), *haltOnError, *acceptMalformedIndex, *wait, @@ -140,7 +140,7 @@ func runCompact( httpBindAddr string, dataDir string, objStoreConfig *pathOrContent, - syncDelay time.Duration, + consistencyDelay time.Duration, haltOnError bool, acceptMalformedIndex bool, wait bool, @@ -182,7 +182,7 @@ func runCompact( } }() - sy, err := compact.NewSyncer(logger, reg, bkt, syncDelay, + sy, err := compact.NewSyncer(logger, reg, bkt, consistencyDelay, blockSyncConcurrency, acceptMalformedIndex) if err != nil { return errors.Wrap(err, "create syncer") diff --git a/docs/components/compact.md b/docs/components/compact.md index 373c6d94b9..439adc520b 100644 --- a/docs/components/compact.md +++ b/docs/components/compact.md @@ -60,8 +60,10 @@ Flags: --objstore.config= Alternative to 'objstore.config-file' flag. Object store configuration in YAML. - --sync-delay=30m Minimum age of fresh (non-compacted) blocks - before they are being processed. + --consistency-delay=30m Minimum age of fresh (non-compacted) blocks + before they are being processed. Malformed blocks + older than the maximum of consistency-delay and + 30m0s will be removed. --retention.resolution-raw=0d How long to retain raw samples in bucket. 0d - disables this retention diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 150451185d..c9c21cdeb0 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -5,6 +5,7 @@ import ( "fmt" "io/ioutil" "os" + "path" "path/filepath" "sort" "strings" @@ -31,6 +32,8 @@ const ( ResolutionLevelRaw = ResolutionLevel(downsample.ResLevel0) ResolutionLevel5m = ResolutionLevel(downsample.ResLevel1) ResolutionLevel1h = ResolutionLevel(downsample.ResLevel2) + + MinimumAgeForRemoval = time.Duration(30 * time.Minute) ) var blockTooFreshSentinelError = errors.New("Block too fresh") @@ -41,7 +44,7 @@ type Syncer struct { logger log.Logger reg prometheus.Registerer bkt objstore.Bucket - syncDelay time.Duration + consistencyDelay time.Duration mtx sync.Mutex blocks map[ulid.ULID]*metadata.Meta blocksMtx sync.Mutex @@ -132,14 +135,14 @@ func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics { // NewSyncer returns a new Syncer for the given Bucket and directory. // Blocks must be at least as old as the sync delay for being considered. -func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, syncDelay time.Duration, blockSyncConcurrency int, acceptMalformedIndex bool) (*Syncer, error) { +func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, consistencyDelay time.Duration, blockSyncConcurrency int, acceptMalformedIndex bool) (*Syncer, error) { if logger == nil { logger = log.NewNopLogger() } return &Syncer{ logger: logger, reg: reg, - syncDelay: syncDelay, + consistencyDelay: consistencyDelay, blocks: map[ulid.ULID]*metadata.Meta{}, bkt: bkt, metrics: newSyncerMetrics(reg), @@ -149,7 +152,8 @@ func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket } // SyncMetas synchronizes all meta files from blocks in the bucket into -// the memory. +// the memory. It removes any partial blocks older than the max of +// consistencyDelay and MinimumAgeForRemoval from the bucket. func (c *Syncer) SyncMetas(ctx context.Context) error { c.mtx.Lock() defer c.mtx.Unlock() @@ -194,6 +198,9 @@ func (c *Syncer) syncMetas(ctx context.Context) error { continue } if err != nil { + if removedOrIgnored := c.removeIfMetaMalformed(workCtx, id); removedOrIgnored { + continue + } errChan <- err return } @@ -250,6 +257,10 @@ func (c *Syncer) downloadMeta(ctx context.Context, id ulid.ULID) (*metadata.Meta meta, err := block.DownloadMeta(ctx, c.logger, c.bkt, id) if err != nil { + if ulid.Now()-id.Time() < uint64(c.consistencyDelay/time.Millisecond) { + level.Debug(c.logger).Log("msg", "block is too fresh for now", "block", id) + return nil, blockTooFreshSentinelError + } return nil, errors.Wrapf(err, "downloading meta.json for %s", id) } @@ -259,7 +270,7 @@ func (c *Syncer) downloadMeta(ctx context.Context, id ulid.ULID) (*metadata.Meta // - compactor created blocks // NOTE: It is not safe to miss "old" block (even that it is newly created) in sync step. Compactor needs to aware of ALL old blocks. // TODO(bplotka): https://github.com/improbable-eng/thanos/issues/377 - if ulid.Now()-id.Time() < uint64(c.syncDelay/time.Millisecond) && + if ulid.Now()-id.Time() < uint64(c.consistencyDelay/time.Millisecond) && meta.Thanos.Source != metadata.BucketRepairSource && meta.Thanos.Source != metadata.CompactorSource && meta.Thanos.Source != metadata.CompactorRepairSource { @@ -271,6 +282,33 @@ func (c *Syncer) downloadMeta(ctx context.Context, id ulid.ULID) (*metadata.Meta return &meta, nil } +// removeIfMalformed removes a block from the bucket if that block does not have a meta file. It ignores blocks that +// are younger than MinimumAgeForRemoval. +func (c *Syncer) removeIfMetaMalformed(ctx context.Context, id ulid.ULID) (removedOrIgnored bool) { + metaExists, err := c.bkt.Exists(ctx, path.Join(id.String(), block.MetaFilename)) + if err != nil { + level.Warn(c.logger).Log("msg", "failed to check meta exists for block", "block", id, "err", err) + return false + } + if metaExists { + // Meta exists, block is not malformed. + return false + } + + if ulid.Now()-id.Time() <= uint64(MinimumAgeForRemoval/time.Millisecond) { + // Minimum delay has not expired, ignore for now + return true + } + + if err := block.Delete(ctx, c.bkt, id); err != nil { + level.Warn(c.logger).Log("msg", "failed to delete malformed block", "block", id, "err", err) + return false + } + level.Info(c.logger).Log("msg", "deleted malformed block", "block", id) + + return true +} + // GroupKey returns a unique identifier for the group the block belongs to. It considers // the downsampling resolution and the block's labels. func GroupKey(meta metadata.Meta) string { diff --git a/pkg/compact/compact_test.go b/pkg/compact/compact_test.go index f175fcce93..a5fbe5d790 100644 --- a/pkg/compact/compact_test.go +++ b/pkg/compact/compact_test.go @@ -1,7 +1,13 @@ package compact import ( + "bytes" + "context" + "github.com/improbable-eng/thanos/pkg/objstore/inmem" + "github.com/oklog/ulid" + "path" "testing" + "time" "github.com/improbable-eng/thanos/pkg/testutil" "github.com/pkg/errors" @@ -37,3 +43,37 @@ func TestRetryError(t *testing.T) { err = errors.Wrap(retry(errors.Wrap(halt(errors.New("test")), "something")), "something2") testutil.Assert(t, IsHaltError(err), "not a halt error. Retry should not hide halt error") } + +func TestSyncer_SyncMetas_HandlesMalformedBlocks(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + bkt := inmem.NewBucket() + sy, err := NewSyncer(nil, nil, bkt, 10*time.Second, 1, false) + testutil.Ok(t, err) + + // Generate 1 block which is older than MinimumAgeForRemoval which has chunk data but no meta. Compactor should delete it. + shouldDeleteId, err := ulid.New(uint64(time.Now().Add(-time.Hour).Unix()*1000), nil) + testutil.Ok(t, err) + + var fakeChunk bytes.Buffer + fakeChunk.Write([]byte{0,1,2,3}) + testutil.Ok(t, bkt.Upload(ctx, path.Join(shouldDeleteId.String(), "chunks", "000001"), &fakeChunk)) + + // Generate 1 block which is older than consistencyDelay but younger than MinimumAgeForRemoval, and which has chunk + // data but no meta. Compactor should ignore it. + shouldIgnoreId, err := ulid.New(uint64(time.Now().Unix()*1000), nil) + testutil.Ok(t, err) + + testutil.Ok(t, bkt.Upload(ctx, path.Join(shouldIgnoreId.String(), "chunks", "000001"), &fakeChunk)) + + testutil.Ok(t, sy.SyncMetas(ctx)) + + exists, err := bkt.Exists(ctx, path.Join(shouldDeleteId.String(), "chunks", "000001")) + testutil.Ok(t, err) + testutil.Equals(t, false, exists) + + exists, err = bkt.Exists(ctx, path.Join(shouldIgnoreId.String(), "chunks", "000001")) + testutil.Ok(t, err) + testutil.Equals(t, true, exists) +}