Skip to content

Commit

Permalink
Refactored survey (#315)
Browse files Browse the repository at this point in the history
* Refactored survey

* fix merge
  • Loading branch information
kelindar authored May 24, 2020
1 parent cedaa42 commit 2da0d25
Show file tree
Hide file tree
Showing 12 changed files with 378 additions and 178 deletions.
25 changes: 18 additions & 7 deletions internal/broker/cluster/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package cluster

import (
"context"
"errors"
"fmt"
"net"
"strings"
Expand Down Expand Up @@ -114,6 +115,16 @@ func (s *Swarm) Printf(format string, args ...interface{}) {
}
}

// findPeer returns the peer registered under the given mesh name,
// creating and registering it on first sight. A freshly created peer
// triggers the onPeerOnline notification before being returned.
func (s *Swarm) findPeer(name mesh.PeerName) *Peer {
	peer, isNew := s.members.GetOrAdd(name)
	if !isNew {
		return peer
	}

	// First time we see this peer: announce it.
	s.onPeerOnline(peer)
	return peer
}

// onPeerOnline occurs when a new peer is created.
func (s *Swarm) onPeerOnline(peer *Peer) {
logging.LogTarget("swarm", "peer created", peer.name)
Expand All @@ -137,14 +148,14 @@ func (s *Swarm) onPeerOffline(name mesh.PeerName) {
}
}

// FindPeer retrieves a peer.
func (s *Swarm) FindPeer(name mesh.PeerName) *Peer {
peer, added := s.members.GetOrAdd(name)
if added {
s.onPeerOnline(peer)
// SendTo sends a message to the peer identified by name.
//
// The peer is looked up (and lazily registered) via findPeer; if the
// peer is not currently active, an error is returned instead of
// attempting the send. Otherwise the message is forwarded through the
// peer's Send and its error (if any) is propagated to the caller.
func (s *Swarm) SendTo(name mesh.PeerName, msg *message.Message) error {
	peer := s.findPeer(name)
	if !peer.IsActive() {
		return errors.New("swarm: unable to reply to a request, peer is not active")
	}

	// Removed a stray duplicate `return peer` left over from the merge;
	// the only valid exit here is delegating to the peer's Send.
	return peer.Send(msg)
}

// ID returns the local node ID.
Expand Down Expand Up @@ -236,7 +247,7 @@ func (s *Swarm) merge(buf []byte) (mesh.GossipData, error) {

// Find the active peer for this subscription event
encoded := ev.Encode()
peer := s.FindPeer(mesh.PeerName(ev.Peer))
peer := s.findPeer(mesh.PeerName(ev.Peer))

// If the subscription is added, notify (TODO: use channels)
if t.IsAdded() && peer.onSubscribe(encoded, ev.Ssid) && peer.IsActive() {
Expand Down
16 changes: 14 additions & 2 deletions internal/broker/cluster/swarm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
package cluster

import (
"github.com/emitter-io/emitter/internal/event"
"testing"

"github.com/emitter-io/emitter/internal/config"
"github.com/emitter-io/emitter/internal/event"
"github.com/emitter-io/emitter/internal/message"
"github.com/stretchr/testify/assert"
"github.com/weaveworks/mesh"
Expand Down Expand Up @@ -57,6 +57,7 @@ func TestOnGossipUnicast(t *testing.T) {
}

func TestNewSwarm_Scenario(t *testing.T) {
msg := newTestMessage(message.Ssid{1, 2, 3}, "a/b/c/", "hello abc")
cfg := config.ClusterConfig{
NodeName: "00:00:00:00:00:01",
ListenAddr: ":4000",
Expand All @@ -65,6 +66,8 @@ func TestNewSwarm_Scenario(t *testing.T) {

// Create a new swarm and check if it was constructed well
s := NewSwarm(&cfg)
s.update()

assert.Equal(t, 0, s.NumPeers())
assert.Equal(t, uint64(1), s.ID())
assert.NotNil(t, s.Gossip())
Expand All @@ -86,9 +89,18 @@ func TestNewSwarm_Scenario(t *testing.T) {
assert.Error(t, err)

// Find peer
peer := s.FindPeer(123)
peer := s.findPeer(123)
assert.NotNil(t, peer)

// Send to active peer
err = s.SendTo(123, &msg)
assert.NoError(t, err)

// Send to inactive peer
peer.activity = 0
err = s.SendTo(123, &msg)
assert.Error(t, err)

// Remove that peer, it should not be there
s.onPeerOffline(123)
assert.False(t, s.members.Contains(mesh.PeerName(123)))
Expand Down
31 changes: 11 additions & 20 deletions internal/broker/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import (
"time"

"github.com/emitter-io/emitter/internal/broker/keygen"
"github.com/emitter-io/emitter/internal/event"
"github.com/emitter-io/emitter/internal/errors"
"github.com/emitter-io/emitter/internal/event"
"github.com/emitter-io/emitter/internal/message"
"github.com/emitter-io/emitter/internal/network/mqtt"
"github.com/emitter-io/emitter/internal/provider/contract"
Expand Down Expand Up @@ -69,7 +69,7 @@ func (s *Service) newConn(t net.Conn, readRate int) *Conn {
}

// Generate a globally unique id as well
c.guid = c.luid.Unique(uint64(s.LocalName()), "emitter")
c.guid = c.luid.Unique(uint64(s.ID()), "emitter")
if readRate == 0 {
readRate = defaultReadRate
}
Expand Down Expand Up @@ -271,16 +271,13 @@ func (c *Conn) Subscribe(ssid message.Ssid, channel []byte) {

// Add the subscription
if first := c.subs.Increment(ssid, channel); first {
ev := &event.Subscription{
Peer: c.service.LocalName(),
c.service.Subscribe(c, &event.Subscription{
Peer: c.service.ID(),
Conn: c.luid,
User: nocopy.String(c.username),
Ssid: ssid,
Channel: channel,
}

c.service.onSubscribe(c, ev) // Subscribe the subscriber
c.service.notifySubscribe(ev) // Broadcast the subscription within our cluster
})
}
}

Expand All @@ -291,16 +288,13 @@ func (c *Conn) Unsubscribe(ssid message.Ssid, channel []byte) {

// Decrement the counter and if there's no more subscriptions, notify everyone.
if last := c.subs.Decrement(ssid); last {
ev := &event.Subscription{
Peer: c.service.LocalName(),
c.service.Unsubscribe(c, &event.Subscription{
Peer: c.service.ID(),
Conn: c.luid,
User: nocopy.String(c.username),
Ssid: ssid,
Channel: channel,
}

c.service.onUnsubscribe(c, ev) // Unsubscribe the subscriber
c.service.notifyUnsubscribe(ev) // Broadcast the unsubscription within our cluster
})
}
}

Expand All @@ -313,16 +307,13 @@ func (c *Conn) Close() error {
// Unsubscribe from everything, no need to lock since each Unsubscribe is
// already locked. Locking the 'Close()' would result in a deadlock.
for _, counter := range c.subs.All() {
ev := &event.Subscription{
Peer: c.service.LocalName(),
c.service.Unsubscribe(c, &event.Subscription{
Peer: c.service.ID(),
Conn: c.luid,
User: nocopy.String(c.username),
Ssid: counter.Ssid,
Channel: counter.Channel,
}

c.service.onUnsubscribe(c, ev)
c.service.notifyUnsubscribe(ev)
})
}

// Close the transport and decrement the connection counter
Expand Down
2 changes: 1 addition & 1 deletion internal/broker/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (c *Conn) onPublish(packet *mqtt.Publish) *errors.Error {
}

// Iterate through all subscribers and send them the message
size := c.service.publish(msg, func(s message.Subscriber) bool {
size := c.service.Publish(msg, func(s message.Subscriber) bool {
return s.ID() != exclude
})

Expand Down
84 changes: 0 additions & 84 deletions internal/broker/query_test.go

This file was deleted.

Loading

0 comments on commit 2da0d25

Please sign in to comment.