Skip to content

Commit

Permalink
feat: Forward only verified and new HeaderSub messages
Browse files Browse the repository at this point in the history
  • Loading branch information
oblique committed Oct 2, 2023
1 parent 4620849 commit 8bf0ed0
Showing 1 changed file with 87 additions and 25 deletions.
112 changes: 87 additions & 25 deletions node/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use libp2p::{
Multiaddr, PeerId, TransportError,
};
use tokio::select;
use tokio::sync::{mpsc, oneshot};
use tokio::sync::{mpsc, oneshot, watch};
use tracing::{debug, info, instrument, trace, warn};

use crate::exchange::{ExchangeBehaviour, ExchangeConfig};
Expand Down Expand Up @@ -76,6 +76,7 @@ impl From<oneshot::error::RecvError> for P2pError {
#[derive(Debug)]
pub struct P2p<S> {
cmd_tx: mpsc::Sender<P2pCmd>,
header_sub_watcher: watch::Receiver<Option<ExtendedHeader>>,
_store: PhantomData<S>,
}

Expand Down Expand Up @@ -120,14 +121,16 @@ where

async fn start(args: P2pArgs<S>) -> Result<Self, P2pError> {
let (cmd_tx, cmd_rx) = mpsc::channel(16);
let mut worker = Worker::new(args, cmd_rx)?;
let (header_sub_tx, header_sub_rx) = watch::channel(None);
let mut worker = Worker::new(args, cmd_rx, header_sub_tx)?;

spawn(async move {
worker.run().await;
});

Ok(P2p {
cmd_tx,
header_sub_watcher: header_sub_rx,
_store: PhantomData,
})
}
Expand All @@ -151,6 +154,8 @@ pub trait P2pService:
{
type Store: Store;

fn new_header_sub_watcher(&self) -> watch::Receiver<Option<ExtendedHeader>>;

async fn wait_connected(&self) -> Result<()> {
let (tx, rx) = oneshot::channel();

Expand Down Expand Up @@ -254,6 +259,10 @@ where
S: Store + 'static,
{
type Store = S;

fn new_header_sub_watcher(&self) -> watch::Receiver<Option<ExtendedHeader>> {
self.header_sub_watcher.clone()
}
}

/// Our network behaviour.
Expand All @@ -280,13 +289,19 @@ where
cmd_rx: mpsc::Receiver<P2pCmd>,
peer_tracker: Arc<PeerTracker>,
wait_connected_tx: Option<Vec<oneshot::Sender<()>>>,
header_sub_watcher: watch::Sender<Option<ExtendedHeader>>,
store: Arc<S>,
}

impl<S> Worker<S>
where
S: Store + 'static,
{
fn new(args: P2pArgs<S>, cmd_rx: mpsc::Receiver<P2pCmd>) -> Result<Self, P2pError> {
fn new(
args: P2pArgs<S>,
cmd_rx: mpsc::Receiver<P2pCmd>,
header_sub_watcher: watch::Sender<Option<ExtendedHeader>>,
) -> Result<Self, P2pError> {
let peer_tracker = Arc::new(PeerTracker::new());
let local_peer_id = PeerId::from(args.local_keypair.public());

Expand All @@ -307,7 +322,7 @@ where
let header_ex = ExchangeBehaviour::new(ExchangeConfig {
network_id: &args.network_id,
peer_tracker: peer_tracker.clone(),
header_store: args.store,
header_store: args.store.clone(),
});

let behaviour = Behaviour {
Expand Down Expand Up @@ -337,6 +352,8 @@ where
header_sub_topic_hash: header_sub_topic.hash(),
peer_tracker,
wait_connected_tx: None,
header_sub_watcher,
store: args.store.clone(),
})
}

Expand Down Expand Up @@ -364,7 +381,7 @@ where
match ev {
SwarmEvent::Behaviour(ev) => match ev {
BehaviourEvent::Identify(ev) => self.on_identify_event(ev).await?,
BehaviourEvent::Gossipsub(ev) => self.on_gossip_sub_event(ev).await?,
BehaviourEvent::Gossipsub(ev) => self.on_gossip_sub_event(ev).await,
BehaviourEvent::Kademlia(ev) => self.on_kademlia_event(ev).await?,
BehaviourEvent::Autonat(_)
| BehaviourEvent::KeepAlive(_)
Expand Down Expand Up @@ -450,24 +467,36 @@ where
}

#[instrument(level = "trace", skip(self))]
async fn on_gossip_sub_event(&mut self, ev: gossipsub::Event) -> Result<()> {
async fn on_gossip_sub_event(&mut self, ev: gossipsub::Event) {
match ev {
gossipsub::Event::Message { message, .. } => {
gossipsub::Event::Message {
message,
message_id,
..
} => {
let Some(peer) = message.source else {
// Validation mode is `strict` so this will never happen
return;
};

// We may discovered a new peer
if let Some(peer) = message.source {
self.on_peer_discovered(peer);
}
self.peer_maybe_discovered(peer);

if message.topic == self.header_sub_topic_hash {
self.on_header_sub_message(&message.data[..]);
let acceptance = if message.topic == self.header_sub_topic_hash {
self.on_header_sub_message(&message.data[..]).await
} else {
trace!("Unhandled gossipsub message");
}
gossipsub::MessageAcceptance::Ignore
};

let _ = self
.swarm
.behaviour_mut()
.gossipsub
.report_message_validation_result(&message_id, &peer, acceptance);
}
_ => trace!("Unhandled gossipsub event"),
}

Ok(())
}

#[instrument(level = "trace", skip(self))]
Expand All @@ -492,7 +521,7 @@ where
}

#[instrument(skip_all, fields(peer_id = %peer_id))]
fn on_peer_discovered(&mut self, peer_id: PeerId) {
fn peer_maybe_discovered(&mut self, peer_id: PeerId) {
if !self.peer_tracker.maybe_discovered(peer_id) {
return;
}
Expand Down Expand Up @@ -547,17 +576,45 @@ where
}

#[instrument(skip_all)]
fn on_header_sub_message(&mut self, data: &[u8]) {
async fn on_header_sub_message(&mut self, data: &[u8]) -> gossipsub::MessageAcceptance {
let Ok(header) = ExtendedHeader::decode_and_validate(data) else {
trace!("Malformed or invalid header from header-sub");
return;
return gossipsub::MessageAcceptance::Reject;
};

debug!("New header from header-sub ({header})");
// TODO: inform syncer about it
//
// TODO: Verify header against store. If it's verified propagate
// it to other peers. If not, discard it.
trace!("Received header from header-sub ({header})");

// We care only for headers newer than head
let Ok(head) = self.store.get_head().await else {
trace!("Store not initialized yet. Ignore header from header-sub ({header}).");
return gossipsub::MessageAcceptance::Ignore;
};

// Verify header
if head.verify(&header).is_err() {
trace!("Verification failed. Ignore header from header-sub ({header}).");
return gossipsub::MessageAcceptance::Ignore;
}

// Update it only if needed
let updated = self.header_sub_watcher.send_if_modified(|state| {
if let Some(known_header) = state {
if known_header.height() >= header.height() {
trace!("Header-sub already up-to-date. Ignore header ({header}).");
return false;
}
}

debug!("New header from header-sub ({header})");
*state = Some(header);
true
});

if updated {
gossipsub::MessageAcceptance::Accept
} else {
gossipsub::MessageAcceptance::Ignore
}
}
}

Expand All @@ -569,10 +626,15 @@ fn init_gossipsub<'a, S>(
// Here we expect the publisher to sign the message with their key.
let message_authenticity = gossipsub::MessageAuthenticity::Signed(args.local_keypair.clone());

let config = gossipsub::ConfigBuilder::default()
.validation_mode(gossipsub::ValidationMode::Strict)
.validate_messages()
.build()
.map_err(P2pError::GossipsubInit)?;

// build a gossipsub network behaviour
let mut gossipsub: gossipsub::Behaviour =
gossipsub::Behaviour::new(message_authenticity, gossipsub::Config::default())
.map_err(P2pError::GossipsubInit)?;
gossipsub::Behaviour::new(message_authenticity, config).map_err(P2pError::GossipsubInit)?;

for topic in topics {
gossipsub.subscribe(topic)?;
Expand Down

0 comments on commit 8bf0ed0

Please sign in to comment.