From e5a2201bda86823dab6d538ff46376e913b3f97e Mon Sep 17 00:00:00 2001
From: Andy Pan
Date: Sun, 11 Dec 2022 18:16:45 +0800
Subject: [PATCH 1/3] opt: cache current time for workers and update it
 periodically

---
 ants.go                |  2 ++
 ants_benchmark_test.go | 42 ++++++++++++++++++++++++++++++++++
 ants_test.go           | 52 +++++++++++++++++++++++++++++++-----------
 pool.go                | 32 +++++++++++++++++++++-----
 pool_func.go           | 32 +++++++++++++++++++++-----
 5 files changed, 135 insertions(+), 25 deletions(-)

diff --git a/ants.go b/ants.go
index df77ace1..c8248f57 100644
--- a/ants.go
+++ b/ants.go
@@ -88,6 +88,8 @@ var (
 	defaultAntsPool, _ = NewPool(DefaultAntsPoolSize)
 )
 
+const nowTimeUpdateInterval = 500 * time.Millisecond
+
 // Logger is used for logging formatted messages.
 type Logger interface {
 	// Printf must have the same semantics as log.Printf.
diff --git a/ants_benchmark_test.go b/ants_benchmark_test.go
index 0dfd8ee1..c8364e59 100644
--- a/ants_benchmark_test.go
+++ b/ants_benchmark_test.go
@@ -25,6 +25,7 @@ package ants
 import (
 	"runtime"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 )
@@ -144,3 +145,44 @@ func BenchmarkAntsPoolThroughput(b *testing.B) {
 	}
 	b.StopTimer()
 }
+
+func BenchmarkTimeNow(b *testing.B) {
+	for i := 0; i < b.N; i++ {
+		_ = time.Now()
+	}
+}
+
+func BenchmarkTimeNowCache(b *testing.B) {
+	var (
+		now    atomic.Value
+		offset int32
+	)
+
+	now.Store(time.Now())
+	go func() {
+		for range time.Tick(500 * time.Millisecond) {
+			now.Store(time.Now())
+			atomic.StoreInt32(&offset, 0)
+		}
+	}()
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		_ = now.Load().(time.Time).Add(time.Duration(atomic.AddInt32(&offset, 1)))
+	}
+}
+
+func BenchmarkTimeNowCache1(b *testing.B) {
+	var now atomic.Value
+	now.Store(time.Now())
+	go func() {
+		for range time.Tick(500 * time.Millisecond) {
+			now.Store(time.Now())
+		}
+	}()
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		_ = now.Load().(time.Time)
+	}
+}
diff --git a/ants_test.go b/ants_test.go
index 73425422..93bbbeac 100644
--- a/ants_test.go
+++ b/ants_test.go
@@ -322,22 +322,50 @@ func TestPoolPanicWithoutHandlerPreMalloc(t *testing.T) {
 	_ = p1.Invoke("Oops!")
 }
 
-func TestPurge(t *testing.T) {
-	p, err := NewPool(10)
+func TestPurgePool(t *testing.T) {
+	size := 500
+	ch := make(chan struct{})
+
+	p, err := NewPool(size)
 	assert.NoErrorf(t, err, "create TimingPool failed: %v", err)
 	defer p.Release()
-	_ = p.Submit(demoFunc)
-	time.Sleep(3 * DefaultCleanIntervalTime)
-	assert.EqualValues(t, 0, p.Running(), "all p should be purged")
-	p1, err := NewPoolWithFunc(10, demoPoolFunc)
+
+	for i := 0; i < size; i++ {
+		j := i + 1
+		_ = p.Submit(func() {
+			<-ch
+			d := j % 100
+			time.Sleep(time.Duration(d) * time.Millisecond)
+		})
+	}
+	assert.Equalf(t, size, p.Running(), "pool should be full, expected: %d, but got: %d", size, p.Running())
+
+	close(ch)
+	time.Sleep(5 * DefaultCleanIntervalTime)
+	assert.Equalf(t, 0, p.Running(), "pool should be empty after purge, but got %d", p.Running())
+
+	ch = make(chan struct{})
+	f := func(i interface{}) {
+		<-ch
+		d := i.(int) % 100
+		time.Sleep(time.Duration(d) * time.Millisecond)
+	}
+
+	p1, err := NewPoolWithFunc(size, f)
 	assert.NoErrorf(t, err, "create TimingPoolWithFunc failed: %v", err)
 	defer p1.Release()
-	_ = p1.Invoke(1)
-	time.Sleep(3 * DefaultCleanIntervalTime)
-	assert.EqualValues(t, 0, p.Running(), "all p should be purged")
+
+	for i := 0; i < size; i++ {
+		_ = p1.Invoke(i)
+	}
+	assert.Equalf(t, size, p1.Running(), "pool should be full, expected: %d, but got: %d", size, p1.Running())
+
+	close(ch)
+	time.Sleep(5 * DefaultCleanIntervalTime)
+	assert.Equalf(t, 0, p1.Running(), "pool should be empty after purge, but got %d", p1.Running())
 }
 
-func TestPurgePreMalloc(t *testing.T) {
+func TestPurgePreMallocPool(t *testing.T) {
 	p, err := NewPool(10, WithPreAlloc(true))
 	assert.NoErrorf(t, err, "create TimingPool failed: %v", err)
 	defer p.Release()
@@ -547,9 +575,7 @@ func TestInfinitePool(t *testing.T) {
 	}
 	var err error
 	_, err = NewPool(-1, WithPreAlloc(true))
-	if err != ErrInvalidPreAllocSize {
-		t.Errorf("expect ErrInvalidPreAllocSize but got %v", err)
-	}
+	assert.EqualErrorf(t, err, ErrInvalidPreAllocSize.Error(), "")
 }
 
 func testPoolWithDisablePurge(t *testing.T, p *Pool, numWorker int, waitForPurge time.Duration) {
diff --git a/pool.go b/pool.go
index d5e7b7b1..95909e9e 100644
--- a/pool.go
+++ b/pool.go
@@ -62,11 +62,13 @@ type Pool struct {
 	heartbeatDone int32
 	stopHeartbeat context.CancelFunc
 
+	now atomic.Value
+
 	options *Options
 }
 
-// purgePeriodically clears expired workers periodically which runs in an individual goroutine, as a scavenger.
-func (p *Pool) purgePeriodically(ctx context.Context) {
+// purgeStaleWorkers clears stale workers periodically; it runs in a dedicated goroutine, as a scavenger.
+func (p *Pool) purgeStaleWorkers(ctx context.Context) {
 	heartbeat := time.NewTicker(p.options.ExpiryDuration)
 
 	defer func() {
@@ -76,9 +78,9 @@
 
 	for {
 		select {
-		case <-heartbeat.C:
 		case <-ctx.Done():
 			return
+		case <-heartbeat.C:
 		}
 
 		if p.IsClosed() {
@@ -108,6 +110,20 @@
 	}
 }
 
+// ticktock is a goroutine that updates the current time in the pool regularly.
+func (p *Pool) ticktock() {
+	ticker := time.NewTicker(nowTimeUpdateInterval)
+	defer ticker.Stop()
+
+	for range ticker.C {
+		p.now.Store(time.Now())
+	}
+}
+
+func (p *Pool) nowTime() time.Time {
+	return p.now.Load().(time.Time)
+}
+
 // NewPool generates an instance of ants pool.
 func NewPool(size int, options ...Option) (*Pool, error) {
 	opts := loadOptions(options...)
@@ -154,8 +170,12 @@
 	var ctx context.Context
 	ctx, p.stopHeartbeat = context.WithCancel(context.Background())
 	if !p.options.DisablePurge {
-		go p.purgePeriodically(ctx)
+		go p.purgeStaleWorkers(ctx)
 	}
+
+	p.now.Store(time.Now())
+	go p.ticktock()
+
 	return p, nil
 }
 
@@ -264,7 +284,7 @@ func (p *Pool) Reboot() {
 		var ctx context.Context
 		ctx, p.stopHeartbeat = context.WithCancel(context.Background())
 		if !p.options.DisablePurge {
-			go p.purgePeriodically(ctx)
+			go p.purgeStaleWorkers(ctx)
 		}
 	}
 }
@@ -340,7 +360,7 @@ func (p *Pool) revertWorker(worker *goWorker) bool {
 		p.cond.Broadcast()
 		return false
 	}
-	worker.recycleTime = time.Now()
+	worker.recycleTime = p.nowTime()
 	p.lock.Lock()
 
 	// To avoid memory leaks, add a double check in the lock scope.
diff --git a/pool_func.go b/pool_func.go
index c72cf34f..6bca66e2 100644
--- a/pool_func.go
+++ b/pool_func.go
@@ -64,11 +64,13 @@ type PoolWithFunc struct {
 	heartbeatDone int32
 	stopHeartbeat context.CancelFunc
 
+	now atomic.Value
+
 	options *Options
 }
 
-// purgePeriodically clears expired workers periodically which runs in an individual goroutine, as a scavenger.
-func (p *PoolWithFunc) purgePeriodically(ctx context.Context) {
+// purgeStaleWorkers clears stale workers periodically; it runs in a dedicated goroutine, as a scavenger.
+func (p *PoolWithFunc) purgeStaleWorkers(ctx context.Context) {
 	heartbeat := time.NewTicker(p.options.ExpiryDuration)
 	defer func() {
 		heartbeat.Stop()
@@ -78,9 +80,9 @@
 	var expiredWorkers []*goWorkerWithFunc
 	for {
 		select {
-		case <-heartbeat.C:
 		case <-ctx.Done():
 			return
+		case <-heartbeat.C:
 		}
 
 		if p.IsClosed() {
@@ -123,6 +125,20 @@
 	}
 }
 
+// ticktock is a goroutine that updates the current time in the pool regularly.
+func (p *PoolWithFunc) ticktock() {
+	ticker := time.NewTicker(nowTimeUpdateInterval)
+	defer ticker.Stop()
+
+	for range ticker.C {
+		p.now.Store(time.Now())
+	}
+}
+
+func (p *PoolWithFunc) nowTime() time.Time {
+	return p.now.Load().(time.Time)
+}
+
 // NewPoolWithFunc generates an instance of ants pool with a specific function.
 func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWithFunc, error) {
 	if size <= 0 {
@@ -171,8 +187,12 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWi
 	var ctx context.Context
 	ctx, p.stopHeartbeat = context.WithCancel(context.Background())
 	if !p.options.DisablePurge {
-		go p.purgePeriodically(ctx)
+		go p.purgeStaleWorkers(ctx)
 	}
+
+	p.now.Store(time.Now())
+	go p.ticktock()
+
 	return p, nil
 }
 
@@ -285,7 +305,7 @@ func (p *PoolWithFunc) Reboot() {
 		var ctx context.Context
 		ctx, p.stopHeartbeat = context.WithCancel(context.Background())
 		if !p.options.DisablePurge {
-			go p.purgePeriodically(ctx)
+			go p.purgeStaleWorkers(ctx)
 		}
 	}
 }
@@ -368,7 +388,7 @@ func (p *PoolWithFunc) revertWorker(worker *goWorkerWithFunc) bool {
 		p.cond.Broadcast()
 		return false
 	}
-	worker.recycleTime = time.Now()
+	worker.recycleTime = p.nowTime()
 	p.lock.Lock()
 
 	// To avoid memory leaks, add a double check in the lock scope.
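
Note on PATCH 1/3: time.Now() performs a clock read on every worker recycle, while recycleTime only feeds an expiry check with second-level granularity, so the patch trades a little precision for throughput by caching the current time in an atomic.Value and refreshing it from a single ticktock goroutine every nowTimeUpdateInterval (500ms); the BenchmarkTimeNow* benchmarks above measure exactly this trade-off. Below is a minimal, self-contained sketch of that cached-clock pattern; the cachedClock type and its names are illustrative stand-ins, not part of the ants API.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// cachedClock publishes a coarse timestamp through an atomic.Value so that
// hot paths can read the time without calling time.Now() on every access.
type cachedClock struct {
	now atomic.Value // holds a time.Time
}

func newCachedClock(resolution time.Duration) *cachedClock {
	c := &cachedClock{}
	c.now.Store(time.Now())
	go func() {
		// One background goroutine refreshes the cached value periodically.
		ticker := time.NewTicker(resolution)
		defer ticker.Stop()
		for t := range ticker.C {
			c.now.Store(t)
		}
	}()
	return c
}

// Now returns a timestamp that may be stale by up to the resolution.
func (c *cachedClock) Now() time.Time {
	return c.now.Load().(time.Time)
}

func main() {
	clock := newCachedClock(500 * time.Millisecond)
	fmt.Println(clock.Now())
}

Readings can lag real time by up to one refresh interval, which is harmless here because ExpiryDuration is measured in seconds; the pattern would not suit code that needs precise timestamps.
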
From 9f631c3da79f0f9df38ac14cc0efd6b18fc0204d Mon Sep 17 00:00:00 2001
From: Andy Pan
Date: Sun, 11 Dec 2022 19:17:37 +0800
Subject: [PATCH 2/3] chore: add errgroup for benchmark

---
 ants_benchmark_test.go | 85 ++++++++++++++++--------------------------
 go.mod                 |  5 ++-
 go.sum                 |  2 +
 3 files changed, 38 insertions(+), 54 deletions(-)

diff --git a/ants_benchmark_test.go b/ants_benchmark_test.go
index c8364e59..360443a8 100644
--- a/ants_benchmark_test.go
+++ b/ants_benchmark_test.go
@@ -25,15 +25,16 @@ package ants
 import (
 	"runtime"
 	"sync"
-	"sync/atomic"
 	"testing"
 	"time"
+
+	"golang.org/x/sync/errgroup"
 )
 
 const (
-	RunTimes           = 1000000
+	RunTimes           = 1e6
+	PoolCap            = 5e4
 	BenchParam         = 10
-	BenchAntsSize      = 200000
 	DefaultExpiredTime = 10 * time.Second
 )
@@ -76,10 +77,11 @@ func BenchmarkGoroutines(b *testing.B) {
 	}
 }
 
-func BenchmarkSemaphore(b *testing.B) {
+func BenchmarkChannel(b *testing.B) {
 	var wg sync.WaitGroup
-	sema := make(chan struct{}, BenchAntsSize)
+	sema := make(chan struct{}, PoolCap)
 
+	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
 		wg.Add(RunTimes)
 		for j := 0; j < RunTimes; j++ {
@@ -94,12 +96,31 @@
 	}
 }
 
+func BenchmarkErrGroup(b *testing.B) {
+	var wg sync.WaitGroup
+	var pool errgroup.Group
+	pool.SetLimit(PoolCap)
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		wg.Add(RunTimes)
+		for j := 0; j < RunTimes; j++ {
+			pool.Go(func() error {
+				demoFunc()
+				wg.Done()
+				return nil
+			})
+		}
+		wg.Wait()
+	}
+}
+
 func BenchmarkAntsPool(b *testing.B) {
 	var wg sync.WaitGroup
-	p, _ := NewPool(BenchAntsSize, WithExpiryDuration(DefaultExpiredTime))
+	p, _ := NewPool(PoolCap, WithExpiryDuration(DefaultExpiredTime))
 	defer p.Release()
 
-	b.StartTimer()
+	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
 		wg.Add(RunTimes)
 		for j := 0; j < RunTimes; j++ {
@@ -110,7 +131,6 @@ func BenchmarkAntsPool(b *testing.B) {
 		}
 		wg.Wait()
 	}
-	b.StopTimer()
 }
 
 func BenchmarkGoroutinesThroughput(b *testing.B) {
@@ -122,7 +142,7 @@ func BenchmarkGoroutinesThroughput(b *testing.B) {
 }
 
 func BenchmarkSemaphoreThroughput(b *testing.B) {
-	sema := make(chan struct{}, BenchAntsSize)
+	sema := make(chan struct{}, PoolCap)
 	for i := 0; i < b.N; i++ {
 		for j := 0; j < RunTimes; j++ {
 			sema <- struct{}{}
@@ -135,54 +155,13 @@
 }
 
 func BenchmarkAntsPoolThroughput(b *testing.B) {
-	p, _ := NewPool(BenchAntsSize, WithExpiryDuration(DefaultExpiredTime))
+	p, _ := NewPool(PoolCap, WithExpiryDuration(DefaultExpiredTime))
 	defer p.Release()
-	b.StartTimer()
-	for i := 0; i < b.N; i++ {
-		for j := 0; j < RunTimes; j++ {
-			_ = p.Submit(demoFunc)
-		}
-	}
-	b.StopTimer()
-}
-
-func BenchmarkTimeNow(b *testing.B) {
-	for i := 0; i < b.N; i++ {
-		_ = time.Now()
-	}
-}
-
-func BenchmarkTimeNowCache(b *testing.B) {
-	var (
-		now    atomic.Value
-		offset int32
-	)
-
-	now.Store(time.Now())
-	go func() {
-		for range time.Tick(500 * time.Millisecond) {
-			now.Store(time.Now())
-			atomic.StoreInt32(&offset, 0)
-		}
-	}()
-
-	b.ResetTimer()
-	for i := 0; i < b.N; i++ {
-		_ = now.Load().(time.Time).Add(time.Duration(atomic.AddInt32(&offset, 1)))
-	}
-}
-
-func BenchmarkTimeNowCache1(b *testing.B) {
-	var now atomic.Value
-	now.Store(time.Now())
-	go func() {
-		for range time.Tick(500 * time.Millisecond) {
-			now.Store(time.Now())
-		}
-	}()
 
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
-		_ = now.Load().(time.Time)
+		for j := 0; j < RunTimes; j++ {
+			_ = p.Submit(demoFunc)
+		}
 	}
 }
diff --git a/go.mod b/go.mod
index df32ce2f..e307bfb8 100644
--- a/go.mod
+++ b/go.mod
@@ -2,4 +2,7 @@ module github.com/panjf2000/ants/v2
 
 go 1.13
 
-require github.com/stretchr/testify v1.8.1
+require (
+	github.com/stretchr/testify v1.8.1
+	golang.org/x/sync v0.1.0
+)
diff --git a/go.sum b/go.sum
index 2ec90f70..96e82f82 100644
--- a/go.sum
+++ b/go.sum
@@ -10,6 +10,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
 github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
 github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
 github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
+golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
+golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

From fc731bc17df7ed3bbc8079e6b3f51dea70cd4517 Mon Sep 17 00:00:00 2001
From: Andy Pan
Date: Sun, 11 Dec 2022 19:40:00 +0800
Subject: [PATCH 3/3] opt: leverage binary-search algorithm to speed up
 PoolWithFunc.purgeStaleWorkers()

---
 .github/workflows/test.yml |  2 +-
 pool_func.go               | 16 ++++++++++++----
 2 files changed, 13 insertions(+), 5 deletions(-)

diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index b7f531fd..b31e475b 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -40,7 +40,7 @@ jobs:
         uses: golangci/golangci-lint-action@v3
         with:
           version: v1.50.1
-          args: -v -E gofumpt -E gocritic -E misspell -E revive -E godot
+          args: --timeout 5m -v -E gofumpt -E gocritic -E misspell -E revive -E godot
   test:
     needs: lint
     strategy:
diff --git a/pool_func.go b/pool_func.go
index 6bca66e2..a3cf2249 100644
--- a/pool_func.go
+++ b/pool_func.go
@@ -89,17 +89,25 @@ func (p *PoolWithFunc) purgeStaleWorkers(ctx context.Context) {
 			break
 		}
 
-		currentTime := time.Now()
+		criticalTime := time.Now().Add(-p.options.ExpiryDuration)
+
 		p.lock.Lock()
 		idleWorkers := p.workers
 		n := len(idleWorkers)
-		var i int
-		for i = 0; i < n && currentTime.Sub(idleWorkers[i].recycleTime) > p.options.ExpiryDuration; i++ {
+		l, r, mid := 0, n-1, 0
+		for l <= r {
+			mid = (l + r) / 2
+			if criticalTime.Before(idleWorkers[mid].recycleTime) {
+				r = mid - 1
+			} else {
+				l = mid + 1
+			}
 		}
+		i := r + 1
 		expiredWorkers = append(expiredWorkers[:0], idleWorkers[:i]...)
 		if i > 0 {
 			m := copy(idleWorkers, idleWorkers[i:])
-			for i = m; i < n; i++ {
+			for i := m; i < n; i++ {
 				idleWorkers[i] = nil
 			}
 			p.workers = idleWorkers[:m]
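
Note on PATCH 3/3: revertWorker appends each worker to the idle queue at its recycle time, so the queue is sorted by non-decreasing recycleTime and the scavenger can binary-search for the boundary between expired and fresh workers instead of scanning linearly. Below is a minimal sketch of that search under the same sortedness assumption; the worker struct and the findStaleCount helper are illustrative stand-ins, not ants APIs.

package main

import (
	"fmt"
	"time"
)

type worker struct {
	recycleTime time.Time
}

// findStaleCount returns how many workers at the head of the idle queue
// were recycled at or before criticalTime, i.e. how many have expired.
// It assumes idle is sorted by non-decreasing recycleTime.
func findStaleCount(idle []worker, criticalTime time.Time) int {
	l, r := 0, len(idle)-1
	for l <= r {
		mid := (l + r) / 2
		if criticalTime.Before(idle[mid].recycleTime) {
			r = mid - 1 // idle[mid] is still fresh; the boundary lies to the left
		} else {
			l = mid + 1 // idle[mid] has expired; the boundary lies to the right
		}
	}
	// When the loop exits, indices 0..r all recycled at or before
	// criticalTime, so r+1 workers are stale -- the same i := r + 1
	// the patch computes.
	return r + 1
}

func main() {
	base := time.Now()
	idle := []worker{
		{recycleTime: base.Add(-3 * time.Minute)},
		{recycleTime: base.Add(-2 * time.Minute)},
		{recycleTime: base.Add(-30 * time.Second)},
	}
	// With a 1-minute expiry, criticalTime is now-1m and the first two
	// workers are stale.
	fmt.Println(findStaleCount(idle, base.Add(-time.Minute))) // 2
}

This cuts the boundary scan from O(n) to O(log n) per heartbeat; the copy that then compacts the queue is still O(n), so the win is largest when only a few workers expire per tick.
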