Skip to content

Commit ddbf8c1

Browse files
committed
refactor: expose Watcher, not Watchable
1 parent e0f10ce commit ddbf8c1

File tree

4 files changed

+38
-36
lines changed

4 files changed

+38
-36
lines changed

iroh/src/endpoint/connection.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ use crate::{
4343
discovery::DiscoveryTask,
4444
magicsock::{
4545
EndpointStateActorStoppedError,
46-
endpoint_map::{PathInfoList, PathsWatchable},
46+
endpoint_map::{PathInfoList, PathsWatcher},
4747
},
4848
};
4949

@@ -1213,7 +1213,7 @@ pub struct Connection {
12131213
inner: quinn::Connection,
12141214
remote_id: EndpointId,
12151215
alpn: Vec<u8>,
1216-
paths: PathsWatchable,
1216+
paths: PathsWatcher,
12171217
}
12181218

12191219
#[allow(missing_docs)]
@@ -1463,7 +1463,7 @@ impl Connection {
14631463
/// [`PathInfo::is_selected`]: crate::magicsock::PathInfo::is_selected
14641464
/// [`PathInfo`]: crate::magicsock::PathInfo
14651465
pub fn paths(&self) -> impl Watcher<Value = PathInfoList> {
1466-
self.paths.watch(self.inner.weak_handle())
1466+
self.paths.clone()
14671467
}
14681468

14691469
/// Derives keying material from this connection's TLS session secrets.

iroh/src/magicsock.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ use crate::{
6464
disco::{self, SendAddr},
6565
discovery::{ConcurrentDiscovery, Discovery, EndpointData, UserData},
6666
key::{DecryptionError, SharedSecret, public_ed_box, secret_ed_box},
67-
magicsock::endpoint_map::PathsWatchable,
67+
magicsock::endpoint_map::PathsWatcher,
6868
metrics::EndpointMetrics,
6969
net_report::{self, IfStateDetails, Report},
7070
};
@@ -275,14 +275,14 @@ impl MagicSock {
275275
/// The actor is responsible for holepunching and opening additional paths to this
276276
/// connection.
277277
///
278-
/// Returns a future that resolves to [`PathsWatchable`].
278+
/// Returns a future that resolves to [`PathsWatcher`].
279279
///
280280
/// The returned future is `'static`, so it can be stored without being liftetime-bound to `&self`.
281281
pub(crate) fn register_connection(
282282
&self,
283283
remote: EndpointId,
284284
conn: WeakConnectionHandle,
285-
) -> impl Future<Output = Result<PathsWatchable, EndpointStateActorStoppedError>> + Send + 'static
285+
) -> impl Future<Output = Result<PathsWatcher, EndpointStateActorStoppedError>> + Send + 'static
286286
{
287287
let (tx, rx) = oneshot::channel();
288288
let sender = self.endpoint_map.endpoint_state_actor(remote);

iroh/src/magicsock/endpoint_map.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ mod endpoint_state;
2525
mod path_state;
2626

2727
pub(super) use endpoint_state::EndpointStateMessage;
28-
pub(crate) use endpoint_state::PathsWatchable;
28+
pub(crate) use endpoint_state::PathsWatcher;
2929
pub use endpoint_state::{ConnectionType, PathInfo, PathInfoList};
3030
use endpoint_state::{EndpointStateActor, EndpointStateHandle};
3131

iroh/src/magicsock/endpoint_map/endpoint_state.rs

Lines changed: 31 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -331,9 +331,10 @@ impl EndpointStateActor {
331331
async fn handle_msg_add_connection(
332332
&mut self,
333333
handle: WeakConnectionHandle,
334-
tx: oneshot::Sender<PathsWatchable>,
334+
tx: oneshot::Sender<PathsWatcher>,
335335
) {
336336
let pub_open_paths = Watchable::default();
337+
337338
if let Some(conn) = handle.upgrade() {
338339
// Remove any conflicting stable_ids from the local state.
339340
let conn_id = ConnId(conn.stable_id());
@@ -393,11 +394,12 @@ impl EndpointStateActor {
393394
}
394395
self.trigger_holepunching().await;
395396
}
396-
tx.send(PathsWatchable {
397-
open_paths: pub_open_paths,
398-
selected_path: self.selected_path.clone(),
399-
})
400-
.ok();
397+
398+
// Create a watcher over the open path and the selected path and pass it back
399+
// to the Connection.
400+
let paths_watcher =
401+
paths_watcher(handle, pub_open_paths.watch(), self.selected_path.watch());
402+
tx.send(paths_watcher).ok();
401403
}
402404

403405
/// Handles [`EndpointStateMessage::AddEndpointAddr`].
@@ -996,7 +998,7 @@ pub(crate) enum EndpointStateMessage {
996998
/// needed, any new paths discovered via holepunching will be added. And closed paths
997999
/// will be removed etc.
9981000
#[debug("AddConnection(..)")]
999-
AddConnection(WeakConnectionHandle, oneshot::Sender<PathsWatchable>),
1001+
AddConnection(WeakConnectionHandle, oneshot::Sender<PathsWatcher>),
10001002
/// Adds a [`EndpointAddr`] with locations where the endpoint might be reachable.
10011003
AddEndpointAddr(EndpointAddr, Source),
10021004
/// Process a received DISCO CallMeMaybe message.
@@ -1143,30 +1145,30 @@ impl ConnectionState {
11431145
}
11441146
}
11451147

1146-
/// Watchables for the open paths and selected transmission path in a connection.
1147-
///
1148-
/// This is stored in the [`Connection`], and the watchables are set from within the endpoint state actor.
1149-
///
1150-
/// [`Connection`]: crate::endpoint::Connection
1151-
#[derive(Debug, Default, Clone)]
1152-
pub(crate) struct PathsWatchable {
1153-
/// Watchable for the open paths (in this connection).
1154-
open_paths: Watchable<PathAddrList>,
1155-
/// Watchable for the selected transmission path (global for this remote endpoint).
1156-
selected_path: Watchable<Option<transports::Addr>>,
1157-
}
1148+
/// Watcher over [`PathInfoList`].
1149+
pub(crate) type PathsWatcher = n0_watcher::Map<
1150+
(
1151+
n0_watcher::Direct<PathAddrList>,
1152+
n0_watcher::Direct<Option<transports::Addr>>,
1153+
),
1154+
PathInfoList,
1155+
>;
11581156

1159-
impl PathsWatchable {
1160-
pub(crate) fn watch(
1161-
&self,
1162-
conn_handle: WeakConnectionHandle,
1163-
) -> impl Watcher<Value = PathInfoList> {
1164-
let joined_watcher = (self.open_paths.watch(), self.selected_path.watch());
1165-
joined_watcher.map(move |(open_paths, selected_path)| {
1166-
let selected_path: Option<TransportAddr> = selected_path.map(Into::into);
1157+
/// Combines the open_paths and selected_path watchers into a watcher over [`PathInfoList`].
1158+
///
1159+
/// Takes a [`WeakConnectionHandle`] which is used to populate the stats in [`PathInfo`] when the list updates.
1160+
fn paths_watcher(
1161+
conn_handle: WeakConnectionHandle,
1162+
open_paths_watcher: n0_watcher::Direct<PathAddrList>,
1163+
selected_path_watcher: n0_watcher::Direct<Option<transports::Addr>>,
1164+
) -> PathsWatcher {
1165+
open_paths_watcher
1166+
.or(selected_path_watcher)
1167+
.map(move |(open_paths, selected_path)| {
11671168
let Some(conn) = conn_handle.upgrade() else {
11681169
return PathInfoList(Default::default());
11691170
};
1171+
let selected_path = selected_path.map(TransportAddr::from);
11701172
let list = open_paths
11711173
.into_iter()
11721174
.flat_map(move |(remote, path_id)| {
@@ -1175,7 +1177,6 @@ impl PathsWatchable {
11751177
.collect();
11761178
PathInfoList(list)
11771179
})
1178-
}
11791180
}
11801181

11811182
/// List of [`PathInfo`] for the network paths of a [`Connection`].
@@ -1210,10 +1211,10 @@ impl PathInfoList {
12101211
#[derive(derive_more::Debug, Clone)]
12111212
pub struct PathInfo {
12121213
path_id: PathId,
1214+
remote: TransportAddr,
12131215
#[debug(skip)]
12141216
handle: WeakConnectionHandle,
12151217
stats: PathStats,
1216-
remote: TransportAddr,
12171218
is_selected: bool,
12181219
}
12191220

@@ -1222,6 +1223,7 @@ impl PartialEq for PathInfo {
12221223
self.path_id == other.path_id
12231224
&& self.remote == other.remote
12241225
&& self.is_selected == other.is_selected
1226+
&& self.stats == other.stats
12251227
}
12261228
}
12271229

0 commit comments

Comments
 (0)