diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index c95cf113d6..a7f3fff550 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -98,6 +98,9 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing block metadata from object storage."). Default("20").Int() + compactionConcurrency := cmd.Flag("compact.concurrency", "Number of goroutines to use when compacting groups."). + Default("1").Int() + m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error { return runCompact(g, logger, reg, *httpAddr, @@ -116,6 +119,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri *disableDownsampling, *maxCompactionLevel, *blockSyncConcurrency, + *compactionConcurrency, ) } } @@ -136,6 +140,7 @@ func runCompact( disableDownsampling bool, maxCompactionLevel int, blockSyncConcurrency int, + concurrency int, ) error { halted := prometheus.NewGauge(prometheus.GaugeOpts{ Name: "thanos_compactor_halted", @@ -198,7 +203,10 @@ func runCompact( return errors.Wrap(err, "clean working downsample directory") } - compactor := compact.NewBucketCompactor(logger, sy, comp, compactDir, bkt) + compactor, err := compact.NewBucketCompactor(logger, sy, comp, compactDir, bkt, concurrency) + if err != nil { + return errors.Wrap(err, "create bucket compactor") + } if retentionByResolution[compact.ResolutionLevelRaw].Seconds() != 0 { level.Info(logger).Log("msg", "retention policy of raw samples is enabled", "duration", retentionByResolution[compact.ResolutionLevelRaw]) diff --git a/docs/components/compact.md b/docs/components/compact.md index efd1455fc9..1c6eebb29d 100644 --- a/docs/components/compact.md +++ b/docs/components/compact.md @@ -31,44 +31,46 @@ usage: thanos compact [] continuously compacts blocks in an object store bucket Flags: - -h, --help Show 
context-sensitive help (also try --help-long and - --help-man). - --version Show application version. - --log.level=info Log filtering level. - --log.format=logfmt Log format to use. + -h, --help Show context-sensitive help (also try --help-long + and --help-man). + --version Show application version. + --log.level=info Log filtering level. + --log.format=logfmt Log format to use. --gcloudtrace.project=GCLOUDTRACE.PROJECT - GCP project to send Google Cloud Trace tracings to. - If empty, tracing will be disabled. + GCP project to send Google Cloud Trace tracings + to. If empty, tracing will be disabled. --gcloudtrace.sample-factor=1 - How often we send traces (1/). If 0 no - trace will be sent periodically, unless forced by - baggage item. See `pkg/tracing/tracing.go` for - details. + How often we send traces (1/). If + 0 no trace will be sent periodically, unless + forced by baggage item. See + `pkg/tracing/tracing.go` for details. --http-address="0.0.0.0:10902" - Listen host:port for HTTP endpoints. - --data-dir="./data" Data directory in which to cache blocks and process - compactions. + Listen host:port for HTTP endpoints. + --data-dir="./data" Data directory in which to cache blocks and + process compactions. --objstore.config-file= - Path to YAML file that contains object store - configuration. + Path to YAML file that contains object store + configuration. --objstore.config= - Alternative to 'objstore.config-file' flag. Object - store configuration in YAML. - --sync-delay=30m Minimum age of fresh (non-compacted) blocks before - they are being processed. + Alternative to 'objstore.config-file' flag. + Object store configuration in YAML. + --sync-delay=30m Minimum age of fresh (non-compacted) blocks + before they are being processed. --retention.resolution-raw=0d - How long to retain raw samples in bucket. 0d - - disables this retention + How long to retain raw samples in bucket. 
0d - + disables this retention --retention.resolution-5m=0d - How long to retain samples of resolution 1 (5 - minutes) in bucket. 0d - disables this retention + How long to retain samples of resolution 1 (5 + minutes) in bucket. 0d - disables this retention --retention.resolution-1h=0d - How long to retain samples of resolution 2 (1 hour) - in bucket. 0d - disables this retention - -w, --wait Do not exit after all compactions have been processed - and wait for new work. + How long to retain samples of resolution 2 (1 + hour) in bucket. 0d - disables this retention + -w, --wait Do not exit after all compactions have been + processed and wait for new work. --block-sync-concurrency=20 - Number of goroutines to use when syncing block - metadata from object storage. + Number of goroutines to use when syncing block + metadata from object storage. + --compact.concurrency=1 Number of goroutines to use when compacting + groups. ``` diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 6fd89b76e9..2de928ab6a 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -23,6 +23,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/tsdb" "github.com/prometheus/tsdb/labels" + "golang.org/x/sync/errgroup" ) type ResolutionLevel int64 @@ -878,28 +879,68 @@ func (cg *Group) deleteBlock(b string) error { // BucketCompactor compacts blocks in a bucket. type BucketCompactor struct { - logger log.Logger - sy *Syncer - comp tsdb.Compactor - compactDir string - bkt objstore.Bucket + logger log.Logger + sy *Syncer + comp tsdb.Compactor + compactDir string + bkt objstore.Bucket + concurrency int } // NewBucketCompactor creates a new bucket compactor. 
-func NewBucketCompactor(logger log.Logger, sy *Syncer, comp tsdb.Compactor, compactDir string, bkt objstore.Bucket) *BucketCompactor { - return &BucketCompactor{ - logger: logger, - sy: sy, - comp: comp, - compactDir: compactDir, - bkt: bkt, +func NewBucketCompactor(logger log.Logger, sy *Syncer, comp tsdb.Compactor, compactDir string, bkt objstore.Bucket, concurrency int) (*BucketCompactor, error) { + if concurrency <= 0 { + return nil, errors.Errorf("invalid concurrency level (%d), concurrency level must be > 0", concurrency) + } + return &BucketCompactor{ + logger: logger, + sy: sy, + comp: comp, + compactDir: compactDir, + bkt: bkt, + concurrency: concurrency, + }, nil } // Compact runs compaction over bucket. func (c *BucketCompactor) Compact(ctx context.Context) error { // Loop over bucket and compact until there's no work left. for { + var ( + errGroup, errGroupCtx = errgroup.WithContext(ctx) + groupChan = make(chan *Group) + finishedAllGroups = true + mtx sync.Mutex + ) + + // Set up workers who will compact the groups when the groups are ready. + for i := 0; i < c.concurrency; i++ { + errGroup.Go(func() error { + for g := range groupChan { + shouldRerunGroup, _, err := g.Compact(errGroupCtx, c.compactDir, c.comp) + if err == nil { + if shouldRerunGroup { + mtx.Lock() + finishedAllGroups = false + mtx.Unlock() + } + continue + } + + if IsIssue347Error(err) { + if err := RepairIssue347(errGroupCtx, c.logger, c.bkt, err); err == nil { + mtx.Lock() + finishedAllGroups = false + mtx.Unlock() + continue + } + } + return errors.Wrap(err, "compaction") + } + return nil + }) + } + // Clean up the compaction temporary directory at the beginning of every compaction loop. 
if err := os.RemoveAll(c.compactDir); err != nil { return errors.Wrap(err, "clean up the compaction temporary directory") @@ -923,24 +964,20 @@ func (c *BucketCompactor) Compact(ctx context.Context) error { if err != nil { return errors.Wrap(err, "build compaction groups") } - finishedAllGroups := true - for _, g := range groups { - shouldRerunGroup, _, err := g.Compact(ctx, c.compactDir, c.comp) - if err == nil { - if shouldRerunGroup { - finishedAllGroups = false - } - continue - } - if IsIssue347Error(err) { - if err := RepairIssue347(ctx, c.logger, c.bkt, err); err == nil { - finishedAllGroups = false - continue - } + // Send all groups found during this pass to the compaction workers. + for _, g := range groups { + select { + case <-errGroupCtx.Done(): + break + case groupChan <- g: } - return errors.Wrap(err, "compaction") } + close(groupChan) + if err := errGroup.Wait(); err != nil { + return err + } + if finishedAllGroups { break }