diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 4b7562331..c168f0155 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -22,8 +22,10 @@ through github via pull requests. * Mark unfinished pull requests with the "Work in Progress" label. * Before submitting a pr for review, you should run the following commands locally and make sure they are passing, otherwise CI will raise an error. - * `cargo fmt` and `cargo clippy` for linting checks - * `cargo test --workspace` to run all tests + * `cargo fmt --all -- --check` and `cargo clippy --all -- --deny warnings` for linting checks + * `RUSTFLAGS='-D warnings' cargo test --workspace` to run all tests + * Run the `ethportal-peertest` harness against a locally running node. Instructions + can be found in [README](ethportal-peertest/README.md). * Pull requests **should** always be reviewed by another member of the team prior to being merged. * Obvious exceptions include very small pull requests. diff --git a/Cargo.lock b/Cargo.lock index 4c2a6e018..9f103265a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -698,9 +698,12 @@ version = "0.1.0" dependencies = [ "clap", "discv5", + "hyper", "log 0.4.14", "rocksdb", + "serde_json", "structopt", + "thiserror", "tokio", "tracing", "tracing-subscriber", @@ -945,6 +948,25 @@ dependencies = [ "subtle", ] +[[package]] +name = "h2" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c06815895acec637cd6ed6e9662c935b866d20a106f8361892893a7d9234964" +dependencies = [ + "bytes 1.1.0", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "handlebars" version = "0.29.1" @@ -1040,18 +1062,70 @@ dependencies = [ "sha1", ] +[[package]] +name = "http" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1323096b05d41827dadeaee54c9981958c0f94e670bc94ed80037d1a7b8b186b" +dependencies = [ + "bytes 1.1.0", + "fnv", + "itoa", +] + +[[package]] +name = "http-body" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "399c583b2979440c60be0821a6199eca73bc3c8dcd9d070d75ac726e2c6186e5" +dependencies = [ + "bytes 1.1.0", + "http", + "pin-project-lite", +] + [[package]] name = "httparse" version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acd94fdbe1d4ff688b67b04eee2e17bd50995534a61539e45adfefb45e5e5503" +[[package]] +name = "httpdate" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6456b8a6c8f33fee7d958fcd1b60d55b11940a79e63ae87013e6d22e26034440" + [[package]] name = "humantime" version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" +[[package]] +name = "hyper" +version = "0.14.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15d1cfb9e4f68655fa04c01f59edb405b6074a0f7118ea881e5026e4a1cd8593" +dependencies = [ + "bytes 1.1.0", + "futures-channel", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "socket2 0.4.2", + "tokio", + "tower-service", + "tracing", + "want", +] + [[package]] name = "idna" version = "0.2.3" @@ -1107,6 +1181,16 @@ dependencies = [ "syn", ] +[[package]] +name = "indexmap" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc633605454125dec4b66843673f01c7df2b89479b32e0ed634e43a91cff62a5" +dependencies = [ + "autocfg 1.0.1", + "hashbrown 0.11.2", +] + [[package]] name = "instant" version = "0.1.9" @@ -1147,7 +1231,7 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f7e2f18aece9709094573a9f24f483c4f65caa4298e2f7ae1b71cc65d853fad7" dependencies = [ - "socket2", + "socket2 0.3.19", "widestring", "winapi 0.3.9", "winreg", @@ -2367,6 +2451,16 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "socket2" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5dc90fe6c7be1a323296982db1836d1ea9e47b6839496dde9a541bc496df3516" +dependencies = [ + "libc", + "winapi 0.3.9", +] + [[package]] name = "spin" version = "0.5.2" @@ -2747,6 +2841,12 @@ dependencies = [ "serde", ] +[[package]] +name = "tower-service" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6" + [[package]] name = "tracing" version = "0.1.26" @@ -2952,6 +3052,12 @@ dependencies = [ "trin-core", ] +[[package]] +name = "try-lock" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" + [[package]] name = "typenum" version = "1.13.0" @@ -3143,6 +3249,16 @@ version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5fecdca9a5291cc2b8dcf7dc02453fee791a280f3743cb0905f8822ae463b3fe" +[[package]] +name = "want" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0" +dependencies = [ + "log 0.4.14", + "try-lock", +] + [[package]] name = "wasi" version = "0.9.0+wasi-snapshot-preview1" diff --git a/ethportal-peertest/Cargo.toml b/ethportal-peertest/Cargo.toml index b6453a3a7..7aa81f66c 100644 --- a/ethportal-peertest/Cargo.toml +++ b/ethportal-peertest/Cargo.toml @@ -7,9 +7,12 @@ edition = "2018" [dependencies] clap = "2.33.3" +hyper = { version = "0.14", features = ["full"] } log = "0.4.14" rocksdb = "0.16.0" +serde_json = "1.0.59" structopt = "0.3" +thiserror = "1.0.29" tokio = {version = "1.8.0", features = ["full"]} tracing = "0.1.26" tracing-subscriber = "0.2.18" diff --git a/ethportal-peertest/README.md b/ethportal-peertest/README.md index f2580dc33..38085c386 100644 --- a/ethportal-peertest/README.md +++ b/ethportal-peertest/README.md @@ -5,6 +5,9 @@ Run a portal network node that you want to test and pass node's Enr as a target ```sh cd ethportal-peertest -RUST_LOG=debug cargo run -p ethportal-peertest -- --target_node enr:-IS4QBDHCSMoYoC5UziAwKSyTmMPrhMaEpaE52L8DDAkipqvZQe9fgLy2wVuuEJwO9l1KsYrRoFGCsNjylbd0CDNw60BgmlkgnY0gmlwhMCoXUSJc2VjcDI1NmsxoQJPAZUFErHK1DZYRTLjk3SCNgye9sS-MxoQI-gLiUdwc4N1ZHCCIyk +RUST_LOG=debug cargo run -p ethportal-peertest -- --target-node enr:-IS4QBDHCSMoYoC5UziAwKSyTmMPrhMaEpaE52L8DDAkipqvZQe9fgLy2wVuuEJwO9l1KsYrRoFGCsNjylbd0CDNw60BgmlkgnY0gmlwhMCoXUSJc2VjcDI1NmsxoQJPAZUFErHK1DZYRTLjk3SCNgye9sS-MxoQI-gLiUdwc4N1ZHCCIyk +``` + +## Transport selection +Running the test harness will by default test all jsonrpc endpoints over IPC to the target node. To make sure these pass, please make sure that the target node is running with `--web3-transport ipc`. To test jsonrpc over http, use the `--target-transport http` cli argument for the harness, and make sure the target node is running with `--web3-transport http`. Ideally, both transport methods are tested before PRs. -``` \ No newline at end of file diff --git a/ethportal-peertest/src/cli.rs b/ethportal-peertest/src/cli.rs index 35c975bea..2bb875819 100644 --- a/ethportal-peertest/src/cli.rs +++ b/ethportal-peertest/src/cli.rs @@ -1,6 +1,8 @@ use std::env; use std::ffi::OsString; use structopt::StructOpt; +use trin_core::cli::DEFAULT_WEB3_HTTP_PORT as DEFAULT_TARGET_HTTP_PORT; +use trin_core::cli::DEFAULT_WEB3_IPC_PATH as DEFAULT_TARGET_IPC_PATH; const DEFAULT_LISTEN_PORT: &str = "9876"; const DEFAULT_WEB3_IPC_PATH: &str = "/tmp/json-rpc-peertest.ipc"; @@ -15,24 +17,46 @@ pub struct PeertestConfig { #[structopt( default_value(DEFAULT_LISTEN_PORT), short = "p", - long = "listen_port", + long = "listen-port", help = "The UDP port to listen on." )] pub listen_port: u16, #[structopt( default_value(DEFAULT_WEB3_IPC_PATH), - long = "web3_ipc_path", + long = "web3-ipc-path", help = "path to json-rpc socket address over IPC" )] pub web3_ipc_path: String, #[structopt( short, - long = "target_node", - help = "Base64-encoded ENR's of the nodes under test" + long = "target-node", + help = "Base64-encoded ENR of the node under test" )] pub target_node: String, + + #[structopt( + default_value = "ipc", + possible_values(&["http", "ipc"]), + long = "target-transport", + help = "Transport type of the node under test" + )] + pub target_transport: String, + + #[structopt( + default_value = DEFAULT_TARGET_IPC_PATH, + long = "target-ipc-path", + help = "IPC path of target node under test" + )] + pub target_ipc_path: String, + + #[structopt( + default_value = DEFAULT_TARGET_HTTP_PORT, + long = "target-http-port", + help = "HTTP port of target node under test" + )] + pub target_http_port: String, } impl PeertestConfig { diff --git a/ethportal-peertest/src/jsonrpc.rs b/ethportal-peertest/src/jsonrpc.rs new file mode 100644 index 000000000..efe5a480c --- /dev/null +++ b/ethportal-peertest/src/jsonrpc.rs @@ -0,0 +1,161 @@ +use std::io::prelude::*; +use std::os::unix::net::UnixStream; +use std::slice::Iter; + +use hyper::{self, Body, Client, Method, Request}; +use log::info; +use serde_json::{self, Value}; +use thiserror::Error; + +use trin_core::portalnet::U256; + +#[derive(Copy, Clone)] +pub struct JsonRpcEndpoint { + pub method: &'static str, + pub id: &'static u8, +} + +const ALL_ENDPOINTS: [JsonRpcEndpoint; 6] = [ + JsonRpcEndpoint { + method: "web3_clientVersion", + id: &0, + }, + JsonRpcEndpoint { + method: "discv5_nodeInfo", + id: &1, + }, + JsonRpcEndpoint { + method: "discv5_routingTableInfo", + id: &2, + }, + JsonRpcEndpoint { + method: "eth_blockNumber", + id: &3, + }, + JsonRpcEndpoint { + method: "portalHistory_dataRadius", + id: &4, + }, + JsonRpcEndpoint { + method: "portalState_dataRadius", + id: &5, + }, +]; + +fn validate_endpoint_response(method: &str, result: &Value) { + match method { + "web3_clientVersion" => { + assert_eq!(result.as_str().unwrap(), "trin v0.1.0"); + } + "discv5_nodeInfo" => { + let enr = result.get("enr").unwrap(); + assert!(enr.is_string()); + assert!(enr.as_str().unwrap().contains("enr:")); + assert!(result.get("nodeId").unwrap().is_string()); + } + "discv5_routingTableInfo" => { + let local_key = result.get("localKey").unwrap(); + assert!(local_key.is_string()); + assert!(local_key.as_str().unwrap().contains("0x")); + assert!(result.get("buckets").unwrap().is_array()); + } + "eth_blockNumber" => { + assert!(result.is_string()); + assert!(result.as_str().unwrap().contains("0x")); + } + "portalHistory_dataRadius" => { + assert_eq!(result.as_str().unwrap(), U256::from(u64::MAX).to_string()); + } + "portalState_dataRadius" => { + assert_eq!(result.as_str().unwrap(), U256::from(u64::MAX).to_string()); + } + _ => panic!("Unsupported endpoint"), + }; + info!("{:?} returned a valid response.", method); +} + +impl JsonRpcEndpoint { + pub fn all_endpoints() -> Iter<'static, Self> { + ALL_ENDPOINTS.iter() + } + + pub fn to_jsonrpc(self) -> String { + format!( + r#" + {{ + "jsonrpc":"2.0", + "id": {}, + "method": "{}" + }}"#, + self.id, self.method + ) + } +} + +#[allow(clippy::never_loop)] +pub async fn test_jsonrpc_endpoints_over_ipc(target_ipc_path: String) { + for endpoint in JsonRpcEndpoint::all_endpoints() { + info!("Testing over IPC: {:?}", endpoint.method); + let mut stream = UnixStream::connect(&target_ipc_path).unwrap(); + let v: Value = serde_json::from_str(&endpoint.to_jsonrpc()).unwrap(); + let data = serde_json::to_vec(&v).unwrap(); + stream.write_all(&data).unwrap(); + stream.flush().unwrap(); + let deser = serde_json::Deserializer::from_reader(stream); + for obj in deser.into_iter::() { + let response_obj = obj.unwrap(); + match get_response_result(response_obj) { + Ok(result) => validate_endpoint_response(endpoint.method, &result), + Err(msg) => panic!( + "Jsonrpc error for {:?} endpoint: {:?}", + endpoint.method, msg + ), + } + // break out of loop here since EOF is not sent, and loop will hang + break; + } + } +} + +#[derive(Error, Debug)] +pub enum JsonRpcResponseError { + #[error("JsonRpc response contains an error: {0}")] + Error(String), + + #[error("Invalid JsonRpc response")] + Invalid(), +} + +fn get_response_result(response: Value) -> Result { + match response.get("result") { + Some(result) => Ok(result.clone()), + None => match response.get("error") { + Some(error) => Err(JsonRpcResponseError::Error(error.to_string())), + None => Err(JsonRpcResponseError::Invalid()), + }, + } +} + +pub async fn test_jsonrpc_endpoints_over_http(target_http_port: String) { + let client = Client::new(); + for endpoint in JsonRpcEndpoint::all_endpoints() { + info!("Testing over HTTP: {:?}", endpoint.method); + let json_string = endpoint.to_jsonrpc(); + let req = Request::builder() + .method(Method::POST) + .uri(format!("http://127.0.0.1:{}", target_http_port)) + .header("content-type", "application/json") + .body(Body::from(json_string)) + .unwrap(); + let resp = client.request(req).await.unwrap(); + let body = hyper::body::to_bytes(resp.into_body()).await.unwrap(); + let response_obj: Value = serde_json::from_slice(&body).unwrap(); + match get_response_result(response_obj) { + Ok(result) => validate_endpoint_response(endpoint.method, &result), + Err(msg) => panic!( + "Jsonrpc error for {:?} endpoint: {:?}", + endpoint.method, msg + ), + } + } +} diff --git a/ethportal-peertest/src/lib.rs b/ethportal-peertest/src/lib.rs index b1a0dec41..384dbb0d6 100644 --- a/ethportal-peertest/src/lib.rs +++ b/ethportal-peertest/src/lib.rs @@ -1,2 +1,3 @@ pub mod cli; pub mod events; +pub mod jsonrpc; diff --git a/ethportal-peertest/src/main.rs b/ethportal-peertest/src/main.rs index cf07e8ecd..841b02c07 100644 --- a/ethportal-peertest/src/main.rs +++ b/ethportal-peertest/src/main.rs @@ -1,14 +1,19 @@ -use ethportal_peertest::cli::PeertestConfig; -use ethportal_peertest::events::PortalnetEvents; -use log::info; use std::collections::HashMap; use std::sync::Arc; + +use log::info; use tokio::sync::RwLock; -use trin_core::portalnet::utp::UtpListener; + +use ethportal_peertest::cli::PeertestConfig; +use ethportal_peertest::events::PortalnetEvents; +use ethportal_peertest::jsonrpc::{ + test_jsonrpc_endpoints_over_http, test_jsonrpc_endpoints_over_ipc, +}; use trin_core::portalnet::{ discovery::Discovery, overlay::{OverlayConfig, OverlayProtocol}, types::{PortalnetConfig, ProtocolKind}, + utp::UtpListener, Enr, U256, }; use trin_core::utils::setup_overlay_db; @@ -77,9 +82,17 @@ async fn main() -> Result<(), Box> { .unwrap(); info!("State network Ping result: {:?}", ping_result); - tokio::signal::ctrl_c() - .await - .expect("failed to pause until ctrl-c"); + match peertest_config.target_transport.as_str() { + "ipc" => test_jsonrpc_endpoints_over_ipc(peertest_config.target_ipc_path).await, + "http" => test_jsonrpc_endpoints_over_http(peertest_config.target_http_port).await, + _ => panic!( + "Invalid target-transport provided: {:?}", + peertest_config.target_transport + ), + } + + info!("All tests passed successfully!"); + std::process::exit(1); }) .await .unwrap(); diff --git a/trin-core/src/cli.rs b/trin-core/src/cli.rs index b05a866b6..787abeb5f 100644 --- a/trin-core/src/cli.rs +++ b/trin-core/src/cli.rs @@ -6,8 +6,8 @@ use std::ffi::OsString; use std::net::SocketAddr; use structopt::StructOpt; -const DEFAULT_WEB3_IPC_PATH: &str = "/tmp/trin-jsonrpc.ipc"; -const DEFAULT_WEB3_HTTP_PORT: &str = "8545"; +pub const DEFAULT_WEB3_IPC_PATH: &str = "/tmp/trin-jsonrpc.ipc"; +pub const DEFAULT_WEB3_HTTP_PORT: &str = "8545"; const DEFAULT_DISCOVERY_PORT: &str = "9000"; pub const HISTORY_NETWORK: &str = "history"; pub const STATE_NETWORK: &str = "state"; diff --git a/trin-core/src/jsonrpc/service.rs b/trin-core/src/jsonrpc/service.rs index bda84dd5a..3b5354ab7 100644 --- a/trin-core/src/jsonrpc/service.rs +++ b/trin-core/src/jsonrpc/service.rs @@ -1,25 +1,25 @@ -use super::types::JsonRequest; -use crate::cli::TrinConfig; use std::io::{self, BufRead, Read, Write}; use std::net::{TcpListener, TcpStream}; +#[cfg(unix)] +use std::os::unix; use std::str::FromStr; use std::sync::Mutex; use std::{fs, panic, process}; -#[cfg(unix)] -use std::os::unix; - -use crate::jsonrpc::endpoints::TrinEndpoint; -use crate::jsonrpc::types::PortalJsonRpcRequest; use httparse; -use log::{debug, info}; +use log::{debug, info, warn}; use serde_json::{json, Value}; +use thiserror::Error; use threadpool::ThreadPool; use tokio::sync::mpsc; use tokio::sync::mpsc::UnboundedSender; use ureq; use validator::Validate; +use crate::cli::TrinConfig; +use crate::jsonrpc::endpoints::TrinEndpoint; +use crate::jsonrpc::types::{JsonRequest, PortalJsonRpcRequest}; + lazy_static! { static ref IPC_PATH: Mutex = Mutex::new(String::new()); } @@ -114,6 +114,11 @@ fn launch_http_client( trin_config: TrinConfig, portal_tx: UnboundedSender, ) { + ctrlc::set_handler(move || { + std::process::exit(1); + }) + .expect("Error setting Ctrl-C handler."); + let uri = format!("0.0.0.0:{}", trin_config.web3_http_port); let listener = TcpListener::bind(uri).unwrap(); for stream in listener.incoming() { @@ -131,6 +136,7 @@ fn launch_http_client( } }; } + info!("Clean exit"); } fn serve_ipc_client( @@ -154,6 +160,7 @@ fn serve_ipc_client( Err(e) => format!("Unsupported trin request: {}", e).into_bytes(), }; tx.write_all(&formatted_response).unwrap(); + tx.flush().unwrap(); } Err(e) => { debug!("An error occurred while parsing the JSON text. {}", e); @@ -178,10 +185,22 @@ fn serve_http_client( // Mark the bytes read as consumed so the buffer will not return them in a subsequent read reader.consume(received.len()); - let http_body = parse_http_body(received).unwrap(); + let http_body = match parse_http_body(received) { + Ok(val) => val, + Err(msg) => { + respond_with_parsing_error(stream, msg.to_string()); + return; + } + }; let deser = serde_json::Deserializer::from_str(&http_body); for obj in deser.into_iter::() { - let obj = obj.unwrap(); + let obj = match obj { + Ok(val) => val, + Err(msg) => { + respond_with_parsing_error(stream, msg.to_string()); + break; + } + }; let formatted_response = match obj.validate() { Ok(_) => process_http_request(obj, infura_url, portal_tx.clone()), Err(e) => format!("HTTP/1.1 400 BAD REQUEST\r\n\r\n{}", e).into_bytes(), @@ -191,20 +210,37 @@ fn serve_http_client( } } -fn parse_http_body(buf: Vec) -> Result { +fn respond_with_parsing_error(mut stream: TcpStream, msg: String) { + warn!("Error parsing http request: {:?}", msg); + let resp = format!("HTTP/1.1 400 BAD REQUEST\r\n\r\n{}", msg).into_bytes(); + stream.write_all(&resp).unwrap(); + stream.flush().unwrap(); +} + +#[derive(Error, Debug)] +pub enum HttpParseError { + #[error("Unable to parse http request: {0}")] + InvalidRequest(String), +} + +fn parse_http_body(buf: Vec) -> Result { let mut headers = [httparse::EMPTY_HEADER; 16]; let mut req = httparse::Request::new(&mut headers); let body_offset = match req.parse(&buf) { Ok(val) => match val { httparse::Status::Complete(offset) => offset, - httparse::Status::Partial => return Err("Error parsing http request.".to_owned()), + httparse::Status::Partial => { + return Err(HttpParseError::InvalidRequest( + "Http buffer parse incomplete".to_owned(), + )) + } }, - Err(msg) => return Err(format!("Error parsing http request: {:?}", msg)), + Err(msg) => return Err(HttpParseError::InvalidRequest(msg.to_string())), }; let body = buf[body_offset..buf.len()].to_vec(); match String::from_utf8(body) { Ok(val) => Ok(val), - Err(msg) => Err(format!("Error parsing http request: {:?}", msg)), + Err(msg) => Err(HttpParseError::InvalidRequest(msg.to_string())), } } @@ -270,7 +306,6 @@ fn dispatch_trin_request( // Handle all requests served by infura fn dispatch_infura_request(obj: JsonRequest, infura_url: &str) -> Result { - //Re-encode json to proxy to Infura match proxy_to_url(&obj, infura_url) { Ok(result_body) => Ok(std::str::from_utf8(&result_body).unwrap().to_owned()), Err(err) => Err(json!({