Skip to content

Commit

Permalink
Fixed data race in schedule
Browse files Browse the repository at this point in the history
  • Loading branch information
michelvocks committed Jul 17, 2018
1 parent ab48faa commit f7b139e
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 66 deletions.
70 changes: 43 additions & 27 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,18 +111,18 @@ func (s *Scheduler) work() {
r := <-s.scheduledRuns

// Prepare execution and start it
s.prepareAndExec(&r)
s.prepareAndExec(r)
}
}

// prepareAndExec does the real preparation and starts the execution.
func (s *Scheduler) prepareAndExec(r *gaia.PipelineRun) {
func (s *Scheduler) prepareAndExec(r gaia.PipelineRun) {
// Mark the scheduled run as running
r.Status = gaia.RunRunning
r.StartDate = time.Now()

// Update entry in store
err := s.storeService.PipelinePutRun(r)
err := s.storeService.PipelinePutRun(&r)
if err != nil {
gaia.Cfg.Logger.Debug("could not put pipeline run into store during executing work", "error", err.Error())
return
Expand All @@ -138,14 +138,14 @@ func (s *Scheduler) prepareAndExec(r *gaia.PipelineRun) {

// Update store
r.Status = gaia.RunFailed
s.storeService.PipelinePutRun(r)
s.storeService.PipelinePutRun(&r)
return
}

// Check if this pipeline has jobs declared
if len(r.Jobs) == 0 {
// Finish pipeline run
s.finishPipelineRun(r, gaia.RunSuccess)
s.finishPipelineRun(&r, gaia.RunSuccess)
return
}

Expand All @@ -160,7 +160,7 @@ func (s *Scheduler) prepareAndExec(r *gaia.PipelineRun) {
c := createPipelineCmd(pipeline)
if c == nil {
gaia.Cfg.Logger.Debug("cannot create pipeline start command", "error", errCreateCMDForPipeline.Error())
s.finishPipelineRun(r, gaia.RunFailed)
s.finishPipelineRun(&r, gaia.RunFailed)
return
}

Expand All @@ -171,14 +171,14 @@ func (s *Scheduler) prepareAndExec(r *gaia.PipelineRun) {
path = filepath.Join(path, gaia.LogsFileName)
if err := pS.Connect(c, &path); err != nil {
gaia.Cfg.Logger.Debug("cannot connect to pipeline", "error", err.Error(), "pipeline", pipeline)
s.finishPipelineRun(r, gaia.RunFailed)
s.finishPipelineRun(&r, gaia.RunFailed)
return
}
defer pS.Close()

// Schedule jobs and execute them.
// Also update the run in the store.
s.scheduleJobsByPriority(r, pipeline, pS)
s.scheduleJobsByPriority(r, pS)
}

// schedule looks in the store for new work and schedules it.
Expand Down Expand Up @@ -249,21 +249,22 @@ func (s *Scheduler) SchedulePipeline(p *gaia.Pipeline) (*gaia.PipelineRun, error

// executeJob executes a single job.
// This method is blocking.
func executeJob(job *gaia.Job, pS Plugin, wg *sync.WaitGroup, triggerSave chan bool) {
func executeJob(job gaia.Job, pS Plugin, wg *sync.WaitGroup, triggerSave chan gaia.Job) {
defer wg.Done()
defer func() {
triggerSave <- true
triggerSave <- job
}()

// Set Job to running and trigger save
job.Status = gaia.JobRunning
triggerSave <- true
triggerSave <- job

// Execute job
if err := pS.Execute(job); err != nil {
if err := pS.Execute(&job); err != nil {
// TODO: Show it to user
gaia.Cfg.Logger.Debug("error during job execution", "error", err.Error(), "job", job)
job.Status = gaia.JobFailed
return
}

// If we are here, the job execution was ok
Expand All @@ -273,7 +274,7 @@ func executeJob(job *gaia.Job, pS Plugin, wg *sync.WaitGroup, triggerSave chan b
// scheduleJobsByPriority schedules the given jobs by their respective
// priority. This method is designed to be recursive and blocking.
// If jobs have the same priority, they will be executed in parallel.
func (s *Scheduler) scheduleJobsByPriority(r *gaia.PipelineRun, p *gaia.Pipeline, pS Plugin) {
func (s *Scheduler) scheduleJobsByPriority(r gaia.PipelineRun, pS Plugin) {
// Do a prescheduling and set it to the first waiting job
var lowestPrio int64
for _, job := range r.Jobs {
Expand All @@ -293,31 +294,54 @@ func (s *Scheduler) scheduleJobsByPriority(r *gaia.PipelineRun, p *gaia.Pipeline
// We might have multiple jobs with the same priority.
// It means these jobs should be started in parallel.
var wg sync.WaitGroup
triggerSave := make(chan bool)
triggerSave := make(chan gaia.Job)
done := make(chan bool)
for id, job := range r.Jobs {
if job.Priority == lowestPrio && job.Status == gaia.JobWaitingExec {
// Increase wait group by one
wg.Add(1)

// Execute this job in a separate goroutine
go executeJob(&r.Jobs[id], pS, &wg, triggerSave)
go executeJob(r.Jobs[id], pS, &wg, triggerSave)
}
}

// Create channel for storing job run results and spawn results routine
go s.getJobResultsAndStore(triggerSave, r)
go func() {
for {
j, open := <-triggerSave

// Channel has been closed
if !open {
done <- true
return
}

// Filter out the job
for id, job := range r.Jobs {
if job.ID == j.ID {
r.Jobs[id].Status = j.Status
break
}
}

// Store update
s.storeService.PipelinePutRun(&r)
}
}()

// Wait until all jobs have been finished and close results channel
wg.Wait()
close(triggerSave)
<-done

// Check if a job has been failed. If so, stop execution.
// We also check if all jobs have been executed.
var notExecJob bool
for _, job := range r.Jobs {
switch job.Status {
case gaia.JobFailed:
s.finishPipelineRun(r, gaia.RunFailed)
s.finishPipelineRun(&r, gaia.RunFailed)
return
case gaia.JobWaitingExec:
notExecJob = true
Expand All @@ -326,20 +350,12 @@ func (s *Scheduler) scheduleJobsByPriority(r *gaia.PipelineRun, p *gaia.Pipeline

// All jobs have been executed
if !notExecJob {
s.finishPipelineRun(r, gaia.RunSuccess)
s.finishPipelineRun(&r, gaia.RunSuccess)
return
}

// Run scheduleJobsByPriority again until all jobs have been executed
s.scheduleJobsByPriority(r, p, pS)
}

// getJobResultsAndStore
func (s *Scheduler) getJobResultsAndStore(triggerSave chan bool, r *gaia.PipelineRun) {
for range triggerSave {
// Store update
s.storeService.PipelinePutRun(r)
}
s.scheduleJobsByPriority(r, pS)
}

// getPipelineJobs uses the plugin system to get all jobs from the given pipeline.
Expand Down
66 changes: 27 additions & 39 deletions scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,12 @@ import (
uuid "github.com/satori/go.uuid"
)

type PluginFake struct {
// Fake struct
jobs []gaia.Job
}

var pluginFake *PluginFake
type PluginFake struct{}

func (p *PluginFake) NewPlugin() Plugin {
return &PluginFake{}
}
func (p *PluginFake) NewPlugin() Plugin { return &PluginFake{} }
func (p *PluginFake) Connect(cmd *exec.Cmd, logPath *string) error { return nil }
func (p *PluginFake) Execute(j *gaia.Job) error { return nil }
func (p *PluginFake) GetJobs() ([]gaia.Job, error) { return pluginFake.jobs, nil }
func (p *PluginFake) GetJobs() ([]gaia.Job, error) { return prepareJobs(), nil }
func (p *PluginFake) Close() {}

func TestInit(t *testing.T) {
Expand All @@ -44,8 +37,7 @@ func TestInit(t *testing.T) {
if err := storeInstance.Init(); err != nil {
t.Fatal(err)
}
pluginFake = &PluginFake{}
s := NewScheduler(storeInstance, pluginFake)
s := NewScheduler(storeInstance, &PluginFake{})
err := s.Init()
if err != nil {
t.Fatal(err)
Expand All @@ -72,10 +64,8 @@ func TestPrepareAndExec(t *testing.T) {
t.Fatal(err)
}
p, r := prepareTestData()
storeInstance.PipelinePut(p)
pluginFake = &PluginFake{}
pluginFake.jobs = p.Jobs
s := NewScheduler(storeInstance, pluginFake)
storeInstance.PipelinePut(&p)
s := NewScheduler(storeInstance, &PluginFake{})
s.prepareAndExec(r)

// Iterate jobs
Expand Down Expand Up @@ -108,15 +98,13 @@ func TestSchedulePipeline(t *testing.T) {
t.Fatal(err)
}
p, _ := prepareTestData()
storeInstance.PipelinePut(p)
pluginFake = &PluginFake{}
pluginFake.jobs = p.Jobs
s := NewScheduler(storeInstance, pluginFake)
storeInstance.PipelinePut(&p)
s := NewScheduler(storeInstance, &PluginFake{})
err := s.Init()
if err != nil {
t.Fatal(err)
}
_, err = s.SchedulePipeline(p)
_, err = s.SchedulePipeline(&p)
if err != nil {
t.Fatal(err)
}
Expand All @@ -143,15 +131,13 @@ func TestSchedule(t *testing.T) {
t.Fatal(err)
}
p, _ := prepareTestData()
storeInstance.PipelinePut(p)
pluginFake = &PluginFake{}
pluginFake.jobs = p.Jobs
s := NewScheduler(storeInstance, pluginFake)
storeInstance.PipelinePut(&p)
s := NewScheduler(storeInstance, &PluginFake{})
err := s.Init()
if err != nil {
t.Fatal(err)
}
_, err = s.SchedulePipeline(p)
_, err = s.SchedulePipeline(&p)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -188,11 +174,9 @@ func TestSetPipelineJobs(t *testing.T) {
t.Fatal(err)
}
p, _ := prepareTestData()
pluginFake = &PluginFake{}
pluginFake.jobs = p.Jobs
p.Jobs = nil
s := NewScheduler(storeInstance, pluginFake)
err := s.SetPipelineJobs(p)
s := NewScheduler(storeInstance, &PluginFake{})
err := s.SetPipelineJobs(&p)
if err != nil {
t.Fatal(err)
}
Expand All @@ -205,7 +189,7 @@ func TestSetPipelineJobs(t *testing.T) {
}
}

func prepareTestData() (pipeline *gaia.Pipeline, pipelineRun *gaia.PipelineRun) {
func prepareJobs() []gaia.Job {
job1 := gaia.Job{
ID: hash("Job1"),
Title: "Job1",
Expand All @@ -231,18 +215,22 @@ func prepareTestData() (pipeline *gaia.Pipeline, pipelineRun *gaia.PipelineRun)
Status: gaia.JobWaitingExec,
}

pipeline = &gaia.Pipeline{
return []gaia.Job{
job1,
job2,
job3,
job4,
}
}

func prepareTestData() (pipeline gaia.Pipeline, pipelineRun gaia.PipelineRun) {
pipeline = gaia.Pipeline{
ID: 1,
Name: "Test Pipeline",
Type: gaia.PTypeGolang,
Jobs: []gaia.Job{
job1,
job2,
job3,
job4,
},
Jobs: prepareJobs(),
}
pipelineRun = &gaia.PipelineRun{
pipelineRun = gaia.PipelineRun{
ID: 1,
PipelineID: 1,
Status: gaia.RunNotScheduled,
Expand Down

0 comments on commit f7b139e

Please sign in to comment.