diff --git a/pkg/utils/io.go b/pkg/utils/io.go index 7ce6a9125..ff52f843e 100644 --- a/pkg/utils/io.go +++ b/pkg/utils/io.go @@ -207,31 +207,34 @@ func (c *SyncClosers) AcquireReference() bool { } newRef := ref + 1 if atomic.CompareAndSwapInt32(&c.ref, ref, newRef) { - log.Debugf("AcquireReference %p: %d", c, newRef) + // log.Debugf("AcquireReference %p: %d", c, newRef) return true } } } +const closersClosed = math.MinInt16 + func (c *SyncClosers) Close() error { - for { - ref := atomic.LoadInt32(&c.ref) - if ref < 0 { - return nil - } - newRef := ref - 1 - if newRef <= 0 { - newRef = math.MinInt16 - } - if atomic.CompareAndSwapInt32(&c.ref, ref, newRef) { - log.Debugf("Close %p: %d", c, ref) - if newRef > 0 { - return nil - } - break - } + ref := atomic.AddInt32(&c.ref, -1) + if ref > 0 { + // log.Debugf("ReleaseReference %p: %d", c, ref) + return nil + } + + if ref < -1 { + atomic.StoreInt32(&c.ref, closersClosed) + return nil + } + + // Attempt to acquire FinalClose permission. + // At this point, ref must be 0 or -1. We try to atomically change it to the closersClosed state. + // Only the first successful goroutine gets the cleanup permission. + if !atomic.CompareAndSwapInt32(&c.ref, ref, closersClosed) { + return nil } + // log.Debugf("FinalClose %p", c) var errs []error for _, closer := range c.closers { if closer != nil {