From c2ea1732ca71e64d88c6bd64deca0d831a770ca9 Mon Sep 17 00:00:00 2001 From: plastikfan Date: Wed, 29 May 2024 21:33:58 +0100 Subject: [PATCH] feat(ants,boost): add client context (#273) --- boost/examples/alpha/main.go | 67 +++++++++++++++++++ boost/examples/beta/main.go | 64 ++++++++++++++++++ boost/pool-defs-internal.go | 2 + boost/worker-pool-func.go | 7 +- boost/worker-pool-func_test.go | 38 ++++++++++- boost/worker-pool-manifold.go | 7 +- ...-pool-submitter.go => worker-pool-task.go} | 15 +++-- ...itter_test.go => worker-pool-task_test.go} | 10 ++- internal/ants/ants_test.go | 43 +++++++----- internal/ants/pool-func.go | 31 +++++---- internal/ants/pool.go | 24 ++++--- internal/ants/worker-func.go | 15 +++-- internal/ants/worker-queue.go | 5 +- internal/ants/worker.go | 15 +++-- 14 files changed, 282 insertions(+), 61 deletions(-) create mode 100644 boost/examples/alpha/main.go create mode 100644 boost/examples/beta/main.go rename boost/{worker-pool-submitter.go => worker-pool-task.go} (78%) rename boost/{worker-pool-submitter_test.go => worker-pool-task_test.go} (84%) diff --git a/boost/examples/alpha/main.go b/boost/examples/alpha/main.go new file mode 100644 index 0000000..5293d2f --- /dev/null +++ b/boost/examples/alpha/main.go @@ -0,0 +1,67 @@ +package main + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/snivilised/lorax/boost" + "github.com/snivilised/lorax/internal/ants" +) + +// Demonstrates that when all workers are engaged and the pool is at capacity, +// new incoming jobs are blocked, until a worker becomes free. The invoked function +// takes a second to complete. The PRE and POST indicators reflect this: +// +// PRE: <--- (n: 0) [13:56:22] ๐Ÿ‹ +// => running: '0') +// POST: <--- (n: 0) [13:56:22] ๐ŸŠ +// PRE: <--- (n: 1) [13:56:22] ๐Ÿ‹ +// => running: '1') +// POST: <--- (n: 1) [13:56:22] ๐ŸŠ +// PRE: <--- (n: 2) [13:56:22] ๐Ÿ‹ +// => running: '2') +// POST: <--- (n: 2) [13:56:22] ๐ŸŠ +// PRE: <--- (n: 3) [13:56:22] ๐Ÿ‹ +// => running: '3') +// <--- (n: 2)๐Ÿ’ +// <--- (n: 1)๐Ÿ’ +// <--- (n: 0)๐Ÿ’ +// <--- (n: 3)๐Ÿ’ +// POST: <--- (n: 3) [13:56:23] ๐ŸŠ +// +// Considering the above, whilst the pool is not at capacity, each new submission is +// executed immediately, as a new worker can be allocated to those jobs (n=0..2). +// Once the pool has reached capacity (n=3), the PRE is blocked, because its corresponding +// POST doesn't happen until a second later; this illustrates the blocking. +// + +func main() { + var wg sync.WaitGroup + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + const NoW = 3 + + pool, _ := boost.NewFuncPool[int, int](ctx, NoW, func(inputCh ants.InputParam) { + n, _ := inputCh.(int) + fmt.Printf("<--- (n: %v)๐Ÿ’ \n", n) + time.Sleep(time.Second) + }, &wg, ants.WithNonblocking(false)) + + defer pool.Release() + + for i := 0; i < 30; i++ { // producer + fmt.Printf("PRE: <--- (n: %v) [%v] ๐Ÿ‹ \n", i, time.Now().Format(time.TimeOnly)) + _ = pool.Post(i) + fmt.Printf("POST: <--- (n: %v) [%v] ๐ŸŠ \n", i, time.Now().Format(time.TimeOnly)) + } + + fmt.Printf("pool with func, running workers number:%d\n", + pool.Running(), + ) + wg.Wait() + fmt.Println("๐Ÿ (func-pool) FINISHED") +} diff --git a/boost/examples/beta/main.go b/boost/examples/beta/main.go new file mode 100644 index 0000000..38179b2 --- /dev/null +++ b/boost/examples/beta/main.go @@ -0,0 +1,64 @@ +package main + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/snivilised/lorax/boost" +) + +// Demonstrates that when all workers are engaged and the pool is at capacity, +// new incoming jobs are blocked, until a worker becomes free. The invoked function +// takes a second to complete. The PRE and POST indicators reflect this: +// +// PRE: <--- (n: 0) [13:41:49] ๐Ÿ‹ +// POST: <--- (n: 0) [13:41:49] ๐ŸŠ +// PRE: <--- (n: 1) [13:41:49] ๐Ÿ‹ +// POST: <--- (n: 1) [13:41:49] ๐ŸŠ +// PRE: <--- (n: 2) [13:41:49] ๐Ÿ‹ +// POST: <--- (n: 2) [13:41:49] ๐ŸŠ +// PRE: <--- (n: 3) [13:41:49] ๐Ÿ‹ +// => running: '3') +// <--- (n: 2)๐Ÿ’ +// => running: '3') +// <--- (n: 1)๐Ÿ’ +// => running: '3') +// <--- (n: 0)๐Ÿ’ +// POST: <--- (n: 3) [13:41:50] ๐ŸŠ +// +// Considering the above, whilst the pool is not at capacity, each new submission is +// executed immediately, as a new worker can be allocated to those jobs (n=0..2). +// Once the pool has reached capacity (n=3), the PRE is blocked, because its corresponding +// POST doesn't happen until a second later; this illustrates the blocking. +// + +func main() { + var wg sync.WaitGroup + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + const NoW = 3 + + pool, _ := boost.NewTaskPool[int, int](ctx, NoW, &wg) + + defer pool.Release() + + for i := 0; i < 30; i++ { // producer + fmt.Printf("PRE: <--- (n: %v) [%v] ๐Ÿ‹ \n", i, time.Now().Format(time.TimeOnly)) + _ = pool.Post(func() { + fmt.Printf("=> running: '%v')\n", pool.Running()) + fmt.Printf("<--- (n: %v)๐Ÿ’ \n", i) + time.Sleep(time.Second) + }) + fmt.Printf("POST: <--- (n: %v) [%v] ๐ŸŠ \n", i, time.Now().Format(time.TimeOnly)) + } + + fmt.Printf("task pool, running workers number:%d\n", + pool.Running(), + ) + wg.Wait() + fmt.Println("๐Ÿ (task-pool) FINISHED") +} diff --git a/boost/pool-defs-internal.go b/boost/pool-defs-internal.go index d4ac44f..135bd4b 100644 --- a/boost/pool-defs-internal.go +++ b/boost/pool-defs-internal.go @@ -1,6 +1,7 @@ package boost import ( + "context" "sync" "github.com/snivilised/lorax/internal/ants" @@ -31,6 +32,7 @@ type ( workersCollectionL[I, O any] map[workerID]*workerWrapperL[I, O] basePool struct { + ctx context.Context idGen IDGenerator wg *sync.WaitGroup } diff --git a/boost/worker-pool-func.go b/boost/worker-pool-func.go index 9f159f4..9b9342c 100644 --- a/boost/worker-pool-func.go +++ b/boost/worker-pool-func.go @@ -1,6 +1,7 @@ package boost import ( + "context" "sync" "github.com/snivilised/lorax/internal/ants" @@ -14,7 +15,8 @@ type WorkerPoolInvoker[I, O any] struct { // NewFuncPool creates a new worker pool using the native ants interface; ie // new jobs are submitted with Submit(task TaskFunc) -func NewFuncPool[I, O any](size int, +func NewFuncPool[I, O any](ctx context.Context, + size int, pf ants.PoolFunc, wg *sync.WaitGroup, options ...Option, @@ -24,13 +26,14 @@ func NewFuncPool[I, O any](size int, // allocated for each job, but this is not necessarily // the case, because each worker has its own job queue. // - pool, err := ants.NewPoolWithFunc(size, func(i ants.InputParam) { + pool, err := ants.NewPoolWithFunc(ctx, size, func(i ants.InputParam) { defer wg.Done() pf(i) }, options...) return &WorkerPoolInvoker[I, O]{ basePool: basePool{ + ctx: ctx, idGen: &Sequential{}, wg: wg, }, diff --git a/boost/worker-pool-func_test.go b/boost/worker-pool-func_test.go index 87b4a2e..8dc27b3 100644 --- a/boost/worker-pool-func_test.go +++ b/boost/worker-pool-func_test.go @@ -1,21 +1,27 @@ package boost_test import ( + "context" "sync" + "time" . "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok . "github.com/onsi/gomega" //nolint:revive // gomega ok "github.com/snivilised/lorax/boost" + "github.com/snivilised/lorax/internal/ants" ) var _ = Describe("WorkerPoolFunc", func() { Context("ants", func() { - It("should: not fail", func() { + It("should: not fail", func(specCtx SpecContext) { // TestNonblockingSubmit var wg sync.WaitGroup - pool, err := boost.NewFuncPool[int, int](AntsSize, demoPoolFunc, &wg) + ctx, cancel := context.WithCancel(specCtx) + defer cancel() + + pool, err := boost.NewFuncPool[int, int](ctx, AntsSize, demoPoolFunc, &wg) defer pool.Release() @@ -30,5 +36,33 @@ var _ = Describe("WorkerPoolFunc", func() { Expect(err).To(Succeed()) }) + + When("shorter", func() { + It("should: not fail", func(specCtx SpecContext) { + // TestNonblockingSubmit + var wg sync.WaitGroup + + ctx, cancel := context.WithCancel(specCtx) + defer cancel() + + pool, err := boost.NewFuncPool[int, int](ctx, 3, func(inputCh ants.InputParam) { + n, _ := inputCh.(int) + time.Sleep(time.Duration(n) * time.Millisecond) + }, &wg) + + defer pool.Release() + + for i := 0; i < 10; i++ { // producer + _ = pool.Post(Param) + } + wg.Wait() + GinkgoWriter.Printf("pool with func, running workers number:%d\n", + pool.Running(), + ) + ShowMemStats() + + Expect(err).To(Succeed()) + }) + }) }) }) diff --git a/boost/worker-pool-manifold.go b/boost/worker-pool-manifold.go index e1247ef..01b7b39 100644 --- a/boost/worker-pool-manifold.go +++ b/boost/worker-pool-manifold.go @@ -1,6 +1,7 @@ package boost import ( + "context" "sync" "github.com/snivilised/lorax/internal/ants" @@ -14,12 +15,13 @@ type WorkerPoolManifold[I, O any] struct { sourceJobsChIn JobStream[I] } -func NewManifoldPool[I, O any](size int, +func NewManifoldPool[I, O any](ctx context.Context, + size int, pf ants.PoolFunc, wg *sync.WaitGroup, options ...Option, ) (*WorkerPoolManifold[I, O], error) { - pool, err := ants.NewPoolWithFunc(size, func(i ants.InputParam) { + pool, err := ants.NewPoolWithFunc(ctx, size, func(i ants.InputParam) { defer wg.Done() pf(i) @@ -27,6 +29,7 @@ func NewManifoldPool[I, O any](size int, return &WorkerPoolManifold[I, O]{ basePool: basePool{ + ctx: ctx, idGen: &Sequential{}, wg: wg, }, diff --git a/boost/worker-pool-submitter.go b/boost/worker-pool-task.go similarity index 78% rename from boost/worker-pool-submitter.go rename to boost/worker-pool-task.go index 2eb8eaf..eac69cd 100644 --- a/boost/worker-pool-submitter.go +++ b/boost/worker-pool-task.go @@ -23,6 +23,7 @@ package boost // ManifoldE: func[I, O any]() O, error // import ( + "context" "sync" "github.com/snivilised/lorax/internal/ants" @@ -34,16 +35,18 @@ type WorkerPool[I, O any] struct { sourceJobsChIn JobStream[I] } -// NewSubmitterPool creates a new worker pool using the native ants interface; ie +// NewTaskPool creates a new worker pool using the native ants interface; ie // new jobs are submitted with Submit(task TaskFunc) -func NewSubmitterPool[I, O any](size int, +func NewTaskPool[I, O any](ctx context.Context, + size int, wg *sync.WaitGroup, options ...Option, ) (*WorkerPool[I, O], error) { - pool, err := ants.NewPool(size, options...) + pool, err := ants.NewPool(ctx, size, options...) return &WorkerPool[I, O]{ basePool: basePool{ + ctx: ctx, idGen: &Sequential{}, wg: wg, }, @@ -56,13 +59,17 @@ func NewSubmitterPool[I, O any](size int, func (p *WorkerPool[I, O]) Post(task ants.TaskFunc) error { p.wg.Add(1) - return p.pool.Submit(func() { + return p.pool.Submit(p.ctx, func() { defer p.wg.Done() task() }) } +func (p *WorkerPool[I, O]) Running() int { + return p.pool.Running() +} + func (p *WorkerPool[I, O]) Release() { p.pool.Release() } diff --git a/boost/worker-pool-submitter_test.go b/boost/worker-pool-task_test.go similarity index 84% rename from boost/worker-pool-submitter_test.go rename to boost/worker-pool-task_test.go index b22e411..d9b813f 100644 --- a/boost/worker-pool-submitter_test.go +++ b/boost/worker-pool-task_test.go @@ -1,6 +1,7 @@ package boost_test import ( + "context" "sync" . "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok @@ -10,13 +11,16 @@ import ( "github.com/snivilised/lorax/internal/ants" ) -var _ = Describe("WorkerPoolSubmitter", func() { +var _ = Describe("WorkerPoolTask", func() { Context("ants", func() { - It("should: not fail", func() { + It("should: not fail", func(specCtx SpecContext) { // TestNonblockingSubmit var wg sync.WaitGroup - pool, err := boost.NewSubmitterPool[int, int](PoolSize, &wg, + ctx, cancel := context.WithCancel(specCtx) + defer cancel() + + pool, err := boost.NewTaskPool[int, int](ctx, PoolSize, &wg, ants.WithNonblocking(true), ) defer pool.Release() diff --git a/internal/ants/ants_test.go b/internal/ants/ants_test.go index 4a76163..ac1492b 100644 --- a/internal/ants/ants_test.go +++ b/internal/ants/ants_test.go @@ -1,6 +1,7 @@ package ants_test import ( + "context" "sync" "time" @@ -14,18 +15,20 @@ var _ = Describe("Ants", func() { Context("NewPool", func() { Context("Submit", func() { When("non-blocking", func() { - It("๐Ÿงช should: not fail", func() { + It("๐Ÿงช should: not fail", func(specCtx SpecContext) { // TestNonblockingSubmit // ??? defer leaktest.Check(GinkgoT())() + ctx, cancel := context.WithCancel(specCtx) + defer cancel() poolSize := 10 - pool, err := ants.NewPool(poolSize, ants.WithNonblocking(true)) + pool, err := ants.NewPool(ctx, poolSize, ants.WithNonblocking(true)) Expect(err).To(Succeed(), "create TimingPool failed") defer pool.Release() for i := 0; i < poolSize-1; i++ { - Expect(pool.Submit(longRunningFunc)).To(Succeed(), + Expect(pool.Submit(ctx, longRunningFunc)).To(Succeed(), "nonblocking submit when pool is not full shouldn't return error", ) } @@ -36,35 +39,37 @@ var _ = Describe("Ants", func() { close(secondCh) } // p is full now. - Expect(pool.Submit(fn)).To(Succeed(), + Expect(pool.Submit(ctx, fn)).To(Succeed(), "nonblocking submit when pool is not full shouldn't return error", ) - Expect(pool.Submit(demoFunc)).To(MatchError(ants.ErrPoolOverload.Error()), + Expect(pool.Submit(ctx, demoFunc)).To(MatchError(ants.ErrPoolOverload.Error()), "nonblocking submit when pool is full should get an ErrPoolOverload", ) // interrupt fn to get an available worker close(firstCh) <-secondCh - Expect(pool.Submit(demoFunc)).To(Succeed(), + Expect(pool.Submit(ctx, demoFunc)).To(Succeed(), "nonblocking submit when pool is not full shouldn't return error", ) }) }) When("max blocking", func() { - It("๐Ÿงช should: not fail", func() { + It("๐Ÿงช should: not fail", func(specCtx SpecContext) { // TestMaxBlockingSubmit // ??? defer leaktest.Check(GinkgoT())() + ctx, cancel := context.WithCancel(specCtx) + defer cancel() poolSize := 10 - pool, err := ants.NewPool(poolSize, ants.WithMaxBlockingTasks(1)) + pool, err := ants.NewPool(ctx, poolSize, ants.WithMaxBlockingTasks(1)) Expect(err).To(Succeed(), "create TimingPool failed") defer pool.Release() for i := 0; i < poolSize-1; i++ { - Expect(pool.Submit(longRunningFunc)).To(Succeed(), + Expect(pool.Submit(ctx, longRunningFunc)).To(Succeed(), "blocking submit when pool is not full shouldn't return error", ) } @@ -73,7 +78,7 @@ var _ = Describe("Ants", func() { <-ch } // p is full now. - Expect(pool.Submit(fn)).To(Succeed(), + Expect(pool.Submit(ctx, fn)).To(Succeed(), "nonblocking submit when pool is not full shouldn't return error", ) @@ -82,14 +87,14 @@ var _ = Describe("Ants", func() { errCh := make(chan error, 1) go func() { // should be blocked. blocking num == 1 - if err := pool.Submit(demoFunc); err != nil { + if err := pool.Submit(ctx, demoFunc); err != nil { errCh <- err } wg.Done() }() time.Sleep(1 * time.Second) // already reached max blocking limit - Expect(pool.Submit(demoFunc)).To(MatchError(ants.ErrPoolOverload.Error()), + Expect(pool.Submit(ctx, demoFunc)).To(MatchError(ants.ErrPoolOverload.Error()), "blocking submit when pool reach max blocking submit should return ErrPoolOverload", ) @@ -110,11 +115,14 @@ var _ = Describe("Ants", func() { Context("NewPoolWithFunc", func() { Context("Invoke", func() { When("waiting to get worker", func() { - It("๐Ÿงช should: not fail", func() { + It("๐Ÿงช should: not fail", func(specCtx SpecContext) { // TestAntsPoolWithFuncWaitToGetWorker + ctx, cancel := context.WithCancel(specCtx) + defer cancel() + var wg sync.WaitGroup - pool, _ := ants.NewPoolWithFunc(AntsSize, func(i ants.InputParam) { + pool, _ := ants.NewPoolWithFunc(ctx, AntsSize, func(i ants.InputParam) { demoPoolFunc(i) wg.Done() }) @@ -133,11 +141,14 @@ var _ = Describe("Ants", func() { }) When("waiting to get worker with pre malloc", func() { - It("๐Ÿงช should: not fail", func() { + It("๐Ÿงช should: not fail", func(specCtx SpecContext) { // TestAntsPoolWithFuncWaitToGetWorkerPreMalloc + ctx, cancel := context.WithCancel(specCtx) + defer cancel() + var wg sync.WaitGroup - pool, _ := ants.NewPoolWithFunc(AntsSize, func(i ants.InputParam) { + pool, _ := ants.NewPoolWithFunc(ctx, AntsSize, func(i ants.InputParam) { demoPoolFunc(i) wg.Done() }, ants.WithPreAlloc(true)) diff --git a/internal/ants/pool-func.go b/internal/ants/pool-func.go index c14bca6..35868cb 100644 --- a/internal/ants/pool-func.go +++ b/internal/ants/pool-func.go @@ -34,6 +34,8 @@ import ( // PoolWithFunc accepts the tasks and process them concurrently, // it limits the total of goroutines to a given number by recycling goroutines. type PoolWithFunc struct { + // client defined context + ctx context.Context // capacity of the pool. capacity int32 @@ -73,7 +75,7 @@ type PoolWithFunc struct { } // purgeStaleWorkers clears stale workers periodically, it runs in an individual goroutine, as a scavenger. -func (p *PoolWithFunc) purgeStaleWorkers(ctx context.Context) { +func (p *PoolWithFunc) purgeStaleWorkers(purgeCtx context.Context) { ticker := time.NewTicker(p.o.ExpiryDuration) defer func() { ticker.Stop() @@ -82,7 +84,7 @@ func (p *PoolWithFunc) purgeStaleWorkers(ctx context.Context) { for { select { - case <-ctx.Done(): + case <-purgeCtx.Done(): return case <-ticker.C: } @@ -116,7 +118,7 @@ func (p *PoolWithFunc) purgeStaleWorkers(ctx context.Context) { } // ticktock is a goroutine that updates the current time in the pool regularly. -func (p *PoolWithFunc) ticktock(ctx context.Context) { +func (p *PoolWithFunc) ticktock(ticktockCtx context.Context) { ticker := time.NewTicker(nowTimeUpdateInterval) defer func() { ticker.Stop() @@ -125,7 +127,7 @@ func (p *PoolWithFunc) ticktock(ctx context.Context) { for { select { - case <-ctx.Done(): + case <-ticktockCtx.Done(): return case <-ticker.C: } @@ -144,16 +146,16 @@ func (p *PoolWithFunc) goPurge() { } // Start a goroutine to clean up expired workers periodically. - var ctx context.Context - ctx, p.stopPurge = context.WithCancel(context.Background()) - go p.purgeStaleWorkers(ctx) + var purgeCtx context.Context + purgeCtx, p.stopPurge = context.WithCancel(p.ctx) + go p.purgeStaleWorkers(purgeCtx) } func (p *PoolWithFunc) goTicktock() { p.now.Store(time.Now()) - var ctx context.Context - ctx, p.stopTicktock = context.WithCancel(context.Background()) - go p.ticktock(ctx) + var ticktockCtx context.Context + ticktockCtx, p.stopTicktock = context.WithCancel(p.ctx) + go p.ticktock(ticktockCtx) } func (p *PoolWithFunc) nowTime() time.Time { @@ -161,7 +163,11 @@ func (p *PoolWithFunc) nowTime() time.Time { } // NewPoolWithFunc instantiates a PoolWithFunc with customized options. -func NewPoolWithFunc(size int, pf PoolFunc, options ...Option) (*PoolWithFunc, error) { +func NewPoolWithFunc(ctx context.Context, + size int, + pf PoolFunc, + options ...Option, +) (*PoolWithFunc, error) { if size <= 0 { size = -1 } @@ -185,6 +191,7 @@ func NewPoolWithFunc(size int, pf PoolFunc, options ...Option) (*PoolWithFunc, e } p := &PoolWithFunc{ + ctx: ctx, capacity: int32(size), poolFunc: pf, lock: async.NewSpinLock(), @@ -226,7 +233,7 @@ func (p *PoolWithFunc) Invoke(job InputParam) error { w, err := p.retrieveWorker() if w != nil { - w.sendParam(job) + w.sendParam(p.ctx, job) } return err diff --git a/internal/ants/pool.go b/internal/ants/pool.go index 119970c..0af6a09 100644 --- a/internal/ants/pool.go +++ b/internal/ants/pool.go @@ -34,6 +34,9 @@ import ( // Pool accepts the tasks and process them concurrently, // it limits the total of goroutines to a given number by recycling goroutines. type Pool struct { + // client defined context + ctx context.Context + // capacity of the pool, a negative value means that the capacity of pool is // limitless, an infinite pool is used to avoid potential issue of endless // blocking caused by nested usage of a pool: submitting a task to pool which @@ -76,7 +79,7 @@ type Pool struct { // purgeStaleWorkers clears stale workers periodically, it runs in an // individual goroutine, as a scavenger. -func (p *Pool) purgeStaleWorkers(ctx context.Context) { +func (p *Pool) purgeStaleWorkers(purgeCtx context.Context) { ticker := time.NewTicker(p.o.ExpiryDuration) defer func() { @@ -86,7 +89,7 @@ func (p *Pool) purgeStaleWorkers(ctx context.Context) { for { select { - case <-ctx.Done(): + case <-purgeCtx.Done(): return case <-ticker.C: } @@ -121,7 +124,7 @@ func (p *Pool) purgeStaleWorkers(ctx context.Context) { } // ticktock is a goroutine that updates the current time in the pool regularly. -func (p *Pool) ticktock(ctx context.Context) { +func (p *Pool) ticktock(ticktockCtx context.Context) { ticker := time.NewTicker(nowTimeUpdateInterval) defer func() { ticker.Stop() @@ -130,7 +133,7 @@ func (p *Pool) ticktock(ctx context.Context) { for { select { - case <-ctx.Done(): + case <-ticktockCtx.Done(): return case <-ticker.C: } @@ -150,14 +153,14 @@ func (p *Pool) goPurge() { // Start a goroutine to clean up expired workers periodically. var ctx context.Context - ctx, p.stopPurge = context.WithCancel(context.Background()) + ctx, p.stopPurge = context.WithCancel(p.ctx) go p.purgeStaleWorkers(ctx) } func (p *Pool) goTicktock() { p.now.Store(time.Now()) var ctx context.Context - ctx, p.stopTicktock = context.WithCancel(context.Background()) + ctx, p.stopTicktock = context.WithCancel(p.ctx) go p.ticktock(ctx) } @@ -166,7 +169,7 @@ func (p *Pool) nowTime() time.Time { } // NewPool instantiates a Pool with customized options. -func NewPool(size int, options ...Option) (*Pool, error) { +func NewPool(ctx context.Context, size int, options ...Option) (*Pool, error) { if size <= 0 { size = -1 } @@ -186,6 +189,7 @@ func NewPool(size int, options ...Option) (*Pool, error) { } p := &Pool{ + ctx: ctx, capacity: int32(size), lock: async.NewSpinLock(), o: opts, @@ -222,14 +226,14 @@ func NewPool(size int, options ...Option) (*Pool, error) { // get blocked with the last Pool.Submit() call once the current Pool // runs out of its capacity, and to avoid this, you should instantiate // a Pool with ants.WithNonblocking(true). -func (p *Pool) Submit(task TaskFunc) error { +func (p *Pool) Submit(ctx context.Context, task TaskFunc) error { if p.IsClosed() { return ErrPoolClosed } w, err := p.retrieveWorker() if w != nil { - w.sendTask(task) + w.sendTask(ctx, task) } return err @@ -346,7 +350,7 @@ func (p *Pool) addWaiting(delta int) { // retrieveWorker returns an available worker to run the tasks. func (p *Pool) retrieveWorker() (w worker, err error) { - p.lock.Lock() + p.lock.Lock() // why isn't the unlock just deferred? retry: // First try to fetch the worker from the queue. diff --git a/internal/ants/worker-func.go b/internal/ants/worker-func.go index 77293bb..1c16013 100644 --- a/internal/ants/worker-func.go +++ b/internal/ants/worker-func.go @@ -23,6 +23,7 @@ package ants import ( + "context" "runtime/debug" "time" ) @@ -77,17 +78,23 @@ func (w *goWorkerWithFunc) run() { } func (w *goWorkerWithFunc) finish() { - w.inputCh <- nil // โœจ + select { + case <-w.pool.ctx.Done(): + case w.inputCh <- nil: // โœจ + } } func (w *goWorkerWithFunc) lastUsedTime() time.Time { return w.lastUsed } -func (w *goWorkerWithFunc) sendTask(TaskFunc) { +func (w *goWorkerWithFunc) sendTask(context.Context, TaskFunc) { panic("unreachable") } -func (w *goWorkerWithFunc) sendParam(job InputParam) { - w.inputCh <- job +func (w *goWorkerWithFunc) sendParam(ctx context.Context, job InputParam) { + select { + case <-ctx.Done(): + case w.inputCh <- job: + } } diff --git a/internal/ants/worker-queue.go b/internal/ants/worker-queue.go index 8892832..5a26bb0 100644 --- a/internal/ants/worker-queue.go +++ b/internal/ants/worker-queue.go @@ -23,6 +23,7 @@ package ants import ( + "context" "errors" "time" ) @@ -39,8 +40,8 @@ type worker interface { run() finish() lastUsedTime() time.Time - sendTask(TaskFunc) - sendParam(InputParam) + sendTask(context.Context, TaskFunc) + sendParam(context.Context, InputParam) } type workerQueue interface { diff --git a/internal/ants/worker.go b/internal/ants/worker.go index 5f4722f..4fa6501 100644 --- a/internal/ants/worker.go +++ b/internal/ants/worker.go @@ -23,6 +23,7 @@ package ants import ( + "context" "runtime/debug" "time" ) @@ -74,17 +75,23 @@ func (w *goWorker) run() { } func (w *goWorker) finish() { - w.taskCh <- nil // โœจ + select { + case <-w.pool.ctx.Done(): + case w.taskCh <- nil: // โœจ: + } } func (w *goWorker) lastUsedTime() time.Time { return w.lastUsed } -func (w *goWorker) sendTask(fn TaskFunc) { - w.taskCh <- fn +func (w *goWorker) sendTask(ctx context.Context, fn TaskFunc) { + select { + case <-ctx.Done(): + case w.taskCh <- fn: + } } -func (w *goWorker) sendParam(InputParam) { +func (w *goWorker) sendParam(context.Context, InputParam) { panic("unreachable") }