diff --git a/client/cmd/testutil.go b/client/cmd/testutil.go index da76ee73ad4..d9c72517a3e 100644 --- a/client/cmd/testutil.go +++ b/client/cmd/testutil.go @@ -37,8 +37,8 @@ func startManagement(config *mgmt.Config, t *testing.T) (*grpc.Server, net.Liste t.Fatal(err) } - accountManager := mgmt.NewManager(store) peersUpdateManager := mgmt.NewPeersUpdateManager() + accountManager := mgmt.NewManager(store, peersUpdateManager) turnManager := mgmt.NewTimeBasedAuthSecretsManager(peersUpdateManager, config.TURNConfig) mgmtServer, err := mgmt.NewServer(config, accountManager, peersUpdateManager, turnManager) if err != nil { diff --git a/client/cmd/up.go b/client/cmd/up.go index 4e7e15a7e10..5d2469f82e8 100644 --- a/client/cmd/up.go +++ b/client/cmd/up.go @@ -5,7 +5,6 @@ import ( log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/wiretrustee/wiretrustee/client/internal" - "github.com/wiretrustee/wiretrustee/iface" mgm "github.com/wiretrustee/wiretrustee/management/client" mgmProto "github.com/wiretrustee/wiretrustee/management/proto" signal "github.com/wiretrustee/wiretrustee/signal/client" @@ -38,8 +37,8 @@ var ( log.Errorf("failed parsing Wireguard key %s: [%s]", config.PrivateKey, err.Error()) return err } - - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() mgmTlsEnabled := false if config.ManagementURL.Scheme == "https" { @@ -67,7 +66,7 @@ var ( } // create start the Wiretrustee Engine that will connect to the Signal and Management streams and manage connections to remote peers. - engine := internal.NewEngine(signalClient, mgmClient, engineConfig) + engine := internal.NewEngine(signalClient, mgmClient, engineConfig, cancel) err = engine.Start() if err != nil { log.Errorf("error while starting Wiretrustee Connection Engine: %s", err) @@ -75,7 +74,12 @@ var ( } SetupCloseHandler() - <-stopCh + + select { + case <-stopCh: + case <-ctx.Done(): + } + log.Infof("receive signal to stop running") err = mgmClient.Close() if err != nil { @@ -88,10 +92,9 @@ var ( return err } - log.Debugf("removing Wiretrustee interface %s", config.WgIface) - err = iface.Close() + err = engine.Stop() if err != nil { - log.Errorf("failed closing Wiretrustee interface %s %v", config.WgIface, err) + log.Errorf("failed stopping engine %v", err) return err } diff --git a/client/internal/engine.go b/client/internal/engine.go index 3684de7fa25..d166836b63f 100644 --- a/client/internal/engine.go +++ b/client/internal/engine.go @@ -1,6 +1,7 @@ package internal import ( + "context" "fmt" "github.com/cenkalti/backoff/v4" ice "github.com/pion/ice/v2" @@ -54,6 +55,8 @@ type Engine struct { STUNs []*ice.URL // TURNs is a list of STUN servers used by ICE TURNs []*ice.URL + + cancel context.CancelFunc } // Peer is an instance of the Connection Peer @@ -63,7 +66,7 @@ type Peer struct { } // NewEngine creates a new Connection Engine -func NewEngine(signalClient *signal.Client, mgmClient *mgm.Client, config *EngineConfig) *Engine { +func NewEngine(signalClient *signal.Client, mgmClient *mgm.Client, config *EngineConfig, cancel context.CancelFunc) *Engine { return &Engine{ signal: signalClient, mgmClient: mgmClient, @@ -73,7 +76,19 @@ func NewEngine(signalClient *signal.Client, mgmClient *mgm.Client, config *Engin config: config, STUNs: []*ice.URL{}, TURNs: []*ice.URL{}, + cancel: cancel, + } +} + +func (e *Engine) Stop() error { + log.Debugf("removing Wiretrustee interface %s", e.config.WgIface) + err := iface.Close() + if err != nil { + log.Errorf("failed closing Wiretrustee interface %s %v", e.config.WgIface, err) + return err } + + return nil } // Start creates a new Wireguard tunnel interface and listens to events from Signal and Management services @@ -262,36 +277,42 @@ func signalAuth(uFrag string, pwd string, myKey wgtypes.Key, remoteKey wgtypes.K // receiveManagementEvents connects to the Management Service event stream to receive updates from the management service // E.g. when a new peer has been registered and we are allowed to connect to it. func (e *Engine) receiveManagementEvents() { - - log.Debugf("connecting to Management Service updates stream") - - e.mgmClient.Sync(func(update *mgmProto.SyncResponse) error { - e.syncMsgMux.Lock() - defer e.syncMsgMux.Unlock() - - if update.GetWiretrusteeConfig() != nil { - err := e.updateTURNs(update.GetWiretrusteeConfig().GetTurns()) - if err != nil { - return err + go func() { + err := e.mgmClient.Sync(func(update *mgmProto.SyncResponse) error { + e.syncMsgMux.Lock() + defer e.syncMsgMux.Unlock() + + if update.GetWiretrusteeConfig() != nil { + err := e.updateTURNs(update.GetWiretrusteeConfig().GetTurns()) + if err != nil { + return err + } + + err = e.updateSTUNs(update.GetWiretrusteeConfig().GetStuns()) + if err != nil { + return err + } + + //todo update signal } - err = e.updateSTUNs(update.GetWiretrusteeConfig().GetStuns()) - if err != nil { - return err + if update.GetRemotePeers() != nil || update.GetRemotePeersIsEmpty() { + // empty arrays are serialized by protobuf to null, but for our case empty array is a valid state. + err := e.updatePeers(update.GetRemotePeers()) + if err != nil { + return err + } } - //todo update signal - } - - err := e.updatePeers(update.GetRemotePeers()) + return nil + }) if err != nil { - return err + e.cancel() + return } - - return nil - }) - - log.Infof("connected to Management Service updates stream") + log.Infof("connected to Management Service updates stream") + }() + log.Debugf("connecting to Management Service updates stream") } func (e *Engine) updateSTUNs(stuns []*mgmProto.HostConfig) error { @@ -333,10 +354,6 @@ func (e *Engine) updateTURNs(turns []*mgmProto.ProtectedHostConfig) error { } func (e *Engine) updatePeers(remotePeers []*mgmProto.RemotePeerConfig) error { - if len(remotePeers) == 0 { - return nil - } - log.Debugf("got peers update from Management Service, updating") remotePeerMap := make(map[string]struct{}) for _, peer := range remotePeers { diff --git a/management/client/client.go b/management/client/client.go index 747f4d081c2..d011a6af677 100644 --- a/management/client/client.go +++ b/management/client/client.go @@ -64,54 +64,61 @@ func (c *Client) Close() error { return c.conn.Close() } +//defaultBackoff is a basic backoff mechanism for general issues +func defaultBackoff() backoff.BackOff { + return &backoff.ExponentialBackOff{ + InitialInterval: 800 * time.Millisecond, + RandomizationFactor: backoff.DefaultRandomizationFactor, + Multiplier: backoff.DefaultMultiplier, + MaxInterval: 30 * time.Second, + MaxElapsedTime: 24 * 3 * time.Hour, //stop after 3 days trying + Stop: backoff.Stop, + Clock: backoff.SystemClock, + } +} + // Sync wraps the real client's Sync endpoint call and takes care of retries and encryption/decryption of messages -// Non blocking request (executed in go routine). The result will be sent via msgHandler callback function -func (c *Client) Sync(msgHandler func(msg *proto.SyncResponse) error) { - - go func() { - - var backOff = &backoff.ExponentialBackOff{ - InitialInterval: 800 * time.Millisecond, - RandomizationFactor: backoff.DefaultRandomizationFactor, - Multiplier: backoff.DefaultMultiplier, - MaxInterval: 3 * time.Second, - MaxElapsedTime: time.Duration(0), //never stop retrying - Stop: backoff.Stop, - Clock: backoff.SystemClock, +// Blocking request. The result will be sent via msgHandler callback function +func (c *Client) Sync(msgHandler func(msg *proto.SyncResponse) error) error { + + var backOff = defaultBackoff() + + operation := func() error { + + // todo we already have it since we did the Login, maybe cache it locally? + serverPubKey, err := c.GetServerPublicKey() + if err != nil { + log.Errorf("failed getting Management Service public key: %s", err) + return err } - operation := func() error { - - // todo we already have it since we did the Login, maybe cache it locally? - serverPubKey, err := c.GetServerPublicKey() - if err != nil { - log.Errorf("failed getting Management Service public key: %s", err) - return err - } - - stream, err := c.connectToStream(*serverPubKey) - if err != nil { - log.Errorf("failed to open Management Service stream: %s", err) - return err - } - - log.Infof("connected to the Management Service Stream") - - // blocking until error - err = c.receiveEvents(stream, *serverPubKey, msgHandler) - if err != nil { - return err - } - backOff.Reset() - return nil + stream, err := c.connectToStream(*serverPubKey) + if err != nil { + log.Errorf("failed to open Management Service stream: %s", err) + return err } - err := backoff.Retry(operation, backOff) + log.Infof("connected to the Management Service Stream") + + // blocking until error + err = c.receiveEvents(stream, *serverPubKey, msgHandler) if err != nil { - log.Errorf("failed communicating with Management Service %s ", err) - return + /*if errStatus, ok := status.FromError(err); ok && errStatus.Code() == codes.PermissionDenied { + //todo handle differently?? + }*/ + return err } - }() + backOff.Reset() + return nil + } + + err := backoff.Retry(operation, backOff) + if err != nil { + log.Errorf("exiting Management Service connection retry loop due to unrecoverable error %s ", err) + return err + } + + return nil } func (c *Client) connectToStream(serverPubKey wgtypes.Key) (proto.ManagementService_SyncClient, error) { @@ -138,7 +145,7 @@ func (c *Client) receiveEvents(stream proto.ManagementService_SyncClient, server return err } if err != nil { - log.Errorf("disconnected from Management Service syn stream: %v", err) + log.Errorf("disconnected from Management Service sync stream: %v", err) return err } diff --git a/management/client/client_test.go b/management/client/client_test.go index dd8897b8ffe..f6efbac627a 100644 --- a/management/client/client_test.go +++ b/management/client/client_test.go @@ -60,8 +60,8 @@ func startManagement(config *mgmt.Config, t *testing.T) (*grpc.Server, net.Liste t.Fatal(err) } - accountManager := mgmt.NewManager(store) peersUpdateManager := mgmt.NewPeersUpdateManager() + accountManager := mgmt.NewManager(store, peersUpdateManager) turnManager := mgmt.NewTimeBasedAuthSecretsManager(peersUpdateManager, config.TURNConfig) mgmtServer, err := mgmt.NewServer(config, accountManager, peersUpdateManager, turnManager) if err != nil { @@ -146,10 +146,15 @@ func TestClient_Sync(t *testing.T) { ch := make(chan *mgmtProto.SyncResponse, 1) - tested.Sync(func(msg *mgmtProto.SyncResponse) error { - ch <- msg - return nil - }) + go func() { + err = tested.Sync(func(msg *mgmtProto.SyncResponse) error { + ch <- msg + return nil + }) + if err != nil { + return + } + }() select { case resp := <-ch: @@ -162,6 +167,9 @@ func TestClient_Sync(t *testing.T) { if len(resp.GetRemotePeers()) != 1 { t.Errorf("expecting RemotePeers size %d got %d", 1, len(resp.GetRemotePeers())) } + if resp.GetRemotePeersIsEmpty() == true { + t.Error("expecting RemotePeers property to be false, got true") + } if resp.GetRemotePeers()[0].GetWgPubKey() != remoteKey.PublicKey().String() { t.Errorf("expecting RemotePeer public key %s got %s", remoteKey.PublicKey().String(), resp.GetRemotePeers()[0].GetWgPubKey()) } diff --git a/management/cmd/management.go b/management/cmd/management.go index d3455567aaa..a1e26fd448c 100644 --- a/management/cmd/management.go +++ b/management/cmd/management.go @@ -64,7 +64,8 @@ var ( if err != nil { log.Fatalf("failed creating a store: %s: %v", config.Datadir, err) } - accountManager := server.NewManager(store) + peersUpdateManager := server.NewPeersUpdateManager() + accountManager := server.NewManager(store, peersUpdateManager) var opts []grpc.ServerOption @@ -81,7 +82,6 @@ var ( opts = append(opts, grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp)) grpcServer := grpc.NewServer(opts...) - peersUpdateManager := server.NewPeersUpdateManager() turnManager := server.NewTimeBasedAuthSecretsManager(peersUpdateManager, config.TURNConfig) server, err := server.NewServer(config, accountManager, peersUpdateManager, turnManager) if err != nil { diff --git a/management/proto/management.pb.go b/management/proto/management.pb.go index e4fc1b42731..eb006dd6b30 100644 --- a/management/proto/management.pb.go +++ b/management/proto/management.pb.go @@ -181,6 +181,8 @@ type SyncResponse struct { WiretrusteeConfig *WiretrusteeConfig `protobuf:"bytes,1,opt,name=wiretrusteeConfig,proto3" json:"wiretrusteeConfig,omitempty"` PeerConfig *PeerConfig `protobuf:"bytes,2,opt,name=peerConfig,proto3" json:"peerConfig,omitempty"` RemotePeers []*RemotePeerConfig `protobuf:"bytes,3,rep,name=remotePeers,proto3" json:"remotePeers,omitempty"` + // Indicates whether remotePeers array is empty or not to bypass protobuf null and empty array equality. + RemotePeersIsEmpty bool `protobuf:"varint,4,opt,name=remotePeersIsEmpty,proto3" json:"remotePeersIsEmpty,omitempty"` } func (x *SyncResponse) Reset() { @@ -236,6 +238,13 @@ func (x *SyncResponse) GetRemotePeers() []*RemotePeerConfig { return nil } +func (x *SyncResponse) GetRemotePeersIsEmpty() bool { + if x != nil { + return x.RemotePeersIsEmpty + } + return false +} + type LoginRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -860,7 +869,7 @@ var file_management_proto_rawDesc = []byte{ 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x77, 0x67, 0x50, 0x75, 0x62, 0x4b, 0x65, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x62, 0x6f, 0x64, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x62, 0x6f, 0x64, 0x79, 0x22, 0x0d, 0x0a, 0x0b, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x22, 0xd3, 0x01, 0x0a, 0x0c, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x73, 0x74, 0x22, 0x83, 0x02, 0x0a, 0x0c, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x4b, 0x0a, 0x11, 0x77, 0x69, 0x72, 0x65, 0x74, 0x72, 0x75, 0x73, 0x74, 0x65, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x2e, 0x57, 0x69, 0x72, 0x65, @@ -873,7 +882,10 @@ var file_management_proto_rawDesc = []byte{ 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x2e, 0x52, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x0b, 0x72, 0x65, 0x6d, - 0x6f, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x73, 0x22, 0x5a, 0x0a, 0x0c, 0x4c, 0x6f, 0x67, 0x69, + 0x6f, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x73, 0x12, 0x2e, 0x0a, 0x12, 0x72, 0x65, 0x6d, 0x6f, + 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x73, 0x49, 0x73, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x18, 0x04, + 0x20, 0x01, 0x28, 0x08, 0x52, 0x12, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, + 0x73, 0x49, 0x73, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x5a, 0x0a, 0x0c, 0x4c, 0x6f, 0x67, 0x69, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x65, 0x74, 0x75, 0x70, 0x4b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x65, 0x74, 0x75, 0x70, 0x4b, 0x65, 0x79, 0x12, 0x2e, 0x0a, 0x04, 0x6d, 0x65, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, diff --git a/management/proto/management.proto b/management/proto/management.proto index 703e2e5677a..cc80c9580d4 100644 --- a/management/proto/management.proto +++ b/management/proto/management.proto @@ -42,6 +42,8 @@ message SyncResponse { PeerConfig peerConfig = 2; repeated RemotePeerConfig remotePeers = 3; + // Indicates whether remotePeers array is empty or not to bypass protobuf null and empty array equality. + bool remotePeersIsEmpty = 4; } message LoginRequest { diff --git a/management/server/account.go b/management/server/account.go index 234bc831307..6714e6e1a9d 100644 --- a/management/server/account.go +++ b/management/server/account.go @@ -13,7 +13,8 @@ import ( type AccountManager struct { Store Store // mutex to synchronise account operations (e.g. generating Peer IP address inside the Network) - mux sync.Mutex + mux sync.Mutex + peersUpdateManager *PeersUpdateManager } // Account represents a unique account of the system @@ -25,19 +26,20 @@ type Account struct { } // NewManager creates a new AccountManager with a provided Store -func NewManager(store Store) *AccountManager { +func NewManager(store Store, peersUpdateManager *PeersUpdateManager) *AccountManager { return &AccountManager{ - Store: store, - mux: sync.Mutex{}, + Store: store, + mux: sync.Mutex{}, + peersUpdateManager: peersUpdateManager, } } //AddSetupKey generates a new setup key with a given name and type, and adds it to the specified account -func (manager *AccountManager) AddSetupKey(accountId string, keyName string, keyType SetupKeyType, expiresIn time.Duration) (*SetupKey, error) { - manager.mux.Lock() - defer manager.mux.Unlock() +func (am *AccountManager) AddSetupKey(accountId string, keyName string, keyType SetupKeyType, expiresIn time.Duration) (*SetupKey, error) { + am.mux.Lock() + defer am.mux.Unlock() - account, err := manager.Store.GetAccount(accountId) + account, err := am.Store.GetAccount(accountId) if err != nil { return nil, status.Errorf(codes.NotFound, "account not found") } @@ -45,7 +47,7 @@ func (manager *AccountManager) AddSetupKey(accountId string, keyName string, key setupKey := GenerateSetupKey(keyName, keyType, expiresIn) account.SetupKeys[setupKey.Key] = setupKey - err = manager.Store.SaveAccount(account) + err = am.Store.SaveAccount(account) if err != nil { return nil, status.Errorf(codes.Internal, "failed adding account key") } @@ -54,11 +56,11 @@ func (manager *AccountManager) AddSetupKey(accountId string, keyName string, key } //RevokeSetupKey marks SetupKey as revoked - becomes not valid anymore -func (manager *AccountManager) RevokeSetupKey(accountId string, keyId string) (*SetupKey, error) { - manager.mux.Lock() - defer manager.mux.Unlock() +func (am *AccountManager) RevokeSetupKey(accountId string, keyId string) (*SetupKey, error) { + am.mux.Lock() + defer am.mux.Unlock() - account, err := manager.Store.GetAccount(accountId) + account, err := am.Store.GetAccount(accountId) if err != nil { return nil, status.Errorf(codes.NotFound, "account not found") } @@ -71,7 +73,7 @@ func (manager *AccountManager) RevokeSetupKey(accountId string, keyId string) (* keyCopy := setupKey.Copy() keyCopy.Revoked = true account.SetupKeys[keyCopy.Key] = keyCopy - err = manager.Store.SaveAccount(account) + err = am.Store.SaveAccount(account) if err != nil { return nil, status.Errorf(codes.Internal, "failed adding account key") } @@ -80,11 +82,11 @@ func (manager *AccountManager) RevokeSetupKey(accountId string, keyId string) (* } //RenameSetupKey renames existing setup key of the specified account. -func (manager *AccountManager) RenameSetupKey(accountId string, keyId string, newName string) (*SetupKey, error) { - manager.mux.Lock() - defer manager.mux.Unlock() +func (am *AccountManager) RenameSetupKey(accountId string, keyId string, newName string) (*SetupKey, error) { + am.mux.Lock() + defer am.mux.Unlock() - account, err := manager.Store.GetAccount(accountId) + account, err := am.Store.GetAccount(accountId) if err != nil { return nil, status.Errorf(codes.NotFound, "account not found") } @@ -97,7 +99,7 @@ func (manager *AccountManager) RenameSetupKey(accountId string, keyId string, ne keyCopy := setupKey.Copy() keyCopy.Name = newName account.SetupKeys[keyCopy.Key] = keyCopy - err = manager.Store.SaveAccount(account) + err = am.Store.SaveAccount(account) if err != nil { return nil, status.Errorf(codes.Internal, "failed adding account key") } @@ -106,11 +108,11 @@ func (manager *AccountManager) RenameSetupKey(accountId string, keyId string, ne } //GetAccount returns an existing account or error (NotFound) if doesn't exist -func (manager *AccountManager) GetAccount(accountId string) (*Account, error) { - manager.mux.Lock() - defer manager.mux.Unlock() +func (am *AccountManager) GetAccount(accountId string) (*Account, error) { + am.mux.Lock() + defer am.mux.Unlock() - account, err := manager.Store.GetAccount(accountId) + account, err := am.Store.GetAccount(accountId) if err != nil { return nil, status.Errorf(codes.NotFound, "account not found") } @@ -119,21 +121,21 @@ func (manager *AccountManager) GetAccount(accountId string) (*Account, error) { } // GetOrCreateAccount returns an existing account or creates a new one if doesn't exist -func (manager *AccountManager) GetOrCreateAccount(accountId string) (*Account, error) { - manager.mux.Lock() - defer manager.mux.Unlock() +func (am *AccountManager) GetOrCreateAccount(accountId string) (*Account, error) { + am.mux.Lock() + defer am.mux.Unlock() - _, err := manager.Store.GetAccount(accountId) + _, err := am.Store.GetAccount(accountId) if err != nil { if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound { - return manager.createAccount(accountId) + return am.createAccount(accountId) } else { // other error return nil, err } } - account, err := manager.Store.GetAccount(accountId) + account, err := am.Store.GetAccount(accountId) if err != nil { return nil, status.Errorf(codes.Internal, "failed retrieving account") } @@ -142,12 +144,12 @@ func (manager *AccountManager) GetOrCreateAccount(accountId string) (*Account, e } //AccountExists checks whether account exists (returns true) or not (returns false) -func (manager *AccountManager) AccountExists(accountId string) (*bool, error) { - manager.mux.Lock() - defer manager.mux.Unlock() +func (am *AccountManager) AccountExists(accountId string) (*bool, error) { + am.mux.Lock() + defer am.mux.Unlock() var res bool - _, err := manager.Store.GetAccount(accountId) + _, err := am.Store.GetAccount(accountId) if err != nil { if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound { res = false @@ -162,19 +164,19 @@ func (manager *AccountManager) AccountExists(accountId string) (*bool, error) { } // AddAccount generates a new Account with a provided accountId and saves to the Store -func (manager *AccountManager) AddAccount(accountId string) (*Account, error) { +func (am *AccountManager) AddAccount(accountId string) (*Account, error) { - manager.mux.Lock() - defer manager.mux.Unlock() + am.mux.Lock() + defer am.mux.Unlock() - return manager.createAccount(accountId) + return am.createAccount(accountId) } -func (manager *AccountManager) createAccount(accountId string) (*Account, error) { +func (am *AccountManager) createAccount(accountId string) (*Account, error) { account, _ := newAccountWithId(accountId) - err := manager.Store.SaveAccount(account) + err := am.Store.SaveAccount(account) if err != nil { return nil, status.Errorf(codes.Internal, "failed creating account") } @@ -188,17 +190,19 @@ func newAccountWithId(accountId string) (*Account, *SetupKey) { log.Debugf("creating new account") setupKeys := make(map[string]*SetupKey) - setupKey := GenerateDefaultSetupKey() - setupKeys[setupKey.Key] = setupKey + defaultKey := GenerateDefaultSetupKey() + oneOffKey := GenerateSetupKey("One-off key", SetupKeyOneOff, DefaultSetupKeyDuration) + setupKeys[defaultKey.Key] = defaultKey + setupKeys[oneOffKey.Key] = oneOffKey network := &Network{ Id: uuid.New().String(), Net: net.IPNet{IP: net.ParseIP("100.64.0.0"), Mask: net.IPMask{255, 192, 0, 0}}, Dns: ""} peers := make(map[string]*Peer) - log.Debugf("created new account %s with setup key %s", accountId, setupKey.Key) + log.Debugf("created new account %s with setup key %s", accountId, defaultKey.Key) - return &Account{Id: accountId, SetupKeys: setupKeys, Network: network, Peers: peers}, setupKey + return &Account{Id: accountId, SetupKeys: setupKeys, Network: network, Peers: peers}, defaultKey } // newAccount creates a new Account with a default SetupKey (doesn't store in a Store) diff --git a/management/server/account_test.go b/management/server/account_test.go index 2a2f4d4f6fc..5eb7558484e 100644 --- a/management/server/account_test.go +++ b/management/server/account_test.go @@ -17,7 +17,7 @@ func TestAccountManager_AddAccount(t *testing.T) { expectedId := "test_account" expectedPeersSize := 0 - expectedSetupKeysSize := 1 + expectedSetupKeysSize := 2 expectedNetwork := net.IPNet{ IP: net.IP{100, 64, 0, 0}, Mask: net.IPMask{255, 192, 0, 0}, @@ -201,7 +201,7 @@ func createManager(t *testing.T) (*AccountManager, error) { if err != nil { return nil, err } - return NewManager(store), nil + return NewManager(store, NewPeersUpdateManager()), nil } func createStore(t *testing.T) (Store, error) { diff --git a/management/server/file_store.go b/management/server/file_store.go index 1f3442695c5..2b693c82bda 100644 --- a/management/server/file_store.go +++ b/management/server/file_store.go @@ -190,6 +190,22 @@ func (s *FileStore) GetAccountBySetupKey(setupKey string) (*Account, error) { return account, nil } +func (s *FileStore) GetAccountPeers(accountId string) ([]*Peer, error) { + s.mux.Lock() + defer s.mux.Unlock() + + account, err := s.GetAccount(accountId) + if err != nil { + return nil, err + } + + var peers []*Peer + for _, peer := range account.Peers { + peers = append(peers, peer) + } + + return peers, nil +} func (s *FileStore) GetAccount(accountId string) (*Account, error) { diff --git a/management/server/grpcserver.go b/management/server/grpcserver.go index a29fde44016..9cd1b2c6c99 100644 --- a/management/server/grpcserver.go +++ b/management/server/grpcserver.go @@ -118,6 +118,7 @@ func (s *Server) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_S if err != nil { return status.Errorf(codes.Internal, "failed sending update message") } + log.Debugf("sent an update to peer %s", peerKey.String()) // condition when client <-> server connection has been terminated case <-srv.Context().Done(): // happens when connection drops, e.g. client disconnects @@ -274,7 +275,7 @@ func toWiretrusteeConfig(config *Config, turnCredentials *TURNCredentials) *prot password = turnCredentials.Password } else { username = turn.Username - password = string(turn.Password) + password = turn.Password } turns = append(turns, &proto.ProtectedHostConfig{ HostConfig: &proto.HostConfig{ @@ -302,13 +303,9 @@ func toPeerConfig(peer *Peer) *proto.PeerConfig { } } -func toSyncResponse(config *Config, peer *Peer, peers []*Peer, turnCredentials *TURNCredentials) *proto.SyncResponse { - - wtConfig := toWiretrusteeConfig(config, turnCredentials) - - pConfig := toPeerConfig(peer) +func toRemotePeerConfig(peers []*Peer) []*proto.RemotePeerConfig { - remotePeers := make([]*proto.RemotePeerConfig, 0, len(peers)) + remotePeers := []*proto.RemotePeerConfig{} for _, rPeer := range peers { remotePeers = append(remotePeers, &proto.RemotePeerConfig{ WgPubKey: rPeer.Key, @@ -316,10 +313,23 @@ func toSyncResponse(config *Config, peer *Peer, peers []*Peer, turnCredentials * }) } + return remotePeers + +} + +func toSyncResponse(config *Config, peer *Peer, peers []*Peer, turnCredentials *TURNCredentials) *proto.SyncResponse { + + wtConfig := toWiretrusteeConfig(config, turnCredentials) + + pConfig := toPeerConfig(peer) + + remotePeers := toRemotePeerConfig(peers) + return &proto.SyncResponse{ - WiretrusteeConfig: wtConfig, - PeerConfig: pConfig, - RemotePeers: remotePeers, + WiretrusteeConfig: wtConfig, + PeerConfig: pConfig, + RemotePeers: remotePeers, + RemotePeersIsEmpty: len(remotePeers) == 0, } } diff --git a/management/server/management_test.go b/management/server/management_test.go index 8e7fae5c384..eb86dd328d0 100644 --- a/management/server/management_test.go +++ b/management/server/management_test.go @@ -492,8 +492,8 @@ func startServer(config *server.Config) (*grpc.Server, net.Listener) { if err != nil { log.Fatalf("failed creating a store: %s: %v", config.Datadir, err) } - accountManager := server.NewManager(store) peersUpdateManager := server.NewPeersUpdateManager() + accountManager := server.NewManager(store, peersUpdateManager) turnManager := server.NewTimeBasedAuthSecretsManager(peersUpdateManager, config.TURNConfig) mgmtServer, err := server.NewServer(config, accountManager, peersUpdateManager, turnManager) Expect(err).NotTo(HaveOccurred()) diff --git a/management/server/peer.go b/management/server/peer.go index 939e45fe954..f7750c165a3 100644 --- a/management/server/peer.go +++ b/management/server/peer.go @@ -1,6 +1,7 @@ package server import ( + "github.com/wiretrustee/wiretrustee/management/proto" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "net" @@ -55,11 +56,11 @@ func (p *Peer) Copy() *Peer { } //GetPeer returns a peer from a Store -func (manager *AccountManager) GetPeer(peerKey string) (*Peer, error) { - manager.mux.Lock() - defer manager.mux.Unlock() +func (am *AccountManager) GetPeer(peerKey string) (*Peer, error) { + am.mux.Lock() + defer am.mux.Unlock() - peer, err := manager.Store.GetPeer(peerKey) + peer, err := am.Store.GetPeer(peerKey) if err != nil { return nil, err } @@ -68,16 +69,16 @@ func (manager *AccountManager) GetPeer(peerKey string) (*Peer, error) { } //MarkPeerConnected marks peer as connected (true) or disconnected (false) -func (manager *AccountManager) MarkPeerConnected(peerKey string, connected bool) error { - manager.mux.Lock() - defer manager.mux.Unlock() +func (am *AccountManager) MarkPeerConnected(peerKey string, connected bool) error { + am.mux.Lock() + defer am.mux.Unlock() - peer, err := manager.Store.GetPeer(peerKey) + peer, err := am.Store.GetPeer(peerKey) if err != nil { return err } - account, err := manager.Store.GetPeerAccount(peerKey) + account, err := am.Store.GetPeerAccount(peerKey) if err != nil { return err } @@ -85,7 +86,7 @@ func (manager *AccountManager) MarkPeerConnected(peerKey string, connected bool) peerCopy := peer.Copy() peerCopy.Status.LastSeen = time.Now() peerCopy.Status.Connected = connected - err = manager.Store.SavePeer(account.Id, peerCopy) + err = am.Store.SavePeer(account.Id, peerCopy) if err != nil { return err } @@ -93,18 +94,18 @@ func (manager *AccountManager) MarkPeerConnected(peerKey string, connected bool) } //RenamePeer changes peer's name -func (manager *AccountManager) RenamePeer(accountId string, peerKey string, newName string) (*Peer, error) { - manager.mux.Lock() - defer manager.mux.Unlock() +func (am *AccountManager) RenamePeer(accountId string, peerKey string, newName string) (*Peer, error) { + am.mux.Lock() + defer am.mux.Unlock() - peer, err := manager.Store.GetPeer(peerKey) + peer, err := am.Store.GetPeer(peerKey) if err != nil { return nil, err } peerCopy := peer.Copy() peerCopy.Name = newName - err = manager.Store.SavePeer(accountId, peerCopy) + err = am.Store.SavePeer(accountId, peerCopy) if err != nil { return nil, err } @@ -113,18 +114,60 @@ func (manager *AccountManager) RenamePeer(accountId string, peerKey string, newN } //DeletePeer removes peer from the account by it's IP -func (manager *AccountManager) DeletePeer(accountId string, peerKey string) (*Peer, error) { - manager.mux.Lock() - defer manager.mux.Unlock() - return manager.Store.DeletePeer(accountId, peerKey) +func (am *AccountManager) DeletePeer(accountId string, peerKey string) (*Peer, error) { + am.mux.Lock() + defer am.mux.Unlock() + + peer, err := am.Store.DeletePeer(accountId, peerKey) + if err != nil { + return nil, err + } + + err = am.peersUpdateManager.SendUpdate(peerKey, + &UpdateMessage{ + Update: &proto.SyncResponse{ + RemotePeers: []*proto.RemotePeerConfig{}, + RemotePeersIsEmpty: true, + }}) + if err != nil { + return nil, err + } + + //notify other peers of the change + peers, err := am.Store.GetAccountPeers(accountId) + if err != nil { + return nil, err + } + + for _, p := range peers { + peersToSend := []*Peer{} + for _, remote := range peers { + if p.Key != remote.Key { + peersToSend = append(peersToSend, remote) + } + } + update := toRemotePeerConfig(peersToSend) + err = am.peersUpdateManager.SendUpdate(p.Key, + &UpdateMessage{ + Update: &proto.SyncResponse{ + RemotePeers: update, + RemotePeersIsEmpty: len(update) == 0, + }}) + if err != nil { + return nil, err + } + } + + am.peersUpdateManager.CloseChannel(peerKey) + return peer, nil } //GetPeerByIP returns peer by it's IP -func (manager *AccountManager) GetPeerByIP(accountId string, peerIP string) (*Peer, error) { - manager.mux.Lock() - defer manager.mux.Unlock() +func (am *AccountManager) GetPeerByIP(accountId string, peerIP string) (*Peer, error) { + am.mux.Lock() + defer am.mux.Unlock() - account, err := manager.Store.GetAccount(accountId) + account, err := am.Store.GetAccount(accountId) if err != nil { return nil, status.Errorf(codes.NotFound, "account not found") } @@ -140,11 +183,11 @@ func (manager *AccountManager) GetPeerByIP(accountId string, peerIP string) (*Pe // GetPeersForAPeer returns a list of peers available for a given peer (key) // Effectively all the peers of the original peer's account except for the peer itself -func (manager *AccountManager) GetPeersForAPeer(peerKey string) ([]*Peer, error) { - manager.mux.Lock() - defer manager.mux.Unlock() +func (am *AccountManager) GetPeersForAPeer(peerKey string) ([]*Peer, error) { + am.mux.Lock() + defer am.mux.Unlock() - account, err := manager.Store.GetPeerAccount(peerKey) + account, err := am.Store.GetPeerAccount(peerKey) if err != nil { return nil, status.Errorf(codes.Internal, "Invalid peer key %s", peerKey) } @@ -165,9 +208,9 @@ func (manager *AccountManager) GetPeersForAPeer(peerKey string) ([]*Peer, error) // Each new Peer will be assigned a new next net.IP from the Account.Network and Account.Network.LastIP will be updated (IP's are not reused). // If the specified setupKey is empty then a new Account will be created //todo remove this part // The peer property is just a placeholder for the Peer properties to pass further -func (manager *AccountManager) AddPeer(setupKey string, peer Peer) (*Peer, error) { - manager.mux.Lock() - defer manager.mux.Unlock() +func (am *AccountManager) AddPeer(setupKey string, peer Peer) (*Peer, error) { + am.mux.Lock() + defer am.mux.Unlock() upperKey := strings.ToUpper(setupKey) @@ -178,7 +221,7 @@ func (manager *AccountManager) AddPeer(setupKey string, peer Peer) (*Peer, error // Empty setup key, create a new account for it. account, sk = newAccount() } else { - account, err = manager.Store.GetAccountBySetupKey(upperKey) + account, err = am.Store.GetAccountBySetupKey(upperKey) if err != nil { return nil, status.Errorf(codes.NotFound, "unknown setupKey %s", upperKey) } @@ -213,7 +256,7 @@ func (manager *AccountManager) AddPeer(setupKey string, peer Peer) (*Peer, error account.Peers[newPeer.Key] = newPeer account.SetupKeys[sk.Key] = sk.IncrementUsage() - err = manager.Store.SaveAccount(account) + err = am.Store.SaveAccount(account) if err != nil { return nil, status.Errorf(codes.Internal, "failed adding peer") } diff --git a/management/server/store.go b/management/server/store.go index 84cf75c82c3..e292a0f4395 100644 --- a/management/server/store.go +++ b/management/server/store.go @@ -5,6 +5,7 @@ type Store interface { DeletePeer(accountId string, peerKey string) (*Peer, error) SavePeer(accountId string, peer *Peer) error GetAccount(accountId string) (*Account, error) + GetAccountPeers(accountId string) ([]*Peer, error) GetPeerAccount(peerKey string) (*Account, error) GetAccountBySetupKey(setupKey string) (*Account, error) SaveAccount(account *Account) error diff --git a/signal/client/client.go b/signal/client/client.go index 7537f6e38b3..a603be3a81b 100644 --- a/signal/client/client.go +++ b/signal/client/client.go @@ -75,6 +75,19 @@ func NewClient(ctx context.Context, addr string, key wgtypes.Key, tlsEnabled boo }, nil } +//defaultBackoff is a basic backoff mechanism for general issues +func defaultBackoff() backoff.BackOff { + return &backoff.ExponentialBackOff{ + InitialInterval: 800 * time.Millisecond, + RandomizationFactor: backoff.DefaultRandomizationFactor, + Multiplier: backoff.DefaultMultiplier, + MaxInterval: 30 * time.Second, + MaxElapsedTime: 24 * 3 * time.Hour, //stop after 3 days trying + Stop: backoff.Stop, + Clock: backoff.SystemClock, + } +} + // Receive Connects to the Signal Exchange message stream and starts receiving messages. // The messages will be handled by msgHandler function provided. // This function runs a goroutine underneath and reconnects to the Signal Exchange if errors occur (e.g. Exchange restart) @@ -83,15 +96,7 @@ func (c *Client) Receive(msgHandler func(msg *proto.Message) error) { c.connWg.Add(1) go func() { - var backOff = &backoff.ExponentialBackOff{ - InitialInterval: backoff.DefaultInitialInterval, - RandomizationFactor: backoff.DefaultRandomizationFactor, - Multiplier: backoff.DefaultMultiplier, - MaxInterval: 3 * time.Second, - MaxElapsedTime: time.Duration(0), //never stop - Stop: backoff.Stop, - Clock: backoff.SystemClock, - } + var backOff = defaultBackoff() operation := func() error { err := c.connect(c.key.PublicKey().String(), msgHandler)