Skip to content

Commit

Permalink
events: Add pubsub global interface for OSM events
Browse files Browse the repository at this point in the history
This commit introduces a glogal interface to pass messages and events,
from multiple writers to multiple readers, based on topic.

Single channel mechanisms suffers from some coupling still, where
the object holding the channel or channel itself has to be passed around.

Additionally, we are seeing situations where individual modules could be
interested in being notified for certain events so they could run
their own isolated logic (mind the isolation), current single channel model
poses several quite some issues on that front (need a middleman multiplexer, etc.)

This commit doesn't yet introduce a consumer of this API, though in subsequent
we will introduce/rework some channel uses that will immediately greatly benefit
from this feature (proxy's global boadcasting, configurator, etc.)

Addresses openservicemesh#1914
  • Loading branch information
eduser25 committed Nov 17, 2020
1 parent e0513d3 commit 7b37e49
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 0 deletions.
1 change: 1 addition & 0 deletions cmd/osm-controller/osm-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func main() {
}

featureflags.Initialize(optionalFeatures)
events.GetPubSubInstance() // Just to generate the interface, single routine context

// Initialize kube config and client
kubeConfig, err := clientcmd.BuildConfigFromFlags("", kubeConfigFile)
Expand Down
16 changes: 16 additions & 0 deletions pkg/kubernetes/event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

a "github.com/openservicemesh/osm/pkg/announcements"
"github.com/openservicemesh/osm/pkg/constants"
"github.com/openservicemesh/osm/pkg/kubernetes/events"
)

var emitLogs = os.Getenv(constants.EnvVarLogKubernetesEvents) == "true"
Expand Down Expand Up @@ -60,6 +61,11 @@ func GetKubernetesEventHandlers(informerName, providerName string, announce chan
logNotObservedNamespace(obj, eventTypes.Add)
return
}
events.GetPubSubInstance().Publish(events.PubSubMessage{
AnnouncementType: eventTypes.Add,
NewObj: obj,
OldObj: nil,
})
sendAnnouncement(eventTypes.Add, obj)
},

Expand All @@ -68,6 +74,11 @@ func GetKubernetesEventHandlers(informerName, providerName string, announce chan
logNotObservedNamespace(newObj, eventTypes.Update)
return
}
events.GetPubSubInstance().Publish(events.PubSubMessage{
AnnouncementType: eventTypes.Update,
NewObj: oldObj,
OldObj: newObj,
})
sendAnnouncement(eventTypes.Update, oldObj)
},

Expand All @@ -76,6 +87,11 @@ func GetKubernetesEventHandlers(informerName, providerName string, announce chan
logNotObservedNamespace(obj, eventTypes.Delete)
return
}
events.GetPubSubInstance().Publish(events.PubSubMessage{
AnnouncementType: eventTypes.Delete,
NewObj: nil,
OldObj: obj,
})
sendAnnouncement(eventTypes.Delete, obj)
},
}
Expand Down
49 changes: 49 additions & 0 deletions pkg/kubernetes/events/event_pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package events

import (
"github.com/cskr/pubsub"

"github.com/openservicemesh/osm/pkg/announcements"
)

const (
// Default number of events a subscriber channel will buffer
defaultAnnouncementChannelSize = 512
)

var (
// Globally accessible instance, through singleton pattern GetPubSubInstance
pubSubInstance *osmPubsub
)

// Object which implements the PubSub interface
type osmPubsub struct {
pSub *pubsub.PubSub
}

// Subscribe is the Subscribe implementation for PubSub
func (c *osmPubsub) Subscribe(aTypes ...announcements.AnnouncementType) chan interface{} {
subTypes := []string{}
for _, v := range aTypes {
subTypes = append(subTypes, string(v))
}

return c.pSub.Sub(subTypes...)
}

// Publish is the Publish implementation for PubSub
func (c *osmPubsub) Publish(message PubSubMessage) {
c.pSub.Pub(message, message.AnnouncementType.String())
}

// GetPubSubInstance returns a unique, global scope PubSub interface instance
// Note that spawning the instance is not thread-safe. First call should happen on
// a single-routine context to avoid races.
func GetPubSubInstance() PubSub {
if pubSubInstance == nil {
pubSubInstance = &osmPubsub{
pSub: pubsub.New(defaultAnnouncementChannelSize),
}
}
return pubSubInstance
}
62 changes: 62 additions & 0 deletions pkg/kubernetes/events/event_pubsub_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package events

import (
"reflect"
"testing"
"time"

gomock "github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"

"github.com/openservicemesh/osm/pkg/announcements"
)

func TestPubSubEvents(t *testing.T) {
assert := assert.New(t)
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

testCases := []struct {
register announcements.AnnouncementType
publish PubSubMessage
expectMessage bool
}{
{
register: announcements.BackpressureAdded,
publish: PubSubMessage{
AnnouncementType: announcements.ConfigMapAdded,
NewObj: struct{}{},
OldObj: nil,
},
expectMessage: false,
},
{
register: announcements.BackpressureAdded,
publish: PubSubMessage{
AnnouncementType: announcements.BackpressureAdded,
NewObj: nil,
OldObj: "randomString",
},
expectMessage: true,
},
}

for i := range testCases {
subscribedChanel := GetPubSubInstance().Subscribe(testCases[i].register)
GetPubSubInstance().Publish(testCases[i].publish)

select {
case psMesg := <-subscribedChanel:
assert.True(testCases[i].expectMessage)

psCast, ok := psMesg.(PubSubMessage)
assert.True(ok)

equal := reflect.DeepEqual(psCast, testCases[i].publish)
assert.True(equal)

case <-time.After(1 * time.Second):
assert.False(testCases[i].expectMessage)
}
}
}
17 changes: 17 additions & 0 deletions pkg/kubernetes/events/types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package events

import (
"github.com/openservicemesh/osm/pkg/announcements"
"github.com/openservicemesh/osm/pkg/logger"
)

Expand All @@ -24,3 +25,19 @@ const (
// CertificateIssuanceFailure signifies that a request to issue a certificate failed
CertificateIssuanceFailure = "FatalCertificateIssuanceFailure"
)

// PubSubMessage represents a common messages abstraction to pass through the PubSub interface
type PubSubMessage struct {
AnnouncementType announcements.AnnouncementType
OldObj interface{}
NewObj interface{}
}

// PubSub is a simple interface to call for pubsub functionality in front of a pubsub implementation
type PubSub interface {
// Subscribe returns a channel subscribed to the specific type/s of announcement/s passed by parameter
Subscribe(aTypes ...announcements.AnnouncementType) chan interface{}

// Publish publishes the message to all subscribers that have subscribed to <message.AnnouncementType> topic
Publish(message PubSubMessage)
}

0 comments on commit 7b37e49

Please sign in to comment.