Skip to content

Commit

Permalink
pubsub: introduce pubsub chan Unsub(), fix few sync tests (openservic…
Browse files Browse the repository at this point in the history
…emesh#2109)

* pubsub: introduce close, fix few sync tests

- Introduces Close() to pubsub
- adds a Close() test
- moves few tests to use pubsub and not read from individual channels

* CR

- naming: Close() -> Unsub()
- Add release branch references vs main
- use idiomatic channel range to exit on close

Signed-off-by: Eduard Serra <eduser25@gmail.com>
  • Loading branch information
eduser25 authored Nov 23, 2020
1 parent 306dac8 commit 14be75a
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 16 deletions.
4 changes: 4 additions & 0 deletions pkg/kubernetes/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ var _ = Describe("Test Namespace KubeController Methods", func() {
serviceChannel := events.GetPubSubInstance().Subscribe(announcements.ServiceAdded,
announcements.ServiceDeleted,
announcements.ServiceUpdated)
defer events.GetPubSubInstance().Unsub(serviceChannel)

// Create monitored namespace for this service
testNamespace := &corev1.Namespace{
Expand Down Expand Up @@ -190,6 +191,7 @@ var _ = Describe("Test Namespace KubeController Methods", func() {
serviceChannel := events.GetPubSubInstance().Subscribe(announcements.ServiceAdded,
announcements.ServiceDeleted,
announcements.ServiceUpdated)
defer events.GetPubSubInstance().Unsub(serviceChannel)
testSvcs := []service.MeshService{
{Name: uuid.New().String(), Namespace: "ns-1"},
{Name: uuid.New().String(), Namespace: "ns-2"},
Expand Down Expand Up @@ -260,9 +262,11 @@ var _ = Describe("Test Namespace KubeController Methods", func() {
serviceChannel := events.GetPubSubInstance().Subscribe(announcements.ServiceAdded,
announcements.ServiceDeleted,
announcements.ServiceUpdated)
defer events.GetPubSubInstance().Unsub(serviceChannel)
podsChannel := events.GetPubSubInstance().Subscribe(announcements.PodAdded,
announcements.PodDeleted,
announcements.PodUpdated)
defer events.GetPubSubInstance().Unsub(podsChannel)

// Create a namespace
testNamespace := &corev1.Namespace{
Expand Down
23 changes: 23 additions & 0 deletions pkg/kubernetes/events/event_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,29 @@ func (c *osmPubsub) Publish(message PubSubMessage) {
c.pSub.Pub(message, message.AnnouncementType.String())
}

// Unsub is the Unsub implementation for PubSub.
// It is synchronized, upon exit the channel is guaranteed to be both
// unsubbed to all topics and closed.
// This is a necessary step to guarantee garbage collection
func (c *osmPubsub) Unsub(unsubChan chan interface{}) {
// implementation has several requirements (including different goroutine context)
// https://github.com/cskr/pubsub/blob/v1.0.2/pubsub.go#L102

syncCh := make(chan struct{})
go func() {
// This will close the channel on the pubsub backend
// https://github.com/cskr/pubsub/blob/v1.0.2/pubsub.go#L264
c.pSub.Unsub(unsubChan)

for range unsubChan {
// Drain channel, read til close
}
syncCh <- struct{}{}
}()

<-syncCh
}

// GetPubSubInstance returns a unique, global scope PubSub interface instance
// Note that spawning the instance is not thread-safe. First call should happen on
// a single-routine context to avoid races.
Expand Down
20 changes: 20 additions & 0 deletions pkg/kubernetes/events/event_pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,23 @@ func TestPubSubEvents(t *testing.T) {
}
}
}

func TestPubSubClose(t *testing.T) {
assert := assert.New(t)
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

subChannel := GetPubSubInstance().Subscribe(announcements.BackpressureUpdated)

// publish something
GetPubSubInstance().Publish(PubSubMessage{
AnnouncementType: announcements.BackpressureUpdated,
})

// make sure channel is drained and closed
GetPubSubInstance().Unsub(subChannel)

// Channel has to have been already emptied and closed
_, ok := <-subChannel
assert.False(ok)
}
5 changes: 5 additions & 0 deletions pkg/kubernetes/events/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,9 @@ type PubSub interface {

// Publish publishes the message to all subscribers that have subscribed to <message.AnnouncementType> topic
Publish(message PubSubMessage)

// Unsub unsubscribes and closes the channel on pubsub backend
// Note this is a necessary step to ensure a channel can be
// garbage collected when it is freed.
Unsub(unsubChan chan interface{})
}
73 changes: 57 additions & 16 deletions pkg/smi/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ import (

osmPolicy "github.com/openservicemesh/osm/experimental/pkg/apis/policy/v1alpha1"
osmPolicyClient "github.com/openservicemesh/osm/experimental/pkg/client/clientset/versioned/fake"
"github.com/openservicemesh/osm/pkg/announcements"
"github.com/openservicemesh/osm/pkg/constants"
"github.com/openservicemesh/osm/pkg/featureflags"
k8s "github.com/openservicemesh/osm/pkg/kubernetes"
"github.com/openservicemesh/osm/pkg/kubernetes/events"
"github.com/openservicemesh/osm/pkg/service"
"github.com/openservicemesh/osm/pkg/tests"
)
Expand Down Expand Up @@ -98,6 +100,11 @@ var _ = Describe("When listing TrafficSplit", func() {
})

It("should return a list of traffic split resources", func() {
tsChannel := events.GetPubSubInstance().Subscribe(announcements.TrafficSplitAdded,
announcements.TrafficSplitDeleted,
announcements.TrafficSplitUpdated)
defer events.GetPubSubInstance().Unsub(tsChannel)

split := &smiSplit.TrafficSplit{
ObjectMeta: metav1.ObjectMeta{
Name: "test-ListTrafficSplits",
Expand All @@ -120,15 +127,15 @@ var _ = Describe("When listing TrafficSplit", func() {

_, err := fakeClientSet.smiTrafficSplitClientSet.SplitV1alpha2().TrafficSplits(testNamespaceName).Create(context.TODO(), split, metav1.CreateOptions{})
Expect(err).ToNot(HaveOccurred())
<-meshSpec.GetAnnouncementsChannel()
<-tsChannel

splits := meshSpec.ListTrafficSplits()
Expect(len(splits)).To(Equal(1))
Expect(split).To(Equal(splits[0]))

err = fakeClientSet.smiTrafficSplitClientSet.SplitV1alpha2().TrafficSplits(testNamespaceName).Delete(context.TODO(), split.Name, metav1.DeleteOptions{})
Expect(err).ToNot(HaveOccurred())
<-meshSpec.GetAnnouncementsChannel()
<-tsChannel
})
})

Expand All @@ -144,6 +151,11 @@ var _ = Describe("When listing TrafficSplit services", func() {
})

It("should return a list of weighted services corresponding to the traffic split backends", func() {
tsChannel := events.GetPubSubInstance().Subscribe(announcements.TrafficSplitAdded,
announcements.TrafficSplitDeleted,
announcements.TrafficSplitUpdated)
defer events.GetPubSubInstance().Unsub(tsChannel)

split := &smiSplit.TrafficSplit{
ObjectMeta: metav1.ObjectMeta{
Name: "test-ListTrafficSplitServices",
Expand All @@ -166,7 +178,7 @@ var _ = Describe("When listing TrafficSplit services", func() {

_, err := fakeClientSet.smiTrafficSplitClientSet.SplitV1alpha2().TrafficSplits(testNamespaceName).Create(context.TODO(), split, metav1.CreateOptions{})
Expect(err).ToNot(HaveOccurred())
<-meshSpec.GetAnnouncementsChannel()
<-tsChannel

weightedServices := meshSpec.ListTrafficSplitServices()
Expect(len(weightedServices)).To(Equal(len(split.Spec.Backends)))
Expand All @@ -178,7 +190,7 @@ var _ = Describe("When listing TrafficSplit services", func() {

err = fakeClientSet.smiTrafficSplitClientSet.SplitV1alpha2().TrafficSplits(testNamespaceName).Delete(context.TODO(), split.Name, metav1.DeleteOptions{})
Expect(err).ToNot(HaveOccurred())
<-meshSpec.GetAnnouncementsChannel()
<-tsChannel
})
})

Expand All @@ -194,6 +206,11 @@ var _ = Describe("When listing ServiceAccounts", func() {
})

It("should return a list of service accounts specified in TrafficTarget resources", func() {
ttChannel := events.GetPubSubInstance().Subscribe(announcements.TrafficTargetAdded,
announcements.TrafficTargetDeleted,
announcements.TrafficTargetUpdated)
defer events.GetPubSubInstance().Unsub(ttChannel)

trafficTarget := &smiAccess.TrafficTarget{
TypeMeta: metav1.TypeMeta{
APIVersion: "access.smi-spec.io/v1alpha2",
Expand Down Expand Up @@ -224,7 +241,7 @@ var _ = Describe("When listing ServiceAccounts", func() {

_, err := fakeClientSet.smiTrafficTargetClientSet.AccessV1alpha2().TrafficTargets(testNamespaceName).Create(context.TODO(), trafficTarget, metav1.CreateOptions{})
Expect(err).ToNot(HaveOccurred())
<-meshSpec.GetAnnouncementsChannel()
<-ttChannel

svcAccounts := meshSpec.ListServiceAccounts()

Expand All @@ -233,7 +250,7 @@ var _ = Describe("When listing ServiceAccounts", func() {

err = fakeClientSet.smiTrafficTargetClientSet.AccessV1alpha2().TrafficTargets(testNamespaceName).Delete(context.TODO(), trafficTarget.Name, metav1.DeleteOptions{})
Expect(err).ToNot(HaveOccurred())
<-meshSpec.GetAnnouncementsChannel()
<-ttChannel
})
})

Expand All @@ -249,6 +266,11 @@ var _ = Describe("When listing TrafficTargets", func() {
})

It("Returns a list of TrafficTarget resources", func() {
ttChannel := events.GetPubSubInstance().Subscribe(announcements.TrafficTargetAdded,
announcements.TrafficTargetDeleted,
announcements.TrafficTargetUpdated)
defer events.GetPubSubInstance().Unsub(ttChannel)

trafficTarget := &smiAccess.TrafficTarget{
TypeMeta: metav1.TypeMeta{
APIVersion: "access.smi-spec.io/v1alpha2",
Expand Down Expand Up @@ -279,14 +301,14 @@ var _ = Describe("When listing TrafficTargets", func() {

_, err := fakeClientSet.smiTrafficTargetClientSet.AccessV1alpha2().TrafficTargets(testNamespaceName).Create(context.TODO(), trafficTarget, metav1.CreateOptions{})
Expect(err).ToNot(HaveOccurred())
<-meshSpec.GetAnnouncementsChannel()
<-ttChannel

targets := meshSpec.ListTrafficTargets()
Expect(len(targets)).To(Equal(1))

err = fakeClientSet.smiTrafficTargetClientSet.AccessV1alpha2().TrafficTargets(testNamespaceName).Delete(context.TODO(), trafficTarget.Name, metav1.DeleteOptions{})
Expect(err).ToNot(HaveOccurred())
<-meshSpec.GetAnnouncementsChannel()
<-ttChannel
})
})

Expand All @@ -307,6 +329,11 @@ var _ = Describe("When listing ListHTTPTrafficSpecs", func() {
})

It("should return a list of ListHTTPTrafficSpecs resources", func() {
rgChannel := events.GetPubSubInstance().Subscribe(announcements.RouteGroupAdded,
announcements.RouteGroupDeleted,
announcements.RouteGroupUpdated)
defer events.GetPubSubInstance().Unsub(rgChannel)

routeSpec := &smiSpecs.HTTPRouteGroup{
TypeMeta: metav1.TypeMeta{
APIVersion: "specs.smi-spec.io/v1alpha3",
Expand Down Expand Up @@ -343,15 +370,15 @@ var _ = Describe("When listing ListHTTPTrafficSpecs", func() {

_, err := fakeClientSet.smiTrafficSpecClientSet.SpecsV1alpha3().HTTPRouteGroups(testNamespaceName).Create(context.TODO(), routeSpec, metav1.CreateOptions{})
Expect(err).ToNot(HaveOccurred())
<-meshSpec.GetAnnouncementsChannel()
<-rgChannel

httpRoutes := meshSpec.ListHTTPTrafficSpecs()
Expect(len(httpRoutes)).To(Equal(1))
Expect(httpRoutes[0].Name).To(Equal(routeSpec.Name))

err = fakeClientSet.smiTrafficSpecClientSet.SpecsV1alpha3().HTTPRouteGroups(testNamespaceName).Delete(context.TODO(), routeSpec.Name, metav1.DeleteOptions{})
Expect(err).ToNot(HaveOccurred())
<-meshSpec.GetAnnouncementsChannel()
<-rgChannel
})
})

Expand All @@ -372,6 +399,10 @@ var _ = Describe("When listing TCP routes", func() {
})

It("should return a list of TCPRoute resources", func() {
trChannel := events.GetPubSubInstance().Subscribe(announcements.TCPRouteAdded,
announcements.TCPRouteDeleted,
announcements.TCPRouteUpdated)
defer events.GetPubSubInstance().Unsub(trChannel)
routeSpec := &smiSpecs.TCPRoute{
TypeMeta: metav1.TypeMeta{
APIVersion: "specs.smi-spec.io/v1alpha2",
Expand All @@ -386,15 +417,15 @@ var _ = Describe("When listing TCP routes", func() {

_, err := fakeClientSet.smiTrafficSpecClientSet.SpecsV1alpha3().TCPRoutes(testNamespaceName).Create(context.TODO(), routeSpec, metav1.CreateOptions{})
Expect(err).ToNot(HaveOccurred())
<-meshSpec.GetAnnouncementsChannel()
<-trChannel

tcpRoutes := meshSpec.ListTCPTrafficSpecs()
Expect(len(tcpRoutes)).To(Equal(1))
Expect(tcpRoutes[0].Name).To(Equal(routeSpec.Name))

err = fakeClientSet.smiTrafficSpecClientSet.SpecsV1alpha3().TCPRoutes(testNamespaceName).Delete(context.TODO(), routeSpec.Name, metav1.DeleteOptions{})
Expect(err).ToNot(HaveOccurred())
<-meshSpec.GetAnnouncementsChannel()
<-trChannel
})
})

Expand Down Expand Up @@ -435,6 +466,11 @@ var _ = Describe("When fetching BackpressurePolicy for the given MeshService", f
})

It("should return the Backpresure policy for the given service", func() {
bpChannel := events.GetPubSubInstance().Subscribe(announcements.BackpressureAdded,
announcements.BackpressureDeleted,
announcements.BackpressureUpdated)
defer events.GetPubSubInstance().Unsub(bpChannel)

meshSvc := service.MeshService{
Namespace: testNamespaceName,
Name: "test-GetBackpressurePolicy",
Expand All @@ -456,18 +492,23 @@ var _ = Describe("When fetching BackpressurePolicy for the given MeshService", f

_, err := fakeClientSet.osmPolicyClientSet.PolicyV1alpha1().Backpressures(testNamespaceName).Create(context.TODO(), backpressurePolicy, metav1.CreateOptions{})
Expect(err).ToNot(HaveOccurred())
<-meshSpec.GetAnnouncementsChannel()
<-bpChannel

backpressurePolicyInCache := meshSpec.GetBackpressurePolicy(meshSvc)
Expect(backpressurePolicyInCache).ToNot(BeNil())
Expect(backpressurePolicyInCache.Name).To(Equal(backpressurePolicy.Name))

err = fakeClientSet.osmPolicyClientSet.PolicyV1alpha1().Backpressures(testNamespaceName).Delete(context.TODO(), backpressurePolicy.Name, metav1.DeleteOptions{})
Expect(err).ToNot(HaveOccurred())
<-meshSpec.GetAnnouncementsChannel()
<-bpChannel
})

It("should return nil when the app label is missing for the given service", func() {
bpChannel := events.GetPubSubInstance().Subscribe(announcements.BackpressureAdded,
announcements.BackpressureDeleted,
announcements.BackpressureUpdated)
defer events.GetPubSubInstance().Unsub(bpChannel)

meshSvc := service.MeshService{
Namespace: testNamespaceName,
Name: "test-GetBackpressurePolicy",
Expand All @@ -488,14 +529,14 @@ var _ = Describe("When fetching BackpressurePolicy for the given MeshService", f

_, err := fakeClientSet.osmPolicyClientSet.PolicyV1alpha1().Backpressures(testNamespaceName).Create(context.TODO(), backpressurePolicy, metav1.CreateOptions{})
Expect(err).ToNot(HaveOccurred())
<-meshSpec.GetAnnouncementsChannel()
<-bpChannel

backpressurePolicyInCache := meshSpec.GetBackpressurePolicy(meshSvc)
Expect(backpressurePolicyInCache).To(BeNil())

err = fakeClientSet.osmPolicyClientSet.PolicyV1alpha1().Backpressures(testNamespaceName).Delete(context.TODO(), backpressurePolicy.Name, metav1.DeleteOptions{})
Expect(err).ToNot(HaveOccurred())
<-meshSpec.GetAnnouncementsChannel()
<-bpChannel
})
})

Expand Down

0 comments on commit 14be75a

Please sign in to comment.