Skip to content

Commit

Permalink
Add the ability to register and use more than one StackExchange
Browse files Browse the repository at this point in the history
rel to: #3
  • Loading branch information
kataras committed Jul 11, 2019
1 parent 6031b3f commit 860a59d
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 20 deletions.
8 changes: 7 additions & 1 deletion _examples/redis/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,13 @@ func startServer() {
if err != nil {
panic(err)
}
server.StackExchange = exc
server.UseStackExchange(exc)
// The server.StackExchange field is also exported
// so users can directly use or/and test their registered
// implementations all together.
// This is possible because a wrapper is in-place
// when you register more than one stack exchanges
// on the same neffos server instance.

server.IDGenerator = func(w http.ResponseWriter, r *http.Request) string {
if userID := r.Header.Get("X-Username"); userID != "" {
Expand Down
12 changes: 4 additions & 8 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,18 +572,14 @@ func (c *Conn) notifyNamespaceConnected(ns *NSConn, connectMsg Message) {
connectMsg.Event = OnNamespaceConnected
ns.events.fireEvent(ns, connectMsg) // omit error, it's connected.

if !c.IsClient() {
if c.server.StackExchange != nil {
c.server.StackExchange.Subscribe(c, ns.namespace)
}
if !c.IsClient() && c.server.usesStackExchange() {
c.server.StackExchange.Subscribe(c, ns.namespace)
}
}

func (c *Conn) notifyNamespaceDisconnect(ns *NSConn, disconnectMsg Message) {
if !c.IsClient() {
if c.server.StackExchange != nil {
c.server.StackExchange.Unsubscribe(c, disconnectMsg.Namespace)
}
if !c.IsClient() && c.server.usesStackExchange() {
c.server.StackExchange.Unsubscribe(c, disconnectMsg.Namespace)
}
}

Expand Down
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee h1:s+21KNqlpePfkah2I+gwHF8xmJWRjooY+5248k6m4A0=
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo=
github.com/gobwas/pool v0.2.0 h1:QEmUOlnSjWtnpRGHF3SauEiOsy82Cup83Vf2LcMlnc8=
Expand All @@ -9,8 +8,5 @@ github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/iris-contrib/go.uuid v2.0.0+incompatible h1:XZubAYg61/JwnJNbZilGjf3b3pB80+OQg2qf6c8BfWE=
github.com/iris-contrib/go.uuid v2.0.0+incompatible/go.mod h1:iz2lgM/1UnEf1kP0L/+fafWORmlnuysV2EMP8MW+qe0=
github.com/mediocregopher/mediocre-go-lib v0.0.0-20181029021733-cb65787f37ed/go.mod h1:dSsfyI2zABAdhcbvkXqgxOxrCsbYeHCPgrZkku60dSg=
github.com/mediocregopher/radix/v3 v3.3.0 h1:oacPXPKHJg0hcngVVrdtTnfGJiS+PtwoQwTBZGFlV4k=
github.com/mediocregopher/radix/v3 v3.3.0/go.mod h1:EmfVyvspXz1uZEyPBMyGK+kjWiKQGvsUt6O3Pj+LDCQ=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
43 changes: 36 additions & 7 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,32 @@ func New(upgrader Upgrader, connHandler ConnHandler) *Server {
return s
}

// UseStackExchange can be used to add one or more StackExchange
// to the server.
// Returns a non-nil error when "exc"
// completes the `StackExchangeInitializer` interface and its `Init` failed.
//
// Read more at the `StackExchange` type's docs.
func (s *Server) UseStackExchange(exc StackExchange) error {
if err := stackExchangeInit(exc, s.namespaces); err != nil {
return err
}

if s.usesStackExchange() {
s.StackExchange = wrapStackExchanges(s.StackExchange, exc)
} else {
s.StackExchange = exc
}

return nil
}

// usesStackExchange reports whether this server
// uses one or more `StackExchange`s.
func (s *Server) usesStackExchange() bool {
return s.StackExchange != nil
}

func (s *Server) start() {
atomic.StoreUint32(&s.closed, 0)

Expand All @@ -130,7 +156,7 @@ func (s *Server) start() {
s.OnDisconnect(c)
}

if s.StackExchange != nil {
if s.usesStackExchange() {
s.StackExchange.OnDisconnect(c)
}
}
Expand Down Expand Up @@ -275,10 +301,13 @@ func (s *Server) Upgrade(
c.ReconnectTries, _ = strconv.Atoi(retriesHeaderValue)
}

go func(c *Conn) {
for s.waitMessage(c) {
}
}(c)
if !s.usesStackExchange() {
// fire neffos broadcaster when no exchangers are registered.
go func(c *Conn) {
for s.waitMessage(c) {
}
}(c)
}

s.connect <- c

Expand Down Expand Up @@ -317,7 +346,7 @@ func (s *Server) Upgrade(
}
}

if s.StackExchange != nil {
if s.usesStackExchange() {
if err := s.StackExchange.OnConnect(c); err != nil {
c.readiness.unwait(err)
return nil, err
Expand Down Expand Up @@ -443,7 +472,7 @@ func (s *Server) Broadcast(exceptSender fmt.Stringer, msg Message) {

// s.broadcastCond.Broadcast()

if s.StackExchange != nil {
if s.usesStackExchange() {
s.StackExchange.Publish(msg)
return
}
Expand Down
68 changes: 68 additions & 0 deletions stackexchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,71 @@ type StackExchange interface {
// it's called automatically on neffos namespace disconnect.
Unsubscribe(c *Conn, namespace string) // should close the subscriber.
}

// StackExchangeInitializer is an optional interface for a `StackExchange`.
// It contains a single `Init` method which accepts
// the registered server namespaces and returns error.
// It does not called on manual `Server.StackExchange` field set,
// use the `Server.UseStackExchange` to make sure that this implementation is respected.
type StackExchangeInitializer interface {
// Init should initialize a stackexchange, it's optional.
Init(Namespaces) error
}

func stackExchangeInit(s StackExchange, namespaces Namespaces) error {
if s != nil {
if sinit, ok := s.(StackExchangeInitializer); ok {
return sinit.Init(namespaces)
}
}

return nil
}

// internal use only when more than one stack exchanges are registered.
type stackExchangeWrapper struct {
// read-only fields.
parent StackExchange
current StackExchange
}

func wrapStackExchanges(existingExc StackExchange, newExc StackExchange) StackExchange {
return &stackExchangeWrapper{
parent: existingExc,
current: newExc,
}
}

func (s *stackExchangeWrapper) OnConnect(c *Conn) error {
// return on first error, do not wrap errors,
// the server should NOT run if at least one is errored.
err := s.parent.OnConnect(c)
if err != nil {
return err
}

return s.current.OnConnect(c)
}

func (s *stackExchangeWrapper) OnDisconnect(c *Conn) {
s.parent.OnDisconnect(c)
s.current.OnDisconnect(c)
}

func (s *stackExchangeWrapper) Publish(msg Message) bool {
// keep try on the next but return false on any failure.
okParent := s.parent.Publish(msg)
okCurrent := s.current.Publish(msg)

return okParent && okCurrent
}

func (s *stackExchangeWrapper) Subscribe(c *Conn, namespace string) {
s.parent.Subscribe(c, namespace)
s.current.Subscribe(c, namespace)
}

func (s *stackExchangeWrapper) Unsubscribe(c *Conn, namespace string) {
s.parent.Unsubscribe(c, namespace)
s.current.Unsubscribe(c, namespace)
}

0 comments on commit 860a59d

Please sign in to comment.