Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cancel pipeline a pipeline run #117

Merged
merged 10 commits into from
Oct 15, 2018
73 changes: 69 additions & 4 deletions frontend/client/views/pipeline/detail.vue
Original file line number Diff line number Diff line change
Expand Up @@ -47,29 +47,61 @@
<span v-else>{{ props.row.status }}</span>
</td>
<td>{{ calculateDuration(props.row.startdate, props.row.finishdate) }}</td>
<td>
<a v-on:click="stopPipelineModal(pipelineID, props.row.id)"><i class="fa fa-ban" style="color: whitesmoke;"></i></a>
</td>
</template>
<div slot="emptystate" class="empty-table-text">
No pipeline runs found in database.
</div>
</vue-good-table>
</article>

<!-- stop pipeline run modal -->
<modal :visible="showStopPipelineModal" class="modal-z-index" @close="close">
<div class="box stop-pipeline-modal">
<article class="media">
<div class="media-content">
<div class="content">
<p>
<span style="color: white;">Do you really want to cancel this run?</span>
</p>
</div>
<div class="modal-footer">
<div style="float: left;">
<button class="button is-primary" v-on:click="stopPipeline" style="width:150px;">Yes</button>
</div>
<div style="float: right;">
<button class="button is-danger" v-on:click="close" style="width:130px;">No</button>
</div>
</div>
</div>
</article>
</div>
</modal>
</div>

</div>
</div>
</template>

<script>
import Vue from 'vue'
import Vis from 'vis'
import { Modal } from 'vue-bulma-modal'
import VueGoodTable from 'vue-good-table'
import moment from 'moment'

Vue.use(VueGoodTable)

export default {
components: {
Modal
},

data () {
return {
showStopPipelineModal: false,
pipelineID: null,
runID: null,
nodes: null,
Expand All @@ -87,6 +119,9 @@ export default {
},
{
label: 'Duration'
},
{
label: 'Actions'
}
],
runsRows: [],
Expand Down Expand Up @@ -220,6 +255,32 @@ export default {
return this.$http.get('/api/v1/pipelinerun/' + pipelineID + '/' + runID, { showProgressBar: false })
},

stopPipeline () {
this.close()
this.$http
.post('/api/v1/pipelinerun/' + this.pipelineID + '/' + this.runID + '/stop', { showProgressBar: false })
.then(response => {
if (response.data) {
this.$router.push({path: '/pipeline/detail', query: { pipelineid: this.pipeline.id, runid: response.data.id }})
}
})
.catch((error) => {
this.$store.commit('clearIntervals')
this.$onError(error)
})
},

stopPipelineModal (pipelineID, runID) {
this.pipelineID = pipelineID
this.runID = runID
this.showStopPipelineModal = true
},

close () {
this.showStopPipelineModal = false
this.$emit('close')
},

getPipelineRuns (pipelineID) {
return this.$http.get('/api/v1/pipelinerun/' + pipelineID, { showProgressBar: false })
},
Expand Down Expand Up @@ -399,9 +460,13 @@ export default {

<style lang="scss">

#pipeline-detail {
width: 100%;
height: 400px;
}
#pipeline-detail {
width: 100%;
height: 400px;
}
.stop-pipeline-modal {
text-align: center;
background-color: #2a2735;
}

</style>
3 changes: 3 additions & 0 deletions gaia.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ const (
// RunRunning status
RunRunning PipelineRunStatus = "running"

// RunCancelled status
RunCancelled PipelineRunStatus = "cancelled"

// JobWaitingExec status
JobWaitingExec JobStatus = "waiting for execution"

Expand Down
1 change: 1 addition & 0 deletions handlers/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func InitHandlers(e *echo.Echo) error {
e.POST(p+"pipeline/githook", GitWebHook)

// PipelineRun
e.POST(p+"pipelinerun/:pipelineid/:runid/stop", PipelineStop)
e.GET(p+"pipelinerun/:pipelineid/:runid", PipelineRunGet)
e.GET(p+"pipelinerun/:pipelineid", PipelineGetAllRuns)
e.GET(p+"pipelinerun/:pipelineid/latest", PipelineGetLatestRun)
Expand Down
42 changes: 41 additions & 1 deletion handlers/pipeline_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/gaia-pipeline/gaia"
"github.com/gaia-pipeline/gaia/services"
"github.com/gaia-pipeline/gaia/workers/pipeline"
"github.com/labstack/echo"
)

Expand Down Expand Up @@ -51,6 +52,45 @@ func PipelineRunGet(c echo.Context) error {
return c.JSON(http.StatusOK, pipelineRun)
}

// PipelineStop stops a running pipeline.
func PipelineStop(c echo.Context) error {
schedulerService, _ := services.SchedulerService()
// Get parameters and validate
pipelineID := c.Param("pipelineid")
pipelineRunID := c.Param("runid")

// Transform pipelineid to int
p, err := strconv.Atoi(pipelineID)
if err != nil {
return c.String(http.StatusBadRequest, "invalid pipeline id given")
}

// Transform pipelinerunid to int
r, err := strconv.Atoi(pipelineRunID)
if err != nil {
return c.String(http.StatusBadRequest, "invalid pipeline run id given")
}

// Look up pipeline for the given id
var foundPipeline gaia.Pipeline
for pipeline := range pipeline.GlobalActivePipelines.Iter() {
if pipeline.ID == p {
foundPipeline = pipeline
}
}

if foundPipeline.Name != "" {
err = schedulerService.StopPipelineRun(&foundPipeline, r)
if err != nil {
return c.String(http.StatusBadRequest, err.Error())
}
return c.String(http.StatusOK, "pipeline successfully stopped")
}

// Pipeline not found
return c.String(http.StatusNotFound, errPipelineNotFound.Error())
}

// PipelineGetAllRuns returns all runs about the given pipeline.
func PipelineGetAllRuns(c echo.Context) error {
// Convert string to int because id is int
Expand Down Expand Up @@ -119,7 +159,7 @@ func GetJobLogs(c echo.Context) error {
jL := jobLogs{}

// Determine if job has been finished
if run.Status == gaia.RunFailed || run.Status == gaia.RunSuccess {
if run.Status == gaia.RunFailed || run.Status == gaia.RunSuccess || run.Status == gaia.RunCancelled {
jL.Finished = true
}

Expand Down
3 changes: 2 additions & 1 deletion workers/pipeline/create_pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ func (ms *mockScheduler) Init() error { return nil }
func (ms *mockScheduler) SchedulePipeline(p *gaia.Pipeline, args []gaia.Argument) (*gaia.PipelineRun, error) {
return nil, nil
}
func (ms *mockScheduler) SetPipelineJobs(p *gaia.Pipeline) error { return ms.Error }
func (ms *mockScheduler) SetPipelineJobs(p *gaia.Pipeline) error { return ms.Error }
func (ms *mockScheduler) StopPipelineRun(p *gaia.Pipeline, runid int) error { return ms.Error }

func TestCreatePipelineUnknownType(t *testing.T) {
tmp, _ := ioutil.TempDir("", "TestCreatePipelineUnknownType")
Expand Down
46 changes: 46 additions & 0 deletions workers/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type GaiaScheduler interface {
Init() error
SchedulePipeline(p *gaia.Pipeline, args []gaia.Argument) (*gaia.PipelineRun, error)
SetPipelineJobs(p *gaia.Pipeline) error
StopPipelineRun(p *gaia.Pipeline, runID int) error
}

var _ GaiaScheduler = (*Scheduler)(nil)
Expand Down Expand Up @@ -260,6 +261,31 @@ func (s *Scheduler) schedule() {
}
}

// killedPipelineRun is used to signal the scheduler to abort a pipeline run.
// This has the size one for delayed guarantee of signal delivery.
var killedPipelineRun = make(chan *gaia.PipelineRun, 1)

// StopPipelineRun will prematurely cancel a pipeline run by killing all of its
// jobs and running processes immediately.
func (s *Scheduler) StopPipelineRun(p *gaia.Pipeline, runID int) error {
pr, err := s.storeService.PipelineGetRunByPipelineIDAndID(p.ID, runID)
if err != nil {
return err
}
if pr.Status != gaia.RunRunning {
Skarlso marked this conversation as resolved.
Show resolved Hide resolved
return errors.New("pipeline is not in running state")
}

pr.Status = gaia.RunCancelled
err = s.storeService.PipelinePutRun(pr)
if err != nil {
return err
}
killedPipelineRun <- pr

return nil
}

var schedulerLock = sync.RWMutex{}

// SchedulePipeline schedules a pipeline. We create a new schedule object
Expand Down Expand Up @@ -457,6 +483,8 @@ func (s *Scheduler) executeScheduledJobs(r gaia.PipelineRun, pS Plugin) {

if runFail {
s.finishPipelineRun(&r, gaia.RunFailed)
} else if r.Status == gaia.RunCancelled {
s.finishPipelineRun(&r, gaia.RunCancelled)
} else {
s.finishPipelineRun(&r, gaia.RunSuccess)
}
Expand Down Expand Up @@ -512,6 +540,24 @@ func (s *Scheduler) executeScheduler(r *gaia.PipelineRun, pS Plugin) {
finished := make(chan bool, 1)
for {
select {
case pr, ok := <-killedPipelineRun:
if ok {
if pr.ID == r.ID {
for _, job := range r.Jobs {
if job.Status == gaia.JobRunning || job.Status == gaia.JobWaitingExec {
job.Status = gaia.JobFailed
job.FailPipeline = true
}
}
r.Status = gaia.RunCancelled
s.storeService.PipelinePutRun(r)
close(done)
close(executeScheduler)
finished <- true
finalize = true
return
}
}
case <-finished:
close(pipelineFinished)
return
Expand Down
74 changes: 74 additions & 0 deletions workers/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,80 @@ func TestSetPipelineJobs(t *testing.T) {
}
}

func TestStopPipelineRunFailIfPipelineNotInRunningState(t *testing.T) {
gaia.Cfg = &gaia.Config{}
storeInstance := store.NewBoltStore()
tmp, _ := ioutil.TempDir("", "TestStopPipelineRunFailIfPipelineNotInRunningState")
gaia.Cfg.DataPath = tmp
gaia.Cfg.WorkspacePath = filepath.Join(tmp, "tmp")
gaia.Cfg.Bolt.Mode = 0600
gaia.Cfg.Logger = hclog.New(&hclog.LoggerOptions{
Level: hclog.Trace,
Output: hclog.DefaultOutput,
Name: "Gaia",
})
gaia.Cfg.Worker = "2"
if err := storeInstance.Init(); err != nil {
t.Fatal(err)
}
p, _ := prepareTestData()
storeInstance.PipelinePut(&p)
s := NewScheduler(storeInstance, &PluginFakeFailed{}, &CAFake{}, &VaultFake{})
_, err := s.SchedulePipeline(&p, prepareArgs())
if err != nil {
t.Fatal(err)
}
s.schedule()
r, err := storeInstance.PipelineGetRunByPipelineIDAndID(p.ID, 1)
if err != nil {
t.Fatal(err)
}
if r.Status != gaia.RunScheduled {
t.Fatalf("run has status %s but should be %s\n", r.Status, string(gaia.RunScheduled))
}
err = s.StopPipelineRun(&p, 1)
if err == nil {
t.Fatal("error was nil. should have failed")
}
if err.Error() != "pipeline is not in running state" {
t.Fatal("error was not what was expected 'pipeline is not in running state'. got: ", err.Error())
}
}

func TestStopPipelineRun(t *testing.T) {
gaia.Cfg = &gaia.Config{}
storeInstance := store.NewBoltStore()
tmp, _ := ioutil.TempDir("", "TestStopPipelineRun")
gaia.Cfg.DataPath = tmp
gaia.Cfg.WorkspacePath = filepath.Join(tmp, "tmp")
gaia.Cfg.Bolt.Mode = 0600
gaia.Cfg.Logger = hclog.New(&hclog.LoggerOptions{
Level: hclog.Trace,
Output: hclog.DefaultOutput,
Name: "Gaia",
})
gaia.Cfg.Worker = "2"
if err := storeInstance.Init(); err != nil {
t.Fatal(err)
}
p, r := prepareTestData()
storeInstance.PipelinePut(&p)
s := NewScheduler(storeInstance, &PluginFake{}, &CAFake{}, &VaultFake{})

r.Status = gaia.RunRunning
storeInstance.PipelinePutRun(&r)

run, _ := storeInstance.PipelineGetRunByPipelineIDAndID(p.ID, r.ID)
err := s.StopPipelineRun(&p, run.ID)
if err != nil {
t.Fatal(err)
}
run, _ = storeInstance.PipelineGetRunByPipelineIDAndID(p.ID, r.ID)
if run.Status != gaia.RunCancelled {
t.Fatal("expected pipeline state to be cancelled. got: ", r.Status)
}
}

func prepareArgs() []gaia.Argument {
arg1 := gaia.Argument{
Description: "First Arg",
Expand Down