Skip to content

Commit

Permalink
fix: exit ticktock goroutine when pool is closed
Browse files Browse the repository at this point in the history
  • Loading branch information
GlebRadchenko authored and panjf2000 committed Dec 20, 2022
1 parent 3fbd956 commit 23c4f48
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 38 deletions.
65 changes: 46 additions & 19 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ type Pool struct {
heartbeatDone int32
stopHeartbeat context.CancelFunc

ticktockDone int32
stopTicktock context.CancelFunc

now atomic.Value

options *Options
Expand Down Expand Up @@ -111,15 +114,44 @@ func (p *Pool) purgeStaleWorkers(ctx context.Context) {
}

// ticktock is a goroutine that updates the current time in the pool regularly.
func (p *Pool) ticktock() {
func (p *Pool) ticktock(ctx context.Context) {
ticker := time.NewTicker(nowTimeUpdateInterval)
defer ticker.Stop()
defer func() {
ticker.Stop()
atomic.StoreInt32(&p.ticktockDone, 1)
}()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
}

if p.IsClosed() {
break
}

for range ticker.C {
p.now.Store(time.Now())
}
}

func (p *Pool) startHeartbeat() {
// Start a goroutine to clean up expired workers periodically.
var ctx context.Context
ctx, p.stopHeartbeat = context.WithCancel(context.Background())
if !p.options.DisablePurge {
go p.purgeStaleWorkers(ctx)
}
}

func (p *Pool) startTicktock() {
p.now.Store(time.Now())
var ctx context.Context
ctx, p.stopTicktock = context.WithCancel(context.Background())
go p.ticktock(ctx)
}

func (p *Pool) nowTime() time.Time {
return p.now.Load().(time.Time)
}
Expand Down Expand Up @@ -166,15 +198,8 @@ func NewPool(size int, options ...Option) (*Pool, error) {

p.cond = sync.NewCond(p.lock)

// Start a goroutine to clean up expired workers periodically.
var ctx context.Context
ctx, p.stopHeartbeat = context.WithCancel(context.Background())
if !p.options.DisablePurge {
go p.purgeStaleWorkers(ctx)
}

p.now.Store(time.Now())
go p.ticktock()
p.startHeartbeat()
p.startTicktock()

return p, nil
}
Expand Down Expand Up @@ -259,17 +284,21 @@ func (p *Pool) Release() {

// ReleaseTimeout is like Release but with a timeout, it waits all workers to exit before timing out.
func (p *Pool) ReleaseTimeout(timeout time.Duration) error {
if p.IsClosed() || p.stopHeartbeat == nil {
if p.IsClosed() || p.stopHeartbeat == nil || p.stopTicktock == nil {
return ErrPoolClosed
}

p.stopHeartbeat()
p.stopHeartbeat = nil
p.stopTicktock()
p.stopTicktock = nil
p.Release()

endTime := time.Now().Add(timeout)
for time.Now().Before(endTime) {
if p.Running() == 0 && (p.options.DisablePurge || atomic.LoadInt32(&p.heartbeatDone) == 1) {
if p.Running() == 0 &&
(p.options.DisablePurge || atomic.LoadInt32(&p.heartbeatDone) == 1) &&
atomic.LoadInt32(&p.ticktockDone) == 1 {
return nil
}
time.Sleep(10 * time.Millisecond)
Expand All @@ -281,11 +310,9 @@ func (p *Pool) ReleaseTimeout(timeout time.Duration) error {
func (p *Pool) Reboot() {
if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {
atomic.StoreInt32(&p.heartbeatDone, 0)
var ctx context.Context
ctx, p.stopHeartbeat = context.WithCancel(context.Background())
if !p.options.DisablePurge {
go p.purgeStaleWorkers(ctx)
}
p.startHeartbeat()
atomic.StoreInt32(&p.ticktockDone, 0)
p.startTicktock()
}
}

Expand Down
65 changes: 46 additions & 19 deletions pool_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ type PoolWithFunc struct {
heartbeatDone int32
stopHeartbeat context.CancelFunc

ticktockDone int32
stopTicktock context.CancelFunc

now atomic.Value

options *Options
Expand Down Expand Up @@ -134,15 +137,44 @@ func (p *PoolWithFunc) purgeStaleWorkers(ctx context.Context) {
}

// ticktock is a goroutine that updates the current time in the pool regularly.
func (p *PoolWithFunc) ticktock() {
func (p *PoolWithFunc) ticktock(ctx context.Context) {
ticker := time.NewTicker(nowTimeUpdateInterval)
defer ticker.Stop()
defer func() {
ticker.Stop()
atomic.StoreInt32(&p.ticktockDone, 1)
}()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
}

if p.IsClosed() {
break
}

for range ticker.C {
p.now.Store(time.Now())
}
}

func (p *PoolWithFunc) startHeartbeat() {
// Start a goroutine to clean up expired workers periodically.
var ctx context.Context
ctx, p.stopHeartbeat = context.WithCancel(context.Background())
if !p.options.DisablePurge {
go p.purgeStaleWorkers(ctx)
}
}

func (p *PoolWithFunc) startTicktock() {
p.now.Store(time.Now())
var ctx context.Context
ctx, p.stopTicktock = context.WithCancel(context.Background())
go p.ticktock(ctx)
}

func (p *PoolWithFunc) nowTime() time.Time {
return p.now.Load().(time.Time)
}
Expand Down Expand Up @@ -191,15 +223,8 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWi
}
p.cond = sync.NewCond(p.lock)

// Start a goroutine to clean up expired workers periodically.
var ctx context.Context
ctx, p.stopHeartbeat = context.WithCancel(context.Background())
if !p.options.DisablePurge {
go p.purgeStaleWorkers(ctx)
}

p.now.Store(time.Now())
go p.ticktock()
p.startHeartbeat()
p.startTicktock()

return p, nil
}
Expand Down Expand Up @@ -288,17 +313,21 @@ func (p *PoolWithFunc) Release() {

// ReleaseTimeout is like Release but with a timeout, it waits all workers to exit before timing out.
func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error {
if p.IsClosed() || p.stopHeartbeat == nil {
if p.IsClosed() || p.stopHeartbeat == nil || p.stopTicktock == nil {
return ErrPoolClosed
}

p.stopHeartbeat()
p.stopHeartbeat = nil
p.stopTicktock()
p.stopTicktock = nil
p.Release()

endTime := time.Now().Add(timeout)
for time.Now().Before(endTime) {
if p.Running() == 0 && (p.options.DisablePurge || atomic.LoadInt32(&p.heartbeatDone) == 1) {
if p.Running() == 0 &&
(p.options.DisablePurge || atomic.LoadInt32(&p.heartbeatDone) == 1) &&
atomic.LoadInt32(&p.ticktockDone) == 1 {
return nil
}
time.Sleep(10 * time.Millisecond)
Expand All @@ -310,11 +339,9 @@ func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error {
func (p *PoolWithFunc) Reboot() {
if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {
atomic.StoreInt32(&p.heartbeatDone, 0)
var ctx context.Context
ctx, p.stopHeartbeat = context.WithCancel(context.Background())
if !p.options.DisablePurge {
go p.purgeStaleWorkers(ctx)
}
p.startHeartbeat()
atomic.StoreInt32(&p.ticktockDone, 0)
p.startTicktock()
}
}

Expand Down

0 comments on commit 23c4f48

Please sign in to comment.