Fixing the docker argument in case of pipeline start (#294)
* Fixing the docker argument in case of pipeline start

* more aggressive cancel

* switching back as it is a data race

* Locking the scheduler
Skarlso authored Nov 29, 2020
1 parent da4aac4 commit 4086339
Showing 6 changed files with 37 additions and 38 deletions.
2 changes: 1 addition & 1 deletion frontend/src/helper/index.js
@@ -23,7 +23,7 @@ export default {
StartPipeline (context, pipeline) {
// Send start request
context.$http
- .post('/api/v1/pipeline/' + pipeline.id + '/start', { docker: pipeline.docker })
+ .post('/api/v1/pipeline/' + pipeline.id + '/start', [{ key: 'docker', value: this.docker ? '1' : '0' }])
.then(response => {
if (response.data) {
context.$router.push({ path: '/pipeline/detail', query: { pipelineid: pipeline.id, runid: response.data.id } })
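With this change the UI sends the Docker flag as an ordinary start argument, a JSON array of key/value pairs, instead of a dedicated { docker: ... } body. Below is a minimal sketch of what the backend now receives on /api/v1/pipeline/{id}/start, assuming the body is decoded into the same Key/Value shape the handler uses further down; the startArgument type is illustrative and stands in for gaia's real gaia.Argument:

package main

import (
	"encoding/json"
	"fmt"
)

// startArgument mirrors the key/value pairs the UI posts to the start endpoint.
// It is a stand-in for the argument type the real handler binds into.
type startArgument struct {
	Key   string `json:"key"`
	Value string `json:"value"`
}

func main() {
	// Body produced by StartPipeline in frontend/src/helper/index.js above.
	body := `[{"key":"docker","value":"1"}]`

	var args []startArgument
	if err := json.Unmarshal([]byte(body), &args); err != nil {
		panic(err)
	}

	// The backend treats "1" as true and any other value as false.
	for _, a := range args {
		if a.Key == "docker" {
			fmt.Println("run in docker:", a.Value == "1")
		}
	}
}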
2 changes: 1 addition & 1 deletion frontend/src/views/pipeline/create.vue
@@ -933,7 +933,7 @@ export default {
height: 30px !important;
border: none !important;
margin: auto !important;
- padding: auto !important;
+ padding: inherit !important;
}
.ti-new-tag-input {
4 changes: 3 additions & 1 deletion frontend/src/views/pipeline/params.vue
@@ -95,7 +95,9 @@ export default {
startPipeline () {
// Add docker option
- this.args.push({ docker: this.docker })
+ if (this.docker) {
+ this.args.push({ key: 'docker', value: this.docker ? '1' : '0' })
+ }
// Send start request
this.$http
6 changes: 3 additions & 3 deletions handlers/vault.go
@@ -41,7 +41,7 @@ func CreateSecret(c echo.Context) error {
key = s.Key
value = s.Value

- return upsertSecret(c, key, err, value)
+ return upsertSecret(c, key, value)
}

// UpdateSecret updates a given secret
@@ -64,11 +64,11 @@ func UpdateSecret(c echo.Context) error {
}
key = s.Key
value = s.Value
- return upsertSecret(c, key, err, value)
+ return upsertSecret(c, key, value)
}

// updates or creates a secret
- func upsertSecret(c echo.Context, key string, err error, value string) error {
+ func upsertSecret(c echo.Context, key string, value string) error {
// Handle ignored special keys
if stringhelper.IsContainedInSlice(ignoredVaultKeys, key, true) {
return c.String(http.StatusBadRequest, "key is reserved and cannot be set/changed")
19 changes: 5 additions & 14 deletions providers/pipelines/pipeline.go
@@ -557,12 +557,6 @@ func (pp *PipelineProvider) PipelineTriggerAuth(c echo.Context) error {
func (pp *PipelineProvider) PipelineStart(c echo.Context) error {
pipelineIDStr := c.Param("pipelineid")

- // Decode content
- content := echo.Map{}
- if err := c.Bind(&content); err != nil {
- return c.String(http.StatusBadRequest, "invalid content provided in request")
- }

// Look for arguments.
// We do not check for errors here cause arguments are optional.
var args []*gaia.Argument
@@ -573,13 +567,6 @@ func (pp *PipelineProvider) PipelineStart(c echo.Context) error {
if err != nil {
return c.String(http.StatusBadRequest, errInvalidPipelineID.Error())
}
- var docker bool
- if _, ok := content["docker"]; ok {
- docker, ok = content["docker"].(bool)
- if !ok {
- return c.String(http.StatusBadRequest, errWrongDockerValue.Error())
- }
- }

// Look up pipeline for the given id
var foundPipeline gaia.Pipeline
@@ -591,7 +578,11 @@ func (pp *PipelineProvider) PipelineStart(c echo.Context) error {
}

// Overwrite docker setting
- foundPipeline.Docker = docker
+ for _, a := range args {
+ if a.Key == "docker" {
+ foundPipeline.Docker = a.Value == "1"
+ }
+ }

if foundPipeline.Name != "" {
pipelineRun, err := pp.deps.Scheduler.SchedulePipeline(&foundPipeline, gaia.StartReasonManual, args)
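PipelineStart no longer binds a separate echo.Map body just to read the Docker flag; the flag travels in the optional argument list, and the stored Docker setting is only overridden when a "docker" argument is actually present, with "1" meaning true. A standalone sketch of that override rule, using illustrative local types in place of gaia.Pipeline and gaia.Argument:

package main

import "fmt"

// argument and pipeline are stand-ins for gaia.Argument and gaia.Pipeline.
type argument struct {
	Key, Value string
}

type pipeline struct {
	Name   string
	Docker bool
}

// applyDockerOverride mirrors the loop added to PipelineStart: keep the
// stored setting unless a "docker" argument was sent, where "1" enables Docker.
func applyDockerOverride(p *pipeline, args []*argument) {
	for _, a := range args {
		if a.Key == "docker" {
			p.Docker = a.Value == "1"
		}
	}
}

func main() {
	p := &pipeline{Name: "build", Docker: true}

	applyDockerOverride(p, nil)
	fmt.Println(p.Docker) // true: no argument sent, stored setting kept

	applyDockerOverride(p, []*argument{{Key: "docker", Value: "0"}})
	fmt.Println(p.Docker) // false: explicit override
}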
42 changes: 24 additions & 18 deletions workers/scheduler/gaiascheduler/scheduler.go
@@ -93,6 +93,15 @@ type Scheduler struct {

// Atomic Counter that represents the current free workers
freeWorkers *int32

+ // Lock for scheduling
+ schedulePipelineLock sync.RWMutex
+ // Lock for scheduling
+ schedulerLock sync.RWMutex
+
+ // killedPipelineRun is used to signal the scheduler to abort a pipeline run.
+ // This has the size one for delayed guarantee of signal delivery.
+ killedPipelineRun chan *gaia.PipelineRun
}

// Dependencies defines the dependencies of the scheduler service.
@@ -108,13 +117,14 @@ type Dependencies struct {
func NewScheduler(deps Dependencies) (*Scheduler, error) {
// Create new scheduler
s := &Scheduler{
- scheduledRuns: make(chan gaia.PipelineRun, schedulerBufferLimit),
- storeService: deps.Store,
- memDBService: deps.DB,
- pluginSystem: deps.PS,
- ca: deps.CA,
- vault: deps.Vault,
- freeWorkers: new(int32),
+ scheduledRuns: make(chan gaia.PipelineRun, schedulerBufferLimit),
+ storeService: deps.Store,
+ memDBService: deps.DB,
+ pluginSystem: deps.PS,
+ ca: deps.CA,
+ vault: deps.Vault,
+ freeWorkers: new(int32),
+ killedPipelineRun: make(chan *gaia.PipelineRun, 1),
}
return s, nil
}
@@ -234,6 +244,9 @@ func (s *Scheduler) prepareAndExec(r gaia.PipelineRun) {

// schedule looks in the store for new work and schedules it.
func (s *Scheduler) schedule() {
+ s.schedulerLock.Lock()
+ defer s.schedulerLock.Unlock()

// Do we have space left in our buffer?
if s.CountScheduledRuns() >= schedulerBufferLimit {
// No space left. Exit.
@@ -334,7 +347,6 @@ func (s *Scheduler) schedule() {

// Reset the docker status manipulation
scheduled[id].Docker = true

storeUpdate(scheduled[id], gaia.RunScheduled)
continue
}
@@ -347,10 +359,6 @@ func (s *Scheduler) schedule() {
}
}

- // killedPipelineRun is used to signal the scheduler to abort a pipeline run.
- // This has the size one for delayed guarantee of signal delivery.
- var killedPipelineRun = make(chan *gaia.PipelineRun, 1)

// StopPipelineRun will prematurely cancel a pipeline run by killing all of its
// jobs and running processes immediately.
func (s *Scheduler) StopPipelineRun(p *gaia.Pipeline, runID int) error {
@@ -367,13 +375,11 @@ func (s *Scheduler) StopPipelineRun(p *gaia.Pipeline, runID int) error {
if err != nil {
return err
}
- killedPipelineRun <- pr
+ s.killedPipelineRun <- pr

return nil
}

- var schedulerLock = sync.RWMutex{}

// SchedulePipeline schedules a pipeline. We create a new schedule object
// and save it in our store. The scheduler will later pick this up and will continue the work.
func (s *Scheduler) SchedulePipeline(p *gaia.Pipeline, startedReason string, args []*gaia.Argument) (*gaia.PipelineRun, error) {
@@ -384,8 +390,8 @@ func (s *Scheduler) SchedulePipeline(p *gaia.Pipeline, startedReason string, args []*gaia.Argument) (*gaia.PipelineRun, error) {
// This means that one of the calls will take slightly longer (a couple of nanoseconds)
// while the other finishes to save the pipelinerun.
// This is to ensure that the highest ID for the next pipeline is calculated properly.
- schedulerLock.Lock()
- defer schedulerLock.Unlock()
+ s.schedulePipelineLock.Lock()
+ defer s.schedulePipelineLock.Unlock()

// Get highest public id used for this pipeline
highestID, err := s.storeService.PipelineGetRunHighestID(p)
@@ -633,7 +639,7 @@ func (s *Scheduler) executeScheduler(r *gaia.PipelineRun, pS plugin.Plugin) {
finished := make(chan bool, 1)
for {
select {
- case pr, ok := <-killedPipelineRun:
+ case pr, ok := <-s.killedPipelineRun:
if ok {
if pr.ID == r.ID {
for _, job := range r.Jobs {
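The "data race" and "Locking the scheduler" notes in the commit message map to this file: killedPipelineRun and the scheduling lock used to be package-level variables shared by every Scheduler value, so they become struct fields, and schedule() now serialises itself with schedulerLock. A minimal sketch of the per-instance pattern; the types and names below are illustrative, not gaia's API:

package main

import (
	"fmt"
	"sync"
)

type run struct{ ID int }

// miniScheduler keeps its kill channel and lock per instance, as the
// Scheduler struct does after this commit, instead of in package scope.
type miniScheduler struct {
	mu        sync.Mutex
	killedRun chan *run // buffered with size 1, like killedPipelineRun
}

func newMiniScheduler() *miniScheduler {
	return &miniScheduler{killedRun: make(chan *run, 1)}
}

// stop signals this instance's scheduling loop to abort a run.
func (s *miniScheduler) stop(r *run) {
	s.killedRun <- r
}

// schedule serialises itself, so concurrent callers on the same instance
// take turns instead of racing over shared state.
func (s *miniScheduler) schedule() {
	s.mu.Lock()
	defer s.mu.Unlock()

	select {
	case r := <-s.killedRun:
		fmt.Println("aborting run", r.ID)
	default:
		fmt.Println("nothing to abort")
	}
}

func main() {
	a, b := newMiniScheduler(), newMiniScheduler()
	a.stop(&run{ID: 42})
	a.schedule() // aborting run 42
	b.schedule() // nothing to abort: b has its own channel and lock
}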
