Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tests: parse port from substrate binary output to avoid races #501

Merged
merged 4 commits into from
Apr 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion subxt/tests/integration/utils/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ pub async fn test_node_process_with(

let proc = TestNodeProcess::<DefaultConfig>::build(path.as_str())
.with_authority(key)
.scan_for_open_ports()
.spawn::<DefaultConfig>()
.await;
proc.unwrap()
Expand Down
126 changes: 48 additions & 78 deletions subxt/tests/integration/utils/node_proc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ use std::{
OsStr,
OsString,
},
net::TcpListener,
io::{
BufRead,
BufReader,
Read,
},
process,
thread,
time,
};
use subxt::{
Client,
Expand Down Expand Up @@ -79,7 +81,6 @@ where
pub struct TestNodeProcessBuilder {
node_path: OsString,
authority: Option<AccountKeyring>,
scan_port_range: bool,
}

impl TestNodeProcessBuilder {
Expand All @@ -90,7 +91,6 @@ impl TestNodeProcessBuilder {
Self {
node_path: node_path.as_ref().into(),
authority: None,
scan_port_range: false,
}
}

Expand All @@ -100,81 +100,46 @@ impl TestNodeProcessBuilder {
self
}

/// Enable port scanning to scan for open ports.
///
/// Allows spawning multiple node instances for tests to run in parallel.
pub fn scan_for_open_ports(&mut self) -> &mut Self {
self.scan_port_range = true;
self
}

/// Spawn the substrate node at the given path, and wait for rpc to be initialized.
pub async fn spawn<R>(&self) -> Result<TestNodeProcess<R>, String>
where
R: Config,
{
let mut cmd = process::Command::new(&self.node_path);
cmd.env("RUST_LOG", "error").arg("--dev").arg("--tmp");
cmd.env("RUST_LOG", "info")
.arg("--dev")
.arg("--tmp")
.stdout(process::Stdio::piped())
.stderr(process::Stdio::piped())
.arg("--port=0")
.arg("--rpc-port=0")
.arg("--ws-port=0");

if let Some(authority) = self.authority {
let authority = format!("{:?}", authority);
let arg = format!("--{}", authority.as_str().to_lowercase());
cmd.arg(arg);
}

let ws_port = if self.scan_port_range {
let (p2p_port, http_port, ws_port) = next_open_port()
.ok_or_else(|| "No available ports in the given port range".to_owned())?;

cmd.arg(format!("--port={}", p2p_port));
cmd.arg(format!("--rpc-port={}", http_port));
cmd.arg(format!("--ws-port={}", ws_port));
ws_port
} else {
// the default Websockets port
9944
};

let ws_url = format!("ws://127.0.0.1:{}", ws_port);

let mut proc = cmd.spawn().map_err(|e| {
format!(
"Error spawning substrate node '{}': {}",
self.node_path.to_string_lossy(),
e
)
})?;
// wait for rpc to be initialized
const MAX_ATTEMPTS: u32 = 6;
let mut attempts = 1;
let mut wait_secs = 1;
let client = loop {
thread::sleep(time::Duration::from_secs(wait_secs));
log::info!(
"Connecting to contracts enabled node, attempt {}/{}",
attempts,
MAX_ATTEMPTS
);
let result = ClientBuilder::new().set_url(ws_url.clone()).build().await;
match result {
Ok(client) => break Ok(client),
Err(err) => {
if attempts < MAX_ATTEMPTS {
attempts += 1;
wait_secs *= 2; // backoff
continue
}
break Err(err)
}
}
};

// Wait for RPC port to be logged (it's logged to stderr):
let stderr = proc.stderr.take().unwrap();
let ws_port = find_substrate_port_from_output(stderr);
let ws_url = format!("ws://127.0.0.1:{}", ws_port);

// Connect to the node with a subxt client:
let client = ClientBuilder::new().set_url(ws_url.clone()).build().await;
match client {
Ok(client) => Ok(TestNodeProcess { proc, client }),
Err(err) => {
let err = format!(
"Failed to connect to node rpc at {} after {} attempts: {}",
ws_url, attempts, err
);
let err = format!("Failed to connect to node rpc at {}: {}", ws_url, err);
log::error!("{}", err);
proc.kill().map_err(|e| {
format!("Error killing substrate process '{}': {}", proc.id(), e)
Expand All @@ -185,25 +150,30 @@ impl TestNodeProcessBuilder {
}
}

/// Returns the next set of 3 open ports.
///
/// Returns None if there are not 3 open ports available.
fn next_open_port() -> Option<(u16, u16, u16)> {
// Ask the kernel to allocate a port.
let next_port = || {
match TcpListener::bind(("127.0.0.1", 0)) {
Ok(listener) => {
if let Ok(address) = listener.local_addr() {
Some(address.port())
} else {
None
}
}
Err(_) => None,
}
};

// The ports allocated should be different, unless in
// the unlikely case that the system has less than 3 available ports.
Some((next_port()?, next_port()?, next_port()?))
// Consume a stderr reader from a spawned substrate command and
// locate the port number that is logged out to it.
fn find_substrate_port_from_output(r: impl Read + Send + 'static) -> u16 {
BufReader::new(r)
.lines()
.find_map(|line| {
let line = line
.expect("failed to obtain next line from stdout for port discovery");

// does the line contain our port (we expect this specific output from substrate).
let line_end = match line.rsplit_once("Listening for new connections on 127.0.0.1:") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this will not work once we update to jsonrpsee in substrate but a later question I guess.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DQ: Why wont this work with the update? Just out of curiousity.

Copy link
Collaborator Author

@jsdw jsdw Apr 6, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aah, different log output for the port I guess?

(I wonder what the log output will be? it should be easy enough to look for both when the time comes, to continue to support older nodes :))

(I do wish there was a better way to find the port that a thing is running on; maybe I should look into this a bit more! @lexnv any ideas?)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Never tried it, but maybe you could use something like https://docs.rs/netstat/latest/netstat/ to find the open ports of the spawned process, then might be easy enough to find the websockets port?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ooh thanks for the pointer; I might have a look into this!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aah, different log output for the port I guess?

Yeah, but we could change to have this log in jsonrpsee it's quite useful.

None => return None,
Some((_, after)) => after
};

// trim non-numeric chars from the end of the port part of the line.
let port_str = line_end.trim_end_matches(|b| !('0'..='9').contains(&b));

// expect to have a number here (the chars after '127.0.0.1:') and parse them into a u16.
let port_num = port_str
.parse()
.unwrap_or_else(|_| panic!("valid port expected on 'Listening for new connections' line, got '{port_str}'"));

Some(port_num)
})
.expect("We should find a port before the reader ends")
}
2 changes: 1 addition & 1 deletion test-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ codec = { package = "parity-scale-codec", version = "3.0.0", default-features =
[build-dependencies]
subxt = { path = "../subxt" }
sp-core = "6.0.0"
tokio = { version = "1.8", features = ["rt", "macros"] }
tokio = { version = "1.8", features = ["macros", "rt-multi-thread"] }
which = "4.2.2"