From 64863d27ee1b8c0203dccc3c84d5c897cf89e1f2 Mon Sep 17 00:00:00 2001 From: Tom Manville Date: Tue, 3 Sep 2019 20:40:01 -0700 Subject: [PATCH] Refactor kube exec to support io streams --- pkg/kube/exec.go | 42 ++++++++++++++++++++++++++++++++---------- 1 file changed, 32 insertions(+), 10 deletions(-) diff --git a/pkg/kube/exec.go b/pkg/kube/exec.go index 587a65fdb0..367a99fb2f 100644 --- a/pkg/kube/exec.go +++ b/pkg/kube/exec.go @@ -15,11 +15,11 @@ package kube import ( - "bytes" "io" + "io/ioutil" "net/url" - "strings" + log "github.com/sirupsen/logrus" "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" @@ -59,6 +59,26 @@ func Exec(cli kubernetes.Interface, namespace, pod, container string, command [] // returning stdout, stderr and error. `options` allowed for // additional parameters to be passed. func ExecWithOptions(kubeCli kubernetes.Interface, options ExecOptions) (string, string, error) { + config, err := LoadConfig() + if err != nil { + return "", "", err + } + o, e, errCh := execStream(kubeCli, config, options) + defer func() { _ = o.Close() }() + defer func() { _ = e.Close() }() + stdout, oerr := ioutil.ReadAll(o) + if oerr != nil { + log.Info("Failed to read stdout:", oerr.Error()) + } + stderr, eerr := ioutil.ReadAll(e) + if eerr != nil { + log.Info("Failed to read stderr:", eerr.Error()) + } + + return string(stdout), string(stderr), <-errCh +} + +func execStream(kubeCli kubernetes.Interface, config *restclient.Config, options ExecOptions) (io.ReadCloser, io.ReadCloser, chan error) { const tty = false req := kubeCli.CoreV1().RESTClient().Post(). Resource("pods"). @@ -78,14 +98,16 @@ func ExecWithOptions(kubeCli kubernetes.Interface, options ExecOptions) (string, TTY: tty, }, scheme.ParameterCodec) - config, err := LoadConfig() - if err != nil { - return "", "", err - } - - var stdout, stderr bytes.Buffer - err = execute("POST", req.URL(), config, options.Stdin, &stdout, &stderr, tty) - return strings.TrimSpace(stdout.String()), strings.TrimSpace(stderr.String()), err + pro, pwo := io.Pipe() + pre, pwe := io.Pipe() + errCh := make(chan error, 1) + go func() { + err := execute("POST", req.URL(), config, options.Stdin, pwo, pwe, tty) + errCh <- err + _ = pwo.Close() + _ = pwe.Close() + }() + return pro, pre, errCh } func execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error {