diff --git a/crates/floresta-wire/src/p2p_wire/chain_selector.rs b/crates/floresta-wire/src/p2p_wire/chain_selector.rs index e5be6527..bfe8d1ab 100644 --- a/crates/floresta-wire/src/p2p_wire/chain_selector.rs +++ b/crates/floresta-wire/src/p2p_wire/chain_selector.rs @@ -753,7 +753,7 @@ where if peer == self.1.sync_peer { self.1.state = ChainSelectorState::CreatingConnections; } - self.handle_disconnection(peer, idx)?; + self.handle_disconnection(peer, idx).await?; } PeerMessages::Addr(addresses) => { diff --git a/crates/floresta-wire/src/p2p_wire/node.rs b/crates/floresta-wire/src/p2p_wire/node.rs index 06327f4d..ce5506c3 100644 --- a/crates/floresta-wire/src/p2p_wire/node.rs +++ b/crates/floresta-wire/src/p2p_wire/node.rs @@ -241,7 +241,11 @@ where initial_height: peer.height, }) } - pub(crate) fn handle_disconnection(&mut self, peer: u32, idx: usize) -> Result<(), WireError> { + pub(crate) async fn handle_disconnection( + &mut self, + peer: u32, + idx: usize, + ) -> Result<(), WireError> { if let Some(p) = self.peers.remove(&peer) { p.channel.close(); if !p.feeler && p.state == PeerStatus::Ready { @@ -249,6 +253,18 @@ where } } + let inflight = self + .inflight + .clone() + .into_iter() + .filter(|(_k, v)| v.0 == peer) + .collect::>(); + + for req in inflight { + self.inflight.remove(&req.0); + self.redo_inflight_request(req.0.clone()).await?; + } + self.peer_ids.retain(|&id| id != peer); for (_, v) in self.peer_by_service.iter_mut() { v.retain(|&id| id != peer); @@ -265,6 +281,67 @@ where ); Ok(()) } + + pub(crate) async fn redo_inflight_request( + &mut self, + req: InflightRequests, + ) -> Result<(), WireError> { + match req { + InflightRequests::Blocks(block) => { + let peer = self + .send_to_random_peer( + NodeRequest::GetBlock((vec![block], true)), + ServiceFlags::UTREEXO, + ) + .await?; + self.inflight + .insert(InflightRequests::Blocks(block), (peer, Instant::now())); + } + InflightRequests::Headers => { + let peer = self + .send_to_random_peer(NodeRequest::GetHeaders(vec![]), ServiceFlags::UTREEXO) + .await?; + self.inflight + .insert(InflightRequests::Headers, (peer, Instant::now())); + } + InflightRequests::UtreexoState(_) => { + let peer = self + .send_to_random_peer( + NodeRequest::GetUtreexoState((self.chain.get_block_hash(0).unwrap(), 0)), + ServiceFlags::UTREEXO, + ) + .await?; + self.inflight + .insert(InflightRequests::UtreexoState(peer), (peer, Instant::now())); + } + InflightRequests::RescanBlock(block) => { + let peer = self + .send_to_random_peer( + NodeRequest::GetBlock((vec![block], false)), + ServiceFlags::UTREEXO, + ) + .await?; + self.inflight + .insert(InflightRequests::RescanBlock(block), (peer, Instant::now())); + } + InflightRequests::GetFilters => { + let peer = self + .send_to_random_peer( + NodeRequest::GetFilter((self.chain.get_block_hash(0).unwrap(), 0)), + ServiceFlags::COMPACT_FILTERS, + ) + .await?; + self.inflight + .insert(InflightRequests::GetFilters, (peer, Instant::now())); + } + InflightRequests::Connect(_) | InflightRequests::UserRequest(_) => { + // WE DON'T NEED TO DO ANYTHING HERE + } + } + + Ok(()) + } + pub(crate) async fn handle_peer_ready( &mut self, peer: u32, diff --git a/crates/floresta-wire/src/p2p_wire/running_node.rs b/crates/floresta-wire/src/p2p_wire/running_node.rs index a21d8a4d..d72be541 100644 --- a/crates/floresta-wire/src/p2p_wire/running_node.rs +++ b/crates/floresta-wire/src/p2p_wire/running_node.rs @@ -800,7 +800,7 @@ where self.handle_peer_ready(peer, &version).await?; } PeerMessages::Disconnected(idx) => { - self.handle_disconnection(peer, idx)?; + self.handle_disconnection(peer, idx).await?; } PeerMessages::Addr(addresses) => { debug!("Got {} addresses from peer {}", addresses.len(), peer); diff --git a/crates/floresta-wire/src/p2p_wire/sync_node.rs b/crates/floresta-wire/src/p2p_wire/sync_node.rs index ad82f3f9..889339d8 100644 --- a/crates/floresta-wire/src/p2p_wire/sync_node.rs +++ b/crates/floresta-wire/src/p2p_wire/sync_node.rs @@ -158,13 +158,16 @@ where if block.udata.is_none() { error!("Block without proof received from peer {}", peer); self.send_to_peer(peer, NodeRequest::Shutdown).await?; - self.send_to_random_peer( - NodeRequest::GetBlock((vec![block.block.block_hash()], true)), - ServiceFlags::UTREEXO, - ) - .await?; - self.inflight - .insert(InflightRequests::Blocks(next_block), (peer, Instant::now())); + let next_peer = self + .send_to_random_peer( + NodeRequest::GetBlock((vec![block.block.block_hash()], true)), + ServiceFlags::UTREEXO, + ) + .await?; + self.inflight.insert( + InflightRequests::Blocks(next_block), + (next_peer, Instant::now()), + ); return Err(WireError::PeerMisbehaving); } @@ -252,7 +255,8 @@ where try_and_log!(self.handle_peer_ready(peer, &version).await); } PeerMessages::Disconnected(idx) => { - try_and_log!(self.handle_disconnection(peer, idx)); + try_and_log!(self.handle_disconnection(peer, idx).await); + if !self.has_utreexo_peers() { warn!("No utreexo peers connected, trying to create a new one"); try_and_log!(self.maybe_open_connection().await);