From 4c35539561374b472dcb32abe863b406272a8c68 Mon Sep 17 00:00:00 2001 From: Victor Agababov Date: Wed, 15 Jul 2020 12:19:34 -0700 Subject: [PATCH] Make logstream watch the pods as they change. (#1496) * Make logstream watch the pods as they change. - watch the pods and track what we already watch - start watchers for the new pods * log * review * other --- test/logstream/kubelogs.go | 125 +++++++++++++++++++++++++------------ 1 file changed, 84 insertions(+), 41 deletions(-) diff --git a/test/logstream/kubelogs.go b/test/logstream/kubelogs.go index cd137acdca..c120b7e1c7 100644 --- a/test/logstream/kubelogs.go +++ b/test/logstream/kubelogs.go @@ -27,6 +27,8 @@ import ( "golang.org/x/sync/errgroup" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/watch" "knative.dev/pkg/ptr" "knative.dev/pkg/test" @@ -35,6 +37,7 @@ import ( type kubelogs struct { namespace string + kc *test.KubeClient once sync.Once m sync.RWMutex @@ -49,54 +52,80 @@ var _ streamer = (*kubelogs)(nil) // timeFormat defines a simple timestamp with millisecond granularity const timeFormat = "15:04:05.000" -func (k *kubelogs) init(t test.TLegacy) { - k.keys = make(map[string]logger) +func (k *kubelogs) startForPod(eg *errgroup.Group, pod *corev1.Pod) { + // Grab data from all containers in the pods. We need this in case + // an envoy sidecar is injected for mesh installs. This should be + // equivalent to --all-containers. + for _, container := range pod.Spec.Containers { + // Required for capture below. + psn, pn, cn := pod.Namespace, pod.Name, container.Name + eg.Go(func() error { + options := &corev1.PodLogOptions{ + Container: cn, + // Follow directs the api server to continuously stream logs back. + Follow: true, + // Only return new logs (this value is being used for "epsilon"). + SinceSeconds: ptr.Int64(1), + } + + req := k.kc.Kube.CoreV1().Pods(psn).GetLogs(pn, options) + stream, err := req.Stream() + if err != nil { + return err + } + defer stream.Close() + // Read this container's stream. + scanner := bufio.NewScanner(stream) + for scanner.Scan() { + k.handleLine(scanner.Bytes()) + } + // Pods get killed with chaos duck, so logs might end + // before the test does. So don't report an error here. + return nil + }) + } +} - kc, err := test.NewKubeClient(test.Flags.Kubeconfig, test.Flags.Cluster) - if err != nil { - t.Error("Error loading client config", "error", err) +func podIsReady(p *corev1.Pod) bool { + if p.Status.Phase == corev1.PodRunning && p.DeletionTimestamp == nil { + for _, cond := range p.Status.Conditions { + if cond.Type == corev1.PodReady && cond.Status == corev1.ConditionTrue { + return true + } + } } + return false +} - // List the pods in the given namespace. - pl, err := kc.Kube.CoreV1().Pods(k.namespace).List(metav1.ListOptions{}) +func (k *kubelogs) watchPods(t test.TLegacy) { + wi, err := k.kc.Kube.CoreV1().Pods(k.namespace).Watch(metav1.ListOptions{}) if err != nil { - t.Error("Error listing pods", "error", err) + t.Error("Logstream knative pod watch failed, logs might be missing", "error", err) + return } - eg := errgroup.Group{} - for _, pod := range pl.Items { - // Grab data from all containers in the pods. We need this in case - // an envoy sidecar is injected for mesh installs. This should be - // equivalent to --all-containers. - for _, container := range pod.Spec.Containers { - // Required for capture below. - pod, container := pod, container - eg.Go(func() error { - options := &corev1.PodLogOptions{ - Container: container.Name, - // Follow directs the api server to continuously stream logs back. - Follow: true, - // Only return new logs (this value is being used for "epsilon"). - SinceSeconds: ptr.Int64(1), - } - - req := kc.Kube.CoreV1().Pods(k.namespace).GetLogs(pod.Name, options) - stream, err := req.Stream() - if err != nil { - return err + go func() { + watchedPods := sets.NewString() + for ev := range wi.ResultChan() { + p := ev.Object.(*corev1.Pod) + switch ev.Type { + case watch.Deleted: + watchedPods.Delete(p.Name) + case watch.Added, watch.Modified: + if watchedPods.Has(p.Name) { + t.Log("Already watching pod", p.Name) + continue } - defer stream.Close() - // Read this container's stream. - scanner := bufio.NewScanner(stream) - for scanner.Scan() { - k.handleLine(scanner.Bytes()) + if podIsReady(p) { + t.Log("Watching logs for pod: ", p.Name) + watchedPods.Insert(p.Name) + k.startForPod(&eg, p) + continue } - return fmt.Errorf("logstream completed prematurely for %s/%s: %w", - pod.Name, container.Name, scanner.Err()) - }) + t.Log("Pod is not yet ready: ", p.Name) + } } - } - + }() // Monitor the error group in the background and surface an error on the kubelogs // in case anything had an active stream open. go func() { @@ -108,6 +137,20 @@ func (k *kubelogs) init(t test.TLegacy) { }() } +func (k *kubelogs) init(t test.TLegacy) { + k.keys = make(map[string]logger, 1) + + kc, err := test.NewKubeClient(test.Flags.Kubeconfig, test.Flags.Cluster) + if err != nil { + t.Error("Error loading client config", "error", err) + return + } + k.kc = kc + + // watchPods will start logging for existing pods as well. + k.watchPods(t) +} + func (k *kubelogs) handleLine(l []byte) { // This holds the standard structure of our logs. var line struct { @@ -160,7 +203,7 @@ func (k *kubelogs) handleLine(l []byte) { } } -// Start implements streamer +// Start implements streamer. func (k *kubelogs) Start(t test.TLegacy) Canceler { k.once.Do(func() { k.init(t) }) @@ -178,7 +221,7 @@ func (k *kubelogs) Start(t test.TLegacy) Canceler { delete(k.keys, name) if k.err != nil { - t.Error("error during logstream", "error", k.err) + t.Error("Error during logstream", "error", k.err) } } }