Skip to content

Commit

Permalink
Add a feature of rebooting a released pool
Browse files Browse the repository at this point in the history
  • Loading branch information
panjf2000 committed Jan 16, 2020
1 parent b7fb5f3 commit d32d668
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 35 deletions.
12 changes: 11 additions & 1 deletion ants.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,14 @@ const (

// DefaultCleanIntervalTime is the interval time to clean up goroutines.
DefaultCleanIntervalTime = time.Second
)

const (
// OPENED represents that the pool is opened.
OPENED = iota

// CLOSED represents that the pool is closed.
CLOSED = 1
CLOSED
)

var (
Expand Down Expand Up @@ -171,3 +176,8 @@ func Free() int {
func Release() {
defaultAntsPool.Release()
}

// Reboot reboots the default pool.
func Reboot() {
defaultAntsPool.Reboot()
}
54 changes: 54 additions & 0 deletions ants_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,60 @@ func TestMaxBlockingSubmitWithFunc(t *testing.T) {
default:
}
}

func TestRebootDefaultPool(t *testing.T) {
defer Release()
Reboot()
var wg sync.WaitGroup
wg.Add(1)
_ = Submit(func() {
demoFunc()
wg.Done()
})
wg.Wait()
Release()
assert.EqualError(t, Submit(nil), ErrPoolClosed.Error(), "pool should be closed")
Reboot()
wg.Add(1)
assert.NoError(t, Submit(func() { wg.Done() }), "pool should be rebooted")
wg.Wait()
}

func TestRebootNewPool(t *testing.T) {
var wg sync.WaitGroup
p, err := NewPool(10)
assert.NoErrorf(t, err, "create Pool failed: %v", err)
defer p.Release()
wg.Add(1)
_ = p.Submit(func() {
demoFunc()
wg.Done()
})
wg.Wait()
p.Release()
assert.EqualError(t, p.Submit(nil), ErrPoolClosed.Error(), "pool should be closed")
p.Reboot()
wg.Add(1)
assert.NoError(t, p.Submit(func() { wg.Done() }), "pool should be rebooted")
wg.Wait()

p1, err := NewPoolWithFunc(10, func(i interface{}) {
demoPoolFunc(i)
wg.Done()
})
assert.NoErrorf(t, err, "create TimingPoolWithFunc failed: %v", err)
defer p1.Release()
wg.Add(1)
_ = p1.Invoke(1)
wg.Wait()
p1.Release()
assert.EqualError(t, p1.Invoke(nil), ErrPoolClosed.Error(), "pool should be closed")
p1.Reboot()
wg.Add(1)
assert.NoError(t, p1.Invoke(1), "pool should be rebooted")
wg.Wait()
}

func TestRestCodeCoverage(t *testing.T) {
_, err := NewPool(-1, WithExpiryDuration(-1))
t.Log(err)
Expand Down
32 changes: 17 additions & 15 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,15 @@ type Pool struct {
// workers is a slice that store the available workers.
workers workerArray

// release is used to notice the pool to closed itself.
release int32
// state is used to notice the pool to closed itself.
state int32

// lock for synchronous operation.
lock sync.Locker

// cond for waiting to get a idle worker.
cond *sync.Cond

// once makes sure releasing this pool will just be done for one time.
once sync.Once

// workerCache speeds up the obtainment of the an usable worker in function:retrieveWorker.
workerCache sync.Pool

Expand All @@ -62,13 +59,13 @@ type Pool struct {
options *Options
}

// Clear expired workers periodically.
// periodicallyPurge clears expired workers periodically.
func (p *Pool) periodicallyPurge() {
heartbeat := time.NewTicker(p.options.ExpiryDuration)
defer heartbeat.Stop()

for range heartbeat.C {
if atomic.LoadInt32(&p.release) == CLOSED {
if atomic.LoadInt32(&p.state) == CLOSED {
break
}

Expand Down Expand Up @@ -139,7 +136,7 @@ func NewPool(size int, options ...Option) (*Pool, error) {

// Submit submits a task to this pool.
func (p *Pool) Submit(task func()) error {
if atomic.LoadInt32(&p.release) == CLOSED {
if atomic.LoadInt32(&p.state) == CLOSED {
return ErrPoolClosed
}
var w *goWorker
Expand Down Expand Up @@ -175,12 +172,17 @@ func (p *Pool) Tune(size int) {

// Release Closes this pool.
func (p *Pool) Release() {
p.once.Do(func() {
atomic.StoreInt32(&p.release, 1)
p.lock.Lock()
p.workers.reset()
p.lock.Unlock()
})
atomic.StoreInt32(&p.state, CLOSED)
p.lock.Lock()
p.workers.reset()
p.lock.Unlock()
}

// Reboot reboots a released pool.
func (p *Pool) Reboot() {
if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {
go p.periodicallyPurge()
}
}

// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -242,7 +244,7 @@ func (p *Pool) retrieveWorker() *goWorker {

// revertWorker puts a worker back into free pool, recycling the goroutines.
func (p *Pool) revertWorker(worker *goWorker) bool {
if atomic.LoadInt32(&p.release) == CLOSED || p.Running() > p.Cap() {
if atomic.LoadInt32(&p.state) == CLOSED || p.Running() > p.Cap() {
return false
}
worker.recycleTime = time.Now()
Expand Down
40 changes: 21 additions & 19 deletions pool_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ type PoolWithFunc struct {
// workers is a slice that store the available workers.
workers []*goWorkerWithFunc

// release is used to notice the pool to closed itself.
release int32
// state is used to notice the pool to closed itself.
state int32

// lock for synchronous operation.
lock sync.Locker
Expand All @@ -53,9 +53,6 @@ type PoolWithFunc struct {
// poolFunc is the function for processing tasks.
poolFunc func(interface{})

// once makes sure releasing this pool will just be done for one time.
once sync.Once

// workerCache speeds up the obtainment of the an usable worker in function:retrieveWorker.
workerCache sync.Pool

Expand All @@ -65,14 +62,14 @@ type PoolWithFunc struct {
options *Options
}

// Clear expired workers periodically.
// periodicallyPurge clears expired workers periodically.
func (p *PoolWithFunc) periodicallyPurge() {
heartbeat := time.NewTicker(p.options.ExpiryDuration)
defer heartbeat.Stop()

var expiredWorkers []*goWorkerWithFunc
for range heartbeat.C {
if atomic.LoadInt32(&p.release) == CLOSED {
if atomic.LoadInt32(&p.state) == CLOSED {
break
}
currentTime := time.Now()
Expand Down Expand Up @@ -158,7 +155,7 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWi

// Invoke submits a task to pool.
func (p *PoolWithFunc) Invoke(args interface{}) error {
if atomic.LoadInt32(&p.release) == CLOSED {
if atomic.LoadInt32(&p.state) == CLOSED {
return ErrPoolClosed
}
var w *goWorkerWithFunc
Expand Down Expand Up @@ -194,16 +191,21 @@ func (p *PoolWithFunc) Tune(size int) {

// Release Closes this pool.
func (p *PoolWithFunc) Release() {
p.once.Do(func() {
atomic.StoreInt32(&p.release, 1)
p.lock.Lock()
idleWorkers := p.workers
for _, w := range idleWorkers {
w.args <- nil
}
p.workers = nil
p.lock.Unlock()
})
atomic.StoreInt32(&p.state, CLOSED)
p.lock.Lock()
idleWorkers := p.workers
for _, w := range idleWorkers {
w.args <- nil
}
p.workers = nil
p.lock.Unlock()
}

// Reboot reboots a released pool.
func (p *PoolWithFunc) Reboot() {
if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {
go p.periodicallyPurge()
}
}

//---------------------------------------------------------------------------
Expand Down Expand Up @@ -269,7 +271,7 @@ func (p *PoolWithFunc) retrieveWorker() *goWorkerWithFunc {

// revertWorker puts a worker back into free pool, recycling the goroutines.
func (p *PoolWithFunc) revertWorker(worker *goWorkerWithFunc) bool {
if atomic.LoadInt32(&p.release) == CLOSED || p.Running() > p.Cap() {
if atomic.LoadInt32(&p.state) == CLOSED || p.Running() > p.Cap() {
return false
}
worker.recycleTime = time.Now()
Expand Down

0 comments on commit d32d668

Please sign in to comment.