Skip to content

Commit

Permalink
Rework logging code
Browse files Browse the repository at this point in the history
This change hides the concurrency from the client to
provide a familiar interface. Clients can now read
a merged stream of logs as if it were a synchronous io.Reader.

Signed-off-by: Chuck Ha <chuck@heptio.com>
  • Loading branch information
Chuck Ha committed Mar 7, 2018
1 parent 6c572bb commit 95c5efd
Show file tree
Hide file tree
Showing 4 changed files with 410 additions and 37 deletions.
30 changes: 24 additions & 6 deletions cmd/sonobuoy/app/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,21 @@ package app

import (
"fmt"
"io"
"os"

"github.com/pkg/errors"
"github.com/spf13/cobra"

ops "github.com/heptio/sonobuoy/pkg/client"
"github.com/heptio/sonobuoy/pkg/client"
"github.com/heptio/sonobuoy/pkg/errlog"
)

var logConfig ops.LogConfig
const (
bufSize = 2048
)

var logConfig client.LogConfig
var logsKubecfg Kubeconfig

func init() {
Expand All @@ -42,7 +47,7 @@ func init() {
&logConfig.Follow, "follow", "f", false,
"Specify if the logs should be streamed.",
)

logConfig.Out = os.Stdout
AddKubeconfigFlag(&logsKubecfg, cmd.Flags())
AddNamespaceFlag(&logConfig.Namespace, cmd.Flags())
RootCmd.AddCommand(cmd)
Expand All @@ -54,13 +59,26 @@ func getLogs(cmd *cobra.Command, args []string) {
errlog.LogError(fmt.Errorf("failed to get rest config: %v", err))
os.Exit(1)
}
sbc, err := ops.NewSonobuoyClient(restConfig)
sbc, err := client.NewSonobuoyClient(restConfig)
if err != nil {
errlog.LogError(errors.Wrap(err, "could not create sonobuoy client"))
os.Exit(1)
}
if err := sbc.GetLogs(&logConfig); err != nil {
errlog.LogError(errors.Wrap(err, "error attempting to get sonobuoy logs"))
logreader, err := sbc.LogReader(&logConfig)
if err != nil {
errlog.LogError(errors.Wrap(err, "could not build a log reader"))
os.Exit(1)
}
b := make([]byte, bufSize)
for {
n, err := logreader.Read(b)
if err != nil && err != io.EOF {
errlog.LogError(errors.Wrap(err, "error reading logs"))
os.Exit(1)
}
fmt.Fprint(logConfig.Out, string(b[:n]))
if err == io.EOF {
return
}
}
}
6 changes: 4 additions & 2 deletions pkg/client/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ type LogConfig struct {
Follow bool
// Namespace is the namespace the sonobuoy aggregator is running in.
Namespace string
// Out is the writer to write to.
Out io.Writer
}

// GenConfig are the input options for generating a Sonobuoy manifest.
Expand Down Expand Up @@ -111,8 +113,8 @@ type Interface interface {
RetrieveResults(cfg *RetrieveConfig) io.Reader
// GetStatus determines the status of the sonobuoy run in order to assist the user.
GetStatus(namespace string) (*aggregation.Status, error)
// GetLogs streams logs from the sonobuoy pod by default to stdout.
GetLogs(cfg *LogConfig) error
// LogReader returns a reader that contains a merged stream of sonobuoy logs.
LogReader(cfg *LogConfig) (*Reader, error)
// Delete removes a sonobuoy run, namespace, and all associated resources.
Delete(cfg *DeleteConfig) error
}
211 changes: 182 additions & 29 deletions pkg/client/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,59 +19,212 @@ package client
import (
"fmt"
"io"
"os"
"strings"
"sync"

"github.com/heptio/sonobuoy/pkg/config"
"github.com/sirupsen/logrus"
"github.com/pkg/errors"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)

var (
podLogSeparator = strings.Repeat("-", 79)
const (
bufSize = 4096
)

// Logs gathers the logs for the containers in the sonobuoy namespace and prints them
// Reader provides an io.Reader interface to a channel of bytes.
// The first error received on the error channel will be returned by Read and all subsequent calls to Read.
type Reader struct {
bytestream chan []byte
errc chan error
done chan struct{}

func (c *SonobuoyClient) GetLogs(cfg *LogConfig) error {
// TODO(EKF): Stream to a writer instead of just stdout
if cfg.Follow {
return streamLogs(c.Client, cfg.Namespace, config.MasterPodName, &v1.PodLogOptions{Follow: true})
// Used for when one message is too large for the input buffer
// TODO(chuckha) consider alternative data structures here
overflowBuffer []byte
err error
}

// NewReader returns a configured Reader.
func NewReader(bytestream chan []byte, errc chan error) *Reader {
reader := &Reader{
bytestream: bytestream,
errc: errc,
overflowBuffer: []byte{},
err: nil,
}
return reader
}

// Read tries to fill up the passed in byte slice with messages from the channel.
// Read manages the message overflow ensuring no bytes are missed.
// If an error is set on the reader it will return the error immediately.
func (r *Reader) Read(p []byte) (int, error) {
// Always return the error if it is set.
if r.err != nil {
return 0, r.err
}

// n is the number of bytes written to `p`.
n := 0

// Send any overflow before grabbing new messages.
if len(r.overflowBuffer) > 0 {
// If we need to chunk it, copy as much as we can and reduce the overflow buffer.
if len(r.overflowBuffer) > len(p) {
copy(p, r.overflowBuffer[:len(p)])
r.overflowBuffer = r.overflowBuffer[len(p):]
return len(p), nil
}
// At this point the entire overflow will fit into the buffer.
copy(p, r.overflowBuffer)
n += len(r.overflowBuffer)
r.overflowBuffer = []byte{}
// This is suboptimal (does not use entire buffer) but gets output to the user faster.
return n, nil
}

pods, err := c.Client.CoreV1().Pods(cfg.Namespace).List(metav1.ListOptions{})
// One message a time.
select {
// it's ok to keep reading messages and miss an error.
case err := <-r.errc:
r.err = err
return n, r.err
case data, ok := <-r.bytestream:
if !ok {
return 0, io.EOF
}

// The incoming data is bigger than size of the remaining size of the buffer. Save overflow data for next read.
remainingSize := len(p) - n
if len(data) > remainingSize {
copy(p[n:], data[:remainingSize])
r.overflowBuffer = data[remainingSize:]
return len(p), nil
}

// We have enough headroom in the buffer, copy all of it.
copy(p[n:], data)
n += len(data)
return n, nil
}
}

// LogReader configures a Reader that provides an io.Reader interface to a merged stream of logs from various containers.
func (s *SonobuoyClient) LogReader(cfg *LogConfig) (*Reader, error) {
pods, err := s.Client.CoreV1().Pods(cfg.Namespace).List(metav1.ListOptions{})
if err != nil {
return fmt.Errorf("could not list pods: %v", err)
return nil, errors.Wrap(err, "failed to list pods")
}

errc := make(chan error)
agg := make(chan *message)
var wg sync.WaitGroup

// TODO(chuckha) if we get an error back that the container is still creating maybe we could retry?
for _, pod := range pods.Items {
for _, container := range pod.Spec.Containers {
logrus.WithFields(logrus.Fields{"pod": pod.Name, "container": container.Name}).Info("Printing container logs")
err := streamLogs(c.Client, cfg.Namespace, pod.Name, &v1.PodLogOptions{
Container: container.Name,
})
if err != nil {
return fmt.Errorf("failed to stream logs: %v", err)
wg.Add(1)
ls := &logStreamer{
ns: pod.Namespace,
pod: pod.Name,
container: container.Name,
errc: errc,
logc: agg,
logOpts: &v1.PodLogOptions{
Container: container.Name,
Follow: cfg.Follow,
},
client: s.Client,
}
fmt.Println(podLogSeparator)

go func(w *sync.WaitGroup, ls *logStreamer) {
defer w.Done()
ls.stream()
}(&wg, ls)
}
}
return nil

// Cleanup when finished.
go func(wg *sync.WaitGroup, agg chan *message, errc chan error) {
wg.Wait()
close(agg)
errc <- io.EOF
close(errc)
}(&wg, agg, errc)

return NewReader(applyHeaders(agg), errc), nil
}

// message represents a buffer of logs from a container in a pod in a namespace.
type message struct {
// preamble acts as the id for a particular container as well as the data to print before the actual logs.
preamble string
// buffer is the blob of logs that we extracted from the container.
buffer []byte
}

// TODO(chuckha) the output is a little confusing because our containers already produce structured logs.
func newMessage(preamble string, data []byte) *message {
d := make([]byte, len(data))
copy(d, data)
return &message{
preamble: preamble,
buffer: d,
}
}

// logStreamer writes logs from a container to a fan-in channel.
type logStreamer struct {
ns, pod, container string
errc chan error
logc chan *message
logOpts *v1.PodLogOptions
client kubernetes.Interface
}

func streamLogs(client kubernetes.Interface, namespace, podName string, logOptions *v1.PodLogOptions) error {
req := client.CoreV1().Pods(namespace).GetLogs(podName, logOptions)
// stream will open a connection to the pod's logs and push messages onto a fan-in channel.
func (l *logStreamer) stream() {
req := l.client.CoreV1().Pods(l.ns).GetLogs(l.pod, l.logOpts)
readCloser, err := req.Stream()
if err != nil {
return fmt.Errorf("could not stream the request: %v", err)
l.errc <- errors.Wrapf(err, "error streaming logs from container [%v]", l.container)
return
}
defer readCloser.Close()
// In the case of -f this will never return unless there is an error.
if _, err = io.Copy(os.Stdout, readCloser); err != nil {
return fmt.Errorf("could not copy request body: %v", err)

// newline because logs have new lines in them
preamble := fmt.Sprintf("namespace=%v pod=%v container=%v\n", l.ns, l.pod, l.container)

buf := make([]byte, bufSize)
// Loop until EOF (streaming case won't get an EOF)
for {
n, err := readCloser.Read(buf)
if err != nil && err != io.EOF {
l.errc <- errors.Wrapf(err, "error reading logs from container [%v]", l.container)
return
}
if n > 0 {
l.logc <- newMessage(preamble, buf[:n])
}
if err == io.EOF {
return
}
}
return nil
}

// applyHeaders takes a channel of messages and transforms it into a channel of bytes.
// applyHeaders will write headers to the byte stream as appropriate.
func applyHeaders(mesc chan *message) chan []byte {
out := make(chan []byte)
go func() {
header := ""
for message := range mesc {
// Add the header if the header is different (ie the message is coming from a different source)
if message.preamble != header {
out <- []byte(message.preamble)
header = message.preamble
}
out <- message.buffer
}
}()
return out
}
Loading

0 comments on commit 95c5efd

Please sign in to comment.