forked from openservicemesh/osm
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
events: Add pubsub global interface for OSM events (openservicemesh#2076
) This commit introduces a glogal interface to pass messages and events, from multiple writers to multiple readers, based on topic. Single channel mechanisms suffers from some coupling still, where the object holding the channel or channel itself has to be passed around. Additionally, we are seeing situations where individual modules could be interested in being notified for certain events so they could run their own isolated logic (mind the isolation), current single channel model poses several quite some issues on that front (need a middleman multiplexer, etc.) This commit doesn't yet introduce a consumer of this API, though in subsequent we will introduce/rework some channel uses that will immediately greatly benefit from this feature (proxy's global boadcasting, configurator, etc.) Addresses openservicemesh#1914
- Loading branch information
Showing
5 changed files
with
145 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
package events | ||
|
||
import ( | ||
"github.com/cskr/pubsub" | ||
|
||
"github.com/openservicemesh/osm/pkg/announcements" | ||
) | ||
|
||
const ( | ||
// Default number of events a subscriber channel will buffer | ||
defaultAnnouncementChannelSize = 512 | ||
) | ||
|
||
var ( | ||
// Globally accessible instance, through singleton pattern GetPubSubInstance | ||
pubSubInstance *osmPubsub | ||
) | ||
|
||
// Object which implements the PubSub interface | ||
type osmPubsub struct { | ||
pSub *pubsub.PubSub | ||
} | ||
|
||
// Subscribe is the Subscribe implementation for PubSub | ||
func (c *osmPubsub) Subscribe(aTypes ...announcements.AnnouncementType) chan interface{} { | ||
subTypes := []string{} | ||
for _, v := range aTypes { | ||
subTypes = append(subTypes, string(v)) | ||
} | ||
|
||
return c.pSub.Sub(subTypes...) | ||
} | ||
|
||
// Publish is the Publish implementation for PubSub | ||
func (c *osmPubsub) Publish(message PubSubMessage) { | ||
c.pSub.Pub(message, message.AnnouncementType.String()) | ||
} | ||
|
||
// GetPubSubInstance returns a unique, global scope PubSub interface instance | ||
// Note that spawning the instance is not thread-safe. First call should happen on | ||
// a single-routine context to avoid races. | ||
func GetPubSubInstance() PubSub { | ||
if pubSubInstance == nil { | ||
pubSubInstance = &osmPubsub{ | ||
pSub: pubsub.New(defaultAnnouncementChannelSize), | ||
} | ||
} | ||
return pubSubInstance | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
package events | ||
|
||
import ( | ||
"reflect" | ||
"testing" | ||
"time" | ||
|
||
gomock "github.com/golang/mock/gomock" | ||
"github.com/stretchr/testify/assert" | ||
|
||
"github.com/openservicemesh/osm/pkg/announcements" | ||
) | ||
|
||
func TestPubSubEvents(t *testing.T) { | ||
assert := assert.New(t) | ||
mockCtrl := gomock.NewController(t) | ||
defer mockCtrl.Finish() | ||
|
||
testCases := []struct { | ||
register announcements.AnnouncementType | ||
publish PubSubMessage | ||
expectMessage bool | ||
}{ | ||
{ | ||
register: announcements.BackpressureAdded, | ||
publish: PubSubMessage{ | ||
AnnouncementType: announcements.ConfigMapAdded, | ||
NewObj: struct{}{}, | ||
OldObj: nil, | ||
}, | ||
expectMessage: false, | ||
}, | ||
{ | ||
register: announcements.BackpressureAdded, | ||
publish: PubSubMessage{ | ||
AnnouncementType: announcements.BackpressureAdded, | ||
NewObj: nil, | ||
OldObj: "randomString", | ||
}, | ||
expectMessage: true, | ||
}, | ||
} | ||
|
||
for i := range testCases { | ||
subscribedChanel := GetPubSubInstance().Subscribe(testCases[i].register) | ||
GetPubSubInstance().Publish(testCases[i].publish) | ||
|
||
select { | ||
case psMesg := <-subscribedChanel: | ||
assert.True(testCases[i].expectMessage) | ||
|
||
psCast, ok := psMesg.(PubSubMessage) | ||
assert.True(ok) | ||
|
||
equal := reflect.DeepEqual(psCast, testCases[i].publish) | ||
assert.True(equal) | ||
|
||
case <-time.After(1 * time.Second): | ||
assert.False(testCases[i].expectMessage) | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters