Skip to content

Commit

Permalink
Request more p2p addrs after peer checks
Browse files Browse the repository at this point in the history
If the height or required services is deemed insufficient after the connection
handshake, do not request more addresses right away, waiting for its response
before killing the TCP connection.  This stops ConnectOutbound from returning
early with error while the TCP connection is still active, and causing the SPV
syncer to begin connecting to more remote peers.  While the SPV syncer remains
limited to 8 total outbound managed peers, the total count of TCP connections
can easily exceed this, and has been observed to max out the circuit limit on
Tor proxies.

Although this appears to move the address requesting to the foreground of
ConnectOutbound, it only writes the getaddr message.  addr message replies are
handled internally by the RemotePeer.
  • Loading branch information
jrick committed Jan 23, 2024
1 parent 3f77f1b commit ab6da24
Showing 1 changed file with 13 additions and 24 deletions.
37 changes: 13 additions & 24 deletions p2p/peering.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,29 +256,15 @@ func (lp *LocalPeer) ConnectOutbound(ctx context.Context, addr string, reqSvcs w

go lp.serveUntilError(ctx, rp)

var waitForAddrs <-chan time.Time
if lp.amgr.NeedMoreAddresses() {
waitForAddrs = time.After(stallTimeout)
err = rp.Addrs(ctx)
if err != nil {
op := errors.Opf(opf, rp.raddr)
return nil, errors.E(op, err)
}
}

// Disconnect from the peer if it does not specify all required services.
if rp.services&reqSvcs != reqSvcs {
op := errors.Opf(opf, rp.raddr)
reason := errors.Errorf("missing required service flags %v", reqSvcs&^rp.services)
reject := wire.NewMsgReject(wire.CmdVersion, wire.RejectNonstandard, reason.Error())
rp.sendMessageAck(ctx, reject)

err := errors.E(op, reason)
go func() {
if waitForAddrs != nil {
<-waitForAddrs
}
reject := wire.NewMsgReject(wire.CmdVersion, wire.RejectNonstandard, reason.Error())
rp.sendMessageAck(ctx, reject)
rp.Disconnect(err)
}()
rp.Disconnect(err)
return nil, err
}

Expand All @@ -288,18 +274,21 @@ func (lp *LocalPeer) ConnectOutbound(ctx context.Context, addr string, reqSvcs w
if rp.initHeight < reqHeight {
op := errors.Opf(opf, rp.raddr)
err := errors.E(op, "peer is not synced")
go func() {
if waitForAddrs != nil {
<-waitForAddrs
}
rp.Disconnect(err)
}()
rp.Disconnect(err)
return nil, err
}

// Mark this as a good address.
lp.amgr.Good(na)

if lp.amgr.NeedMoreAddresses() {
err = rp.Addrs(ctx)
if err != nil {
rp.Disconnect(err)
return nil, err
}
}

return rp, nil
}

Expand Down

0 comments on commit ab6da24

Please sign in to comment.