Skip to content

Commit 7a0644f

Browse files
feat(iroh)!: allow dynamic changing of the RelayMap of an endpoint (#3522)
## Description Closes #3471 ## Breaking Changes - added - `iroh::Endpoint::insert_relay` - `iroh::Endpoint::remove_relay` - `iroh::Endpoint::RelayMap::insert` - `iroh::Endpoint::RelayMap::remove` - changed - `iroh_relay::RelayMap::urls` - `iroh_relay::RelayMap::nodes` - `iroh_relay::RelayMap::get_node` ## Notes & open questions - Are the changes to the API of `RelayMap` acceptable? - Is this enough?
1 parent b2a55bf commit 7a0644f

File tree

6 files changed

+232
-29
lines changed

6 files changed

+232
-29
lines changed

iroh-relay/src/relay_map.rs

Lines changed: 60 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,45 @@
11
//! based on tailscale/tailcfg/derpmap.go
22
3-
use std::{collections::BTreeMap, fmt, sync::Arc};
3+
use std::{
4+
collections::BTreeMap,
5+
fmt,
6+
sync::{Arc, RwLock},
7+
};
48

59
use iroh_base::RelayUrl;
610
use serde::{Deserialize, Serialize};
711

812
use crate::defaults::DEFAULT_RELAY_QUIC_PORT;
913

1014
/// Configuration of all the relay servers that can be used.
11-
#[derive(Debug, Clone, PartialEq, Eq)]
15+
#[derive(Debug, Clone)]
1216
pub struct RelayMap {
1317
/// A map of the different relay IDs to the [`RelayNode`] information
14-
nodes: Arc<BTreeMap<RelayUrl, Arc<RelayNode>>>,
18+
nodes: Arc<RwLock<BTreeMap<RelayUrl, Arc<RelayNode>>>>,
1519
}
1620

21+
impl PartialEq for RelayMap {
22+
fn eq(&self, other: &Self) -> bool {
23+
let this = self.nodes.read().expect("poisoned");
24+
let that = other.nodes.read().expect("poisoned");
25+
this.eq(&*that)
26+
}
27+
}
28+
29+
impl Eq for RelayMap {}
30+
1731
impl RelayMap {
1832
/// Returns the sorted relay URLs.
19-
pub fn urls(&self) -> impl Iterator<Item = &RelayUrl> {
20-
self.nodes.keys()
33+
pub fn urls<T>(&self) -> T
34+
where
35+
T: FromIterator<RelayUrl>,
36+
{
37+
self.nodes
38+
.read()
39+
.expect("poisoned")
40+
.keys()
41+
.cloned()
42+
.collect::<T>()
2143
}
2244

2345
/// Create an empty relay map.
@@ -28,39 +50,57 @@ impl RelayMap {
2850
}
2951

3052
/// Returns an `Iterator` over all known nodes.
31-
pub fn nodes(&self) -> impl Iterator<Item = &Arc<RelayNode>> {
32-
self.nodes.values()
53+
pub fn nodes<T>(&self) -> T
54+
where
55+
T: FromIterator<Arc<RelayNode>>,
56+
{
57+
self.nodes
58+
.read()
59+
.expect("poisoned")
60+
.values()
61+
.cloned()
62+
.collect::<T>()
3363
}
3464

3565
/// Is this a known node?
3666
pub fn contains_node(&self, url: &RelayUrl) -> bool {
37-
self.nodes.contains_key(url)
67+
self.nodes.read().expect("poisoned").contains_key(url)
3868
}
3969

4070
/// Get the given node.
41-
pub fn get_node(&self, url: &RelayUrl) -> Option<&Arc<RelayNode>> {
42-
self.nodes.get(url)
71+
pub fn get_node(&self, url: &RelayUrl) -> Option<Arc<RelayNode>> {
72+
self.nodes.read().expect("poisoned").get(url).cloned()
4373
}
4474

4575
/// How many nodes are known?
4676
pub fn len(&self) -> usize {
47-
self.nodes.len()
77+
self.nodes.read().expect("poisoned").len()
4878
}
4979

5080
/// Are there any nodes in this map?
5181
pub fn is_empty(&self) -> bool {
52-
self.nodes.is_empty()
82+
self.nodes.read().expect("poisoned").is_empty()
83+
}
84+
85+
/// Insert a new relay.
86+
pub fn insert(&self, url: RelayUrl, node: Arc<RelayNode>) -> Option<Arc<RelayNode>> {
87+
self.nodes.write().expect("poisoned").insert(url, node)
88+
}
89+
90+
/// Removes an existing relay by `RelayUrl`.
91+
pub fn remove(&self, url: &RelayUrl) -> Option<Arc<RelayNode>> {
92+
self.nodes.write().expect("poisoned").remove(url)
5393
}
5494
}
5595

5696
impl FromIterator<RelayNode> for RelayMap {
5797
fn from_iter<T: IntoIterator<Item = RelayNode>>(iter: T) -> Self {
5898
Self {
59-
nodes: Arc::new(
99+
nodes: Arc::new(RwLock::new(
60100
iter.into_iter()
61101
.map(|node| (node.url.clone(), Arc::new(node)))
62102
.collect(),
63-
),
103+
)),
64104
}
65105
}
66106
}
@@ -72,15 +112,17 @@ impl From<RelayUrl> for RelayMap {
72112
/// discovery ports.
73113
fn from(value: RelayUrl) -> Self {
74114
Self {
75-
nodes: Arc::new([(value.clone(), Arc::new(value.into()))].into()),
115+
nodes: Arc::new(RwLock::new(
116+
[(value.clone(), Arc::new(value.into()))].into(),
117+
)),
76118
}
77119
}
78120
}
79121

80122
impl From<RelayNode> for RelayMap {
81123
fn from(value: RelayNode) -> Self {
82124
Self {
83-
nodes: Arc::new([(value.url.clone(), Arc::new(value))].into()),
125+
nodes: Arc::new(RwLock::new([(value.url.clone(), Arc::new(value))].into())),
84126
}
85127
}
86128
}
@@ -92,11 +134,11 @@ impl FromIterator<RelayUrl> for RelayMap {
92134
/// discovery ports.
93135
fn from_iter<T: IntoIterator<Item = RelayUrl>>(iter: T) -> Self {
94136
Self {
95-
nodes: Arc::new(
137+
nodes: Arc::new(RwLock::new(
96138
iter.into_iter()
97139
.map(|url| (url.clone(), Arc::new(url.into())))
98140
.collect(),
99-
),
141+
)),
100142
}
101143
}
102144
}

iroh/src/endpoint.rs

Lines changed: 129 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ use std::{
2121
};
2222

2323
use ed25519_dalek::{VerifyingKey, pkcs8::DecodePublicKey};
24-
use iroh_base::{NodeAddr, NodeId, SecretKey};
25-
use iroh_relay::RelayMap;
24+
use iroh_base::{NodeAddr, NodeId, RelayUrl, SecretKey};
25+
use iroh_relay::{RelayMap, RelayNode};
2626
use n0_future::time::Duration;
2727
use n0_watcher::Watcher;
2828
use nested_enum_utils::common_fields;
@@ -632,6 +632,24 @@ impl Endpoint {
632632
self.msock.endpoint().set_server_config(Some(server_config));
633633
}
634634

635+
/// Adds the provided configuration to the [`RelayMap`].
636+
///
637+
/// Replacing and returning any existing configuration for [`RelayUrl`].
638+
pub async fn insert_relay(
639+
&self,
640+
relay: RelayUrl,
641+
node: Arc<RelayNode>,
642+
) -> Option<Arc<RelayNode>> {
643+
self.msock.insert_relay(relay, node).await
644+
}
645+
646+
/// Removes the configuration from the [`RelayMap`] for the provided [`RelayUrl`].
647+
///
648+
/// Returns any existing configuration.
649+
pub async fn remove_relay(&self, relay: &RelayUrl) -> Option<Arc<RelayNode>> {
650+
self.msock.remove_relay(relay).await
651+
}
652+
635653
// # Methods for establishing connectivity.
636654

637655
/// Connects to a remote [`Endpoint`].
@@ -2430,6 +2448,115 @@ mod tests {
24302448
Ok(())
24312449
}
24322450

2451+
#[tokio::test]
2452+
#[traced_test]
2453+
async fn endpoint_relay_map_change() -> Result {
2454+
let (relay_map, relay_url, _guard1) = run_relay_server().await?;
2455+
let client = Endpoint::builder()
2456+
.insecure_skip_relay_cert_verify(true)
2457+
.relay_mode(RelayMode::Custom(relay_map.clone()))
2458+
.bind()
2459+
.await?;
2460+
let server = Endpoint::builder()
2461+
.insecure_skip_relay_cert_verify(true)
2462+
.relay_mode(RelayMode::Custom(relay_map))
2463+
.alpns(vec![TEST_ALPN.to_vec()])
2464+
.bind()
2465+
.await?;
2466+
2467+
let task = tokio::spawn({
2468+
let server = server.clone();
2469+
async move {
2470+
for i in 0..2 {
2471+
println!("accept: round {i}");
2472+
let Some(conn) = server.accept().await else {
2473+
snafu::whatever!("Expected an incoming connection");
2474+
};
2475+
let conn = conn.await.e()?;
2476+
let (mut send, mut recv) = conn.accept_bi().await.e()?;
2477+
let data = recv.read_to_end(1000).await.e()?;
2478+
send.write_all(&data).await.e()?;
2479+
send.finish().e()?;
2480+
conn.closed().await;
2481+
}
2482+
Ok::<_, Error>(())
2483+
}
2484+
});
2485+
2486+
server.online().await;
2487+
2488+
let mut addr = server.node_addr();
2489+
println!("round1: {:?}", addr);
2490+
2491+
// remove direct addrs to force relay usage
2492+
addr.direct_addresses.clear();
2493+
2494+
let conn = client.connect(addr, TEST_ALPN).await?;
2495+
let (mut send, mut recv) = conn.open_bi().await.e()?;
2496+
send.write_all(b"Hello, world!").await.e()?;
2497+
send.finish().e()?;
2498+
let data = recv.read_to_end(1000).await.e()?;
2499+
conn.close(0u32.into(), b"bye!");
2500+
2501+
assert_eq!(&data, b"Hello, world!");
2502+
2503+
// setup a second relay server
2504+
let (new_relay_map, new_relay_url, _guard2) = run_relay_server().await?;
2505+
let new_node = new_relay_map
2506+
.get_node(&new_relay_url)
2507+
.expect("missing node")
2508+
.clone();
2509+
dbg!(&new_relay_map);
2510+
2511+
let addr_watcher = server.watch_node_addr();
2512+
2513+
// add new new relay
2514+
assert!(
2515+
server
2516+
.insert_relay(new_relay_url.clone(), new_node.clone())
2517+
.await
2518+
.is_none()
2519+
);
2520+
// remove the old relay
2521+
assert!(server.remove_relay(&relay_url).await.is_some());
2522+
2523+
println!("------- changed ----- ");
2524+
2525+
let mut addr = tokio::time::timeout(Duration::from_secs(10), async move {
2526+
let mut stream = addr_watcher.stream();
2527+
while let Some(addr) = stream.next().await {
2528+
if addr.relay_url.as_ref() != Some(&relay_url) {
2529+
return addr;
2530+
}
2531+
}
2532+
panic!("failed to change relay");
2533+
})
2534+
.await
2535+
.e()?;
2536+
2537+
println!("round2: {:?}", addr);
2538+
assert_eq!(addr.relay_url, Some(new_relay_url));
2539+
2540+
// remove direct addrs to force relay usage
2541+
addr.direct_addresses.clear();
2542+
2543+
let conn = client.connect(addr, TEST_ALPN).await?;
2544+
let (mut send, mut recv) = conn.open_bi().await.e()?;
2545+
send.write_all(b"Hello, world!").await.e()?;
2546+
send.finish().e()?;
2547+
let data = recv.read_to_end(1000).await.e()?;
2548+
conn.close(0u32.into(), b"bye!");
2549+
2550+
task.await.e()??;
2551+
2552+
client.close().await;
2553+
server.close().await;
2554+
2555+
assert_eq!(&data, b"Hello, world!");
2556+
2557+
Ok(())
2558+
}
2559+
24332560
#[tokio::test]
24342561
#[traced_test]
24352562
async fn endpoint_bidi_send_recv() -> Result {

0 commit comments

Comments
 (0)