From 5bb3bf63d7f62064188316feace4119ddf1a4838 Mon Sep 17 00:00:00 2001 From: Ryan Couto Date: Mon, 28 Feb 2022 11:34:09 -0500 Subject: [PATCH] pre/post processors (#581) * pass pre/post processors through to worker's StartServer * log pre/post processor errors & name --- runner/runners/invoke.go | 34 ++++++++++++++++----- runner/runners/polling_test.go | 2 +- runner/runners/queue.go | 8 +++-- runner/runners/queue_test.go | 2 +- runner/runners/single_test.go | 8 ++--- scheduler/server/stateful_scheduler_test.go | 4 +-- scheduler/setup/worker/makers.go | 4 +-- worker/starter/start_server.go | 4 ++- worker/workerserver/main.go | 2 ++ 9 files changed, 48 insertions(+), 20 deletions(-) diff --git a/runner/runners/invoke.go b/runner/runners/invoke.go index 7fc04c62..0e2e1384 100644 --- a/runner/runners/invoke.go +++ b/runner/runners/invoke.go @@ -35,23 +35,27 @@ func NewInvoker( stat stats.StatsReceiver, dirMonitor *stats.DirsMonitor, rID runner.RunnerID, + preprocessors []func() error, + postprocessors []func() error, ) *Invoker { if stat == nil { stat = stats.NilStatsReceiver() } - return &Invoker{exec: exec, filerMap: filerMap, output: output, stat: stat, dirMonitor: dirMonitor, rID: rID} + return &Invoker{exec: exec, filerMap: filerMap, output: output, stat: stat, dirMonitor: dirMonitor, rID: rID, preprocessors: preprocessors, postprocessors: postprocessors} } // Invoker Runs a Scoot Command by performing the Scoot setup and gathering. // (E.g., checking out a Snapshot, or saving the Output once it's done) // Unlike a full Runner, it has no idea of what else is running or has run. type Invoker struct { - exec execer.Execer - filerMap runner.RunTypeMap - output runner.OutputCreator - stat stats.StatsReceiver - dirMonitor *stats.DirsMonitor - rID runner.RunnerID + exec execer.Execer + filerMap runner.RunTypeMap + output runner.OutputCreator + stat stats.StatsReceiver + dirMonitor *stats.DirsMonitor + rID runner.RunnerID + preprocessors []func() error + postprocessors []func() error } // Run runs cmd @@ -100,6 +104,22 @@ func (inv *Invoker) run(cmd *runner.Command, id runner.RunID, abortCh chan struc var co snapshot.Checkout checkoutCh := make(chan error) + // set up pre/postprocessors + for _, pp := range inv.preprocessors { + log.Info("running preprocessor") + if err := pp(); err != nil { + log.Errorf("Error running preprocessor %s", err) + } + } + defer func() { + for _, pp := range inv.postprocessors { + log.Infof("running postprocessor") + if err := pp(); err != nil { + log.Errorf("Error running postprocessor %s", err) + } + } + }() + // Determine RunType from Command SnapshotID // This invoker supports RunTypeScoot and RunTypeBazel var runType runner.RunType diff --git a/runner/runners/polling_test.go b/runner/runners/polling_test.go index e852ad5e..df2114cf 100644 --- a/runner/runners/polling_test.go +++ b/runner/runners/polling_test.go @@ -16,7 +16,7 @@ func setupPoller() (*execers.SimExecer, *ChaosRunner, runner.Service) { ex := execers.NewSimExecer() filerMap := runner.MakeRunTypeMap() filerMap[runner.RunTypeScoot] = snapshot.FilerAndInitDoneCh{Filer: snapshots.MakeInvalidFiler(), IDC: nil} - single := NewSingleRunner(ex, filerMap, NewNullOutputCreator(), nil, stats.NopDirsMonitor, runner.EmptyID) + single := NewSingleRunner(ex, filerMap, NewNullOutputCreator(), nil, stats.NopDirsMonitor, runner.EmptyID, []func() error{}, []func() error{}) chaos := NewChaosRunner(single) var nower runner.StatusQueryNower nower = chaos diff --git a/runner/runners/queue.go b/runner/runners/queue.go index a238c57f..5fc458b7 100644 --- a/runner/runners/queue.go +++ b/runner/runners/queue.go @@ -82,6 +82,8 @@ func NewQueueRunner( stat stats.StatsReceiver, dirMonitor *stats.DirsMonitor, rID runner.RunnerID, + preprocessors []func() error, + postprocessors []func() error, ) runner.Service { if stat == nil { stat = stats.NilStatsReceiver() @@ -96,7 +98,7 @@ func NewQueueRunner( } statusManager := NewStatusManager(history) - inv := NewInvoker(exec, filerMap, output, stat, dirMonitor, rID) + inv := NewInvoker(exec, filerMap, output, stat, dirMonitor, rID, preprocessors, postprocessors) controller := &QueueController{ statusManager: statusManager, @@ -163,8 +165,10 @@ func NewSingleRunner( stat stats.StatsReceiver, dirMonitor *stats.DirsMonitor, rID runner.RunnerID, + preprocessors []func() error, + postprocessors []func() error, ) runner.Service { - return NewQueueRunner(exec, filerMap, output, 0, stat, dirMonitor, rID) + return NewQueueRunner(exec, filerMap, output, 0, stat, dirMonitor, rID, preprocessors, postprocessors) } // QueueController maintains a queue of commands to run (up to capacity). diff --git a/runner/runners/queue_test.go b/runner/runners/queue_test.go index fe2bd386..c97b4de4 100644 --- a/runner/runners/queue_test.go +++ b/runner/runners/queue_test.go @@ -218,7 +218,7 @@ func setup(capacity int, interval time.Duration, t *testing.T) *env { filerMap := runner.MakeRunTypeMap() filerMap[runner.RunTypeScoot] = snapshot.FilerAndInitDoneCh{Filer: snapshots.MakeInvalidFilerUpdater(updater), IDC: nil} - r := NewQueueRunner(sim, filerMap, outputCreator, capacity, nil, stats.NopDirsMonitor, runner.EmptyID) + r := NewQueueRunner(sim, filerMap, outputCreator, capacity, nil, stats.NopDirsMonitor, runner.EmptyID, []func() error{}, []func() error{}) return &env{sim: sim, r: r, u: updater, uc: &updateCount} } diff --git a/runner/runners/single_test.go b/runner/runners/single_test.go index 43e59763..65a28ec8 100644 --- a/runner/runners/single_test.go +++ b/runner/runners/single_test.go @@ -115,7 +115,7 @@ func TestMemCap(t *testing.T) { e := os_execer.NewBoundedExecer(execer.Memory(10*1024*1024), stats.NilStatsReceiver()) filerMap := runner.MakeRunTypeMap() filerMap[runner.RunTypeScoot] = snapshot.FilerAndInitDoneCh{Filer: snapshots.MakeNoopFiler(tmp), IDC: nil} - r := NewSingleRunner(e, filerMap, NewNullOutputCreator(), nil, stats.NopDirsMonitor, runner.EmptyID) + r := NewSingleRunner(e, filerMap, NewNullOutputCreator(), nil, stats.NopDirsMonitor, runner.EmptyID, []func() error{}, []func() error{}) if _, err := r.Run(cmd); err != nil { t.Fatalf(err.Error()) } @@ -145,7 +145,7 @@ func TestStats(t *testing.T) { filerMap := runner.MakeRunTypeMap() filerMap[runner.RunTypeScoot] = snapshot.FilerAndInitDoneCh{Filer: snapshots.MakeNoopFiler(tmp), IDC: nil} dirMonitor := stats.NewDirsMonitor([]stats.MonitorDir{{StatSuffix: "cwd", Directory: "./"}}) - r := NewSingleRunner(e, filerMap, NewNullOutputCreator(), stat, dirMonitor, runner.EmptyID) + r := NewSingleRunner(e, filerMap, NewNullOutputCreator(), stat, dirMonitor, runner.EmptyID, []func() error{}, []func() error{}) // Add initial idle time to keep the avg idle time above 50ms time.Sleep(50 * time.Millisecond) @@ -201,7 +201,7 @@ func TestTimeout(t *testing.T) { e := execers.NewSimExecer() filerMap := runner.MakeRunTypeMap() filerMap[runner.RunTypeScoot] = snapshot.FilerAndInitDoneCh{Filer: snapshots.MakeNoopFiler(tmp), IDC: nil} - r := NewSingleRunner(e, filerMap, NewNullOutputCreator(), stat, stats.NopDirsMonitor, runner.EmptyID) + r := NewSingleRunner(e, filerMap, NewNullOutputCreator(), stat, stats.NopDirsMonitor, runner.EmptyID, []func() error{}, []func() error{}) if _, err := r.Run(cmd); err != nil { t.Fatalf(err.Error()) } @@ -241,7 +241,7 @@ func newRunner() (runner.Service, *execers.SimExecer) { filerMap := runner.MakeRunTypeMap() filerMap[runner.RunTypeScoot] = snapshot.FilerAndInitDoneCh{Filer: snapshots.MakeInvalidFiler(), IDC: nil} - r := NewSingleRunner(sim, filerMap, outputCreator, nil, stats.NopDirsMonitor, runner.EmptyID) + r := NewSingleRunner(sim, filerMap, outputCreator, nil, stats.NopDirsMonitor, runner.EmptyID, []func() error{}, []func() error{}) return r, sim } diff --git a/scheduler/server/stateful_scheduler_test.go b/scheduler/server/stateful_scheduler_test.go index 48093844..1cd02f13 100644 --- a/scheduler/server/stateful_scheduler_test.go +++ b/scheduler/server/stateful_scheduler_test.go @@ -283,7 +283,7 @@ func Test_StatefulScheduler_TaskGetsMarkedCompletedAfterMaxRetriesFailedRuns(t * ex.ExecError = errors.New("Test - failed to exec") filerMap := runner.MakeRunTypeMap() filerMap[runner.RunTypeScoot] = snapshot.FilerAndInitDoneCh{Filer: snapshots.MakeInvalidFiler(), IDC: nil} - return runners.NewSingleRunner(ex, filerMap, runners.NewNullOutputCreator(), nil, stats.NopDirsMonitor, runner.EmptyID) + return runners.NewSingleRunner(ex, filerMap, runners.NewNullOutputCreator(), nil, stats.NopDirsMonitor, runner.EmptyID, []func() error{}, []func() error{}) } s := makeStatefulSchedulerDeps(deps) @@ -768,7 +768,7 @@ func getDepsWithSimWorker() (*schedulerDeps, []*execers.SimExecer) { ex := execers.NewSimExecer() filerMap := runner.MakeRunTypeMap() filerMap[runner.RunTypeScoot] = snapshot.FilerAndInitDoneCh{Filer: snapshots.MakeInvalidFiler(), IDC: nil} - runner := runners.NewSingleRunner(ex, filerMap, runners.NewNullOutputCreator(), nil, stats.NopDirsMonitor, runner.EmptyID) + runner := runners.NewSingleRunner(ex, filerMap, runners.NewNullOutputCreator(), nil, stats.NopDirsMonitor, runner.EmptyID, []func() error{}, []func() error{}) return runner }, config: SchedulerConfiguration{ diff --git a/scheduler/setup/worker/makers.go b/scheduler/setup/worker/makers.go index a529577f..1a67e650 100644 --- a/scheduler/setup/worker/makers.go +++ b/scheduler/setup/worker/makers.go @@ -22,7 +22,7 @@ func MakeDoneWorker() runner.Service { ex := execers.NewDoneExecer() filerMap := runner.MakeRunTypeMap() filerMap[runner.RunTypeScoot] = snapshot.FilerAndInitDoneCh{Filer: snapshots.MakeInvalidFiler(), IDC: nil} - r := runners.NewSingleRunner(ex, filerMap, runners.NewNullOutputCreator(), nil, stats.NopDirsMonitor, runner.EmptyID) + r := runners.NewSingleRunner(ex, filerMap, runners.NewNullOutputCreator(), nil, stats.NopDirsMonitor, runner.EmptyID, []func() error{}, []func() error{}) chaos := runners.NewChaosRunner(r) chaos.SetDelay(time.Duration(50) * time.Millisecond) return chaos @@ -33,5 +33,5 @@ func MakeSimWorker() runner.Service { ex := execers.NewSimExecer() filerMap := runner.MakeRunTypeMap() filerMap[runner.RunTypeScoot] = snapshot.FilerAndInitDoneCh{Filer: snapshots.MakeInvalidFiler(), IDC: nil} - return runners.NewSingleRunner(ex, filerMap, runners.NewNullOutputCreator(), nil, stats.NopDirsMonitor, runner.EmptyID) + return runners.NewSingleRunner(ex, filerMap, runners.NewNullOutputCreator(), nil, stats.NopDirsMonitor, runner.EmptyID, []func() error{}, []func() error{}) } diff --git a/worker/starter/start_server.go b/worker/starter/start_server.go index 0870fe4a..3280932a 100644 --- a/worker/starter/start_server.go +++ b/worker/starter/start_server.go @@ -57,6 +57,8 @@ func StartServer( dirMonitor *stats.DirsMonitor, memCap uint64, stat *stats.StatsReceiver, + preprocessors []func() error, + postprocessors []func() error, ) { // create worker object: // worker support objects @@ -72,7 +74,7 @@ func StartServer( filerMap[runner.RunTypeScoot] = snapshot.FilerAndInitDoneCh{Filer: gitFiler, IDC: db.InitDoneCh} } // the worker object - worker := runners.NewSingleRunner(execer, filerMap, oc, *stat, dirMonitor, rID) + worker := runners.NewSingleRunner(execer, filerMap, oc, *stat, dirMonitor, rID, preprocessors, postprocessors) // add service wrappers // thrift wrapper diff --git a/worker/workerserver/main.go b/worker/workerserver/main.go index bcffb8a7..8bf844b6 100644 --- a/worker/workerserver/main.go +++ b/worker/workerserver/main.go @@ -75,6 +75,8 @@ func main() { stats.NopDirsMonitor, *memCapFlag, &stat, + []func() error{}, + []func() error{}, ) }