Skip to content

Commit

Permalink
Export the methods of KubernetesClientInterface (#1294)
Browse files Browse the repository at this point in the history
All calls to these methods previously generated a panic at runtime
because the calls resolved to the default, panic-always implementation,
not to the overrides provided by `k8sAPIClient` and `kubeletClient`.

Embedding an exported interface with unexported methods into a struct is
the only way to implement that interface in another package.  When doing
this, the compiler generates default, panic-always implementations for
all methods from the interface.  Implementors can override exported
methods, but it's not possible to override an unexported method from the
interface.  All invocations that go through the interface will come to
the default implementation, even if the struct tries to provide an
override.
  • Loading branch information
chris-chambers authored and jessesuen committed Apr 11, 2019
1 parent 1c729a7 commit 950de1b
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 20 deletions.
14 changes: 7 additions & 7 deletions workflow/executor/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ func GetContainerID(container *v1.ContainerStatus) string {

// KubernetesClientInterface is the interface to implement getContainerStatus method
type KubernetesClientInterface interface {
getContainerStatus(containerID string) (*v1.Pod, *v1.ContainerStatus, error)
killContainer(pod *v1.Pod, container *v1.ContainerStatus, sig syscall.Signal) error
createArchive(containerID, sourcePath string) (*bytes.Buffer, error)
GetContainerStatus(containerID string) (*v1.Pod, *v1.ContainerStatus, error)
KillContainer(pod *v1.Pod, container *v1.ContainerStatus, sig syscall.Signal) error
CreateArchive(containerID, sourcePath string) (*bytes.Buffer, error)
}

// WaitForTermination of the given containerID, set the timeout to 0 to discard it
Expand All @@ -52,7 +52,7 @@ func WaitForTermination(c KubernetesClientInterface, containerID string, timeout
for {
select {
case <-ticker.C:
_, containerStatus, err := c.getContainerStatus(containerID)
_, containerStatus, err := c.GetContainerStatus(containerID)
if err != nil {
return err
}
Expand All @@ -70,7 +70,7 @@ func WaitForTermination(c KubernetesClientInterface, containerID string, timeout
// TerminatePodWithContainerID invoke the given SIG against the PID1 of the container.
// No-op if the container is on the hostPID
func TerminatePodWithContainerID(c KubernetesClientInterface, containerID string, sig syscall.Signal) error {
pod, container, err := c.getContainerStatus(containerID)
pod, container, err := c.GetContainerStatus(containerID)
if err != nil {
return err
}
Expand All @@ -84,7 +84,7 @@ func TerminatePodWithContainerID(c KubernetesClientInterface, containerID string
if pod.Spec.RestartPolicy != "Never" {
return fmt.Errorf("cannot terminate pod with a %q restart policy", pod.Spec.RestartPolicy)
}
return c.killContainer(pod, container, sig)
return c.KillContainer(pod, container, sig)
}

// KillGracefully kills a container gracefully.
Expand Down Expand Up @@ -115,7 +115,7 @@ func KillGracefully(c KubernetesClientInterface, containerID string) error {
// CopyArchive downloads files and directories as a tarball and saves it to a specified path.
func CopyArchive(c KubernetesClientInterface, containerID, sourcePath, destPath string) error {
log.Infof("Archiving %s:%s to %s", containerID, sourcePath, destPath)
b, err := c.createArchive(containerID, sourcePath)
b, err := c.CreateArchive(containerID, sourcePath)
if err != nil {
return err
}
Expand Down
16 changes: 8 additions & 8 deletions workflow/executor/k8sapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ import (
)

type k8sAPIClient struct {
execcommon.KubernetesClientInterface

clientset *kubernetes.Clientset
config *restclient.Config
podName string
namespace string
}

var _ execcommon.KubernetesClientInterface = &k8sAPIClient{}

func newK8sAPIClient(clientset *kubernetes.Clientset, config *restclient.Config, podName, namespace string) (*k8sAPIClient, error) {
return &k8sAPIClient{
clientset: clientset,
Expand All @@ -39,7 +39,7 @@ func newK8sAPIClient(clientset *kubernetes.Clientset, config *restclient.Config,
}

func (c *k8sAPIClient) getFileContents(containerID, sourcePath string) (string, error) {
_, containerStatus, err := c.getContainerStatus(containerID)
_, containerStatus, err := c.GetContainerStatus(containerID)
if err != nil {
return "", err
}
Expand All @@ -55,8 +55,8 @@ func (c *k8sAPIClient) getFileContents(containerID, sourcePath string) (string,
return stdOut.String(), nil
}

func (c *k8sAPIClient) createArchive(containerID, sourcePath string) (*bytes.Buffer, error) {
_, containerStatus, err := c.getContainerStatus(containerID)
func (c *k8sAPIClient) CreateArchive(containerID, sourcePath string) (*bytes.Buffer, error) {
_, containerStatus, err := c.GetContainerStatus(containerID)
if err != nil {
return nil, err
}
Expand All @@ -73,7 +73,7 @@ func (c *k8sAPIClient) createArchive(containerID, sourcePath string) (*bytes.Buf
}

func (c *k8sAPIClient) getLogsAsStream(containerID string) (io.ReadCloser, error) {
_, containerStatus, err := c.getContainerStatus(containerID)
_, containerStatus, err := c.GetContainerStatus(containerID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -114,7 +114,7 @@ func (c *k8sAPIClient) getPod() (*v1.Pod, error) {
return c.clientset.CoreV1().Pods(c.namespace).Get(c.podName, metav1.GetOptions{})
}

func (c *k8sAPIClient) getContainerStatus(containerID string) (*v1.Pod, *v1.ContainerStatus, error) {
func (c *k8sAPIClient) GetContainerStatus(containerID string) (*v1.Pod, *v1.ContainerStatus, error) {
pod, err := c.getPod()
if err != nil {
return nil, nil, err
Expand All @@ -132,7 +132,7 @@ func (c *k8sAPIClient) waitForTermination(containerID string, timeout time.Durat
return execcommon.WaitForTermination(c, containerID, timeout)
}

func (c *k8sAPIClient) killContainer(pod *v1.Pod, container *v1.ContainerStatus, sig syscall.Signal) error {
func (c *k8sAPIClient) KillContainer(pod *v1.Pod, container *v1.ContainerStatus, sig syscall.Signal) error {
command := []string{"/bin/sh", "-c", fmt.Sprintf("kill -%d 1", sig)}
exec, err := common.ExecPodContainer(c.config, c.namespace, c.podName, container.Name, false, false, command...)
if err != nil {
Expand Down
10 changes: 5 additions & 5 deletions workflow/executor/kubelet/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ const (
)

type kubeletClient struct {
execcommon.KubernetesClientInterface

httpClient *http.Client
httpHeader http.Header
websocketDialer *websocket.Dialer
Expand All @@ -40,6 +38,8 @@ type kubeletClient struct {
kubeletEndpoint string
}

var _ execcommon.KubernetesClientInterface = &kubeletClient{}

func newKubeletClient() (*kubeletClient, error) {
kubeletHost := os.Getenv(common.EnvVarDownwardAPINodeIP)
if kubeletHost == "" {
Expand Down Expand Up @@ -165,7 +165,7 @@ func (k *kubeletClient) doRequestLogs(namespace, podName, containerName string)
return resp, nil
}

func (k *kubeletClient) getContainerStatus(containerID string) (*v1.Pod, *v1.ContainerStatus, error) {
func (k *kubeletClient) GetContainerStatus(containerID string) (*v1.Pod, *v1.ContainerStatus, error) {
podList, err := k.getPodList()
if err != nil {
return nil, nil, errors.InternalWrapError(err)
Expand Down Expand Up @@ -242,7 +242,7 @@ func (k *kubeletClient) readFileContents(u *url.URL) (*bytes.Buffer, error) {
}

// createArchive exec in the given containerID and create a tarball of the given sourcePath. Works with directory
func (k *kubeletClient) createArchive(containerID, sourcePath string) (*bytes.Buffer, error) {
func (k *kubeletClient) CreateArchive(containerID, sourcePath string) (*bytes.Buffer, error) {
return k.getCommandOutput(containerID, fmt.Sprintf("command=tar&command=-cf&command=-&command=%s&output=1", sourcePath))
}

Expand Down Expand Up @@ -284,7 +284,7 @@ func (k *kubeletClient) WaitForTermination(containerID string, timeout time.Dura
return execcommon.WaitForTermination(k, containerID, timeout)
}

func (k *kubeletClient) killContainer(pod *v1.Pod, container *v1.ContainerStatus, sig syscall.Signal) error {
func (k *kubeletClient) KillContainer(pod *v1.Pod, container *v1.ContainerStatus, sig syscall.Signal) error {
u, err := url.ParseRequestURI(fmt.Sprintf("wss://%s/exec/%s/%s/%s?command=/bin/sh&&command=-c&command=kill+-%d+1&output=1&error=1", k.kubeletEndpoint, pod.Namespace, pod.Name, container.Name, sig))
if err != nil {
return errors.InternalWrapError(err)
Expand Down

0 comments on commit 950de1b

Please sign in to comment.