diff --git a/workflow/executor/common/common.go b/workflow/executor/common/common.go index 0ce8d251532f..dd218c447305 100644 --- a/workflow/executor/common/common.go +++ b/workflow/executor/common/common.go @@ -32,9 +32,9 @@ func GetContainerID(container *v1.ContainerStatus) string { // KubernetesClientInterface is the interface to implement getContainerStatus method type KubernetesClientInterface interface { - getContainerStatus(containerID string) (*v1.Pod, *v1.ContainerStatus, error) - killContainer(pod *v1.Pod, container *v1.ContainerStatus, sig syscall.Signal) error - createArchive(containerID, sourcePath string) (*bytes.Buffer, error) + GetContainerStatus(containerID string) (*v1.Pod, *v1.ContainerStatus, error) + KillContainer(pod *v1.Pod, container *v1.ContainerStatus, sig syscall.Signal) error + CreateArchive(containerID, sourcePath string) (*bytes.Buffer, error) } // WaitForTermination of the given containerID, set the timeout to 0 to discard it @@ -52,7 +52,7 @@ func WaitForTermination(c KubernetesClientInterface, containerID string, timeout for { select { case <-ticker.C: - _, containerStatus, err := c.getContainerStatus(containerID) + _, containerStatus, err := c.GetContainerStatus(containerID) if err != nil { return err } @@ -70,7 +70,7 @@ func WaitForTermination(c KubernetesClientInterface, containerID string, timeout // TerminatePodWithContainerID invoke the given SIG against the PID1 of the container. // No-op if the container is on the hostPID func TerminatePodWithContainerID(c KubernetesClientInterface, containerID string, sig syscall.Signal) error { - pod, container, err := c.getContainerStatus(containerID) + pod, container, err := c.GetContainerStatus(containerID) if err != nil { return err } @@ -84,7 +84,7 @@ func TerminatePodWithContainerID(c KubernetesClientInterface, containerID string if pod.Spec.RestartPolicy != "Never" { return fmt.Errorf("cannot terminate pod with a %q restart policy", pod.Spec.RestartPolicy) } - return c.killContainer(pod, container, sig) + return c.KillContainer(pod, container, sig) } // KillGracefully kills a container gracefully. @@ -115,7 +115,7 @@ func KillGracefully(c KubernetesClientInterface, containerID string) error { // CopyArchive downloads files and directories as a tarball and saves it to a specified path. func CopyArchive(c KubernetesClientInterface, containerID, sourcePath, destPath string) error { log.Infof("Archiving %s:%s to %s", containerID, sourcePath, destPath) - b, err := c.createArchive(containerID, sourcePath) + b, err := c.CreateArchive(containerID, sourcePath) if err != nil { return err } diff --git a/workflow/executor/k8sapi/client.go b/workflow/executor/k8sapi/client.go index 025e69a86686..1da5433be3e6 100644 --- a/workflow/executor/k8sapi/client.go +++ b/workflow/executor/k8sapi/client.go @@ -21,14 +21,14 @@ import ( ) type k8sAPIClient struct { - execcommon.KubernetesClientInterface - clientset *kubernetes.Clientset config *restclient.Config podName string namespace string } +var _ execcommon.KubernetesClientInterface = &k8sAPIClient{} + func newK8sAPIClient(clientset *kubernetes.Clientset, config *restclient.Config, podName, namespace string) (*k8sAPIClient, error) { return &k8sAPIClient{ clientset: clientset, @@ -39,7 +39,7 @@ func newK8sAPIClient(clientset *kubernetes.Clientset, config *restclient.Config, } func (c *k8sAPIClient) getFileContents(containerID, sourcePath string) (string, error) { - _, containerStatus, err := c.getContainerStatus(containerID) + _, containerStatus, err := c.GetContainerStatus(containerID) if err != nil { return "", err } @@ -55,8 +55,8 @@ func (c *k8sAPIClient) getFileContents(containerID, sourcePath string) (string, return stdOut.String(), nil } -func (c *k8sAPIClient) createArchive(containerID, sourcePath string) (*bytes.Buffer, error) { - _, containerStatus, err := c.getContainerStatus(containerID) +func (c *k8sAPIClient) CreateArchive(containerID, sourcePath string) (*bytes.Buffer, error) { + _, containerStatus, err := c.GetContainerStatus(containerID) if err != nil { return nil, err } @@ -73,7 +73,7 @@ func (c *k8sAPIClient) createArchive(containerID, sourcePath string) (*bytes.Buf } func (c *k8sAPIClient) getLogsAsStream(containerID string) (io.ReadCloser, error) { - _, containerStatus, err := c.getContainerStatus(containerID) + _, containerStatus, err := c.GetContainerStatus(containerID) if err != nil { return nil, err } @@ -114,7 +114,7 @@ func (c *k8sAPIClient) getPod() (*v1.Pod, error) { return c.clientset.CoreV1().Pods(c.namespace).Get(c.podName, metav1.GetOptions{}) } -func (c *k8sAPIClient) getContainerStatus(containerID string) (*v1.Pod, *v1.ContainerStatus, error) { +func (c *k8sAPIClient) GetContainerStatus(containerID string) (*v1.Pod, *v1.ContainerStatus, error) { pod, err := c.getPod() if err != nil { return nil, nil, err @@ -132,7 +132,7 @@ func (c *k8sAPIClient) waitForTermination(containerID string, timeout time.Durat return execcommon.WaitForTermination(c, containerID, timeout) } -func (c *k8sAPIClient) killContainer(pod *v1.Pod, container *v1.ContainerStatus, sig syscall.Signal) error { +func (c *k8sAPIClient) KillContainer(pod *v1.Pod, container *v1.ContainerStatus, sig syscall.Signal) error { command := []string{"/bin/sh", "-c", fmt.Sprintf("kill -%d 1", sig)} exec, err := common.ExecPodContainer(c.config, c.namespace, c.podName, container.Name, false, false, command...) if err != nil { diff --git a/workflow/executor/kubelet/client.go b/workflow/executor/kubelet/client.go index 4af47c6836b6..6fc7e5df69ab 100644 --- a/workflow/executor/kubelet/client.go +++ b/workflow/executor/kubelet/client.go @@ -28,8 +28,6 @@ const ( ) type kubeletClient struct { - execcommon.KubernetesClientInterface - httpClient *http.Client httpHeader http.Header websocketDialer *websocket.Dialer @@ -40,6 +38,8 @@ type kubeletClient struct { kubeletEndpoint string } +var _ execcommon.KubernetesClientInterface = &kubeletClient{} + func newKubeletClient() (*kubeletClient, error) { kubeletHost := os.Getenv(common.EnvVarDownwardAPINodeIP) if kubeletHost == "" { @@ -165,7 +165,7 @@ func (k *kubeletClient) doRequestLogs(namespace, podName, containerName string) return resp, nil } -func (k *kubeletClient) getContainerStatus(containerID string) (*v1.Pod, *v1.ContainerStatus, error) { +func (k *kubeletClient) GetContainerStatus(containerID string) (*v1.Pod, *v1.ContainerStatus, error) { podList, err := k.getPodList() if err != nil { return nil, nil, errors.InternalWrapError(err) @@ -242,7 +242,7 @@ func (k *kubeletClient) readFileContents(u *url.URL) (*bytes.Buffer, error) { } // createArchive exec in the given containerID and create a tarball of the given sourcePath. Works with directory -func (k *kubeletClient) createArchive(containerID, sourcePath string) (*bytes.Buffer, error) { +func (k *kubeletClient) CreateArchive(containerID, sourcePath string) (*bytes.Buffer, error) { return k.getCommandOutput(containerID, fmt.Sprintf("command=tar&command=-cf&command=-&command=%s&output=1", sourcePath)) } @@ -284,7 +284,7 @@ func (k *kubeletClient) WaitForTermination(containerID string, timeout time.Dura return execcommon.WaitForTermination(k, containerID, timeout) } -func (k *kubeletClient) killContainer(pod *v1.Pod, container *v1.ContainerStatus, sig syscall.Signal) error { +func (k *kubeletClient) KillContainer(pod *v1.Pod, container *v1.ContainerStatus, sig syscall.Signal) error { u, err := url.ParseRequestURI(fmt.Sprintf("wss://%s/exec/%s/%s/%s?command=/bin/sh&&command=-c&command=kill+-%d+1&output=1&error=1", k.kubeletEndpoint, pod.Namespace, pod.Name, container.Name, sig)) if err != nil { return errors.InternalWrapError(err)