Skip to content

Commit

Permalink
Make flux logs more lenient
Browse files Browse the repository at this point in the history
In certain situations there might be 3rd-party pods running in the
Flux namespace that cause the command to fail streaming logs, e.g.
when they have multiple containers but none of them is called
`manager` (which all Flux-maintained pods do). An example of such a
situation is when Flux is installed with the 3rd-party Flux extension
on AKS.

The `logs` command is now more forgiving and merely logs an error in
these situations instead of completely bailing out.

For the parallel log streaming with `-f` the code is now a little more
complex so that errors are now written to stderr in parallel with all
other logs written to stdout. That's what `asyncCopy` is for.

refs #3944

Signed-off-by: Max Jonas Werner <mail@makk.es>
  • Loading branch information
Max Jonas Werner committed Jun 2, 2023
1 parent c0fa6e6 commit c8ec5a7
Showing 1 changed file with 37 additions and 10 deletions.
47 changes: 37 additions & 10 deletions cmd/flux/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -115,7 +116,7 @@ func logsCmdRun(cmd *cobra.Command, args []string) error {
return fmt.Errorf("no argument required")
}

pods, err := getPods(ctx, clientset, fluxSelector)
pods, err := getPods(ctx, clientset, logsArgs.fluxNamespace, fluxSelector)
if err != nil {
return err
}
Expand Down Expand Up @@ -163,13 +164,16 @@ func logsCmdRun(cmd *cobra.Command, args []string) error {
return podLogs(ctx, requests)
}

func getPods(ctx context.Context, c *kubernetes.Clientset, label string) ([]corev1.Pod, error) {
// getPods searches for all Deployments in the given namespace that match the given label and returns a list of Pods
// from these Deployments. For each Deployment a single Pod is chosen (based on various factors such as the running
// state). If no pod is found, an error is returned.
func getPods(ctx context.Context, c *kubernetes.Clientset, ns string, label string) ([]corev1.Pod, error) {
var ret []corev1.Pod

opts := metav1.ListOptions{
LabelSelector: label,
}
deployList, err := c.AppsV1().Deployments(logsArgs.fluxNamespace).List(ctx, opts)
deployList, err := c.AppsV1().Deployments(ns).List(ctx, opts)
if err != nil {
return ret, err
}
Expand All @@ -179,7 +183,7 @@ func getPods(ctx context.Context, c *kubernetes.Clientset, label string) ([]core
opts := metav1.ListOptions{
LabelSelector: createLabelStringFromMap(label),
}
podList, err := c.CoreV1().Pods(logsArgs.fluxNamespace).List(ctx, opts)
podList, err := c.CoreV1().Pods(ns).List(ctx, opts)
if err != nil {
return ret, err
}
Expand All @@ -196,19 +200,24 @@ func getPods(ctx context.Context, c *kubernetes.Clientset, label string) ([]core
}
}

if len(ret) == 0 {
return nil, fmt.Errorf("no Flux pods found in namespace %q", ns)
}

return ret, nil
}

func parallelPodLogs(ctx context.Context, requests []rest.ResponseWrapper) error {
reader, writer := io.Pipe()
errReader, errWriter := io.Pipe()
wg := &sync.WaitGroup{}
wg.Add(len(requests))

for _, request := range requests {
go func(req rest.ResponseWrapper) {
defer wg.Done()
if err := logRequest(ctx, req, writer); err != nil {
writer.CloseWithError(err)
fmt.Fprintf(errWriter, "failed getting logs: %s\n", err)
return
}
}(request)
Expand All @@ -217,20 +226,38 @@ func parallelPodLogs(ctx context.Context, requests []rest.ResponseWrapper) error
go func() {
wg.Wait()
writer.Close()
errWriter.Close()
}()

_, err := io.Copy(os.Stdout, reader)
return err
stdoutErrCh := asyncCopy(os.Stdout, reader)
stderrErrCh := asyncCopy(os.Stderr, errReader)

return errors.Join(<-stdoutErrCh, <-stderrErrCh)
}

// asyncCopy copies all data from from dst to src asynchronously and returns a channel for reading an error value.
// This is basically an asynchronous wrapper around `io.Copy`. The returned channel is unbuffered and always is sent
// a value (either nil or the error from `io.Copy`) as soon as `io.Copy` returns.
// This function lets you copy from multiple sources into multiple destinations in parallel.
func asyncCopy(dst io.Writer, src io.Reader) <-chan error {
errCh := make(chan error)
go func(errCh chan error) {
_, err := io.Copy(dst, src)
errCh <- err
}(errCh)

return errCh
}

func podLogs(ctx context.Context, requests []rest.ResponseWrapper) error {
var errs []error
for _, req := range requests {
if err := logRequest(ctx, req, os.Stdout); err != nil {
return err
errs = append(errs, err)
continue
}
}

return nil
return errors.Join(errs...)
}

func createLabelStringFromMap(m map[string]string) string {
Expand Down

0 comments on commit c8ec5a7

Please sign in to comment.