Skip to content

Commit

Permalink
Stabilize ingressip controller unit test
Browse files Browse the repository at this point in the history
Eliminate flakes in the TestBasicControllerFlow test by polling fake action
state rather than relying on custom fake watch consumption; the event counts on
the watch are not deterministic, and so the previous assumption of two events
doesn't always hold and causes a deadlock in the test's custom controller
handler.

Refactor the test to remove unnecessary fake watch and controller setup; just
use the fake client and its built-in watch capabilities directly.
  • Loading branch information
ironcladlou committed Jul 18, 2018
1 parent cc355cc commit 0179284
Showing 1 changed file with 36 additions and 64 deletions.
100 changes: 36 additions & 64 deletions pkg/service/controller/ingressip/service_ingressip_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apimachinery/pkg/util/wait"
informers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
kcoreclient "k8s.io/client-go/kubernetes/typed/core/v1"
Expand Down Expand Up @@ -44,30 +44,6 @@ func newController(t *testing.T, client *fake.Clientset, stopCh <-chan struct{})
return controller
}

func controllerSetup(t *testing.T, startingObjects []runtime.Object, stopCh <-chan struct{}) (*fake.Clientset, *watch.FakeWatcher, *IngressIPController) {
client := fake.NewSimpleClientset(startingObjects...)

fakeWatch := watch.NewFake()
client.PrependWatchReactor("*", clientgotesting.DefaultWatchReactor(fakeWatch, nil))

client.PrependReactor("create", "*", func(action clientgotesting.Action) (handled bool, ret runtime.Object, err error) {
obj := action.(clientgotesting.CreateAction).GetObject()
fakeWatch.Add(obj)
return true, obj, nil
})

// Ensure that updates the controller makes are passed through to the watcher.
client.PrependReactor("update", "*", func(action clientgotesting.Action) (handled bool, ret runtime.Object, err error) {
obj := action.(clientgotesting.CreateAction).GetObject()
fakeWatch.Modify(obj)
return true, obj, nil
})

controller := newController(t, client, stopCh)

return client, fakeWatch, controller
}

func newService(name, ip string, typeLoadBalancer bool) *v1.Service {
serviceType := v1.ServiceTypeClusterIP
if typeLoadBalancer {
Expand Down Expand Up @@ -599,51 +575,47 @@ func TestBasicControllerFlow(t *testing.T) {
newService("lb-unallocated", "", true),
}

stopChannel := make(chan struct{})
defer close(stopChannel)
stop := make(chan struct{})
defer close(stop)

_, fakeWatch, controller := controllerSetup(t, startingObjects, stopChannel)

updated := make(chan bool)
deleted := make(chan bool)

controller.changeHandler = func(change *serviceChange) error {
defer func() {
if len(change.key) == 0 {
deleted <- true
} else if change.oldService != nil {
updated <- true
client := fake.NewSimpleClientset(startingObjects...)
controller := newController(t, client, stop)
controller.changeHandler = controller.processChange

go controller.Run(stop)

// Wait for the service spec and status to be updated with an allocated IP.
err := wait.Poll(25*time.Millisecond, 2*time.Second, func() (done bool, err error) {
for _, genericAction := range client.Actions() {
switch action := genericAction.(type) {
case clientgotesting.UpdateAction:
service, ok := action.GetObject().(*v1.Service)
if ok && len(service.Spec.ExternalIPs) > 0 && len(service.Status.LoadBalancer.Ingress) > 0 {
return true, nil
}
}
}()

err := controller.processChange(change)
if err != nil {
t.Errorf("unexpected error: %v", err)
}

return err
return false, nil
})
if err != nil {
t.Fatalf("failed waiting for update: %v", err)
}

go controller.Run(stopChannel)
client.CoreV1().Services(namespace).Delete("lb-unallocated", &metav1.DeleteOptions{})

waitForUpdate := func(msg string) {
t.Logf("waiting for: %v", msg)
select {
case <-updated:
case <-time.After(time.Duration(30 * time.Second)):
t.Fatalf("failed to see: %v", msg)
// Wait for a delete to be persisted.
err = wait.Poll(25*time.Millisecond, 2*time.Second, func() (done bool, err error) {
for _, genericAction := range client.Actions() {
switch action := genericAction.(type) {
case clientgotesting.DeleteAction:
if action.GetName() == "lb-unallocated" {
return true, nil
}
}
}
}

waitForUpdate("spec update")
waitForUpdate("status update")

fakeWatch.Delete(startingObjects[0])

t.Log("waiting for the service to be deleted")
select {
case <-deleted:
case <-time.After(time.Duration(30 * time.Second)):
t.Fatalf("failed to see expected service deletion")
return false, nil
})
if err != nil {
t.Fatalf("failed waiting for delete: %v", err)
}
}

0 comments on commit 0179284

Please sign in to comment.