Skip to content

Commit

Permalink
Revert "Apply new logic"
Browse files Browse the repository at this point in the history
This reverts commit 1ef8abd.
  • Loading branch information
pappz committed Oct 18, 2024
1 parent 1ef8abd commit 5acc21f
Show file tree
Hide file tree
Showing 18 changed files with 454 additions and 677 deletions.
25 changes: 5 additions & 20 deletions client/internal/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"github.com/netbirdio/netbird/client/internal/peer/guard"
"maps"
"math/rand"
"net"
Expand All @@ -24,14 +23,14 @@ import (

"github.com/netbirdio/netbird/client/firewall"
"github.com/netbirdio/netbird/client/firewall/manager"
"github.com/netbirdio/netbird/client/iface"
"github.com/netbirdio/netbird/client/iface/bind"
"github.com/netbirdio/netbird/client/iface/device"
"github.com/netbirdio/netbird/client/internal/acl"
"github.com/netbirdio/netbird/client/internal/dns"

"github.com/netbirdio/netbird/client/iface"
"github.com/netbirdio/netbird/client/iface/bind"
"github.com/netbirdio/netbird/client/internal/networkmonitor"
"github.com/netbirdio/netbird/client/internal/peer"
icemaker "github.com/netbirdio/netbird/client/internal/peer/ice"
"github.com/netbirdio/netbird/client/internal/relay"
"github.com/netbirdio/netbird/client/internal/rosenpass"
"github.com/netbirdio/netbird/client/internal/routemanager"
Expand Down Expand Up @@ -167,8 +166,6 @@ type Engine struct {
checks []*mgmProto.Checks

relayManager *relayClient.Manager

srWatcher *guard.SRWatcher
}

// Peer is an instance of the Connection Peer
Expand Down Expand Up @@ -372,18 +369,6 @@ func (e *Engine) Start() error {
return fmt.Errorf("initialize dns server: %w", err)
}

iceCfg := icemaker.Config{
StunTurn: &e.stunTurn,
InterfaceBlackList: e.config.IFaceBlackList,
DisableIPv6Discovery: e.config.DisableIPv6Discovery,
UDPMux: e.udpMux.UDPMuxDefault,
UDPMuxSrflx: e.udpMux,
NATExternalIPs: e.parseNATExternalIPMappings(),
}
// todo: review the cancel event handling
e.srWatcher = guard.NewSRWatcher(e.signal, e.relayManager, e.mobileDep.IFaceDiscover, iceCfg)
e.srWatcher.Start(e.ctx)

e.receiveSignalEvents()
e.receiveManagementEvents()
e.receiveProbeEvents()
Expand Down Expand Up @@ -966,7 +951,7 @@ func (e *Engine) createPeerConn(pubKey string, allowedIPs string) (*peer.Conn, e
LocalWgPort: e.config.WgPort,
RosenpassPubKey: e.getRosenpassPubKey(),
RosenpassAddr: e.getRosenpassAddr(),
ICEConfig: icemaker.Config{
ICEConfig: peer.ICEConfig{
StunTurn: &e.stunTurn,
InterfaceBlackList: e.config.IFaceBlackList,
DisableIPv6Discovery: e.config.DisableIPv6Discovery,
Expand All @@ -976,7 +961,7 @@ func (e *Engine) createPeerConn(pubKey string, allowedIPs string) (*peer.Conn, e
},
}

peerConn, err := peer.NewConn(e.ctx, config, e.statusRecorder, e.signaler, e.mobileDep.IFaceDiscover, e.relayManager, e.srWatcher)
peerConn, err := peer.NewConn(e.ctx, config, e.statusRecorder, e.signaler, e.mobileDep.IFaceDiscover, e.relayManager)
if err != nil {
return nil, err
}
Expand Down
156 changes: 127 additions & 29 deletions client/internal/peer/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,14 @@ import (
"sync"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/pion/ice/v3"
log "github.com/sirupsen/logrus"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"

"github.com/netbirdio/netbird/client/iface"
"github.com/netbirdio/netbird/client/iface/configurer"
"github.com/netbirdio/netbird/client/iface/wgproxy"
"github.com/netbirdio/netbird/client/internal/peer/guard"
icemaker "github.com/netbirdio/netbird/client/internal/peer/ice"
"github.com/netbirdio/netbird/client/internal/stdnet"
relayClient "github.com/netbirdio/netbird/relay/client"
"github.com/netbirdio/netbird/route"
Expand All @@ -33,6 +32,8 @@ const (
connPriorityRelay ConnPriority = 1
connPriorityICETurn ConnPriority = 1
connPriorityICEP2P ConnPriority = 2

reconnectMaxElapsedTime = 30 * time.Minute
)

type WgConfig struct {
Expand Down Expand Up @@ -62,7 +63,7 @@ type ConnConfig struct {
RosenpassAddr string

// ICEConfig ICE protocol configuration
ICEConfig icemaker.Config
ICEConfig ICEConfig
}

type WorkerCallbacks struct {
Expand Down Expand Up @@ -108,12 +109,13 @@ type Conn struct {
// for reconnection operations
iCEDisconnected chan bool
relayDisconnected chan bool
guard *guard.Guard
connMonitor *ConnMonitor
reconnectCh <-chan struct{}
}

// NewConn creates a new not opened Conn to the remote peer.
// To establish a connection run Conn.Open
func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Status, signaler *Signaler, iFaceDiscover stdnet.ExternalIFaceDiscover, relayManager *relayClient.Manager, srWatcher *guard.SRWatcher) (*Conn, error) {
func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Status, signaler *Signaler, iFaceDiscover stdnet.ExternalIFaceDiscover, relayManager *relayClient.Manager) (*Conn, error) {
allowedIP, allowedNet, err := net.ParseCIDR(config.WgConfig.AllowedIps)
if err != nil {
log.Errorf("failed to parse allowedIPS: %v", err)
Expand All @@ -122,8 +124,6 @@ func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Statu

ctx, ctxCancel := context.WithCancel(engineCtx)
connLog := log.WithField("peer", config.Key)
iCEDisconnected := make(chan bool, 1)
relayDisconnected := make(chan bool, 1)

var conn = &Conn{
log: connLog,
Expand All @@ -137,10 +137,18 @@ func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Statu
allowedNet: allowedNet.String(),
statusRelay: NewAtomicConnStatus(),
statusICE: NewAtomicConnStatus(),
iCEDisconnected: iCEDisconnected,
relayDisconnected: relayDisconnected,
iCEDisconnected: make(chan bool, 1),
relayDisconnected: make(chan bool, 1),
}

conn.connMonitor, conn.reconnectCh = NewConnMonitor(
signaler,
iFaceDiscover,
config,
conn.relayDisconnected,
conn.iCEDisconnected,
)

rFns := WorkerRelayCallbacks{
OnConnReady: conn.relayConnectionIsReady,
OnDisconnected: conn.onWorkerRelayStateDisconnected,
Expand All @@ -166,8 +174,6 @@ func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Statu
conn.handshaker.AddOnNewOfferListener(conn.workerICE.OnNewOffer)
}

conn.guard = guard.NewGuard(connLog, true, conn.isConnected, conn.handshaker, config.Timeout, srWatcher, relayDisconnected, iCEDisconnected)

go conn.handshaker.Listen()

return conn, nil
Expand All @@ -194,18 +200,24 @@ func (conn *Conn) Open() {
conn.log.Warnf("error while updating the state err: %v", err)
}

go conn.startHandshakeAndReconnect(conn.ctx)
go conn.startHandshakeAndReconnect()
}

func (conn *Conn) startHandshakeAndReconnect(ctx context.Context) {
conn.waitInitialRandomSleepTime(ctx)
func (conn *Conn) startHandshakeAndReconnect() {
conn.waitInitialRandomSleepTime()

err := conn.handshaker.sendOffer()
if err != nil {
conn.log.Errorf("failed to send initial offer: %v", err)
}

conn.guard.Start(ctx)
go conn.connMonitor.Start(conn.ctx)

if conn.workerRelay.IsController() {
conn.reconnectLoopWithRetry()
} else {
conn.reconnectLoopForOnDisconnectedEvent()
}
}

// Close closes this peer Conn issuing a close event to the Conn closeCh
Expand Down Expand Up @@ -304,6 +316,104 @@ func (conn *Conn) GetKey() string {
return conn.config.Key
}

func (conn *Conn) reconnectLoopWithRetry() {
// Give chance to the peer to establish the initial connection.
// With it, we can decrease to send necessary offer
select {
case <-conn.ctx.Done():
return
case <-time.After(3 * time.Second):
}

ticker := conn.prepareExponentTicker()
defer ticker.Stop()
time.Sleep(1 * time.Second)

for {
select {
case t := <-ticker.C:
if t.IsZero() {
// in case if the ticker has been canceled by context then avoid the temporary loop
return
}

if conn.workerRelay.IsRelayConnectionSupportedWithPeer() {
if conn.statusRelay.Get() == StatusDisconnected || conn.statusICE.Get() == StatusDisconnected {
conn.log.Tracef("connectivity guard timedout, relay state: %s, ice state: %s", conn.statusRelay, conn.statusICE)
}
} else {
if conn.statusICE.Get() == StatusDisconnected {
conn.log.Tracef("connectivity guard timedout, ice state: %s", conn.statusICE)
}
}

// checks if there is peer connection is established via relay or ice
if conn.isConnected() {
continue
}

err := conn.handshaker.sendOffer()
if err != nil {
conn.log.Errorf("failed to do handshake: %v", err)
}

case <-conn.reconnectCh:
ticker.Stop()
ticker = conn.prepareExponentTicker()

case <-conn.ctx.Done():
conn.log.Debugf("context is done, stop reconnect loop")
return
}
}
}

func (conn *Conn) prepareExponentTicker() *backoff.Ticker {
bo := backoff.WithContext(&backoff.ExponentialBackOff{
InitialInterval: 800 * time.Millisecond,
RandomizationFactor: 0.1,
Multiplier: 2,
MaxInterval: conn.config.Timeout,
MaxElapsedTime: reconnectMaxElapsedTime,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}, conn.ctx)

ticker := backoff.NewTicker(bo)
<-ticker.C // consume the initial tick what is happening right after the ticker has been created

return ticker
}

// reconnectLoopForOnDisconnectedEvent is used when the peer is not a controller and it should reconnect to the peer
// when the connection is lost. It will try to establish a connection only once time if before the connection was established
// It track separately the ice and relay connection status. Just because a lover priority connection reestablished it does not
// mean that to switch to it. We always force to use the higher priority connection.
func (conn *Conn) reconnectLoopForOnDisconnectedEvent() {
for {
select {
case changed := <-conn.relayDisconnected:
if !changed {
continue
}
conn.log.Debugf("Relay state changed, try to send new offer")
case changed := <-conn.iCEDisconnected:
if !changed {
continue
}
conn.log.Debugf("ICE state changed, try to send new offer")
case <-conn.ctx.Done():
conn.log.Debugf("context is done, stop reconnect loop")
return
}

err := conn.handshaker.SendOffer()
if err != nil {
conn.log.Errorf("failed to do handshake: %v", err)
}
}
}

// configureConnection starts proxying traffic from/to local Wireguard and sets connection status to StatusConnected
func (conn *Conn) iCEConnectionIsReady(priority ConnPriority, iceConnInfo ICEConnInfo) {
conn.mu.Lock()
Expand Down Expand Up @@ -583,7 +693,7 @@ func (conn *Conn) doOnConnected(remoteRosenpassPubKey []byte, remoteRosenpassAdd
}
}

func (conn *Conn) waitInitialRandomSleepTime(ctx context.Context) {
func (conn *Conn) waitInitialRandomSleepTime() {
minWait := 100
maxWait := 800
duration := time.Duration(rand.Intn(maxWait-minWait)+minWait) * time.Millisecond
Expand All @@ -592,7 +702,7 @@ func (conn *Conn) waitInitialRandomSleepTime(ctx context.Context) {
defer timeout.Stop()

select {
case <-ctx.Done():
case <-conn.ctx.Done():
case <-timeout.C:
}
}
Expand Down Expand Up @@ -721,18 +831,6 @@ func (conn *Conn) handleConfigurationFailure(err error, wgProxy wgproxy.Proxy) {
}
}

func (conn *Conn) logTraceConnState() {
if conn.workerRelay.IsRelayConnectionSupportedWithPeer() {
if conn.statusRelay.Get() == StatusDisconnected || conn.statusICE.Get() == StatusDisconnected {
conn.log.Tracef("connectivity guard timedout, relay state: %s, ice state: %s", conn.statusRelay, conn.statusICE)
}
} else {
if conn.statusICE.Get() == StatusDisconnected {
conn.log.Tracef("connectivity guard timedout, ice state: %s", conn.statusICE)
}
}
}

func isRosenpassEnabled(remoteRosenpassPubKey []byte) bool {
return remoteRosenpassPubKey != nil
}
Expand Down
Loading

0 comments on commit 5acc21f

Please sign in to comment.