compact: add concurrency to group compact #898
Conversation
@bwplotka please review
NVM my comments that I've deleted - I missed some sneaky braces. LGTM from a purely mechanical side.
Force-pushed from afca3ac to de76399.
@GiedriusS I have fixed the conflicts, please review.
Unfortunately we need something more complex than this.
- We need to ensure the directory is different (that might be the case already).
- We have to limit the number of concurrent goroutines. Essentially, the main problem with the compactor is not that it's too slow for typical cases; it's that it uses too much memory. We need to optimize the compaction process at some point, but even a single run for a bigger project can take 30-50GB of RAM, or sometimes more if people have millions of series.

This change will hit those users, as now instead of "just" 60GB they will need 60GB * the number of individual groups they have. For us, for example, that's 30 different ones, so 60 * 30 GB suddenly with this PR merged. It's quite a memory-intensive part.

So I would suggest having something like this, but with a configurable number of workers to compact concurrently if the user wishes so and has enough memory for their blocks. What do you think? (:
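For illustration, a minimal sketch of the bounded-worker idea being suggested, using a buffered channel as a semaphore; `group` and `compactGroup` are hypothetical stand-ins for the real types and the per-group `g.Compact` call:

```go
package main

import (
	"fmt"
	"sync"
)

type group struct{ key string }

// compactGroup is a hypothetical stand-in for the real g.Compact call.
func compactGroup(g group) { fmt.Println("compacted", g.key) }

func main() {
	groups := []group{{"a"}, {"b"}, {"c"}, {"d"}}
	concurrency := 2 // user-configurable number of workers

	sem := make(chan struct{}, concurrency) // buffered channel acts as a semaphore
	var wg sync.WaitGroup
	for _, g := range groups {
		wg.Add(1)
		sem <- struct{}{} // acquire a slot; blocks once `concurrency` compactions are in flight
		go func(g group) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot
			compactGroup(g)
		}(g)
	}
	wg.Wait()
}
```

With `concurrency` set to 1 this degenerates to the current sequential behaviour, so memory use stays bounded at roughly one group's worth of RAM per worker.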
@bwplotka @GiedriusS Please review again.
Some questions. Also, what do you think about moving to `errgroup.Group` instead?
```go
errChan := make(chan error, len(groups))
groupChan := make(chan struct{}, groupCompactConcurrency)
defer close(groupChan)
for i := 0; i < groupCompactConcurrency; i++ {
```
I am reviewing on mobile, so might miss something, but I don't get this loop. What is the purpose of it?
Ok, I think I get it, but see my comment on line 917.
Thanks for the update! But I think there is still some work to be done. @earthdiaosi, let me know what you think!
```diff
@@ -886,7 +886,7 @@ func NewBucketCompactor(logger log.Logger, sy *Syncer, comp tsdb.Compactor, comp
 }

 // Compact runs compaction over bucket.
-func (c *BucketCompactor) Compact(ctx context.Context) error {
+func (c *BucketCompactor) Compact(ctx context.Context, groupCompactConcurrency int) error {
```
I think just `concurrency` would be enough.

```diff
-func (c *BucketCompactor) Compact(ctx context.Context, groupCompactConcurrency int) error {
+func (c *BucketCompactor) Compact(ctx context.Context, concurrency int) error {
```
```diff
 	}
 }
-return errors.Wrap(err, "compaction")
+errChan <- errors.Wrap(err, "compaction")
```
Can we use `errgroup.Group` instead?
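For reference, a minimal sketch of what the `errgroup.Group` variant could look like (using `golang.org/x/sync/errgroup`; `compactGroup` is a hypothetical stand-in for the per-group work):

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// compactGroup is a hypothetical stand-in for g.Compact.
func compactGroup(ctx context.Context, key string) error {
	fmt.Println("compacted", key)
	return nil
}

func main() {
	eg, ctx := errgroup.WithContext(context.Background())
	for _, key := range []string{"a", "b", "c"} {
		key := key // capture the loop variable for the closure
		eg.Go(func() error {
			return compactGroup(ctx, key)
		})
	}
	// Wait returns the first non-nil error from any goroutine,
	// replacing the manual errChan plumbing.
	if err := eg.Wait(); err != nil {
		fmt.Println("compaction failed:", err)
	}
}
```

Note that this alone doesn't bound concurrency; it would still need to be combined with a semaphore or worker pool as discussed above.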
```diff
@@ -95,6 +95,9 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
 	blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing block metadata from object storage.").
 		Default("20").Int()

+	groupCompactConcurrency := cmd.Flag("group-compact-concurrency", "Number of goroutines to use when compacting group.").
```
groupCompactConcurrency := cmd.Flag("group-compact-concurrency", "Number of goroutines to use when compacting group."). | |
groupCompactConcurrency := cmd.Flag("compact.concurrency", "Number of goroutines to use when compacting group."). |
```diff
@@ -206,7 +211,7 @@ func runCompact(

 	ctx, cancel := context.WithCancel(context.Background())
 	f := func() error {
-		if err := compactor.Compact(ctx); err != nil {
+		if err := compactor.Compact(ctx, groupCompactConcurrency); err != nil {
```
```diff
-if err := compactor.Compact(ctx, groupCompactConcurrency); err != nil {
+if err := compactor.Compact(ctx, concurrency); err != nil {
```
```go
shouldRerunGroup, _, err := g.Compact(ctx, c.compactDir, c.comp)
if err == nil {
	if shouldRerunGroup {
		finishedAllGroups = false
```
Blocker: non-thread-safe memory accessed by multiple goroutines.
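The shared flag needs serialized access; a minimal sketch of the mutex-guarded variant (the `finishedAllGroups` name matches the snippet above, everything else is illustrative):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var (
		mu                sync.Mutex
		finishedAllGroups = true
		wg                sync.WaitGroup
	)
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Concurrent writes to the shared bool must hold the mutex;
			// a bare write from multiple goroutines is a data race.
			mu.Lock()
			finishedAllGroups = false
			mu.Unlock()
		}()
	}
	wg.Wait()
	fmt.Println("finishedAllGroups:", finishedAllGroups)
}
```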
```diff
@@ -913,22 +913,42 @@ func (c *BucketCompactor) Compact(ctx context.Context) error {
 		return errors.Wrap(err, "build compaction groups")
 	}
 	finishedAllGroups := true
+	var wg sync.WaitGroup
+	errChan := make(chan error, len(groups))
```
I think we need something more similar to this https://github.com/improbable-eng/thanos/pull/887/files still.
This is for consistency and readability reasons. So:

1. Run `concurrency` number of goroutines BEFORE the `for {` loop (line 891).
2. Do the whole sync, then iterate over groups: `for _, g := range groups {`.
3. Send `g` via some channel with 0 queue.
4. In the goroutines do the `g` (`g.Compact`) and mark `finishedAllGroups` false if there are still some groups to run. Make sure to make it `atomic.Value` or use `sync.Mutex`.
5. After the `for _, g := range groups {` loop, close the channel and wait for all groups to finish. If `finishedAllGroups == true`, return; otherwise continue.
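A minimal sketch of that shape, under the same assumptions as before (`group` and `compactGroup` are hypothetical stand-ins; the mutex plays the role of the suggested `atomic.Value`/`sync.Mutex`):

```go
package main

import (
	"fmt"
	"sync"
)

type group struct{ key string }

// compactGroup is a hypothetical stand-in for g.Compact; the bool says
// whether the group should be compacted again on the next pass.
func compactGroup(g group) (rerun bool, err error) {
	fmt.Println("compacted", g.key)
	return false, nil
}

func main() {
	groups := []group{{"a"}, {"b"}, {"c"}, {"d"}}
	concurrency := 2

	var (
		mu                sync.Mutex
		finishedAllGroups = true
		wg                sync.WaitGroup
	)
	groupCh := make(chan group) // 0-queue channel: hands one group to one worker
	errCh := make(chan error, len(groups))

	// 1. Start `concurrency` workers before iterating over groups.
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// 4. Each worker compacts groups and updates the shared flag safely.
			for g := range groupCh {
				rerun, err := compactGroup(g)
				if err != nil {
					errCh <- err
					continue
				}
				if rerun {
					mu.Lock()
					finishedAllGroups = false
					mu.Unlock()
				}
			}
		}()
	}

	// 2 & 3. Iterate over groups, sending each via the channel.
	for _, g := range groups {
		groupCh <- g
	}

	// 5. Close the channel and wait for all workers to finish.
	close(groupCh)
	wg.Wait()
	close(errCh)
	for err := range errCh {
		fmt.Println("error:", err)
	}
	fmt.Println("finishedAllGroups:", finishedAllGroups)
}
```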
@earthdiaosi thanks for raising this PR! I think it's a good optional addition, and it can be a very helpful optimisation in some cases. We are working through a large backlog of data and this PR is very helpful for us. Since we need to use it as soon as possible, I've addressed @bwplotka's comments and raised another PR (#1010) on top of yours. I hope that's okay!
Closing to avoid confusion, hope it's ok with you @earthdiaosi! Let us know if not. Good work starting this!
Changes
Allow compacting of groups in parallel.
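For example, assuming the flag name from the diff above (other required flags elided):

```
thanos compact --group-compact-concurrency=4 [other flags...]
```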
Verification
Tested by running `thanos compact` and observing that compaction is much quicker with concurrency enabled.