Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integration tests for unstable-reconnecting-rpc-client #1711

Merged
merged 8 commits into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ exclude = [
"testing/wasm-lightclient-tests",
"signer/wasm-tests",
"examples/wasm-example",
"examples/parachain-example"
"examples/parachain-example",
]
resolver = "2"

Expand Down Expand Up @@ -125,7 +125,12 @@ rand = "0.8.5"
pin-project = "1.1.5"

# Light client wasm:
web-sys = { version = "0.3.69", features = ["BinaryType", "CloseEvent", "MessageEvent", "WebSocket"] }
web-sys = { version = "0.3.69", features = [
"BinaryType",
"CloseEvent",
"MessageEvent",
"WebSocket",
] }
wasm-bindgen = "0.2.92"
send_wrapper = "0.6.0"
js-sys = "0.3.69"
Expand Down
6 changes: 5 additions & 1 deletion testing/integration-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ unstable-light-client-long-running = ["subxt/unstable-light-client"]
# the default one which relies on the "old" RPC methods.
unstable-backend-client = []

# Enable the unstable reconnecting rpc-client
unstable-reconnecting-rpc-client = []
pkhry marked this conversation as resolved.
Show resolved Hide resolved


pkhry marked this conversation as resolved.
Show resolved Hide resolved
[dev-dependencies]
assert_matches = { workspace = true }
codec = { package = "parity-scale-codec", workspace = true, features = ["derive", "bit-vec"] }
Expand All @@ -36,7 +40,7 @@ serde = { workspace = true }
scale-info = { workspace = true, features = ["bit-vec"] }
sp-core = { workspace = true }
syn = { workspace = true }
subxt = { workspace = true, features = ["unstable-metadata", "native", "jsonrpsee", "substrate-compat"] }
subxt = { workspace = true, features = ["unstable-metadata", "native", "jsonrpsee", "substrate-compat", "unstable-reconnecting-rpc-client"] }
subxt-signer = { workspace = true, features = ["default"] }
subxt-codegen = { workspace = true }
subxt-metadata = { workspace = true }
Expand Down
39 changes: 38 additions & 1 deletion testing/integration-tests/src/full_client/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// see LICENSE for license details.

use crate::{
subxt_test, test_context,
subxt_test, test_context, test_context_unstable_rpc_client,
utils::{node_runtime, wait_for_blocks},
};
use codec::{Decode, Encode};
Expand Down Expand Up @@ -409,3 +409,40 @@ async fn partial_fee_estimate_correct() {
// Both methods should yield the same fee
assert_eq!(partial_fee_1, partial_fee_2);
}

#[cfg(feature = "unstable-reconnecting-rpc-client")]
pkhry marked this conversation as resolved.
Show resolved Hide resolved
#[subxt_test]
async fn transaction_validation_with_reconnection() {
let ctx = test_context_unstable_rpc_client().await;
let api = ctx.client();

let alice = dev::alice();
let bob = dev::bob();

wait_for_blocks(&api).await;

let _ctx = ctx.restart().await;

let tx = node_runtime::tx()
.balances()
.transfer_allow_death(bob.public_key().into(), 10_000);

let signed_extrinsic = api
.tx()
.create_signed(&tx, &alice, Default::default())
.await
.unwrap();

signed_extrinsic
.validate()
.await
.expect("validation failed");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.validate() obtains the latest block ref and then calls some runtime API, but I'm not sure how much that is testing the reconnecting logic, so curious what you think about this? I guess the point is that Subxt reconnects and doesn't start throwing out errors for every call, which is definitely a good start!

A couple of other tests that we might want to add:

  • Subscribe to storage entries and restart the node while streaming them, checking that you don't miss any inbetween or start over or whatever. (might require some setup though to get some storage entries of interest, or maybe subscribe to accountIds and expect the dev ones or something).

  • Subscribe to finalized blocks and restart while this is happening, checking that the block numbers of returned blocks follow eachother nicely regardless (though 6s block times means you'd probably not often screw up)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Subscribe to finalized blocks and restart while this is happening, checking that the block numbers of returned blocks follow eachother nicely regardless (though 6s block times means you'd probably not often screw up)

This will return a toplevel error instead of retrying in the stream currently.

Subscribe to storage entries and restart the node while streaming them, checking that you don't miss any inbetween or start over or whatever. (might require some setup though to get some storage entries of interest, or maybe subscribe to accountIds and expect the dev ones or something).
I've rewritten the test to check that clients follow the same rules for streaming finalized blocks


signed_extrinsic
.submit_and_watch()
.await
.unwrap()
.wait_for_finalized_success()
.await
.unwrap();
}
11 changes: 9 additions & 2 deletions testing/integration-tests/src/utils/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,20 @@ pub(crate) use crate::{node_runtime, utils::TestNodeProcess};
use subxt::client::OnlineClient;
use subxt::SubstrateConfig;

use super::node_proc::RpcClientKind;

/// `substrate-node` should be installed on the $PATH. We fall back
/// to also checking for an older `substrate` binary.
const SUBSTRATE_NODE_PATHS: &str = "substrate-node,substrate";

pub async fn test_context_with(authority: String) -> TestContext {
pub async fn test_context_with(authority: String, rpc_client_kind: RpcClientKind) -> TestContext {
let paths =
std::env::var("SUBSTRATE_NODE_PATH").unwrap_or_else(|_| SUBSTRATE_NODE_PATHS.to_string());
let paths: Vec<_> = paths.split(',').map(|p| p.trim()).collect();

let mut proc = TestContext::build(&paths);
proc.with_authority(authority);
proc.with_rpc_client_kind(rpc_client_kind);
proc.spawn::<SubstrateConfig>().await.unwrap()
}

Expand All @@ -28,5 +31,9 @@ pub type TestContext = TestNodeProcess<SubstrateConfig>;
pub type TestClient = OnlineClient<SubstrateConfig>;

pub async fn test_context() -> TestContext {
test_context_with("alice".to_string()).await
test_context_with("alice".to_string(), RpcClientKind::Legacy).await
}

pub async fn test_context_unstable_rpc_client() -> TestContext {
pkhry marked this conversation as resolved.
Show resolved Hide resolved
test_context_with("alice".to_string(), RpcClientKind::UnstableReconnecting).await
}
59 changes: 46 additions & 13 deletions testing/integration-tests/src/utils/node_proc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@
use std::cell::RefCell;
use std::ffi::{OsStr, OsString};
use std::sync::Arc;
use std::time::Duration;
use substrate_runner::SubstrateNode;
use subxt::backend::rpc::reconnecting_rpc_client::{
Client as UnstableReconnectingRpcClient, ExponentialBackoff,
};
use subxt::{
backend::{legacy, rpc, unstable},
Config, OnlineClient,
Expand Down Expand Up @@ -58,26 +62,29 @@ where
TestNodeProcessBuilder::new(paths)
}

pub async fn restart(mut self) -> Self {
tokio::task::spawn_blocking(move || {
if let Some(ref mut proc) = &mut self.proc {
let _ = proc.restart().unwrap();
}
self
})
.await
.expect("to succeed")
}

/// Hand back an RPC client connected to the test node which exposes the legacy RPC methods.
pub async fn legacy_rpc_methods(&self) -> legacy::LegacyRpcMethods<R> {
let rpc_client = self.rpc_client().await;
let rpc_client = self.rpc_client.clone();
legacy::LegacyRpcMethods::new(rpc_client)
}

/// Hand back an RPC client connected to the test node which exposes the unstable RPC methods.
pub async fn unstable_rpc_methods(&self) -> unstable::UnstableRpcMethods<R> {
let rpc_client = self.rpc_client().await;
let rpc_client = self.rpc_client.clone();
unstable::UnstableRpcMethods::new(rpc_client)
}

/// Hand back an RPC client connected to the test node.
pub async fn rpc_client(&self) -> rpc::RpcClient {
let url = get_url(self.proc.as_ref().map(|p| p.ws_port()));
rpc::RpcClient::from_url(url)
.await
.expect("Unable to connect RPC client to test node")
}

/// Always return a client using the unstable backend.
/// Only use for comparing backends; use [`TestNodeProcess::client()`] normally,
/// which enables us to run each test against both backends.
Expand Down Expand Up @@ -111,10 +118,17 @@ where
}
}

/// Kind of rpc client to use in tests
pub enum RpcClientKind {
Legacy,
UnstableReconnecting,
}

/// Construct a test node process.
pub struct TestNodeProcessBuilder {
node_paths: Vec<OsString>,
authority: Option<String>,
rpc_client: RpcClientKind,
}

impl TestNodeProcessBuilder {
Expand All @@ -132,9 +146,16 @@ impl TestNodeProcessBuilder {
Self {
node_paths: paths,
authority: None,
rpc_client: RpcClientKind::Legacy,
}
}

/// Set the testRunner to use a preferred RpcClient impl, ie Legacy or Unstable
pub fn with_rpc_client_kind(&mut self, rpc_client_kind: RpcClientKind) -> &mut Self {
self.rpc_client = rpc_client_kind;
self
}

/// Set the authority dev account for a node in validator mode e.g. --alice.
pub fn with_authority(&mut self, account: String) -> &mut Self {
self.authority = Some(account);
Expand All @@ -161,9 +182,11 @@ impl TestNodeProcessBuilder {
};

let ws_url = get_url(proc.as_ref().map(|p| p.ws_port()));
let rpc_client = build_rpc_client(&ws_url)
.await
.map_err(|e| format!("Failed to connect to node at {ws_url}: {e}"))?;
let rpc_client = match self.rpc_client {
RpcClientKind::Legacy => build_rpc_client(&ws_url).await,
RpcClientKind::UnstableReconnecting => build_unstable_rpc_client(&ws_url).await,
}
.map_err(|e| format!("Failed to connect to node at {ws_url}: {e}"))?;

// Cache whatever client we build, and None for the other.
#[allow(unused_assignments, unused_mut)]
Expand Down Expand Up @@ -206,6 +229,16 @@ async fn build_rpc_client(ws_url: &str) -> Result<rpc::RpcClient, String> {
Ok(rpc_client)
}

async fn build_unstable_rpc_client(ws_url: &str) -> Result<rpc::RpcClient, String> {
let client = UnstableReconnectingRpcClient::builder()
.retry_policy(ExponentialBackoff::from_millis(100).max_delay(Duration::from_secs(10)))
.build(ws_url.to_string())
.await
.map_err(|e| format!("Cannot construct RPC client: {e}"))?;

Ok(rpc::RpcClient::new(client))
}

async fn build_legacy_client<T: Config>(
rpc_client: rpc::RpcClient,
) -> Result<OnlineClient<T>, String> {
Expand Down
69 changes: 68 additions & 1 deletion testing/substrate-runner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,27 @@ impl SubstrateNodeBuilder {
}

/// Spawn the node, handing back an object which, when dropped, will stop it.
pub fn spawn(self) -> Result<SubstrateNode, Error> {
pub fn spawn(mut self) -> Result<SubstrateNode, Error> {
// Try to spawn the binary at each path, returning the
// first "ok" or last error that we encountered.
let mut res = Err(io::Error::new(
io::ErrorKind::Other,
"No binary path provided",
));

let path = Command::new("mktemp")
.arg("-d")
.output()
.expect("failed to create base dir");
let path = String::from_utf8(path.stdout).expect("bad path");
let mut bin_path = OsString::new();
for binary_path in &self.binary_paths {
self.custom_flags
.insert("base-path".into(), Some(path.clone().into()));

res = SubstrateNodeBuilder::try_spawn(binary_path, &self.custom_flags);
if res.is_ok() {
bin_path.clone_from(binary_path);
break;
}
}
Expand All @@ -98,10 +109,13 @@ impl SubstrateNodeBuilder {
let p2p_port = p2p_port.ok_or(Error::CouldNotExtractP2pPort)?;

Ok(SubstrateNode {
binary_path: bin_path,
custom_flags: self.custom_flags,
proc,
ws_port,
p2p_address,
p2p_port,
base_path: path,
})
}

Expand Down Expand Up @@ -131,10 +145,13 @@ impl SubstrateNodeBuilder {
}

pub struct SubstrateNode {
binary_path: OsString,
custom_flags: HashMap<CowStr, Option<CowStr>>,
proc: process::Child,
ws_port: u16,
p2p_address: String,
p2p_port: u32,
base_path: String,
}

impl SubstrateNode {
Expand Down Expand Up @@ -167,11 +184,61 @@ impl SubstrateNode {
pub fn kill(&mut self) -> std::io::Result<()> {
self.proc.kill()
}

/// restart the node, handing back an object which, when dropped, will stop it.
pub fn restart(&mut self) -> Result<(), std::io::Error> {
let res: Result<(), io::Error> = self.kill();

match res {
Ok(_) => (),
Err(e) => {
self.cleanup();
return Err(e);
}
}

let proc = self.try_spawn()?;

self.proc = proc;
// Wait for RPC port to be logged (it's logged to stderr).

Ok(())
}

// Attempt to spawn a binary with the path/flags given.
fn try_spawn(&mut self) -> Result<Child, std::io::Error> {
let mut cmd = Command::new(&self.binary_path);

cmd.env("RUST_LOG", "info,libp2p_tcp=debug")
.stdout(process::Stdio::piped())
.stderr(process::Stdio::piped())
.arg("--dev");

for (key, val) in &self.custom_flags {
let arg = match val {
Some(val) => format!("--{key}={val}"),
None => format!("--{key}"),
};
cmd.arg(arg);
}

cmd.arg(format!("--rpc-port={}", self.ws_port));
cmd.arg(format!("--port={}", self.p2p_port));
cmd.spawn()
}

fn cleanup(&self) {
let _ = Command::new("rm")
.args(["-rf", &self.base_path])
.output()
.expect("success");
}
}

impl Drop for SubstrateNode {
fn drop(&mut self) {
let _ = self.kill();
self.cleanup()
}
}

Expand Down
5 changes: 4 additions & 1 deletion testing/test-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ serde = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread"] }
tokio-util = { workspace = true, features = ["compat"] }
which = { workspace = true }
jsonrpsee = { workspace = true, features = ["async-client", "client-ws-transport-tls"] }
jsonrpsee = { workspace = true, features = [
"async-client",
"client-ws-transport-tls",
] }
hex = { workspace = true }
codec = { workspace = true }

Expand Down