From 8cdad0bf2145a60e5b40fd588c814eba686232fa Mon Sep 17 00:00:00 2001 From: refcell Date: Wed, 23 Oct 2024 20:00:37 -0400 Subject: [PATCH 1/4] refactor hera subcomamnds --- bin/hera/src/disc.rs | 45 +++++++++++++++++++++++ bin/hera/src/gossip.rs | 59 ++++++++++++++++++++++++++++++ bin/hera/src/main.rs | 12 ++++-- bin/hera/src/network.rs | 1 + crates/net/README.md | 15 ++++---- crates/net/src/builder.rs | 12 +++++- crates/net/src/discovery/driver.rs | 6 ++- crates/net/src/driver.rs | 4 ++ crates/net/src/gossip/driver.rs | 10 ++++- crates/net/src/gossip/handler.rs | 21 +++++++++-- 10 files changed, 165 insertions(+), 20 deletions(-) create mode 100644 bin/hera/src/disc.rs create mode 100644 bin/hera/src/gossip.rs diff --git a/bin/hera/src/disc.rs b/bin/hera/src/disc.rs new file mode 100644 index 0000000..2a2aa3d --- /dev/null +++ b/bin/hera/src/disc.rs @@ -0,0 +1,45 @@ +//! Discovery subcommand for Hera. + +use crate::globals::GlobalArgs; +use clap::Args; +use eyre::Result; +use op_net::discovery::builder::DiscoveryBuilder; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + +/// The Hera discovery subcommand. +#[derive(Debug, Clone, Args)] +#[non_exhaustive] +pub struct DiscCommand { + /// Port to listen for gossip on. + #[clap(long, short = 'l', default_value = "9099", help = "Port to listen for gossip on")] + pub gossip_port: u16, + /// Interval to send discovery packets. + #[clap(long, short = 'i', default_value = "1", help = "Interval to send discovery packets")] + pub interval: u64, +} + +impl DiscCommand { + /// Run the discovery subcommand. + pub async fn run(self, args: &GlobalArgs) -> Result<()> { + let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), self.gossip_port); + tracing::info!("Starting discovery service on {:?}", socket); + + let mut discovery_builder = + DiscoveryBuilder::new().with_address(socket).with_chain_id(args.l2_chain_id); + let mut discovery = discovery_builder.build()?; + discovery.interval = std::time::Duration::from_secs(self.interval); + let mut peer_recv = discovery.start()?; + tracing::info!("Discovery service started, receiving peers."); + + loop { + match peer_recv.recv().await { + Some(peer) => { + tracing::info!("Received peer: {:?}", peer); + } + None => { + tracing::warn!("Failed to receive peer"); + } + } + } + } +} diff --git a/bin/hera/src/gossip.rs b/bin/hera/src/gossip.rs new file mode 100644 index 0000000..ceb1c30 --- /dev/null +++ b/bin/hera/src/gossip.rs @@ -0,0 +1,59 @@ +//! Gossip subcommand for Hera. + +use crate::globals::GlobalArgs; +use clap::Args; +use eyre::Result; +use op_net::driver::NetworkDriver; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use superchain::ROLLUP_CONFIGS; + +/// The Hera gossip subcommand. +#[derive(Debug, Clone, Args)] +#[non_exhaustive] +pub struct GossipCommand { + /// Port to listen for gossip on. + #[clap(long, short = 'l', default_value = "9099", help = "Port to listen for gossip on")] + pub gossip_port: u16, + /// Interval to send discovery packets. + #[clap(long, short = 'i', default_value = "1", help = "Interval to send discovery packets")] + pub interval: u64, +} + +impl GossipCommand { + /// Run the discovery subcommand. + pub async fn run(self, args: &GlobalArgs) -> Result<()> { + let signer = ROLLUP_CONFIGS + .get(&args.l2_chain_id) + .ok_or(eyre::eyre!("No rollup config found for chain ID"))? + .genesis + .system_config + .as_ref() + .ok_or(eyre::eyre!("No system config found for chain ID"))? + .batcher_address; + tracing::info!("Gossip configured with signer: {:?}", signer); + + let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), self.gossip_port); + tracing::info!("Starting gossip driver on {:?}", socket); + + let mut driver = NetworkDriver::builder() + .with_chain_id(args.l2_chain_id) + .with_unsafe_block_signer(signer) + .with_gossip_addr(socket) + .with_interval(std::time::Duration::from_secs(self.interval)) + .build()?; + let recv = + driver.take_unsafe_block_recv().ok_or(eyre::eyre!("No unsafe block receiver"))?; + driver.start()?; + tracing::info!("Gossip driver started, receiving blocks."); + loop { + match recv.recv() { + Ok(block) => { + tracing::info!("Received unsafe block: {:?}", block); + } + Err(e) => { + tracing::warn!("Failed to receive unsafe block: {:?}", e); + } + } + } + } +} diff --git a/bin/hera/src/main.rs b/bin/hera/src/main.rs index 24342e9..65dc574 100644 --- a/bin/hera/src/main.rs +++ b/bin/hera/src/main.rs @@ -9,7 +9,8 @@ use clap::{Parser, Subcommand}; use eyre::Result; mod globals; -mod network; +mod disc; +mod gossip; mod node; /// The Hera CLI Arguments. @@ -30,8 +31,10 @@ pub(crate) struct HeraArgs { pub(crate) enum HeraSubcommand { /// Run the standalone Hera node. Node(node::NodeCommand), - /// Networking utility commands. - Network(network::NetworkCommand), + /// Discovery service command. + Disc(disc::DiscCommand), + /// Gossip service command. + Gossip(gossip::GossipCommand), } #[tokio::main] @@ -45,6 +48,7 @@ async fn main() -> Result<()> { // Dispatch on subcommand. match args.subcommand { HeraSubcommand::Node(node) => node.run(&args.global).await, - HeraSubcommand::Network(network) => network.run(&args.global).await, + HeraSubcommand::Disc(disc) => disc.run(&args.global).await, + HeraSubcommand::Gossip(gossip) => gossip.run(&args.global).await, } } diff --git a/bin/hera/src/network.rs b/bin/hera/src/network.rs index b35528d..4085a2d 100644 --- a/bin/hera/src/network.rs +++ b/bin/hera/src/network.rs @@ -48,6 +48,7 @@ impl NetworkCommand { let recv = driver.take_unsafe_block_recv().ok_or(eyre::eyre!("No unsafe block receiver"))?; driver.start()?; + tracing::info!("Running network service"); loop { match recv.recv() { Ok(block) => { diff --git a/crates/net/README.md b/crates/net/README.md index 06087da..20b6527 100644 --- a/crates/net/README.md +++ b/crates/net/README.md @@ -4,6 +4,13 @@ Contains a gossipsub driver to run discv5 peer discovery and block gossip. ### Example +> **Warning** +> +> Notice, the socket address uses `0.0.0.0`. +> If you are experiencing issues connecting to peers for discovery, +> check to make sure you are not using the loopback address, +> `127.0.0.1` aka "localhost", which can prevent outward facing connections. + ```rust,no_run use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use alloy_primitives::address; @@ -25,15 +32,9 @@ driver.start().expect("Failed to start network driver"); println!("NetworkDriver started."); ``` -> [!WARNING] -> -> Notice, the socket address uses `0.0.0.0`. -> If you are experiencing issues connecting to peers for discovery, -> check to make sure you are not using the loopback address, -> `127.0.0.1` aka "localhost", which can prevent outward facing connections. - [!WARNING]: ###example ### Acknowledgements Largely based off [magi](https://github.com/a16z/magi)'s [p2p module](https://github.com/a16z/magi/tree/master/src/network). + diff --git a/crates/net/src/builder.rs b/crates/net/src/builder.rs index b8d6eaf..ba7b92e 100644 --- a/crates/net/src/builder.rs +++ b/crates/net/src/builder.rs @@ -34,7 +34,8 @@ pub struct NetworkDriverBuilder { pub discovery_addr: Option, /// The [GossipConfig] constructs the config for `gossipsub`. pub gossip_config: Option, - + /// The interval to discovery random nodes. + pub interval: Option, /// The [Config] constructs the config for `discv5`. pub discovery_config: Option, /// The [Keypair] for the node. @@ -67,6 +68,12 @@ impl NetworkDriverBuilder { self } + /// Specifies the interval to discovery random nodes. + pub fn with_interval(&mut self, interval: Duration) -> &mut Self { + self.interval = Some(interval); + self + } + /// Specifies the socket address that the gossip service is listening on. pub fn with_gossip_addr(&mut self, socket: SocketAddr) -> &mut Self { self.gossip_addr = Some(socket); @@ -278,7 +285,8 @@ impl NetworkDriverBuilder { discovery_builder = discovery_builder.with_discovery_config(discovery_config); } - let discovery = discovery_builder.build()?; + let mut discovery = discovery_builder.build()?; + discovery.interval = self.interval.unwrap_or(Duration::from_secs(10)); Ok(NetworkDriver { discovery, diff --git a/crates/net/src/discovery/driver.rs b/crates/net/src/discovery/driver.rs index 489ca41..1229557 100644 --- a/crates/net/src/discovery/driver.rs +++ b/crates/net/src/discovery/driver.rs @@ -24,6 +24,8 @@ pub struct DiscoveryDriver { pub disc: Discv5, /// The chain ID of the network. pub chain_id: u64, + /// The interval to discovery random nodes. + pub interval: Duration, } impl DiscoveryDriver { @@ -34,7 +36,7 @@ impl DiscoveryDriver { /// Instantiates a new [DiscoveryDriver]. pub fn new(disc: Discv5, chain_id: u64) -> Self { - Self { disc, chain_id } + Self { disc, chain_id, interval: Duration::from_secs(10) } } /// Spawns a new [Discv5] discovery service in a new tokio task. @@ -108,7 +110,7 @@ impl DiscoveryDriver { } } - sleep(Duration::from_secs(10)).await; + sleep(self.interval).await; } }); diff --git a/crates/net/src/driver.rs b/crates/net/src/driver.rs index a10d81e..0f1bc78 100644 --- a/crates/net/src/driver.rs +++ b/crates/net/src/driver.rs @@ -53,9 +53,13 @@ impl NetworkDriver { loop { select! { peer = peer_recv.recv() => { + tracing::info!("Received peer from discovery: {:?}", peer); + tracing::info!("Dialing peer: {:?}", peer); self.gossip.dial_opt(peer).await; + tracing::info!("Connected peers: {:?}", self.gossip.connected_peers()); }, event = self.gossip.select_next_some() => { + tracing::info!("Received event: {:?}", event); self.gossip.handle_event(event); }, } diff --git a/crates/net/src/gossip/driver.rs b/crates/net/src/gossip/driver.rs index bd47a4a..054f4d5 100644 --- a/crates/net/src/gossip/driver.rs +++ b/crates/net/src/gossip/driver.rs @@ -43,13 +43,19 @@ impl GossipDriver { self.swarm.select_next_some().await } + /// Returns the number of connected peers. + pub fn connected_peers(&self) -> usize { + self.swarm.connected_peers().count() + } + /// Dials the given [`Option`]. pub async fn dial_opt(&mut self, peer: Option>) { let Some(addr) = peer else { return; }; - if let Err(e) = self.dial(addr).await { - error!("Failed to dial peer: {:?}", e); + match self.dial(addr).await { + Ok(_) => info!("Dialed peer"), + Err(e) => error!("Failed to dial peer: {:?}", e), } } diff --git a/crates/net/src/gossip/handler.rs b/crates/net/src/gossip/handler.rs index a204e17..681f3d1 100644 --- a/crates/net/src/gossip/handler.rs +++ b/crates/net/src/gossip/handler.rs @@ -49,10 +49,13 @@ impl Handler for BlockHandler { tracing::debug!("received block"); let decoded = if msg.topic == self.blocks_v1_topic.hash() { + tracing::info!("received v1 block"); OpNetworkPayloadEnvelope::decode_v1(&msg.data) } else if msg.topic == self.blocks_v2_topic.hash() { + tracing::info!("received v2 block"); OpNetworkPayloadEnvelope::decode_v2(&msg.data) } else if msg.topic == self.blocks_v3_topic.hash() { + tracing::info!("received v3 block"); OpNetworkPayloadEnvelope::decode_v3(&msg.data) } else { return MessageAcceptance::Reject; @@ -108,18 +111,30 @@ impl BlockHandler { fn block_valid(&self, envelope: &OpNetworkPayloadEnvelope) -> bool { let current_timestamp = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs(); + println!("current_timestamp: {}", current_timestamp); + println!("envelope.payload.timestamp(): {}", envelope.payload.timestamp()); let is_future = envelope.payload.timestamp() > current_timestamp + 5; + println!("is_future: {}", is_future); let is_past = envelope.payload.timestamp() < current_timestamp - 60; + println!("is_past: {}", is_past); let time_valid = !(is_future || is_past); + println!("time_valid: {}", time_valid); let msg = envelope.payload_hash.signature_message(self.chain_id); + println!("msg: {:?}", msg); let block_signer = *self.unsafe_signer_recv.borrow(); - let Ok(msg_signer) = envelope.signature.recover_address_from_msg(msg) else { - // TODO: add telemetry here if this happens. + println!("block_signer: {:?}", block_signer); + println!("signature: {:?}", envelope.signature); + let Ok(msg_signer) = envelope.signature.recover_address_from_prehash(&msg) else { + tracing::warn!("Failed to recover address from message"); return false; }; - time_valid && msg_signer == block_signer + println!("msg_signer: {:?}", msg_signer); + println!("block_signer: {:?}", block_signer); + let signer_valid = msg_signer == block_signer; + println!("signer_valid: {}", signer_valid); + time_valid && signer_valid } } From e970f317692d18e19211ad64ec8ff8ccecb8f2b3 Mon Sep 17 00:00:00 2001 From: refcell Date: Wed, 23 Oct 2024 20:01:03 -0400 Subject: [PATCH 2/4] remove hera network command --- bin/hera/src/network.rs | 82 ----------------------------------------- 1 file changed, 82 deletions(-) delete mode 100644 bin/hera/src/network.rs diff --git a/bin/hera/src/network.rs b/bin/hera/src/network.rs deleted file mode 100644 index 4085a2d..0000000 --- a/bin/hera/src/network.rs +++ /dev/null @@ -1,82 +0,0 @@ -//! Networking subcommand for Hera. - -use crate::globals::GlobalArgs; -use clap::Args; -use eyre::Result; -use op_net::{discovery::builder::DiscoveryBuilder, driver::NetworkDriver}; -use std::net::{IpAddr, Ipv4Addr, SocketAddr}; -use superchain::ROLLUP_CONFIGS; - -/// The Hera network subcommand. -#[derive(Debug, Clone, Args)] -#[non_exhaustive] -pub struct NetworkCommand { - /// Run the peer discovery service. - #[clap(long, short = 'p', help = "Runs only peer discovery")] - pub only_disc: bool, - /// Port to listen for gossip on. - #[clap(long, short = 'l', default_value = "9099", help = "Port to listen for gossip on")] - pub gossip_port: u16, -} - -impl NetworkCommand { - /// Run the network subcommand. - pub async fn run(self, args: &GlobalArgs) -> Result<()> { - if self.only_disc { - self.run_discovery(args).await - } else { - self.run_network(args) - } - } - - /// Runs the full network. - pub fn run_network(&self, args: &GlobalArgs) -> Result<()> { - let signer = ROLLUP_CONFIGS - .get(&args.l2_chain_id) - .ok_or(eyre::eyre!("No rollup config found for chain ID"))? - .genesis - .system_config - .as_ref() - .ok_or(eyre::eyre!("No system config found for chain ID"))? - .batcher_address; - let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), self.gossip_port); - let mut driver = NetworkDriver::builder() - .with_chain_id(args.l2_chain_id) - .with_unsafe_block_signer(signer) - .with_gossip_addr(socket) - .build()?; - let recv = - driver.take_unsafe_block_recv().ok_or(eyre::eyre!("No unsafe block receiver"))?; - driver.start()?; - tracing::info!("Running network service"); - loop { - match recv.recv() { - Ok(block) => { - tracing::info!("Received unsafe block: {:?}", block); - } - Err(e) => { - tracing::warn!("Failed to receive unsafe block: {:?}", e); - } - } - } - } - - /// Runs only the discovery service. - pub async fn run_discovery(&self, args: &GlobalArgs) -> Result<()> { - let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), self.gossip_port); - let mut discovery_builder = - DiscoveryBuilder::new().with_address(socket).with_chain_id(args.l2_chain_id); - let discovery = discovery_builder.build()?; - let mut peer_recv = discovery.start()?; - loop { - match peer_recv.recv().await { - Some(peer) => { - tracing::info!("Received peer: {:?}", peer); - } - None => { - tracing::warn!("Failed to receive peer"); - } - } - } - } -} From fd99d0bf435ac6db7c135ec00252186b932fbdfd Mon Sep 17 00:00:00 2001 From: refcell Date: Thu, 24 Oct 2024 10:33:43 -0400 Subject: [PATCH 3/4] chore: cleanup logging --- bin/hera/src/gossip.rs | 2 +- bin/hera/src/main.rs | 2 +- crates/net/src/driver.rs | 8 +++----- crates/net/src/gossip/handler.rs | 17 +++-------------- 4 files changed, 8 insertions(+), 21 deletions(-) diff --git a/bin/hera/src/gossip.rs b/bin/hera/src/gossip.rs index ceb1c30..424114e 100644 --- a/bin/hera/src/gossip.rs +++ b/bin/hera/src/gossip.rs @@ -22,7 +22,7 @@ pub struct GossipCommand { impl GossipCommand { /// Run the discovery subcommand. pub async fn run(self, args: &GlobalArgs) -> Result<()> { - let signer = ROLLUP_CONFIGS + let signer = ROLLUP_CONFIGS .get(&args.l2_chain_id) .ok_or(eyre::eyre!("No rollup config found for chain ID"))? .genesis diff --git a/bin/hera/src/main.rs b/bin/hera/src/main.rs index 65dc574..0ae1f51 100644 --- a/bin/hera/src/main.rs +++ b/bin/hera/src/main.rs @@ -8,8 +8,8 @@ use clap::{Parser, Subcommand}; use eyre::Result; -mod globals; mod disc; +mod globals; mod gossip; mod node; diff --git a/crates/net/src/driver.rs b/crates/net/src/driver.rs index 0f1bc78..efd26fb 100644 --- a/crates/net/src/driver.rs +++ b/crates/net/src/driver.rs @@ -53,13 +53,11 @@ impl NetworkDriver { loop { select! { peer = peer_recv.recv() => { - tracing::info!("Received peer from discovery: {:?}", peer); - tracing::info!("Dialing peer: {:?}", peer); - self.gossip.dial_opt(peer).await; - tracing::info!("Connected peers: {:?}", self.gossip.connected_peers()); + self.gossip.dial_opt(peer.clone()).await; + tracing::info!("Received peer: {:?} | Connected peers: {:?}", peer, self.gossip.connected_peers()); }, event = self.gossip.select_next_some() => { - tracing::info!("Received event: {:?}", event); + tracing::debug!("Received event: {:?}", event); self.gossip.handle_event(event); }, } diff --git a/crates/net/src/gossip/handler.rs b/crates/net/src/gossip/handler.rs index 681f3d1..60e80b8 100644 --- a/crates/net/src/gossip/handler.rs +++ b/crates/net/src/gossip/handler.rs @@ -49,13 +49,13 @@ impl Handler for BlockHandler { tracing::debug!("received block"); let decoded = if msg.topic == self.blocks_v1_topic.hash() { - tracing::info!("received v1 block"); + tracing::debug!("received v1 block"); OpNetworkPayloadEnvelope::decode_v1(&msg.data) } else if msg.topic == self.blocks_v2_topic.hash() { - tracing::info!("received v2 block"); + tracing::debug!("received v2 block"); OpNetworkPayloadEnvelope::decode_v2(&msg.data) } else if msg.topic == self.blocks_v3_topic.hash() { - tracing::info!("received v3 block"); + tracing::debug!("received v3 block"); OpNetworkPayloadEnvelope::decode_v3(&msg.data) } else { return MessageAcceptance::Reject; @@ -111,30 +111,19 @@ impl BlockHandler { fn block_valid(&self, envelope: &OpNetworkPayloadEnvelope) -> bool { let current_timestamp = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs(); - println!("current_timestamp: {}", current_timestamp); - println!("envelope.payload.timestamp(): {}", envelope.payload.timestamp()); let is_future = envelope.payload.timestamp() > current_timestamp + 5; - println!("is_future: {}", is_future); let is_past = envelope.payload.timestamp() < current_timestamp - 60; - println!("is_past: {}", is_past); let time_valid = !(is_future || is_past); - println!("time_valid: {}", time_valid); let msg = envelope.payload_hash.signature_message(self.chain_id); - println!("msg: {:?}", msg); let block_signer = *self.unsafe_signer_recv.borrow(); - println!("block_signer: {:?}", block_signer); - println!("signature: {:?}", envelope.signature); let Ok(msg_signer) = envelope.signature.recover_address_from_prehash(&msg) else { tracing::warn!("Failed to recover address from message"); return false; }; - println!("msg_signer: {:?}", msg_signer); - println!("block_signer: {:?}", block_signer); let signer_valid = msg_signer == block_signer; - println!("signer_valid: {}", signer_valid); time_valid && signer_valid } } From 105b36b777b07e014d65155c7f885aa5beec1880 Mon Sep 17 00:00:00 2001 From: refcell Date: Thu, 24 Oct 2024 10:34:10 -0400 Subject: [PATCH 4/4] Update bin/hera/src/gossip.rs Co-authored-by: nicolas <48695862+merklefruit@users.noreply.github.com> --- bin/hera/src/gossip.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bin/hera/src/gossip.rs b/bin/hera/src/gossip.rs index 424114e..a55fbee 100644 --- a/bin/hera/src/gossip.rs +++ b/bin/hera/src/gossip.rs @@ -20,7 +20,7 @@ pub struct GossipCommand { } impl GossipCommand { - /// Run the discovery subcommand. + /// Run the gossip subcommand. pub async fn run(self, args: &GlobalArgs) -> Result<()> { let signer = ROLLUP_CONFIGS .get(&args.l2_chain_id)