From f53cd7c98699a0c6ea496131934f487a15645be7 Mon Sep 17 00:00:00 2001 From: Brandon Kite Date: Thu, 13 Oct 2022 12:29:24 -0700 Subject: [PATCH 1/4] use finalized_da_height in the naive block producer until the real one is wired up --- fuel-core/src/executor.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/fuel-core/src/executor.rs b/fuel-core/src/executor.rs index 83e406deb1b..066f6838c28 100644 --- a/fuel-core/src/executor.rs +++ b/fuel-core/src/executor.rs @@ -60,6 +60,7 @@ use fuel_core_interfaces::{ FuelBlockHeader, Message, }, + relayer::RelayerDb, }; use fuel_storage::{ StorageAsMut, @@ -103,6 +104,7 @@ impl Executor { // setup and execute block let current_height = db.get_block_height()?.unwrap_or_default(); let current_hash = db.get_block_id(current_height)?.unwrap_or_default(); + let da_height = db.get_finalized_da_height().await.unwrap_or_default(); let new_block_height = current_height + 1u32.into(); let mut block = FuelBlock { @@ -110,6 +112,7 @@ impl Executor { height: new_block_height, parent_hash: current_hash, time: Utc::now(), + da_height, ..Default::default() }, transactions: txs.into_iter().map(|t| t.as_ref().clone()).collect(), From ef9f0da676cb86320cba48fac3017a4054626d9f Mon Sep 17 00:00:00 2001 From: Brandon Kite Date: Thu, 13 Oct 2022 14:38:44 -0700 Subject: [PATCH 2/4] update integ tests to verify downloaded msgs can be spent --- fuel-core/src/service.rs | 18 ++ fuel-tests/tests/relayer.rs | 355 +++++++++++++++++++++++++++--------- 2 files changed, 286 insertions(+), 87 deletions(-) diff --git a/fuel-core/src/service.rs b/fuel-core/src/service.rs index a9536e06914..6165b30ff25 100644 --- a/fuel-core/src/service.rs +++ b/fuel-core/src/service.rs @@ -105,4 +105,22 @@ impl FuelService { } self.modules.stop().await; } + + /// Wait for the [`Relayer`] to be in sync with + /// the data availability layer. + /// + /// Yields until the relayer reaches a point where it + /// considered up to date. Note that there's no guarantee + /// the relayer will ever catch up to the da layer and + /// may fall behind immediately after this future completes. + /// + /// The only guarantee is that if this future completes then + /// the relayer did reach consistency with the da layer for + /// some period of time. + pub async fn await_relayer_synced(&self) -> anyhow::Result<()> { + if let Some(relayer_handle) = &self.modules.relayer { + relayer_handle.await_synced().await?; + } + Ok(()) + } } diff --git a/fuel-tests/tests/relayer.rs b/fuel-tests/tests/relayer.rs index fa282058228..d999c5578b4 100644 --- a/fuel-tests/tests/relayer.rs +++ b/fuel-tests/tests/relayer.rs @@ -1,10 +1,7 @@ -use std::{ - net::Ipv4Addr, - sync::Arc, - time::Duration, +use ethers::{ + providers::Middleware, + types::Log, }; - -use ethers::providers::Middleware; use fuel_core::{ database::Database, service::{ @@ -12,13 +9,41 @@ use fuel_core::{ FuelService, }, }; -use fuel_core_interfaces::db::Messages; -use fuel_gql_client::prelude::StorageAsRef; - -use fuel_relayer::test_helpers::{ - middleware::MockMiddleware, - EvtToLog, - LogTestHelper, +use fuel_core_interfaces::{ + common::{ + fuel_crypto::SecretKey, + fuel_tx::{ + Input, + TransactionBuilder, + }, + fuel_types::MessageId, + prelude::{ + Address, + Output, + }, + }, + db::Messages, +}; +use fuel_gql_client::{ + client::{ + types::TransactionStatus, + FuelClient, + PageDirection, + PaginationRequest, + }, + fuel_tx::AssetId, + prelude::{ + Opcode, + StorageAsRef, + }, +}; +use fuel_relayer::{ + test_helpers::{ + middleware::MockMiddleware, + EvtToLog, + LogTestHelper, + }, + H160, }; use hyper::{ service::{ @@ -30,62 +55,21 @@ use hyper::{ Response, Server, }; +use rand::{ + prelude::StdRng, + Rng, + SeedableRng, +}; use serde_json::json; use std::{ convert::Infallible, - net::SocketAddr, + net::{ + Ipv4Addr, + SocketAddr, + }, + sync::Arc, }; - -async fn handle( - mock: Arc, - req: Request, -) -> Result, Infallible> { - let body = hyper::body::to_bytes(req).await.unwrap(); - - let v: serde_json::Value = serde_json::from_slice(body.as_ref()).unwrap(); - let mut o = match v { - serde_json::Value::Object(o) => o, - _ => unreachable!(), - }; - let id = o.get("id").unwrap().as_u64().unwrap(); - let method = o.get("method").unwrap().as_str().unwrap(); - let r = match method { - "eth_blockNumber" => { - let r = mock.get_block_number().await.unwrap(); - json!({ "id": id, "jsonrpc": "2.0", "result": r }) - } - "eth_syncing" => { - let r = mock.syncing().await.unwrap(); - match r { - ethers::providers::SyncingStatus::IsFalse => { - json!({ "id": id, "jsonrpc": "2.0", "result": false }) - } - ethers::providers::SyncingStatus::IsSyncing { - starting_block, - current_block, - highest_block, - } => { - json!({ "id": id, "jsonrpc": "2.0", "result": { - "starting_block": starting_block, - "current_block": current_block, - "highest_block": highest_block, - } }) - } - } - } - "eth_getLogs" => { - let params = o.remove("params").unwrap(); - let params: Vec<_> = serde_json::from_value(params).unwrap(); - let r = mock.get_logs(¶ms[0]).await.unwrap(); - json!({ "id": id, "jsonrpc": "2.0", "result": r }) - } - _ => unreachable!(), - }; - - let r = serde_json::to_vec(&r).unwrap(); - - Ok(Response::new(Body::from(r))) -} +use tokio::sync::oneshot::Sender; #[tokio::test(flavor = "multi_thread")] async fn relayer_can_download_logs() { @@ -93,14 +77,15 @@ async fn relayer_can_download_logs() { let eth_node = MockMiddleware::default(); let contract_address = config.relayer.eth_v2_listening_contracts[0]; let message = |nonce, block_number: u64| { - let message = fuel_relayer::bridge::SentMessageFilter { + make_message_event( nonce, - ..Default::default() - }; - let mut log = message.into_log(); - log.address = contract_address; - log.block_number = Some(block_number.into()); - log + block_number, + contract_address, + None, + None, + None, + None, + ) }; let logs = vec![message(1, 3), message(2, 5)]; @@ -110,7 +95,161 @@ async fn relayer_can_download_logs() { // will be some finalized blocks. eth_node.update_data(|data| data.best_block.number = Some(200.into())); let eth_node = Arc::new(eth_node); + let eth_node_handle = spawn_eth_node(eth_node).await; + + config.relayer.eth_client = Some( + format!("http://{}", eth_node_handle.address) + .as_str() + .try_into() + .unwrap(), + ); + let db = Database::in_memory(); + + let srv = FuelService::from_database(db.clone(), config) + .await + .unwrap(); + + // wait for relayer to catch up + srv.await_relayer_synced().await.unwrap(); + + // check the db for downloaded messages + for msg in expected_messages { + assert_eq!( + &*db.storage::().get(msg.id()).unwrap().unwrap(), + &*msg + ); + } + srv.stop().await; + eth_node_handle.shutdown.send(()).unwrap(); +} +#[tokio::test(flavor = "multi_thread")] +async fn messages_are_spendable_after_relayer_is_synced() { + let mut rng = StdRng::seed_from_u64(1234); + let mut config = Config::local_node(); + let eth_node = MockMiddleware::default(); + let contract_address = config.relayer.eth_v2_listening_contracts[0]; + + // setup a real spendable message + let secret_key: SecretKey = rng.gen(); + let pk = secret_key.public_key(); + let recipient = Input::owner(&pk); + let sender = Address::zeroed(); + let amount = 100; + let nonce = 2; + let logs = vec![make_message_event( + nonce, + 5, + contract_address, + Some(sender.clone().into()), + Some(recipient.clone().into()), + Some(amount), + None, + )]; + eth_node.update_data(|data| data.logs_batch = vec![logs.clone()]); + // Setup the eth node with a block high enough that there + // will be some finalized blocks. + eth_node.update_data(|data| data.best_block.number = Some(200.into())); + let eth_node = Arc::new(eth_node); + let eth_node_handle = spawn_eth_node(eth_node).await; + + config.relayer.eth_client = Some( + format!("http://{}", eth_node_handle.address) + .as_str() + .try_into() + .unwrap(), + ); + + config.utxo_validation = true; + + // setup fuel node with mocked eth url + let db = Database::in_memory(); + + let srv = FuelService::from_database(db.clone(), config) + .await + .unwrap(); + + let client = FuelClient::from(srv.bound_address); + + // wait for relayer to catch up to eth node + srv.await_relayer_synced().await.unwrap(); + + // attempt to spend the message downloaded from the relayer + let tx = + TransactionBuilder::script(vec![Opcode::RET(0)].into_iter().collect(), vec![]) + .gas_limit(10_000) + .gas_price(0) + .add_unsigned_message_input(secret_key, sender, nonce, amount, vec![]) + .add_output(Output::change(rng.gen(), 0, AssetId::BASE)) + .finalize(); + + let tx_id = client.submit(&tx).await.unwrap(); + let status = client.transaction_status(&tx_id.to_string()).await.unwrap(); + + // verify transaction executed successfully + assert!( + matches!(&status, &TransactionStatus::Success { .. }), + "{:?}", + &status + ); + + // verify message state is spent + let query = client + .messages( + None, + PaginationRequest { + cursor: None, + results: 1, + direction: PageDirection::Forward, + }, + ) + .await + .unwrap(); + assert_eq!(query.results.len(), 1); + + // verify that the message id matches what we spent + let message_id = tx.inputs()[0] + .message_id() + .expect("first input should be a message") + .clone(); + assert_eq!( + MessageId::from(query.results[0].message_id.clone()), + message_id + ); + + // verify the spent status of the message + assert_eq!( + query.results[0].fuel_block_spend.clone().map(u64::from), + Some(1u64) + ); + + srv.stop().await; + eth_node_handle.shutdown.send(()).unwrap(); +} + +fn make_message_event( + nonce: u64, + block_number: u64, + contract_address: H160, + sender: Option<[u8; 32]>, + recipient: Option<[u8; 32]>, + amount: Option, + data: Option>, +) -> Log { + let message = fuel_relayer::bridge::SentMessageFilter { + nonce, + sender: sender.unwrap_or_default(), + recipient: recipient.unwrap_or_default(), + amount: amount.unwrap_or_default(), + data: data.map(Into::into).unwrap_or_default(), + }; + let mut log = message.into_log(); + log.address = contract_address; + log.block_number = Some(block_number.into()); + log +} + +async fn spawn_eth_node(eth_node: Arc) -> EthNodeHandle { // Construct our SocketAddr to listen on... let addr = SocketAddr::from((Ipv4Addr::LOCALHOST, 0)); @@ -140,22 +279,64 @@ async fn relayer_can_download_logs() { eprintln!("server error: {}", e); } }); + EthNodeHandle { + shutdown, + address: addr, + } +} - config.relayer.eth_client = - Some(format!("http://{}", addr).as_str().try_into().unwrap()); - let db = Database::in_memory(); +pub(crate) struct EthNodeHandle { + pub(crate) shutdown: Sender<()>, + pub(crate) address: SocketAddr, +} - let srv = FuelService::from_database(db.clone(), config) - .await - .unwrap(); +async fn handle( + mock: Arc, + req: Request, +) -> Result, Infallible> { + let body = hyper::body::to_bytes(req).await.unwrap(); - tokio::time::sleep(Duration::from_secs(10)).await; - for msg in expected_messages { - assert_eq!( - &*db.storage::().get(msg.id()).unwrap().unwrap(), - &*msg - ); - } - srv.stop().await; - shutdown.send(()).unwrap(); + let v: serde_json::Value = serde_json::from_slice(body.as_ref()).unwrap(); + let mut o = match v { + serde_json::Value::Object(o) => o, + _ => unreachable!(), + }; + let id = o.get("id").unwrap().as_u64().unwrap(); + let method = o.get("method").unwrap().as_str().unwrap(); + let r = match method { + "eth_blockNumber" => { + let r = mock.get_block_number().await.unwrap(); + json!({ "id": id, "jsonrpc": "2.0", "result": r }) + } + "eth_syncing" => { + let r = mock.syncing().await.unwrap(); + match r { + ethers::providers::SyncingStatus::IsFalse => { + json!({ "id": id, "jsonrpc": "2.0", "result": false }) + } + ethers::providers::SyncingStatus::IsSyncing { + starting_block, + current_block, + highest_block, + } => { + json!({ "id": id, "jsonrpc": "2.0", "result": { + "starting_block": starting_block, + "current_block": current_block, + "highest_block": highest_block, + } }) + } + } + } + "eth_getLogs" => { + let params = o.remove("params").unwrap(); + let params: Vec<_> = serde_json::from_value(params).unwrap(); + let r = mock.get_logs(¶ms[0]).await.unwrap(); + json!({ "id": id, "jsonrpc": "2.0", "result": r }) + } + _ => unreachable!(), + }; + + let r = serde_json::to_vec(&r).unwrap(); + + Ok(Response::new(Body::from(r))) } From 3ecaf1d78576ebc933a4e1904ae3a12805062285 Mon Sep 17 00:00:00 2001 From: Brandon Kite Date: Thu, 13 Oct 2022 14:54:43 -0700 Subject: [PATCH 3/4] fix feature flag --- fuel-core/src/service.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/fuel-core/src/service.rs b/fuel-core/src/service.rs index 6165b30ff25..90d37e30a0b 100644 --- a/fuel-core/src/service.rs +++ b/fuel-core/src/service.rs @@ -106,6 +106,7 @@ impl FuelService { self.modules.stop().await; } + #[cfg(feature = "relayer")] /// Wait for the [`Relayer`] to be in sync with /// the data availability layer. /// From 020ccda3bbd306defafe5e35ab968afee846482a Mon Sep 17 00:00:00 2001 From: Brandon Kite Date: Thu, 13 Oct 2022 15:11:38 -0700 Subject: [PATCH 4/4] appease clippy --- fuel-tests/tests/relayer.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/fuel-tests/tests/relayer.rs b/fuel-tests/tests/relayer.rs index d999c5578b4..3d034966e87 100644 --- a/fuel-tests/tests/relayer.rs +++ b/fuel-tests/tests/relayer.rs @@ -141,8 +141,8 @@ async fn messages_are_spendable_after_relayer_is_synced() { nonce, 5, contract_address, - Some(sender.clone().into()), - Some(recipient.clone().into()), + Some(sender.into()), + Some(recipient.into()), Some(amount), None, )]; @@ -210,11 +210,10 @@ async fn messages_are_spendable_after_relayer_is_synced() { // verify that the message id matches what we spent let message_id = tx.inputs()[0] .message_id() - .expect("first input should be a message") - .clone(); + .expect("first input should be a message"); assert_eq!( MessageId::from(query.results[0].message_id.clone()), - message_id + *message_id ); // verify the spent status of the message