Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tee party: Stream cmd output to tests when -v is enabled, and stream SSH output to logs #3475

Merged
merged 6 commits into from
Dec 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 76 additions & 19 deletions pkg/minikube/bootstrapper/ssh_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package bootstrapper

import (
"bytes"
"fmt"
"io"
"path"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/pkg/errors"
"golang.org/x/crypto/ssh"
"k8s.io/minikube/pkg/minikube/assets"
"k8s.io/minikube/pkg/util"
)

// SSHRunner runs commands through SSH.
Expand All @@ -52,25 +54,80 @@ func (s *SSHRunner) Remove(f assets.CopyableFile) error {
return sess.Run(cmd)
}

type singleWriter struct {
b bytes.Buffer
mu sync.Mutex
}

func (w *singleWriter) Write(p []byte) (int, error) {
w.mu.Lock()
defer w.mu.Unlock()
return w.b.Write(p)
}

// teeSSH runs an SSH command, streaming stdout, stderr to logs
func teeSSH(s *ssh.Session, cmd string, outB io.Writer, errB io.Writer) error {
outPipe, err := s.StdoutPipe()
if err != nil {
return errors.Wrap(err, "stdout")
}

errPipe, err := s.StderrPipe()
if err != nil {
return errors.Wrap(err, "stderr")
}
var wg sync.WaitGroup
wg.Add(2)

go func() {
if err := util.TeePrefix(util.ErrPrefix, errPipe, errB, glog.Infof); err != nil {
glog.Errorf("tee stderr: %v", err)
}
wg.Done()
}()
go func() {
if err := util.TeePrefix(util.OutPrefix, outPipe, outB, glog.Infof); err != nil {
glog.Errorf("tee stdout: %v", err)
}
wg.Done()
}()
err = s.Run(cmd)
wg.Wait()
return err
}

// Run starts a command on the remote and waits for it to return.
func (s *SSHRunner) Run(cmd string) error {
glog.Infoln("Run:", cmd)
glog.Infof("SSH: %s", cmd)
sess, err := s.c.NewSession()
if err != nil {
return errors.Wrap(err, "getting ssh session")
return errors.Wrap(err, "NewSession")
}
defer sess.Close()
return sess.Run(cmd)

defer func() {
if err := sess.Close(); err != nil {
if err != io.EOF {
glog.Errorf("session close: %v", err)
}
}
}()
var outB bytes.Buffer
var errB bytes.Buffer
err = teeSSH(sess, cmd, &outB, &errB)
if err != nil {
return errors.Wrapf(err, "command failed: %s\nstdout: %s\nstderr: %s", cmd, outB.String(), errB.String())
}
return nil
}

// CombinedOutputTo runs the command and stores both command
// output and error to out.
func (s *SSHRunner) CombinedOutputTo(cmd string, out io.Writer) error {
b, err := s.CombinedOutput(cmd)
func (s *SSHRunner) CombinedOutputTo(cmd string, w io.Writer) error {
out, err := s.CombinedOutput(cmd)
if err != nil {
return errors.Wrapf(err, "running command: %s\n.", cmd)
return err
}
_, err = out.Write([]byte(b))
_, err = w.Write([]byte(out))
return err
}

Expand All @@ -80,15 +137,17 @@ func (s *SSHRunner) CombinedOutput(cmd string) (string, error) {
glog.Infoln("Run with output:", cmd)
sess, err := s.c.NewSession()
if err != nil {
return "", errors.Wrap(err, "getting ssh session")
return "", errors.Wrap(err, "NewSession")
}
defer sess.Close()

b, err := sess.CombinedOutput(cmd)
var combined singleWriter
err = teeSSH(sess, cmd, &combined, &combined)
out := combined.b.String()
if err != nil {
return "", errors.Wrapf(err, "running command: %s\n, output: %s", cmd, string(b))
return "", err
}
return string(b), nil
return out, nil
}

// Copy copies a file to the remote over SSH.
Expand All @@ -97,18 +156,18 @@ func (s *SSHRunner) Copy(f assets.CopyableFile) error {
mkdirCmd := fmt.Sprintf("sudo mkdir -p %s", f.GetTargetDir())
for _, cmd := range []string{deleteCmd, mkdirCmd} {
if err := s.Run(cmd); err != nil {
return errors.Wrapf(err, "Error running command: %s", cmd)
return errors.Wrapf(err, "pre-copy")
}
}

sess, err := s.c.NewSession()
if err != nil {
return errors.Wrap(err, "Error creating new session via ssh client")
return errors.Wrap(err, "NewSession")
}

w, err := sess.StdinPipe()
if err != nil {
return errors.Wrap(err, "Error accessing StdinPipe via ssh session")
return errors.Wrap(err, "StdinPipe")
}
// The scpcmd below *should not* return until all data is copied and the
// StdinPipe is closed. But let's use a WaitGroup to make it expicit.
Expand All @@ -123,12 +182,10 @@ func (s *SSHRunner) Copy(f assets.CopyableFile) error {
fmt.Fprint(w, "\x00")
}()

scpcmd := fmt.Sprintf("sudo scp -t %s", f.GetTargetDir())
out, err := sess.CombinedOutput(scpcmd)
_, err = sess.CombinedOutput(fmt.Sprintf("sudo scp -t %s", f.GetTargetDir()))
if err != nil {
return errors.Wrapf(err, "Error running scp command: %s output: %s", scpcmd, out)
return err
}
wg.Wait()

return nil
}
33 changes: 33 additions & 0 deletions pkg/util/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package util

import (
"bufio"
"bytes"
"fmt"
"io"
"io/ioutil"
Expand All @@ -33,6 +35,9 @@ import (
"github.com/pkg/errors"
)

const ErrPrefix = "! "
const OutPrefix = "> "

const (
downloadURL = "https://storage.googleapis.com/minikube/releases/%s/minikube-%s-amd64%s"
)
Expand Down Expand Up @@ -199,3 +204,31 @@ func MaybeChownDirRecursiveToMinikubeUser(dir string) error {
}
return nil
}

// TeePrefix copies bytes from a reader to writer, logging each new line.
func TeePrefix(prefix string, r io.Reader, w io.Writer, logger func(format string, args ...interface{})) error {
scanner := bufio.NewScanner(r)
scanner.Split(bufio.ScanBytes)
var line bytes.Buffer

for scanner.Scan() {
b := scanner.Bytes()
if _, err := w.Write(b); err != nil {
return err
}

if bytes.IndexAny(b, "\r\n") == 0 {
if line.Len() > 0 {
logger("%s%s", prefix, line.String())
line.Reset()
}
continue
}
line.Write(b)
}
// Catch trailing output in case stream does not end with a newline
if line.Len() > 0 {
logger("%s%s", prefix, line.String())
}
return nil
}
40 changes: 40 additions & 0 deletions pkg/util/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@ limitations under the License.
package util

import (
"bytes"
"fmt"
"io"
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"

"github.com/pkg/errors"
Expand Down Expand Up @@ -158,3 +162,39 @@ func TestGetBinaryDownloadURL(t *testing.T) {
}

}

func TestTeePrefix(t *testing.T) {
var in bytes.Buffer
var out bytes.Buffer
var logged strings.Builder

logSink := func(format string, args ...interface{}) {
logged.WriteString("(" + fmt.Sprintf(format, args...) + ")")
}

// Simulate the primary use case: tee in the background. This also helps avoid I/O races.
var wg sync.WaitGroup
wg.Add(1)
go func() {
TeePrefix(":", &in, &out, logSink)
wg.Done()
}()

in.Write([]byte("goo"))
in.Write([]byte("\n"))
in.Write([]byte("g\r\n\r\n"))
in.Write([]byte("le"))
wg.Wait()

gotBytes := out.Bytes()
wantBytes := []byte("goo\ng\r\n\r\nle")
if !bytes.Equal(gotBytes, wantBytes) {
t.Errorf("output=%q, want: %q", gotBytes, wantBytes)
}

gotLog := logged.String()
wantLog := "(:goo)(:g)(:le)"
if gotLog != wantLog {
t.Errorf("log=%q, want: %q", gotLog, wantLog)
}
}
22 changes: 11 additions & 11 deletions test/integration/docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,32 +40,32 @@ func TestDocker(t *testing.T) {
mk.RunWithContext(ctx, "delete")

startCmd := fmt.Sprintf("start %s %s %s", mk.StartArgs, mk.Args,
"--docker-env=FOO=BAR --docker-env=BAZ=BAT --docker-opt=debug --docker-opt=icc=true")
out, err := mk.RunWithContext(ctx, startCmd)
"--docker-env=FOO=BAR --docker-env=BAZ=BAT --docker-opt=debug --docker-opt=icc=true --alsologtostderr --v=5")
stdout, stderr, err := mk.RunWithContext(ctx, startCmd)
if err != nil {
t.Fatalf("start: %v\nstart out: %s", err, out)
t.Fatalf("start: %v\nstdout: %s\nstderr: %s", err, stdout, stderr)
}

mk.EnsureRunning()

out, err = mk.RunWithContext(ctx, "ssh -- systemctl show docker --property=Environment --no-pager")
stdout, stderr, err = mk.RunWithContext(ctx, "ssh -- systemctl show docker --property=Environment --no-pager")
if err != nil {
t.Errorf("docker env: %v\ndocker env out: %s", err, out)
t.Errorf("docker env: %v\nstderr: %s", err, stderr)
}

for _, envVar := range []string{"FOO=BAR", "BAZ=BAT"} {
if !strings.Contains(string(out), envVar) {
t.Errorf("Env var %s missing: %s.", envVar, out)
if !strings.Contains(stdout, envVar) {
t.Errorf("Env var %s missing: %s.", envVar, stdout)
}
}

out, err = mk.RunWithContext(ctx, "ssh -- systemctl show docker --property=ExecStart --no-pager")
stdout, stderr, err = mk.RunWithContext(ctx, "ssh -- systemctl show docker --property=ExecStart --no-pager")
if err != nil {
t.Errorf("ssh show docker: %v\nshow docker out: %s", err, out)
t.Errorf("ssh show docker: %v\nstderr: %s", err, stderr)
}
for _, opt := range []string{"--debug", "--icc=true"} {
if !strings.Contains(string(out), opt) {
t.Fatalf("Option %s missing from ExecStart: %s.", opt, out)
if !strings.Contains(stdout, opt) {
t.Fatalf("Option %s missing from ExecStart: %s.", opt, stdout)
}
}
}
Loading