From 84c2ebf58c12584aa52a5b92d0012b8a7c8eef91 Mon Sep 17 00:00:00 2001 From: Elvis Date: Tue, 24 Jan 2023 08:20:15 +0100 Subject: [PATCH] request response for get transactions & sealed header from p2p network (#919) closes #878 #877 The work has been done on 2 (now merged) branches, by @freesig and myself. Co-authored-by: green Co-authored-by: Brandon Kite Co-authored-by: Tom Co-authored-by: ControlCplusControlV <44706811+ControlCplusControlV@users.noreply.github.com> --- Cargo.lock | 1 + crates/fuel-core/src/database/sealed_block.rs | 49 ++++- crates/fuel-core/src/service/adapters/p2p.rs | 30 ++- crates/services/p2p/Cargo.toml | 1 + crates/services/p2p/src/behavior.rs | 16 +- crates/services/p2p/src/codecs.rs | 22 +- crates/services/p2p/src/codecs/bincode.rs | 88 ++++++-- crates/services/p2p/src/p2p_service.rs | 195 +++++++++++++----- crates/services/p2p/src/ports.rs | 27 ++- .../p2p/src/request_response/messages.rs | 58 ++++-- crates/services/p2p/src/service.rs | 151 +++++++++++--- crates/types/src/blockchain/primitives.rs | 12 ++ 12 files changed, 501 insertions(+), 149 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f691f4cdd63..012e6bb1bab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2488,6 +2488,7 @@ dependencies = [ "prometheus-client", "rand 0.8.5", "serde", + "serde_with", "sha2 0.10.6", "tokio", "tracing", diff --git a/crates/fuel-core/src/database/sealed_block.rs b/crates/fuel-core/src/database/sealed_block.rs index eb98ce29d8d..c76ff48d239 100644 --- a/crates/fuel-core/src/database/sealed_block.rs +++ b/crates/fuel-core/src/database/sealed_block.rs @@ -14,17 +14,21 @@ use fuel_core_storage::{ StorageInspect, StorageMutate, }; -use fuel_core_types::blockchain::{ - consensus::{ - Consensus, - Genesis, +use fuel_core_types::{ + blockchain::{ + consensus::{ + Consensus, + Genesis, + Sealed, + }, + primitives::{ + BlockHeight, + BlockId, + }, + SealedBlock, + SealedBlockHeader, }, - primitives::{ - BlockHeight, - BlockId, - }, - SealedBlock, - SealedBlockHeader, + fuel_tx::Transaction, }; use std::borrow::Cow; @@ -86,7 +90,10 @@ impl Database { &self, height: &BlockHeight, ) -> StorageResult> { - let block_id = self.get_block_id(height)?.ok_or(not_found!("BlockId"))?; + let block_id = match self.get_block_id(height)? { + Some(i) => i, + None => return Ok(None), + }; self.get_sealed_block_by_id(&block_id) } @@ -104,6 +111,17 @@ impl Database { } } + pub fn get_sealed_block_header_by_height( + &self, + height: &BlockHeight, + ) -> StorageResult> { + let block_id = match self.get_block_id(height)? { + Some(i) => i, + None => return Ok(None), + }; + self.get_sealed_block_header(&block_id) + } + pub fn get_sealed_block_header( &self, block_id: &BlockId, @@ -122,4 +140,13 @@ impl Database { Ok(None) } } + + pub fn get_transactions_on_block( + &self, + block_id: &BlockId, + ) -> StorageResult>> { + Ok(self + .get_sealed_block_by_id(block_id)? + .map(|Sealed { entity: block, .. }| block.into_inner().1)) + } } diff --git a/crates/fuel-core/src/service/adapters/p2p.rs b/crates/fuel-core/src/service/adapters/p2p.rs index 55296990e6c..62949d166c9 100644 --- a/crates/fuel-core/src/service/adapters/p2p.rs +++ b/crates/fuel-core/src/service/adapters/p2p.rs @@ -6,19 +6,39 @@ use fuel_core_p2p::ports::{ }; use fuel_core_services::stream::BoxStream; use fuel_core_storage::Result as StorageResult; -use fuel_core_types::blockchain::{ - primitives::BlockHeight, - SealedBlock, +use fuel_core_types::{ + blockchain::{ + primitives::{ + BlockHeight, + BlockId, + }, + SealedBlock, + SealedBlockHeader, + }, + fuel_tx::Transaction, }; -#[async_trait::async_trait] impl P2pDb for Database { - async fn get_sealed_block( + fn get_sealed_block( &self, height: &BlockHeight, ) -> StorageResult> { self.get_sealed_block_by_height(height) } + + fn get_sealed_header( + &self, + height: &BlockHeight, + ) -> StorageResult> { + self.get_sealed_block_header_by_height(height) + } + + fn get_transactions( + &self, + block_id: &BlockId, + ) -> StorageResult>> { + self.get_transactions_on_block(block_id) + } } impl BlockHeightImporter for BlockImporterAdapter { diff --git a/crates/services/p2p/Cargo.toml b/crates/services/p2p/Cargo.toml index 6a71c556759..5282fbebb05 100644 --- a/crates/services/p2p/Cargo.toml +++ b/crates/services/p2p/Cargo.toml @@ -56,6 +56,7 @@ libp2p-yamux = "=0.42.0" prometheus-client = "0.18" rand = "0.8" serde = { version = "1.0", features = ["derive"] } +serde_with = "1.11" sha2 = "0.10" tokio = { version = "1.21", features = ["sync"] } tracing = "0.1" diff --git a/crates/services/p2p/src/behavior.rs b/crates/services/p2p/src/behavior.rs index 99182a1dfe4..13cb414ae6b 100644 --- a/crates/services/p2p/src/behavior.rs +++ b/crates/services/p2p/src/behavior.rs @@ -22,7 +22,7 @@ use crate::{ PeerManagerBehaviour, }, request_response::messages::{ - IntermediateResponse, + NetworkResponse, RequestMessage, }, }; @@ -56,7 +56,7 @@ pub enum FuelBehaviourEvent { Discovery(DiscoveryEvent), PeerInfo(PeerInfoEvent), Gossipsub(GossipsubEvent), - RequestResponse(RequestResponseEvent), + RequestResponse(RequestResponseEvent), } /// Handles all p2p protocols needed for Fuel. @@ -180,9 +180,9 @@ impl FuelBehaviour { pub fn send_response_msg( &mut self, - channel: ResponseChannel, - message: IntermediateResponse, - ) -> Result<(), IntermediateResponse> { + channel: ResponseChannel, + message: NetworkResponse, + ) -> Result<(), NetworkResponse> { self.request_response.send_response(channel, message) } @@ -228,10 +228,8 @@ impl From for FuelBehaviourEvent { } } -impl From> - for FuelBehaviourEvent -{ - fn from(event: RequestResponseEvent) -> Self { +impl From> for FuelBehaviourEvent { + fn from(event: RequestResponseEvent) -> Self { FuelBehaviourEvent::RequestResponse(event) } } diff --git a/crates/services/p2p/src/codecs.rs b/crates/services/p2p/src/codecs.rs index c712779b258..05f146baca5 100644 --- a/crates/services/p2p/src/codecs.rs +++ b/crates/services/p2p/src/codecs.rs @@ -7,7 +7,7 @@ use crate::{ GossipsubMessage, }, request_response::messages::{ - IntermediateResponse, + NetworkResponse, OutboundResponse, RequestMessage, ResponseMessage, @@ -31,22 +31,22 @@ pub trait GossipsubCodec { } pub trait RequestResponseConverter { - /// Response that is ready to be converted into IntermediateResponse + /// Response that is ready to be converted into NetworkResponse type OutboundResponse; /// Response that is sent over the network - type IntermediateResponse; + type NetworkResponse; /// Final Response Message deserialized from IntermediateResponse type ResponseMessage; - fn convert_to_response( + fn convert_to_network_response( &self, - inter_msg: &Self::IntermediateResponse, - ) -> Result; + res_msg: &Self::OutboundResponse, + ) -> Result; - fn convert_to_intermediate( + fn convert_to_response( &self, - res_msg: &Self::OutboundResponse, - ) -> Result; + inter_msg: &Self::NetworkResponse, + ) -> Result; } /// Main Codec trait @@ -55,9 +55,9 @@ pub trait NetworkCodec: GossipsubCodec< RequestMessage = GossipsubBroadcastRequest, ResponseMessage = GossipsubMessage, - > + RequestResponseCodec + > + RequestResponseCodec + RequestResponseConverter< - IntermediateResponse = IntermediateResponse, + NetworkResponse = NetworkResponse, OutboundResponse = OutboundResponse, ResponseMessage = ResponseMessage, > + Clone diff --git a/crates/services/p2p/src/codecs/bincode.rs b/crates/services/p2p/src/codecs/bincode.rs index 6e80a93ae37..e1bc0f86e1b 100644 --- a/crates/services/p2p/src/codecs/bincode.rs +++ b/crates/services/p2p/src/codecs/bincode.rs @@ -10,7 +10,7 @@ use crate::{ GossipsubMessage, }, request_response::messages::{ - IntermediateResponse, + NetworkResponse, OutboundResponse, RequestMessage, ResponseMessage, @@ -81,7 +81,7 @@ impl BincodeCodec { impl RequestResponseCodec for BincodeCodec { type Protocol = MessageExchangeBincodeProtocol; type Request = RequestMessage; - type Response = IntermediateResponse; + type Response = NetworkResponse; async fn read_request( &mut self, @@ -186,29 +186,77 @@ impl GossipsubCodec for BincodeCodec { } impl RequestResponseConverter for BincodeCodec { - type IntermediateResponse = IntermediateResponse; + type NetworkResponse = NetworkResponse; type OutboundResponse = OutboundResponse; type ResponseMessage = ResponseMessage; fn convert_to_response( &self, - inter_msg: &Self::IntermediateResponse, + inter_msg: &Self::NetworkResponse, ) -> Result { match inter_msg { - IntermediateResponse::ResponseBlock(block_bytes) => Ok( - ResponseMessage::ResponseBlock(self.deserialize(block_bytes)?), - ), + NetworkResponse::Block(block_bytes) => { + let response = if let Some(block_bytes) = block_bytes { + Some(self.deserialize(block_bytes)?) + } else { + None + }; + + Ok(ResponseMessage::SealedBlock(response)) + } + NetworkResponse::Header(header_bytes) => { + let response = if let Some(header_bytes) = header_bytes { + Some(self.deserialize(header_bytes)?) + } else { + None + }; + + Ok(ResponseMessage::SealedHeader(response)) + } + NetworkResponse::Transactions(tx_bytes) => { + let response = if let Some(tx_bytes) = tx_bytes { + Some(self.deserialize(tx_bytes)?) + } else { + None + }; + + Ok(ResponseMessage::Transactions(response)) + } } } - fn convert_to_intermediate( + fn convert_to_network_response( &self, res_msg: &Self::OutboundResponse, - ) -> Result { + ) -> Result { match res_msg { - OutboundResponse::ResponseBlock(sealed_block) => Ok( - IntermediateResponse::ResponseBlock(self.serialize(&**sealed_block)?), - ), + OutboundResponse::Block(sealed_block) => { + let response = if let Some(sealed_block) = sealed_block { + Some(self.serialize(&**sealed_block)?) + } else { + None + }; + + Ok(NetworkResponse::Block(response)) + } + OutboundResponse::SealedHeader(sealed_header) => { + let response = if let Some(sealed_header) = sealed_header { + Some(self.serialize(&**sealed_header)?) + } else { + None + }; + + Ok(NetworkResponse::Header(response)) + } + OutboundResponse::Transactions(transactions) => { + let response = if let Some(transactions) = transactions { + Some(self.serialize(&**transactions)?) + } else { + None + }; + + Ok(NetworkResponse::Transactions(response)) + } } } } @@ -227,3 +275,19 @@ impl ProtocolName for MessageExchangeBincodeProtocol { REQUEST_RESPONSE_PROTOCOL_ID } } + +#[cfg(test)] +mod tests { + use fuel_core_types::blockchain::primitives::BlockId; + + use super::*; + + #[test] + fn test_request_size_matches() { + let m = RequestMessage::Transactions(BlockId::default()); + assert_eq!( + bincode::serialized_size(&m).unwrap() as usize, + MAX_REQUEST_SIZE + ); + } +} diff --git a/crates/services/p2p/src/p2p_service.rs b/crates/services/p2p/src/p2p_service.rs index e55a59d2d0e..bc406dce500 100644 --- a/crates/services/p2p/src/p2p_service.rs +++ b/crates/services/p2p/src/p2p_service.rs @@ -18,7 +18,7 @@ use crate::{ }, peer_manager::PeerInfoEvent, request_response::messages::{ - IntermediateResponse, + NetworkResponse, OutboundResponse, RequestError, RequestMessage, @@ -88,7 +88,7 @@ pub struct FuelP2PService { /// Holds the ResponseChannel(s) for the inbound requests from the p2p Network /// Once the Response is prepared by the NetworkOrchestrator /// It will send it to the specified Peer via its unique ResponseChannel - inbound_requests_table: HashMap>, + inbound_requests_table: HashMap>, /// NetworkCodec used as for encoding and decoding of Gossipsub messages network_codec: Codec, @@ -269,18 +269,10 @@ impl FuelP2PService { pub fn send_response_msg( &mut self, request_id: RequestId, - message: Option, + message: OutboundResponse, ) -> Result<(), ResponseError> { - // if the response message wasn't successfully prepared - // we still need to remove the `request_id` from `inbound_requests_table` - if message.is_none() { - self.inbound_requests_table.remove(&request_id); - return Ok(()) - } - match ( - self.network_codec - .convert_to_intermediate(&message.unwrap()), + self.network_codec.convert_to_network_response(&message), self.inbound_requests_table.remove(&request_id), ) { (Ok(message), Some(channel)) => { @@ -444,7 +436,7 @@ impl FuelP2PService { } }, FuelBehaviourEvent::RequestResponse(req_res_event) => match req_res_event { - RequestResponseEvent::Message { message, .. } => match message { + RequestResponseEvent::Message { peer, message } => match message { RequestResponseMessage::Request { request, channel, @@ -466,8 +458,8 @@ impl FuelP2PService { self.network_codec.convert_to_response(&response), ) { ( - Some(ResponseChannelItem::ResponseBlock(channel)), - Ok(ResponseMessage::ResponseBlock(block)), + Some(ResponseChannelItem::Block(channel)), + Ok(ResponseMessage::SealedBlock(block)), ) => { if channel.send(block).is_err() { debug!( @@ -476,6 +468,28 @@ impl FuelP2PService { ); } } + ( + Some(ResponseChannelItem::Transactions(channel)), + Ok(ResponseMessage::Transactions(transactions)), + ) => { + if channel.send(transactions).is_err() { + debug!( + "Failed to send through the channel for {:?}", + request_id + ); + } + } + ( + Some(ResponseChannelItem::SealedHeader(channel)), + Ok(ResponseMessage::SealedHeader(header)), + ) => { + if channel.send(header.map(|h| (peer, h))).is_err() { + debug!( + "Failed to send through the channel for {:?}", + request_id + ); + } + } (Some(_), Err(e)) => { debug!("Failed to convert IntermediateResponse into a ResponseMessage {:?} with {:?}", response, e); @@ -547,7 +561,9 @@ mod tests { ConsensusVote, }, header::PartialBlockHeader, + primitives::BlockId, SealedBlock, + SealedBlockHeader, }, fuel_tx::Transaction, }; @@ -840,6 +856,17 @@ mod tests { // it serves as our exit from the loop let mut bootstrapped_node = node_services.pop().unwrap(); + let (tx, mut rx) = tokio::sync::oneshot::channel::<()>(); + let jh = tokio::spawn(async move { + while rx.try_recv().is_err() { + futures::stream::iter(node_services.iter_mut()) + .for_each_concurrent(10, |node| async move { + node.next_event().await; + }) + .await; + } + }); + loop { tokio::select! { event_from_node_5 = node_5.next_event() => { @@ -868,13 +895,11 @@ mod tests { } tracing::info!("Event from the bootstrapped_node: {:?}", event_from_bootstrapped_node); }, - _ = async { - for node in &mut node_services { - node.next_event().await; - } - } => {} } } + + tx.send(()).unwrap(); + jh.await.unwrap() } // Simulate 2 Sets of Sentry nodes. @@ -1252,12 +1277,8 @@ mod tests { } } - #[tokio::test] - #[instrument] - async fn request_response_works() { - use fuel_core_types::fuel_tx::Transaction; - - let mut p2p_config = Config::default_initialized("request_response_works"); + async fn request_response_works_with(request_msg: RequestMessage) { + let mut p2p_config = Config::default_initialized("request_response_works_with"); // Node A let node_a_data = NodeData::random(); @@ -1275,7 +1296,7 @@ mod tests { tokio::select! { message_sent = rx_test_end.recv() => { // we received a signal to end the test - assert_eq!(message_sent, Some(true), "Received wrong block height!"); + assert_eq!(message_sent, Some(true), "Receuved incorrect or missing missing messsage"); break; } node_a_event = node_a.next_event() => { @@ -1285,25 +1306,57 @@ mod tests { if !peer_addresses.is_empty() && !request_sent { request_sent = true; - // 1. Simulating Oneshot channel from the NetworkOrchestrator - let (tx_orchestrator, rx_orchestrator) = oneshot::channel(); + match request_msg { + RequestMessage::Block(_) => { + let (tx_orchestrator, rx_orchestrator) = oneshot::channel(); + assert!(node_a.send_request_msg(None, request_msg, ResponseChannelItem::Block(tx_orchestrator)).is_ok()); + let tx_test_end = tx_test_end.clone(); - let requested_block_height = RequestMessage::RequestBlock(0_u64.into()); - assert!(node_a.send_request_msg(None, requested_block_height, ResponseChannelItem::ResponseBlock(tx_orchestrator)).is_ok()); + tokio::spawn(async move { + let response_message = rx_orchestrator.await; - let tx_test_end = tx_test_end.clone(); - tokio::spawn(async move { - // 4. Simulating NetworkOrchestrator receiving a message from Node B - let response_message = rx_orchestrator.await; - - if let Ok(sealed_block) = response_message { - let _ = tx_test_end.send(*sealed_block.entity.header().height() == 0_u64.into()).await; - } else { - tracing::error!("Orchestrator failed to receive a message: {:?}", response_message); - panic!("Message not received successfully!") - } + if let Ok(Some(sealed_block)) = response_message { + let _ = tx_test_end.send(*sealed_block.entity.header().height() == 0_u64.into()).await; + } else { + tracing::error!("Orchestrator failed to receive a message: {:?}", response_message); + let _ = tx_test_end.send(false).await; + } + }); - }); + } + RequestMessage::SealedHeader(_) => { + let (tx_orchestrator, rx_orchestrator) = oneshot::channel(); + assert!(node_a.send_request_msg(None, request_msg, ResponseChannelItem::SealedHeader(tx_orchestrator)).is_ok()); + let tx_test_end = tx_test_end.clone(); + + tokio::spawn(async move { + let response_message = rx_orchestrator.await; + + if let Ok(Some(_)) = response_message { + let _ = tx_test_end.send(true).await; + } else { + tracing::error!("Orchestrator failed to receive a message: {:?}", response_message); + let _ = tx_test_end.send(false).await; + } + }); + } + RequestMessage::Transactions(_) => { + let (tx_orchestrator, rx_orchestrator) = oneshot::channel(); + assert!(node_a.send_request_msg(None, request_msg, ResponseChannelItem::Transactions(tx_orchestrator)).is_ok()); + let tx_test_end = tx_test_end.clone(); + + tokio::spawn(async move { + let response_message = rx_orchestrator.await; + + if let Ok(Some(transactions)) = response_message { + let _ = tx_test_end.send(transactions.len() == 5).await; + } else { + tracing::error!("Orchestrator failed to receive a message: {:?}", response_message); + let _ = tx_test_end.send(false).await; + } + }); + } + } } } } @@ -1312,15 +1365,34 @@ mod tests { }, node_b_event = node_b.next_event() => { // 2. Node B receives the RequestMessage from Node A initiated by the NetworkOrchestrator - if let Some(FuelP2PEvent::RequestMessage{ request_id, .. }) = node_b_event { - let block = Block::new(PartialBlockHeader::default(), vec![Transaction::default(), Transaction::default(), Transaction::default(), Transaction::default(), Transaction::default()], &[]); + if let Some(FuelP2PEvent::RequestMessage{ request_id, request_message: received_request_message }) = node_b_event { + match received_request_message { + RequestMessage::Block(_) => { + let block = Block::new(PartialBlockHeader::default(), vec![Transaction::default(), Transaction::default(), Transaction::default(), Transaction::default(), Transaction::default()], &[]); - let sealed_block = SealedBlock { - entity: block, - consensus: Consensus::PoA(PoAConsensus::new(Default::default())), - }; + let sealed_block = SealedBlock { + entity: block, + consensus: Consensus::PoA(PoAConsensus::new(Default::default())), + }; + + let _ = node_b.send_response_msg(request_id, OutboundResponse::Block(Some(Arc::new(sealed_block)))); + } + RequestMessage::SealedHeader(_) => { + let header = Default::default(); + + let sealed_header = SealedBlockHeader { + entity: header, + consensus: Consensus::PoA(PoAConsensus::new(Default::default())), + }; + + let _ = node_b.send_response_msg(request_id, OutboundResponse::SealedHeader(Some(Arc::new(sealed_header)))); + } + RequestMessage::Transactions(_) => { + let transactions = vec![Transaction::default(), Transaction::default(), Transaction::default(), Transaction::default(), Transaction::default()]; + let _ = node_b.send_response_msg(request_id, OutboundResponse::Transactions(Some(Arc::new(transactions)))); + } + } - let _ = node_b.send_response_msg(request_id, Some(OutboundResponse::ResponseBlock(Arc::new(sealed_block)))); } tracing::info!("Node B Event: {:?}", node_b_event); @@ -1329,6 +1401,25 @@ mod tests { } } + #[tokio::test] + #[instrument] + async fn request_response_works_with_transactions() { + request_response_works_with(RequestMessage::Transactions(BlockId::default())) + .await + } + + #[tokio::test] + #[instrument] + async fn request_response_works_with_block() { + request_response_works_with(RequestMessage::Block(0_u64.into())).await + } + + #[tokio::test] + #[instrument] + async fn request_response_works_with_sealed_header() { + request_response_works_with(RequestMessage::SealedHeader(0_u64.into())).await + } + #[tokio::test] #[instrument] async fn req_res_outbound_timeout_works() { @@ -1367,8 +1458,8 @@ mod tests { assert_eq!(node_a.outbound_requests_table.len(), 0); // Request successfully sent - let requested_block_height = RequestMessage::RequestBlock(0_u64.into()); - assert!(node_a.send_request_msg(None, requested_block_height, ResponseChannelItem::ResponseBlock(tx_orchestrator)).is_ok()); + let requested_block_height = RequestMessage::Block(0_u64.into()); + assert!(node_a.send_request_msg(None, requested_block_height, ResponseChannelItem::Block(tx_orchestrator)).is_ok()); // 2b. there should be ONE pending outbound requests in the table assert_eq!(node_a.outbound_requests_table.len(), 1); diff --git a/crates/services/p2p/src/ports.rs b/crates/services/p2p/src/ports.rs index e4790ad5442..67a91788017 100644 --- a/crates/services/p2p/src/ports.rs +++ b/crates/services/p2p/src/ports.rs @@ -1,17 +1,32 @@ -use async_trait::async_trait; use fuel_core_services::stream::BoxStream; use fuel_core_storage::Result as StorageResult; -use fuel_core_types::blockchain::{ - primitives::BlockHeight, - SealedBlock, +use fuel_core_types::{ + blockchain::{ + primitives::{ + BlockHeight, + BlockId, + }, + SealedBlock, + SealedBlockHeader, + }, + fuel_tx::Transaction, }; -#[async_trait] pub trait P2pDb: Send + Sync { - async fn get_sealed_block( + fn get_sealed_block( &self, height: &BlockHeight, ) -> StorageResult>; + + fn get_sealed_header( + &self, + height: &BlockHeight, + ) -> StorageResult>; + + fn get_transactions( + &self, + block_id: &BlockId, + ) -> StorageResult>>; } pub trait BlockHeightImporter: Send + Sync { diff --git a/crates/services/p2p/src/request_response/messages.rs b/crates/services/p2p/src/request_response/messages.rs index fe802afe417..a920e8698fe 100644 --- a/crates/services/p2p/src/request_response/messages.rs +++ b/crates/services/p2p/src/request_response/messages.rs @@ -1,50 +1,84 @@ use std::sync::Arc; -use fuel_core_types::blockchain::{ - primitives::BlockHeight, - SealedBlock, +use fuel_core_types::{ + blockchain::{ + primitives::{ + BlockHeight, + BlockId, + }, + SealedBlock, + SealedBlockHeader, + }, + fuel_tx::Transaction, }; +use libp2p::PeerId; use serde::{ Deserialize, Serialize, }; +use serde_with::{ + serde_as, + FromInto, +}; use tokio::sync::oneshot; pub(crate) const REQUEST_RESPONSE_PROTOCOL_ID: &[u8] = b"/fuel/req_res/0.0.1"; /// Max Size in Bytes of the Request Message -/// Currently the only and the biggest message is RequestBlock(BlockHeight) -pub(crate) const MAX_REQUEST_SIZE: usize = 8; +pub(crate) const MAX_REQUEST_SIZE: usize = core::mem::size_of::(); + +pub type ChannelItem = oneshot::Sender>; + +// Peer receives a `RequestMessage`. +// It prepares a response in form of `OutboundResponse` +// This `OutboundResponse` gets prepared to be sent over the wire in `NetworkResponse` format. +// The Peer that requested the message receives the response over the wire in `NetworkResponse` format. +// It then unpacks it into `ResponseMessage`. +// `ResponseChannelItem` is used to forward the data within `ResponseMessage` to the receving channel. +// Client Peer: `RequestMessage` (send request) +// Server Peer: `RequestMessage` (receive request) -> `OutboundResponse` -> `NetworkResponse` (send response) +// Client Peer: `NetworkResponse` (receive response) -> `ResponseMessage(data)` -> `ResponseChannelItem(channel, data)` (handle response) +#[serde_as] #[derive(Serialize, Deserialize, Eq, PartialEq, Debug, Clone, Copy)] pub enum RequestMessage { - RequestBlock(BlockHeight), + Block(BlockHeight), + SealedHeader(BlockHeight), + Transactions(#[serde_as(as = "FromInto<[u8; 32]>")] BlockId), } /// Final Response Message that p2p service sends to the Orchestrator #[derive(Serialize, Deserialize, Debug, Clone)] pub enum ResponseMessage { - ResponseBlock(SealedBlock), + SealedBlock(Option), + SealedHeader(Option), + Transactions(Option>), } /// Holds oneshot channels for specific responses #[derive(Debug)] pub enum ResponseChannelItem { - ResponseBlock(oneshot::Sender), + Block(ChannelItem), + SealedHeader(ChannelItem<(PeerId, SealedBlockHeader)>), + Transactions(ChannelItem>), } /// Response that is sent over the wire /// and then additionaly deserialized into `ResponseMessage` #[derive(Serialize, Deserialize, Debug, Clone)] -pub enum IntermediateResponse { - ResponseBlock(Vec), +pub enum NetworkResponse { + Block(Option>), + Header(Option>), + Transactions(Option>), } /// Initial state of the `ResponseMessage` prior to having its inner value serialized -/// and wrapped into `IntermediateResponse` +/// and wrapped into `NetworkResponse` #[derive(Debug, Clone)] pub enum OutboundResponse { - ResponseBlock(Arc), + Block(Option>), + SealedHeader(Option>), + Transactions(Option>>), } #[derive(Debug)] diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 960e0e9694d..ea0ec2faf16 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -34,8 +34,12 @@ use fuel_core_types::{ blockchain::{ block::Block, consensus::ConsensusVote, - primitives::BlockHeight, + primitives::{ + BlockHeight, + BlockId, + }, SealedBlock, + SealedBlockHeader, }, fuel_tx::Transaction, services::p2p::{ @@ -48,7 +52,6 @@ use fuel_core_types::{ use futures::StreamExt; use libp2p::{ gossipsub::MessageAcceptance, - request_response::RequestId, PeerId, }; use std::{ @@ -75,10 +78,21 @@ enum TaskRequest { BroadcastVote(Arc), // Request to get one-off data from p2p network GetPeerIds(oneshot::Sender>), - GetBlock((BlockHeight, oneshot::Sender)), + GetBlock { + height: BlockHeight, + channel: oneshot::Sender>, + }, + GetSealedHeader { + height: BlockHeight, + channel: oneshot::Sender>, + }, + GetTransactions { + block_id: BlockId, + from_peer: PeerId, + channel: oneshot::Sender>>, + }, // Responds back to the p2p network RespondWithGossipsubMessageReport((GossipsubMessageInfo, GossipsubMessageAcceptance)), - RespondWithRequestedBlock((Option>, RequestId)), } impl Debug for TaskRequest { @@ -187,17 +201,24 @@ where let peer_ids = self.p2p_service.get_peers_ids().into_iter().copied().collect(); let _ = channel.send(peer_ids); } - Some(TaskRequest::GetBlock((height, response))) => { - let request_msg = RequestMessage::RequestBlock(height); - let channel_item = ResponseChannelItem::ResponseBlock(response); + Some(TaskRequest::GetBlock { height, channel }) => { + let request_msg = RequestMessage::Block(height); + let channel_item = ResponseChannelItem::Block(channel); + let _ = self.p2p_service.send_request_msg(None, request_msg, channel_item); + } + Some(TaskRequest::GetSealedHeader{ height, channel: response }) => { + let request_msg = RequestMessage::SealedHeader(height); + let channel_item = ResponseChannelItem::SealedHeader(response); let _ = self.p2p_service.send_request_msg(None, request_msg, channel_item); } + Some(TaskRequest::GetTransactions { block_id, from_peer, channel }) => { + let request_msg = RequestMessage::Transactions(block_id); + let channel_item = ResponseChannelItem::Transactions(channel); + let _ = self.p2p_service.send_request_msg(Some(from_peer), request_msg, channel_item); + } Some(TaskRequest::RespondWithGossipsubMessageReport((message, acceptance))) => { report_message(&mut self.p2p_service, message, acceptance); } - Some(TaskRequest::RespondWithRequestedBlock((response, request_id))) => { - let _ = self.p2p_service.send_response_msg(request_id, response.map(OutboundResponse::ResponseBlock)); - } None => { unreachable!("The `Task` is holder of the `Sender`, so it should not be possible"); } @@ -235,26 +256,33 @@ where }, Some(FuelP2PEvent::RequestMessage { request_message, request_id }) => { match request_message { - RequestMessage::RequestBlock(block_height) => { + RequestMessage::Block(block_height) => { let db = self.db.clone(); - let request_sender = self.shared.request_sender.clone(); - - tokio::spawn(async move { - // TODO: Process `StorageError` somehow. - let block_response = db.get_sealed_block(&block_height) - .await - .expect("Didn't expect error from database") - .map(Arc::new); - let _ = request_sender.send( - TaskRequest::RespondWithRequestedBlock( - (block_response, request_id) - ) - ); - }); + + // TODO: Process `StorageError` somehow. + let block_response = db.get_sealed_block(&block_height)? + .map(Arc::new); + let _ = self.p2p_service.send_response_msg(request_id, OutboundResponse::Block(block_response)); + } + RequestMessage::Transactions(block_id) => { + let db = self.db.clone(); + + let transactions_response = db.get_transactions(&block_id)? + .map(Arc::new); + + let _ = self.p2p_service.send_response_msg(request_id, OutboundResponse::Transactions(transactions_response)); + } + RequestMessage::SealedHeader(block_height) => { + let db = self.db.clone(); + + let response = db.get_sealed_header(&block_height)? + .map(Arc::new); + + let _ = self.p2p_service.send_response_msg(request_id, OutboundResponse::SealedHeader(response)); } } }, - _ => {} + _ => (), } }, latest_block_height = self.next_block_height.next() => { @@ -299,11 +327,55 @@ impl SharedState { Ok(()) } - pub async fn get_block(&self, height: BlockHeight) -> anyhow::Result { + pub async fn get_block( + &self, + height: BlockHeight, + ) -> anyhow::Result> { + let (sender, receiver) = oneshot::channel(); + + self.request_sender + .send(TaskRequest::GetBlock { + height, + channel: sender, + }) + .await?; + + receiver.await.map_err(|e| anyhow!("{}", e)) + } + + pub async fn get_sealed_block_header( + &self, + height: BlockHeight, + ) -> anyhow::Result, SealedBlockHeader)>> { let (sender, receiver) = oneshot::channel(); self.request_sender - .send(TaskRequest::GetBlock((height, sender))) + .send(TaskRequest::GetSealedHeader { + height, + channel: sender, + }) + .await?; + + receiver + .await + .map(|o| o.map(|(peer_id, header)| (peer_id.to_bytes(), header))) + .map_err(|e| anyhow!("{}", e)) + } + + pub async fn get_transactions_from_peer( + &self, + peer_id: Vec, + block_id: BlockId, + ) -> anyhow::Result>> { + let (sender, receiver) = oneshot::channel(); + let from_peer = PeerId::from_bytes(&peer_id).expect("Valid PeerId"); + + self.request_sender + .send(TaskRequest::GetTransactions { + block_id, + from_peer, + channel: sender, + }) .await?; receiver.await.map_err(|e| anyhow!("{}", e)) @@ -425,7 +497,6 @@ pub mod tests { use crate::ports::P2pDb; use super::*; - use async_trait::async_trait; use fuel_core_services::Service; use fuel_core_storage::Result as StorageResult; @@ -441,9 +512,8 @@ pub mod tests { #[derive(Clone, Debug)] struct FakeDb; - #[async_trait] impl P2pDb for FakeDb { - async fn get_sealed_block( + fn get_sealed_block( &self, _height: &BlockHeight, ) -> StorageResult> { @@ -454,6 +524,25 @@ pub mod tests { consensus: Consensus::PoA(PoAConsensus::new(Default::default())), })) } + + fn get_sealed_header( + &self, + _height: &BlockHeight, + ) -> StorageResult> { + let header = Default::default(); + + Ok(Some(SealedBlockHeader { + entity: header, + consensus: Consensus::PoA(PoAConsensus::new(Default::default())), + })) + } + + fn get_transactions( + &self, + _block_id: &fuel_core_types::blockchain::primitives::BlockId, + ) -> StorageResult>> { + Ok(Some(vec![])) + } } #[derive(Clone, Debug)] diff --git a/crates/types/src/blockchain/primitives.rs b/crates/types/src/blockchain/primitives.rs index 804a0f90786..78b810b2254 100644 --- a/crates/types/src/blockchain/primitives.rs +++ b/crates/types/src/blockchain/primitives.rs @@ -222,3 +222,15 @@ pub struct SecretKeyWrapper(SecretKey); impl CloneableSecret for SecretKeyWrapper {} impl DebugSecret for SecretKeyWrapper {} + +impl From for [u8; 32] { + fn from(id: BlockId) -> Self { + id.0.into() + } +} + +impl From<[u8; 32]> for BlockId { + fn from(bytes: [u8; 32]) -> Self { + Self(bytes.into()) + } +}