From 6d8ff5cdec285365bc80c83f15f40fcbf37b05a4 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 23 Sep 2021 19:48:05 +0300 Subject: [PATCH 01/16] move host/relay to host/autorelay --- config/config.go | 16 +++++----- options.go | 2 +- p2p/host/{relay => autorelay}/addrsplosion.go | 2 +- .../{relay => autorelay}/addrsplosion_test.go | 2 +- p2p/host/{relay => autorelay}/autorelay.go | 2 +- .../{relay => autorelay}/autorelay_test.go | 30 ++++++++++--------- p2p/host/{relay => autorelay}/doc.go | 2 +- p2p/host/{relay => autorelay}/log.go | 2 +- p2p/host/{relay => autorelay}/relay.go | 2 +- 9 files changed, 31 insertions(+), 29 deletions(-) rename p2p/host/{relay => autorelay}/addrsplosion.go (99%) rename p2p/host/{relay => autorelay}/addrsplosion_test.go (99%) rename p2p/host/{relay => autorelay}/autorelay.go (99%) rename p2p/host/{relay => autorelay}/autorelay_test.go (89%) rename p2p/host/{relay => autorelay}/doc.go (98%) rename p2p/host/{relay => autorelay}/log.go (83%) rename p2p/host/{relay => autorelay}/relay.go (98%) diff --git a/config/config.go b/config/config.go index 843566c10d..3662134a3e 100644 --- a/config/config.go +++ b/config/config.go @@ -17,8 +17,8 @@ import ( "github.com/libp2p/go-libp2p-core/transport" "github.com/libp2p/go-libp2p-peerstore/pstoremem" + "github.com/libp2p/go-libp2p/p2p/host/autorelay" bhost "github.com/libp2p/go-libp2p/p2p/host/basic" - "github.com/libp2p/go-libp2p/p2p/host/relay" routed "github.com/libp2p/go-libp2p/p2p/host/routed" circuitv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client" "github.com/libp2p/go-libp2p/p2p/protocol/holepunch" @@ -209,7 +209,7 @@ func (cfg *Config) NewNode() (host.Host, error) { // TODO: We shouldn't be doing this here. oldFactory := h.AddrsFactory h.AddrsFactory = func(addrs []ma.Multiaddr) []ma.Multiaddr { - return oldFactory(relay.Filter(addrs)) + return oldFactory(autorelay.Filter(addrs)) } } @@ -237,7 +237,7 @@ func (cfg *Config) NewNode() (host.Host, error) { // Note: h.AddrsFactory may be changed by AutoRelay, but non-relay version is // used by AutoNAT below. - var autorelay *relay.AutoRelay + var ar *autorelay.AutoRelay addrF := h.AddrsFactory if cfg.EnableAutoRelay { if !cfg.Relay { @@ -246,7 +246,7 @@ func (cfg *Config) NewNode() (host.Host, error) { } if len(cfg.StaticRelays) > 0 { - autorelay = relay.NewAutoRelay(h, nil, router, cfg.StaticRelays) + ar = autorelay.NewAutoRelay(h, nil, router, cfg.StaticRelays) } else { if router == nil { h.Close() @@ -259,7 +259,7 @@ func (cfg *Config) NewNode() (host.Host, error) { } discovery := discovery.NewRoutingDiscovery(crouter) - autorelay = relay.NewAutoRelay(h, discovery, router, cfg.StaticRelays) + ar = autorelay.NewAutoRelay(h, discovery, router, cfg.StaticRelays) } } @@ -330,15 +330,15 @@ func (cfg *Config) NewNode() (host.Host, error) { if router != nil { ho = routed.Wrap(h, router) } - if autorelay != nil { - return &autoRelayHost{Host: ho, autoRelay: autorelay}, nil + if ar != nil { + return &autoRelayHost{Host: ho, autoRelay: ar}, nil } return ho, nil } type autoRelayHost struct { host.Host - autoRelay *relay.AutoRelay + autoRelay *autorelay.AutoRelay } func (h *autoRelayHost) Close() error { diff --git a/options.go b/options.go index b82236c0df..6b42eff023 100644 --- a/options.go +++ b/options.go @@ -17,8 +17,8 @@ import ( "github.com/libp2p/go-libp2p-core/pnet" "github.com/libp2p/go-libp2p/config" + autorelay "github.com/libp2p/go-libp2p/p2p/host/autorelay" bhost "github.com/libp2p/go-libp2p/p2p/host/basic" - autorelay "github.com/libp2p/go-libp2p/p2p/host/relay" holepunch "github.com/libp2p/go-libp2p/p2p/protocol/holepunch" ma "github.com/multiformats/go-multiaddr" diff --git a/p2p/host/relay/addrsplosion.go b/p2p/host/autorelay/addrsplosion.go similarity index 99% rename from p2p/host/relay/addrsplosion.go rename to p2p/host/autorelay/addrsplosion.go index 2a2fc3c69f..be3be52b3b 100644 --- a/p2p/host/relay/addrsplosion.go +++ b/p2p/host/autorelay/addrsplosion.go @@ -1,4 +1,4 @@ -package relay +package autorelay import ( "encoding/binary" diff --git a/p2p/host/relay/addrsplosion_test.go b/p2p/host/autorelay/addrsplosion_test.go similarity index 99% rename from p2p/host/relay/addrsplosion_test.go rename to p2p/host/autorelay/addrsplosion_test.go index 394f26e59f..aa79d1c14d 100644 --- a/p2p/host/relay/addrsplosion_test.go +++ b/p2p/host/autorelay/addrsplosion_test.go @@ -1,4 +1,4 @@ -package relay +package autorelay import ( "testing" diff --git a/p2p/host/relay/autorelay.go b/p2p/host/autorelay/autorelay.go similarity index 99% rename from p2p/host/relay/autorelay.go rename to p2p/host/autorelay/autorelay.go index bf4a4d5a08..55eb782476 100644 --- a/p2p/host/relay/autorelay.go +++ b/p2p/host/autorelay/autorelay.go @@ -1,4 +1,4 @@ -package relay +package autorelay import ( "context" diff --git a/p2p/host/relay/autorelay_test.go b/p2p/host/autorelay/autorelay_test.go similarity index 89% rename from p2p/host/relay/autorelay_test.go rename to p2p/host/autorelay/autorelay_test.go index 51ed3405d0..a7094be2bf 100644 --- a/p2p/host/relay/autorelay_test.go +++ b/p2p/host/autorelay/autorelay_test.go @@ -1,4 +1,4 @@ -package relay_test +package autorelay_test import ( "context" @@ -9,7 +9,7 @@ import ( "time" "github.com/libp2p/go-libp2p" - "github.com/libp2p/go-libp2p/p2p/host/relay" + "github.com/libp2p/go-libp2p/p2p/host/autorelay" relayv1 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv1/relay" "github.com/libp2p/go-libp2p-core/event" @@ -27,8 +27,8 @@ import ( // test specific parameters func init() { - relay.BootDelay = 1 * time.Second - relay.AdvertiseBootDelay = 100 * time.Millisecond + autorelay.BootDelay = 1 * time.Second + autorelay.AdvertiseBootDelay = 100 * time.Millisecond } // mock routing @@ -133,16 +133,18 @@ func TestAutoRelay(t *testing.T) { // this is the relay host // announce dns addrs because filter out private addresses from relays, // and we consider dns addresses "public". - relayHost, err := libp2p.New(libp2p.DisableRelay(), libp2p.AddrsFactory(func(addrs []ma.Multiaddr) []ma.Multiaddr { - for i, addr := range addrs { - saddr := addr.String() - if strings.HasPrefix(saddr, "/ip4/127.0.0.1/") { - addrNoIP := strings.TrimPrefix(saddr, "/ip4/127.0.0.1") - addrs[i] = ma.StringCast("/dns4/localhost" + addrNoIP) + relayHost, err := libp2p.New( + libp2p.DisableRelay(), + libp2p.AddrsFactory(func(addrs []ma.Multiaddr) []ma.Multiaddr { + for i, addr := range addrs { + saddr := addr.String() + if strings.HasPrefix(saddr, "/ip4/127.0.0.1/") { + addrNoIP := strings.TrimPrefix(saddr, "/ip4/127.0.0.1") + addrs[i] = ma.StringCast("/dns4/localhost" + addrNoIP) + } } - } - return addrs - })) + return addrs + })) if err != nil { t.Fatal(err) } @@ -160,7 +162,7 @@ func TestAutoRelay(t *testing.T) { t.Fatal(err) } relayDiscovery := discovery.NewRoutingDiscovery(relayRouting) - relay.Advertise(ctx, relayDiscovery) + autorelay.Advertise(ctx, relayDiscovery) // the client hosts h1, err := libp2p.New(libp2p.EnableRelay()) diff --git a/p2p/host/relay/doc.go b/p2p/host/autorelay/doc.go similarity index 98% rename from p2p/host/relay/doc.go rename to p2p/host/autorelay/doc.go index f7511ad97e..2e93c3e8b8 100644 --- a/p2p/host/relay/doc.go +++ b/p2p/host/autorelay/doc.go @@ -25,4 +25,4 @@ How it works: advertising relay addresses. The new set of addresses is propagated to connected peers through the `identify/push` protocol. */ -package relay +package autorelay diff --git a/p2p/host/relay/log.go b/p2p/host/autorelay/log.go similarity index 83% rename from p2p/host/relay/log.go rename to p2p/host/autorelay/log.go index 975949a48d..9c4e5ed52c 100644 --- a/p2p/host/relay/log.go +++ b/p2p/host/autorelay/log.go @@ -1,4 +1,4 @@ -package relay +package autorelay import ( logging "github.com/ipfs/go-log/v2" diff --git a/p2p/host/relay/relay.go b/p2p/host/autorelay/relay.go similarity index 98% rename from p2p/host/relay/relay.go rename to p2p/host/autorelay/relay.go index 72978a222f..4d7fb0adee 100644 --- a/p2p/host/relay/relay.go +++ b/p2p/host/autorelay/relay.go @@ -1,4 +1,4 @@ -package relay +package autorelay import ( "context" From e002a242300e77e3537c90c3dfe124d9cf2d486d Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 23 Sep 2021 20:12:09 +0300 Subject: [PATCH 02/16] move autorelay wrapper host to package --- config/config.go | 12 +----------- p2p/host/autorelay/host.go | 19 +++++++++++++++++++ 2 files changed, 20 insertions(+), 11 deletions(-) create mode 100644 p2p/host/autorelay/host.go diff --git a/config/config.go b/config/config.go index 3662134a3e..30b792f93f 100644 --- a/config/config.go +++ b/config/config.go @@ -331,21 +331,11 @@ func (cfg *Config) NewNode() (host.Host, error) { ho = routed.Wrap(h, router) } if ar != nil { - return &autoRelayHost{Host: ho, autoRelay: ar}, nil + return autorelay.NewAutoRelayHost(ho, ar), nil } return ho, nil } -type autoRelayHost struct { - host.Host - autoRelay *autorelay.AutoRelay -} - -func (h *autoRelayHost) Close() error { - _ = h.autoRelay.Close() - return h.Host.Close() -} - // Option is a libp2p config option that can be given to the libp2p constructor // (`libp2p.New`). type Option func(cfg *Config) error diff --git a/p2p/host/autorelay/host.go b/p2p/host/autorelay/host.go new file mode 100644 index 0000000000..9a90c9e6ea --- /dev/null +++ b/p2p/host/autorelay/host.go @@ -0,0 +1,19 @@ +package autorelay + +import ( + "github.com/libp2p/go-libp2p-core/host" +) + +type AutoRelayHost struct { + host.Host + ar *AutoRelay +} + +func (h *AutoRelayHost) Close() error { + _ = h.ar.Close() + return h.Host.Close() +} + +func NewAutoRelayHost(h host.Host, ar *AutoRelay) *AutoRelayHost { + return &AutoRelayHost{Host: h, ar: ar} +} From a3a4881a910bb184ef8b77ed9640f56f8436a26c Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 23 Sep 2021 21:51:44 +0300 Subject: [PATCH 03/16] support v2 relays in autorelay --- p2p/host/autorelay/autorelay.go | 132 +++++++++++++++++++++++++++++--- 1 file changed, 122 insertions(+), 10 deletions(-) diff --git a/p2p/host/autorelay/autorelay.go b/p2p/host/autorelay/autorelay.go index 55eb782476..b9157dbf06 100644 --- a/p2p/host/autorelay/autorelay.go +++ b/p2p/host/autorelay/autorelay.go @@ -15,6 +15,8 @@ import ( basic "github.com/libp2p/go-libp2p/p2p/host/basic" relayv1 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv1/relay" + circuitv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client" + circuitv2_proto "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto" ma "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" @@ -30,7 +32,7 @@ var ( BootDelay = 20 * time.Second ) -// These are the known PL-operated relays +// These are the known PL-operated v1 relays; will be decommissioned in 2022. var DefaultRelays = []string{ "/ip4/147.75.80.110/tcp/4001/p2p/QmbFgm5zan8P6eWWmeyfncR5feYEMPbht5b1FW1C37aQ7y", "/ip4/147.75.80.110/udp/4001/quic/p2p/QmbFgm5zan8P6eWWmeyfncR5feYEMPbht5b1FW1C37aQ7y", @@ -55,7 +57,7 @@ type AutoRelay struct { disconnect chan struct{} mx sync.Mutex - relays map[peer.ID]struct{} + relays map[peer.ID]*circuitv2.Reservation // rsvp will be nil if it is a v1 relay status network.Reachability cachedAddrs []ma.Multiaddr @@ -71,7 +73,7 @@ func NewAutoRelay(bhost *basic.BasicHost, discover discovery.Discoverer, router router: router, addrsF: bhost.AddrsFactory, static: static, - relays: make(map[peer.ID]struct{}), + relays: make(map[peer.ID]*circuitv2.Reservation), disconnect: make(chan struct{}, 1), status: network.ReachabilityUnknown, } @@ -79,6 +81,7 @@ func NewAutoRelay(bhost *basic.BasicHost, discover discovery.Discoverer, router bhost.Network().Notify(ar) ar.refCount.Add(1) go ar.background(ctx) + go ar.refresh(ctx) return ar } @@ -135,6 +138,75 @@ func (ar *AutoRelay) background(ctx context.Context) { } } +func (ar *AutoRelay) refresh(ctx context.Context) { + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + var toRefresh []peer.ID + + ar.mx.Lock() + if ar.status == network.ReachabilityPublic { + // we are public, forget about the relays, unprotect peers + for p := range ar.relays { + ar.host.ConnManager().Unprotect(p, "autorelay") + delete(ar.relays, p) + } + + ar.mx.Unlock() + continue + } + + // find reservations about to expire + now := time.Now() + for p, rsvp := range ar.relays { + if rsvp == nil { + continue + } + + if now.Add(time.Minute).Before(rsvp.Expiration) { + continue + } + + toRefresh = append(toRefresh, p) + } + ar.mx.Unlock() + + // refresh reservations about to expire in parallel + var wg sync.WaitGroup + for _, p := range toRefresh { + wg.Add(1) + + go func(p peer.ID) { + rsvp, err := circuitv2.Reserve(ctx, ar.host, peer.AddrInfo{ID: p}) + ar.mx.Lock() + if err != nil { + log.Debugf("failed to refresh relay slot reservation with %s: %s", p, err) + delete(ar.relays, p) + // unprotect the connection + ar.host.ConnManager().Unprotect(p, "autorelay") + // notify of relay disconnection + select { + case ar.disconnect <- struct{}{}: + default: + } + } else { + log.Debugf("refreshed relay slot reservation with %s", p) + ar.relays[p] = rsvp + } + ar.mx.Unlock() + }(p) + } + wg.Wait() + + case <-ctx.Done(): + return + } + } +} + func (ar *AutoRelay) findRelays(ctx context.Context) bool { if ar.numRelays() >= DesiredRelays { return false @@ -204,14 +276,48 @@ func (ar *AutoRelay) tryRelay(ctx context.Context, pi peer.AddrInfo) bool { return false } - ok, err := relayv1.CanHop(ctx, ar.host, pi.ID) + protoIDv1 := string(relayv1.ProtoID) + protoIDv2 := string(circuitv2_proto.ProtoIDv2Hop) + protos, err := ar.host.Peerstore().SupportsProtocols(pi.ID, protoIDv1, protoIDv2) if err != nil { - log.Debugf("error querying relay: %s", err.Error()) + log.Debugf("error checking relay protocol support for peer %s: %s", pi.ID, err) return false } - if !ok { - // not a hop relay + var supportsv1, supportsv2 bool + for _, proto := range protos { + switch proto { + case protoIDv1: + supportsv1 = true + case protoIDv2: + supportsv2 = true + } + } + + var rsvp *circuitv2.Reservation + + switch { + case supportsv2: + rsvp, err = circuitv2.Reserve(ctx, ar.host, pi) + if err != nil { + log.Debugf("error reserving slot with %s: %s", pi.ID, err) + return false + } + + case supportsv1: + ok, err := relayv1.CanHop(ctx, ar.host, pi.ID) + if err != nil { + log.Debugf("error querying relay %s for v1 hop: %s", pi.ID, err) + return false + } + + if !ok { + // not a hop relay + return false + } + + default: + // supports neither, unusable relay. return false } @@ -222,7 +328,11 @@ func (ar *AutoRelay) tryRelay(ctx context.Context, pi peer.AddrInfo) bool { if ar.host.Network().Connectedness(pi.ID) != network.Connected { return false } - ar.relays[pi.ID] = struct{}{} + + ar.relays[pi.ID] = rsvp + + // protect the connection + ar.host.ConnManager().Protect(pi.ID, "autorelay") return true } @@ -246,8 +356,10 @@ func (ar *AutoRelay) connect(ctx context.Context, pi peer.AddrInfo) bool { return false } - // tag the connection as very important - ar.host.ConnManager().TagPeer(pi.ID, "relay", 42) + // wait for identify to complete so that we can check the supported protocols + // TODO we should do this without a delay/sleep. + time.Sleep(time.Second) + return true } From f405703774804c359e7e77cd0a6069f5f2226d4a Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 23 Sep 2021 21:55:51 +0300 Subject: [PATCH 04/16] test autorelay with both v1 and v2 relays --- p2p/host/autorelay/autorelay_test.go | 27 ++++++++++++++++++++++----- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/p2p/host/autorelay/autorelay_test.go b/p2p/host/autorelay/autorelay_test.go index a7094be2bf..046a91cc38 100644 --- a/p2p/host/autorelay/autorelay_test.go +++ b/p2p/host/autorelay/autorelay_test.go @@ -11,6 +11,7 @@ import ( "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p/p2p/host/autorelay" relayv1 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv1/relay" + relayv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay" "github.com/libp2p/go-libp2p-core/event" "github.com/libp2p/go-libp2p-core/host" @@ -115,7 +116,15 @@ func connect(t *testing.T, a, b host.Host) { } // and the actual test! -func TestAutoRelay(t *testing.T) { +func TestAutoRelayv1(t *testing.T) { + testAutoRelay(t, false) +} + +func TestAutoRelayv2(t *testing.T) { + testAutoRelay(t, true) +} + +func testAutoRelay(t *testing.T, useRelayv2 bool) { manet.Private4 = []*net.IPNet{} ctx, cancel := context.WithCancel(context.Background()) @@ -150,11 +159,19 @@ func TestAutoRelay(t *testing.T) { } // instantiate the relay - r, err := relayv1.NewRelay(relayHost) - if err != nil { - t.Fatal(err) + if useRelayv2 { + r, err := relayv2.New(relayHost) + if err != nil { + t.Fatal(err) + } + defer r.Close() + } else { + r, err := relayv1.NewRelay(relayHost) + if err != nil { + t.Fatal(err) + } + defer r.Close() } - defer r.Close() // advertise the relay relayRouting, err := makeRouting(relayHost) From 7f1dfdfd43ecb837481e45633d8e6468862fae6c Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 23 Sep 2021 22:09:14 +0300 Subject: [PATCH 05/16] fix test race --- p2p/host/autorelay/autorelay_test.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/p2p/host/autorelay/autorelay_test.go b/p2p/host/autorelay/autorelay_test.go index 046a91cc38..3b4bd35bfb 100644 --- a/p2p/host/autorelay/autorelay_test.go +++ b/p2p/host/autorelay/autorelay_test.go @@ -116,17 +116,16 @@ func connect(t *testing.T, a, b host.Host) { } // and the actual test! -func TestAutoRelayv1(t *testing.T) { - testAutoRelay(t, false) -} +func TestAutoRelay(t *testing.T) { + manet.Private4 = []*net.IPNet{} -func TestAutoRelayv2(t *testing.T) { + t.Log("testing autorelay with circuitv1 relay") + testAutoRelay(t, false) + t.Log("testing autorelay with circuitv2 relay") testAutoRelay(t, true) } func testAutoRelay(t *testing.T, useRelayv2 bool) { - manet.Private4 = []*net.IPNet{} - ctx, cancel := context.WithCancel(context.Background()) defer cancel() From b819bd87242e413cef7c0c51da7ff0598e8ccd84 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 23 Sep 2021 22:12:45 +0300 Subject: [PATCH 06/16] go mod tidy examples/pubsub/chat static checker complains; sigh. --- examples/pubsub/chat/go.sum | 2 ++ 1 file changed, 2 insertions(+) diff --git a/examples/pubsub/chat/go.sum b/examples/pubsub/chat/go.sum index 5d60db9335..a336faf695 100644 --- a/examples/pubsub/chat/go.sum +++ b/examples/pubsub/chat/go.sum @@ -404,6 +404,7 @@ github.com/libp2p/go-addr-util v0.1.0/go.mod h1:6I3ZYuFr2O/9D+SoyM0zEw0EF3YkldtT github.com/libp2p/go-buffer-pool v0.0.1/go.mod h1:xtyIz9PMobb13WaxR6Zo1Pd1zXJKYg0a8KiIvDp3TzQ= github.com/libp2p/go-buffer-pool v0.0.2 h1:QNK2iAFa8gjAe1SPz6mHSMuCcjs+X1wlHzeOSqcmlfs= github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoRZd1Vi32+RXyFM= +github.com/libp2p/go-cidranger v1.1.0 h1:ewPN8EZ0dd1LSnrtuwd4709PXVcITVeuwbag38yPW7c= github.com/libp2p/go-cidranger v1.1.0/go.mod h1:KWZTfSr+r9qEo9OkI9/SIEeAtw+NNoU0dXIXt15Okic= github.com/libp2p/go-conn-security-multistream v0.2.0/go.mod h1:hZN4MjlNetKD3Rq5Jb/P5ohUnFLNzEAR4DLSzpn2QLU= github.com/libp2p/go-conn-security-multistream v0.2.1/go.mod h1:cR1d8gA0Hr59Fj6NhaTpFhJZrjSYuNmhpT2r25zYR70= @@ -414,6 +415,7 @@ github.com/libp2p/go-eventbus v0.2.1/go.mod h1:jc2S4SoEVPP48H9Wpzm5aiGwUCBMfGhVh github.com/libp2p/go-flow-metrics v0.0.1/go.mod h1:Iv1GH0sG8DtYN3SVJ2eG221wMiNpZxBdp967ls1g+k8= github.com/libp2p/go-flow-metrics v0.0.3 h1:8tAs/hSdNvUiLgtlSy3mxwxWP4I9y/jlkPFT7epKdeM= github.com/libp2p/go-flow-metrics v0.0.3/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs= +github.com/libp2p/go-libp2p-asn-util v0.0.0-20210818120414-1f382a4aa43a h1:6yEuCOY31elgeJ2KA2JiREZjIznvH6lOWCdHRuhgEgc= github.com/libp2p/go-libp2p-asn-util v0.0.0-20210818120414-1f382a4aa43a/go.mod h1:wu+AnM9Ii2KgO5jMmS1rz9dvzTdj8BXqsPR9HR0XB7I= github.com/libp2p/go-libp2p-autonat v0.5.0 h1:/+3+4NcQV47DQ/duvRyFDP8oxv6CQTvSKYD5iWoPcYs= github.com/libp2p/go-libp2p-autonat v0.5.0/go.mod h1:085tmmuXn0nXgFwuF7a2tt4UxgTjuapbuml27v4htKY= From 24bd04d03e4823c1523ca36e4d21200cff76da27 Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 24 Sep 2021 14:27:57 +0300 Subject: [PATCH 07/16] refactor reservation refresh loop --- p2p/host/autorelay/autorelay.go | 68 +++++++++++++++++---------------- 1 file changed, 35 insertions(+), 33 deletions(-) diff --git a/p2p/host/autorelay/autorelay.go b/p2p/host/autorelay/autorelay.go index b9157dbf06..4ec9611100 100644 --- a/p2p/host/autorelay/autorelay.go +++ b/p2p/host/autorelay/autorelay.go @@ -24,6 +24,9 @@ import ( const ( RelayRendezvous = "/libp2p/relay" + + rsvpRefreshInterval = time.Minute + rsvpExpirationSlack = 2 * time.Minute ) var ( @@ -139,14 +142,12 @@ func (ar *AutoRelay) background(ctx context.Context) { } func (ar *AutoRelay) refresh(ctx context.Context) { - ticker := time.NewTicker(time.Minute) + ticker := time.NewTicker(rsvpRefreshInterval) defer ticker.Stop() for { select { - case <-ticker.C: - var toRefresh []peer.ID - + case now := <-ticker.C: ar.mx.Lock() if ar.status == network.ReachabilityPublic { // we are public, forget about the relays, unprotect peers @@ -159,46 +160,22 @@ func (ar *AutoRelay) refresh(ctx context.Context) { continue } - // find reservations about to expire - now := time.Now() + // find reservations about to expire and refresh them in parallel + var wg sync.WaitGroup for p, rsvp := range ar.relays { if rsvp == nil { continue } - if now.Add(time.Minute).Before(rsvp.Expiration) { + if now.Add(rsvpExpirationSlack).Before(rsvp.Expiration) { continue } - toRefresh = append(toRefresh, p) + wg.Add(1) + go ar.refreshRsvp(ctx, p, &wg) } ar.mx.Unlock() - // refresh reservations about to expire in parallel - var wg sync.WaitGroup - for _, p := range toRefresh { - wg.Add(1) - - go func(p peer.ID) { - rsvp, err := circuitv2.Reserve(ctx, ar.host, peer.AddrInfo{ID: p}) - ar.mx.Lock() - if err != nil { - log.Debugf("failed to refresh relay slot reservation with %s: %s", p, err) - delete(ar.relays, p) - // unprotect the connection - ar.host.ConnManager().Unprotect(p, "autorelay") - // notify of relay disconnection - select { - case ar.disconnect <- struct{}{}: - default: - } - } else { - log.Debugf("refreshed relay slot reservation with %s", p) - ar.relays[p] = rsvp - } - ar.mx.Unlock() - }(p) - } wg.Wait() case <-ctx.Done(): @@ -207,6 +184,31 @@ func (ar *AutoRelay) refresh(ctx context.Context) { } } +func (ar *AutoRelay) refreshRsvp(ctx context.Context, p peer.ID, wg *sync.WaitGroup) { + defer wg.Done() + + rsvp, err := circuitv2.Reserve(ctx, ar.host, peer.AddrInfo{ID: p}) + + ar.mx.Lock() + defer ar.mx.Unlock() + + if err != nil { + log.Debugf("failed to refresh relay slot reservation with %s: %s", p, err) + + delete(ar.relays, p) + // unprotect the connection + ar.host.ConnManager().Unprotect(p, "autorelay") + // notify of relay disconnection + select { + case ar.disconnect <- struct{}{}: + default: + } + } else { + log.Debugf("refreshed relay slot reservation with %s", p) + ar.relays[p] = rsvp + } +} + func (ar *AutoRelay) findRelays(ctx context.Context) bool { if ar.numRelays() >= DesiredRelays { return false From 8c0e6b3589bb94bad2abccfa8dd762f7658e855a Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 24 Sep 2021 14:34:46 +0300 Subject: [PATCH 08/16] merge background and refresh goroutines --- p2p/host/autorelay/autorelay.go | 72 ++++++++++++++++----------------- 1 file changed, 34 insertions(+), 38 deletions(-) diff --git a/p2p/host/autorelay/autorelay.go b/p2p/host/autorelay/autorelay.go index 4ec9611100..7566bd1725 100644 --- a/p2p/host/autorelay/autorelay.go +++ b/p2p/host/autorelay/autorelay.go @@ -84,7 +84,6 @@ func NewAutoRelay(bhost *basic.BasicHost, discover discovery.Discoverer, router bhost.Network().Notify(ar) ar.refCount.Add(1) go ar.background(ctx) - go ar.refresh(ctx) return ar } @@ -98,6 +97,9 @@ func (ar *AutoRelay) background(ctx context.Context) { subReachability, _ := ar.host.EventBus().Subscribe(new(event.EvtLocalReachabilityChanged)) defer subReachability.Close() + ticker := time.NewTicker(rsvpRefreshInterval) + defer ticker.Stop() + // when true, we need to identify push push := false @@ -125,8 +127,13 @@ func (ar *AutoRelay) background(ctx context.Context) { } ar.status = evt.Reachability ar.mx.Unlock() + case <-ar.disconnect: push = true + + case now := <-ticker.C: + ar.refreshReservations(ctx, now) + case <-ctx.Done(): return } @@ -141,50 +148,39 @@ func (ar *AutoRelay) background(ctx context.Context) { } } -func (ar *AutoRelay) refresh(ctx context.Context) { - ticker := time.NewTicker(rsvpRefreshInterval) - defer ticker.Stop() - - for { - select { - case now := <-ticker.C: - ar.mx.Lock() - if ar.status == network.ReachabilityPublic { - // we are public, forget about the relays, unprotect peers - for p := range ar.relays { - ar.host.ConnManager().Unprotect(p, "autorelay") - delete(ar.relays, p) - } - - ar.mx.Unlock() - continue - } - - // find reservations about to expire and refresh them in parallel - var wg sync.WaitGroup - for p, rsvp := range ar.relays { - if rsvp == nil { - continue - } - - if now.Add(rsvpExpirationSlack).Before(rsvp.Expiration) { - continue - } +func (ar *AutoRelay) refreshReservations(ctx context.Context, now time.Time) { + ar.mx.Lock() + if ar.status == network.ReachabilityPublic { + // we are public, forget about the relays, unprotect peers + for p := range ar.relays { + ar.host.ConnManager().Unprotect(p, "autorelay") + delete(ar.relays, p) + } - wg.Add(1) - go ar.refreshRsvp(ctx, p, &wg) - } - ar.mx.Unlock() + ar.mx.Unlock() + return + } - wg.Wait() + // find reservations about to expire and refresh them in parallel + var wg sync.WaitGroup + for p, rsvp := range ar.relays { + if rsvp == nil { + continue + } - case <-ctx.Done(): - return + if now.Add(rsvpExpirationSlack).Before(rsvp.Expiration) { + continue } + + wg.Add(1) + go ar.refreshRelayReservation(ctx, p, &wg) } + ar.mx.Unlock() + + wg.Wait() } -func (ar *AutoRelay) refreshRsvp(ctx context.Context, p peer.ID, wg *sync.WaitGroup) { +func (ar *AutoRelay) refreshRelayReservation(ctx context.Context, p peer.ID, wg *sync.WaitGroup) { defer wg.Done() rsvp, err := circuitv2.Reserve(ctx, ar.host, peer.AddrInfo{ID: p}) From 5ec5b9350b5b0de3b4776e3ed343008cfc4ee247 Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 24 Sep 2021 14:44:09 +0300 Subject: [PATCH 09/16] handle pushes synchronously from reservation refresh failures --- p2p/host/autorelay/autorelay.go | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/p2p/host/autorelay/autorelay.go b/p2p/host/autorelay/autorelay.go index 7566bd1725..e469ea4e47 100644 --- a/p2p/host/autorelay/autorelay.go +++ b/p2p/host/autorelay/autorelay.go @@ -5,6 +5,7 @@ import ( "fmt" "math/rand" "sync" + "sync/atomic" "time" "github.com/libp2p/go-libp2p-core/discovery" @@ -132,7 +133,7 @@ func (ar *AutoRelay) background(ctx context.Context) { push = true case now := <-ticker.C: - ar.refreshReservations(ctx, now) + push = ar.refreshReservations(ctx, now) case <-ctx.Done(): return @@ -148,7 +149,7 @@ func (ar *AutoRelay) background(ctx context.Context) { } } -func (ar *AutoRelay) refreshReservations(ctx context.Context, now time.Time) { +func (ar *AutoRelay) refreshReservations(ctx context.Context, now time.Time) bool { ar.mx.Lock() if ar.status == network.ReachabilityPublic { // we are public, forget about the relays, unprotect peers @@ -158,11 +159,15 @@ func (ar *AutoRelay) refreshReservations(ctx context.Context, now time.Time) { } ar.mx.Unlock() - return + return true } // find reservations about to expire and refresh them in parallel - var wg sync.WaitGroup + var ( + wg sync.WaitGroup + fail int32 + ) + for p, rsvp := range ar.relays { if rsvp == nil { continue @@ -173,14 +178,15 @@ func (ar *AutoRelay) refreshReservations(ctx context.Context, now time.Time) { } wg.Add(1) - go ar.refreshRelayReservation(ctx, p, &wg) + go ar.refreshRelayReservation(ctx, p, &wg, &fail) } ar.mx.Unlock() wg.Wait() + return fail > 0 } -func (ar *AutoRelay) refreshRelayReservation(ctx context.Context, p peer.ID, wg *sync.WaitGroup) { +func (ar *AutoRelay) refreshRelayReservation(ctx context.Context, p peer.ID, wg *sync.WaitGroup, fail *int32) { defer wg.Done() rsvp, err := circuitv2.Reserve(ctx, ar.host, peer.AddrInfo{ID: p}) @@ -194,11 +200,8 @@ func (ar *AutoRelay) refreshRelayReservation(ctx context.Context, p peer.ID, wg delete(ar.relays, p) // unprotect the connection ar.host.ConnManager().Unprotect(p, "autorelay") - // notify of relay disconnection - select { - case ar.disconnect <- struct{}{}: - default: - } + // increment fail counter + atomic.AddInt32(fail, 1) } else { log.Debugf("refreshed relay slot reservation with %s", p) ar.relays[p] = rsvp From 194c08c3c349a4b0753341e14a15ddc65ba6b92a Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 24 Sep 2021 14:45:34 +0300 Subject: [PATCH 10/16] make connmanager tag a package level constant --- p2p/host/autorelay/autorelay.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/p2p/host/autorelay/autorelay.go b/p2p/host/autorelay/autorelay.go index e469ea4e47..938c51b7b7 100644 --- a/p2p/host/autorelay/autorelay.go +++ b/p2p/host/autorelay/autorelay.go @@ -28,6 +28,8 @@ const ( rsvpRefreshInterval = time.Minute rsvpExpirationSlack = 2 * time.Minute + + autorelayTag = "autorelay" ) var ( @@ -154,7 +156,7 @@ func (ar *AutoRelay) refreshReservations(ctx context.Context, now time.Time) boo if ar.status == network.ReachabilityPublic { // we are public, forget about the relays, unprotect peers for p := range ar.relays { - ar.host.ConnManager().Unprotect(p, "autorelay") + ar.host.ConnManager().Unprotect(p, autorelayTag) delete(ar.relays, p) } @@ -199,7 +201,7 @@ func (ar *AutoRelay) refreshRelayReservation(ctx context.Context, p peer.ID, wg delete(ar.relays, p) // unprotect the connection - ar.host.ConnManager().Unprotect(p, "autorelay") + ar.host.ConnManager().Unprotect(p, autorelayTag) // increment fail counter atomic.AddInt32(fail, 1) } else { @@ -333,7 +335,7 @@ func (ar *AutoRelay) tryRelay(ctx context.Context, pi peer.AddrInfo) bool { ar.relays[pi.ID] = rsvp // protect the connection - ar.host.ConnManager().Protect(pi.ID, "autorelay") + ar.host.ConnManager().Protect(pi.ID, autorelayTag) return true } From 84fc5edbb2f68e46a1d394fd75f4b2a0e9d98bbe Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 24 Sep 2021 14:59:07 +0300 Subject: [PATCH 11/16] dont sleep to wait for identify, use IdentifyWait --- p2p/host/autorelay/autorelay.go | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/p2p/host/autorelay/autorelay.go b/p2p/host/autorelay/autorelay.go index 938c51b7b7..4b68985d84 100644 --- a/p2p/host/autorelay/autorelay.go +++ b/p2p/host/autorelay/autorelay.go @@ -359,9 +359,28 @@ func (ar *AutoRelay) connect(ctx context.Context, pi peer.AddrInfo) bool { return false } - // wait for identify to complete so that we can check the supported protocols - // TODO we should do this without a delay/sleep. - time.Sleep(time.Second) + // wait for identify to complete in at least one conn so that we can check the supported protocols + conns := ar.host.Network().ConnsToPeer(pi.ID) + if len(conns) == 0 { + return false + } + + ready := make(chan struct{}, len(conns)) + for _, conn := range conns { + go func(conn network.Conn) { + select { + case <-ar.host.IDService().IdentifyWait(conn): + ready <- struct{}{} + case <-ctx.Done(): + } + }(conn) + } + + select { + case <-ready: + case <-ctx.Done(): + return false + } return true } From 1e7cfb5aa7cc62284c7718ffd484d9e5001b8f24 Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 24 Sep 2021 15:00:36 +0300 Subject: [PATCH 12/16] make relay protocol ids package-level constants --- p2p/host/autorelay/autorelay.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/p2p/host/autorelay/autorelay.go b/p2p/host/autorelay/autorelay.go index 4b68985d84..e2f27bc955 100644 --- a/p2p/host/autorelay/autorelay.go +++ b/p2p/host/autorelay/autorelay.go @@ -30,6 +30,9 @@ const ( rsvpExpirationSlack = 2 * time.Minute autorelayTag = "autorelay" + + protoIDv1 = string(relayv1.ProtoID) + protoIDv2 = string(circuitv2_proto.ProtoIDv2Hop) ) var ( @@ -279,8 +282,6 @@ func (ar *AutoRelay) tryRelay(ctx context.Context, pi peer.AddrInfo) bool { return false } - protoIDv1 := string(relayv1.ProtoID) - protoIDv2 := string(circuitv2_proto.ProtoIDv2Hop) protos, err := ar.host.Peerstore().SupportsProtocols(pi.ID, protoIDv1, protoIDv2) if err != nil { log.Debugf("error checking relay protocol support for peer %s: %s", pi.ID, err) From 007d1fa5e9d9f6adab09e4300a65ee714b7e461d Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 24 Sep 2021 15:36:54 +0300 Subject: [PATCH 13/16] add comment about v1 relays not having reservations --- p2p/host/autorelay/autorelay.go | 1 + 1 file changed, 1 insertion(+) diff --git a/p2p/host/autorelay/autorelay.go b/p2p/host/autorelay/autorelay.go index e2f27bc955..adbea806d1 100644 --- a/p2p/host/autorelay/autorelay.go +++ b/p2p/host/autorelay/autorelay.go @@ -175,6 +175,7 @@ func (ar *AutoRelay) refreshReservations(ctx context.Context, now time.Time) boo for p, rsvp := range ar.relays { if rsvp == nil { + // this is a circuitv1 relay, there is no reservation continue } From 8ae836418d2b08624e0e70d373d36ae45dabc6ef Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 24 Sep 2021 15:41:27 +0300 Subject: [PATCH 14/16] use errgrp instead of WaitGroup with atomic int --- p2p/host/autorelay/autorelay.go | 31 ++++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/p2p/host/autorelay/autorelay.go b/p2p/host/autorelay/autorelay.go index adbea806d1..3bd11d2c24 100644 --- a/p2p/host/autorelay/autorelay.go +++ b/p2p/host/autorelay/autorelay.go @@ -5,9 +5,10 @@ import ( "fmt" "math/rand" "sync" - "sync/atomic" "time" + "golang.org/x/sync/errgroup" + "github.com/libp2p/go-libp2p-core/discovery" "github.com/libp2p/go-libp2p-core/event" "github.com/libp2p/go-libp2p-core/network" @@ -167,12 +168,13 @@ func (ar *AutoRelay) refreshReservations(ctx context.Context, now time.Time) boo return true } - // find reservations about to expire and refresh them in parallel - var ( - wg sync.WaitGroup - fail int32 - ) + if len(ar.relays) == 0 { + ar.mx.Unlock() + return false + } + // find reservations about to expire and refresh them in parallel + g := new(errgroup.Group) for p, rsvp := range ar.relays { if rsvp == nil { // this is a circuitv1 relay, there is no reservation @@ -183,18 +185,17 @@ func (ar *AutoRelay) refreshReservations(ctx context.Context, now time.Time) boo continue } - wg.Add(1) - go ar.refreshRelayReservation(ctx, p, &wg, &fail) + g.Go(func() error { + return ar.refreshRelayReservation(ctx, p) + }) } ar.mx.Unlock() - wg.Wait() - return fail > 0 + err := g.Wait() + return err != nil } -func (ar *AutoRelay) refreshRelayReservation(ctx context.Context, p peer.ID, wg *sync.WaitGroup, fail *int32) { - defer wg.Done() - +func (ar *AutoRelay) refreshRelayReservation(ctx context.Context, p peer.ID) error { rsvp, err := circuitv2.Reserve(ctx, ar.host, peer.AddrInfo{ID: p}) ar.mx.Lock() @@ -206,12 +207,12 @@ func (ar *AutoRelay) refreshRelayReservation(ctx context.Context, p peer.ID, wg delete(ar.relays, p) // unprotect the connection ar.host.ConnManager().Unprotect(p, autorelayTag) - // increment fail counter - atomic.AddInt32(fail, 1) } else { log.Debugf("refreshed relay slot reservation with %s", p) ar.relays[p] = rsvp } + + return err } func (ar *AutoRelay) findRelays(ctx context.Context) bool { From 83178c4710922ea369d92842d3c138237d8949a3 Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 24 Sep 2021 15:44:18 +0300 Subject: [PATCH 15/16] fix variable capture bug --- p2p/host/autorelay/autorelay.go | 1 + 1 file changed, 1 insertion(+) diff --git a/p2p/host/autorelay/autorelay.go b/p2p/host/autorelay/autorelay.go index 3bd11d2c24..db0a676546 100644 --- a/p2p/host/autorelay/autorelay.go +++ b/p2p/host/autorelay/autorelay.go @@ -185,6 +185,7 @@ func (ar *AutoRelay) refreshReservations(ctx context.Context, now time.Time) boo continue } + p := p g.Go(func() error { return ar.refreshRelayReservation(ctx, p) }) From ba1444f5983dfd88b7eb7c0871106cf224263772 Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 24 Sep 2021 15:46:18 +0300 Subject: [PATCH 16/16] go get x/sync --- go.mod | 1 + 1 file changed, 1 insertion(+) diff --git a/go.mod b/go.mod index a64fe2ce70..007eb6ddc2 100644 --- a/go.mod +++ b/go.mod @@ -57,6 +57,7 @@ require ( go.uber.org/zap v1.19.0 // indirect golang.org/x/crypto v0.0.0-20210813211128-0a44fdfbc16e // indirect golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d // indirect + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912 // indirect golang.org/x/text v0.3.7 // indirect google.golang.org/grpc v1.40.0 // indirect