Skip to content
This repository has been archived by the owner on Dec 9, 2022. It is now read-only.

Commit

Permalink
Add polling to pod watch
Browse files Browse the repository at this point in the history
  • Loading branch information
me committed Jan 9, 2018
1 parent a3e88e3 commit 1b21a18
Showing 1 changed file with 46 additions and 24 deletions.
70 changes: 46 additions & 24 deletions cli/pipeline/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"log"
"time"
)

type WatchEventType string
Expand All @@ -21,6 +22,8 @@ const (
Failed WatchEventType = "FAILED"
)

const watchPollInterval = 30 * time.Second

type WatchEvent struct {
Type WatchEventType
Pod *v1.Pod
Expand All @@ -44,6 +47,33 @@ func Watch(ctx context.Context, c kubernetes.Interface, watchPod *v1.Pod) (<-cha

containers := make(map[string]bool)

parsePodStatus := func(pod *v1.Pod) {
if pod.Status.Phase == v1.PodSucceeded {
out <- WatchEvent{Completed, pod, "", ""}
} else if pod.Status.Phase == v1.PodFailed {
out <- WatchEvent{Failed, pod, "", ""}
} else {
for _, container := range pod.Status.ContainerStatuses {
if container.State.Running != nil {
_, present := containers[container.Name]
if !present {
out <- WatchEvent{Added, pod, container.Name, ""}
containers[container.Name] = true
}
} else if container.State.Terminated != nil {
_, present := containers[container.Name]
if present {
out <- WatchEvent{Removed, pod, container.Name, ""}
containers[container.Name] = false
}
if container.State.Terminated.ExitCode != 0 {
out <- WatchEvent{Failed, pod, container.Name, container.State.Terminated.Message}
}
}
}
}
}

go func() {
for {
select {
Expand All @@ -57,30 +87,7 @@ func Watch(ctx context.Context, c kubernetes.Interface, watchPod *v1.Pod) (<-cha

switch e.Type {
case watch.Added, watch.Modified:
if pod.Status.Phase == v1.PodSucceeded {
out <- WatchEvent{Completed, pod, "", ""}
} else if pod.Status.Phase == v1.PodFailed {
out <- WatchEvent{Failed, pod, "", ""}
} else {
for _, container := range pod.Status.ContainerStatuses {
if container.State.Running != nil {
_, present := containers[container.Name]
if !present {
out <- WatchEvent{Added, pod, container.Name, ""}
containers[container.Name] = true
}
} else if container.State.Terminated != nil {
_, present := containers[container.Name]
if present {
out <- WatchEvent{Removed, pod, container.Name, ""}
containers[container.Name] = false
}
if container.State.Terminated.ExitCode != 0 {
out <- WatchEvent{Failed, pod, container.Name, container.State.Terminated.Message}
}
}
}
}
parsePodStatus(pod)
case watch.Deleted:
out <- WatchEvent{Deleted, pod, "", ""}
case watch.Error:
Expand All @@ -94,6 +101,21 @@ func Watch(ctx context.Context, c kubernetes.Interface, watchPod *v1.Pod) (<-cha
}
}()

go func() {
for {
select {
case <-time.After(watchPollInterval):
pod, err := c.CoreV1().Pods(watchPod.Namespace).Get(watchPod.Name, metav1.GetOptions{})
if err != nil {
parsePodStatus(pod)
}
case <-ctx.Done():
return
}
}

}()

return out, nil
}

Expand Down

0 comments on commit 1b21a18

Please sign in to comment.