Skip to content
This repository has been archived by the owner on Jan 8, 2024. It is now read-only.

Feature: Reattach to a pipeline run by latest or sequence id #4042

Merged
merged 14 commits into from
Oct 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .changelog/4042.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
```release-note:improvement
cli: Introduce new CLI flag `-reattach` for `waypoint pipeline run` which will stream
an existing pipeline run either by the latest known run or a specific sequence id.
```
129 changes: 108 additions & 21 deletions internal/cli/pipeline_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ import (
type PipelineRunCommand struct {
*baseCommand

flagPipelineId string
flagPipelineId string
flagReattachRun bool
flagRunSequence int
}

func (c *PipelineRunCommand) Run(args []string) int {
Expand Down Expand Up @@ -49,26 +51,44 @@ func (c *PipelineRunCommand) Run(args []string) int {
terminal.WithWarningStyle())
}

if c.flagRunSequence > 0 && !c.flagReattachRun {
c.ui.Output("The '-run' flag was specified, automatically assuming '-reattach'. "+
"CLI will attempt to reattach to pipeline run %d",
c.flagRunSequence,
terminal.WithWarningStyle())

c.flagReattachRun = true
}

err := c.DoApp(c.Ctx, func(ctx context.Context, app *clientpkg.App) error {
// setup pipeline name to be used for UI printing
pipelineIdent := pipelineName
if c.flagPipelineId != "" {
pipelineIdent = c.flagPipelineId
}

app.UI.Output("Running pipeline %q for application %q",
pipelineIdent, app.Ref().Application, terminal.WithHeaderStyle())
if c.flagReattachRun {
app.UI.Output("Streaming pipeline %q run for application %q",
pipelineIdent, app.Ref().Application, terminal.WithHeaderStyle())
} else {
app.UI.Output("Running pipeline %q for application %q",
pipelineIdent, app.Ref().Application, terminal.WithHeaderStyle())
}

sg := app.UI.StepGroup()
defer sg.Wait()

step := sg.Add("Syncing pipeline configs...")
step := sg.Add("")
defer step.Abort()

_, err := app.ConfigSync(ctx, &pb.Job_ConfigSyncOp{})
if err != nil {
app.UI.Output(clierrors.Humanize(err), terminal.WithErrorStyle())
return ErrSentinel
if !c.flagReattachRun {
step.Update("Syncing pipeline configs...")

_, err := app.ConfigSync(ctx, &pb.Job_ConfigSyncOp{})
if err != nil {
app.UI.Output(clierrors.Humanize(err), terminal.WithErrorStyle())
return ErrSentinel
}
}

step.Update("Building pipeline execution request...")
Expand Down Expand Up @@ -103,28 +123,75 @@ func (c *PipelineRunCommand) Run(args []string) int {
}
}

step.Update("Requesting to queue run of pipeline %q...", pipelineIdent)
var (
resp *pb.RunPipelineResponse
respGet *pb.GetPipelineRunResponse
allRunJobs []string
steps int
runSeq uint64

// take pipeline id and queue a RunPipeline with a Job Template.
resp, err := c.project.Client().RunPipeline(c.Ctx, runPipelineReq)
if err != nil {
return err
}
err error
)
if !c.flagReattachRun {
step.Update("Requesting to queue run of pipeline %q...", pipelineIdent)

step.Update("Pipeline %q has started running. Attempting to read job stream sequentially in order", pipelineIdent)
step.Done()
// take pipeline id and queue a RunPipeline with a Job Template.
resp, err = c.project.Client().RunPipeline(c.Ctx, runPipelineReq)
if err != nil {
return err
}

step.Update("Pipeline %q has started running. Attempting to read job stream sequentially in order", pipelineIdent)
step.Done()

steps = len(resp.JobMap)
allRunJobs = resp.AllJobIds
runSeq = resp.Sequence
} else {
getPipelineReq := &pb.GetPipelineRequest{
Pipeline: runPipelineReq.Pipeline,
}

if c.flagRunSequence == 0 {
// take pipeline id and queue a RunPipeline with a Job Template.
respGet, err = c.project.Client().GetLatestPipelineRun(c.Ctx, getPipelineReq)
if err != nil {
return err
}
} else {
// take pipeline id and queue a RunPipeline with a Job Template.
respGet, err = c.project.Client().GetPipelineRun(c.Ctx, &pb.GetPipelineRunRequest{
Pipeline: getPipelineReq.Pipeline,
Sequence: uint64(c.flagRunSequence),
})
if err != nil {
return err
}
}
if respGet == nil {
app.UI.Output("Getting a pipeline run returned a nil response", terminal.WithErrorStyle())
return fmt.Errorf("Response was empty when requesting a pipeline run for pipeline %q", pipelineIdent)
}

step.Update("Attempting to read job stream sequentially in order for run %q", respGet.PipelineRun.Sequence)
step.Done()

steps = len(respGet.PipelineRun.Jobs)
for _, j := range respGet.PipelineRun.Jobs {
allRunJobs = append(allRunJobs, j.Id)
}
runSeq = respGet.PipelineRun.Sequence
}
// Receive job ids from running pipeline, use job client to attach to job stream
// and stream here. First pass can be linear job streaming
step = sg.Add("")
defer step.Abort()

steps := len(resp.JobMap)
step.Update("%d steps detected, run sequence %d", steps, resp.Sequence)
step.Update("%d steps detected, run sequence %d", steps, runSeq)
step.Done()

successful := steps
for _, jobId := range resp.AllJobIds {
for _, jobId := range allRunJobs {
job, err := c.project.Client().GetJob(c.Ctx, &pb.GetJobRequest{
JobId: jobId,
})
Expand All @@ -143,7 +210,9 @@ func (c *PipelineRunCommand) Run(args []string) int {
if job.Workspace != nil {
ws = job.Workspace.Workspace
}
app.UI.Output("Executing Step %q in workspace: %q", resp.JobMap[jobId].Step, ws, terminal.WithHeaderStyle())
//app.UI.Output("Executing Step %q in workspace: %q", resp.JobMap[jobId].Step, ws, terminal.WithHeaderStyle())
stepName := job.Pipeline.Step
app.UI.Output("Executing Step %q in workspace: %q", stepName, ws, terminal.WithHeaderStyle())
app.UI.Output("Reading job stream (jobId: %s)...", jobId, terminal.WithInfoStyle())
app.UI.Output("")

Expand Down Expand Up @@ -193,6 +262,20 @@ func (c *PipelineRunCommand) Flags() *flag.Sets {
Default: "",
Usage: "Run a pipeline by ID.",
})

f.BoolVar(&flag.BoolVar{
Name: "reattach",
Target: &c.flagReattachRun,
Default: false,
Usage: "If set, will replay or reattach to an existing pipeline run. If " +
"'-run' is not specified, will attempt to read the latest run.",
})

f.IntVar(&flag.IntVar{
Name: "run",
Target: &c.flagRunSequence,
Usage: "Replay or attach to a specific pipeline run by sequence number.",
})
})
}

Expand All @@ -212,10 +295,14 @@ func (c *PipelineRunCommand) Help() string {
return formatHelp(`
Usage: waypoint pipeline run [options] <pipeline-name>

Run a pipeline by name. If run outside of a project dir, a '-project' flag is
Run a pipeline by name. If run outside of a project dir, a '-project' flag is
required. Before running a requested pipeline, this command will sync
pipeline configs so it runs the most up to date configuration version for a
pipeline.

If '-reattach' is supplied, the CLI will attempt to reattach to an existing
pipeline run. Defaults to latest, but if '-run' is specified, it will attach
to that specific run by sequence number.

` + c.Flags().Help())
}
30 changes: 30 additions & 0 deletions pkg/server/gen/mocks/waypoint_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions pkg/server/gen/mocks/waypoint_server.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading