[relay] Refactor initial Relay connection (#2800)
Support firewalls with restricted WS rules

Allow running the engine without Relay servers
Keep up with Relay address changes
pappz authored Nov 22, 2024
1 parent 9db1932 commit 2a5cb16
Showing 10 changed files with 211 additions and 96 deletions.
3 changes: 1 addition & 2 deletions client/internal/connect.go
@@ -232,6 +232,7 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, probes *ProbeHold

relayURLs, token := parseRelayInfo(loginResp)
relayManager := relayClient.NewManager(engineCtx, relayURLs, myPrivateKey.PublicKey().String())
c.statusRecorder.SetRelayMgr(relayManager)
if len(relayURLs) > 0 {
if token != nil {
if err := relayManager.UpdateToken(token); err != nil {
@@ -242,9 +243,7 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, probes *ProbeHold
log.Infof("connecting to the Relay service(s): %s", strings.Join(relayURLs, ", "))
if err = relayManager.Serve(); err != nil {
log.Error(err)
return wrapErr(err)
}
c.statusRecorder.SetRelayMgr(relayManager)
}

peerConfig := loginResp.GetPeerConfig()
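
The hunk above moves the SetRelayMgr call ahead of the connection attempt and drops the early return, so a failed Serve() no longer aborts engine startup. Assembled, the post-change flow reads roughly as follows (a sketch; the error handling elided by the diff is assumed):

relayURLs, token := parseRelayInfo(loginResp)
relayManager := relayClient.NewManager(engineCtx, relayURLs, myPrivateKey.PublicKey().String())
// register the manager with the status recorder even if no Relay server is configured
c.statusRecorder.SetRelayMgr(relayManager)

if len(relayURLs) > 0 {
	if token != nil {
		if err := relayManager.UpdateToken(token); err != nil {
			return wrapErr(fmt.Errorf("update relay token: %w", err)) // assumed error handling, elided by the diff
		}
	}
	log.Infof("connecting to the Relay service(s): %s", strings.Join(relayURLs, ", "))
	if err = relayManager.Serve(); err != nil {
		// no longer fatal: the guard keeps retrying in the background
		log.Error(err)
	}
}
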
10 changes: 9 additions & 1 deletion client/internal/engine.go
@@ -538,6 +538,7 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {

relayMsg := wCfg.GetRelay()
if relayMsg != nil {
// when we receive a token, we also expect a valid address list
c := &auth.Token{
Payload: relayMsg.GetTokenPayload(),
Signature: relayMsg.GetTokenSignature(),
@@ -546,9 +547,16 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
log.Errorf("failed to update relay token: %v", err)
return fmt.Errorf("update relay token: %w", err)
}

e.relayManager.UpdateServerURLs(relayMsg.Urls)

// In case the agent started against a Management (MGM) server where the relay was disabled but was enabled later.
// We can ignore all errors because the guard will manage the reconnection retries.
_ = e.relayManager.Serve()
} else {
e.relayManager.UpdateServerURLs(nil)
}

// todo update relay address in the relay manager
// todo update signal
}

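
Because this hunk interleaves context and changed lines, here is the relay branch of handleSync as it looks after the change (a sketch; the UpdateToken call elided by the diff is assumed from the surrounding error message):

relayMsg := wCfg.GetRelay()
if relayMsg != nil {
	// when we receive a token, we also expect a valid address list
	c := &auth.Token{
		Payload:   relayMsg.GetTokenPayload(),
		Signature: relayMsg.GetTokenSignature(),
	}
	if err := e.relayManager.UpdateToken(c); err != nil { // assumed, elided by the diff
		log.Errorf("failed to update relay token: %v", err)
		return fmt.Errorf("update relay token: %w", err)
	}

	// push the current server list to the manager
	e.relayManager.UpdateServerURLs(relayMsg.Urls)

	// in case the relay was enabled on the Management server after startup;
	// errors are ignored because the guard manages the reconnection retries
	_ = e.relayManager.Serve()
} else {
	// relay removed from the Management configuration: clear the server list
	e.relayManager.UpdateServerURLs(nil)
}
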
20 changes: 9 additions & 11 deletions client/internal/peer/status.go
@@ -676,25 +676,23 @@ func (d *Status) GetRelayStates() []relay.ProbeResult {
// extend the list of stun, turn servers with relay address
relayStates := slices.Clone(d.relayStates)

var relayState relay.ProbeResult

// if the server connection is not established then we will use the general address
// in case of connection we will use the instance specific address
instanceAddr, err := d.relayMgr.RelayInstanceAddress()
if err != nil {
// TODO add their status
if errors.Is(err, relayClient.ErrRelayClientNotConnected) {
for _, r := range d.relayMgr.ServerURLs() {
relayStates = append(relayStates, relay.ProbeResult{
URI: r,
})
}
return relayStates
for _, r := range d.relayMgr.ServerURLs() {
relayStates = append(relayStates, relay.ProbeResult{
URI: r,
Err: err,
})
}
relayState.Err = err
return relayStates
}

relayState.URI = instanceAddr
relayState := relay.ProbeResult{
URI: instanceAddr,
}
return append(relayStates, relayState)
}

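
Since the rendering above mixes removed and added lines, the resulting error path of GetRelayStates is easier to read assembled (a sketch of the post-change body, assuming relayStates was cloned just above as shown):

// if the server connection is not established, report every configured
// server URL together with the connection error; once connected, report
// the instance-specific address instead
instanceAddr, err := d.relayMgr.RelayInstanceAddress()
if err != nil {
	for _, r := range d.relayMgr.ServerURLs() {
		relayStates = append(relayStates, relay.ProbeResult{
			URI: r,
			Err: err,
		})
	}
	return relayStates
}

relayState := relay.ProbeResult{
	URI: instanceAddr,
}
return append(relayStates, relayState)
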
14 changes: 9 additions & 5 deletions client/internal/peer/worker_ice.go
@@ -46,8 +46,6 @@ type WorkerICE struct {
hasRelayOnLocally bool
conn WorkerICECallbacks

selectedPriority ConnPriority

agent *ice.Agent
muxAgent sync.Mutex

@@ -95,10 +93,8 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) {

var preferredCandidateTypes []ice.CandidateType
if w.hasRelayOnLocally && remoteOfferAnswer.RelaySrvAddress != "" {
w.selectedPriority = connPriorityICEP2P
preferredCandidateTypes = icemaker.CandidateTypesP2P()
} else {
w.selectedPriority = connPriorityICETurn
preferredCandidateTypes = icemaker.CandidateTypes()
}

@@ -159,7 +155,7 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
RelayedOnLocal: isRelayCandidate(pair.Local),
}
w.log.Debugf("ICE conn ready to use")
go w.conn.OnConnReady(w.selectedPriority, ci)
go w.conn.OnConnReady(selectedPriority(pair), ci)
}

// OnRemoteCandidate Handles ICE connection Candidate provided by the remote peer.
@@ -394,3 +390,11 @@ func isRelayed(pair *ice.CandidatePair) bool {
}
return false
}

func selectedPriority(pair *ice.CandidatePair) ConnPriority {
if isRelayed(pair) {
return connPriorityICETurn
} else {
return connPriorityICEP2P
}
}
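
With the stored selectedPriority field removed, the priority reported to OnConnReady is derived from the candidate pair that was actually selected, so a TURN fallback is reported as such even when P2P candidates were preferred. The isRelayed helper body is not shown in this diff; a plausible implementation based on the pion/ice API (an assumption, not the committed code) would be:

func isRelayed(pair *ice.CandidatePair) bool {
	// treat the pair as relayed if either side is a TURN relay candidate
	if pair.Local.Type() == ice.CandidateTypeRelay || pair.Remote.Type() == ice.CandidateTypeRelay {
		return true
	}
	return false
}
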
6 changes: 3 additions & 3 deletions relay/client/client.go
@@ -140,7 +140,7 @@ type Client struct {
instanceURL *RelayAddr
muInstanceURL sync.Mutex

onDisconnectListener func()
onDisconnectListener func(string)
onConnectedListener func()
listenerMutex sync.Mutex
}
@@ -233,7 +233,7 @@ func (c *Client) ServerInstanceURL() (string, error) {
}

// SetOnDisconnectListener sets a function that will be called when the connection to the relay server is closed.
func (c *Client) SetOnDisconnectListener(fn func()) {
func (c *Client) SetOnDisconnectListener(fn func(string)) {
c.listenerMutex.Lock()
defer c.listenerMutex.Unlock()
c.onDisconnectListener = fn
@@ -554,7 +554,7 @@ func (c *Client) notifyDisconnected() {
if c.onDisconnectListener == nil {
return
}
go c.onDisconnectListener()
go c.onDisconnectListener(c.connectionURL)
}

func (c *Client) notifyConnected() {
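
The disconnect listener now receives the URL of the server that dropped the connection, which lets the caller decide whether a quick reconnect to the same address is still worthwhile. A usage sketch with assumed consumer names (the actual wiring in the Manager is not part of this diff):

client.SetOnDisconnectListener(func(serverURL string) {
	log.Infof("disconnected from relay server: %s", serverURL)
	// hypothetical wiring: hand the dropped client to the guard for retries
	go guard.StartReconnectTrys(context.Background(), client)
})
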
2 changes: 1 addition & 1 deletion relay/client/client_test.go
@@ -551,7 +551,7 @@ func TestCloseByServer(t *testing.T) {
}

disconnected := make(chan struct{})
relayClient.SetOnDisconnectListener(func() {
relayClient.SetOnDisconnectListener(func(_ string) {
log.Infof("client disconnected")
close(disconnected)
})
91 changes: 73 additions & 18 deletions relay/client/guard.go
@@ -4,65 +4,120 @@ import (
"context"
"time"

"github.com/cenkalti/backoff/v4"
log "github.com/sirupsen/logrus"
)

var (
reconnectingTimeout = 5 * time.Second
reconnectingTimeout = 60 * time.Second
)

// Guard manages the reconnection attempts to the Relay server in case of a disconnection event.
type Guard struct {
ctx context.Context
relayClient *Client
// OnNewRelayClient is a channel used to notify the consumer about a new relay client instance.
OnNewRelayClient chan *Client
serverPicker *ServerPicker
}

// NewGuard creates a new guard for the relay client.
func NewGuard(context context.Context, relayClient *Client) *Guard {
func NewGuard(sp *ServerPicker) *Guard {
g := &Guard{
ctx: context,
relayClient: relayClient,
OnNewRelayClient: make(chan *Client, 1),
serverPicker: sp,
}
return g
}

// OnDisconnected is called when the relay client is disconnected from the relay server. It will trigger the reconnection
// StartReconnectTrys is called when the relay client is disconnected from the relay server.
// It attempts to reconnect to the relay server. The function first tries a quick reconnect
// to the same server that was used before, if the server URL is still valid. If the quick
// reconnect fails, it starts a ticker to periodically attempt server picking until it
// succeeds or the context is done.
//
// Parameters:
// - ctx: The context to control the lifecycle of the reconnection attempts.
// - relayClient: The relay client instance that was disconnected.
// todo prevent multiple reconnection instances. In the current usage it should not happen, but it is better to prevent it.
func (g *Guard) OnDisconnected() {
if g.quickReconnect() {
func (g *Guard) StartReconnectTrys(ctx context.Context, relayClient *Client) {
if relayClient == nil {
goto RETRY
}
if g.isServerURLStillValid(relayClient) && g.quickReconnect(ctx, relayClient) {
return
}

ticker := time.NewTicker(reconnectingTimeout)
RETRY:
ticker := exponentTicker(ctx)
defer ticker.Stop()

for {
select {
case <-ticker.C:
err := g.relayClient.Connect()
if err != nil {
log.Errorf("failed to reconnect to relay server: %s", err)
if err := g.retry(ctx); err != nil {
log.Errorf("failed to pick new Relay server: %s", err)
continue
}
return
case <-g.ctx.Done():
case <-ctx.Done():
return
}
}
}

func (g *Guard) quickReconnect() bool {
ctx, cancel := context.WithTimeout(g.ctx, 1500*time.Millisecond)
func (g *Guard) retry(ctx context.Context) error {
log.Infof("try to pick up a new Relay server")
relayClient, err := g.serverPicker.PickServer(ctx)
if err != nil {
return err
}

// prevent working with a deprecated Relay client instance
g.drainRelayClientChan()

g.OnNewRelayClient <- relayClient
return nil
}

func (g *Guard) quickReconnect(parentCtx context.Context, rc *Client) bool {
ctx, cancel := context.WithTimeout(parentCtx, 1500*time.Millisecond)
defer cancel()
<-ctx.Done()

if g.ctx.Err() != nil {
if parentCtx.Err() != nil {
return false
}
log.Infof("try to reconnect to Relay server: %s", rc.connectionURL)

if err := g.relayClient.Connect(); err != nil {
if err := rc.Connect(); err != nil {
log.Errorf("failed to reconnect to relay server: %s", err)
return false
}
return true
}

func (g *Guard) drainRelayClientChan() {
select {
case <-g.OnNewRelayClient:
default:
}
}

func (g *Guard) isServerURLStillValid(rc *Client) bool {
for _, url := range g.serverPicker.ServerURLs.Load().([]string) {
if url == rc.connectionURL {
return true
}
}
return false
}

func exponentTicker(ctx context.Context) *backoff.Ticker {
bo := backoff.WithContext(&backoff.ExponentialBackOff{
InitialInterval: 2 * time.Second,
Multiplier: 2,
MaxInterval: reconnectingTimeout,
Clock: backoff.SystemClock,
}, ctx)

return backoff.NewTicker(bo)
}
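
The guard now owns server picking: instead of reconnecting only to the original server, it publishes a freshly connected client on the OnNewRelayClient channel. How the relay Manager consumes that channel is not part of this diff; a hedged sketch of one possible consumer loop (setClient is hypothetical):

go guard.StartReconnectTrys(ctx, currentClient)

for {
	select {
	case rc := <-guard.OnNewRelayClient:
		// adopt the freshly picked relay client
		m.setClient(rc)
	case <-ctx.Done():
		return
	}
}
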