Skip to content

Commit 60c7efc

Browse files
authored
Merge be70ea4 into 1bbdbf1
2 parents 1bbdbf1 + be70ea4 commit 60c7efc

File tree

9 files changed

+223
-18
lines changed

9 files changed

+223
-18
lines changed

Cargo.lock

Lines changed: 13 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,11 @@ unexpected_cfgs = { level = "warn", check-cfg = ["cfg(iroh_docsrs)", "cfg(iroh_l
4141
[workspace.lints.clippy]
4242
unused-async = "warn"
4343

44-
4544
[patch.crates-io]
46-
netwatch = { git = "https://github.com/n0-computer/net-tools", branch = "feat-multipath" }
47-
portmapper = { git = "https://github.com/n0-computer/net-tools", branch = "feat-multipath" }
45+
netwatch = { git = "https://github.com/n0-computer/net-tools", branch = "Frando/on_closed" }
46+
portmapper = { git = "https://github.com/n0-computer/net-tools", branch = "Frando/on_closed" }
4847

49-
[patch."https://github.com/n0-computer/quinn"]
48+
# [patch."https://github.com/n0-computer/quinn"]
5049
# iroh-quinn = { path = "../iroh-quinn/quinn" }
5150
# iroh-quinn-proto = { path = "../iroh-quinn/quinn-proto" }
5251
# iroh-quinn-udp = { path = "../iroh-quinn/quinn-udp" }

iroh-relay/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ postcard = { version = "1", default-features = false, features = [
4242
"use-std",
4343
"experimental-derive",
4444
] }
45-
quinn = { package = "iroh-quinn", git = "https://github.com/n0-computer/quinn", branch = "main-iroh", default-features = false, features = ["rustls-ring"] }
46-
quinn-proto = { package = "iroh-quinn-proto", git = "https://github.com/n0-computer/quinn", branch = "main-iroh" }
45+
quinn = { package = "iroh-quinn", git = "https://github.com/n0-computer/quinn", branch = "Frando/on_closed", default-features = false, features = ["rustls-ring"] }
46+
quinn-proto = { package = "iroh-quinn-proto", git = "https://github.com/n0-computer/quinn", branch = "Frando/on_closed" }
4747
rand = "0.9.2"
4848
reqwest = { version = "0.12", default-features = false, features = [
4949
"rustls-tls",

iroh/Cargo.toml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,9 @@ n0-watcher = "0.5"
4444
netwatch = { version = "0.12" }
4545
pin-project = "1"
4646
pkarr = { version = "5", default-features = false, features = ["relays"] }
47-
quinn = { package = "iroh-quinn", git = "https://github.com/n0-computer/quinn", branch = "main-iroh", default-features = false, features = ["rustls-ring"] }
48-
quinn-proto = { package = "iroh-quinn-proto", git = "https://github.com/n0-computer/quinn", branch = "main-iroh" }
49-
quinn-udp = { package = "iroh-quinn-udp", git = "https://github.com/n0-computer/quinn", branch = "main-iroh" }
47+
quinn = { package = "iroh-quinn", git = "https://github.com/n0-computer/quinn", branch = "Frando/on_closed", default-features = false, features = ["rustls-ring"] }
48+
quinn-proto = { package = "iroh-quinn-proto", git = "https://github.com/n0-computer/quinn", branch = "Frando/on_closed" }
49+
quinn-udp = { package = "iroh-quinn-udp", git = "https://github.com/n0-computer/quinn", branch = "Frando/on_closed" }
5050
rand = "0.9.2"
5151
reqwest = { version = "0.12", default-features = false, features = [
5252
"rustls-tls",
@@ -90,7 +90,7 @@ hickory-resolver = "0.25.1"
9090
igd-next = { version = "0.16", features = ["aio_tokio"] }
9191
netdev = { version = "0.38.1" }
9292
portmapper = { version = "0.12", default-features = false }
93-
quinn = { package = "iroh-quinn", git = "https://github.com/n0-computer/quinn", branch = "main-iroh", default-features = false, features = ["runtime-tokio", "rustls-ring"] }
93+
quinn = { package = "iroh-quinn", git = "https://github.com/n0-computer/quinn", branch = "Frando/on_closed", default-features = false, features = ["runtime-tokio", "rustls-ring"] }
9494
tokio = { version = "1", features = [
9595
"io-util",
9696
"macros",

iroh/bench/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ iroh = { path = ".." }
1212
iroh-metrics = "0.37"
1313
n0-future = "0.3.0"
1414
n0-error = "0.1.0"
15-
quinn = { package = "iroh-quinn", git = "https://github.com/n0-computer/quinn", branch = "main-iroh" }
15+
quinn = { package = "iroh-quinn", git = "https://github.com/n0-computer/quinn", branch = "Frando/on_closed" }
1616
rand = "0.9.2"
1717
rcgen = "0.14"
1818
rustls = { version = "0.23.33", default-features = false, features = ["ring"] }
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
use std::{sync::Arc, time::Duration};
2+
3+
use iroh::{
4+
Endpoint, RelayMode,
5+
endpoint::{ConnectionInfo, ConnectionMonitor},
6+
};
7+
use n0_error::{Result, StackResultExt, StdResultExt, ensure_any};
8+
use n0_future::task::AbortOnDropHandle;
9+
use tokio::{
10+
sync::mpsc::{UnboundedReceiver, UnboundedSender},
11+
task::JoinSet,
12+
};
13+
use tracing::{Instrument, info, info_span};
14+
15+
const ALPN: &[u8] = b"iroh/test";
16+
17+
#[tokio::main]
18+
async fn main() -> Result {
19+
tracing_subscriber::fmt()
20+
.with_env_filter(
21+
tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| "info".into()),
22+
)
23+
.init();
24+
25+
let monitor = Monitor::new();
26+
let server = Endpoint::empty_builder(RelayMode::Disabled)
27+
.alpns(vec![ALPN.to_vec()])
28+
.monitor_connections(monitor.clone())
29+
.bind()
30+
.instrument(info_span!("server"))
31+
.await?;
32+
let server_addr = server.addr();
33+
34+
let count = 2;
35+
36+
let client_task = tokio::spawn(
37+
async move {
38+
let client = Endpoint::empty_builder(RelayMode::Disabled)
39+
.bind()
40+
.instrument(info_span!("client"))
41+
.await?;
42+
for _i in 0..count {
43+
let conn = client.connect(server_addr.clone(), ALPN).await?;
44+
let mut s = conn.accept_uni().await.anyerr()?;
45+
let data = s.read_to_end(2).await.anyerr()?;
46+
ensure_any!(data == b"hi", "unexpected data");
47+
conn.close(23u32.into(), b"bye");
48+
}
49+
client.close().await;
50+
n0_error::Ok(client)
51+
}
52+
.instrument(info_span!("client")),
53+
);
54+
55+
let server_task = tokio::spawn(
56+
async move {
57+
for _i in 0..count {
58+
let conn = server
59+
.accept()
60+
.await
61+
.context("server endpoint closed")?
62+
.await?;
63+
let mut s = conn.open_uni().await.anyerr()?;
64+
s.write_all(b"hi").await.anyerr()?;
65+
conn.closed().await;
66+
}
67+
info!("done");
68+
server.close().await;
69+
n0_error::Ok(())
70+
}
71+
.instrument(info_span!("server")),
72+
);
73+
client_task.await.std_context("client")?.context("client")?;
74+
server_task.await.std_context("server")?.context("server")?;
75+
tokio::time::sleep(Duration::from_secs(1)).await;
76+
drop(monitor);
77+
Ok(())
78+
}
79+
80+
/// Our connection monitor impl.
81+
///
82+
/// This here only logs connection open and close events via tracing.
83+
/// It could also maintain a datastructure of all connections, or send the stats to some metrics service.
84+
#[derive(Clone)]
85+
struct Monitor {
86+
tx: UnboundedSender<ConnectionInfo>,
87+
_task: Arc<AbortOnDropHandle<()>>,
88+
}
89+
90+
impl ConnectionMonitor for Monitor {
91+
fn on_connection(&self, connection: ConnectionInfo) {
92+
self.tx.send(connection).ok();
93+
}
94+
}
95+
96+
impl Monitor {
97+
fn new() -> Self {
98+
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
99+
let task = tokio::spawn(Self::run(rx).instrument(info_span!("watcher")));
100+
Self {
101+
tx,
102+
_task: Arc::new(AbortOnDropHandle::new(task)),
103+
}
104+
}
105+
106+
async fn run(mut rx: UnboundedReceiver<ConnectionInfo>) {
107+
let mut tasks = JoinSet::new();
108+
loop {
109+
tokio::select! {
110+
Some(conn) = rx.recv() => {
111+
let alpn = String::from_utf8_lossy(conn.alpn()).to_string();
112+
let remote = conn.remote_id().fmt_short();
113+
info!(%remote, %alpn, rtt=?conn.rtt(), "new connection");
114+
tasks.spawn(async move {
115+
match conn.closed().await {
116+
Some((close_reason, stats)) => {
117+
// We have access to the final stats of the connection!
118+
info!(%remote, %alpn, ?close_reason, udp_rx=stats.udp_rx.bytes, udp_tx=stats.udp_tx.bytes, "connection closed");
119+
}
120+
None => {
121+
// The connection was closed before we could register our stats-on-close listener.
122+
info!(%remote, %alpn, "connection closed before tracking started");
123+
}
124+
}
125+
}.instrument(tracing::Span::current()));
126+
}
127+
Some(res) = tasks.join_next(), if !tasks.is_empty() => res.expect("conn close task panicked"),
128+
else => break,
129+
}
130+
while let Some(res) = tasks.join_next().await {
131+
res.expect("conn close task panicked");
132+
}
133+
}
134+
}
135+
}

iroh/src/endpoint.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,8 @@ pub struct Builder {
117117
#[cfg(any(test, feature = "test-utils"))]
118118
path_selection: PathSelection,
119119
max_tls_tickets: usize,
120+
#[debug("{}", connection_monitor.as_ref().map(|_| "Some(Box<dyn ConnectionMonitor>)").unwrap_or("None"))]
121+
connection_monitor: Option<Box<dyn ConnectionMonitor>>,
120122
}
121123

122124
impl Builder {
@@ -158,6 +160,7 @@ impl Builder {
158160
#[cfg(any(test, feature = "test-utils"))]
159161
path_selection: PathSelection::default(),
160162
max_tls_tickets: DEFAULT_MAX_TLS_TICKETS,
163+
connection_monitor: None,
161164
}
162165
}
163166

@@ -208,6 +211,7 @@ impl Builder {
208211
// #[cfg(any(test, feature = "test-utils"))]
209212
// path_selection: self.path_selection,
210213
metrics,
214+
connection_monitor: self.connection_monitor,
211215
};
212216

213217
let msock = magicsock::MagicSock::spawn(msock_opts).await?;
@@ -432,6 +436,44 @@ impl Builder {
432436
self.max_tls_tickets = n;
433437
self
434438
}
439+
440+
// TODO docs
441+
/// Register a handler that is invoked for each connection the endpoint accepts or initiates.
442+
///
443+
/// The [`ConnectionMonitor::on_connection`] method is invoked synchronosuly, from within a tokio
444+
/// context. So you can spawn tasks if needed.
445+
/// Make sure that whatever you do with the connection info here is non-blocking.
446+
/// Usually you'd want to send the info over a broadcast or unbounded channel,
447+
/// or insert it into some persistent datastructure.
448+
///
449+
/// The `ConnectionInfo` internally contains a weak reference to the connection,
450+
/// so keeping the struct alive does not keep the connection alive.
451+
/// Note however that `ConnectionInfo` does keep an allocation per connection alive
452+
/// so to not leak memory you should drop the `ConnectionInfo` eventually
453+
///
454+
/// [`ConnectionMonitor`] is implemented for `Fn(ConnectionInfo)`, so you can
455+
/// also pass a closure that takes [`ConnectionInfo`] to this function.
456+
pub fn monitor_connections(mut self, monitor: impl ConnectionMonitor) -> Self {
457+
self.connection_monitor = Some(Box::new(monitor));
458+
self
459+
}
460+
}
461+
462+
/// Monitor each connection accepted or initiated by the endpoint.
463+
pub trait ConnectionMonitor: Send + Sync + 'static {
464+
/// Called for each new connection the endpoint accepts or initiates.
465+
///
466+
/// This is only called when a connection is fully established.
467+
fn on_connection(&self, connection: ConnectionInfo);
468+
}
469+
470+
impl<T> ConnectionMonitor for T
471+
where
472+
T: Fn(ConnectionInfo) + Send + Sync + 'static,
473+
{
474+
fn on_connection(&self, connection: ConnectionInfo) {
475+
(self)(connection)
476+
}
435477
}
436478

437479
/// Configuration for a [`quinn::Endpoint`] that cannot be changed at runtime.

iroh/src/endpoint/connection.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1608,6 +1608,14 @@ impl ConnectionInfo {
16081608
pub fn side(&self) -> Side {
16091609
self.side
16101610
}
1611+
1612+
/// Waits for the connection to be closed, and returns the close reason and final connection stats.
1613+
///
1614+
/// Returns `None` if the connection has been dropped already before this call.
1615+
pub async fn closed(&self) -> Option<(ConnectionError, ConnectionStats)> {
1616+
let fut = self.inner.upgrade()?.on_closed();
1617+
Some(fut.await)
1618+
}
16111619
}
16121620

16131621
#[cfg(test)]

0 commit comments

Comments
 (0)