Skip to content

Commit

Permalink
Support new Management service protocol (NetworkMap) (#193)
Browse files Browse the repository at this point in the history
* feature: support new management service protocol

* chore: add more logging to track networkmap serial

* refactor: organize peer update code in engine

* chore: fix lint issues

* refactor: extract Signal client interface

* test: add signal client mock

* refactor: introduce Management Service client interface

* chore: place management and signal clients mocks to respective packages

* test: add Serial test to the engine

* fix: lint issues

* test: unit tests for a networkMapUpdate

* test: unit tests Sync update
  • Loading branch information
braginini authored Jan 18, 2022
1 parent 9a3fba3 commit 5db130a
Show file tree
Hide file tree
Showing 14 changed files with 1,102 additions and 651 deletions.
4 changes: 2 additions & 2 deletions client/cmd/login.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ var (
)

// loginPeer attempts to login to Management Service. If peer wasn't registered, tries the registration flow.
func loginPeer(serverPublicKey wgtypes.Key, client *mgm.Client, setupKey string) (*mgmProto.LoginResponse, error) {
func loginPeer(serverPublicKey wgtypes.Key, client *mgm.GrpcClient, setupKey string) (*mgmProto.LoginResponse, error) {

loginResp, err := client.Login(serverPublicKey)
if err != nil {
Expand All @@ -101,7 +101,7 @@ func loginPeer(serverPublicKey wgtypes.Key, client *mgm.Client, setupKey string)

// registerPeer checks whether setupKey was provided via cmd line and if not then it prompts user to enter a key.
// Otherwise tries to register with the provided setupKey via command line.
func registerPeer(serverPublicKey wgtypes.Key, client *mgm.Client, setupKey string) (*mgmProto.LoginResponse, error) {
func registerPeer(serverPublicKey wgtypes.Key, client *mgm.GrpcClient, setupKey string) (*mgmProto.LoginResponse, error) {

var err error
if setupKey == "" {
Expand Down
4 changes: 2 additions & 2 deletions client/cmd/up.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func createEngineConfig(key wgtypes.Key, config *internal.Config, peerConfig *mg
}

// connectToSignal creates Signal Service client and established a connection
func connectToSignal(ctx context.Context, wtConfig *mgmProto.WiretrusteeConfig, ourPrivateKey wgtypes.Key) (*signal.Client, error) {
func connectToSignal(ctx context.Context, wtConfig *mgmProto.WiretrusteeConfig, ourPrivateKey wgtypes.Key) (*signal.GrpcClient, error) {
var sigTLSEnabled bool
if wtConfig.Signal.Protocol == mgmProto.HostConfig_HTTPS {
sigTLSEnabled = true
Expand All @@ -101,7 +101,7 @@ func connectToSignal(ctx context.Context, wtConfig *mgmProto.WiretrusteeConfig,
}

// connectToManagement creates Management Services client, establishes a connection, logs-in and gets a global Wiretrustee config (signal, turn, stun hosts, etc)
func connectToManagement(ctx context.Context, managementAddr string, ourPrivateKey wgtypes.Key, tlsEnabled bool) (*mgm.Client, *mgmProto.LoginResponse, error) {
func connectToManagement(ctx context.Context, managementAddr string, ourPrivateKey wgtypes.Key, tlsEnabled bool) (*mgm.GrpcClient, *mgmProto.LoginResponse, error) {
log.Debugf("connecting to management server %s", managementAddr)
client, err := mgm.NewClient(ctx, managementAddr, ourPrivateKey, tlsEnabled)
if err != nil {
Expand Down
168 changes: 108 additions & 60 deletions client/internal/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
mgmProto "github.com/wiretrustee/wiretrustee/management/proto"
signal "github.com/wiretrustee/wiretrustee/signal/client"
sProto "github.com/wiretrustee/wiretrustee/signal/proto"
"github.com/wiretrustee/wiretrustee/util"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"math/rand"
"strings"
Expand Down Expand Up @@ -44,9 +45,9 @@ type EngineConfig struct {
// Engine is a mechanism responsible for reacting on Signal and Management stream events and managing connections to the remote peers.
type Engine struct {
// signal is a Signal Service client
signal *signal.Client
signal signal.Client
// mgmClient is a Management Service client
mgmClient *mgm.Client
mgmClient mgm.Client
// peerConns is a map that holds all the peers that are known to this peer
peerConns map[string]*peer.Conn

Expand All @@ -64,6 +65,9 @@ type Engine struct {
ctx context.Context

wgInterface iface.WGIface

// networkSerial is the latest Serial (state ID) of the network sent by the Management service
networkSerial uint64
}

// Peer is an instance of the Connection Peer
Expand All @@ -73,25 +77,26 @@ type Peer struct {
}

// NewEngine creates a new Connection Engine
func NewEngine(signalClient *signal.Client, mgmClient *mgm.Client, config *EngineConfig, cancel context.CancelFunc, ctx context.Context) *Engine {
func NewEngine(signalClient signal.Client, mgmClient mgm.Client, config *EngineConfig, cancel context.CancelFunc, ctx context.Context) *Engine {
return &Engine{
signal: signalClient,
mgmClient: mgmClient,
peerConns: map[string]*peer.Conn{},
syncMsgMux: &sync.Mutex{},
config: config,
STUNs: []*ice.URL{},
TURNs: []*ice.URL{},
cancel: cancel,
ctx: ctx,
signal: signalClient,
mgmClient: mgmClient,
peerConns: map[string]*peer.Conn{},
syncMsgMux: &sync.Mutex{},
config: config,
STUNs: []*ice.URL{},
TURNs: []*ice.URL{},
cancel: cancel,
ctx: ctx,
networkSerial: 0,
}
}

func (e *Engine) Stop() error {
e.syncMsgMux.Lock()
defer e.syncMsgMux.Unlock()

err := e.removeAllPeerConnections()
err := e.removeAllPeers()
if err != nil {
return err
}
Expand Down Expand Up @@ -146,8 +151,22 @@ func (e *Engine) Start() error {
return nil
}

func (e *Engine) removePeers(peers []string) error {
for _, p := range peers {
// removePeers finds and removes peers that do not exist anymore in the network map received from the Management Service
func (e *Engine) removePeers(peersUpdate []*mgmProto.RemotePeerConfig) error {

currentPeers := make([]string, 0, len(e.peerConns))
for p := range e.peerConns {
currentPeers = append(currentPeers, p)
}

newPeers := make([]string, 0, len(peersUpdate))
for _, p := range peersUpdate {
newPeers = append(newPeers, p.GetWgPubKey())
}

toRemove := util.SliceDiff(currentPeers, newPeers)

for _, p := range toRemove {
err := e.removePeer(p)
if err != nil {
return err
Expand All @@ -157,7 +176,7 @@ func (e *Engine) removePeers(peers []string) error {
return nil
}

func (e *Engine) removeAllPeerConnections() error {
func (e *Engine) removeAllPeers() error {
log.Debugf("removing all peer connections")
for p := range e.peerConns {
err := e.removePeer(p)
Expand Down Expand Up @@ -189,6 +208,16 @@ func (e *Engine) GetPeerConnectionStatus(peerKey string) peer.ConnStatus {

return -1
}
func (e *Engine) GetPeers() []string {
e.syncMsgMux.Lock()
defer e.syncMsgMux.Unlock()

peers := []string{}
for s := range e.peerConns {
peers = append(peers, s)
}
return peers
}

// GetConnectedPeers returns a connection Status or nil if peer connection wasn't found
func (e *Engine) GetConnectedPeers() []string {
Expand All @@ -205,7 +234,7 @@ func (e *Engine) GetConnectedPeers() []string {
return peers
}

func signalCandidate(candidate ice.Candidate, myKey wgtypes.Key, remoteKey wgtypes.Key, s *signal.Client) error {
func signalCandidate(candidate ice.Candidate, myKey wgtypes.Key, remoteKey wgtypes.Key, s signal.Client) error {
err := s.Send(&sProto.Message{
Key: myKey.PublicKey().String(),
RemoteKey: remoteKey.String(),
Expand All @@ -223,7 +252,7 @@ func signalCandidate(candidate ice.Candidate, myKey wgtypes.Key, remoteKey wgtyp
return nil
}

func signalAuth(uFrag string, pwd string, myKey wgtypes.Key, remoteKey wgtypes.Key, s *signal.Client, isAnswer bool) error {
func signalAuth(uFrag string, pwd string, myKey wgtypes.Key, remoteKey wgtypes.Key, s signal.Client, isAnswer bool) error {

var t sProto.Body_Type
if isAnswer {
Expand All @@ -246,37 +275,42 @@ func signalAuth(uFrag string, pwd string, myKey wgtypes.Key, remoteKey wgtypes.K
return nil
}

// receiveManagementEvents connects to the Management Service event stream to receive updates from the management service
// E.g. when a new peer has been registered and we are allowed to connect to it.
func (e *Engine) receiveManagementEvents() {
go func() {
err := e.mgmClient.Sync(func(update *mgmProto.SyncResponse) error {
e.syncMsgMux.Lock()
defer e.syncMsgMux.Unlock()
func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
e.syncMsgMux.Lock()
defer e.syncMsgMux.Unlock()

if update.GetWiretrusteeConfig() != nil {
err := e.updateTURNs(update.GetWiretrusteeConfig().GetTurns())
if err != nil {
return err
}
if update.GetWiretrusteeConfig() != nil {
err := e.updateTURNs(update.GetWiretrusteeConfig().GetTurns())
if err != nil {
return err
}

err = e.updateSTUNs(update.GetWiretrusteeConfig().GetStuns())
if err != nil {
return err
}
err = e.updateSTUNs(update.GetWiretrusteeConfig().GetStuns())
if err != nil {
return err
}

//todo update signal
}
//todo update signal
}

if update.GetRemotePeers() != nil || update.GetRemotePeersIsEmpty() {
// empty arrays are serialized by protobuf to null, but for our case empty array is a valid state.
err := e.updatePeers(update.GetRemotePeers())
if err != nil {
return err
}
}
if update.GetNetworkMap() != nil {
// only apply new changes and ignore old ones
err := e.updateNetworkMap(update.GetNetworkMap())
if err != nil {
return err
}
}

return nil
return nil

}

// receiveManagementEvents connects to the Management Service event stream to receive updates from the management service
// E.g. when a new peer has been registered and we are allowed to connect to it.
func (e *Engine) receiveManagementEvents() {
go func() {
err := e.mgmClient.Sync(func(update *mgmProto.SyncResponse) error {
return e.handleSync(update)
})
if err != nil {
// happens if management is unavailable for a long time.
Expand Down Expand Up @@ -327,27 +361,41 @@ func (e *Engine) updateTURNs(turns []*mgmProto.ProtectedHostConfig) error {
return nil
}

func (e *Engine) updatePeers(remotePeers []*mgmProto.RemotePeerConfig) error {
log.Debugf("got peers update from Management Service, total peers to connect to = %d", len(remotePeers))
remotePeerMap := make(map[string]struct{})
for _, p := range remotePeers {
remotePeerMap[p.GetWgPubKey()] = struct{}{}
func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error {

serial := networkMap.GetSerial()
if e.networkSerial > serial {
log.Debugf("received outdated NetworkMap with serial %d, ignoring", serial)
return nil
}

//remove peers that are no longer available for us
toRemove := []string{}
for p := range e.peerConns {
if _, ok := remotePeerMap[p]; !ok {
toRemove = append(toRemove, p)
log.Debugf("got peers update from Management Service, total peers to connect to = %d", len(networkMap.GetRemotePeers()))

// cleanup request, most likely our peer has been deleted
if networkMap.GetRemotePeersIsEmpty() {
err := e.removeAllPeers()
if err != nil {
return err
}
} else {
err := e.removePeers(networkMap.GetRemotePeers())
if err != nil {
return err
}

err = e.addNewPeers(networkMap.GetRemotePeers())
if err != nil {
return err
}
}
err := e.removePeers(toRemove)
if err != nil {
return err
}

// add new peers
for _, p := range remotePeers {
e.networkSerial = serial
return nil
}

// addNewPeers finds and adds peers that were not know before but arrived from the Management service with the update
func (e *Engine) addNewPeers(peersUpdate []*mgmProto.RemotePeerConfig) error {
for _, p := range peersUpdate {
peerKey := p.GetWgPubKey()
peerIPs := p.GetAllowedIps()
if _, ok := e.peerConns[peerKey]; !ok {
Expand Down
Loading

0 comments on commit 5db130a

Please sign in to comment.