Skip to content

Commit

Permalink
Implemented most of the scheduling part. Real job execution is still …
Browse files Browse the repository at this point in the history
…missing
  • Loading branch information
michelvocks committed Mar 6, 2018
1 parent 0e83021 commit 735a682
Show file tree
Hide file tree
Showing 5 changed files with 287 additions and 85 deletions.
11 changes: 6 additions & 5 deletions cmd/gaia/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func init() {
flag.StringVar(&gaia.Cfg.ListenPort, "port", "8080", "Listen port for gaia")
flag.StringVar(&gaia.Cfg.HomePath, "homepath", "", "Path to the gaia home folder")
flag.StringVar(&gaia.Cfg.Bolt.Path, "dbpath", "gaia.db", "Path to gaia bolt db file")
flag.IntVar(&gaia.Cfg.Workers, "workers", 2, "Number of workers gaia will use to execute pipelines in parallel")

// Default values
gaia.Cfg.Bolt.Mode = 0600
Expand Down Expand Up @@ -79,17 +80,17 @@ func main() {
os.Exit(1)
}

// Initialize scheduler
scheduler := scheduler.NewScheduler(store)
scheduler.Init()

// Initialize handlers
err = handlers.InitHandlers(irisInstance, store)
err = handlers.InitHandlers(irisInstance, store, scheduler)
if err != nil {
gaia.Cfg.Logger.Error("cannot initialize handlers", "error", err.Error())
os.Exit(1)
}

// Initialize scheduler
scheduler := scheduler.NewScheduler(store)
scheduler.Init()

// Start ticker. Periodic job to check for new plugins.
pipeline.InitTicker(store, scheduler)

Expand Down
38 changes: 27 additions & 11 deletions gaia.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,31 @@ import (
// PipelineType represents supported plugin types
type PipelineType string

// PipelineRunStatus represents the different status a run
// can have.
type PipelineRunStatus string

const (
// UNKNOWN plugin type
UNKNOWN PipelineType = "unknown"

// GOLANG plugin type
GOLANG PipelineType = "golang"

// RunNotScheduled status
RunNotScheduled PipelineRunStatus = "not scheduled"

// RunScheduled status
RunScheduled PipelineRunStatus = "scheduled"

// RunFailed status
RunFailed PipelineRunStatus = "failed"

// RunSuccess status
RunSuccess PipelineRunStatus = "success"

// RunRunning status
RunRunning PipelineRunStatus = "running"
)

// User is the user object
Expand Down Expand Up @@ -76,19 +95,15 @@ type PrivateKey struct {
Password string `json:"password,omitempty"`
}

// PipelineRunHistory represents the history of pipeline runs
type PipelineRunHistory struct {
ID int `json:"id,omitempty"`
History []PipelineRun `json:"history,omitempty"`
}

// PipelineRun represents a single run of a pipeline.
type PipelineRun struct {
ID int `json:"id"`
RunDate time.Time `json:"rundate,omitempty"`
ScheduleDate time.Time `json:"scheduledate,omitempty"`
Success bool `json:"success"`
Jobs []Job `json:"jobs,omitempty"`
UniqueID string `json:"uniqueid"`
ID int `json:"id"`
PipelineID int `json:"pipelineid"`
RunDate time.Time `json:"rundate,omitempty"`
ScheduleDate time.Time `json:"scheduledate,omitempty"`
Status PipelineRunStatus `json:"status,omitempty"`
Jobs []Job `json:"jobs,omitempty"`
}

// Cfg represents the global config instance
Expand All @@ -100,6 +115,7 @@ type Config struct {
HomePath string
DataPath string
PipelinePath string
Workers int
Logger hclog.Logger

Bolt struct {
Expand Down
203 changes: 163 additions & 40 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package pipeline
import (
"errors"
"os/exec"
"sync"
"time"

"github.com/gaia-pipeline/gaia"
"github.com/gaia-pipeline/gaia/plugin"
"github.com/gaia-pipeline/gaia/store"
uuid "github.com/satori/go.uuid"
)

const (
Expand All @@ -23,30 +25,36 @@ var (
// errCreateCMDForPipeline is thrown when we couldnt create a command to start
// a plugin.
errCreateCMDForPipeline = errors.New("could not create execute command for plugin")

// storeService is an instance of store.
// Use this to talk to the store.
storeService *store.Store
)

// Scheduler represents the schuler object
type Scheduler struct {
// buffered channel which is used as queue
pipelines chan gaia.Pipeline
scheduledRuns chan gaia.PipelineRun

// storeService is an instance of store.
// Use this to talk to the store.
storeService *store.Store
}

// NewScheduler creates a new instance of Scheduler.
func NewScheduler(store *store.Store) *Scheduler {
// Create new scheduler
s := &Scheduler{
pipelines: make(chan gaia.Pipeline, schedulerBufferLimit),
scheduledRuns: make(chan gaia.PipelineRun, schedulerBufferLimit),
storeService: store,
}

return s
}

// Init initializes the scheduler.
func (s *Scheduler) Init() {
// Setup workers
for i := 0; i < gaia.Cfg.Workers; i++ {
go s.work()
}

// Create a periodic job that fills the scheduler with new pipelines.
schedulerJob := time.NewTicker(schedulerIntervalSeconds * time.Second)
go func() {
Expand All @@ -60,68 +68,177 @@ func (s *Scheduler) Init() {
}()
}

// work takes work from the scheduled run buffer channel
// and executes the pipeline. Then repeats.
func (s *Scheduler) work() {
// This worker never stops working.
for {
// Take one scheduled run
r := <-s.scheduledRuns

// Mark the scheduled run as running
r.Status = gaia.RunRunning

// Update entry in store
err := s.storeService.PipelinePutRun(&r)
if err != nil {
gaia.Cfg.Logger.Debug("could not put pipeline run into store during executing work", "error", err.Error())
continue
}

// Get related pipeline from pipeline run
pipeline, err := s.storeService.PipelineGet(r.PipelineID)
if err != nil {
gaia.Cfg.Logger.Debug("cannot access pipeline during execution", "error", err.Error())
continue
} else if pipeline == nil {
gaia.Cfg.Logger.Debug("wanted to execute pipeline which does not exist", "run", r)
continue
}

// Start pipeline run process
s.executePipeline(pipeline, &r)
}
}

// schedule looks in the store for new work to do and schedules it.
func (s *Scheduler) schedule() {
// Do we have space left in our buffer?
if len(s.pipelines) >= schedulerBufferLimit {
if len(s.scheduledRuns) >= schedulerBufferLimit {
// No space left. Exit.
gaia.Cfg.Logger.Debug("scheduler buffer overflow. Cannot schedule new pipelines...")
return
}

// TODO: Implement schedule
// Get scheduled pipelines but limit the returning number of elements.
scheduled, err := s.storeService.PipelineGetScheduled(schedulerBufferLimit)
if err != nil {
gaia.Cfg.Logger.Debug("cannot get scheduled pipelines", "error", err.Error())
return
}

// Iterate scheduled runs
for _, run := range scheduled {
// push scheduled run into our channel
s.scheduledRuns <- (*run)

// Mark them as scheduled
run.Status = gaia.RunScheduled

// Update entry in store
err = s.storeService.PipelinePutRun(run)
if err != nil {
gaia.Cfg.Logger.Debug("could not put pipeline run into store", "error", err.Error())
}
}
}

// SchedulePipeline schedules a pipeline. That means we create a new schedule object
// SchedulePipeline schedules a pipeline. We create a new schedule object
// and save it in our store. The scheduler will later pick up this schedule object
// and will continue the work.
func (s *Scheduler) SchedulePipeline(p *gaia.Pipeline) error {
// Load the run history of the pipeline
history, err := storeService.PipelineGetRunHistory(p)
// Get highest public id used for this pipeline
highestID, err := s.storeService.PipelineGetRunHighestID(p)
if err != nil {
gaia.Cfg.Logger.Error("cannot access pipeline run history bucket", "error", err.Error())
gaia.Cfg.Logger.Error("cannot find highest pipeline run id", "error", err.Error())
return err
}

// Check if history is empty
if history == nil {
// Create new history object
history = &gaia.PipelineRunHistory{
ID: p.ID,
History: []gaia.PipelineRun{},
}
}

// Find the highest id and increment by one
var highestID int
for _, run := range history.History {
if run.ID > highestID {
highestID = run.ID
}
}
// increment by one
highestID++

// Create new scheduled pipeline run
// Create new not scheduled pipeline run
run := gaia.PipelineRun{
UniqueID: uuid.Must(uuid.NewV4()).String(),
ID: highestID,
ScheduleDate: time.Now(),
Status: gaia.RunNotScheduled,
}

// Add scheduled pipeline run to history
history.History = append(history.History, run)
// Put run into store
return s.storeService.PipelinePutRun(&run)
}

// executePipeline executes the given pipeline and updates it status periodically.
func (s *Scheduler) executePipeline(p *gaia.Pipeline, r *gaia.PipelineRun) {
// Set pessimistic values
r.Status = gaia.RunFailed
r.RunDate = time.Now()

// Get all jobs
var err error
r.Jobs, err = s.getPipelineJobs(p)
if err != nil {
gaia.Cfg.Logger.Error("cannot get pipeline jobs before execution", "error", err.Error())

// Update store
s.storeService.PipelinePutRun(r)
return
}

// Put history into store
return storeService.PipelinePutRunHistory(history)
}

// SetPipelineJobs uses the plugin system to get all jobs from the given pipeline.
// This function is blocking and might take some time.
func (s *Scheduler) SetPipelineJobs(p *gaia.Pipeline) error {
func executeJob(job *gaia.Job, wg *sync.WaitGroup) {
// TODO
wg.Done()
}

func executeJobs(jobs []*gaia.Job) {
// We finished all jobs, exit recursive execution.
if len(jobs) == 0 {
return
}

// Find the job with the lowest priority
var lowestPrio int32
for id, job := range jobs {
if job.Priority < lowestPrio || id == 0 {
lowestPrio = job.Priority
}
}

// We allocate a new slice for jobs with higher priority.
// And also a slice for jobs which we execute now.
var nextJobs []*gaia.Job
var execJobs []*gaia.Job

// We might have multiple jobs with the same priority.
// It means these jobs should be started in parallel.
var wg sync.WaitGroup
for _, job := range jobs {
if job.Priority == lowestPrio {
// Increase wait group by one
wg.Add(1)
execJobs = append(execJobs, job)

// Execute this job in a separate goroutine
go executeJob(job, &wg)
} else {
// We add this job to the next list
nextJobs = append(nextJobs, job)
}
}

// Wait until all jobs has been finished
wg.Wait()

// Check if a job has been failed. If so, stop execution.
for _, job := range execJobs {
if !job.Success {
return
}
}

// Run executeJobs again until all jobs have been executed
executeJobs(nextJobs)
}

// getPipelineJobs uses the plugin system to get all jobs from the given pipeline.
func (s *Scheduler) getPipelineJobs(p *gaia.Pipeline) ([]gaia.Job, error) {
// Create the start command for the pipeline
c := createPipelineCmd(p)
if c == nil {
gaia.Cfg.Logger.Debug("cannot set pipeline jobs", "error", errCreateCMDForPipeline.Error(), "pipeline", p)
return errCreateCMDForPipeline
return nil, errCreateCMDForPipeline
}

// Create new plugin instance
Expand All @@ -130,12 +247,18 @@ func (s *Scheduler) SetPipelineJobs(p *gaia.Pipeline) error {
// Connect to plugin(pipeline)
if err := pC.Connect(); err != nil {
gaia.Cfg.Logger.Debug("cannot connect to pipeline", "error", err.Error(), "pipeline", p)
return err
return nil, err
}
defer pC.Close()

return pC.GetJobs()
}

// SetPipelineJobs uses the plugin system to get all jobs from the given pipeline.
// This function is blocking and might take some time.
func (s *Scheduler) SetPipelineJobs(p *gaia.Pipeline) error {
// Get jobs
jobs, err := pC.GetJobs()
jobs, err := s.getPipelineJobs(p)
if err != nil {
gaia.Cfg.Logger.Debug("cannot get jobs from pipeline", "error", err.Error(), "pipeline", p)
return err
Expand Down
Loading

0 comments on commit 735a682

Please sign in to comment.