Skip to content
This repository has been archived by the owner on Jan 8, 2024. It is now read-only.

Bug: Fix pipeline run complete state machine #4053

Merged
merged 4 commits into from
Oct 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/4053.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
pipelines: Properly mark a pipeline run as complete
```
24 changes: 19 additions & 5 deletions internal/server/boltdbstate/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -1772,11 +1772,25 @@ func (s *State) pipelineComplete(jobId string) error {
if job.State == pb.Job_ERROR {
run.State = pb.PipelineRun_ERROR
} else if job.State == pb.Job_SUCCESS {
// If job Id matches last job queued by pipeline.
// We will have to change this in the future when pipeline steps run in parallel,
// and the last job queued may not be the last job to complete in the pipeline
// TODO:XX figure out how ^
if job.Id == run.Jobs[len(run.Jobs)-1].Id {
// Look at all job ids in a run and check if any are not SUCCESS
runComplete := true
for _, j := range run.Jobs {
if j.Id == job.Id {
briancain marked this conversation as resolved.
Show resolved Hide resolved
continue
}

rj, err := s.JobById(j.Id, nil)
if err != nil {
return err
}

if rj.State != pb.Job_SUCCESS {
runComplete = false
break
}
}

if runComplete {
run.State = pb.PipelineRun_SUCCESS
s.log.Trace("pipeline run is complete", "job", job.Id, "pipeline", job.Pipeline.PipelineId, "run", run.Sequence)
}
Expand Down
11 changes: 7 additions & 4 deletions pkg/server/gen/server.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 6 additions & 3 deletions pkg/server/gen/server.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -9477,16 +9477,19 @@
"description": "The sequence number for this pipeline run."
},
"pipeline": {
"$ref": "#/definitions/hashicorp.waypoint.Ref.Pipeline"
"$ref": "#/definitions/hashicorp.waypoint.Ref.Pipeline",
"description": "The pipeline associated with this run."
},
"jobs": {
"type": "array",
"items": {
"$ref": "#/definitions/hashicorp.waypoint.Ref.Job"
}
},
"description": "The full list of jobs that are associated with this run."
},
"state": {
"$ref": "#/definitions/hashicorp.waypoint.PipelineRun.State"
"$ref": "#/definitions/hashicorp.waypoint.PipelineRun.State",
"title": "The current state of this pipeline run,"
}
}
},
Expand Down
3 changes: 3 additions & 0 deletions pkg/server/proto/server.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4870,10 +4870,13 @@ message PipelineRun {
// The sequence number for this pipeline run.
uint64 sequence = 2;

// The pipeline associated with this run.
Ref.Pipeline pipeline = 3;

// The full list of jobs that are associated with this run.
repeated Ref.Job jobs = 4;

// The current state of this pipeline run,
State state = 5;

enum State {
Expand Down
1 change: 1 addition & 0 deletions pkg/server/singleprocess/service_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,7 @@ func (s *Service) buildStepJobs(
}
}

// Include a list of all associated jobs for this specific run
pipelineRun.Jobs = append(pipelineRun.Jobs, &pb.Ref_Job{Id: job.Id})
stepJobs = append(stepJobs, &pb.QueueJobRequest{
Job: job,
Expand Down