Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

opt: cache current time for workders and update it periodically #261

Merged
merged 3 commits into from
Dec 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ jobs:
uses: golangci/golangci-lint-action@v3
with:
version: v1.50.1
args: -v -E gofumpt -E gocritic -E misspell -E revive -E godot
args: --timeout 5m -v -E gofumpt -E gocritic -E misspell -E revive -E godot
test:
needs: lint
strategy:
Expand Down
2 changes: 2 additions & 0 deletions ants.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ var (
defaultAntsPool, _ = NewPool(DefaultAntsPoolSize)
)

const nowTimeUpdateInterval = 500 * time.Millisecond

// Logger is used for logging formatted messages.
type Logger interface {
// Printf must have the same semantics as log.Printf.
Expand Down
43 changes: 32 additions & 11 deletions ants_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@ import (
"sync"
"testing"
"time"

"golang.org/x/sync/errgroup"
)

const (
RunTimes = 1000000
RunTimes = 1e6
PoolCap = 5e4
BenchParam = 10
BenchAntsSize = 200000
DefaultExpiredTime = 10 * time.Second
)

Expand Down Expand Up @@ -75,10 +77,11 @@ func BenchmarkGoroutines(b *testing.B) {
}
}

func BenchmarkSemaphore(b *testing.B) {
func BenchmarkChannel(b *testing.B) {
var wg sync.WaitGroup
sema := make(chan struct{}, BenchAntsSize)
sema := make(chan struct{}, PoolCap)

b.ResetTimer()
for i := 0; i < b.N; i++ {
wg.Add(RunTimes)
for j := 0; j < RunTimes; j++ {
Expand All @@ -93,12 +96,31 @@ func BenchmarkSemaphore(b *testing.B) {
}
}

func BenchmarkErrGroup(b *testing.B) {
var wg sync.WaitGroup
var pool errgroup.Group
pool.SetLimit(PoolCap)

b.ResetTimer()
for i := 0; i < b.N; i++ {
wg.Add(RunTimes)
for j := 0; j < RunTimes; j++ {
pool.Go(func() error {
demoFunc()
wg.Done()
return nil
})
}
wg.Wait()
}
}

func BenchmarkAntsPool(b *testing.B) {
var wg sync.WaitGroup
p, _ := NewPool(BenchAntsSize, WithExpiryDuration(DefaultExpiredTime))
p, _ := NewPool(PoolCap, WithExpiryDuration(DefaultExpiredTime))
defer p.Release()

b.StartTimer()
b.ResetTimer()
for i := 0; i < b.N; i++ {
wg.Add(RunTimes)
for j := 0; j < RunTimes; j++ {
Expand All @@ -109,7 +131,6 @@ func BenchmarkAntsPool(b *testing.B) {
}
wg.Wait()
}
b.StopTimer()
}

func BenchmarkGoroutinesThroughput(b *testing.B) {
Expand All @@ -121,7 +142,7 @@ func BenchmarkGoroutinesThroughput(b *testing.B) {
}

func BenchmarkSemaphoreThroughput(b *testing.B) {
sema := make(chan struct{}, BenchAntsSize)
sema := make(chan struct{}, PoolCap)
for i := 0; i < b.N; i++ {
for j := 0; j < RunTimes; j++ {
sema <- struct{}{}
Expand All @@ -134,13 +155,13 @@ func BenchmarkSemaphoreThroughput(b *testing.B) {
}

func BenchmarkAntsPoolThroughput(b *testing.B) {
p, _ := NewPool(BenchAntsSize, WithExpiryDuration(DefaultExpiredTime))
p, _ := NewPool(PoolCap, WithExpiryDuration(DefaultExpiredTime))
defer p.Release()
b.StartTimer()

b.ResetTimer()
for i := 0; i < b.N; i++ {
for j := 0; j < RunTimes; j++ {
_ = p.Submit(demoFunc)
}
}
b.StopTimer()
}
52 changes: 39 additions & 13 deletions ants_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,22 +322,50 @@ func TestPoolPanicWithoutHandlerPreMalloc(t *testing.T) {
_ = p1.Invoke("Oops!")
}

func TestPurge(t *testing.T) {
p, err := NewPool(10)
func TestPurgePool(t *testing.T) {
size := 500
ch := make(chan struct{})

p, err := NewPool(size)
assert.NoErrorf(t, err, "create TimingPool failed: %v", err)
defer p.Release()
_ = p.Submit(demoFunc)
time.Sleep(3 * DefaultCleanIntervalTime)
assert.EqualValues(t, 0, p.Running(), "all p should be purged")
p1, err := NewPoolWithFunc(10, demoPoolFunc)

for i := 0; i < size; i++ {
j := i + 1
_ = p.Submit(func() {
<-ch
d := j % 100
time.Sleep(time.Duration(d) * time.Millisecond)
})
}
assert.Equalf(t, size, p.Running(), "pool should be full, expected: %d, but got: %d", size, p.Running())

close(ch)
time.Sleep(5 * DefaultCleanIntervalTime)
assert.Equalf(t, 0, p.Running(), "pool should be empty after purge, but got %d", p.Running())

ch = make(chan struct{})
f := func(i interface{}) {
<-ch
d := i.(int) % 100
time.Sleep(time.Duration(d) * time.Millisecond)
}

p1, err := NewPoolWithFunc(size, f)
assert.NoErrorf(t, err, "create TimingPoolWithFunc failed: %v", err)
defer p1.Release()
_ = p1.Invoke(1)
time.Sleep(3 * DefaultCleanIntervalTime)
assert.EqualValues(t, 0, p.Running(), "all p should be purged")

for i := 0; i < size; i++ {
_ = p1.Invoke(i)
}
assert.Equalf(t, size, p1.Running(), "pool should be full, expected: %d, but got: %d", size, p1.Running())

close(ch)
time.Sleep(5 * DefaultCleanIntervalTime)
assert.Equalf(t, 0, p1.Running(), "pool should be empty after purge, but got %d", p1.Running())
}

func TestPurgePreMalloc(t *testing.T) {
func TestPurgePreMallocPool(t *testing.T) {
p, err := NewPool(10, WithPreAlloc(true))
assert.NoErrorf(t, err, "create TimingPool failed: %v", err)
defer p.Release()
Expand Down Expand Up @@ -547,9 +575,7 @@ func TestInfinitePool(t *testing.T) {
}
var err error
_, err = NewPool(-1, WithPreAlloc(true))
if err != ErrInvalidPreAllocSize {
t.Errorf("expect ErrInvalidPreAllocSize but got %v", err)
}
assert.EqualErrorf(t, err, ErrInvalidPreAllocSize.Error(), "")
}

func testPoolWithDisablePurge(t *testing.T, p *Pool, numWorker int, waitForPurge time.Duration) {
Expand Down
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,7 @@ module github.com/panjf2000/ants/v2

go 1.13

require github.com/stretchr/testify v1.8.1
require (
github.com/stretchr/testify v1.8.1
golang.org/x/sync v0.1.0
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Expand Down
32 changes: 26 additions & 6 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,13 @@ type Pool struct {
heartbeatDone int32
stopHeartbeat context.CancelFunc

now atomic.Value

options *Options
}

// purgePeriodically clears expired workers periodically which runs in an individual goroutine, as a scavenger.
func (p *Pool) purgePeriodically(ctx context.Context) {
// purgeStaleWorkers clears stale workers periodically, it runs in an individual goroutine, as a scavenger.
func (p *Pool) purgeStaleWorkers(ctx context.Context) {
heartbeat := time.NewTicker(p.options.ExpiryDuration)

defer func() {
Expand All @@ -76,9 +78,9 @@ func (p *Pool) purgePeriodically(ctx context.Context) {

for {
select {
case <-heartbeat.C:
case <-ctx.Done():
return
case <-heartbeat.C:
}

if p.IsClosed() {
Expand Down Expand Up @@ -108,6 +110,20 @@ func (p *Pool) purgePeriodically(ctx context.Context) {
}
}

// ticktock is a goroutine that updates the current time in the pool regularly.
func (p *Pool) ticktock() {
ticker := time.NewTicker(nowTimeUpdateInterval)
defer ticker.Stop()

for range ticker.C {
p.now.Store(time.Now())
}
}

func (p *Pool) nowTime() time.Time {
return p.now.Load().(time.Time)
}

// NewPool generates an instance of ants pool.
func NewPool(size int, options ...Option) (*Pool, error) {
opts := loadOptions(options...)
Expand Down Expand Up @@ -154,8 +170,12 @@ func NewPool(size int, options ...Option) (*Pool, error) {
var ctx context.Context
ctx, p.stopHeartbeat = context.WithCancel(context.Background())
if !p.options.DisablePurge {
go p.purgePeriodically(ctx)
go p.purgeStaleWorkers(ctx)
}

p.now.Store(time.Now())
go p.ticktock()

return p, nil
}

Expand Down Expand Up @@ -264,7 +284,7 @@ func (p *Pool) Reboot() {
var ctx context.Context
ctx, p.stopHeartbeat = context.WithCancel(context.Background())
if !p.options.DisablePurge {
go p.purgePeriodically(ctx)
go p.purgeStaleWorkers(ctx)
}
}
}
Expand Down Expand Up @@ -340,7 +360,7 @@ func (p *Pool) revertWorker(worker *goWorker) bool {
p.cond.Broadcast()
return false
}
worker.recycleTime = time.Now()
worker.recycleTime = p.nowTime()
p.lock.Lock()

// To avoid memory leaks, add a double check in the lock scope.
Expand Down
48 changes: 38 additions & 10 deletions pool_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,13 @@ type PoolWithFunc struct {
heartbeatDone int32
stopHeartbeat context.CancelFunc

now atomic.Value

options *Options
}

// purgePeriodically clears expired workers periodically which runs in an individual goroutine, as a scavenger.
func (p *PoolWithFunc) purgePeriodically(ctx context.Context) {
// purgeStaleWorkers clears stale workers periodically, it runs in an individual goroutine, as a scavenger.
func (p *PoolWithFunc) purgeStaleWorkers(ctx context.Context) {
heartbeat := time.NewTicker(p.options.ExpiryDuration)
defer func() {
heartbeat.Stop()
Expand All @@ -78,26 +80,34 @@ func (p *PoolWithFunc) purgePeriodically(ctx context.Context) {
var expiredWorkers []*goWorkerWithFunc
for {
select {
case <-heartbeat.C:
case <-ctx.Done():
return
case <-heartbeat.C:
}

if p.IsClosed() {
break
}

currentTime := time.Now()
criticalTime := time.Now().Add(-p.options.ExpiryDuration)

p.lock.Lock()
idleWorkers := p.workers
n := len(idleWorkers)
var i int
for i = 0; i < n && currentTime.Sub(idleWorkers[i].recycleTime) > p.options.ExpiryDuration; i++ {
l, r, mid := 0, n-1, 0
for l <= r {
mid = (l + r) / 2
if criticalTime.Before(idleWorkers[mid].recycleTime) {
r = mid - 1
} else {
l = mid + 1
}
}
i := r + 1
expiredWorkers = append(expiredWorkers[:0], idleWorkers[:i]...)
if i > 0 {
m := copy(idleWorkers, idleWorkers[i:])
for i = m; i < n; i++ {
for i := m; i < n; i++ {
idleWorkers[i] = nil
}
p.workers = idleWorkers[:m]
Expand All @@ -123,6 +133,20 @@ func (p *PoolWithFunc) purgePeriodically(ctx context.Context) {
}
}

// ticktock is a goroutine that updates the current time in the pool regularly.
func (p *PoolWithFunc) ticktock() {
ticker := time.NewTicker(nowTimeUpdateInterval)
defer ticker.Stop()

for range ticker.C {
p.now.Store(time.Now())
}
}

func (p *PoolWithFunc) nowTime() time.Time {
return p.now.Load().(time.Time)
}

// NewPoolWithFunc generates an instance of ants pool with a specific function.
func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWithFunc, error) {
if size <= 0 {
Expand Down Expand Up @@ -171,8 +195,12 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWi
var ctx context.Context
ctx, p.stopHeartbeat = context.WithCancel(context.Background())
if !p.options.DisablePurge {
go p.purgePeriodically(ctx)
go p.purgeStaleWorkers(ctx)
}

p.now.Store(time.Now())
go p.ticktock()

return p, nil
}

Expand Down Expand Up @@ -285,7 +313,7 @@ func (p *PoolWithFunc) Reboot() {
var ctx context.Context
ctx, p.stopHeartbeat = context.WithCancel(context.Background())
if !p.options.DisablePurge {
go p.purgePeriodically(ctx)
go p.purgeStaleWorkers(ctx)
}
}
}
Expand Down Expand Up @@ -368,7 +396,7 @@ func (p *PoolWithFunc) revertWorker(worker *goWorkerWithFunc) bool {
p.cond.Broadcast()
return false
}
worker.recycleTime = time.Now()
worker.recycleTime = p.nowTime()
p.lock.Lock()

// To avoid memory leaks, add a double check in the lock scope.
Expand Down