diff --git a/cmd/tink-worker/cmd/root.go b/cmd/tink-worker/cmd/root.go index dc4151e6a..ab3ac8667 100644 --- a/cmd/tink-worker/cmd/root.go +++ b/cmd/tink-worker/cmd/root.go @@ -3,16 +3,18 @@ package cmd import ( "context" "fmt" + "os" "strings" "time" + dockercli "github.com/docker/docker/client" "github.com/packethost/pkg/log" "github.com/pkg/errors" "github.com/spf13/cobra" "github.com/spf13/pflag" "github.com/spf13/viper" "github.com/tinkerbell/tink/client" - "github.com/tinkerbell/tink/cmd/tink-worker/internal" + "github.com/tinkerbell/tink/cmd/tink-worker/worker" pb "github.com/tinkerbell/tink/protos/workflow" "google.golang.org/grpc" ) @@ -68,10 +70,33 @@ func NewRootCommand(version string, logger log.Logger) *cobra.Command { } rClient := pb.NewWorkflowServiceClient(conn) - regConn := internal.NewRegistryConnDetails(registry, user, pwd, logger) - worker := internal.NewWorker(rClient, regConn, logger, registry, retries, retryInterval, maxFileSize) - - err = worker.ProcessWorkflowActions(ctx, workerID, captureActionLogs) + dockerClient, err := dockercli.NewClientWithOpts(dockercli.FromEnv, dockercli.WithAPIVersionNegotiation()) + if err != nil { + return err + } + containerManager := worker.NewContainerManager( + logger, + dockerClient, + worker.RegistryConnDetails{ + Registry: registry, + Username: user, + Password: pwd, + }) + + logCapturer := worker.NewDockerLogCapturer(dockerClient, logger, os.Stdout) + + w := worker.NewWorker( + workerID, + rClient, + containerManager, + logCapturer, + logger, + worker.WithMaxFileSize(maxFileSize), + worker.WithRetries(retryInterval, retries), + worker.WithLogCapture(captureActionLogs), + worker.WithPrivileged(true)) + + err = w.ProcessWorkflowActions(ctx) if err != nil { return errors.Wrap(err, "worker Finished with error") } diff --git a/cmd/tink-worker/internal/action.go b/cmd/tink-worker/internal/action.go deleted file mode 100644 index 546930b4d..000000000 --- a/cmd/tink-worker/internal/action.go +++ /dev/null @@ -1,116 +0,0 @@ -package internal - -import ( - "context" - "path" - "path/filepath" - - "github.com/docker/docker/api/types" - "github.com/docker/docker/api/types/container" - "github.com/docker/docker/client" - "github.com/packethost/pkg/log" - "github.com/pkg/errors" - pb "github.com/tinkerbell/tink/protos/workflow" -) - -const ( - errFailedToWait = "failed to wait for completion of action" - errFailedToRunCmd = "failed to run on-timeout command" - - infoWaitFinished = "wait finished for failed or timeout container" -) - -func (w *Worker) createContainer(ctx context.Context, cmd []string, wfID string, action *pb.WorkflowAction, captureLogs bool) (string, error) { - registry := w.registry - config := &container.Config{ - Image: path.Join(registry, action.GetImage()), - AttachStdout: true, - AttachStderr: true, - Cmd: cmd, - Tty: true, - Env: action.GetEnvironment(), - } - if !captureLogs { - config.AttachStdout = false - config.AttachStderr = false - config.Tty = false - } - - wfDir := filepath.Join(dataDir, wfID) - hostConfig := &container.HostConfig{ - Privileged: true, - Binds: []string{wfDir + ":/workflow"}, - } - - if pidConfig := action.GetPid(); pidConfig != "" { - w.logger.With("pid", pidConfig).Info("creating container") - hostConfig.PidMode = container.PidMode(pidConfig) - } - - hostConfig.Binds = append(hostConfig.Binds, action.GetVolumes()...) - w.logger.With("command", cmd).Info("creating container") - resp, err := w.registryClient.ContainerCreate(ctx, config, hostConfig, nil, nil, action.GetName()) - if err != nil { - return "", errors.Wrap(err, "DOCKER CREATE") - } - return resp.ID, nil -} - -func startContainer(ctx context.Context, l log.Logger, cli *client.Client, id string) error { - l.With("containerID", id).Debug("starting container") - return errors.Wrap(cli.ContainerStart(ctx, id, types.ContainerStartOptions{}), "DOCKER START") -} - -func waitContainer(ctx context.Context, cli *client.Client, id string) (pb.State, error) { - // Inspect whether the container is in running state - if _, err := cli.ContainerInspect(ctx, id); err != nil { - return pb.State_STATE_FAILED, nil // nolint:nilerr // error is not nil, but it returns nil - } - - // send API call to wait for the container completion - wait, errC := cli.ContainerWait(ctx, id, container.WaitConditionNotRunning) - - select { - case status := <-wait: - if status.StatusCode == 0 { - return pb.State_STATE_SUCCESS, nil - } - return pb.State_STATE_FAILED, nil - case err := <-errC: - return pb.State_STATE_FAILED, err - case <-ctx.Done(): - return pb.State_STATE_TIMEOUT, ctx.Err() - } -} - -func waitFailedContainer(ctx context.Context, l log.Logger, cli *client.Client, id string, failedActionStatus chan pb.State) { - // send API call to wait for the container completion - wait, errC := cli.ContainerWait(ctx, id, container.WaitConditionNotRunning) - - select { - case status := <-wait: - if status.StatusCode == 0 { - failedActionStatus <- pb.State_STATE_SUCCESS - } - failedActionStatus <- pb.State_STATE_FAILED - case err := <-errC: - l.Error(err) - failedActionStatus <- pb.State_STATE_FAILED - case <-ctx.Done(): - l.Error(ctx.Err()) - failedActionStatus <- pb.State_STATE_TIMEOUT - } -} - -func removeContainer(ctx context.Context, l log.Logger, cli *client.Client, id string) error { - // create options for removing container - opts := types.ContainerRemoveOptions{ - Force: true, - RemoveLinks: false, - RemoveVolumes: true, - } - l.With("containerID", id).Info("removing container") - - // send API call to remove the container - return cli.ContainerRemove(ctx, id, opts) -} diff --git a/cmd/tink-worker/internal/registry.go b/cmd/tink-worker/internal/registry.go deleted file mode 100644 index a85ddc81f..000000000 --- a/cmd/tink-worker/internal/registry.go +++ /dev/null @@ -1,94 +0,0 @@ -package internal - -import ( - "context" - "encoding/base64" - "encoding/json" - "io" - - "github.com/docker/docker/api/types" - "github.com/docker/docker/client" - "github.com/packethost/pkg/log" - "github.com/pkg/errors" -) - -// RegistryConnDetails are the connection details for accessing a Docker -// registry and logging activities. -type RegistryConnDetails struct { - registry, - user, - pwd string - logger log.Logger -} - -// ImagePullStatus is the status of the downloaded Image chunk. -type ImagePullStatus struct { - Status string `json:"status"` - Error string `json:"error"` - Progress string `json:"progress"` - ProgressDetail struct { - Current int `json:"current"` - Total int `json:"total"` - } `json:"progressDetail"` -} - -// NewRegistryConnDetails creates a new RegistryConnDetails. -func NewRegistryConnDetails(registry, user, pwd string, logger log.Logger) *RegistryConnDetails { - return &RegistryConnDetails{ - registry: registry, - user: user, - pwd: pwd, - logger: logger, - } -} - -// NewClient uses the RegistryConnDetails to create a new Docker Client. -func (r *RegistryConnDetails) NewClient() (*client.Client, error) { - if r.registry == "" { - return nil, errors.New("required DOCKER_REGISTRY") - } - c, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) - if err != nil { - return nil, errors.Wrap(err, "DOCKER CLIENT") - } - - return c, nil -} - -type imagePuller interface { - ImagePull(context.Context, string, types.ImagePullOptions) (io.ReadCloser, error) -} - -// pullImage outputs to stdout the contents of the requested image (relative to the registry). -func (r *RegistryConnDetails) pullImage(ctx context.Context, cli imagePuller, image string) error { - authConfig := types.AuthConfig{ - Username: r.user, - Password: r.pwd, - ServerAddress: r.registry, - } - encodedJSON, err := json.Marshal(authConfig) - if err != nil { - return errors.Wrap(err, "DOCKER AUTH") - } - authStr := base64.URLEncoding.EncodeToString(encodedJSON) - - out, err := cli.ImagePull(ctx, r.registry+"/"+image, types.ImagePullOptions{RegistryAuth: authStr}) - if err != nil { - return errors.Wrap(err, "DOCKER PULL") - } - defer out.Close() - fd := json.NewDecoder(out) - var status *ImagePullStatus - for { - if err := fd.Decode(&status); err != nil { - if errors.Is(err, io.EOF) { - break - } - return errors.Wrap(err, "DOCKER PULL") - } - if status.Error != "" { - return errors.Wrap(errors.New(status.Error), "DOCKER PULL") - } - } - return nil -} diff --git a/cmd/tink-worker/internal/registry_test.go b/cmd/tink-worker/internal/registry_test.go deleted file mode 100644 index 063facdc9..000000000 --- a/cmd/tink-worker/internal/registry_test.go +++ /dev/null @@ -1,78 +0,0 @@ -package internal - -import ( - "context" - "errors" - "io" - "io/ioutil" - "strings" - "testing" - - "github.com/docker/docker/api/types" - "github.com/packethost/pkg/log" - "github.com/stretchr/testify/assert" -) - -func setupTestLogger(t *testing.T) log.Logger { - t.Helper() - - service := "github.com/tinkerbell/tink" - logger, err := log.Init(service) - if err != nil { - t.Fatal(err) - } - return logger -} - -type imagePullerMock struct { - stringReadCloser io.ReadCloser - imagePullErr error -} - -func (d *imagePullerMock) ImagePull(_ context.Context, _ string, _ types.ImagePullOptions) (io.ReadCloser, error) { - return d.stringReadCloser, d.imagePullErr -} - -func TestPullImageAnyFailure(t *testing.T) { - for _, test := range []struct { - testName string - testString string - testImagePullErr error - testErr error - }{ - { - testName: "success", - testString: "{\"status\": \"hello\",\"error\":\"\"}{\"status\":\"world\",\"error\":\"\"}", - testImagePullErr: nil, - testErr: nil, - }, - { - testName: "fail", - testString: "{\"error\": \"\"}", - testImagePullErr: errors.New("Tested, failure of the image pull"), - testErr: errors.New("DOCKER PULL: Tested, failure of the image pull"), - }, - { - testName: "fail_partial", - testString: "{\"status\": \"hello\",\"error\":\"\"}{\"status\":\"world\",\"error\":\"Tested, failure of No space left on device\"}", - testImagePullErr: nil, - testErr: errors.New("DOCKER PULL: Tested, failure of No space left on device"), - }, - } { - t.Run(test.testName, func(t *testing.T) { - ctx := context.Background() - rcon := NewRegistryConnDetails("test", "testUser", "testPwd", setupTestLogger(t)) - stringReader := strings.NewReader(test.testString) - cli := &imagePullerMock{ - stringReadCloser: ioutil.NopCloser(stringReader), - imagePullErr: test.testImagePullErr, - } - err := rcon.pullImage(ctx, cli, test.testName) - if test.testErr != nil { - assert.Equal(t, err.Error(), test.testErr.Error()) - } else { - assert.Equal(t, err, test.testErr) - } - }) - } -} diff --git a/cmd/tink-worker/worker/container_manager.go b/cmd/tink-worker/worker/container_manager.go new file mode 100644 index 000000000..34191218a --- /dev/null +++ b/cmd/tink-worker/worker/container_manager.go @@ -0,0 +1,143 @@ +package worker + +import ( + "context" + "path" + "path/filepath" + + "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/container" + "github.com/docker/docker/client" + "github.com/packethost/pkg/log" + "github.com/pkg/errors" + pb "github.com/tinkerbell/tink/protos/workflow" +) + +const ( + ErrFailedToWait = "failed to wait for completion of action" + ErrFailedToRunCmd = "failed to run on-timeout command" + + InfoWaitFinished = "wait finished for failed or timeout container" +) + +// DockerClient is a subset of the interfaces implemented by docker's client.Client. +type DockerClient interface { + client.ImageAPIClient + client.ContainerAPIClient +} + +type containerManager struct { + logger log.Logger + cli DockerClient + registryDetails RegistryConnDetails +} + +// getLogger is a helper function to get logging out of a context, or use the default logger. +func (m *containerManager) getLogger(ctx context.Context) *log.Logger { + loggerIface := ctx.Value(loggingContextKey) + if loggerIface == nil { + return &m.logger + } + return loggerIface.(*log.Logger) +} + +// NewContainerManager returns a new container manager. +func NewContainerManager(logger log.Logger, cli DockerClient, registryDetails RegistryConnDetails) ContainerManager { + return &containerManager{logger, cli, registryDetails} +} + +func (m *containerManager) CreateContainer(ctx context.Context, cmd []string, wfID string, action *pb.WorkflowAction, captureLogs, privileged bool) (string, error) { + l := m.getLogger(ctx) + config := &container.Config{ + Image: path.Join(m.registryDetails.Registry, action.GetImage()), + AttachStdout: true, + AttachStderr: true, + Cmd: cmd, + Tty: true, + Env: action.GetEnvironment(), + } + if !captureLogs { + config.AttachStdout = false + config.AttachStderr = false + config.Tty = false + } + + wfDir := filepath.Join(defaultDataDir, wfID) + hostConfig := &container.HostConfig{ + Privileged: privileged, + Binds: []string{wfDir + ":/workflow"}, + } + + if pidConfig := action.GetPid(); pidConfig != "" { + hostConfig.PidMode = container.PidMode(pidConfig) + } + + hostConfig.Binds = append(hostConfig.Binds, action.GetVolumes()...) + l.With("command", cmd).Info("creating container") + resp, err := m.cli.ContainerCreate(ctx, config, hostConfig, nil, nil, action.GetName()) + if err != nil { + return "", errors.Wrap(err, "DOCKER CREATE") + } + return resp.ID, nil +} + +func (m *containerManager) StartContainer(ctx context.Context, id string) error { + m.getLogger(ctx).With("containerID", id).Debug("starting container") + return errors.Wrap(m.cli.ContainerStart(ctx, id, types.ContainerStartOptions{}), "DOCKER START") +} + +func (m *containerManager) WaitForContainer(ctx context.Context, id string) (pb.State, error) { + // Inspect whether the container is in running state + if _, err := m.cli.ContainerInspect(ctx, id); err != nil { + return pb.State_STATE_FAILED, nil // nolint:nilerr // error is not nil, but it returns nil + } + + // send API call to wait for the container completion + wait, errC := m.cli.ContainerWait(ctx, id, container.WaitConditionNotRunning) + + select { + case status := <-wait: + if status.StatusCode == 0 { + return pb.State_STATE_SUCCESS, nil + } + return pb.State_STATE_FAILED, nil + case err := <-errC: + return pb.State_STATE_FAILED, err + case <-ctx.Done(): + return pb.State_STATE_TIMEOUT, ctx.Err() + } +} + +func (m *containerManager) WaitForFailedContainer(ctx context.Context, id string, failedActionStatus chan pb.State) { + l := m.getLogger(ctx) + // send API call to wait for the container completion + wait, errC := m.cli.ContainerWait(ctx, id, container.WaitConditionNotRunning) + + select { + case status := <-wait: + if status.StatusCode == 0 { + failedActionStatus <- pb.State_STATE_SUCCESS + return + } + failedActionStatus <- pb.State_STATE_FAILED + case err := <-errC: + l.Error(err) + failedActionStatus <- pb.State_STATE_FAILED + case <-ctx.Done(): + l.Error(ctx.Err()) + failedActionStatus <- pb.State_STATE_TIMEOUT + } +} + +func (m *containerManager) RemoveContainer(ctx context.Context, id string) error { + // create options for removing container + opts := types.ContainerRemoveOptions{ + Force: true, + RemoveLinks: false, + RemoveVolumes: true, + } + m.getLogger(ctx).With("containerID", id).Info("removing container") + + // send API call to remove the container + return errors.Wrap(m.cli.ContainerRemove(ctx, id, opts), "DOCKER STOP") +} diff --git a/cmd/tink-worker/worker/container_manager_test.go b/cmd/tink-worker/worker/container_manager_test.go new file mode 100644 index 000000000..4cba909e4 --- /dev/null +++ b/cmd/tink-worker/worker/container_manager_test.go @@ -0,0 +1,380 @@ +package worker + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/docker/docker/api/types" + containertypes "github.com/docker/docker/api/types/container" + networktypes "github.com/docker/docker/api/types/network" + "github.com/docker/docker/client" + specs "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/packethost/pkg/log" + pb "github.com/tinkerbell/tink/protos/workflow" +) + +type fakeDockerClient struct { + client.ImageAPIClient + client.ContainerAPIClient + + imagePullContent string + containerID string + delay time.Duration + statusCode int + err error + waitErr error +} + +func newFakeDockerClient(containerID, imagePullContent string, delay time.Duration, statusCode int, err, waitErr error) *fakeDockerClient { + return &fakeDockerClient{ + containerID: containerID, + imagePullContent: imagePullContent, + delay: delay, + statusCode: statusCode, + err: err, + waitErr: waitErr, + } +} + +func (c *fakeDockerClient) ContainerCreate( + context.Context, *containertypes.Config, *containertypes.HostConfig, *networktypes.NetworkingConfig, *specs.Platform, string) (containertypes.ContainerCreateCreatedBody, error) { + if c.err != nil { + return containertypes.ContainerCreateCreatedBody{}, c.err + } + + return containertypes.ContainerCreateCreatedBody{ + ID: c.containerID, + }, nil +} + +func (c *fakeDockerClient) ContainerStart(context.Context, string, types.ContainerStartOptions) error { + if c.err != nil { + return c.err + } + return nil +} + +func (c *fakeDockerClient) ContainerInspect(context.Context, string) (types.ContainerJSON, error) { + if c.err != nil { + return types.ContainerJSON{}, c.err + } + return types.ContainerJSON{}, nil +} + +func (c *fakeDockerClient) ContainerWait(context.Context, string, containertypes.WaitCondition) (<-chan containertypes.ContainerWaitOKBody, <-chan error) { + respChan := make(chan containertypes.ContainerWaitOKBody) + errChan := make(chan error) + go func(e error) { + time.Sleep(c.delay) + if e != nil { + errChan <- e + return + } + respChan <- containertypes.ContainerWaitOKBody{ + StatusCode: int64(c.statusCode), + } + }(c.waitErr) + return respChan, errChan +} + +func (c *fakeDockerClient) ContainerRemove(context.Context, string, types.ContainerRemoveOptions) error { + if c.err != nil { + return c.err + } + return nil +} + +func TestContainerManagerCreate(t *testing.T) { + cases := []struct { + name string + workflowName string + action *pb.WorkflowAction + containerID string + registry string + clientErr error + wantErr error + }{ + { + name: "Happy Path", + workflowName: "saveTheRebelBase", + action: &pb.WorkflowAction{ + TaskName: "UseTheForce", + Name: "blow up the death star", + Image: "yav.in/4/forestmoon", + Environment: []string{"MODE=insane", ""}, + Volumes: []string{"/tie-fighter/darth_vader:/behind_you"}, + Pid: "1", + }, + containerID: "nomedalforchewie", + registry: "rebelba.se", + }, + { + name: "create failure", + workflowName: "saveTheRebelBase", + action: &pb.WorkflowAction{ + TaskName: "UseTheForce", + Name: "blow up the death star", + Image: "yav.in/4/forestmoon", + Environment: []string{"MODE=insane", ""}, + Volumes: []string{"/tie-fighter/darth_vader:/behind_you"}, + Pid: "1", + }, + containerID: "nomedalforchewie", + registry: "rebelba.se", + clientErr: errors.New("You missed the shot"), + wantErr: errors.New("DOCKER CREATE: You missed the shot"), + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + logger := log.Test(t, "github.com/tinkerbell/tink") + mgr := NewContainerManager(logger, newFakeDockerClient(tc.containerID, "", 0, 0, tc.clientErr, nil), RegistryConnDetails{Registry: tc.registry}) + + ctx := context.Background() + got, gotErr := mgr.CreateContainer(ctx, []string{}, tc.workflowName, tc.action, false, true) + if gotErr != nil { + if tc.wantErr == nil { + t.Errorf(`Got unexpected error: %v"`, gotErr) + } else if gotErr.Error() != tc.wantErr.Error() { + t.Errorf(`Got unexpected error: got "%v" wanted "%v"`, gotErr, tc.wantErr) + } + return + } + if gotErr == nil && tc.wantErr != nil { + t.Errorf("Missing expected error: %v", tc.wantErr) + return + } + + if got != tc.containerID { + t.Errorf("Unexpected response: got '%s', expected '%s'", got, tc.containerID) + } + }) + } +} + +func TestContainerManagerStart(t *testing.T) { + cases := []struct { + name string + containerID string + clientErr error + wantErr error + }{ + { + name: "Happy Path", + containerID: "nomedalforchewie", + }, + { + name: "start failure", + containerID: "nomedalforchewie", + clientErr: errors.New("You missed the shot"), + wantErr: errors.New("DOCKER START: You missed the shot"), + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + logger := log.Test(t, "github.com/tinkerbell/tink") + mgr := NewContainerManager(logger, newFakeDockerClient(tc.containerID, "", 0, 0, tc.clientErr, nil), RegistryConnDetails{Registry: ""}) + + ctx := context.Background() + gotErr := mgr.StartContainer(ctx, tc.containerID) + if gotErr != nil { + if tc.wantErr == nil { + t.Errorf(`Got unexpected error: %v"`, gotErr) + } else if gotErr.Error() != tc.wantErr.Error() { + t.Errorf(`Got unexpected error: got "%v" wanted "%v"`, gotErr, tc.wantErr) + } + return + } + if gotErr == nil && tc.wantErr != nil { + t.Errorf("Missing expected error: %v", tc.wantErr) + return + } + }) + } +} + +func TestContainerManagerWait(t *testing.T) { + cases := []struct { + name string + containerID string + dockerResponse int + contextTimeout time.Duration + clientErr error + waitErr error + wantState pb.State + wantErr error + }{ + { + name: "Happy Path", + containerID: "nomedalforchewie", + dockerResponse: 0, + wantState: pb.State_STATE_SUCCESS, + }, + { + name: "start failure", + containerID: "chewieDied", + dockerResponse: 1, + wantState: pb.State_STATE_FAILED, + waitErr: nil, + }, + { + name: "client wait failure", + containerID: "nomedalforchewie", + dockerResponse: 1, + wantState: pb.State_STATE_FAILED, + waitErr: errors.New("Vader Won"), + wantErr: errors.New("Vader Won"), + }, + { + name: "client inspect failure", + containerID: "nomedalforchewie", + wantState: pb.State_STATE_FAILED, + clientErr: errors.New("inspect failed"), + wantErr: nil, + }, + { + name: "client timeout", + containerID: "nomedalforchewie", + wantState: pb.State_STATE_TIMEOUT, + contextTimeout: time.Millisecond * 2, + waitErr: errors.New("Vader Won"), + wantErr: errors.New("context deadline exceeded"), + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + logger := log.Test(t, "github.com/tinkerbell/tink") + mgr := NewContainerManager(logger, newFakeDockerClient(tc.containerID, "", time.Millisecond*20, tc.dockerResponse, tc.clientErr, tc.waitErr), RegistryConnDetails{Registry: ""}) + ctx, cancel := context.WithTimeout(context.Background(), tc.contextTimeout) + defer cancel() + if tc.contextTimeout == 0 { + ctx = context.Background() + } + + got, gotErr := mgr.WaitForContainer(ctx, tc.containerID) + if gotErr != nil { + if tc.wantErr == nil { + t.Errorf(`Got unexpected error: %v"`, gotErr) + } else if gotErr.Error() != tc.wantErr.Error() { + t.Errorf(`Got unexpected error: got "%v" wanted "%v"`, gotErr, tc.wantErr) + } + return + } + if gotErr == nil && tc.wantErr != nil { + t.Errorf("Missing expected error: %v", tc.wantErr) + return + } + if got.String() != tc.wantState.String() { + t.Errorf("Unexpected response: got %s wanted %s", got, tc.wantState) + } + }) + } +} + +func TestContainerManagerWaitFailed(t *testing.T) { + cases := []struct { + name string + containerID string + dockerResponse int + contextTimeout time.Duration + waitTime time.Duration + clientErr error + wantState pb.State + }{ + { + name: "Happy Path", + containerID: "nomedalforchewie", + dockerResponse: 0, + waitTime: 0, + wantState: pb.State_STATE_SUCCESS, + }, + { + name: "start failure", + containerID: "chewieDied", + dockerResponse: 1, + wantState: pb.State_STATE_FAILED, + clientErr: nil, + }, + { + name: "client wait failure", + containerID: "nomedalforchewie", + dockerResponse: 1, + wantState: pb.State_STATE_FAILED, + clientErr: errors.New("Vader Won"), + }, + { + name: "client timeout", + containerID: "nomedalforchewie", + wantState: pb.State_STATE_TIMEOUT, + waitTime: time.Millisecond * 20, + contextTimeout: time.Millisecond * 10, + clientErr: errors.New("Vader Won"), + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + logger := log.Test(t, "github.com/tinkerbell/tink") + mgr := NewContainerManager(logger, newFakeDockerClient(tc.containerID, "", tc.waitTime, tc.dockerResponse, nil, tc.clientErr), RegistryConnDetails{Registry: ""}) + ctx, cancel := context.WithTimeout(context.Background(), tc.contextTimeout) + defer cancel() + if tc.contextTimeout == 0 { + ctx = context.Background() + } + failedChan := make(chan pb.State) + go mgr.WaitForFailedContainer(ctx, tc.containerID, failedChan) + got := <-failedChan + + if got.String() != tc.wantState.String() { + t.Errorf("Unexpected response: got %s wanted %s", got, tc.wantState) + } + }) + } +} + +func TestContainerManagerRemove(t *testing.T) { + cases := []struct { + name string + containerID string + clientErr error + wantErr error + }{ + { + name: "Happy Path", + containerID: "nomedalforchewie", + }, + { + name: "start failure", + containerID: "nomedalforchewie", + clientErr: errors.New("You missed the shot"), + wantErr: errors.New("DOCKER STOP: You missed the shot"), + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + logger := log.Test(t, "github.com/tinkerbell/tink") + mgr := NewContainerManager(logger, newFakeDockerClient(tc.containerID, "", 0, 0, tc.clientErr, nil), RegistryConnDetails{Registry: ""}) + + ctx := context.Background() + gotErr := mgr.RemoveContainer(ctx, tc.containerID) + if gotErr != nil { + if tc.wantErr == nil { + t.Errorf(`Got unexpected error: %v"`, gotErr) + } else if gotErr.Error() != tc.wantErr.Error() { + t.Errorf(`Got unexpected error: got "%v" wanted "%v"`, gotErr, tc.wantErr) + } + return + } + if gotErr == nil && tc.wantErr != nil { + t.Errorf("Missing expected error: %v", tc.wantErr) + return + } + }) + } +} diff --git a/cmd/tink-worker/worker/log_capturer.go b/cmd/tink-worker/worker/log_capturer.go new file mode 100644 index 000000000..b6a1ba60d --- /dev/null +++ b/cmd/tink-worker/worker/log_capturer.go @@ -0,0 +1,57 @@ +package worker + +import ( + "bufio" + "context" + "fmt" + "io" + + "github.com/docker/docker/api/types" + "github.com/docker/docker/client" + "github.com/packethost/pkg/log" +) + +// DockerLogCapturer is a LogCapturer that can stream docker container logs to an io.Writer. +type DockerLogCapturer struct { + dockerClient client.ContainerAPIClient + logger log.Logger + writer io.Writer +} + +// getLogger is a helper function to get logging out of a context, or use the default logger. +func (l *DockerLogCapturer) getLogger(ctx context.Context) *log.Logger { + loggerIface := ctx.Value(loggingContextKey) + if loggerIface == nil { + return &l.logger + } + return loggerIface.(*log.Logger) +} + +// NewDockerLogCapturer returns a LogCapturer that can stream container logs to a given writer. +func NewDockerLogCapturer(cli client.ContainerAPIClient, logger log.Logger, writer io.Writer) *DockerLogCapturer { + return &DockerLogCapturer{ + dockerClient: cli, + logger: logger, + writer: writer, + } +} + +// CaptureLogs streams container logs to the capturer's writer. +func (l *DockerLogCapturer) CaptureLogs(ctx context.Context, id string) { + reader, err := l.dockerClient.ContainerLogs(ctx, id, types.ContainerLogsOptions{ + ShowStdout: true, + ShowStderr: true, + Follow: true, + Timestamps: false, + }) + if err != nil { + l.getLogger(ctx).Error(err, "failed to capture logs for container ", id) + return + } + defer reader.Close() + + scanner := bufio.NewScanner(reader) + for scanner.Scan() { + fmt.Fprintln(l.writer, scanner.Text()) + } +} diff --git a/cmd/tink-worker/worker/log_capturer_test.go b/cmd/tink-worker/worker/log_capturer_test.go new file mode 100644 index 000000000..e305af71f --- /dev/null +++ b/cmd/tink-worker/worker/log_capturer_test.go @@ -0,0 +1,110 @@ +package worker + +import ( + "bytes" + "context" + "io" + "os" + "strings" + "testing" + + "github.com/docker/docker/api/types" + "github.com/docker/docker/client" + "github.com/packethost/pkg/log" + "github.com/pkg/errors" +) + +type fakeDockerLoggerClient struct { + client.ContainerAPIClient + content string + err error +} + +func (c *fakeDockerLoggerClient) ContainerLogs(context.Context, string, types.ContainerLogsOptions) (io.ReadCloser, error) { + if c.err != nil { + return nil, c.err + } + return io.NopCloser(strings.NewReader(c.content)), nil +} + +func newFakeDockerLoggerClient(content string, err error) *fakeDockerLoggerClient { + return &fakeDockerLoggerClient{ + content: content, + err: err, + } +} + +func TestLogCapturer(t *testing.T) { + cases := []struct { + name string + writer bytes.Buffer + wanterr error + content string + }{ + { + name: "Content written to buffer", + writer: *bytes.NewBufferString(""), + wanterr: nil, + content: "Line1\nline2\n", + }, + { + name: "empty buffer from error", + writer: *bytes.NewBufferString(""), + wanterr: errors.New("Docker failure"), + content: "", + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + logger := log.Test(t, "github.com/tinkerbell/tink") + ctx := context.Background() + clogger := NewDockerLogCapturer( + newFakeDockerLoggerClient(tc.content, tc.wanterr), + logger, + &tc.writer) + clogger.CaptureLogs(ctx, tc.name) + got := tc.writer.String() + if got != tc.content { + t.Errorf("Wrong content written to buffer. Expected '%s', got '%s'", tc.content, got) + } + }) + } +} + +func TestLogCapturerContextLogger(t *testing.T) { + cases := []struct { + name string + logger func() *log.Logger + writer bytes.Buffer + }{ + { + name: "no context logger", + logger: nil, + }, + { + name: "with context logger", + logger: func() *log.Logger { + l := log.Test(t, "github.com/tinkerbell/tink/test") + return &l + }, + writer: *bytes.NewBufferString(""), + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + logger := log.Test(t, "github.com/tinkerbell/tink") + ctx := context.Background() + if tc.logger != nil { + ctx = context.WithValue(ctx, loggingContextKey, tc.logger()) + } + clogger := &DockerLogCapturer{ + newFakeDockerLoggerClient("", nil), + logger, + os.Stdout, + } + clogger.getLogger(ctx) + }) + } +} diff --git a/cmd/tink-worker/worker/registry.go b/cmd/tink-worker/worker/registry.go new file mode 100644 index 000000000..9c8e41dc7 --- /dev/null +++ b/cmd/tink-worker/worker/registry.go @@ -0,0 +1,68 @@ +package worker + +import ( + "context" + "encoding/base64" + "encoding/json" + "io" + + "github.com/docker/docker/api/types" + "github.com/pkg/errors" +) + +// RegistryConnDetails are the connection details for accessing a Docker registry. +type RegistryConnDetails struct { + Registry string + Username string + Password string +} + +// ImagePullStatus is the status of the downloaded Image chunk. +type ImagePullStatus struct { + Status string `json:"status"` + Error string `json:"error"` + Progress string `json:"progress"` + ProgressDetail struct { + Current int `json:"current"` + Total int `json:"total"` + } `json:"progressDetail"` +} + +// PullImage outputs to stdout the contents of the requested image (relative to the registry). +func (m *containerManager) PullImage(ctx context.Context, image string) error { + l := m.getLogger(ctx) + authConfig := types.AuthConfig{ + Username: m.registryDetails.Username, + Password: m.registryDetails.Password, + ServerAddress: m.registryDetails.Registry, + } + encodedJSON, err := json.Marshal(authConfig) + if err != nil { + return errors.Wrap(err, "DOCKER AUTH") + } + authStr := base64.URLEncoding.EncodeToString(encodedJSON) + + out, err := m.cli.ImagePull(ctx, m.registryDetails.Registry+"/"+image, types.ImagePullOptions{RegistryAuth: authStr}) + if err != nil { + return errors.Wrap(err, "DOCKER PULL") + } + defer func() { + if err := out.Close(); err != nil { + l.Error(err) + } + }() + fd := json.NewDecoder(out) + var status *ImagePullStatus + for { + if err := fd.Decode(&status); err != nil { + if errors.Is(err, io.EOF) { + break + } + return errors.Wrap(err, "DOCKER PULL") + } + if status.Error != "" { + return errors.Wrap(errors.New(status.Error), "DOCKER PULL") + } + } + return nil +} diff --git a/cmd/tink-worker/worker/registry_test.go b/cmd/tink-worker/worker/registry_test.go new file mode 100644 index 000000000..06efd9534 --- /dev/null +++ b/cmd/tink-worker/worker/registry_test.go @@ -0,0 +1,71 @@ +package worker + +import ( + "context" + "errors" + "io" + "strings" + "testing" + + "github.com/docker/docker/api/types" + "github.com/packethost/pkg/log" +) + +func (c *fakeDockerClient) ImagePull(context.Context, string, types.ImagePullOptions) (io.ReadCloser, error) { + if c.err != nil { + return nil, c.err + } + return io.NopCloser(strings.NewReader(c.imagePullContent)), nil +} + +func TestContainerManagerPullImage(t *testing.T) { + cases := []struct { + name string + image string + responseContent string + registry RegistryConnDetails + clientErr error + wantErr error + }{ + { + name: "Happy Path", + image: "yav.in/4/deathstar:nomedalforchewie", + responseContent: "{}\n{}", + }, + { + name: "malformed JSON", + image: "yav.in/4/deathstar:nomedalforchewie", + responseContent: "{", + clientErr: errors.New("You missed the shot"), + wantErr: errors.New("DOCKER PULL: You missed the shot"), + }, + { + name: "pull error", + image: "yav.in/4/deathstar:nomedalforchewie", + responseContent: `{"error": "You missed the shot"}`, + wantErr: errors.New("DOCKER PULL: You missed the shot"), + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + logger := log.Test(t, "github.com/tinkerbell/tink") + mgr := NewContainerManager(logger, newFakeDockerClient("", tc.responseContent, 0, 0, tc.clientErr, nil), tc.registry) + + ctx := context.Background() + gotErr := mgr.PullImage(ctx, tc.image) + if gotErr != nil { + if tc.wantErr == nil { + t.Errorf(`Got unexpected error: %v"`, gotErr) + } else if gotErr.Error() != tc.wantErr.Error() { + t.Errorf(`Got unexpected error: got "%v" wanted "%v"`, gotErr, tc.wantErr) + } + return + } + if gotErr == nil && tc.wantErr != nil { + t.Errorf("Missing expected error: %v", tc.wantErr) + return + } + }) + } +} diff --git a/cmd/tink-worker/internal/worker.go b/cmd/tink-worker/worker/worker.go similarity index 63% rename from cmd/tink-worker/internal/worker.go rename to cmd/tink-worker/worker/worker.go index 3d358eee3..95d8ce7e1 100644 --- a/cmd/tink-worker/internal/worker.go +++ b/cmd/tink-worker/worker/worker.go @@ -1,7 +1,6 @@ -package internal +package worker import ( - "bufio" "context" sha "crypto/sha256" "encoding/base64" @@ -14,8 +13,6 @@ import ( "strings" "time" - "github.com/docker/docker/api/types" - "github.com/docker/docker/client" "github.com/packethost/pkg/log" "github.com/pkg/errors" pb "github.com/tinkerbell/tink/protos/workflow" @@ -23,8 +20,8 @@ import ( ) const ( - dataFile = "data" - dataDir = "/worker" + dataFile = "data" + defaultDataDir = "/worker" errGetWfContext = "failed to get workflow context" errGetWfActions = "failed to get actions for workflow" @@ -33,6 +30,10 @@ const ( msgTurn = "it's turn for a different worker: %s" ) +type loggingContext string + +var loggingContextKey loggingContext = "logger" + var ( workflowcontexts = map[string]*pb.WorkflowContext{} workflowDataSHA = map[string]string{} @@ -47,64 +48,124 @@ type WorkflowMetadata struct { SHA string `json:"sha256"` } -// Worker details provide all the context needed to run a. -type Worker struct { - client pb.WorkflowServiceClient - regConn *RegistryConnDetails - registryClient *client.Client - logger log.Logger - registry string - retries int - retryInterval time.Duration - maxSize int64 +// Option is a type for modifying a worker. +type Option func(*Worker) + +// WithRetries adds custom retries to a worker. +func WithRetries(interval time.Duration, retries int) Option { + return func(w *Worker) { + w.retries = retries + w.retryInterval = interval + } } -// NewWorker creates a new Worker, creating a new Docker registry client. -func NewWorker(c pb.WorkflowServiceClient, regConn *RegistryConnDetails, logger log.Logger, registry string, retries int, retryInterval time.Duration, maxFileSize int64) *Worker { - registryClient, err := regConn.NewClient() - if err != nil { - panic(err) - } - return &Worker{ - client: c, - regConn: regConn, - registryClient: registryClient, - logger: logger, - registry: registry, - retries: retries, - retryInterval: retryInterval, - maxSize: maxFileSize, +// WithDataDir changes the default directory for a worker. +func WithDataDir(dir string) Option { + return func(w *Worker) { + w.dataDir = dir } } -func (w *Worker) captureLogs(ctx context.Context, id string) { - reader, err := w.registryClient.ContainerLogs(ctx, id, types.ContainerLogsOptions{ - ShowStdout: true, - ShowStderr: true, - Follow: true, - Timestamps: false, - }) - if err != nil { - panic(err) +// WithMaxFileSize changes the max file size for a worker. +func WithMaxFileSize(maxSize int64) Option { + return func(w *Worker) { + w.maxSize = maxSize + } +} + +// WithLogCapture enables capture of container logs. +func WithLogCapture(capture bool) Option { + return func(w *Worker) { + w.captureLogs = capture } - defer reader.Close() +} - scanner := bufio.NewScanner(reader) - for scanner.Scan() { - fmt.Println(scanner.Text()) +// WithPrivileged enables containers to be privileged. +func WithPrivileged(privileged bool) Option { + return func(w *Worker) { + w.createPrivileged = privileged } } +// LogCapturer emits container logs. +type LogCapturer interface { + CaptureLogs(ctx context.Context, containerID string) +} + +// ContainerManager manages linux containers for Tinkerbell workers. +type ContainerManager interface { + CreateContainer(ctx context.Context, cmd []string, wfID string, action *pb.WorkflowAction, captureLogs, privileged bool) (string, error) + StartContainer(ctx context.Context, id string) error + WaitForContainer(ctx context.Context, id string) (pb.State, error) + WaitForFailedContainer(ctx context.Context, id string, failedActionStatus chan pb.State) + RemoveContainer(ctx context.Context, id string) error + PullImage(ctx context.Context, image string) error +} + +// Worker details provide all the context needed to run workflows. +type Worker struct { + workerID string + logCapturer LogCapturer + containerManager ContainerManager + tinkClient pb.WorkflowServiceClient + logger log.Logger + + dataDir string + maxSize int64 + + createPrivileged bool + captureLogs bool + + retries int + retryInterval time.Duration +} + +// NewWorker creates a new Worker, creating a new Docker registry client. +func NewWorker( + workerID string, + tinkClient pb.WorkflowServiceClient, + containerManager ContainerManager, + logCapturer LogCapturer, + logger log.Logger, + opts ...Option) *Worker { + w := &Worker{ + workerID: workerID, + dataDir: defaultDataDir, + containerManager: containerManager, + logCapturer: logCapturer, + tinkClient: tinkClient, + logger: logger, + captureLogs: false, + createPrivileged: false, + retries: 3, + retryInterval: time.Second * 3, + maxSize: 1 << 20, + } + for _, opt := range opts { + opt(w) + } + + return w +} + +// getLogger is a helper function to get logging out of a context, or use the default logger. +func (w Worker) getLogger(ctx context.Context) *log.Logger { + loggerIface := ctx.Value(loggingContextKey) + if loggerIface == nil { + return &w.logger + } + return loggerIface.(*log.Logger) +} + // execute executes a workflow action, optionally capturing logs. -func (w *Worker) execute(ctx context.Context, wfID string, action *pb.WorkflowAction, captureLogs bool) (pb.State, error) { - l := w.logger.With("workflowID", wfID, "workerID", action.GetWorkerId(), "actionName", action.GetName(), "actionImage", action.GetImage()) +func (w *Worker) execute(ctx context.Context, wfID string, action *pb.WorkflowAction) (pb.State, error) { + l := w.getLogger(ctx).With("workflowID", wfID, "workerID", action.GetWorkerId(), "actionName", action.GetName(), "actionImage", action.GetImage()) - cli := w.registryClient - if err := w.regConn.pullImage(ctx, cli, action.GetImage()); err != nil { + if err := w.containerManager.PullImage(ctx, action.GetImage()); err != nil { return pb.State_STATE_RUNNING, errors.Wrap(err, "pull image") } - id, err := w.createContainer(ctx, action.Command, wfID, action, captureLogs) + id, err := w.containerManager.CreateContainer(ctx, action.Command, wfID, action, w.captureLogs, w.createPrivileged) if err != nil { return pb.State_STATE_RUNNING, errors.Wrap(err, "create container") } @@ -121,23 +182,23 @@ func (w *Worker) execute(ctx context.Context, wfID string, action *pb.WorkflowAc } defer cancel() - err = startContainer(timeCtx, l, cli, id) + err = w.containerManager.StartContainer(timeCtx, id) if err != nil { return pb.State_STATE_RUNNING, errors.Wrap(err, "start container") } - if captureLogs { - go w.captureLogs(ctx, id) + if w.captureLogs { + go w.logCapturer.CaptureLogs(ctx, id) } - st, err := waitContainer(timeCtx, cli, id) + st, err := w.containerManager.WaitForContainer(timeCtx, id) l.With("status", st.String()).Info("wait container completed") // If we've made it this far, the container has successfully completed. // Everything after this is just cleanup. defer func() { - if err := removeContainer(ctx, l, cli, id); err != nil { + if err := w.containerManager.RemoveContainer(ctx, id); err != nil { l.With("containerID", id).Error(err) } l.With("status", st.String()).Info("container removed") @@ -155,16 +216,16 @@ func (w *Worker) execute(ctx context.Context, wfID string, action *pb.WorkflowAc } if st == pb.State_STATE_TIMEOUT && action.OnTimeout != nil { - rst := w.executeReaction(ctx, st.String(), action.OnTimeout, wfID, action, captureLogs, l) + rst := w.executeReaction(ctx, st.String(), action.OnTimeout, wfID, action) l.With("status", rst).Info("action timeout") } else if action.OnFailure != nil { - rst := w.executeReaction(ctx, st.String(), action.OnFailure, wfID, action, captureLogs, l) + rst := w.executeReaction(ctx, st.String(), action.OnFailure, wfID, action) l.With("status", rst).Info("action failed") } - l.Info(infoWaitFinished) + l.Info(InfoWaitFinished) if err != nil { - l.Error(errors.Wrap(err, errFailedToWait)) + l.Error(errors.Wrap(err, ErrFailedToWait)) } l.With("status", st).Info("action container exited") @@ -172,40 +233,44 @@ func (w *Worker) execute(ctx context.Context, wfID string, action *pb.WorkflowAc } // executeReaction executes special case OnTimeout/OnFailure actions. -func (w *Worker) executeReaction(ctx context.Context, reaction string, cmd []string, wfID string, action *pb.WorkflowAction, captureLogs bool, l log.Logger) pb.State { - id, err := w.createContainer(ctx, cmd, wfID, action, captureLogs) +func (w *Worker) executeReaction(ctx context.Context, reaction string, cmd []string, wfID string, action *pb.WorkflowAction) pb.State { + l := w.getLogger(ctx) + id, err := w.containerManager.CreateContainer(ctx, cmd, wfID, action, w.captureLogs, w.createPrivileged) if err != nil { - l.Error(errors.Wrap(err, errFailedToRunCmd)) + l.Error(errors.Wrap(err, ErrFailedToRunCmd)) } l.With("containerID", id, "actionStatus", reaction, "command", cmd).Info("container created") - if captureLogs { - go w.captureLogs(ctx, id) + + if w.captureLogs { + go w.logCapturer.CaptureLogs(ctx, id) } st := make(chan pb.State) - go waitFailedContainer(ctx, l, w.registryClient, id, st) - err = startContainer(ctx, l, w.registryClient, id) + go w.containerManager.WaitForFailedContainer(ctx, id, st) + err = w.containerManager.StartContainer(ctx, id) if err != nil { - l.Error(errors.Wrap(err, errFailedToRunCmd)) + l.Error(errors.Wrap(err, ErrFailedToRunCmd)) } return <-st } // ProcessWorkflowActions gets all Workflow contexts and processes their actions. -func (w *Worker) ProcessWorkflowActions(ctx context.Context, workerID string, captureActionLogs bool) error { - l := w.logger.With("workerID", workerID) +func (w *Worker) ProcessWorkflowActions(ctx context.Context) error { + l := w.logger.With("workerID", w.workerID) for { - res, err := w.client.GetWorkflowContexts(ctx, &pb.WorkflowContextRequest{WorkerId: workerID}) + res, err := w.tinkClient.GetWorkflowContexts(ctx, &pb.WorkflowContextRequest{WorkerId: w.workerID}) if err != nil { return errors.Wrap(err, errGetWfContext) } for wfContext, err := res.Recv(); err == nil && wfContext != nil; wfContext, err = res.Recv() { wfID := wfContext.GetWorkflowId() l = l.With("workflowID", wfID) - actions, err := w.client.GetWorkflowActions(ctx, &pb.WorkflowActionsRequest{WorkflowId: wfID}) + ctx := context.WithValue(ctx, loggingContextKey, &l) + + actions, err := w.tinkClient.GetWorkflowActions(ctx, &pb.WorkflowActionsRequest{WorkflowId: wfID}) if err != nil { return errors.Wrap(err, errGetWfActions) } @@ -214,7 +279,7 @@ func (w *Worker) ProcessWorkflowActions(ctx context.Context, workerID string, ca actionIndex := 0 var nextAction *pb.WorkflowAction if wfContext.GetCurrentAction() == "" { - if actions.GetActionList()[0].GetWorkerId() == workerID { + if actions.GetActionList()[0].GetWorkerId() == w.workerID { actionIndex = 0 turn = true } @@ -243,18 +308,18 @@ func (w *Worker) ProcessWorkflowActions(ctx context.Context, workerID string, ca "totalNumberOfActions", wfContext.GetTotalNumberOfActions(), ) l.Info("current context") - if nextAction.GetWorkerId() == workerID { + if nextAction.GetWorkerId() == w.workerID { turn = true } } if turn { - wfDir := dataDir + string(os.PathSeparator) + wfID + wfDir := filepath.Join(w.dataDir, wfID) l := l.With("actionName", actions.GetActionList()[actionIndex].GetName(), "taskName", actions.GetActionList()[actionIndex].GetTaskName(), ) if _, err := os.Stat(wfDir); os.IsNotExist(err) { - err := os.Mkdir(wfDir, os.FileMode(0o755)) + err := os.MkdirAll(wfDir, os.FileMode(0o755)) if err != nil { l.Error(err) os.Exit(1) @@ -281,6 +346,7 @@ func (w *Worker) ProcessWorkflowActions(ctx context.Context, workerID string, ca l := l.With("actionName", action.GetName(), "taskName", action.GetTaskName(), ) + ctx := context.WithValue(ctx, loggingContextKey, &l) if wfContext.GetCurrentActionState() != pb.State_STATE_RUNNING { actionStatus := &pb.WorkflowActionStatus{ WorkflowId: wfID, @@ -291,7 +357,6 @@ func (w *Worker) ProcessWorkflowActions(ctx context.Context, workerID string, ca Message: "Started execution", WorkerId: action.GetWorkerId(), } - err := w.reportActionStatus(ctx, actionStatus) if err != nil { exitWithGrpcError(err, l) @@ -300,11 +365,11 @@ func (w *Worker) ProcessWorkflowActions(ctx context.Context, workerID string, ca } // get workflow data - getWorkflowData(ctx, l, w.client, workerID, wfID) + w.getWorkflowData(ctx, wfID) // start executing the action start := time.Now() - st, err := w.execute(ctx, wfID, action, captureActionLogs) + st, err := w.execute(ctx, wfID, action) elapsed := time.Since(start) actionStatus := &pb.WorkflowActionStatus{ @@ -350,7 +415,7 @@ func (w *Worker) ProcessWorkflowActions(ctx context.Context, workerID string, ca } nextAction := actions.GetActionList()[actionIndex+1] - if nextAction.GetWorkerId() != workerID { + if nextAction.GetWorkerId() != w.workerID { l.Debug(fmt.Sprintf(msgTurn, nextAction.GetWorkerId())) turn = false } else { @@ -376,14 +441,15 @@ func isLastAction(wfContext *pb.WorkflowContext, actions *pb.WorkflowActionList) } func (w *Worker) reportActionStatus(ctx context.Context, actionStatus *pb.WorkflowActionStatus) error { - l := w.logger.With("workflowID", actionStatus.GetWorkflowId, + l := w.getLogger(ctx).With("workflowID", actionStatus.GetWorkflowId(), "workerID", actionStatus.GetWorkerId(), "actionName", actionStatus.GetActionName(), "taskName", actionStatus.GetTaskName(), ) var err error for r := 1; r <= w.retries; r++ { - _, err = w.client.ReportActionStatus(ctx, actionStatus) + l.Info("reporting Action Status") + _, err = w.tinkClient.ReportActionStatus(ctx, actionStatus) if err != nil { l.Error(errors.Wrap(err, errReportActionStatus)) <-time.After(w.retryInterval) @@ -395,19 +461,23 @@ func (w *Worker) reportActionStatus(ctx context.Context, actionStatus *pb.Workfl return err } -func getWorkflowData(ctx context.Context, logger log.Logger, c pb.WorkflowServiceClient, workerID, workflowID string) { - l := logger.With("workflowID", workflowID, - "workerID", workerID, +func (w *Worker) getWorkflowData(ctx context.Context, workflowID string) { + l := w.getLogger(ctx).With("workflowID", workflowID, + "workerID", w.workerID, ) - res, err := c.GetWorkflowData(ctx, &pb.GetWorkflowDataRequest{WorkflowId: workflowID}) + res, err := w.tinkClient.GetWorkflowData(ctx, &pb.GetWorkflowDataRequest{WorkflowId: workflowID}) if err != nil { l.Error(err) } if len(res.GetData()) != 0 { - wfDir := dataDir + string(os.PathSeparator) + workflowID + wfDir := filepath.Join(w.dataDir, w.workerID) f := openDataFile(wfDir, l) - defer f.Close() + defer func() { + if err := f.Close(); err != nil { + l.With("file", f.Name()).Error(err) + } + }() _, err := f.Write(res.GetData()) if err != nil { @@ -419,15 +489,19 @@ func getWorkflowData(ctx context.Context, logger log.Logger, c pb.WorkflowServic } func (w *Worker) updateWorkflowData(ctx context.Context, actionStatus *pb.WorkflowActionStatus) { - l := w.logger.With("workflowID", actionStatus.GetWorkflowId, + l := w.getLogger(ctx).With("workflowID", actionStatus.GetWorkflowId, "workerID", actionStatus.GetWorkerId(), "actionName", actionStatus.GetActionName(), "taskName", actionStatus.GetTaskName(), ) - wfDir := dataDir + string(os.PathSeparator) + actionStatus.GetWorkflowId() + wfDir := filepath.Join(w.dataDir, actionStatus.GetWorkflowId()) f := openDataFile(wfDir, l) - defer f.Close() + defer func() { + if err := f.Close(); err != nil { + l.With("file", f.Name()).Error(err) + } + }() data, err := ioutil.ReadAll(f) if err != nil { @@ -439,18 +513,18 @@ func (w *Worker) updateWorkflowData(ctx context.Context, actionStatus *pb.Workfl if _, ok := workflowDataSHA[actionStatus.GetWorkflowId()]; !ok { checksum := base64.StdEncoding.EncodeToString(h.Sum(data)) workflowDataSHA[actionStatus.GetWorkflowId()] = checksum - sendUpdate(ctx, w.logger, w.client, actionStatus, data, checksum) + w.sendUpdate(ctx, actionStatus, data, checksum) } else { newSHA := base64.StdEncoding.EncodeToString(h.Sum(data)) if !strings.EqualFold(workflowDataSHA[actionStatus.GetWorkflowId()], newSHA) { - sendUpdate(ctx, w.logger, w.client, actionStatus, data, newSHA) + w.sendUpdate(ctx, actionStatus, data, newSHA) } } } } -func sendUpdate(ctx context.Context, logger log.Logger, c pb.WorkflowServiceClient, st *pb.WorkflowActionStatus, data []byte, checksum string) { - l := logger.With("workflowID", st.GetWorkflowId, +func (w *Worker) sendUpdate(ctx context.Context, st *pb.WorkflowActionStatus, data []byte, checksum string) { + l := w.getLogger(ctx).With("workflowID", st.GetWorkflowId, "workerID", st.GetWorkerId(), "actionName", st.GetActionName(), "taskName", st.GetTaskName(), @@ -468,7 +542,7 @@ func sendUpdate(ctx context.Context, logger log.Logger, c pb.WorkflowServiceClie os.Exit(1) } - _, err = c.UpdateWorkflowData(ctx, &pb.UpdateWorkflowDataRequest{ + _, err = w.tinkClient.UpdateWorkflowData(ctx, &pb.UpdateWorkflowDataRequest{ WorkflowId: st.GetWorkflowId(), Data: data, Metadata: metadata, diff --git a/go.mod b/go.mod index 0243337fe..13139bb19 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,7 @@ require ( github.com/lib/pq v1.10.1 github.com/matryer/moq v0.2.3 github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 // indirect - github.com/opencontainers/image-spec v1.0.2 // indirect + github.com/opencontainers/image-spec v1.0.2 github.com/packethost/pkg v0.0.0-20200903155310-0433e0605550 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.11.0