diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
new file mode 100644
index 0000000..cb0a6ec
--- /dev/null
+++ b/.github/workflows/main.yml
@@ -0,0 +1,41 @@
+on:
+  push:
+    branches:
+      - master
+      - main
+  pull_request:
+name: Build
+jobs:
+  test:
+    name: Test
+    strategy:
+      matrix:
+        go-version: [1.15.x, 1.16.x, 1.17.x]
+        os: [ubuntu-latest, macos-latest, windows-latest]
+    runs-on: ${{ matrix.os }}
+    steps:
+      - name: Install Go
+        uses: actions/setup-go@v2
+        with:
+          go-version: ${{ matrix.go-version }}
+      - name: Checkout code
+        uses: actions/checkout@v2
+      - name: Test
+        run: go test -race -v ./
+  codecov:
+    name: Upload coverage report to Codecov
+    runs-on: ubuntu-latest
+    steps:
+      - name: Install Go
+        uses: actions/setup-go@v2
+        with:
+          go-version: 1.17.x
+      - name: Checkout code
+        uses: actions/checkout@v2
+      - name: Test
+        run: go test -race -v -coverprofile=coverage.txt -covermode=atomic ./
+      - uses: codecov/codecov-action@v2
+        with:
+          files: ./coverage.txt
+          fail_ci_if_error: true
+          verbose: true
diff --git a/.travis.yml b/.travis.yml
deleted file mode 100644
index cb67307..0000000
--- a/.travis.yml
+++ /dev/null
@@ -1,28 +0,0 @@
-language: go
-
-os:
-  - linux
-  - osx
-  - windows
-
-go:
-  - 1.11.x
-  - 1.12.x
-  - 1.13.x
-  - 1.14.x
-  - 1.15.x
-  - 1.16.x
-  - 1.17.x
-
-# Enable Go Modules
-env:
-  - GO111MODULE=on
-
-# Skip go get
-install: true
-
-script:
-  - go test ./ -race -coverprofile=coverage.txt -covermode=atomic -v
-
-after_success:
-  - if [[ "$TRAVIS_OS_NAME" == "windows" ]]; then curl -s https://codecov.io/bash > .codecov && chmod +x .codecov && ./.codecov; else bash <(curl -s https://codecov.io/bash); fi
\ No newline at end of file
diff --git a/README.md b/README.md
index cc00e00..a894117 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,4 @@
-
+Build status
@@ -31,8 +31,9 @@ Some common scenarios include:
 - Task panics are handled gracefully (configurable panic handler)
 - Supports Non-blocking and Blocking task submission modes (buffered / unbuffered)
 - Very high performance and efficient resource usage under heavy workloads, even outperforming unbounded goroutines in some scenarios (See [benchmarks](./benchmark/README.md))
-- **New (since v1.3.0)**: configurable pool resizing strategy, with 3 presets for common scenarios: Eager, Balanced and Lazy.
-- **New (since v1.5.0)**: complete pool metrics such as number of running workers, tasks waiting in the queue [and more](#metrics--monitoring).
+- Configurable pool resizing strategy, with 3 presets for common scenarios: Eager, Balanced and Lazy.
+- Complete pool metrics such as number of running workers, tasks waiting in the queue [and more](#metrics--monitoring).
+- **New (since v1.7.0)**: configurable parent context and graceful shutdown with deadline.
 - [API reference](https://pkg.go.dev/github.com/alitto/pond)
 
 ## How to install
@@ -167,6 +168,11 @@ pool := pond.New(10, 1000, pond.PanicHandler(panicHandler)))
 eagerPool := pond.New(10, 1000, pond.Strategy(pond.Eager()))
 balancedPool := pond.New(10, 1000, pond.Strategy(pond.Balanced()))
 lazyPool := pond.New(10, 1000, pond.Strategy(pond.Lazy()))
+```
+- **Context**: Configures a parent context on this pool to stop all workers when it is cancelled. The default value is `context.Background()`. Example:
+``` go
+// This creates a pool that is stopped when myCtx is cancelled
+pool := pond.New(10, 1000, pond.Context(myCtx))
 ```
 
 ### Resizing strategies
@@ -177,6 +183,13 @@ The following chart illustrates the behaviour of the different pool resizing str
 
 As the name suggests, the "Eager" strategy always spawns an extra worker when there are no idles, which causes the pool to grow almost linearly with the number of submitted tasks. On the other end, the "Lazy" strategy creates one worker every N submitted tasks, where N is the maximum number of available CPUs ([GOMAXPROCS](https://golang.org/pkg/runtime/#GOMAXPROCS)). The "Balanced" strategy represents a middle ground between the previous two because it creates a worker every N/2 submitted tasks.
 
+### Stopping a pool
+
+There are 3 methods available to stop a pool and release associated resources:
+- `pool.Stop()`: stop accepting new tasks and signal all workers to stop processing new tasks. Tasks being processed by workers will continue until completion unless the process is terminated.
+- `pool.StopAndWait()`: stop accepting new tasks and wait until all running and queued tasks have completed before returning.
+- `pool.StopAndWaitFor(deadline time.Duration)`: similar to `StopAndWait` but with a deadline to prevent waiting indefinitely.
+
 ### Metrics & monitoring
 
 Each worker pool instance exposes useful metrics that can be queried through the following methods:
@@ -194,6 +207,14 @@ Each worker pool instance exposes useful metrics that can be queried through the
 
 In our [Prometheus example](./examples/prometheus/prometheus.go) we showcase how to configure collectors for these metrics and expose them to Prometheus.
 
+## Examples
+
+- [Creating a worker pool with dynamic size](./examples/dynamic_size/dynamic_size.go)
+- [Creating a worker pool with fixed size](./examples/fixed_size/fixed_size.go)
+- [Creating a worker pool with a Context](./examples/pool_context/pool_context.go)
+- [Exporting worker pool metrics to Prometheus](./examples/prometheus/prometheus.go)
+- [Submitting groups of related tasks](./examples/task_group/task_group.go)
+
 ## API Reference
 
 Full API reference is available at https://pkg.go.dev/github.com/alitto/pond
diff --git a/benchmark/go.mod b/benchmark/go.mod
index cc573f3..ac02c36 100644
--- a/benchmark/go.mod
+++ b/benchmark/go.mod
@@ -3,7 +3,7 @@ module github.com/alitto/pond/benchmark
 go 1.17
 
 require (
-	github.com/alitto/pond v1.3.0
+	github.com/alitto/pond v1.6.1
 	github.com/gammazero/workerpool v1.1.2
 	github.com/panjf2000/ants/v2 v2.4.7
 )
diff --git a/examples/dynamic_size/go.mod b/examples/dynamic_size/go.mod
index 5b18ea5..8a91fb9 100644
--- a/examples/dynamic_size/go.mod
+++ b/examples/dynamic_size/go.mod
@@ -3,7 +3,7 @@ module github.com/alitto/pond/examples/dynamic_size
 go 1.17
 
 require (
-	github.com/alitto/pond v1.5.1
+	github.com/alitto/pond v1.6.1
 )
 
 replace github.com/alitto/pond => ../../
diff --git a/examples/fixed_size/go.mod b/examples/fixed_size/go.mod
index 54131d3..6ac8aa1 100644
--- a/examples/fixed_size/go.mod
+++ b/examples/fixed_size/go.mod
@@ -3,7 +3,7 @@ module github.com/alitto/pond/examples/fixed_size
 go 1.17
 
 require (
-	github.com/alitto/pond v1.5.1
+	github.com/alitto/pond v1.6.1
 )
 
 replace github.com/alitto/pond => ../../
diff --git a/examples/pool_context/go.mod b/examples/pool_context/go.mod
new file mode 100644
index 0000000..449b708
--- /dev/null
+++ b/examples/pool_context/go.mod
@@ -0,0 +1,7 @@
+module github.com/alitto/pond/examples/pool_context
+
+go 1.17
+
+require github.com/alitto/pond v1.6.1
+
+replace github.com/alitto/pond => ../../
diff --git a/examples/pool_context/pool_context.go b/examples/pool_context/pool_context.go
new file mode 100644
index 0000000..14ee93c
--- /dev/null
+++ b/examples/pool_context/pool_context.go
@@ -0,0 +1,35 @@
+package main
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"os/signal"
+	"time"
+
+	"github.com/alitto/pond"
+)
+
+// Pressing Ctrl+C while this program is running will cause the program to terminate gracefully.
+// Tasks being processed will continue until they finish, but queued tasks are cancelled.
+func main() {
+
+	// Create a context that will be cancelled when the user presses Ctrl+C (process receives termination signal).
+	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
+	defer stop()
+
+	// Create a pool and pass the context to it.
+	pool := pond.New(1, 1000, pond.Context(ctx))
+	defer pool.StopAndWait()
+
+	// Submit several long running tasks
+	var count int = 100
+	for i := 0; i < count; i++ {
+		n := i
+		pool.Submit(func() {
+			fmt.Printf("Task #%d started\n", n)
+			time.Sleep(1 * time.Second)
+			fmt.Printf("Task #%d finished\n", n)
+		})
+	}
+}
diff --git a/examples/prometheus/go.mod b/examples/prometheus/go.mod
index 98e3b32..51a7975 100644
--- a/examples/prometheus/go.mod
+++ b/examples/prometheus/go.mod
@@ -3,7 +3,7 @@ module github.com/alitto/pond/examples/fixed_size
 go 1.17
 
 require (
-	github.com/alitto/pond v1.5.1
+	github.com/alitto/pond v1.6.1
 	github.com/prometheus/client_golang v1.9.0
 )
 
diff --git a/examples/task_group/go.mod b/examples/task_group/go.mod
index 07cfeb7..83c53ca 100644
--- a/examples/task_group/go.mod
+++ b/examples/task_group/go.mod
@@ -3,7 +3,7 @@ module github.com/alitto/pond/examples/task_group
 go 1.17
 
 require (
-	github.com/alitto/pond v1.5.1
+	github.com/alitto/pond v1.6.1
 )
 
 replace github.com/alitto/pond => ../../
diff --git a/pond.go b/pond.go
index c5c849a..9f29c5b 100644
--- a/pond.go
+++ b/pond.go
@@ -1,6 +1,7 @@
 package pond
 
 import (
+	"context"
 	"errors"
 	"fmt"
 	"runtime/debug"
@@ -54,22 +55,31 @@ func Strategy(strategy ResizingStrategy) Option {
 	}
 }
 
-// PanicHandler allows to change the panic handler function for a worker pool
+// PanicHandler allows changing the panic handler function of a worker pool
 func PanicHandler(panicHandler func(interface{})) Option {
 	return func(pool *WorkerPool) {
 		pool.panicHandler = panicHandler
 	}
 }
 
+// Context configures a parent context on a worker pool to stop all workers when it is cancelled
+func Context(parentCtx context.Context) Option {
+	return func(pool *WorkerPool) {
+		pool.context, pool.contextCancel = context.WithCancel(parentCtx)
+	}
+}
+
 // WorkerPool models a pool of workers
 type WorkerPool struct {
 	// Configurable settings
-	maxWorkers   int
-	maxCapacity  int
-	minWorkers   int
-	idleTimeout  time.Duration
-	strategy     ResizingStrategy
-	panicHandler func(interface{})
+	maxWorkers    int
+	maxCapacity   int
+	minWorkers    int
+	idleTimeout   time.Duration
+	strategy      ResizingStrategy
+	panicHandler  func(interface{})
+	context       context.Context
+	contextCancel context.CancelFunc
 	// Atomic counters
 	workerCount         int32
 	idleWorkerCount     int32
@@ -78,12 +88,11 @@ type WorkerPool struct {
 	successfulTaskCount uint64
 	failedTaskCount     uint64
 	// Private properties
-	tasks      chan func()
-	purgerQuit chan struct{}
-	stopOnce   sync.Once
-	waitGroup  sync.WaitGroup
-	mutex      sync.Mutex
-	stopped    bool
+	tasks     chan func()
+	stopOnce  sync.Once
+	waitGroup sync.WaitGroup
+	mutex     sync.Mutex
+	stopped   bool
 }
 
 // New creates a worker pool with that can scale up to the given maximum number of workers (maxWorkers).
@@ -120,17 +129,16 @@ func New(maxWorkers, maxCapacity int, options ...Option) *WorkerPool {
 		pool.idleTimeout = defaultIdleTimeout
 	}
 
-	// Create internal channels
+	// Initialize base context (if not already set)
+	if pool.context == nil {
+		Context(context.Background())(pool)
+	}
+
+	// Create tasks channel
 	pool.tasks = make(chan func(), pool.maxCapacity)
-	pool.purgerQuit = make(chan struct{})
 
 	// Start purger goroutine
-	pool.waitGroup.Add(1)
-	go func() {
-		defer pool.waitGroup.Done()
-
-		pool.purge()
-	}()
+	go pool.purge()
 
 	// Start minWorkers workers
 	if pool.minWorkers > 0 {
@@ -298,7 +306,7 @@ func (p *WorkerPool) SubmitAndWait(task func()) {
 }
 
 // SubmitBefore attempts to send a task for execution to this worker pool but aborts it
-// if the task did not start before the given deadline
+// if the task did not start before the given deadline.
 func (p *WorkerPool) SubmitBefore(task func(), deadline time.Duration) {
 	if task == nil {
 		return
@@ -318,23 +326,59 @@ func (p *WorkerPool) SubmitBefore(task func(), deadline time.Duration) {
 	})
 }
 
-// Stop causes this pool to stop accepting tasks, without waiting for goroutines to exit
+// Stop causes this pool to stop accepting new tasks and signals all workers to stop processing new tasks.
+// Tasks being processed by workers will continue until completion unless the process is terminated.
+// This method can only be called once.
 func (p *WorkerPool) Stop() {
 	p.stopOnce.Do(func() {
 		// Mark pool as stopped
 		p.stopped = true
 
-		// Send the signal to stop the purger goroutine
-		close(p.purgerQuit)
+		// Stop accepting new tasks
+		close(p.tasks)
+
+		// Terminate all workers & purger goroutine
+		p.contextCancel()
 	})
 }
 
-// StopAndWait causes this pool to stop accepting tasks, waiting for all tasks in the queue to complete
+// StopAndWait causes this pool to stop accepting new tasks and then waits for all tasks in the queue
+// to complete before returning. This method can only be called once.
 func (p *WorkerPool) StopAndWait() {
-	p.Stop()
+	p.stopOnce.Do(func() {
+		// Mark pool as stopped
+		p.stopped = true
+
+		// Stop accepting new tasks
+		close(p.tasks)
+
+		// Wait for all workers to exit
+		p.waitGroup.Wait()
+
+		// Terminate all workers & purger goroutine
+		p.contextCancel()
+	})
+}
+
+// StopAndWaitFor stops this pool and waits for all tasks in the queue to complete before returning
+// or until the given deadline is reached, whichever comes first. This method can only be called once.
+func (p *WorkerPool) StopAndWaitFor(deadline time.Duration) {
+
+	// Stop the pool and wait for all workers to exit in a separate goroutine
+	workersDone := make(chan struct{}, 1)
+	go func() {
+		p.StopAndWait()
+		workersDone <- struct{}{}
+	}()
 
-	// Wait for all goroutines to exit
-	p.waitGroup.Wait()
+	// Wait until either all workers have exited or the deadline is reached
+	select {
+	case <-workersDone:
+		return
+	case <-time.After(deadline):
+		p.contextCancel()
+		return
+	}
 }
 
 // purge represents the work done by the purger goroutine
@@ -343,7 +387,6 @@ func (p *WorkerPool) purge() {
 	idleTicker := time.NewTicker(p.idleTimeout)
 	defer idleTicker.Stop()
 
-Purge:
 	for {
 		select {
 		// Timed out waiting for any activity to happen, attempt to kill an idle worker
@@ -351,14 +394,11 @@ Purge:
 			if p.IdleWorkers() > 0 && p.RunningWorkers() > p.minWorkers {
 				p.tasks <- nil
 			}
-		case <-p.purgerQuit:
-			break Purge
+		// Pool context was cancelled, exit
+		case <-p.context.Done():
+			return
 		}
 	}
-
-	// Send signal to stop all workers
-	close(p.tasks)
-
 }
 
 // startWorkers creates new worker goroutines to run the given tasks
@@ -370,7 +410,7 @@ func (p *WorkerPool) maybeStartWorker(firstTask func()) bool {
 	}
 
 	// Launch worker
-	go worker(firstTask, p.tasks, &p.idleWorkerCount, p.decrementWorkerCount, p.executeTask)
+	go worker(p.context, firstTask, p.tasks, &p.idleWorkerCount, p.decrementWorkerCount, p.executeTask)
 
 	return true
 }
@@ -435,7 +475,7 @@ func (p *WorkerPool) Group() *TaskGroup {
 }
 
 // worker launches a worker goroutine
-func worker(firstTask func(), tasks <-chan func(), idleWorkerCount *int32, exitHandler func(), taskExecutor func(func())) {
+func worker(context context.Context, firstTask func(), tasks <-chan func(), idleWorkerCount *int32, exitHandler func(), taskExecutor func(func())) {
 
 	defer func() {
 		// Decrement idle count
@@ -452,20 +492,26 @@ func worker(firstTask func(), tasks <-chan func(), idleWorkerCount *int32, exitH
 	// Increment idle count
 	atomic.AddInt32(idleWorkerCount, 1)
 
-	for task := range tasks {
-		if task == nil {
-			// We have received a signal to quit
+	for {
+		select {
+		case <-context.Done():
+			// Pool context was cancelled, exit
 			return
-		}
+		case task, ok := <-tasks:
+			if task == nil || !ok {
+				// We have received a signal to quit
+				return
+			}
 
-		// Decrement idle count
-		atomic.AddInt32(idleWorkerCount, -1)
+			// Decrement idle count
+			atomic.AddInt32(idleWorkerCount, -1)
 
-		// We have received a task, execute it
-		taskExecutor(task)
+			// We have received a task, execute it
+			taskExecutor(task)
 
-		// Increment idle count
-		atomic.AddInt32(idleWorkerCount, 1)
+			// Increment idle count
+			atomic.AddInt32(idleWorkerCount, 1)
+		}
 	}
 }
 
diff --git a/pond_blackbox_test.go b/pond_blackbox_test.go
index 88934db..36967b1 100644
--- a/pond_blackbox_test.go
+++ b/pond_blackbox_test.go
@@ -1,6 +1,7 @@
 package pond_test
 
 import (
+	"context"
 	"fmt"
 	"sync/atomic"
 	"testing"
@@ -23,7 +24,7 @@ func assertNotEqual(t *testing.T, expected interface{}, actual interface{}) {
 	}
 }
 
-func TestSubmitAndStopWaiting(t *testing.T) {
+func TestSubmitAndStopWait(t *testing.T) {
 
 	pool := pond.New(1, 5)
 
@@ -42,6 +43,42 @@
 	assertEqual(t, int32(17), atomic.LoadInt32(&doneCount))
 }
 
+func TestSubmitAndStopWaitFor(t *testing.T) {
+
+	pool := pond.New(1, 10)
+
+	// Submit a long running task
+	var doneCount int32
+	pool.Submit(func() {
+		time.Sleep(2 * time.Second)
+		atomic.AddInt32(&doneCount, 1)
+	})
+
+	// Wait up to 50ms for the task to complete (it sleeps for 2s, so it should not finish in time)
+	pool.StopAndWaitFor(50 * time.Millisecond)
+
+	assertEqual(t, int32(0), atomic.LoadInt32(&doneCount))
+}
+
+func TestSubmitAndStopWaitForWithEnoughDeadline(t *testing.T) {
+
+	pool := pond.New(1, 10)
+
+	// Submit tasks
+	var doneCount int32
+	for i := 0; i < 10; i++ {
+		pool.Submit(func() {
+			time.Sleep(5 * time.Millisecond)
+			atomic.AddInt32(&doneCount, 1)
+		})
+	}
+
+	// Wait until all submitted tasks complete
+	pool.StopAndWaitFor(1 * time.Second)
+
+	assertEqual(t, int32(10), atomic.LoadInt32(&doneCount))
+}
+
 func TestSubmitAndStopWaitingWithMoreWorkersThanTasks(t *testing.T) {
 
 	pool := pond.New(18, 5)
@@ -146,16 +183,22 @@ func TestSubmitBefore(t *testing.T) {
 		atomic.AddInt32(&doneCount, 1)
 	})
 
-	// Submit a task that times out after 2ms
+	// Submit a task that times out after 5ms
 	pool.SubmitBefore(func() {
 		time.Sleep(5 * time.Millisecond)
 		atomic.AddInt32(&doneCount, 1)
 	}, 5*time.Millisecond)
 
+	// Submit a task that times out after 1s
+	pool.SubmitBefore(func() {
+		time.Sleep(5 * time.Millisecond)
+		atomic.AddInt32(&doneCount, 1)
+	}, 1*time.Second)
+
 	pool.StopAndWait()
 
-	// Only the first task must have executed
-	assertEqual(t, int32(1), atomic.LoadInt32(&doneCount))
+	// Only 2 tasks must have executed
+	assertEqual(t, int32(2), atomic.LoadInt32(&doneCount))
 }
 
 func TestSubmitBeforeWithNilTask(t *testing.T) {
@@ -496,3 +539,32 @@ func TestMetricsAndGetters(t *testing.T) {
 	assertEqual(t, uint64(17), pool.CompletedTasks())
 	assertEqual(t, uint64(0), pool.WaitingTasks())
 }
+
+func TestSubmitWithContext(t *testing.T) {
+
+	ctx, cancel := context.WithCancel(context.Background())
+
+	pool := pond.New(1, 5, pond.Context(ctx))
+
+	var doneCount, taskCount int32
+
+	// Submit a long-running, cancellable task
+	pool.Submit(func() {
+		atomic.AddInt32(&taskCount, 1)
+		select {
+		case <-ctx.Done():
+			return
+		case <-time.After(1 * time.Minute):
+			atomic.AddInt32(&doneCount, 1)
+			return
+		}
+	})
+
+	// Cancel the context
+	cancel()
+
+	pool.StopAndWait()
+
+	assertEqual(t, int32(1), atomic.LoadInt32(&taskCount))
+	assertEqual(t, int32(0), atomic.LoadInt32(&doneCount))
+}
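
For readers skimming this change, a minimal usage sketch of the new deadline-based shutdown (`StopAndWaitFor`) added above. It relies only on the API introduced in this diff; the pool sizes, task bodies and the 3-second deadline are illustrative values, not part of the change.

``` go
package main

import (
	"fmt"
	"time"

	"github.com/alitto/pond"
)

func main() {
	// Pool with up to 10 workers and a buffered queue of 1000 tasks
	pool := pond.New(10, 1000)

	for i := 0; i < 100; i++ {
		n := i
		pool.Submit(func() {
			time.Sleep(50 * time.Millisecond)
			fmt.Printf("task %d done\n", n)
		})
	}

	// Stop accepting new tasks and wait for queued tasks to finish,
	// but give up after 3 seconds (the pool context is cancelled on timeout)
	pool.StopAndWaitFor(3 * time.Second)
}
```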
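Similarly, a sketch of a cancellation-aware task under the new `Context` option, mirroring the behaviour exercised by `TestSubmitWithContext` above: cancelling the parent context stops the pool's workers, but a running task only stops early if it also watches `ctx.Done()`. The sizes and durations below are arbitrary.

``` go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/alitto/pond"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Cancelling ctx stops the pool's workers
	pool := pond.New(5, 100, pond.Context(ctx))

	pool.Submit(func() {
		// The task itself observes ctx so it can stop early
		select {
		case <-ctx.Done():
			fmt.Println("task cancelled")
		case <-time.After(10 * time.Second):
			fmt.Println("task completed")
		}
	})

	// Simulate an external shutdown signal after 1 second
	time.AfterFunc(1*time.Second, cancel)

	pool.StopAndWait()
}
```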