Skip to content

Commit

Permalink
Make flux logs more lenient
Browse files Browse the repository at this point in the history
UX changes:

- Only print an error when a pod doesn't have a matching container
  instead of exiting early.
- Return a non-zero status code when no pod is found at all.

Details:

In certain situations there might be 3rd-party pods running in the
Flux namespace that cause the command to fail streaming logs, e.g.
when they have multiple containers but none of them is called
`manager` (which all Flux-maintained pods do). An example of such a
situation is when Flux is installed with the 3rd-party Flux extension
on AKS.

The `logs` command is now more forgiving and merely logs an error in
these situations instead of completely bailing out. It still returns a
non-zero exit code.

For the parallel log streaming with `-f` the code is now a little more
complex so that errors are now written to stderr in parallel with all
other logs written to stdout. That's what `asyncCopy` is for.

refs #3944

Signed-off-by: Max Jonas Werner <mail@makk.es>
  • Loading branch information
Max Jonas Werner committed Jun 5, 2023
1 parent c0fa6e6 commit 6b8afc9
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 73 deletions.
49 changes: 39 additions & 10 deletions cmd/flux/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -76,7 +77,7 @@ type logsFlags struct {
sinceSeconds time.Duration
}

var logsArgs = &logsFlags{
var logsArgs = logsFlags{
tail: -1,
}

Expand Down Expand Up @@ -115,7 +116,7 @@ func logsCmdRun(cmd *cobra.Command, args []string) error {
return fmt.Errorf("no argument required")
}

pods, err := getPods(ctx, clientset, fluxSelector)
pods, err := getPods(ctx, clientset, logsArgs.fluxNamespace, fluxSelector)
if err != nil {
return err
}
Expand Down Expand Up @@ -163,13 +164,16 @@ func logsCmdRun(cmd *cobra.Command, args []string) error {
return podLogs(ctx, requests)
}

func getPods(ctx context.Context, c *kubernetes.Clientset, label string) ([]corev1.Pod, error) {
// getPods searches for all Deployments in the given namespace that match the given label and returns a list of Pods
// from these Deployments. For each Deployment a single Pod is chosen (based on various factors such as the running
// state). If no Pod is found, an error is returned.
func getPods(ctx context.Context, c *kubernetes.Clientset, ns string, label string) ([]corev1.Pod, error) {
var ret []corev1.Pod

opts := metav1.ListOptions{
LabelSelector: label,
}
deployList, err := c.AppsV1().Deployments(logsArgs.fluxNamespace).List(ctx, opts)
deployList, err := c.AppsV1().Deployments(ns).List(ctx, opts)
if err != nil {
return ret, err
}
Expand All @@ -179,7 +183,7 @@ func getPods(ctx context.Context, c *kubernetes.Clientset, label string) ([]core
opts := metav1.ListOptions{
LabelSelector: createLabelStringFromMap(label),
}
podList, err := c.CoreV1().Pods(logsArgs.fluxNamespace).List(ctx, opts)
podList, err := c.CoreV1().Pods(ns).List(ctx, opts)
if err != nil {
return ret, err
}
Expand All @@ -196,19 +200,24 @@ func getPods(ctx context.Context, c *kubernetes.Clientset, label string) ([]core
}
}

if len(ret) == 0 {
return nil, fmt.Errorf("no Flux pods found in namespace %q", ns)
}

return ret, nil
}

func parallelPodLogs(ctx context.Context, requests []rest.ResponseWrapper) error {
reader, writer := io.Pipe()
errReader, errWriter := io.Pipe()
wg := &sync.WaitGroup{}
wg.Add(len(requests))

for _, request := range requests {
go func(req rest.ResponseWrapper) {
defer wg.Done()
if err := logRequest(ctx, req, writer); err != nil {
writer.CloseWithError(err)
fmt.Fprintf(errWriter, "failed getting logs: %s\n", err)
return
}
}(request)
Expand All @@ -217,20 +226,40 @@ func parallelPodLogs(ctx context.Context, requests []rest.ResponseWrapper) error
go func() {
wg.Wait()
writer.Close()
errWriter.Close()
}()

_, err := io.Copy(os.Stdout, reader)
return err
stdoutErrCh := asyncCopy(os.Stdout, reader)
stderrErrCh := asyncCopy(os.Stderr, errReader)

return errors.Join(<-stdoutErrCh, <-stderrErrCh)
}

// asyncCopy copies all data from from dst to src asynchronously and returns a channel for reading an error value.
// This is basically an asynchronous wrapper around `io.Copy`. The returned channel is unbuffered and always is sent
// a value (either nil or the error from `io.Copy`) as soon as `io.Copy` returns.
// This function lets you copy from multiple sources into multiple destinations in parallel.
func asyncCopy(dst io.Writer, src io.Reader) <-chan error {
errCh := make(chan error)
go func(errCh chan error) {
_, err := io.Copy(dst, src)
errCh <- err
}(errCh)

return errCh
}

func podLogs(ctx context.Context, requests []rest.ResponseWrapper) error {
var retErr error
for _, req := range requests {
if err := logRequest(ctx, req, os.Stdout); err != nil {
return err
fmt.Fprintf(os.Stderr, "failed getting logs: %s\n", err)
retErr = fmt.Errorf("failed to collect logs from all Flux pods")
continue
}
}

return nil
return retErr
}

func createLabelStringFromMap(m map[string]string) string {
Expand Down
88 changes: 88 additions & 0 deletions cmd/flux/logs_e2e_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
//go:build e2e
// +build e2e

/*
Copyright 2021 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"testing"
)

func TestLogsNoArgs(t *testing.T) {
cmd := cmdTestCase{
args: "logs",
assert: assertSuccess(),
}
cmd.runTestCmd(t)
}

func TestLogsWrongNamespace(t *testing.T) {
cmd := cmdTestCase{
args: "logs --flux-namespace=default",
assert: assertError(`no Flux pods found in namespace "default"`),
}
cmd.runTestCmd(t)
}

func TestLogsAllNamespaces(t *testing.T) {
cmd := cmdTestCase{
args: "logs --all-namespaces",
assert: assertSuccess(),
}
cmd.runTestCmd(t)
}

func TestLogsSince(t *testing.T) {
cmd := cmdTestCase{
args: "logs --since=2m",
assert: assertSuccess(),
}
cmd.runTestCmd(t)
}

func TestLogsSinceInvalid(t *testing.T) {
cmd := cmdTestCase{
args: "logs --since=XXX",
assert: assertError(`invalid argument "XXX" for "--since" flag: time: invalid duration "XXX"`),
}
cmd.runTestCmd(t)
}

func TestLogsSinceTime(t *testing.T) {
cmd := cmdTestCase{
args: "logs --since-time=2021-08-06T14:26:25.546Z",
assert: assertSuccess(),
}
cmd.runTestCmd(t)
}

func TestLogsSinceTimeInvalid(t *testing.T) {
cmd := cmdTestCase{
args: "logs --since-time=XXX",
assert: assertError("XXX is not a valid (RFC3339) time"),
}
cmd.runTestCmd(t)
}

func TestLogsSinceOnlyOneAllowed(t *testing.T) {
cmd := cmdTestCase{
args: "logs --since=2m --since-time=2021-08-06T14:26:25.546Z",
assert: assertError("at most one of `sinceTime` or `sinceSeconds` may be specified"),
}
cmd.runTestCmd(t)
}
70 changes: 7 additions & 63 deletions cmd/flux/logs_test.go → cmd/flux/logs_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,73 +30,17 @@ import (
. "github.com/onsi/gomega"
)

func TestLogsNoArgs(t *testing.T) {
cmd := cmdTestCase{
args: "logs",
assert: assertSuccess(),
}
cmd.runTestCmd(t)
}

func TestLogsAllNamespaces(t *testing.T) {
cmd := cmdTestCase{
args: "logs --all-namespaces",
assert: assertSuccess(),
}
cmd.runTestCmd(t)
}

func TestLogsSince(t *testing.T) {
cmd := cmdTestCase{
args: "logs --since=2m",
assert: assertSuccess(),
}
cmd.runTestCmd(t)
}

func TestLogsSinceInvalid(t *testing.T) {
cmd := cmdTestCase{
args: "logs --since=XXX",
assert: assertError(`invalid argument "XXX" for "--since" flag: time: invalid duration "XXX"`),
}
cmd.runTestCmd(t)
}

func TestLogsSinceTime(t *testing.T) {
cmd := cmdTestCase{
args: "logs --since-time=2021-08-06T14:26:25.546Z",
assert: assertSuccess(),
}
cmd.runTestCmd(t)
}

func TestLogsSinceTimeInvalid(t *testing.T) {
cmd := cmdTestCase{
args: "logs --since-time=XXX",
assert: assertError("XXX is not a valid (RFC3339) time"),
}
cmd.runTestCmd(t)
}

func TestLogsSinceOnlyOneAllowed(t *testing.T) {
cmd := cmdTestCase{
args: "logs --since=2m --since-time=2021-08-06T14:26:25.546Z",
assert: assertError("at most one of `sinceTime` or `sinceSeconds` may be specified"),
}
cmd.runTestCmd(t)
}

func TestLogRequest(t *testing.T) {
mapper := &testResponseMapper{}
tests := []struct {
name string
namespace string
flags *logsFlags
flags logsFlags
assertFile string
}{
{
name: "all logs",
flags: &logsFlags{
flags: logsFlags{
tail: -1,
allNamespaces: true,
},
Expand All @@ -105,22 +49,22 @@ func TestLogRequest(t *testing.T) {
{
name: "filter by namespace",
namespace: "default",
flags: &logsFlags{
flags: logsFlags{
tail: -1,
},
assertFile: "testdata/logs/namespace.txt",
},
{
name: "filter by kind and namespace",
flags: &logsFlags{
flags: logsFlags{
tail: -1,
kind: "Kustomization",
},
assertFile: "testdata/logs/kind.txt",
},
{
name: "filter by loglevel",
flags: &logsFlags{
flags: logsFlags{
tail: -1,
logLevel: "error",
allNamespaces: true,
Expand All @@ -130,7 +74,7 @@ func TestLogRequest(t *testing.T) {
{
name: "filter by namespace, name, loglevel and kind",
namespace: "flux-system",
flags: &logsFlags{
flags: logsFlags{
tail: -1,
logLevel: "error",
kind: "Kustomization",
Expand Down Expand Up @@ -163,7 +107,7 @@ func TestLogRequest(t *testing.T) {

// reset flags to default
*kubeconfigArgs.Namespace = rootArgs.defaults.Namespace
logsArgs = &logsFlags{
logsArgs = logsFlags{
tail: -1,
}
})
Expand Down
4 changes: 4 additions & 0 deletions cmd/flux/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,10 @@ func resetCmdArgs() {
alertProviderArgs = alertProviderFlags{}
bootstrapArgs = NewBootstrapFlags()
bServerArgs = bServerFlags{}
logsArgs = logsFlags{
tail: -1,
fluxNamespace: rootArgs.defaults.Namespace,
}
buildKsArgs = buildKsFlags{}
checkArgs = checkFlags{}
createArgs = createFlags{}
Expand Down

0 comments on commit 6b8afc9

Please sign in to comment.