From 14be75a38d021faf35066829f6b1b15665455ea9 Mon Sep 17 00:00:00 2001 From: Edu Serra Date: Mon, 23 Nov 2020 12:28:36 -0800 Subject: [PATCH] pubsub: introduce pubsub chan Unsub(), fix few sync tests (#2109) * pubsub: introduce close, fix few sync tests - Introduces Close() to pubsub - adds a Close() test - moves few tests to use pubsub and not read from individual channels * CR - naming: Close() -> Unsub() - Add release branch references vs main - use idiomatic channel range to exit on close Signed-off-by: Eduard Serra --- pkg/kubernetes/client_test.go | 4 ++ pkg/kubernetes/events/event_pubsub.go | 23 +++++++ pkg/kubernetes/events/event_pubsub_test.go | 20 ++++++ pkg/kubernetes/events/types.go | 5 ++ pkg/smi/client_test.go | 73 +++++++++++++++++----- 5 files changed, 109 insertions(+), 16 deletions(-) diff --git a/pkg/kubernetes/client_test.go b/pkg/kubernetes/client_test.go index 83d8acf40f..3e3b689453 100644 --- a/pkg/kubernetes/client_test.go +++ b/pkg/kubernetes/client_test.go @@ -142,6 +142,7 @@ var _ = Describe("Test Namespace KubeController Methods", func() { serviceChannel := events.GetPubSubInstance().Subscribe(announcements.ServiceAdded, announcements.ServiceDeleted, announcements.ServiceUpdated) + defer events.GetPubSubInstance().Unsub(serviceChannel) // Create monitored namespace for this service testNamespace := &corev1.Namespace{ @@ -190,6 +191,7 @@ var _ = Describe("Test Namespace KubeController Methods", func() { serviceChannel := events.GetPubSubInstance().Subscribe(announcements.ServiceAdded, announcements.ServiceDeleted, announcements.ServiceUpdated) + defer events.GetPubSubInstance().Unsub(serviceChannel) testSvcs := []service.MeshService{ {Name: uuid.New().String(), Namespace: "ns-1"}, {Name: uuid.New().String(), Namespace: "ns-2"}, @@ -260,9 +262,11 @@ var _ = Describe("Test Namespace KubeController Methods", func() { serviceChannel := events.GetPubSubInstance().Subscribe(announcements.ServiceAdded, announcements.ServiceDeleted, announcements.ServiceUpdated) + defer events.GetPubSubInstance().Unsub(serviceChannel) podsChannel := events.GetPubSubInstance().Subscribe(announcements.PodAdded, announcements.PodDeleted, announcements.PodUpdated) + defer events.GetPubSubInstance().Unsub(podsChannel) // Create a namespace testNamespace := &corev1.Namespace{ diff --git a/pkg/kubernetes/events/event_pubsub.go b/pkg/kubernetes/events/event_pubsub.go index 417aedb292..14bf2a303e 100644 --- a/pkg/kubernetes/events/event_pubsub.go +++ b/pkg/kubernetes/events/event_pubsub.go @@ -36,6 +36,29 @@ func (c *osmPubsub) Publish(message PubSubMessage) { c.pSub.Pub(message, message.AnnouncementType.String()) } +// Unsub is the Unsub implementation for PubSub. +// It is synchronized, upon exit the channel is guaranteed to be both +// unsubbed to all topics and closed. +// This is a necessary step to guarantee garbage collection +func (c *osmPubsub) Unsub(unsubChan chan interface{}) { + // implementation has several requirements (including different goroutine context) + // https://github.com/cskr/pubsub/blob/v1.0.2/pubsub.go#L102 + + syncCh := make(chan struct{}) + go func() { + // This will close the channel on the pubsub backend + // https://github.com/cskr/pubsub/blob/v1.0.2/pubsub.go#L264 + c.pSub.Unsub(unsubChan) + + for range unsubChan { + // Drain channel, read til close + } + syncCh <- struct{}{} + }() + + <-syncCh +} + // GetPubSubInstance returns a unique, global scope PubSub interface instance // Note that spawning the instance is not thread-safe. First call should happen on // a single-routine context to avoid races. diff --git a/pkg/kubernetes/events/event_pubsub_test.go b/pkg/kubernetes/events/event_pubsub_test.go index f3266987c7..8d07eefd37 100644 --- a/pkg/kubernetes/events/event_pubsub_test.go +++ b/pkg/kubernetes/events/event_pubsub_test.go @@ -60,3 +60,23 @@ func TestPubSubEvents(t *testing.T) { } } } + +func TestPubSubClose(t *testing.T) { + assert := assert.New(t) + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + subChannel := GetPubSubInstance().Subscribe(announcements.BackpressureUpdated) + + // publish something + GetPubSubInstance().Publish(PubSubMessage{ + AnnouncementType: announcements.BackpressureUpdated, + }) + + // make sure channel is drained and closed + GetPubSubInstance().Unsub(subChannel) + + // Channel has to have been already emptied and closed + _, ok := <-subChannel + assert.False(ok) +} diff --git a/pkg/kubernetes/events/types.go b/pkg/kubernetes/events/types.go index 84f4f79fbd..c61a23dc51 100644 --- a/pkg/kubernetes/events/types.go +++ b/pkg/kubernetes/events/types.go @@ -40,4 +40,9 @@ type PubSub interface { // Publish publishes the message to all subscribers that have subscribed to topic Publish(message PubSubMessage) + + // Unsub unsubscribes and closes the channel on pubsub backend + // Note this is a necessary step to ensure a channel can be + // garbage collected when it is freed. + Unsub(unsubChan chan interface{}) } diff --git a/pkg/smi/client_test.go b/pkg/smi/client_test.go index fc8a88f5a1..4a0b145d3d 100644 --- a/pkg/smi/client_test.go +++ b/pkg/smi/client_test.go @@ -17,9 +17,11 @@ import ( osmPolicy "github.com/openservicemesh/osm/experimental/pkg/apis/policy/v1alpha1" osmPolicyClient "github.com/openservicemesh/osm/experimental/pkg/client/clientset/versioned/fake" + "github.com/openservicemesh/osm/pkg/announcements" "github.com/openservicemesh/osm/pkg/constants" "github.com/openservicemesh/osm/pkg/featureflags" k8s "github.com/openservicemesh/osm/pkg/kubernetes" + "github.com/openservicemesh/osm/pkg/kubernetes/events" "github.com/openservicemesh/osm/pkg/service" "github.com/openservicemesh/osm/pkg/tests" ) @@ -98,6 +100,11 @@ var _ = Describe("When listing TrafficSplit", func() { }) It("should return a list of traffic split resources", func() { + tsChannel := events.GetPubSubInstance().Subscribe(announcements.TrafficSplitAdded, + announcements.TrafficSplitDeleted, + announcements.TrafficSplitUpdated) + defer events.GetPubSubInstance().Unsub(tsChannel) + split := &smiSplit.TrafficSplit{ ObjectMeta: metav1.ObjectMeta{ Name: "test-ListTrafficSplits", @@ -120,7 +127,7 @@ var _ = Describe("When listing TrafficSplit", func() { _, err := fakeClientSet.smiTrafficSplitClientSet.SplitV1alpha2().TrafficSplits(testNamespaceName).Create(context.TODO(), split, metav1.CreateOptions{}) Expect(err).ToNot(HaveOccurred()) - <-meshSpec.GetAnnouncementsChannel() + <-tsChannel splits := meshSpec.ListTrafficSplits() Expect(len(splits)).To(Equal(1)) @@ -128,7 +135,7 @@ var _ = Describe("When listing TrafficSplit", func() { err = fakeClientSet.smiTrafficSplitClientSet.SplitV1alpha2().TrafficSplits(testNamespaceName).Delete(context.TODO(), split.Name, metav1.DeleteOptions{}) Expect(err).ToNot(HaveOccurred()) - <-meshSpec.GetAnnouncementsChannel() + <-tsChannel }) }) @@ -144,6 +151,11 @@ var _ = Describe("When listing TrafficSplit services", func() { }) It("should return a list of weighted services corresponding to the traffic split backends", func() { + tsChannel := events.GetPubSubInstance().Subscribe(announcements.TrafficSplitAdded, + announcements.TrafficSplitDeleted, + announcements.TrafficSplitUpdated) + defer events.GetPubSubInstance().Unsub(tsChannel) + split := &smiSplit.TrafficSplit{ ObjectMeta: metav1.ObjectMeta{ Name: "test-ListTrafficSplitServices", @@ -166,7 +178,7 @@ var _ = Describe("When listing TrafficSplit services", func() { _, err := fakeClientSet.smiTrafficSplitClientSet.SplitV1alpha2().TrafficSplits(testNamespaceName).Create(context.TODO(), split, metav1.CreateOptions{}) Expect(err).ToNot(HaveOccurred()) - <-meshSpec.GetAnnouncementsChannel() + <-tsChannel weightedServices := meshSpec.ListTrafficSplitServices() Expect(len(weightedServices)).To(Equal(len(split.Spec.Backends))) @@ -178,7 +190,7 @@ var _ = Describe("When listing TrafficSplit services", func() { err = fakeClientSet.smiTrafficSplitClientSet.SplitV1alpha2().TrafficSplits(testNamespaceName).Delete(context.TODO(), split.Name, metav1.DeleteOptions{}) Expect(err).ToNot(HaveOccurred()) - <-meshSpec.GetAnnouncementsChannel() + <-tsChannel }) }) @@ -194,6 +206,11 @@ var _ = Describe("When listing ServiceAccounts", func() { }) It("should return a list of service accounts specified in TrafficTarget resources", func() { + ttChannel := events.GetPubSubInstance().Subscribe(announcements.TrafficTargetAdded, + announcements.TrafficTargetDeleted, + announcements.TrafficTargetUpdated) + defer events.GetPubSubInstance().Unsub(ttChannel) + trafficTarget := &smiAccess.TrafficTarget{ TypeMeta: metav1.TypeMeta{ APIVersion: "access.smi-spec.io/v1alpha2", @@ -224,7 +241,7 @@ var _ = Describe("When listing ServiceAccounts", func() { _, err := fakeClientSet.smiTrafficTargetClientSet.AccessV1alpha2().TrafficTargets(testNamespaceName).Create(context.TODO(), trafficTarget, metav1.CreateOptions{}) Expect(err).ToNot(HaveOccurred()) - <-meshSpec.GetAnnouncementsChannel() + <-ttChannel svcAccounts := meshSpec.ListServiceAccounts() @@ -233,7 +250,7 @@ var _ = Describe("When listing ServiceAccounts", func() { err = fakeClientSet.smiTrafficTargetClientSet.AccessV1alpha2().TrafficTargets(testNamespaceName).Delete(context.TODO(), trafficTarget.Name, metav1.DeleteOptions{}) Expect(err).ToNot(HaveOccurred()) - <-meshSpec.GetAnnouncementsChannel() + <-ttChannel }) }) @@ -249,6 +266,11 @@ var _ = Describe("When listing TrafficTargets", func() { }) It("Returns a list of TrafficTarget resources", func() { + ttChannel := events.GetPubSubInstance().Subscribe(announcements.TrafficTargetAdded, + announcements.TrafficTargetDeleted, + announcements.TrafficTargetUpdated) + defer events.GetPubSubInstance().Unsub(ttChannel) + trafficTarget := &smiAccess.TrafficTarget{ TypeMeta: metav1.TypeMeta{ APIVersion: "access.smi-spec.io/v1alpha2", @@ -279,14 +301,14 @@ var _ = Describe("When listing TrafficTargets", func() { _, err := fakeClientSet.smiTrafficTargetClientSet.AccessV1alpha2().TrafficTargets(testNamespaceName).Create(context.TODO(), trafficTarget, metav1.CreateOptions{}) Expect(err).ToNot(HaveOccurred()) - <-meshSpec.GetAnnouncementsChannel() + <-ttChannel targets := meshSpec.ListTrafficTargets() Expect(len(targets)).To(Equal(1)) err = fakeClientSet.smiTrafficTargetClientSet.AccessV1alpha2().TrafficTargets(testNamespaceName).Delete(context.TODO(), trafficTarget.Name, metav1.DeleteOptions{}) Expect(err).ToNot(HaveOccurred()) - <-meshSpec.GetAnnouncementsChannel() + <-ttChannel }) }) @@ -307,6 +329,11 @@ var _ = Describe("When listing ListHTTPTrafficSpecs", func() { }) It("should return a list of ListHTTPTrafficSpecs resources", func() { + rgChannel := events.GetPubSubInstance().Subscribe(announcements.RouteGroupAdded, + announcements.RouteGroupDeleted, + announcements.RouteGroupUpdated) + defer events.GetPubSubInstance().Unsub(rgChannel) + routeSpec := &smiSpecs.HTTPRouteGroup{ TypeMeta: metav1.TypeMeta{ APIVersion: "specs.smi-spec.io/v1alpha3", @@ -343,7 +370,7 @@ var _ = Describe("When listing ListHTTPTrafficSpecs", func() { _, err := fakeClientSet.smiTrafficSpecClientSet.SpecsV1alpha3().HTTPRouteGroups(testNamespaceName).Create(context.TODO(), routeSpec, metav1.CreateOptions{}) Expect(err).ToNot(HaveOccurred()) - <-meshSpec.GetAnnouncementsChannel() + <-rgChannel httpRoutes := meshSpec.ListHTTPTrafficSpecs() Expect(len(httpRoutes)).To(Equal(1)) @@ -351,7 +378,7 @@ var _ = Describe("When listing ListHTTPTrafficSpecs", func() { err = fakeClientSet.smiTrafficSpecClientSet.SpecsV1alpha3().HTTPRouteGroups(testNamespaceName).Delete(context.TODO(), routeSpec.Name, metav1.DeleteOptions{}) Expect(err).ToNot(HaveOccurred()) - <-meshSpec.GetAnnouncementsChannel() + <-rgChannel }) }) @@ -372,6 +399,10 @@ var _ = Describe("When listing TCP routes", func() { }) It("should return a list of TCPRoute resources", func() { + trChannel := events.GetPubSubInstance().Subscribe(announcements.TCPRouteAdded, + announcements.TCPRouteDeleted, + announcements.TCPRouteUpdated) + defer events.GetPubSubInstance().Unsub(trChannel) routeSpec := &smiSpecs.TCPRoute{ TypeMeta: metav1.TypeMeta{ APIVersion: "specs.smi-spec.io/v1alpha2", @@ -386,7 +417,7 @@ var _ = Describe("When listing TCP routes", func() { _, err := fakeClientSet.smiTrafficSpecClientSet.SpecsV1alpha3().TCPRoutes(testNamespaceName).Create(context.TODO(), routeSpec, metav1.CreateOptions{}) Expect(err).ToNot(HaveOccurred()) - <-meshSpec.GetAnnouncementsChannel() + <-trChannel tcpRoutes := meshSpec.ListTCPTrafficSpecs() Expect(len(tcpRoutes)).To(Equal(1)) @@ -394,7 +425,7 @@ var _ = Describe("When listing TCP routes", func() { err = fakeClientSet.smiTrafficSpecClientSet.SpecsV1alpha3().TCPRoutes(testNamespaceName).Delete(context.TODO(), routeSpec.Name, metav1.DeleteOptions{}) Expect(err).ToNot(HaveOccurred()) - <-meshSpec.GetAnnouncementsChannel() + <-trChannel }) }) @@ -435,6 +466,11 @@ var _ = Describe("When fetching BackpressurePolicy for the given MeshService", f }) It("should return the Backpresure policy for the given service", func() { + bpChannel := events.GetPubSubInstance().Subscribe(announcements.BackpressureAdded, + announcements.BackpressureDeleted, + announcements.BackpressureUpdated) + defer events.GetPubSubInstance().Unsub(bpChannel) + meshSvc := service.MeshService{ Namespace: testNamespaceName, Name: "test-GetBackpressurePolicy", @@ -456,7 +492,7 @@ var _ = Describe("When fetching BackpressurePolicy for the given MeshService", f _, err := fakeClientSet.osmPolicyClientSet.PolicyV1alpha1().Backpressures(testNamespaceName).Create(context.TODO(), backpressurePolicy, metav1.CreateOptions{}) Expect(err).ToNot(HaveOccurred()) - <-meshSpec.GetAnnouncementsChannel() + <-bpChannel backpressurePolicyInCache := meshSpec.GetBackpressurePolicy(meshSvc) Expect(backpressurePolicyInCache).ToNot(BeNil()) @@ -464,10 +500,15 @@ var _ = Describe("When fetching BackpressurePolicy for the given MeshService", f err = fakeClientSet.osmPolicyClientSet.PolicyV1alpha1().Backpressures(testNamespaceName).Delete(context.TODO(), backpressurePolicy.Name, metav1.DeleteOptions{}) Expect(err).ToNot(HaveOccurred()) - <-meshSpec.GetAnnouncementsChannel() + <-bpChannel }) It("should return nil when the app label is missing for the given service", func() { + bpChannel := events.GetPubSubInstance().Subscribe(announcements.BackpressureAdded, + announcements.BackpressureDeleted, + announcements.BackpressureUpdated) + defer events.GetPubSubInstance().Unsub(bpChannel) + meshSvc := service.MeshService{ Namespace: testNamespaceName, Name: "test-GetBackpressurePolicy", @@ -488,14 +529,14 @@ var _ = Describe("When fetching BackpressurePolicy for the given MeshService", f _, err := fakeClientSet.osmPolicyClientSet.PolicyV1alpha1().Backpressures(testNamespaceName).Create(context.TODO(), backpressurePolicy, metav1.CreateOptions{}) Expect(err).ToNot(HaveOccurred()) - <-meshSpec.GetAnnouncementsChannel() + <-bpChannel backpressurePolicyInCache := meshSpec.GetBackpressurePolicy(meshSvc) Expect(backpressurePolicyInCache).To(BeNil()) err = fakeClientSet.osmPolicyClientSet.PolicyV1alpha1().Backpressures(testNamespaceName).Delete(context.TODO(), backpressurePolicy.Name, metav1.DeleteOptions{}) Expect(err).ToNot(HaveOccurred()) - <-meshSpec.GetAnnouncementsChannel() + <-bpChannel }) })