Skip to content

Commit 1d5937c

Browse files
committed
refactor: use Connection::on_closed in endpoint state actor (#3627)
In the endpoint state actor, this uses the `Connection::on_closed` future added in n0-computer/quinn#153 to remove connections once they are closed instead of relying on manual cleanup.
1 parent 7887fb5 commit 1d5937c

File tree

1 file changed

+38
-12
lines changed

1 file changed

+38
-12
lines changed

iroh/src/magicsock/endpoint_map/endpoint_state.rs

Lines changed: 38 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,13 @@ use std::{
33
net::SocketAddr,
44
pin::Pin,
55
sync::Arc,
6+
task::Poll,
67
};
78

89
use iroh_base::{EndpointAddr, EndpointId, RelayUrl, TransportAddr};
910
use n0_error::StdResultExt;
1011
use n0_future::{
11-
MergeUnbounded, Stream, StreamExt,
12+
FuturesUnordered, MergeUnbounded, Stream, StreamExt,
1213
task::{self, AbortOnDropHandle},
1314
time::{self, Duration, Instant},
1415
};
@@ -111,6 +112,8 @@ pub(super) struct EndpointStateActor {
111112
//
112113
/// All connections we have to this remote endpoint.
113114
connections: FxHashMap<ConnId, ConnectionState>,
115+
/// Notifications when connections are closed.
116+
connections_close: FuturesUnordered<OnClosed>,
114117
/// Events emitted by Quinn about path changes, for all paths, all connections.
115118
path_events: PathEvents,
116119

@@ -161,6 +164,7 @@ impl EndpointStateActor {
161164
relay_mapped_addrs,
162165
disco,
163166
connections: FxHashMap::default(),
167+
connections_close: Default::default(),
164168
path_events: Default::default(),
165169
paths: FxHashMap::default(),
166170
last_holepunch: None,
@@ -234,6 +238,9 @@ impl EndpointStateActor {
234238
Some((id, evt)) = self.path_events.next() => {
235239
self.handle_path_event(id, evt);
236240
}
241+
Some(conn_id) = self.connections_close.next(), if !self.connections_close.is_empty() => {
242+
self.connections.remove(&conn_id);
243+
}
237244
_ = self.local_addrs.updated() => {
238245
trace!("local addrs updated, triggering holepunching");
239246
self.trigger_holepunching().await;
@@ -334,13 +341,11 @@ impl EndpointStateActor {
334341
let conn_id = ConnId(conn.stable_id());
335342
self.connections.remove(&conn_id);
336343

337-
// This is a good time to clean up connections.
338-
self.cleanup_connections();
339-
340344
// Store the connection and hook up paths events stream.
341345
let events = BroadcastStream::new(conn.path_events());
342346
let stream = events.map(move |evt| (conn_id, evt));
343347
self.path_events.push(Box::pin(stream));
348+
self.connections_close.push(OnClosed::new(&conn));
344349
let conn_state = self
345350
.connections
346351
.entry(conn_id)
@@ -864,12 +869,6 @@ impl EndpointStateActor {
864869
}
865870
}
866871

867-
/// Clean up connections which no longer exist.
868-
// TODO: Call this on a schedule.
869-
fn cleanup_connections(&mut self) {
870-
self.connections.retain(|_, c| c.handle.upgrade().is_some());
871-
}
872-
873872
/// Selects the path with the lowest RTT, prefers direct paths.
874873
///
875874
/// If there are direct paths, this selects the direct path with the lowest RTT. If
@@ -1296,7 +1295,34 @@ impl PathInfo {
12961295
fn now_or_never<T, F: Future<Output = T>>(fut: F) -> Option<T> {
12971296
let fut = std::pin::pin!(fut);
12981297
match fut.poll(&mut std::task::Context::from_waker(std::task::Waker::noop())) {
1299-
std::task::Poll::Ready(res) => Some(res),
1300-
std::task::Poll::Pending => None,
1298+
Poll::Ready(res) => Some(res),
1299+
Poll::Pending => None,
1300+
}
1301+
}
1302+
1303+
/// Future that resolves to the `conn_id` once a connection is closed.
1304+
///
1305+
/// This uses [`quinn::Connection::on_closed`], which does not keep the connection alive
1306+
/// while awaiting the future.
1307+
struct OnClosed {
1308+
conn_id: ConnId,
1309+
inner: quinn::OnClosed,
1310+
}
1311+
1312+
impl OnClosed {
1313+
fn new(conn: &quinn::Connection) -> Self {
1314+
Self {
1315+
conn_id: ConnId(conn.stable_id()),
1316+
inner: conn.on_closed(),
1317+
}
1318+
}
1319+
}
1320+
1321+
impl Future for OnClosed {
1322+
type Output = ConnId;
1323+
1324+
fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
1325+
let (_close_reason, _stats) = std::task::ready!(Pin::new(&mut self.inner).poll(cx));
1326+
Poll::Ready(self.conn_id)
13011327
}
13021328
}

0 commit comments

Comments
 (0)