Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: [Signal] synchronize peer registry #27

Merged
merged 1 commit into from
Jun 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 30 additions & 15 deletions signal/peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package peer
import (
log "github.com/sirupsen/logrus"
"github.com/wiretrustee/wiretrustee/signal/proto"
"sync"
)

// Peer representation of a connected Peer
Expand All @@ -25,32 +26,46 @@ func NewPeer(id string, stream proto.SignalExchange_ConnectStreamServer) *Peer {
// Registry registry that holds all currently connected Peers
type Registry struct {
// Peer.key -> Peer
Peers map[string]*Peer
Peers sync.Map
}

// NewRegistry creates a new connected Peer registry
func NewRegistry() *Registry {
return &Registry{
Peers: make(map[string]*Peer),
return &Registry{}
}

// Get gets a peer from the registry
func (registry *Registry) Get(peerId string) (*Peer, bool) {
if load, ok := registry.Peers.Load(peerId); ok {
return load.(*Peer), ok
}
return nil, false

}

// Register registers peer in the registry
func (reg *Registry) Register(peer *Peer) {
if _, exists := reg.Peers[peer.Id]; exists {
log.Warnf("peer [%s] has been already registered", peer.Id)
} else {
log.Printf("registering new peer [%s]", peer.Id)
func (registry *Registry) IsPeerRegistered(peerId string) bool {
if _, ok := registry.Peers.Load(peerId); ok {
return ok
}
//replace Peer even if exists
//todo should we really replace?
reg.Peers[peer.Id] = peer
return false
}

// Register registers peer in the registry
func (registry *Registry) Register(peer *Peer) {
// can be that peer already exists but it is fine (e.g. reconnect)
// todo investigate what happens to the old peer (especially Peer.Stream) when we override it
registry.Peers.Store(peer.Id, peer)
log.Printf("registered peer [%s]", peer.Id)

}

// Deregister deregister Peer from the Registry (usually once it disconnects)
func (reg *Registry) Deregister(peer *Peer) {
if _, ok := reg.Peers[peer.Id]; ok {
delete(reg.Peers, peer.Id)
func (registry *Registry) Deregister(peer *Peer) {
_, loaded := registry.Peers.LoadAndDelete(peer.Id)
if loaded {
log.Printf("deregistered peer [%s]", peer.Id)
} else {
log.Warnf("attempted to remove non-existent peer [%s]", peer.Id)
}

}
30 changes: 18 additions & 12 deletions signal/peer/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,32 @@ import (
"testing"
)

func TestRegistry_GetNonExistentPeer(t *testing.T) {
r := NewRegistry()

peer, ok := r.Get("non_existent_peer")

if peer != nil {
t.Errorf("expected non_existent_peer not found in the registry")
}

if ok {
t.Errorf("expected non_existent_peer not found in the registry")
}
}

func TestRegistry_Register(t *testing.T) {
r := NewRegistry()
peer1 := NewPeer("test_peer_1", nil)
peer2 := NewPeer("test_peer_2", nil)
r.Register(peer1)
r.Register(peer2)

if len(r.Peers) != 2 {
t.Errorf("expected 2 registered peers")
}

if _, ok := r.Peers["test_peer_1"]; !ok {
if _, ok := r.Get("test_peer_1"); !ok {
t.Errorf("expected test_peer_1 not found in the registry")
}

if _, ok := r.Peers["test_peer_2"]; !ok {
if _, ok := r.Get("test_peer_2"); !ok {
t.Errorf("expected test_peer_2 not found in the registry")
}
}
Expand All @@ -33,15 +43,11 @@ func TestRegistry_Deregister(t *testing.T) {

r.Deregister(peer1)

if len(r.Peers) != 1 {
t.Errorf("expected 1 registered peers after deregistring")
}

if _, ok := r.Peers["test_peer_1"]; ok {
if _, ok := r.Get("test_peer_1"); ok {
t.Errorf("expected test_peer_1 to absent in the registry after deregistering")
}

if _, ok := r.Peers["test_peer_2"]; !ok {
if _, ok := r.Get("test_peer_2"); !ok {
t.Errorf("expected test_peer_2 not found in the registry")
}

Expand Down
6 changes: 3 additions & 3 deletions signal/signal.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ func NewServer() *SignalExchangeServer {
// Send forwards a message to the signal peer
func (s *SignalExchangeServer) Send(ctx context.Context, msg *proto.EncryptedMessage) (*proto.EncryptedMessage, error) {

if _, found := s.registry.Peers[msg.Key]; !found {
if !s.registry.IsPeerRegistered(msg.Key) {
return nil, fmt.Errorf("unknown peer %s", msg.Key)
}

if dstPeer, found := s.registry.Peers[msg.RemoteKey]; found {
if dstPeer, found := s.registry.Get(msg.RemoteKey); found {
//forward the message to the target peer
err := dstPeer.Stream.Send(msg)
if err != nil {
Expand Down Expand Up @@ -63,7 +63,7 @@ func (s *SignalExchangeServer) ConnectStream(stream proto.SignalExchange_Connect
}
log.Debugf("received a new message from peer [%s] to peer [%s]", p.Id, msg.RemoteKey)
// lookup the target peer where the message is going to
if dstPeer, found := s.registry.Peers[msg.RemoteKey]; found {
if dstPeer, found := s.registry.Get(msg.RemoteKey); found {
//forward the message to the target peer
err := dstPeer.Stream.Send(msg)
if err != nil {
Expand Down