Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[relay] Refactor initial Relay connection #2800

Merged
merged 11 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions client/internal/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, probes *ProbeHold

relayURLs, token := parseRelayInfo(loginResp)
relayManager := relayClient.NewManager(engineCtx, relayURLs, myPrivateKey.PublicKey().String())
c.statusRecorder.SetRelayMgr(relayManager)
if len(relayURLs) > 0 {
if token != nil {
if err := relayManager.UpdateToken(token); err != nil {
Expand All @@ -240,9 +241,7 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, probes *ProbeHold
log.Infof("connecting to the Relay service(s): %s", strings.Join(relayURLs, ", "))
if err = relayManager.Serve(); err != nil {
log.Error(err)
return wrapErr(err)
}
c.statusRecorder.SetRelayMgr(relayManager)
}

peerConfig := loginResp.GetPeerConfig()
Expand Down
13 changes: 10 additions & 3 deletions client/internal/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"github.com/netbirdio/netbird/client/internal/routemanager/systemops"
"github.com/netbirdio/netbird/client/internal/statemanager"


nbssh "github.com/netbirdio/netbird/client/ssh"
"github.com/netbirdio/netbird/client/system"
nbdns "github.com/netbirdio/netbird/dns"
Expand Down Expand Up @@ -171,7 +170,7 @@ type Engine struct {

relayManager *relayClient.Manager
stateManager *statemanager.Manager
srWatcher *guard.SRWatcher
srWatcher *guard.SRWatcher
}

// Peer is an instance of the Connection Peer
Expand Down Expand Up @@ -538,6 +537,7 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {

relayMsg := wCfg.GetRelay()
if relayMsg != nil {
// when we receive token we expect valid address list too
c := &auth.Token{
Payload: relayMsg.GetTokenPayload(),
Signature: relayMsg.GetTokenSignature(),
Expand All @@ -546,9 +546,16 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
log.Errorf("failed to update relay token: %v", err)
return fmt.Errorf("update relay token: %w", err)
}

e.relayManager.UpdateServerURLs(relayMsg.Urls)

// Just in case the agent started with an MGM server where the relay was disabled but was later enabled.
// We can ignore all errors because the guard will manage the reconnection retries.
_ = e.relayManager.Serve()
} else {
e.relayManager.UpdateServerURLs(nil)
}

// todo update relay address in the relay manager
// todo update signal
}

Expand Down
20 changes: 9 additions & 11 deletions client/internal/peer/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,25 +669,23 @@ func (d *Status) GetRelayStates() []relay.ProbeResult {
// extend the list of stun, turn servers with relay address
relayStates := slices.Clone(d.relayStates)

var relayState relay.ProbeResult

// if the server connection is not established then we will use the general address
// in case of connection we will use the instance specific address
instanceAddr, err := d.relayMgr.RelayInstanceAddress()
if err != nil {
// TODO add their status
if errors.Is(err, relayClient.ErrRelayClientNotConnected) {
for _, r := range d.relayMgr.ServerURLs() {
relayStates = append(relayStates, relay.ProbeResult{
URI: r,
})
}
return relayStates
for _, r := range d.relayMgr.ServerURLs() {
relayStates = append(relayStates, relay.ProbeResult{
URI: r,
Err: err,
})
}
relayState.Err = err
return relayStates
}

relayState.URI = instanceAddr
relayState := relay.ProbeResult{
URI: instanceAddr,
}
return append(relayStates, relayState)
}

Expand Down
14 changes: 9 additions & 5 deletions client/internal/peer/worker_ice.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ type WorkerICE struct {
hasRelayOnLocally bool
conn WorkerICECallbacks

selectedPriority ConnPriority

agent *ice.Agent
muxAgent sync.Mutex

Expand Down Expand Up @@ -92,10 +90,8 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) {

var preferredCandidateTypes []ice.CandidateType
if w.hasRelayOnLocally && remoteOfferAnswer.RelaySrvAddress != "" {
w.selectedPriority = connPriorityICEP2P
preferredCandidateTypes = icemaker.CandidateTypesP2P()
} else {
w.selectedPriority = connPriorityICETurn
preferredCandidateTypes = icemaker.CandidateTypes()
}

Expand Down Expand Up @@ -156,7 +152,7 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
RelayedOnLocal: isRelayCandidate(pair.Local),
}
w.log.Debugf("on ICE conn read to use ready")
go w.conn.OnConnReady(w.selectedPriority, ci)
go w.conn.OnConnReady(selectedPriority(pair), ci)
}

// OnRemoteCandidate Handles ICE connection Candidate provided by the remote peer.
Expand Down Expand Up @@ -378,3 +374,11 @@ func isRelayed(pair *ice.CandidatePair) bool {
}
return false
}

func selectedPriority(pair *ice.CandidatePair) ConnPriority {
if isRelayed(pair) {
return connPriorityICETurn
} else {
return connPriorityICEP2P
}
}
6 changes: 3 additions & 3 deletions relay/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ type Client struct {
instanceURL *RelayAddr
muInstanceURL sync.Mutex

onDisconnectListener func()
onDisconnectListener func(string)
onConnectedListener func()
listenerMutex sync.Mutex
}
Expand Down Expand Up @@ -234,7 +234,7 @@ func (c *Client) ServerInstanceURL() (string, error) {
}

// SetOnDisconnectListener sets a function that will be called when the connection to the relay server is closed.
func (c *Client) SetOnDisconnectListener(fn func()) {
func (c *Client) SetOnDisconnectListener(fn func(string)) {
c.listenerMutex.Lock()
defer c.listenerMutex.Unlock()
c.onDisconnectListener = fn
Expand Down Expand Up @@ -555,7 +555,7 @@ func (c *Client) notifyDisconnected() {
if c.onDisconnectListener == nil {
return
}
go c.onDisconnectListener()
go c.onDisconnectListener(c.connectionURL)
}

func (c *Client) notifyConnected() {
Expand Down
2 changes: 1 addition & 1 deletion relay/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ func TestCloseByServer(t *testing.T) {
}

disconnected := make(chan struct{})
relayClient.SetOnDisconnectListener(func() {
relayClient.SetOnDisconnectListener(func(_ string) {
log.Infof("client disconnected")
close(disconnected)
})
Expand Down
91 changes: 73 additions & 18 deletions relay/client/guard.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,65 +4,120 @@ import (
"context"
"time"

"github.com/cenkalti/backoff/v4"
log "github.com/sirupsen/logrus"
)

var (
reconnectingTimeout = 5 * time.Second
reconnectingTimeout = 60 * time.Second
)

// Guard manage the reconnection tries to the Relay server in case of disconnection event.
type Guard struct {
ctx context.Context
relayClient *Client
// OnNewRelayClient is a channel that is used to notify the relay client about a new relay client instance.
OnNewRelayClient chan *Client
serverPicker *ServerPicker
}

// NewGuard creates a new guard for the relay client.
func NewGuard(context context.Context, relayClient *Client) *Guard {
func NewGuard(sp *ServerPicker) *Guard {
g := &Guard{
ctx: context,
relayClient: relayClient,
OnNewRelayClient: make(chan *Client, 1),
serverPicker: sp,
}
return g
}

// OnDisconnected is called when the relay client is disconnected from the relay server. It will trigger the reconnection
// StartReconnectTrys is called when the relay client is disconnected from the relay server.
// It attempts to reconnect to the relay server. The function first tries a quick reconnect
// to the same server that was used before, if the server URL is still valid. If the quick
// reconnect fails, it starts a ticker to periodically attempt server picking until it
// succeeds or the context is done.
//
// Parameters:
// - ctx: The context to control the lifecycle of the reconnection attempts.
// - relayClient: The relay client instance that was disconnected.
// todo prevent multiple reconnection instances. In the current usage it should not happen, but it is better to prevent
func (g *Guard) OnDisconnected() {
if g.quickReconnect() {
func (g *Guard) StartReconnectTrys(ctx context.Context, relayClient *Client) {
if relayClient == nil {
goto RETRY
}
if g.isServerURLStillValid(relayClient) && g.quickReconnect(ctx, relayClient) {
return
}

ticker := time.NewTicker(reconnectingTimeout)
RETRY:
ticker := exponentTicker(ctx)
defer ticker.Stop()

for {
select {
case <-ticker.C:
err := g.relayClient.Connect()
if err != nil {
log.Errorf("failed to reconnect to relay server: %s", err)
if err := g.retry(ctx); err != nil {
log.Errorf("failed to pick new Relay server: %s", err)
continue
}
return
case <-g.ctx.Done():
case <-ctx.Done():
return
}
}
}

func (g *Guard) quickReconnect() bool {
ctx, cancel := context.WithTimeout(g.ctx, 1500*time.Millisecond)
func (g *Guard) retry(ctx context.Context) error {
log.Infof("try to pick up a new Relay server")
relayClient, err := g.serverPicker.PickServer(ctx)
if err != nil {
return err
}

// prevent to work with a deprecated Relay client instance
g.drainRelayClientChan()

g.OnNewRelayClient <- relayClient
return nil
}

func (g *Guard) quickReconnect(parentCtx context.Context, rc *Client) bool {
ctx, cancel := context.WithTimeout(parentCtx, 1500*time.Millisecond)
defer cancel()
<-ctx.Done()

if g.ctx.Err() != nil {
if parentCtx.Err() != nil {
return false
}
log.Infof("try to reconnect to Relay server: %s", rc.connectionURL)

if err := g.relayClient.Connect(); err != nil {
if err := rc.Connect(); err != nil {
log.Errorf("failed to reconnect to relay server: %s", err)
return false
}
return true
}

func (g *Guard) drainRelayClientChan() {
select {
case <-g.OnNewRelayClient:
default:
}
}

func (g *Guard) isServerURLStillValid(rc *Client) bool {
for _, url := range g.serverPicker.ServerURLs.Load().([]string) {
if url == rc.connectionURL {
return true
}
}
return false
}

func exponentTicker(ctx context.Context) *backoff.Ticker {
bo := backoff.WithContext(&backoff.ExponentialBackOff{
InitialInterval: 2 * time.Second,
Multiplier: 2,
MaxInterval: reconnectingTimeout,
Clock: backoff.SystemClock,
}, ctx)

return backoff.NewTicker(bo)
}
Loading
Loading