diff --git a/cmd/sonobuoy/app/logs.go b/cmd/sonobuoy/app/logs.go index a1e1a75fb..b2e2c164e 100644 --- a/cmd/sonobuoy/app/logs.go +++ b/cmd/sonobuoy/app/logs.go @@ -18,16 +18,21 @@ package app import ( "fmt" + "io" "os" "github.com/pkg/errors" "github.com/spf13/cobra" - ops "github.com/heptio/sonobuoy/pkg/client" + "github.com/heptio/sonobuoy/pkg/client" "github.com/heptio/sonobuoy/pkg/errlog" ) -var logConfig ops.LogConfig +const ( + bufSize = 2048 +) + +var logConfig client.LogConfig var logsKubecfg Kubeconfig func init() { @@ -42,7 +47,7 @@ func init() { &logConfig.Follow, "follow", "f", false, "Specify if the logs should be streamed.", ) - + logConfig.Out = os.Stdout AddKubeconfigFlag(&logsKubecfg, cmd.Flags()) AddNamespaceFlag(&logConfig.Namespace, cmd.Flags()) RootCmd.AddCommand(cmd) @@ -54,13 +59,26 @@ func getLogs(cmd *cobra.Command, args []string) { errlog.LogError(fmt.Errorf("failed to get rest config: %v", err)) os.Exit(1) } - sbc, err := ops.NewSonobuoyClient(restConfig) + sbc, err := client.NewSonobuoyClient(restConfig) if err != nil { errlog.LogError(errors.Wrap(err, "could not create sonobuoy client")) os.Exit(1) } - if err := sbc.GetLogs(&logConfig); err != nil { - errlog.LogError(errors.Wrap(err, "error attempting to get sonobuoy logs")) + logreader, err := sbc.LogReader(&logConfig) + if err != nil { + errlog.LogError(errors.Wrap(err, "could not build a log reader")) os.Exit(1) } + b := make([]byte, bufSize) + for { + n, err := logreader.Read(b) + if err != nil && err != io.EOF { + errlog.LogError(errors.Wrap(err, "error reading logs")) + os.Exit(1) + } + fmt.Fprint(logConfig.Out, string(b[:n])) + if err == io.EOF { + return + } + } } diff --git a/pkg/client/interfaces.go b/pkg/client/interfaces.go index a46cbe231..723e52725 100644 --- a/pkg/client/interfaces.go +++ b/pkg/client/interfaces.go @@ -31,6 +31,8 @@ type LogConfig struct { Follow bool // Namespace is the namespace the sonobuoy aggregator is running in. Namespace string + // Out is the writer to write to. + Out io.Writer } // GenConfig are the input options for generating a Sonobuoy manifest. @@ -111,8 +113,8 @@ type Interface interface { RetrieveResults(cfg *RetrieveConfig) io.Reader // GetStatus determines the status of the sonobuoy run in order to assist the user. GetStatus(namespace string) (*aggregation.Status, error) - // GetLogs streams logs from the sonobuoy pod by default to stdout. - GetLogs(cfg *LogConfig) error + // LogReader returns a reader that contains a merged stream of sonobuoy logs. + LogReader(cfg *LogConfig) (*Reader, error) // Delete removes a sonobuoy run, namespace, and all associated resources. Delete(cfg *DeleteConfig) error } diff --git a/pkg/client/logs.go b/pkg/client/logs.go index 08df05140..0a4415969 100644 --- a/pkg/client/logs.go +++ b/pkg/client/logs.go @@ -19,59 +19,212 @@ package client import ( "fmt" "io" - "os" - "strings" + "sync" - "github.com/heptio/sonobuoy/pkg/config" - "github.com/sirupsen/logrus" + "github.com/pkg/errors" "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" ) -var ( - podLogSeparator = strings.Repeat("-", 79) +const ( + bufSize = 4096 ) -// Logs gathers the logs for the containers in the sonobuoy namespace and prints them +// Reader provides an io.Reader interface to a channel of bytes. +// The first error received on the error channel will be returned by Read and all subsequent calls to Read. +type Reader struct { + bytestream chan []byte + errc chan error + done chan struct{} -func (c *SonobuoyClient) GetLogs(cfg *LogConfig) error { - // TODO(EKF): Stream to a writer instead of just stdout - if cfg.Follow { - return streamLogs(c.Client, cfg.Namespace, config.MasterPodName, &v1.PodLogOptions{Follow: true}) + // Used for when one message is too large for the input buffer + // TODO(chuckha) consider alternative data structures here + overflowBuffer []byte + err error +} + +// NewReader returns a configured Reader. +func NewReader(bytestream chan []byte, errc chan error) *Reader { + reader := &Reader{ + bytestream: bytestream, + errc: errc, + overflowBuffer: []byte{}, + err: nil, + } + return reader +} + +// Read tries to fill up the passed in byte slice with messages from the channel. +// Read manages the message overflow ensuring no bytes are missed. +// If an error is set on the reader it will return the error immediately. +func (r *Reader) Read(p []byte) (int, error) { + // Always return the error if it is set. + if r.err != nil { + return 0, r.err + } + + // n is the number of bytes written to `p`. + n := 0 + + // Send any overflow before grabbing new messages. + if len(r.overflowBuffer) > 0 { + // If we need to chunk it, copy as much as we can and reduce the overflow buffer. + if len(r.overflowBuffer) > len(p) { + copy(p, r.overflowBuffer[:len(p)]) + r.overflowBuffer = r.overflowBuffer[len(p):] + return len(p), nil + } + // At this point the entire overflow will fit into the buffer. + copy(p, r.overflowBuffer) + n += len(r.overflowBuffer) + r.overflowBuffer = []byte{} + // This is suboptimal (does not use entire buffer) but gets output to the user faster. + return n, nil } - pods, err := c.Client.CoreV1().Pods(cfg.Namespace).List(metav1.ListOptions{}) + // One message a time. + select { + // it's ok to keep reading messages and miss an error. + case err := <-r.errc: + r.err = err + return n, r.err + case data, ok := <-r.bytestream: + if !ok { + return 0, io.EOF + } + + // The incoming data is bigger than size of the remaining size of the buffer. Save overflow data for next read. + remainingSize := len(p) - n + if len(data) > remainingSize { + copy(p[n:], data[:remainingSize]) + r.overflowBuffer = data[remainingSize:] + return len(p), nil + } + + // We have enough headroom in the buffer, copy all of it. + copy(p[n:], data) + n += len(data) + return n, nil + } +} + +// LogReader configures a Reader that provides an io.Reader interface to a merged stream of logs from various containers. +func (s *SonobuoyClient) LogReader(cfg *LogConfig) (*Reader, error) { + pods, err := s.Client.CoreV1().Pods(cfg.Namespace).List(metav1.ListOptions{}) if err != nil { - return fmt.Errorf("could not list pods: %v", err) + return nil, errors.Wrap(err, "failed to list pods") } + + errc := make(chan error) + agg := make(chan *message) + var wg sync.WaitGroup + + // TODO(chuckha) if we get an error back that the container is still creating maybe we could retry? for _, pod := range pods.Items { for _, container := range pod.Spec.Containers { - logrus.WithFields(logrus.Fields{"pod": pod.Name, "container": container.Name}).Info("Printing container logs") - err := streamLogs(c.Client, cfg.Namespace, pod.Name, &v1.PodLogOptions{ - Container: container.Name, - }) - if err != nil { - return fmt.Errorf("failed to stream logs: %v", err) + wg.Add(1) + ls := &logStreamer{ + ns: pod.Namespace, + pod: pod.Name, + container: container.Name, + errc: errc, + logc: agg, + logOpts: &v1.PodLogOptions{ + Container: container.Name, + Follow: cfg.Follow, + }, + client: s.Client, } - fmt.Println(podLogSeparator) + + go func(w *sync.WaitGroup, ls *logStreamer) { + defer w.Done() + ls.stream() + }(&wg, ls) } } - return nil + + // Cleanup when finished. + go func(wg *sync.WaitGroup, agg chan *message, errc chan error) { + wg.Wait() + close(agg) + errc <- io.EOF + close(errc) + }(&wg, agg, errc) + + return NewReader(applyHeaders(agg), errc), nil +} + +// message represents a buffer of logs from a container in a pod in a namespace. +type message struct { + // preamble acts as the id for a particular container as well as the data to print before the actual logs. + preamble string + // buffer is the blob of logs that we extracted from the container. + buffer []byte } -// TODO(chuckha) the output is a little confusing because our containers already produce structured logs. +func newMessage(preamble string, data []byte) *message { + d := make([]byte, len(data)) + copy(d, data) + return &message{ + preamble: preamble, + buffer: d, + } +} + +// logStreamer writes logs from a container to a fan-in channel. +type logStreamer struct { + ns, pod, container string + errc chan error + logc chan *message + logOpts *v1.PodLogOptions + client kubernetes.Interface +} -func streamLogs(client kubernetes.Interface, namespace, podName string, logOptions *v1.PodLogOptions) error { - req := client.CoreV1().Pods(namespace).GetLogs(podName, logOptions) +// stream will open a connection to the pod's logs and push messages onto a fan-in channel. +func (l *logStreamer) stream() { + req := l.client.CoreV1().Pods(l.ns).GetLogs(l.pod, l.logOpts) readCloser, err := req.Stream() if err != nil { - return fmt.Errorf("could not stream the request: %v", err) + l.errc <- errors.Wrapf(err, "error streaming logs from container [%v]", l.container) + return } defer readCloser.Close() - // In the case of -f this will never return unless there is an error. - if _, err = io.Copy(os.Stdout, readCloser); err != nil { - return fmt.Errorf("could not copy request body: %v", err) + + // newline because logs have new lines in them + preamble := fmt.Sprintf("namespace=%v pod=%v container=%v\n", l.ns, l.pod, l.container) + + buf := make([]byte, bufSize) + // Loop until EOF (streaming case won't get an EOF) + for { + n, err := readCloser.Read(buf) + if err != nil && err != io.EOF { + l.errc <- errors.Wrapf(err, "error reading logs from container [%v]", l.container) + return + } + if n > 0 { + l.logc <- newMessage(preamble, buf[:n]) + } + if err == io.EOF { + return + } } - return nil +} + +// applyHeaders takes a channel of messages and transforms it into a channel of bytes. +// applyHeaders will write headers to the byte stream as appropriate. +func applyHeaders(mesc chan *message) chan []byte { + out := make(chan []byte) + go func() { + header := "" + for message := range mesc { + // Add the header if the header is different (ie the message is coming from a different source) + if message.preamble != header { + out <- []byte(message.preamble) + header = message.preamble + } + out <- message.buffer + } + }() + return out } diff --git a/pkg/client/logs_test.go b/pkg/client/logs_test.go new file mode 100644 index 000000000..a4d03ee3c --- /dev/null +++ b/pkg/client/logs_test.go @@ -0,0 +1,200 @@ +/* +Copyright 2018 Heptio Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package client + +import ( + "io" + "testing" + + "github.com/pkg/errors" +) + +func TestLateErrors(t *testing.T) { + quotes := []string{ + "WHAT ONE DOES WHEN FACED WITH THE TRUTH IS MORE DIFFICULT THAN YOU’D THINK.", + "YOU ARE STRONGER THAN YOU BELIEVE. YOU HAVE GREATER POWERS THAN YOU KNOW.", + "YOU LET THIS LITTLE THING TELL YOU WHAT TO DO?", + "I’M WILLING TO FIGHT FOR THOSE WHO CANNOT FIGHT FOR THEMSELVES.", + } + bytestream := make(chan []byte) + go func() { + for _, quote := range quotes { + bytestream <- []byte(quote) + } + }() + errc := make(chan error, 1) + + reader := NewReader(bytestream, errc) + + // read the entire first message. + mybuf := make([]byte, len(quotes[0])) + + n, err := reader.Read(mybuf) + if err != nil { + t.Fatalf("expected nil but got: %v", err) + } + if n != len(mybuf) { + t.Fatalf("unexpected number of bytes read: %v", n) + } + + errc <- errors.New("introduce an error") + + // We are guaranteed to eventually get the error because we never close bytestream. + errcount := 0 + for i := 0; i <= 3; i++ { + _, err := reader.Read(mybuf) + if err != nil && err != io.EOF { + errcount++ + } + } + if errcount == 0 { + t.Fatalf("Never saw an expected error.") + } +} + +func TestLogEarlyErrors(t *testing.T) { + input := "sonobuoy will help you on your way to greatness" + bytestream := make(chan []byte, 1) + bytestream <- []byte(input) + + errc := make(chan error, 1) + errc <- errors.New("A seriously bad error") + + reader := NewReader(bytestream, errc) + + mybuf := make([]byte, 1024) + errcount := 0 + // We are guaranteed to read the error after we've drained bytestream, but the order is unspecified. + for i := 0; i <= 5; i++ { + _, err := reader.Read(mybuf) + // This will never be EOF since we never close the channel. + if err != nil && err != io.EOF { + errcount++ + } + } + if errcount == 0 { + t.Fatal("did not receive any errors but there should be one.") + } +} + +func TestLogReaderNoError(t *testing.T) { + testcases := []struct { + name string + input []string + bufsize int + expectedReads []string + }{ + { + name: "tiny buffer, simple input", + input: []string{"Hello world 0"}, + bufsize: 1, + expectedReads: []string{"H", "e", "l", "l", "o", " ", "w", "o", "r", "l", "d", " ", "0", ""}, + }, + { + name: "small buffer, simple input", + input: []string{"Hello world 0"}, + bufsize: 2, + expectedReads: []string{"He", "ll", "o ", "wo", "rl", "d ", "0"}, + }, + { + name: "big buffer, simple input", + input: []string{"Hello world 0"}, + bufsize: 1000, + expectedReads: []string{"Hello world 0"}, + }, + { + name: "exact buffer, simple input", + input: []string{"Hello world 0"}, + bufsize: len("Hello world 0"), + expectedReads: []string{"Hello world 0"}, + }, + { + name: "big buffer, small messages", + input: []string{ + "Once you start down the dark path, forever will it dominate your destiny.", + "Luminous beings are we, not this crude matter.", + "Fear is the path to the dark side. Fear leads to anger. Anger leads to hate. Hate leads to suffering.", + }, + bufsize: 1024, + expectedReads: []string{ + "Once you start down the dark path, forever will it dominate your destiny.", + "Luminous beings are we, not this crude matter.", + "Fear is the path to the dark side. Fear leads to anger. Anger leads to hate. Hate leads to suffering.", + }, + }, + { + name: "small buffer, big input", + input: []string{ + "this is some log line", + "this is another log line", + "this is a third log line!!", + }, + bufsize: 10, + expectedReads: []string{ + "this is so", + "me log lin", + "e", + "this is an", + "other log ", + "line", + "this is a ", + "third log ", + "line!!", + }, + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + bytestream := make(chan []byte) + go func(data chan []byte) { + defer close(bytestream) + for _, input := range tc.input { + data <- []byte(input) + } + }(bytestream) + errc := make(chan error) + reader := NewReader(bytestream, errc) + mybuf := make([]byte, tc.bufsize) + i := 0 + for ; ; i++ { + n, err := reader.Read(mybuf) + if err != nil && err != io.EOF { + t.Fatalf("Expected no errors got %v", err) + } + if err == io.EOF { + break + } + if n > len(mybuf) { + t.Fatalf("n is too big: %v mybuf is only %v", n, len(mybuf)) + } + if i >= len(tc.expectedReads) { + t.Fatalf("Too many actual reads, not enough expected reads. BUF: %v", mybuf[:n]) + } + if len(mybuf[:n]) != len(tc.expectedReads[i]) { + t.Errorf("Expected to read %v bytes, got %v buf: '%v' expected: '%v'", len(tc.expectedReads[i]), n, string(mybuf[:n]), tc.expectedReads[i]) + } + if string(mybuf[:n]) != tc.expectedReads[i] { + t.Errorf("Expected '%v' got '%v'", tc.expectedReads[i], string(mybuf[:n])) + } + } + i++ // add one to i for the final read. + if i < len(tc.expectedReads) { + t.Fatalf("Expected Read to be called %v times but was only called %v times", len(tc.expectedReads), i) + } + }) + } +}