Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compactor: remove malformed blocks after delay #1053

Merged
8 changes: 4 additions & 4 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri

objStoreConfig := regCommonObjStoreFlags(cmd, "", true)

syncDelay := modelDuration(cmd.Flag("sync-delay", "Minimum age of fresh (non-compacted) blocks before they are being processed.").
consistencyDelay := modelDuration(cmd.Flag("consistency-delay", fmt.Sprintf("Minimum age of fresh (non-compacted) blocks before they are being processed. Malformed blocks older than the maximum of consistency-delay and %s will be removed.", compact.MinimumAgeForRemoval)).
Default("30m"))

retentionRaw := modelDuration(cmd.Flag("retention.resolution-raw", "How long to retain raw samples in bucket. 0d - disables this retention").Default("0d"))
Expand Down Expand Up @@ -114,7 +114,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
*httpAddr,
*dataDir,
objStoreConfig,
time.Duration(*syncDelay),
time.Duration(*consistencyDelay),
*haltOnError,
*acceptMalformedIndex,
*wait,
Expand All @@ -140,7 +140,7 @@ func runCompact(
httpBindAddr string,
dataDir string,
objStoreConfig *pathOrContent,
syncDelay time.Duration,
consistencyDelay time.Duration,
haltOnError bool,
acceptMalformedIndex bool,
wait bool,
Expand Down Expand Up @@ -182,7 +182,7 @@ func runCompact(
}
}()

sy, err := compact.NewSyncer(logger, reg, bkt, syncDelay,
sy, err := compact.NewSyncer(logger, reg, bkt, consistencyDelay,
blockSyncConcurrency, acceptMalformedIndex)
if err != nil {
return errors.Wrap(err, "create syncer")
Expand Down
6 changes: 4 additions & 2 deletions docs/components/compact.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,10 @@ Flags:
--objstore.config=<bucket.config-yaml>
Alternative to 'objstore.config-file' flag.
Object store configuration in YAML.
--sync-delay=30m Minimum age of fresh (non-compacted) blocks
before they are being processed.
--consistency-delay=30m Minimum age of fresh (non-compacted) blocks
before they are being processed. Malformed blocks
older than the maximum of consistency-delay and
30m0s will be removed.
--retention.resolution-raw=0d
How long to retain raw samples in bucket. 0d -
disables this retention
Expand Down
48 changes: 43 additions & 5 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io/ioutil"
"os"
"path"
"path/filepath"
"sort"
"strings"
Expand All @@ -31,6 +32,8 @@ const (
ResolutionLevelRaw = ResolutionLevel(downsample.ResLevel0)
ResolutionLevel5m = ResolutionLevel(downsample.ResLevel1)
ResolutionLevel1h = ResolutionLevel(downsample.ResLevel2)

MinimumAgeForRemoval = time.Duration(30 * time.Minute)
)

var blockTooFreshSentinelError = errors.New("Block too fresh")
Expand All @@ -41,7 +44,7 @@ type Syncer struct {
logger log.Logger
reg prometheus.Registerer
bkt objstore.Bucket
syncDelay time.Duration
consistencyDelay time.Duration
mtx sync.Mutex
blocks map[ulid.ULID]*metadata.Meta
blocksMtx sync.Mutex
Expand Down Expand Up @@ -132,14 +135,14 @@ func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics {

// NewSyncer returns a new Syncer for the given Bucket and directory.
// Blocks must be at least as old as the sync delay for being considered.
func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, syncDelay time.Duration, blockSyncConcurrency int, acceptMalformedIndex bool) (*Syncer, error) {
func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, consistencyDelay time.Duration, blockSyncConcurrency int, acceptMalformedIndex bool) (*Syncer, error) {
if logger == nil {
logger = log.NewNopLogger()
}
return &Syncer{
logger: logger,
reg: reg,
syncDelay: syncDelay,
consistencyDelay: consistencyDelay,
blocks: map[ulid.ULID]*metadata.Meta{},
bkt: bkt,
metrics: newSyncerMetrics(reg),
Expand All @@ -149,7 +152,8 @@ func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket
}

// SyncMetas synchronizes all meta files from blocks in the bucket into
// the memory.
// the memory. It removes any partial blocks older than the max of
// consistencyDelay and MinimumAgeForRemoval from the bucket.
func (c *Syncer) SyncMetas(ctx context.Context) error {
c.mtx.Lock()
defer c.mtx.Unlock()
Expand Down Expand Up @@ -194,6 +198,9 @@ func (c *Syncer) syncMetas(ctx context.Context) error {
continue
}
if err != nil {
if removedOrIgnored := c.removeIfMetaMalformed(workCtx, id); removedOrIgnored {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well technically you can do just if c.removeIfMetaMalformed(workCtx, id) here (:

continue
}
errChan <- err
return
}
Expand Down Expand Up @@ -250,6 +257,10 @@ func (c *Syncer) downloadMeta(ctx context.Context, id ulid.ULID) (*metadata.Meta

meta, err := block.DownloadMeta(ctx, c.logger, c.bkt, id)
if err != nil {
if ulid.Now()-id.Time() < uint64(c.consistencyDelay/time.Millisecond) {
level.Debug(c.logger).Log("msg", "block is too fresh for now", "block", id)
return nil, blockTooFreshSentinelError
}
return nil, errors.Wrapf(err, "downloading meta.json for %s", id)
}

Expand All @@ -259,7 +270,7 @@ func (c *Syncer) downloadMeta(ctx context.Context, id ulid.ULID) (*metadata.Meta
// - compactor created blocks
// NOTE: It is not safe to miss "old" block (even that it is newly created) in sync step. Compactor needs to aware of ALL old blocks.
// TODO(bplotka): https://github.com/improbable-eng/thanos/issues/377
if ulid.Now()-id.Time() < uint64(c.syncDelay/time.Millisecond) &&
if ulid.Now()-id.Time() < uint64(c.consistencyDelay/time.Millisecond) &&
meta.Thanos.Source != metadata.BucketRepairSource &&
meta.Thanos.Source != metadata.CompactorSource &&
meta.Thanos.Source != metadata.CompactorRepairSource {
Expand All @@ -271,6 +282,33 @@ func (c *Syncer) downloadMeta(ctx context.Context, id ulid.ULID) (*metadata.Meta
return &meta, nil
}

// removeIfMalformed removes a block from the bucket if that block does not have a meta file. It ignores blocks that
// are younger than MinimumAgeForRemoval.
func (c *Syncer) removeIfMetaMalformed(ctx context.Context, id ulid.ULID) (removedOrIgnored bool) {
metaExists, err := c.bkt.Exists(ctx, path.Join(id.String(), block.MetaFilename))
if err != nil {
level.Warn(c.logger).Log("msg", "failed to check meta exists for block", "block", id, "err", err)
return false
}
if metaExists {
// Meta exists, block is not malformed.
return false
}

if ulid.Now()-id.Time() <= uint64(MinimumAgeForRemoval/time.Millisecond) {
// Minimum delay has not expired, ignore for now
return true
}

if err := block.Delete(ctx, c.bkt, id); err != nil {
level.Warn(c.logger).Log("msg", "failed to delete malformed block", "block", id, "err", err)
return false
}
level.Info(c.logger).Log("msg", "deleted malformed block", "block", id)

return true
}

// GroupKey returns a unique identifier for the group the block belongs to. It considers
// the downsampling resolution and the block's labels.
func GroupKey(meta metadata.Meta) string {
Expand Down
40 changes: 40 additions & 0 deletions pkg/compact/compact_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
package compact

import (
"bytes"
"context"
"github.com/improbable-eng/thanos/pkg/objstore/inmem"
"github.com/oklog/ulid"
"path"
"testing"
"time"

"github.com/improbable-eng/thanos/pkg/testutil"
"github.com/pkg/errors"
Expand Down Expand Up @@ -37,3 +43,37 @@ func TestRetryError(t *testing.T) {
err = errors.Wrap(retry(errors.Wrap(halt(errors.New("test")), "something")), "something2")
testutil.Assert(t, IsHaltError(err), "not a halt error. Retry should not hide halt error")
}

func TestSyncer_SyncMetas_HandlesMalformedBlocks(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

bkt := inmem.NewBucket()
sy, err := NewSyncer(nil, nil, bkt, 10*time.Second, 1, false)
testutil.Ok(t, err)

// Generate 1 block which is older than MinimumAgeForRemoval which has chunk data but no meta. Compactor should delete it.
shouldDeleteId, err := ulid.New(uint64(time.Now().Add(-time.Hour).Unix()*1000), nil)
testutil.Ok(t, err)

var fakeChunk bytes.Buffer
fakeChunk.Write([]byte{0,1,2,3})
testutil.Ok(t, bkt.Upload(ctx, path.Join(shouldDeleteId.String(), "chunks", "000001"), &fakeChunk))

// Generate 1 block which is older than consistencyDelay but younger than MinimumAgeForRemoval, and which has chunk
// data but no meta. Compactor should ignore it.
shouldIgnoreId, err := ulid.New(uint64(time.Now().Unix()*1000), nil)
testutil.Ok(t, err)

testutil.Ok(t, bkt.Upload(ctx, path.Join(shouldIgnoreId.String(), "chunks", "000001"), &fakeChunk))

testutil.Ok(t, sy.SyncMetas(ctx))

exists, err := bkt.Exists(ctx, path.Join(shouldDeleteId.String(), "chunks", "000001"))
testutil.Ok(t, err)
testutil.Equals(t, false, exists)

exists, err = bkt.Exists(ctx, path.Join(shouldIgnoreId.String(), "chunks", "000001"))
testutil.Ok(t, err)
testutil.Equals(t, true, exists)
}