Skip to content

Commit

Permalink
Make flux logs more lenient
Browse files Browse the repository at this point in the history
In certain situations there might be 3rd-party pods running in the
Flux namespace that cause the command to fail streaming logs, e.g.
when they have multiple containers but none of them is called
`manager` (which all Flux-maintained pods do). An example of such a
situation is when Flux is installed with the 3rd-party Flux extension
on AKS.

The `logs` command is now more forgiving and merely logs an error in
these situations instead of completely bailing out.

For the parallel log streaming with `-f` the code is now a little more
complex so that errors are now written to stderr in parallel with all
other logs written to stdout. That's what `asyncCopy` is for.

A behavioral difference now is that the command will not exit with an
error, anymore, except for the case where it can't write to
stdout/stderr in the parallel case.

refs #3944

Signed-off-by: Max Jonas Werner <mail@makk.es>
  • Loading branch information
Max Jonas Werner committed Jun 1, 2023
1 parent c0fa6e6 commit ec6ef63
Showing 1 changed file with 27 additions and 8 deletions.
35 changes: 27 additions & 8 deletions cmd/flux/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -160,7 +161,8 @@ func logsCmdRun(cmd *cobra.Command, args []string) error {
return parallelPodLogs(ctx, requests)
}

return podLogs(ctx, requests)
podLogs(ctx, requests)
return nil
}

func getPods(ctx context.Context, c *kubernetes.Clientset, label string) ([]corev1.Pod, error) {
Expand Down Expand Up @@ -201,14 +203,15 @@ func getPods(ctx context.Context, c *kubernetes.Clientset, label string) ([]core

func parallelPodLogs(ctx context.Context, requests []rest.ResponseWrapper) error {
reader, writer := io.Pipe()
errReader, errWriter := io.Pipe()
wg := &sync.WaitGroup{}
wg.Add(len(requests))

for _, request := range requests {
go func(req rest.ResponseWrapper) {
defer wg.Done()
if err := logRequest(ctx, req, writer); err != nil {
writer.CloseWithError(err)
fmt.Fprintf(errWriter, "failed getting logs: %s\n", err)
return
}
}(request)
Expand All @@ -217,20 +220,36 @@ func parallelPodLogs(ctx context.Context, requests []rest.ResponseWrapper) error
go func() {
wg.Wait()
writer.Close()
errWriter.Close()
}()

_, err := io.Copy(os.Stdout, reader)
return err
return errors.Join(
<-asyncCopy(os.Stdout, reader),
<-asyncCopy(os.Stderr, errReader),
)
}

// asyncCopy copies all data from from dst to src asynchronously and returns a channel for reading an error value.
// This is basically an asynchronous wrapper around `io.Copy`. The returned channel is unbuffered and always is sent
// a value (either nil or the error from `io.Copy`) as soon as `io.Copy` returns.
// This function lets you copy from multiple sources into multiple destinations in parallel.
func asyncCopy(dst io.Writer, src io.Reader) <-chan error {
errCh := make(chan error)
go func(errCh chan error) {
_, err := io.Copy(dst, src)
errCh <- err
}(errCh)

return errCh
}

func podLogs(ctx context.Context, requests []rest.ResponseWrapper) error {
func podLogs(ctx context.Context, requests []rest.ResponseWrapper) {
for _, req := range requests {
if err := logRequest(ctx, req, os.Stdout); err != nil {
return err
fmt.Fprintf(os.Stderr, "failed getting logs: %s\n", err)
continue
}
}

return nil
}

func createLabelStringFromMap(m map[string]string) string {
Expand Down

0 comments on commit ec6ef63

Please sign in to comment.