Skip to content

Commit

Permalink
mock data feed
Browse files Browse the repository at this point in the history
  • Loading branch information
loocapro committed Oct 26, 2024
1 parent 68b0398 commit 997c715
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 25 deletions.
37 changes: 19 additions & 18 deletions oracle/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use clap::Parser;
use cli_ext::OracleExt;
use exex::ExEx;
use futures::FutureExt;
use futures::{FutureExt, Stream};
use network::{proto::OracleProtoHandler, OracleNetwork};
use offchain_data::DataFeederStream;
use offchain_data::{DataFeederError, DataFeederStream, DataFeeds};
use oracle::Oracle;
use reth::chainspec::EthereumChainSpecParser;
use reth_exex::ExExContext;
Expand All @@ -20,14 +20,15 @@ mod oracle;
const ORACLE_EXEX_ID: &str = "exex-oracle";

/// Helper function to start the oracle stack.
async fn start<Node: FullNodeComponents>(
async fn start<Node: FullNodeComponents, D>(
ctx: ExExContext<Node>,
tcp_port: u16,
udp_port: u16,
binance_symbols: Vec<String>,
) -> eyre::Result<(Oracle<Node>, Node::Network)>
data_feeder: D,
) -> eyre::Result<(Oracle<Node, D>, Node::Network)>
where
Node::Network: NetworkProtocols,
D: Stream<Item = Result<DataFeeds, DataFeederError>> + Send + 'static,
{
// Define the oracle subprotocol
let (subproto, proto_events, to_peers) = OracleProtoHandler::new();
Expand All @@ -40,12 +41,10 @@ where

// The instance of the oracle network that will handle discovery and gossiping of data
let network = OracleNetwork::new(proto_events, tcp_port, udp_port).await?;
// The off-chain data feed stream
let data_feed = DataFeederStream::new(binance_symbols).await?;

// The oracle instance that will orchestrate the network, the execution extensions,
// the off-chain data stream, and the gossiping
let oracle = Oracle::new(exex, network, data_feed, to_peers);
let oracle = Oracle::new(exex, network, data_feeder, to_peers);
Ok((oracle, net.clone()))
}

Expand All @@ -68,8 +67,8 @@ fn main() -> eyre::Result<()> {
// Source: https://github.com/vados-cosmonic/wasmCloud/commit/440e8c377f6b02f45eacb02692e4d2fabd53a0ec
tokio::task::spawn_blocking(move || {
tokio::runtime::Handle::current().block_on(async move {
let (oracle, _net) =
start(ctx, tcp_port, udp_port, args.binance_symbols).await?;
let data_feed = DataFeederStream::new(args.binance_symbols).await?;
let (oracle, _net) = start(ctx, tcp_port, udp_port, data_feed).await?;
Ok(oracle)
})
})
Expand Down Expand Up @@ -104,25 +103,27 @@ mod tests {
unreachable!()
}

fn mock_stream() -> impl StreamExt<
Item = Result<crate::offchain_data::DataFeeds, crate::offchain_data::DataFeederError>,
> {
futures::stream::empty()
}

#[tokio::test]
async fn oracles_can_peer() {
reth_tracing::init_test_tracing();

// spawn first instance
let (ctx_1, _handle) = test_exex_context().await.unwrap();
let (oracle_1, network_1) =
start(ctx_1, 30303, 30304, vec!["btcusdc".to_string(), "ethusdc".to_string()])
.await
.unwrap();
let data_feed1 = mock_stream();
let (oracle_1, network_1) = start(ctx_1, 30303, 30304, data_feed1).await.unwrap();
tokio::spawn(oracle_1);
let net_1_events = network_1.event_listener();

// spawn second instance
let (ctx_2, _handle) = test_exex_context().await.unwrap();
let (oracle_2, network_2) =
start(ctx_2, 30305, 30306, vec!["btcusdc".to_string(), "ethusdc".to_string()])
.await
.unwrap();
let data_feed2 = mock_stream();
let (oracle_2, network_2) = start(ctx_2, 30305, 30306, data_feed2).await.unwrap();
tokio::spawn(oracle_2);
let net_2_events = network_2.event_listener();

Expand Down
23 changes: 16 additions & 7 deletions oracle/src/oracle.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use crate::{
exex::ExEx,
network::{proto::data::SignedTicker, OracleNetwork},
offchain_data::{DataFeederStream, DataFeeds},
offchain_data::{DataFeederError, DataFeeds},
};
use alloy_rlp::{BytesMut, Encodable};
use alloy_signer::SignerSync;
use alloy_signer_local::PrivateKeySigner;
use futures::{FutureExt, StreamExt};
use futures::{FutureExt, Stream, StreamExt};
use reth_node_api::FullNodeComponents;
use reth_tracing::tracing::{error, info, trace};
use std::{
Expand All @@ -17,32 +17,41 @@ use std::{

/// The Oracle struct is a long running task that orchestrates discovery of new peers,
/// decoding data from chain events (ExEx) and gossiping it to peers.
pub(crate) struct Oracle<Node: FullNodeComponents> {
pub(crate) struct Oracle<Node: FullNodeComponents, D>
where
D: Stream<Item = Result<DataFeeds, DataFeederError>> + Send + 'static,
{
/// The network task for this node.
/// It is composed by a discovery task and a sub protocol RLPx task.
network: OracleNetwork,
/// The execution extension task for this node.
exex: ExEx<Node>,
/// The offchain data feed stream.
data_feed: DataFeederStream,
data_feed: D,
/// The signer to sign the data feed.
signer: PrivateKeySigner,
/// Half of the broadcast channel to send data to connected peers.
to_peers: tokio::sync::broadcast::Sender<SignedTicker>,
}

impl<Node: FullNodeComponents> Oracle<Node> {
impl<Node: FullNodeComponents, D> Oracle<Node, D>
where
D: Stream<Item = Result<DataFeeds, DataFeederError>> + Send + 'static,
{
pub(crate) fn new(
exex: ExEx<Node>,
network: OracleNetwork,
data_feed: DataFeederStream,
data_feed: D,
to_peers: tokio::sync::broadcast::Sender<SignedTicker>,
) -> Self {
Self { exex, network, data_feed, signer: PrivateKeySigner::random(), to_peers }
}
}

impl<Node: FullNodeComponents> Future for Oracle<Node> {
impl<Node: FullNodeComponents, D> Future for Oracle<Node, D>
where
D: Stream<Item = Result<DataFeeds, DataFeederError>> + Send + 'static + std::marker::Unpin,
{
type Output = eyre::Result<()>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Expand Down

0 comments on commit 997c715

Please sign in to comment.