Skip to content

Commit

Permalink
compact: group concurrency improvements (#1029)
Browse files Browse the repository at this point in the history
* group concurrency improvements

* remove unnecessary error check

* add to wg in main goroutine
  • Loading branch information
mjd95 authored and bwplotka committed Apr 17, 2019
1 parent 9fad981 commit 6703f14
Showing 1 changed file with 30 additions and 15 deletions.
45 changes: 30 additions & 15 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"path/filepath"
"sort"
"strings"
"sync"
"time"

Expand All @@ -22,7 +23,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/tsdb"
"github.com/prometheus/tsdb/labels"
"golang.org/x/sync/errgroup"
)

type ResolutionLevel int64
Expand Down Expand Up @@ -921,17 +921,22 @@ func (c *BucketCompactor) Compact(ctx context.Context) error {
// Loop over bucket and compact until there's no work left.
for {
var (
errGroup, errGroupCtx = errgroup.WithContext(ctx)
groupChan = make(chan *Group)
finishedAllGroups = true
mtx sync.Mutex
wg sync.WaitGroup
workCtx, workCtxCancel = context.WithCancel(ctx)
groupChan = make(chan *Group)
errChan = make(chan error, c.concurrency)
finishedAllGroups = true
mtx sync.Mutex
)

// Set up workers who will compact the groups when the groups are ready.
// They will compact available groups until they encounter an error, after which they will stop.
for i := 0; i < c.concurrency; i++ {
errGroup.Go(func() error {
wg.Add(1)
go func() {
defer wg.Done()
for g := range groupChan {
shouldRerunGroup, _, err := g.Compact(errGroupCtx, c.compactDir, c.comp)
shouldRerunGroup, _, err := g.Compact(workCtx, c.compactDir, c.comp)
if err == nil {
if shouldRerunGroup {
mtx.Lock()
Expand All @@ -942,17 +947,17 @@ func (c *BucketCompactor) Compact(ctx context.Context) error {
}

if IsIssue347Error(err) {
if err := RepairIssue347(errGroupCtx, c.logger, c.bkt, err); err == nil {
if err := RepairIssue347(workCtx, c.logger, c.bkt, err); err == nil {
mtx.Lock()
finishedAllGroups = false
mtx.Unlock()
continue
}
}
return errors.Wrap(err, "compaction")
errChan <- errors.Wrap(err, fmt.Sprintf("compaction failed for group %s", g.Key()))
return
}
return nil
})
}()
}

// Clean up the compaction temporary directory at the beginning of every compaction loop.
Expand Down Expand Up @@ -980,16 +985,26 @@ func (c *BucketCompactor) Compact(ctx context.Context) error {
}

// Send all groups found during this pass to the compaction workers.
groupLoop:
for _, g := range groups {
select {
case <-errGroupCtx.Done():
break
case err = <-errChan:
break groupLoop
case groupChan <- g:
}
}
close(groupChan)
if err := errGroup.Wait(); err != nil {
return err
wg.Wait()

close(errChan)
workCtxCancel()
if err != nil {
errMsgs := []string{err.Error()}
// Collect any other errors reported by the workers.
for e := range errChan {
errMsgs = append(errMsgs, e.Error())
}
return errors.New(strings.Join(errMsgs, "; "))
}

if finishedAllGroups {
Expand Down

0 comments on commit 6703f14

Please sign in to comment.