Skip to content

Commit

Permalink
Merge pull request #18 from loocapro/exex-oracle
Browse files Browse the repository at this point in the history
Oracle ExEx
  • Loading branch information
mattsse authored Sep 9, 2024
2 parents 52bc6dc + c25bce5 commit 123eb77
Show file tree
Hide file tree
Showing 11 changed files with 708 additions and 8 deletions.
25 changes: 25 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 15 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
[workspace]
members = [
"backfill",
"discv5",
"in-memory-state",
"minimal",
"op-bridge",
"remote",
"rollup",
"backfill",
"discv5",
"in-memory-state",
"minimal",
"oracle",
"op-bridge",
"remote",
"rollup",
]
resolver = "2"

Expand All @@ -25,15 +26,21 @@ reth-discv5 = { git = "https://github.com/paradigmxyz/reth" }
reth-execution-errors = { git = "https://github.com/paradigmxyz/reth" }
reth-execution-types = { git = "https://github.com/paradigmxyz/reth" }
reth-exex = { git = "https://github.com/paradigmxyz/reth", features = ["serde"] }
reth-eth-wire = { git = "https://github.com/paradigmxyz/reth" }
reth-evm = { git = "https://github.com/paradigmxyz/reth" }
reth-network = { git = "https://github.com/paradigmxyz/reth" }
reth-network-api = { git = "https://github.com/paradigmxyz/reth" }
reth-network-peers = { git = "https://github.com/paradigmxyz/reth" }
reth-node-api = { git = "https://github.com/paradigmxyz/reth" }
reth-node-ethereum = { git = "https://github.com/paradigmxyz/reth" }
reth-primitives = { git = "https://github.com/paradigmxyz/reth" }
reth-provider = { git = "https://github.com/paradigmxyz/reth" }
reth-revm = { git = "https://github.com/paradigmxyz/reth" }
reth-evm = { git = "https://github.com/paradigmxyz/reth" }
reth-rpc-types = { git = "https://github.com/paradigmxyz/reth" }
reth-tracing = { git = "https://github.com/paradigmxyz/reth" }



# alloy
alloy-sol-types = { version = "0.8", features = ["json"] }

Expand Down
36 changes: 36 additions & 0 deletions oracle/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
[package]
name = "oracle"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
publish.workspace = true

[dependencies]

# reth
reth-discv5.workspace = true
reth-eth-wire.workspace = true
reth-exex.workspace = true
reth-network-api.workspace = true
reth-network-peers.workspace = true
reth-network.workspace = true
reth-node-api.workspace = true
reth-node-ethereum.workspace = true
reth-primitives.workspace = true
reth-rpc-types.workspace = true
reth-tracing.workspace = true
reth.workspace = true

# networking
discv5 = "0.7"
enr = "0.12"

# async
futures.workspace = true
tokio.workspace = true
tokio-stream.workspace = true

# misc
clap = "4"
eyre.workspace = true
15 changes: 15 additions & 0 deletions oracle/src/cli_ext.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use clap::Args;

pub const DEFAULT_DISCOVERY_PORT: u16 = 30304;
pub const DEFAULT_RLPX_PORT: u16 = 30303;

#[derive(Debug, Clone, Args)]
pub(crate) struct OracleExt {
/// TCP port used by RLPx
#[clap(long = "disc.tcp-port", default_value_t = DEFAULT_RLPX_PORT)]
pub tcp_port: u16,

/// UDP port used for discovery
#[clap(long = "disc.udp-port", default_value_t = DEFAULT_DISCOVERY_PORT)]
pub udp_port: u16,
}
55 changes: 55 additions & 0 deletions oracle/src/exex.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use eyre::Result;
use futures::{Future, StreamExt};
use reth::providers::ExecutionOutcome;
use reth_exex::{ExExContext, ExExEvent, ExExNotification};
use reth_node_api::FullNodeComponents;
use reth_tracing::tracing::info;
use std::{
pin::Pin,
task::{ready, Context, Poll},
};

/// The ExEx struct, representing the initialization and execution of the ExEx.
pub struct ExEx<Node: FullNodeComponents> {
ctx: ExExContext<Node>,
/// Execution outcome of the chain
execution_outcome: ExecutionOutcome,
}

impl<Node: FullNodeComponents> ExEx<Node> {
pub(crate) fn new(ctx: ExExContext<Node>) -> Self {
Self { ctx, execution_outcome: ExecutionOutcome::default() }
}
}

impl<Node: FullNodeComponents> Future for ExEx<Node> {
type Output = Result<()>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Continuously poll the ExExContext notifications
let this = self.get_mut();
while let Some(notification) = ready!(this.ctx.notifications.poll_next_unpin(cx)) {
match &notification {
ExExNotification::ChainCommitted { new } => {
info!(committed_chain = ?new.range(), "Received commit");
}
ExExNotification::ChainReorged { old, new } => {
// revert to block before the reorg
this.execution_outcome.revert_to(new.first().number - 1);
info!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg");
}
ExExNotification::ChainReverted { old } => {
this.execution_outcome.revert_to(old.first().number - 1);
info!(reverted_chain = ?old.range(), "Received revert");
}
};

if let Some(committed_chain) = notification.committed_chain() {
// extend the state with the new chain
this.execution_outcome.extend(committed_chain.execution_outcome().clone());
this.ctx.events.send(ExExEvent::FinishedHeight(committed_chain.tip().number))?;
}
}
Poll::Ready(Ok(()))
}
}
38 changes: 38 additions & 0 deletions oracle/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use clap::Parser;
use cli_ext::OracleExt;
use exex::ExEx;
use network::{proto::OracleProtoHandler, Network};
use oracle::Oracle;
use reth::args::utils::DefaultChainSpecParser;
use reth_network::{protocol::IntoRlpxSubProtocol, NetworkProtocols};
use reth_node_ethereum::EthereumNode;

mod cli_ext;
mod exex;
mod network;
mod oracle;

const ORACLE_EXEX_ID: &str = "exex-oracle";

fn main() -> eyre::Result<()> {
reth::cli::Cli::<DefaultChainSpecParser, OracleExt>::parse().run(|builder, args| async move {
let tcp_port = args.tcp_port;
let udp_port = args.udp_port;

let handle = builder
.node(EthereumNode::default())
.install_exex(ORACLE_EXEX_ID, move |ctx| async move {
let subproto = OracleProtoHandler::new();
ctx.network().add_rlpx_sub_protocol(subproto.into_rlpx_sub_protocol());

let exex = ExEx::new(ctx);
let network = Network::new(tcp_port, udp_port).await?;
let oracle = Oracle::new(exex, network).await?;
Ok(oracle)
})
.launch()
.await?;

handle.wait_for_node_exit().await
})
}
138 changes: 138 additions & 0 deletions oracle/src/network/discovery.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
#![allow(dead_code)]

use discv5::{enr::secp256k1::rand, Enr, Event, ListenConfig};
use reth::network::config::SecretKey;
use reth_discv5::{enr::EnrCombinedKeyWrapper, Config, Discv5};
use reth_network_peers::NodeRecord;
use reth_tracing::tracing::info;
use std::{
future::Future,
net::SocketAddr,
pin::Pin,
task::{ready, Context, Poll},
};
use tokio::sync::mpsc;

/// Helper struct to manage a discovery node using discv5.
pub(crate) struct Discovery {
/// The inner discv5 instance.
inner: Discv5,
/// The node record of the discv5 instance.
node_record: NodeRecord,
/// The events stream of the discv5 instance.
events: mpsc::Receiver<discv5::Event>,
}

impl Discovery {
/// Starts a new discovery node.
pub(crate) async fn new(
disc_addr: SocketAddr,
rlpx_addr: SocketAddr,
) -> eyre::Result<Discovery> {
let secret_key = SecretKey::new(&mut rand::thread_rng());

let config = ListenConfig::from(disc_addr);
let discv5_config = Config::builder(rlpx_addr)
.discv5_config(discv5::ConfigBuilder::new(config).build())
.build();

let (discv5, events, node_record) = Discv5::start(&secret_key, discv5_config).await?;
Ok(Self { inner: discv5, events, node_record })
}

/// Adds a node to the table if its not already present.
pub(crate) fn add_node(&mut self, enr: Enr) -> eyre::Result<()> {
let reth_enr: enr::Enr<SecretKey> = EnrCombinedKeyWrapper(enr.clone()).into();
self.inner.add_node(reth_enr)?;
Ok(())
}

/// Returns the local ENR of the discv5 node.
pub(crate) fn local_enr(&self) -> Enr {
self.inner.with_discv5(|discv5| discv5.local_enr())
}

/// Returns true if the discv5 node has connected peers.
pub(crate) fn has_peers(&self) -> bool {
self.inner.with_discv5(|discv5| discv5.connected_peers() > 0)
}
}

impl Future for Discovery {
type Output = eyre::Result<()>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut();
loop {
match ready!(this.events.poll_recv(cx)) {
Some(evt) => match evt {
Event::Discovered(enr) => {
info!(?enr, "Discovered a new peer.");
this.add_node(enr)?;
}
Event::SessionEstablished(enr, socket_addr) => {
info!(?enr, ?socket_addr, "Session established with a new peer.");
}
evt => {
info!(?evt, "New discovery event.");
}
},
None => return Poll::Ready(Ok(())),
}
}
}
}

#[cfg(test)]
mod tests {
use crate::network::discovery::Discovery;
use reth_tracing::tracing::info;
use std::net::SocketAddr;

#[tokio::test]
async fn can_establish_discv5_session_with_peer() {
reth_tracing::init_test_tracing();
let discv5_addr: SocketAddr = "127.0.0.1:30301".to_string().parse().unwrap();
let rlpx_addr: SocketAddr = "127.0.0.1:30303".to_string().parse().unwrap();
let mut node_1 = Discovery::new(discv5_addr, rlpx_addr).await.unwrap();
let node_1_enr = node_1.local_enr();

let discv5_addr: SocketAddr = "127.0.0.1:30302".to_string().parse().unwrap();
let rlpx_addr: SocketAddr = "127.0.0.1:30303".to_string().parse().unwrap();
let mut node_2 = Discovery::new(discv5_addr, rlpx_addr).await.unwrap();

let node_2_enr = node_2.local_enr();

info!(?node_1_enr, ?node_2_enr, "Started discovery nodes.");

// add node_2 to node_1 table
node_1.add_node(node_2_enr.clone()).unwrap();

// verify node_2 is in node_1 table
assert!(node_1
.inner
.with_discv5(|discv5| discv5.table_entries_id().contains(&node_2_enr.node_id())));

// send ping from node_1 to node_2
node_1.inner.with_discv5(|discv5| discv5.send_ping(node_2_enr.clone())).await.unwrap();

// verify they both established a session
let event_2_v5 = node_2.events.recv().await.unwrap();
let event_1_v5 = node_1.events.recv().await.unwrap();
assert!(matches!(
event_1_v5,
discv5::Event::SessionEstablished(node, socket) if node == node_2_enr && socket == node_2_enr.udp4_socket().unwrap().into()
));
assert!(matches!(
event_2_v5,
discv5::Event::SessionEstablished(node, socket) if node == node_1_enr && socket == node_1_enr.udp4_socket().unwrap().into()
));

// verify node_1 is in
let event_2_v5 = node_2.events.recv().await.unwrap();
assert!(matches!(
event_2_v5,
discv5::Event::NodeInserted { node_id, replaced } if node_id == node_1_enr.node_id() && replaced.is_none()
));
}
}
Loading

0 comments on commit 123eb77

Please sign in to comment.