Skip to content

Commit

Permalink
Make logstream watch the pods as they change. (#1496)
Browse files Browse the repository at this point in the history
* Make logstream watch the pods as they change.

- watch the pods and track what we already watch
- start watchers for the new pods

* log

* review

* other
  • Loading branch information
vagababov authored Jul 15, 2020
1 parent f50bf78 commit 4c35539
Showing 1 changed file with 84 additions and 41 deletions.
125 changes: 84 additions & 41 deletions test/logstream/kubelogs.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"golang.org/x/sync/errgroup"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/watch"

"knative.dev/pkg/ptr"
"knative.dev/pkg/test"
Expand All @@ -35,6 +37,7 @@ import (

type kubelogs struct {
namespace string
kc *test.KubeClient

once sync.Once
m sync.RWMutex
Expand All @@ -49,54 +52,80 @@ var _ streamer = (*kubelogs)(nil)
// timeFormat defines a simple timestamp with millisecond granularity
const timeFormat = "15:04:05.000"

func (k *kubelogs) init(t test.TLegacy) {
k.keys = make(map[string]logger)
// startForPod launches one goroutine per container in pod, each following
// that container's log stream and feeding every line to k.handleLine.
// Streaming all containers (not just the first) mirrors --all-containers
// and covers injected sidecars, e.g. envoy in mesh installs.
func (k *kubelogs) startForPod(eg *errgroup.Group, pod *corev1.Pod) {
	for i := range pod.Spec.Containers {
		// Copy the values the closure needs into fresh locals so each
		// goroutine sees its own container, not the loop variable.
		ns, podName, containerName := pod.Namespace, pod.Name, pod.Spec.Containers[i].Name
		eg.Go(func() error {
			opts := &corev1.PodLogOptions{
				Container: containerName,
				// Follow directs the api server to continuously stream logs back.
				Follow: true,
				// Only return new logs (this value is being used for "epsilon").
				SinceSeconds: ptr.Int64(1),
			}

			stream, err := k.kc.Kube.CoreV1().Pods(ns).GetLogs(podName, opts).Stream()
			if err != nil {
				return err
			}
			defer stream.Close()

			// Forward every line from this container's stream to the handler.
			scanner := bufio.NewScanner(stream)
			for scanner.Scan() {
				k.handleLine(scanner.Bytes())
			}
			// Pods get killed with chaos duck, so logs might end
			// before the test does. So don't report an error here.
			return nil
		})
	}
}

kc, err := test.NewKubeClient(test.Flags.Kubeconfig, test.Flags.Cluster)
if err != nil {
t.Error("Error loading client config", "error", err)
// podIsReady reports whether p is running, is not being deleted, and has
// its PodReady condition set to true.
func podIsReady(p *corev1.Pod) bool {
	// A pod that is not running, or is already marked for deletion,
	// can never be ready.
	if p.Status.Phase != corev1.PodRunning || p.DeletionTimestamp != nil {
		return false
	}
	for _, c := range p.Status.Conditions {
		if c.Type == corev1.PodReady && c.Status == corev1.ConditionTrue {
			return true
		}
	}
	return false
}

// List the pods in the given namespace.
pl, err := kc.Kube.CoreV1().Pods(k.namespace).List(metav1.ListOptions{})
func (k *kubelogs) watchPods(t test.TLegacy) {
wi, err := k.kc.Kube.CoreV1().Pods(k.namespace).Watch(metav1.ListOptions{})
if err != nil {
t.Error("Error listing pods", "error", err)
t.Error("Logstream knative pod watch failed, logs might be missing", "error", err)
return
}

eg := errgroup.Group{}
for _, pod := range pl.Items {
// Grab data from all containers in the pods. We need this in case
// an envoy sidecar is injected for mesh installs. This should be
// equivalent to --all-containers.
for _, container := range pod.Spec.Containers {
// Required for capture below.
pod, container := pod, container
eg.Go(func() error {
options := &corev1.PodLogOptions{
Container: container.Name,
// Follow directs the api server to continuously stream logs back.
Follow: true,
// Only return new logs (this value is being used for "epsilon").
SinceSeconds: ptr.Int64(1),
}

req := kc.Kube.CoreV1().Pods(k.namespace).GetLogs(pod.Name, options)
stream, err := req.Stream()
if err != nil {
return err
go func() {
watchedPods := sets.NewString()
for ev := range wi.ResultChan() {
p := ev.Object.(*corev1.Pod)
switch ev.Type {
case watch.Deleted:
watchedPods.Delete(p.Name)
case watch.Added, watch.Modified:
if watchedPods.Has(p.Name) {
t.Log("Already watching pod", p.Name)
continue
}
defer stream.Close()
// Read this container's stream.
scanner := bufio.NewScanner(stream)
for scanner.Scan() {
k.handleLine(scanner.Bytes())
if podIsReady(p) {
t.Log("Watching logs for pod: ", p.Name)
watchedPods.Insert(p.Name)
k.startForPod(&eg, p)
continue
}
return fmt.Errorf("logstream completed prematurely for %s/%s: %w",
pod.Name, container.Name, scanner.Err())
})
t.Log("Pod is not yet ready: ", p.Name)
}
}
}

}()
// Monitor the error group in the background and surface an error on the kubelogs
// in case anything had an active stream open.
go func() {
Expand All @@ -108,6 +137,20 @@ func (k *kubelogs) init(t test.TLegacy) {
}()
}

// init prepares the kubelogs streamer: it allocates the key registry,
// builds the Kubernetes client from the test flags, and starts watching
// pods in the namespace. Failures are reported via t and leave the
// streamer inert rather than panicking.
func (k *kubelogs) init(t test.TLegacy) {
	k.keys = make(map[string]logger, 1)

	client, err := test.NewKubeClient(test.Flags.Kubeconfig, test.Flags.Cluster)
	if err != nil {
		t.Error("Error loading client config", "error", err)
		return
	}
	k.kc = client

	// watchPods will start logging for existing pods as well.
	k.watchPods(t)
}

func (k *kubelogs) handleLine(l []byte) {
// This holds the standard structure of our logs.
var line struct {
Expand Down Expand Up @@ -160,7 +203,7 @@ func (k *kubelogs) handleLine(l []byte) {
}
}

// Start implements streamer
// Start implements streamer.
func (k *kubelogs) Start(t test.TLegacy) Canceler {
k.once.Do(func() { k.init(t) })

Expand All @@ -178,7 +221,7 @@ func (k *kubelogs) Start(t test.TLegacy) Canceler {
delete(k.keys, name)

if k.err != nil {
t.Error("error during logstream", "error", k.err)
t.Error("Error during logstream", "error", k.err)
}
}
}

0 comments on commit 4c35539

Please sign in to comment.