Skip to content

Commit 02c09e2

Browse files
authored
Merge c70ad9e into 2f924d9
2 parents 2f924d9 + c70ad9e commit 02c09e2

File tree

4 files changed

+174
-47
lines changed

4 files changed

+174
-47
lines changed

iroh/src/endpoint/connection.rs

Lines changed: 100 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ use crate::{
4343
discovery::DiscoveryTask,
4444
magicsock::{
4545
EndpointStateActorStoppedError,
46-
endpoint_map::{PathInfoList, PathsWatchable},
46+
endpoint_map::{PathInfoList, PathsWatcher},
4747
},
4848
};
4949

@@ -1213,7 +1213,7 @@ pub struct Connection {
12131213
inner: quinn::Connection,
12141214
remote_id: EndpointId,
12151215
alpn: Vec<u8>,
1216-
paths: PathsWatchable,
1216+
paths: PathsWatcher,
12171217
}
12181218

12191219
#[allow(missing_docs)]
@@ -1463,7 +1463,7 @@ impl Connection {
14631463
/// [`PathInfo::is_selected`]: crate::magicsock::PathInfo::is_selected
14641464
/// [`PathInfo`]: crate::magicsock::PathInfo
14651465
pub fn paths(&self) -> impl Watcher<Value = PathInfoList> + Unpin + Send + Sync + 'static {
1466-
self.paths.watch(self.inner.weak_handle())
1466+
self.paths.clone()
14671467
}
14681468

14691469
/// Derives keying material from this connection's TLS session secrets.
@@ -1511,16 +1511,21 @@ impl Connection {
15111511

15121512
#[cfg(test)]
15131513
mod tests {
1514+
use std::time::Duration;
1515+
15141516
use iroh_base::{EndpointAddr, SecretKey};
15151517
use n0_error::{Result, StackResultExt, StdResultExt};
1518+
use n0_future::StreamExt;
1519+
use n0_watcher::Watcher;
15161520
use rand::SeedableRng;
1517-
use tracing::{Instrument, info_span, trace_span};
1521+
use tracing::{Instrument, error_span, info_span, trace_span};
15181522
use tracing_test::traced_test;
15191523

15201524
use super::Endpoint;
15211525
use crate::{
15221526
RelayMode,
1523-
endpoint::{ConnectOptions, Incoming, ZeroRttStatus},
1527+
endpoint::{ConnectOptions, Incoming, PathInfo, PathInfoList, ZeroRttStatus},
1528+
test_utils::run_relay_server,
15241529
};
15251530

15261531
const TEST_ALPN: &[u8] = b"n0/iroh/test";
@@ -1730,4 +1735,94 @@ mod tests {
17301735
tokio::join!(client.close(), server.close());
17311736
Ok(())
17321737
}
1738+
1739+
#[tokio::test]
1740+
async fn test_paths_watcher() -> Result {
1741+
tracing_subscriber::fmt::init();
1742+
const ALPN: &[u8] = b"test";
1743+
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);
1744+
let (relay_map, _relay_map, _guard) = run_relay_server().await?;
1745+
let server = Endpoint::empty_builder(RelayMode::Custom(relay_map.clone()))
1746+
.secret_key(SecretKey::generate(&mut rng))
1747+
.insecure_skip_relay_cert_verify(true)
1748+
.alpns(vec![ALPN.to_vec()])
1749+
.bind()
1750+
.await?;
1751+
1752+
let client = Endpoint::empty_builder(RelayMode::Custom(relay_map.clone()))
1753+
.secret_key(SecretKey::generate(&mut rng))
1754+
.insecure_skip_relay_cert_verify(true)
1755+
.bind()
1756+
.await?;
1757+
1758+
server.online().await;
1759+
let server_addr = server.addr();
1760+
tracing::info!("server addr: {server_addr:?}");
1761+
1762+
let (conn_client, conn_server) = tokio::join!(
1763+
async { client.connect(server_addr, ALPN).await.unwrap() },
1764+
async { server.accept().await.unwrap().await.unwrap() }
1765+
);
1766+
tracing::info!("connected");
1767+
let mut paths_client = conn_client.paths().stream();
1768+
let mut paths_server = conn_server.paths().stream();
1769+
1770+
/// Advances the path stream until at least one IP and one relay paths are available.
1771+
///
1772+
/// Panics if the path stream finishes before that happens.
1773+
async fn wait_for_paths(
1774+
stream: &mut n0_watcher::Stream<impl n0_watcher::Watcher<Value = PathInfoList> + Unpin>,
1775+
) {
1776+
loop {
1777+
let paths = stream.next().await.expect("paths stream ended");
1778+
tracing::info!(?paths, "paths");
1779+
if paths.len() >= 2
1780+
&& paths.iter().any(PathInfo::is_relay)
1781+
&& paths.iter().any(PathInfo::is_ip)
1782+
{
1783+
tracing::info!("break");
1784+
return;
1785+
}
1786+
}
1787+
}
1788+
1789+
// Verify that both connections are notified of path changes and get an IP and a relay path.
1790+
tokio::join!(
1791+
async {
1792+
tokio::time::timeout(Duration::from_secs(1), wait_for_paths(&mut paths_server))
1793+
.instrument(error_span!("paths-server"))
1794+
.await
1795+
.unwrap()
1796+
},
1797+
async {
1798+
tokio::time::timeout(Duration::from_secs(1), wait_for_paths(&mut paths_client))
1799+
.instrument(error_span!("paths-client"))
1800+
.await
1801+
.unwrap()
1802+
}
1803+
);
1804+
1805+
// Close the client connection.
1806+
tracing::info!("close client conn");
1807+
conn_client.close(0u32.into(), b"");
1808+
1809+
// Verify that the path watch streams close.
1810+
assert_eq!(
1811+
tokio::time::timeout(Duration::from_secs(1), paths_client.next())
1812+
.await
1813+
.unwrap(),
1814+
None
1815+
);
1816+
assert_eq!(
1817+
tokio::time::timeout(Duration::from_secs(1), paths_server.next())
1818+
.await
1819+
.unwrap(),
1820+
None
1821+
);
1822+
1823+
server.close().await;
1824+
client.close().await;
1825+
1826+
Ok(())
1827+
}
17331828
}

iroh/src/magicsock.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ use crate::{
6464
disco::{self, SendAddr},
6565
discovery::{ConcurrentDiscovery, Discovery, EndpointData, UserData},
6666
key::{DecryptionError, SharedSecret, public_ed_box, secret_ed_box},
67-
magicsock::endpoint_map::PathsWatchable,
67+
magicsock::endpoint_map::PathsWatcher,
6868
metrics::EndpointMetrics,
6969
net_report::{self, IfStateDetails, Report},
7070
};
@@ -272,14 +272,14 @@ impl MagicSock {
272272
/// The actor is responsible for holepunching and opening additional paths to this
273273
/// connection.
274274
///
275-
/// Returns a future that resolves to [`PathsWatchable`].
275+
/// Returns a future that resolves to [`PathsWatcher`].
276276
///
277277
/// The returned future is `'static`, so it can be stored without being liftetime-bound to `&self`.
278278
pub(crate) fn register_connection(
279279
&self,
280280
remote: EndpointId,
281281
conn: WeakConnectionHandle,
282-
) -> impl Future<Output = Result<PathsWatchable, EndpointStateActorStoppedError>> + Send + 'static
282+
) -> impl Future<Output = Result<PathsWatcher, EndpointStateActorStoppedError>> + Send + 'static
283283
{
284284
let (tx, rx) = oneshot::channel();
285285
let sender = self.endpoint_map.endpoint_state_actor(remote);

iroh/src/magicsock/endpoint_map.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,23 +11,22 @@ use serde::{Deserialize, Serialize};
1111
use tokio::sync::mpsc;
1212
use tracing::warn;
1313

14+
// #[cfg(any(test, feature = "test-utils"))]
15+
// use crate::endpoint::PathSelection;
16+
pub(super) use self::endpoint_state::EndpointStateMessage;
17+
pub(crate) use self::endpoint_state::PathsWatcher;
18+
use self::endpoint_state::{EndpointStateActor, EndpointStateHandle};
19+
pub use self::endpoint_state::{PathInfo, PathInfoList};
1420
use super::{
1521
DirectAddr, DiscoState, MagicsockMetrics,
1622
mapped_addrs::{AddrMap, EndpointIdMappedAddr, RelayMappedAddr},
1723
transports::{self, TransportsSender},
1824
};
19-
use crate::disco::{self};
20-
// #[cfg(any(test, feature = "test-utils"))]
21-
// use crate::endpoint::PathSelection;
25+
use crate::disco;
2226

2327
mod endpoint_state;
2428
mod path_state;
2529

26-
pub(super) use endpoint_state::EndpointStateMessage;
27-
pub(crate) use endpoint_state::PathsWatchable;
28-
use endpoint_state::{EndpointStateActor, EndpointStateHandle};
29-
pub use endpoint_state::{PathInfo, PathInfoList};
30-
3130
// TODO: use this
3231
// /// Number of endpoints that are inactive for which we keep info about. This limit is enforced
3332
// /// periodically via [`NodeMap::prune_inactive`].

iroh/src/magicsock/endpoint_map/endpoint_state.rs

Lines changed: 64 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ impl EndpointStateActor {
335335
async fn handle_msg_add_connection(
336336
&mut self,
337337
handle: WeakConnectionHandle,
338-
tx: oneshot::Sender<PathsWatchable>,
338+
tx: oneshot::Sender<PathsWatcher>,
339339
) {
340340
let pub_open_paths = Watchable::default();
341341
if let Some(conn) = handle.upgrade() {
@@ -397,10 +397,11 @@ impl EndpointStateActor {
397397
}
398398
self.trigger_holepunching().await;
399399
}
400-
tx.send(PathsWatchable {
401-
open_paths: pub_open_paths,
402-
selected_path: self.selected_path.clone(),
403-
})
400+
tx.send(PathsWatcher::new(
401+
pub_open_paths.watch(),
402+
self.selected_path.watch(),
403+
handle,
404+
))
404405
.ok();
405406
}
406407

@@ -1002,7 +1003,7 @@ pub(crate) enum EndpointStateMessage {
10021003
/// needed, any new paths discovered via holepunching will be added. And closed paths
10031004
/// will be removed etc.
10041005
#[debug("AddConnection(..)")]
1005-
AddConnection(WeakConnectionHandle, oneshot::Sender<PathsWatchable>),
1006+
AddConnection(WeakConnectionHandle, oneshot::Sender<PathsWatcher>),
10061007
/// Adds a [`EndpointAddr`] with locations where the endpoint might be reachable.
10071008
AddEndpointAddr(EndpointAddr, Source),
10081009
/// Process a received DISCO CallMeMaybe message.
@@ -1128,38 +1129,70 @@ impl ConnectionState {
11281129
}
11291130
}
11301131

1131-
/// Watchables for the open paths and selected transmission path in a connection.
1132+
/// Watcher for the open paths and selected transmission path in a connection.
11321133
///
11331134
/// This is stored in the [`Connection`], and the watchables are set from within the endpoint state actor.
11341135
///
1136+
/// Internally, this contains a boxed-mapped-joined watcher over the open paths in the connection and the
1137+
/// selected path to the remote endpoint. The watcher is boxed because the mapped-joined watcher with
1138+
/// `SmallVec<PathInfoList>` has a size of over 800 bytes, which we don't want to put upon the [`Connection`].
1139+
///
11351140
/// [`Connection`]: crate::endpoint::Connection
1136-
#[derive(Debug, Default, Clone)]
1137-
pub(crate) struct PathsWatchable {
1138-
/// Watchable for the open paths (in this connection).
1139-
open_paths: Watchable<PathAddrList>,
1140-
/// Watchable for the selected transmission path (global for this remote endpoint).
1141-
selected_path: Watchable<Option<transports::Addr>>,
1141+
#[derive(Clone, derive_more::Debug)]
1142+
#[debug("PathsWatcher")]
1143+
#[allow(clippy::type_complexity)]
1144+
pub(crate) struct PathsWatcher(
1145+
Box<
1146+
n0_watcher::Map<
1147+
(
1148+
n0_watcher::Direct<PathAddrList>,
1149+
n0_watcher::Direct<Option<transports::Addr>>,
1150+
),
1151+
PathInfoList,
1152+
>,
1153+
>,
1154+
);
1155+
1156+
impl n0_watcher::Watcher for PathsWatcher {
1157+
type Value = PathInfoList;
1158+
1159+
fn get(&mut self) -> Self::Value {
1160+
self.0.get()
1161+
}
1162+
1163+
fn is_connected(&self) -> bool {
1164+
self.0.is_connected()
1165+
}
1166+
1167+
fn poll_updated(
1168+
&mut self,
1169+
cx: &mut std::task::Context<'_>,
1170+
) -> Poll<Result<Self::Value, n0_watcher::Disconnected>> {
1171+
self.0.poll_updated(cx)
1172+
}
11421173
}
11431174

1144-
impl PathsWatchable {
1145-
pub(crate) fn watch(
1146-
&self,
1175+
impl PathsWatcher {
1176+
fn new(
1177+
open_paths: n0_watcher::Direct<PathAddrList>,
1178+
selected_path: n0_watcher::Direct<Option<transports::Addr>>,
11471179
conn_handle: WeakConnectionHandle,
1148-
) -> impl Watcher<Value = PathInfoList> + Unpin + Send + Sync + 'static {
1149-
let joined_watcher = (self.open_paths.watch(), self.selected_path.watch());
1150-
joined_watcher.map(move |(open_paths, selected_path)| {
1151-
let selected_path: Option<TransportAddr> = selected_path.map(Into::into);
1152-
let Some(conn) = conn_handle.upgrade() else {
1153-
return PathInfoList(Default::default());
1154-
};
1155-
let list = open_paths
1156-
.into_iter()
1157-
.flat_map(move |(remote, path_id)| {
1158-
PathInfo::new(path_id, &conn, remote, selected_path.as_ref())
1159-
})
1160-
.collect();
1161-
PathInfoList(list)
1162-
})
1180+
) -> Self {
1181+
Self(Box::new(open_paths.or(selected_path).map(
1182+
move |(open_paths, selected_path)| {
1183+
let selected_path: Option<TransportAddr> = selected_path.map(Into::into);
1184+
let Some(conn) = conn_handle.upgrade() else {
1185+
return PathInfoList(Default::default());
1186+
};
1187+
let list = open_paths
1188+
.into_iter()
1189+
.flat_map(move |(remote, path_id)| {
1190+
PathInfo::new(path_id, &conn, remote, selected_path.as_ref())
1191+
})
1192+
.collect();
1193+
PathInfoList(list)
1194+
},
1195+
)))
11631196
}
11641197
}
11651198

0 commit comments

Comments
 (0)