Skip to content
This repository has been archived by the owner on Jul 11, 2023. It is now read-only.

Commit

Permalink
CR
Browse files Browse the repository at this point in the history
- naming: Close() -> Unsub()
- Add release branch references vs main
- use idiomatic channel range to exit on close
  • Loading branch information
eduser25 committed Nov 23, 2020
1 parent b970123 commit 2a0ac7f
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 25 deletions.
8 changes: 4 additions & 4 deletions pkg/kubernetes/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ var _ = Describe("Test Namespace KubeController Methods", func() {
serviceChannel := events.GetPubSubInstance().Subscribe(announcements.ServiceAdded,
announcements.ServiceDeleted,
announcements.ServiceUpdated)
defer events.GetPubSubInstance().Close(serviceChannel)
defer events.GetPubSubInstance().Unsub(serviceChannel)

// Create monitored namespace for this service
testNamespace := &corev1.Namespace{
Expand Down Expand Up @@ -191,7 +191,7 @@ var _ = Describe("Test Namespace KubeController Methods", func() {
serviceChannel := events.GetPubSubInstance().Subscribe(announcements.ServiceAdded,
announcements.ServiceDeleted,
announcements.ServiceUpdated)
defer events.GetPubSubInstance().Close(serviceChannel)
defer events.GetPubSubInstance().Unsub(serviceChannel)
testSvcs := []service.MeshService{
{Name: uuid.New().String(), Namespace: "ns-1"},
{Name: uuid.New().String(), Namespace: "ns-2"},
Expand Down Expand Up @@ -262,11 +262,11 @@ var _ = Describe("Test Namespace KubeController Methods", func() {
serviceChannel := events.GetPubSubInstance().Subscribe(announcements.ServiceAdded,
announcements.ServiceDeleted,
announcements.ServiceUpdated)
defer events.GetPubSubInstance().Close(serviceChannel)
defer events.GetPubSubInstance().Unsub(serviceChannel)
podsChannel := events.GetPubSubInstance().Subscribe(announcements.PodAdded,
announcements.PodDeleted,
announcements.PodUpdated)
defer events.GetPubSubInstance().Close(podsChannel)
defer events.GetPubSubInstance().Unsub(podsChannel)

// Create a namespace
testNamespace := &corev1.Namespace{
Expand Down
16 changes: 6 additions & 10 deletions pkg/kubernetes/events/event_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,26 +36,22 @@ func (c *osmPubsub) Publish(message PubSubMessage) {
c.pSub.Pub(message, message.AnnouncementType.String())
}

// Close is the Close implementation for PubSub.
// Unsub is the Unsub implementation for PubSub.
// It is synchronized, upon exit the channel is guaranteed to be both
// unsubbed to all topics and closed.
// This is a necessary step to guarantee garbage collection
func (c *osmPubsub) Close(unsubChan chan interface{}) {
func (c *osmPubsub) Unsub(unsubChan chan interface{}) {
// implementation has several requirements (including different goroutine context)
// https://github.com/cskr/pubsub/blob/master/pubsub.go#L102
// https://github.com/cskr/pubsub/blob/v1.0.2/pubsub.go#L102

syncCh := make(chan struct{})
go func() {
// This should close the channel on the pubsub backend
// https://github.com/cskr/pubsub/blob/master/pubsub.go#L264
// https://github.com/cskr/pubsub/blob/v1.0.2/pubsub.go#L264
c.pSub.Unsub(unsubChan)

for {
_, ok := <-unsubChan
if !ok {
// Channel closed from pubsub
break
}
for range unsubChan {
// Drain channel, read til close
}
syncCh <- struct{}{}
}()
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubernetes/events/event_pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestPubSubClose(t *testing.T) {
})

// make sure channel is drained and closed
GetPubSubInstance().Close(subChannel)
GetPubSubInstance().Unsub(subChannel)

// Channel has to have been already emptied and closed
_, ok := <-subChannel
Expand Down
4 changes: 2 additions & 2 deletions pkg/kubernetes/events/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ type PubSub interface {
// Publish publishes the message to all subscribers that have subscribed to <message.AnnouncementType> topic
Publish(message PubSubMessage)

// Close unsubscribes and closes the channel on pubsub backend
// Unsub unsubscribes and closes the channel on pubsub backend
// Note this is a necessary step to ensure a channel can be
// garbage collected when it is freed.
Close(unsubChan chan interface{})
Unsub(unsubChan chan interface{})
}
16 changes: 8 additions & 8 deletions pkg/smi/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ var _ = Describe("When listing TrafficSplit", func() {
tsChannel := events.GetPubSubInstance().Subscribe(announcements.TrafficSplitAdded,
announcements.TrafficSplitDeleted,
announcements.TrafficSplitUpdated)
defer events.GetPubSubInstance().Close(tsChannel)
defer events.GetPubSubInstance().Unsub(tsChannel)

split := &smiSplit.TrafficSplit{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -154,7 +154,7 @@ var _ = Describe("When listing TrafficSplit services", func() {
tsChannel := events.GetPubSubInstance().Subscribe(announcements.TrafficSplitAdded,
announcements.TrafficSplitDeleted,
announcements.TrafficSplitUpdated)
defer events.GetPubSubInstance().Close(tsChannel)
defer events.GetPubSubInstance().Unsub(tsChannel)

split := &smiSplit.TrafficSplit{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -209,7 +209,7 @@ var _ = Describe("When listing ServiceAccounts", func() {
ttChannel := events.GetPubSubInstance().Subscribe(announcements.TrafficTargetAdded,
announcements.TrafficTargetDeleted,
announcements.TrafficTargetUpdated)
defer events.GetPubSubInstance().Close(ttChannel)
defer events.GetPubSubInstance().Unsub(ttChannel)

trafficTarget := &smiAccess.TrafficTarget{
TypeMeta: metav1.TypeMeta{
Expand Down Expand Up @@ -269,7 +269,7 @@ var _ = Describe("When listing TrafficTargets", func() {
ttChannel := events.GetPubSubInstance().Subscribe(announcements.TrafficTargetAdded,
announcements.TrafficTargetDeleted,
announcements.TrafficTargetUpdated)
defer events.GetPubSubInstance().Close(ttChannel)
defer events.GetPubSubInstance().Unsub(ttChannel)

trafficTarget := &smiAccess.TrafficTarget{
TypeMeta: metav1.TypeMeta{
Expand Down Expand Up @@ -332,7 +332,7 @@ var _ = Describe("When listing ListHTTPTrafficSpecs", func() {
rgChannel := events.GetPubSubInstance().Subscribe(announcements.RouteGroupAdded,
announcements.RouteGroupDeleted,
announcements.RouteGroupUpdated)
defer events.GetPubSubInstance().Close(rgChannel)
defer events.GetPubSubInstance().Unsub(rgChannel)

routeSpec := &smiSpecs.HTTPRouteGroup{
TypeMeta: metav1.TypeMeta{
Expand Down Expand Up @@ -402,7 +402,7 @@ var _ = Describe("When listing TCP routes", func() {
trChannel := events.GetPubSubInstance().Subscribe(announcements.TCPRouteAdded,
announcements.TCPRouteDeleted,
announcements.TCPRouteUpdated)
defer events.GetPubSubInstance().Close(trChannel)
defer events.GetPubSubInstance().Unsub(trChannel)
routeSpec := &smiSpecs.TCPRoute{
TypeMeta: metav1.TypeMeta{
APIVersion: "specs.smi-spec.io/v1alpha2",
Expand Down Expand Up @@ -469,7 +469,7 @@ var _ = Describe("When fetching BackpressurePolicy for the given MeshService", f
bpChannel := events.GetPubSubInstance().Subscribe(announcements.BackpressureAdded,
announcements.BackpressureDeleted,
announcements.BackpressureUpdated)
defer events.GetPubSubInstance().Close(bpChannel)
defer events.GetPubSubInstance().Unsub(bpChannel)

meshSvc := service.MeshService{
Namespace: testNamespaceName,
Expand Down Expand Up @@ -507,7 +507,7 @@ var _ = Describe("When fetching BackpressurePolicy for the given MeshService", f
bpChannel := events.GetPubSubInstance().Subscribe(announcements.BackpressureAdded,
announcements.BackpressureDeleted,
announcements.BackpressureUpdated)
defer events.GetPubSubInstance().Close(bpChannel)
defer events.GetPubSubInstance().Unsub(bpChannel)

meshSvc := service.MeshService{
Namespace: testNamespaceName,
Expand Down

0 comments on commit 2a0ac7f

Please sign in to comment.