Skip to content

Commit 492b74e

Browse files
authored
refactor: use boxed watcher, not watchable, on connection (#3632)
## Description Alternative to #3631 Replaces the `Watchable`s for path changes on the `Connection` with a boxed `Watcher`. The watcher is boxed because it would increase the `Connection` struct size significantly otherwise because the mapped-and-joined watcher with a `SmallVec` of `PathInfo` inside is ~600 bytes atm. The benefit of storing a `Watcher` and not a `Watchable` is that the watcher streams now close once the EndpointStateActor drops the state for the connection, which it does after the connection is closed. Also adds a test for path watching, including testing that the streams now close when the connection closes. ## Breaking Changes <!-- Optional, if there are any breaking changes document them, including how to migrate older code. --> ## Notes & open questions <!-- Any notes, remarks or open questions you have to make about the PR. --> ## Change checklist <!-- Remove any that are not relevant. --> - [ ] Self-review. - [ ] Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant. - [ ] Tests if relevant. - [ ] All breaking changes documented. - [ ] List all breaking changes in the above "Breaking Changes" section. - [ ] Open an issue or PR on any number0 repos that are affected by this breaking change. Give guidance on how the updates should be handled or do the actual updates themselves. The major ones are: - [ ] [`quic-rpc`](https://github.com/n0-computer/quic-rpc) - [ ] [`iroh-gossip`](https://github.com/n0-computer/iroh-gossip) - [ ] [`iroh-blobs`](https://github.com/n0-computer/iroh-blobs) - [ ] [`dumbpipe`](https://github.com/n0-computer/dumbpipe) - [ ] [`sendme`](https://github.com/n0-computer/sendme)
1 parent 2f924d9 commit 492b74e

File tree

4 files changed

+174
-47
lines changed

4 files changed

+174
-47
lines changed

iroh/src/endpoint/connection.rs

Lines changed: 100 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ use crate::{
4343
discovery::DiscoveryTask,
4444
magicsock::{
4545
EndpointStateActorStoppedError,
46-
endpoint_map::{PathInfoList, PathsWatchable},
46+
endpoint_map::{PathInfoList, PathsWatcher},
4747
},
4848
};
4949

@@ -1213,7 +1213,7 @@ pub struct Connection {
12131213
inner: quinn::Connection,
12141214
remote_id: EndpointId,
12151215
alpn: Vec<u8>,
1216-
paths: PathsWatchable,
1216+
paths: PathsWatcher,
12171217
}
12181218

12191219
#[allow(missing_docs)]
@@ -1463,7 +1463,7 @@ impl Connection {
14631463
/// [`PathInfo::is_selected`]: crate::magicsock::PathInfo::is_selected
14641464
/// [`PathInfo`]: crate::magicsock::PathInfo
14651465
pub fn paths(&self) -> impl Watcher<Value = PathInfoList> + Unpin + Send + Sync + 'static {
1466-
self.paths.watch(self.inner.weak_handle())
1466+
self.paths.clone()
14671467
}
14681468

14691469
/// Derives keying material from this connection's TLS session secrets.
@@ -1511,16 +1511,21 @@ impl Connection {
15111511

15121512
#[cfg(test)]
15131513
mod tests {
1514+
use std::time::Duration;
1515+
15141516
use iroh_base::{EndpointAddr, SecretKey};
15151517
use n0_error::{Result, StackResultExt, StdResultExt};
1518+
use n0_future::StreamExt;
1519+
use n0_watcher::Watcher;
15161520
use rand::SeedableRng;
1517-
use tracing::{Instrument, info_span, trace_span};
1521+
use tracing::{Instrument, error_span, info_span, trace_span};
15181522
use tracing_test::traced_test;
15191523

15201524
use super::Endpoint;
15211525
use crate::{
15221526
RelayMode,
1523-
endpoint::{ConnectOptions, Incoming, ZeroRttStatus},
1527+
endpoint::{ConnectOptions, Incoming, PathInfo, PathInfoList, ZeroRttStatus},
1528+
test_utils::run_relay_server,
15241529
};
15251530

15261531
const TEST_ALPN: &[u8] = b"n0/iroh/test";
@@ -1730,4 +1735,94 @@ mod tests {
17301735
tokio::join!(client.close(), server.close());
17311736
Ok(())
17321737
}
1738+
1739+
#[tokio::test]
1740+
async fn test_paths_watcher() -> Result {
1741+
tracing_subscriber::fmt::init();
1742+
const ALPN: &[u8] = b"test";
1743+
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);
1744+
let (relay_map, _relay_map, _guard) = run_relay_server().await?;
1745+
let server = Endpoint::empty_builder(RelayMode::Custom(relay_map.clone()))
1746+
.secret_key(SecretKey::generate(&mut rng))
1747+
.insecure_skip_relay_cert_verify(true)
1748+
.alpns(vec![ALPN.to_vec()])
1749+
.bind()
1750+
.await?;
1751+
1752+
let client = Endpoint::empty_builder(RelayMode::Custom(relay_map.clone()))
1753+
.secret_key(SecretKey::generate(&mut rng))
1754+
.insecure_skip_relay_cert_verify(true)
1755+
.bind()
1756+
.await?;
1757+
1758+
server.online().await;
1759+
let server_addr = server.addr();
1760+
tracing::info!("server addr: {server_addr:?}");
1761+
1762+
let (conn_client, conn_server) = tokio::join!(
1763+
async { client.connect(server_addr, ALPN).await.unwrap() },
1764+
async { server.accept().await.unwrap().await.unwrap() }
1765+
);
1766+
tracing::info!("connected");
1767+
let mut paths_client = conn_client.paths().stream();
1768+
let mut paths_server = conn_server.paths().stream();
1769+
1770+
/// Advances the path stream until at least one IP and one relay paths are available.
1771+
///
1772+
/// Panics if the path stream finishes before that happens.
1773+
async fn wait_for_paths(
1774+
stream: &mut n0_watcher::Stream<impl n0_watcher::Watcher<Value = PathInfoList> + Unpin>,
1775+
) {
1776+
loop {
1777+
let paths = stream.next().await.expect("paths stream ended");
1778+
tracing::info!(?paths, "paths");
1779+
if paths.len() >= 2
1780+
&& paths.iter().any(PathInfo::is_relay)
1781+
&& paths.iter().any(PathInfo::is_ip)
1782+
{
1783+
tracing::info!("break");
1784+
return;
1785+
}
1786+
}
1787+
}
1788+
1789+
// Verify that both connections are notified of path changes and get an IP and a relay path.
1790+
tokio::join!(
1791+
async {
1792+
tokio::time::timeout(Duration::from_secs(1), wait_for_paths(&mut paths_server))
1793+
.instrument(error_span!("paths-server"))
1794+
.await
1795+
.unwrap()
1796+
},
1797+
async {
1798+
tokio::time::timeout(Duration::from_secs(1), wait_for_paths(&mut paths_client))
1799+
.instrument(error_span!("paths-client"))
1800+
.await
1801+
.unwrap()
1802+
}
1803+
);
1804+
1805+
// Close the client connection.
1806+
tracing::info!("close client conn");
1807+
conn_client.close(0u32.into(), b"");
1808+
1809+
// Verify that the path watch streams close.
1810+
assert_eq!(
1811+
tokio::time::timeout(Duration::from_secs(1), paths_client.next())
1812+
.await
1813+
.unwrap(),
1814+
None
1815+
);
1816+
assert_eq!(
1817+
tokio::time::timeout(Duration::from_secs(1), paths_server.next())
1818+
.await
1819+
.unwrap(),
1820+
None
1821+
);
1822+
1823+
server.close().await;
1824+
client.close().await;
1825+
1826+
Ok(())
1827+
}
17331828
}

iroh/src/magicsock.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ use crate::{
6464
disco::{self, SendAddr},
6565
discovery::{ConcurrentDiscovery, Discovery, EndpointData, UserData},
6666
key::{DecryptionError, SharedSecret, public_ed_box, secret_ed_box},
67-
magicsock::endpoint_map::PathsWatchable,
67+
magicsock::endpoint_map::PathsWatcher,
6868
metrics::EndpointMetrics,
6969
net_report::{self, IfStateDetails, Report},
7070
};
@@ -272,14 +272,14 @@ impl MagicSock {
272272
/// The actor is responsible for holepunching and opening additional paths to this
273273
/// connection.
274274
///
275-
/// Returns a future that resolves to [`PathsWatchable`].
275+
/// Returns a future that resolves to [`PathsWatcher`].
276276
///
277277
/// The returned future is `'static`, so it can be stored without being liftetime-bound to `&self`.
278278
pub(crate) fn register_connection(
279279
&self,
280280
remote: EndpointId,
281281
conn: WeakConnectionHandle,
282-
) -> impl Future<Output = Result<PathsWatchable, EndpointStateActorStoppedError>> + Send + 'static
282+
) -> impl Future<Output = Result<PathsWatcher, EndpointStateActorStoppedError>> + Send + 'static
283283
{
284284
let (tx, rx) = oneshot::channel();
285285
let sender = self.endpoint_map.endpoint_state_actor(remote);

iroh/src/magicsock/endpoint_map.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,23 +11,22 @@ use serde::{Deserialize, Serialize};
1111
use tokio::sync::mpsc;
1212
use tracing::warn;
1313

14+
// #[cfg(any(test, feature = "test-utils"))]
15+
// use crate::endpoint::PathSelection;
16+
pub(super) use self::endpoint_state::EndpointStateMessage;
17+
pub(crate) use self::endpoint_state::PathsWatcher;
18+
use self::endpoint_state::{EndpointStateActor, EndpointStateHandle};
19+
pub use self::endpoint_state::{PathInfo, PathInfoList};
1420
use super::{
1521
DirectAddr, DiscoState, MagicsockMetrics,
1622
mapped_addrs::{AddrMap, EndpointIdMappedAddr, RelayMappedAddr},
1723
transports::{self, TransportsSender},
1824
};
19-
use crate::disco::{self};
20-
// #[cfg(any(test, feature = "test-utils"))]
21-
// use crate::endpoint::PathSelection;
25+
use crate::disco;
2226

2327
mod endpoint_state;
2428
mod path_state;
2529

26-
pub(super) use endpoint_state::EndpointStateMessage;
27-
pub(crate) use endpoint_state::PathsWatchable;
28-
use endpoint_state::{EndpointStateActor, EndpointStateHandle};
29-
pub use endpoint_state::{PathInfo, PathInfoList};
30-
3130
// TODO: use this
3231
// /// Number of endpoints that are inactive for which we keep info about. This limit is enforced
3332
// /// periodically via [`NodeMap::prune_inactive`].

iroh/src/magicsock/endpoint_map/endpoint_state.rs

Lines changed: 64 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ impl EndpointStateActor {
335335
async fn handle_msg_add_connection(
336336
&mut self,
337337
handle: WeakConnectionHandle,
338-
tx: oneshot::Sender<PathsWatchable>,
338+
tx: oneshot::Sender<PathsWatcher>,
339339
) {
340340
let pub_open_paths = Watchable::default();
341341
if let Some(conn) = handle.upgrade() {
@@ -397,10 +397,11 @@ impl EndpointStateActor {
397397
}
398398
self.trigger_holepunching().await;
399399
}
400-
tx.send(PathsWatchable {
401-
open_paths: pub_open_paths,
402-
selected_path: self.selected_path.clone(),
403-
})
400+
tx.send(PathsWatcher::new(
401+
pub_open_paths.watch(),
402+
self.selected_path.watch(),
403+
handle,
404+
))
404405
.ok();
405406
}
406407

@@ -1002,7 +1003,7 @@ pub(crate) enum EndpointStateMessage {
10021003
/// needed, any new paths discovered via holepunching will be added. And closed paths
10031004
/// will be removed etc.
10041005
#[debug("AddConnection(..)")]
1005-
AddConnection(WeakConnectionHandle, oneshot::Sender<PathsWatchable>),
1006+
AddConnection(WeakConnectionHandle, oneshot::Sender<PathsWatcher>),
10061007
/// Adds a [`EndpointAddr`] with locations where the endpoint might be reachable.
10071008
AddEndpointAddr(EndpointAddr, Source),
10081009
/// Process a received DISCO CallMeMaybe message.
@@ -1128,38 +1129,70 @@ impl ConnectionState {
11281129
}
11291130
}
11301131

1131-
/// Watchables for the open paths and selected transmission path in a connection.
1132+
/// Watcher for the open paths and selected transmission path in a connection.
11321133
///
11331134
/// This is stored in the [`Connection`], and the watchables are set from within the endpoint state actor.
11341135
///
1136+
/// Internally, this contains a boxed-mapped-joined watcher over the open paths in the connection and the
1137+
/// selected path to the remote endpoint. The watcher is boxed because the mapped-joined watcher with
1138+
/// `SmallVec<PathInfoList>` has a size of over 800 bytes, which we don't want to put upon the [`Connection`].
1139+
///
11351140
/// [`Connection`]: crate::endpoint::Connection
1136-
#[derive(Debug, Default, Clone)]
1137-
pub(crate) struct PathsWatchable {
1138-
/// Watchable for the open paths (in this connection).
1139-
open_paths: Watchable<PathAddrList>,
1140-
/// Watchable for the selected transmission path (global for this remote endpoint).
1141-
selected_path: Watchable<Option<transports::Addr>>,
1141+
#[derive(Clone, derive_more::Debug)]
1142+
#[debug("PathsWatcher")]
1143+
#[allow(clippy::type_complexity)]
1144+
pub(crate) struct PathsWatcher(
1145+
Box<
1146+
n0_watcher::Map<
1147+
(
1148+
n0_watcher::Direct<PathAddrList>,
1149+
n0_watcher::Direct<Option<transports::Addr>>,
1150+
),
1151+
PathInfoList,
1152+
>,
1153+
>,
1154+
);
1155+
1156+
impl n0_watcher::Watcher for PathsWatcher {
1157+
type Value = PathInfoList;
1158+
1159+
fn get(&mut self) -> Self::Value {
1160+
self.0.get()
1161+
}
1162+
1163+
fn is_connected(&self) -> bool {
1164+
self.0.is_connected()
1165+
}
1166+
1167+
fn poll_updated(
1168+
&mut self,
1169+
cx: &mut std::task::Context<'_>,
1170+
) -> Poll<Result<Self::Value, n0_watcher::Disconnected>> {
1171+
self.0.poll_updated(cx)
1172+
}
11421173
}
11431174

1144-
impl PathsWatchable {
1145-
pub(crate) fn watch(
1146-
&self,
1175+
impl PathsWatcher {
1176+
fn new(
1177+
open_paths: n0_watcher::Direct<PathAddrList>,
1178+
selected_path: n0_watcher::Direct<Option<transports::Addr>>,
11471179
conn_handle: WeakConnectionHandle,
1148-
) -> impl Watcher<Value = PathInfoList> + Unpin + Send + Sync + 'static {
1149-
let joined_watcher = (self.open_paths.watch(), self.selected_path.watch());
1150-
joined_watcher.map(move |(open_paths, selected_path)| {
1151-
let selected_path: Option<TransportAddr> = selected_path.map(Into::into);
1152-
let Some(conn) = conn_handle.upgrade() else {
1153-
return PathInfoList(Default::default());
1154-
};
1155-
let list = open_paths
1156-
.into_iter()
1157-
.flat_map(move |(remote, path_id)| {
1158-
PathInfo::new(path_id, &conn, remote, selected_path.as_ref())
1159-
})
1160-
.collect();
1161-
PathInfoList(list)
1162-
})
1180+
) -> Self {
1181+
Self(Box::new(open_paths.or(selected_path).map(
1182+
move |(open_paths, selected_path)| {
1183+
let selected_path: Option<TransportAddr> = selected_path.map(Into::into);
1184+
let Some(conn) = conn_handle.upgrade() else {
1185+
return PathInfoList(Default::default());
1186+
};
1187+
let list = open_paths
1188+
.into_iter()
1189+
.flat_map(move |(remote, path_id)| {
1190+
PathInfo::new(path_id, &conn, remote, selected_path.as_ref())
1191+
})
1192+
.collect();
1193+
PathInfoList(list)
1194+
},
1195+
)))
11631196
}
11641197
}
11651198

0 commit comments

Comments
 (0)