Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Export the methods of KubernetesClientInterface #1294

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions workflow/executor/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ func GetContainerID(container *v1.ContainerStatus) string {

// KubernetesClientInterface is the interface to implement getContainerStatus method
type KubernetesClientInterface interface {
getContainerStatus(containerID string) (*v1.Pod, *v1.ContainerStatus, error)
killContainer(pod *v1.Pod, container *v1.ContainerStatus, sig syscall.Signal) error
createArchive(containerID, sourcePath string) (*bytes.Buffer, error)
GetContainerStatus(containerID string) (*v1.Pod, *v1.ContainerStatus, error)
KillContainer(pod *v1.Pod, container *v1.ContainerStatus, sig syscall.Signal) error
CreateArchive(containerID, sourcePath string) (*bytes.Buffer, error)
}

// WaitForTermination of the given containerID, set the timeout to 0 to discard it
Expand All @@ -52,7 +52,7 @@ func WaitForTermination(c KubernetesClientInterface, containerID string, timeout
for {
select {
case <-ticker.C:
_, containerStatus, err := c.getContainerStatus(containerID)
_, containerStatus, err := c.GetContainerStatus(containerID)
if err != nil {
return err
}
Expand All @@ -70,7 +70,7 @@ func WaitForTermination(c KubernetesClientInterface, containerID string, timeout
// TerminatePodWithContainerID invoke the given SIG against the PID1 of the container.
// No-op if the container is on the hostPID
func TerminatePodWithContainerID(c KubernetesClientInterface, containerID string, sig syscall.Signal) error {
pod, container, err := c.getContainerStatus(containerID)
pod, container, err := c.GetContainerStatus(containerID)
if err != nil {
return err
}
Expand All @@ -84,7 +84,7 @@ func TerminatePodWithContainerID(c KubernetesClientInterface, containerID string
if pod.Spec.RestartPolicy != "Never" {
return fmt.Errorf("cannot terminate pod with a %q restart policy", pod.Spec.RestartPolicy)
}
return c.killContainer(pod, container, sig)
return c.KillContainer(pod, container, sig)
}

// KillGracefully kills a container gracefully.
Expand Down Expand Up @@ -115,7 +115,7 @@ func KillGracefully(c KubernetesClientInterface, containerID string) error {
// CopyArchive downloads files and directories as a tarball and saves it to a specified path.
func CopyArchive(c KubernetesClientInterface, containerID, sourcePath, destPath string) error {
log.Infof("Archiving %s:%s to %s", containerID, sourcePath, destPath)
b, err := c.createArchive(containerID, sourcePath)
b, err := c.CreateArchive(containerID, sourcePath)
if err != nil {
return err
}
Expand Down
16 changes: 8 additions & 8 deletions workflow/executor/k8sapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ import (
)

type k8sAPIClient struct {
execcommon.KubernetesClientInterface

clientset *kubernetes.Clientset
config *restclient.Config
podName string
namespace string
}

var _ execcommon.KubernetesClientInterface = &k8sAPIClient{}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line is actually important. The methods were not called at all when I implemented it, so the executor was working. I think some logic around the executors changed after that and caused failures. I should've put this line.


func newK8sAPIClient(clientset *kubernetes.Clientset, config *restclient.Config, podName, namespace string) (*k8sAPIClient, error) {
return &k8sAPIClient{
clientset: clientset,
Expand All @@ -39,7 +39,7 @@ func newK8sAPIClient(clientset *kubernetes.Clientset, config *restclient.Config,
}

func (c *k8sAPIClient) getFileContents(containerID, sourcePath string) (string, error) {
_, containerStatus, err := c.getContainerStatus(containerID)
_, containerStatus, err := c.GetContainerStatus(containerID)
if err != nil {
return "", err
}
Expand All @@ -55,8 +55,8 @@ func (c *k8sAPIClient) getFileContents(containerID, sourcePath string) (string,
return stdOut.String(), nil
}

func (c *k8sAPIClient) createArchive(containerID, sourcePath string) (*bytes.Buffer, error) {
_, containerStatus, err := c.getContainerStatus(containerID)
func (c *k8sAPIClient) CreateArchive(containerID, sourcePath string) (*bytes.Buffer, error) {
_, containerStatus, err := c.GetContainerStatus(containerID)
if err != nil {
return nil, err
}
Expand All @@ -73,7 +73,7 @@ func (c *k8sAPIClient) createArchive(containerID, sourcePath string) (*bytes.Buf
}

func (c *k8sAPIClient) getLogsAsStream(containerID string) (io.ReadCloser, error) {
_, containerStatus, err := c.getContainerStatus(containerID)
_, containerStatus, err := c.GetContainerStatus(containerID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -114,7 +114,7 @@ func (c *k8sAPIClient) getPod() (*v1.Pod, error) {
return c.clientset.CoreV1().Pods(c.namespace).Get(c.podName, metav1.GetOptions{})
}

func (c *k8sAPIClient) getContainerStatus(containerID string) (*v1.Pod, *v1.ContainerStatus, error) {
func (c *k8sAPIClient) GetContainerStatus(containerID string) (*v1.Pod, *v1.ContainerStatus, error) {
pod, err := c.getPod()
if err != nil {
return nil, nil, err
Expand All @@ -132,7 +132,7 @@ func (c *k8sAPIClient) waitForTermination(containerID string, timeout time.Durat
return execcommon.WaitForTermination(c, containerID, timeout)
}

func (c *k8sAPIClient) killContainer(pod *v1.Pod, container *v1.ContainerStatus, sig syscall.Signal) error {
func (c *k8sAPIClient) KillContainer(pod *v1.Pod, container *v1.ContainerStatus, sig syscall.Signal) error {
command := []string{"/bin/sh", "-c", fmt.Sprintf("kill -%d 1", sig)}
exec, err := common.ExecPodContainer(c.config, c.namespace, c.podName, container.Name, false, false, command...)
if err != nil {
Expand Down
10 changes: 5 additions & 5 deletions workflow/executor/kubelet/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ const (
)

type kubeletClient struct {
execcommon.KubernetesClientInterface

httpClient *http.Client
httpHeader http.Header
websocketDialer *websocket.Dialer
Expand All @@ -40,6 +38,8 @@ type kubeletClient struct {
kubeletEndpoint string
}

var _ execcommon.KubernetesClientInterface = &kubeletClient{}

func newKubeletClient() (*kubeletClient, error) {
kubeletHost := os.Getenv(common.EnvVarDownwardAPINodeIP)
if kubeletHost == "" {
Expand Down Expand Up @@ -165,7 +165,7 @@ func (k *kubeletClient) doRequestLogs(namespace, podName, containerName string)
return resp, nil
}

func (k *kubeletClient) getContainerStatus(containerID string) (*v1.Pod, *v1.ContainerStatus, error) {
func (k *kubeletClient) GetContainerStatus(containerID string) (*v1.Pod, *v1.ContainerStatus, error) {
podList, err := k.getPodList()
if err != nil {
return nil, nil, errors.InternalWrapError(err)
Expand Down Expand Up @@ -242,7 +242,7 @@ func (k *kubeletClient) readFileContents(u *url.URL) (*bytes.Buffer, error) {
}

// createArchive exec in the given containerID and create a tarball of the given sourcePath. Works with directory
func (k *kubeletClient) createArchive(containerID, sourcePath string) (*bytes.Buffer, error) {
func (k *kubeletClient) CreateArchive(containerID, sourcePath string) (*bytes.Buffer, error) {
return k.getCommandOutput(containerID, fmt.Sprintf("command=tar&command=-cf&command=-&command=%s&output=1", sourcePath))
}

Expand Down Expand Up @@ -284,7 +284,7 @@ func (k *kubeletClient) WaitForTermination(containerID string, timeout time.Dura
return execcommon.WaitForTermination(k, containerID, timeout)
}

func (k *kubeletClient) killContainer(pod *v1.Pod, container *v1.ContainerStatus, sig syscall.Signal) error {
func (k *kubeletClient) KillContainer(pod *v1.Pod, container *v1.ContainerStatus, sig syscall.Signal) error {
u, err := url.ParseRequestURI(fmt.Sprintf("wss://%s/exec/%s/%s/%s?command=/bin/sh&&command=-c&command=kill+-%d+1&output=1&error=1", k.kubeletEndpoint, pod.Namespace, pod.Name, container.Name, sig))
if err != nil {
return errors.InternalWrapError(err)
Expand Down