Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 18 additions & 19 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion iroh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ http = "1"
iroh-base = { version = "0.95.1", default-features = false, features = ["key", "relay"], path = "../iroh-base" }
iroh-relay = { version = "0.95", path = "../iroh-relay", default-features = false }
n0-future = "0.3.0"
n0-error = "0.1.0"
n0-error = "0.1.2"
n0-watcher = "0.5"
netwatch = { version = "0.12" }
pin-project = "1"
Expand All @@ -50,6 +50,7 @@ rustls = { version = "0.23.33", default-features = false, features = ["ring"] }
serde = { version = "1.0.219", features = ["derive", "rc"] }
smallvec = "1.11.1"
strum = { version = "0.27", features = ["derive"] }
sync_wrapper = { version = "1.0.2", features = ["futures"] }
tokio = { version = "1.44.1", features = [
"io-util",
"macros",
Expand Down
162 changes: 10 additions & 152 deletions iroh/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,18 +113,11 @@
use std::sync::{Arc, RwLock};

use iroh_base::{EndpointAddr, EndpointId};
use n0_error::{AnyError, e, ensure, stack_error};
use n0_future::{
boxed::BoxStream,
stream::StreamExt,
task::{self, AbortOnDropHandle},
time::{self, Duration},
};
use tokio::sync::oneshot;
use tracing::{Instrument, debug, error_span, warn};
use n0_error::{AnyError, e, stack_error};
use n0_future::boxed::BoxStream;

use crate::Endpoint;
pub use crate::endpoint_info::{EndpointData, EndpointInfo, ParseError, UserData};
use crate::{Endpoint, magicsock::endpoint_map::Source};

#[cfg(not(wasm_browser))]
pub mod dns;
Expand Down Expand Up @@ -218,6 +211,7 @@ impl IntoDiscoveryError {
#[allow(missing_docs)]
#[stack_error(derive, add_meta)]
#[non_exhaustive]
#[derive(Clone)]
pub enum DiscoveryError {
#[error("No discovery service configured")]
NoServiceConfigured,
Expand All @@ -226,7 +220,7 @@ pub enum DiscoveryError {
#[error("Service '{provenance}' error")]
User {
provenance: &'static str,
source: AnyError,
source: Arc<AnyError>,
},
}

Expand All @@ -237,10 +231,7 @@ impl DiscoveryError {
provenance: &'static str,
source: T,
) -> Self {
e!(DiscoveryError::User {
provenance,
source: AnyError::from_std(source)
})
Self::from_err_any(provenance, AnyError::from_std(source))
}

/// Creates a new user error from an arbitrary boxed error type.
Expand All @@ -249,18 +240,15 @@ impl DiscoveryError {
provenance: &'static str,
source: Box<dyn std::error::Error + Send + Sync + 'static>,
) -> Self {
e!(DiscoveryError::User {
provenance,
source: AnyError::from_std_box(source)
})
Self::from_err_any(provenance, AnyError::from_std_box(source))
}

/// Creates a new user error from an arbitrary error type that can be converted into [`AnyError`].
#[track_caller]
pub fn from_err_any(provenance: &'static str, source: impl Into<AnyError>) -> Self {
e!(DiscoveryError::User {
provenance,
source: source.into()
source: Arc::new(source.into())
})
}
}
Expand Down Expand Up @@ -502,148 +490,18 @@ impl Discovery for ConcurrentDiscovery {
}
}

/// A wrapper around a tokio task which runs a node discovery.
pub(super) struct DiscoveryTask {
on_first_rx: oneshot::Receiver<Result<(), DiscoveryError>>,
_task: AbortOnDropHandle<()>,
}

impl DiscoveryTask {
/// Starts a discovery task.
pub(super) fn start(ep: Endpoint, endpoint_id: EndpointId) -> Result<Self, DiscoveryError> {
ensure!(
!ep.discovery().is_empty(),
DiscoveryError::NoServiceConfigured
);
let (on_first_tx, on_first_rx) = oneshot::channel();
let me = ep.id();
let task = task::spawn(
async move { Self::run(ep, endpoint_id, on_first_tx).await }.instrument(
error_span!("discovery", me = %me.fmt_short(), endpoint = %endpoint_id.fmt_short()),
),
);
Ok(Self {
_task: AbortOnDropHandle::new(task),
on_first_rx,
})
}

/// Starts a discovery task after a delay and only if no path to the endpoint was recently active.
///
/// This returns `None` if we received data or control messages from the remote endpoint
/// recently enough. If not it returns a [`DiscoveryTask`].
///
/// If `delay` is set, the [`DiscoveryTask`] will first wait for `delay` and then check again
/// if we recently received messages from remote endpoint. If true, the task will abort.
/// Otherwise, or if no `delay` is set, the discovery will be started.
pub(super) fn start_after_delay(
ep: &Endpoint,
endpoint_id: EndpointId,
delay: Duration,
) -> Result<Option<Self>, DiscoveryError> {
// If discovery is not needed, don't even spawn a task.
ensure!(
!ep.discovery().is_empty(),
DiscoveryError::NoServiceConfigured
);
let (on_first_tx, on_first_rx) = oneshot::channel();
let ep = ep.clone();
let me = ep.id();
let task = task::spawn(
async move {
time::sleep(delay).await;
Self::run(ep, endpoint_id, on_first_tx).await
}
.instrument(
error_span!("discovery", me = %me.fmt_short(), endpoint = %endpoint_id.fmt_short()),
),
);
Ok(Some(Self {
_task: AbortOnDropHandle::new(task),
on_first_rx,
}))
}

/// Waits until the discovery task produced at least one result.
pub(super) async fn first_arrived(&mut self) -> Result<(), DiscoveryError> {
let fut = &mut self.on_first_rx;
fut.await.expect("sender dropped")?;
Ok(())
}

fn create_stream(
ep: &Endpoint,
endpoint_id: EndpointId,
) -> Result<BoxStream<Result<DiscoveryItem, DiscoveryError>>, DiscoveryError> {
ensure!(
!ep.discovery().is_empty(),
DiscoveryError::NoServiceConfigured
);
let stream = ep
.discovery()
.resolve(endpoint_id)
.ok_or_else(|| e!(DiscoveryError::NoResults { endpoint_id }))?;
Ok(stream)
}

async fn run(
ep: Endpoint,
endpoint_id: EndpointId,
on_first_tx: oneshot::Sender<Result<(), DiscoveryError>>,
) {
let mut stream = match Self::create_stream(&ep, endpoint_id) {
Ok(stream) => stream,
Err(err) => {
on_first_tx.send(Err(err)).ok();
return;
}
};
let mut on_first_tx = Some(on_first_tx);
debug!("starting");
loop {
match stream.next().await {
Some(Ok(r)) => {
let provenance = r.provenance;
let endpoint_addr = r.to_endpoint_addr();
if endpoint_addr.is_empty() {
debug!(%provenance, "empty address found");
continue;
}
debug!(%provenance, addr = ?endpoint_addr, "new address found");
let source = Source::Discovery {
name: provenance.to_string(),
};
ep.add_endpoint_addr(endpoint_addr, source).await.ok();

if let Some(tx) = on_first_tx.take() {
tx.send(Ok(())).ok();
}
}
Some(Err(err)) => {
warn!(?err, "discovery service produced error");
break;
}
None => break,
}
}
if let Some(tx) = on_first_tx.take() {
tx.send(Err(e!(DiscoveryError::NoResults { endpoint_id })))
.ok();
}
}
}

#[cfg(test)]
mod tests {
use std::{
collections::HashMap,
net::SocketAddr,
sync::{Arc, Mutex},
time::SystemTime,
time::{Duration, SystemTime},
};

use iroh_base::{EndpointAddr, SecretKey, TransportAddr};
use n0_error::{AnyError as Error, Result, StackResultExt};
use n0_future::{StreamExt, time};
use quinn::{IdleTimeout, TransportConfig};
use rand::{CryptoRng, Rng, SeedableRng};
use tokio_util::task::AbortOnDropHandle;
Expand Down
Loading
Loading