Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compactor: remove malformed blocks after delay #1053

Merged
2 changes: 1 addition & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri

objStoreConfig := regCommonObjStoreFlags(cmd, "", true)

syncDelay := modelDuration(cmd.Flag("sync-delay", "Minimum age of fresh (non-compacted) blocks before they are being processed.").
syncDelay := modelDuration(cmd.Flag("sync-delay", "Minimum age of fresh (non-compacted) blocks before they are being processed. Blocks older than this which are malformed will be removed.").
bwplotka marked this conversation as resolved.
Show resolved Hide resolved
Default("30m"))

retentionRaw := modelDuration(cmd.Flag("retention.resolution-raw", "How long to retain raw samples in bucket. 0d - disables this retention").Default("0d"))
Expand Down
3 changes: 2 additions & 1 deletion docs/components/compact.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ Flags:
Alternative to 'objstore.config-file' flag.
Object store configuration in YAML.
--sync-delay=30m Minimum age of fresh (non-compacted) blocks
before they are being processed.
before they are being processed. Blocks older
than this which are malformed will be removed.
--retention.resolution-raw=0d
How long to retain raw samples in bucket. 0d -
disables this retention
Expand Down
9 changes: 9 additions & 0 deletions pkg/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,12 @@ func IsBlockDir(path string) (id ulid.ULID, ok bool) {
id, err := ulid.Parse(filepath.Base(path))
return id, err == nil
}

// MetaExists checks whether the meta file exists for the provided block.
func MetaExists(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) (bool, error) {
mjd95 marked this conversation as resolved.
Show resolved Hide resolved
exists, err := bkt.Exists(ctx, path.Join(id.String(), MetaFilename))
if err != nil {
return false, errors.Wrapf(err, "meta.json bkt get for %s", id.String())
}
return exists, nil
}
39 changes: 34 additions & 5 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type Syncer struct {
logger log.Logger
reg prometheus.Registerer
bkt objstore.Bucket
syncDelay time.Duration
consistencyDelay time.Duration
mtx sync.Mutex
blocks map[ulid.ULID]*metadata.Meta
blocksMtx sync.Mutex
Expand Down Expand Up @@ -132,14 +132,14 @@ func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics {

// NewSyncer returns a new Syncer for the given Bucket and directory.
// Blocks must be at least as old as the consistency delay to be considered.
func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, syncDelay time.Duration, blockSyncConcurrency int, acceptMalformedIndex bool) (*Syncer, error) {
func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, consistencyDelay time.Duration, blockSyncConcurrency int, acceptMalformedIndex bool) (*Syncer, error) {
if logger == nil {
logger = log.NewNopLogger()
}
return &Syncer{
logger: logger,
reg: reg,
syncDelay: syncDelay,
consistencyDelay: consistencyDelay,
blocks: map[ulid.ULID]*metadata.Meta{},
bkt: bkt,
metrics: newSyncerMetrics(reg),
Expand All @@ -149,7 +149,8 @@ func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket
}

// SyncMetas synchronizes all meta files from blocks in the bucket into
// the memory.
// the memory. It removes any partial blocks older than consistencyDelay
// from the bucket.
func (c *Syncer) SyncMetas(ctx context.Context) error {
c.mtx.Lock()
defer c.mtx.Unlock()
Expand Down Expand Up @@ -194,6 +195,9 @@ func (c *Syncer) syncMetas(ctx context.Context) error {
continue
}
if err != nil {
if removed := c.removeIfMalformed(workCtx, id); removed {
bwplotka marked this conversation as resolved.
Show resolved Hide resolved
continue
}
errChan <- err
return
}
Expand Down Expand Up @@ -250,6 +254,10 @@ func (c *Syncer) downloadMeta(ctx context.Context, id ulid.ULID) (*metadata.Meta

meta, err := block.DownloadMeta(ctx, c.logger, c.bkt, id)
if err != nil {
if ulid.Now()-id.Time() < uint64(c.consistencyDelay/time.Millisecond) {
level.Debug(c.logger).Log("msg", "block is too fresh for now", "block", id)
return nil, blockTooFreshSentinelError
}
return nil, errors.Wrapf(err, "downloading meta.json for %s", id)
}

Expand All @@ -259,7 +267,7 @@ func (c *Syncer) downloadMeta(ctx context.Context, id ulid.ULID) (*metadata.Meta
// - compactor created blocks
// NOTE: It is not safe to miss an "old" block (even if it is newly created) in the sync step. The compactor needs to be aware of ALL old blocks.
// TODO(bplotka): https://github.com/improbable-eng/thanos/issues/377
if ulid.Now()-id.Time() < uint64(c.syncDelay/time.Millisecond) &&
if ulid.Now()-id.Time() < uint64(c.consistencyDelay/time.Millisecond) &&
meta.Thanos.Source != metadata.BucketRepairSource &&
meta.Thanos.Source != metadata.CompactorSource &&
meta.Thanos.Source != metadata.CompactorRepairSource {
Expand All @@ -271,6 +279,27 @@ func (c *Syncer) downloadMeta(ctx context.Context, id ulid.ULID) (*metadata.Meta
return &meta, nil
}

// removeIfMalformed removes a block from the bucket if that block does not have a meta file.
// It is the responsibility of the caller to ensure that enough time has passed for the block to become consistent
mjd95 marked this conversation as resolved.
Show resolved Hide resolved
func (c *Syncer) removeIfMalformed(ctx context.Context, id ulid.ULID) bool {
mjd95 marked this conversation as resolved.
Show resolved Hide resolved
exists, err := block.MetaExists(ctx, c.logger, c.bkt, id)
if err != nil {
level.Warn(c.logger).Log("msg", "failed to check if block is malformed", "block", id)
mjd95 marked this conversation as resolved.
Show resolved Hide resolved
return false
}
if exists {
// Meta exists, block is not malformed.
return false
}

if err := block.Delete(ctx, c.bkt, id); err != nil {
level.Warn(c.logger).Log("msg", "failed to delete malformed block", "block", id)
mjd95 marked this conversation as resolved.
Show resolved Hide resolved
mjd95 marked this conversation as resolved.
Show resolved Hide resolved
}
level.Info(c.logger).Log("msg", "deleted malformed block", "block", id)

return true
}

// GroupKey returns a unique identifier for the group the block belongs to. It considers
// the downsampling resolution and the block's labels.
func GroupKey(meta metadata.Meta) string {
Expand Down