Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

K8sAPI Executor #1010

Merged
merged 6 commits into from
Oct 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cmd/argoexec/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/argoproj/argo/workflow/common"
"github.com/argoproj/argo/workflow/executor"
"github.com/argoproj/argo/workflow/executor/docker"
"github.com/argoproj/argo/workflow/executor/k8sapi"
"github.com/argoproj/argo/workflow/executor/kubelet"
)

Expand Down Expand Up @@ -77,6 +78,11 @@ func initExecutor() *executor.WorkflowExecutor {

var cre executor.ContainerRuntimeExecutor
switch os.Getenv(common.EnvVarContainerRuntimeExecutor) {
case common.ContainerRuntimeExecutorK8sAPI:
cre, err = k8sapi.NewK8sAPIExecutor(clientset, config, podName, namespace)
if err != nil {
panic(err.Error())
}
case common.ContainerRuntimeExecutorKubelet:
cre, err = kubelet.NewKubeletExecutor()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion docs/workflow-controller-configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ data:
bucket: my-bucket
region: us-west-2
# keyFormat is a format pattern to define how artifacts will be organized in a bucket.
# It can reference workflow metadata variables such as workflow.namespace, workflow.name,
# It can reference workflow metadata variables such as workflow.namespace, workflow.name,
# pod.name. Can also use strftime formating of workflow.creationTimestamp so that workflow
# artifacts can be organized by date. If omitted, will use `{{workflow.name}}/{{pod.name}}`,
# which has potential for have collisions.
Expand Down
3 changes: 3 additions & 0 deletions workflow/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ const (
// ContainerRuntimeExecutorKubelet to use the kubelet as container runtime executor
ContainerRuntimeExecutorKubelet = "kubelet"

// ContainerRuntimeExecutorK8sAPI to use the Kubernetes API server as container runtime executor
ContainerRuntimeExecutorK8sAPI = "k8sapi"

// Variables that are added to the scope during template execution and can be referenced using {{}} syntax

// GlobalVarWorkflowName is a global workflow variable referencing the workflow's metadata.name field
Expand Down
6 changes: 3 additions & 3 deletions workflow/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func ExecPodContainer(restConfig *rest.Config, namespace string, pod string, con
}

// GetExecutorOutput returns the output of an remotecommand.Executor
func GetExecutorOutput(exec remotecommand.Executor) (string, string, error) {
func GetExecutorOutput(exec remotecommand.Executor) (*bytes.Buffer, *bytes.Buffer, error) {
var stdOut bytes.Buffer
var stdErr bytes.Buffer
err := exec.Stream(remotecommand.StreamOptions{
Expand All @@ -105,9 +105,9 @@ func GetExecutorOutput(exec remotecommand.Executor) (string, string, error) {
Tty: false,
})
if err != nil {
return "", "", errors.InternalWrapError(err)
return nil, nil, errors.InternalWrapError(err)
}
return stdOut.String(), stdErr.String(), nil
return &stdOut, &stdErr, nil
}

// ProcessArgs sets in the inputs, the values either passed via arguments, or the hardwired values
Expand Down
7 changes: 7 additions & 0 deletions workflow/controller/workflowpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,13 @@ func (woc *wfOperationCtx) newWaitContainer(tmpl *wfv1.Template) (*apiv1.Contain

func (woc *wfOperationCtx) createEnvVars() []apiv1.EnvVar {
switch woc.controller.Config.ContainerRuntimeExecutor {
case common.ContainerRuntimeExecutorK8sAPI:
return append(execEnvVars,
apiv1.EnvVar{
Name: common.EnvVarContainerRuntimeExecutor,
Value: woc.controller.Config.ContainerRuntimeExecutor,
},
)
case common.ContainerRuntimeExecutorKubelet:
return append(execEnvVars,
apiv1.EnvVar{
Expand Down
140 changes: 140 additions & 0 deletions workflow/executor/common/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package common

import (
"bytes"
"compress/gzip"
"fmt"
"os"
"strings"
"syscall"
"time"

log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
)

const (
containerShimPrefix = "://"
)

// killGracePeriod is the time in seconds after sending SIGTERM before
// forcefully killing the sidecar with SIGKILL (value matches k8s)
const killGracePeriod = 10

// GetContainerID returns container ID of a ContainerStatus resource
func GetContainerID(container *v1.ContainerStatus) string {
i := strings.Index(container.ContainerID, containerShimPrefix)
if i == -1 {
return ""
}
return container.ContainerID[i+len(containerShimPrefix):]
}

// KubernetesClientInterface is the interface to implement getContainerStatus method
type KubernetesClientInterface interface {
getContainerStatus(containerID string) (*v1.Pod, *v1.ContainerStatus, error)
killContainer(pod *v1.Pod, container *v1.ContainerStatus, sig syscall.Signal) error
createArchive(containerID, sourcePath string) (*bytes.Buffer, error)
}

// WaitForTermination of the given containerID, set the timeout to 0 to discard it
func WaitForTermination(c KubernetesClientInterface, containerID string, timeout time.Duration) error {
ticker := time.NewTicker(time.Second * 1)
defer ticker.Stop()
timer := time.NewTimer(timeout)
if timeout == 0 {
timer.Stop()
} else {
defer timer.Stop()
}

log.Infof("Starting to wait completion of containerID %s ...", containerID)
for {
select {
case <-ticker.C:
_, containerStatus, err := c.getContainerStatus(containerID)
if err != nil {
return err
}
if containerStatus.State.Terminated == nil {
continue
}
log.Infof("ContainerID %q is terminated: %v", containerID, containerStatus.String())
return nil
case <-timer.C:
return fmt.Errorf("timeout after %s", timeout.String())
}
}
}

// TerminatePodWithContainerID invoke the given SIG against the PID1 of the container.
// No-op if the container is on the hostPID
func TerminatePodWithContainerID(c KubernetesClientInterface, containerID string, sig syscall.Signal) error {
pod, container, err := c.getContainerStatus(containerID)
if err != nil {
return err
}
if container.State.Terminated != nil {
log.Infof("Container %s is already terminated: %v", container.ContainerID, container.State.Terminated.String())
return nil
}
if pod.Spec.HostPID {
return fmt.Errorf("cannot terminate a hostPID Pod %s", pod.Name)
}
if pod.Spec.RestartPolicy != "Never" {
return fmt.Errorf("cannot terminate pod with a %q restart policy", pod.Spec.RestartPolicy)
}
return c.killContainer(pod, container, sig)
}

// KillGracefully kills a container gracefully.
func KillGracefully(c KubernetesClientInterface, containerID string) error {
log.Infof("SIGTERM containerID %q: %s", containerID, syscall.SIGTERM.String())
err := TerminatePodWithContainerID(c, containerID, syscall.SIGTERM)
if err != nil {
return err
}
err = WaitForTermination(c, containerID, time.Second*killGracePeriod)
if err == nil {
log.Infof("ContainerID %q successfully killed", containerID)
return nil
}
log.Infof("SIGKILL containerID %q: %s", containerID, syscall.SIGKILL.String())
err = TerminatePodWithContainerID(c, containerID, syscall.SIGKILL)
if err != nil {
return err
}
err = WaitForTermination(c, containerID, time.Second*killGracePeriod)
if err != nil {
return err
}
log.Infof("ContainerID %q successfully killed", containerID)
return nil
}

// CopyArchive downloads files and directories as a tarball and saves it to a specified path.
func CopyArchive(c KubernetesClientInterface, containerID, sourcePath, destPath string) error {
log.Infof("Archiving %s:%s to %s", containerID, sourcePath, destPath)
b, err := c.createArchive(containerID, sourcePath)
if err != nil {
return err
}
f, err := os.OpenFile(destPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0666)
if err != nil {
return err
}
w := gzip.NewWriter(f)
_, err = w.Write(b.Bytes())
if err != nil {
return err
}
err = w.Flush()
if err != nil {
return err
}
err = w.Close()
if err != nil {
return err
}
return nil
}
149 changes: 149 additions & 0 deletions workflow/executor/k8sapi/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package k8sapi

import (
"bytes"
"fmt"
"io"
"io/ioutil"
"os"
"syscall"
"time"

"github.com/argoproj/argo/errors"
"github.com/argoproj/argo/workflow/common"
execcommon "github.com/argoproj/argo/workflow/executor/common"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
)

type k8sAPIClient struct {
execcommon.KubernetesClientInterface

clientset *kubernetes.Clientset
config *restclient.Config
podName string
namespace string
}

func newK8sAPIClient(clientset *kubernetes.Clientset, config *restclient.Config, podName, namespace string) (*k8sAPIClient, error) {
return &k8sAPIClient{
clientset: clientset,
config: config,
podName: podName,
namespace: namespace,
}, nil
}

func (c *k8sAPIClient) getFileContents(containerID, sourcePath string) (string, error) {
_, containerStatus, err := c.getContainerStatus(containerID)
if err != nil {
return "", err
}
command := []string{"cat", sourcePath}
exec, err := common.ExecPodContainer(c.config, c.namespace, c.podName, containerStatus.Name, true, false, command...)
if err != nil {
return "", err
}
stdOut, _, err := common.GetExecutorOutput(exec)
if err != nil {
return "", err
}
return stdOut.String(), nil
}

func (c *k8sAPIClient) createArchive(containerID, sourcePath string) (*bytes.Buffer, error) {
_, containerStatus, err := c.getContainerStatus(containerID)
if err != nil {
return nil, err
}
command := []string{"tar", "cf", "-", sourcePath}
exec, err := common.ExecPodContainer(c.config, c.namespace, c.podName, containerStatus.Name, true, false, command...)
if err != nil {
return nil, err
}
stdOut, _, err := common.GetExecutorOutput(exec)
if err != nil {
return nil, err
}
return stdOut, nil
}

func (c *k8sAPIClient) getLogsAsStream(containerID string) (io.ReadCloser, error) {
_, containerStatus, err := c.getContainerStatus(containerID)
if err != nil {
return nil, err
}
return c.clientset.CoreV1().Pods(c.namespace).
GetLogs(c.podName, &v1.PodLogOptions{Container: containerStatus.Name, SinceTime: &metav1.Time{}}).Stream()
}

func (c *k8sAPIClient) getLogs(containerID string) (string, error) {
reader, err := c.getLogsAsStream(containerID)
if err != nil {
return "", err
}
bytes, err := ioutil.ReadAll(reader)
if err != nil {
return "", errors.InternalWrapError(err)
}
return string(bytes), nil
}

func (c *k8sAPIClient) saveLogs(containerID, path string) error {
reader, err := c.getLogsAsStream(containerID)
if err != nil {
return err
}
outFile, err := os.Create(path)
if err != nil {
return errors.InternalWrapError(err)
}
defer outFile.Close()
_, err = io.Copy(outFile, reader)
if err != nil {
return errors.InternalWrapError(err)
}
return nil
}

func (c *k8sAPIClient) getPod() (*v1.Pod, error) {
return c.clientset.CoreV1().Pods(c.namespace).Get(c.podName, metav1.GetOptions{})
}

func (c *k8sAPIClient) getContainerStatus(containerID string) (*v1.Pod, *v1.ContainerStatus, error) {
pod, err := c.getPod()
if err != nil {
return nil, nil, err
}
for _, containerStatus := range pod.Status.ContainerStatuses {
if execcommon.GetContainerID(&containerStatus) != containerID {
continue
}
return pod, &containerStatus, nil
}
return nil, nil, errors.New(errors.CodeNotFound, fmt.Sprintf("containerID %q is not found in the pod %s", containerID, c.podName))
}

func (c *k8sAPIClient) waitForTermination(containerID string, timeout time.Duration) error {
return execcommon.WaitForTermination(c, containerID, timeout)
}

func (c *k8sAPIClient) killContainer(pod *v1.Pod, container *v1.ContainerStatus, sig syscall.Signal) error {
command := []string{"/bin/sh", "-c", fmt.Sprintf("kill -%d 1", sig)}
exec, err := common.ExecPodContainer(c.config, c.namespace, c.podName, container.Name, false, false, command...)
if err != nil {
return err
}
_, _, err = common.GetExecutorOutput(exec)
return err
}

func (c *k8sAPIClient) killGracefully(containerID string) error {
return execcommon.KillGracefully(c, containerID)
}

func (c *k8sAPIClient) copyArchive(containerID, sourcePath, destPath string) error {
return execcommon.CopyArchive(c, containerID, sourcePath, destPath)
}
Loading