diff --git a/Cargo.lock b/Cargo.lock index 9cf9dca565..13caf3599b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1121,7 +1121,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.59.0", + "windows-sys 0.52.0", ] [[package]] @@ -2378,7 +2378,7 @@ dependencies = [ "once_cell", "socket2 0.5.10", "tracing", - "windows-sys 0.59.0", + "windows-sys 0.52.0", ] [[package]] @@ -2739,8 +2739,7 @@ dependencies = [ [[package]] name = "n0-watcher" version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38acf13c1ddafc60eb7316d52213467f8ccb70b6f02b65e7d97f7799b1f50be4" +source = "git+https://github.com/n0-computer/n0-watcher?branch=Frando%2Flazy-direct#334044203e4b2a616570fe67e15773f949dc7a2f" dependencies = [ "derive_more 2.0.1", "n0-error", @@ -2889,7 +2888,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -3397,7 +3396,7 @@ dependencies = [ "once_cell", "socket2 0.5.10", "tracing", - "windows-sys 0.59.0", + "windows-sys 0.52.0", ] [[package]] @@ -3666,7 +3665,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.59.0", + "windows-sys 0.52.0", ] [[package]] @@ -3768,7 +3767,7 @@ dependencies = [ "security-framework", "security-framework-sys", "webpki-root-certs 0.26.11", - "windows-sys 0.59.0", + "windows-sys 0.52.0", ] [[package]] @@ -3789,7 +3788,7 @@ dependencies = [ "security-framework", "security-framework-sys", "webpki-root-certs 1.0.3", - "windows-sys 0.59.0", + "windows-sys 0.52.0", ] [[package]] @@ -4352,7 +4351,7 @@ dependencies = [ "getrandom 0.3.4", "once_cell", "rustix", - "windows-sys 0.59.0", + "windows-sys 0.52.0", ] [[package]] @@ -5124,7 +5123,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.48.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index a1faa46daa..1130bab408 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,6 +46,7 @@ unused-async = "warn" iroh-quinn = { git = "https://github.com/n0-computer/quinn", branch = "main-iroh" } iroh-quinn-proto = { git = "https://github.com/n0-computer/quinn", branch = "main-iroh" } iroh-quinn-udp = { git = "https://github.com/n0-computer/quinn", branch = "main-iroh" } +n0-watcher = { git = "https://github.com/n0-computer/n0-watcher", branch = "Frando/lazy-direct" } # iroh-quinn = { path = "../iroh-quinn/quinn" } # iroh-quinn-proto = { path = "../iroh-quinn/quinn-proto" } diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index 9ec41b5724..78be3a0d7f 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -26,7 +26,7 @@ use url::Url; pub use super::magicsock::{ AddEndpointAddrError, ConnectionType, DirectAddr, DirectAddrType, PathInfo, - endpoint_map::{PathInfoList, Source}, + endpoint_map::{PathInfoIter, Source}, }; #[cfg(wasm_browser)] use crate::discovery::pkarr::PkarrResolver; @@ -1804,9 +1804,9 @@ mod tests { send.write_all(b"hello").await.anyerr()?; let mut paths = conn.paths().stream(); info!("Waiting for direct connection"); - while let Some(infos) = paths.next().await { + while let Some(mut infos) = paths.next().await { info!(?infos, "new PathInfos"); - if infos.iter().any(|info| info.is_ip()) { + if infos.any(|info| info.is_ip()) { break; } } @@ -1893,15 +1893,15 @@ mod tests { // We should be connected via IP, because it is faster than the relay server. // TODO: Maybe not panic if this is not true? - let path_info = conn.paths().get(); + let mut path_info = conn.paths().get(); assert_eq!(path_info.len(), 1); - assert!(path_info.iter().next().unwrap().is_ip()); + assert!(path_info.next().unwrap().is_ip()); let mut paths = conn.paths().stream(); time::timeout(Duration::from_secs(5), async move { - while let Some(infos) = paths.next().await { + while let Some(mut infos) = paths.next().await { info!(?infos, "new PathInfos"); - if infos.iter().any(|info| info.is_relay()) { + if infos.any(|info| info.is_relay()) { break; } } @@ -1944,9 +1944,9 @@ mod tests { // being added on this side too. let mut paths = conn.paths().stream(); time::timeout(Duration::from_secs(5), async move { - while let Some(infos) = paths.next().await { + while let Some(mut infos) = paths.next().await { info!(?infos, "new PathInfos"); - if infos.iter().any(|path| path.is_relay()) { + if infos.any(|path| path.is_relay()) { break; } } diff --git a/iroh/src/endpoint/connection.rs b/iroh/src/endpoint/connection.rs index dc269a3a55..7833cdc23c 100644 --- a/iroh/src/endpoint/connection.rs +++ b/iroh/src/endpoint/connection.rs @@ -43,7 +43,7 @@ use crate::{ discovery::DiscoveryTask, magicsock::{ EndpointStateActorStoppedError, - endpoint_map::{PathInfoList, PathsWatchable}, + endpoint_map::{PathInfoIter, PathsWatcher}, }, }; @@ -1213,7 +1213,7 @@ pub struct Connection { inner: quinn::Connection, remote_id: EndpointId, alpn: Vec, - paths: PathsWatchable, + paths: PathsWatcher, } #[allow(missing_docs)] @@ -1462,7 +1462,7 @@ impl Connection { /// /// [`PathInfo::is_selected`]: crate::magicsock::PathInfo::is_selected /// [`PathInfo`]: crate::magicsock::PathInfo - pub fn paths(&self) -> impl Watcher { + pub fn paths(&self) -> impl Watcher { self.paths.watch(self.inner.weak_handle()) } diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index 3f417f5b61..36fda5168d 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -64,7 +64,7 @@ use crate::{ disco::{self, SendAddr}, discovery::{ConcurrentDiscovery, Discovery, EndpointData, UserData}, key::{DecryptionError, SharedSecret, public_ed_box, secret_ed_box}, - magicsock::endpoint_map::PathsWatchable, + magicsock::endpoint_map::PathsWatcher, metrics::EndpointMetrics, net_report::{self, IfStateDetails, Report}, }; @@ -275,14 +275,14 @@ impl MagicSock { /// The actor is responsible for holepunching and opening additional paths to this /// connection. /// - /// Returns a future that resolves to [`PathsWatchable`]. + /// Returns a future that resolves to [`PathsWatcher`]. /// /// The returned future is `'static`, so it can be stored without being liftetime-bound to `&self`. pub(crate) fn register_connection( &self, remote: EndpointId, conn: WeakConnectionHandle, - ) -> impl Future> + Send + 'static + ) -> impl Future> + Send + 'static { let (tx, rx) = oneshot::channel(); let sender = self.endpoint_map.endpoint_state_actor(remote); @@ -1985,7 +1985,7 @@ mod tests { info!("stats: {:#?}", stats); // TODO: ensure panics in this function are reported ok if matches!(loss, ExpectedLoss::AlmostNone) { - for info in conn.paths().get().iter() { + for info in conn.paths().get() { assert!( info.stats().lost_packets < 10, "[receiver] path {:?} should not loose many packets", diff --git a/iroh/src/magicsock/endpoint_map.rs b/iroh/src/magicsock/endpoint_map.rs index 7c4b80d4fc..dc375b842f 100644 --- a/iroh/src/magicsock/endpoint_map.rs +++ b/iroh/src/magicsock/endpoint_map.rs @@ -25,8 +25,8 @@ mod endpoint_state; mod path_state; pub(super) use endpoint_state::EndpointStateMessage; -pub(crate) use endpoint_state::PathsWatchable; -pub use endpoint_state::{ConnectionType, PathInfo, PathInfoList}; +pub(crate) use endpoint_state::PathsWatcher; +pub use endpoint_state::{ConnectionType, PathInfo, PathInfoIter}; use endpoint_state::{EndpointStateActor, EndpointStateHandle}; // TODO: use this diff --git a/iroh/src/magicsock/endpoint_map/endpoint_state.rs b/iroh/src/magicsock/endpoint_map/endpoint_state.rs index e5cb923979..eb5f27bfaa 100644 --- a/iroh/src/magicsock/endpoint_map/endpoint_state.rs +++ b/iroh/src/magicsock/endpoint_map/endpoint_state.rs @@ -79,7 +79,7 @@ type PathEvents = MergeUnbounded< >; /// List of addrs and path ids for open paths in a connection. -pub(crate) type PathAddrList = SmallVec<[(TransportAddr, PathId); 4]>; +pub(crate) type PathAddrList = SmallVec<[(TransportAddr, PathId); 2]>; /// The state we need to know about a single remote endpoint. /// @@ -331,9 +331,10 @@ impl EndpointStateActor { async fn handle_msg_add_connection( &mut self, handle: WeakConnectionHandle, - tx: oneshot::Sender, + tx: oneshot::Sender, ) { let pub_open_paths = Watchable::default(); + if let Some(conn) = handle.upgrade() { // Remove any conflicting stable_ids from the local state. let conn_id = ConnId(conn.stable_id()); @@ -393,11 +394,14 @@ impl EndpointStateActor { } self.trigger_holepunching().await; } - tx.send(PathsWatchable { - open_paths: pub_open_paths, - selected_path: self.selected_path.clone(), - }) - .ok(); + + // Create a watcher over the open path and the selected path and pass it back + // to the Connection. + let paths_watcher = PathsWatcher { + open_paths: pub_open_paths.weak_watcher(), + selected_path: self.selected_path.weak_watcher(), + }; + tx.send(paths_watcher).ok(); } /// Handles [`EndpointStateMessage::AddEndpointAddr`]. @@ -996,7 +1000,7 @@ pub(crate) enum EndpointStateMessage { /// needed, any new paths discovered via holepunching will be added. And closed paths /// will be removed etc. #[debug("AddConnection(..)")] - AddConnection(WeakConnectionHandle, oneshot::Sender), + AddConnection(WeakConnectionHandle, oneshot::Sender), /// Adds a [`EndpointAddr`] with locations where the endpoint might be reachable. AddEndpointAddr(EndpointAddr, Source), /// Process a received DISCO CallMeMaybe message. @@ -1143,64 +1147,90 @@ impl ConnectionState { } } -/// Watchables for the open paths and selected transmission path in a connection. -/// -/// This is stored in the [`Connection`], and the watchables are set from within the endpoint state actor. -/// -/// [`Connection`]: crate::endpoint::Connection -#[derive(Debug, Default, Clone)] -pub(crate) struct PathsWatchable { - /// Watchable for the open paths (in this connection). - open_paths: Watchable, - /// Watchable for the selected transmission path (global for this remote endpoint). - selected_path: Watchable>, +/// Watchers for the open paths and selected path of a connection. +#[derive(Debug, Clone)] +pub(crate) struct PathsWatcher { + open_paths: n0_watcher::WeakWatcher, + selected_path: n0_watcher::WeakWatcher>, } -impl PathsWatchable { +impl PathsWatcher { pub(crate) fn watch( &self, conn_handle: WeakConnectionHandle, - ) -> impl Watcher { - let joined_watcher = (self.open_paths.watch(), self.selected_path.watch()); - joined_watcher.map(move |(open_paths, selected_path)| { - let selected_path: Option = selected_path.map(Into::into); - let Some(conn) = conn_handle.upgrade() else { - return PathInfoList(Default::default()); - }; - let list = open_paths - .into_iter() - .flat_map(move |(remote, path_id)| { - PathInfo::new(path_id, &conn, remote, selected_path.as_ref()) - }) - .collect(); - PathInfoList(list) - }) + ) -> impl Watcher { + self.open_paths + .upgrade_lazy() + .or(self.selected_path.upgrade_lazy()) + .map(move |(open_paths, selected_path)| { + let selected_path = selected_path.map(TransportAddr::from); + PathInfoIter { + conn_handle: conn_handle.clone(), + selected_path, + open_paths, + iter_pos: 0, + } + }) } } /// List of [`PathInfo`] for the network paths of a [`Connection`]. /// -/// This struct implements [`IntoIterator`]. +/// This struct implements [`Iterator`]. /// /// [`Connection`]: crate::endpoint::Connection -#[derive(derive_more::Debug, derive_more::IntoIterator, Eq, PartialEq, Clone)] -#[debug("{_0:?}")] -pub struct PathInfoList(SmallVec<[PathInfo; 4]>); - -impl PathInfoList { - /// Returns an iterator over the path infos. - pub fn iter(&self) -> impl Iterator { - self.0.iter() +#[derive(derive_more::Debug, Clone)] +pub struct PathInfoIter { + open_paths: PathAddrList, + selected_path: Option, + #[debug(skip)] + conn_handle: WeakConnectionHandle, + iter_pos: usize, +} + +impl PartialEq for PathInfoIter { + fn eq(&self, other: &Self) -> bool { + self.open_paths == other.open_paths && self.selected_path == other.selected_path } +} + +impl Eq for PathInfoIter {} +impl Iterator for PathInfoIter { + type Item = PathInfo; + + fn next(&mut self) -> Option { + let (remote, path_id) = self.open_paths.get(self.iter_pos)?; + let stats = self + .conn_handle + .upgrade() + .and_then(|conn| conn.path_stats(*path_id))?; + let is_selected = Some(remote) == self.selected_path.as_ref(); + self.iter_pos += 1; + Some(PathInfo { + is_selected, + remote: remote.clone(), + stats, + }) + } + + fn size_hint(&self) -> (usize, Option) { + let len = self.open_paths.len() - self.iter_pos; + (len, Some(len)) + } +} + +impl ExactSizeIterator for PathInfoIter {} + +impl PathInfoIter { /// Returns `true` if the list is empty. pub fn is_empty(&self) -> bool { - self.0.is_empty() + self.len() == 0 } /// Returns the number of paths. pub fn len(&self) -> usize { - self.0.len() + self.open_paths.len() } } @@ -1209,41 +1239,22 @@ impl PathInfoList { /// [`Connection`]: crate::endpoint::Connection #[derive(derive_more::Debug, Clone)] pub struct PathInfo { - path_id: PathId, - #[debug(skip)] - handle: WeakConnectionHandle, - stats: PathStats, remote: TransportAddr, is_selected: bool, + stats: PathStats, } impl PartialEq for PathInfo { fn eq(&self, other: &Self) -> bool { - self.path_id == other.path_id - && self.remote == other.remote + self.remote == other.remote && self.is_selected == other.is_selected + && self.stats == other.stats } } impl Eq for PathInfo {} impl PathInfo { - fn new( - path_id: PathId, - conn: &quinn::Connection, - remote: TransportAddr, - selected_path: Option<&TransportAddr>, - ) -> Option { - let stats = conn.path_stats(path_id)?; - Some(Self { - path_id, - handle: conn.weak_handle(), - is_selected: Some(&remote) == selected_path, - remote, - stats, - }) - } - /// The remote transport address used by this network path. pub fn remote_addr(&self) -> &TransportAddr { &self.remote @@ -1267,11 +1278,8 @@ impl PathInfo { } /// Returns stats for this transmission path. - pub fn stats(&self) -> PathStats { - self.handle - .upgrade() - .and_then(|conn| conn.path_stats(self.path_id)) - .unwrap_or(self.stats) + pub fn stats(&self) -> &PathStats { + &self.stats } /// Current best estimate of this paths's latency (round-trip-time)