Skip to content

Commit

Permalink
request response for get transactions & sealed header from p2p network (
Browse files Browse the repository at this point in the history
#919)

closes #878 #877 

The work has been done on 2 (now merged) branches, by @freesig and
myself.

Co-authored-by: green <xgreenx9999@gmail.com>
Co-authored-by: Brandon Kite <brandonkite92@gmail.com>
Co-authored-by: Tom <tomrgowan@gmail.com>
Co-authored-by: ControlCplusControlV <44706811+ControlCplusControlV@users.noreply.github.com>
  • Loading branch information
5 people authored Jan 24, 2023
1 parent 99a7d1c commit 84c2ebf
Show file tree
Hide file tree
Showing 12 changed files with 501 additions and 149 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

49 changes: 38 additions & 11 deletions crates/fuel-core/src/database/sealed_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,21 @@ use fuel_core_storage::{
StorageInspect,
StorageMutate,
};
use fuel_core_types::blockchain::{
consensus::{
Consensus,
Genesis,
use fuel_core_types::{
blockchain::{
consensus::{
Consensus,
Genesis,
Sealed,
},
primitives::{
BlockHeight,
BlockId,
},
SealedBlock,
SealedBlockHeader,
},
primitives::{
BlockHeight,
BlockId,
},
SealedBlock,
SealedBlockHeader,
fuel_tx::Transaction,
};
use std::borrow::Cow;

Expand Down Expand Up @@ -86,7 +90,10 @@ impl Database {
&self,
height: &BlockHeight,
) -> StorageResult<Option<SealedBlock>> {
let block_id = self.get_block_id(height)?.ok_or(not_found!("BlockId"))?;
let block_id = match self.get_block_id(height)? {
Some(i) => i,
None => return Ok(None),
};
self.get_sealed_block_by_id(&block_id)
}

Expand All @@ -104,6 +111,17 @@ impl Database {
}
}

pub fn get_sealed_block_header_by_height(
&self,
height: &BlockHeight,
) -> StorageResult<Option<SealedBlockHeader>> {
let block_id = match self.get_block_id(height)? {
Some(i) => i,
None => return Ok(None),
};
self.get_sealed_block_header(&block_id)
}

pub fn get_sealed_block_header(
&self,
block_id: &BlockId,
Expand All @@ -122,4 +140,13 @@ impl Database {
Ok(None)
}
}

pub fn get_transactions_on_block(
&self,
block_id: &BlockId,
) -> StorageResult<Option<Vec<Transaction>>> {
Ok(self
.get_sealed_block_by_id(block_id)?
.map(|Sealed { entity: block, .. }| block.into_inner().1))
}
}
30 changes: 25 additions & 5 deletions crates/fuel-core/src/service/adapters/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,39 @@ use fuel_core_p2p::ports::{
};
use fuel_core_services::stream::BoxStream;
use fuel_core_storage::Result as StorageResult;
use fuel_core_types::blockchain::{
primitives::BlockHeight,
SealedBlock,
use fuel_core_types::{
blockchain::{
primitives::{
BlockHeight,
BlockId,
},
SealedBlock,
SealedBlockHeader,
},
fuel_tx::Transaction,
};

#[async_trait::async_trait]
impl P2pDb for Database {
async fn get_sealed_block(
fn get_sealed_block(
&self,
height: &BlockHeight,
) -> StorageResult<Option<SealedBlock>> {
self.get_sealed_block_by_height(height)
}

fn get_sealed_header(
&self,
height: &BlockHeight,
) -> StorageResult<Option<SealedBlockHeader>> {
self.get_sealed_block_header_by_height(height)
}

fn get_transactions(
&self,
block_id: &BlockId,
) -> StorageResult<Option<Vec<Transaction>>> {
self.get_transactions_on_block(block_id)
}
}

impl BlockHeightImporter for BlockImporterAdapter {
Expand Down
1 change: 1 addition & 0 deletions crates/services/p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ libp2p-yamux = "=0.42.0"
prometheus-client = "0.18"
rand = "0.8"
serde = { version = "1.0", features = ["derive"] }
serde_with = "1.11"
sha2 = "0.10"
tokio = { version = "1.21", features = ["sync"] }
tracing = "0.1"
Expand Down
16 changes: 7 additions & 9 deletions crates/services/p2p/src/behavior.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::{
PeerManagerBehaviour,
},
request_response::messages::{
IntermediateResponse,
NetworkResponse,
RequestMessage,
},
};
Expand Down Expand Up @@ -56,7 +56,7 @@ pub enum FuelBehaviourEvent {
Discovery(DiscoveryEvent),
PeerInfo(PeerInfoEvent),
Gossipsub(GossipsubEvent),
RequestResponse(RequestResponseEvent<RequestMessage, IntermediateResponse>),
RequestResponse(RequestResponseEvent<RequestMessage, NetworkResponse>),
}

/// Handles all p2p protocols needed for Fuel.
Expand Down Expand Up @@ -180,9 +180,9 @@ impl<Codec: NetworkCodec> FuelBehaviour<Codec> {

pub fn send_response_msg(
&mut self,
channel: ResponseChannel<IntermediateResponse>,
message: IntermediateResponse,
) -> Result<(), IntermediateResponse> {
channel: ResponseChannel<NetworkResponse>,
message: NetworkResponse,
) -> Result<(), NetworkResponse> {
self.request_response.send_response(channel, message)
}

Expand Down Expand Up @@ -228,10 +228,8 @@ impl From<GossipsubEvent> for FuelBehaviourEvent {
}
}

impl From<RequestResponseEvent<RequestMessage, IntermediateResponse>>
for FuelBehaviourEvent
{
fn from(event: RequestResponseEvent<RequestMessage, IntermediateResponse>) -> Self {
impl From<RequestResponseEvent<RequestMessage, NetworkResponse>> for FuelBehaviourEvent {
fn from(event: RequestResponseEvent<RequestMessage, NetworkResponse>) -> Self {
FuelBehaviourEvent::RequestResponse(event)
}
}
22 changes: 11 additions & 11 deletions crates/services/p2p/src/codecs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
GossipsubMessage,
},
request_response::messages::{
IntermediateResponse,
NetworkResponse,
OutboundResponse,
RequestMessage,
ResponseMessage,
Expand All @@ -31,22 +31,22 @@ pub trait GossipsubCodec {
}

pub trait RequestResponseConverter {
/// Response that is ready to be converted into IntermediateResponse
/// Response that is ready to be converted into NetworkResponse
type OutboundResponse;
/// Response that is sent over the network
type IntermediateResponse;
type NetworkResponse;
/// Final Response Message deserialized from IntermediateResponse
type ResponseMessage;

fn convert_to_response(
fn convert_to_network_response(
&self,
inter_msg: &Self::IntermediateResponse,
) -> Result<Self::ResponseMessage, io::Error>;
res_msg: &Self::OutboundResponse,
) -> Result<Self::NetworkResponse, io::Error>;

fn convert_to_intermediate(
fn convert_to_response(
&self,
res_msg: &Self::OutboundResponse,
) -> Result<Self::IntermediateResponse, io::Error>;
inter_msg: &Self::NetworkResponse,
) -> Result<Self::ResponseMessage, io::Error>;
}

/// Main Codec trait
Expand All @@ -55,9 +55,9 @@ pub trait NetworkCodec:
GossipsubCodec<
RequestMessage = GossipsubBroadcastRequest,
ResponseMessage = GossipsubMessage,
> + RequestResponseCodec<Request = RequestMessage, Response = IntermediateResponse>
> + RequestResponseCodec<Request = RequestMessage, Response = NetworkResponse>
+ RequestResponseConverter<
IntermediateResponse = IntermediateResponse,
NetworkResponse = NetworkResponse,
OutboundResponse = OutboundResponse,
ResponseMessage = ResponseMessage,
> + Clone
Expand Down
88 changes: 76 additions & 12 deletions crates/services/p2p/src/codecs/bincode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
GossipsubMessage,
},
request_response::messages::{
IntermediateResponse,
NetworkResponse,
OutboundResponse,
RequestMessage,
ResponseMessage,
Expand Down Expand Up @@ -81,7 +81,7 @@ impl BincodeCodec {
impl RequestResponseCodec for BincodeCodec {
type Protocol = MessageExchangeBincodeProtocol;
type Request = RequestMessage;
type Response = IntermediateResponse;
type Response = NetworkResponse;

async fn read_request<T>(
&mut self,
Expand Down Expand Up @@ -186,29 +186,77 @@ impl GossipsubCodec for BincodeCodec {
}

impl RequestResponseConverter for BincodeCodec {
type IntermediateResponse = IntermediateResponse;
type NetworkResponse = NetworkResponse;
type OutboundResponse = OutboundResponse;
type ResponseMessage = ResponseMessage;

fn convert_to_response(
&self,
inter_msg: &Self::IntermediateResponse,
inter_msg: &Self::NetworkResponse,
) -> Result<Self::ResponseMessage, io::Error> {
match inter_msg {
IntermediateResponse::ResponseBlock(block_bytes) => Ok(
ResponseMessage::ResponseBlock(self.deserialize(block_bytes)?),
),
NetworkResponse::Block(block_bytes) => {
let response = if let Some(block_bytes) = block_bytes {
Some(self.deserialize(block_bytes)?)
} else {
None
};

Ok(ResponseMessage::SealedBlock(response))
}
NetworkResponse::Header(header_bytes) => {
let response = if let Some(header_bytes) = header_bytes {
Some(self.deserialize(header_bytes)?)
} else {
None
};

Ok(ResponseMessage::SealedHeader(response))
}
NetworkResponse::Transactions(tx_bytes) => {
let response = if let Some(tx_bytes) = tx_bytes {
Some(self.deserialize(tx_bytes)?)
} else {
None
};

Ok(ResponseMessage::Transactions(response))
}
}
}

fn convert_to_intermediate(
fn convert_to_network_response(
&self,
res_msg: &Self::OutboundResponse,
) -> Result<Self::IntermediateResponse, io::Error> {
) -> Result<Self::NetworkResponse, io::Error> {
match res_msg {
OutboundResponse::ResponseBlock(sealed_block) => Ok(
IntermediateResponse::ResponseBlock(self.serialize(&**sealed_block)?),
),
OutboundResponse::Block(sealed_block) => {
let response = if let Some(sealed_block) = sealed_block {
Some(self.serialize(&**sealed_block)?)
} else {
None
};

Ok(NetworkResponse::Block(response))
}
OutboundResponse::SealedHeader(sealed_header) => {
let response = if let Some(sealed_header) = sealed_header {
Some(self.serialize(&**sealed_header)?)
} else {
None
};

Ok(NetworkResponse::Header(response))
}
OutboundResponse::Transactions(transactions) => {
let response = if let Some(transactions) = transactions {
Some(self.serialize(&**transactions)?)
} else {
None
};

Ok(NetworkResponse::Transactions(response))
}
}
}
}
Expand All @@ -227,3 +275,19 @@ impl ProtocolName for MessageExchangeBincodeProtocol {
REQUEST_RESPONSE_PROTOCOL_ID
}
}

#[cfg(test)]
mod tests {
use fuel_core_types::blockchain::primitives::BlockId;

use super::*;

#[test]
fn test_request_size_matches() {
let m = RequestMessage::Transactions(BlockId::default());
assert_eq!(
bincode::serialized_size(&m).unwrap() as usize,
MAX_REQUEST_SIZE
);
}
}
Loading

0 comments on commit 84c2ebf

Please sign in to comment.