From 6c91e36a9601f57e49392bfe44f45fd355109244 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Thu, 7 Nov 2024 15:56:45 +0100 Subject: [PATCH 01/16] Remove loop after route calculation --- client/internal/peer/status.go | 80 +++++++++++++++----------- client/internal/routemanager/client.go | 13 +++-- 2 files changed, 53 insertions(+), 40 deletions(-) diff --git a/client/internal/peer/status.go b/client/internal/peer/status.go index a28992fac13..a5443f63c50 100644 --- a/client/internal/peer/status.go +++ b/client/internal/peer/status.go @@ -237,10 +237,6 @@ func (d *Status) UpdatePeerState(receivedState State) error { peerState.IP = receivedState.IP } - if receivedState.GetRoutes() != nil { - peerState.SetRoutes(receivedState.GetRoutes()) - } - skipNotification := shouldSkipNotify(receivedState.ConnStatus, peerState) if receivedState.ConnStatus != peerState.ConnStatus { @@ -261,12 +257,33 @@ func (d *Status) UpdatePeerState(receivedState State) error { return nil } - ch, found := d.changeNotify[receivedState.PubKey] - if found && ch != nil { - close(ch) - d.changeNotify[receivedState.PubKey] = nil + d.notifyPeerStateChangeListeners(receivedState.PubKey) + d.notifyPeerListChanged() + return nil +} + +// UpdatePeerRouteState updates peer's route state. It operates with routes only, ignore other fields +func (d *Status) UpdatePeerRouteState(receivedState State) error { + d.mux.Lock() + defer d.mux.Unlock() + + peerState, ok := d.peers[receivedState.PubKey] + if !ok { + return errors.New("peer doesn't exist") + } + + if receivedState.GetRoutes() != nil { + peerState.SetRoutes(receivedState.GetRoutes()) } + skipNotification := shouldSkipNotify(receivedState.ConnStatus, peerState) + + d.peers[receivedState.PubKey] = peerState + + if skipNotification { + return nil + } + // todo: consider to make sense of this notification or not d.notifyPeerListChanged() return nil } @@ -301,12 +318,6 @@ func (d *Status) UpdatePeerICEState(receivedState State) error { return nil } - ch, found := d.changeNotify[receivedState.PubKey] - if found && ch != nil { - close(ch) - d.changeNotify[receivedState.PubKey] = nil - } - d.notifyPeerListChanged() return nil } @@ -334,12 +345,7 @@ func (d *Status) UpdatePeerRelayedState(receivedState State) error { return nil } - ch, found := d.changeNotify[receivedState.PubKey] - if found && ch != nil { - close(ch) - d.changeNotify[receivedState.PubKey] = nil - } - + d.notifyPeerStateChangeListeners(receivedState.PubKey) d.notifyPeerListChanged() return nil } @@ -366,12 +372,7 @@ func (d *Status) UpdatePeerRelayedStateToDisconnected(receivedState State) error return nil } - ch, found := d.changeNotify[receivedState.PubKey] - if found && ch != nil { - close(ch) - d.changeNotify[receivedState.PubKey] = nil - } - + d.notifyPeerStateChangeListeners(receivedState.PubKey) d.notifyPeerListChanged() return nil } @@ -401,12 +402,7 @@ func (d *Status) UpdatePeerICEStateToDisconnected(receivedState State) error { return nil } - ch, found := d.changeNotify[receivedState.PubKey] - if found && ch != nil { - close(ch) - d.changeNotify[receivedState.PubKey] = nil - } - + d.notifyPeerStateChangeListeners(receivedState.PubKey) d.notifyPeerListChanged() return nil } @@ -477,11 +473,14 @@ func (d *Status) FinishPeerListModifications() { func (d *Status) GetPeerStateChangeNotifier(peer string) <-chan struct{} { d.mux.Lock() defer d.mux.Unlock() + ch, found := d.changeNotify[peer] - if !found || ch == nil { - ch = make(chan struct{}) - d.changeNotify[peer] = ch + if found { + return ch } + + ch = make(chan struct{}) + d.changeNotify[peer] = ch return ch } @@ -755,6 +754,17 @@ func (d *Status) onConnectionChanged() { d.notifier.updateServerStates(d.managementState, d.signalState) } +// notifyPeerStateChangeListeners notifies route manager about the change in peer state +func (d *Status) notifyPeerStateChangeListeners(peerID string) { + ch, found := d.changeNotify[peerID] + if !found { + return + } + + close(ch) + delete(d.changeNotify, peerID) +} + func (d *Status) notifyPeerListChanged() { d.notifier.peerListChanged(d.numOfPeers()) } diff --git a/client/internal/routemanager/client.go b/client/internal/routemanager/client.go index eaa23215135..a401bdf725e 100644 --- a/client/internal/routemanager/client.go +++ b/client/internal/routemanager/client.go @@ -195,10 +195,13 @@ func (c *clientNetwork) watchPeerStatusChanges(ctx context.Context, peerKey stri func (c *clientNetwork) startPeersStatusChangeWatcher() { for _, r := range c.routes { _, found := c.routePeersNotifiers[r.Peer] - if !found { - c.routePeersNotifiers[r.Peer] = make(chan struct{}) - go c.watchPeerStatusChanges(c.ctx, r.Peer, c.peerStateUpdate, c.routePeersNotifiers[r.Peer]) + if found { + continue } + + closerChan := make(chan struct{}) + c.routePeersNotifiers[r.Peer] = closerChan + go c.watchPeerStatusChanges(c.ctx, r.Peer, c.peerStateUpdate, closerChan) } } @@ -281,7 +284,7 @@ func (c *clientNetwork) addStateRoute() { } state.AddRoute(c.handler.String()) - if err := c.statusRecorder.UpdatePeerState(state); err != nil { + if err := c.statusRecorder.UpdatePeerRouteState(state); err != nil { log.Warnf("Failed to update peer state: %v", err) } } @@ -294,7 +297,7 @@ func (c *clientNetwork) removeStateRoute() { } state.DeleteRoute(c.handler.String()) - if err := c.statusRecorder.UpdatePeerState(state); err != nil { + if err := c.statusRecorder.UpdatePeerRouteState(state); err != nil { log.Warnf("Failed to update peer state: %v", err) } } From 00a5e3f550ea6d1ceaf214a99156280586f95bf3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Thu, 7 Nov 2024 23:25:40 +0100 Subject: [PATCH 02/16] Protect route state manipulation with mutex --- client/internal/peer/status.go | 28 +++++++++++------- client/internal/routemanager/client.go | 41 ++++++++------------------ 2 files changed, 30 insertions(+), 39 deletions(-) diff --git a/client/internal/peer/status.go b/client/internal/peer/status.go index a5443f63c50..66b55a2f662 100644 --- a/client/internal/peer/status.go +++ b/client/internal/peer/status.go @@ -262,27 +262,35 @@ func (d *Status) UpdatePeerState(receivedState State) error { return nil } -// UpdatePeerRouteState updates peer's route state. It operates with routes only, ignore other fields -func (d *Status) UpdatePeerRouteState(receivedState State) error { +func (d *Status) AddPeerStateRoute(peer string, route string) error { d.mux.Lock() defer d.mux.Unlock() - peerState, ok := d.peers[receivedState.PubKey] + peerState, ok := d.peers[peer] if !ok { return errors.New("peer doesn't exist") } - if receivedState.GetRoutes() != nil { - peerState.SetRoutes(receivedState.GetRoutes()) - } + peerState.AddRoute(route) + d.peers[peer] = peerState - skipNotification := shouldSkipNotify(receivedState.ConnStatus, peerState) + // todo: consider to make sense of this notification or not + d.notifyPeerListChanged() + return nil +} - d.peers[receivedState.PubKey] = peerState +func (d *Status) RemovePeerStateRoute(peer string, route string) error { + d.mux.Lock() + defer d.mux.Unlock() - if skipNotification { - return nil + peerState, ok := d.peers[peer] + if !ok { + return errors.New("peer doesn't exist") } + + peerState.DeleteRoute(route) + d.peers[peer] = peerState + // todo: consider to make sense of this notification or not d.notifyPeerListChanged() return nil diff --git a/client/internal/routemanager/client.go b/client/internal/routemanager/client.go index a401bdf725e..f319c8c52e2 100644 --- a/client/internal/routemanager/client.go +++ b/client/internal/routemanager/client.go @@ -205,13 +205,16 @@ func (c *clientNetwork) startPeersStatusChangeWatcher() { } } -func (c *clientNetwork) removeRouteFromWireguardPeer() error { - c.removeStateRoute() +func (c *clientNetwork) removeRouteFromWireGuardPeer() error { + var multiErr *multierror.Error + if err := c.statusRecorder.RemovePeerStateRoute(c.currentChosen.Peer, c.handler.String()); err != nil { + multiErr = multierror.Append(multiErr, fmt.Errorf("remove peer state route: %w", err)) + } if err := c.handler.RemoveAllowedIPs(); err != nil { - return fmt.Errorf("remove allowed IPs: %w", err) + multiErr = multierror.Append(multiErr, fmt.Errorf("remove allowed IPs: %w", err)) } - return nil + return nberrors.FormatErrorOrNil(multiErr) } func (c *clientNetwork) removeRouteFromPeerAndSystem() error { @@ -221,7 +224,7 @@ func (c *clientNetwork) removeRouteFromPeerAndSystem() error { var merr *multierror.Error - if err := c.removeRouteFromWireguardPeer(); err != nil { + if err := c.removeRouteFromWireGuardPeer(); err != nil { merr = multierror.Append(merr, fmt.Errorf("remove allowed IPs for peer %s: %w", c.currentChosen.Peer, err)) } if err := c.handler.RemoveRoute(); err != nil { @@ -260,7 +263,7 @@ func (c *clientNetwork) recalculateRouteAndUpdatePeerAndSystem() error { } } else { // Otherwise, remove the allowed IPs from the previous peer first - if err := c.removeRouteFromWireguardPeer(); err != nil { + if err := c.removeRouteFromWireGuardPeer(); err != nil { return fmt.Errorf("remove allowed IPs for peer %s: %w", c.currentChosen.Peer, err) } } @@ -271,35 +274,15 @@ func (c *clientNetwork) recalculateRouteAndUpdatePeerAndSystem() error { return fmt.Errorf("add allowed IPs for peer %s: %w", c.currentChosen.Peer, err) } - c.addStateRoute() - - return nil -} - -func (c *clientNetwork) addStateRoute() { - state, err := c.statusRecorder.GetPeer(c.currentChosen.Peer) + err := c.statusRecorder.AddPeerStateRoute(c.currentChosen.Peer, c.handler.String()) if err != nil { - log.Errorf("Failed to get peer state: %v", err) - return - } - - state.AddRoute(c.handler.String()) - if err := c.statusRecorder.UpdatePeerRouteState(state); err != nil { - log.Warnf("Failed to update peer state: %v", err) + return fmt.Errorf("add peer state route: %w", err) } + return nil } func (c *clientNetwork) removeStateRoute() { - state, err := c.statusRecorder.GetPeer(c.currentChosen.Peer) - if err != nil { - log.Errorf("Failed to get peer state: %v", err) - return - } - state.DeleteRoute(c.handler.String()) - if err := c.statusRecorder.UpdatePeerRouteState(state); err != nil { - log.Warnf("Failed to update peer state: %v", err) - } } func (c *clientNetwork) sendUpdateToClientNetworkWatcher(update routesUpdate) { From 0e891fe8f927aeca2db30074195514f6e0dc11b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Thu, 7 Nov 2024 23:27:18 +0100 Subject: [PATCH 03/16] Do not update routes after peer.Open() and peer.Close() --- client/internal/peer/status.go | 1 - 1 file changed, 1 deletion(-) diff --git a/client/internal/peer/status.go b/client/internal/peer/status.go index 66b55a2f662..5bcfcee60b5 100644 --- a/client/internal/peer/status.go +++ b/client/internal/peer/status.go @@ -257,7 +257,6 @@ func (d *Status) UpdatePeerState(receivedState State) error { return nil } - d.notifyPeerStateChangeListeners(receivedState.PubKey) d.notifyPeerListChanged() return nil } From a55207b0ee7416f276759a24b32fe1725508f0ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Thu, 7 Nov 2024 23:30:52 +0100 Subject: [PATCH 04/16] Remove unused function --- client/internal/routemanager/client.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/client/internal/routemanager/client.go b/client/internal/routemanager/client.go index f319c8c52e2..d882b73d5b3 100644 --- a/client/internal/routemanager/client.go +++ b/client/internal/routemanager/client.go @@ -281,10 +281,6 @@ func (c *clientNetwork) recalculateRouteAndUpdatePeerAndSystem() error { return nil } -func (c *clientNetwork) removeStateRoute() { - -} - func (c *clientNetwork) sendUpdateToClientNetworkWatcher(update routesUpdate) { go func() { c.routeUpdate <- update From 16deec6bff82c7eac7e8f850faa70fa6d4ac2250 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Thu, 7 Nov 2024 23:45:14 +0100 Subject: [PATCH 05/16] Fix test --- client/internal/peer/status_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/internal/peer/status_test.go b/client/internal/peer/status_test.go index 1d283433b00..931ec90054d 100644 --- a/client/internal/peer/status_test.go +++ b/client/internal/peer/status_test.go @@ -93,7 +93,7 @@ func TestGetPeerStateChangeNotifierLogic(t *testing.T) { peerState.IP = ip - err := status.UpdatePeerState(peerState) + err := status.UpdatePeerRelayedStateToDisconnected(peerState) assert.NoError(t, err, "shouldn't return error") select { From 20896568adf7bc7750b7387593358967f591feb6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Fri, 8 Nov 2024 09:45:44 +0100 Subject: [PATCH 06/16] Avoid duplicated disconnects events --- client/internal/peer/worker_ice.go | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/client/internal/peer/worker_ice.go b/client/internal/peer/worker_ice.go index 55894218d73..4fae1bde85f 100644 --- a/client/internal/peer/worker_ice.go +++ b/client/internal/peer/worker_ice.go @@ -57,6 +57,9 @@ type WorkerICE struct { localUfrag string localPwd string + + // we record the last known state of the ICE agent to avoid duplicate on disconnected events + lastKnownState ice.ConnectionState } func NewWorkerICE(ctx context.Context, log *log.Entry, config ConnConfig, signaler *Signaler, ifaceDiscover stdnet.ExternalIFaceDiscover, statusRecorder *Status, hasRelayOnLocally bool, callBacks WorkerICECallbacks) (*WorkerICE, error) { @@ -215,16 +218,21 @@ func (w *WorkerICE) reCreateAgent(agentCancel context.CancelFunc, candidates []i err = agent.OnConnectionStateChange(func(state ice.ConnectionState) { w.log.Debugf("ICE ConnectionState has changed to %s", state.String()) - if state == ice.ConnectionStateFailed || state == ice.ConnectionStateDisconnected { + if state != ice.ConnectionStateFailed && state != ice.ConnectionStateDisconnected { + return + } + + if w.lastKnownState != ice.ConnectionStateDisconnected { + w.lastKnownState = ice.ConnectionStateDisconnected w.conn.OnStatusChanged(StatusDisconnected) + } - w.muxAgent.Lock() - agentCancel() - _ = agent.Close() - w.agent = nil + w.muxAgent.Lock() + defer w.muxAgent.Unlock() - w.muxAgent.Unlock() - } + agentCancel() + _ = agent.Close() + w.agent = nil }) if err != nil { return nil, err From 3eb99c0ba03465a3ded3f0a2f8ae6ad5e1739c0f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Fri, 8 Nov 2024 11:37:02 +0100 Subject: [PATCH 07/16] Reset lastKnownState flag --- client/internal/peer/worker_ice.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/client/internal/peer/worker_ice.go b/client/internal/peer/worker_ice.go index 4fae1bde85f..66c48ae86aa 100644 --- a/client/internal/peer/worker_ice.go +++ b/client/internal/peer/worker_ice.go @@ -218,6 +218,10 @@ func (w *WorkerICE) reCreateAgent(agentCancel context.CancelFunc, candidates []i err = agent.OnConnectionStateChange(func(state ice.ConnectionState) { w.log.Debugf("ICE ConnectionState has changed to %s", state.String()) + if state == ice.ConnectionStateConnected { + w.lastKnownState = ice.ConnectionStateConnected + return + } if state != ice.ConnectionStateFailed && state != ice.ConnectionStateDisconnected { return } From a2164a80cd2c956a428f216942a95488fd3692f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Fri, 8 Nov 2024 11:38:38 +0100 Subject: [PATCH 08/16] Add log for score calculation --- client/internal/routemanager/client.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/client/internal/routemanager/client.go b/client/internal/routemanager/client.go index d882b73d5b3..749dae13f8a 100644 --- a/client/internal/routemanager/client.go +++ b/client/internal/routemanager/client.go @@ -150,6 +150,8 @@ func (c *clientNetwork) getBestRouteFromStatuses(routePeerStatuses map[route.ID] } } + log.Debugf("chosen route: %s, chosen score: %f, current route: %s, current score: %f", chosen, chosenScore, currID, currScore) + switch { case chosen == "": var peers []string From 8f07049fb5e0533403f030062d42f1c2d6e2f0cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Fri, 8 Nov 2024 14:55:40 +0100 Subject: [PATCH 09/16] Fix thread safe MarshalJson on refcounter --- client/internal/routemanager/refcounter/refcounter.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/client/internal/routemanager/refcounter/refcounter.go b/client/internal/routemanager/refcounter/refcounter.go index c121b7d774b..1e1913477c2 100644 --- a/client/internal/routemanager/refcounter/refcounter.go +++ b/client/internal/routemanager/refcounter/refcounter.go @@ -217,13 +217,19 @@ func (rm *Counter[Key, I, O]) Clear() { // MarshalJSON implements the json.Marshaler interface for Counter. func (rm *Counter[Key, I, O]) MarshalJSON() ([]byte, error) { - return json.Marshal(struct { + rm.refCountMu.Lock() + defer rm.refCountMu.Unlock() + rm.idMu.Lock() + defer rm.idMu.Unlock() + + b, err := json.Marshal(struct { RefCountMap map[Key]Ref[O] `json:"refCountMap"` IDMap map[string][]Key `json:"idMap"` }{ RefCountMap: rm.refCountMap, IDMap: rm.idMap, }) + return b, err } // UnmarshalJSON implements the json.Unmarshaler interface for Counter. From c093f89e0ee8d2348a8e701c2841937e3e3e7bc5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Fri, 8 Nov 2024 15:42:02 +0100 Subject: [PATCH 10/16] Add route change notification for UpdatePeerICEState --- client/internal/peer/status.go | 1 + 1 file changed, 1 insertion(+) diff --git a/client/internal/peer/status.go b/client/internal/peer/status.go index 5bcfcee60b5..241dfabbb01 100644 --- a/client/internal/peer/status.go +++ b/client/internal/peer/status.go @@ -325,6 +325,7 @@ func (d *Status) UpdatePeerICEState(receivedState State) error { return nil } + d.notifyPeerStateChangeListeners(receivedState.PubKey) d.notifyPeerListChanged() return nil } From bb39a892a4ba4712bf7eac32516ef7cb75963d0e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Fri, 8 Nov 2024 15:48:42 +0100 Subject: [PATCH 11/16] Code cleaning --- client/internal/peer/worker_ice.go | 28 +++++++++++++--------------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/client/internal/peer/worker_ice.go b/client/internal/peer/worker_ice.go index 66c48ae86aa..24f620f44d7 100644 --- a/client/internal/peer/worker_ice.go +++ b/client/internal/peer/worker_ice.go @@ -218,25 +218,23 @@ func (w *WorkerICE) reCreateAgent(agentCancel context.CancelFunc, candidates []i err = agent.OnConnectionStateChange(func(state ice.ConnectionState) { w.log.Debugf("ICE ConnectionState has changed to %s", state.String()) - if state == ice.ConnectionStateConnected { + switch state { + case ice.ConnectionStateConnected: w.lastKnownState = ice.ConnectionStateConnected return - } - if state != ice.ConnectionStateFailed && state != ice.ConnectionStateDisconnected { + case ice.ConnectionStateFailed, ice.ConnectionStateDisconnected: + if w.lastKnownState != ice.ConnectionStateDisconnected { + w.lastKnownState = ice.ConnectionStateDisconnected + w.conn.OnStatusChanged(StatusDisconnected) + } + w.muxAgent.Lock() + agentCancel() + _ = w.agent.Close() + w.agent = nil + w.muxAgent.Unlock() + default: return } - - if w.lastKnownState != ice.ConnectionStateDisconnected { - w.lastKnownState = ice.ConnectionStateDisconnected - w.conn.OnStatusChanged(StatusDisconnected) - } - - w.muxAgent.Lock() - defer w.muxAgent.Unlock() - - agentCancel() - _ = agent.Close() - w.agent = nil }) if err != nil { return nil, err From 1e0b8b71c6cf2e225917e361b59b05727d51e657 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Fri, 8 Nov 2024 16:09:47 +0100 Subject: [PATCH 12/16] Handle negative score --- client/internal/routemanager/client.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/client/internal/routemanager/client.go b/client/internal/routemanager/client.go index 749dae13f8a..c2cc691555d 100644 --- a/client/internal/routemanager/client.go +++ b/client/internal/routemanager/client.go @@ -122,13 +122,18 @@ func (c *clientNetwork) getBestRouteFromStatuses(routePeerStatuses map[route.ID] tempScore = float64(metricDiff) * 10 } - // in some temporal cases, latency can be 0, so we set it to 1s to not block but try to avoid this route - latency := time.Second + // in some temporal cases, latency can be 0, so we set it to 999ms to not block but try to avoid this route + latency := 999 * time.Millisecond if peerStatus.latency != 0 { latency = peerStatus.latency } else { - log.Warnf("peer %s has 0 latency", r.Peer) + log.Infof("peer %s has 0 latency, range %s", r.Peer, c.handler) } + + if latency > 1*time.Second { + latency = 999 * time.Millisecond + } + tempScore += 1 - latency.Seconds() if !peerStatus.relayed { From 3d2d25a036858c9ddf1a4644f4ec7cf32d9a490e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Fri, 8 Nov 2024 16:15:32 +0100 Subject: [PATCH 13/16] Add comment --- client/internal/routemanager/client.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/client/internal/routemanager/client.go b/client/internal/routemanager/client.go index c2cc691555d..32f947e6f9a 100644 --- a/client/internal/routemanager/client.go +++ b/client/internal/routemanager/client.go @@ -130,10 +130,12 @@ func (c *clientNetwork) getBestRouteFromStatuses(routePeerStatuses map[route.ID] log.Infof("peer %s has 0 latency, range %s", r.Peer, c.handler) } + // avoid negative tempScore on the higher latency calculation if latency > 1*time.Second { latency = 999 * time.Millisecond } + // higher latency is worse score tempScore += 1 - latency.Seconds() if !peerStatus.relayed { From e6eba4b22ff0ce4f95df0f7f13a5a2fdb6492d3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Fri, 8 Nov 2024 16:20:53 +0100 Subject: [PATCH 14/16] Lint fix --- client/internal/peer/worker_ice.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/client/internal/peer/worker_ice.go b/client/internal/peer/worker_ice.go index 24f620f44d7..de467b6ce6d 100644 --- a/client/internal/peer/worker_ice.go +++ b/client/internal/peer/worker_ice.go @@ -227,11 +227,7 @@ func (w *WorkerICE) reCreateAgent(agentCancel context.CancelFunc, candidates []i w.lastKnownState = ice.ConnectionStateDisconnected w.conn.OnStatusChanged(StatusDisconnected) } - w.muxAgent.Lock() - agentCancel() - _ = w.agent.Close() - w.agent = nil - w.muxAgent.Unlock() + w.closeAgent(agentCancel) default: return } @@ -259,6 +255,15 @@ func (w *WorkerICE) reCreateAgent(agentCancel context.CancelFunc, candidates []i return agent, nil } +func (w *WorkerICE) closeAgent(cancel context.CancelFunc) { + w.muxAgent.Lock() + defer w.muxAgent.Unlock() + + cancel() + _ = w.agent.Close() + w.agent = nil +} + func (w *WorkerICE) punchRemoteWGPort(pair *ice.CandidatePair, remoteWgPort int) { // wait local endpoint configuration time.Sleep(time.Second) From 76c6ab555b1d0fa5b272f47d585c5bb25c9c5d63 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Fri, 8 Nov 2024 18:29:20 +0100 Subject: [PATCH 15/16] Fix error handling --- client/internal/routemanager/client.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/client/internal/routemanager/client.go b/client/internal/routemanager/client.go index 32f947e6f9a..2ff4af44757 100644 --- a/client/internal/routemanager/client.go +++ b/client/internal/routemanager/client.go @@ -215,15 +215,14 @@ func (c *clientNetwork) startPeersStatusChangeWatcher() { } func (c *clientNetwork) removeRouteFromWireGuardPeer() error { - var multiErr *multierror.Error if err := c.statusRecorder.RemovePeerStateRoute(c.currentChosen.Peer, c.handler.String()); err != nil { - multiErr = multierror.Append(multiErr, fmt.Errorf("remove peer state route: %w", err)) + log.Warnf("Failed to update peer state: %v", err) } if err := c.handler.RemoveAllowedIPs(); err != nil { - multiErr = multierror.Append(multiErr, fmt.Errorf("remove allowed IPs: %w", err)) + return fmt.Errorf("remove allowed IPs: %w", err) } - return nberrors.FormatErrorOrNil(multiErr) + return nil } func (c *clientNetwork) removeRouteFromPeerAndSystem() error { From 03e674badb783b994bf55bcefd948fca2768a466 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Sun, 10 Nov 2024 11:31:39 +0100 Subject: [PATCH 16/16] Close deprecated proxy instances --- client/iface/wgproxy/bind/proxy.go | 6 +++++- client/iface/wgproxy/ebpf/wrapper.go | 2 +- client/iface/wgproxy/udp/proxy.go | 2 +- client/internal/peer/conn.go | 13 +++++++++++-- relay/client/client.go | 7 +++---- relay/client/conn.go | 3 +-- relay/server/listener/ws/conn.go | 5 ++--- relay/server/peer.go | 4 ++-- 8 files changed, 26 insertions(+), 16 deletions(-) diff --git a/client/iface/wgproxy/bind/proxy.go b/client/iface/wgproxy/bind/proxy.go index e0883715a99..8a2e65382cd 100644 --- a/client/iface/wgproxy/bind/proxy.go +++ b/client/iface/wgproxy/bind/proxy.go @@ -2,6 +2,7 @@ package bind import ( "context" + "errors" "fmt" "net" "net/netip" @@ -94,7 +95,10 @@ func (p *ProxyBind) close() error { p.Bind.RemoveEndpoint(p.wgAddr) - return p.remoteConn.Close() + if rErr := p.remoteConn.Close(); rErr != nil && !errors.Is(rErr, net.ErrClosed) { + return rErr + } + return nil } func (p *ProxyBind) proxyToLocal(ctx context.Context) { diff --git a/client/iface/wgproxy/ebpf/wrapper.go b/client/iface/wgproxy/ebpf/wrapper.go index efd5fd946cf..54cab4e1bbd 100644 --- a/client/iface/wgproxy/ebpf/wrapper.go +++ b/client/iface/wgproxy/ebpf/wrapper.go @@ -77,7 +77,7 @@ func (e *ProxyWrapper) CloseConn() error { e.cancel() - if err := e.remoteConn.Close(); err != nil { + if err := e.remoteConn.Close(); err != nil && !errors.Is(err, net.ErrClosed) { return fmt.Errorf("failed to close remote conn: %w", err) } return nil diff --git a/client/iface/wgproxy/udp/proxy.go b/client/iface/wgproxy/udp/proxy.go index 200d961f3c8..ba0004b8a4a 100644 --- a/client/iface/wgproxy/udp/proxy.go +++ b/client/iface/wgproxy/udp/proxy.go @@ -116,7 +116,7 @@ func (p *WGUDPProxy) close() error { p.cancel() var result *multierror.Error - if err := p.remoteConn.Close(); err != nil { + if err := p.remoteConn.Close(); err != nil && !errors.Is(err, net.ErrClosed) { result = multierror.Append(result, fmt.Errorf("remote conn: %s", err)) } diff --git a/client/internal/peer/conn.go b/client/internal/peer/conn.go index 84a8c221fa6..81c456db747 100644 --- a/client/internal/peer/conn.go +++ b/client/internal/peer/conn.go @@ -442,7 +442,7 @@ func (conn *Conn) relayConnectionIsReady(rci RelayConnInfo) { if conn.iceP2PIsActive() { conn.log.Debugf("do not switch to relay because current priority is: %v", conn.currentConnPriority) - conn.wgProxyRelay = wgProxy + conn.setRelayedProxy(wgProxy) conn.statusRelay.Set(StatusConnected) conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey) return @@ -465,7 +465,7 @@ func (conn *Conn) relayConnectionIsReady(rci RelayConnInfo) { wgConfigWorkaround() conn.currentConnPriority = connPriorityRelay conn.statusRelay.Set(StatusConnected) - conn.wgProxyRelay = wgProxy + conn.setRelayedProxy(wgProxy) conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey) conn.log.Infof("start to communicate with peer via relay") conn.doOnConnected(rci.rosenpassPubKey, rci.rosenpassAddr) @@ -736,6 +736,15 @@ func (conn *Conn) logTraceConnState() { } } +func (conn *Conn) setRelayedProxy(proxy wgproxy.Proxy) { + if conn.wgProxyRelay != nil { + if err := conn.wgProxyRelay.CloseConn(); err != nil { + conn.log.Warnf("failed to close deprecated wg proxy conn: %v", err) + } + } + conn.wgProxyRelay = proxy +} + func isController(config ConnConfig) bool { return config.LocalKey > config.Key } diff --git a/relay/client/client.go b/relay/client/client.go index a82a75453bf..154c1787f2c 100644 --- a/relay/client/client.go +++ b/relay/client/client.go @@ -3,7 +3,6 @@ package client import ( "context" "fmt" - "io" "net" "sync" "time" @@ -449,11 +448,11 @@ func (c *Client) writeTo(connReference *Conn, id string, dstID []byte, payload [ conn, ok := c.conns[id] c.mu.Unlock() if !ok { - return 0, io.EOF + return 0, net.ErrClosed } if conn.conn != connReference { - return 0, io.EOF + return 0, net.ErrClosed } // todo: use buffer pool instead of create new transport msg. @@ -508,7 +507,7 @@ func (c *Client) closeConn(connReference *Conn, id string) error { container, ok := c.conns[id] if !ok { - return fmt.Errorf("connection already closed") + return net.ErrClosed } if container.conn != connReference { diff --git a/relay/client/conn.go b/relay/client/conn.go index b4ff903e828..fe1b6fb52ed 100644 --- a/relay/client/conn.go +++ b/relay/client/conn.go @@ -1,7 +1,6 @@ package client import ( - "io" "net" "time" ) @@ -40,7 +39,7 @@ func (c *Conn) Write(p []byte) (n int, err error) { func (c *Conn) Read(b []byte) (n int, err error) { msg, ok := <-c.messageChan if !ok { - return 0, io.EOF + return 0, net.ErrClosed } n = copy(b, msg.Payload) diff --git a/relay/server/listener/ws/conn.go b/relay/server/listener/ws/conn.go index c248963b9f5..12e721fdb19 100644 --- a/relay/server/listener/ws/conn.go +++ b/relay/server/listener/ws/conn.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "io" "net" "sync" "time" @@ -100,7 +99,7 @@ func (c *Conn) isClosed() bool { func (c *Conn) ioErrHandling(err error) error { if c.isClosed() { - return io.EOF + return net.ErrClosed } var wErr *websocket.CloseError @@ -108,7 +107,7 @@ func (c *Conn) ioErrHandling(err error) error { return err } if wErr.Code == websocket.StatusNormalClosure { - return io.EOF + return net.ErrClosed } return err } diff --git a/relay/server/peer.go b/relay/server/peer.go index a9c542f84c4..c909c35d542 100644 --- a/relay/server/peer.go +++ b/relay/server/peer.go @@ -2,7 +2,7 @@ package server import ( "context" - "io" + "errors" "net" "sync" "time" @@ -57,7 +57,7 @@ func (p *Peer) Work() { for { n, err := p.conn.Read(buf) if err != nil { - if err != io.EOF { + if !errors.Is(err, net.ErrClosed) { p.log.Errorf("failed to read message: %s", err) } return