Skip to content
This repository was archived by the owner on Sep 8, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions p2p/kademlia.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ async def wait_pong(self, remote: Node, token: bytes, cancel_token: CancelToken)
del self.pong_callbacks[pingid]
return got_pong

async def wait_neighbours(self, remote: Node, cancel_token: CancelToken) -> List[Node]:
async def wait_neighbours(self, remote: Node, cancel_token: CancelToken) -> Tuple[Node, ...]:
"""Wait for a neihgbours packet from the given node.

Returns the list of neighbours received.
Expand All @@ -476,10 +476,9 @@ def process(response):
self.logger.debug('got expected neighbours response from %s', remote)
except TimeoutError:
self.logger.debug('timed out waiting for neighbours response from %s', remote)

# TODO: Use a contextmanager to ensure we always delete the callback from the list.
del self.neighbours_callbacks[remote]
return [n for n in neighbours if n != self.this_node]
finally:
del self.neighbours_callbacks[remote]
return tuple(n for n in neighbours if n != self.this_node)

def ping(self, node: Node) -> bytes:
if node == self.this_node:
Expand Down Expand Up @@ -540,12 +539,12 @@ async def _find_node(node_id, remote):
if not candidates:
self.logger.debug("got no candidates from %s, returning", remote)
return candidates
candidates = [c for c in candidates if c not in nodes_seen]
candidates = tuple(c for c in candidates if c not in nodes_seen)
self.logger.debug("got %s new candidates", len(candidates))
# Add new candidates to nodes_seen so that we don't attempt to bond with failing ones
# in the future.
nodes_seen.update(candidates)
bonded = await asyncio.gather(*[self.bond(c, cancel_token) for c in candidates])
bonded = await asyncio.gather(*(self.bond(c, cancel_token) for c in candidates))
self.logger.debug("bonded with %s candidates", bonded.count(True))
return [c for c in candidates if bonded[candidates.index(c)]]

Expand All @@ -559,8 +558,12 @@ def _exclude_if_asked(nodes):
while nodes_to_ask:
self.logger.debug("node lookup; querying %s", nodes_to_ask)
nodes_asked.update(nodes_to_ask)
results = await asyncio.gather(
*[_find_node(node_id, n) for n in nodes_to_ask])
results = await asyncio.gather(*(
_find_node(node_id, n)
for n
in nodes_to_ask
if n not in self.neighbours_callbacks
))
for candidates in results:
closest.extend(candidates)
closest = sort_by_distance(closest, node_id)[:k_bucket_size]
Expand Down
4 changes: 2 additions & 2 deletions tests/p2p/test_kademlia.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ async def test_wait_neighbours(cancel_token):

# Schedule a call to proto.recv_neighbours() simulating a neighbours response from the node we
# expect.
neighbours = [random_node(), random_node(), random_node()]
neighbours = (random_node(), random_node(), random_node())
recv_neighbours_coroutine = asyncio.coroutine(lambda: proto.recv_neighbours(node, neighbours))
asyncio.ensure_future(recv_neighbours_coroutine())

Expand All @@ -125,7 +125,7 @@ async def test_wait_neighbours(cancel_token):
# If wait_neighbours() times out, we get an empty list of neighbours.
received_neighbours = await proto.wait_neighbours(node, cancel_token)

assert received_neighbours == []
assert received_neighbours == tuple()
assert node not in proto.neighbours_callbacks


Expand Down