Skip to content

Commit

Permalink
chore(px): close px streams after resp is sent (#1746)
Browse files Browse the repository at this point in the history
  • Loading branch information
alrevuelta authored May 25, 2023
1 parent 35520bd commit 3c2d289
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 2 deletions.
27 changes: 26 additions & 1 deletion tests/v2/test_waku_peer_exchange.nim
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
{.used.}

import
std/[options, sequtils],
std/[options, sequtils, tables],
testutils/unittests,
chronos,
chronicles,
stew/shims/net,
libp2p/switch,
libp2p/peerId,
libp2p/crypto/crypto,
libp2p/multistream,
libp2p/muxers/muxer,
eth/keys,
eth/p2p/discoveryv5/enr
import
Expand All @@ -18,6 +20,7 @@ import
../../waku/v2/waku_peer_exchange,
../../waku/v2/waku_peer_exchange/rpc,
../../waku/v2/waku_peer_exchange/rpc_codec,
../../waku/v2/waku_peer_exchange/protocol,
./testlib/wakucore,
./testlib/wakunode

Expand Down Expand Up @@ -259,3 +262,25 @@ procSuite "Waku Peer Exchange":

# Check that it failed gracefully
check: response.isErr


asyncTest "connections are closed after response is sent":
# Create 3 nodes
let nodes = toSeq(0..<3).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))

await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountPeerExchange()))

# Multiple nodes request to node 0
for i in 1..<3:
let resp = await nodes[i].wakuPeerExchange.request(2, nodes[0].switch.peerInfo.toRemotePeerInfo())
require resp.isOk

# Wait for streams to be closed
await sleepAsync(1.seconds)

# Check that all streams are closed for px
check:
nodes[0].peerManager.getNumStreams(WakuPeerExchangeCodec) == (0, 0)
nodes[1].peerManager.getNumStreams(WakuPeerExchangeCodec) == (0, 0)
nodes[2].peerManager.getNumStreams(WakuPeerExchangeCodec) == (0, 0)
12 changes: 11 additions & 1 deletion waku/v2/waku_peer_exchange/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,19 @@ proc request*(wpx: WakuPeerExchange, numPeers: uint64, conn: Connection): Future
request: PeerExchangeRequest(numPeers: numPeers))

var buffer: seq[byte]
var error: string
try:
await conn.writeLP(rpc.encode().buffer)
buffer = await conn.readLp(MaxRpcSize.int)
except CatchableError as exc:
waku_px_errors.inc(labelValues = [exc.msg])
return err("write/read failed: " & $exc.msg)
error = $exc.msg
finally:
# close, no more data is expected
await conn.closeWithEof()

if error.len > 0:
return err("write/read failed: " & error)

let decodedBuff = PeerExchangeRpc.decode(buffer)
if decodedBuff.isErr():
Expand Down Expand Up @@ -155,6 +162,9 @@ proc initProtocolHandler(wpx: WakuPeerExchange) =
else:
waku_px_peers_sent.inc(enrs.len().int64())

# close, no data is expected
await conn.closeWithEof()

wpx.handler = handler
wpx.codec = WakuPeerExchangeCodec

Expand Down

0 comments on commit 3c2d289

Please sign in to comment.