Skip to content

Commit

Permalink
no pod events yet prototype
Browse files Browse the repository at this point in the history
  • Loading branch information
gabemontero committed Oct 9, 2021
1 parent af38749 commit d01f94c
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 14 deletions.
2 changes: 1 addition & 1 deletion pkg/shp/cmd/build/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,8 @@ func TestStartBuildRunFollowLog(t *testing.T) {
return false, nil
})
if err != nil {
cmd.watchLock.Unlock()
t.Errorf("test %s: Run initialization did not complete in time: pw %#v ioStreams %#v shpClientset %#v", test.name, cmd.pw, cmd.ioStreams, cmd.shpClientset)
cmd.watchLock.Unlock()
}

// mimic watch events, bypassing k8s fake client watch hoopla whose plug points are not always useful;
Expand Down
52 changes: 39 additions & 13 deletions pkg/shp/reactor/pod_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,20 @@ const (
// state modifications, should work as a helper to build business logic based on the build POD
// changes.
type PodWatcher struct {
ctx context.Context
to time.Duration
stopCh chan bool // stops the event loop execution
stopLock sync.Mutex
stopped bool
watcher watch.Interface // client watch instance

toPodFn TimeoutPodFn
skipPodFn SkipPodFn
onPodAddedFn OnPodEventFn
onPodModifiedFn OnPodEventFn
onPodDeletedFn OnPodEventFn
ctx context.Context
to time.Duration
stopCh chan bool // stops the event loop execution
stopLock sync.Mutex
stopped bool
eventTicker *time.Ticker
watcher watch.Interface // client watch instance

noPodEventsYetFn NoPodEventsYetFn
toPodFn TimeoutPodFn
skipPodFn SkipPodFn
onPodAddedFn OnPodEventFn
onPodModifiedFn OnPodEventFn
onPodDeletedFn OnPodEventFn
}

// SkipPodFn a given pod instance is informed and expects a boolean as return. When true is returned
Expand All @@ -44,6 +46,9 @@ type OnPodEventFn func(pod *corev1.Pod) error
// TimeoutPodFn when either the context or request timeout expires before the Pod finishes
type TimeoutPodFn func(msg string)

// NoPodEventsYetFn when the watch has not received the create event within a reasonable time
type NoPodEventsYetFn func()

// WithSkipPodFn sets the skip function instance.
func (p *PodWatcher) WithSkipPodFn(fn SkipPodFn) *PodWatcher {
p.skipPodFn = fn
Expand Down Expand Up @@ -74,8 +79,17 @@ func (p *PodWatcher) WithTimeoutPodFn(fn TimeoutPodFn) *PodWatcher {
return p
}

// WithNoPodEventsYetFn sets the function executed when the watcher decides it has waited long enough for the first event
func (p *PodWatcher) WithNoPodEventsYetFn(fn NoPodEventsYetFn) *PodWatcher {
p.noPodEventsYetFn = fn
return p
}

// handleEvent applies user informed functions against informed pod and event.
func (p *PodWatcher) handleEvent(pod *corev1.Pod, event watch.Event) error {
p.stopLock.Lock()
defer p.stopLock.Unlock()
p.eventTicker.Stop()
switch event.Type {
case watch.Added:
if p.onPodAddedFn != nil {
Expand Down Expand Up @@ -139,6 +153,16 @@ func (p *PodWatcher) Start() (*corev1.Pod, error) {
}
return nil, nil

// deal with case where a lack of any pod event means there is some sort of issue;
// we let the called function decide whether to stop the watch
// NOTE: a k8s event watch coupled with our pod watch proved problematic with unit tests; also, with
// a lot of the relevant constants in github.com/k8s/k8s, which is a hassle to vendor in, prototypes
// felt fragile
case <-p.eventTicker.C:
if p.noPodEventsYetFn != nil {
p.noPodEventsYetFn()
}

// watching over stop channel to stop the event loop on demand.
case <-p.stopCh:
p.watcher.Stop()
Expand All @@ -155,6 +179,7 @@ func (p *PodWatcher) Stop() {
defer p.stopLock.Unlock()
if !p.stopped {
close(p.stopCh)
p.eventTicker.Stop()
p.stopped = true
}
}
Expand All @@ -171,5 +196,6 @@ func NewPodWatcher(
if err != nil {
return nil, err
}
return &PodWatcher{ctx: ctx, to: timeout, watcher: w, stopCh: make(chan bool), stopLock: sync.Mutex{}}, nil
//TODO don't think the have not received events yet ticker needs to be tunable, but leaving a TODO for now while we get feedback
return &PodWatcher{ctx: ctx, to: timeout, watcher: w, eventTicker: time.NewTicker(1 * time.Second), stopCh: make(chan bool), stopLock: sync.Mutex{}}, nil
}
39 changes: 39 additions & 0 deletions pkg/shp/reactor/pod_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,45 @@ func Test_PodWatcher_ContextTimeout(t *testing.T) {
g.Expect(called).To(o.BeTrue())
}

func Test_PodWatcher_NotCalledYet(t *testing.T) {
// we separate this test out from the other events given the
// lazy check we have for not getting pod events
g := gomega.NewGomegaWithT(t)
ctx := context.TODO()

clientset := fake.NewSimpleClientset()

pw, err := NewPodWatcher(ctx, math.MaxInt64, clientset, metav1.ListOptions{}, metav1.NamespaceDefault)
g.Expect(err).To(o.BeNil())

eventsCh := make(chan bool, 1)
eventsDoneCh := make(chan bool, 1)

called := false
pw.WithNoPodEventsYetFn(func() {
called = true
eventsCh <- true
})


// executing the event loop in the background, and waiting for the stop channel before inspecting
// for errors
go func() {
_, err := pw.Start()
<-pw.stopCh
g.Expect(err).To(o.BeNil())
eventsDoneCh <- true
}()

<-eventsCh
pw.Stop()
<-eventsDoneCh

if !called {
t.Fatal("called still false")
}
}

func Test_PodWatcherEvents(t *testing.T) {
g := gomega.NewGomegaWithT(t)
ctx := context.TODO()
Expand Down

0 comments on commit d01f94c

Please sign in to comment.