Skip to content
This repository has been archived by the owner on Jan 8, 2024. It is now read-only.

Commit

Permalink
Merge pull request #3506 from hashicorp/f-pipelines
Browse files Browse the repository at this point in the history
feature: Initial Pipelines
  • Loading branch information
briancain authored Jul 29, 2022
2 parents 057d580 + 3f68172 commit 2d06a43
Show file tree
Hide file tree
Showing 150 changed files with 23,692 additions and 13,775 deletions.
5 changes: 5 additions & 0 deletions .changelog/3410.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
```release-note:improvement
cli: Implement `waypoint job get-stream` to allow users to attach to running job
streams and receieve output, or get the output from an existing job stream that
already finished.
```
2 changes: 1 addition & 1 deletion builtin/docker/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ func (p *TaskLauncher) WatchTask(
go func() {
defer close(logsDoneCh)
_, err := stdcopy.StdCopy(outW, errW, logsR)
if err != io.EOF {
if err != nil && err != io.EOF {
log.Warn("error reading container logs", "err", err)
ui.Output("Error reading container logs: %s", err, terminal.WithErrorStyle())
}
Expand Down
32 changes: 28 additions & 4 deletions internal/cli/config_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,44 @@ func (c *ConfigSyncCommand) Run(args []string) int {
}

err := c.DoApp(c.Ctx, func(ctx context.Context, app *clientpkg.App) error {
app.UI.Output("Synchronizing application %q configuration with Waypoint server...",
app.Ref().Application, terminal.WithHeaderStyle())

sg := app.UI.StepGroup()
defer sg.Wait()

step := sg.Add("Synchronizing configuration variables...")
step := sg.Add("Synchronizing configuration variables and pipeline configs...")
defer step.Abort()

_, err := app.ConfigSync(ctx, &pb.Job_ConfigSyncOp{})
jobResult, err := app.ConfigSync(ctx, &pb.Job_ConfigSyncOp{})
if err != nil {
app.UI.Output(clierrors.Humanize(err), terminal.WithErrorStyle())
return ErrSentinel
}

step.Update("Configuration variables synchronized successfully!")
step.Update("Configuration variables synchronized!")
step.Done()

if jobResult.PipelineConfigSync != nil && len(jobResult.PipelineConfigSync.SyncedPipelines) > 0 {
step := sg.Add("Configuration for pipelines synchronized!")
step.Done()

// Extra space
app.UI.Output("")
for name, ref := range jobResult.PipelineConfigSync.SyncedPipelines {
pipelineRef, ok := ref.Ref.(*pb.Ref_Pipeline_Owner)
if !ok {
app.UI.Output("failed to convert pipeline ref", terminal.WithErrorStyle())
return ErrSentinel
}

app.UI.Output("✔ Pipeline %q (%s) synchronized!", name, pipelineRef.Owner.Project, terminal.WithInfoStyle())
}
}

step = sg.Add("Application configuration for %q synchronized successfully with Waypoint server!", app.Ref().Application)
step.Done()

return nil
})
if err != nil {
Expand All @@ -63,7 +87,7 @@ func (c *ConfigSyncCommand) AutocompleteFlags() complete.Flags {
}

func (c *ConfigSyncCommand) Synopsis() string {
return "Synchronize declared variables in waypoint.hcl"
return "Synchronize declared variables and pipeline configs in a waypoint.hcl"
}

func (c *ConfigSyncCommand) Help() string {
Expand Down
30 changes: 14 additions & 16 deletions internal/cli/job_get_stream.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
package cli

import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/hashicorp/waypoint-plugin-sdk/terminal"
"github.com/hashicorp/waypoint/internal/clierrors"
jobstream "github.com/hashicorp/waypoint/internal/jobstream"
"github.com/hashicorp/waypoint/internal/pkg/flag"
pb "github.com/hashicorp/waypoint/pkg/server/gen"
)

type JobGetStreamCommand struct {
Expand All @@ -24,7 +21,6 @@ func (c *JobGetStreamCommand) Run(args []string) int {
); err != nil {
return 1
}
ctx := c.Ctx

var jobId string
if len(c.args) == 0 {
Expand All @@ -34,22 +30,24 @@ func (c *JobGetStreamCommand) Run(args []string) int {
jobId = c.args[0]
}

_, err := c.project.Client().GetJobStream(ctx, &pb.GetJobStreamRequest{
JobId: jobId,
})
if err != nil {
if status.Code(err) == codes.NotFound {
c.ui.Output("Job id not found: %s", clierrors.Humanize(err),
terminal.WithErrorStyle())
return 1
}
sg := c.ui.StepGroup()
defer sg.Wait()

step := sg.Add("Reading job stream (jobId: %s) ...", jobId)
defer step.Abort()

// Ignore the job result for now
_, err := jobstream.Stream(c.Ctx, jobId,
jobstream.WithClient(c.project.Client()),
jobstream.WithUI(c.ui))

if err != nil {
c.ui.Output(clierrors.Humanize(err), terminal.WithErrorStyle())
return 1
}

// TODO(briancain): process and print terminal events like `internal/client/job.go`
c.ui.Output("Job stream is not implemented yet!", terminal.WithWarningStyle())
step.Update("Finished reading job stream")
step.Done()

return 0
}
Expand Down
2 changes: 2 additions & 0 deletions internal/cli/job_inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ func (c *JobInspectCommand) Run(args []string) int {
op = "WatchTask"
case *pb.Job_Init:
op = "Init"
case *pb.Job_PipelineStep:
op = "PipelineStep"
default:
op = "Unknown"
}
Expand Down
2 changes: 2 additions & 0 deletions internal/cli/job_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ func (c *JobListCommand) Run(args []string) int {
op = "WatchTask"
case *pb.Job_Init:
op = "Init"
case *pb.Job_PipelineStep:
op = "PipelineStep"
default:
op = "Unknown"
}
Expand Down
32 changes: 32 additions & 0 deletions internal/cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,28 @@ func Commands(
}, nil
},

"pipeline": func() (cli.Command, error) {
return &helpCommand{
SynopsisText: helpText["pipeline"][0],
HelpText: helpText["pipeline"][1],
}, nil
},
"pipeline list": func() (cli.Command, error) {
return &PipelineListCommand{
baseCommand: baseCommand,
}, nil
},
"pipeline inspect": func() (cli.Command, error) {
return &PipelineInspectCommand{
baseCommand: baseCommand,
}, nil
},
"pipeline run": func() (cli.Command, error) {
return &PipelineRunCommand{
baseCommand: baseCommand,
}, nil
},

"project": func() (cli.Command, error) {
return &helpCommand{
SynopsisText: helpText["project"][0],
Expand Down Expand Up @@ -1049,6 +1071,16 @@ Manage and check the status of jobs in Waypoint.
`,
},

"pipeline": {
"Pipeline management",
`
Pipeline config management.
Pipelines are custom defined Waypoint operations that allow you to customize
your application delivery experience.
`,
},

"project": {
"Project management",
`
Expand Down
152 changes: 152 additions & 0 deletions internal/cli/pipeline_inspect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package cli

import (
"fmt"

"github.com/golang/protobuf/jsonpb"
"github.com/posener/complete"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/hashicorp/waypoint-plugin-sdk/terminal"
"github.com/hashicorp/waypoint/internal/clierrors"
"github.com/hashicorp/waypoint/internal/pkg/flag"
pb "github.com/hashicorp/waypoint/pkg/server/gen"
)

type PipelineInspectCommand struct {
*baseCommand

flagJson bool
flagPipelineId string
}

func (c *PipelineInspectCommand) Run(args []string) int {
// Initialize. If we fail, we just exit since Init handles the UI.
if err := c.Init(
WithArgs(args),
WithFlags(c.Flags()),
); err != nil {
return 1
}

var pipelineId string
if len(c.args) == 0 {
c.ui.Output("Pipeline ID required.\n\n%s", c.Help(), terminal.WithErrorStyle())
return 1
}

pipelineId = c.args[0]

// Pre-calculate our project ref
projectRef := &pb.Ref_Project{Project: c.flagProject}
if c.flagProject == "" {
if c.project != nil {
projectRef = c.project.Ref()
}

if projectRef == nil {
c.ui.Output("You must specify a project with -project or be inside an existing project directory.\n"+c.Help(),
terminal.WithErrorStyle())
return 1
}
}

resp, err := c.project.Client().GetPipeline(c.Ctx, &pb.GetPipelineRequest{
Pipeline: &pb.Ref_Pipeline{
Ref: &pb.Ref_Pipeline_Id{
Id: &pb.Ref_PipelineId{
Id: pipelineId,
},
},
},
})
if err != nil {
if status.Code(err) == codes.NotFound {
c.ui.Output("Pipeline not found: %s", clierrors.Humanize(err),
terminal.WithErrorStyle())
return 1
}

c.ui.Output(clierrors.Humanize(err), terminal.WithErrorStyle())
return 1
}
if resp == nil {
c.ui.Output("The requested pipeline response was empty", terminal.WithWarningStyle())
return 0
}

if c.flagJson {
var m jsonpb.Marshaler
m.Indent = "\t"
str, err := m.MarshalToString(resp)
if err != nil {
c.ui.Output(clierrors.Humanize(err), terminal.WithErrorStyle())
return 1
}

fmt.Println(str)
return 0
}

var owner string
switch po := resp.Pipeline.Owner.(type) {
case *pb.Pipeline_Project:
owner = po.Project.Project
default:
owner = "???"
}

c.ui.Output("Pipeline Configuration", terminal.WithHeaderStyle())
c.ui.NamedValues([]terminal.NamedValue{
{
Name: "ID", Value: resp.Pipeline.Id,
},
{
Name: "Name", Value: resp.Pipeline.Name,
},
{
Name: "Owner", Value: owner,
},
{
Name: "Root Step Name", Value: resp.RootStep,
},
}, terminal.WithInfoStyle())

// TODO(briancain): Use graphviz to build a pipeline graph and display in the terminal?

return 0
}

func (c *PipelineInspectCommand) Flags() *flag.Sets {
return c.flagSet(flagSetOperation, func(set *flag.Sets) {
f := set.NewSet("Command Options")
f.BoolVar(&flag.BoolVar{
Name: "json",
Target: &c.flagJson,
Default: false,
Usage: "Output the Pipeline as json.",
})
})
}

func (c *PipelineInspectCommand) AutocompleteArgs() complete.Predictor {
return complete.PredictNothing
}

func (c *PipelineInspectCommand) AutocompleteFlags() complete.Flags {
return c.Flags().Completions()
}

func (c *PipelineInspectCommand) Synopsis() string {
return "Inspect the full details of a pipeline by id"
}

func (c *PipelineInspectCommand) Help() string {
return formatHelp(`
Usage: waypoint pipeline inspect [options] <pipeline-id>
Inspect the full details of a pipeline by id for a project.
` + c.Flags().Help())
}
Loading

0 comments on commit 2d06a43

Please sign in to comment.