Skip to content

Commit

Permalink
added a basic test to run the oracle (#26)
Browse files Browse the repository at this point in the history
* added a basic test to run the oracle

* moving on

* remote reth

* oracle can peer

* mock data feed

* feat: e2e tests

* Update Cargo.toml

Co-authored-by: Alexey Shekhirin <a.shekhirin@gmail.com>

* Update Cargo.toml

Co-authored-by: Alexey Shekhirin <a.shekhirin@gmail.com>

* chore: tokio tungstenite as workspace dep and 2 crates under misc crates

---------

Co-authored-by: Alexey Shekhirin <a.shekhirin@gmail.com>
  • Loading branch information
loocapro and shekhirin authored Oct 29, 2024
1 parent 4b6fb9b commit 5af288e
Show file tree
Hide file tree
Showing 8 changed files with 395 additions and 161 deletions.
332 changes: 209 additions & 123 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ futures = "0.3"
futures-util = "0.3"
tokio = { version = "1.0", features = ["full"] }
tokio-stream = "0.1"
tokio-tungstenite = { version = "0.23", features = ["native-tls"] }

# serde
serde = "1"
Expand Down
11 changes: 7 additions & 4 deletions oracle/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ reth-node-ethereum.workspace = true
reth-primitives.workspace = true
reth-tracing.workspace = true
reth.workspace = true
reth-tokio-util = { git = "https://github.com/paradigmxyz/reth" }

# alloy
alloy-primitives.workspace = true
Expand All @@ -34,17 +35,19 @@ enr = "0.12"
# async
futures.workspace = true
tokio-stream.workspace = true
tokio-tungstenite = "0.23"
tokio-tungstenite.workspace = true
tokio.workspace = true


# misc
clap = "4"
eyre.workspace = true
serde.workspace = true
serde_json.workspace = true
thiserror = "1"
uuid = "1.10.0"
rand = "0.8.5"
thiserror = "1"

[dev-dependencies]
reth-exex-test-utils = { git = "https://github.com/paradigmxyz/reth" }
reth-testing-utils = { git = "https://github.com/paradigmxyz/reth" }
reth-exex-test-utils.workspace = true
reth-testing-utils.workspace = true
160 changes: 139 additions & 21 deletions oracle/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use clap::Parser;
use cli_ext::OracleExt;
use exex::ExEx;
use futures::FutureExt;
use futures::{FutureExt, Stream};
use network::{proto::OracleProtoHandler, OracleNetwork};
use offchain_data::DataFeederStream;
use offchain_data::{DataFeederError, DataFeederStream, DataFeeds};
use oracle::Oracle;
use reth::chainspec::EthereumChainSpecParser;
use reth_exex::ExExContext;
use reth_network::{protocol::IntoRlpxSubProtocol, NetworkProtocols};
use reth_node_api::FullNodeComponents;
use reth_node_ethereum::EthereumNode;

mod cli_ext;
Expand All @@ -17,6 +19,35 @@ mod oracle;

const ORACLE_EXEX_ID: &str = "exex-oracle";

/// Helper function to start the oracle stack.
async fn start<Node: FullNodeComponents, D>(
ctx: ExExContext<Node>,
tcp_port: u16,
udp_port: u16,
data_feeder: D,
) -> eyre::Result<(Oracle<Node, D>, Node::Network)>
where
Node::Network: NetworkProtocols,
D: Stream<Item = Result<DataFeeds, DataFeederError>> + Send + 'static,
{
// Define the oracle subprotocol
let (subproto, proto_events, to_peers) = OracleProtoHandler::new();
// Add it to the network as a subprotocol
let net = ctx.network().clone();
net.add_rlpx_sub_protocol(subproto.into_rlpx_sub_protocol());

// The instance of the execution extension that will handle chain events
let exex = ExEx::new(ctx);

// The instance of the oracle network that will handle discovery and gossiping of data
let network = OracleNetwork::new(proto_events, tcp_port, udp_port).await?;

// The oracle instance that will orchestrate the network, the execution extensions,
// the off-chain data stream, and the gossiping
let oracle = Oracle::new(exex, network, data_feeder, to_peers);
Ok((oracle, net.clone()))
}

fn main() -> eyre::Result<()> {
reth::cli::Cli::<EthereumChainSpecParser, OracleExt>::parse().run(|builder, args| async move {
let tcp_port = args.tcp_port;
Expand All @@ -36,26 +67,8 @@ fn main() -> eyre::Result<()> {
// Source: https://github.com/vados-cosmonic/wasmCloud/commit/440e8c377f6b02f45eacb02692e4d2fabd53a0ec
tokio::task::spawn_blocking(move || {
tokio::runtime::Handle::current().block_on(async move {
// define the oracle subprotocol
let (subproto, proto_events, to_peers) = OracleProtoHandler::new();
// add it to the network as a subprotocol
ctx.network().add_rlpx_sub_protocol(subproto.into_rlpx_sub_protocol());

// the instance of the execution extension that will handle chain events
let exex = ExEx::new(ctx);

// the instance of the oracle network that will handle discovery and
// gossiping of data
let network = OracleNetwork::new(proto_events, tcp_port, udp_port).await?;
// the off-chain data feed stream
let data_feed = DataFeederStream::new(args.binance_symbols).await?;

// the oracle instance that will orchestrate the network, the execution
// extensions, the offchain data stream and the
// gossiping the oracle will always sign and
// broadcast data via the channel until a peer is
// subcribed to it
let oracle = Oracle::new(exex, network, data_feed, to_peers);
let (oracle, _net) = start(ctx, tcp_port, udp_port, data_feed).await?;
Ok(oracle)
})
})
Expand All @@ -67,3 +80,108 @@ fn main() -> eyre::Result<()> {
handle.wait_for_node_exit().await
})
}

#[cfg(test)]
mod tests {
use crate::{offchain_data::binance::ticker::Ticker, start};
use futures::{Stream, StreamExt};
use reth_exex_test_utils::test_exex_context;
use reth_network::{NetworkEvent, NetworkEventListenerProvider, NetworkInfo, Peers};
use reth_network_api::PeerId;
use reth_tokio_util::EventStream;
use reth_tracing::tracing::info;
use tokio_stream::wrappers::BroadcastStream;

async fn wait_for_session(mut events: EventStream<NetworkEvent>) -> PeerId {
while let Some(event) = events.next().await {
if let NetworkEvent::SessionEstablished { peer_id, .. } = event {
info!("Session established with {}", peer_id);
return peer_id;
}
info!("Unexpected event: {:?}", event);
}

unreachable!()
}

use crate::offchain_data::{DataFeederError, DataFeeds};
use futures::stream::{self};
use std::pin::Pin;

fn mock_stream() -> Pin<Box<dyn Stream<Item = Result<DataFeeds, DataFeederError>> + Send>> {
let ticker = Ticker {
event_type: "24hrTicker".to_string(),
event_time: 1698323450000,
symbol: "BTCUSDT".to_string(),
price_change: "100.00".to_string(),
price_change_percent: "2.5".to_string(),
weighted_avg_price: "40200.00".to_string(),
prev_close_price: "40000.00".to_string(),
last_price: "40100.00".to_string(),
last_quantity: "0.5".to_string(),
best_bid_price: "40095.00".to_string(),
best_bid_quantity: "1.0".to_string(),
best_ask_price: "40105.00".to_string(),
best_ask_quantity: "1.0".to_string(),
open_price: "39900.00".to_string(),
high_price: "40500.00".to_string(),
low_price: "39800.00".to_string(),
volume: "1500".to_string(),
quote_volume: "60000000".to_string(),
open_time: 1698237050000,
close_time: 1698323450000,
first_trade_id: 1,
last_trade_id: 2000,
num_trades: 2000,
};

// Wrap the Ticker in DataFeeds::Binance
let data_feed = DataFeeds::Binance(ticker);

// Create a stream that sends a single item and then ends, boxed and pinned
Box::pin(stream::once(async { Ok(data_feed) }))
}

#[tokio::test]
async fn e2e_oracles() {
reth_tracing::init_test_tracing();

// spawn first instance
let (ctx_1, _handle) = test_exex_context().await.unwrap();
let data_feed1 = mock_stream();
let (oracle_1, network_1) = start(ctx_1, 30303, 30304, data_feed1).await.unwrap();
let mut broadcast_stream_1 = BroadcastStream::new(oracle_1.signed_ticks().subscribe());
let signer_1 = oracle_1.signer().address();
tokio::spawn(oracle_1);
let net_1_events = network_1.event_listener();

// spawn second instance
let (ctx_2, _handle) = test_exex_context().await.unwrap();
let data_feed2 = mock_stream();
let (oracle_2, network_2) = start(ctx_2, 30305, 30306, data_feed2).await.unwrap();
let mut broadcast_stream_2 = BroadcastStream::new(oracle_2.signed_ticks().subscribe());
let signer_2 = oracle_2.signer().address();
tokio::spawn(oracle_2);
let net_2_events = network_2.event_listener();

// expect peers connected
let (peer_2, addr_2) = (network_2.peer_id(), network_2.local_addr());
network_1.add_peer(*peer_2, addr_2);
let expected_peer_2 = wait_for_session(net_1_events).await;
assert_eq!(expected_peer_2, *peer_2);

let (peer_1, addr_1) = (network_1.peer_id(), network_1.local_addr());
network_2.add_peer(*peer_1, addr_1);
let expected_peer_1 = wait_for_session(net_2_events).await;
assert_eq!(expected_peer_1, *peer_1);

// expect signed data
let signed_ticker_1 = broadcast_stream_1.next().await.unwrap().unwrap();
assert_eq!(signed_ticker_1.ticker.symbol, "BTCUSDT");
assert_eq!(signed_ticker_1.signer, signer_1);

let signed_ticker_2 = broadcast_stream_2.next().await.unwrap().unwrap();
assert_eq!(signed_ticker_2.ticker.symbol, "BTCUSDT");
assert_eq!(signed_ticker_2.signer, signer_2);
}
}
2 changes: 1 addition & 1 deletion oracle/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl Future for OracleNetwork {
"Established connection, will start gossiping"
);
}
None => return Poll::Ready(Ok(())),
None => return Poll::Pending,
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions oracle/src/network/proto/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use reth_eth_wire::{
use reth_network::protocol::{ConnectionHandler, OnNotSupported};
use reth_network_api::Direction;
use reth_network_peers::PeerId;
use reth_tracing::tracing::trace;
use std::{
collections::HashMap,
pin::Pin,
Expand Down Expand Up @@ -59,6 +60,8 @@ impl Stream for OracleConnection {
}

if let Poll::Ready(Some(Ok(tick))) = this.signed_ticks.poll_next_unpin(cx) {
let signer = tick.signer;
trace!(target: "oracle::conn", ?signer, "Received signed tick data.");
return Poll::Ready(Some(
OracleProtoMessage::signed_ticker(Box::new(tick)).encoded(),
));
Expand Down Expand Up @@ -142,6 +145,7 @@ impl ConnectionHandler for OracleConnHandler {
.events
.send(ProtocolEvent::Established { direction, peer_id, to_connection: tx })
.ok();
trace!(target: "oracle::conn", "Connection established.");
OracleConnection {
conn,
initial_ping: direction.is_outgoing().then(OracleProtoMessage::ping),
Expand Down
4 changes: 2 additions & 2 deletions oracle/src/offchain_data/binance/feeder.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use futures::{ready, Stream, StreamExt};
use reth_tracing::tracing::error;
use reth_tracing::tracing::{error, trace};
use std::{
pin::Pin,
task::{Context, Poll},
Expand Down Expand Up @@ -96,7 +96,7 @@ impl Stream for BinanceDataFeeder {
return Poll::Pending;
}
};

trace!(target: "oracle::binance", ?msg, "Received message");
Poll::Ready(Some(Ok(msg.data)))
}
Some(Err(e)) => {
Expand Down
42 changes: 32 additions & 10 deletions oracle/src/oracle.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use crate::{
exex::ExEx,
network::{proto::data::SignedTicker, OracleNetwork},
offchain_data::{DataFeederStream, DataFeeds},
offchain_data::{DataFeederError, DataFeeds},
};
use alloy_rlp::{BytesMut, Encodable};
use alloy_signer::SignerSync;
use alloy_signer_local::PrivateKeySigner;
use futures::{FutureExt, StreamExt};
use futures::{FutureExt, Stream, StreamExt};
use reth_node_api::FullNodeComponents;
use reth_tracing::tracing::{error, info};
use reth_tracing::tracing::{error, info, trace};
use std::{
future::Future,
pin::Pin,
Expand All @@ -17,32 +17,53 @@ use std::{

/// The Oracle struct is a long running task that orchestrates discovery of new peers,
/// decoding data from chain events (ExEx) and gossiping it to peers.
pub(crate) struct Oracle<Node: FullNodeComponents> {
pub(crate) struct Oracle<Node: FullNodeComponents, D>
where
D: Stream<Item = Result<DataFeeds, DataFeederError>> + Send + 'static,
{
/// The network task for this node.
/// It is composed by a discovery task and a sub protocol RLPx task.
network: OracleNetwork,
/// The execution extension task for this node.
exex: ExEx<Node>,
/// The offchain data feed stream.
data_feed: DataFeederStream,
data_feed: D,
/// The signer to sign the data feed.
signer: PrivateKeySigner,
/// Half of the broadcast channel to send data to connected peers.
to_peers: tokio::sync::broadcast::Sender<SignedTicker>,
}

impl<Node: FullNodeComponents> Oracle<Node> {
impl<Node: FullNodeComponents, D> Oracle<Node, D>
where
D: Stream<Item = Result<DataFeeds, DataFeederError>> + Send + 'static,
{
pub(crate) fn new(
exex: ExEx<Node>,
network: OracleNetwork,
data_feed: DataFeederStream,
data_feed: D,
to_peers: tokio::sync::broadcast::Sender<SignedTicker>,
) -> Self {
Self { exex, network, data_feed, signer: PrivateKeySigner::random(), to_peers }
}

/// Returns the signer used by the oracle.
#[allow(dead_code)]
pub(crate) fn signer(&self) -> &PrivateKeySigner {
&self.signer
}

/// Returns the signed ticker broadcast channel.
#[allow(dead_code)]
pub(crate) fn signed_ticks(&self) -> &tokio::sync::broadcast::Sender<SignedTicker> {
&self.to_peers
}
}

impl<Node: FullNodeComponents> Future for Oracle<Node> {
impl<Node: FullNodeComponents, D> Future for Oracle<Node, D>
where
D: Stream<Item = Result<DataFeeds, DataFeederError>> + Send + 'static + std::marker::Unpin,
{
type Output = eyre::Result<()>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Expand Down Expand Up @@ -74,8 +95,9 @@ impl<Node: FullNodeComponents> Future for Oracle<Node> {
let signature = this.signer.sign_message_sync(&buffer)?;
let signed_ticker = SignedTicker::new(ticker, signature, this.signer.address());

if let Err(err) = this.to_peers.send(signed_ticker.clone()) {
error!(?err, "Failed to send ticker to gossip, no peers connected");
if this.to_peers.send(signed_ticker.clone()).is_ok() {
let signer = signed_ticker.signer;
trace!(target: "oracle", ?signer, "Sent signed ticker");
}
}
Some(Err(e)) => {
Expand Down

0 comments on commit 5af288e

Please sign in to comment.