Skip to content

Commit 6189b7f

Browse files
committed
refactor: move discovery into EndpointStateActor
1 parent e0f10ce commit 6189b7f

File tree

8 files changed

+177
-405
lines changed

8 files changed

+177
-405
lines changed

Cargo.lock

Lines changed: 2 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ unused-async = "warn"
4343

4444

4545
[patch.crates-io]
46+
n0-error = { git = "https://github.com/n0-computer/n0-error", branch = "main" }
4647
iroh-quinn = { git = "https://github.com/n0-computer/quinn", branch = "main-iroh" }
4748
iroh-quinn-proto = { git = "https://github.com/n0-computer/quinn", branch = "main-iroh" }
4849
iroh-quinn-udp = { git = "https://github.com/n0-computer/quinn", branch = "main-iroh" }

iroh/src/discovery.rs

Lines changed: 10 additions & 152 deletions
Original file line numberDiff line numberDiff line change
@@ -113,18 +113,11 @@
113113
use std::sync::{Arc, RwLock};
114114

115115
use iroh_base::{EndpointAddr, EndpointId};
116-
use n0_error::{AnyError, e, ensure, stack_error};
117-
use n0_future::{
118-
boxed::BoxStream,
119-
stream::StreamExt,
120-
task::{self, AbortOnDropHandle},
121-
time::{self, Duration},
122-
};
123-
use tokio::sync::oneshot;
124-
use tracing::{Instrument, debug, error_span, warn};
116+
use n0_error::{AnyError, e, stack_error};
117+
use n0_future::boxed::BoxStream;
125118

119+
use crate::Endpoint;
126120
pub use crate::endpoint_info::{EndpointData, EndpointInfo, ParseError, UserData};
127-
use crate::{Endpoint, magicsock::endpoint_map::Source};
128121

129122
#[cfg(not(wasm_browser))]
130123
pub mod dns;
@@ -218,6 +211,7 @@ impl IntoDiscoveryError {
218211
#[allow(missing_docs)]
219212
#[stack_error(derive, add_meta)]
220213
#[non_exhaustive]
214+
#[derive(Clone)]
221215
pub enum DiscoveryError {
222216
#[error("No discovery service configured")]
223217
NoServiceConfigured,
@@ -226,7 +220,7 @@ pub enum DiscoveryError {
226220
#[error("Service '{provenance}' error")]
227221
User {
228222
provenance: &'static str,
229-
source: AnyError,
223+
source: Arc<AnyError>,
230224
},
231225
}
232226

@@ -237,10 +231,7 @@ impl DiscoveryError {
237231
provenance: &'static str,
238232
source: T,
239233
) -> Self {
240-
e!(DiscoveryError::User {
241-
provenance,
242-
source: AnyError::from_std(source)
243-
})
234+
Self::from_err_any(provenance, AnyError::from_std(source))
244235
}
245236

246237
/// Creates a new user error from an arbitrary boxed error type.
@@ -249,18 +240,15 @@ impl DiscoveryError {
249240
provenance: &'static str,
250241
source: Box<dyn std::error::Error + Send + Sync + 'static>,
251242
) -> Self {
252-
e!(DiscoveryError::User {
253-
provenance,
254-
source: AnyError::from_std_box(source)
255-
})
243+
Self::from_err_any(provenance, AnyError::from_std_box(source))
256244
}
257245

258246
/// Creates a new user error from an arbitrary error type that can be converted into [`AnyError`].
259247
#[track_caller]
260248
pub fn from_err_any(provenance: &'static str, source: impl Into<AnyError>) -> Self {
261249
e!(DiscoveryError::User {
262250
provenance,
263-
source: source.into()
251+
source: Arc::new(source.into())
264252
})
265253
}
266254
}
@@ -502,148 +490,18 @@ impl Discovery for ConcurrentDiscovery {
502490
}
503491
}
504492

505-
/// A wrapper around a tokio task which runs a node discovery.
506-
pub(super) struct DiscoveryTask {
507-
on_first_rx: oneshot::Receiver<Result<(), DiscoveryError>>,
508-
_task: AbortOnDropHandle<()>,
509-
}
510-
511-
impl DiscoveryTask {
512-
/// Starts a discovery task.
513-
pub(super) fn start(ep: Endpoint, endpoint_id: EndpointId) -> Result<Self, DiscoveryError> {
514-
ensure!(
515-
!ep.discovery().is_empty(),
516-
DiscoveryError::NoServiceConfigured
517-
);
518-
let (on_first_tx, on_first_rx) = oneshot::channel();
519-
let me = ep.id();
520-
let task = task::spawn(
521-
async move { Self::run(ep, endpoint_id, on_first_tx).await }.instrument(
522-
error_span!("discovery", me = %me.fmt_short(), endpoint = %endpoint_id.fmt_short()),
523-
),
524-
);
525-
Ok(Self {
526-
_task: AbortOnDropHandle::new(task),
527-
on_first_rx,
528-
})
529-
}
530-
531-
/// Starts a discovery task after a delay and only if no path to the endpoint was recently active.
532-
///
533-
/// This returns `None` if we received data or control messages from the remote endpoint
534-
/// recently enough. If not it returns a [`DiscoveryTask`].
535-
///
536-
/// If `delay` is set, the [`DiscoveryTask`] will first wait for `delay` and then check again
537-
/// if we recently received messages from remote endpoint. If true, the task will abort.
538-
/// Otherwise, or if no `delay` is set, the discovery will be started.
539-
pub(super) fn start_after_delay(
540-
ep: &Endpoint,
541-
endpoint_id: EndpointId,
542-
delay: Duration,
543-
) -> Result<Option<Self>, DiscoveryError> {
544-
// If discovery is not needed, don't even spawn a task.
545-
ensure!(
546-
!ep.discovery().is_empty(),
547-
DiscoveryError::NoServiceConfigured
548-
);
549-
let (on_first_tx, on_first_rx) = oneshot::channel();
550-
let ep = ep.clone();
551-
let me = ep.id();
552-
let task = task::spawn(
553-
async move {
554-
time::sleep(delay).await;
555-
Self::run(ep, endpoint_id, on_first_tx).await
556-
}
557-
.instrument(
558-
error_span!("discovery", me = %me.fmt_short(), endpoint = %endpoint_id.fmt_short()),
559-
),
560-
);
561-
Ok(Some(Self {
562-
_task: AbortOnDropHandle::new(task),
563-
on_first_rx,
564-
}))
565-
}
566-
567-
/// Waits until the discovery task produced at least one result.
568-
pub(super) async fn first_arrived(&mut self) -> Result<(), DiscoveryError> {
569-
let fut = &mut self.on_first_rx;
570-
fut.await.expect("sender dropped")?;
571-
Ok(())
572-
}
573-
574-
fn create_stream(
575-
ep: &Endpoint,
576-
endpoint_id: EndpointId,
577-
) -> Result<BoxStream<Result<DiscoveryItem, DiscoveryError>>, DiscoveryError> {
578-
ensure!(
579-
!ep.discovery().is_empty(),
580-
DiscoveryError::NoServiceConfigured
581-
);
582-
let stream = ep
583-
.discovery()
584-
.resolve(endpoint_id)
585-
.ok_or_else(|| e!(DiscoveryError::NoResults { endpoint_id }))?;
586-
Ok(stream)
587-
}
588-
589-
async fn run(
590-
ep: Endpoint,
591-
endpoint_id: EndpointId,
592-
on_first_tx: oneshot::Sender<Result<(), DiscoveryError>>,
593-
) {
594-
let mut stream = match Self::create_stream(&ep, endpoint_id) {
595-
Ok(stream) => stream,
596-
Err(err) => {
597-
on_first_tx.send(Err(err)).ok();
598-
return;
599-
}
600-
};
601-
let mut on_first_tx = Some(on_first_tx);
602-
debug!("starting");
603-
loop {
604-
match stream.next().await {
605-
Some(Ok(r)) => {
606-
let provenance = r.provenance;
607-
let endpoint_addr = r.to_endpoint_addr();
608-
if endpoint_addr.is_empty() {
609-
debug!(%provenance, "empty address found");
610-
continue;
611-
}
612-
debug!(%provenance, addr = ?endpoint_addr, "new address found");
613-
let source = Source::Discovery {
614-
name: provenance.to_string(),
615-
};
616-
ep.add_endpoint_addr(endpoint_addr, source).await.ok();
617-
618-
if let Some(tx) = on_first_tx.take() {
619-
tx.send(Ok(())).ok();
620-
}
621-
}
622-
Some(Err(err)) => {
623-
warn!(?err, "discovery service produced error");
624-
break;
625-
}
626-
None => break,
627-
}
628-
}
629-
if let Some(tx) = on_first_tx.take() {
630-
tx.send(Err(e!(DiscoveryError::NoResults { endpoint_id })))
631-
.ok();
632-
}
633-
}
634-
}
635-
636493
#[cfg(test)]
637494
mod tests {
638495
use std::{
639496
collections::HashMap,
640497
net::SocketAddr,
641498
sync::{Arc, Mutex},
642-
time::SystemTime,
499+
time::{Duration, SystemTime},
643500
};
644501

645502
use iroh_base::{EndpointAddr, SecretKey, TransportAddr};
646503
use n0_error::{AnyError as Error, Result, StackResultExt};
504+
use n0_future::{StreamExt, time};
647505
use quinn::{IdleTimeout, TransportConfig};
648506
use rand::{CryptoRng, Rng, SeedableRng};
649507
use tokio_util::task::AbortOnDropHandle;

0 commit comments

Comments
 (0)