Skip to content
This repository was archived by the owner on Oct 23, 2022. It is now read-only.

Improved interop test functionalities, prepare a simple pubsub interop test #369

Merged
merged 7 commits into from
Sep 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 70 additions & 14 deletions tests/common/interop.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#[cfg(any(feature = "test_go_interop", feature = "test_js_interop"))]
pub use common::ForeignNode;
pub use common::{api_call, ForeignNode};

#[cfg(all(not(feature = "test_go_interop"), not(feature = "test_js_interop")))]
#[allow(dead_code)]
Expand All @@ -11,42 +11,49 @@ pub mod common {
use libp2p::{core::PublicKey, Multiaddr, PeerId};
use rand::prelude::*;
use serde::Deserialize;
use std::time::Duration;
use std::{
env, fs,
path::PathBuf,
process::{Child, Command, Stdio},
thread,
};

// this environment variable should point to the location of the foreign ipfs binary
#[cfg(feature = "test_go_interop")]
pub const ENV_IPFS_PATH: &str = "GO_IPFS_PATH";
#[cfg(feature = "test_js_interop")]
pub const ENV_IPFS_PATH: &str = "JS_IPFS_PATH";

#[derive(Debug)]
pub struct ForeignNode {
pub dir: PathBuf,
pub daemon: Child,
pub id: PeerId,
pub pk: PublicKey,
pub addrs: Vec<Multiaddr>,
pub binary_path: String,
pub api_port: u16,
}

impl ForeignNode {
#[allow(dead_code)]
pub fn new() -> ForeignNode {
use std::{io::Read, net::SocketAddr, str};

// this environment variable should point to the location of the foreign ipfs binary
#[cfg(feature = "test_go_interop")]
const ENV_IPFS_PATH: &str = "GO_IPFS_PATH";
#[cfg(feature = "test_js_interop")]
const ENV_IPFS_PATH: &str = "JS_IPFS_PATH";

// obtain the path of the foreign ipfs binary from an environment variable
let binary_path = env::vars()
.find(|(key, _val)| key == ENV_IPFS_PATH)
.unwrap_or_else(|| {
panic!("the {} environment variable was not found", ENV_IPFS_PATH)
})
.1;

// create the temporary directory for the repo etc
let mut tmp_dir = env::temp_dir();
let mut rng = rand::thread_rng();
tmp_dir.push(&format!("ipfs_test_{}", rng.gen::<u64>()));
let _ = fs::create_dir(&tmp_dir);

// initialize the node and assign the temporary directory to it
Command::new(&binary_path)
.env("IPFS_PATH", &tmp_dir)
.arg("init")
Expand All @@ -58,16 +65,44 @@ pub mod common {
.status()
.unwrap();

let daemon = Command::new(&binary_path)
#[cfg(feature = "test_go_interop")]
let daemon_args = &["daemon", "--enable-pubsub-experiment"];
#[cfg(feature = "test_js_interop")]
let daemon_args = &["daemon"];

// start the ipfs daemon
let mut daemon = Command::new(&binary_path)
.env("IPFS_PATH", &tmp_dir)
.arg("daemon")
.stdout(Stdio::null())
.args(daemon_args)
.stdout(Stdio::piped())
.spawn()
.unwrap();

// give the daemon a little bit of time to start
thread::sleep(Duration::from_secs(1));
// read the stdout of the spawned daemon...
let mut buf = vec![0; 2048];
let mut index = 0;
if let Some(ref mut stdout) = daemon.stdout {
while let Ok(read) = stdout.read(&mut buf[index..]) {
index += read;
if str::from_utf8(&buf).unwrap().contains("Daemon is ready") {
break;
}
}
}

// ...so that the randomly assigned API port can be registered
let mut api_port = None;
for line in str::from_utf8(&buf).unwrap().lines() {
if line.contains("webui") {
let addr = line.rsplitn(2, ' ').next().unwrap();
let addr = addr.strip_prefix("http://").unwrap();
let addr: SocketAddr = addr.rsplitn(2, '/').nth(1).unwrap().parse().unwrap();
api_port = Some(addr.port());
}
}
let api_port = api_port.unwrap();

// run /id in order to register the PeerId, PublicKey and Multiaddrs assigned to the node
let node_id = Command::new(&binary_path)
.env("IPFS_PATH", &tmp_dir)
.arg("id")
Expand Down Expand Up @@ -95,6 +130,8 @@ pub mod common {
id,
pk,
addrs: addresses,
binary_path,
api_port,
}
}

Expand All @@ -111,6 +148,25 @@ pub mod common {
}
}

// this one is not a method on ForeignNode, as only its port number is needed and we don't
// want to restrict ourselves from calling it from spawned tasks or threads (or to make the
// internals of ForeignNode complicated by making it Clone)
#[allow(dead_code)]
pub async fn api_call<T: AsRef<str>>(api_port: u16, call: T) -> String {
let bytes = Command::new("curl")
.arg("-X")
.arg("POST")
.arg(&format!(
"http://127.0.0.1:{}/api/v0/{}",
api_port,
call.as_ref()
))
.output()
.unwrap()
.stdout;
String::from_utf8(bytes).unwrap()
}

#[derive(Deserialize, Debug)]
#[cfg_attr(feature = "test_go_interop", serde(rename_all = "PascalCase"))]
#[cfg_attr(feature = "test_js_interop", serde(rename_all = "camelCase"))]
Expand Down
53 changes: 53 additions & 0 deletions tests/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,56 @@ async fn publish_between_two_nodes() {

assert!(disappeared, "timed out before a saw b's unsubscription");
}

#[cfg(any(feature = "test_go_interop", feature = "test_js_interop"))]
#[tokio::test(max_threads = 1)]
#[ignore = "doesn't work yet"]
async fn pubsub_interop() {
use common::interop::{api_call, ForeignNode};
use futures::{future, pin_mut};

let rust_node = Node::new("rusty_boi").await;
let foreign_node = ForeignNode::new();
let foreign_api_port = foreign_node.api_port;

rust_node
.connect(foreign_node.addrs[0].clone())
.await
.unwrap();

const TOPIC: &str = "shared";

let _rust_sub_stream = rust_node.pubsub_subscribe(TOPIC.to_string()).await.unwrap();

let foreign_sub_answer = future::maybe_done(api_call(
foreign_api_port,
format!("pubsub/sub?arg={}", TOPIC),
));
pin_mut!(foreign_sub_answer);
assert_eq!(foreign_sub_answer.as_mut().output_mut(), None);

// need to wait to see both sides so that the messages will get through
let mut appeared = false;
for _ in 0..100usize {
if rust_node
.pubsub_peers(Some(TOPIC.to_string()))
.await
.unwrap()
.contains(&foreign_node.id)
&& api_call(foreign_api_port, &format!("pubsub/peers?arg={}", TOPIC))
.await
.contains(&rust_node.id.to_string())
{
appeared = true;
break;
}
timeout(Duration::from_millis(200), pending::<()>())
.await
.unwrap_err();
}

assert!(
appeared,
"timed out before both nodes appeared as pubsub peers"
);
}