From 50976ac650476eb501fe64c815a8757897adf9c2 Mon Sep 17 00:00:00 2001 From: Connor Hicks Date: Wed, 20 Jan 2021 20:06:36 -0500 Subject: [PATCH 1/2] add prewarm option when mounting a runnable --- hive/hive_test.go | 34 ++++++++++++++++++++++++++++++++++ hive/opts.go | 9 +++++++++ hive/scheduler.go | 13 +++++++++---- hive/worker.go | 12 +++++++++++- 4 files changed, 63 insertions(+), 5 deletions(-) diff --git a/hive/hive_test.go b/hive/hive_test.go index 5c1baca9..d597063c 100644 --- a/hive/hive_test.go +++ b/hive/hive_test.go @@ -6,6 +6,7 @@ import ( "testing" "github.com/pkg/errors" + "github.com/suborbital/grav/testutil" ) type generic struct{} @@ -124,3 +125,36 @@ func TestHiveResultThenDo(t *testing.T) { <-wait <-wait } + +type prewarmRunnable struct { + counter *testutil.AsyncCounter +} + +func (p *prewarmRunnable) Run(job Job, ctx *Ctx) (interface{}, error) { + return nil, nil +} + +func (p *prewarmRunnable) OnChange(change ChangeEvent) error { + if change == ChangeTypeStart { + p.counter.Count() + } + + return nil +} + +func TestPreWarmWorker(t *testing.T) { + counter := testutil.NewAsyncCounter(10) + + runnable := &prewarmRunnable{ + counter: counter, + } + + h := New() + h.Handle("prewarm", runnable, PoolSize(3), PreWarm()) + + // checking to see if the prewarmRunnable's OnChange function is called + // without ever sending it a job (see Runnable above) + if err := counter.Wait(3, 1); err != nil { + t.Error(err) + } +} diff --git a/hive/opts.go b/hive/opts.go index a71f3509..ad67ce22 100644 --- a/hive/opts.go +++ b/hive/opts.go @@ -34,3 +34,12 @@ func MaxRetries(count int) Option { return opts } } + +// PreWarm sets the worker to pre-warm itself to minimize cold start time. +// if not enabled, worker will "warm up" when it receives its first job. 
+func PreWarm() Option { + return func(opts workerOpts) workerOpts { + opts.preWarm = true + return opts + } +} diff --git a/hive/scheduler.go b/hive/scheduler.go index 1a88de78..56184df0 100644 --- a/hive/scheduler.go +++ b/hive/scheduler.go @@ -71,10 +71,15 @@ func (s *scheduler) handle(jobType string, runnable Runnable, options ...Option) } w := newWorker(runnable, s.store, s.cache, opts) - if s.workers == nil { - s.workers = map[string]*worker{jobType: w} - } else { - s.workers[jobType] = w + + s.workers[jobType] = w + + if opts.preWarm { + go func() { + if err := w.start(s.schedule); err != nil { + // should log something here + } + }() } } diff --git a/hive/worker.go b/hive/worker.go index 82316353..89bac671 100644 --- a/hive/worker.go +++ b/hive/worker.go @@ -57,7 +57,7 @@ func (w *worker) schedule(job JobReference) { } func (w *worker) start(doFunc DoFunc) error { - // this should only be run once per worker + // this should only be run once per worker, unless startup fails the first time if isStarted := w.started.Load().(bool); isStarted { return nil } @@ -89,6 +89,12 @@ func (w *worker) start(doFunc DoFunc) error { break } else { if attempts >= w.options.numRetries { + if started == 0 { + // if no threads were able to start, ensure that + // the next job causes another attempt + w.started.Store(false) + } + return fmt.Errorf("attempted to start worker %d times, Runnable returned error each time", w.options.numRetries) } @@ -141,6 +147,8 @@ func (wt *workThread) run(doFunc DoFunc) { // wait for the next job jobRef := <-wt.workChan + // TODO: check to see if the workThread pool is sufficient, and attempt to fill it if not + // fetch the full job from storage job, err := wt.store.Get(jobRef.uuid) if err != nil { @@ -203,6 +211,7 @@ type workerOpts struct { jobTimeoutSeconds int numRetries int retrySecs int + preWarm bool } func defaultOpts(jobType string) workerOpts { @@ -212,6 +221,7 @@ func defaultOpts(jobType string) workerOpts { jobTimeoutSeconds: 0, 
retrySecs: 3, numRetries: 5, + preWarm: false, } return o From 8a2a8dd90280f61bd8917550aa55fe950d0811ed Mon Sep 17 00:00:00 2001 From: Connor Hicks Date: Wed, 20 Jan 2021 20:11:14 -0500 Subject: [PATCH 2/2] update documentation --- README.md | 2 +- docs/getstarted.md | 8 +++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 21b30af5..a7bab8d6 100644 --- a/README.md +++ b/README.md @@ -36,7 +36,7 @@ func (g generic) Run(job hive.Job, ctx *hive.Ctx) (interface{}, error) { // OnChange is called when Hive starts or stops a worker to handle jobs, // and allows the Runnable to set up before receiving jobs or tear down if needed. -func (g generic) OnChange(change ChangeEvent) error { +func (g generic) OnChange(change hive.ChangeEvent) error { return nil } ``` diff --git a/docs/getstarted.md b/docs/getstarted.md index c6e6aaef..0645623a 100644 --- a/docs/getstarted.md +++ b/docs/getstarted.md @@ -21,7 +21,7 @@ func (r recursive) Run(job hive.Job, ctx *hive.Ctx) (interface{}, error) { return fmt.Sprintf("finished %s", job.String()), nil } -func (r recursive) OnChange(change ChangeEvent) error { +func (r recursive) OnChange(change hive.ChangeEvent) error { return nil } ``` @@ -129,6 +129,12 @@ doBad := h.Handle("badRunner", badRunner{}, hive.RetrySeconds(1), hive.MaxRetrie ``` Any error from a failed worker will be returned to the first job that is attempted for that Runnable. +### Pre-warming +When a Runnable is mounted, it is simply registered as available to receive work. The Runnable is not actually invoked until the first job of the given type is received. For basic Runnables, this is normally fine, but for Runnables that use the `OnChange` method to provision resources, this can cause the first job to be slow. The `PreWarm` option is available to allow Runnables to be started as soon as they are mounted, rather than waiting for the first job. This mitigates cold starts when anything expensive is needed at startup. 
+``` +doExpensive := h.Handle("expensive", expensiveRunnable{}, hive.PreWarm()) +``` + ### Shortcuts There are also some shortcuts to make working with Hive a bit easier: