Skip to content

Commit

Permalink
pre/post processors (#581)
Browse files Browse the repository at this point in the history
* pass pre/post processors through to worker's StartServer

* log pre/post processor errors & name
  • Loading branch information
ryancouto authored Feb 28, 2022
1 parent ac45936 commit 5bb3bf6
Show file tree
Hide file tree
Showing 9 changed files with 48 additions and 20 deletions.
34 changes: 27 additions & 7 deletions runner/runners/invoke.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,27 @@ func NewInvoker(
stat stats.StatsReceiver,
dirMonitor *stats.DirsMonitor,
rID runner.RunnerID,
preprocessors []func() error,
postprocessors []func() error,
) *Invoker {
if stat == nil {
stat = stats.NilStatsReceiver()
}
return &Invoker{exec: exec, filerMap: filerMap, output: output, stat: stat, dirMonitor: dirMonitor, rID: rID}
return &Invoker{exec: exec, filerMap: filerMap, output: output, stat: stat, dirMonitor: dirMonitor, rID: rID, preprocessors: preprocessors, postprocessors: postprocessors}
}

// Invoker Runs a Scoot Command by performing the Scoot setup and gathering.
// (E.g., checking out a Snapshot, or saving the Output once it's done)
// Unlike a full Runner, it has no idea of what else is running or has run.
type Invoker struct {
exec execer.Execer
filerMap runner.RunTypeMap
output runner.OutputCreator
stat stats.StatsReceiver
dirMonitor *stats.DirsMonitor
rID runner.RunnerID
exec execer.Execer
filerMap runner.RunTypeMap
output runner.OutputCreator
stat stats.StatsReceiver
dirMonitor *stats.DirsMonitor
rID runner.RunnerID
preprocessors []func() error
postprocessors []func() error
}

// Run runs cmd
Expand Down Expand Up @@ -100,6 +104,22 @@ func (inv *Invoker) run(cmd *runner.Command, id runner.RunID, abortCh chan struc
var co snapshot.Checkout
checkoutCh := make(chan error)

// set up pre/postprocessors
for _, pp := range inv.preprocessors {
log.Info("running preprocessor")
if err := pp(); err != nil {
log.Errorf("Error running preprocessor %s", err)
}
}
defer func() {
for _, pp := range inv.postprocessors {
log.Infof("running postprocessor")
if err := pp(); err != nil {
log.Errorf("Error running postprocessor %s", err)
}
}
}()

// Determine RunType from Command SnapshotID
// This invoker supports RunTypeScoot and RunTypeBazel
var runType runner.RunType
Expand Down
2 changes: 1 addition & 1 deletion runner/runners/polling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func setupPoller() (*execers.SimExecer, *ChaosRunner, runner.Service) {
ex := execers.NewSimExecer()
filerMap := runner.MakeRunTypeMap()
filerMap[runner.RunTypeScoot] = snapshot.FilerAndInitDoneCh{Filer: snapshots.MakeInvalidFiler(), IDC: nil}
single := NewSingleRunner(ex, filerMap, NewNullOutputCreator(), nil, stats.NopDirsMonitor, runner.EmptyID)
single := NewSingleRunner(ex, filerMap, NewNullOutputCreator(), nil, stats.NopDirsMonitor, runner.EmptyID, []func() error{}, []func() error{})
chaos := NewChaosRunner(single)
var nower runner.StatusQueryNower
nower = chaos
Expand Down
8 changes: 6 additions & 2 deletions runner/runners/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ func NewQueueRunner(
stat stats.StatsReceiver,
dirMonitor *stats.DirsMonitor,
rID runner.RunnerID,
preprocessors []func() error,
postprocessors []func() error,
) runner.Service {
if stat == nil {
stat = stats.NilStatsReceiver()
Expand All @@ -96,7 +98,7 @@ func NewQueueRunner(
}

statusManager := NewStatusManager(history)
inv := NewInvoker(exec, filerMap, output, stat, dirMonitor, rID)
inv := NewInvoker(exec, filerMap, output, stat, dirMonitor, rID, preprocessors, postprocessors)

controller := &QueueController{
statusManager: statusManager,
Expand Down Expand Up @@ -163,8 +165,10 @@ func NewSingleRunner(
stat stats.StatsReceiver,
dirMonitor *stats.DirsMonitor,
rID runner.RunnerID,
preprocessors []func() error,
postprocessors []func() error,
) runner.Service {
return NewQueueRunner(exec, filerMap, output, 0, stat, dirMonitor, rID)
return NewQueueRunner(exec, filerMap, output, 0, stat, dirMonitor, rID, preprocessors, postprocessors)
}

// QueueController maintains a queue of commands to run (up to capacity).
Expand Down
2 changes: 1 addition & 1 deletion runner/runners/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func setup(capacity int, interval time.Duration, t *testing.T) *env {

filerMap := runner.MakeRunTypeMap()
filerMap[runner.RunTypeScoot] = snapshot.FilerAndInitDoneCh{Filer: snapshots.MakeInvalidFilerUpdater(updater), IDC: nil}
r := NewQueueRunner(sim, filerMap, outputCreator, capacity, nil, stats.NopDirsMonitor, runner.EmptyID)
r := NewQueueRunner(sim, filerMap, outputCreator, capacity, nil, stats.NopDirsMonitor, runner.EmptyID, []func() error{}, []func() error{})

return &env{sim: sim, r: r, u: updater, uc: &updateCount}
}
Expand Down
8 changes: 4 additions & 4 deletions runner/runners/single_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func TestMemCap(t *testing.T) {
e := os_execer.NewBoundedExecer(execer.Memory(10*1024*1024), stats.NilStatsReceiver())
filerMap := runner.MakeRunTypeMap()
filerMap[runner.RunTypeScoot] = snapshot.FilerAndInitDoneCh{Filer: snapshots.MakeNoopFiler(tmp), IDC: nil}
r := NewSingleRunner(e, filerMap, NewNullOutputCreator(), nil, stats.NopDirsMonitor, runner.EmptyID)
r := NewSingleRunner(e, filerMap, NewNullOutputCreator(), nil, stats.NopDirsMonitor, runner.EmptyID, []func() error{}, []func() error{})
if _, err := r.Run(cmd); err != nil {
t.Fatalf(err.Error())
}
Expand Down Expand Up @@ -145,7 +145,7 @@ func TestStats(t *testing.T) {
filerMap := runner.MakeRunTypeMap()
filerMap[runner.RunTypeScoot] = snapshot.FilerAndInitDoneCh{Filer: snapshots.MakeNoopFiler(tmp), IDC: nil}
dirMonitor := stats.NewDirsMonitor([]stats.MonitorDir{{StatSuffix: "cwd", Directory: "./"}})
r := NewSingleRunner(e, filerMap, NewNullOutputCreator(), stat, dirMonitor, runner.EmptyID)
r := NewSingleRunner(e, filerMap, NewNullOutputCreator(), stat, dirMonitor, runner.EmptyID, []func() error{}, []func() error{})

// Add initial idle time to keep the avg idle time above 50ms
time.Sleep(50 * time.Millisecond)
Expand Down Expand Up @@ -201,7 +201,7 @@ func TestTimeout(t *testing.T) {
e := execers.NewSimExecer()
filerMap := runner.MakeRunTypeMap()
filerMap[runner.RunTypeScoot] = snapshot.FilerAndInitDoneCh{Filer: snapshots.MakeNoopFiler(tmp), IDC: nil}
r := NewSingleRunner(e, filerMap, NewNullOutputCreator(), stat, stats.NopDirsMonitor, runner.EmptyID)
r := NewSingleRunner(e, filerMap, NewNullOutputCreator(), stat, stats.NopDirsMonitor, runner.EmptyID, []func() error{}, []func() error{})
if _, err := r.Run(cmd); err != nil {
t.Fatalf(err.Error())
}
Expand Down Expand Up @@ -241,7 +241,7 @@ func newRunner() (runner.Service, *execers.SimExecer) {
filerMap := runner.MakeRunTypeMap()
filerMap[runner.RunTypeScoot] = snapshot.FilerAndInitDoneCh{Filer: snapshots.MakeInvalidFiler(), IDC: nil}

r := NewSingleRunner(sim, filerMap, outputCreator, nil, stats.NopDirsMonitor, runner.EmptyID)
r := NewSingleRunner(sim, filerMap, outputCreator, nil, stats.NopDirsMonitor, runner.EmptyID, []func() error{}, []func() error{})
return r, sim
}

Expand Down
4 changes: 2 additions & 2 deletions scheduler/server/stateful_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func Test_StatefulScheduler_TaskGetsMarkedCompletedAfterMaxRetriesFailedRuns(t *
ex.ExecError = errors.New("Test - failed to exec")
filerMap := runner.MakeRunTypeMap()
filerMap[runner.RunTypeScoot] = snapshot.FilerAndInitDoneCh{Filer: snapshots.MakeInvalidFiler(), IDC: nil}
return runners.NewSingleRunner(ex, filerMap, runners.NewNullOutputCreator(), nil, stats.NopDirsMonitor, runner.EmptyID)
return runners.NewSingleRunner(ex, filerMap, runners.NewNullOutputCreator(), nil, stats.NopDirsMonitor, runner.EmptyID, []func() error{}, []func() error{})
}

s := makeStatefulSchedulerDeps(deps)
Expand Down Expand Up @@ -768,7 +768,7 @@ func getDepsWithSimWorker() (*schedulerDeps, []*execers.SimExecer) {
ex := execers.NewSimExecer()
filerMap := runner.MakeRunTypeMap()
filerMap[runner.RunTypeScoot] = snapshot.FilerAndInitDoneCh{Filer: snapshots.MakeInvalidFiler(), IDC: nil}
runner := runners.NewSingleRunner(ex, filerMap, runners.NewNullOutputCreator(), nil, stats.NopDirsMonitor, runner.EmptyID)
runner := runners.NewSingleRunner(ex, filerMap, runners.NewNullOutputCreator(), nil, stats.NopDirsMonitor, runner.EmptyID, []func() error{}, []func() error{})
return runner
},
config: SchedulerConfiguration{
Expand Down
4 changes: 2 additions & 2 deletions scheduler/setup/worker/makers.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func MakeDoneWorker() runner.Service {
ex := execers.NewDoneExecer()
filerMap := runner.MakeRunTypeMap()
filerMap[runner.RunTypeScoot] = snapshot.FilerAndInitDoneCh{Filer: snapshots.MakeInvalidFiler(), IDC: nil}
r := runners.NewSingleRunner(ex, filerMap, runners.NewNullOutputCreator(), nil, stats.NopDirsMonitor, runner.EmptyID)
r := runners.NewSingleRunner(ex, filerMap, runners.NewNullOutputCreator(), nil, stats.NopDirsMonitor, runner.EmptyID, []func() error{}, []func() error{})
chaos := runners.NewChaosRunner(r)
chaos.SetDelay(time.Duration(50) * time.Millisecond)
return chaos
Expand All @@ -33,5 +33,5 @@ func MakeSimWorker() runner.Service {
ex := execers.NewSimExecer()
filerMap := runner.MakeRunTypeMap()
filerMap[runner.RunTypeScoot] = snapshot.FilerAndInitDoneCh{Filer: snapshots.MakeInvalidFiler(), IDC: nil}
return runners.NewSingleRunner(ex, filerMap, runners.NewNullOutputCreator(), nil, stats.NopDirsMonitor, runner.EmptyID)
return runners.NewSingleRunner(ex, filerMap, runners.NewNullOutputCreator(), nil, stats.NopDirsMonitor, runner.EmptyID, []func() error{}, []func() error{})
}
4 changes: 3 additions & 1 deletion worker/starter/start_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ func StartServer(
dirMonitor *stats.DirsMonitor,
memCap uint64,
stat *stats.StatsReceiver,
preprocessors []func() error,
postprocessors []func() error,
) {
// create worker object:
// worker support objects
Expand All @@ -72,7 +74,7 @@ func StartServer(
filerMap[runner.RunTypeScoot] = snapshot.FilerAndInitDoneCh{Filer: gitFiler, IDC: db.InitDoneCh}
}
// the worker object
worker := runners.NewSingleRunner(execer, filerMap, oc, *stat, dirMonitor, rID)
worker := runners.NewSingleRunner(execer, filerMap, oc, *stat, dirMonitor, rID, preprocessors, postprocessors)

// add service wrappers
// thrift wrapper
Expand Down
2 changes: 2 additions & 0 deletions worker/workerserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ func main() {
stats.NopDirsMonitor,
*memCapFlag,
&stat,
[]func() error{},
[]func() error{},
)
}

Expand Down

0 comments on commit 5bb3bf6

Please sign in to comment.