From b73852a8c5b1de82eecc70dfa44d56f81bf8991d Mon Sep 17 00:00:00 2001 From: Loocapro Date: Tue, 3 Sep 2024 17:53:54 +0200 Subject: [PATCH 1/6] feat(oracle exex): cli_ext, discovery and exex first setup --- Cargo.lock | 19 +++++++ Cargo.toml | 3 +- oracle/Cargo.toml | 30 ++++++++++ oracle/src/cli_ext.rs | 15 +++++ oracle/src/disc/mod.rs | 122 +++++++++++++++++++++++++++++++++++++++++ oracle/src/exex.rs | 69 +++++++++++++++++++++++ oracle/src/main.rs | 30 ++++++++++ 7 files changed, 287 insertions(+), 1 deletion(-) create mode 100644 oracle/Cargo.toml create mode 100644 oracle/src/cli_ext.rs create mode 100644 oracle/src/disc/mod.rs create mode 100644 oracle/src/exex.rs create mode 100644 oracle/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index 8cbcad9..4d2b6aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4233,6 +4233,25 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" +[[package]] +name = "oracle" +version = "0.1.0" +dependencies = [ + "clap", + "discv5 0.6.0", + "enr", + "eyre", + "futures", + "reth", + "reth-discv5", + "reth-exex", + "reth-network-peers", + "reth-node-api", + "reth-node-ethereum", + "reth-tracing", + "tokio", +] + [[package]] name = "ordered-float" version = "4.2.2" diff --git a/Cargo.toml b/Cargo.toml index 62e778e..585c4f5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,8 @@ members = [ "discv5", "in-memory-state", "minimal", - "op-bridge", + "oracle", + "op-bridge", "remote", "rollup", ] diff --git a/oracle/Cargo.toml b/oracle/Cargo.toml new file mode 100644 index 0000000..c73e36e --- /dev/null +++ b/oracle/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "oracle" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +publish.workspace = true + +[dependencies] + +# reth +reth-node-ethereum.workspace = true +reth.workspace = true +reth-tracing.workspace = true +reth-network-peers.workspace = true +reth-discv5.workspace = true +reth-exex.workspace = true +reth-node-api.workspace = true + +# networking +discv5 = "0.6" +enr = "0.12" + +# async +futures.workspace = true +tokio.workspace = true + +# misc +clap = "4" +eyre.workspace = true diff --git a/oracle/src/cli_ext.rs b/oracle/src/cli_ext.rs new file mode 100644 index 0000000..86a212c --- /dev/null +++ b/oracle/src/cli_ext.rs @@ -0,0 +1,15 @@ +use clap::Args; + +pub const DEFAULT_DISCOVERY_PORT: u16 = 30304; +pub const DEFAULT_RLPX_PORT: u16 = 30303; + +#[derive(Debug, Clone, Args)] +pub(crate) struct OracleExt { + /// TCP port used by RLPx + #[clap(long = "disc.tcp-port", default_value_t = DEFAULT_RLPX_PORT)] + pub tcp_port: u16, + + /// UDP port used for discovery + #[clap(long = "disc.udp-port", default_value_t = DEFAULT_DISCOVERY_PORT)] + pub udp_port: u16, +} diff --git a/oracle/src/disc/mod.rs b/oracle/src/disc/mod.rs new file mode 100644 index 0000000..92a3643 --- /dev/null +++ b/oracle/src/disc/mod.rs @@ -0,0 +1,122 @@ +#![allow(dead_code)] + +use discv5::{enr::secp256k1::rand, Enr, Event, ListenConfig}; +use reth::network::config::SecretKey; +use reth_discv5::{enr::EnrCombinedKeyWrapper, Config, Discv5}; +use reth_network_peers::NodeRecord; +use reth_tracing::tracing::info; +use std::{ + future::Future, + net::SocketAddr, + pin::Pin, + task::{ready, Context, Poll}, +}; +use tokio::sync::mpsc; + +/// Helper struct to manage a discovery node using discv5. +pub(crate) struct Discovery { + /// The inner discv5 instance. + inner: Discv5, + /// The node record of the discv5 instance. + node_record: NodeRecord, + /// The events stream of the discv5 instance. + events: mpsc::Receiver, +} + +impl Discovery { + /// Starts a new discv5 node. + pub async fn new(udp_port: u16, tcp_port: u16) -> eyre::Result { + let secret_key = SecretKey::new(&mut rand::thread_rng()); + + let discv5_addr: SocketAddr = format!("127.0.0.1:{udp_port}").parse()?; + let rlpx_addr: SocketAddr = format!("127.0.0.1:{tcp_port}").parse()?; + + let discv5_listen_config = ListenConfig::from(discv5_addr); + let discv5_config = Config::builder(rlpx_addr) + .discv5_config(discv5::ConfigBuilder::new(discv5_listen_config).build()) + .build(); + + let (discv5, events, node_record) = Discv5::start(&secret_key, discv5_config).await?; + Ok(Self { inner: discv5, events, node_record }) + } + + /// Adds a node to the table if its not already present. + pub fn add_node(&mut self, enr: Enr) -> eyre::Result<()> { + let reth_enr: enr::Enr = EnrCombinedKeyWrapper(enr.clone()).into(); + self.inner.add_node(reth_enr)?; + Ok(()) + } + + /// Returns the local ENR of the discv5 node. + pub fn local_enr(&self) -> Enr { + self.inner.with_discv5(|discv5| discv5.local_enr()) + } +} + +impl Future for Discovery { + type Output = eyre::Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.as_mut(); + loop { + match ready!(this.events.poll_recv(cx)) { + Some(evt) => { + if let Event::SessionEstablished(enr, socket_addr) = evt { + info!(?enr, ?socket_addr, "Session established with a new peer."); + } + } + None => return Poll::Ready(Ok(())), + } + } + } +} + +#[cfg(test)] +mod tests { + use reth_tracing::tracing::info; + + use crate::disc::Discovery; + + #[tokio::test] + async fn can_establish_discv5_session_with_peer() { + reth_tracing::init_test_tracing(); + let mut node_1 = Discovery::new(30301, 30303).await.unwrap(); + let node_1_enr = node_1.local_enr(); + + let mut node_2 = Discovery::new(30302, 30303).await.unwrap(); + + let node_2_enr = node_2.local_enr(); + + info!(?node_1_enr, ?node_2_enr, "Started discovery nodes."); + + // add node_2 to node_1 table + node_1.add_node(node_2_enr.clone()).unwrap(); + + // verify node_2 is in node_1 table + assert!(node_1 + .inner + .with_discv5(|discv5| discv5.table_entries_id().contains(&node_2_enr.node_id()))); + + // send ping from node_1 to node_2 + node_1.inner.with_discv5(|discv5| discv5.send_ping(node_2_enr.clone())).await.unwrap(); + + // verify they both established a session + let event_2_v5 = node_2.events.recv().await.unwrap(); + let event_1_v5 = node_1.events.recv().await.unwrap(); + assert!(matches!( + event_1_v5, + discv5::Event::SessionEstablished(node, socket) if node == node_2_enr && socket == node_2_enr.udp4_socket().unwrap().into() + )); + assert!(matches!( + event_2_v5, + discv5::Event::SessionEstablished(node, socket) if node == node_1_enr && socket == node_1_enr.udp4_socket().unwrap().into() + )); + + // verify node_1 is in + let event_2_v5 = node_2.events.recv().await.unwrap(); + assert!(matches!( + event_2_v5, + discv5::Event::NodeInserted { node_id, replaced } if node_id == node_1_enr.node_id() && replaced.is_none() + )); + } +} diff --git a/oracle/src/exex.rs b/oracle/src/exex.rs new file mode 100644 index 0000000..f94e173 --- /dev/null +++ b/oracle/src/exex.rs @@ -0,0 +1,69 @@ +use eyre::Result; +use futures::{Future, FutureExt}; +use reth_exex::{ExExContext, ExExEvent, ExExNotification}; +use reth_node_api::FullNodeComponents; +use reth_tracing::tracing::{error, info}; +use std::{ + pin::Pin, + task::{ready, Context, Poll}, +}; + +use crate::disc::Discovery; + +/// The ExEx struct, representing the initialization and execution of the ExEx. +pub struct ExEx { + exex: ExExContext, + disc: Discovery, +} + +impl ExEx { + pub fn new(exex: ExExContext, disc: Discovery) -> Self { + Self { exex, disc } + } +} + +impl Future for ExEx { + type Output = Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // Poll the Discv5 future until its drained + loop { + match self.disc.poll_unpin(cx) { + Poll::Ready(Ok(())) => { + info!("Discv5 task completed successfully"); + } + Poll::Ready(Err(e)) => { + error!(?e, "Discv5 task encountered an error"); + return Poll::Ready(Err(e)); + } + Poll::Pending => { + // Exit match and continue to poll notifications + break; + } + } + } + + // Continuously poll the ExExContext notifications + loop { + if let Some(notification) = ready!(self.exex.notifications.poll_recv(cx)) { + match ¬ification { + ExExNotification::ChainCommitted { new } => { + info!(committed_chain = ?new.range(), "Received commit"); + } + ExExNotification::ChainReorged { old, new } => { + info!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg"); + } + ExExNotification::ChainReverted { old } => { + info!(reverted_chain = ?old.range(), "Received revert"); + } + } + + if let Some(committed_chain) = notification.committed_chain() { + self.exex + .events + .send(ExExEvent::FinishedHeight(committed_chain.tip().number))?; + } + } + } + } +} diff --git a/oracle/src/main.rs b/oracle/src/main.rs new file mode 100644 index 0000000..44fd8c1 --- /dev/null +++ b/oracle/src/main.rs @@ -0,0 +1,30 @@ +use clap::Parser; +use cli_ext::OracleExt; +use disc::Discovery; +use exex::ExEx; +use reth_node_ethereum::EthereumNode; + +mod cli_ext; +mod disc; +mod exex; + +fn main() -> eyre::Result<()> { + reth::cli::Cli::::parse().run(|builder, args| async move { + let tcp_port = args.tcp_port; + let udp_port = args.udp_port; + + let handle = builder + .node(EthereumNode::default()) + .install_exex("exex-oracle", move |ctx| async move { + // start Discv5 task + let disc = Discovery::new(tcp_port, udp_port).await?; + + // start exex task with discv5 + Ok(ExEx::new(ctx, disc)) + }) + .launch() + .await?; + + handle.wait_for_node_exit().await + }) +} From 17cbd82ee354b752195517eff239032faf95ca0e Mon Sep 17 00:00:00 2001 From: Loocapro Date: Wed, 4 Sep 2024 13:13:14 +0200 Subject: [PATCH 2/6] feat(oracle): general restructure on state machines --- oracle/src/exex.rs | 28 ++------- oracle/src/main.rs | 19 +++--- .../src/{disc/mod.rs => network/discovery.rs} | 41 ++++++++----- oracle/src/network/mod.rs | 44 ++++++++++++++ oracle/src/oracle.rs | 58 +++++++++++++++++++ 5 files changed, 142 insertions(+), 48 deletions(-) rename oracle/src/{disc/mod.rs => network/discovery.rs} (74%) create mode 100644 oracle/src/network/mod.rs create mode 100644 oracle/src/oracle.rs diff --git a/oracle/src/exex.rs b/oracle/src/exex.rs index f94e173..08a15c0 100644 --- a/oracle/src/exex.rs +++ b/oracle/src/exex.rs @@ -1,24 +1,21 @@ use eyre::Result; -use futures::{Future, FutureExt}; +use futures::Future; use reth_exex::{ExExContext, ExExEvent, ExExNotification}; use reth_node_api::FullNodeComponents; -use reth_tracing::tracing::{error, info}; +use reth_tracing::tracing::info; use std::{ pin::Pin, task::{ready, Context, Poll}, }; -use crate::disc::Discovery; - /// The ExEx struct, representing the initialization and execution of the ExEx. pub struct ExEx { exex: ExExContext, - disc: Discovery, } impl ExEx { - pub fn new(exex: ExExContext, disc: Discovery) -> Self { - Self { exex, disc } + pub fn new(exex: ExExContext) -> Self { + Self { exex } } } @@ -26,23 +23,6 @@ impl Future for ExEx { type Output = Result<()>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - // Poll the Discv5 future until its drained - loop { - match self.disc.poll_unpin(cx) { - Poll::Ready(Ok(())) => { - info!("Discv5 task completed successfully"); - } - Poll::Ready(Err(e)) => { - error!(?e, "Discv5 task encountered an error"); - return Poll::Ready(Err(e)); - } - Poll::Pending => { - // Exit match and continue to poll notifications - break; - } - } - } - // Continuously poll the ExExContext notifications loop { if let Some(notification) = ready!(self.exex.notifications.poll_recv(cx)) { diff --git a/oracle/src/main.rs b/oracle/src/main.rs index 44fd8c1..bb9a747 100644 --- a/oracle/src/main.rs +++ b/oracle/src/main.rs @@ -1,12 +1,16 @@ use clap::Parser; use cli_ext::OracleExt; -use disc::Discovery; use exex::ExEx; +use network::Network; +use oracle::Oracle; use reth_node_ethereum::EthereumNode; mod cli_ext; -mod disc; mod exex; +mod network; +mod oracle; + +const ORACLE_EXEX_ID: &str = "exex-oracle"; fn main() -> eyre::Result<()> { reth::cli::Cli::::parse().run(|builder, args| async move { @@ -15,12 +19,11 @@ fn main() -> eyre::Result<()> { let handle = builder .node(EthereumNode::default()) - .install_exex("exex-oracle", move |ctx| async move { - // start Discv5 task - let disc = Discovery::new(tcp_port, udp_port).await?; - - // start exex task with discv5 - Ok(ExEx::new(ctx, disc)) + .install_exex(ORACLE_EXEX_ID, move |ctx| async move { + let exex = ExEx::new(ctx); + let network = Network::new(tcp_port, udp_port).await?; + let oracle = Oracle::new(exex, network).await?; + Ok(oracle) }) .launch() .await?; diff --git a/oracle/src/disc/mod.rs b/oracle/src/network/discovery.rs similarity index 74% rename from oracle/src/disc/mod.rs rename to oracle/src/network/discovery.rs index 92a3643..9d5fc8d 100644 --- a/oracle/src/disc/mod.rs +++ b/oracle/src/network/discovery.rs @@ -1,5 +1,3 @@ -#![allow(dead_code)] - use discv5::{enr::secp256k1::rand, Enr, Event, ListenConfig}; use reth::network::config::SecretKey; use reth_discv5::{enr::EnrCombinedKeyWrapper, Config, Discv5}; @@ -7,7 +5,7 @@ use reth_network_peers::NodeRecord; use reth_tracing::tracing::info; use std::{ future::Future, - net::SocketAddr, + net::{IpAddr, Ipv4Addr, SocketAddr}, pin::Pin, task::{ready, Context, Poll}, }; @@ -24,16 +22,16 @@ pub(crate) struct Discovery { } impl Discovery { - /// Starts a new discv5 node. - pub async fn new(udp_port: u16, tcp_port: u16) -> eyre::Result { + /// Starts a new discovery node. + pub(crate) async fn new(udp_port: u16, tcp_port: u16) -> eyre::Result { let secret_key = SecretKey::new(&mut rand::thread_rng()); - let discv5_addr: SocketAddr = format!("127.0.0.1:{udp_port}").parse()?; - let rlpx_addr: SocketAddr = format!("127.0.0.1:{tcp_port}").parse()?; + let disc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), udp_port); + let rlpx_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), tcp_port); - let discv5_listen_config = ListenConfig::from(discv5_addr); + let config = ListenConfig::from(disc_addr); let discv5_config = Config::builder(rlpx_addr) - .discv5_config(discv5::ConfigBuilder::new(discv5_listen_config).build()) + .discv5_config(discv5::ConfigBuilder::new(config).build()) .build(); let (discv5, events, node_record) = Discv5::start(&secret_key, discv5_config).await?; @@ -41,16 +39,21 @@ impl Discovery { } /// Adds a node to the table if its not already present. - pub fn add_node(&mut self, enr: Enr) -> eyre::Result<()> { + pub(crate) fn add_node(&mut self, enr: Enr) -> eyre::Result<()> { let reth_enr: enr::Enr = EnrCombinedKeyWrapper(enr.clone()).into(); self.inner.add_node(reth_enr)?; Ok(()) } /// Returns the local ENR of the discv5 node. - pub fn local_enr(&self) -> Enr { + pub(crate) fn local_enr(&self) -> Enr { self.inner.with_discv5(|discv5| discv5.local_enr()) } + + /// Returns true if the discv5 node has connected peers. + pub(crate) fn has_peers(&self) -> bool { + self.inner.with_discv5(|discv5| discv5.connected_peers() > 0) + } } impl Future for Discovery { @@ -60,11 +63,18 @@ impl Future for Discovery { let mut this = self.as_mut(); loop { match ready!(this.events.poll_recv(cx)) { - Some(evt) => { - if let Event::SessionEstablished(enr, socket_addr) = evt { + Some(evt) => match evt { + Event::Discovered(enr) => { + info!(?enr, "Discovered a new peer."); + this.add_node(enr)?; + } + Event::SessionEstablished(enr, socket_addr) => { info!(?enr, ?socket_addr, "Session established with a new peer."); } - } + evt => { + info!(?evt, "New discovery event."); + } + }, None => return Poll::Ready(Ok(())), } } @@ -73,10 +83,9 @@ impl Future for Discovery { #[cfg(test)] mod tests { + use crate::network::discovery::Discovery; use reth_tracing::tracing::info; - use crate::disc::Discovery; - #[tokio::test] async fn can_establish_discv5_session_with_peer() { reth_tracing::init_test_tracing(); diff --git a/oracle/src/network/mod.rs b/oracle/src/network/mod.rs new file mode 100644 index 0000000..89e2777 --- /dev/null +++ b/oracle/src/network/mod.rs @@ -0,0 +1,44 @@ +use futures::FutureExt; +use reth_tracing::tracing::{error, info}; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +mod discovery; + +/// The Network struct is a long running task that orchestrates discovery of new peers and network +/// gossiping via the RLPx subprotocol. +pub(crate) struct Network { + /// The discovery task for this node. + discovery: discovery::Discovery, +} + +impl Network { + pub(crate) async fn new(tcp_port: u16, udp_port: u16) -> eyre::Result { + let discovery = discovery::Discovery::new(tcp_port, udp_port).await?; + Ok(Self { discovery }) + } +} + +impl Future for Network { + type Output = eyre::Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.as_mut(); + // Poll the discovery future until its drained + loop { + match this.discovery.poll_unpin(cx) { + Poll::Ready(Ok(())) => { + info!("Discv5 task completed successfully"); + } + Poll::Ready(Err(e)) => { + error!(?e, "Discv5 task encountered an error"); + return Poll::Ready(Err(e)); + } + Poll::Pending => {} + } + } + } +} diff --git a/oracle/src/oracle.rs b/oracle/src/oracle.rs new file mode 100644 index 0000000..9903fad --- /dev/null +++ b/oracle/src/oracle.rs @@ -0,0 +1,58 @@ +use futures::FutureExt; +use reth_node_api::FullNodeComponents; +use reth_tracing::tracing::{error, info}; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +use crate::{exex::ExEx, network::Network}; + +/// The Oracle struct is a long running task that orchestrates discovery of new peers, +/// decoding data from chain events (ExEx) and gossiping it to peers. +pub(crate) struct Oracle { + /// The network task for this node. + /// It is composed by a discovery task and a sub protocol RLPx task. + network: Network, + /// The execution extension task for this node. + exex: ExEx, +} + +impl Oracle { + pub(crate) async fn new(exex: ExEx, network: Network) -> eyre::Result { + Ok(Self { exex, network }) + } +} + +impl Future for Oracle { + type Output = eyre::Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.as_mut(); + // Poll the network future until its drained + loop { + match this.network.poll_unpin(cx) { + Poll::Ready(Ok(())) => { + info!("Discv5 task completed successfully"); + } + Poll::Ready(Err(e)) => { + error!(?e, "Discv5 task encountered an error"); + return Poll::Ready(Err(e)); + } + Poll::Pending => { + // Exit match and continue to poll exex + break; + } + } + } + + // Poll the exex future until its drained + loop { + match this.exex.poll_unpin(cx)? { + Poll::Ready(t) => t, + Poll::Pending => return Poll::Pending, + }; + } + } +} From 7b6a1119c64bbd1ae17e1a28751d2593931fe6a9 Mon Sep 17 00:00:00 2001 From: Loocapro Date: Wed, 4 Sep 2024 18:10:33 +0200 Subject: [PATCH 3/6] feat: added rlpx subprotocol --- Cargo.lock | 161 ++++++++++++++++++++++--- Cargo.toml | 24 ++-- oracle/Cargo.toml | 14 ++- oracle/src/main.rs | 6 +- oracle/src/network/mod.rs | 10 +- oracle/src/network/proto/connection.rs | 121 +++++++++++++++++++ oracle/src/network/proto/mod.rs | 156 ++++++++++++++++++++++++ 7 files changed, 459 insertions(+), 33 deletions(-) create mode 100644 oracle/src/network/proto/connection.rs create mode 100644 oracle/src/network/proto/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 4d2b6aa..8c9b87e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -23,6 +23,15 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" +[[package]] +name = "aead" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b613b8e1e3cf911a086f53f03bf286f52fd7a7258e4fa606f0ef220d39d8877" +dependencies = [ + "generic-array", +] + [[package]] name = "aead" version = "0.5.2" @@ -33,6 +42,19 @@ dependencies = [ "generic-array", ] +[[package]] +name = "aes" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e8b47f52ea9bae42228d07ec09eb676433d7c4ed1ebdf0f1d1c29ed446f1ab8" +dependencies = [ + "cfg-if", + "cipher 0.3.0", + "cpufeatures", + "ctr 0.8.0", + "opaque-debug", +] + [[package]] name = "aes" version = "0.8.4" @@ -40,21 +62,35 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0" dependencies = [ "cfg-if", - "cipher", + "cipher 0.4.4", "cpufeatures", ] +[[package]] +name = "aes-gcm" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc3be92e19a7ef47457b8e6f90707e12b6ac5d20c6f3866584fa3be0787d839f" +dependencies = [ + "aead 0.4.3", + "aes 0.7.5", + "cipher 0.3.0", + "ctr 0.7.0", + "ghash 0.4.4", + "subtle", +] + [[package]] name = "aes-gcm" version = "0.10.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "831010a0f742e1209b3bcea8fab6a8e149051ba6099432c8cb2cc117dec3ead1" dependencies = [ - "aead", - "aes", - "cipher", - "ctr", - "ghash", + "aead 0.5.2", + "aes 0.8.4", + "cipher 0.4.4", + "ctr 0.9.2", + "ghash 0.5.1", "subtle", ] @@ -1483,6 +1519,15 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "cipher" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ee52072ec15386f770805afd189a01c8841be8696bed250fa2f13c4c0d6dfb7" +dependencies = [ + "generic-array", +] + [[package]] name = "cipher" version = "0.4.4" @@ -1811,13 +1856,31 @@ dependencies = [ "subtle", ] +[[package]] +name = "ctr" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a232f92a03f37dd7d7dd2adc67166c77e9cd88de5b019b9a9eecfaeaf7bfd481" +dependencies = [ + "cipher 0.3.0", +] + +[[package]] +name = "ctr" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "049bb91fb4aaf0e3c7efa6cd5ef877dbbbd15b39dad06d9948de4ec8a75761ea" +dependencies = [ + "cipher 0.3.0", +] + [[package]] name = "ctr" version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0369ee1ad671834580515889b80f2ea915f23b8be8d0daa4bbaf2ac5c7590835" dependencies = [ - "cipher", + "cipher 0.4.4", ] [[package]] @@ -2095,17 +2158,47 @@ dependencies = [ "tokio-stream", ] +[[package]] +name = "discv5" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cafb8ed8d460b7d1c8d4c970270d45ecb5e283179a3945143196624c55cda6ac" +dependencies = [ + "aes 0.7.5", + "aes-gcm 0.9.2", + "alloy-rlp", + "arrayvec", + "delay_map", + "enr", + "fnv", + "futures", + "hashlink 0.8.4", + "hex", + "hkdf", + "lazy_static", + "lru", + "more-asserts", + "parking_lot 0.11.2", + "rand", + "smallvec", + "socket2 0.4.10", + "tokio", + "tracing", + "uint", + "zeroize", +] + [[package]] name = "discv5" version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f569b8c367554666c8652305621e8bae3634a2ff5c6378081d5bd8c399c99f23" dependencies = [ - "aes", - "aes-gcm", + "aes 0.8.4", + "aes-gcm 0.10.3", "alloy-rlp", "arrayvec", - "ctr", + "ctr 0.9.2", "delay_map", "enr", "fnv", @@ -2561,6 +2654,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "ghash" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1583cc1656d7839fd3732b80cf4f38850336cdb9b8ded1cd399ca62958de3c99" +dependencies = [ + "opaque-debug", + "polyval 0.5.3", +] + [[package]] name = "ghash" version = "0.5.1" @@ -2568,7 +2671,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0d8a4362ccb29cb0b265253fb0a2728f592895ee6854fd9bc13f2ffda266ff1" dependencies = [ "opaque-debug", - "polyval", + "polyval 0.6.2", ] [[package]] @@ -4244,12 +4347,18 @@ dependencies = [ "futures", "reth", "reth-discv5", + "reth-eth-wire", "reth-exex", + "reth-network", + "reth-network-api", "reth-network-peers", "reth-node-api", "reth-node-ethereum", + "reth-primitives", + "reth-rpc-types", "reth-tracing", "tokio", + "tokio-stream", ] [[package]] @@ -4462,6 +4571,18 @@ dependencies = [ "crunchy", ] +[[package]] +name = "polyval" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8419d2b623c7c0896ff2d5d96e2cb4ede590fed28fcc34934f4c33c036e620a1" +dependencies = [ + "cfg-if", + "cpufeatures", + "opaque-debug", + "universal-hash 0.4.0", +] + [[package]] name = "polyval" version = "0.6.2" @@ -4471,7 +4592,7 @@ dependencies = [ "cfg-if", "cpufeatures", "opaque-debug", - "universal-hash", + "universal-hash 0.5.1", ] [[package]] @@ -5665,14 +5786,14 @@ name = "reth-ecies" version = "1.0.6" source = "git+https://github.com/paradigmxyz/reth#a89de219c97e470f77f9833dc170ed1344f84a21" dependencies = [ - "aes", + "aes 0.8.4", "alloy-primitives", "alloy-rlp", "block-padding", "byteorder", - "cipher", + "cipher 0.4.4", "concat-kdf", - "ctr", + "ctr 0.9.2", "digest 0.10.7", "futures", "generic-array", @@ -8975,6 +9096,16 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "229730647fbc343e3a80e463c1db7f78f3855d3f3739bee0dda773c9a037c90a" +[[package]] +name = "universal-hash" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8326b2c654932e3e4f9196e69d08fdf7cfd718e1dc6f66b347e6024a0c961402" +dependencies = [ + "generic-array", + "subtle", +] + [[package]] name = "universal-hash" version = "0.5.1" diff --git a/Cargo.toml b/Cargo.toml index 585c4f5..0d65d3e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,13 +1,13 @@ [workspace] members = [ - "backfill", - "discv5", - "in-memory-state", - "minimal", - "oracle", - "op-bridge", - "remote", - "rollup", + "backfill", + "discv5", + "in-memory-state", + "minimal", + "oracle", + "op-bridge", + "remote", + "rollup", ] resolver = "2" @@ -26,15 +26,21 @@ reth-discv5 = { git = "https://github.com/paradigmxyz/reth" } reth-execution-errors = { git = "https://github.com/paradigmxyz/reth" } reth-execution-types = { git = "https://github.com/paradigmxyz/reth" } reth-exex = { git = "https://github.com/paradigmxyz/reth", features = ["serde"] } +reth-eth-wire = { git = "https://github.com/paradigmxyz/reth" } +reth-evm = { git = "https://github.com/paradigmxyz/reth" } +reth-network = { git = "https://github.com/paradigmxyz/reth" } +reth-network-api = { git = "https://github.com/paradigmxyz/reth" } reth-network-peers = { git = "https://github.com/paradigmxyz/reth" } reth-node-api = { git = "https://github.com/paradigmxyz/reth" } reth-node-ethereum = { git = "https://github.com/paradigmxyz/reth" } reth-primitives = { git = "https://github.com/paradigmxyz/reth" } reth-provider = { git = "https://github.com/paradigmxyz/reth" } reth-revm = { git = "https://github.com/paradigmxyz/reth" } -reth-evm = { git = "https://github.com/paradigmxyz/reth" } +reth-rpc-types = { git = "https://github.com/paradigmxyz/reth" } reth-tracing = { git = "https://github.com/paradigmxyz/reth" } + + # alloy alloy-sol-types = { version = "0.8", features = ["json"] } diff --git a/oracle/Cargo.toml b/oracle/Cargo.toml index c73e36e..98b9dd0 100644 --- a/oracle/Cargo.toml +++ b/oracle/Cargo.toml @@ -9,13 +9,18 @@ publish.workspace = true [dependencies] # reth -reth-node-ethereum.workspace = true -reth.workspace = true -reth-tracing.workspace = true -reth-network-peers.workspace = true reth-discv5.workspace = true +reth-eth-wire.workspace = true reth-exex.workspace = true +reth-network-api.workspace = true +reth-network-peers.workspace = true +reth-network.workspace = true reth-node-api.workspace = true +reth-node-ethereum.workspace = true +reth-primitives.workspace = true +reth-rpc-types.workspace = true +reth-tracing.workspace = true +reth.workspace = true # networking discv5 = "0.6" @@ -24,6 +29,7 @@ enr = "0.12" # async futures.workspace = true tokio.workspace = true +tokio-stream.workspace = true # misc clap = "4" diff --git a/oracle/src/main.rs b/oracle/src/main.rs index bb9a747..32878a5 100644 --- a/oracle/src/main.rs +++ b/oracle/src/main.rs @@ -1,8 +1,9 @@ use clap::Parser; use cli_ext::OracleExt; use exex::ExEx; -use network::Network; +use network::{proto::OracleProtoHandler, Network}; use oracle::Oracle; +use reth_network::{protocol::IntoRlpxSubProtocol, NetworkProtocols}; use reth_node_ethereum::EthereumNode; mod cli_ext; @@ -20,6 +21,9 @@ fn main() -> eyre::Result<()> { let handle = builder .node(EthereumNode::default()) .install_exex(ORACLE_EXEX_ID, move |ctx| async move { + let subproto = OracleProtoHandler::new(); + ctx.network().add_rlpx_sub_protocol(subproto.into_rlpx_sub_protocol()); + let exex = ExEx::new(ctx); let network = Network::new(tcp_port, udp_port).await?; let oracle = Oracle::new(exex, network).await?; diff --git a/oracle/src/network/mod.rs b/oracle/src/network/mod.rs index 89e2777..727c49b 100644 --- a/oracle/src/network/mod.rs +++ b/oracle/src/network/mod.rs @@ -1,3 +1,4 @@ +use discovery::Discovery; use futures::FutureExt; use reth_tracing::tracing::{error, info}; use std::{ @@ -7,17 +8,18 @@ use std::{ }; mod discovery; +pub(crate) mod proto; /// The Network struct is a long running task that orchestrates discovery of new peers and network /// gossiping via the RLPx subprotocol. pub(crate) struct Network { /// The discovery task for this node. - discovery: discovery::Discovery, + discovery: Discovery, } impl Network { pub(crate) async fn new(tcp_port: u16, udp_port: u16) -> eyre::Result { - let discovery = discovery::Discovery::new(tcp_port, udp_port).await?; + let discovery = Discovery::new(tcp_port, udp_port).await?; Ok(Self { discovery }) } } @@ -31,10 +33,10 @@ impl Future for Network { loop { match this.discovery.poll_unpin(cx) { Poll::Ready(Ok(())) => { - info!("Discv5 task completed successfully"); + info!("Discovery task completed"); } Poll::Ready(Err(e)) => { - error!(?e, "Discv5 task encountered an error"); + error!(?e, "Discovery task encountered an error"); return Poll::Ready(Err(e)); } Poll::Pending => {} diff --git a/oracle/src/network/proto/connection.rs b/oracle/src/network/proto/connection.rs new file mode 100644 index 0000000..46a3b2f --- /dev/null +++ b/oracle/src/network/proto/connection.rs @@ -0,0 +1,121 @@ +use super::{OracleProtoMessage, OracleProtoMessageKind, ProtocolEvent, ProtocolState}; +use futures::{Stream, StreamExt}; +use reth_eth_wire::{ + capability::SharedCapabilities, multiplex::ProtocolConnection, protocol::Protocol, +}; +use reth_network::protocol::{ConnectionHandler, OnNotSupported}; +use reth_network_api::Direction; +use reth_primitives::BytesMut; +use reth_rpc_types::PeerId; +use std::{ + pin::Pin, + task::{ready, Context, Poll}, +}; +use tokio::sync::{mpsc, oneshot}; +use tokio_stream::wrappers::UnboundedReceiverStream; + +/// The commands supported by the OracleConnection. +pub(crate) enum OracleCommand { + /// Sends a message to the peer + Message { + msg: String, + /// The response will be sent to this channel. + response: oneshot::Sender, + }, +} + +/// This struct defines the connection object for the Oracle subprotocol. +pub(crate) struct OracleConnection { + conn: ProtocolConnection, + commands: UnboundedReceiverStream, + pending_pong: Option>, + initial_ping: Option, +} + +impl Stream for OracleConnection { + type Item = BytesMut; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + + if let Some(initial_ping) = this.initial_ping.take() { + return Poll::Ready(Some(initial_ping.encoded())); + } + + loop { + if let Poll::Ready(Some(cmd)) = this.commands.poll_next_unpin(cx) { + return match cmd { + OracleCommand::Message { msg, response } => { + this.pending_pong = Some(response); + Poll::Ready(Some(OracleProtoMessage::ping_message(msg).encoded())) + } + }; + } + + let Some(msg) = ready!(this.conn.poll_next_unpin(cx)) else { return Poll::Ready(None) }; + + let Some(msg) = OracleProtoMessage::decode_message(&mut &msg[..]) else { + return Poll::Ready(None); + }; + + match msg.message { + OracleProtoMessageKind::Ping => { + return Poll::Ready(Some(OracleProtoMessage::pong().encoded())) + } + OracleProtoMessageKind::Pong => {} + OracleProtoMessageKind::PingMessage(msg) => { + return Poll::Ready(Some(OracleProtoMessage::pong_message(msg).encoded())) + } + OracleProtoMessageKind::PongMessage(msg) => { + if let Some(sender) = this.pending_pong.take() { + sender.send(msg).ok(); + } + continue; + } + } + + return Poll::Pending; + } + } +} + +/// The connection handler for the RLPx subprotocol. +pub(crate) struct OracleConnHandler { + pub(crate) state: ProtocolState, +} + +impl ConnectionHandler for OracleConnHandler { + type Connection = OracleConnection; + + fn protocol(&self) -> Protocol { + OracleProtoMessage::protocol() + } + + fn on_unsupported_by_peer( + self, + _supported: &SharedCapabilities, + _direction: Direction, + _peer_id: PeerId, + ) -> OnNotSupported { + OnNotSupported::KeepAlive + } + + fn into_connection( + self, + direction: Direction, + peer_id: PeerId, + conn: ProtocolConnection, + ) -> Self::Connection { + let (tx, rx) = mpsc::unbounded_channel(); + self.state + .events + .send(ProtocolEvent::Established { direction, peer_id, to_connection: tx }) + .ok(); + OracleConnection { + conn, + initial_ping: direction.is_outgoing().then(OracleProtoMessage::ping), + commands: UnboundedReceiverStream::new(rx), + pending_pong: None, + } + } +} diff --git a/oracle/src/network/proto/mod.rs b/oracle/src/network/proto/mod.rs new file mode 100644 index 0000000..af3ada7 --- /dev/null +++ b/oracle/src/network/proto/mod.rs @@ -0,0 +1,156 @@ +use connection::{OracleCommand, OracleConnHandler}; +use reth_eth_wire::{protocol::Protocol, Capability}; +use reth_network::{protocol::ProtocolHandler, Direction}; +use reth_network_api::PeerId; +use reth_primitives::{Buf, BufMut, BytesMut}; +use std::net::SocketAddr; +use tokio::sync::mpsc; + +pub(crate) mod connection; + +#[repr(u8)] +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub(crate) enum OracleProtoMessageId { + Ping = 0x00, + Pong = 0x01, + PingMessage = 0x02, + PongMessage = 0x03, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) enum OracleProtoMessageKind { + Ping, + Pong, + PingMessage(String), + PongMessage(String), +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) struct OracleProtoMessage { + pub(crate) message_type: OracleProtoMessageId, + pub(crate) message: OracleProtoMessageKind, +} + +impl OracleProtoMessage { + /// Returns the capability for the `custom_rlpx` protocol. + pub(crate) fn capability() -> Capability { + Capability::new_static("custom_rlpx", 1) + } + + /// Returns the protocol for the `custom_rlpx` protocol. + pub(crate) fn protocol() -> Protocol { + Protocol::new(Self::capability(), 4) + } + + /// Creates a ping message + pub(crate) fn ping_message(msg: impl Into) -> Self { + Self { + message_type: OracleProtoMessageId::PingMessage, + message: OracleProtoMessageKind::PingMessage(msg.into()), + } + } + /// Creates a ping message + pub(crate) fn pong_message(msg: impl Into) -> Self { + Self { + message_type: OracleProtoMessageId::PongMessage, + message: OracleProtoMessageKind::PongMessage(msg.into()), + } + } + + /// Creates a ping message + pub(crate) fn ping() -> Self { + Self { message_type: OracleProtoMessageId::Ping, message: OracleProtoMessageKind::Ping } + } + + /// Creates a pong message + pub(crate) fn pong() -> Self { + Self { message_type: OracleProtoMessageId::Pong, message: OracleProtoMessageKind::Pong } + } + + /// Creates a new `OracleProtoMessage` with the given message ID and payload. + pub(crate) fn encoded(&self) -> BytesMut { + let mut buf = BytesMut::new(); + buf.put_u8(self.message_type as u8); + match &self.message { + OracleProtoMessageKind::Ping | OracleProtoMessageKind::Pong => {} + OracleProtoMessageKind::PingMessage(msg) | OracleProtoMessageKind::PongMessage(msg) => { + buf.put(msg.as_bytes()); + } + } + buf + } + + /// Decodes a `OracleProtoMessage` from the given message buffer. + pub(crate) fn decode_message(buf: &mut &[u8]) -> Option { + if buf.is_empty() { + return None; + } + let id = buf[0]; + buf.advance(1); + let message_type = match id { + 0x00 => OracleProtoMessageId::Ping, + 0x01 => OracleProtoMessageId::Pong, + 0x02 => OracleProtoMessageId::PingMessage, + 0x03 => OracleProtoMessageId::PongMessage, + _ => return None, + }; + let message = match message_type { + OracleProtoMessageId::Ping => OracleProtoMessageKind::Ping, + OracleProtoMessageId::Pong => OracleProtoMessageKind::Pong, + OracleProtoMessageId::PingMessage => { + OracleProtoMessageKind::PingMessage(String::from_utf8_lossy(&buf[..]).into_owned()) + } + OracleProtoMessageId::PongMessage => { + OracleProtoMessageKind::PongMessage(String::from_utf8_lossy(&buf[..]).into_owned()) + } + }; + + Some(Self { message_type, message }) + } +} + +/// This struct is responsible of incoming and outgoing connections. +#[derive(Debug)] +pub(crate) struct OracleProtoHandler { + state: ProtocolState, +} + +impl OracleProtoHandler { + /// Creates a new `OracleProtoHandler` with the given protocol state. + pub(crate) fn new() -> Self { + let (tx, _) = mpsc::unbounded_channel(); + Self { state: ProtocolState { events: tx } } + } +} + +impl ProtocolHandler for OracleProtoHandler { + type ConnectionHandler = OracleConnHandler; + + fn on_incoming(&self, _socket_addr: SocketAddr) -> Option { + Some(OracleConnHandler { state: self.state.clone() }) + } + + fn on_outgoing( + &self, + _socket_addr: SocketAddr, + _peer_id: PeerId, + ) -> Option { + Some(OracleConnHandler { state: self.state.clone() }) + } +} + +/// Protocol state is an helper struct to store the protocol events. +#[derive(Clone, Debug)] +pub(crate) struct ProtocolState { + pub(crate) events: mpsc::UnboundedSender, +} + +/// The events that can be emitted by our custom protocol. +#[derive(Debug)] +pub(crate) enum ProtocolEvent { + Established { + direction: Direction, + peer_id: PeerId, + to_connection: mpsc::UnboundedSender, + }, +} From cb89730dd5434476b448c2a2d85d3c8f65a4a006 Mon Sep 17 00:00:00 2001 From: Loocapro Date: Sun, 8 Sep 2024 16:49:41 +0200 Subject: [PATCH 4/6] updated to latest reth --- Cargo.lock | 157 ++++---------------------------- oracle/Cargo.toml | 2 +- oracle/src/exex.rs | 50 +++++----- oracle/src/main.rs | 3 +- oracle/src/network/discovery.rs | 2 + oracle/src/network/proto/mod.rs | 2 + 6 files changed, 51 insertions(+), 165 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8c9b87e..eb92281 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -23,15 +23,6 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" -[[package]] -name = "aead" -version = "0.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b613b8e1e3cf911a086f53f03bf286f52fd7a7258e4fa606f0ef220d39d8877" -dependencies = [ - "generic-array", -] - [[package]] name = "aead" version = "0.5.2" @@ -42,19 +33,6 @@ dependencies = [ "generic-array", ] -[[package]] -name = "aes" -version = "0.7.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e8b47f52ea9bae42228d07ec09eb676433d7c4ed1ebdf0f1d1c29ed446f1ab8" -dependencies = [ - "cfg-if", - "cipher 0.3.0", - "cpufeatures", - "ctr 0.8.0", - "opaque-debug", -] - [[package]] name = "aes" version = "0.8.4" @@ -62,35 +40,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0" dependencies = [ "cfg-if", - "cipher 0.4.4", + "cipher", "cpufeatures", ] -[[package]] -name = "aes-gcm" -version = "0.9.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc3be92e19a7ef47457b8e6f90707e12b6ac5d20c6f3866584fa3be0787d839f" -dependencies = [ - "aead 0.4.3", - "aes 0.7.5", - "cipher 0.3.0", - "ctr 0.7.0", - "ghash 0.4.4", - "subtle", -] - [[package]] name = "aes-gcm" version = "0.10.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "831010a0f742e1209b3bcea8fab6a8e149051ba6099432c8cb2cc117dec3ead1" dependencies = [ - "aead 0.5.2", - "aes 0.8.4", - "cipher 0.4.4", - "ctr 0.9.2", - "ghash 0.5.1", + "aead", + "aes", + "cipher", + "ctr", + "ghash", "subtle", ] @@ -1519,15 +1483,6 @@ dependencies = [ "windows-targets 0.52.6", ] -[[package]] -name = "cipher" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ee52072ec15386f770805afd189a01c8841be8696bed250fa2f13c4c0d6dfb7" -dependencies = [ - "generic-array", -] - [[package]] name = "cipher" version = "0.4.4" @@ -1856,31 +1811,13 @@ dependencies = [ "subtle", ] -[[package]] -name = "ctr" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a232f92a03f37dd7d7dd2adc67166c77e9cd88de5b019b9a9eecfaeaf7bfd481" -dependencies = [ - "cipher 0.3.0", -] - -[[package]] -name = "ctr" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "049bb91fb4aaf0e3c7efa6cd5ef877dbbbd15b39dad06d9948de4ec8a75761ea" -dependencies = [ - "cipher 0.3.0", -] - [[package]] name = "ctr" version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0369ee1ad671834580515889b80f2ea915f23b8be8d0daa4bbaf2ac5c7590835" dependencies = [ - "cipher 0.4.4", + "cipher", ] [[package]] @@ -2158,47 +2095,17 @@ dependencies = [ "tokio-stream", ] -[[package]] -name = "discv5" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cafb8ed8d460b7d1c8d4c970270d45ecb5e283179a3945143196624c55cda6ac" -dependencies = [ - "aes 0.7.5", - "aes-gcm 0.9.2", - "alloy-rlp", - "arrayvec", - "delay_map", - "enr", - "fnv", - "futures", - "hashlink 0.8.4", - "hex", - "hkdf", - "lazy_static", - "lru", - "more-asserts", - "parking_lot 0.11.2", - "rand", - "smallvec", - "socket2 0.4.10", - "tokio", - "tracing", - "uint", - "zeroize", -] - [[package]] name = "discv5" version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f569b8c367554666c8652305621e8bae3634a2ff5c6378081d5bd8c399c99f23" dependencies = [ - "aes 0.8.4", - "aes-gcm 0.10.3", + "aes", + "aes-gcm", "alloy-rlp", "arrayvec", - "ctr 0.9.2", + "ctr", "delay_map", "enr", "fnv", @@ -2654,16 +2561,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "ghash" -version = "0.4.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1583cc1656d7839fd3732b80cf4f38850336cdb9b8ded1cd399ca62958de3c99" -dependencies = [ - "opaque-debug", - "polyval 0.5.3", -] - [[package]] name = "ghash" version = "0.5.1" @@ -2671,7 +2568,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0d8a4362ccb29cb0b265253fb0a2728f592895ee6854fd9bc13f2ffda266ff1" dependencies = [ "opaque-debug", - "polyval 0.6.2", + "polyval", ] [[package]] @@ -4341,7 +4238,7 @@ name = "oracle" version = "0.1.0" dependencies = [ "clap", - "discv5 0.6.0", + "discv5 0.7.0", "enr", "eyre", "futures", @@ -4571,18 +4468,6 @@ dependencies = [ "crunchy", ] -[[package]] -name = "polyval" -version = "0.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8419d2b623c7c0896ff2d5d96e2cb4ede590fed28fcc34934f4c33c036e620a1" -dependencies = [ - "cfg-if", - "cpufeatures", - "opaque-debug", - "universal-hash 0.4.0", -] - [[package]] name = "polyval" version = "0.6.2" @@ -4592,7 +4477,7 @@ dependencies = [ "cfg-if", "cpufeatures", "opaque-debug", - "universal-hash 0.5.1", + "universal-hash", ] [[package]] @@ -5786,14 +5671,14 @@ name = "reth-ecies" version = "1.0.6" source = "git+https://github.com/paradigmxyz/reth#a89de219c97e470f77f9833dc170ed1344f84a21" dependencies = [ - "aes 0.8.4", + "aes", "alloy-primitives", "alloy-rlp", "block-padding", "byteorder", - "cipher 0.4.4", + "cipher", "concat-kdf", - "ctr 0.9.2", + "ctr", "digest 0.10.7", "futures", "generic-array", @@ -9096,16 +8981,6 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "229730647fbc343e3a80e463c1db7f78f3855d3f3739bee0dda773c9a037c90a" -[[package]] -name = "universal-hash" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8326b2c654932e3e4f9196e69d08fdf7cfd718e1dc6f66b347e6024a0c961402" -dependencies = [ - "generic-array", - "subtle", -] - [[package]] name = "universal-hash" version = "0.5.1" diff --git a/oracle/Cargo.toml b/oracle/Cargo.toml index 98b9dd0..419bfc4 100644 --- a/oracle/Cargo.toml +++ b/oracle/Cargo.toml @@ -23,7 +23,7 @@ reth-tracing.workspace = true reth.workspace = true # networking -discv5 = "0.6" +discv5 = "0.7" enr = "0.12" # async diff --git a/oracle/src/exex.rs b/oracle/src/exex.rs index 08a15c0..259d7f3 100644 --- a/oracle/src/exex.rs +++ b/oracle/src/exex.rs @@ -1,5 +1,6 @@ use eyre::Result; -use futures::Future; +use futures::{Future, StreamExt}; +use reth::providers::ExecutionOutcome; use reth_exex::{ExExContext, ExExEvent, ExExNotification}; use reth_node_api::FullNodeComponents; use reth_tracing::tracing::info; @@ -10,40 +11,45 @@ use std::{ /// The ExEx struct, representing the initialization and execution of the ExEx. pub struct ExEx { - exex: ExExContext, + ctx: ExExContext, + /// Execution outcome of the chain + execution_outcome: ExecutionOutcome, } impl ExEx { - pub fn new(exex: ExExContext) -> Self { - Self { exex } + pub(crate) fn new(ctx: ExExContext) -> Self { + Self { ctx, execution_outcome: ExecutionOutcome::default() } } } impl Future for ExEx { type Output = Result<()>; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { // Continuously poll the ExExContext notifications - loop { - if let Some(notification) = ready!(self.exex.notifications.poll_recv(cx)) { - match ¬ification { - ExExNotification::ChainCommitted { new } => { - info!(committed_chain = ?new.range(), "Received commit"); - } - ExExNotification::ChainReorged { old, new } => { - info!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg"); - } - ExExNotification::ChainReverted { old } => { - info!(reverted_chain = ?old.range(), "Received revert"); - } + let this = self.get_mut(); + while let Some(notification) = ready!(this.ctx.notifications.poll_next_unpin(cx)) { + match ¬ification { + ExExNotification::ChainCommitted { new } => { + info!(committed_chain = ?new.range(), "Received commit"); } - - if let Some(committed_chain) = notification.committed_chain() { - self.exex - .events - .send(ExExEvent::FinishedHeight(committed_chain.tip().number))?; + ExExNotification::ChainReorged { old, new } => { + // revert to block before the reorg + this.execution_outcome.revert_to(new.first().number - 1); + info!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg"); + } + ExExNotification::ChainReverted { old } => { + this.execution_outcome.revert_to(old.first().number - 1); + info!(reverted_chain = ?old.range(), "Received revert"); } + }; + + if let Some(committed_chain) = notification.committed_chain() { + // extend the state with the new chain + this.execution_outcome.extend(committed_chain.execution_outcome().clone()); + this.ctx.events.send(ExExEvent::FinishedHeight(committed_chain.tip().number))?; } } + Poll::Ready(Ok(())) } } diff --git a/oracle/src/main.rs b/oracle/src/main.rs index 32878a5..e2683dc 100644 --- a/oracle/src/main.rs +++ b/oracle/src/main.rs @@ -3,6 +3,7 @@ use cli_ext::OracleExt; use exex::ExEx; use network::{proto::OracleProtoHandler, Network}; use oracle::Oracle; +use reth::args::utils::DefaultChainSpecParser; use reth_network::{protocol::IntoRlpxSubProtocol, NetworkProtocols}; use reth_node_ethereum::EthereumNode; @@ -14,7 +15,7 @@ mod oracle; const ORACLE_EXEX_ID: &str = "exex-oracle"; fn main() -> eyre::Result<()> { - reth::cli::Cli::::parse().run(|builder, args| async move { + reth::cli::Cli::::parse().run(|builder, args| async move { let tcp_port = args.tcp_port; let udp_port = args.udp_port; diff --git a/oracle/src/network/discovery.rs b/oracle/src/network/discovery.rs index 9d5fc8d..4a73e01 100644 --- a/oracle/src/network/discovery.rs +++ b/oracle/src/network/discovery.rs @@ -1,3 +1,5 @@ +#![allow(dead_code)] + use discv5::{enr::secp256k1::rand, Enr, Event, ListenConfig}; use reth::network::config::SecretKey; use reth_discv5::{enr::EnrCombinedKeyWrapper, Config, Discv5}; diff --git a/oracle/src/network/proto/mod.rs b/oracle/src/network/proto/mod.rs index af3ada7..91f617d 100644 --- a/oracle/src/network/proto/mod.rs +++ b/oracle/src/network/proto/mod.rs @@ -1,3 +1,5 @@ +#![allow(dead_code)] + use connection::{OracleCommand, OracleConnHandler}; use reth_eth_wire::{protocol::Protocol, Capability}; use reth_network::{protocol::ProtocolHandler, Direction}; From 321ac10d40243f72a1f7cfc987d70fa1afd73bc7 Mon Sep 17 00:00:00 2001 From: Luca Provini Date: Mon, 9 Sep 2024 13:32:02 +0200 Subject: [PATCH 5/6] fix: test --- oracle/src/network/discovery.rs | 19 ++++++++++++------- oracle/src/network/mod.rs | 5 ++++- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/oracle/src/network/discovery.rs b/oracle/src/network/discovery.rs index 4a73e01..f17308a 100644 --- a/oracle/src/network/discovery.rs +++ b/oracle/src/network/discovery.rs @@ -7,7 +7,7 @@ use reth_network_peers::NodeRecord; use reth_tracing::tracing::info; use std::{ future::Future, - net::{IpAddr, Ipv4Addr, SocketAddr}, + net::SocketAddr, pin::Pin, task::{ready, Context, Poll}, }; @@ -25,12 +25,12 @@ pub(crate) struct Discovery { impl Discovery { /// Starts a new discovery node. - pub(crate) async fn new(udp_port: u16, tcp_port: u16) -> eyre::Result { + pub(crate) async fn new( + disc_addr: SocketAddr, + rlpx_addr: SocketAddr, + ) -> eyre::Result { let secret_key = SecretKey::new(&mut rand::thread_rng()); - let disc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), udp_port); - let rlpx_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), tcp_port); - let config = ListenConfig::from(disc_addr); let discv5_config = Config::builder(rlpx_addr) .discv5_config(discv5::ConfigBuilder::new(config).build()) @@ -87,14 +87,19 @@ impl Future for Discovery { mod tests { use crate::network::discovery::Discovery; use reth_tracing::tracing::info; + use std::net::SocketAddr; #[tokio::test] async fn can_establish_discv5_session_with_peer() { reth_tracing::init_test_tracing(); - let mut node_1 = Discovery::new(30301, 30303).await.unwrap(); + let discv5_addr: SocketAddr = format!("127.0.0.1:30301").parse().unwrap(); + let rlpx_addr: SocketAddr = format!("127.0.0.1:30303").parse().unwrap(); + let mut node_1 = Discovery::new(discv5_addr, rlpx_addr).await.unwrap(); let node_1_enr = node_1.local_enr(); - let mut node_2 = Discovery::new(30302, 30303).await.unwrap(); + let discv5_addr: SocketAddr = format!("127.0.0.1:30302").parse().unwrap(); + let rlpx_addr: SocketAddr = format!("127.0.0.1:30303").parse().unwrap(); + let mut node_2 = Discovery::new(discv5_addr, rlpx_addr).await.unwrap(); let node_2_enr = node_2.local_enr(); diff --git a/oracle/src/network/mod.rs b/oracle/src/network/mod.rs index 727c49b..1a0f42d 100644 --- a/oracle/src/network/mod.rs +++ b/oracle/src/network/mod.rs @@ -3,6 +3,7 @@ use futures::FutureExt; use reth_tracing::tracing::{error, info}; use std::{ future::Future, + net::{IpAddr, Ipv4Addr, SocketAddr}, pin::Pin, task::{Context, Poll}, }; @@ -19,7 +20,9 @@ pub(crate) struct Network { impl Network { pub(crate) async fn new(tcp_port: u16, udp_port: u16) -> eyre::Result { - let discovery = Discovery::new(tcp_port, udp_port).await?; + let disc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), udp_port); + let rlpx_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), tcp_port); + let discovery = Discovery::new(disc_addr, rlpx_addr).await?; Ok(Self { discovery }) } } From c25bce5d5ff2ce4949a8886bfee3284642ad3ba9 Mon Sep 17 00:00:00 2001 From: Luca Provini Date: Mon, 9 Sep 2024 13:44:23 +0200 Subject: [PATCH 6/6] chore: clippy in tests --- oracle/src/network/discovery.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/oracle/src/network/discovery.rs b/oracle/src/network/discovery.rs index f17308a..8113759 100644 --- a/oracle/src/network/discovery.rs +++ b/oracle/src/network/discovery.rs @@ -92,13 +92,13 @@ mod tests { #[tokio::test] async fn can_establish_discv5_session_with_peer() { reth_tracing::init_test_tracing(); - let discv5_addr: SocketAddr = format!("127.0.0.1:30301").parse().unwrap(); - let rlpx_addr: SocketAddr = format!("127.0.0.1:30303").parse().unwrap(); + let discv5_addr: SocketAddr = "127.0.0.1:30301".to_string().parse().unwrap(); + let rlpx_addr: SocketAddr = "127.0.0.1:30303".to_string().parse().unwrap(); let mut node_1 = Discovery::new(discv5_addr, rlpx_addr).await.unwrap(); let node_1_enr = node_1.local_enr(); - let discv5_addr: SocketAddr = format!("127.0.0.1:30302").parse().unwrap(); - let rlpx_addr: SocketAddr = format!("127.0.0.1:30303").parse().unwrap(); + let discv5_addr: SocketAddr = "127.0.0.1:30302".to_string().parse().unwrap(); + let rlpx_addr: SocketAddr = "127.0.0.1:30303".to_string().parse().unwrap(); let mut node_2 = Discovery::new(discv5_addr, rlpx_addr).await.unwrap(); let node_2_enr = node_2.local_enr();