Skip to content

chore: upgrade to iroh and iroh-blobs @ v0.32.0 #26

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 5, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions content-discovery/Cargo.toml
Original file line number Diff line number Diff line change
@@ -27,6 +27,6 @@ missing_debug_implementations = "warn"
unused-async = "warn"

[workspace.dependencies]
iroh = "0.31"
iroh-base = "0.31"
iroh-blobs = { version = "0.31", features = ["rpc"] }
iroh = { version ="0.32", features = ["discovery-pkarr-dht"] }
iroh-base = "0.32"
iroh-blobs = { version = "0.32", features = ["rpc"] }
Original file line number Diff line number Diff line change
@@ -66,7 +66,7 @@ async fn announce(args: AnnounceArgs) -> anyhow::Result<()> {
let connection = iroh_endpoint
.connect(tracker, iroh_mainline_content_discovery::protocol::ALPN)
.await?;
iroh_mainline_content_discovery::announce(connection, signed_announce).await?;
iroh_mainline_content_discovery::announce_iroh(connection, signed_announce).await?;
}
}
if !args.quic_tracker.is_empty() {
@@ -82,7 +82,7 @@ async fn announce(args: AnnounceArgs) -> anyhow::Result<()> {
for tracker in args.quic_tracker {
println!("announcing via quic to {:?}: {}", tracker, content);
let connection = quinn_endpoint.connect(tracker, "localhost")?.await?;
iroh_mainline_content_discovery::announce(connection, signed_announce).await?;
iroh_mainline_content_discovery::announce_quinn(connection, signed_announce).await?;
}
}

Original file line number Diff line number Diff line change
@@ -22,7 +22,7 @@ hex = "0.4.3"

# Optional features for the client functionality
tracing = { version = "0.1", optional = true }
iroh-quinn = { version = "0.12", optional = true }
iroh-quinn = { version = "0.13", optional = true }
mainline = { version = "2.0.0", optional = true, features = ["async"] }
anyhow = { version = "1", features = ["backtrace"], optional = true }
postcard = { version = "1", default-features = false, features = ["alloc", "use-std"], optional = true }
72 changes: 62 additions & 10 deletions content-discovery/iroh-mainline-content-discovery/src/client.rs
Original file line number Diff line number Diff line change
@@ -32,7 +32,30 @@ use crate::protocol::{
/// `tracker` is the node id of the tracker to announce to. It must understand the [TRACKER_ALPN] protocol.
/// `content` is the content to announce.
/// `kind` is the kind of the announcement. We can claim to have the complete data or only some of it.
pub async fn announce(
pub async fn announce_quinn(
connection: iroh_quinn::Connection,
signed_announce: SignedAnnounce,
) -> anyhow::Result<()> {
let (mut send, mut recv) = connection.open_bi().await?;
tracing::debug!("opened bi stream");
let request = Request::Announce(signed_announce);
let request = postcard::to_stdvec(&request)?;
tracing::debug!("sending announce");
send.write_all(&request).await?;
send.finish()?;
let _response = recv.read_to_end(REQUEST_SIZE_LIMIT).await?;
Ok(())
}

/// Announce to a tracker.
///
/// You can only announce content you yourself claim to have, to avoid spamming other nodes.
///
/// `endpoint` is the iroh endpoint to use for announcing.
/// `tracker` is the node id of the tracker to announce to. It must understand the [TRACKER_ALPN] protocol.
/// `content` is the content to announce.
/// `kind` is the kind of the announcement. We can claim to have the complete data or only some of it.
pub async fn announce_iroh(
connection: iroh::endpoint::Connection,
signed_announce: SignedAnnounce,
) -> anyhow::Result<()> {
@@ -80,7 +103,7 @@ async fn query_socket_one(
args: Query,
) -> anyhow::Result<Vec<SignedAnnounce>> {
let connection = endpoint.connect(addr).await?;
let result = query(connection, args).await?;
let result = query_quinn(connection, args).await?;
Ok(result.hosts)
}

@@ -90,7 +113,7 @@ async fn query_iroh_one(
args: Query,
) -> anyhow::Result<Vec<SignedAnnounce>> {
let connection = endpoint.connect(node_id, ALPN).await?;
let result = query(connection, args).await?;
let result = query_iroh(connection, args).await?;
Ok(result.hosts)
}

@@ -185,9 +208,29 @@ pub fn announce_dht(
}

/// Assume an existing connection to a tracker and query it for peers for some content.
pub async fn query(
pub async fn query_iroh(
connection: iroh::endpoint::Connection,
args: Query,
) -> anyhow::Result<QueryResponse> {
tracing::info!("connected to {:?}", connection.remote_node_id()?);
let (mut send, mut recv) = connection.open_bi().await?;
tracing::info!("opened bi stream");
let request = Request::Query(args);
let request = postcard::to_stdvec(&request)?;
tracing::info!("sending query");
send.write_all(&request).await?;
send.finish()?;
let response = recv.read_to_end(REQUEST_SIZE_LIMIT).await?;
let response = postcard::from_bytes::<Response>(&response)?;
Ok(match response {
Response::QueryResponse(response) => response,
})
}

/// Assume an existing connection to a tracker and query it for peers for some content.
pub async fn query_quinn(
connection: iroh_quinn::Connection,
args: Query,
) -> anyhow::Result<QueryResponse> {
tracing::info!("connected to {:?}", connection.remote_address());
let (mut send, mut recv) = connection.open_bi().await?;
@@ -283,14 +326,23 @@ pub async fn connect(
tracker: &TrackerId,
local_ipv4_addr: SocketAddrV4,
local_ipv6_addr: SocketAddrV6,
) -> anyhow::Result<iroh::endpoint::Connection> {
) -> anyhow::Result<Connection> {
match tracker {
TrackerId::Quinn(tracker) => connect_socket(*tracker, local_ipv4_addr.into()).await,
TrackerId::Iroh(tracker) => connect_iroh(*tracker, local_ipv4_addr, local_ipv6_addr).await,
TrackerId::Quinn(tracker) => Ok(Connection::Quinn(
connect_socket(*tracker, local_ipv4_addr.into()).await?,
)),
TrackerId::Iroh(tracker) => Ok(Connection::Iroh(
connect_iroh(*tracker, local_ipv4_addr, local_ipv6_addr).await?,
)),
TrackerId::Udp(_) => anyhow::bail!("can not connect to udp tracker"),
}
}

pub enum Connection {
Iroh(iroh::endpoint::Connection),
Quinn(iroh_quinn::Connection),
}

/// Create a iroh endpoint and connect to a tracker using the [crate::protocol::ALPN] protocol.
async fn connect_iroh(
tracker: NodeId,
@@ -307,13 +359,13 @@ async fn connect_iroh(
Ok(connection)
}

/// Create a quinn endpoint and connect to a tracker using the [crate::protocol::ALPN] protocol.
/// Create a quinn endpoint and connect to a tracker using the [crate] protocol.
async fn connect_socket(
tracker: SocketAddr,
local_addr: SocketAddr,
) -> anyhow::Result<iroh::endpoint::Connection> {
) -> anyhow::Result<iroh_quinn::Connection> {
let endpoint = create_quinn_client(local_addr, vec![ALPN.to_vec()], false)?;
tracing::info!("trying to connect to tracker at {:?}", tracker);
tracing::info!("trying t?o )connect to tracker at {:?}", tracker);
let connection = endpoint.connect(tracker, "localhost")?.await?;
Ok(connection)
}
3 changes: 2 additions & 1 deletion content-discovery/iroh-mainline-tracker/Cargo.toml
Original file line number Diff line number Diff line change
@@ -23,11 +23,12 @@ iroh-blobs = { workspace = true }
mainline = { version = "2.0.0", features = ["async"] }
pkarr = { version = "1.0.1", features = ["async"] }
postcard = { version = "1", default-features = false, features = ["alloc", "use-std"] }
iroh-quinn = "0.12"
iroh-quinn = "0.13"
rand = "0.8"
rcgen = "0.12.0"
redb = "1.5.0"
rustls = "0.21"
rustls-pki-types = "1.11"
serde = { version = "1", features = ["derive"] }
serde_json = "1.0.107"
tempfile = "3.4"
2 changes: 1 addition & 1 deletion content-discovery/iroh-mainline-tracker/src/io.rs
Original file line number Diff line number Diff line change
@@ -91,7 +91,7 @@ pub fn log_connection_attempt(
path: &Option<PathBuf>,
host: &NodeId,
t0: Instant,
outcome: &anyhow::Result<iroh_quinn::Connection>,
outcome: &anyhow::Result<iroh::endpoint::Connection>,
) -> anyhow::Result<()> {
if let Some(path) = path {
let now = SystemTime::now()
Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@ use rand::Rng;
/// This is just reading the size header and then immediately closing the connection.
/// It can be used to check if a peer has any data at all.
pub async fn unverified_size(
connection: &iroh_quinn::Connection,
connection: &iroh::endpoint::Connection,
hash: &Hash,
) -> anyhow::Result<(u64, Stats)> {
let request = iroh_blobs::protocol::GetRequest::new(
@@ -42,7 +42,7 @@ pub async fn unverified_size(
/// This asks for the last chunk of the blob and validates the response.
/// Note that this does not validate that the peer has all the data.
pub async fn verified_size(
connection: &iroh_quinn::Connection,
connection: &iroh::endpoint::Connection,
hash: &Hash,
) -> anyhow::Result<(u64, Stats)> {
tracing::debug!("Getting verified size of {}", hash.to_hex());
@@ -81,7 +81,7 @@ pub async fn verified_size(
}

pub async fn get_hash_seq_and_sizes(
connection: &iroh_quinn::Connection,
connection: &iroh::endpoint::Connection,
hash: &Hash,
max_size: u64,
) -> anyhow::Result<(HashSeq, Arc<[u64]>)> {
@@ -135,7 +135,7 @@ pub async fn get_hash_seq_and_sizes(

/// Probe for a single chunk of a blob.
pub async fn chunk_probe(
connection: &iroh_quinn::Connection,
connection: &iroh::endpoint::Connection,
hash: &Hash,
chunk: ChunkNum,
) -> anyhow::Result<Stats> {
12 changes: 5 additions & 7 deletions content-discovery/iroh-mainline-tracker/src/main.rs
Original file line number Diff line number Diff line change
@@ -10,7 +10,7 @@ use std::{
};

use clap::Parser;
use iroh::{discovery::pkarr::dht::DhtDiscovery, endpoint::get_remote_node_id, Endpoint, NodeId};
use iroh::{discovery::pkarr::dht::DhtDiscovery, Endpoint, NodeId};
use iroh_blobs::util::fs::load_secret_key;
use iroh_mainline_content_discovery::protocol::ALPN;
use iroh_mainline_tracker::{
@@ -24,8 +24,6 @@ use iroh_mainline_tracker::{

use crate::args::Args;

use iroh_mainline_tracker::tracker::get_alpn;

static VERBOSE: AtomicBool = AtomicBool::new(false);

fn set_verbose(verbose: bool) {
@@ -82,11 +80,11 @@ async fn create_endpoint(

/// Accept an incoming connection and extract the client-provided [`NodeId`] and ALPN protocol.
pub async fn accept_conn(
mut conn: iroh_quinn::Connecting,
) -> anyhow::Result<(NodeId, String, iroh_quinn::Connection)> {
let alpn = get_alpn(&mut conn).await?;
mut conn: iroh::endpoint::Connecting,
) -> anyhow::Result<(NodeId, String, iroh::endpoint::Connection)> {
let alpn = String::from_utf8(conn.alpn().await?)?;
let conn = conn.await?;
let peer_id = get_remote_node_id(&conn)?;
let peer_id = conn.remote_node_id()?;
Ok((peer_id, alpn, conn))
}

75 changes: 62 additions & 13 deletions content-discovery/iroh-mainline-tracker/src/tracker.rs
Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@ use std::{
};

use bao_tree::ChunkNum;
use iroh::{endpoint::get_remote_node_id, Endpoint, NodeId};
use iroh::{Endpoint, NodeId};
use iroh_blobs::{
get::{fsm::EndBlobNext, Stats},
hashseq::HashSeq,
@@ -876,7 +876,7 @@ impl Tracker {
remote_node_id,
std::str::from_utf8(&alpn)
);
if let Err(cause) = tracker.handle_connection(conn).await {
if let Err(cause) = tracker.handle_iroh_connection(conn).await {
tracing::error!("error handling connection: {}", cause);
}
});
@@ -886,7 +886,7 @@ impl Tracker {

pub async fn quinn_accept_loop(self, endpoint: iroh_quinn::Endpoint) -> std::io::Result<()> {
let local_addr = endpoint.local_addr()?;
println!("quinn listening on {}", local_addr);
println!("quinn listening on {local_addr:?}");
while let Some(incoming) = endpoint.accept().await {
tracing::info!("got incoming");
let connecting = incoming.accept()?;
@@ -899,7 +899,7 @@ impl Tracker {
};
// if we were supporting multiple protocols, we'd need to check the ALPN here.
tracing::info!("got connection from {} {}", remote_node_id, alpn);
if let Err(cause) = tracker.handle_connection(conn).await {
if let Err(cause) = tracker.handle_quinn_connection(conn).await {
tracing::error!("error handling connection: {}", cause);
}
});
@@ -947,7 +947,7 @@ impl Tracker {
}

/// Handle a single incoming connection on the tracker ALPN.
pub async fn handle_connection(
pub async fn handle_quinn_connection(
&self,
connection: iroh_quinn::Connection,
) -> anyhow::Result<()> {
@@ -975,6 +975,35 @@ impl Tracker {
Ok(())
}

/// Handle a single incoming connection on the tracker ALPN.
pub async fn handle_iroh_connection(
&self,
connection: iroh::endpoint::Connection,
) -> anyhow::Result<()> {
tracing::debug!("calling accept_bi");
let (mut send, mut recv) = connection.accept_bi().await?;
tracing::debug!("got bi stream");
let request = recv.read_to_end(REQUEST_SIZE_LIMIT).await?;
let request = postcard::from_bytes::<Request>(&request)?;
match request {
Request::Announce(announce) => {
tracing::debug!("got announce: {:?}", announce);
self.handle_announce(announce).await?;
send.finish()?;
}

Request::Query(query) => {
tracing::debug!("handle query: {:?}", query);
let response = self.handle_query(query).await?;
let response = Response::QueryResponse(response);
let response = postcard::to_stdvec(&response)?;
send.write_all(&response).await?;
send.finish()?;
}
}
Ok(())
}

async fn get_size(&self, hash: Hash) -> anyhow::Result<Option<u64>> {
let (tx, rx) = oneshot::channel();
self.0
@@ -1017,7 +1046,7 @@ impl Tracker {

async fn get_or_insert_size(
&self,
connection: &iroh_quinn::Connection,
connection: &iroh::endpoint::Connection,
hash: &Hash,
) -> anyhow::Result<u64> {
let size_opt = self.get_size(*hash).await?;
@@ -1034,7 +1063,7 @@ impl Tracker {

async fn get_or_insert_sizes(
&self,
connection: &iroh_quinn::Connection,
connection: &iroh::endpoint::Connection,
hash: &Hash,
) -> anyhow::Result<(HashSeq, Arc<[u64]>)> {
let sizes = self.get_sizes(*hash).await?;
@@ -1052,7 +1081,7 @@ impl Tracker {

async fn probe(
&self,
connection: &iroh_quinn::Connection,
connection: &iroh::endpoint::Connection,
host: &NodeId,
content: &HashAndFormat,
probe_kind: ProbeKind,
@@ -1245,8 +1274,8 @@ async fn accept_conn(
) -> anyhow::Result<(NodeId, String, iroh_quinn::Connection)> {
let alpn = get_alpn(&mut conn).await?;
let conn = conn.await?;
let peer_id = get_remote_node_id(&conn)?;
Ok((peer_id, alpn, conn))
let node_id = get_remote_node_id(&conn)?;
Ok((node_id, alpn, conn))
}

/// Extract the ALPN protocol from the peer's TLS certificate.
@@ -1261,12 +1290,32 @@ pub async fn get_alpn(connecting: &mut iroh_quinn::Connecting) -> anyhow::Result
}
}

pub fn get_remote_node_id(connection: &iroh_quinn::Connection) -> anyhow::Result<iroh::NodeId> {
let data = connection.peer_identity();
match data {
None => anyhow::bail!("no peer certificate found"),
Some(data) => match data.downcast::<Vec<rustls_pki_types::CertificateDer>>() {
Ok(certs) => {
if certs.len() != 1 {
anyhow::bail!(
"expected a single peer certificate, but {} found",
certs.len()
);
}
let cert = tls::certificate::parse(&certs[0])?;
Ok(cert.peer_id())
}
Err(_) => anyhow::bail!("invalid peer certificate"),
},
}
}

/// Accept an incoming connection and extract the client-provided [`NodeId`] and ALPN protocol.
async fn iroh_accept_conn(
mut conn: iroh::endpoint::Connecting,
) -> anyhow::Result<(NodeId, Vec<u8>, iroh_quinn::Connection)> {
) -> anyhow::Result<(NodeId, Vec<u8>, iroh::endpoint::Connection)> {
let alpn = conn.alpn().await?;
let conn = conn.await?;
let peer_id = get_remote_node_id(&conn)?;
Ok((peer_id, alpn, conn))
let node_id = conn.remote_node_id()?;
Ok((node_id, alpn, conn))
}
2 changes: 1 addition & 1 deletion content-discovery/tls/Cargo.toml
Original file line number Diff line number Diff line change
@@ -11,7 +11,7 @@ license = "MIT OR Apache-2.0"
iroh-base = { workspace = true }
der = { version = "0.7", features = ["alloc", "derive"] }
derive_more = { version = "1.0.0-beta.1", features = ["debug", "display", "from", "try_into"] }
quinn = { package = "iroh-quinn", version = "0.12.0" }
quinn = { package = "iroh-quinn", version = "0.13.0" }
rand = "0.8.5"
rcgen = "0.13"
ring = "0.17"
4 changes: 2 additions & 2 deletions h3-iroh/Cargo.toml
Original file line number Diff line number Diff line change
@@ -15,8 +15,8 @@ http-body = { version = "1", optional = true }
http-body-util = { version = "0.1", optional = true }
hyper = { version = "1.5", optional = true }
hyper-util = { version = "0.1", optional = true }
iroh = "0.31"
iroh-base = { version = "0.31", features = ["ticket"] }
iroh = "0.32"
iroh-base = { version = "0.32", features = ["ticket"] }
tokio = { version = "1", features = ["io-util"], default-features = false}
tokio-util = "0.7"
tower = { version = "0.5", optional = true }
4 changes: 2 additions & 2 deletions h3-iroh/examples/server.rs
Original file line number Diff line number Diff line change
@@ -12,7 +12,7 @@ use h3::error::ErrorLevel;
use h3::quic::BidiStream;
use h3::server::RequestStream;
use http::{Request, StatusCode};
use iroh::endpoint::{self, Incoming};
use iroh::endpoint::Incoming;
use iroh_base::ticket::NodeTicket;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
@@ -79,7 +79,7 @@ async fn main() -> Result<()> {

async fn handle_connection(incoming: Incoming, root: Arc<Option<PathBuf>>) -> Result<()> {
let conn = incoming.accept()?.await?;
let remote_node_id = endpoint::get_remote_node_id(&conn)?;
let remote_node_id = conn.remote_node_id()?;
let span = Span::current();
span.record("remote_node_id", remote_node_id.fmt_short());
info!("new connection");
4 changes: 2 additions & 2 deletions h3-iroh/src/lib.rs
Original file line number Diff line number Diff line change
@@ -492,7 +492,7 @@ impl quic::RecvStream for RecvStream {
.as_ref()
.unwrap()
.id()
.0
.index()
.try_into()
.expect("invalid stream id")
}
@@ -639,7 +639,7 @@ where
.as_ref()
.unwrap()
.id()
.0
.index()
.try_into()
.expect("invalid stream id")
}
9 changes: 4 additions & 5 deletions iroh-dag-sync/Cargo.toml
Original file line number Diff line number Diff line change
@@ -4,10 +4,10 @@ version = "0.1.0"
edition = "2021"

[dependencies]
iroh-blobs = "0.31"
iroh-gossip = "0.31"
iroh = "0.31"
iroh-base = { version ="0.31", features = ["ticket"] }
iroh-blobs = "0.32"
iroh-gossip = "0.32"
iroh = "0.32"
iroh-base = { version ="0.32", features = ["ticket"] }
iroh-car = "0.5.0"
redb = "2.1.1"
clap = { version = "4.5.7", features = ["derive"] }
@@ -21,7 +21,6 @@ iroh-io = "0.6"
tokio-util = { version = "0.7.11", features = ["rt"] }
bao-tree = "0.13.0"
genawaiter = "0.99.1"
iroh-quinn = "0.12"
bytes = "1.6.0"
hex = "0.4.3"
ron = "0.8.1"
4 changes: 2 additions & 2 deletions iroh-pkarr-naming-system/Cargo.toml
Original file line number Diff line number Diff line change
@@ -8,8 +8,8 @@ edition = "2021"
[dependencies]
anyhow = "1.0.79"
derive_more = "0.99.17"
iroh = "0.31"
iroh-blobs = "0.31"
iroh = "0.32"
iroh-blobs = "0.32"
pkarr = { version = "2.3.1", features = ["async", "dht"] }
tokio = "1.35.1"
tokio-util = "0.7.12"
4 changes: 2 additions & 2 deletions iroh-s3-bao-store/Cargo.toml
Original file line number Diff line number Diff line change
@@ -21,8 +21,8 @@ flume = "0.11.0"
futures-lite = "2.3"
hex = "0.4.3"
indicatif = "0.17.7"
iroh = "0.31"
iroh-blobs = "0.31"
iroh = "0.32"
iroh-blobs = "0.32"
iroh-io = { version = "0.6", features = ["x-http"] }
num_cpus = "1.16.0"
rand = "0.8.5"