Skip to content
This repository was archived by the owner on Oct 23, 2022. It is now read-only.

Commit 4cdc096

Browse files
bors[bot]ljedrz
andauthored
Merge #369
369: Improved interop test functionalities, prepare a simple pubsub interop test r=aphelionz a=ljedrz Builds on #368, only the `test: ...` commits are new. Extends the interop test functionalities so that arbitrary HTTP API calls can be made for the foreign nodes; I attempted to use them in order to introduce a `pubsub` interop test, but it doesn't work yet. Since the underlying issue might cause the PR to grow, I'd prefer to merge it as-is, with the new `pubsub` interop test currently `#[ignore]`d. Blocked by #368. Co-authored-by: ljedrz <ljedrz@gmail.com>
2 parents 4c37ca8 + 46fc6a1 commit 4cdc096

File tree

2 files changed

+116
-8
lines changed

2 files changed

+116
-8
lines changed

tests/common/interop.rs

+63-8
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
#[cfg(any(feature = "test_go_interop", feature = "test_js_interop"))]
2-
pub use common::ForeignNode;
2+
pub use common::{api_call, ForeignNode};
33

44
#[cfg(all(not(feature = "test_go_interop"), not(feature = "test_js_interop")))]
55
#[allow(dead_code)]
@@ -11,43 +11,49 @@ pub mod common {
1111
use libp2p::{core::PublicKey, Multiaddr, PeerId};
1212
use rand::prelude::*;
1313
use serde::Deserialize;
14-
use std::time::Duration;
1514
use std::{
1615
env, fs,
1716
path::PathBuf,
1817
process::{Child, Command, Stdio},
19-
thread,
2018
};
2119

20+
#[derive(Debug)]
2221
pub struct ForeignNode {
2322
pub dir: PathBuf,
2423
pub daemon: Child,
2524
pub id: PeerId,
2625
pub pk: PublicKey,
2726
pub addrs: Vec<Multiaddr>,
27+
pub binary_path: String,
28+
pub api_port: u16,
2829
}
2930

3031
impl ForeignNode {
3132
#[allow(dead_code)]
3233
pub fn new() -> ForeignNode {
34+
use std::{io::Read, net::SocketAddr, str};
35+
3336
// this environment variable should point to the location of the foreign ipfs binary
3437
#[cfg(feature = "test_go_interop")]
3538
const ENV_IPFS_PATH: &str = "GO_IPFS_PATH";
3639
#[cfg(feature = "test_js_interop")]
3740
const ENV_IPFS_PATH: &str = "JS_IPFS_PATH";
3841

42+
// obtain the path of the foreign ipfs binary from an environment variable
3943
let binary_path = env::vars()
4044
.find(|(key, _val)| key == ENV_IPFS_PATH)
4145
.unwrap_or_else(|| {
4246
panic!("the {} environment variable was not found", ENV_IPFS_PATH)
4347
})
4448
.1;
4549

50+
// create the temporary directory for the repo etc
4651
let mut tmp_dir = env::temp_dir();
4752
let mut rng = rand::thread_rng();
4853
tmp_dir.push(&format!("ipfs_test_{}", rng.gen::<u64>()));
4954
let _ = fs::create_dir(&tmp_dir);
5055

56+
// initialize the node and assign the temporary directory to it
5157
Command::new(&binary_path)
5258
.env("IPFS_PATH", &tmp_dir)
5359
.arg("init")
@@ -59,16 +65,44 @@ pub mod common {
5965
.status()
6066
.unwrap();
6167

62-
let daemon = Command::new(&binary_path)
68+
#[cfg(feature = "test_go_interop")]
69+
let daemon_args = &["daemon", "--enable-pubsub-experiment"];
70+
#[cfg(feature = "test_js_interop")]
71+
let daemon_args = &["daemon"];
72+
73+
// start the ipfs daemon
74+
let mut daemon = Command::new(&binary_path)
6375
.env("IPFS_PATH", &tmp_dir)
64-
.arg("daemon")
65-
.stdout(Stdio::null())
76+
.args(daemon_args)
77+
.stdout(Stdio::piped())
6678
.spawn()
6779
.unwrap();
6880

69-
// give the daemon a little bit of time to start
70-
thread::sleep(Duration::from_secs(1));
81+
// read the stdout of the spawned daemon...
82+
let mut buf = vec![0; 2048];
83+
let mut index = 0;
84+
if let Some(ref mut stdout) = daemon.stdout {
85+
while let Ok(read) = stdout.read(&mut buf[index..]) {
86+
index += read;
87+
if str::from_utf8(&buf).unwrap().contains("Daemon is ready") {
88+
break;
89+
}
90+
}
91+
}
92+
93+
// ...so that the randomly assigned API port can be registered
94+
let mut api_port = None;
95+
for line in str::from_utf8(&buf).unwrap().lines() {
96+
if line.contains("webui") {
97+
let addr = line.rsplitn(2, ' ').next().unwrap();
98+
let addr = addr.strip_prefix("http://").unwrap();
99+
let addr: SocketAddr = addr.rsplitn(2, '/').nth(1).unwrap().parse().unwrap();
100+
api_port = Some(addr.port());
101+
}
102+
}
103+
let api_port = api_port.unwrap();
71104

105+
// run /id in order to register the PeerId, PublicKey and Multiaddrs assigned to the node
72106
let node_id = Command::new(&binary_path)
73107
.env("IPFS_PATH", &tmp_dir)
74108
.arg("id")
@@ -96,6 +130,8 @@ pub mod common {
96130
id,
97131
pk,
98132
addrs: addresses,
133+
binary_path,
134+
api_port,
99135
}
100136
}
101137

@@ -112,6 +148,25 @@ pub mod common {
112148
}
113149
}
114150

151+
// this one is not a method on ForeignNode, as only its port number is needed and we don't
152+
// want to restrict ourselves from calling it from spawned tasks or threads (or to make the
153+
// internals of ForeignNode complicated by making it Clone)
154+
#[allow(dead_code)]
155+
pub async fn api_call<T: AsRef<str>>(api_port: u16, call: T) -> String {
156+
let bytes = Command::new("curl")
157+
.arg("-X")
158+
.arg("POST")
159+
.arg(&format!(
160+
"http://127.0.0.1:{}/api/v0/{}",
161+
api_port,
162+
call.as_ref()
163+
))
164+
.output()
165+
.unwrap()
166+
.stdout;
167+
String::from_utf8(bytes).unwrap()
168+
}
169+
115170
#[derive(Deserialize, Debug)]
116171
#[cfg_attr(feature = "test_go_interop", serde(rename_all = "PascalCase"))]
117172
#[cfg_attr(feature = "test_js_interop", serde(rename_all = "camelCase"))]

tests/pubsub.rs

+53
Original file line numberDiff line numberDiff line change
@@ -138,3 +138,56 @@ async fn publish_between_two_nodes() {
138138

139139
assert!(disappeared, "timed out before a saw b's unsubscription");
140140
}
141+
142+
#[cfg(any(feature = "test_go_interop", feature = "test_js_interop"))]
143+
#[tokio::test(max_threads = 1)]
144+
#[ignore = "doesn't work yet"]
145+
async fn pubsub_interop() {
146+
use common::interop::{api_call, ForeignNode};
147+
use futures::{future, pin_mut};
148+
149+
let rust_node = Node::new("rusty_boi").await;
150+
let foreign_node = ForeignNode::new();
151+
let foreign_api_port = foreign_node.api_port;
152+
153+
rust_node
154+
.connect(foreign_node.addrs[0].clone())
155+
.await
156+
.unwrap();
157+
158+
const TOPIC: &str = "shared";
159+
160+
let _rust_sub_stream = rust_node.pubsub_subscribe(TOPIC.to_string()).await.unwrap();
161+
162+
let foreign_sub_answer = future::maybe_done(api_call(
163+
foreign_api_port,
164+
format!("pubsub/sub?arg={}", TOPIC),
165+
));
166+
pin_mut!(foreign_sub_answer);
167+
assert_eq!(foreign_sub_answer.as_mut().output_mut(), None);
168+
169+
// need to wait to see both sides so that the messages will get through
170+
let mut appeared = false;
171+
for _ in 0..100usize {
172+
if rust_node
173+
.pubsub_peers(Some(TOPIC.to_string()))
174+
.await
175+
.unwrap()
176+
.contains(&foreign_node.id)
177+
&& api_call(foreign_api_port, &format!("pubsub/peers?arg={}", TOPIC))
178+
.await
179+
.contains(&rust_node.id.to_string())
180+
{
181+
appeared = true;
182+
break;
183+
}
184+
timeout(Duration::from_millis(200), pending::<()>())
185+
.await
186+
.unwrap_err();
187+
}
188+
189+
assert!(
190+
appeared,
191+
"timed out before both nodes appeared as pubsub peers"
192+
);
193+
}

0 commit comments

Comments
 (0)