Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prewarm #50

Merged
merged 2 commits into from
Jan 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (g generic) Run(job hive.Job, ctx *hive.Ctx) (interface{}, error) {

// OnChange is called when Hive starts or stops a worker to handle jobs,
// and allows the Runnable to set up before receiving jobs or tear down if needed.
func (g generic) OnChange(change ChangeEvent) error {
func (g generic) OnChange(change hive.ChangeEvent) error {
return nil
}
```
Expand Down
8 changes: 7 additions & 1 deletion docs/getstarted.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func (r recursive) Run(job hive.Job, ctx *hive.Ctx) (interface{}, error) {
return fmt.Sprintf("finished %s", job.String()), nil
}

func (r recursive) OnChange(change ChangeEvent) error {
func (r recursive) OnChange(change hive.ChangeEvent) error {
return nil
}
```
Expand Down Expand Up @@ -129,6 +129,12 @@ doBad := h.Handle("badRunner", badRunner{}, hive.RetrySeconds(1), hive.MaxRetrie
```
Any error from a failed worker will be returned to the first job that is attempted for that Runnable.

### Pre-warming
When a Runnable is mounted, it is simply registered as available to receive work. The Runnable is not actually invoked until the first job of the given type is received. For basic Runnables, this is normally fine, but for Runnables that use the `OnChange` method to provision resources, this can cause the first job to be slow. The `PreWarm` option allows a Runnable to be started as soon as it is mounted, rather than waiting for the first job. This mitigates cold starts when anything expensive is needed at startup.
```
doExpensive := h.Handle("expensive", expensiveRunnable{}, hive.PreWarm())
```

### Shortcuts

There are also some shortcuts to make working with Hive a bit easier:
Expand Down
34 changes: 34 additions & 0 deletions hive/hive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"

"github.com/pkg/errors"
"github.com/suborbital/grav/testutil"
)

type generic struct{}
Expand Down Expand Up @@ -124,3 +125,36 @@ func TestHiveResultThenDo(t *testing.T) {
<-wait
<-wait
}

// prewarmRunnable is a test Runnable used to observe worker start events:
// its OnChange method calls Count on the counter for each ChangeTypeStart.
type prewarmRunnable struct {
// counter is the shared async counter that OnChange reports start events to.
counter *testutil.AsyncCounter
}

// Run is a no-op job handler: it ignores both the job and the ctx and
// always returns a nil result with a nil error.
func (p *prewarmRunnable) Run(job Job, ctx *Ctx) (interface{}, error) {
return nil, nil
}

// OnChange records worker starts: Count is called on the counter for a
// ChangeTypeStart event, and every other change is ignored.
// It always returns nil.
func (p *prewarmRunnable) OnChange(change ChangeEvent) error {
	// anything other than a start event is of no interest here
	if change != ChangeTypeStart {
		return nil
	}

	p.counter.Count()
	return nil
}

// TestPreWarmWorker verifies that a Runnable mounted with the PreWarm option
// has its OnChange function called without a job ever being sent to it.
func TestPreWarmWorker(t *testing.T) {
	startCounter := testutil.NewAsyncCounter(10)

	h := New()
	h.Handle(
		"prewarm",
		&prewarmRunnable{counter: startCounter},
		PoolSize(3),
		PreWarm(),
	)

	// no job is ever scheduled; prewarmRunnable only counts start events in
	// OnChange, so the wait can only succeed if the worker was pre-warmed
	err := startCounter.Wait(3, 1)
	if err != nil {
		t.Error(err)
	}
}
9 changes: 9 additions & 0 deletions hive/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,12 @@ func MaxRetries(count int) Option {
return opts
}
}

// PreWarm returns an Option that enables pre-warming for a worker, which
// minimizes cold start time. Without this option, the worker "warms up"
// when it receives its first job.
func PreWarm() Option {
	enablePreWarm := func(current workerOpts) workerOpts {
		current.preWarm = true
		return current
	}

	return enablePreWarm
}
13 changes: 9 additions & 4 deletions hive/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,15 @@ func (s *scheduler) handle(jobType string, runnable Runnable, options ...Option)
}

w := newWorker(runnable, s.store, s.cache, opts)
if s.workers == nil {
s.workers = map[string]*worker{jobType: w}
} else {
s.workers[jobType] = w

s.workers[jobType] = w

if opts.preWarm {
go func() {
if err := w.start(s.schedule); err != nil {
// should log something here
}
}()
}
}

Expand Down
12 changes: 11 additions & 1 deletion hive/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (w *worker) schedule(job JobReference) {
}

func (w *worker) start(doFunc DoFunc) error {
// this should only be run once per worker
// this should only be run once per worker, unless startup fails the first time
if isStarted := w.started.Load().(bool); isStarted {
return nil
}
Expand Down Expand Up @@ -89,6 +89,12 @@ func (w *worker) start(doFunc DoFunc) error {
break
} else {
if attempts >= w.options.numRetries {
if started == 0 {
// if no threads were able to start, ensure that
// the next job causes another attempt
w.started.Store(false)
}

return fmt.Errorf("attempted to start worker %d times, Runnable returned error each time", w.options.numRetries)
}

Expand Down Expand Up @@ -141,6 +147,8 @@ func (wt *workThread) run(doFunc DoFunc) {
// wait for the next job
jobRef := <-wt.workChan

// TODO: check to see if the workThread pool is sufficient, and attempt to fill it if not

// fetch the full job from storage
job, err := wt.store.Get(jobRef.uuid)
if err != nil {
Expand Down Expand Up @@ -203,6 +211,7 @@ type workerOpts struct {
jobTimeoutSeconds int
numRetries int
retrySecs int
preWarm bool
}

func defaultOpts(jobType string) workerOpts {
Expand All @@ -212,6 +221,7 @@ func defaultOpts(jobType string) workerOpts {
jobTimeoutSeconds: 0,
retrySecs: 3,
numRetries: 5,
preWarm: false,
}

return o
Expand Down