Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add gossipsub message validation #694

Merged
merged 26 commits into from
Oct 19, 2022
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
fba37e0
remove loop from next_event
leviathanbeak Oct 7, 2022
ee9618e
initial work
leviathanbeak Oct 11, 2022
4030736
more work
leviathanbeak Oct 11, 2022
2afb294
trying to use an alternative approach to message id cache
Voxelot Oct 11, 2022
7e691fb
quick break
leviathanbeak Oct 11, 2022
b676c5b
fix sized issue
Voxelot Oct 11, 2022
7acd4cc
finalize gossipsub validation
leviathanbeak Oct 12, 2022
a40daf5
remove unused
leviathanbeak Oct 12, 2022
68e51ee
update fast_gossip_message_id
leviathanbeak Oct 12, 2022
7d02dc3
check MessageId calculation
leviathanbeak Oct 12, 2022
d2456c8
cleanup network orchestrator
leviathanbeak Oct 12, 2022
65eb096
move types to p2p for reuse
leviathanbeak Oct 12, 2022
09a9525
Merge branch 'master' into leviathanbeak/gossipsub_validate_msg
leviathanbeak Oct 12, 2022
dcd5d02
add consensus gossip data instead
leviathanbeak Oct 12, 2022
b1bf7a7
add serde only feature
leviathanbeak Oct 14, 2022
50ce0de
update with suggestions
leviathanbeak Oct 14, 2022
c49f264
remove unused
leviathanbeak Oct 14, 2022
7ed6b7e
use gossipsub_config
leviathanbeak Oct 14, 2022
6e6193f
hash complete messages
leviathanbeak Oct 14, 2022
250ba46
Merge branch 'master' into leviathanbeak/gossipsub_validate_msg
leviathanbeak Oct 14, 2022
6beaec8
use gossipsub config and its builder
leviathanbeak Oct 17, 2022
8a6cc00
Merge branch 'master' into leviathanbeak/gossipsub_validate_msg
leviathanbeak Oct 17, 2022
cc3c3a7
import serde only as feature
leviathanbeak Oct 17, 2022
13e36a0
Merge branch 'master' of github.com:FuelLabs/fuel-core into leviathan…
leviathanbeak Oct 19, 2022
b618fe1
use test-helpers from fuel-core-interfaces
leviathanbeak Oct 19, 2022
4908019
to the right place in toml
leviathanbeak Oct 19, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 71 additions & 1 deletion fuel-core-interfaces/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,54 @@ use crate::{
model::ConsensusVote,
};
use async_trait::async_trait;
use std::sync::Arc;
use serde::{
leviathanbeak marked this conversation as resolved.
Show resolved Hide resolved
Deserialize,
Serialize,
};
use std::{
fmt::Debug,
sync::Arc,
};
use tokio::sync::oneshot;

#[derive(Debug, PartialEq, Eq, Clone)]
pub enum TransactionBroadcast {
NewTransaction(Transaction),
}

#[derive(Debug, PartialEq, Eq, Clone)]
pub enum ConsensusBroadcast {
NewVote(ConsensusVote),
}

#[derive(Debug, Clone)]
pub enum BlockBroadcast {
/// fuel block without consensus data
NewBlock(FuelBlock),
}

#[derive(Debug)]
leviathanbeak marked this conversation as resolved.
Show resolved Hide resolved
pub enum GossipsubMessageAcceptance {
Accept,
Reject,
Ignore,
}

#[derive(Serialize, Deserialize, Debug, Clone, Hash, PartialEq, Eq)]
pub struct GossipsubMessageInfo {
pub message_id: Vec<u8>,
pub peer_id: Vec<u8>,
}

impl<T> From<&GossipData<T>> for GossipsubMessageInfo {
fn from(gossip_data: &GossipData<T>) -> Self {
Self {
message_id: gossip_data.message_id.clone(),
peer_id: gossip_data.peer_id.clone(),
}
}
}

#[derive(Debug)]
pub enum P2pRequestEvent {
RequestBlock {
Expand All @@ -40,9 +71,48 @@ pub enum P2pRequestEvent {
BroadcastConsensusVote {
vote: Arc<ConsensusVote>,
},
GossipsubMessageReport {
message: GossipsubMessageInfo,
acceptance: GossipsubMessageAcceptance,
},
Stop,
}

#[derive(Debug, Clone)]
pub struct GossipData<T> {
pub data: Option<T>,
pub peer_id: Vec<u8>,
pub message_id: Vec<u8>,
}

pub type ConsensusGossipData = GossipData<ConsensusBroadcast>;
pub type TransactionGossipData = GossipData<TransactionBroadcast>;
pub type BlockGossipData = GossipData<BlockBroadcast>;

impl<T> GossipData<T> {
pub fn new(
data: T,
peer_id: impl Into<Vec<u8>>,
message_id: impl Into<Vec<u8>>,
) -> Self {
Self {
data: Some(data),
peer_id: peer_id.into(),
message_id: message_id.into(),
}
}
}

pub trait NetworkData<T>: Debug + Send {
fn take_data(&mut self) -> Option<T>;
}
Comment on lines +103 to +105
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pub trait NetworkData<T>: Debug + Send {
fn take_data(&mut self) -> Option<T>;
}
pub trait NetworkData<T>: Debug + Send {
type Metadata;
fn take_data(self) -> (Self::Metadata, T);
}

I'm thinking it would be better to actually split the type so that we don't have to handle the option?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm maybe we can defer this to a followup task? Ideally in a way that wouldn't require users of this trait to know or care about the concrete type for metadata.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did raise this concern in the architecture PR but was told it was an implementation detail. Happy to do a follow up though

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah I think it's just a matter of having a fully fleshed out proposal of this actually working


impl<T: Debug + Send + 'static> NetworkData<T> for GossipData<T> {
fn take_data(&mut self) -> Option<T> {
self.data.take()
}
}

#[async_trait]
pub trait P2pDb: Send + Sync {
async fn get_sealed_block(&self, height: BlockHeight)
Expand Down
16 changes: 15 additions & 1 deletion fuel-p2p/src/behavior.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use libp2p::{
},
Gossipsub,
GossipsubEvent,
MessageAcceptance,
MessageId,
},
request_response::{
Expand Down Expand Up @@ -130,7 +131,7 @@ impl<Codec: NetworkCodec> FuelBehaviour<Codec> {

Self {
discovery: discovery_config.finish(),
gossipsub: build_gossipsub(&p2p_config.local_keypair, p2p_config),
gossipsub: build_gossipsub(p2p_config),
peer_info,
request_response,
}
Expand Down Expand Up @@ -190,6 +191,19 @@ impl<Codec: NetworkCodec> FuelBehaviour<Codec> {
self.request_response.send_response(channel, message)
}

pub fn report_message_validation_result(
&mut self,
msg_id: &MessageId,
propagation_source: &PeerId,
acceptance: MessageAcceptance,
) -> Result<bool, PublishError> {
self.gossipsub.report_message_validation_result(
msg_id,
propagation_source,
acceptance,
)
}

// Currently only used in testing, but should be useful for the NetworkOrchestrator API
#[allow(dead_code)]
pub fn get_peer_info(&self, peer_id: &PeerId) -> Option<&PeerInfo> {
Expand Down
31 changes: 15 additions & 16 deletions fuel-p2p/src/gossipsub/builder.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,29 @@
use libp2p::{
gossipsub::{
FastMessageId,
Gossipsub,
GossipsubConfigBuilder,
GossipsubMessage,
MessageAuthenticity,
MessageId,
PeerScoreParams,
PeerScoreThresholds,
RawGossipsubMessage,
},
identity::Keypair,
use libp2p::gossipsub::{
FastMessageId,
Gossipsub,
GossipsubConfigBuilder,
GossipsubMessage,
MessageAuthenticity,
MessageId,
PeerScoreParams,
PeerScoreThresholds,
RawGossipsubMessage,
};
use sha2::{
Digest,
Sha224,
Sha256,
};

use crate::config::P2PConfig;

pub fn build_gossipsub(local_key: &Keypair, p2p_config: &P2PConfig) -> Gossipsub {
pub fn build_gossipsub(p2p_config: &P2PConfig) -> Gossipsub {
let gossip_message_id = move |message: &GossipsubMessage| {
MessageId::from(&Sha256::digest(&message.data)[..20])
leviathanbeak marked this conversation as resolved.
Show resolved Hide resolved
};

let fast_gossip_message_id = move |message: &RawGossipsubMessage| {
FastMessageId::from(&Sha256::digest(&message.data)[..8])
FastMessageId::from(&Sha224::digest(&message.data)[..16])
leviathanbeak marked this conversation as resolved.
Show resolved Hide resolved
};

let gossipsub_config = GossipsubConfigBuilder::default()
Expand All @@ -35,11 +33,12 @@ pub fn build_gossipsub(local_key: &Keypair, p2p_config: &P2PConfig) -> Gossipsub
.mesh_n_high(p2p_config.max_mesh_size)
.message_id_fn(gossip_message_id)
.fast_message_id_fn(fast_gossip_message_id)
.validate_messages()
leviathanbeak marked this conversation as resolved.
Show resolved Hide resolved
.build()
.expect("valid gossipsub configuration");

let mut gossipsub = Gossipsub::new(
MessageAuthenticity::Signed(local_key.clone()),
MessageAuthenticity::Signed(p2p_config.local_keypair.clone()),
gossipsub_config,
)
.expect("gossipsub initialized");
Expand Down
92 changes: 74 additions & 18 deletions fuel-p2p/src/orchestrator.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@
use std::sync::Arc;

use anyhow::anyhow;

use fuel_core_interfaces::p2p::{
BlockBroadcast,
BlockGossipData,
ConsensusBroadcast,
ConsensusGossipData,
GossipData,
GossipsubMessageAcceptance,
GossipsubMessageInfo,
P2pDb,
P2pRequestEvent,
TransactionBroadcast,
TransactionGossipData,
};
use libp2p::{
gossipsub::MessageAcceptance,
request_response::RequestId,
};
use libp2p::request_response::RequestId;
use tokio::{
sync::{
broadcast,
Expand All @@ -20,10 +30,16 @@ use tokio::{
},
task::JoinHandle,
};
use tracing::warn;
use tracing::{
info,
warn,
};

use crate::{
codecs::bincode::BincodeCodec,
codecs::{
bincode::BincodeCodec,
NetworkCodec,
},
config::P2PConfig,
gossipsub::messages::{
GossipsubBroadcastRequest,
Expand All @@ -48,11 +64,10 @@ pub struct NetworkOrchestrator {
rx_outbound_responses: Receiver<Option<(OutboundResponse, RequestId)>>,

// senders
tx_consensus: Sender<ConsensusBroadcast>,
tx_transaction: broadcast::Sender<TransactionBroadcast>,
tx_block: Sender<BlockBroadcast>,
tx_consensus: Sender<ConsensusGossipData>,
tx_transaction: broadcast::Sender<TransactionGossipData>,
tx_block: Sender<BlockGossipData>,
tx_outbound_responses: Sender<Option<(OutboundResponse, RequestId)>>,

db: Arc<dyn P2pDb>,
}

Expand All @@ -61,9 +76,9 @@ impl NetworkOrchestrator {
p2p_config: P2PConfig,
rx_request_event: Receiver<P2pRequestEvent>,

tx_consensus: Sender<ConsensusBroadcast>,
tx_transaction: broadcast::Sender<TransactionBroadcast>,
tx_block: Sender<BlockBroadcast>,
tx_consensus: Sender<ConsensusGossipData>,
tx_transaction: broadcast::Sender<TransactionGossipData>,
tx_block: Sender<BlockGossipData>,

db: Arc<dyn P2pDb>,
) -> Self {
Expand Down Expand Up @@ -97,20 +112,22 @@ impl NetworkOrchestrator {
},
p2p_event = p2p_service.next_event() => {
match p2p_event {
FuelP2PEvent::GossipsubMessage { message, .. } => {
Some(FuelP2PEvent::GossipsubMessage { message, message_id, peer_id,.. }) => {
let message_id = message_id.0;

match message {
GossipsubMessage::NewTx(tx) => {
let _ = self.tx_transaction.send(TransactionBroadcast::NewTransaction(tx));
let _ = self.tx_transaction.send(GossipData::new(TransactionBroadcast::NewTransaction(tx), peer_id, message_id));
},
GossipsubMessage::NewBlock(block) => {
let _ = self.tx_block.send(BlockBroadcast::NewBlock(block));
let _ = self.tx_block.send(GossipData::new(BlockBroadcast::NewBlock(block), peer_id, message_id));
},
GossipsubMessage::ConsensusVote(vote) => {
let _ = self.tx_consensus.send(ConsensusBroadcast::NewVote(vote));
let _ = self.tx_consensus.send(GossipData::new(ConsensusBroadcast::NewVote(vote), peer_id, message_id));
},
}
},
FuelP2PEvent::RequestMessage { request_message, request_id } => {
Some(FuelP2PEvent::RequestMessage { request_message, request_id }) => {
match request_message {
RequestMessage::RequestBlock(block_height) => {
let db = self.db.clone();
Expand Down Expand Up @@ -146,6 +163,9 @@ impl NetworkOrchestrator {
let broadcast = GossipsubBroadcastRequest::ConsensusVote(vote);
let _ = p2p_service.publish_message(broadcast);
},
P2pRequestEvent::GossipsubMessageReport { message, acceptance } => {
report_message(message, acceptance, &mut p2p_service);
}
P2pRequestEvent::Stop => break,
}
} else {
Expand All @@ -159,6 +179,42 @@ impl NetworkOrchestrator {
}
}

fn report_message<T: NetworkCodec>(
message: GossipsubMessageInfo,
acceptance: GossipsubMessageAcceptance,
p2p_service: &mut FuelP2PService<T>,
) {
let GossipsubMessageInfo {
peer_id,
message_id,
} = message;

let msg_id = message_id.into();

if let Ok(peer_id) = peer_id.try_into() {
let acceptance = match acceptance {
GossipsubMessageAcceptance::Accept => MessageAcceptance::Accept,
GossipsubMessageAcceptance::Reject => MessageAcceptance::Reject,
GossipsubMessageAcceptance::Ignore => MessageAcceptance::Ignore,
};

match p2p_service.report_message_validation_result(&msg_id, &peer_id, acceptance)
{
Ok(true) => {
info!(target: "fuel-libp2p", "Sent a report for MessageId: {} from PeerId: {}", msg_id, peer_id);
leviathanbeak marked this conversation as resolved.
Show resolved Hide resolved
}
Ok(false) => {
warn!(target: "fuel-libp2p", "Message with MessageId: {} not found in the Gossipsub Message Cache", msg_id);
}
Err(e) => {
warn!(target: "fuel-libp2p", "Failed to publish Message with MessageId: {} with Error: {:?}", msg_id, e);
leviathanbeak marked this conversation as resolved.
Show resolved Hide resolved
}
}
} else {
warn!(target: "fuel-libp2p", "Failed to read PeerId from received GossipsubMessageId: {}", msg_id);
}
}

pub struct Service {
/// Network Orchestrator that handles p2p network and inter-module communication
network_orchestrator: Arc<Mutex<Option<NetworkOrchestrator>>>,
Expand All @@ -174,9 +230,9 @@ impl Service {
db: Arc<dyn P2pDb>,
tx_request_event: Sender<P2pRequestEvent>,
rx_request_event: Receiver<P2pRequestEvent>,
tx_consensus: Sender<ConsensusBroadcast>,
tx_transaction: broadcast::Sender<TransactionBroadcast>,
tx_block: Sender<BlockBroadcast>,
tx_consensus: Sender<ConsensusGossipData>,
tx_transaction: broadcast::Sender<TransactionGossipData>,
tx_block: Sender<BlockGossipData>,
) -> Self {
let network_orchestrator = NetworkOrchestrator::new(
p2p_config,
Expand Down
Loading