[client] Limit P2P attempts and restart on specific events #2657

Merged 14 commits on Oct 8, 2024
60 changes: 35 additions & 25 deletions client/internal/peer/conn.go
@@ -31,6 +31,8 @@ const (
connPriorityRelay ConnPriority = 1
connPriorityICETurn ConnPriority = 1
connPriorityICEP2P ConnPriority = 2

reconnectMaxElapsedTime = 30 * time.Minute
)

type WgConfig struct {
@@ -82,6 +84,7 @@ type Conn struct {
wgProxyICE wgproxy.Proxy
wgProxyRelay wgproxy.Proxy
signaler *Signaler
iFaceDiscover stdnet.ExternalIFaceDiscover
relayManager *relayClient.Manager
allowedIPsIP string
handshaker *Handshaker
@@ -107,6 +110,8 @@ type Conn struct {
// for reconnection operations
iCEDisconnected chan bool
relayDisconnected chan bool
connMonitor *ConnMonitor
reconnectCh <-chan struct{}
}

// NewConn creates a new, not-yet-opened Conn to the remote peer.
@@ -122,21 +127,31 @@ func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Statu
connLog := log.WithField("peer", config.Key)

var conn = &Conn{
log: connLog,
ctx: ctx,
ctxCancel: ctxCancel,
config: config,
statusRecorder: statusRecorder,
wgProxyFactory: wgProxyFactory,
signaler: signaler,
relayManager: relayManager,
allowedIPsIP: allowedIPsIP.String(),
statusRelay: NewAtomicConnStatus(),
statusICE: NewAtomicConnStatus(),
log: connLog,
ctx: ctx,
ctxCancel: ctxCancel,
config: config,
statusRecorder: statusRecorder,
wgProxyFactory: wgProxyFactory,
signaler: signaler,
iFaceDiscover: iFaceDiscover,
relayManager: relayManager,
allowedIPsIP: allowedIPsIP.String(),
statusRelay: NewAtomicConnStatus(),
statusICE: NewAtomicConnStatus(),

iCEDisconnected: make(chan bool, 1),
relayDisconnected: make(chan bool, 1),
}

conn.connMonitor, conn.reconnectCh = NewConnMonitor(
signaler,
iFaceDiscover,
config,
conn.relayDisconnected,
conn.iCEDisconnected,
)

rFns := WorkerRelayCallbacks{
OnConnReady: conn.relayConnectionIsReady,
OnDisconnected: conn.onWorkerRelayStateDisconnected,
@@ -199,6 +214,8 @@ func (conn *Conn) startHandshakeAndReconnect() {
conn.log.Errorf("failed to send initial offer: %v", err)
}

go conn.connMonitor.Start(conn.ctx)

if conn.workerRelay.IsController() {
conn.reconnectLoopWithRetry()
} else {
@@ -308,12 +325,14 @@ func (conn *Conn) reconnectLoopWithRetry() {
// With this initial delay we can reduce the number of unnecessary offers sent
select {
case <-conn.ctx.Done():
return
case <-time.After(3 * time.Second):
}

ticker := conn.prepareExponentTicker()
defer ticker.Stop()
time.Sleep(1 * time.Second)

for {
select {
case t := <-ticker.C:
@@ -341,20 +360,11 @@ func (conn *Conn) reconnectLoopWithRetry() {
if err != nil {
conn.log.Errorf("failed to do handshake: %v", err)
}
case changed := <-conn.relayDisconnected:
if !changed {
continue
}
conn.log.Debugf("Relay state changed, reset reconnect timer")
ticker.Stop()
ticker = conn.prepareExponentTicker()
case changed := <-conn.iCEDisconnected:
if !changed {
continue
}
conn.log.Debugf("ICE state changed, reset reconnect timer")

case <-conn.reconnectCh:
ticker.Stop()
ticker = conn.prepareExponentTicker()

case <-conn.ctx.Done():
conn.log.Debugf("context is done, stop reconnect loop")
return
@@ -365,10 +375,10 @@ func (conn *Conn) reconnectLoopWithRetry() {
func (conn *Conn) prepareExponentTicker() *backoff.Ticker {
bo := backoff.WithContext(&backoff.ExponentialBackOff{
InitialInterval: 800 * time.Millisecond,
RandomizationFactor: 0.01,
RandomizationFactor: 0.1,
Multiplier: 2,
MaxInterval: conn.config.Timeout,
MaxElapsedTime: 0,
MaxElapsedTime: reconnectMaxElapsedTime,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}, conn.ctx)
212 changes: 212 additions & 0 deletions client/internal/peer/conn_monitor.go
Contributor:
Wouldn't it make sense to move this file into a separate structure? I don't see any dependency on the conn that isn't exchangeable.

Contributor Author:
Sonar will complain :)

Contributor:
for what?

Contributor Author:
That the file is too large

Contributor:
I cannot follow you. If you move everything from conn_monitor.go into monitor.go and in it create a

type ConnMonitor struct {
}

then we get a better-separated code structure that is easier to cover with unit tests.

Contributor Author:
I have moved it into a separate struct, but not into its own package yet, because that requires more refactoring of the other components.

Contributor:
This looks nice! I will review it again.
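
For illustration, a minimal sketch of the kind of unit test the reviewer is alluding to. It is not part of this PR; it assumes the test lives in the peer package and that a nil Signaler, a nil iface discover, and a zero-value ConnConfig are enough to exercise only the channel plumbing of ConnMonitor (the signaler monitor exits early on nil, and the candidate monitor's 5-minute ticker never fires within the test window):

package peer

import (
	"context"
	"testing"
	"time"
)

// Hypothetical test, not part of this PR: a relay state change reported on
// relayDisconnected should surface as a signal on the reconnect channel.
func TestConnMonitor_RelayDisconnectTriggersReconnect(t *testing.T) {
	relayDisconnected := make(chan bool, 1)
	iCEDisconnected := make(chan bool, 1)

	// nil Signaler and nil ExternalIFaceDiscover: only the event-forwarding
	// loop is exercised, no ICE agent is created during the test window.
	cm, reconnectCh := NewConnMonitor(nil, nil, ConnConfig{}, relayDisconnected, iCEDisconnected)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go cm.Start(ctx)

	relayDisconnected <- true

	select {
	case <-reconnectCh:
		// expected: the monitor forwarded the relay event as a reconnect request
	case <-time.After(2 * time.Second):
		t.Fatal("expected a reconnect signal after a relay disconnect event")
	}
}

With the monitor decoupled from Conn, a test like this needs no WireGuard, relay, or signaling setup.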

@@ -0,0 +1,212 @@
package peer

import (
"context"
"fmt"
"sync"
"time"

"github.com/pion/ice/v3"
log "github.com/sirupsen/logrus"

"github.com/netbirdio/netbird/client/internal/stdnet"
)

const (
signalerMonitorPeriod = 5 * time.Second
candidatesMonitorPeriod = 5 * time.Minute
candidateGatheringTimeout = 5 * time.Second
)

type ConnMonitor struct {
signaler *Signaler
iFaceDiscover stdnet.ExternalIFaceDiscover
config ConnConfig
relayDisconnected chan bool
iCEDisconnected chan bool
reconnectCh chan struct{}
currentCandidates []ice.Candidate
candidatesMu sync.Mutex
}

func NewConnMonitor(signaler *Signaler, iFaceDiscover stdnet.ExternalIFaceDiscover, config ConnConfig, relayDisconnected, iCEDisconnected chan bool) (*ConnMonitor, <-chan struct{}) {
reconnectCh := make(chan struct{}, 1)
cm := &ConnMonitor{
signaler: signaler,
iFaceDiscover: iFaceDiscover,
config: config,
relayDisconnected: relayDisconnected,
iCEDisconnected: iCEDisconnected,
reconnectCh: reconnectCh,
}
return cm, reconnectCh
}

func (cm *ConnMonitor) Start(ctx context.Context) {
signalerReady := make(chan struct{}, 1)
go cm.monitorSignalerReady(ctx, signalerReady)

localCandidatesChanged := make(chan struct{}, 1)
go cm.monitorLocalCandidatesChanged(ctx, localCandidatesChanged)

for {
select {
case changed := <-cm.relayDisconnected:
if !changed {
continue
}
log.Debugf("Relay state changed, triggering reconnect")
cm.triggerReconnect()

case changed := <-cm.iCEDisconnected:
if !changed {
continue
}
log.Debugf("ICE state changed, triggering reconnect")
cm.triggerReconnect()

case <-signalerReady:
log.Debugf("Signaler became ready, triggering reconnect")
cm.triggerReconnect()

case <-localCandidatesChanged:
log.Debugf("Local candidates changed, triggering reconnect")
cm.triggerReconnect()

case <-ctx.Done():
return
}
}
}

func (cm *ConnMonitor) monitorSignalerReady(ctx context.Context, signalerReady chan<- struct{}) {
if cm.signaler == nil {
return
}

ticker := time.NewTicker(signalerMonitorPeriod)
defer ticker.Stop()

lastReady := true
for {
select {
case <-ticker.C:
currentReady := cm.signaler.Ready()
if !lastReady && currentReady {
select {
case signalerReady <- struct{}{}:
default:
}
}
lastReady = currentReady
case <-ctx.Done():
return
}
}
}

func (cm *ConnMonitor) monitorLocalCandidatesChanged(ctx context.Context, localCandidatesChanged chan<- struct{}) {
ufrag, pwd, err := generateICECredentials()
if err != nil {
log.Warnf("Failed to generate ICE credentials: %v", err)
return
}

ticker := time.NewTicker(candidatesMonitorPeriod)
defer ticker.Stop()

for {
select {
case <-ticker.C:
if err := cm.handleCandidateTick(ctx, localCandidatesChanged, ufrag, pwd); err != nil {
log.Warnf("Failed to handle candidate tick: %v", err)
}
case <-ctx.Done():
return
}
}
}

func (cm *ConnMonitor) handleCandidateTick(ctx context.Context, localCandidatesChanged chan<- struct{}, ufrag string, pwd string) error {
log.Debugf("Gathering ICE candidates")

transportNet, err := newStdNet(cm.iFaceDiscover, cm.config.ICEConfig.InterfaceBlackList)
if err != nil {
log.Errorf("failed to create pion's stdnet: %s", err)
}

agent, err := newAgent(cm.config, transportNet, candidateTypesP2P(), ufrag, pwd)
if err != nil {
return fmt.Errorf("create ICE agent: %w", err)
}
defer func() {
if err := agent.Close(); err != nil {
log.Warnf("Failed to close ICE agent: %v", err)
}
}()

gatherDone := make(chan struct{})
err = agent.OnCandidate(func(c ice.Candidate) {
log.Tracef("Got candidate: %v", c)
if c == nil {
close(gatherDone)
}
})
if err != nil {
return fmt.Errorf("set ICE candidate handler: %w", err)
}

if err := agent.GatherCandidates(); err != nil {
return fmt.Errorf("gather ICE candidates: %w", err)
}

ctx, cancel := context.WithTimeout(ctx, candidateGatheringTimeout)
defer cancel()

select {
case <-ctx.Done():
return fmt.Errorf("wait for gathering: %w", ctx.Err())
case <-gatherDone:
}

candidates, err := agent.GetLocalCandidates()
if err != nil {
return fmt.Errorf("get local candidates: %w", err)
}
log.Tracef("Got candidates: %v", candidates)

if changed := cm.updateCandidates(candidates); changed {
select {
case localCandidatesChanged <- struct{}{}:
default:
}
}

return nil
}

func (cm *ConnMonitor) updateCandidates(newCandidates []ice.Candidate) bool {
cm.candidatesMu.Lock()
defer cm.candidatesMu.Unlock()

if len(cm.currentCandidates) != len(newCandidates) {
cm.currentCandidates = newCandidates
return true
}

for i, candidate := range cm.currentCandidates {
if candidate.Address() != newCandidates[i].Address() {
cm.currentCandidates = newCandidates
return true
}
}

return false
}

func (cm *ConnMonitor) triggerReconnect() {
select {
case cm.reconnectCh <- struct{}{}:
default:
}
}
4 changes: 2 additions & 2 deletions client/internal/peer/stdnet.go
@@ -6,6 +6,6 @@ import (
"github.com/netbirdio/netbird/client/internal/stdnet"
)

func (w *WorkerICE) newStdNet() (*stdnet.Net, error) {
return stdnet.NewNet(w.config.ICEConfig.InterfaceBlackList)
func newStdNet(_ stdnet.ExternalIFaceDiscover, ifaceBlacklist []string) (*stdnet.Net, error) {
return stdnet.NewNet(ifaceBlacklist)
}
4 changes: 2 additions & 2 deletions client/internal/peer/stdnet_android.go
@@ -2,6 +2,6 @@ package peer

import "github.com/netbirdio/netbird/client/internal/stdnet"

func (w *WorkerICE) newStdNet() (*stdnet.Net, error) {
return stdnet.NewNetWithDiscover(w.iFaceDiscover, w.config.ICEConfig.InterfaceBlackList)
func newStdNet(iFaceDiscover stdnet.ExternalIFaceDiscover, ifaceBlacklist []string) (*stdnet.Net, error) {
return stdnet.NewNetWithDiscover(iFaceDiscover, ifaceBlacklist)
}