Skip to content

Commit

Permalink
Compactor: remove malformed blocks after delay (#1053)
Browse files Browse the repository at this point in the history
* compactor removes malformed blocks after delay

* compactor removes malformed blocks after delay

* include missing file

* reuse existing freshness check

* fix comment

* remove unused var

* fix comment

* syncDelay -> consistencyDelay

* fix comment

* update flag description

* address cr

* fix duplicate error handling

* minimum value for --consistency-delay

* update

* docs

* add test case

* move test to inmem bucket
  • Loading branch information
mjd95 authored and bwplotka committed May 2, 2019
1 parent f97d168 commit 3a763be
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 11 deletions.
8 changes: 4 additions & 4 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri

objStoreConfig := regCommonObjStoreFlags(cmd, "", true)

syncDelay := modelDuration(cmd.Flag("sync-delay", "Minimum age of fresh (non-compacted) blocks before they are being processed.").
consistencyDelay := modelDuration(cmd.Flag("consistency-delay", fmt.Sprintf("Minimum age of fresh (non-compacted) blocks before they are being processed. Malformed blocks older than the maximum of consistency-delay and %s will be removed.", compact.MinimumAgeForRemoval)).
Default("30m"))

retentionRaw := modelDuration(cmd.Flag("retention.resolution-raw", "How long to retain raw samples in bucket. 0d - disables this retention").Default("0d"))
Expand Down Expand Up @@ -114,7 +114,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
*httpAddr,
*dataDir,
objStoreConfig,
time.Duration(*syncDelay),
time.Duration(*consistencyDelay),
*haltOnError,
*acceptMalformedIndex,
*wait,
Expand All @@ -140,7 +140,7 @@ func runCompact(
httpBindAddr string,
dataDir string,
objStoreConfig *pathOrContent,
syncDelay time.Duration,
consistencyDelay time.Duration,
haltOnError bool,
acceptMalformedIndex bool,
wait bool,
Expand Down Expand Up @@ -182,7 +182,7 @@ func runCompact(
}
}()

sy, err := compact.NewSyncer(logger, reg, bkt, syncDelay,
sy, err := compact.NewSyncer(logger, reg, bkt, consistencyDelay,
blockSyncConcurrency, acceptMalformedIndex)
if err != nil {
return errors.Wrap(err, "create syncer")
Expand Down
6 changes: 4 additions & 2 deletions docs/components/compact.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,10 @@ Flags:
--objstore.config=<bucket.config-yaml>
Alternative to 'objstore.config-file' flag.
Object store configuration in YAML.
--sync-delay=30m Minimum age of fresh (non-compacted) blocks
before they are being processed.
--consistency-delay=30m Minimum age of fresh (non-compacted) blocks
before they are being processed. Malformed blocks
older than the maximum of consistency-delay and
30m0s will be removed.
--retention.resolution-raw=0d
How long to retain raw samples in bucket. 0d -
disables this retention
Expand Down
48 changes: 43 additions & 5 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io/ioutil"
"os"
"path"
"path/filepath"
"sort"
"strings"
Expand All @@ -31,6 +32,8 @@ const (
ResolutionLevelRaw = ResolutionLevel(downsample.ResLevel0)
ResolutionLevel5m = ResolutionLevel(downsample.ResLevel1)
ResolutionLevel1h = ResolutionLevel(downsample.ResLevel2)

MinimumAgeForRemoval = time.Duration(30 * time.Minute)
)

var blockTooFreshSentinelError = errors.New("Block too fresh")
Expand All @@ -41,7 +44,7 @@ type Syncer struct {
logger log.Logger
reg prometheus.Registerer
bkt objstore.Bucket
syncDelay time.Duration
consistencyDelay time.Duration
mtx sync.Mutex
blocks map[ulid.ULID]*metadata.Meta
blocksMtx sync.Mutex
Expand Down Expand Up @@ -132,14 +135,14 @@ func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics {

// NewSyncer returns a new Syncer for the given Bucket and directory.
// Blocks must be at least as old as the sync delay for being considered.
func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, syncDelay time.Duration, blockSyncConcurrency int, acceptMalformedIndex bool) (*Syncer, error) {
func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, consistencyDelay time.Duration, blockSyncConcurrency int, acceptMalformedIndex bool) (*Syncer, error) {
if logger == nil {
logger = log.NewNopLogger()
}
return &Syncer{
logger: logger,
reg: reg,
syncDelay: syncDelay,
consistencyDelay: consistencyDelay,
blocks: map[ulid.ULID]*metadata.Meta{},
bkt: bkt,
metrics: newSyncerMetrics(reg),
Expand All @@ -149,7 +152,8 @@ func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket
}

// SyncMetas synchronizes all meta files from blocks in the bucket into
// the memory.
// the memory. It removes any partial blocks older than the max of
// consistencyDelay and MinimumAgeForRemoval from the bucket.
func (c *Syncer) SyncMetas(ctx context.Context) error {
c.mtx.Lock()
defer c.mtx.Unlock()
Expand Down Expand Up @@ -194,6 +198,9 @@ func (c *Syncer) syncMetas(ctx context.Context) error {
continue
}
if err != nil {
if removedOrIgnored := c.removeIfMetaMalformed(workCtx, id); removedOrIgnored {
continue
}
errChan <- err
return
}
Expand Down Expand Up @@ -250,6 +257,10 @@ func (c *Syncer) downloadMeta(ctx context.Context, id ulid.ULID) (*metadata.Meta

meta, err := block.DownloadMeta(ctx, c.logger, c.bkt, id)
if err != nil {
if ulid.Now()-id.Time() < uint64(c.consistencyDelay/time.Millisecond) {
level.Debug(c.logger).Log("msg", "block is too fresh for now", "block", id)
return nil, blockTooFreshSentinelError
}
return nil, errors.Wrapf(err, "downloading meta.json for %s", id)
}

Expand All @@ -259,7 +270,7 @@ func (c *Syncer) downloadMeta(ctx context.Context, id ulid.ULID) (*metadata.Meta
// - compactor created blocks
// NOTE: It is not safe to miss "old" block (even that it is newly created) in sync step. Compactor needs to aware of ALL old blocks.
// TODO(bplotka): https://github.com/improbable-eng/thanos/issues/377
if ulid.Now()-id.Time() < uint64(c.syncDelay/time.Millisecond) &&
if ulid.Now()-id.Time() < uint64(c.consistencyDelay/time.Millisecond) &&
meta.Thanos.Source != metadata.BucketRepairSource &&
meta.Thanos.Source != metadata.CompactorSource &&
meta.Thanos.Source != metadata.CompactorRepairSource {
Expand All @@ -271,6 +282,33 @@ func (c *Syncer) downloadMeta(ctx context.Context, id ulid.ULID) (*metadata.Meta
return &meta, nil
}

// removeIfMetaMalformed removes a block from the bucket if that block does not have a meta file.
// It ignores (but reports as handled) blocks that are younger than MinimumAgeForRemoval, giving
// a slow uploader time to finish writing meta.json before the block is considered malformed.
// The returned removedOrIgnored is true when the caller can skip the block (it was deleted, or
// it is too young to judge), and false when the block should be treated as a hard error
// (the existence probe or the deletion itself failed, or meta.json actually exists).
func (c *Syncer) removeIfMetaMalformed(ctx context.Context, id ulid.ULID) (removedOrIgnored bool) {
	// Probe for meta.json; on a probe error we cannot tell whether the block is malformed,
	// so log and let the caller surface the original error.
	metaExists, err := c.bkt.Exists(ctx, path.Join(id.String(), block.MetaFilename))
	if err != nil {
		level.Warn(c.logger).Log("msg", "failed to check meta exists for block", "block", id, "err", err)
		return false
	}
	if metaExists {
		// Meta exists, block is not malformed.
		return false
	}

	// Age is derived from the ULID's embedded millisecond timestamp.
	// NOTE(review): if id.Time() were ever in the future, this uint64 subtraction would
	// underflow and the block would look ancient — confirm future-dated ULIDs cannot occur.
	if ulid.Now()-id.Time() <= uint64(MinimumAgeForRemoval/time.Millisecond) {
		// Minimum delay has not expired, ignore for now
		return true
	}

	if err := block.Delete(ctx, c.bkt, id); err != nil {
		level.Warn(c.logger).Log("msg", "failed to delete malformed block", "block", id, "err", err)
		return false
	}
	level.Info(c.logger).Log("msg", "deleted malformed block", "block", id)

	return true
}

// GroupKey returns a unique identifier for the group the block belongs to. It considers
// the downsampling resolution and the block's labels.
func GroupKey(meta metadata.Meta) string {
Expand Down
40 changes: 40 additions & 0 deletions pkg/compact/compact_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
package compact

import (
"bytes"
"context"
"github.com/improbable-eng/thanos/pkg/objstore/inmem"
"github.com/oklog/ulid"
"path"
"testing"
"time"

"github.com/improbable-eng/thanos/pkg/testutil"
"github.com/pkg/errors"
Expand Down Expand Up @@ -37,3 +43,37 @@ func TestRetryError(t *testing.T) {
err = errors.Wrap(retry(errors.Wrap(halt(errors.New("test")), "something")), "something2")
testutil.Assert(t, IsHaltError(err), "not a halt error. Retry should not hide halt error")
}

// TestSyncer_SyncMetas_HandlesMalformedBlocks verifies that SyncMetas deletes blocks that
// lack a meta.json and are older than MinimumAgeForRemoval, while leaving younger
// malformed blocks untouched.
func TestSyncer_SyncMetas_HandlesMalformedBlocks(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	bkt := inmem.NewBucket()
	sy, err := NewSyncer(nil, nil, bkt, 10*time.Second, 1, false)
	testutil.Ok(t, err)

	// Generate 1 block which is older than MinimumAgeForRemoval which has chunk data but no meta. Compactor should delete it.
	shouldDeleteID, err := ulid.New(uint64(time.Now().Add(-time.Hour).Unix()*1000), nil)
	testutil.Ok(t, err)

	// Each upload gets its own buffer: Upload drains the reader, so reusing a single
	// bytes.Buffer would leave the second uploaded object empty.
	testutil.Ok(t, bkt.Upload(ctx, path.Join(shouldDeleteID.String(), "chunks", "000001"), bytes.NewBuffer([]byte{0, 1, 2, 3})))

	// Generate 1 block which is younger than MinimumAgeForRemoval (it is also younger than
	// consistencyDelay, so syncMetas skips it as too fresh) and which has chunk data but no
	// meta. Compactor should ignore it.
	shouldIgnoreID, err := ulid.New(uint64(time.Now().Unix()*1000), nil)
	testutil.Ok(t, err)

	testutil.Ok(t, bkt.Upload(ctx, path.Join(shouldIgnoreID.String(), "chunks", "000001"), bytes.NewBuffer([]byte{0, 1, 2, 3})))

	testutil.Ok(t, sy.SyncMetas(ctx))

	exists, err := bkt.Exists(ctx, path.Join(shouldDeleteID.String(), "chunks", "000001"))
	testutil.Ok(t, err)
	testutil.Equals(t, false, exists)

	exists, err = bkt.Exists(ctx, path.Join(shouldIgnoreID.String(), "chunks", "000001"))
	testutil.Ok(t, err)
	testutil.Equals(t, true, exists)
}

0 comments on commit 3a763be

Please sign in to comment.