diff --git a/pkg/endpoint/providers/kube/client_test.go b/pkg/endpoint/providers/kube/client_test.go index 5c7aa3c229..2137cf2f9e 100644 --- a/pkg/endpoint/providers/kube/client_test.go +++ b/pkg/endpoint/providers/kube/client_test.go @@ -14,9 +14,11 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" fake "k8s.io/client-go/kubernetes/fake" + "github.com/openservicemesh/osm/pkg/announcements" "github.com/openservicemesh/osm/pkg/configurator" "github.com/openservicemesh/osm/pkg/endpoint" k8s "github.com/openservicemesh/osm/pkg/kubernetes" + "github.com/openservicemesh/osm/pkg/kubernetes/events" "github.com/openservicemesh/osm/pkg/service" "github.com/openservicemesh/osm/pkg/tests" "github.com/openservicemesh/osm/pkg/utils" @@ -54,6 +56,11 @@ var _ = Describe("Test Kube Client Provider", func() { It("should correctly return a list of endpoints for a service", func() { // Should be empty for now + endpointChannel := events.GetPubSubInstance().Subscribe(announcements.EndpointAdded, + announcements.EndpointDeleted, + announcements.EndpointUpdated) + defer events.GetPubSubInstance().Unsub(endpointChannel) + Expect(provider.ListEndpointsForService(tests.BookbuyerService)).To(BeNil()) // Create bookbuyer endpoint in Bookbuyer namespace @@ -86,7 +93,7 @@ var _ = Describe("Test Kube Client Provider", func() { _, err = fakeClientSet.CoreV1().Endpoints(tests.BookbuyerService.Namespace).Create(context.TODO(), endp, metav1.CreateOptions{}) Expect(err).To(BeNil()) - <-provider.GetAnnouncementsChannel() + <-endpointChannel Expect(provider.ListEndpointsForService(tests.BookbuyerService)).To(Equal([]endpoint.Endpoint{ { @@ -125,6 +132,11 @@ var _ = Describe("Test Kube Client Provider", func() { }) It("GetResolvableEndpoints should properly return actual endpoints without ClusterIP when ClusterIP is not set", func() { + endpointChannel := events.GetPubSubInstance().Subscribe(announcements.EndpointAdded, + announcements.EndpointDeleted, + announcements.EndpointUpdated) + defer events.GetPubSubInstance().Unsub(endpointChannel) + // Expect the individual pod endpoints, when no cluster IP is assigned to the service mockKubeController.EXPECT().GetService(tests.BookbuyerService).Return(&corev1.Service{ ObjectMeta: metav1.ObjectMeta{ @@ -173,7 +185,7 @@ var _ = Describe("Test Kube Client Provider", func() { _, err = fakeClientSet.CoreV1().Endpoints(tests.BookbuyerService.Namespace).Create(context.TODO(), endp, metav1.CreateOptions{}) Expect(err).To(BeNil()) - <-provider.GetAnnouncementsChannel() + <-endpointChannel Expect(provider.GetResolvableEndpointsForService(tests.BookbuyerService)).To(Equal([]endpoint.Endpoint{ { @@ -184,6 +196,11 @@ var _ = Describe("Test Kube Client Provider", func() { }) It("should correctly return the port to protocol mapping for a service's endpoints", func() { + endpointChannel := events.GetPubSubInstance().Subscribe(announcements.EndpointAdded, + announcements.EndpointDeleted, + announcements.EndpointUpdated) + defer events.GetPubSubInstance().Unsub(endpointChannel) + appProtoHTTP := "http" appProtoTCP := "tcp" @@ -226,7 +243,7 @@ var _ = Describe("Test Kube Client Provider", func() { Create(context.TODO(), endp, metav1.CreateOptions{}) Expect(err).To(BeNil()) - <-provider.GetAnnouncementsChannel() + <-endpointChannel portToProtocolMap, err := provider.GetPortToProtocolMappingForService(tests.BookbuyerService) Expect(err).To(BeNil()) @@ -330,6 +347,11 @@ var _ = Describe("When getting a Service associated with a ServiceAccount", func }) It("should return a service that matches the ServiceAccount associated with the Pod", func() { + podsChannel := events.GetPubSubInstance().Subscribe(announcements.PodAdded, + announcements.PodDeleted, + announcements.PodUpdated) + defer events.GetPubSubInstance().Unsub(podsChannel) + // Create a Service svc := &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ @@ -379,7 +401,7 @@ var _ = Describe("When getting a Service associated with a ServiceAccount", func _, err = fakeClientSet.CoreV1().Pods(testNamespace).Create(context.Background(), pod, metav1.CreateOptions{}) Expect(err).ToNot(HaveOccurred()) - <-provider.GetAnnouncementsChannel() + <-podsChannel givenSvcAccount := service.K8sServiceAccount{ Namespace: testNamespace, @@ -396,10 +418,15 @@ var _ = Describe("When getting a Service associated with a ServiceAccount", func err = fakeClientSet.CoreV1().Pods(testNamespace).Delete(context.Background(), pod.Name, metav1.DeleteOptions{}) Expect(err).ToNot(HaveOccurred()) - <-provider.GetAnnouncementsChannel() + <-podsChannel }) It("should return an error when the Service selector doesn't match the pod", func() { + podsChannel := events.GetPubSubInstance().Subscribe(announcements.PodAdded, + announcements.PodDeleted, + announcements.PodUpdated) + defer events.GetPubSubInstance().Unsub(podsChannel) + // Create a Service svc := &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ @@ -449,7 +476,7 @@ var _ = Describe("When getting a Service associated with a ServiceAccount", func _, err = fakeClientSet.CoreV1().Pods(testNamespace).Create(context.Background(), pod, metav1.CreateOptions{}) Expect(err).ToNot(HaveOccurred()) - <-provider.GetAnnouncementsChannel() + <-podsChannel givenSvcAccount := service.K8sServiceAccount{ Namespace: testNamespace, @@ -465,13 +492,17 @@ var _ = Describe("When getting a Service associated with a ServiceAccount", func err = fakeClientSet.CoreV1().Pods(testNamespace).Delete(context.Background(), pod.Name, metav1.DeleteOptions{}) Expect(err).ToNot(HaveOccurred()) - <-provider.GetAnnouncementsChannel() + <-podsChannel }) It("should return all services when multiple services match the same Pod", func() { // This test is meant to ensure the // service selector logic works as expected when multiple services // have the same selector match. + podsChannel := events.GetPubSubInstance().Subscribe(announcements.PodAdded, + announcements.PodDeleted, + announcements.PodUpdated) + defer events.GetPubSubInstance().Unsub(podsChannel) // Create a Service svc := &corev1.Service{ @@ -528,7 +559,7 @@ var _ = Describe("When getting a Service associated with a ServiceAccount", func _, err = fakeClientSet.CoreV1().Pods(testNamespace).Create(context.Background(), pod, metav1.CreateOptions{}) Expect(err).ToNot(HaveOccurred()) - <-provider.GetAnnouncementsChannel() + <-podsChannel givenSvcAccount := service.K8sServiceAccount{ Namespace: testNamespace, @@ -547,6 +578,6 @@ var _ = Describe("When getting a Service associated with a ServiceAccount", func err = fakeClientSet.CoreV1().Pods(testNamespace).Delete(context.Background(), pod.Name, metav1.DeleteOptions{}) Expect(err).ToNot(HaveOccurred()) - <-provider.GetAnnouncementsChannel() + <-podsChannel }) })