diff --git a/cmd/tink-cli/cmd/root.go b/cmd/tink-cli/cmd/root.go index 418dbc0e1..0bf10090e 100644 --- a/cmd/tink-cli/cmd/root.go +++ b/cmd/tink-cli/cmd/root.go @@ -21,6 +21,7 @@ var rootCmd = &cobra.Command{ func init() { cobra.OnInitialize(initConfig) rootCmd.PersistentFlags().StringVarP(&cfgFile, "facility", "f", "", "used to build grpc and http urls") + viper.BindPFlag("facility", rootCmd.Flags().Lookup("facility")) } func setupClient(_ *cobra.Command, _ []string) error { @@ -37,6 +38,10 @@ func Execute(version string) error { // initConfig reads in config file and ENV variables if set. func initConfig() { viper.AutomaticEnv() // read in environment variables that match + viper.SetConfigName(".tinkerbell") + viper.AddConfigPath("/etc/tinkerbell") + viper.AddConfigPath("$HOME/.config/tinkerbell") + viper.AddConfigPath(".") // If a config file is found, read it in. if err := viper.ReadInConfig(); err == nil { diff --git a/cmd/tink-worker/action.go b/cmd/tink-worker/action.go deleted file mode 100644 index 014bf884d..000000000 --- a/cmd/tink-worker/action.go +++ /dev/null @@ -1,310 +0,0 @@ -package main - -import ( - "bufio" - "context" - "encoding/base64" - "encoding/json" - "fmt" - "io" - "os" - "strings" - "time" - - "github.com/docker/docker/api/types" - "github.com/docker/docker/api/types/container" - "github.com/docker/docker/client" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" - pb "github.com/tinkerbell/tink/protos/workflow" -) - -var ( - registry string - cli *client.Client - log *logrus.Entry -) - -func executeAction(ctx context.Context, action *pb.WorkflowAction, wfID string) (string, pb.ActionState, error) { - log = logger.WithFields(logrus.Fields{"workflow_id": wfID, "worker_id": action.GetWorkerId()}) - err := pullActionImage(ctx, action) - if err != nil { - return fmt.Sprintf("Failed to pull Image : %s", action.GetImage()), 1, errors.Wrap(err, "DOCKER PULL") - } - id, err := createContainer(ctx, action, action.Command, wfID) - if err != nil { - return "Failed to create container", 1, errors.Wrap(err, "DOCKER CREATE") - } - var timeCtx context.Context - var cancel context.CancelFunc - if action.Timeout > 0 { - timeCtx, cancel = context.WithTimeout(context.Background(), time.Duration(action.Timeout)*time.Second) - } else { - timeCtx, cancel = context.WithTimeout(context.Background(), 1*time.Hour) - } - defer cancel() - //run container with timeout context - //startedAt := time.Now() - err = runContainer(timeCtx, id) - if err != nil { - return "Failed to run container", 1, errors.Wrap(err, "DOCKER RUN") - } - - failedActionStatus := make(chan pb.ActionState) - - //capturing logs of action container in a go-routine - go captureLogs(ctx, id) - - status, err, werr := waitContainer(timeCtx, id) - if werr != nil { - rerr := removeContainer(ctx, id) - if rerr != nil { - log.WithField("container_id", id).Errorln("Failed to remove container as ", rerr) - } - return "Failed to wait for completion of action", status, errors.Wrap(err, "DOCKER_WAIT") - } - rerr := removeContainer(ctx, id) - if rerr != nil { - return "Failed to remove container of action", status, errors.Wrap(rerr, "DOCKER_REMOVE") - } - log.Infoln("Container removed with Status ", pb.ActionState(status)) - if status != pb.ActionState_ACTION_SUCCESS { - if status == pb.ActionState_ACTION_TIMEOUT && action.OnTimeout != nil { - id, err = createContainer(ctx, action, action.OnTimeout, wfID) - if err != nil { - log.Errorln("Failed to create container for on-timeout command: ", err) - } - log.Infoln("Container created with on-timeout command : ", action.OnTimeout) - failedActionStatus := make(chan pb.ActionState) - go captureLogs(ctx, id) - go waitFailedContainer(ctx, id, failedActionStatus) - err = runContainer(ctx, id) - if err != nil { - log.Errorln("Failed to run on-timeout command: ", err) - } - onTimeoutStatus := <-failedActionStatus - log.Infoln("On-Timeout Container status : ", onTimeoutStatus) - } else { - if action.OnFailure != nil { - id, err = createContainer(ctx, action, action.OnFailure, wfID) - if err != nil { - log.Errorln("Failed to create on-failure command: ", err) - } - log.Infoln("Container created with on-failure command : ", action.OnFailure) - go captureLogs(ctx, id) - go waitFailedContainer(ctx, id, failedActionStatus) - err = runContainer(ctx, id) - if err != nil { - log.Errorln("Failed to run on-failure command: ", err) - } - onFailureStatus := <-failedActionStatus - log.Infoln("on-failure Container status : ", onFailureStatus) - } - } - log.Infoln("Wait finished for failed or timeout container") - if err != nil { - rerr := removeContainer(ctx, id) - if rerr != nil { - log.Errorln("Failed to remove container as ", rerr) - } - log.Infoln("Failed to wait for container : ", err) - } - rerr = removeContainer(ctx, id) - if rerr != nil { - log.Errorln("Failed to remove container as ", rerr) - } - } - log.Infoln("Action container exits with status code ", status) - return "Successful Execution", status, nil -} - -func captureLogs(ctx context.Context, id string) { - reader, err := cli.ContainerLogs(context.Background(), id, types.ContainerLogsOptions{ - ShowStdout: true, - ShowStderr: true, - Follow: true, - Timestamps: false, - }) - if err != nil { - panic(err) - } - defer reader.Close() - - scanner := bufio.NewScanner(reader) - for scanner.Scan() { - fmt.Println(scanner.Text()) - } -} - -func pullActionImage(ctx context.Context, action *pb.WorkflowAction) error { - user := os.Getenv("REGISTRY_USERNAME") - pwd := os.Getenv("REGISTRY_PASSWORD") - if user == "" || pwd == "" { - return errors.New("required REGISTRY_USERNAME and REGISTRY_PASSWORD") - } - - authConfig := types.AuthConfig{ - Username: user, - Password: pwd, - ServerAddress: registry, - } - encodedJSON, err := json.Marshal(authConfig) - if err != nil { - return errors.Wrap(err, "DOCKER AUTH") - } - authStr := base64.URLEncoding.EncodeToString(encodedJSON) - - out, err := cli.ImagePull(ctx, registry+"/"+action.GetImage(), types.ImagePullOptions{RegistryAuth: authStr}) - if err != nil { - return errors.Wrap(err, "DOCKER PULL") - } - defer out.Close() - if _, err := io.Copy(os.Stdout, out); err != nil { - return err - } - return nil -} - -func createContainer(ctx context.Context, action *pb.WorkflowAction, cmd []string, wfID string) (string, error) { - config := &container.Config{ - Image: registry + "/" + action.GetImage(), - AttachStdout: true, - AttachStderr: true, - Tty: true, - Env: action.GetEnvironment(), - } - - if cmd != nil { - config.Cmd = cmd - } - - wfDir := dataDir + string(os.PathSeparator) + wfID - hostConfig := &container.HostConfig{ - Privileged: true, - Binds: []string{wfDir + ":/workflow"}, - } - hostConfig.Binds = append(hostConfig.Binds, action.GetVolumes()...) - - log.Infoln("Starting the container with cmd", cmd) - resp, err := cli.ContainerCreate(ctx, config, hostConfig, nil, action.GetName()) - if err != nil { - return "", errors.Wrap(err, "DOCKER CREATE") - } - return resp.ID, nil -} - -func runContainer(ctx context.Context, id string) error { - log.Debugln("run Container with ID : ", id) - err := cli.ContainerStart(ctx, id, types.ContainerStartOptions{}) - if err != nil { - return errors.Wrap(err, "DOCKER START") - } - return nil -} - -func waitContainer(ctx context.Context, id string) (pb.ActionState, error, error) { - // Inspect whether the container is in running state - inspect, err := cli.ContainerInspect(ctx, id) - if err != nil { - log.Debugln("Container does not exists") - return pb.ActionState_ACTION_FAILED, nil, nil - } - if inspect.ContainerJSONBase.State.Running { - log.Debugln("Container with id : ", id, " is in running state") - //return pb.ActionState_ACTION_FAILED, nil, nil - } - // send API call to wait for the container completion - log.Debugln("Starting Container wait for id : ", id) - wait, errC := cli.ContainerWait(ctx, id, container.WaitConditionNotRunning) - - select { - case status := <-wait: - log.Infoln("Container with id ", id, "finished with status code : ", status.StatusCode) - if status.StatusCode == 0 { - return pb.ActionState_ACTION_SUCCESS, nil, nil - } - return pb.ActionState_ACTION_FAILED, nil, nil - case err := <-errC: - log.Errorln("Container wait failed for id : ", id, " Error : ", err) - return pb.ActionState_ACTION_FAILED, nil, err - case <-ctx.Done(): - log.Errorln("Container wait for id : ", id, " is timedout Error : ", err) - return pb.ActionState_ACTION_TIMEOUT, ctx.Err(), nil - } -} - -func waitFailedContainer(ctx context.Context, id string, failedActionStatus chan pb.ActionState) { - // send API call to wait for the container completion - log.Debugln("Starting Container wait for id : ", id) - wait, errC := cli.ContainerWait(ctx, id, container.WaitConditionNotRunning) - - select { - case status := <-wait: - log.Infoln("Container with id ", id, "finished with status code : ", status.StatusCode) - if status.StatusCode == 0 { - failedActionStatus <- pb.ActionState_ACTION_SUCCESS - } - failedActionStatus <- pb.ActionState_ACTION_FAILED - case err := <-errC: - log.Errorln("Container wait failed for id : ", id, " Error : ", err) - failedActionStatus <- pb.ActionState_ACTION_FAILED - } -} - -func removeContainer(ctx context.Context, id string) error { - // create options for removing container - opts := types.ContainerRemoveOptions{ - Force: true, - RemoveLinks: false, - RemoveVolumes: true, - } - log.Debugln("Start removing container ", id) - // send API call to remove the container - err := cli.ContainerRemove(ctx, id, opts) - if err != nil { - return err - } - return nil -} - -func initializeDockerClient() (*client.Client, error) { - registry = os.Getenv("DOCKER_REGISTRY") - if registry == "" { - return nil, errors.New("required DOCKER_REGISTRY") - } - c, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) - if err != nil { - return nil, errors.Wrap(err, "DOCKER CLIENT") - } - - logger.SetFormatter(&logrus.JSONFormatter{}) - return c, nil -} - -func initializeLogger() { - level := os.Getenv("WORKER_LOG_LEVEL") - if level != "" { - switch strings.ToLower(level) { - case "panic": - logger.SetLevel(logrus.PanicLevel) - case "fatal": - logger.SetLevel(logrus.FatalLevel) - case "error": - logger.SetLevel(logrus.ErrorLevel) - case "warn", "warning": - logger.SetLevel(logrus.WarnLevel) - case "info": - logger.SetLevel(logrus.InfoLevel) - case "debug": - logger.SetLevel(logrus.DebugLevel) - case "trace": - logger.SetLevel(logrus.TraceLevel) - default: - logger.SetLevel(logrus.InfoLevel) - logger.Warningln("Invalid value for WORKER_LOG_LEVEL", level, " .Setting it to default(Info)") - } - } else { - logger.SetLevel(logrus.InfoLevel) - logger.Warningln("Variable WORKER_LOG_LEVEL is not set. Default is Info") - } -} diff --git a/cmd/tink-worker/cmd/root.go b/cmd/tink-worker/cmd/root.go new file mode 100644 index 000000000..498eb0b64 --- /dev/null +++ b/cmd/tink-worker/cmd/root.go @@ -0,0 +1,232 @@ +package cmd + +import ( + "context" + "fmt" + "os" + "strings" + "time" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "github.com/spf13/pflag" + "github.com/spf13/viper" + "google.golang.org/grpc" + + "github.com/tinkerbell/tink/client" + "github.com/tinkerbell/tink/cmd/tink-worker/internal" + pb "github.com/tinkerbell/tink/protos/workflow" +) + +const ( + retryIntervalDefault = 3 + retryCountDefault = 3 + defaultMaxFileSize int64 = 10485760 //10MB ~= 10485760Bytes +) + +func NewRootCommand(version string, logger *logrus.Logger) *cobra.Command { + must := func(err error) { + if err != nil { + logger.Fatal(err) + } + } + + rootCmd := &cobra.Command{ + Use: "tink-worker", + Short: "Tink Worker", + Version: version, + PersistentPreRunE: func(cmd *cobra.Command, args []string) error { + viper, err := createViper() + if err == nil { + err = applyViper(viper, cmd) + } + return err + }, + RunE: func(cmd *cobra.Command, args []string) error { + retryInterval, _ := cmd.PersistentFlags().GetDuration("retry-interval") + retries, _ := cmd.PersistentFlags().GetInt("retries") + logLevel, _ := cmd.PersistentFlags().GetString("worker-log-level") + workerID, _ := cmd.PersistentFlags().GetString("worker-id") + maxSize, _ := cmd.PersistentFlags().GetInt64("max-size") + timeOut, _ := cmd.PersistentFlags().GetDuration("worker-timeout") + user, _ := cmd.PersistentFlags().GetString("registry-username") + pwd, _ := cmd.PersistentFlags().GetString("registry-password") + registry, _ := cmd.PersistentFlags().GetString("docker-registry") + + /* param debugging + fmt.Println(map[string]interface{}{ + "retryInterval": retryInterval, + "retries": retries, + "logLevel": logLevel, + "workerID": workerID, + "maxSize": maxSize, + "timeOut": timeOut, + "user": user, + "pwd": pwd, + "registry": registry, + }) + */ + + initializeLogger(logger, logLevel) + logger.Debug("Starting version " + version) + if setupErr := client.Setup(); setupErr != nil { + return setupErr + } + + ctx := context.Background() + if timeOut > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, timeOut) + defer cancel() + } + + conn, err := tryClientConnection(logger, retryInterval, retries) + if err != nil { + return err + } + rClient := pb.NewWorkflowSvcClient(conn) + + regConn := internal.NewRegistryConnDetails(registry, user, pwd, logger) + internal.SetLogger(logger) + worker := internal.NewWorker(rClient, regConn, logger, registry, retries, retryInterval, maxSize) + + err = worker.ProcessWorkflowActions(ctx, workerID) + if err != nil { + return errors.Wrap(err, "worker Finished with error") + } + return nil + }, + } + + rootCmd.PersistentFlags().DurationP("retry-interval", "i", retryIntervalDefault, "Retry interval in seconds") + + rootCmd.PersistentFlags().DurationP("worker-timeout", "t", 0, "Max duration to wait for worker to complete") + + rootCmd.PersistentFlags().IntP("max-retry", "m", retryCountDefault, "Maximum number of retries to attempt") + + rootCmd.PersistentFlags().Int64P("max-file-size", "s", defaultMaxFileSize, "Maximum file size in bytes") + + rootCmd.PersistentFlags().StringP("worker-log-level", "l", "info", "Sets the worker log level (panic, fatal, error, warn, info, debug, trace)") + + rootCmd.PersistentFlags().StringP("worker-id", "w", "", "Sets the worker id") + must(rootCmd.MarkPersistentFlagRequired("worker-id")) + + rootCmd.PersistentFlags().StringP("docker-registry", "r", "", "Sets the Docker registry") + must(rootCmd.MarkPersistentFlagRequired("docker-registry")) + + rootCmd.PersistentFlags().StringP("registry-username", "u", "", "Sets the registry username") + must(rootCmd.MarkPersistentFlagRequired("registry-username")) + + rootCmd.PersistentFlags().StringP("registry-password", "p", "", "Sets the registry-password") + must(rootCmd.MarkPersistentFlagRequired("registry-password")) + + return rootCmd +} + +func createViper() (*viper.Viper, error) { + v := viper.New() + v.AutomaticEnv() // read in environment variables that match + v.SetConfigName(".tinkerbell") + v.AddConfigPath("/etc/tinkerbell") + v.AddConfigPath("$HOME/.config/tinkerbell") + v.AddConfigPath(".") + v.SetEnvPrefix("TINK_WORKER") + + // If a config file is found, read it in. + if err := v.ReadInConfig(); err != nil { + if _, ok := err.(viper.ConfigFileNotFoundError); !ok { + return nil, err + } + } else { + fmt.Fprintln(os.Stderr, "Using config file:", v.ConfigFileUsed()) + } + + return v, nil +} + +func applyViper(v *viper.Viper, cmd *cobra.Command) error { + /* + if err := v.BindPFlags(cmd.PersistentFlags()); err != nil { + return err + } + */ + errors := []error{} + + cmd.PersistentFlags().VisitAll(func(f *pflag.Flag) { + // Bind flag names to proper env var name, unhyphenated are handled by AutomaticEnv + if strings.Contains(f.Name, "-") { + envVar := strings.ToUpper(strings.ReplaceAll(f.Name, "-", "_")) + f.Usage = f.Usage + " (" + envVar + ")" + err := v.BindEnv(f.Name, envVar) + if err != nil { + errors = append(errors, err) + return + } + } + + // Apply viper config values when cobra has no value and viper does + if !f.Changed && v.IsSet(f.Name) { + val := v.Get(f.Name) + if err := cmd.Flags().Set(f.Name, fmt.Sprintf("%v", val)); err != nil { + errors = append(errors, err) + return + } + } + }) + + if len(errors) > 0 { + errs := []string{} + for _, err := range errors { + errs = append(errs, err.Error()) + } + return fmt.Errorf(strings.Join(errs, ", ")) + } + return nil +} + +func initializeLogger(logger *logrus.Logger, level string) { + if level != "" { + switch strings.ToLower(level) { + case "panic": + logger.SetLevel(logrus.PanicLevel) + case "fatal": + logger.SetLevel(logrus.FatalLevel) + case "error": + logger.SetLevel(logrus.ErrorLevel) + case "warn", "warning": + logger.SetLevel(logrus.WarnLevel) + case "info": + logger.SetLevel(logrus.InfoLevel) + case "debug": + logger.SetLevel(logrus.DebugLevel) + case "trace": + logger.SetLevel(logrus.TraceLevel) + default: + logger.SetLevel(logrus.InfoLevel) + logger.Warningln("Invalid value for WORKER_LOG_LEVEL", level, " .Setting it to default(Info)") + } + } else { + logger.SetLevel(logrus.InfoLevel) + logger.Warningln("Variable WORKER_LOG_LEVEL is not set. Default is Info") + } +} + +func tryClientConnection(logger *logrus.Logger, retryInterval time.Duration, retries int) (*grpc.ClientConn, error) { + for { + if retries <= 0 { + return nil, fmt.Errorf("retries exceeded") + } + retries-- + + c, err := client.GetConnection() + if err != nil { + logger.Errorln(err) + logger.Errorf("retrying after %v seconds", retryInterval) + <-time.After(retryInterval * time.Second) + continue + } + + return c, nil + } +} diff --git a/cmd/tink-worker/internal/action.go b/cmd/tink-worker/internal/action.go new file mode 100644 index 000000000..e55b70f37 --- /dev/null +++ b/cmd/tink-worker/internal/action.go @@ -0,0 +1,116 @@ +package internal // import "github.com/tinkerbell/tink/cmd/tink-worker/internal" + +import ( + "context" + "os" + + "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/container" + "github.com/docker/docker/client" + "github.com/pkg/errors" + + pb "github.com/tinkerbell/tink/protos/workflow" +) + +func (w *Worker) createContainer(ctx context.Context, cmd []string, wfID string, action *pb.WorkflowAction) (string, error) { + registry := w.registry + config := &container.Config{ + Image: registry + "/" + action.GetImage(), + AttachStdout: true, + AttachStderr: true, + Tty: true, + Env: action.GetEnvironment(), + } + + if cmd != nil { + config.Cmd = cmd + } + + wfDir := dataDir + string(os.PathSeparator) + wfID + hostConfig := &container.HostConfig{ + Privileged: true, + Binds: []string{wfDir + ":/workflow"}, + } + hostConfig.Binds = append(hostConfig.Binds, action.GetVolumes()...) + + log.Infoln("Starting the container with cmd", cmd) + resp, err := w.registryClient.ContainerCreate(ctx, config, hostConfig, nil, action.GetName()) + if err != nil { + return "", errors.Wrap(err, "DOCKER CREATE") + } + return resp.ID, nil +} + +func runContainer(ctx context.Context, cli *client.Client, id string) error { + log.Debugln("run Container with ID : ", id) + err := cli.ContainerStart(ctx, id, types.ContainerStartOptions{}) + if err != nil { + return errors.Wrap(err, "DOCKER START") + } + return nil +} + +func waitContainer(ctx context.Context, cli *client.Client, id string) (pb.ActionState, error, error) { + // Inspect whether the container is in running state + inspect, err := cli.ContainerInspect(ctx, id) + if err != nil { + log.Debugln("Container does not exists") + return pb.ActionState_ACTION_FAILED, nil, nil + } + if inspect.ContainerJSONBase.State.Running { + log.Debugln("Container with id : ", id, " is in running state") + //return pb.ActionState_ACTION_FAILED, nil, nil + } + // send API call to wait for the container completion + log.Debugln("Starting Container wait for id : ", id) + wait, errC := cli.ContainerWait(ctx, id, container.WaitConditionNotRunning) + + select { + case status := <-wait: + log.Infoln("Container with id ", id, "finished with status code : ", status.StatusCode) + if status.StatusCode == 0 { + return pb.ActionState_ACTION_SUCCESS, nil, nil + } + return pb.ActionState_ACTION_FAILED, nil, nil + case err := <-errC: + log.Errorln("Container wait failed for id : ", id, " Error : ", err) + return pb.ActionState_ACTION_FAILED, nil, err + case <-ctx.Done(): + log.Errorln("Container wait for id : ", id, " is timedout Error : ", err) + return pb.ActionState_ACTION_TIMEOUT, ctx.Err(), nil + } +} + +func waitFailedContainer(ctx context.Context, cli *client.Client, id string, failedActionStatus chan pb.ActionState) { + // send API call to wait for the container completion + log.Debugln("Starting Container wait for id : ", id) + wait, errC := cli.ContainerWait(ctx, id, container.WaitConditionNotRunning) + + select { + case status := <-wait: + log.Infoln("Container with id ", id, "finished with status code : ", status.StatusCode) + if status.StatusCode == 0 { + failedActionStatus <- pb.ActionState_ACTION_SUCCESS + } + failedActionStatus <- pb.ActionState_ACTION_FAILED + case err := <-errC: + log.Errorln("Container wait failed for id : ", id, " Error : ", err) + failedActionStatus <- pb.ActionState_ACTION_FAILED + } +} + +func removeContainer(ctx context.Context, cli *client.Client, id string) error { + // create options for removing container + opts := types.ContainerRemoveOptions{ + Force: true, + RemoveLinks: false, + RemoveVolumes: true, + } + log.Debugln("Start removing container ", id) + // send API call to remove the container + err := cli.ContainerRemove(ctx, id, opts) + if err != nil { + return err + } + return nil +} diff --git a/cmd/tink-worker/internal/registry.go b/cmd/tink-worker/internal/registry.go new file mode 100644 index 000000000..1de8e1041 --- /dev/null +++ b/cmd/tink-worker/internal/registry.go @@ -0,0 +1,75 @@ +package internal + +import ( + "context" + "encoding/base64" + "encoding/json" + "io" + "os" + + "github.com/docker/docker/api/types" + "github.com/docker/docker/client" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +var log *logrus.Logger + +type RegistryConnDetails struct { + registry, + user, + pwd string + logger *logrus.Logger +} + +func SetLogger(logger *logrus.Logger) { + log = logger +} + +func NewRegistryConnDetails(registry, user, pwd string, logger *logrus.Logger) *RegistryConnDetails { + return &RegistryConnDetails{ + registry: registry, + user: user, + pwd: pwd, + logger: logger, + } +} + +func (r *RegistryConnDetails) NewClient() (*client.Client, error) { + if r.registry == "" { + return nil, errors.New("required DOCKER_REGISTRY") + } + c, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) + + if err != nil { + return nil, errors.Wrap(err, "DOCKER CLIENT") + } + + r.logger.SetFormatter(&logrus.JSONFormatter{}) + return c, nil +} + +// pullImage outputs to stdout the contents of the requested image (relative to the registry) +func (w *RegistryConnDetails) pullImage(ctx context.Context, cli *client.Client, image string) error { + registry := w.registry + authConfig := types.AuthConfig{ + Username: w.user, + Password: w.pwd, + ServerAddress: w.registry, + } + encodedJSON, err := json.Marshal(authConfig) + if err != nil { + return errors.Wrap(err, "DOCKER AUTH") + } + authStr := base64.URLEncoding.EncodeToString(encodedJSON) + + out, err := cli.ImagePull(ctx, registry+"/"+image, types.ImagePullOptions{RegistryAuth: authStr}) + if err != nil { + return errors.Wrap(err, "DOCKER PULL") + } + defer out.Close() + if _, err := io.Copy(os.Stdout, out); err != nil { + return err + } + return nil +} diff --git a/cmd/tink-worker/worker.go b/cmd/tink-worker/internal/worker.go similarity index 55% rename from cmd/tink-worker/worker.go rename to cmd/tink-worker/internal/worker.go index dfa8f5e46..3c2260366 100644 --- a/cmd/tink-worker/worker.go +++ b/cmd/tink-worker/internal/worker.go @@ -1,6 +1,7 @@ -package main +package internal // import "github.com/tinkerbell/tink/cmd/tink-worker/internal" import ( + "bufio" "context" sha "crypto/sha256" "encoding/base64" @@ -8,20 +9,22 @@ import ( "fmt" "io/ioutil" "os" - "strconv" "strings" "time" + "github.com/docker/docker/api/types" + "github.com/docker/docker/client" + "github.com/pkg/errors" "github.com/sirupsen/logrus" - pb "github.com/tinkerbell/tink/protos/workflow" "google.golang.org/grpc/status" + + pb "github.com/tinkerbell/tink/protos/workflow" ) const ( - dataFile = "data" - dataDir = "/worker" - maxFileSize = "MAX_FILE_SIZE" // in bytes - defaultMaxFileSize int64 = 10485760 //10MB ~= 10485760Bytes + dataFile = "data" + dataDir = "/worker" + maxFileSize = "MAX_FILE_SIZE" // in bytes ) var ( @@ -38,26 +41,157 @@ type WorkflowMetadata struct { SHA string `json:"sha256"` } -func processWorkflowActions(client pb.WorkflowSvcClient) error { - workerID := os.Getenv("WORKER_ID") - if workerID == "" { - return fmt.Errorf("required WORKER_ID") +type Worker struct { + client pb.WorkflowSvcClient + regConn *RegistryConnDetails + registryClient *client.Client + logger *logrus.Logger + registry string + retries int + retryInterval time.Duration + maxSize int64 +} + +func NewWorker(client pb.WorkflowSvcClient, regConn *RegistryConnDetails, logger *logrus.Logger, registry string, retries int, retryInterval time.Duration, maxSize int64) *Worker { + registryClient, err := regConn.NewClient() + if err != nil { + panic(err) } - log = logger.WithField("worker_id", workerID) - ctx := context.Background() - var err error - cli, err = initializeDockerClient() + return &Worker{ + client: client, + regConn: regConn, + registryClient: registryClient, + logger: logger, + registry: registry, + retries: retries, + retryInterval: retryInterval, + maxSize: maxSize, + } +} + +func (w *Worker) captureLogs(ctx context.Context, id string) { + reader, err := w.registryClient.ContainerLogs(context.Background(), id, types.ContainerLogsOptions{ + ShowStdout: true, + ShowStderr: true, + Follow: true, + Timestamps: false, + }) if err != nil { - return err + panic(err) + } + defer reader.Close() + + scanner := bufio.NewScanner(reader) + for scanner.Scan() { + fmt.Println(scanner.Text()) } +} + +func (w *Worker) execute(ctx context.Context, wfID string, action *pb.WorkflowAction) (string, pb.ActionState, error) { + log := w.logger.WithFields(logrus.Fields{"workflow_id": wfID, "worker_id": action.GetWorkerId()}) + cli := w.registryClient + err := w.regConn.pullImage(ctx, cli, action.GetImage()) + if err != nil { + return fmt.Sprintf("Failed to pull Image : %s", action.GetImage()), 1, errors.Wrap(err, "DOCKER PULL") + } + id, err := w.createContainer(ctx, action.Command, wfID, action) + if err != nil { + return "Failed to create container", 1, errors.Wrap(err, "DOCKER CREATE") + } + var timeCtx context.Context + var cancel context.CancelFunc + if action.Timeout > 0 { + timeCtx, cancel = context.WithTimeout(ctx, time.Duration(action.Timeout)*time.Second) + } else { + timeCtx, cancel = context.WithTimeout(ctx, 1*time.Hour) + } + defer cancel() + //run container with timeout context + //startedAt := time.Now() + err = runContainer(timeCtx, cli, id) + if err != nil { + return "Failed to run container", 1, errors.Wrap(err, "DOCKER RUN") + } + + failedActionStatus := make(chan pb.ActionState) + + //capturing logs of action container in a go-routine + go w.captureLogs(ctx, id) + + status, err, werr := waitContainer(timeCtx, cli, id) + if werr != nil { + rerr := removeContainer(ctx, cli, id) + if rerr != nil { + log.WithField("container_id", id).Errorln("Failed to remove container as ", rerr) + } + return "Failed to wait for completion of action", status, errors.Wrap(err, "DOCKER_WAIT") + } + rerr := removeContainer(ctx, cli, id) + if rerr != nil { + return "Failed to remove container of action", status, errors.Wrap(rerr, "DOCKER_REMOVE") + } + log.Infoln("Container removed with Status ", pb.ActionState(status)) + if status != pb.ActionState_ACTION_SUCCESS { + if status == pb.ActionState_ACTION_TIMEOUT && action.OnTimeout != nil { + id, err = w.createContainer(ctx, action.OnTimeout, wfID, action) + if err != nil { + log.Errorln("Failed to create container for on-timeout command: ", err) + } + log.Infoln("Container created with on-timeout command : ", action.OnTimeout) + failedActionStatus := make(chan pb.ActionState) + go w.captureLogs(ctx, id) + go waitFailedContainer(ctx, cli, id, failedActionStatus) + err = runContainer(ctx, cli, id) + if err != nil { + log.Errorln("Failed to run on-timeout command: ", err) + } + onTimeoutStatus := <-failedActionStatus + log.Infoln("On-Timeout Container status : ", onTimeoutStatus) + } else { + if action.OnFailure != nil { + id, err = w.createContainer(ctx, action.OnFailure, wfID, action) + if err != nil { + log.Errorln("Failed to create on-failure command: ", err) + } + log.Infoln("Container created with on-failure command : ", action.OnFailure) + go w.captureLogs(ctx, id) + go waitFailedContainer(ctx, cli, id, failedActionStatus) + err = runContainer(ctx, cli, id) + if err != nil { + log.Errorln("Failed to run on-failure command: ", err) + } + onFailureStatus := <-failedActionStatus + log.Infoln("on-failure Container status : ", onFailureStatus) + } + } + log.Infoln("Wait finished for failed or timeout container") + if err != nil { + rerr := removeContainer(ctx, cli, id) + if rerr != nil { + log.Errorln("Failed to remove container as ", rerr) + } + log.Infoln("Failed to wait for container : ", err) + } + rerr = removeContainer(ctx, cli, id) + if rerr != nil { + log.Errorln("Failed to remove container as ", rerr) + } + } + log.Infoln("Action container exits with status code ", status) + return "Successful Execution", status, nil +} + +func (w *Worker) ProcessWorkflowActions(ctx context.Context, workerID string) error { + log := w.logger.WithField("worker_id", workerID) + for { - res, err := client.GetWorkflowContexts(ctx, &pb.WorkflowContextRequest{WorkerId: workerID}) + res, err := w.client.GetWorkflowContexts(ctx, &pb.WorkflowContextRequest{WorkerId: workerID}) if err != nil { fmt.Println("failed to get context") } for wfContext, err := res.Recv(); err == nil && wfContext != nil; wfContext, err = res.Recv() { wfID := wfContext.GetWorkflowId() - actions, err := client.GetWorkflowActions(ctx, &pb.WorkflowActionsRequest{WorkflowId: wfID}) + actions, err := w.client.GetWorkflowActions(ctx, &pb.WorkflowActionsRequest{WorkflowId: wfID}) if err != nil { return fmt.Errorf("can't find actions for workflow %s", wfID) } @@ -129,7 +263,7 @@ func processWorkflowActions(client pb.WorkflowSvcClient) error { Message: "Started execution", WorkerId: action.GetWorkerId(), } - err := reportActionStatus(ctx, client, actionStatus) + err := w.reportActionStatus(ctx, actionStatus) if err != nil { exitWithGrpcError(err) } @@ -138,11 +272,11 @@ func processWorkflowActions(client pb.WorkflowSvcClient) error { } // get workflow data - getWorkflowData(ctx, client, wfID) + getWorkflowData(ctx, w.client, wfID) // start executing the action start := time.Now() - _, status, err := executeAction(ctx, actions.GetActionList()[actionIndex], wfID) + _, status, err := w.execute(ctx, wfID, action) elapsed := time.Since(start) actionStatus := &pb.WorkflowActionStatus{ @@ -163,7 +297,7 @@ func processWorkflowActions(client pb.WorkflowSvcClient) error { actionStatus.ActionStatus = pb.ActionState_ACTION_FAILED actionStatus.Message = "Action Failed" } - rerr := reportActionStatus(ctx, client, actionStatus) + rerr := w.reportActionStatus(ctx, actionStatus) if rerr != nil { exitWithGrpcError(rerr) } @@ -174,14 +308,14 @@ func processWorkflowActions(client pb.WorkflowSvcClient) error { actionStatus.ActionStatus = pb.ActionState_ACTION_SUCCESS actionStatus.Message = "Finished Execution Successfully" - err = reportActionStatus(ctx, client, actionStatus) + err = w.reportActionStatus(ctx, actionStatus) if err != nil { exitWithGrpcError(err) } log.Infof("Sent action status %s\n", actionStatus) // send workflow data, if updated - updateWorkflowData(ctx, client, actionStatus) + w.updateWorkflowData(ctx, actionStatus) if len(actions.GetActionList()) == actionIndex+1 { log.Infoln("Reached to end of workflow") @@ -199,7 +333,7 @@ func processWorkflowActions(client pb.WorkflowSvcClient) error { } } // sleep for 3 seconds before asking for new workflows - time.Sleep(retryInterval * time.Second) + time.Sleep(w.retryInterval * time.Second) } } @@ -215,14 +349,14 @@ func isLastAction(wfContext *pb.WorkflowContext, actions *pb.WorkflowActionList) return int(wfContext.GetCurrentActionIndex()) == len(actions.GetActionList())-1 } -func reportActionStatus(ctx context.Context, client pb.WorkflowSvcClient, actionStatus *pb.WorkflowActionStatus) error { +func (w *Worker) reportActionStatus(ctx context.Context, actionStatus *pb.WorkflowActionStatus) error { var err error - for r := 1; r <= retries; r++ { - _, err = client.ReportActionStatus(ctx, actionStatus) + for r := 1; r <= w.retries; r++ { + _, err = w.client.ReportActionStatus(ctx, actionStatus) if err != nil { log.Errorln("Report action status to server failed as : ", err) - log.Errorf("Retrying after %v seconds", retryInterval) - <-time.After(retryInterval * time.Second) + log.Errorf("Retrying after %v seconds", w.retryInterval) + <-time.After(w.retryInterval * time.Second) continue } return nil @@ -251,7 +385,7 @@ func getWorkflowData(ctx context.Context, client pb.WorkflowSvcClient, workflowI } } -func updateWorkflowData(ctx context.Context, client pb.WorkflowSvcClient, actionStatus *pb.WorkflowActionStatus) { +func (w *Worker) updateWorkflowData(ctx context.Context, actionStatus *pb.WorkflowActionStatus) { wfDir := dataDir + string(os.PathSeparator) + actionStatus.GetWorkflowId() f := openDataFile(wfDir) defer f.Close() @@ -261,16 +395,16 @@ func updateWorkflowData(ctx context.Context, client pb.WorkflowSvcClient, action log.Fatal(err) } - if isValidDataFile(f, data) { + if isValidDataFile(f, w.maxSize, data) { h := sha.New() if _, ok := workflowDataSHA[actionStatus.GetWorkflowId()]; !ok { checksum := base64.StdEncoding.EncodeToString(h.Sum(data)) workflowDataSHA[actionStatus.GetWorkflowId()] = checksum - sendUpdate(ctx, client, actionStatus, data, checksum) + sendUpdate(ctx, w.client, actionStatus, data, checksum) } else { newSHA := base64.StdEncoding.EncodeToString(h.Sum(data)) if !strings.EqualFold(workflowDataSHA[actionStatus.GetWorkflowId()], newSHA) { - sendUpdate(ctx, client, actionStatus, data, newSHA) + sendUpdate(ctx, w.client, actionStatus, data, newSHA) } } } @@ -308,7 +442,7 @@ func openDataFile(wfDir string) *os.File { return f } -func isValidDataFile(f *os.File, data []byte) bool { +func isValidDataFile(f *os.File, maxSize int64, data []byte) bool { var dataMap map[string]interface{} err := json.Unmarshal(data, &dataMap) if err != nil { @@ -322,13 +456,5 @@ func isValidDataFile(f *os.File, data []byte) bool { return false } - val := os.Getenv(maxFileSize) - if val != "" { - maxSize, err := strconv.ParseInt(val, 10, 64) - if err == nil { - log.Print(err) - } - return stat.Size() <= maxSize - } - return stat.Size() <= defaultMaxFileSize + return stat.Size() <= maxSize } diff --git a/cmd/tink-worker/main.go b/cmd/tink-worker/main.go index 8783926fc..33bf98eda 100644 --- a/cmd/tink-worker/main.go +++ b/cmd/tink-worker/main.go @@ -2,22 +2,14 @@ package main import ( "os" - "strconv" "time" "github.com/sirupsen/logrus" - "github.com/tinkerbell/tink/client" - pb "github.com/tinkerbell/tink/protos/workflow" - "google.golang.org/grpc" -) -const ( - retryIntervalDefault = 3 - retryCountDefault = 3 + "github.com/tinkerbell/tink/cmd/tink-worker/cmd" ) var ( - rClient pb.WorkflowSvcClient retryInterval time.Duration retries int ) @@ -25,70 +17,13 @@ var ( var ( // version is set at build time version = "devel" - - logger = logrus.New() ) func main() { - initializeLogger() - logger.Debug("Starting version " + version) - setupRetry() - if setupErr := client.Setup(); setupErr != nil { - logger.Fatalln(setupErr) - } - conn, err := tryClientConnection() - if err != nil { - logger.Fatalln(err) - } - rClient = pb.NewWorkflowSvcClient(conn) - err = processWorkflowActions(rClient) - if err != nil { - logger.Errorln("worker Finished with error", err) - } -} - -func tryClientConnection() (*grpc.ClientConn, error) { - var err error - for r := 1; r <= retries; r++ { - c, e := client.GetConnection() - if e != nil { - err = e - logger.Errorln(err) - logger.Errorf("retrying after %v seconds", retryInterval) - <-time.After(retryInterval * time.Second) - continue - } - return c, nil - } - return nil, err -} - -func setupRetry() { - interval := os.Getenv("RETRY_INTERVAL") - if interval == "" { - logger.Infof("RETRY_INTERVAL not set. Using default, %d seconds\n", retryIntervalDefault) - retryInterval = retryIntervalDefault - } else { - interval, err := time.ParseDuration(interval) - if err != nil { - logger.Warningf("Invalid RETRY_INTERVAL set. Using default, %d seconds.\n", retryIntervalDefault) - retryInterval = retryIntervalDefault - } else { - retryInterval = interval - } - } + logger := logrus.New() + rootCmd := cmd.NewRootCommand(version, logger) - maxRetry := os.Getenv("MAX_RETRY") - if maxRetry == "" { - logger.Infof("MAX_RETRY not set. Using default, %d retries.\n", retryCountDefault) - retries = retryCountDefault - } else { - max, err := strconv.Atoi(maxRetry) - if err != nil { - logger.Warningf("Invalid MAX_RETRY set. Using default, %d retries.\n", retryCountDefault) - retries = retryCountDefault - } else { - retries = max - } + if err := rootCmd.Execute(); err != nil { + os.Exit(1) } } diff --git a/go.mod b/go.mod index c8b7881ce..47668de0f 100644 --- a/go.mod +++ b/go.mod @@ -27,12 +27,15 @@ require ( github.com/packethost/pkg v0.0.0-20190410153520-e8e15f4ce770 github.com/pkg/errors v0.8.1 github.com/prometheus/client_golang v1.1.0 + github.com/prometheus/common v0.6.0 github.com/rollbar/rollbar-go v1.0.2 // indirect github.com/sirupsen/logrus v1.4.1 github.com/spf13/cobra v1.0.0 + github.com/spf13/pflag v1.0.3 github.com/spf13/viper v1.4.0 github.com/stretchr/testify v1.3.0 go.mongodb.org/mongo-driver v1.1.2 // indirect + golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e // indirect golang.org/x/sys v0.0.0-20200331124033-c3d80250170d // indirect golang.org/x/text v0.3.2 // indirect