Skip to content

Commit

Permalink
Relayer integration test (#678)
Browse files Browse the repository at this point in the history
* add new relayer clone

* delete a bunch of stuff not needed for POA

* started integration test with middleware

* add some more test stuff

* adds await synced

* use watch instead of notifier

* added middleware

* add some basic provider usage

* test passes for gettting da height

* add test for downloading messages and adding them to the database

* adds test for committed block

* add state transitions and sync updates

* test state and mock events and calls

* add ability to track pending transactions

* fix future

* cleanup and refactor

* remove old relayer

* add flake to generate bindings

* update to new abi

* add missing commit block calls

* remove whitespace

* lint toml

* refactor to for upcoming contract spec

* commit before removing publish

* remove publish and add pagination to get logs

* Add comments

* remove debug

* fix toml

* handle relayer errors properly and test syncing

* clippy

* make tokio tests run in paused mode

* only construct relayer if the relayer url is set

* update readme

* fix range and put logs into db inside stream

* move nix to another pr

* documentation

* add deploy height to relayer

* fix db

* record height on pages

* make pages type consume if empty

* make syncing eth loop configurable and only log every N seconds

* Nix flake to generate contract abi bindings

* add relayer integration test

* fix feature flag

Co-authored-by: Brandon Kite <brandonkite92@gmail.com>
  • Loading branch information
freesig and Voxelot authored Oct 12, 2022
1 parent 4eb8da6 commit cd82801
Show file tree
Hide file tree
Showing 4 changed files with 212 additions and 12 deletions.
54 changes: 43 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion fuel-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ harness = true
[dependencies]
async-std = "1.12"
chrono = { version = "0.4", features = ["serde"] }
ethers = "0.17"
fuel-core = { path = "../fuel-core", default-features = false }
fuel-core-interfaces = { path = "../fuel-core-interfaces", features = ["test-helpers"] }
fuel-gql-client = { path = "../fuel-client", features = ["test-helpers"] }
fuel-relayer = { path = "../fuel-relayer", features = ["test-helpers"] }
fuel-txpool = { path = "../fuel-txpool" }
futures = "0.3"
hyper = { version = "0.14", features = ["server"] }
insta = "1.8"
itertools = "0.10"
rand = "0.8"
Expand All @@ -32,10 +35,12 @@ rstest = "0.15"
serde_json = "1.0"
tempfile = "3.3"
tokio = { version = "1.21", features = ["macros", "rt-multi-thread"] }
tracing = "0.1"
tracing-subscriber = "0.3"

[features]
metrics = ["fuel-core/rocksdb", "fuel-core/metrics"]
default = ["fuel-core/default", "metrics"]
default = ["fuel-core/default", "metrics", "relayer"]
debug = ["fuel-core-interfaces/debug"]
p2p = ["fuel-core/p2p"]
relayer = ["fuel-core/relayer"]
2 changes: 2 additions & 0 deletions fuel-tests/tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ mod messages;
#[cfg(feature = "metrics")]
mod metrics;
mod node_info;
#[cfg(feature = "relayer")]
mod relayer;
mod resource;
mod snapshot;
mod tx;
Expand Down
161 changes: 161 additions & 0 deletions fuel-tests/tests/relayer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
use std::{
net::Ipv4Addr,
sync::Arc,
time::Duration,
};

use ethers::providers::Middleware;
use fuel_core::{
database::Database,
service::{
Config,
FuelService,
},
};
use fuel_core_interfaces::db::Messages;
use fuel_gql_client::prelude::StorageAsRef;

use fuel_relayer::test_helpers::{
middleware::MockMiddleware,
EvtToLog,
LogTestHelper,
};
use hyper::{
service::{
make_service_fn,
service_fn,
},
Body,
Request,
Response,
Server,
};
use serde_json::json;
use std::{
convert::Infallible,
net::SocketAddr,
};

async fn handle(
mock: Arc<MockMiddleware>,
req: Request<Body>,
) -> Result<Response<Body>, Infallible> {
let body = hyper::body::to_bytes(req).await.unwrap();

let v: serde_json::Value = serde_json::from_slice(body.as_ref()).unwrap();
let mut o = match v {
serde_json::Value::Object(o) => o,
_ => unreachable!(),
};
let id = o.get("id").unwrap().as_u64().unwrap();
let method = o.get("method").unwrap().as_str().unwrap();
let r = match method {
"eth_blockNumber" => {
let r = mock.get_block_number().await.unwrap();
json!({ "id": id, "jsonrpc": "2.0", "result": r })
}
"eth_syncing" => {
let r = mock.syncing().await.unwrap();
match r {
ethers::providers::SyncingStatus::IsFalse => {
json!({ "id": id, "jsonrpc": "2.0", "result": false })
}
ethers::providers::SyncingStatus::IsSyncing {
starting_block,
current_block,
highest_block,
} => {
json!({ "id": id, "jsonrpc": "2.0", "result": {
"starting_block": starting_block,
"current_block": current_block,
"highest_block": highest_block,
} })
}
}
}
"eth_getLogs" => {
let params = o.remove("params").unwrap();
let params: Vec<_> = serde_json::from_value(params).unwrap();
let r = mock.get_logs(&params[0]).await.unwrap();
json!({ "id": id, "jsonrpc": "2.0", "result": r })
}
_ => unreachable!(),
};

let r = serde_json::to_vec(&r).unwrap();

Ok(Response::new(Body::from(r)))
}

#[tokio::test(flavor = "multi_thread")]
async fn relayer_can_download_logs() {
let mut config = Config::local_node();
let eth_node = MockMiddleware::default();
let contract_address = config.relayer.eth_v2_listening_contracts[0];
let message = |nonce, block_number: u64| {
let message = fuel_relayer::bridge::SentMessageFilter {
nonce,
..Default::default()
};
let mut log = message.into_log();
log.address = contract_address;
log.block_number = Some(block_number.into());
log
};

let logs = vec![message(1, 3), message(2, 5)];
let expected_messages: Vec<_> = logs.iter().map(|l| l.to_msg()).collect();
eth_node.update_data(|data| data.logs_batch = vec![logs.clone()]);
// Setup the eth node with a block high enough that there
// will be some finalized blocks.
eth_node.update_data(|data| data.best_block.number = Some(200.into()));
let eth_node = Arc::new(eth_node);

// Construct our SocketAddr to listen on...
let addr = SocketAddr::from((Ipv4Addr::LOCALHOST, 0));

// And a MakeService to handle each connection...
let make_service = make_service_fn(move |_conn| {
let eth_node = eth_node.clone();
async move {
Ok::<_, Infallible>(service_fn({
let eth_node = eth_node.clone();
move |req| handle(eth_node.clone(), req)
}))
}
});

// Then bind and serve...
let server = Server::bind(&addr).serve(make_service);
let addr = server.local_addr();

let (shutdown, rx) = tokio::sync::oneshot::channel();

tokio::spawn(async move {
let graceful = server.with_graceful_shutdown(async {
rx.await.ok();
});
// And run forever...
if let Err(e) = graceful.await {
eprintln!("server error: {}", e);
}
});

config.relayer.eth_client =
Some(format!("http://{}", addr).as_str().try_into().unwrap());
let db = Database::in_memory();

let srv = FuelService::from_database(db.clone(), config)
.await
.unwrap();

tokio::time::sleep(Duration::from_secs(10)).await;
for msg in expected_messages {
assert_eq!(
&*db.storage::<Messages>().get(msg.id()).unwrap().unwrap(),
&*msg
);
}
srv.stop().await;
shutdown.send(()).unwrap();
}

0 comments on commit cd82801

Please sign in to comment.