Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Refactor Network Subcommands #108

Merged
merged 4 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions bin/hera/src/disc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
//! Discovery subcommand for Hera.

use crate::globals::GlobalArgs;
use clap::Args;
use eyre::Result;
use op_net::discovery::builder::DiscoveryBuilder;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

/// The Hera discovery subcommand.
#[derive(Debug, Clone, Args)]
#[non_exhaustive]
pub struct DiscCommand {
/// Port to listen for gossip on.
#[clap(long, short = 'l', default_value = "9099", help = "Port to listen for gossip on")]
pub gossip_port: u16,
/// Interval to send discovery packets.
#[clap(long, short = 'i', default_value = "1", help = "Interval to send discovery packets")]
pub interval: u64,
}

impl DiscCommand {
/// Run the discovery subcommand.
pub async fn run(self, args: &GlobalArgs) -> Result<()> {
let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), self.gossip_port);
tracing::info!("Starting discovery service on {:?}", socket);

let mut discovery_builder =
DiscoveryBuilder::new().with_address(socket).with_chain_id(args.l2_chain_id);
let mut discovery = discovery_builder.build()?;
discovery.interval = std::time::Duration::from_secs(self.interval);
let mut peer_recv = discovery.start()?;
tracing::info!("Discovery service started, receiving peers.");

loop {
match peer_recv.recv().await {
Some(peer) => {
tracing::info!("Received peer: {:?}", peer);
}
None => {
tracing::warn!("Failed to receive peer");
}
}
}
}
}
52 changes: 15 additions & 37 deletions bin/hera/src/network.rs → bin/hera/src/gossip.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,27 @@
//! Networking subcommand for Hera.
//! Gossip subcommand for Hera.

use crate::globals::GlobalArgs;
use clap::Args;
use eyre::Result;
use op_net::{discovery::builder::DiscoveryBuilder, driver::NetworkDriver};
use op_net::driver::NetworkDriver;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use superchain::ROLLUP_CONFIGS;

/// The Hera network subcommand.
/// The Hera gossip subcommand.
#[derive(Debug, Clone, Args)]
#[non_exhaustive]
pub struct NetworkCommand {
/// Run the peer discovery service.
#[clap(long, short = 'p', help = "Runs only peer discovery")]
pub only_disc: bool,
pub struct GossipCommand {
/// Port to listen for gossip on.
#[clap(long, short = 'l', default_value = "9099", help = "Port to listen for gossip on")]
pub gossip_port: u16,
/// Interval to send discovery packets.
#[clap(long, short = 'i', default_value = "1", help = "Interval to send discovery packets")]
pub interval: u64,
}

impl NetworkCommand {
/// Run the network subcommand.
impl GossipCommand {
/// Run the gossip subcommand.
pub async fn run(self, args: &GlobalArgs) -> Result<()> {
if self.only_disc {
self.run_discovery(args).await
} else {
self.run_network(args)
}
}

/// Runs the full network.
pub fn run_network(&self, args: &GlobalArgs) -> Result<()> {
let signer = ROLLUP_CONFIGS
.get(&args.l2_chain_id)
.ok_or(eyre::eyre!("No rollup config found for chain ID"))?
Expand All @@ -39,15 +30,21 @@ impl NetworkCommand {
.as_ref()
.ok_or(eyre::eyre!("No system config found for chain ID"))?
.batcher_address;
tracing::info!("Gossip configured with signer: {:?}", signer);

let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), self.gossip_port);
tracing::info!("Starting gossip driver on {:?}", socket);

let mut driver = NetworkDriver::builder()
.with_chain_id(args.l2_chain_id)
.with_unsafe_block_signer(signer)
.with_gossip_addr(socket)
.with_interval(std::time::Duration::from_secs(self.interval))
.build()?;
let recv =
driver.take_unsafe_block_recv().ok_or(eyre::eyre!("No unsafe block receiver"))?;
driver.start()?;
tracing::info!("Gossip driver started, receiving blocks.");
loop {
match recv.recv() {
Ok(block) => {
Expand All @@ -59,23 +56,4 @@ impl NetworkCommand {
}
}
}

/// Runs only the discovery service.
pub async fn run_discovery(&self, args: &GlobalArgs) -> Result<()> {
let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), self.gossip_port);
let mut discovery_builder =
DiscoveryBuilder::new().with_address(socket).with_chain_id(args.l2_chain_id);
let discovery = discovery_builder.build()?;
let mut peer_recv = discovery.start()?;
loop {
match peer_recv.recv().await {
Some(peer) => {
tracing::info!("Received peer: {:?}", peer);
}
None => {
tracing::warn!("Failed to receive peer");
}
}
}
}
}
12 changes: 8 additions & 4 deletions bin/hera/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
use clap::{Parser, Subcommand};
use eyre::Result;

mod disc;
mod globals;
mod network;
mod gossip;
mod node;

/// The Hera CLI Arguments.
Expand All @@ -30,8 +31,10 @@ pub(crate) struct HeraArgs {
pub(crate) enum HeraSubcommand {
/// Run the standalone Hera node.
Node(node::NodeCommand),
/// Networking utility commands.
Network(network::NetworkCommand),
/// Discovery service command.
Disc(disc::DiscCommand),
/// Gossip service command.
Gossip(gossip::GossipCommand),
}

#[tokio::main]
Expand All @@ -45,6 +48,7 @@ async fn main() -> Result<()> {
// Dispatch on subcommand.
match args.subcommand {
HeraSubcommand::Node(node) => node.run(&args.global).await,
HeraSubcommand::Network(network) => network.run(&args.global).await,
HeraSubcommand::Disc(disc) => disc.run(&args.global).await,
HeraSubcommand::Gossip(gossip) => gossip.run(&args.global).await,
}
}
15 changes: 8 additions & 7 deletions crates/net/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ Contains a gossipsub driver to run discv5 peer discovery and block gossip.

### Example

> **Warning**
>
> Notice, the socket address uses `0.0.0.0`.
> If you are experiencing issues connecting to peers for discovery,
> check to make sure you are not using the loopback address,
> `127.0.0.1` aka "localhost", which can prevent outward facing connections.

```rust,no_run
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use alloy_primitives::address;
Expand All @@ -25,15 +32,9 @@ driver.start().expect("Failed to start network driver");
println!("NetworkDriver started.");
```

> [!WARNING]
>
> Notice, the socket address uses `0.0.0.0`.
> If you are experiencing issues connecting to peers for discovery,
> check to make sure you are not using the loopback address,
> `127.0.0.1` aka "localhost", which can prevent outward facing connections.

[!WARNING]: ###example

### Acknowledgements

Largely based off [magi](https://github.com/a16z/magi)'s [p2p module](https://github.com/a16z/magi/tree/master/src/network).

12 changes: 10 additions & 2 deletions crates/net/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ pub struct NetworkDriverBuilder {
pub discovery_addr: Option<ListenConfig>,
/// The [GossipConfig] constructs the config for `gossipsub`.
pub gossip_config: Option<GossipConfig>,

/// The interval to discovery random nodes.
pub interval: Option<Duration>,
/// The [Config] constructs the config for `discv5`.
pub discovery_config: Option<Config>,
/// The [Keypair] for the node.
Expand Down Expand Up @@ -67,6 +68,12 @@ impl NetworkDriverBuilder {
self
}

/// Specifies the interval to discovery random nodes.
pub fn with_interval(&mut self, interval: Duration) -> &mut Self {
self.interval = Some(interval);
self
}

/// Specifies the socket address that the gossip service is listening on.
pub fn with_gossip_addr(&mut self, socket: SocketAddr) -> &mut Self {
self.gossip_addr = Some(socket);
Expand Down Expand Up @@ -278,7 +285,8 @@ impl NetworkDriverBuilder {
discovery_builder = discovery_builder.with_discovery_config(discovery_config);
}

let discovery = discovery_builder.build()?;
let mut discovery = discovery_builder.build()?;
discovery.interval = self.interval.unwrap_or(Duration::from_secs(10));

Ok(NetworkDriver {
discovery,
Expand Down
6 changes: 4 additions & 2 deletions crates/net/src/discovery/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ pub struct DiscoveryDriver {
pub disc: Discv5,
/// The chain ID of the network.
pub chain_id: u64,
/// The interval to discovery random nodes.
pub interval: Duration,
}

impl DiscoveryDriver {
Expand All @@ -34,7 +36,7 @@ impl DiscoveryDriver {

/// Instantiates a new [DiscoveryDriver].
pub fn new(disc: Discv5, chain_id: u64) -> Self {
Self { disc, chain_id }
Self { disc, chain_id, interval: Duration::from_secs(10) }
}

/// Spawns a new [Discv5] discovery service in a new tokio task.
Expand Down Expand Up @@ -108,7 +110,7 @@ impl DiscoveryDriver {
}
}

sleep(Duration::from_secs(10)).await;
sleep(self.interval).await;
}
});

Expand Down
4 changes: 3 additions & 1 deletion crates/net/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,11 @@ impl NetworkDriver {
loop {
select! {
peer = peer_recv.recv() => {
self.gossip.dial_opt(peer).await;
self.gossip.dial_opt(peer.clone()).await;
tracing::info!("Received peer: {:?} | Connected peers: {:?}", peer, self.gossip.connected_peers());
},
event = self.gossip.select_next_some() => {
tracing::debug!("Received event: {:?}", event);
self.gossip.handle_event(event);
},
}
Expand Down
10 changes: 8 additions & 2 deletions crates/net/src/gossip/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,19 @@ impl GossipDriver {
self.swarm.select_next_some().await
}

/// Returns the number of connected peers.
pub fn connected_peers(&self) -> usize {
self.swarm.connected_peers().count()
}

/// Dials the given [`Option<Multiaddr>`].
pub async fn dial_opt(&mut self, peer: Option<impl Into<Multiaddr>>) {
let Some(addr) = peer else {
return;
};
if let Err(e) = self.dial(addr).await {
error!("Failed to dial peer: {:?}", e);
match self.dial(addr).await {
Ok(_) => info!("Dialed peer"),
Err(e) => error!("Failed to dial peer: {:?}", e),
}
}

Expand Down
10 changes: 7 additions & 3 deletions crates/net/src/gossip/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,13 @@ impl Handler for BlockHandler {
tracing::debug!("received block");

let decoded = if msg.topic == self.blocks_v1_topic.hash() {
tracing::debug!("received v1 block");
OpNetworkPayloadEnvelope::decode_v1(&msg.data)
} else if msg.topic == self.blocks_v2_topic.hash() {
tracing::debug!("received v2 block");
OpNetworkPayloadEnvelope::decode_v2(&msg.data)
} else if msg.topic == self.blocks_v3_topic.hash() {
tracing::debug!("received v3 block");
OpNetworkPayloadEnvelope::decode_v3(&msg.data)
} else {
return MessageAcceptance::Reject;
Expand Down Expand Up @@ -115,11 +118,12 @@ impl BlockHandler {

let msg = envelope.payload_hash.signature_message(self.chain_id);
let block_signer = *self.unsafe_signer_recv.borrow();
let Ok(msg_signer) = envelope.signature.recover_address_from_msg(msg) else {
// TODO: add telemetry here if this happens.
let Ok(msg_signer) = envelope.signature.recover_address_from_prehash(&msg) else {
tracing::warn!("Failed to recover address from message");
return false;
};

time_valid && msg_signer == block_signer
let signer_valid = msg_signer == block_signer;
time_valid && signer_valid
}
}