
compact: add concurrency to meta sync #887

Merged
merged 5 commits into from Mar 5, 2019

7 changes: 6 additions & 1 deletion cmd/thanos/compact.go
@@ -92,6 +92,9 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
maxCompactionLevel := cmd.Flag("debug.max-compaction-level", fmt.Sprintf("Maximum compaction level, default is %d: %s", compactions.maxLevel(), compactions.String())).
Hidden().Default(strconv.Itoa(compactions.maxLevel())).Int()

blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing block metadata from object storage.").
Default("20").Int()

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
return runCompact(g, logger, reg,
*httpAddr,
@@ -108,6 +111,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
name,
*disableDownsampling,
*maxCompactionLevel,
*blockSyncConcurrency,
)
}
}
@@ -126,6 +130,7 @@ func runCompact(
component string,
disableDownsampling bool,
maxCompactionLevel int,
blockSyncConcurrency int,
) error {
halted := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "thanos_compactor_halted",
@@ -157,7 +162,7 @@ func runCompact(
}
}()

sy, err := compact.NewSyncer(logger, reg, bkt, syncDelay)
sy, err := compact.NewSyncer(logger, reg, bkt, syncDelay, blockSyncConcurrency)
if err != nil {
return errors.Wrap(err, "create syncer")
}
3 changes: 3 additions & 0 deletions docs/components/compact.md
@@ -67,5 +67,8 @@ Flags:
in bucket. 0d - disables this retention
-w, --wait Do not exit after all compactions have been processed
and wait for new work.
--block-sync-concurrency=20
Number of goroutines to use when syncing block
metadata from object storage.

```
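
As a usage note, a minimal sketch of overriding the new flag. Only `--block-sync-concurrency` comes from this change; the remaining arguments are elided placeholders rather than flags asserted by this diff:

```
thanos compact --block-sync-concurrency=50 [other compact flags...]
```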
136 changes: 94 additions & 42 deletions pkg/compact/compact.go
@@ -33,16 +33,20 @@ const (
ResolutionLevel1h = ResolutionLevel(downsample.ResLevel2)
)

var blockTooFreshSentinelError = errors.New("Block too fresh")

// Syncer syncronizes block metas from a bucket into a local directory.
// It sorts them into compaction groups based on equal label sets.
type Syncer struct {
logger log.Logger
reg prometheus.Registerer
bkt objstore.Bucket
syncDelay time.Duration
mtx sync.Mutex
blocks map[ulid.ULID]*metadata.Meta
metrics *syncerMetrics
logger log.Logger
reg prometheus.Registerer
bkt objstore.Bucket
syncDelay time.Duration
mtx sync.Mutex
blocks map[ulid.ULID]*metadata.Meta
blocksMtx sync.Mutex
blockSyncConcurrency int
metrics *syncerMetrics
}

type syncerMetrics struct {
@@ -124,17 +128,18 @@ func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics {

// NewSyncer returns a new Syncer for the given Bucket and directory.
// Blocks must be at least as old as the sync delay for being considered.
func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, syncDelay time.Duration) (*Syncer, error) {
func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, syncDelay time.Duration, blockSyncConcurrency int) (*Syncer, error) {
if logger == nil {
logger = log.NewNopLogger()
}
return &Syncer{
logger: logger,
reg: reg,
syncDelay: syncDelay,
blocks: map[ulid.ULID]*metadata.Meta{},
bkt: bkt,
metrics: newSyncerMetrics(reg),
logger: logger,
reg: reg,
syncDelay: syncDelay,
blocks: map[ulid.ULID]*metadata.Meta{},
bkt: bkt,
metrics: newSyncerMetrics(reg),
blockSyncConcurrency: blockSyncConcurrency,
}, nil
}

@@ -157,6 +162,44 @@ func (c *Syncer) SyncMetas(ctx context.Context) error {
}

func (c *Syncer) syncMetas(ctx context.Context) error {
var wg sync.WaitGroup
defer wg.Wait()

metaIDsChan := make(chan ulid.ULID)
errChan := make(chan error, c.blockSyncConcurrency)

workCtx, cancel := context.WithCancel(ctx)
defer cancel()
for i := 0; i < c.blockSyncConcurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()

for id := range metaIDsChan {
// Check if we already have this block cached locally.
c.blocksMtx.Lock()
_, seen := c.blocks[id]
c.blocksMtx.Unlock()
if seen {
continue
}

meta, err := c.downloadMeta(workCtx, id)
if err == blockTooFreshSentinelError {
continue
}
if err != nil {
errChan <- err
return
}

c.blocksMtx.Lock()
c.blocks[id] = meta
c.blocksMtx.Unlock()
}
}()
}

// Read back all block metas so we can detect deleted blocks.
remote := map[ulid.ULID]struct{}{}

@@ -168,42 +211,25 @@ func (c *Syncer) syncMetas(ctx context.Context) error {

remote[id] = struct{}{}

// Check if we already have this block cached locally.
if _, ok := c.blocks[id]; ok {
return nil
}

level.Debug(c.logger).Log("msg", "download meta", "block", id)

meta, err := block.DownloadMeta(ctx, c.logger, c.bkt, id)
if err != nil {
return errors.Wrapf(err, "downloading meta.json for %s", id)
select {
case <-ctx.Done():
case metaIDsChan <- id:
}

// ULIDs contain a millisecond timestamp. We do not consider blocks that have been created too recently to
// avoid races when a block is only partially uploaded. This relates to all blocks, excluding:
// - repair created blocks
// - compactor created blocks
// NOTE: It is not safe to miss "old" block (even that it is newly created) in sync step. Compactor needs to aware of ALL old blocks.
// TODO(bplotka): https://github.com/improbable-eng/thanos/issues/377
if ulid.Now()-id.Time() < uint64(c.syncDelay/time.Millisecond) &&
meta.Thanos.Source != metadata.BucketRepairSource &&
meta.Thanos.Source != metadata.CompactorSource &&
meta.Thanos.Source != metadata.CompactorRepairSource {

level.Debug(c.logger).Log("msg", "block is too fresh for now", "block", id)
return nil
}

remote[id] = struct{}{}
c.blocks[id] = &meta

return nil
})
close(metaIDsChan)
if err != nil {
return retry(errors.Wrap(err, "retrieve bucket block metas"))
}

wg.Wait()
close(errChan)

if err := <-errChan; err != nil {
return retry(err)
}

// Delete all local block dirs that no longer exist in the bucket.
for id := range c.blocks {
if _, ok := remote[id]; !ok {
@@ -214,6 +240,32 @@ func (c *Syncer) syncMetas(ctx context.Context) error {
return nil
}

func (c *Syncer) downloadMeta(ctx context.Context, id ulid.ULID) (*metadata.Meta, error) {
level.Debug(c.logger).Log("msg", "download meta", "block", id)

meta, err := block.DownloadMeta(ctx, c.logger, c.bkt, id)
if err != nil {
return nil, errors.Wrapf(err, "downloading meta.json for %s", id)
}

// ULIDs contain a millisecond timestamp. We do not consider blocks that have been created too recently to
// avoid races when a block is only partially uploaded. This relates to all blocks, excluding:
// - repair created blocks
// - compactor created blocks
// NOTE: It is not safe to miss "old" block (even that it is newly created) in sync step. Compactor needs to aware of ALL old blocks.
// TODO(bplotka): https://github.com/improbable-eng/thanos/issues/377
if ulid.Now()-id.Time() < uint64(c.syncDelay/time.Millisecond) &&
meta.Thanos.Source != metadata.BucketRepairSource &&
meta.Thanos.Source != metadata.CompactorSource &&
meta.Thanos.Source != metadata.CompactorRepairSource {

level.Debug(c.logger).Log("msg", "block is too fresh for now", "block", id)
return nil, blockTooFreshSentinelError
}

return &meta, nil
}

// GroupKey returns a unique identifier for the group the block belongs to. It considers
// the downsampling resolution and the block's labels.
func GroupKey(meta metadata.Meta) string {
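
To make the concurrency change easier to follow outside the diff, here is a minimal, self-contained sketch of the same worker-pool shape: a producer feeds IDs into an unbuffered channel, a fixed number of workers consume them, sentinel "skip" errors are ignored, and the first real error is surfaced through a buffered error channel. All names here (syncAll, fetchMeta, errSkip) are illustrative, and the cancellation wiring is simplified relative to the PR, which stops the producer via the bucket iteration's outer context rather than cancelling from a worker.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// errSkip plays the role of blockTooFreshSentinelError: the item is not an
// error, it is simply not ready to be processed yet.
var errSkip = errors.New("skip this item")

// fetchMeta stands in for Syncer.downloadMeta.
func fetchMeta(ctx context.Context, id int) (string, error) {
	if id%5 == 0 {
		return "", errSkip
	}
	return fmt.Sprintf("meta-%d", id), nil
}

// syncAll mirrors the shape of the new syncMetas: one producer, a fixed pool
// of workers, a shared map guarded by a mutex, and a buffered error channel
// sized to the pool so a failing worker never blocks on the send.
func syncAll(ctx context.Context, ids []int, concurrency int) (map[int]string, error) {
	var (
		wg      sync.WaitGroup
		mtx     sync.Mutex
		results = map[int]string{}
		idCh    = make(chan int)
		errCh   = make(chan error, concurrency)
	)

	workCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for id := range idCh {
				meta, err := fetchMeta(workCtx, id)
				if err == errSkip {
					continue
				}
				if err != nil {
					errCh <- err
					cancel() // first error wins; stop the producer as well
					return
				}
				mtx.Lock()
				results[id] = meta
				mtx.Unlock()
			}
		}()
	}

	// Producer: stop handing out work once the context is cancelled.
	for _, id := range ids {
		select {
		case <-workCtx.Done():
		case idCh <- id:
		}
	}
	close(idCh)

	wg.Wait()
	close(errCh)

	// A single receive is enough: either the first error, or the zero value
	// from the closed, empty channel.
	if err := <-errCh; err != nil {
		return nil, err
	}
	return results, nil
}

func main() {
	metas, err := syncAll(context.Background(), []int{1, 2, 3, 4, 5, 6, 7}, 3)
	fmt.Println(metas, err)
}
```

The error channel is buffered to the pool size so every worker can send its error and exit even if nothing is receiving yet; only the first error is returned, mirroring the single receive after close(errChan) in the diff above.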
4 changes: 2 additions & 2 deletions pkg/compact/compact_e2e_test.go
@@ -32,7 +32,7 @@ func TestSyncer_SyncMetas_e2e(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
defer cancel()

sy, err := NewSyncer(nil, nil, bkt, 0)
sy, err := NewSyncer(nil, nil, bkt, 0, 1)
testutil.Ok(t, err)

// Generate 15 blocks. Initially the first 10 are synced into memory and only the last
@@ -134,7 +134,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
}

// Do one initial synchronization with the bucket.
sy, err := NewSyncer(nil, nil, bkt, 0)
sy, err := NewSyncer(nil, nil, bkt, 0, 1)
testutil.Ok(t, err)
testutil.Ok(t, sy.SyncMetas(ctx))
