Skip to content

Commit

Permalink
job/job: Create a scheduler for task runs
Browse files Browse the repository at this point in the history
Signed-off-by: Alexander Jung <a.jung@lancs.ac.uk>
  • Loading branch information
nderjung committed Dec 20, 2020
1 parent e9fdaf0 commit 6082ed5
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 7 deletions.
9 changes: 6 additions & 3 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,12 @@ func doRunCmd(cmd *cobra.Command, args []string) {
os.Exit(1)
}

j.Start(&job.RuntimeConfig{
Cpus: cpus,
})
// Start the job with its various tasks
err = j.Start()
if err != nil {
log.Errorf("Could not start job: %s", err)
cleanup()
}

// We're all done now
cleanup()
Expand Down
150 changes: 146 additions & 4 deletions job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type Run struct {
Devices []string `yaml:"devices"`
Cmd string `yaml:"cmd"`
Path string `yaml:"path"`
exitCode int
}

type Job struct {
Expand Down Expand Up @@ -347,11 +348,152 @@ func (j *Job) tasks() ([]*Task, error) {
return tasks, nil
}

// Start the job
func (j *Job) Start(cfg *RuntimeConfig) error {
log.Info("Starting job...")
// Start the job and all of its tasks
func (j *Job) Start() error {
var freeCores []int
var wg sync.WaitGroup

// Continuously iterate over the wait list and the queue of the task to
// determine whether there is space for the task's run to be scheduled
// on the available list of cores.
for i := 0; j.waitList.Len() > 0; {
// Continiously updates the number of available cores free so this
// particular task's run so we can decide whether to schedule it.
freeCores = tasksInFlight.FreeCores()
if len(freeCores) == 0 {
log.Infof("No free cores, waiting...")
}

// Get the next job from the task's job queue
task, err := j.waitList.Get(i)
if err != nil {
i = 0 // jump back to task 0 in case we overflow
log.Errorf("Could not get task from wait list: %s", err)
continue
}

// Without removing an in-order run from the queue, peak at it so we can
// determine whether it is schedulable based on the number of cores which
// are available.
nextRun, err := task.(*Task).runs.Peak()
if err != nil {
log.Errorf("Could not peak next run for task: %d: %s", i, err)

// Can we schedule this run? Use an else if here so we don't ruin the
// ordering of the iterator `i`
} else if len(freeCores) >= nextRun.(Run).Cores {
// Check if the peaked run is currently active
for _, atr := range tasksInFlight.All() {
if atr != nil {
if atr.Task.UUID() == task.(*Task).UUID() {
goto iterator
}
}
}

log.Infof("Scheduling task run %s-%s...", task.(*Task).UUID(), nextRun.(Run).Name)

// Select some core IDs for this run based on how many it requires
var cores []int
for j := 0; j < nextRun.(Run).Cores; j++ {
cores = append(cores, freeCores[len(freeCores)-1])
freeCores = freeCores[:len(freeCores)-1]
}

// Initialize the task run
activeTaskRun, err := NewActiveTaskRun(task.(*Task), nextRun.(Run), cores)
if err != nil {
log.Errorf("Could not initialize run for this task: %s", err)

// By cancelling all the subsequent runs, the task will be removed from
// scheduler.
task.(*Task).Cancel()
goto iterator
}

// Finally, we can dequeue the run since we are about to schedule it
nextRun, err = task.(*Task).runs.Dequeue()

// Add the active task to the list of utilised cores
j := 1
for len(cores) > 0 {
coreId := cores[len(cores)-j]
err := tasksInFlight.Set(coreId, activeTaskRun)
if err != nil {
log.Warnf("Could not schedule task on core ID %d: %s", coreId, err)

// Use an offset to be able to skip over unavailable cores
if j >= len(cores) {
j = 1
} else {
j = j + 1
}
continue
}

// If we are able to use the core, remove it from the list
cores = cores[:len(cores)-j]
}

// Create a thread where we oversee the runtime of this task's run. By
// starting this run, it will decide how to consume the cores we have
// provided to it.
wg.Add(1) // Update wait group for this thread to complete
go func() {
returnCode, err := activeTaskRun.Start()
if err != nil {
log.Errorf(
"Could not complete run: %s: %s",
activeTaskRun.UUID(),
err,
)

// By cancelling all subsequent runs, the task will be removed from
// scheduler.
task.(*Task).Cancel()
} else if returnCode != 0 {
log.Errorf(
"Could not complete run: %s: exited with return code %d",
activeTaskRun.UUID(),
returnCode,
)

// By cancelling all subsequent runs, the task will be removed from
// scheduler.
task.(*Task).Cancel()
}

// Set the return code


log.Debugf("Run finished: %s", activeTaskRun.UUID())
wg.Done() // We're done here

// Remove utilized cores from this active task's run
for _, coreId := range activeTaskRun.CoreIds {
tasksInFlight.Unset(coreId)
}
}()
}

iterator:
time.Sleep(time.Duration(j.scheduleGrace) * time.Second)

// Remove the task if the queue is empty
if task.(*Task).runs.Len() == 0 {
j.waitList.Remove(i)
i = i - 1
}

// Have we reached the end of the list? Go back to zero otherwise continue.
if j.waitList.Len() == i + 1 {
i = 0
} else {
i = i + 1
}
}

// TODO: Create a matrix from all the parameters
wg.Wait() // Wait for all controller threads for the task's run to finish

return nil
}

0 comments on commit 6082ed5

Please sign in to comment.