diff --git a/tests/common/interop.rs b/tests/common/interop.rs index ed17ea57b..78579adfd 100644 --- a/tests/common/interop.rs +++ b/tests/common/interop.rs @@ -1,5 +1,5 @@ #[cfg(any(feature = "test_go_interop", feature = "test_js_interop"))] -pub use common::ForeignNode; +pub use common::{api_call, ForeignNode}; #[cfg(all(not(feature = "test_go_interop"), not(feature = "test_js_interop")))] #[allow(dead_code)] @@ -11,30 +11,35 @@ pub mod common { use libp2p::{core::PublicKey, Multiaddr, PeerId}; use rand::prelude::*; use serde::Deserialize; - use std::time::Duration; use std::{ env, fs, path::PathBuf, process::{Child, Command, Stdio}, - thread, }; - // this environment variable should point to the location of the foreign ipfs binary - #[cfg(feature = "test_go_interop")] - pub const ENV_IPFS_PATH: &str = "GO_IPFS_PATH"; - #[cfg(feature = "test_js_interop")] - pub const ENV_IPFS_PATH: &str = "JS_IPFS_PATH"; - + #[derive(Debug)] pub struct ForeignNode { pub dir: PathBuf, pub daemon: Child, pub id: PeerId, pub pk: PublicKey, pub addrs: Vec, + pub binary_path: String, + pub api_port: u16, } impl ForeignNode { + #[allow(dead_code)] pub fn new() -> ForeignNode { + use std::{io::Read, net::SocketAddr, str}; + + // this environment variable should point to the location of the foreign ipfs binary + #[cfg(feature = "test_go_interop")] + const ENV_IPFS_PATH: &str = "GO_IPFS_PATH"; + #[cfg(feature = "test_js_interop")] + const ENV_IPFS_PATH: &str = "JS_IPFS_PATH"; + + // obtain the path of the foreign ipfs binary from an environment variable let binary_path = env::vars() .find(|(key, _val)| key == ENV_IPFS_PATH) .unwrap_or_else(|| { @@ -42,11 +47,13 @@ pub mod common { }) .1; + // create the temporary directory for the repo etc let mut tmp_dir = env::temp_dir(); let mut rng = rand::thread_rng(); tmp_dir.push(&format!("ipfs_test_{}", rng.gen::())); let _ = fs::create_dir(&tmp_dir); + // initialize the node and assign the temporary directory to it Command::new(&binary_path) .env("IPFS_PATH", &tmp_dir) .arg("init") @@ -58,16 +65,44 @@ pub mod common { .status() .unwrap(); - let daemon = Command::new(&binary_path) + #[cfg(feature = "test_go_interop")] + let daemon_args = &["daemon", "--enable-pubsub-experiment"]; + #[cfg(feature = "test_js_interop")] + let daemon_args = &["daemon"]; + + // start the ipfs daemon + let mut daemon = Command::new(&binary_path) .env("IPFS_PATH", &tmp_dir) - .arg("daemon") - .stdout(Stdio::null()) + .args(daemon_args) + .stdout(Stdio::piped()) .spawn() .unwrap(); - // give the daemon a little bit of time to start - thread::sleep(Duration::from_secs(1)); + // read the stdout of the spawned daemon... + let mut buf = vec![0; 2048]; + let mut index = 0; + if let Some(ref mut stdout) = daemon.stdout { + while let Ok(read) = stdout.read(&mut buf[index..]) { + index += read; + if str::from_utf8(&buf).unwrap().contains("Daemon is ready") { + break; + } + } + } + + // ...so that the randomly assigned API port can be registered + let mut api_port = None; + for line in str::from_utf8(&buf).unwrap().lines() { + if line.contains("webui") { + let addr = line.rsplitn(2, ' ').next().unwrap(); + let addr = addr.strip_prefix("http://").unwrap(); + let addr: SocketAddr = addr.rsplitn(2, '/').nth(1).unwrap().parse().unwrap(); + api_port = Some(addr.port()); + } + } + let api_port = api_port.unwrap(); + // run /id in order to register the PeerId, PublicKey and Multiaddrs assigned to the node let node_id = Command::new(&binary_path) .env("IPFS_PATH", &tmp_dir) .arg("id") @@ -95,6 +130,8 @@ pub mod common { id, pk, addrs: addresses, + binary_path, + api_port, } } @@ -111,6 +148,25 @@ pub mod common { } } + // this one is not a method on ForeignNode, as only its port number is needed and we don't + // want to restrict ourselves from calling it from spawned tasks or threads (or to make the + // internals of ForeignNode complicated by making it Clone) + #[allow(dead_code)] + pub async fn api_call>(api_port: u16, call: T) -> String { + let bytes = Command::new("curl") + .arg("-X") + .arg("POST") + .arg(&format!( + "http://127.0.0.1:{}/api/v0/{}", + api_port, + call.as_ref() + )) + .output() + .unwrap() + .stdout; + String::from_utf8(bytes).unwrap() + } + #[derive(Deserialize, Debug)] #[cfg_attr(feature = "test_go_interop", serde(rename_all = "PascalCase"))] #[cfg_attr(feature = "test_js_interop", serde(rename_all = "camelCase"))] diff --git a/tests/pubsub.rs b/tests/pubsub.rs index e8e8e5f74..f09984e80 100644 --- a/tests/pubsub.rs +++ b/tests/pubsub.rs @@ -138,3 +138,56 @@ async fn publish_between_two_nodes() { assert!(disappeared, "timed out before a saw b's unsubscription"); } + +#[cfg(any(feature = "test_go_interop", feature = "test_js_interop"))] +#[tokio::test(max_threads = 1)] +#[ignore = "doesn't work yet"] +async fn pubsub_interop() { + use common::interop::{api_call, ForeignNode}; + use futures::{future, pin_mut}; + + let rust_node = Node::new("rusty_boi").await; + let foreign_node = ForeignNode::new(); + let foreign_api_port = foreign_node.api_port; + + rust_node + .connect(foreign_node.addrs[0].clone()) + .await + .unwrap(); + + const TOPIC: &str = "shared"; + + let _rust_sub_stream = rust_node.pubsub_subscribe(TOPIC.to_string()).await.unwrap(); + + let foreign_sub_answer = future::maybe_done(api_call( + foreign_api_port, + format!("pubsub/sub?arg={}", TOPIC), + )); + pin_mut!(foreign_sub_answer); + assert_eq!(foreign_sub_answer.as_mut().output_mut(), None); + + // need to wait to see both sides so that the messages will get through + let mut appeared = false; + for _ in 0..100usize { + if rust_node + .pubsub_peers(Some(TOPIC.to_string())) + .await + .unwrap() + .contains(&foreign_node.id) + && api_call(foreign_api_port, &format!("pubsub/peers?arg={}", TOPIC)) + .await + .contains(&rust_node.id.to_string()) + { + appeared = true; + break; + } + timeout(Duration::from_millis(200), pending::<()>()) + .await + .unwrap_err(); + } + + assert!( + appeared, + "timed out before both nodes appeared as pubsub peers" + ); +}