Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make flux logs more lenient #3945

Merged
merged 1 commit into from
Jun 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 39 additions & 10 deletions cmd/flux/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -76,7 +77,7 @@ type logsFlags struct {
sinceSeconds time.Duration
}

var logsArgs = &logsFlags{
var logsArgs = logsFlags{
tail: -1,
}

Expand Down Expand Up @@ -115,7 +116,7 @@ func logsCmdRun(cmd *cobra.Command, args []string) error {
return fmt.Errorf("no argument required")
}

pods, err := getPods(ctx, clientset, fluxSelector)
pods, err := getPods(ctx, clientset, logsArgs.fluxNamespace, fluxSelector)
if err != nil {
return err
}
Expand Down Expand Up @@ -163,13 +164,16 @@ func logsCmdRun(cmd *cobra.Command, args []string) error {
return podLogs(ctx, requests)
}

func getPods(ctx context.Context, c *kubernetes.Clientset, label string) ([]corev1.Pod, error) {
// getPods searches for all Deployments in the given namespace that match the given label and returns a list of Pods
// from these Deployments. For each Deployment a single Pod is chosen (based on various factors such as the running
// state). If no Pod is found, an error is returned.
func getPods(ctx context.Context, c *kubernetes.Clientset, ns string, label string) ([]corev1.Pod, error) {
var ret []corev1.Pod

opts := metav1.ListOptions{
LabelSelector: label,
}
deployList, err := c.AppsV1().Deployments(logsArgs.fluxNamespace).List(ctx, opts)
deployList, err := c.AppsV1().Deployments(ns).List(ctx, opts)
if err != nil {
return ret, err
}
Expand All @@ -179,7 +183,7 @@ func getPods(ctx context.Context, c *kubernetes.Clientset, label string) ([]core
opts := metav1.ListOptions{
LabelSelector: createLabelStringFromMap(label),
}
podList, err := c.CoreV1().Pods(logsArgs.fluxNamespace).List(ctx, opts)
podList, err := c.CoreV1().Pods(ns).List(ctx, opts)
if err != nil {
return ret, err
}
Expand All @@ -196,19 +200,24 @@ func getPods(ctx context.Context, c *kubernetes.Clientset, label string) ([]core
}
}

if len(ret) == 0 {
return nil, fmt.Errorf("no Flux pods found in namespace %q", ns)
}

return ret, nil
}

func parallelPodLogs(ctx context.Context, requests []rest.ResponseWrapper) error {
reader, writer := io.Pipe()
errReader, errWriter := io.Pipe()
wg := &sync.WaitGroup{}
wg.Add(len(requests))

for _, request := range requests {
go func(req rest.ResponseWrapper) {
defer wg.Done()
if err := logRequest(ctx, req, writer); err != nil {
writer.CloseWithError(err)
fmt.Fprintf(errWriter, "failed getting logs: %s\n", err)
return
}
}(request)
Expand All @@ -217,20 +226,40 @@ func parallelPodLogs(ctx context.Context, requests []rest.ResponseWrapper) error
go func() {
wg.Wait()
writer.Close()
errWriter.Close()
}()

_, err := io.Copy(os.Stdout, reader)
return err
stdoutErrCh := asyncCopy(os.Stdout, reader)
stderrErrCh := asyncCopy(os.Stderr, errReader)

return errors.Join(<-stdoutErrCh, <-stderrErrCh)
}

// asyncCopy copies all data from from dst to src asynchronously and returns a channel for reading an error value.
// This is basically an asynchronous wrapper around `io.Copy`. The returned channel is unbuffered and always is sent
// a value (either nil or the error from `io.Copy`) as soon as `io.Copy` returns.
// This function lets you copy from multiple sources into multiple destinations in parallel.
func asyncCopy(dst io.Writer, src io.Reader) <-chan error {
errCh := make(chan error)
go func(errCh chan error) {
_, err := io.Copy(dst, src)
errCh <- err
}(errCh)

return errCh
}

func podLogs(ctx context.Context, requests []rest.ResponseWrapper) error {
var retErr error
for _, req := range requests {
if err := logRequest(ctx, req, os.Stdout); err != nil {
return err
fmt.Fprintf(os.Stderr, "failed getting logs: %s\n", err)
retErr = fmt.Errorf("failed to collect logs from all Flux pods")
continue
}
}

return nil
return retErr
}

func createLabelStringFromMap(m map[string]string) string {
Expand Down
88 changes: 88 additions & 0 deletions cmd/flux/logs_e2e_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
//go:build e2e
// +build e2e

/*
Copyright 2021 The Flux authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"testing"
)

func TestLogsNoArgs(t *testing.T) {
cmd := cmdTestCase{
args: "logs",
assert: assertSuccess(),
}
cmd.runTestCmd(t)
}

func TestLogsWrongNamespace(t *testing.T) {
cmd := cmdTestCase{
args: "logs --flux-namespace=default",
assert: assertError(`no Flux pods found in namespace "default"`),
}
cmd.runTestCmd(t)
}

func TestLogsAllNamespaces(t *testing.T) {
cmd := cmdTestCase{
args: "logs --all-namespaces",
assert: assertSuccess(),
}
cmd.runTestCmd(t)
}

func TestLogsSince(t *testing.T) {
cmd := cmdTestCase{
args: "logs --since=2m",
assert: assertSuccess(),
}
cmd.runTestCmd(t)
}

func TestLogsSinceInvalid(t *testing.T) {
cmd := cmdTestCase{
args: "logs --since=XXX",
assert: assertError(`invalid argument "XXX" for "--since" flag: time: invalid duration "XXX"`),
}
cmd.runTestCmd(t)
}

func TestLogsSinceTime(t *testing.T) {
cmd := cmdTestCase{
args: "logs --since-time=2021-08-06T14:26:25.546Z",
assert: assertSuccess(),
}
cmd.runTestCmd(t)
}

func TestLogsSinceTimeInvalid(t *testing.T) {
cmd := cmdTestCase{
args: "logs --since-time=XXX",
assert: assertError("XXX is not a valid (RFC3339) time"),
}
cmd.runTestCmd(t)
}

func TestLogsSinceOnlyOneAllowed(t *testing.T) {
cmd := cmdTestCase{
args: "logs --since=2m --since-time=2021-08-06T14:26:25.546Z",
assert: assertError("at most one of `sinceTime` or `sinceSeconds` may be specified"),
}
cmd.runTestCmd(t)
}
70 changes: 7 additions & 63 deletions cmd/flux/logs_test.go → cmd/flux/logs_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,73 +30,17 @@ import (
. "github.com/onsi/gomega"
)

func TestLogsNoArgs(t *testing.T) {
cmd := cmdTestCase{
args: "logs",
assert: assertSuccess(),
}
cmd.runTestCmd(t)
}

func TestLogsAllNamespaces(t *testing.T) {
cmd := cmdTestCase{
args: "logs --all-namespaces",
assert: assertSuccess(),
}
cmd.runTestCmd(t)
}

func TestLogsSince(t *testing.T) {
cmd := cmdTestCase{
args: "logs --since=2m",
assert: assertSuccess(),
}
cmd.runTestCmd(t)
}

func TestLogsSinceInvalid(t *testing.T) {
cmd := cmdTestCase{
args: "logs --since=XXX",
assert: assertError(`invalid argument "XXX" for "--since" flag: time: invalid duration "XXX"`),
}
cmd.runTestCmd(t)
}

func TestLogsSinceTime(t *testing.T) {
cmd := cmdTestCase{
args: "logs --since-time=2021-08-06T14:26:25.546Z",
assert: assertSuccess(),
}
cmd.runTestCmd(t)
}

func TestLogsSinceTimeInvalid(t *testing.T) {
cmd := cmdTestCase{
args: "logs --since-time=XXX",
assert: assertError("XXX is not a valid (RFC3339) time"),
}
cmd.runTestCmd(t)
}

func TestLogsSinceOnlyOneAllowed(t *testing.T) {
cmd := cmdTestCase{
args: "logs --since=2m --since-time=2021-08-06T14:26:25.546Z",
assert: assertError("at most one of `sinceTime` or `sinceSeconds` may be specified"),
}
cmd.runTestCmd(t)
}

func TestLogRequest(t *testing.T) {
mapper := &testResponseMapper{}
tests := []struct {
name string
namespace string
flags *logsFlags
flags logsFlags
assertFile string
}{
{
name: "all logs",
flags: &logsFlags{
flags: logsFlags{
tail: -1,
allNamespaces: true,
},
Expand All @@ -105,22 +49,22 @@ func TestLogRequest(t *testing.T) {
{
name: "filter by namespace",
namespace: "default",
flags: &logsFlags{
flags: logsFlags{
tail: -1,
},
assertFile: "testdata/logs/namespace.txt",
},
{
name: "filter by kind and namespace",
flags: &logsFlags{
flags: logsFlags{
tail: -1,
kind: "Kustomization",
},
assertFile: "testdata/logs/kind.txt",
},
{
name: "filter by loglevel",
flags: &logsFlags{
flags: logsFlags{
tail: -1,
logLevel: "error",
allNamespaces: true,
Expand All @@ -130,7 +74,7 @@ func TestLogRequest(t *testing.T) {
{
name: "filter by namespace, name, loglevel and kind",
namespace: "flux-system",
flags: &logsFlags{
flags: logsFlags{
tail: -1,
logLevel: "error",
kind: "Kustomization",
Expand Down Expand Up @@ -163,7 +107,7 @@ func TestLogRequest(t *testing.T) {

// reset flags to default
*kubeconfigArgs.Namespace = rootArgs.defaults.Namespace
logsArgs = &logsFlags{
logsArgs = logsFlags{
tail: -1,
}
})
Expand Down
4 changes: 4 additions & 0 deletions cmd/flux/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,10 @@ func resetCmdArgs() {
alertProviderArgs = alertProviderFlags{}
bootstrapArgs = NewBootstrapFlags()
bServerArgs = bServerFlags{}
logsArgs = logsFlags{
tail: -1,
fluxNamespace: rootArgs.defaults.Namespace,
}
buildKsArgs = buildKsFlags{}
checkArgs = checkFlags{}
createArgs = createFlags{}
Expand Down