From 1b21a18f740f8d3562c047b46fe00ed1d8892a0a Mon Sep 17 00:00:00 2001 From: Ivan Pirlik Date: Tue, 9 Jan 2018 12:29:32 +0000 Subject: [PATCH] Add polling to pod watch --- cli/pipeline/watch.go | 70 ++++++++++++++++++++++++++++--------------- 1 file changed, 46 insertions(+), 24 deletions(-) diff --git a/cli/pipeline/watch.go b/cli/pipeline/watch.go index dec6b86..58d736a 100644 --- a/cli/pipeline/watch.go +++ b/cli/pipeline/watch.go @@ -9,6 +9,7 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" "log" + "time" ) type WatchEventType string @@ -21,6 +22,8 @@ const ( Failed WatchEventType = "FAILED" ) +const watchPollInterval = 30 * time.Second + type WatchEvent struct { Type WatchEventType Pod *v1.Pod @@ -44,6 +47,33 @@ func Watch(ctx context.Context, c kubernetes.Interface, watchPod *v1.Pod) (<-cha containers := make(map[string]bool) + parsePodStatus := func(pod *v1.Pod) { + if pod.Status.Phase == v1.PodSucceeded { + out <- WatchEvent{Completed, pod, "", ""} + } else if pod.Status.Phase == v1.PodFailed { + out <- WatchEvent{Failed, pod, "", ""} + } else { + for _, container := range pod.Status.ContainerStatuses { + if container.State.Running != nil { + _, present := containers[container.Name] + if !present { + out <- WatchEvent{Added, pod, container.Name, ""} + containers[container.Name] = true + } + } else if container.State.Terminated != nil { + _, present := containers[container.Name] + if present { + out <- WatchEvent{Removed, pod, container.Name, ""} + containers[container.Name] = false + } + if container.State.Terminated.ExitCode != 0 { + out <- WatchEvent{Failed, pod, container.Name, container.State.Terminated.Message} + } + } + } + } + } + go func() { for { select { @@ -57,30 +87,7 @@ func Watch(ctx context.Context, c kubernetes.Interface, watchPod *v1.Pod) (<-cha switch e.Type { case watch.Added, watch.Modified: - if pod.Status.Phase == v1.PodSucceeded { - out <- WatchEvent{Completed, pod, "", ""} - } else if pod.Status.Phase == v1.PodFailed { - out <- WatchEvent{Failed, pod, "", ""} - } else { - for _, container := range pod.Status.ContainerStatuses { - if container.State.Running != nil { - _, present := containers[container.Name] - if !present { - out <- WatchEvent{Added, pod, container.Name, ""} - containers[container.Name] = true - } - } else if container.State.Terminated != nil { - _, present := containers[container.Name] - if present { - out <- WatchEvent{Removed, pod, container.Name, ""} - containers[container.Name] = false - } - if container.State.Terminated.ExitCode != 0 { - out <- WatchEvent{Failed, pod, container.Name, container.State.Terminated.Message} - } - } - } - } + parsePodStatus(pod) case watch.Deleted: out <- WatchEvent{Deleted, pod, "", ""} case watch.Error: @@ -94,6 +101,21 @@ func Watch(ctx context.Context, c kubernetes.Interface, watchPod *v1.Pod) (<-cha } }() + go func() { + for { + select { + case <-time.After(watchPollInterval): + pod, err := c.CoreV1().Pods(watchPod.Namespace).Get(watchPod.Name, metav1.GetOptions{}) + if err != nil { + parsePodStatus(pod) + } + case <-ctx.Done(): + return + } + } + + }() + return out, nil }