Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Submit and SubmitWait return error if called on stopped workerpool #80

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 35 additions & 4 deletions workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import (
"context"
"errors"
"sync"
"sync/atomic"
"time"
Expand All @@ -14,6 +15,8 @@
idleTimeout = 2 * time.Second
)

var ErrStopped = errors.New("workerpool stopped")

// New creates and starts a pool of worker goroutines.
//
// The maxWorkers parameter specifies the maximum number of workers that can
Expand Down Expand Up @@ -104,23 +107,51 @@
// period until there are no more idle workers. Since the time to start new
// goroutines is not significant, there is no need to retain idle workers
// indefinitely.
func (p *WorkerPool) Submit(task func()) {
if task != nil {
p.taskQueue <- task
func (p *WorkerPool) Submit(task func()) (err error) {
if task == nil {
return
}
defer func() {
r := recover()
if r != nil {
if e, ok := r.(error); ok {
if e.Error() == "send on closed channel" {
err = ErrStopped
return
}
}
panic(r)
}
}()
p.taskQueue <- task
return
}

// SubmitWait enqueues the given function and waits for it to be executed.
func (p *WorkerPool) SubmitWait(task func()) {
func (p *WorkerPool) SubmitWait(task func()) (err error) {
if task == nil {
return
}
defer func() {
r := recover()
if r != nil {
if e, ok := r.(error); ok {
if e.Error() == "send on closed channel" {
err = ErrStopped
return
}
}
panic(r)
}
}()

doneChan := make(chan struct{})
p.taskQueue <- func() {
task()
close(doneChan)
}
<-doneChan
return
}

// WaitingQueueSize returns the count of tasks in the waiting queue.
Expand Down Expand Up @@ -149,7 +180,7 @@
ready := new(sync.WaitGroup)
ready.Add(p.maxWorkers)
for i := 0; i < p.maxWorkers; i++ {
p.Submit(func() {

Check failure on line 183 in workerpool.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `p.Submit` is not checked (errcheck)
ready.Done()
select {
case <-ctx.Done():
Expand Down
39 changes: 39 additions & 0 deletions workerpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import (
"context"
"errors"
"sync"
"testing"
"time"
Expand All @@ -20,7 +21,7 @@
rspChan := make(chan string, len(requests))
for _, r := range requests {
r := r
wp.Submit(func() {

Check failure on line 24 in workerpool_test.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `wp.Submit` is not checked (errcheck)
rspChan <- r
})
}
Expand Down Expand Up @@ -63,7 +64,7 @@

// Start workers, and have them all wait on a channel before completing.
for i := 0; i < max; i++ {
wp.Submit(func() {

Check failure on line 67 in workerpool_test.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `wp.Submit` is not checked (errcheck)
started <- struct{}{}
<-release
})
Expand Down Expand Up @@ -97,7 +98,7 @@

// Cause worker to be created, and available for reuse before next task.
for i := 0; i < 10; i++ {
wp.Submit(func() { <-release })

Check failure on line 101 in workerpool_test.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `wp.Submit` is not checked (errcheck)
release <- struct{}{}
time.Sleep(time.Millisecond)
}
Expand Down Expand Up @@ -265,7 +266,7 @@

// Check that these are noop.
wp.Submit(nil)
wp.SubmitWait(nil)

Check failure on line 269 in workerpool_test.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `wp.SubmitWait` is not checked (errcheck)

done1 := make(chan struct{})
wp.Submit(func() {
Expand All @@ -279,7 +280,7 @@
}

done2 := make(chan struct{})
wp.SubmitWait(func() {

Check failure on line 283 in workerpool_test.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `wp.SubmitWait` is not checked (errcheck)
time.Sleep(100 * time.Millisecond)
close(done2)
})
Expand Down Expand Up @@ -598,6 +599,44 @@
wp.Stop()
}

func TestPanicSubmit(t *testing.T) {
defer goleak.VerifyNone(t)

wp := New(1)
wp.Stop()

done := make(chan struct{})
err := wp.Submit(func() {
close(done)
})
if err == nil {
t.Fatal("expected an error")
}
if !errors.Is(err, ErrStopped) {
t.Fatal("wrong error:", err)
}
select {
case <-done:
t.Fatal("task function should not have called")
default:
}

err = wp.SubmitWait(func() {
close(done)
})
if err == nil {
t.Fatal("expected an error")
}
if !errors.Is(err, ErrStopped) {
t.Fatal("wrong error:", err)
}
select {
case <-done:
t.Fatal("task function should not have called")
default:
}
}

func anyReady(w *WorkerPool) bool {
release := make(chan struct{})
wait := func() {
Expand Down
Loading