Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor the AutoRelay code #1240

Merged
merged 5 commits into from
Nov 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
243 changes: 115 additions & 128 deletions p2p/host/autorelay/autorelay.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"

"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -101,7 +102,8 @@ type AutoRelay struct {
refCount sync.WaitGroup
ctxCancel context.CancelFunc

disconnect chan struct{}
relayFound chan struct{}
findRelaysRunning int32 // to be used as an atomic

mx sync.Mutex
relays map[peer.ID]*circuitv2.Reservation // rsvp will be nil if it is a v1 relay
Expand All @@ -119,7 +121,7 @@ func NewAutoRelay(bhost *basic.BasicHost, router routing.PeerRouting, opts ...Op
router: router,
addrsF: bhost.AddrsFactory,
relays: make(map[peer.ID]*circuitv2.Reservation),
disconnect: make(chan struct{}, 1),
relayFound: make(chan struct{}, 1),
status: network.ReachabilityUnknown,
}
for _, opt := range opts {
Expand All @@ -128,7 +130,6 @@ func NewAutoRelay(bhost *basic.BasicHost, router routing.PeerRouting, opts ...Op
}
}
bhost.AddrsFactory = ar.hostAddrs
bhost.Network().Notify(ar)
ar.refCount.Add(1)
go ar.background(ctx)
return ar, nil
Expand All @@ -141,46 +142,83 @@ func (ar *AutoRelay) hostAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {
func (ar *AutoRelay) background(ctx context.Context) {
defer ar.refCount.Done()

subReachability, _ := ar.host.EventBus().Subscribe(new(event.EvtLocalReachabilityChanged))
subReachability, err := ar.host.EventBus().Subscribe(new(event.EvtLocalReachabilityChanged))
if err != nil {
log.Error("failed to subscribe to the EvtLocalReachabilityChanged")
return
}
defer subReachability.Close()
subConnectedness, err := ar.host.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged))
if err != nil {
log.Error("failed to subscribe to the EvtPeerConnectednessChanged")
return
}
defer subConnectedness.Close()

ticker := time.NewTicker(rsvpRefreshInterval)
defer ticker.Stop()

// when true, we need to identify push
push := false

for {
// when true, we need to identify push
var push bool

select {
case ev, ok := <-subReachability.Out():
case ev, ok := <-subConnectedness.Out():
if !ok {
return
}
evt, ok := ev.(event.EvtLocalReachabilityChanged)
evt := ev.(event.EvtPeerConnectednessChanged)
switch evt.Connectedness {
case network.Connected:
// If we just connect to one of our static relays, get a reservation immediately.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this might make discovery strategy refactoring tricky, but i guess it is fine for now.

for _, pi := range ar.static {
if pi.ID == evt.Peer {
rsvp, ok := ar.tryRelay(ctx, pi)
if ok {
ar.mx.Lock()
ar.relays[pi.ID] = rsvp
ar.mx.Unlock()
}
push = true
break
}
}
case network.NotConnected:
ar.mx.Lock()
if ar.usingRelay(evt.Peer) { // we were disconnected from a relay
delete(ar.relays, evt.Peer)
push = true
}
ar.mx.Unlock()
}
case ev, ok := <-subReachability.Out():
if !ok {
return
}
evt := ev.(event.EvtLocalReachabilityChanged)

var update bool
if evt.Reachability == network.ReachabilityPrivate {
// TODO: this is a long-lived (2.5min task) that should get spun up in a separate thread
// and canceled if the relay learns the nat is now public.
update = ar.findRelays(ctx)
// findRelays is a long-lived task (runs up to 2.5 minutes)
// Make sure we only start it once.
if atomic.CompareAndSwapInt32(&ar.findRelaysRunning, 0, 1) {
go func() {
defer atomic.StoreInt32(&ar.findRelaysRunning, 0)
ar.findRelays(ctx)
}()
}
}

ar.mx.Lock()
if update || (ar.status != evt.Reachability && evt.Reachability != network.ReachabilityUnknown) {
// if our reachability changed
if ar.status != evt.Reachability && evt.Reachability != network.ReachabilityUnknown {
push = true
}
ar.status = evt.Reachability
ar.mx.Unlock()

case <-ar.disconnect:
case <-ar.relayFound:
push = true

case now := <-ticker.C:
push = ar.refreshReservations(ctx, now)

case <-ctx.Done():
return
}
Expand All @@ -189,7 +227,6 @@ func (ar *AutoRelay) background(ctx context.Context) {
ar.mx.Lock()
ar.cachedAddrs = nil
ar.mx.Unlock()
push = false
ar.host.SignalAddressChange()
}
}
Expand Down Expand Up @@ -220,15 +257,12 @@ func (ar *AutoRelay) refreshReservations(ctx context.Context, now time.Time) boo
// this is a circuitv1 relay, there is no reservation
continue
}

if now.Add(rsvpExpirationSlack).Before(rsvp.Expiration) {
continue
}

p := p
g.Go(func() error {
return ar.refreshRelayReservation(ctx, p)
})
g.Go(func() error { return ar.refreshRelayReservation(ctx, p) })
}
ar.mx.Unlock()

Expand Down Expand Up @@ -256,132 +290,119 @@ func (ar *AutoRelay) refreshRelayReservation(ctx context.Context, p peer.ID) err
return err
}

func (ar *AutoRelay) findRelays(ctx context.Context) bool {
if ar.numRelays() >= DesiredRelays {
return false
}

update := false
func (ar *AutoRelay) findRelays(ctx context.Context) {
timer := time.NewTimer(30 * time.Second)
defer timer.Stop()
for retry := 0; retry < 5; retry++ {
if retry > 0 {
log.Debug("no relays connected; retrying in 30s")
select {
case <-time.After(30 * time.Second):
case <-timer.C:
case <-ctx.Done():
return update
return
}
}

update = ar.findRelaysOnce(ctx) || update
if ar.numRelays() > 0 {
return update
if foundAtLeastOneRelay := ar.findRelaysOnce(ctx); foundAtLeastOneRelay {
return
}
}
return update
}

func (ar *AutoRelay) findRelaysOnce(ctx context.Context) bool {
pis, err := ar.discoverRelays(ctx)
relays, err := ar.discoverRelays(ctx)
if err != nil {
log.Debugf("error discovering relays: %s", err)
return false
}
log.Debugf("discovered %d relays", len(pis))
pis = ar.selectRelays(ctx, pis)
log.Debugf("selected %d relays", len(pis))
log.Debugf("discovered %d relays", len(relays))
relays = ar.selectRelays(ctx, relays)
log.Debugf("selected %d relays", len(relays))

var found bool
for _, pi := range relays {
ar.mx.Lock()
relayInUse := ar.usingRelay(pi.ID)
ar.mx.Unlock()
if relayInUse {
continue
}
rsvp, ok := ar.tryRelay(ctx, pi)
if !ok {
continue
}
// make sure we're still connected.
if ar.host.Network().Connectedness(pi.ID) != network.Connected {
continue
}
found = true
ar.mx.Lock()
ar.relays[pi.ID] = rsvp
// protect the connection
ar.host.ConnManager().Protect(pi.ID, autorelayTag)
numRelays := len(ar.relays)
ar.mx.Unlock()

update := false
for _, pi := range pis {
update = ar.tryRelay(ctx, pi) || update
if ar.numRelays() >= DesiredRelays {
if numRelays >= DesiredRelays {
break
}
}
return update
}

func (ar *AutoRelay) numRelays() int {
ar.mx.Lock()
defer ar.mx.Unlock()
return len(ar.relays)
if found {
ar.relayFound <- struct{}{}
return true
}
return false
}

// usingRelay returns if we're currently using the given relay.
func (ar *AutoRelay) usingRelay(p peer.ID) bool {
ar.mx.Lock()
defer ar.mx.Unlock()
_, ok := ar.relays[p]
return ok
}

// addRelay adds the given relay to our set of relays.
// returns true when we add a new relay
func (ar *AutoRelay) tryRelay(ctx context.Context, pi peer.AddrInfo) bool {
if ar.usingRelay(pi.ID) {
return false
}

func (ar *AutoRelay) tryRelay(ctx context.Context, pi peer.AddrInfo) (*circuitv2.Reservation, bool) {
if !ar.connect(ctx, pi) {
return false
return nil, false
}

protos, err := ar.host.Peerstore().SupportsProtocols(pi.ID, protoIDv1, protoIDv2)
if err != nil {
log.Debugf("error checking relay protocol support for peer %s: %s", pi.ID, err)
return false
return nil, false
}

var supportsv1, supportsv2 bool
protoLoop:
for _, proto := range protos {
switch proto {
case protoIDv1:
supportsv1 = true
case protoIDv2:
supportsv2 = true
break protoLoop
}
}

var rsvp *circuitv2.Reservation

switch {
case supportsv2:
rsvp, err = circuitv2.Reserve(ctx, ar.host, pi)
rsvp, err := circuitv2.Reserve(ctx, ar.host, pi)
if err != nil {
log.Debugf("error reserving slot with %s: %s", pi.ID, err)
return false
return nil, false
}

return rsvp, true
case supportsv1:
ok, err := relayv1.CanHop(ctx, ar.host, pi.ID)
if err != nil {
log.Debugf("error querying relay %s for v1 hop: %s", pi.ID, err)
return false
return nil, false
}

if !ok {
// not a hop relay
return false
}

default:
// supports neither, unusable relay.
return false
return nil, ok
default: // supports neither, unusable relay.
return nil, false
}

ar.mx.Lock()
defer ar.mx.Unlock()

// make sure we're still connected.
if ar.host.Network().Connectedness(pi.ID) != network.Connected {
return false
}

ar.relays[pi.ID] = rsvp

// protect the connection
ar.host.ConnManager().Protect(pi.ID, autorelayTag)

return true
}

func (ar *AutoRelay) connect(ctx context.Context, pi peer.AddrInfo) bool {
Expand Down Expand Up @@ -448,10 +469,11 @@ func (ar *AutoRelay) discoverRelays(ctx context.Context) ([]peer.AddrInfo, error
}

func (ar *AutoRelay) selectRelays(ctx context.Context, pis []peer.AddrInfo) []peer.AddrInfo {
// TODO better relay selection strategy; this just selects random relays
// but we should probably use ping latency as the selection metric

shuffleRelays(pis)
// TODO: better relay selection strategy; this just selects random relays,
// but we should probably use ping latency as the selection metric
rand.Shuffle(len(pis), func(i, j int) {
pis[i], pis[j] = pis[j], pis[i]
})
return pis
}

Expand Down Expand Up @@ -509,38 +531,3 @@ func (ar *AutoRelay) Close() error {
ar.refCount.Wait()
return nil
}

func shuffleRelays(pis []peer.AddrInfo) {
for i := range pis {
j := rand.Intn(i + 1)
pis[i], pis[j] = pis[j], pis[i]
}
}

// Notifee
func (ar *AutoRelay) Listen(network.Network, ma.Multiaddr) {}
func (ar *AutoRelay) ListenClose(network.Network, ma.Multiaddr) {}
func (ar *AutoRelay) Connected(network.Network, network.Conn) {}

func (ar *AutoRelay) Disconnected(net network.Network, c network.Conn) {
p := c.RemotePeer()

ar.mx.Lock()
defer ar.mx.Unlock()

if ar.host.Network().Connectedness(p) == network.Connected {
// We have a second connection.
return
}

if _, ok := ar.relays[p]; ok {
delete(ar.relays, p)
select {
case ar.disconnect <- struct{}{}:
default:
}
}
}

func (ar *AutoRelay) OpenedStream(network.Network, network.Stream) {}
func (ar *AutoRelay) ClosedStream(network.Network, network.Stream) {}
Loading