diff --git a/pkg/service/controller/ingressip/service_ingressip_controller_test.go b/pkg/service/controller/ingressip/service_ingressip_controller_test.go index 56b0dd1f0a96..6d1b38bdfbf8 100644 --- a/pkg/service/controller/ingressip/service_ingressip_controller_test.go +++ b/pkg/service/controller/ingressip/service_ingressip_controller_test.go @@ -11,7 +11,7 @@ import ( "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/watch" + "k8s.io/apimachinery/pkg/util/wait" informers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" kcoreclient "k8s.io/client-go/kubernetes/typed/core/v1" @@ -44,30 +44,6 @@ func newController(t *testing.T, client *fake.Clientset, stopCh <-chan struct{}) return controller } -func controllerSetup(t *testing.T, startingObjects []runtime.Object, stopCh <-chan struct{}) (*fake.Clientset, *watch.FakeWatcher, *IngressIPController) { - client := fake.NewSimpleClientset(startingObjects...) - - fakeWatch := watch.NewFake() - client.PrependWatchReactor("*", clientgotesting.DefaultWatchReactor(fakeWatch, nil)) - - client.PrependReactor("create", "*", func(action clientgotesting.Action) (handled bool, ret runtime.Object, err error) { - obj := action.(clientgotesting.CreateAction).GetObject() - fakeWatch.Add(obj) - return true, obj, nil - }) - - // Ensure that updates the controller makes are passed through to the watcher. - client.PrependReactor("update", "*", func(action clientgotesting.Action) (handled bool, ret runtime.Object, err error) { - obj := action.(clientgotesting.CreateAction).GetObject() - fakeWatch.Modify(obj) - return true, obj, nil - }) - - controller := newController(t, client, stopCh) - - return client, fakeWatch, controller -} - func newService(name, ip string, typeLoadBalancer bool) *v1.Service { serviceType := v1.ServiceTypeClusterIP if typeLoadBalancer { @@ -599,51 +575,47 @@ func TestBasicControllerFlow(t *testing.T) { newService("lb-unallocated", "", true), } - stopChannel := make(chan struct{}) - defer close(stopChannel) + stop := make(chan struct{}) + defer close(stop) - _, fakeWatch, controller := controllerSetup(t, startingObjects, stopChannel) - - updated := make(chan bool) - deleted := make(chan bool) - - controller.changeHandler = func(change *serviceChange) error { - defer func() { - if len(change.key) == 0 { - deleted <- true - } else if change.oldService != nil { - updated <- true + client := fake.NewSimpleClientset(startingObjects...) + controller := newController(t, client, stop) + controller.changeHandler = controller.processChange + + go controller.Run(stop) + + // Wait for the service spec and status to be updated with an allocated IP. + err := wait.Poll(25*time.Millisecond, 2*time.Second, func() (done bool, err error) { + for _, genericAction := range client.Actions() { + switch action := genericAction.(type) { + case clientgotesting.UpdateAction: + service, ok := action.GetObject().(*v1.Service) + if ok && len(service.Spec.ExternalIPs) > 0 && len(service.Status.LoadBalancer.Ingress) > 0 { + return true, nil + } } - }() - - err := controller.processChange(change) - if err != nil { - t.Errorf("unexpected error: %v", err) } - - return err + return false, nil + }) + if err != nil { + t.Fatalf("failed waiting for update: %v", err) } - go controller.Run(stopChannel) + client.CoreV1().Services(namespace).Delete("lb-unallocated", &metav1.DeleteOptions{}) - waitForUpdate := func(msg string) { - t.Logf("waiting for: %v", msg) - select { - case <-updated: - case <-time.After(time.Duration(30 * time.Second)): - t.Fatalf("failed to see: %v", msg) + // Wait for a delete to be persisted. + err = wait.Poll(25*time.Millisecond, 2*time.Second, func() (done bool, err error) { + for _, genericAction := range client.Actions() { + switch action := genericAction.(type) { + case clientgotesting.DeleteAction: + if action.GetName() == "lb-unallocated" { + return true, nil + } + } } - } - - waitForUpdate("spec update") - waitForUpdate("status update") - - fakeWatch.Delete(startingObjects[0]) - - t.Log("waiting for the service to be deleted") - select { - case <-deleted: - case <-time.After(time.Duration(30 * time.Second)): - t.Fatalf("failed to see expected service deletion") + return false, nil + }) + if err != nil { + t.Fatalf("failed waiting for delete: %v", err) } }