Skip to content

Commit

Permalink
Merge pull request #33 from suborbital/connor/job-timeout
Browse files Browse the repository at this point in the history
Add TimeoutSeconds worker option
  • Loading branch information
cohix authored Aug 26, 2020
2 parents 28b2924 + 829c4db commit a10159e
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 21 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func main() {
```
When you `Do` some work, you get a `Result`. A result is like a Rust future or a JavaScript promise, it is something you can get the job's result from once it is finished.

Calling `Then()` will block until the job is complete, and then give you the return value from the Runnable's `Run`. Make sense?
Calling `Then()` will block until the job is complete, and then give you the return value from the Runnable's `Run`. Cool, right?

### Hive has some very powerful capabilities, visit the [get started guide](./docs/getstarted.md) to learn more.

Expand Down
9 changes: 9 additions & 0 deletions docs/getstarted.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,15 @@ if err := grp.Wait(); err != nil {
```
Passing `PoolSize(3)` will spawn three goroutines to process `generic` jobs.

### Timeouts
By default, if a job becomes stuck and is blocking execution, it will block forever. If you want to have a worker time out after a certain amount of seconds on a stuck job, pass `hive.TimeoutSeconds` to Handle:
``` golang
h := hive.New()

doTimeout := h.Handle("timeout", timeoutRunner{}, hive.TimeoutSeconds(3))
```
When `TimeoutSeconds` is set and a job executes for longer than the provided number of seconds, the worker will move on to the next job and `ErrJobTimeout` will be returned to the Result. The failed job will continue to execute in the background, but its result will be discarded.

### Advanced Runnables

The `Runnable` interface defines an `OnStart` function which gives the Runnable the ability to prepare itself for incoming jobs. For example, when a Runnable is registered with a pool size greater than 1, the Runnable may need to provision resources for itself to enable handling jobs concurrently, and `OnStart` will be called once for each of those workers. Our [wasm implementation](https://github.com/suborbital/hivew/wasm) is a good example of this.
Expand Down
8 changes: 8 additions & 0 deletions hive/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@ func PoolSize(size int) Option {
}
}

//TimeoutSeconds returns an Option with the job timeout seconds set
func TimeoutSeconds(timeout int) Option {
return func(opts workerOpts) workerOpts {
opts.jobTimeoutSeconds = timeout
return opts
}
}

//RetrySeconds returns an Option to set the worker retry seconds
func RetrySeconds(secs int) Option {
return func(opts workerOpts) workerOpts {
Expand Down
82 changes: 62 additions & 20 deletions hive/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,14 @@ import (
"github.com/pkg/errors"
)

const defaultChanSize = 1024
const (
defaultChanSize = 256
)

// ErrJobTimeout and others are errors related to workers
var (
ErrJobTimeout = errors.New("job timeout")
)

type worker struct {
runner Runnable
Expand Down Expand Up @@ -52,7 +59,7 @@ func (w *worker) start(runFunc RunFunc) error {
for {
// fill the "pool" with workThreads
for i := started; i < w.options.poolSize; i++ {
wt := newWorkThread(w.runner, w.workChan)
wt := newWorkThread(w.runner, w.workChan, w.options.jobTimeoutSeconds)

// give the runner opportunity to provision resources if needed
if err := w.runner.OnStart(); err != nil {
Expand Down Expand Up @@ -87,20 +94,22 @@ func (w *worker) isStarted() bool {
}

type workThread struct {
runner Runnable
workChan chan Job
ctx context.Context
cancelFunc context.CancelFunc
runner Runnable
workChan chan Job
timeoutSeconds int
ctx context.Context
cancelFunc context.CancelFunc
}

func newWorkThread(runner Runnable, workChan chan Job) *workThread {
func newWorkThread(runner Runnable, workChan chan Job, timeoutSeconds int) *workThread {
ctx, cancelFunc := context.WithCancel(context.Background())

wt := &workThread{
runner: runner,
workChan: workChan,
ctx: ctx,
cancelFunc: cancelFunc,
runner: runner,
workChan: workChan,
timeoutSeconds: timeoutSeconds,
ctx: ctx,
cancelFunc: cancelFunc,
}

return wt
Expand All @@ -117,7 +126,15 @@ func (wt *workThread) run(runFunc RunFunc) {
// wait for the next job
job := <-wt.workChan

result, err := wt.runner.Run(job, runFunc)
var result interface{}
var err error

if wt.timeoutSeconds == 0 {
result, err = wt.runner.Run(job, runFunc)
} else {
result, err = wt.runWithTimeout(job, runFunc)
}

if err != nil {
job.result.sendErr(err)
continue
Expand All @@ -128,23 +145,48 @@ func (wt *workThread) run(runFunc RunFunc) {
}()
}

func (wt *workThread) runWithTimeout(job Job, runFunc RunFunc) (interface{}, error) {
resultChan := make(chan interface{})
errChan := make(chan error)

go func() {
result, err := wt.runner.Run(job, runFunc)
if err != nil {
errChan <- err
} else {
resultChan <- result
}
}()

select {
case result := <-resultChan:
return result, nil
case err := <-errChan:
return nil, err
case <-time.After(time.Duration(time.Second * time.Duration(wt.timeoutSeconds))):
return nil, ErrJobTimeout
}
}

func (wt *workThread) Stop() {
wt.cancelFunc()
}

type workerOpts struct {
jobType string
poolSize int
numRetries int
retrySecs int
jobType string
poolSize int
jobTimeoutSeconds int
numRetries int
retrySecs int
}

func defaultOpts(jobType string) workerOpts {
o := workerOpts{
jobType: jobType,
poolSize: 1,
retrySecs: 3,
numRetries: 5,
jobType: jobType,
poolSize: 1,
jobTimeoutSeconds: 0,
retrySecs: 3,
numRetries: 5,
}

return o
Expand Down
24 changes: 24 additions & 0 deletions hive/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package hive
import (
"log"
"testing"
"time"

"github.com/pkg/errors"
)
Expand Down Expand Up @@ -54,3 +55,26 @@ func TestRunnerWithOptionsAndError(t *testing.T) {
t.Error("expected error, did not get one")
}
}

type timeoutRunner struct{}

// Run runs a timeoutRunner job
func (g timeoutRunner) Run(job Job, run RunFunc) (interface{}, error) {
time.Sleep(time.Duration(time.Second * 3))

return nil, nil
}

func (g timeoutRunner) OnStart() error {
return nil
}

func TestRunnerWithJobTimeout(t *testing.T) {
h := New()

doTimeout := h.Handle("timeout", timeoutRunner{}, TimeoutSeconds(1))

if _, err := doTimeout("hello").Then(); err != ErrJobTimeout {
t.Error("job should have timed out, but did not")
}
}

0 comments on commit a10159e

Please sign in to comment.