Skip to content

Commit

Permalink
compact: add concurrency to meta sync (#887)
Browse files Browse the repository at this point in the history
* add concurrency to meta sync

* fix test

* update docs

* address cr

* use sentinel error to handle ignoring fresh blocks
  • Loading branch information
mjd95 authored and bwplotka committed Mar 5, 2019
1 parent 45c8a5f commit 181c8ce
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 45 deletions.
7 changes: 6 additions & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
maxCompactionLevel := cmd.Flag("debug.max-compaction-level", fmt.Sprintf("Maximum compaction level, default is %d: %s", compactions.maxLevel(), compactions.String())).
Hidden().Default(strconv.Itoa(compactions.maxLevel())).Int()

blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing block metadata from object storage.").
Default("20").Int()

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
return runCompact(g, logger, reg,
*httpAddr,
Expand All @@ -108,6 +111,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
name,
*disableDownsampling,
*maxCompactionLevel,
*blockSyncConcurrency,
)
}
}
Expand All @@ -126,6 +130,7 @@ func runCompact(
component string,
disableDownsampling bool,
maxCompactionLevel int,
blockSyncConcurrency int,
) error {
halted := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "thanos_compactor_halted",
Expand Down Expand Up @@ -157,7 +162,7 @@ func runCompact(
}
}()

sy, err := compact.NewSyncer(logger, reg, bkt, syncDelay)
sy, err := compact.NewSyncer(logger, reg, bkt, syncDelay, blockSyncConcurrency)
if err != nil {
return errors.Wrap(err, "create syncer")
}
Expand Down
3 changes: 3 additions & 0 deletions docs/components/compact.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,5 +67,8 @@ Flags:
in bucket. 0d - disables this retention
-w, --wait Do not exit after all compactions have been processed
and wait for new work.
--block-sync-concurrency=20
Number of goroutines to use when syncing block
metadata from object storage.

```
136 changes: 94 additions & 42 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,20 @@ const (
ResolutionLevel1h = ResolutionLevel(downsample.ResLevel2)
)

var blockTooFreshSentinelError = errors.New("Block too fresh")

// Syncer synchronizes block metas from a bucket into a local directory.
// It sorts them into compaction groups based on equal label sets.
type Syncer struct {
logger log.Logger
reg prometheus.Registerer
bkt objstore.Bucket
syncDelay time.Duration
mtx sync.Mutex
blocks map[ulid.ULID]*metadata.Meta
metrics *syncerMetrics
logger log.Logger
reg prometheus.Registerer
bkt objstore.Bucket
syncDelay time.Duration
mtx sync.Mutex
blocks map[ulid.ULID]*metadata.Meta
blocksMtx sync.Mutex
blockSyncConcurrency int
metrics *syncerMetrics
}

type syncerMetrics struct {
Expand Down Expand Up @@ -124,17 +128,18 @@ func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics {

// NewSyncer returns a new Syncer for the given Bucket and directory.
// Blocks must be at least as old as the sync delay for being considered.
func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, syncDelay time.Duration) (*Syncer, error) {
func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, syncDelay time.Duration, blockSyncConcurrency int) (*Syncer, error) {
if logger == nil {
logger = log.NewNopLogger()
}
return &Syncer{
logger: logger,
reg: reg,
syncDelay: syncDelay,
blocks: map[ulid.ULID]*metadata.Meta{},
bkt: bkt,
metrics: newSyncerMetrics(reg),
logger: logger,
reg: reg,
syncDelay: syncDelay,
blocks: map[ulid.ULID]*metadata.Meta{},
bkt: bkt,
metrics: newSyncerMetrics(reg),
blockSyncConcurrency: blockSyncConcurrency,
}, nil
}

Expand All @@ -157,6 +162,44 @@ func (c *Syncer) SyncMetas(ctx context.Context) error {
}

func (c *Syncer) syncMetas(ctx context.Context) error {
var wg sync.WaitGroup
defer wg.Wait()

metaIDsChan := make(chan ulid.ULID)
errChan := make(chan error, c.blockSyncConcurrency)

workCtx, cancel := context.WithCancel(ctx)
defer cancel()
for i := 0; i < c.blockSyncConcurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()

for id := range metaIDsChan {
// Check if we already have this block cached locally.
c.blocksMtx.Lock()
_, seen := c.blocks[id]
c.blocksMtx.Unlock()
if seen {
continue
}

meta, err := c.downloadMeta(workCtx, id)
if err == blockTooFreshSentinelError {
continue
}
if err != nil {
errChan <- err
return
}

c.blocksMtx.Lock()
c.blocks[id] = meta
c.blocksMtx.Unlock()
}
}()
}

// Read back all block metas so we can detect deleted blocks.
remote := map[ulid.ULID]struct{}{}

Expand All @@ -168,42 +211,25 @@ func (c *Syncer) syncMetas(ctx context.Context) error {

remote[id] = struct{}{}

// Check if we already have this block cached locally.
if _, ok := c.blocks[id]; ok {
return nil
}

level.Debug(c.logger).Log("msg", "download meta", "block", id)

meta, err := block.DownloadMeta(ctx, c.logger, c.bkt, id)
if err != nil {
return errors.Wrapf(err, "downloading meta.json for %s", id)
select {
case <-ctx.Done():
case metaIDsChan <- id:
}

// ULIDs contain a millisecond timestamp. We do not consider blocks that have been created too recently to
// avoid races when a block is only partially uploaded. This relates to all blocks, excluding:
// - repair created blocks
// - compactor created blocks
// NOTE: It is not safe to miss an "old" block (even if it was newly created) in the sync step. The compactor needs to be aware of ALL old blocks.
// TODO(bplotka): https://github.com/improbable-eng/thanos/issues/377
if ulid.Now()-id.Time() < uint64(c.syncDelay/time.Millisecond) &&
meta.Thanos.Source != metadata.BucketRepairSource &&
meta.Thanos.Source != metadata.CompactorSource &&
meta.Thanos.Source != metadata.CompactorRepairSource {

level.Debug(c.logger).Log("msg", "block is too fresh for now", "block", id)
return nil
}

remote[id] = struct{}{}
c.blocks[id] = &meta

return nil
})
close(metaIDsChan)
if err != nil {
return retry(errors.Wrap(err, "retrieve bucket block metas"))
}

wg.Wait()
close(errChan)

if err := <-errChan; err != nil {
return retry(err)
}

// Delete all local block dirs that no longer exist in the bucket.
for id := range c.blocks {
if _, ok := remote[id]; !ok {
Expand All @@ -214,6 +240,32 @@ func (c *Syncer) syncMetas(ctx context.Context) error {
return nil
}

// downloadMeta fetches and parses the meta.json for the given block ID from
// the bucket. It returns blockTooFreshSentinelError when the block was created
// more recently than the configured sync delay and does not originate from a
// repair or compactor source, so callers can skip such blocks without treating
// them as failures.
func (c *Syncer) downloadMeta(ctx context.Context, id ulid.ULID) (*metadata.Meta, error) {
	level.Debug(c.logger).Log("msg", "download meta", "block", id)

	meta, err := block.DownloadMeta(ctx, c.logger, c.bkt, id)
	if err != nil {
		return nil, errors.Wrapf(err, "downloading meta.json for %s", id)
	}

	// ULIDs contain a millisecond timestamp, so the block's age can be derived
	// from its ID. Blocks created too recently are not considered, to avoid
	// races when a block is only partially uploaded. This applies to all blocks
	// except those produced by repair or by the compactor itself.
	// NOTE: It is not safe to miss an "old" block (even if newly created) in the
	// sync step; the compactor needs to be aware of ALL old blocks.
	// TODO(bplotka): https://github.com/improbable-eng/thanos/issues/377
	tooFresh := ulid.Now()-id.Time() < uint64(c.syncDelay/time.Millisecond)
	trustedSource := meta.Thanos.Source == metadata.BucketRepairSource ||
		meta.Thanos.Source == metadata.CompactorSource ||
		meta.Thanos.Source == metadata.CompactorRepairSource
	if tooFresh && !trustedSource {
		level.Debug(c.logger).Log("msg", "block is too fresh for now", "block", id)
		return nil, blockTooFreshSentinelError
	}

	return &meta, nil
}

// GroupKey returns a unique identifier for the group the block belongs to. It considers
// the downsampling resolution and the block's labels.
func GroupKey(meta metadata.Meta) string {
Expand Down
4 changes: 2 additions & 2 deletions pkg/compact/compact_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestSyncer_SyncMetas_e2e(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
defer cancel()

sy, err := NewSyncer(nil, nil, bkt, 0)
sy, err := NewSyncer(nil, nil, bkt, 0, 1)
testutil.Ok(t, err)

// Generate 15 blocks. Initially the first 10 are synced into memory and only the last
Expand Down Expand Up @@ -134,7 +134,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
}

// Do one initial synchronization with the bucket.
sy, err := NewSyncer(nil, nil, bkt, 0)
sy, err := NewSyncer(nil, nil, bkt, 0, 1)
testutil.Ok(t, err)
testutil.Ok(t, sy.SyncMetas(ctx))

Expand Down

0 comments on commit 181c8ce

Please sign in to comment.