Skip to content

Commit

Permalink
bug fix: remove inflights of banned peers (#203)
Browse files Browse the repository at this point in the history
  • Loading branch information
lla-dane authored Jul 30, 2024
1 parent e9ab6d9 commit ecc3cbc
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 11 deletions.
2 changes: 1 addition & 1 deletion crates/floresta-wire/src/p2p_wire/chain_selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,7 @@ where
if peer == self.1.sync_peer {
self.1.state = ChainSelectorState::CreatingConnections;
}
self.handle_disconnection(peer, idx)?;
self.handle_disconnection(peer, idx).await?;
}

PeerMessages::Addr(addresses) => {
Expand Down
79 changes: 78 additions & 1 deletion crates/floresta-wire/src/p2p_wire/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,14 +241,30 @@ where
initial_height: peer.height,
})
}
pub(crate) fn handle_disconnection(&mut self, peer: u32, idx: usize) -> Result<(), WireError> {
pub(crate) async fn handle_disconnection(
&mut self,
peer: u32,
idx: usize,
) -> Result<(), WireError> {
if let Some(p) = self.peers.remove(&peer) {
p.channel.close();
if !p.feeler && p.state == PeerStatus::Ready {
info!("Peer disconnected: {}", peer);
}
}

let inflight = self
.inflight
.clone()
.into_iter()
.filter(|(_k, v)| v.0 == peer)
.collect::<Vec<_>>();

for req in inflight {
self.inflight.remove(&req.0);
self.redo_inflight_request(req.0.clone()).await?;
}

self.peer_ids.retain(|&id| id != peer);
for (_, v) in self.peer_by_service.iter_mut() {
v.retain(|&id| id != peer);
Expand All @@ -265,6 +281,67 @@ where
);
Ok(())
}

pub(crate) async fn redo_inflight_request(
&mut self,
req: InflightRequests,
) -> Result<(), WireError> {
match req {
InflightRequests::Blocks(block) => {
let peer = self
.send_to_random_peer(
NodeRequest::GetBlock((vec![block], true)),
ServiceFlags::UTREEXO,
)
.await?;
self.inflight
.insert(InflightRequests::Blocks(block), (peer, Instant::now()));
}
InflightRequests::Headers => {
let peer = self
.send_to_random_peer(NodeRequest::GetHeaders(vec![]), ServiceFlags::UTREEXO)
.await?;
self.inflight
.insert(InflightRequests::Headers, (peer, Instant::now()));
}
InflightRequests::UtreexoState(_) => {
let peer = self
.send_to_random_peer(
NodeRequest::GetUtreexoState((self.chain.get_block_hash(0).unwrap(), 0)),
ServiceFlags::UTREEXO,
)
.await?;
self.inflight
.insert(InflightRequests::UtreexoState(peer), (peer, Instant::now()));
}
InflightRequests::RescanBlock(block) => {
let peer = self
.send_to_random_peer(
NodeRequest::GetBlock((vec![block], false)),
ServiceFlags::UTREEXO,
)
.await?;
self.inflight
.insert(InflightRequests::RescanBlock(block), (peer, Instant::now()));
}
InflightRequests::GetFilters => {
let peer = self
.send_to_random_peer(
NodeRequest::GetFilter((self.chain.get_block_hash(0).unwrap(), 0)),
ServiceFlags::COMPACT_FILTERS,
)
.await?;
self.inflight
.insert(InflightRequests::GetFilters, (peer, Instant::now()));
}
InflightRequests::Connect(_) | InflightRequests::UserRequest(_) => {
// WE DON'T NEED TO DO ANYTHING HERE
}
}

Ok(())
}

pub(crate) async fn handle_peer_ready(
&mut self,
peer: u32,
Expand Down
2 changes: 1 addition & 1 deletion crates/floresta-wire/src/p2p_wire/running_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -800,7 +800,7 @@ where
self.handle_peer_ready(peer, &version).await?;
}
PeerMessages::Disconnected(idx) => {
self.handle_disconnection(peer, idx)?;
self.handle_disconnection(peer, idx).await?;
}
PeerMessages::Addr(addresses) => {
debug!("Got {} addresses from peer {}", addresses.len(), peer);
Expand Down
20 changes: 12 additions & 8 deletions crates/floresta-wire/src/p2p_wire/sync_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,16 @@ where
if block.udata.is_none() {
error!("Block without proof received from peer {}", peer);
self.send_to_peer(peer, NodeRequest::Shutdown).await?;
self.send_to_random_peer(
NodeRequest::GetBlock((vec![block.block.block_hash()], true)),
ServiceFlags::UTREEXO,
)
.await?;
self.inflight
.insert(InflightRequests::Blocks(next_block), (peer, Instant::now()));
let next_peer = self
.send_to_random_peer(
NodeRequest::GetBlock((vec![block.block.block_hash()], true)),
ServiceFlags::UTREEXO,
)
.await?;
self.inflight.insert(
InflightRequests::Blocks(next_block),
(next_peer, Instant::now()),
);
return Err(WireError::PeerMisbehaving);
}

Expand Down Expand Up @@ -252,7 +255,8 @@ where
try_and_log!(self.handle_peer_ready(peer, &version).await);
}
PeerMessages::Disconnected(idx) => {
try_and_log!(self.handle_disconnection(peer, idx));
try_and_log!(self.handle_disconnection(peer, idx).await);

if !self.has_utreexo_peers() {
warn!("No utreexo peers connected, trying to create a new one");
try_and_log!(self.maybe_open_connection().await);
Expand Down

0 comments on commit ecc3cbc

Please sign in to comment.