Skip to content
This repository has been archived by the owner on Jan 8, 2024. It is now read-only.

Add "WatchTask" operation to watch ODR launch tasks and expose more debug info #3306

Merged
merged 13 commits into from
May 2, 2022
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .changelog/3306.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
```release-note:improvement
core: on-demand runner logs are now captured from the underlying platform
and stored in the job system.
```
16 changes: 16 additions & 0 deletions builtin/aws/ecs/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/waypoint/builtin/aws/utils"
"github.com/oklog/ulid"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/hashicorp/waypoint-plugin-sdk/component"
"github.com/hashicorp/waypoint-plugin-sdk/docs"
Expand All @@ -34,6 +36,11 @@ func (p *TaskLauncher) StopTaskFunc() interface{} {
return p.StopTask
}

// WatchTaskFunc implements component.TaskLauncher by returning the
// WatchTask method as the function to call for watching a launched task.
func (p *TaskLauncher) WatchTaskFunc() interface{} {
	return p.WatchTask
}

// TaskLauncherConfig is the configuration structure for the task plugin. At
// this time all these are simply copied from what the Waypoint Server
// installation is using, with the only exception being the TaskRoleName.
Expand Down Expand Up @@ -213,6 +220,15 @@ func (p *TaskLauncher) StopTask(
return nil
}

// WatchTask implements TaskLauncher. This plugin does not implement
// watching: every call returns a nil result together with a gRPC
// Unimplemented status error.
func (p *TaskLauncher) WatchTask(
	ctx context.Context,
	log hclog.Logger,
	ti *TaskInfo,
) (*component.TaskResult, error) {
	notImplemented := status.Errorf(codes.Unimplemented, "WatchTask not implemented")
	return nil, notImplemented
}

// StartTask runs an ECS Task to perform the requested job.
func (p *TaskLauncher) StartTask(
ctx context.Context,
Expand Down
88 changes: 88 additions & 0 deletions builtin/docker/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"context"
"crypto/rand"
"fmt"
"io"
"strings"
"time"

"github.com/docker/distribution/reference"
"github.com/docker/docker/api/types"
Expand All @@ -14,6 +16,7 @@ import (
"github.com/docker/docker/api/types/network"
"github.com/docker/docker/client"
"github.com/docker/docker/pkg/jsonmessage"
"github.com/docker/docker/pkg/stdcopy"
goUnits "github.com/docker/go-units"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/waypoint-plugin-sdk/component"
Expand All @@ -22,6 +25,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/hashicorp/waypoint-plugin-sdk/terminal"
wpdockerclient "github.com/hashicorp/waypoint/builtin/docker/client"
)

Expand All @@ -40,6 +44,11 @@ func (b *TaskLauncher) StopTaskFunc() interface{} {
return b.StopTask
}

// WatchTaskFunc implements component.TaskLauncher by returning the
// WatchTask method as the function to call for watching a launched task.
func (b *TaskLauncher) WatchTaskFunc() interface{} {
	return b.WatchTask
}

type TaskResources struct {
// How many CPU shares to allocate to each task
CpuShares int64 `hcl:"cpu,optional"`
Expand Down Expand Up @@ -425,3 +434,82 @@ func (b *TaskLauncher) StartTask(

return ti, nil
}

// WatchTask implements TaskLauncher. It copies the stdout/stderr log stream
// of the container identified by ti.Id into the UI's output writers and
// blocks until Docker reports that the container is no longer running.
//
// On success the returned TaskResult carries the container's exit code. An
// error is returned when the Docker client cannot be created, when the log
// stream or the UI writers cannot be opened, or when ContainerWait reports
// an error instead of an exit status.
func (b *TaskLauncher) WatchTask(
	ctx context.Context,
	log hclog.Logger,
	ti *TaskInfo,
	ui terminal.UI,
) (*component.TaskResult, error) {
	cli, err := wpdockerclient.NewClientWithOpts(client.FromEnv)
	if err != nil {
		return nil, status.Errorf(codes.FailedPrecondition, "unable to create Docker client: %s", err)
	}
	cli.NegotiateAPIVersion(ctx)

	// Accumulate our result on this
	var result component.TaskResult

	// Grab the logs reader
	logsR, err := cli.ContainerLogs(ctx, ti.Id, types.ContainerLogsOptions{
		ShowStdout: true,
		ShowStderr: true,
		Follow:     true,
	})
	if err != nil {
		return nil, err
	}
	// The log stream is an io.ReadCloser that we own. Closing it on every
	// return path avoids leaking it, including when we give up waiting for
	// the copy goroutine below.
	defer logsR.Close()

	// Get our writers for the UI
	outW, errW, err := ui.OutputWriters()
	if err != nil {
		return nil, err
	}

	// Start a goroutine to copy our logs. The goroutine will exit on its own
	// when EOF or when this RPC ends because the UI will EOF.
	logsDoneCh := make(chan struct{})
	go func() {
		defer close(logsDoneCh)

		// StdCopy reports a cleanly finished stream with a nil error, so a
		// nil error must not be treated as a failure. io.EOF is tolerated
		// as well in case a writer surfaces it.
		if _, err := stdcopy.StdCopy(outW, errW, logsR); err != nil && err != io.EOF {
			log.Warn("error reading container logs", "err", err)
			ui.Output("Error reading container logs: %s", err, terminal.WithErrorStyle())
		}
	}()

	// Wait for the container to exit
	waitCh, errCh := cli.ContainerWait(ctx, ti.Id, container.WaitConditionNotRunning)
	select {
	case err := <-errCh:
		// Error talking to Docker daemon.
		return nil, err

	case info := <-waitCh:
		result.ExitCode = int(info.StatusCode)

		// If we got an error, it is from the process (not Docker)
		if waitErr := info.Error; waitErr != nil {
			// hclog takes key/value pairs rather than printf verbs.
			log.Warn("error from container process", "err", waitErr.Message)

			// We also write it to the UI so that it is more easily
			// seen in UIs.
			ui.Output("Error reported by container: %s", waitErr.Message, terminal.WithErrorStyle())
		}

		// Wait for our logs to end
		log.Debug("container exited, waiting for logs to finish", "code", info.StatusCode)
		select {
		case <-logsDoneCh:
		case <-time.After(1 * time.Minute):
			// They should never continue for 1 minute after the container
			// exited. To avoid hanging a runner process, lets warn and exit.
			log.Error("container logs never exited! please look into this")
		}
	}

	return &result, nil
}

// Compile-time assertion that *TaskLauncher satisfies component.TaskLauncher.
var _ component.TaskLauncher = (*TaskLauncher)(nil)
16 changes: 16 additions & 0 deletions builtin/k8s/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ func (p *TaskLauncher) StopTaskFunc() interface{} {
return p.StopTask
}

// WatchTaskFunc implements component.TaskLauncher by returning the
// WatchTask method as the function to call for watching a launched task.
func (p *TaskLauncher) WatchTaskFunc() interface{} {
	return p.WatchTask
}

// TaskLauncherConfig is the configuration structure for the task plugin.
type TaskLauncherConfig struct {
// Context specifies the kube context to use.
Expand Down Expand Up @@ -326,3 +331,14 @@ func (p *TaskLauncher) StartTask(
Id: name,
}, nil
}

// WatchTask implements TaskLauncher. This plugin does not implement
// watching: every call returns a nil result together with a gRPC
// Unimplemented status error.
func (p *TaskLauncher) WatchTask(
	ctx context.Context,
	log hclog.Logger,
	ti *TaskInfo,
) (*component.TaskResult, error) {
	notImplemented := status.Errorf(codes.Unimplemented, "WatchTask not implemented")
	return nil, notImplemented
}

// Compile-time assertion that *TaskLauncher satisfies component.TaskLauncher.
var _ component.TaskLauncher = (*TaskLauncher)(nil)
18 changes: 18 additions & 0 deletions builtin/nomad/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/api"
"github.com/oklog/ulid"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/hashicorp/waypoint-plugin-sdk/component"
"github.com/hashicorp/waypoint-plugin-sdk/docs"
Expand All @@ -31,6 +33,11 @@ func (p *TaskLauncher) StopTaskFunc() interface{} {
return p.StopTask
}

// WatchTaskFunc implements component.TaskLauncher by returning the
// WatchTask method as the function to call for watching a launched task.
func (p *TaskLauncher) WatchTaskFunc() interface{} {
	return p.WatchTask
}

const (
// Build plugins like pack require a decent amount of memory to build
// an artifact. This default may seem large, but if we used the default
Expand Down Expand Up @@ -263,3 +270,14 @@ func (p *TaskLauncher) getNomadClient() (*api.Client, error) {
}
return client, nil
}

// WatchTask implements TaskLauncher. This plugin does not implement
// watching: every call returns a nil result together with a gRPC
// Unimplemented status error.
func (p *TaskLauncher) WatchTask(
	ctx context.Context,
	log hclog.Logger,
	ti *TaskInfo,
) (*component.TaskResult, error) {
	notImplemented := status.Errorf(codes.Unimplemented, "WatchTask not implemented")
	return nil, notImplemented
}

// Compile-time assertion that *TaskLauncher satisfies component.TaskLauncher.
var _ component.TaskLauncher = (*TaskLauncher)(nil)
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ require (
github.com/evanphx/grpc-gateway v1.16.1-0.20220211183845-48e5be386c15
github.com/hashicorp/go-grpc-net-conn v0.0.0-20220321172933-7ab38178cb90
github.com/hashicorp/opaqueany v0.0.0-20220321170339-a5c6ff5bb0ec
github.com/hashicorp/waypoint-plugin-sdk v0.0.0-20220321195238-4dcd10d01b5f
github.com/hashicorp/waypoint-plugin-sdk v0.0.0-20220426185440-53cedbd83f2e
github.com/jinzhu/now v1.1.1 // indirect
github.com/mattn/go-isatty v0.0.13 // indirect
github.com/rs/cors v1.7.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1136,8 +1136,8 @@ github.com/hashicorp/vault/sdk v0.1.14-0.20201202172114-ee5ebeb30fef h1:YKouRHFf
github.com/hashicorp/vault/sdk v0.1.14-0.20201202172114-ee5ebeb30fef/go.mod h1:cAGI4nVnEfAyMeqt9oB+Mase8DNn3qA/LDNHURiwssY=
github.com/hashicorp/waypoint-hzn v0.0.0-20201008221232-97cd4d9120b9 h1:i9hzlv2SpmaNcQ8ZLGn01fp2Gqyejj0juVs7rYDgecE=
github.com/hashicorp/waypoint-hzn v0.0.0-20201008221232-97cd4d9120b9/go.mod h1:ObgQSWSX9rsNofh16kctm6XxLW2QW1Ay6/9ris6T6DU=
github.com/hashicorp/waypoint-plugin-sdk v0.0.0-20220321195238-4dcd10d01b5f h1:DT7Ng4hnzuMCGNBBoWe5ODo1KSAC9uYv6GBsOAvLyzs=
github.com/hashicorp/waypoint-plugin-sdk v0.0.0-20220321195238-4dcd10d01b5f/go.mod h1:rogx91d8Lpgj/LC5yHtD7fea1OikuGKiut6QW75wNOA=
github.com/hashicorp/waypoint-plugin-sdk v0.0.0-20220426185440-53cedbd83f2e h1:HMoxI6OMO16akeEQ2Po+S/knBFUESG2WFp03XWY+z40=
github.com/hashicorp/waypoint-plugin-sdk v0.0.0-20220426185440-53cedbd83f2e/go.mod h1:rogx91d8Lpgj/LC5yHtD7fea1OikuGKiut6QW75wNOA=
github.com/hashicorp/yamux v0.0.0-20180604194846-3520598351bb/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM=
github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM=
github.com/hashicorp/yamux v0.0.0-20190923154419-df201c70410d/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM=
Expand Down
2 changes: 2 additions & 0 deletions internal/cli/job_inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ func (c *JobInspectCommand) Run(args []string) int {
op = "StartTask"
case *pb.Job_StopTask:
op = "StopTask"
case *pb.Job_WatchTask:
op = "WatchTask"
case *pb.Job_Init:
op = "Init"
default:
Expand Down
2 changes: 2 additions & 0 deletions internal/cli/job_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ func (c *JobListCommand) Run(args []string) int {
op = "StartTask"
case *pb.Job_StopTask:
op = "StopTask"
case *pb.Job_WatchTask:
op = "WatchTask"
case *pb.Job_Init:
op = "Init"
default:
Expand Down
2 changes: 2 additions & 0 deletions internal/runner/accept.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,8 @@ func (r *Runner) prepareAndExecuteJob(
return r.executeStartTaskOp(ctx, log, ui, job)
case *pb.Job_StopTask:
return r.executeStopTaskOp(ctx, log, ui, job)
case *pb.Job_WatchTask:
return r.executeWatchTaskOp(ctx, log, ui, job)
}

// We need to get our data source next prior to executing.
Expand Down
110 changes: 110 additions & 0 deletions internal/runner/operation_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,3 +180,113 @@ func (r *Runner) executeStopTaskOp(

return &pb.Job_Result{}, nil
}

func (r *Runner) executeWatchTaskOp(
ctx context.Context,
log hclog.Logger,
ui terminal.UI,
job *pb.Job,
) (*pb.Job_Result, error) {
op, ok := job.Operation.(*pb.Job_WatchTask)
if !ok {
// this shouldn't happen since the call to this function is gated
// on the above type match.
panic("operation not expected type")
}

// Look up the state from a start job.
startId := op.WatchTask.StartJob.Id
log = log.With("start-job-id", startId)
log.Debug("looking up start job to get state")
job, err := r.client.GetJob(ctx, &pb.GetJobRequest{
JobId: startId,
})

// If the job is not found, this is not an error. This means the
// start job never ran for whatever reason and we should not watch
// anything.
if status.Code(err) == codes.NotFound {
log.Warn("start job not found, skipping watch")
return nil, nil
} else if err != nil {
return nil, errors.Wrapf(err, "failed to look up job with id %s", startId)
}

// If the job is not in a terminal state, then its an error.
if job.State != pb.Job_SUCCESS && job.State != pb.Job_ERROR {
return nil, status.Errorf(codes.FailedPrecondition,
"cannot stop task when the start job is not terminal: %q",
mitchellh marked this conversation as resolved.
Show resolved Hide resolved
job.State)
}

// If the job is not a start task launch operation, then error.
startOp, ok := job.Operation.(*pb.Job_StartTask)
if !ok {
return nil, status.Errorf(codes.FailedPrecondition,
"start job ID must reference a job with a StartTask op, got %T",
job.Operation)
}

// If we have no result, do nothing, assume start failed.
if job.Result == nil {
log.Warn("start job has no result, ignoring")
return nil, nil
}

result := job.Result.StartTask
if result == nil || result.State == nil {
log.Warn("start job has no state, ignoring")
return nil, nil
}

// The state we use is the resulting state
state := result.State

// At this point, state should not be nil. There are cases earlier
// where we may exit early with a nil state, but we do not allow a
// nil state here.
if state == nil {
return nil, status.Errorf(codes.FailedPrecondition,
"nil start task state provided")
}

// We copy the launch params from the start task because we should
// be using the same task launcher plugin.
params := startOp.StartTask.Params

// Launch our plugin
pi, c, err := plugin.Open(ctx, log, &plugin.PluginRequest{
Config: plugin.Config{
Name: params.PluginType,
},
Dir: "/tmp",
ConfigData: params.HclConfig,
JsonConfig: params.HclFormat == pb.Hcl_JSON,
Type: component.TaskLauncherType,
})
if err != nil {
return nil, err
}
defer pi.Close()

watch := c.(component.TaskLauncher).WatchTaskFunc()
output, err := pi.Invoke(ctx, log, watch,
plugin.ArgNamedAny("state", state),
ui,
)
if err != nil {
return nil, err
}

taskResult, ok := output.(*component.TaskResult)
if !ok {
return nil, status.Errorf(codes.FailedPrecondition,
"plugin should've returned TaskResult, got %T", output)
}

return &pb.Job_Result{
WatchTask: &pb.Job_WatchTaskResult{
ExitCode: int32(taskResult.ExitCode),
},
}, nil
}
Loading