From 64be725566b9351ffd7887ca23c813d2b9db8d6a Mon Sep 17 00:00:00 2001 From: Sukun Date: Fri, 12 May 2023 10:42:58 +0530 Subject: [PATCH] swarm: change maps with multiaddress keys to use strings (#2284) * swarm: change maps with multiaddress keys to use strings * fix test * fix more flakiness --- p2p/host/routed/routed.go | 6 +-- p2p/net/swarm/dial_worker.go | 28 ++++++------- p2p/net/swarm/dial_worker_test.go | 66 +++++++++++++++++++++++++++++++ 3 files changed, 83 insertions(+), 17 deletions(-) diff --git a/p2p/host/routed/routed.go b/p2p/host/routed/routed.go index c4601a50dd..eb8e58ee7f 100644 --- a/p2p/host/routed/routed.go +++ b/p2p/host/routed/routed.go @@ -119,16 +119,16 @@ func (rh *RoutedHost) Connect(ctx context.Context, pi peer.AddrInfo) error { } // Build lookup map - lookup := make(map[ma.Multiaddr]struct{}, len(addrs)) + lookup := make(map[string]struct{}, len(addrs)) for _, addr := range addrs { - lookup[addr] = struct{}{} + lookup[string(addr.Bytes())] = struct{}{} } // if there's any address that's not in the previous set // of addresses, try to connect again. If all addresses // where known previously we return the original error. for _, newAddr := range newAddrs { - if _, found := lookup[newAddr]; found { + if _, found := lookup[string(newAddr.Bytes())]; found { continue } diff --git a/p2p/net/swarm/dial_worker.go b/p2p/net/swarm/dial_worker.go index f805371cc6..ba7ba87d4b 100644 --- a/p2p/net/swarm/dial_worker.go +++ b/p2p/net/swarm/dial_worker.go @@ -27,9 +27,9 @@ type dialResponse struct { } type pendRequest struct { - req dialRequest // the original request - err *DialError // dial error accumulator - addrs map[ma.Multiaddr]struct{} // pending addr dials + req dialRequest // the original request + err *DialError // dial error accumulator + addrs map[string]struct{} // pending address to dial. The key is a multiaddr } type addrDial struct { @@ -46,7 +46,7 @@ type dialWorker struct { reqch <-chan dialRequest reqno int requests map[int]*pendRequest - pending map[ma.Multiaddr]*addrDial + pending map[string]*addrDial // pending addresses to dial. The key is a multiaddr resch chan dialResult connected bool // true when a connection has been successfully established @@ -66,7 +66,7 @@ func newDialWorker(s *Swarm, p peer.ID, reqch <-chan dialRequest) *dialWorker { peer: p, reqch: reqch, requests: make(map[int]*pendRequest), - pending: make(map[ma.Multiaddr]*addrDial), + pending: make(map[string]*addrDial), resch: make(chan dialResult), } } @@ -108,10 +108,10 @@ loop: pr := &pendRequest{ req: req, err: &DialError{Peer: w.peer}, - addrs: make(map[ma.Multiaddr]struct{}), + addrs: make(map[string]struct{}), } for _, a := range addrs { - pr.addrs[a] = struct{}{} + pr.addrs[string(a.Bytes())] = struct{}{} } // check if any of the addrs has been successfully dialed and accumulate @@ -120,7 +120,7 @@ loop: var tojoin []*addrDial for _, a := range addrs { - ad, ok := w.pending[a] + ad, ok := w.pending[string(a.Bytes())] if !ok { todial = append(todial, a) continue @@ -135,7 +135,7 @@ loop: if ad.err != nil { // dial to this addr errored, accumulate the error pr.err.recordErr(a, ad.err) - delete(pr.addrs, a) + delete(pr.addrs, string(a.Bytes())) continue } @@ -164,7 +164,7 @@ loop: if len(todial) > 0 { for _, a := range todial { - w.pending[a] = &addrDial{addr: a, ctx: req.ctx, requests: []int{w.reqno}} + w.pending[string(a.Bytes())] = &addrDial{addr: a, ctx: req.ctx, requests: []int{w.reqno}} } w.nextDial = append(w.nextDial, todial...) @@ -177,7 +177,7 @@ loop: case <-w.triggerDial: for _, addr := range w.nextDial { // spawn the dial - ad := w.pending[addr] + ad := w.pending[string(addr.Bytes())] err := w.s.dialNextAddr(ad.ctx, w.peer, addr, w.resch) if err != nil { w.dispatchError(ad, err) @@ -192,7 +192,7 @@ loop: w.connected = true } - ad := w.pending[res.Addr] + ad := w.pending[string(res.Addr.Bytes())] if res.Conn != nil { // we got a connection, add it to the swarm @@ -247,7 +247,7 @@ func (w *dialWorker) dispatchError(ad *addrDial, err error) { // accumulate the error pr.err.recordErr(ad.addr, err) - delete(pr.addrs, ad.addr) + delete(pr.addrs, string(ad.addr.Bytes())) if len(pr.addrs) == 0 { // all addrs have erred, dispatch dial error // but first do a last one check in case an acceptable connection has landed from @@ -271,7 +271,7 @@ func (w *dialWorker) dispatchError(ad *addrDial, err error) { // it is also necessary to preserve consisent behaviour with the old dialer -- TestDialBackoff // regresses without this. if err == ErrDialBackoff { - delete(w.pending, ad.addr) + delete(w.pending, string(ad.addr.Bytes())) } } diff --git a/p2p/net/swarm/dial_worker_test.go b/p2p/net/swarm/dial_worker_test.go index 2c441106b1..ebdaedb245 100644 --- a/p2p/net/swarm/dial_worker_test.go +++ b/p2p/net/swarm/dial_worker_test.go @@ -24,6 +24,7 @@ import ( "github.com/libp2p/go-libp2p/p2p/transport/tcp" ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" "github.com/stretchr/testify/require" ) @@ -342,3 +343,68 @@ func TestDialWorkerLoopConcurrentFailureStress(t *testing.T) { close(reqch) worker.wg.Wait() } + +func TestDialWorkerLoopAddrDedup(t *testing.T) { + s1 := makeSwarm(t) + s2 := makeSwarm(t) + defer s1.Close() + defer s2.Close() + t1 := ma.StringCast(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 10000)) + t2 := ma.StringCast(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 10000)) + + // acceptAndClose accepts a connection and closes it + acceptAndClose := func(a ma.Multiaddr, ch chan struct{}, closech chan struct{}) { + list, err := manet.Listen(a) + if err != nil { + t.Error(err) + return + } + go func() { + ch <- struct{}{} + for { + conn, err := list.Accept() + if err != nil { + return + } + ch <- struct{}{} + conn.Close() + } + }() + <-closech + list.Close() + } + ch := make(chan struct{}, 1) + closeCh := make(chan struct{}) + go acceptAndClose(t1, ch, closeCh) + defer close(closeCh) + <-ch // the routine has started listening on addr + + s1.Peerstore().AddAddrs(s2.LocalPeer(), []ma.Multiaddr{t1}, peerstore.PermanentAddrTTL) + + reqch := make(chan dialRequest) + resch := make(chan dialResponse, 2) + + worker := newDialWorker(s1, s2.LocalPeer(), reqch) + go worker.loop() + defer worker.wg.Wait() + defer close(reqch) + + reqch <- dialRequest{ctx: context.Background(), resch: resch} + <-ch + <-resch + // Need to clear backoff otherwise the dial attempt would not be made + s1.Backoff().Clear(s2.LocalPeer()) + + s1.Peerstore().ClearAddrs(s2.LocalPeer()) + s1.Peerstore().AddAddrs(s2.LocalPeer(), []ma.Multiaddr{t2}, peerstore.PermanentAddrTTL) + + reqch <- dialRequest{ctx: context.Background(), resch: resch} + select { + case r := <-resch: + require.Error(t, r.err) + case <-ch: + t.Errorf("didn't expect a connection attempt") + case <-time.After(5 * time.Second): + t.Errorf("expected a fail response") + } +}