diff --git a/cmd/tink-worker/action.go b/cmd/tink-worker/action.go deleted file mode 100644 index f06ff2f26..000000000 --- a/cmd/tink-worker/action.go +++ /dev/null @@ -1,276 +0,0 @@ -package main - -import ( - "bufio" - "context" - "encoding/base64" - "encoding/json" - "fmt" - "io" - "os" - "time" - - "github.com/docker/docker/api/types" - "github.com/docker/docker/api/types/container" - "github.com/docker/docker/client" - "github.com/packethost/pkg/log" - "github.com/pkg/errors" - pb "github.com/tinkerbell/tink/protos/workflow" -) - -var ( - registry string - cli *client.Client -) - -const ( - errCreateContainer = "failed to create container" - errRemoveContainer = "failed to remove container" - errFailedToWait = "failed to wait for completion of action" - errFailedToRunCmd = "failed to run on-timeout command" - - infoWaitFinished = "wait finished for failed or timeout container" -) - -func executeAction(ctx context.Context, action *pb.WorkflowAction, wfID string) (pb.ActionState, error) { - l := logger.With("workflowID", wfID, "workerID", action.GetWorkerId(), "actionName", action.GetName(), "actionImage", action.GetImage()) - err := pullActionImage(ctx, action) - if err != nil { - return pb.ActionState_ACTION_IN_PROGRESS, errors.Wrap(err, "DOCKER PULL") - } - id, err := createContainer(ctx, l, action, action.Command, wfID) - if err != nil { - return pb.ActionState_ACTION_IN_PROGRESS, errors.Wrap(err, "DOCKER CREATE") - } - l.With("containerID", id, "command", action.GetOnTimeout()).Info("container created") - // Setting time context for action - timeCtx := ctx - if action.Timeout > 0 { - var cancel context.CancelFunc - timeCtx, cancel = context.WithTimeout(ctx, time.Duration(action.Timeout)*time.Second) - defer cancel() - } - err = startContainer(timeCtx, l, id) - if err != nil { - return pb.ActionState_ACTION_IN_PROGRESS, errors.Wrap(err, "DOCKER RUN") - } - - failedActionStatus := make(chan pb.ActionState) - - //capturing logs of action container in a go-routine - go captureLogs(ctx, id) - - status, err := waitContainer(timeCtx, id) - if err != nil { - rerr := removeContainer(ctx, l, id) - if rerr != nil { - rerr = errors.Wrap(rerr, errRemoveContainer) - l.With("containerID", id).Error(rerr) - return status, rerr - } - return status, errors.Wrap(err, "DOCKER_WAIT") - } - rerr := removeContainer(ctx, l, id) - if rerr != nil { - return status, errors.Wrap(rerr, "DOCKER_REMOVE") - } - l.With("status", status.String()).Info("container removed") - if status != pb.ActionState_ACTION_SUCCESS { - if status == pb.ActionState_ACTION_TIMEOUT && action.OnTimeout != nil { - id, err = createContainer(ctx, l, action, action.OnTimeout, wfID) - if err != nil { - l.Error(errors.Wrap(err, errCreateContainer)) - } - l.With("containerID", id, "status", status.String(), "command", action.GetOnTimeout()).Info("container created") - failedActionStatus := make(chan pb.ActionState) - go captureLogs(ctx, id) - go waitFailedContainer(ctx, id, failedActionStatus) - err = startContainer(ctx, l, id) - if err != nil { - l.Error(errors.Wrap(err, errFailedToRunCmd)) - } - onTimeoutStatus := <-failedActionStatus - l.With("status", onTimeoutStatus).Info("action timeout") - } else { - if action.OnFailure != nil { - id, err = createContainer(ctx, l, action, action.OnFailure, wfID) - if err != nil { - l.Error(errors.Wrap(err, errFailedToRunCmd)) - } - l.With("containerID", id, "actionStatus", status.String(), "command", action.GetOnFailure()).Info("container created") - go captureLogs(ctx, id) - go waitFailedContainer(ctx, id, failedActionStatus) - err = startContainer(ctx, l, id) - if err != nil { - l.Error(errors.Wrap(err, errFailedToRunCmd)) - } - onFailureStatus := <-failedActionStatus - l.With("status", onFailureStatus).Info("action failed") - } - } - l.Info(infoWaitFinished) - if err != nil { - rerr := removeContainer(ctx, l, id) - if rerr != nil { - l.Error(errors.Wrap(rerr, errRemoveContainer)) - } - l.Error(errors.Wrap(err, errFailedToWait)) - } - rerr = removeContainer(ctx, l, id) - if rerr != nil { - l.Error(errors.Wrap(rerr, errRemoveContainer)) - } - } - l.With("status", status).Info("action container exited") - return status, nil -} - -func captureLogs(ctx context.Context, id string) { - reader, err := cli.ContainerLogs(context.Background(), id, types.ContainerLogsOptions{ - ShowStdout: true, - ShowStderr: true, - Follow: true, - Timestamps: false, - }) - if err != nil { - panic(err) - } - defer reader.Close() - - scanner := bufio.NewScanner(reader) - for scanner.Scan() { - fmt.Println(scanner.Text()) - } -} - -func pullActionImage(ctx context.Context, action *pb.WorkflowAction) error { - user := os.Getenv("REGISTRY_USERNAME") - pwd := os.Getenv("REGISTRY_PASSWORD") - if user == "" || pwd == "" { - return errors.New("required REGISTRY_USERNAME and REGISTRY_PASSWORD") - } - - authConfig := types.AuthConfig{ - Username: user, - Password: pwd, - ServerAddress: registry, - } - encodedJSON, err := json.Marshal(authConfig) - if err != nil { - return errors.Wrap(err, "DOCKER AUTH") - } - authStr := base64.URLEncoding.EncodeToString(encodedJSON) - - out, err := cli.ImagePull(ctx, registry+"/"+action.GetImage(), types.ImagePullOptions{RegistryAuth: authStr}) - if err != nil { - return errors.Wrap(err, "DOCKER PULL") - } - defer out.Close() - if _, err := io.Copy(os.Stdout, out); err != nil { - return err - } - return nil -} - -func createContainer(ctx context.Context, l log.Logger, action *pb.WorkflowAction, cmd []string, wfID string) (string, error) { - config := &container.Config{ - Image: registry + "/" + action.GetImage(), - AttachStdout: true, - AttachStderr: true, - Tty: true, - Env: action.GetEnvironment(), - } - if cmd != nil { - config.Cmd = cmd - } - - wfDir := dataDir + string(os.PathSeparator) + wfID - hostConfig := &container.HostConfig{ - Privileged: true, - Binds: []string{wfDir + ":/workflow"}, - } - hostConfig.Binds = append(hostConfig.Binds, action.GetVolumes()...) - l.With("command", cmd).Info("creating container") - resp, err := cli.ContainerCreate(ctx, config, hostConfig, nil, action.GetName()) - if err != nil { - return "", errors.Wrap(err, "DOCKER CREATE") - } - return resp.ID, nil -} - -func startContainer(ctx context.Context, l log.Logger, id string) error { - l.With("containerID", id).Debug("starting container") - err := cli.ContainerStart(ctx, id, types.ContainerStartOptions{}) - if err != nil { - return errors.Wrap(err, "DOCKER START") - } - return nil -} - -func waitContainer(ctx context.Context, id string) (pb.ActionState, error) { - // Inspect whether the container is in running state - _, err := cli.ContainerInspect(ctx, id) - if err != nil { - return pb.ActionState_ACTION_FAILED, nil - } - - // send API call to wait for the container completion - wait, errC := cli.ContainerWait(ctx, id, container.WaitConditionNotRunning) - - select { - case status := <-wait: - if status.StatusCode == 0 { - return pb.ActionState_ACTION_SUCCESS, nil - } - return pb.ActionState_ACTION_FAILED, nil - case err := <-errC: - return pb.ActionState_ACTION_FAILED, err - case <-ctx.Done(): - return pb.ActionState_ACTION_TIMEOUT, ctx.Err() - } -} - -func waitFailedContainer(ctx context.Context, id string, failedActionStatus chan pb.ActionState) { - // send API call to wait for the container completion - wait, errC := cli.ContainerWait(ctx, id, container.WaitConditionNotRunning) - - select { - case status := <-wait: - if status.StatusCode == 0 { - failedActionStatus <- pb.ActionState_ACTION_SUCCESS - } - failedActionStatus <- pb.ActionState_ACTION_FAILED - case err := <-errC: - logger.Error(err) - failedActionStatus <- pb.ActionState_ACTION_FAILED - } -} - -func removeContainer(ctx context.Context, l log.Logger, id string) error { - // create options for removing container - opts := types.ContainerRemoveOptions{ - Force: true, - RemoveLinks: false, - RemoveVolumes: true, - } - l.With("containerID", id).Info("removing container") - - // send API call to remove the container - err := cli.ContainerRemove(ctx, id, opts) - if err != nil { - return err - } - return nil -} - -func initializeDockerClient() (*client.Client, error) { - registry = os.Getenv("DOCKER_REGISTRY") - if registry == "" { - return nil, errors.New("required DOCKER_REGISTRY") - } - c, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) - if err != nil { - return nil, errors.Wrap(err, "DOCKER CLIENT") - } - return c, nil -} diff --git a/cmd/tink-worker/cmd/root.go b/cmd/tink-worker/cmd/root.go new file mode 100644 index 000000000..02ad1ed9f --- /dev/null +++ b/cmd/tink-worker/cmd/root.go @@ -0,0 +1,173 @@ +package cmd + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/packethost/pkg/log" + "github.com/pkg/errors" + "github.com/spf13/cobra" + "github.com/spf13/pflag" + "github.com/spf13/viper" + "github.com/tinkerbell/tink/client" + "github.com/tinkerbell/tink/cmd/tink-worker/internal" + pb "github.com/tinkerbell/tink/protos/workflow" + "google.golang.org/grpc" +) + +const ( + defaultRetryInterval = 3 + defaultRetryCount = 3 + defaultMaxFileSize int64 = 10 * 1024 * 1024 //10MB + defaultTimeoutMinutes = 60 +) + +// NewRootCommand creates a new Tink Worker Cobra root command +func NewRootCommand(version string, logger log.Logger) *cobra.Command { + rootCmd := &cobra.Command{ + Use: "tink-worker", + Short: "Tink Worker", + Version: version, + PreRunE: func(cmd *cobra.Command, args []string) error { + viper, err := createViper(logger) + if err != nil { + return err + } + return applyViper(viper, cmd) + }, + RunE: func(cmd *cobra.Command, args []string) error { + retryInterval, _ := cmd.Flags().GetDuration("retry-interval") + retries, _ := cmd.Flags().GetInt("retries") + // TODO(displague) is log-level no longer useful? + // logLevel, _ := cmd.Flags().GetString("log-level") + workerID, _ := cmd.Flags().GetString("id") + maxFileSize, _ := cmd.Flags().GetInt64("max-file-size") + timeOut, _ := cmd.Flags().GetDuration("timeout") + user, _ := cmd.Flags().GetString("registry-username") + pwd, _ := cmd.Flags().GetString("registry-password") + registry, _ := cmd.Flags().GetString("docker-registry") + + logger.With("version", version).Info("starting") + if setupErr := client.Setup(); setupErr != nil { + return setupErr + } + + ctx := context.Background() + if timeOut > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, timeOut) + defer cancel() + } + + conn, err := tryClientConnection(logger, retryInterval, retries) + if err != nil { + return err + } + rClient := pb.NewWorkflowSvcClient(conn) + + regConn := internal.NewRegistryConnDetails(registry, user, pwd, logger) + worker := internal.NewWorker(rClient, regConn, logger, registry, retries, retryInterval, maxFileSize) + + err = worker.ProcessWorkflowActions(ctx, workerID) + if err != nil { + return errors.Wrap(err, "worker Finished with error") + } + return nil + }, + } + + rootCmd.Flags().Duration("retry-interval", defaultRetryInterval, "Retry interval in seconds (RETRY_INTERVAL)") + + rootCmd.Flags().Duration("timeout", time.Duration(defaultTimeoutMinutes*time.Minute), "Max duration to wait for worker to complete (TIMEOUT)") + + rootCmd.Flags().Int("max-retry", defaultRetryCount, "Maximum number of retries to attempt (MAX_RETRY)") + + rootCmd.Flags().Int64("max-file-size", defaultMaxFileSize, "Maximum file size in bytes (MAX_FILE_SIZE)") + + // rootCmd.Flags().String("log-level", "info", "Sets the worker log level (panic, fatal, error, warn, info, debug, trace)") + + must := func(err error) { + if err != nil { + logger.Fatal(err) + } + } + + rootCmd.Flags().StringP("id", "i", "", "Sets the worker id (ID)") + must(rootCmd.MarkFlagRequired("id")) + + rootCmd.Flags().StringP("docker-registry", "r", "", "Sets the Docker registry (DOCKER_REGISTRY)") + must(rootCmd.MarkFlagRequired("docker-registry")) + + rootCmd.Flags().StringP("registry-username", "u", "", "Sets the registry username (REGISTRY_USERNAME)") + must(rootCmd.MarkFlagRequired("registry-username")) + + rootCmd.Flags().StringP("registry-password", "p", "", "Sets the registry-password (REGISTRY_PASSWORD)") + must(rootCmd.MarkFlagRequired("registry-password")) + + return rootCmd +} + +// createViper creates a Viper object configured to read in configuration files +// (from various paths with content type specific filename extensions) and loads +// environment variables. +func createViper(logger log.Logger) (*viper.Viper, error) { + v := viper.New() + v.AutomaticEnv() + v.SetConfigName("tink-worker") + v.AddConfigPath("/etc/tinkerbell") + v.AddConfigPath(".") + v.SetEnvKeyReplacer(strings.NewReplacer("-", "_")) + + // If a config file is found, read it in. + if err := v.ReadInConfig(); err != nil { + if _, ok := err.(viper.ConfigFileNotFoundError); !ok { + logger.With("configFile", v.ConfigFileUsed()).Error(err, "could not load config file") + return nil, err + } + logger.Info("no config file found") + } else { + logger.With("configFile", v.ConfigFileUsed()).Info("loaded config file") + } + + return v, nil +} + +func applyViper(v *viper.Viper, cmd *cobra.Command) error { + errors := []error{} + + cmd.Flags().VisitAll(func(f *pflag.Flag) { + if !f.Changed && v.IsSet(f.Name) { + val := v.Get(f.Name) + if err := cmd.Flags().Set(f.Name, fmt.Sprintf("%v", val)); err != nil { + errors = append(errors, err) + return + } + } + }) + + if len(errors) > 0 { + errs := []string{} + for _, err := range errors { + errs = append(errs, err.Error()) + } + return fmt.Errorf(strings.Join(errs, ", ")) + } + + return nil +} + +func tryClientConnection(logger log.Logger, retryInterval time.Duration, retries int) (*grpc.ClientConn, error) { + for ; retries > 0; retries-- { + c, err := client.GetConnection() + if err != nil { + logger.With("error", err, "duration", retryInterval).Info("failed to connect, sleeping before retrying") + <-time.After(retryInterval * time.Second) + continue + } + + return c, nil + } + return nil, fmt.Errorf("retries exceeded") +} diff --git a/cmd/tink-worker/internal/action.go b/cmd/tink-worker/internal/action.go new file mode 100644 index 000000000..22b56608d --- /dev/null +++ b/cmd/tink-worker/internal/action.go @@ -0,0 +1,106 @@ +package internal + +import ( + "context" + "path" + "path/filepath" + + "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/container" + "github.com/docker/docker/client" + "github.com/packethost/pkg/log" + "github.com/pkg/errors" + pb "github.com/tinkerbell/tink/protos/workflow" +) + +const ( + errCreateContainer = "failed to create container" + errFailedToWait = "failed to wait for completion of action" + errFailedToRunCmd = "failed to run on-timeout command" + + infoWaitFinished = "wait finished for failed or timeout container" +) + +func (w *Worker) createContainer(ctx context.Context, cmd []string, wfID string, action *pb.WorkflowAction) (string, error) { + registry := w.registry + config := &container.Config{ + Image: path.Join(registry, action.GetImage()), + AttachStdout: true, + AttachStderr: true, + Cmd: cmd, + Tty: true, + Env: action.GetEnvironment(), + } + + wfDir := filepath.Join(dataDir, wfID) + hostConfig := &container.HostConfig{ + Privileged: true, + Binds: []string{wfDir + ":/workflow"}, + } + hostConfig.Binds = append(hostConfig.Binds, action.GetVolumes()...) + w.logger.With("command", cmd).Info("creating container") + resp, err := w.registryClient.ContainerCreate(ctx, config, hostConfig, nil, action.GetName()) + if err != nil { + return "", errors.Wrap(err, "DOCKER CREATE") + } + return resp.ID, nil +} + +func startContainer(ctx context.Context, l log.Logger, cli *client.Client, id string) error { + l.With("containerID", id).Debug("starting container") + return errors.Wrap(cli.ContainerStart(ctx, id, types.ContainerStartOptions{}), "DOCKER START") +} + +func waitContainer(ctx context.Context, cli *client.Client, id string) (pb.ActionState, error) { + // Inspect whether the container is in running state + if _, err := cli.ContainerInspect(ctx, id); err != nil { + return pb.ActionState_ACTION_FAILED, nil + } + + // send API call to wait for the container completion + wait, errC := cli.ContainerWait(ctx, id, container.WaitConditionNotRunning) + + select { + case status := <-wait: + if status.StatusCode == 0 { + return pb.ActionState_ACTION_SUCCESS, nil + } + return pb.ActionState_ACTION_FAILED, nil + case err := <-errC: + return pb.ActionState_ACTION_FAILED, err + case <-ctx.Done(): + return pb.ActionState_ACTION_TIMEOUT, ctx.Err() + } +} + +func waitFailedContainer(ctx context.Context, l log.Logger, cli *client.Client, id string, failedActionStatus chan pb.ActionState) { + // send API call to wait for the container completion + wait, errC := cli.ContainerWait(ctx, id, container.WaitConditionNotRunning) + + select { + case status := <-wait: + if status.StatusCode == 0 { + failedActionStatus <- pb.ActionState_ACTION_SUCCESS + } + failedActionStatus <- pb.ActionState_ACTION_FAILED + case err := <-errC: + l.Error(err) + failedActionStatus <- pb.ActionState_ACTION_FAILED + case <-ctx.Done(): + l.Error(ctx.Err()) + failedActionStatus <- pb.ActionState_ACTION_TIMEOUT + } +} + +func removeContainer(ctx context.Context, l log.Logger, cli *client.Client, id string) error { + // create options for removing container + opts := types.ContainerRemoveOptions{ + Force: true, + RemoveLinks: false, + RemoveVolumes: true, + } + l.With("containerID", id).Info("removing container") + + // send API call to remove the container + return cli.ContainerRemove(ctx, id, opts) +} diff --git a/cmd/tink-worker/internal/registry.go b/cmd/tink-worker/internal/registry.go new file mode 100644 index 000000000..a20f12e32 --- /dev/null +++ b/cmd/tink-worker/internal/registry.go @@ -0,0 +1,71 @@ +package internal + +import ( + "context" + "encoding/base64" + "encoding/json" + "io" + "os" + + "github.com/docker/docker/api/types" + "github.com/docker/docker/client" + "github.com/packethost/pkg/log" + "github.com/pkg/errors" +) + +// RegistryConnDetails are the connection details for accessing a Docker +// registry and logging activities +type RegistryConnDetails struct { + registry, + user, + pwd string + logger log.Logger +} + +// NewRegistryConnDetails creates a new RegistryConnDetails +func NewRegistryConnDetails(registry, user, pwd string, logger log.Logger) *RegistryConnDetails { + return &RegistryConnDetails{ + registry: registry, + user: user, + pwd: pwd, + logger: logger, + } +} + +// NewClient uses the RegistryConnDetails to create a new Docker Client +func (r *RegistryConnDetails) NewClient() (*client.Client, error) { + if r.registry == "" { + return nil, errors.New("required DOCKER_REGISTRY") + } + c, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) + + if err != nil { + return nil, errors.Wrap(err, "DOCKER CLIENT") + } + + return c, nil +} + +// pullImage outputs to stdout the contents of the requested image (relative to the registry) +func (r *RegistryConnDetails) pullImage(ctx context.Context, cli *client.Client, image string) error { + authConfig := types.AuthConfig{ + Username: r.user, + Password: r.pwd, + ServerAddress: r.registry, + } + encodedJSON, err := json.Marshal(authConfig) + if err != nil { + return errors.Wrap(err, "DOCKER AUTH") + } + authStr := base64.URLEncoding.EncodeToString(encodedJSON) + + out, err := cli.ImagePull(ctx, r.registry+"/"+image, types.ImagePullOptions{RegistryAuth: authStr}) + if err != nil { + return errors.Wrap(err, "DOCKER PULL") + } + defer out.Close() + if _, err := io.Copy(os.Stdout, out); err != nil { + return err + } + return nil +} diff --git a/cmd/tink-worker/worker.go b/cmd/tink-worker/internal/worker.go similarity index 56% rename from cmd/tink-worker/worker.go rename to cmd/tink-worker/internal/worker.go index c4c88d0fb..310318cff 100644 --- a/cmd/tink-worker/worker.go +++ b/cmd/tink-worker/internal/worker.go @@ -1,6 +1,7 @@ -package main +package internal import ( + "bufio" "context" sha "crypto/sha256" "encoding/base64" @@ -12,6 +13,8 @@ import ( "strings" "time" + "github.com/docker/docker/api/types" + "github.com/docker/docker/client" "github.com/packethost/pkg/log" "github.com/pkg/errors" pb "github.com/tinkerbell/tink/protos/workflow" @@ -19,10 +22,8 @@ import ( ) const ( - dataFile = "data" - dataDir = "/worker" - maxFileSize = "MAX_FILE_SIZE" // in bytes - defaultMaxFileSize int64 = 10485760 //10MB ~= 10485760Bytes + dataFile = "data" + dataDir = "/worker" errGetWfContext = "failed to get workflow context" errGetWfActions = "failed to get actions for workflow" @@ -45,28 +46,154 @@ type WorkflowMetadata struct { SHA string `json:"sha256"` } -func processWorkflowActions(client pb.WorkflowSvcClient) error { - workerID := os.Getenv("WORKER_ID") - if workerID == "" { - return errors.New("required WORKER_ID") +// Worker details provide all the context needed to run a +type Worker struct { + client pb.WorkflowSvcClient + regConn *RegistryConnDetails + registryClient *client.Client + logger log.Logger + registry string + retries int + retryInterval time.Duration + maxSize int64 +} + +// NewWorker creates a new Worker, creating a new Docker registry client +func NewWorker(client pb.WorkflowSvcClient, regConn *RegistryConnDetails, logger log.Logger, registry string, retries int, retryInterval time.Duration, maxFileSize int64) *Worker { + registryClient, err := regConn.NewClient() + if err != nil { + panic(err) + } + return &Worker{ + client: client, + regConn: regConn, + registryClient: registryClient, + logger: logger, + registry: registry, + retries: retries, + retryInterval: retryInterval, + maxSize: maxFileSize, } +} - ctx := context.Background() - var err error - cli, err = initializeDockerClient() +func (w *Worker) captureLogs(ctx context.Context, id string) { + reader, err := w.registryClient.ContainerLogs(ctx, id, types.ContainerLogsOptions{ + ShowStdout: true, + ShowStderr: true, + Follow: true, + Timestamps: false, + }) if err != nil { - return err + panic(err) + } + defer reader.Close() + + scanner := bufio.NewScanner(reader) + for scanner.Scan() { + fmt.Println(scanner.Text()) + } +} + +func (w *Worker) execute(ctx context.Context, wfID string, action *pb.WorkflowAction) (pb.ActionState, error) { + l := w.logger.With("workflowID", wfID, "workerID", action.GetWorkerId(), "actionName", action.GetName(), "actionImage", action.GetImage()) + + cli := w.registryClient + if err := w.regConn.pullImage(ctx, cli, action.GetImage()); err != nil { + return pb.ActionState_ACTION_IN_PROGRESS, errors.Wrap(err, "DOCKER PULL") + } + id, err := w.createContainer(ctx, action.Command, wfID, action) + if err != nil { + return pb.ActionState_ACTION_IN_PROGRESS, errors.Wrap(err, "DOCKER CREATE") + } + l.With("containerID", id, "command", action.GetOnTimeout()).Info("container created") + + var timeCtx context.Context + var cancel context.CancelFunc + + if action.Timeout > 0 { + timeCtx, cancel = context.WithTimeout(ctx, time.Duration(action.Timeout)*time.Second) + } else { + timeCtx, cancel = context.WithTimeout(ctx, 1*time.Hour) + } + defer cancel() + + err = startContainer(timeCtx, l, cli, id) + if err != nil { + return pb.ActionState_ACTION_IN_PROGRESS, errors.Wrap(err, "DOCKER RUN") + } + + failedActionStatus := make(chan pb.ActionState) + + // capturing logs of action container in a go-routine + go w.captureLogs(ctx, id) + + status, waitErr := waitContainer(timeCtx, cli, id) + defer func() { + if removalErr := removeContainer(ctx, l, cli, id); removalErr != nil { + l.With("containerID", id).Error(removalErr) + } + }() + + if waitErr != nil { + return status, errors.Wrap(waitErr, "DOCKER_WAIT") } + + l.With("status", status.String()).Info("container removed") + if status != pb.ActionState_ACTION_SUCCESS { + if status == pb.ActionState_ACTION_TIMEOUT && action.OnTimeout != nil { + id, err = w.createContainer(ctx, action.OnTimeout, wfID, action) + if err != nil { + l.Error(errors.Wrap(err, errCreateContainer)) + } + l.With("containerID", id, "status", status.String(), "command", action.GetOnTimeout()).Info("container created") + failedActionStatus := make(chan pb.ActionState) + go w.captureLogs(ctx, id) + go waitFailedContainer(ctx, l, cli, id, failedActionStatus) + err = startContainer(ctx, l, cli, id) + if err != nil { + l.Error(errors.Wrap(err, errFailedToRunCmd)) + } + onTimeoutStatus := <-failedActionStatus + l.With("status", onTimeoutStatus).Info("action timeout") + } else { + if action.OnFailure != nil { + id, err = w.createContainer(ctx, action.OnFailure, wfID, action) + if err != nil { + l.Error(errors.Wrap(err, errFailedToRunCmd)) + } + l.With("containerID", id, "actionStatus", status.String(), "command", action.GetOnFailure()).Info("container created") + go w.captureLogs(ctx, id) + go waitFailedContainer(ctx, l, cli, id, failedActionStatus) + err = startContainer(ctx, l, cli, id) + if err != nil { + l.Error(errors.Wrap(err, errFailedToRunCmd)) + } + onFailureStatus := <-failedActionStatus + l.With("status", onFailureStatus).Info("action failed") + } + } + l.Info(infoWaitFinished) + if err != nil { + l.Error(errors.Wrap(err, errFailedToWait)) + } + } + l.With("status", status).Info("action container exited") + return status, nil +} + +// ProcessWorkflowActions gets all Workflow contexts and processes their actions +func (w *Worker) ProcessWorkflowActions(ctx context.Context, workerID string) error { + l := w.logger.With("workerID", workerID) + for { - l := logger.With("workerID", workerID) - res, err := client.GetWorkflowContexts(ctx, &pb.WorkflowContextRequest{WorkerId: workerID}) + res, err := w.client.GetWorkflowContexts(ctx, &pb.WorkflowContextRequest{WorkerId: workerID}) if err != nil { return errors.Wrap(err, errGetWfContext) } for wfContext, err := res.Recv(); err == nil && wfContext != nil; wfContext, err = res.Recv() { wfID := wfContext.GetWorkflowId() l = l.With("workflowID", wfID) - actions, err := client.GetWorkflowActions(ctx, &pb.WorkflowActionsRequest{WorkflowId: wfID}) + actions, err := w.client.GetWorkflowActions(ctx, &pb.WorkflowActionsRequest{WorkflowId: wfID}) if err != nil { return errors.Wrap(err, errGetWfActions) } @@ -153,7 +280,7 @@ func processWorkflowActions(client pb.WorkflowSvcClient) error { WorkerId: action.GetWorkerId(), } - err := reportActionStatus(ctx, client, actionStatus) + err := w.reportActionStatus(ctx, actionStatus) if err != nil { exitWithGrpcError(err, l) } @@ -161,11 +288,11 @@ func processWorkflowActions(client pb.WorkflowSvcClient) error { } // get workflow data - getWorkflowData(ctx, client, workerID, wfID) + getWorkflowData(ctx, l, w.client, workerID, wfID) // start executing the action start := time.Now() - status, err := executeAction(ctx, actions.GetActionList()[actionIndex], wfID) + status, err := w.execute(ctx, wfID, action) elapsed := time.Since(start) actionStatus := &pb.WorkflowActionStatus{ @@ -184,9 +311,8 @@ func processWorkflowActions(client pb.WorkflowSvcClient) error { } l.With("actionStatus", actionStatus.ActionStatus.String()) l.Error(err) - rerr := reportActionStatus(ctx, client, actionStatus) - if rerr != nil { - exitWithGrpcError(rerr, l) + if reportErr := w.reportActionStatus(ctx, actionStatus); reportErr != nil { + exitWithGrpcError(reportErr, l) } delete(workflowcontexts, wfID) return err @@ -195,14 +321,14 @@ func processWorkflowActions(client pb.WorkflowSvcClient) error { actionStatus.ActionStatus = pb.ActionState_ACTION_SUCCESS actionStatus.Message = "finished execution successfully" - err = reportActionStatus(ctx, client, actionStatus) + err = w.reportActionStatus(ctx, actionStatus) if err != nil { exitWithGrpcError(err, l) } l.Info("sent action status") // send workflow data, if updated - updateWorkflowData(ctx, client, actionStatus) + w.updateWorkflowData(ctx, actionStatus) if len(actions.GetActionList()) == actionIndex+1 { l.Info("reached to end of workflow") @@ -219,8 +345,8 @@ func processWorkflowActions(client pb.WorkflowSvcClient) error { } } } - // sleep for 3 seconds before asking for new workflows - time.Sleep(retryInterval * time.Second) + // sleep before asking for new workflows + <-time.After(w.retryInterval * time.Second) } } @@ -236,19 +362,19 @@ func isLastAction(wfContext *pb.WorkflowContext, actions *pb.WorkflowActionList) return int(wfContext.GetCurrentActionIndex()) == len(actions.GetActionList())-1 } -func reportActionStatus(ctx context.Context, client pb.WorkflowSvcClient, actionStatus *pb.WorkflowActionStatus) error { - l := logger.With("workflowID", actionStatus.GetWorkflowId, +func (w *Worker) reportActionStatus(ctx context.Context, actionStatus *pb.WorkflowActionStatus) error { + l := w.logger.With("workflowID", actionStatus.GetWorkflowId, "workerID", actionStatus.GetWorkerId(), "actionName", actionStatus.GetActionName(), "taskName", actionStatus.GetTaskName(), ) var err error - for r := 1; r <= retries; r++ { - _, err = client.ReportActionStatus(ctx, actionStatus) + for r := 1; r <= w.retries; r++ { + _, err = w.client.ReportActionStatus(ctx, actionStatus) if err != nil { l.Error(errors.Wrap(err, errReportActionStatus)) - l.With("default", retryIntervalDefault).Info("RETRY_INTERVAL not set") - <-time.After(retryInterval * time.Second) + <-time.After(w.retryInterval * time.Second) + continue } return nil @@ -256,7 +382,7 @@ func reportActionStatus(ctx context.Context, client pb.WorkflowSvcClient, action return err } -func getWorkflowData(ctx context.Context, client pb.WorkflowSvcClient, workerID, workflowID string) { +func getWorkflowData(ctx context.Context, logger log.Logger, client pb.WorkflowSvcClient, workerID, workflowID string) { l := logger.With("workflowID", workflowID, "workerID", workerID, ) @@ -279,12 +405,13 @@ func getWorkflowData(ctx context.Context, client pb.WorkflowSvcClient, workerID, } } -func updateWorkflowData(ctx context.Context, client pb.WorkflowSvcClient, actionStatus *pb.WorkflowActionStatus) { - l := logger.With("workflowID", actionStatus.GetWorkflowId, +func (w *Worker) updateWorkflowData(ctx context.Context, actionStatus *pb.WorkflowActionStatus) { + l := w.logger.With("workflowID", actionStatus.GetWorkflowId, "workerID", actionStatus.GetWorkerId(), "actionName", actionStatus.GetActionName(), "taskName", actionStatus.GetTaskName(), ) + wfDir := dataDir + string(os.PathSeparator) + actionStatus.GetWorkflowId() f := openDataFile(wfDir, l) defer f.Close() @@ -294,22 +421,22 @@ func updateWorkflowData(ctx context.Context, client pb.WorkflowSvcClient, action l.Error(err) } - if isValidDataFile(f, data, l) { + if isValidDataFile(f, w.maxSize, data, l) { h := sha.New() if _, ok := workflowDataSHA[actionStatus.GetWorkflowId()]; !ok { checksum := base64.StdEncoding.EncodeToString(h.Sum(data)) workflowDataSHA[actionStatus.GetWorkflowId()] = checksum - sendUpdate(ctx, client, actionStatus, data, checksum) + sendUpdate(ctx, w.logger, w.client, actionStatus, data, checksum) } else { newSHA := base64.StdEncoding.EncodeToString(h.Sum(data)) if !strings.EqualFold(workflowDataSHA[actionStatus.GetWorkflowId()], newSHA) { - sendUpdate(ctx, client, actionStatus, data, newSHA) + sendUpdate(ctx, w.logger, w.client, actionStatus, data, newSHA) } } } } -func sendUpdate(ctx context.Context, client pb.WorkflowSvcClient, st *pb.WorkflowActionStatus, data []byte, checksum string) { +func sendUpdate(ctx context.Context, logger log.Logger, client pb.WorkflowSvcClient, st *pb.WorkflowActionStatus, data []byte, checksum string) { l := logger.With("workflowID", st.GetWorkflowId, "workerID", st.GetWorkerId(), "actionName", st.GetActionName(), @@ -348,7 +475,7 @@ func openDataFile(wfDir string, l log.Logger) *os.File { return f } -func isValidDataFile(f *os.File, data []byte, l log.Logger) bool { +func isValidDataFile(f *os.File, maxSize int64, data []byte, l log.Logger) bool { var dataMap map[string]interface{} err := json.Unmarshal(data, &dataMap) if err != nil { @@ -358,17 +485,9 @@ func isValidDataFile(f *os.File, data []byte, l log.Logger) bool { stat, err := f.Stat() if err != nil { - logger.Error(err) + l.Error(err) return false } - val := os.Getenv(maxFileSize) - if val != "" { - maxSize, err := strconv.ParseInt(val, 10, 64) - if err == nil { - logger.Error(err) - } - return stat.Size() <= maxSize - } - return stat.Size() <= defaultMaxFileSize + return stat.Size() <= maxSize } diff --git a/cmd/tink-worker/main.go b/cmd/tink-worker/main.go index 3259e2537..8a8c0cea0 100644 --- a/cmd/tink-worker/main.go +++ b/cmd/tink-worker/main.go @@ -2,103 +2,31 @@ package main import ( "os" - "strconv" - "time" "github.com/packethost/pkg/log" - "github.com/pkg/errors" - "github.com/tinkerbell/tink/client" - pb "github.com/tinkerbell/tink/protos/workflow" - "google.golang.org/grpc" + "github.com/tinkerbell/tink/cmd/tink-worker/cmd" ) const ( - retryIntervalDefault = 3 - retryCountDefault = 3 - - serviceKey = "github.com/tinkerbell/tink" - invalidRetryInterval = "invalid RETRY_INTERVAL, using default (seconds)" - invalidMaxRetry = "invalid MAX_RETRY, using default" - - errWorker = "worker finished with error" + serviceKey = "github.com/tinkerbell/tink" ) var ( - rClient pb.WorkflowSvcClient - retryInterval time.Duration - retries int - logger log.Logger - // version is set at build time version = "devel" ) func main() { - log, err := log.Init(serviceKey) + logger, err := log.Init(serviceKey) if err != nil { panic(err) } - logger = log - defer logger.Close() - log.With("version", version).Info("starting") - setupRetry() - if setupErr := client.Setup(); setupErr != nil { - log.Error(setupErr) - os.Exit(1) - } - conn, err := tryClientConnection() - if err != nil { - log.Error(err) - os.Exit(1) - } - rClient = pb.NewWorkflowSvcClient(conn) - err = processWorkflowActions(rClient) - if err != nil { - log.Error(errors.Wrap(err, errWorker)) - } -} -func tryClientConnection() (*grpc.ClientConn, error) { - var err error - for r := 1; r <= retries; r++ { - c, e := client.GetConnection() - if e != nil { - err = e - logger.With("error", err, "duration", retryInterval).Info("failed to connect, sleeping before retrying") - <-time.After(retryInterval * time.Second) - continue - } - return c, nil - } - return nil, err -} + defer logger.Close() -func setupRetry() { - interval := os.Getenv("RETRY_INTERVAL") - if interval == "" { - logger.With("default", retryIntervalDefault).Info("RETRY_INTERVAL not set") - retryInterval = retryIntervalDefault - } else { - interval, err := time.ParseDuration(interval) - if err != nil { - logger.With("default", retryIntervalDefault).Info(invalidRetryInterval) - retryInterval = retryIntervalDefault - } else { - retryInterval = interval - } - } + rootCmd := cmd.NewRootCommand(version, logger) - maxRetry := os.Getenv("MAX_RETRY") - if maxRetry == "" { - logger.With("default", retryCountDefault).Info("MAX_RETRY not set") - retries = retryCountDefault - } else { - max, err := strconv.Atoi(maxRetry) - if err != nil { - logger.With("default", retryCountDefault).Info(invalidMaxRetry) - retries = retryCountDefault - } else { - retries = max - } + if err := rootCmd.Execute(); err != nil { + os.Exit(1) } } diff --git a/go.mod b/go.mod index 745fb0f67..32059aa18 100644 --- a/go.mod +++ b/go.mod @@ -25,12 +25,16 @@ require ( github.com/packethost/pkg v0.0.0-20200903155310-0433e0605550 github.com/pkg/errors v0.8.1 github.com/prometheus/client_golang v1.3.0 + github.com/prometheus/common v0.7.0 + github.com/rollbar/rollbar-go v1.0.2 // indirect github.com/rubenv/sql-migrate v0.0.0-20200616145509-8d140a17f351 github.com/sirupsen/logrus v1.4.2 github.com/spf13/cobra v1.0.0 + github.com/spf13/pflag v1.0.3 github.com/spf13/viper v1.4.0 github.com/stretchr/testify v1.4.0 go.mongodb.org/mongo-driver v1.1.2 // indirect + golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550 golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e // indirect golang.org/x/sys v0.0.0-20200331124033-c3d80250170d // indirect google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 diff --git a/http-server/http_handlers.go b/http-server/http_handlers.go index ffcb7c16b..cfba032af 100644 --- a/http-server/http_handlers.go +++ b/http-server/http_handlers.go @@ -8,7 +8,8 @@ import ( "net/http" tt "text/template" - // nolint:staticcheck SA1019 We will do it later + // nolint:staticcheck + // SA1019 We will do it later "github.com/golang/protobuf/jsonpb" "github.com/tinkerbell/tink/protos/template" @@ -23,6 +24,8 @@ import ( "google.golang.org/grpc/status" ) +// RegisterHardwareServiceHandlerFromEndpoint serves Hardware requests at the +// given endpoint over GRPC func RegisterHardwareServiceHandlerFromEndpoint(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) error { conn, err := grpc.Dial(endpoint, opts...) if err != nil { @@ -214,6 +217,8 @@ func RegisterHardwareServiceHandlerFromEndpoint(ctx context.Context, mux *runtim return nil } +// RegisterTemplateHandlerFromEndpoint serves Template requests at the given +// endpoint over GRPC func RegisterTemplateHandlerFromEndpoint(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) error { conn, err := grpc.Dial(endpoint, opts...) if err != nil { @@ -347,6 +352,8 @@ func RegisterTemplateHandlerFromEndpoint(ctx context.Context, mux *runtime.Serve return nil } +// RegisterWorkflowSvcHandlerFromEndpoint serves Workflow requests at the given +// endpoint over GRPC func RegisterWorkflowSvcHandlerFromEndpoint(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) error { conn, err := grpc.Dial(endpoint, opts...) if err != nil {