From ec6ef63441bd30feee52628ebc933b34eba1b294 Mon Sep 17 00:00:00 2001
From: Max Jonas Werner
Date: Thu, 1 Jun 2023 15:53:05 +0200
Subject: [PATCH] Make `flux logs` more lenient

In certain situations there might be 3rd-party pods running in the Flux
namespace that cause the command to fail streaming logs, e.g. when they
have multiple containers but none of them is called `manager` (which
all Flux-maintained pods do). An example of such a situation is when
Flux is installed with the 3rd-party Flux extension on AKS.

The `logs` command is now more forgiving and merely logs an error in
these situations instead of completely bailing out.

For the parallel log streaming with `-f` the code is now a little more
complex: errors are written to stderr in parallel with all other logs
written to stdout. That's what `asyncCopy` is for.

A behavioral difference is that the command no longer exits with an
error, except when it can't write to stdout/stderr in the parallel
case.

refs #3944

Signed-off-by: Max Jonas Werner
---
 cmd/flux/logs.go | 35 +++++++++++++++++++++++++++--------
 1 file changed, 27 insertions(+), 8 deletions(-)

diff --git a/cmd/flux/logs.go b/cmd/flux/logs.go
index d8c27ff274..3f011331a6 100644
--- a/cmd/flux/logs.go
+++ b/cmd/flux/logs.go
@@ -20,6 +20,7 @@ import (
 	"bufio"
 	"context"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"io"
 	"os"
@@ -160,7 +161,8 @@ func logsCmdRun(cmd *cobra.Command, args []string) error {
 		return parallelPodLogs(ctx, requests)
 	}
 
-	return podLogs(ctx, requests)
+	podLogs(ctx, requests)
+	return nil
 }
 
 func getPods(ctx context.Context, c *kubernetes.Clientset, label string) ([]corev1.Pod, error) {
@@ -201,6 +203,7 @@ func getPods(ctx context.Context, c *kubernetes.Clientset, label string) ([]core
 
 func parallelPodLogs(ctx context.Context, requests []rest.ResponseWrapper) error {
 	reader, writer := io.Pipe()
+	errReader, errWriter := io.Pipe()
 	wg := &sync.WaitGroup{}
 	wg.Add(len(requests))
 
@@ -208,7 +211,7 @@ func parallelPodLogs(ctx context.Context, requests []rest.ResponseWrapper) error
 		go func(req rest.ResponseWrapper) {
 			defer wg.Done()
 			if err := logRequest(ctx, req, writer); err != nil {
-				writer.CloseWithError(err)
+				fmt.Fprintf(errWriter, "failed getting logs: %s\n", err)
 				return
 			}
 		}(request)
@@ -217,20 +220,36 @@ func parallelPodLogs(ctx context.Context, requests []rest.ResponseWrapper) error
 	go func() {
 		wg.Wait()
 		writer.Close()
+		errWriter.Close()
 	}()
 
-	_, err := io.Copy(os.Stdout, reader)
-	return err
+	return errors.Join(
+		<-asyncCopy(os.Stdout, reader),
+		<-asyncCopy(os.Stderr, errReader),
+	)
+}
+
+// asyncCopy copies all data from src to dst asynchronously and returns a channel for reading an error value.
+// This is basically an asynchronous wrapper around `io.Copy`. The returned channel is unbuffered and is always sent
+// a value (either nil or the error from `io.Copy`) as soon as `io.Copy` returns.
+// This function lets you copy from multiple sources into multiple destinations in parallel.
+func asyncCopy(dst io.Writer, src io.Reader) <-chan error {
+	errCh := make(chan error)
+	go func(errCh chan error) {
+		_, err := io.Copy(dst, src)
+		errCh <- err
+	}(errCh)
+
+	return errCh
 }
 
-func podLogs(ctx context.Context, requests []rest.ResponseWrapper) error {
+func podLogs(ctx context.Context, requests []rest.ResponseWrapper) {
 	for _, req := range requests {
 		if err := logRequest(ctx, req, os.Stdout); err != nil {
-			return err
+			fmt.Fprintf(os.Stderr, "failed getting logs: %s\n", err)
+			continue
 		}
 	}
-
-	return nil
 }
 
 func createLabelStringFromMap(m map[string]string) string {
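
Below is a minimal, self-contained sketch of the fan-out/fan-in pattern the patch introduces (assuming Go 1.20+ for `errors.Join`); the string readers and messages are illustrative stand-ins for the pod log pipes, not code from the Flux repository:

package main

import (
	"errors"
	"fmt"
	"io"
	"os"
	"strings"
)

// asyncCopy starts io.Copy in a goroutine and returns an unbuffered channel
// that receives exactly one value (nil or the copy error) once the copy ends.
func asyncCopy(dst io.Writer, src io.Reader) <-chan error {
	errCh := make(chan error)
	go func() {
		_, err := io.Copy(dst, src)
		errCh <- err
	}()
	return errCh
}

func main() {
	// Stand-ins for the log and error pipes used by `flux logs -f`.
	logs := strings.NewReader("controller: reconciliation finished\n")
	errs := strings.NewReader("failed getting logs: no container named manager\n")

	// Both copies run concurrently; receiving from both channels waits for
	// them to finish, and errors.Join collapses nil results into nil.
	if err := errors.Join(
		<-asyncCopy(os.Stdout, logs),
		<-asyncCopy(os.Stderr, errs),
	); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}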