Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

request response for get transactions & sealed header from p2p network #919

Merged
merged 43 commits into from
Jan 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
7f52684
initial heartbeat and connection handler
leviathanbeak Jan 17, 2023
c675cd6
remove ping protocol from deps
leviathanbeak Jan 17, 2023
516fa59
replace ping with heartbeat
leviathanbeak Jan 17, 2023
0072af0
expose broadcast of block height data
leviathanbeak Jan 17, 2023
95eb701
expose udpate with block height for p2p service
leviathanbeak Jan 17, 2023
dacc3e3
remove loop {}
leviathanbeak Jan 17, 2023
b0aac2c
update test for heartbeat
leviathanbeak Jan 17, 2023
d1f3afc
clean comparison
leviathanbeak Jan 17, 2023
6aae43a
remove update block
leviathanbeak Jan 18, 2023
926e14a
add block importer port
leviathanbeak Jan 18, 2023
094670b
naive protection
leviathanbeak Jan 18, 2023
ccddb1f
use heartbeat data - dont ban
leviathanbeak Jan 19, 2023
cd29abb
add tracing
leviathanbeak Jan 19, 2023
14d073e
use StreamExt from futures instead
leviathanbeak Jan 19, 2023
9c8381f
create adapters for p2p
leviathanbeak Jan 19, 2023
87f2b20
hide behind p2p feature
leviathanbeak Jan 19, 2023
9d1452a
include blockheight correctly
leviathanbeak Jan 19, 2023
1aab373
get transactions from peer
leviathanbeak Jan 19, 2023
910a893
Merge branch 'master' into leviathanbeak/heartbeat_protocol
xgreenx Jan 20, 2023
7a317d2
Merge with master
xgreenx Jan 20, 2023
a2d69fd
Re-used existing `BlockImportAdapter`.
xgreenx Jan 20, 2023
f4005df
Merge branch 'master' into leviathanbeak/heartbeat_protocol
Voxelot Jan 20, 2023
eee0285
Merge branch 'master' into leviathanbeak/heartbeat_protocol
Voxelot Jan 21, 2023
bcf1cbb
Update debug comment
leviathanbeak Jan 21, 2023
475d4e9
Merge branch 'leviathanbeak/heartbeat_protocol' into leviathanbeak/ge…
freesig Jan 23, 2023
00e7dd3
Merge branch 'master' of github.com:FuelLabs/fuel-core into leviathan…
freesig Jan 23, 2023
46205e2
Merge branch 'leviathanbeak/heartbeat_protocol' into leviathanbeak/ge…
freesig Jan 23, 2023
c2cbece
Merge branch 'master' into leviathanbeak/get_transactions_from_peer
Voxelot Jan 23, 2023
4258c8d
Freesig/get sealed header (#920)
freesig Jan 23, 2023
f90476e
use Option<T> for every request/response message
leviathanbeak Jan 23, 2023
93ee4c4
use Option<T> for all possible enum variants in messages
leviathanbeak Jan 23, 2023
a660f96
cleanup enum names and document message flow
leviathanbeak Jan 23, 2023
16e0ec0
fix typo
leviathanbeak Jan 23, 2023
9501e79
implement p2p adapter
leviathanbeak Jan 23, 2023
92a4161
Merge branch 'master' into leviathanbeak/get_transactions_from_peer
leviathanbeak Jan 23, 2023
7d37a83
rename to sealed header
leviathanbeak Jan 23, 2023
f615d45
request response reusable
leviathanbeak Jan 23, 2023
8652ad1
Merge branch 'leviathanbeak/get_transactions_from_peer' of github.com…
leviathanbeak Jan 23, 2023
b2fe838
fix peer typo
leviathanbeak Jan 23, 2023
006d2f8
fix recursive calls
freesig Jan 23, 2023
864c790
fix issue where channel can deadlock and bug in serialization len
freesig Jan 24, 2023
15c2d76
make max peers test run more efficently
freesig Jan 24, 2023
6b3762c
remove unused async
freesig Jan 24, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

49 changes: 38 additions & 11 deletions crates/fuel-core/src/database/sealed_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,21 @@ use fuel_core_storage::{
StorageInspect,
StorageMutate,
};
use fuel_core_types::blockchain::{
consensus::{
Consensus,
Genesis,
use fuel_core_types::{
blockchain::{
consensus::{
Consensus,
Genesis,
Sealed,
},
primitives::{
BlockHeight,
BlockId,
},
SealedBlock,
SealedBlockHeader,
},
primitives::{
BlockHeight,
BlockId,
},
SealedBlock,
SealedBlockHeader,
fuel_tx::Transaction,
};
use std::borrow::Cow;

Expand Down Expand Up @@ -86,7 +90,10 @@ impl Database {
&self,
height: &BlockHeight,
) -> StorageResult<Option<SealedBlock>> {
let block_id = self.get_block_id(height)?.ok_or(not_found!("BlockId"))?;
let block_id = match self.get_block_id(height)? {
Some(i) => i,
None => return Ok(None),
};
self.get_sealed_block_by_id(&block_id)
}

Expand All @@ -104,6 +111,17 @@ impl Database {
}
}

pub fn get_sealed_block_header_by_height(
&self,
height: &BlockHeight,
) -> StorageResult<Option<SealedBlockHeader>> {
let block_id = match self.get_block_id(height)? {
Some(i) => i,
None => return Ok(None),
};
self.get_sealed_block_header(&block_id)
}

pub fn get_sealed_block_header(
&self,
block_id: &BlockId,
Expand All @@ -122,4 +140,13 @@ impl Database {
Ok(None)
}
}

pub fn get_transactions_on_block(
&self,
block_id: &BlockId,
) -> StorageResult<Option<Vec<Transaction>>> {
Ok(self
.get_sealed_block_by_id(block_id)?
.map(|Sealed { entity: block, .. }| block.into_inner().1))
}
}
30 changes: 25 additions & 5 deletions crates/fuel-core/src/service/adapters/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,39 @@ use fuel_core_p2p::ports::{
};
use fuel_core_services::stream::BoxStream;
use fuel_core_storage::Result as StorageResult;
use fuel_core_types::blockchain::{
primitives::BlockHeight,
SealedBlock,
use fuel_core_types::{
blockchain::{
primitives::{
BlockHeight,
BlockId,
},
SealedBlock,
SealedBlockHeader,
},
fuel_tx::Transaction,
};

#[async_trait::async_trait]
impl P2pDb for Database {
async fn get_sealed_block(
fn get_sealed_block(
&self,
height: &BlockHeight,
) -> StorageResult<Option<SealedBlock>> {
self.get_sealed_block_by_height(height)
}

fn get_sealed_header(
&self,
height: &BlockHeight,
) -> StorageResult<Option<SealedBlockHeader>> {
self.get_sealed_block_header_by_height(height)
}

fn get_transactions(
&self,
block_id: &BlockId,
) -> StorageResult<Option<Vec<Transaction>>> {
self.get_transactions_on_block(block_id)
}
}

impl BlockHeightImporter for BlockImporterAdapter {
Expand Down
1 change: 1 addition & 0 deletions crates/services/p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ libp2p-yamux = "=0.42.0"
prometheus-client = "0.18"
rand = "0.8"
serde = { version = "1.0", features = ["derive"] }
serde_with = "1.11"
sha2 = "0.10"
tokio = { version = "1.21", features = ["sync"] }
tracing = "0.1"
Expand Down
16 changes: 7 additions & 9 deletions crates/services/p2p/src/behavior.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::{
PeerManagerBehaviour,
},
request_response::messages::{
IntermediateResponse,
NetworkResponse,
RequestMessage,
},
};
Expand Down Expand Up @@ -56,7 +56,7 @@ pub enum FuelBehaviourEvent {
Discovery(DiscoveryEvent),
PeerInfo(PeerInfoEvent),
Gossipsub(GossipsubEvent),
RequestResponse(RequestResponseEvent<RequestMessage, IntermediateResponse>),
RequestResponse(RequestResponseEvent<RequestMessage, NetworkResponse>),
}

/// Handles all p2p protocols needed for Fuel.
Expand Down Expand Up @@ -180,9 +180,9 @@ impl<Codec: NetworkCodec> FuelBehaviour<Codec> {

pub fn send_response_msg(
&mut self,
channel: ResponseChannel<IntermediateResponse>,
message: IntermediateResponse,
) -> Result<(), IntermediateResponse> {
channel: ResponseChannel<NetworkResponse>,
message: NetworkResponse,
) -> Result<(), NetworkResponse> {
self.request_response.send_response(channel, message)
}

Expand Down Expand Up @@ -228,10 +228,8 @@ impl From<GossipsubEvent> for FuelBehaviourEvent {
}
}

impl From<RequestResponseEvent<RequestMessage, IntermediateResponse>>
for FuelBehaviourEvent
{
fn from(event: RequestResponseEvent<RequestMessage, IntermediateResponse>) -> Self {
impl From<RequestResponseEvent<RequestMessage, NetworkResponse>> for FuelBehaviourEvent {
fn from(event: RequestResponseEvent<RequestMessage, NetworkResponse>) -> Self {
FuelBehaviourEvent::RequestResponse(event)
}
}
22 changes: 11 additions & 11 deletions crates/services/p2p/src/codecs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
GossipsubMessage,
},
request_response::messages::{
IntermediateResponse,
NetworkResponse,
OutboundResponse,
RequestMessage,
ResponseMessage,
Expand All @@ -31,22 +31,22 @@ pub trait GossipsubCodec {
}

pub trait RequestResponseConverter {
/// Response that is ready to be converted into IntermediateResponse
/// Response that is ready to be converted into NetworkResponse
type OutboundResponse;
/// Response that is sent over the network
type IntermediateResponse;
type NetworkResponse;
/// Final Response Message deserialized from IntermediateResponse
type ResponseMessage;

fn convert_to_response(
fn convert_to_network_response(
&self,
inter_msg: &Self::IntermediateResponse,
) -> Result<Self::ResponseMessage, io::Error>;
res_msg: &Self::OutboundResponse,
) -> Result<Self::NetworkResponse, io::Error>;

fn convert_to_intermediate(
fn convert_to_response(
&self,
res_msg: &Self::OutboundResponse,
) -> Result<Self::IntermediateResponse, io::Error>;
inter_msg: &Self::NetworkResponse,
) -> Result<Self::ResponseMessage, io::Error>;
}

/// Main Codec trait
Expand All @@ -55,9 +55,9 @@ pub trait NetworkCodec:
GossipsubCodec<
RequestMessage = GossipsubBroadcastRequest,
ResponseMessage = GossipsubMessage,
> + RequestResponseCodec<Request = RequestMessage, Response = IntermediateResponse>
> + RequestResponseCodec<Request = RequestMessage, Response = NetworkResponse>
+ RequestResponseConverter<
IntermediateResponse = IntermediateResponse,
NetworkResponse = NetworkResponse,
OutboundResponse = OutboundResponse,
ResponseMessage = ResponseMessage,
> + Clone
Expand Down
88 changes: 76 additions & 12 deletions crates/services/p2p/src/codecs/bincode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
GossipsubMessage,
},
request_response::messages::{
IntermediateResponse,
NetworkResponse,
OutboundResponse,
RequestMessage,
ResponseMessage,
Expand Down Expand Up @@ -81,7 +81,7 @@ impl BincodeCodec {
impl RequestResponseCodec for BincodeCodec {
type Protocol = MessageExchangeBincodeProtocol;
type Request = RequestMessage;
type Response = IntermediateResponse;
type Response = NetworkResponse;

async fn read_request<T>(
&mut self,
Expand Down Expand Up @@ -186,29 +186,77 @@ impl GossipsubCodec for BincodeCodec {
}

impl RequestResponseConverter for BincodeCodec {
type IntermediateResponse = IntermediateResponse;
type NetworkResponse = NetworkResponse;
type OutboundResponse = OutboundResponse;
type ResponseMessage = ResponseMessage;

fn convert_to_response(
&self,
inter_msg: &Self::IntermediateResponse,
inter_msg: &Self::NetworkResponse,
) -> Result<Self::ResponseMessage, io::Error> {
match inter_msg {
IntermediateResponse::ResponseBlock(block_bytes) => Ok(
ResponseMessage::ResponseBlock(self.deserialize(block_bytes)?),
),
NetworkResponse::Block(block_bytes) => {
let response = if let Some(block_bytes) = block_bytes {
Some(self.deserialize(block_bytes)?)
} else {
None
};

Ok(ResponseMessage::SealedBlock(response))
}
NetworkResponse::Header(header_bytes) => {
let response = if let Some(header_bytes) = header_bytes {
Some(self.deserialize(header_bytes)?)
} else {
None
};

Ok(ResponseMessage::SealedHeader(response))
}
NetworkResponse::Transactions(tx_bytes) => {
let response = if let Some(tx_bytes) = tx_bytes {
Some(self.deserialize(tx_bytes)?)
} else {
None
};

Ok(ResponseMessage::Transactions(response))
}
}
}

fn convert_to_intermediate(
fn convert_to_network_response(
&self,
res_msg: &Self::OutboundResponse,
) -> Result<Self::IntermediateResponse, io::Error> {
) -> Result<Self::NetworkResponse, io::Error> {
match res_msg {
OutboundResponse::ResponseBlock(sealed_block) => Ok(
IntermediateResponse::ResponseBlock(self.serialize(&**sealed_block)?),
),
OutboundResponse::Block(sealed_block) => {
let response = if let Some(sealed_block) = sealed_block {
Some(self.serialize(&**sealed_block)?)
} else {
None
};

Ok(NetworkResponse::Block(response))
}
OutboundResponse::SealedHeader(sealed_header) => {
let response = if let Some(sealed_header) = sealed_header {
Some(self.serialize(&**sealed_header)?)
} else {
None
};

Ok(NetworkResponse::Header(response))
}
OutboundResponse::Transactions(transactions) => {
let response = if let Some(transactions) = transactions {
Some(self.serialize(&**transactions)?)
} else {
None
};

Ok(NetworkResponse::Transactions(response))
}
}
}
}
Expand All @@ -227,3 +275,19 @@ impl ProtocolName for MessageExchangeBincodeProtocol {
REQUEST_RESPONSE_PROTOCOL_ID
}
}

#[cfg(test)]
mod tests {
use fuel_core_types::blockchain::primitives::BlockId;

use super::*;

#[test]
fn test_request_size_matches() {
let m = RequestMessage::Transactions(BlockId::default());
assert_eq!(
bincode::serialized_size(&m).unwrap() as usize,
MAX_REQUEST_SIZE
);
}
}
Loading