Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compact: group concurrency #1010

Merged
merged 17 commits into from
Apr 10, 2019
10 changes: 9 additions & 1 deletion cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing block metadata from object storage.").
Default("20").Int()

compactionConcurrency := cmd.Flag("compact.concurrency", "Number of goroutines to use when compacting groups.").
Default("1").Int()

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
return runCompact(g, logger, reg,
*httpAddr,
Expand All @@ -116,6 +119,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
*disableDownsampling,
*maxCompactionLevel,
*blockSyncConcurrency,
*compactionConcurrency,
)
}
}
Expand All @@ -136,6 +140,7 @@ func runCompact(
disableDownsampling bool,
maxCompactionLevel int,
blockSyncConcurrency int,
concurrency int,
) error {
halted := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "thanos_compactor_halted",
Expand Down Expand Up @@ -198,7 +203,10 @@ func runCompact(
return errors.Wrap(err, "clean working downsample directory")
}

compactor := compact.NewBucketCompactor(logger, sy, comp, compactDir, bkt)
compactor, err := compact.NewBucketCompactor(logger, sy, comp, compactDir, bkt, concurrency)
if err != nil {
return errors.Wrap(err, "create bucket compactor")
}

if retentionByResolution[compact.ResolutionLevelRaw].Seconds() != 0 {
level.Info(logger).Log("msg", "retention policy of raw samples is enabled", "duration", retentionByResolution[compact.ResolutionLevelRaw])
Expand Down
62 changes: 32 additions & 30 deletions docs/components/compact.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,44 +31,46 @@ usage: thanos compact [<flags>]
continuously compacts blocks in an object store bucket

Flags:
-h, --help Show context-sensitive help (also try --help-long and
--help-man).
--version Show application version.
--log.level=info Log filtering level.
--log.format=logfmt Log format to use.
-h, --help Show context-sensitive help (also try --help-long
and --help-man).
--version Show application version.
--log.level=info Log filtering level.
--log.format=logfmt Log format to use.
--gcloudtrace.project=GCLOUDTRACE.PROJECT
GCP project to send Google Cloud Trace tracings to.
If empty, tracing will be disabled.
GCP project to send Google Cloud Trace tracings
to. If empty, tracing will be disabled.
--gcloudtrace.sample-factor=1
How often we send traces (1/<sample-factor>). If 0 no
trace will be sent periodically, unless forced by
baggage item. See `pkg/tracing/tracing.go` for
details.
How often we send traces (1/<sample-factor>). If
0 no trace will be sent periodically, unless
forced by baggage item. See
`pkg/tracing/tracing.go` for details.
--http-address="0.0.0.0:10902"
Listen host:port for HTTP endpoints.
--data-dir="./data" Data directory in which to cache blocks and process
compactions.
Listen host:port for HTTP endpoints.
--data-dir="./data" Data directory in which to cache blocks and
process compactions.
--objstore.config-file=<bucket.config-yaml-path>
Path to YAML file that contains object store
configuration.
Path to YAML file that contains object store
configuration.
--objstore.config=<bucket.config-yaml>
Alternative to 'objstore.config-file' flag. Object
store configuration in YAML.
--sync-delay=30m Minimum age of fresh (non-compacted) blocks before
they are being processed.
Alternative to 'objstore.config-file' flag.
Object store configuration in YAML.
--sync-delay=30m Minimum age of fresh (non-compacted) blocks
before they are being processed.
--retention.resolution-raw=0d
How long to retain raw samples in bucket. 0d -
disables this retention
How long to retain raw samples in bucket. 0d -
disables this retention
--retention.resolution-5m=0d
How long to retain samples of resolution 1 (5
minutes) in bucket. 0d - disables this retention
How long to retain samples of resolution 1 (5
minutes) in bucket. 0d - disables this retention
--retention.resolution-1h=0d
How long to retain samples of resolution 2 (1 hour)
in bucket. 0d - disables this retention
-w, --wait Do not exit after all compactions have been processed
and wait for new work.
How long to retain samples of resolution 2 (1
hour) in bucket. 0d - disables this retention
-w, --wait Do not exit after all compactions have been
processed and wait for new work.
--block-sync-concurrency=20
Number of goroutines to use when syncing block
metadata from object storage.
Number of goroutines to use when syncing block
metadata from object storage.
--compact.concurrency=1 Number of goroutines to use when compacting
groups.

```
91 changes: 64 additions & 27 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/tsdb"
"github.com/prometheus/tsdb/labels"
"golang.org/x/sync/errgroup"
)

type ResolutionLevel int64
Expand Down Expand Up @@ -878,28 +879,68 @@ func (cg *Group) deleteBlock(b string) error {

// BucketCompactor compacts blocks in a bucket.
type BucketCompactor struct {
logger log.Logger
sy *Syncer
comp tsdb.Compactor
compactDir string
bkt objstore.Bucket
logger log.Logger
sy *Syncer
comp tsdb.Compactor
compactDir string
bkt objstore.Bucket
concurrency int
}

// NewBucketCompactor creates a new bucket compactor.
func NewBucketCompactor(logger log.Logger, sy *Syncer, comp tsdb.Compactor, compactDir string, bkt objstore.Bucket) *BucketCompactor {
return &BucketCompactor{
logger: logger,
sy: sy,
comp: comp,
compactDir: compactDir,
bkt: bkt,
func NewBucketCompactor(logger log.Logger, sy *Syncer, comp tsdb.Compactor, compactDir string, bkt objstore.Bucket, concurrency int) (*BucketCompactor, error) {
if concurrency <= 0 {
return nil, errors.New("invalid concurrency level (%d), concurrency level must be > 0")
}
return &BucketCompactor{
logger: logger,
sy: sy,
comp: comp,
compactDir: compactDir,
bkt: bkt,
concurrency: concurrency,
}, nil
}

// Compact runs compaction over bucket.
func (c *BucketCompactor) Compact(ctx context.Context) error {
// Loop over bucket and compact until there's no work left.
for {
var (
errGroup, errGroupCtx = errgroup.WithContext(ctx)
groupChan = make(chan *Group)
finishedAllGroups = true
mtx sync.Mutex
)

// Set up workers who will compact the groups when the groups are ready.
for i := 0; i < c.concurrency; i++ {
errGroup.Go(func() error {
for g := range groupChan {
shouldRerunGroup, _, err := g.Compact(errGroupCtx, c.compactDir, c.comp)
if err == nil {
if shouldRerunGroup {
mtx.Lock()
finishedAllGroups = false
mtx.Unlock()
}
continue
}

if IsIssue347Error(err) {
if err := RepairIssue347(errGroupCtx, c.logger, c.bkt, err); err == nil {
mtx.Lock()
finishedAllGroups = false
mtx.Unlock()
continue
}
}
return errors.Wrap(err, "compaction")
Copy link
Member

@povilasv povilasv Apr 9, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

more context here please, like which group this error-ed on.

}
return nil
})
}

// Clean up the compaction temporary directory at the beginning of every compaction loop.
if err := os.RemoveAll(c.compactDir); err != nil {
return errors.Wrap(err, "clean up the compaction temporary directory")
Expand All @@ -923,24 +964,20 @@ func (c *BucketCompactor) Compact(ctx context.Context) error {
if err != nil {
return errors.Wrap(err, "build compaction groups")
}
finishedAllGroups := true
for _, g := range groups {
shouldRerunGroup, _, err := g.Compact(ctx, c.compactDir, c.comp)
if err == nil {
if shouldRerunGroup {
finishedAllGroups = false
}
continue
}

if IsIssue347Error(err) {
if err := RepairIssue347(ctx, c.logger, c.bkt, err); err == nil {
finishedAllGroups = false
continue
}
// Send all groups found during this pass to the compaction workers.
for _, g := range groups {
povilasv marked this conversation as resolved.
Show resolved Hide resolved
select {
mjd95 marked this conversation as resolved.
Show resolved Hide resolved
case <-errGroupCtx.Done():
break
case groupChan <- g:
}
return errors.Wrap(err, "compaction")
}
close(groupChan)
if err := errGroup.Wait(); err != nil {
return err
}

if finishedAllGroups {
break
}
Expand Down