Skip to content

Commit

Permalink
Make optimization to Pool and PoolWithFunc struct
Browse files Browse the repository at this point in the history
  • Loading branch information
panjf2000 committed Oct 10, 2019
1 parent 5697095 commit 5ecbdf4
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 68 deletions.
2 changes: 1 addition & 1 deletion ants.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ type Option func(opts *Options)

// Options contains all options which will be applied when instantiating a ants pool.
type Options struct {
// ExpiryDuration set the expired time (second) of every worker.
// ExpiryDuration set the expired time of every worker.
ExpiryDuration time.Duration

// PreAlloc indicate whether to make memory pre-allocation when initializing Pool.
Expand Down
8 changes: 6 additions & 2 deletions ants_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,11 @@ func TestMaxBlockingSubmit(t *testing.T) {

func TestNonblockingSubmitWithFunc(t *testing.T) {
poolSize := 10
p, err := NewPoolWithFunc(poolSize, longRunningPoolFunc, WithNonblocking(true))
ch1 := make(chan struct{})
p, err := NewPoolWithFunc(poolSize, func(i interface{}) {
longRunningPoolFunc(i)
close(ch1)
}, WithNonblocking(true))
if err != nil {
t.Fatalf("create TimingPool failed: %s", err.Error())
}
Expand All @@ -506,7 +510,7 @@ func TestNonblockingSubmitWithFunc(t *testing.T) {
}
// interrupt f to get an available worker
close(ch)
time.Sleep(1 * time.Second)
<-ch1
if err := p.Invoke(nil); err != nil {
t.Fatalf("nonblocking submit when pool is not full shouldn't return error")
}
Expand Down
42 changes: 12 additions & 30 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@ type Pool struct {
// running is the number of the currently running goroutines.
running int32

// expiryDuration set the expired time (second) of every worker.
expiryDuration time.Duration

// workers is a slice that store the available workers.
workers workerArray

Expand All @@ -59,27 +56,15 @@ type Pool struct {
// workerCache speeds up the obtainment of the an usable worker in function:retrieveWorker.
workerCache sync.Pool

// panicHandler is used to handle panics from each worker goroutine.
// if nil, panics will be thrown out again from worker goroutines.
panicHandler func(interface{})

// Max number of goroutine blocking on pool.Submit.
// 0 (default value) means no such limit.
maxBlockingTasks int32

// goroutine already been blocked on pool.Submit
// protected by pool.lock
blockingNum int32
// blockingNum is the number of the goroutines already been blocked on pool.Submit, protected by pool.lock
blockingNum int

// When nonblocking is true, Pool.Submit will never be blocked.
// ErrPoolOverload will be returned when Pool.Submit cannot be done at once.
// When nonblocking is true, MaxBlockingTasks is inoperative.
nonblocking bool
options *Options
}

// Clear expired workers periodically.
func (p *Pool) periodicallyPurge() {
heartbeat := time.NewTicker(p.expiryDuration)
heartbeat := time.NewTicker(p.options.ExpiryDuration)
defer heartbeat.Stop()

for range heartbeat.C {
Expand All @@ -88,7 +73,7 @@ func (p *Pool) periodicallyPurge() {
}

p.lock.Lock()
expiredWorkers := p.workers.findOutExpiry(p.expiryDuration)
expiredWorkers := p.workers.findOutExpiry(p.options.ExpiryDuration)
p.lock.Unlock()

// Notify obsolete workers to stop.
Expand Down Expand Up @@ -126,12 +111,9 @@ func NewPool(size int, options ...Option) (*Pool, error) {
}

p := &Pool{
capacity: int32(size),
expiryDuration: opts.ExpiryDuration,
nonblocking: opts.Nonblocking,
maxBlockingTasks: int32(opts.MaxBlockingTasks),
panicHandler: opts.PanicHandler,
lock: internal.NewSpinLock(),
capacity: int32(size),
lock: internal.NewSpinLock(),
options: opts,
}
p.workerCache = sync.Pool{
New: func() interface{} {
Expand All @@ -141,7 +123,7 @@ func NewPool(size int, options ...Option) (*Pool, error) {
}
},
}
if opts.PreAlloc {
if p.options.PreAlloc {
p.workers = newWorkerArray(loopQueueType, size)
} else {
p.workers = newWorkerArray(stackType, 0)
Expand Down Expand Up @@ -187,7 +169,7 @@ func (p *Pool) Cap() int {

// Tune changes the capacity of this pool.
func (p *Pool) Tune(size int) {
if p.Cap() == size {
if size < 0 || p.Cap() == size || p.options.PreAlloc {
return
}
atomic.StoreInt32(&p.capacity, int32(size))
Expand Down Expand Up @@ -232,12 +214,12 @@ func (p *Pool) retrieveWorker() *goWorker {
p.lock.Unlock()
spawnWorker()
} else {
if p.nonblocking {
if p.options.Nonblocking {
p.lock.Unlock()
return nil
}
Reentry:
if p.maxBlockingTasks != 0 && p.blockingNum >= p.maxBlockingTasks {
if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks {
p.lock.Unlock()
return nil
}
Expand Down
44 changes: 13 additions & 31 deletions pool_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@ type PoolWithFunc struct {
// running is the number of the currently running goroutines.
running int32

// expiryDuration set the expired time (second) of every worker.
expiryDuration time.Duration

// workers is a slice that store the available workers.
workers []*goWorkerWithFunc

Expand All @@ -62,27 +59,15 @@ type PoolWithFunc struct {
// workerCache speeds up the obtainment of the an usable worker in function:retrieveWorker.
workerCache sync.Pool

// panicHandler is used to handle panics from each worker goroutine.
// if nil, panics will be thrown out again from worker goroutines.
panicHandler func(interface{})

// Max number of goroutine blocking on pool.Submit.
// 0 (default value) means no such limit.
maxBlockingTasks int32

// goroutine already been blocked on pool.Submit
// protected by pool.lock
blockingNum int32
// blockingNum is the number of the goroutines already been blocked on pool.Submit, protected by pool.lock
blockingNum int

// When nonblocking is true, Pool.Submit will never be blocked.
// ErrPoolOverload will be returned when Pool.Submit cannot be done at once.
// When nonblocking is true, MaxBlockingTasks is inoperative.
nonblocking bool
options *Options
}

// Clear expired workers periodically.
func (p *PoolWithFunc) periodicallyPurge() {
heartbeat := time.NewTicker(p.expiryDuration)
heartbeat := time.NewTicker(p.options.ExpiryDuration)
defer heartbeat.Stop()

var expiredWorkers []*goWorkerWithFunc
Expand All @@ -95,7 +80,7 @@ func (p *PoolWithFunc) periodicallyPurge() {
idleWorkers := p.workers
n := len(idleWorkers)
var i int
for i = 0; i < n && currentTime.Sub(idleWorkers[i].recycleTime) > p.expiryDuration; i++ {
for i = 0; i < n && currentTime.Sub(idleWorkers[i].recycleTime) > p.options.ExpiryDuration; i++ {
}
expiredWorkers = append(expiredWorkers[:0], idleWorkers[:i]...)
if i > 0 {
Expand Down Expand Up @@ -147,13 +132,10 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWi
}

p := &PoolWithFunc{
capacity: int32(size),
expiryDuration: opts.ExpiryDuration,
poolFunc: pf,
nonblocking: opts.Nonblocking,
maxBlockingTasks: int32(opts.MaxBlockingTasks),
panicHandler: opts.PanicHandler,
lock: internal.NewSpinLock(),
capacity: int32(size),
poolFunc: pf,
lock: internal.NewSpinLock(),
options: opts,
}
p.workerCache = sync.Pool{
New: func() interface{} {
Expand All @@ -163,7 +145,7 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWi
}
},
}
if opts.PreAlloc {
if p.options.PreAlloc {
p.workers = make([]*goWorkerWithFunc, 0, size)
}
p.cond = sync.NewCond(p.lock)
Expand Down Expand Up @@ -206,7 +188,7 @@ func (p *PoolWithFunc) Cap() int {

// Tune change the capacity of this pool.
func (p *PoolWithFunc) Tune(size int) {
if p.Cap() == size {
if size < 0 || p.Cap() == size || p.options.PreAlloc {
return
}
atomic.StoreInt32(&p.capacity, int32(size))
Expand Down Expand Up @@ -259,12 +241,12 @@ func (p *PoolWithFunc) retrieveWorker() *goWorkerWithFunc {
p.lock.Unlock()
spawnWorker()
} else {
if p.nonblocking {
if p.options.Nonblocking {
p.lock.Unlock()
return nil
}
Reentry:
if p.maxBlockingTasks != 0 && p.blockingNum >= p.maxBlockingTasks {
if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks {
p.lock.Unlock()
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ func (w *goWorker) run() {
defer func() {
w.pool.decRunning()
if p := recover(); p != nil {
if w.pool.panicHandler != nil {
w.pool.panicHandler(p)
if ph := w.pool.options.PanicHandler; ph != nil {
ph(p)
} else {
log.Printf("worker exits from a panic: %v\n", p)
var buf [4096]byte
Expand Down
4 changes: 2 additions & 2 deletions worker_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ func (w *goWorkerWithFunc) run() {
defer func() {
w.pool.decRunning()
if p := recover(); p != nil {
if w.pool.panicHandler != nil {
w.pool.panicHandler(p)
if ph := w.pool.options.PanicHandler; ph != nil {
ph(p)
} else {
log.Printf("worker with func exits from a panic: %v\n", p)
var buf [4096]byte
Expand Down

0 comments on commit 5ecbdf4

Please sign in to comment.