-
Notifications
You must be signed in to change notification settings - Fork 100
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: avoid panic from releasing unacquired semaphore #363
Conversation
a7188d3
to
b19b776
Compare
Codecov Report
@@ Coverage Diff @@
## main #363 +/- ##
==========================================
- Coverage 72.16% 72.10% -0.06%
==========================================
Files 42 43 +1
Lines 4092 4094 +2
==========================================
- Hits 2953 2952 -1
- Misses 857 862 +5
+ Partials 282 280 -2
Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here. |
@abursavich Thank you for your contribution. I thought about using func (g *Group) SetLimit(n int) {
if n < 0 {
g.sem = nil
return
}
if len(g.sem) != 0 {
panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem)))
}
g.sem = make(chan token, n)
} The statement However, your concern is still valid. The pattern eg, egCtx := errgroup.WithContext(ctx)
for /* ... */ {
limiter.Acquire(ctx, 1)
eg.Go(func() error {
defer limiter.Release(1)
// ...
return nil
})
}
if err := eg.Wait(); err != nil {
return err
} is flawed since the error of eg, egCtx := errgroup.WithContext(ctx)
for /* ... */ {
if err := limiter.Acquire(ctx, 1); err != nil {
return err
}
eg.Go(func() error {
defer limiter.Release(1)
// ...
return nil
})
}
if err := eg.Wait(); err != nil {
return err
} |
BTW, thank you for informing |
The type used in the channel is actually 0 bytes: https://go.dev/play/p/ctojkieS8zW type token struct{}
// ...
func (g *Group) SetLimit(n int) {
// ...
g.sem = make(chan token, n)
} So its space complexity ends up being
This was also my initial fix, but I ended up changing it because:
Then I did this, which fixed all of those problems: limiter := semaphore.NewWeighted(opts.Concurrency)
eg, egCtx := errgroup.WithContext(ctx)
for /* ... */ {
if err := limiter.Acquire(ctx, 1); err != nil {
return err
}
eg.Go(func() error {
defer limiter.Release(1)
select {
case <-egCtx.Done():
return egCtx.Err()
default:
}
// ...
return nil
})
}
if err := eg.Wait(); err != nil {
return err
} But I felt like that was too complicated to be repeating everywhere, so I switched to eg, egCtx := errgroup.WithContext(ctx)
eg.SetLimit(int(opts.Concurrency))
for /* ... */ {
eg.Go(func() error {
select {
case <-egCtx.Done():
return egCtx.Err()
default:
}
// ...
return nil
})
}
if err := eg.Wait(); err != nil {
return err
} But that still felt a little too complicated to be repeating everywhere, so I encapsulated all the grunge into eg, egCtx := syncutil.NewLimitGroup(ctx, int(opts.Concurrency))
for /* ... */ {
eg.Go(func() error {
// ...
return nil
})
}
if err := eg.Wait(); err != nil {
return err
} |
@abursavich You are right. The c.buf = mallocgc(mem, elem, true) |
content.go
Outdated
@@ -119,13 +118,10 @@ func TagN(ctx context.Context, target Target, srcReference string, dstReferences | |||
return err | |||
} | |||
|
|||
limiter := semaphore.NewWeighted(opts.Concurrency) | |||
eg, egCtx := errgroup.WithContext(ctx) | |||
eg, egCtx := syncutil.NewLimitGroup(ctx, int(opts.Concurrency)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is an issue on casting int64
to int
. What if we lose precisions?
@Wwwsylvia Should we change the type of all occurrences of Concurrency
to int
from int64
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah int
should be enough.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sent a PR for this. See #376.
internal/syncutil/limitgroup.go
Outdated
|
||
// A LimitGroup is a collection of goroutines working on subtasks that are part of | ||
// the same overall task. | ||
type LimitGroup struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of LimitGroup
/ NewLimitGroup()
pair, it is better to align the name to LimitedGroup
/ LimitGroup()
, just like io.LimitedReader
/ io.LimitReader()
and LimitedRegion
/ LimitRegion
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I disagree with the change to the constructor's name, but I made the change as requested.
LimitReader
is taking in a Reader
and limiting it, so its name is like a command "limit this reader". I think NewLimitedGroup
would be better than LimitGroup
, since it's creating the LimitedGroup
and not just wrapping an existing Group
with limits.
@abursavich @Wwwsylvia Do you also want to revisit internal/syncutil/limit.go for context cancellation? |
78c5148
to
43c0ad6
Compare
I think that this type of concurrent code is tricky and you should probably use whatever you're most comfortable with. I'm more comfortable with
Could probably tweak the func Go[T any](ctx context.Context, limiter *semaphore.Weighted, fn GoFunc[T], items ...T) error {
eg, egCtx := errgroup.WithContext(ctx)
for _, item := range items {
region := LimitRegion(egCtx, limiter)
if err := region.Start(); err != nil {
eg.Go(func() error { return err })
break
}
eg.Go(func(t T) func() error {
return func() error {
defer region.End()
return fn(egCtx, region, t)
}
}(item))
}
return eg.Wait()
} |
rebsae? |
Signed-off-by: Andy Bursavich <abursavich@gmail.com>
I've rebased this a couple times to keep it up to date with main. It's unclear to me if there are any more requested changes to this PR, or if the more recent discussion is about other related changes that should be made separately? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
LGTM. Let's wait for review from @shizhMSFT.
+1 for |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
When using this pattern:
If
ctx
is cancelled thenAcquire
may return an error, which will then causeRelease
to panic after it's called more times than the semaphore was successfully acquired.This change uses the SetLimit functionality which has been added to
errgroup.Group
and provides a small wrapper around it in the internal/syncutil package to improve the ergonomics of its usage.