From 87c2146b45556662b2d919866b3984bfb51a13bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Fri, 13 Dec 2024 11:41:12 +0100 Subject: [PATCH 01/17] Introduce a `Watcher` trait and `Or` combinator --- iroh/bench/src/iroh.rs | 1 + iroh/examples/connect.rs | 5 +- iroh/src/discovery/pkarr.rs | 4 +- iroh/src/endpoint.rs | 37 ++-- iroh/src/endpoint/rtt_actor.rs | 12 +- iroh/src/magicsock.rs | 12 +- iroh/src/magicsock/node_map.rs | 9 +- iroh/src/magicsock/node_map/node_state.rs | 4 +- iroh/src/watchable.rs | 213 +++++++++++++++------- 9 files changed, 203 insertions(+), 94 deletions(-) diff --git a/iroh/bench/src/iroh.rs b/iroh/bench/src/iroh.rs index b01811a8bb..d4d57ad3d7 100644 --- a/iroh/bench/src/iroh.rs +++ b/iroh/bench/src/iroh.rs @@ -7,6 +7,7 @@ use anyhow::{Context, Result}; use bytes::Bytes; use iroh::{ endpoint::{Connection, ConnectionError, RecvStream, SendStream, TransportConfig}, + watchable::WatcherExt as _, Endpoint, NodeAddr, RelayMap, RelayMode, RelayUrl, }; use tracing::{trace, warn}; diff --git a/iroh/examples/connect.rs b/iroh/examples/connect.rs index 5187c02a1d..9535936187 100644 --- a/iroh/examples/connect.rs +++ b/iroh/examples/connect.rs @@ -9,7 +9,10 @@ use std::net::SocketAddr; use anyhow::Context; use clap::Parser; -use iroh::{Endpoint, NodeAddr, RelayMode, RelayUrl, SecretKey}; +use iroh::{ + watchable::{Watcher as _, WatcherExt as _}, + Endpoint, NodeAddr, RelayMode, RelayUrl, SecretKey, +}; use tracing::info; // An example ALPN that we are using to communicate over the `Endpoint` diff --git a/iroh/src/discovery/pkarr.rs b/iroh/src/discovery/pkarr.rs index a319094b68..c9114db16c 100644 --- a/iroh/src/discovery/pkarr.rs +++ b/iroh/src/discovery/pkarr.rs @@ -61,7 +61,7 @@ use crate::{ discovery::{Discovery, DiscoveryItem}, dns::node_info::NodeInfo, endpoint::force_staging_infra, - watchable::{Disconnected, Watchable, Watcher}, + watchable::{DirectWatcher, Disconnected, Watchable, Watcher as _, WatcherExt as _}, Endpoint, }; @@ -221,7 +221,7 @@ struct PublisherService { secret_key: SecretKey, #[debug("PkarrClient")] pkarr_client: PkarrRelayClient, - watcher: Watcher>, + watcher: DirectWatcher>, ttl: u32, republish_interval: Duration, } diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index 8ce5a97bd5..d2fcb6cb37 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -38,7 +38,7 @@ use crate::{ dns::{default_resolver, DnsResolver}, magicsock::{self, Handle, QuicMappedAddr}, tls, - watchable::Watcher, + watchable::{DirectWatcher, Watcher as _, WatcherExt as _}, }; mod rtt_actor; @@ -762,13 +762,28 @@ impl Endpoint { /// as they would be returned by [`Endpoint::home_relay`] and /// [`Endpoint::direct_addresses`]. pub async fn node_addr(&self) -> Result { - let addrs = self.direct_addresses().initialized().await?; - let relay = self.home_relay().get()?; - Ok(NodeAddr::from_parts( - self.node_id(), - relay, - addrs.into_iter().map(|x| x.addr), - )) + let mut watch_addrs = self.direct_addresses(); + let mut watch_relay = self.home_relay(); + tokio::select! { + addrs = watch_addrs.initialized() => { + let addrs = addrs?; + let relay = self.home_relay().get()?; + Ok(NodeAddr::from_parts( + self.node_id(), + relay, + addrs.into_iter().map(|x| x.addr), + )) + }, + relay = watch_relay.initialized() => { + let relay = relay?; + let addrs = self.direct_addresses().get()?.unwrap_or_default(); + Ok(NodeAddr::from_parts( + self.node_id(), + Some(relay), + addrs.into_iter().map(|x| x.addr), + )) + }, + } } /// Returns a [`Watcher`] for the [`RelayUrl`] of the Relay server used as home relay. @@ -796,7 +811,7 @@ impl Endpoint { /// let _relay_url = mep.home_relay().initialized().await.unwrap(); /// # }); /// ``` - pub fn home_relay(&self) -> Watcher> { + pub fn home_relay(&self) -> DirectWatcher> { self.msock.home_relay() } @@ -834,7 +849,7 @@ impl Endpoint { /// ``` /// /// [STUN]: https://en.wikipedia.org/wiki/STUN - pub fn direct_addresses(&self) -> Watcher>> { + pub fn direct_addresses(&self) -> DirectWatcher>> { self.msock.direct_addresses() } @@ -906,7 +921,7 @@ impl Endpoint { /// # Errors /// /// Will error if we do not have any address information for the given `node_id`. - pub fn conn_type(&self, node_id: NodeId) -> Result> { + pub fn conn_type(&self, node_id: NodeId) -> Result> { self.msock.conn_type(node_id) } diff --git a/iroh/src/endpoint/rtt_actor.rs b/iroh/src/endpoint/rtt_actor.rs index c1a9748ef5..2c2f8392eb 100644 --- a/iroh/src/endpoint/rtt_actor.rs +++ b/iroh/src/endpoint/rtt_actor.rs @@ -13,7 +13,11 @@ use tokio::{ use tokio_util::task::AbortOnDropHandle; use tracing::{debug, error, info_span, trace, Instrument}; -use crate::{magicsock::ConnectionType, metrics::MagicsockMetrics, watchable::WatcherStream}; +use crate::{ + magicsock::ConnectionType, + metrics::MagicsockMetrics, + watchable::{DirectWatcher, WatcherStream}, +}; #[derive(Debug)] pub(super) struct RttHandle { @@ -51,7 +55,7 @@ pub(super) enum RttMessage { /// The connection. connection: quinn::WeakConnectionHandle, /// Path changes for this connection from the magic socket. - conn_type_changes: WatcherStream, + conn_type_changes: WatcherStream>, /// For reporting-only, the Node ID of this connection. node_id: NodeId, }, @@ -64,7 +68,7 @@ pub(super) enum RttMessage { #[derive(Debug)] struct RttActor { /// Stream of connection type changes. - connection_events: stream_group::Keyed>, + connection_events: stream_group::Keyed>>, /// References to the connections. /// /// These are weak references so not to keep the connections alive. The key allows @@ -121,7 +125,7 @@ impl RttActor { fn handle_new_connection( &mut self, connection: quinn::WeakConnectionHandle, - conn_type_changes: WatcherStream, + conn_type_changes: WatcherStream>, node_id: NodeId, ) { let key = self.connection_events.insert(conn_type_changes); diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index 49336e0d20..6dbe40374f 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -66,7 +66,7 @@ use crate::{ discovery::{Discovery, DiscoveryItem}, dns::DnsResolver, key::{public_ed_box, secret_ed_box, DecryptionError, SharedSecret}, - watchable::{Watchable, Watcher}, + watchable::{DirectWatcher, Watchable}, }; mod metrics; @@ -323,7 +323,7 @@ impl MagicSock { /// store [`Some`] set of addresses. /// /// To get the current direct addresses, use [`Watcher::initialized`]. - pub(crate) fn direct_addresses(&self) -> Watcher>> { + pub(crate) fn direct_addresses(&self) -> DirectWatcher>> { self.direct_addrs.addrs.watch() } @@ -331,7 +331,7 @@ impl MagicSock { /// /// Note that this can be used to wait for the initial home relay to be known using /// [`Watcher::initialized`]. - pub(crate) fn home_relay(&self) -> Watcher> { + pub(crate) fn home_relay(&self) -> DirectWatcher> { self.my_relay.watch() } @@ -345,7 +345,7 @@ impl MagicSock { /// /// Will return an error if there is no address information known about the /// given `node_id`. - pub(crate) fn conn_type(&self, node_id: NodeId) -> Result> { + pub(crate) fn conn_type(&self, node_id: NodeId) -> Result> { self.node_map.conn_type(node_id) } @@ -2853,7 +2853,9 @@ mod tests { use super::*; use crate::{ defaults::staging::{self, EU_RELAY_HOSTNAME}, - tls, Endpoint, RelayMode, + tls, + watchable::WatcherExt as _, + Endpoint, RelayMode, }; const ALPN: &[u8] = b"n0/test/1"; diff --git a/iroh/src/magicsock/node_map.rs b/iroh/src/magicsock/node_map.rs index d25fbf84fe..91a387787f 100644 --- a/iroh/src/magicsock/node_map.rs +++ b/iroh/src/magicsock/node_map.rs @@ -21,7 +21,7 @@ use super::{ }; use crate::{ disco::{CallMeMaybe, Pong, SendAddr}, - watchable::Watcher, + watchable::DirectWatcher, }; mod best_addr; @@ -291,7 +291,10 @@ impl NodeMap { /// /// Will return an error if there is not an entry in the [`NodeMap`] for /// the `node_id` - pub(super) fn conn_type(&self, node_id: NodeId) -> anyhow::Result> { + pub(super) fn conn_type( + &self, + node_id: NodeId, + ) -> anyhow::Result> { self.inner.lock().expect("poisoned").conn_type(node_id) } @@ -459,7 +462,7 @@ impl NodeMapInner { /// /// Will return an error if there is not an entry in the [`NodeMap`] for /// the `public_key` - fn conn_type(&self, node_id: NodeId) -> anyhow::Result> { + fn conn_type(&self, node_id: NodeId) -> anyhow::Result> { match self.get(NodeStateKey::NodeId(node_id)) { Some(ep) => Ok(ep.conn_type()), None => anyhow::bail!("No endpoint for {node_id:?} found"), diff --git a/iroh/src/magicsock/node_map/node_state.rs b/iroh/src/magicsock/node_map/node_state.rs index d62d4c294e..0710edaadd 100644 --- a/iroh/src/magicsock/node_map/node_state.rs +++ b/iroh/src/magicsock/node_map/node_state.rs @@ -24,7 +24,7 @@ use crate::{ disco::{self, SendAddr}, magicsock::{ActorMessage, MagicsockMetrics, QuicMappedAddr, Timer, HEARTBEAT_INTERVAL}, util::relay_only_mode, - watchable::{Watchable, Watcher}, + watchable::{DirectWatcher, Watchable}, }; /// Number of addresses that are not active that we keep around per node. @@ -191,7 +191,7 @@ impl NodeState { self.id } - pub(super) fn conn_type(&self) -> Watcher { + pub(super) fn conn_type(&self) -> DirectWatcher { self.conn_type.watch() } diff --git a/iroh/src/watchable.rs b/iroh/src/watchable.rs index 9ed6cd9056..3388ec8c6f 100644 --- a/iroh/src/watchable.rs +++ b/iroh/src/watchable.rs @@ -88,8 +88,8 @@ impl Watchable { } /// Creates a [`Watcher`] allowing the value to be observed, but not modified. - pub fn watch(&self) -> Watcher { - Watcher { + pub fn watch(&self) -> DirectWatcher { + DirectWatcher { epoch: self.shared.state.read().expect("poisoned").epoch, shared: Arc::downgrade(&self.shared), } @@ -110,31 +110,70 @@ impl Watchable { /// When the thread changing the [`Watchable`] pauses updating, the [`Watcher`] will always /// end up reporting the most recent state eventually. #[derive(Debug, Clone)] -pub struct Watcher { +pub struct DirectWatcher { epoch: u64, shared: Weak>, } -impl Watcher { - /// Returns the currently held value. +/// A handle to a value that's represented by one or more underlying [`Watchable`]s. +/// +/// This handle allows one to observe the latest state and be notified +/// of any changes to this value. +pub trait Watcher: Clone { + /// The type of value that can change. /// - /// Returns [`Err(Disconnected)`](Disconnected) if the original - /// [`Watchable`] was dropped. - pub fn get(&self) -> Result { - let shared = self.shared.upgrade().ok_or(Disconnected)?; - Ok(shared.get()) - } + /// We require `Clone`, because we need to be able to make + /// the values have a lifetime that's detached from the original [`Watchable`]'s + /// lifetime. + /// + /// We require `Eq`, to be able to check whether the value actually changed or + /// not, so we can notify or not notify accordingly. + type Value: Clone + Eq; + + /// Returns the current state of the underlying value, or errors out with + /// [`Disconnected`], if one of the underlying [`Watchable`]s has been dropped. + fn get(&self) -> Result; + + /// Polls for the next value, or returns [`Disconnected`] if one of the underlying + /// watchables has been dropped. + fn poll_updated( + &mut self, + cx: &mut task::Context<'_>, + ) -> Poll>; +} +/// Extension methods for the [`Watcher`] trait. +pub trait WatcherExt: Watcher { /// Returns a future completing with `Ok(value)` once a new value is set, or with /// [`Err(Disconnected)`](Disconnected) if the connected [`Watchable`] was dropped. /// /// # Cancel Safety /// /// The returned future is cancel-safe. - pub fn updated(&mut self) -> WatchNextFut { + fn updated(&mut self) -> WatchNextFut { WatchNextFut { watcher: self } } + /// Returns a future completing once the value is set to [`Some`] value. + /// + /// If the current value is [`Some`] value, this future will resolve immediately. + /// + /// This is a utility for the common case of storing an [`Option`] inside a + /// [`Watchable`]. + fn initialized(&mut self) -> WatchInitializedFut + where + Self: Watcher>, + { + WatchInitializedFut { + initial: match self.get() { + Ok(Some(value)) => Some(Ok(value)), + Ok(None) => None, + Err(Disconnected) => Some(Err(Disconnected)), + }, + watcher: self, + } + } + /// Returns a stream which will yield the most recent values as items. /// /// The first item of the stream is the current value, so that this stream can be easily @@ -148,10 +187,14 @@ impl Watcher { /// # Cancel Safety /// /// The returned stream is cancel-safe. - pub fn stream(mut self) -> WatcherStream { - debug_assert!(self.epoch > 0); - self.epoch -= 1; - WatcherStream { watcher: self } + fn stream(self) -> WatcherStream + where + Self: Unpin, + { + WatcherStream { + initial: self.get().ok(), + watcher: self, + } } /// Returns a stream which will yield the most recent values as items, starting from @@ -168,21 +211,67 @@ impl Watcher { /// # Cancel Safety /// /// The returned stream is cancel-safe. - pub fn stream_updates_only(self) -> WatcherStream { - WatcherStream { watcher: self } + fn stream_updates_only(self) -> WatcherStream + where + Self: Unpin, + { + WatcherStream { + initial: None, + watcher: self, + } } } -impl Watcher> { - /// Returns a future completing once the value is set to [`Some`] value. - /// - /// If the current value is [`Some`] value, this future will resolve immediately. - /// - /// This is a utility for the common case of storing an [`Option`] inside a - /// [`Watchable`]. - pub fn initialized(&mut self) -> WatchInitializedFut { - self.epoch = PRE_INITIAL_EPOCH; - WatchInitializedFut { watcher: self } +impl WatcherExt for T {} + +impl Watcher for DirectWatcher { + type Value = T; + + fn get(&self) -> Result { + let shared = self.shared.upgrade().ok_or(Disconnected)?; + Ok(shared.get()) + } + + fn poll_updated( + &mut self, + cx: &mut task::Context<'_>, + ) -> Poll> { + let Some(shared) = self.shared.upgrade() else { + return Poll::Ready(Err(Disconnected)); + }; + match shared.poll_updated(cx, self.epoch) { + Poll::Pending => Poll::Pending, + Poll::Ready((current_epoch, value)) => { + self.epoch = current_epoch; + Poll::Ready(Ok(value)) + } + } + } +} + +/// Combines two [`Watcher`]s into a single watcher. +#[derive(Clone, Debug)] +pub struct Or(S, T); + +impl Watcher for Or { + type Value = (S::Value, T::Value); + + fn get(&self) -> Result { + Ok((self.0.get()?, self.1.get()?)) + } + + fn poll_updated( + &mut self, + cx: &mut task::Context<'_>, + ) -> Poll> { + let poll_0 = self.0.poll_updated(cx)?; + let poll_1 = self.1.poll_updated(cx)?; + match (poll_0, poll_1) { + (Poll::Ready(s), Poll::Ready(t)) => Poll::Ready(Ok((s, t))), + (Poll::Ready(s), Poll::Pending) => Poll::Ready(self.1.get().map(move |t| (s, t))), + (Poll::Pending, Poll::Ready(t)) => Poll::Ready(self.0.get().map(move |s| (s, t))), + (Poll::Pending, Poll::Pending) => Poll::Pending, + } } } @@ -194,24 +283,15 @@ impl Watcher> { /// /// This future is cancel-safe. #[derive(Debug)] -pub struct WatchNextFut<'a, T> { - watcher: &'a mut Watcher, +pub struct WatchNextFut<'a, W: Watcher> { + watcher: &'a mut W, } -impl Future for WatchNextFut<'_, T> { - type Output = Result; +impl Future for WatchNextFut<'_, W> { + type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - let Some(shared) = self.watcher.shared.upgrade() else { - return Poll::Ready(Err(Disconnected)); - }; - match shared.poll_next(cx, self.watcher.epoch) { - Poll::Pending => Poll::Pending, - Poll::Ready((current_epoch, value)) => { - self.watcher.epoch = current_epoch; - Poll::Ready(Ok(value)) - } - } + self.watcher.poll_updated(cx) } } @@ -224,22 +304,22 @@ impl Future for WatchNextFut<'_, T> { /// /// This Future is cancel-safe. #[derive(Debug)] -pub struct WatchInitializedFut<'a, T> { - watcher: &'a mut Watcher>, +pub struct WatchInitializedFut<'a, T, W: Watcher>> { + initial: Option>, + watcher: &'a mut W, } -impl Future for WatchInitializedFut<'_, T> { +impl> + Unpin> Future + for WatchInitializedFut<'_, T, W> +{ type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + if let Some(value) = self.as_mut().initial.take() { + return Poll::Ready(value); + } loop { - let Some(shared) = self.watcher.shared.upgrade() else { - return Poll::Ready(Err(Disconnected)); - }; - let (epoch, value) = futures_lite::ready!(shared.poll_next(cx, self.watcher.epoch)); - self.watcher.epoch = epoch; - - if let Some(value) = value { + if let Some(value) = futures_lite::ready!(self.as_mut().watcher.poll_updated(cx)?) { return Poll::Ready(Ok(value)); } } @@ -254,23 +334,25 @@ impl Future for WatchInitializedFut<'_, T> { /// /// This stream is cancel-safe. #[derive(Debug, Clone)] -pub struct WatcherStream { - watcher: Watcher, +pub struct WatcherStream { + initial: Option, + watcher: W, } -impl Stream for WatcherStream { - type Item = T; +impl Stream for WatcherStream +where + W::Value: Unpin, +{ + type Item = W::Value; fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { - let Some(shared) = self.watcher.shared.upgrade() else { - return Poll::Ready(None); - }; - match shared.poll_next(cx, self.watcher.epoch) { + if let Some(value) = self.as_mut().initial.take() { + return Poll::Ready(Some(value)); + } + match self.as_mut().watcher.poll_updated(cx) { + Poll::Ready(Ok(value)) => Poll::Ready(Some(value)), + Poll::Ready(Err(Disconnected)) => Poll::Ready(None), Poll::Pending => Poll::Pending, - Poll::Ready((epoch, value)) => { - self.watcher.epoch = epoch; - Poll::Ready(Some(value)) - } } } } @@ -284,7 +366,6 @@ pub struct Disconnected; // Private: const INITIAL_EPOCH: u64 = 1; -const PRE_INITIAL_EPOCH: u64 = 0; /// The shared state for a [`Watchable`]. #[derive(Debug, Default)] @@ -315,7 +396,7 @@ impl Shared { self.state.read().expect("poisoned").value.clone() } - fn poll_next(&self, cx: &mut task::Context<'_>, last_epoch: u64) -> Poll<(u64, T)> { + fn poll_updated(&self, cx: &mut task::Context<'_>, last_epoch: u64) -> Poll<(u64, T)> { { let state = self.state.read().expect("poisoned"); let epoch = state.epoch; From 4de1bcb6d7b504003882a6b78d546eb3c86d989e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Fri, 13 Dec 2024 11:47:18 +0100 Subject: [PATCH 02/17] Merge `WatcherExt` and `Watcher` trait --- iroh/bench/src/iroh.rs | 2 +- iroh/examples/connect.rs | 5 +---- iroh/src/discovery/pkarr.rs | 2 +- iroh/src/endpoint.rs | 2 +- iroh/src/magicsock.rs | 2 +- iroh/src/watchable.rs | 5 ----- 6 files changed, 5 insertions(+), 13 deletions(-) diff --git a/iroh/bench/src/iroh.rs b/iroh/bench/src/iroh.rs index d4d57ad3d7..35f797283a 100644 --- a/iroh/bench/src/iroh.rs +++ b/iroh/bench/src/iroh.rs @@ -7,7 +7,7 @@ use anyhow::{Context, Result}; use bytes::Bytes; use iroh::{ endpoint::{Connection, ConnectionError, RecvStream, SendStream, TransportConfig}, - watchable::WatcherExt as _, + watchable::Watcher as _, Endpoint, NodeAddr, RelayMap, RelayMode, RelayUrl, }; use tracing::{trace, warn}; diff --git a/iroh/examples/connect.rs b/iroh/examples/connect.rs index 9535936187..4931890d94 100644 --- a/iroh/examples/connect.rs +++ b/iroh/examples/connect.rs @@ -9,10 +9,7 @@ use std::net::SocketAddr; use anyhow::Context; use clap::Parser; -use iroh::{ - watchable::{Watcher as _, WatcherExt as _}, - Endpoint, NodeAddr, RelayMode, RelayUrl, SecretKey, -}; +use iroh::{watchable::Watcher as _, Endpoint, NodeAddr, RelayMode, RelayUrl, SecretKey}; use tracing::info; // An example ALPN that we are using to communicate over the `Endpoint` diff --git a/iroh/src/discovery/pkarr.rs b/iroh/src/discovery/pkarr.rs index c9114db16c..98ad170b45 100644 --- a/iroh/src/discovery/pkarr.rs +++ b/iroh/src/discovery/pkarr.rs @@ -61,7 +61,7 @@ use crate::{ discovery::{Discovery, DiscoveryItem}, dns::node_info::NodeInfo, endpoint::force_staging_infra, - watchable::{DirectWatcher, Disconnected, Watchable, Watcher as _, WatcherExt as _}, + watchable::{DirectWatcher, Disconnected, Watchable, Watcher as _}, Endpoint, }; diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index d2fcb6cb37..d5e482165f 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -38,7 +38,7 @@ use crate::{ dns::{default_resolver, DnsResolver}, magicsock::{self, Handle, QuicMappedAddr}, tls, - watchable::{DirectWatcher, Watcher as _, WatcherExt as _}, + watchable::{DirectWatcher, Watcher as _}, }; mod rtt_actor; diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index 6dbe40374f..b9ca25e0ad 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -2854,7 +2854,7 @@ mod tests { use crate::{ defaults::staging::{self, EU_RELAY_HOSTNAME}, tls, - watchable::WatcherExt as _, + watchable::Watcher as _, Endpoint, RelayMode, }; diff --git a/iroh/src/watchable.rs b/iroh/src/watchable.rs index 3388ec8c6f..d1894e63d2 100644 --- a/iroh/src/watchable.rs +++ b/iroh/src/watchable.rs @@ -140,10 +140,7 @@ pub trait Watcher: Clone { &mut self, cx: &mut task::Context<'_>, ) -> Poll>; -} -/// Extension methods for the [`Watcher`] trait. -pub trait WatcherExt: Watcher { /// Returns a future completing with `Ok(value)` once a new value is set, or with /// [`Err(Disconnected)`](Disconnected) if the connected [`Watchable`] was dropped. /// @@ -222,8 +219,6 @@ pub trait WatcherExt: Watcher { } } -impl WatcherExt for T {} - impl Watcher for DirectWatcher { type Value = T; From ce81e062cd9a6d1537a2bf25e0277de47789b1b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Fri, 13 Dec 2024 16:10:26 +0100 Subject: [PATCH 03/17] Introduce `MapWatcher` and make `Endpoint::node_addr` return `impl Watcher` --- iroh/examples/connect-unreliable.rs | 4 +- iroh/examples/listen-unreliable.rs | 4 +- iroh/examples/listen.rs | 4 +- iroh/src/discovery.rs | 12 ++-- iroh/src/endpoint.rs | 54 +++++++++--------- iroh/src/magicsock.rs | 2 +- iroh/src/watchable.rs | 85 +++++++++++++++++++++++------ 7 files changed, 108 insertions(+), 57 deletions(-) diff --git a/iroh/examples/connect-unreliable.rs b/iroh/examples/connect-unreliable.rs index ea375ba0dd..4dd9d15a66 100644 --- a/iroh/examples/connect-unreliable.rs +++ b/iroh/examples/connect-unreliable.rs @@ -8,7 +8,7 @@ use std::net::SocketAddr; use clap::Parser; -use iroh::{Endpoint, NodeAddr, RelayMode, RelayUrl, SecretKey}; +use iroh::{watchable::Watcher as _, Endpoint, NodeAddr, RelayMode, RelayUrl, SecretKey}; use tracing::info; // An example ALPN that we are using to communicate over the `Endpoint` @@ -50,7 +50,7 @@ async fn main() -> anyhow::Result<()> { .bind() .await?; - let node_addr = endpoint.node_addr().await?; + let node_addr = endpoint.node_addr()?.initialized().await?; let me = node_addr.node_id; println!("node id: {me}"); println!("node listening addresses:"); diff --git a/iroh/examples/listen-unreliable.rs b/iroh/examples/listen-unreliable.rs index 06d24a76ab..ae51e9968c 100644 --- a/iroh/examples/listen-unreliable.rs +++ b/iroh/examples/listen-unreliable.rs @@ -3,7 +3,7 @@ //! This example uses the default relay servers to attempt to holepunch, and will use that relay server to relay packets if the two devices cannot establish a direct UDP connection. //! run this example from the project root: //! $ cargo run --example listen-unreliable -use iroh::{Endpoint, RelayMode, SecretKey}; +use iroh::{watchable::Watcher as _, Endpoint, RelayMode, SecretKey}; use tracing::{info, warn}; // An example ALPN that we are using to communicate over the `Endpoint` @@ -35,7 +35,7 @@ async fn main() -> anyhow::Result<()> { println!("node id: {me}"); println!("node listening addresses:"); - let node_addr = endpoint.node_addr().await?; + let node_addr = endpoint.node_addr()?.initialized().await?; let local_addrs = node_addr .direct_addresses .into_iter() diff --git a/iroh/examples/listen.rs b/iroh/examples/listen.rs index 13413992dd..f786640d08 100644 --- a/iroh/examples/listen.rs +++ b/iroh/examples/listen.rs @@ -5,7 +5,7 @@ //! $ cargo run --example listen use std::time::Duration; -use iroh::{endpoint::ConnectionError, Endpoint, RelayMode, SecretKey}; +use iroh::{endpoint::ConnectionError, watchable::Watcher as _, Endpoint, RelayMode, SecretKey}; use tracing::{debug, info, warn}; // An example ALPN that we are using to communicate over the `Endpoint` @@ -37,7 +37,7 @@ async fn main() -> anyhow::Result<()> { println!("node id: {me}"); println!("node listening addresses:"); - let node_addr = endpoint.node_addr().await?; + let node_addr = endpoint.node_addr()?.initialized().await?; let local_addrs = node_addr .direct_addresses .into_iter() diff --git a/iroh/src/discovery.rs b/iroh/src/discovery.rs index a8ee7965f6..8910a6be20 100644 --- a/iroh/src/discovery.rs +++ b/iroh/src/discovery.rs @@ -450,7 +450,7 @@ mod tests { use tokio_util::task::AbortOnDropHandle; use super::*; - use crate::RelayMode; + use crate::{watchable::Watcher as _, RelayMode}; type InfoStore = HashMap, BTreeSet, u64)>; @@ -580,7 +580,7 @@ mod tests { }; let ep1_addr = NodeAddr::new(ep1.node_id()); // wait for our address to be updated and thus published at least once - ep1.node_addr().await?; + ep1.node_addr()?.initialized().await?; let _conn = ep2.connect(ep1_addr, TEST_ALPN).await?; Ok(()) } @@ -606,7 +606,7 @@ mod tests { }; let ep1_addr = NodeAddr::new(ep1.node_id()); // wait for out address to be updated and thus published at least once - ep1.node_addr().await?; + ep1.node_addr()?.initialized().await?; let _conn = ep2.connect(ep1_addr, TEST_ALPN).await?; Ok(()) } @@ -636,7 +636,7 @@ mod tests { }; let ep1_addr = NodeAddr::new(ep1.node_id()); // wait for out address to be updated and thus published at least once - ep1.node_addr().await?; + ep1.node_addr()?.initialized().await?; let _conn = ep2.connect(ep1_addr, TEST_ALPN).await?; Ok(()) } @@ -659,7 +659,7 @@ mod tests { }; let ep1_addr = NodeAddr::new(ep1.node_id()); // wait for out address to be updated and thus published at least once - ep1.node_addr().await?; + ep1.node_addr()?.initialized().await?; let res = ep2.connect(ep1_addr, TEST_ALPN).await; assert!(res.is_err()); Ok(()) @@ -682,7 +682,7 @@ mod tests { new_endpoint(secret, disco).await }; // wait for out address to be updated and thus published at least once - ep1.node_addr().await?; + ep1.node_addr()?.initialized().await?; let ep1_wrong_addr = NodeAddr { node_id: ep1.node_id(), relay_url: None, diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index d5e482165f..3af756c2ca 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -38,7 +38,7 @@ use crate::{ dns::{default_resolver, DnsResolver}, magicsock::{self, Handle, QuicMappedAddr}, tls, - watchable::{DirectWatcher, Watcher as _}, + watchable::{DirectWatcher, Watcher}, }; mod rtt_actor; @@ -761,29 +761,27 @@ impl Endpoint { /// The returned [`NodeAddr`] will have the current [`RelayUrl`] and direct addresses /// as they would be returned by [`Endpoint::home_relay`] and /// [`Endpoint::direct_addresses`]. - pub async fn node_addr(&self) -> Result { - let mut watch_addrs = self.direct_addresses(); - let mut watch_relay = self.home_relay(); - tokio::select! { - addrs = watch_addrs.initialized() => { - let addrs = addrs?; - let relay = self.home_relay().get()?; - Ok(NodeAddr::from_parts( - self.node_id(), - relay, - addrs.into_iter().map(|x| x.addr), - )) - }, - relay = watch_relay.initialized() => { - let relay = relay?; - let addrs = self.direct_addresses().get()?.unwrap_or_default(); - Ok(NodeAddr::from_parts( - self.node_id(), - Some(relay), - addrs.into_iter().map(|x| x.addr), - )) - }, - } + pub fn node_addr(&self) -> Result>> { + let watch_addrs = self.direct_addresses(); + let watch_relay = self.home_relay(); + let node_id = self.node_id(); + let watcher = + watch_addrs + .or(watch_relay) + .map(move |(addrs, relay)| match (addrs, relay) { + (Some(addrs), relay) => Some(NodeAddr::from_parts( + node_id, + relay, + addrs.into_iter().map(|x| x.addr), + )), + (None, Some(relay)) => Some(NodeAddr::from_parts( + node_id, + Some(relay), + std::iter::empty(), + )), + (None, None) => None, + })?; + Ok(watcher) } /// Returns a [`Watcher`] for the [`RelayUrl`] of the Relay server used as home relay. @@ -1449,7 +1447,7 @@ mod tests { .bind() .await .unwrap(); - let my_addr = ep.node_addr().await.unwrap(); + let my_addr = ep.node_addr().unwrap().initialized().await.unwrap(); let res = ep.connect(my_addr.clone(), TEST_ALPN).await; assert!(res.is_err()); let err = res.err().unwrap(); @@ -1731,8 +1729,8 @@ mod tests { .bind() .await .unwrap(); - let ep1_nodeaddr = ep1.node_addr().await.unwrap(); - let ep2_nodeaddr = ep2.node_addr().await.unwrap(); + let ep1_nodeaddr = ep1.node_addr().unwrap().initialized().await.unwrap(); + let ep2_nodeaddr = ep2.node_addr().unwrap().initialized().await.unwrap(); ep1.add_node_addr(ep2_nodeaddr.clone()).unwrap(); ep2.add_node_addr(ep1_nodeaddr.clone()).unwrap(); let ep1_nodeid = ep1.node_id(); @@ -1855,7 +1853,7 @@ mod tests { let ep1_nodeid = ep1.node_id(); let ep2_nodeid = ep2.node_id(); - let ep1_nodeaddr = ep1.node_addr().await.unwrap(); + let ep1_nodeaddr = ep1.node_addr().unwrap().initialized().await.unwrap(); tracing::info!( "node id 1 {ep1_nodeid}, relay URL {:?}", ep1_nodeaddr.relay_url() diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index b9ca25e0ad..cb56bf137c 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -3200,7 +3200,7 @@ mod tests { println!("first conn!"); let conn = m1 .endpoint - .connect(m2.endpoint.node_addr().await?, ALPN) + .connect(m2.endpoint.node_addr()?.initialized().await?, ALPN) .await?; println!("Closing first conn"); conn.close(0u32.into(), b"bye lolz"); diff --git a/iroh/src/watchable.rs b/iroh/src/watchable.rs index d1894e63d2..bd9fffe565 100644 --- a/iroh/src/watchable.rs +++ b/iroh/src/watchable.rs @@ -101,20 +101,6 @@ impl Watchable { } } -/// An observer for a value. -/// -/// The [`Watcher`] can get the current value, and will be notified when the value changes. -/// Only the most recent value is accessible, and if the thread with the [`Watchable`] -/// changes the value faster than the thread with the [`Watcher`] can keep up with, then -/// it'll miss in-between values. -/// When the thread changing the [`Watchable`] pauses updating, the [`Watcher`] will always -/// end up reporting the most recent state eventually. -#[derive(Debug, Clone)] -pub struct DirectWatcher { - epoch: u64, - shared: Weak>, -} - /// A handle to a value that's represented by one or more underlying [`Watchable`]s. /// /// This handle allows one to observe the latest state and be notified @@ -217,6 +203,38 @@ pub trait Watcher: Clone { watcher: self, } } + + /// Maps this watcher with a function that transforms the observed values. + fn map T>( + self, + map: F, + ) -> Result, Disconnected> { + Ok(MapWatcher { + current: (map)(self.get()?), + map, + watcher: self, + }) + } + + /// Returns a watcher that updates every time this or the other watcher + /// updates, and yields both watcher's items together when that happens. + fn or(self, other: W) -> OrWatcher { + OrWatcher(self, other) + } +} + +/// An observer for a value. +/// +/// The [`Watcher`] can get the current value, and will be notified when the value changes. +/// Only the most recent value is accessible, and if the thread with the [`Watchable`] +/// changes the value faster than the thread with the [`Watcher`] can keep up with, then +/// it'll miss in-between values. +/// When the thread changing the [`Watchable`] pauses updating, the [`Watcher`] will always +/// end up reporting the most recent state eventually. +#[derive(Debug, Clone)] +pub struct DirectWatcher { + epoch: u64, + shared: Weak>, } impl Watcher for DirectWatcher { @@ -245,10 +263,13 @@ impl Watcher for DirectWatcher { } /// Combines two [`Watcher`]s into a single watcher. +/// +/// This watcher updates when one of the inner watchers +/// is updated at least once. #[derive(Clone, Debug)] -pub struct Or(S, T); +pub struct OrWatcher(S, T); -impl Watcher for Or { +impl Watcher for OrWatcher { type Value = (S::Value, T::Value); fn get(&self) -> Result { @@ -270,6 +291,38 @@ impl Watcher for Or { } } +/// Maps a [`Watcher`] and allows filtering updates. +#[derive(Clone, Debug)] +pub struct MapWatcher T> { + map: F, + watcher: W, + current: T, +} + +impl T> Watcher for MapWatcher { + type Value = T; + + fn get(&self) -> Result { + Ok((self.map)(self.watcher.get()?)) + } + + fn poll_updated( + &mut self, + cx: &mut task::Context<'_>, + ) -> Poll> { + loop { + let value = futures_lite::ready!(self.watcher.poll_updated(cx)?); + let mapped = (self.map)(value); + if mapped != self.current { + self.current = mapped.clone(); + return Poll::Ready(Ok(mapped)); + } else { + self.current = mapped; + } + } + } +} + /// Future returning the next item after the current one in a [`Watcher`]. /// /// See [`Watcher::updated`]. From 476338d282ba7d069f9e42ff88bc5e2a34aadf48 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Fri, 13 Dec 2024 18:24:53 +0100 Subject: [PATCH 04/17] Use a named type for `Endpoint::node_addr` return type --- iroh/src/endpoint.rs | 51 ++++++++++++++++++++++++++++--------------- iroh/src/watchable.rs | 22 ++++++++++++++----- 2 files changed, 49 insertions(+), 24 deletions(-) diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index 3af756c2ca..3a5b14fee2 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -38,7 +38,7 @@ use crate::{ dns::{default_resolver, DnsResolver}, magicsock::{self, Handle, QuicMappedAddr}, tls, - watchable::{DirectWatcher, Watcher}, + watchable::{DirectWatcher, MapWatcher, OrWatcher, Watcher}, }; mod rtt_actor; @@ -74,6 +74,22 @@ const DISCOVERY_WAIT_PERIOD: Duration = Duration::from_millis(500); type DiscoveryBuilder = Box Option> + Send + Sync>; +type NodeAddrMapper = Box< + dyn Fn((Option>, Option)) -> Option + + Send + + Sync + + 'static, +>; + +/// TODO(matheus23): DOCS (don't even ask) +/// +/// Implements [`Watcher`]`>`. +pub type NodeAddrWatcher = MapWatcher< + OrWatcher>>, DirectWatcher>>, + Option, + NodeAddrMapper, +>; + /// Builder for [`Endpoint`]. /// /// By default the endpoint will generate a new random [`SecretKey`], which will result in a @@ -761,26 +777,25 @@ impl Endpoint { /// The returned [`NodeAddr`] will have the current [`RelayUrl`] and direct addresses /// as they would be returned by [`Endpoint::home_relay`] and /// [`Endpoint::direct_addresses`]. - pub fn node_addr(&self) -> Result>> { + pub fn node_addr(&self) -> Result { let watch_addrs = self.direct_addresses(); let watch_relay = self.home_relay(); let node_id = self.node_id(); - let watcher = - watch_addrs - .or(watch_relay) - .map(move |(addrs, relay)| match (addrs, relay) { - (Some(addrs), relay) => Some(NodeAddr::from_parts( - node_id, - relay, - addrs.into_iter().map(|x| x.addr), - )), - (None, Some(relay)) => Some(NodeAddr::from_parts( - node_id, - Some(relay), - std::iter::empty(), - )), - (None, None) => None, - })?; + let mapper: NodeAddrMapper = Box::new(move |(addrs, relay)| match (addrs, relay) { + (Some(addrs), relay) => Some(NodeAddr::from_parts( + node_id, + relay, + addrs.into_iter().map(|x| x.addr), + )), + (None, Some(relay)) => Some(NodeAddr::from_parts( + node_id, + Some(relay), + std::iter::empty(), + )), + (None, None) => None, + }); + + let watcher = watch_addrs.or(watch_relay).map(mapper)?; Ok(watcher) } diff --git a/iroh/src/watchable.rs b/iroh/src/watchable.rs index bd9fffe565..2029ac45c7 100644 --- a/iroh/src/watchable.rs +++ b/iroh/src/watchable.rs @@ -205,13 +205,13 @@ pub trait Watcher: Clone { } /// Maps this watcher with a function that transforms the observed values. - fn map T>( + fn map T>( self, map: F, ) -> Result, Disconnected> { Ok(MapWatcher { current: (map)(self.get()?), - map, + map: Arc::new(map), watcher: self, }) } @@ -292,14 +292,24 @@ impl Watcher for OrWatcher { } /// Maps a [`Watcher`] and allows filtering updates. -#[derive(Clone, Debug)] -pub struct MapWatcher T> { - map: F, +#[derive(Debug)] +pub struct MapWatcher T> { + map: Arc, watcher: W, current: T, } -impl T> Watcher for MapWatcher { +impl T> Clone for MapWatcher { + fn clone(&self) -> Self { + Self { + map: self.map.clone(), + watcher: self.watcher.clone(), + current: self.current.clone(), + } + } +} + +impl T> Watcher for MapWatcher { type Value = T; fn get(&self) -> Result { From f8b7e5e73a8b196d2ee44b62048c6d3c3601bb80 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Thu, 19 Dec 2024 11:21:43 +0100 Subject: [PATCH 05/17] Fix `local_swarm_discovery` --- iroh/src/discovery/local_swarm_discovery.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iroh/src/discovery/local_swarm_discovery.rs b/iroh/src/discovery/local_swarm_discovery.rs index b4975807c4..9afbda6813 100644 --- a/iroh/src/discovery/local_swarm_discovery.rs +++ b/iroh/src/discovery/local_swarm_discovery.rs @@ -54,7 +54,7 @@ use tracing::{debug, error, info_span, trace, warn, Instrument}; use crate::{ discovery::{Discovery, DiscoveryItem}, - watchable::Watchable, + watchable::{Watchable, Watcher as _}, Endpoint, }; From 30eab12e22ac4b3c5b99f4fcb3b6a77877be0b05 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Thu, 19 Dec 2024 11:55:49 +0100 Subject: [PATCH 06/17] Make `Endpoint::node_addr` infallible --- iroh/examples/connect-unreliable.rs | 2 +- iroh/examples/listen-unreliable.rs | 2 +- iroh/examples/listen.rs | 2 +- iroh/src/discovery.rs | 10 +++++----- iroh/src/endpoint.rs | 16 +++++++++------- iroh/src/magicsock.rs | 2 +- iroh/src/watchable.rs | 2 +- 7 files changed, 19 insertions(+), 17 deletions(-) diff --git a/iroh/examples/connect-unreliable.rs b/iroh/examples/connect-unreliable.rs index 4dd9d15a66..8065288071 100644 --- a/iroh/examples/connect-unreliable.rs +++ b/iroh/examples/connect-unreliable.rs @@ -50,7 +50,7 @@ async fn main() -> anyhow::Result<()> { .bind() .await?; - let node_addr = endpoint.node_addr()?.initialized().await?; + let node_addr = endpoint.node_addr().initialized().await?; let me = node_addr.node_id; println!("node id: {me}"); println!("node listening addresses:"); diff --git a/iroh/examples/listen-unreliable.rs b/iroh/examples/listen-unreliable.rs index ae51e9968c..902a29268f 100644 --- a/iroh/examples/listen-unreliable.rs +++ b/iroh/examples/listen-unreliable.rs @@ -35,7 +35,7 @@ async fn main() -> anyhow::Result<()> { println!("node id: {me}"); println!("node listening addresses:"); - let node_addr = endpoint.node_addr()?.initialized().await?; + let node_addr = endpoint.node_addr().initialized().await?; let local_addrs = node_addr .direct_addresses .into_iter() diff --git a/iroh/examples/listen.rs b/iroh/examples/listen.rs index f786640d08..e4fb2ffe36 100644 --- a/iroh/examples/listen.rs +++ b/iroh/examples/listen.rs @@ -37,7 +37,7 @@ async fn main() -> anyhow::Result<()> { println!("node id: {me}"); println!("node listening addresses:"); - let node_addr = endpoint.node_addr()?.initialized().await?; + let node_addr = endpoint.node_addr().initialized().await?; let local_addrs = node_addr .direct_addresses .into_iter() diff --git a/iroh/src/discovery.rs b/iroh/src/discovery.rs index 8910a6be20..1b24d174d2 100644 --- a/iroh/src/discovery.rs +++ b/iroh/src/discovery.rs @@ -580,7 +580,7 @@ mod tests { }; let ep1_addr = NodeAddr::new(ep1.node_id()); // wait for our address to be updated and thus published at least once - ep1.node_addr()?.initialized().await?; + ep1.node_addr().initialized().await?; let _conn = ep2.connect(ep1_addr, TEST_ALPN).await?; Ok(()) } @@ -606,7 +606,7 @@ mod tests { }; let ep1_addr = NodeAddr::new(ep1.node_id()); // wait for out address to be updated and thus published at least once - ep1.node_addr()?.initialized().await?; + ep1.node_addr().initialized().await?; let _conn = ep2.connect(ep1_addr, TEST_ALPN).await?; Ok(()) } @@ -636,7 +636,7 @@ mod tests { }; let ep1_addr = NodeAddr::new(ep1.node_id()); // wait for out address to be updated and thus published at least once - ep1.node_addr()?.initialized().await?; + ep1.node_addr().initialized().await?; let _conn = ep2.connect(ep1_addr, TEST_ALPN).await?; Ok(()) } @@ -659,7 +659,7 @@ mod tests { }; let ep1_addr = NodeAddr::new(ep1.node_id()); // wait for out address to be updated and thus published at least once - ep1.node_addr()?.initialized().await?; + ep1.node_addr().initialized().await?; let res = ep2.connect(ep1_addr, TEST_ALPN).await; assert!(res.is_err()); Ok(()) @@ -682,7 +682,7 @@ mod tests { new_endpoint(secret, disco).await }; // wait for out address to be updated and thus published at least once - ep1.node_addr()?.initialized().await?; + ep1.node_addr().initialized().await?; let ep1_wrong_addr = NodeAddr { node_id: ep1.node_id(), relay_url: None, diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index 3a5b14fee2..1296eb17c2 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -777,7 +777,7 @@ impl Endpoint { /// The returned [`NodeAddr`] will have the current [`RelayUrl`] and direct addresses /// as they would be returned by [`Endpoint::home_relay`] and /// [`Endpoint::direct_addresses`]. - pub fn node_addr(&self) -> Result { + pub fn node_addr(&self) -> NodeAddrWatcher { let watch_addrs = self.direct_addresses(); let watch_relay = self.home_relay(); let node_id = self.node_id(); @@ -795,8 +795,10 @@ impl Endpoint { (None, None) => None, }); - let watcher = watch_addrs.or(watch_relay).map(mapper)?; - Ok(watcher) + watch_addrs + .or(watch_relay) + .map(mapper) + .expect("watchable is alive - cannot be disconnected yet") } /// Returns a [`Watcher`] for the [`RelayUrl`] of the Relay server used as home relay. @@ -1462,7 +1464,7 @@ mod tests { .bind() .await .unwrap(); - let my_addr = ep.node_addr().unwrap().initialized().await.unwrap(); + let my_addr = ep.node_addr().initialized().await.unwrap(); let res = ep.connect(my_addr.clone(), TEST_ALPN).await; assert!(res.is_err()); let err = res.err().unwrap(); @@ -1744,8 +1746,8 @@ mod tests { .bind() .await .unwrap(); - let ep1_nodeaddr = ep1.node_addr().unwrap().initialized().await.unwrap(); - let ep2_nodeaddr = ep2.node_addr().unwrap().initialized().await.unwrap(); + let ep1_nodeaddr = ep1.node_addr().initialized().await.unwrap(); + let ep2_nodeaddr = ep2.node_addr().initialized().await.unwrap(); ep1.add_node_addr(ep2_nodeaddr.clone()).unwrap(); ep2.add_node_addr(ep1_nodeaddr.clone()).unwrap(); let ep1_nodeid = ep1.node_id(); @@ -1868,7 +1870,7 @@ mod tests { let ep1_nodeid = ep1.node_id(); let ep2_nodeid = ep2.node_id(); - let ep1_nodeaddr = ep1.node_addr().unwrap().initialized().await.unwrap(); + let ep1_nodeaddr = ep1.node_addr().initialized().await.unwrap(); tracing::info!( "node id 1 {ep1_nodeid}, relay URL {:?}", ep1_nodeaddr.relay_url() diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index cb56bf137c..39aa3d3c86 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -3200,7 +3200,7 @@ mod tests { println!("first conn!"); let conn = m1 .endpoint - .connect(m2.endpoint.node_addr()?.initialized().await?, ALPN) + .connect(m2.endpoint.node_addr().initialized().await?, ALPN) .await?; println!("Closing first conn"); conn.close(0u32.into(), b"bye lolz"); diff --git a/iroh/src/watchable.rs b/iroh/src/watchable.rs index 2029ac45c7..f4f91fe2fe 100644 --- a/iroh/src/watchable.rs +++ b/iroh/src/watchable.rs @@ -418,7 +418,7 @@ where /// The error for when a [`Watcher`] is disconnected from its underlying /// [`Watchable`] value, because of that watchable having been dropped. #[derive(thiserror::Error, Debug)] -#[error("Watch lost connection to underlying Watchable, it was dropped")] +#[error("Watcher lost connection to underlying Watchable, it was dropped")] pub struct Disconnected; // Private: From 4342996a8534b55f07e5d1a0f3855bcb8f58098f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Thu, 19 Dec 2024 12:04:00 +0100 Subject: [PATCH 07/17] Make `MapWatcher` store a `dyn Fn` --- iroh/src/endpoint.rs | 35 +++++++++++++---------------------- iroh/src/watchable.rs | 17 +++++++++-------- 2 files changed, 22 insertions(+), 30 deletions(-) diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index 1296eb17c2..8938906659 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -74,20 +74,12 @@ const DISCOVERY_WAIT_PERIOD: Duration = Duration::from_millis(500); type DiscoveryBuilder = Box Option> + Send + Sync>; -type NodeAddrMapper = Box< - dyn Fn((Option>, Option)) -> Option - + Send - + Sync - + 'static, ->; - /// TODO(matheus23): DOCS (don't even ask) /// /// Implements [`Watcher`]`>`. pub type NodeAddrWatcher = MapWatcher< OrWatcher>>, DirectWatcher>>, Option, - NodeAddrMapper, >; /// Builder for [`Endpoint`]. @@ -781,23 +773,22 @@ impl Endpoint { let watch_addrs = self.direct_addresses(); let watch_relay = self.home_relay(); let node_id = self.node_id(); - let mapper: NodeAddrMapper = Box::new(move |(addrs, relay)| match (addrs, relay) { - (Some(addrs), relay) => Some(NodeAddr::from_parts( - node_id, - relay, - addrs.into_iter().map(|x| x.addr), - )), - (None, Some(relay)) => Some(NodeAddr::from_parts( - node_id, - Some(relay), - std::iter::empty(), - )), - (None, None) => None, - }); watch_addrs .or(watch_relay) - .map(mapper) + .map(move |(addrs, relay)| match (addrs, relay) { + (Some(addrs), relay) => Some(NodeAddr::from_parts( + node_id, + relay, + addrs.into_iter().map(|x| x.addr), + )), + (None, Some(relay)) => Some(NodeAddr::from_parts( + node_id, + Some(relay), + std::iter::empty(), + )), + (None, None) => None, + }) .expect("watchable is alive - cannot be disconnected yet") } diff --git a/iroh/src/watchable.rs b/iroh/src/watchable.rs index f4f91fe2fe..ba91012954 100644 --- a/iroh/src/watchable.rs +++ b/iroh/src/watchable.rs @@ -205,10 +205,10 @@ pub trait Watcher: Clone { } /// Maps this watcher with a function that transforms the observed values. - fn map T>( + fn map( self, - map: F, - ) -> Result, Disconnected> { + map: impl Fn(Self::Value) -> T + 'static, + ) -> Result, Disconnected> { Ok(MapWatcher { current: (map)(self.get()?), map: Arc::new(map), @@ -292,14 +292,15 @@ impl Watcher for OrWatcher { } /// Maps a [`Watcher`] and allows filtering updates. -#[derive(Debug)] -pub struct MapWatcher T> { - map: Arc, +#[derive(derive_more::Debug)] +pub struct MapWatcher { + #[debug("Arc T + 'static>")] + map: Arc T + 'static>, watcher: W, current: T, } -impl T> Clone for MapWatcher { +impl Clone for MapWatcher { fn clone(&self) -> Self { Self { map: self.map.clone(), @@ -309,7 +310,7 @@ impl T> Clone for MapWatch } } -impl T> Watcher for MapWatcher { +impl Watcher for MapWatcher { type Value = T; fn get(&self) -> Result { From 619b8e8ce48b7287fc8c117f56b99f6fe43ef939 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Thu, 19 Dec 2024 12:07:05 +0100 Subject: [PATCH 08/17] Replace `OrWatcher` with an implementation on tuples --- iroh/src/endpoint.rs | 7 +++++-- iroh/src/watchable.rs | 13 +++---------- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index 8938906659..2bda7aa82d 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -38,7 +38,7 @@ use crate::{ dns::{default_resolver, DnsResolver}, magicsock::{self, Handle, QuicMappedAddr}, tls, - watchable::{DirectWatcher, MapWatcher, OrWatcher, Watcher}, + watchable::{DirectWatcher, MapWatcher, Watcher}, }; mod rtt_actor; @@ -78,7 +78,10 @@ type DiscoveryBuilder = Box Option> /// /// Implements [`Watcher`]`>`. pub type NodeAddrWatcher = MapWatcher< - OrWatcher>>, DirectWatcher>>, + ( + DirectWatcher>>, + DirectWatcher>, + ), Option, >; diff --git a/iroh/src/watchable.rs b/iroh/src/watchable.rs index ba91012954..614d68e562 100644 --- a/iroh/src/watchable.rs +++ b/iroh/src/watchable.rs @@ -218,8 +218,8 @@ pub trait Watcher: Clone { /// Returns a watcher that updates every time this or the other watcher /// updates, and yields both watcher's items together when that happens. - fn or(self, other: W) -> OrWatcher { - OrWatcher(self, other) + fn or(self, other: W) -> (Self, W) { + (self, other) } } @@ -262,14 +262,7 @@ impl Watcher for DirectWatcher { } } -/// Combines two [`Watcher`]s into a single watcher. -/// -/// This watcher updates when one of the inner watchers -/// is updated at least once. -#[derive(Clone, Debug)] -pub struct OrWatcher(S, T); - -impl Watcher for OrWatcher { +impl Watcher for (S, T) { type Value = (S::Value, T::Value); fn get(&self) -> Result { From 30ab6fc39e6c9ef71dfcd24988a13fffaa623c51 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Thu, 19 Dec 2024 12:14:25 +0100 Subject: [PATCH 09/17] Rename `watchable` into `watcher`, and remove `Watch*` prefixes from its types --- iroh/bench/src/iroh.rs | 2 +- iroh/examples/connect-unreliable.rs | 2 +- iroh/examples/connect.rs | 2 +- iroh/examples/listen-unreliable.rs | 2 +- iroh/examples/listen.rs | 2 +- iroh/src/discovery.rs | 2 +- iroh/src/discovery/pkarr.rs | 4 +- iroh/src/endpoint.rs | 14 +++---- iroh/src/endpoint/rtt_actor.rs | 8 ++-- iroh/src/lib.rs | 2 +- iroh/src/magicsock.rs | 10 ++--- iroh/src/magicsock/node_map.rs | 9 ++--- iroh/src/magicsock/node_map/node_state.rs | 4 +- iroh/src/{watchable.rs => watcher.rs} | 47 +++++++++++------------ 14 files changed, 53 insertions(+), 57 deletions(-) rename iroh/src/{watchable.rs => watcher.rs} (94%) diff --git a/iroh/bench/src/iroh.rs b/iroh/bench/src/iroh.rs index 35f797283a..792481a1de 100644 --- a/iroh/bench/src/iroh.rs +++ b/iroh/bench/src/iroh.rs @@ -7,7 +7,7 @@ use anyhow::{Context, Result}; use bytes::Bytes; use iroh::{ endpoint::{Connection, ConnectionError, RecvStream, SendStream, TransportConfig}, - watchable::Watcher as _, + watcher::Watcher as _, Endpoint, NodeAddr, RelayMap, RelayMode, RelayUrl, }; use tracing::{trace, warn}; diff --git a/iroh/examples/connect-unreliable.rs b/iroh/examples/connect-unreliable.rs index 8065288071..b361b7e268 100644 --- a/iroh/examples/connect-unreliable.rs +++ b/iroh/examples/connect-unreliable.rs @@ -8,7 +8,7 @@ use std::net::SocketAddr; use clap::Parser; -use iroh::{watchable::Watcher as _, Endpoint, NodeAddr, RelayMode, RelayUrl, SecretKey}; +use iroh::{watcher::Watcher as _, Endpoint, NodeAddr, RelayMode, RelayUrl, SecretKey}; use tracing::info; // An example ALPN that we are using to communicate over the `Endpoint` diff --git a/iroh/examples/connect.rs b/iroh/examples/connect.rs index 4931890d94..767abd478e 100644 --- a/iroh/examples/connect.rs +++ b/iroh/examples/connect.rs @@ -9,7 +9,7 @@ use std::net::SocketAddr; use anyhow::Context; use clap::Parser; -use iroh::{watchable::Watcher as _, Endpoint, NodeAddr, RelayMode, RelayUrl, SecretKey}; +use iroh::{watcher::Watcher as _, Endpoint, NodeAddr, RelayMode, RelayUrl, SecretKey}; use tracing::info; // An example ALPN that we are using to communicate over the `Endpoint` diff --git a/iroh/examples/listen-unreliable.rs b/iroh/examples/listen-unreliable.rs index 902a29268f..d787ed6107 100644 --- a/iroh/examples/listen-unreliable.rs +++ b/iroh/examples/listen-unreliable.rs @@ -3,7 +3,7 @@ //! This example uses the default relay servers to attempt to holepunch, and will use that relay server to relay packets if the two devices cannot establish a direct UDP connection. //! run this example from the project root: //! $ cargo run --example listen-unreliable -use iroh::{watchable::Watcher as _, Endpoint, RelayMode, SecretKey}; +use iroh::{watcher::Watcher as _, Endpoint, RelayMode, SecretKey}; use tracing::{info, warn}; // An example ALPN that we are using to communicate over the `Endpoint` diff --git a/iroh/examples/listen.rs b/iroh/examples/listen.rs index e4fb2ffe36..9b969f3bc3 100644 --- a/iroh/examples/listen.rs +++ b/iroh/examples/listen.rs @@ -5,7 +5,7 @@ //! $ cargo run --example listen use std::time::Duration; -use iroh::{endpoint::ConnectionError, watchable::Watcher as _, Endpoint, RelayMode, SecretKey}; +use iroh::{endpoint::ConnectionError, watcher::Watcher as _, Endpoint, RelayMode, SecretKey}; use tracing::{debug, info, warn}; // An example ALPN that we are using to communicate over the `Endpoint` diff --git a/iroh/src/discovery.rs b/iroh/src/discovery.rs index 1b24d174d2..d9b8d3afc6 100644 --- a/iroh/src/discovery.rs +++ b/iroh/src/discovery.rs @@ -450,7 +450,7 @@ mod tests { use tokio_util::task::AbortOnDropHandle; use super::*; - use crate::{watchable::Watcher as _, RelayMode}; + use crate::{watcher::Watcher as _, RelayMode}; type InfoStore = HashMap, BTreeSet, u64)>; diff --git a/iroh/src/discovery/pkarr.rs b/iroh/src/discovery/pkarr.rs index 98ad170b45..ec6087008b 100644 --- a/iroh/src/discovery/pkarr.rs +++ b/iroh/src/discovery/pkarr.rs @@ -61,7 +61,7 @@ use crate::{ discovery::{Discovery, DiscoveryItem}, dns::node_info::NodeInfo, endpoint::force_staging_infra, - watchable::{DirectWatcher, Disconnected, Watchable, Watcher as _}, + watcher::{Direct, Disconnected, Watchable, Watcher as _}, Endpoint, }; @@ -221,7 +221,7 @@ struct PublisherService { secret_key: SecretKey, #[debug("PkarrClient")] pkarr_client: PkarrRelayClient, - watcher: DirectWatcher>, + watcher: Direct>, ttl: u32, republish_interval: Duration, } diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index 2bda7aa82d..0ccff6a623 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -38,7 +38,7 @@ use crate::{ dns::{default_resolver, DnsResolver}, magicsock::{self, Handle, QuicMappedAddr}, tls, - watchable::{DirectWatcher, MapWatcher, Watcher}, + watcher::{self, Watcher as _}, }; mod rtt_actor; @@ -77,10 +77,10 @@ type DiscoveryBuilder = Box Option> /// TODO(matheus23): DOCS (don't even ask) /// /// Implements [`Watcher`]`>`. -pub type NodeAddrWatcher = MapWatcher< +pub type NodeAddrWatcher = watcher::Map< ( - DirectWatcher>>, - DirectWatcher>, + watcher::Direct>>, + watcher::Direct>, ), Option, >; @@ -820,7 +820,7 @@ impl Endpoint { /// let _relay_url = mep.home_relay().initialized().await.unwrap(); /// # }); /// ``` - pub fn home_relay(&self) -> DirectWatcher> { + pub fn home_relay(&self) -> watcher::Direct> { self.msock.home_relay() } @@ -858,7 +858,7 @@ impl Endpoint { /// ``` /// /// [STUN]: https://en.wikipedia.org/wiki/STUN - pub fn direct_addresses(&self) -> DirectWatcher>> { + pub fn direct_addresses(&self) -> watcher::Direct>> { self.msock.direct_addresses() } @@ -930,7 +930,7 @@ impl Endpoint { /// # Errors /// /// Will error if we do not have any address information for the given `node_id`. - pub fn conn_type(&self, node_id: NodeId) -> Result> { + pub fn conn_type(&self, node_id: NodeId) -> Result> { self.msock.conn_type(node_id) } diff --git a/iroh/src/endpoint/rtt_actor.rs b/iroh/src/endpoint/rtt_actor.rs index 2c2f8392eb..7080b058bd 100644 --- a/iroh/src/endpoint/rtt_actor.rs +++ b/iroh/src/endpoint/rtt_actor.rs @@ -16,7 +16,7 @@ use tracing::{debug, error, info_span, trace, Instrument}; use crate::{ magicsock::ConnectionType, metrics::MagicsockMetrics, - watchable::{DirectWatcher, WatcherStream}, + watcher::{Direct, Stream}, }; #[derive(Debug)] @@ -55,7 +55,7 @@ pub(super) enum RttMessage { /// The connection. connection: quinn::WeakConnectionHandle, /// Path changes for this connection from the magic socket. - conn_type_changes: WatcherStream>, + conn_type_changes: Stream>, /// For reporting-only, the Node ID of this connection. node_id: NodeId, }, @@ -68,7 +68,7 @@ pub(super) enum RttMessage { #[derive(Debug)] struct RttActor { /// Stream of connection type changes. - connection_events: stream_group::Keyed>>, + connection_events: stream_group::Keyed>>, /// References to the connections. /// /// These are weak references so not to keep the connections alive. The key allows @@ -125,7 +125,7 @@ impl RttActor { fn handle_new_connection( &mut self, connection: quinn::WeakConnectionHandle, - conn_type_changes: WatcherStream>, + conn_type_changes: Stream>, node_id: NodeId, ) { let key = self.connection_events.insert(conn_type_changes); diff --git a/iroh/src/lib.rs b/iroh/src/lib.rs index 988b7697a4..659925d97c 100644 --- a/iroh/src/lib.rs +++ b/iroh/src/lib.rs @@ -246,7 +246,7 @@ pub mod endpoint; pub mod metrics; pub mod protocol; mod tls; -pub mod watchable; +pub mod watcher; pub use endpoint::{Endpoint, RelayMode}; pub use iroh_base::{ diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index 39aa3d3c86..cc925926f2 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -66,7 +66,7 @@ use crate::{ discovery::{Discovery, DiscoveryItem}, dns::DnsResolver, key::{public_ed_box, secret_ed_box, DecryptionError, SharedSecret}, - watchable::{DirectWatcher, Watchable}, + watcher::{Direct, Watchable}, }; mod metrics; @@ -323,7 +323,7 @@ impl MagicSock { /// store [`Some`] set of addresses. /// /// To get the current direct addresses, use [`Watcher::initialized`]. - pub(crate) fn direct_addresses(&self) -> DirectWatcher>> { + pub(crate) fn direct_addresses(&self) -> Direct>> { self.direct_addrs.addrs.watch() } @@ -331,7 +331,7 @@ impl MagicSock { /// /// Note that this can be used to wait for the initial home relay to be known using /// [`Watcher::initialized`]. - pub(crate) fn home_relay(&self) -> DirectWatcher> { + pub(crate) fn home_relay(&self) -> Direct> { self.my_relay.watch() } @@ -345,7 +345,7 @@ impl MagicSock { /// /// Will return an error if there is no address information known about the /// given `node_id`. - pub(crate) fn conn_type(&self, node_id: NodeId) -> Result> { + pub(crate) fn conn_type(&self, node_id: NodeId) -> Result> { self.node_map.conn_type(node_id) } @@ -2854,7 +2854,7 @@ mod tests { use crate::{ defaults::staging::{self, EU_RELAY_HOSTNAME}, tls, - watchable::Watcher as _, + watcher::Watcher as _, Endpoint, RelayMode, }; diff --git a/iroh/src/magicsock/node_map.rs b/iroh/src/magicsock/node_map.rs index 91a387787f..37a7ce63dd 100644 --- a/iroh/src/magicsock/node_map.rs +++ b/iroh/src/magicsock/node_map.rs @@ -21,7 +21,7 @@ use super::{ }; use crate::{ disco::{CallMeMaybe, Pong, SendAddr}, - watchable::DirectWatcher, + watcher::Direct, }; mod best_addr; @@ -291,10 +291,7 @@ impl NodeMap { /// /// Will return an error if there is not an entry in the [`NodeMap`] for /// the `node_id` - pub(super) fn conn_type( - &self, - node_id: NodeId, - ) -> anyhow::Result> { + pub(super) fn conn_type(&self, node_id: NodeId) -> anyhow::Result> { self.inner.lock().expect("poisoned").conn_type(node_id) } @@ -462,7 +459,7 @@ impl NodeMapInner { /// /// Will return an error if there is not an entry in the [`NodeMap`] for /// the `public_key` - fn conn_type(&self, node_id: NodeId) -> anyhow::Result> { + fn conn_type(&self, node_id: NodeId) -> anyhow::Result> { match self.get(NodeStateKey::NodeId(node_id)) { Some(ep) => Ok(ep.conn_type()), None => anyhow::bail!("No endpoint for {node_id:?} found"), diff --git a/iroh/src/magicsock/node_map/node_state.rs b/iroh/src/magicsock/node_map/node_state.rs index 0710edaadd..f5a738e1a0 100644 --- a/iroh/src/magicsock/node_map/node_state.rs +++ b/iroh/src/magicsock/node_map/node_state.rs @@ -24,7 +24,7 @@ use crate::{ disco::{self, SendAddr}, magicsock::{ActorMessage, MagicsockMetrics, QuicMappedAddr, Timer, HEARTBEAT_INTERVAL}, util::relay_only_mode, - watchable::{DirectWatcher, Watchable}, + watcher::{Direct, Watchable}, }; /// Number of addresses that are not active that we keep around per node. @@ -191,7 +191,7 @@ impl NodeState { self.id } - pub(super) fn conn_type(&self) -> DirectWatcher { + pub(super) fn conn_type(&self) -> Direct { self.conn_type.watch() } diff --git a/iroh/src/watchable.rs b/iroh/src/watcher.rs similarity index 94% rename from iroh/src/watchable.rs rename to iroh/src/watcher.rs index 614d68e562..dec6384d93 100644 --- a/iroh/src/watchable.rs +++ b/iroh/src/watcher.rs @@ -18,7 +18,6 @@ use std::{ task::{self, Poll, Waker}, }; -use futures_lite::stream::Stream; #[cfg(iroh_loom)] use loom::sync; use sync::{Mutex, RwLock}; @@ -88,8 +87,8 @@ impl Watchable { } /// Creates a [`Watcher`] allowing the value to be observed, but not modified. - pub fn watch(&self) -> DirectWatcher { - DirectWatcher { + pub fn watch(&self) -> Direct { + Direct { epoch: self.shared.state.read().expect("poisoned").epoch, shared: Arc::downgrade(&self.shared), } @@ -133,8 +132,8 @@ pub trait Watcher: Clone { /// # Cancel Safety /// /// The returned future is cancel-safe. - fn updated(&mut self) -> WatchNextFut { - WatchNextFut { watcher: self } + fn updated(&mut self) -> NextFut { + NextFut { watcher: self } } /// Returns a future completing once the value is set to [`Some`] value. @@ -143,11 +142,11 @@ pub trait Watcher: Clone { /// /// This is a utility for the common case of storing an [`Option`] inside a /// [`Watchable`]. - fn initialized(&mut self) -> WatchInitializedFut + fn initialized(&mut self) -> InitializedFut where Self: Watcher>, { - WatchInitializedFut { + InitializedFut { initial: match self.get() { Ok(Some(value)) => Some(Ok(value)), Ok(None) => None, @@ -170,11 +169,11 @@ pub trait Watcher: Clone { /// # Cancel Safety /// /// The returned stream is cancel-safe. - fn stream(self) -> WatcherStream + fn stream(self) -> Stream where Self: Unpin, { - WatcherStream { + Stream { initial: self.get().ok(), watcher: self, } @@ -194,11 +193,11 @@ pub trait Watcher: Clone { /// # Cancel Safety /// /// The returned stream is cancel-safe. - fn stream_updates_only(self) -> WatcherStream + fn stream_updates_only(self) -> Stream where Self: Unpin, { - WatcherStream { + Stream { initial: None, watcher: self, } @@ -208,8 +207,8 @@ pub trait Watcher: Clone { fn map( self, map: impl Fn(Self::Value) -> T + 'static, - ) -> Result, Disconnected> { - Ok(MapWatcher { + ) -> Result, Disconnected> { + Ok(Map { current: (map)(self.get()?), map: Arc::new(map), watcher: self, @@ -232,12 +231,12 @@ pub trait Watcher: Clone { /// When the thread changing the [`Watchable`] pauses updating, the [`Watcher`] will always /// end up reporting the most recent state eventually. #[derive(Debug, Clone)] -pub struct DirectWatcher { +pub struct Direct { epoch: u64, shared: Weak>, } -impl Watcher for DirectWatcher { +impl Watcher for Direct { type Value = T; fn get(&self) -> Result { @@ -286,14 +285,14 @@ impl Watcher for (S, T) { /// Maps a [`Watcher`] and allows filtering updates. #[derive(derive_more::Debug)] -pub struct MapWatcher { +pub struct Map { #[debug("Arc T + 'static>")] map: Arc T + 'static>, watcher: W, current: T, } -impl Clone for MapWatcher { +impl Clone for Map { fn clone(&self) -> Self { Self { map: self.map.clone(), @@ -303,7 +302,7 @@ impl Clone for MapWatcher { } } -impl Watcher for MapWatcher { +impl Watcher for Map { type Value = T; fn get(&self) -> Result { @@ -335,11 +334,11 @@ impl Watcher for MapWatcher { /// /// This future is cancel-safe. #[derive(Debug)] -pub struct WatchNextFut<'a, W: Watcher> { +pub struct NextFut<'a, W: Watcher> { watcher: &'a mut W, } -impl Future for WatchNextFut<'_, W> { +impl Future for NextFut<'_, W> { type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { @@ -356,13 +355,13 @@ impl Future for WatchNextFut<'_, W> { /// /// This Future is cancel-safe. #[derive(Debug)] -pub struct WatchInitializedFut<'a, T, W: Watcher>> { +pub struct InitializedFut<'a, T, W: Watcher>> { initial: Option>, watcher: &'a mut W, } impl> + Unpin> Future - for WatchInitializedFut<'_, T, W> + for InitializedFut<'_, T, W> { type Output = Result; @@ -386,12 +385,12 @@ impl> + Unpin> Future /// /// This stream is cancel-safe. #[derive(Debug, Clone)] -pub struct WatcherStream { +pub struct Stream { initial: Option, watcher: W, } -impl Stream for WatcherStream +impl futures_lite::stream::Stream for Stream where W::Value: Unpin, { From d42812b7f99760cde71978541216622489359e02 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Thu, 19 Dec 2024 13:41:03 +0100 Subject: [PATCH 10/17] Make docs build again --- iroh/src/discovery/local_swarm_discovery.rs | 2 +- iroh/src/endpoint.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/iroh/src/discovery/local_swarm_discovery.rs b/iroh/src/discovery/local_swarm_discovery.rs index 9afbda6813..aaf8dc0d4e 100644 --- a/iroh/src/discovery/local_swarm_discovery.rs +++ b/iroh/src/discovery/local_swarm_discovery.rs @@ -54,7 +54,7 @@ use tracing::{debug, error, info_span, trace, warn, Instrument}; use crate::{ discovery::{Discovery, DiscoveryItem}, - watchable::{Watchable, Watcher as _}, + watcher::{Watchable, Watcher as _}, Endpoint, }; diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index 0ccff6a623..a4af356127 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -38,7 +38,7 @@ use crate::{ dns::{default_resolver, DnsResolver}, magicsock::{self, Handle, QuicMappedAddr}, tls, - watcher::{self, Watcher as _}, + watcher::{self, Watcher}, }; mod rtt_actor; From 6a40c65118bce345372cb0e2fa690a5f2e769d85 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Thu, 19 Dec 2024 15:41:08 +0100 Subject: [PATCH 11/17] fix `endpoint_conn_type_stream` test - Properly close the connection - Rename to `endpoint_conn_type_becomes_direct` - Use `TestResult` - Rename `handle_direct_connection` to `wait_for_conn_type_direct` --- iroh/src/endpoint.rs | 75 +++++++++++++++++++++++--------------------- 1 file changed, 39 insertions(+), 36 deletions(-) diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index a4af356127..f379daf01e 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -1443,6 +1443,8 @@ mod tests { use futures_lite::StreamExt; use iroh_test::CallOnDrop; use rand::SeedableRng; + use testresult::TestResult; + use tokio_util::task::AbortOnDropHandle; use tracing::{error_span, info, info_span, Instrument}; use super::*; @@ -1816,10 +1818,10 @@ mod tests { } #[tokio::test] - async fn endpoint_conn_type_stream() { + async fn endpoint_conn_type_becomes_direct() -> TestResult { const TIMEOUT: Duration = std::time::Duration::from_secs(15); let _logging_guard = iroh_test::logging::setup(); - let (relay_map, _relay_url, _relay_guard) = run_relay_server().await.unwrap(); + let (relay_map, _relay_url, _relay_guard) = run_relay_server().await?; let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(42); let ep1_secret_key = SecretKey::generate(&mut rng); let ep2_secret_key = SecretKey::generate(&mut rng); @@ -1829,18 +1831,16 @@ mod tests { .alpns(vec![TEST_ALPN.to_vec()]) .relay_mode(RelayMode::Custom(relay_map.clone())) .bind() - .await - .unwrap(); + .await?; let ep2 = Endpoint::builder() .secret_key(ep2_secret_key) .insecure_skip_relay_cert_verify(true) .alpns(vec![TEST_ALPN.to_vec()]) .relay_mode(RelayMode::Custom(relay_map)) .bind() - .await - .unwrap(); + .await?; - async fn handle_direct_conn(ep: &Endpoint, node_id: PublicKey) -> Result<()> { + async fn wait_for_conn_type_direct(ep: &Endpoint, node_id: PublicKey) -> TestResult { let mut stream = ep.conn_type(node_id)?.stream(); let src = ep.node_id().fmt_short(); let dst = node_id.fmt_short(); @@ -1850,53 +1850,56 @@ mod tests { return Ok(()); } } - anyhow::bail!("conn_type stream ended before `ConnectionType::Direct`"); + panic!("conn_type stream ended before `ConnectionType::Direct`"); } - async fn accept(ep: &Endpoint) -> NodeId { - let incoming = ep.accept().await.unwrap(); - let conn = incoming.await.unwrap(); - let node_id = get_remote_node_id(&conn).unwrap(); + async fn accept(ep: &Endpoint) -> TestResult { + let incoming = ep.accept().await.expect("ep closed"); + let conn = incoming.await?; + let node_id = get_remote_node_id(&conn)?; tracing::info!(node_id=%node_id.fmt_short(), "accepted connection"); - node_id + Ok(conn) } let ep1_nodeid = ep1.node_id(); let ep2_nodeid = ep2.node_id(); - let ep1_nodeaddr = ep1.node_addr().initialized().await.unwrap(); + let ep1_nodeaddr = ep1.node_addr().initialized().await?; tracing::info!( "node id 1 {ep1_nodeid}, relay URL {:?}", ep1_nodeaddr.relay_url() ); tracing::info!("node id 2 {ep2_nodeid}"); - let ep1_side = async move { - accept(&ep1).await; - handle_direct_conn(&ep1, ep2_nodeid).await - }; - - let ep2_side = async move { - ep2.connect(ep1_nodeaddr, TEST_ALPN).await.unwrap(); - handle_direct_conn(&ep2, ep1_nodeid).await - }; - - let res_ep1 = tokio::spawn(tokio::time::timeout(TIMEOUT, ep1_side)); - - let ep1_abort_handle = res_ep1.abort_handle(); - let _ep1_guard = CallOnDrop::new(move || { - ep1_abort_handle.abort(); + let ep1_side = tokio::time::timeout(TIMEOUT, async move { + let conn = accept(&ep1).await?; + let mut send = conn.open_uni().await?; + wait_for_conn_type_direct(&ep1, ep2_nodeid).await?; + send.write_all(b"Conn is direct").await?; + send.finish()?; + conn.closed().await; + TestResult::Ok(()) }); - let res_ep2 = tokio::spawn(tokio::time::timeout(TIMEOUT, ep2_side)); - let ep2_abort_handle = res_ep2.abort_handle(); - let _ep2_guard = CallOnDrop::new(move || { - ep2_abort_handle.abort(); + let ep2_side = tokio::time::timeout(TIMEOUT, async move { + let conn = ep2.connect(ep1_nodeaddr, TEST_ALPN).await?; + let mut recv = conn.accept_uni().await?; + wait_for_conn_type_direct(&ep2, ep1_nodeid).await?; + let read = recv.read_to_end(100).await?; + assert_eq!(read, b"Conn is direct".to_vec()); + conn.close(0u32.into(), b"done"); + conn.closed().await; + TestResult::Ok(()) }); - let (r1, r2) = tokio::try_join!(res_ep1, res_ep2).unwrap(); - r1.expect("ep1 timeout").unwrap(); - r2.expect("ep2 timeout").unwrap(); + let res_ep1 = AbortOnDropHandle::new(tokio::spawn(ep1_side)); + let res_ep2 = AbortOnDropHandle::new(tokio::spawn(ep2_side)); + + let (r1, r2) = tokio::try_join!(res_ep1, res_ep2)?; + r1??; + r2??; + + Ok(()) } #[tokio::test] From 27a32c3a6bc091e769401635f2c8fa4dc5510a61 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Thu, 19 Dec 2024 16:10:20 +0100 Subject: [PATCH 12/17] fix doctests --- iroh/src/endpoint.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index f379daf01e..6ca105c6aa 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -812,7 +812,7 @@ impl Endpoint { /// To wait for a home relay connection to be established, use [`Watcher::initialized`]: /// ```no_run /// use futures_lite::StreamExt; - /// use iroh::Endpoint; + /// use iroh::{Endpoint, watcher::{Watcher as _}}; /// /// # let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); /// # rt.block_on(async move { @@ -848,7 +848,7 @@ impl Endpoint { /// To get the first set of direct addresses use [`Watcher::initialized`]: /// ```no_run /// use futures_lite::StreamExt; - /// use iroh::Endpoint; + /// use iroh::{Endpoint, watcher::{Watcher as _}}; /// /// # let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); /// # rt.block_on(async move { From e5caf8984628f52c604c7ec61564ef5adf2ccb32 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Thu, 19 Dec 2024 16:46:06 +0100 Subject: [PATCH 13/17] fix doctests and add more documentation --- iroh/src/endpoint.rs | 30 ++++++++++++++---- iroh/src/magicsock.rs | 16 +++++++--- iroh/src/magicsock/node_map.rs | 2 ++ iroh/src/watcher.rs | 56 +++++++++++++++++++--------------- 4 files changed, 70 insertions(+), 34 deletions(-) diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index 6ca105c6aa..5d687a7898 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -74,9 +74,12 @@ const DISCOVERY_WAIT_PERIOD: Duration = Duration::from_millis(500); type DiscoveryBuilder = Box Option> + Send + Sync>; -/// TODO(matheus23): DOCS (don't even ask) +/// A type alias for the return value of [`Endpoint::node_addr`]. /// -/// Implements [`Watcher`]`>`. +/// This type implements [`Watcher`] with `Value` being an optional [`NodeAddr`]. +/// +/// We return a named type instead of `impl Watcher`, as this allows +/// you to e.g. store the watcher in a struct. pub type NodeAddrWatcher = watcher::Map< ( watcher::Direct>>, @@ -767,11 +770,26 @@ impl Endpoint { self.static_config.secret_key.public() } - /// Returns the current [`NodeAddr`] for this endpoint. + /// Returns a [`Watcher`] for the current [`NodeAddr`] for this endpoint. + /// + /// The observed [`NodeAddr`] will have the current [`RelayUrl`] and direct addresses + /// as they would be returned by [`Endpoint::home_relay`] and [`Endpoint::direct_addresses`]. /// - /// The returned [`NodeAddr`] will have the current [`RelayUrl`] and direct addresses - /// as they would be returned by [`Endpoint::home_relay`] and - /// [`Endpoint::direct_addresses`]. + /// Use [`Watcher::initialized`] to wait for a [`NodeAddr`] that is ready to be connected to: + /// + /// ```no_run + /// # async fn wrapper() -> testresult::TestResult { + /// use iroh::{Endpoint, watcher::{Watcher as _}}; + /// + /// let endpoint = Endpoint::builder() + /// .alpns(vec![b"my-alpn".to_vec()]) + /// .bind() + /// .await?; + /// let node_addr = endpoint.node_addr().initialized().await?; + /// # let _ = node_addr; + /// # Ok(()) + /// # } + /// ``` pub fn node_addr(&self) -> NodeAddrWatcher { let watch_addrs = self.direct_addresses(); let watch_relay = self.home_relay(); diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index cc925926f2..41b9c9e89e 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -66,7 +66,7 @@ use crate::{ discovery::{Discovery, DiscoveryItem}, dns::DnsResolver, key::{public_ed_box, secret_ed_box, DecryptionError, SharedSecret}, - watcher::{Direct, Watchable}, + watcher::{self, Watchable}, }; mod metrics; @@ -323,7 +323,10 @@ impl MagicSock { /// store [`Some`] set of addresses. /// /// To get the current direct addresses, use [`Watcher::initialized`]. - pub(crate) fn direct_addresses(&self) -> Direct>> { + /// + /// [`Watcher`]: crate::watcher::Watcher + /// [`Watcher::initialized`]: crate::watcher::Watcher::initialized + pub(crate) fn direct_addresses(&self) -> watcher::Direct>> { self.direct_addrs.addrs.watch() } @@ -331,7 +334,10 @@ impl MagicSock { /// /// Note that this can be used to wait for the initial home relay to be known using /// [`Watcher::initialized`]. - pub(crate) fn home_relay(&self) -> Direct> { + /// + /// [`Watcher`]: crate::watcher::Watcher + /// [`Watcher::initialized`]: crate::watcher::Watcher::initialized + pub(crate) fn home_relay(&self) -> watcher::Direct> { self.my_relay.watch() } @@ -345,7 +351,9 @@ impl MagicSock { /// /// Will return an error if there is no address information known about the /// given `node_id`. - pub(crate) fn conn_type(&self, node_id: NodeId) -> Result> { + /// + /// [`Watcher`]: crate::watcher::Watcher + pub(crate) fn conn_type(&self, node_id: NodeId) -> Result> { self.node_map.conn_type(node_id) } diff --git a/iroh/src/magicsock/node_map.rs b/iroh/src/magicsock/node_map.rs index 37a7ce63dd..094d12ee78 100644 --- a/iroh/src/magicsock/node_map.rs +++ b/iroh/src/magicsock/node_map.rs @@ -291,6 +291,8 @@ impl NodeMap { /// /// Will return an error if there is not an entry in the [`NodeMap`] for /// the `node_id` + /// + /// [`Watcher`]: crate::watcher::Watcher pub(super) fn conn_type(&self, node_id: NodeId) -> anyhow::Result> { self.inner.lock().expect("poisoned").conn_type(node_id) } diff --git a/iroh/src/watcher.rs b/iroh/src/watcher.rs index dec6384d93..b7f15a9df8 100644 --- a/iroh/src/watcher.rs +++ b/iroh/src/watcher.rs @@ -24,7 +24,7 @@ use sync::{Mutex, RwLock}; /// A wrapper around a value that notifies [`Watcher`]s when the value is modified. /// -/// Only the most recent value is available to any observer, but but observer is guaranteed +/// Only the most recent value is available to any observer, but the observer is guaranteed /// to be notified of the most recent value. #[derive(Debug, Default)] pub struct Watchable { @@ -86,7 +86,7 @@ impl Watchable { ret } - /// Creates a [`Watcher`] allowing the value to be observed, but not modified. + /// Creates a [`Direct`] [`Watcher`], allowing the value to be observed, but not modified. pub fn watch(&self) -> Direct { Direct { epoch: self.shared.state.read().expect("poisoned").epoch, @@ -102,8 +102,22 @@ impl Watchable { /// A handle to a value that's represented by one or more underlying [`Watchable`]s. /// -/// This handle allows one to observe the latest state and be notified -/// of any changes to this value. +/// A [`Watcher`] can get the current value, and will be notified when the value changes. +/// Only the most recent value is accessible, and if the threads with the underlying [`Watchable`]s +/// change the value faster than the threads with the [`Watcher`] can keep up with, then +/// it'll miss in-between values. +/// When the thread changing the [`Watchable`] pauses updating, the [`Watcher`] will always +/// end up reporting the most recent state eventually. +/// +/// Watchers can be modified via [`Watcher::map`] to observe a value derived from the original +/// value via a function. +/// +/// Watchers can be combined via [`Watcher::or`] to allow observing multiple values at once and +/// getting an update in case any of the values updates. +/// +/// One of the underlying [`Watchable`]s might already be dropped. In that case, +/// the watcher will be "disconnected" and return [`Err(Disconnected)`](Disconnected) +/// on some function calls or, when turned into a stream, that stream will end. pub trait Watcher: Clone { /// The type of value that can change. /// @@ -120,7 +134,7 @@ pub trait Watcher: Clone { fn get(&self) -> Result; /// Polls for the next value, or returns [`Disconnected`] if one of the underlying - /// watchables has been dropped. + /// [`Watchable`]s has been dropped. fn poll_updated( &mut self, cx: &mut task::Context<'_>, @@ -142,6 +156,10 @@ pub trait Watcher: Clone { /// /// This is a utility for the common case of storing an [`Option`] inside a /// [`Watchable`]. + /// + /// # Cancel Safety + /// + /// The returned future is cancel-safe. fn initialized(&mut self) -> InitializedFut where Self: Watcher>, @@ -204,6 +222,9 @@ pub trait Watcher: Clone { } /// Maps this watcher with a function that transforms the observed values. + /// + /// The returned watcher will only register updates, when the *mapped* value + /// observably changes. For this, it needs to store a clone of `T` in the watcher. fn map( self, map: impl Fn(Self::Value) -> T + 'static, @@ -222,14 +243,9 @@ pub trait Watcher: Clone { } } -/// An observer for a value. +/// The immediate, direct observer of a [`Watchable`] value. /// -/// The [`Watcher`] can get the current value, and will be notified when the value changes. -/// Only the most recent value is accessible, and if the thread with the [`Watchable`] -/// changes the value faster than the thread with the [`Watcher`] can keep up with, then -/// it'll miss in-between values. -/// When the thread changing the [`Watchable`] pauses updating, the [`Watcher`] will always -/// end up reporting the most recent state eventually. +/// This type is mainly used via the [`Watcher`] interface. #[derive(Debug, Clone)] pub struct Direct { epoch: u64, @@ -283,8 +299,10 @@ impl Watcher for (S, T) { } } -/// Maps a [`Watcher`] and allows filtering updates. -#[derive(derive_more::Debug)] +/// Wraps a [`Watcher`] to allow observing a derived value. +/// +/// See [`Watcher::map`]. +#[derive(derive_more::Debug, Clone)] pub struct Map { #[debug("Arc T + 'static>")] map: Arc T + 'static>, @@ -292,16 +310,6 @@ pub struct Map { current: T, } -impl Clone for Map { - fn clone(&self) -> Self { - Self { - map: self.map.clone(), - watcher: self.watcher.clone(), - current: self.current.clone(), - } - } -} - impl Watcher for Map { type Value = T; From cd2bf41b9e60a1e87b2232e9f109d31937c9a84b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Thu, 19 Dec 2024 16:57:11 +0100 Subject: [PATCH 14/17] Cleanup --- iroh/src/discovery/pkarr.rs | 4 ++-- iroh/src/endpoint.rs | 6 +++--- iroh/src/endpoint/rtt_actor.rs | 12 ++++-------- iroh/src/magicsock/node_map.rs | 9 ++++++--- iroh/src/magicsock/node_map/node_state.rs | 4 ++-- iroh/src/watcher.rs | 5 +++++ 6 files changed, 22 insertions(+), 18 deletions(-) diff --git a/iroh/src/discovery/pkarr.rs b/iroh/src/discovery/pkarr.rs index ec6087008b..a493c177f3 100644 --- a/iroh/src/discovery/pkarr.rs +++ b/iroh/src/discovery/pkarr.rs @@ -61,7 +61,7 @@ use crate::{ discovery::{Discovery, DiscoveryItem}, dns::node_info::NodeInfo, endpoint::force_staging_infra, - watcher::{Direct, Disconnected, Watchable, Watcher as _}, + watcher::{self, Disconnected, Watchable, Watcher as _}, Endpoint, }; @@ -221,7 +221,7 @@ struct PublisherService { secret_key: SecretKey, #[debug("PkarrClient")] pkarr_client: PkarrRelayClient, - watcher: Direct>, + watcher: watcher::Direct>, ttl: u32, republish_interval: Duration, } diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index 5d687a7898..23ac5ee9ab 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -779,7 +779,7 @@ impl Endpoint { /// /// ```no_run /// # async fn wrapper() -> testresult::TestResult { - /// use iroh::{Endpoint, watcher::{Watcher as _}}; + /// use iroh::{Endpoint, watcher::Watcher}; /// /// let endpoint = Endpoint::builder() /// .alpns(vec![b"my-alpn".to_vec()]) @@ -830,7 +830,7 @@ impl Endpoint { /// To wait for a home relay connection to be established, use [`Watcher::initialized`]: /// ```no_run /// use futures_lite::StreamExt; - /// use iroh::{Endpoint, watcher::{Watcher as _}}; + /// use iroh::{Endpoint, watcher::Watcher}; /// /// # let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); /// # rt.block_on(async move { @@ -866,7 +866,7 @@ impl Endpoint { /// To get the first set of direct addresses use [`Watcher::initialized`]: /// ```no_run /// use futures_lite::StreamExt; - /// use iroh::{Endpoint, watcher::{Watcher as _}}; + /// use iroh::{Endpoint, watcher::Watcher}; /// /// # let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); /// # rt.block_on(async move { diff --git a/iroh/src/endpoint/rtt_actor.rs b/iroh/src/endpoint/rtt_actor.rs index 7080b058bd..951760f120 100644 --- a/iroh/src/endpoint/rtt_actor.rs +++ b/iroh/src/endpoint/rtt_actor.rs @@ -13,11 +13,7 @@ use tokio::{ use tokio_util::task::AbortOnDropHandle; use tracing::{debug, error, info_span, trace, Instrument}; -use crate::{ - magicsock::ConnectionType, - metrics::MagicsockMetrics, - watcher::{Direct, Stream}, -}; +use crate::{magicsock::ConnectionType, metrics::MagicsockMetrics, watcher}; #[derive(Debug)] pub(super) struct RttHandle { @@ -55,7 +51,7 @@ pub(super) enum RttMessage { /// The connection. connection: quinn::WeakConnectionHandle, /// Path changes for this connection from the magic socket. - conn_type_changes: Stream>, + conn_type_changes: watcher::Stream>, /// For reporting-only, the Node ID of this connection. node_id: NodeId, }, @@ -68,7 +64,7 @@ pub(super) enum RttMessage { #[derive(Debug)] struct RttActor { /// Stream of connection type changes. - connection_events: stream_group::Keyed>>, + connection_events: stream_group::Keyed>>, /// References to the connections. /// /// These are weak references so not to keep the connections alive. The key allows @@ -125,7 +121,7 @@ impl RttActor { fn handle_new_connection( &mut self, connection: quinn::WeakConnectionHandle, - conn_type_changes: Stream>, + conn_type_changes: watcher::Stream>, node_id: NodeId, ) { let key = self.connection_events.insert(conn_type_changes); diff --git a/iroh/src/magicsock/node_map.rs b/iroh/src/magicsock/node_map.rs index 094d12ee78..8ceaf1d706 100644 --- a/iroh/src/magicsock/node_map.rs +++ b/iroh/src/magicsock/node_map.rs @@ -21,7 +21,7 @@ use super::{ }; use crate::{ disco::{CallMeMaybe, Pong, SendAddr}, - watcher::Direct, + watcher, }; mod best_addr; @@ -293,7 +293,10 @@ impl NodeMap { /// the `node_id` /// /// [`Watcher`]: crate::watcher::Watcher - pub(super) fn conn_type(&self, node_id: NodeId) -> anyhow::Result> { + pub(super) fn conn_type( + &self, + node_id: NodeId, + ) -> anyhow::Result> { self.inner.lock().expect("poisoned").conn_type(node_id) } @@ -461,7 +464,7 @@ impl NodeMapInner { /// /// Will return an error if there is not an entry in the [`NodeMap`] for /// the `public_key` - fn conn_type(&self, node_id: NodeId) -> anyhow::Result> { + fn conn_type(&self, node_id: NodeId) -> anyhow::Result> { match self.get(NodeStateKey::NodeId(node_id)) { Some(ep) => Ok(ep.conn_type()), None => anyhow::bail!("No endpoint for {node_id:?} found"), diff --git a/iroh/src/magicsock/node_map/node_state.rs b/iroh/src/magicsock/node_map/node_state.rs index f5a738e1a0..1f457812ed 100644 --- a/iroh/src/magicsock/node_map/node_state.rs +++ b/iroh/src/magicsock/node_map/node_state.rs @@ -24,7 +24,7 @@ use crate::{ disco::{self, SendAddr}, magicsock::{ActorMessage, MagicsockMetrics, QuicMappedAddr, Timer, HEARTBEAT_INTERVAL}, util::relay_only_mode, - watcher::{Direct, Watchable}, + watcher::{self, Watchable}, }; /// Number of addresses that are not active that we keep around per node. @@ -191,7 +191,7 @@ impl NodeState { self.id } - pub(super) fn conn_type(&self) -> Direct { + pub(super) fn conn_type(&self) -> watcher::Direct { self.conn_type.watch() } diff --git a/iroh/src/watcher.rs b/iroh/src/watcher.rs index b7f15a9df8..f0297152bf 100644 --- a/iroh/src/watcher.rs +++ b/iroh/src/watcher.rs @@ -7,6 +7,11 @@ //! In that way, a [`Watchable`] is like a [`tokio::sync::broadcast::Sender`] (and a //! [`Watcher`] is like a [`tokio::sync::broadcast::Receiver`]), except that there's no risk //! of the channel filling up, but instead you might miss items. +//! +//! This module is meant to be imported like this (if you use all of these things): +//! ```ignore +//! use iroh::watcher::{self, Watchable, Watcher as _}; +//! ``` #[cfg(not(iroh_loom))] use std::sync; From 39106b51744bd5938d16e28161cb187eddd3b2c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Thu, 19 Dec 2024 16:57:34 +0100 Subject: [PATCH 15/17] `cargo make format` --- iroh/src/endpoint.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index 23ac5ee9ab..70dd838016 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -779,7 +779,7 @@ impl Endpoint { /// /// ```no_run /// # async fn wrapper() -> testresult::TestResult { - /// use iroh::{Endpoint, watcher::Watcher}; + /// use iroh::{watcher::Watcher, Endpoint}; /// /// let endpoint = Endpoint::builder() /// .alpns(vec![b"my-alpn".to_vec()]) @@ -830,7 +830,7 @@ impl Endpoint { /// To wait for a home relay connection to be established, use [`Watcher::initialized`]: /// ```no_run /// use futures_lite::StreamExt; - /// use iroh::{Endpoint, watcher::Watcher}; + /// use iroh::{watcher::Watcher, Endpoint}; /// /// # let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); /// # rt.block_on(async move { @@ -866,7 +866,7 @@ impl Endpoint { /// To get the first set of direct addresses use [`Watcher::initialized`]: /// ```no_run /// use futures_lite::StreamExt; - /// use iroh::{Endpoint, watcher::Watcher}; + /// use iroh::{watcher::Watcher, Endpoint}; /// /// # let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); /// # rt.block_on(async move { From 652e12a36265577410e54a99b3691d2e1bf68579 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Thu, 19 Dec 2024 16:58:59 +0100 Subject: [PATCH 16/17] Fix `echo.rs` example --- iroh/examples/echo.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/iroh/examples/echo.rs b/iroh/examples/echo.rs index e18aee8949..00e1d3ccf1 100644 --- a/iroh/examples/echo.rs +++ b/iroh/examples/echo.rs @@ -11,6 +11,7 @@ use futures_lite::future::Boxed as BoxedFuture; use iroh::{ endpoint::Connecting, protocol::{ProtocolHandler, Router}, + watcher::Watcher as _, Endpoint, NodeAddr, }; @@ -23,7 +24,7 @@ const ALPN: &[u8] = b"iroh-example/echo/0"; #[tokio::main] async fn main() -> Result<()> { let router = accept_side().await?; - let node_addr = router.endpoint().node_addr().await?; + let node_addr = router.endpoint().node_addr().initialized().await?; connect_side(node_addr).await?; From bfe5f3f08cc8cf2745aea8a3813612aa9cad6161 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Thu, 19 Dec 2024 17:05:13 +0100 Subject: [PATCH 17/17] Fix `transfer.rs` example --- iroh/examples/transfer.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/iroh/examples/transfer.rs b/iroh/examples/transfer.rs index b5dee21393..159ca86d2d 100644 --- a/iroh/examples/transfer.rs +++ b/iroh/examples/transfer.rs @@ -8,7 +8,8 @@ use bytes::Bytes; use clap::{Parser, Subcommand}; use indicatif::HumanBytes; use iroh::{ - endpoint::ConnectionError, Endpoint, NodeAddr, RelayMap, RelayMode, RelayUrl, SecretKey, + endpoint::ConnectionError, watcher::Watcher as _, Endpoint, NodeAddr, RelayMap, RelayMode, + RelayUrl, SecretKey, }; use iroh_base::ticket::NodeTicket; use tracing::info;