Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: simplify LocalPool handling #47

Merged
merged 2 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 2 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,18 @@ Here is a basic example of how to set up `iroh-blobs` with `iroh`:

```rust
use iroh::{protocol::Router, Endpoint};
use iroh_blobs::{net_protocol::Blobs, util::local_pool::LocalPool};
use iroh_blobs::net_protocol::Blobs;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
// create an iroh endpoint that includes the standard discovery mechanisms
// we've built at number0
let endpoint = Endpoint::builder().discovery_n0().bind().await?;

// spawn a local pool with one thread per CPU
// for a single threaded pool use `LocalPool::single`
let local_pool = LocalPool::default();

// create an in-memory blob store
// use `iroh_blobs::net_protocol::Blobs::persistent` to load or create a
// persistent blob store from a path
let blobs = Blobs::memory().build(local_pool.handle(), &endpoint);
let blobs = Blobs::memory().build(&endpoint);

// turn on the "rpc" feature if you need to create blobs and tags clients
let blobs_client = blobs.client();
Expand All @@ -60,9 +56,7 @@ async fn main() -> anyhow::Result<()> {
.await?;

// do fun stuff with the blobs protocol!
// make sure not to drop the local_pool before you are finished
router.shutdown().await?;
drop(local_pool);
drop(tags_client);
Ok(())
}
Expand All @@ -89,4 +83,3 @@ at your option.
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in this project by you, as defined in the Apache-2.0 license,
shall be dual licensed as above, without any additional terms or conditions.

7 changes: 2 additions & 5 deletions examples/custom-protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,7 @@ use iroh::{
protocol::{ProtocolHandler, Router},
Endpoint, NodeId,
};
use iroh_blobs::{
net_protocol::Blobs, rpc::client::blobs::MemClient, util::local_pool::LocalPool, Hash,
};
use iroh_blobs::{net_protocol::Blobs, rpc::client::blobs::MemClient, Hash};
use tracing_subscriber::{prelude::*, EnvFilter};

#[derive(Debug, Parser)]
Expand Down Expand Up @@ -89,8 +87,7 @@ async fn main() -> Result<()> {
// Build a in-memory node. For production code, you'd want a persistent node instead usually.
let endpoint = Endpoint::builder().bind().await?;
let builder = Router::builder(endpoint);
let local_pool = LocalPool::default();
let blobs = Blobs::memory().build(local_pool.handle(), builder.endpoint());
let blobs = Blobs::memory().build(builder.endpoint());
let builder = builder.accept(iroh_blobs::ALPN, blobs.clone());
let blobs_client = blobs.client();

Expand Down
7 changes: 2 additions & 5 deletions examples/hello-world-fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ use std::{env, str::FromStr};

use anyhow::{bail, ensure, Context, Result};
use iroh::{protocol::Router, Endpoint};
use iroh_blobs::{
net_protocol::Blobs, ticket::BlobTicket, util::local_pool::LocalPool, BlobFormat,
};
use iroh_blobs::{net_protocol::Blobs, ticket::BlobTicket, BlobFormat};
use tracing_subscriber::{prelude::*, EnvFilter};

// set the RUST_LOG env var to one of {debug,info,warn} to see logging info
Expand Down Expand Up @@ -39,8 +37,7 @@ async fn main() -> Result<()> {
// create a new node
let endpoint = Endpoint::builder().bind().await?;
let builder = Router::builder(endpoint);
let local_pool = LocalPool::default();
let blobs = Blobs::memory().build(local_pool.handle(), builder.endpoint());
let blobs = Blobs::memory().build(builder.endpoint());
let builder = builder.accept(iroh_blobs::ALPN, blobs.clone());
let node = builder.spawn().await?;
let blobs_client = blobs.client();
Expand Down
5 changes: 2 additions & 3 deletions examples/hello-world-provide.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! run this example from the project root:
//! $ cargo run --example hello-world-provide
use iroh::{protocol::Router, Endpoint};
use iroh_blobs::{net_protocol::Blobs, ticket::BlobTicket, util::local_pool::LocalPool};
use iroh_blobs::{net_protocol::Blobs, ticket::BlobTicket};
use tracing_subscriber::{prelude::*, EnvFilter};

// set the RUST_LOG env var to one of {debug,info,warn} to see logging info
Expand All @@ -24,8 +24,7 @@ async fn main() -> anyhow::Result<()> {
// create a new node
let endpoint = Endpoint::builder().bind().await?;
let builder = Router::builder(endpoint);
let local_pool = LocalPool::default();
let blobs = Blobs::memory().build(local_pool.handle(), builder.endpoint());
let blobs = Blobs::memory().build(builder.endpoint());
let builder = builder.accept(iroh_blobs::ALPN, blobs.clone());
let blobs_client = blobs.client();
let node = builder.spawn().await?;
Expand Down
7 changes: 2 additions & 5 deletions examples/local-swarm-discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ use iroh::{
discovery::local_swarm_discovery::LocalSwarmDiscovery, protocol::Router, Endpoint, NodeAddr,
PublicKey, RelayMode, SecretKey,
};
use iroh_blobs::{
net_protocol::Blobs, rpc::client::blobs::WrapOption, util::local_pool::LocalPool, Hash,
};
use iroh_blobs::{net_protocol::Blobs, rpc::client::blobs::WrapOption, Hash};
use tracing_subscriber::{prelude::*, EnvFilter};

use self::progress::show_download_progress;
Expand Down Expand Up @@ -73,8 +71,7 @@ async fn main() -> anyhow::Result<()> {
.bind()
.await?;
let builder = Router::builder(endpoint);
let local_pool = LocalPool::default();
let blobs = Blobs::memory().build(local_pool.handle(), builder.endpoint());
let blobs = Blobs::memory().build(builder.endpoint());
let builder = builder.accept(iroh_blobs::ALPN, blobs.clone());
let node = builder.spawn().await?;
let blobs_client = blobs.client();
Expand Down
7 changes: 2 additions & 5 deletions examples/transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,16 @@ use iroh_blobs::{
net_protocol::Blobs,
rpc::client::blobs::{ReadAtLen, WrapOption},
ticket::BlobTicket,
util::{local_pool::LocalPool, SetTagOption},
util::SetTagOption,
};

#[tokio::main]
async fn main() -> Result<()> {
// Create an endpoint, it allows creating and accepting
// connections in the iroh p2p world
let endpoint = Endpoint::builder().discovery_n0().bind().await?;

// We initialize the Blobs protocol in-memory
let local_pool = LocalPool::default();
let blobs = Blobs::memory().build(&local_pool, &endpoint);
let blobs = Blobs::memory().build(&endpoint);

// Now we build a router that accepts blobs connections & routes them
// to the blobs protocol.
Expand Down Expand Up @@ -85,7 +83,6 @@ async fn main() -> Result<()> {
// Gracefully shut down the node
println!("Shutting down.");
node.shutdown().await?;
local_pool.shutdown().await;

Ok(())
}
58 changes: 49 additions & 9 deletions src/net_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@
// TODO: reduce API surface and add documentation
#![allow(missing_docs)]

use std::{collections::BTreeSet, fmt::Debug, ops::DerefMut, sync::Arc};
use std::{
collections::BTreeSet,
fmt::Debug,
ops::{Deref, DerefMut},
sync::Arc,
};

use anyhow::{bail, Result};
use futures_lite::future::Boxed as BoxedFuture;
Expand All @@ -17,7 +22,7 @@ use crate::{
provider::EventSender,
store::GcConfig,
util::{
local_pool::{self, LocalPoolHandle},
local_pool::{self, LocalPool, LocalPoolHandle},
SetTagOption,
},
BlobFormat, Hash,
Expand All @@ -41,9 +46,26 @@ impl Default for GcState {
}
}

#[derive(Debug)]
enum Rt {
Handle(LocalPoolHandle),
Owned(LocalPool),
}

impl Deref for Rt {
type Target = LocalPoolHandle;

fn deref(&self) -> &Self::Target {
match self {
Self::Handle(ref handle) => handle,
Self::Owned(ref pool) => pool.handle(),
}
}
}

#[derive(Debug)]
pub(crate) struct BlobsInner<S> {
pub(crate) rt: LocalPoolHandle,
rt: Rt,
pub(crate) store: S,
events: EventSender,
pub(crate) downloader: Downloader,
Expand All @@ -53,6 +75,12 @@ pub(crate) struct BlobsInner<S> {
pub(crate) batches: tokio::sync::Mutex<BlobBatches>,
}

impl<S> BlobsInner<S> {
pub(crate) fn rt(&self) -> &LocalPoolHandle {
&self.rt
}
}

#[derive(Debug, Clone)]
pub struct Blobs<S> {
pub(crate) inner: Arc<BlobsInner<S>>,
Expand Down Expand Up @@ -119,6 +147,7 @@ impl BlobBatches {
pub struct Builder<S> {
store: S,
events: Option<EventSender>,
rt: Option<LocalPoolHandle>,
}

impl<S: crate::store::Store> Builder<S> {
Expand All @@ -128,13 +157,23 @@ impl<S: crate::store::Store> Builder<S> {
self
}

/// Set a custom `LocalPoolHandle` to use.
pub fn local_pool(mut self, rt: LocalPoolHandle) -> Self {
self.rt = Some(rt);
self
}

/// Build the Blobs protocol handler.
/// You need to provide a local pool handle and an endpoint.
pub fn build(self, rt: &LocalPoolHandle, endpoint: &Endpoint) -> Blobs<S> {
/// You need to provide a the endpoint.
pub fn build(self, endpoint: &Endpoint) -> Blobs<S> {
let rt = self
.rt
.map(Rt::Handle)
.unwrap_or_else(|| Rt::Owned(LocalPool::default()));
Comment on lines +171 to +172
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.map(Rt::Handle)
.unwrap_or_else(|| Rt::Owned(LocalPool::default()));
.map_or_else(|| Rt::Owned(LocalPool::default()), Rt::Handle);

But like... could also be a match statement.

let downloader = Downloader::new(self.store.clone(), endpoint.clone(), rt.clone());
Blobs::new(
self.store,
rt.clone(),
rt,
self.events.unwrap_or_default(),
downloader,
endpoint.clone(),
Expand All @@ -148,6 +187,7 @@ impl<S> Blobs<S> {
Builder {
store,
events: None,
rt: None,
}
}
}
Expand All @@ -169,9 +209,9 @@ impl Blobs<crate::store::fs::Store> {
}

impl<S: crate::store::Store> Blobs<S> {
pub fn new(
fn new(
store: S,
rt: LocalPoolHandle,
rt: Rt,
events: EventSender,
downloader: Downloader,
endpoint: Endpoint,
Expand Down Expand Up @@ -201,7 +241,7 @@ impl<S: crate::store::Store> Blobs<S> {
}

pub fn rt(&self) -> &LocalPoolHandle {
&self.inner.rt
self.inner.rt()
}

pub fn downloader(&self) -> &Downloader {
Expand Down
2 changes: 1 addition & 1 deletion src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ impl<D: crate::store::Store> Handler<D> {
}

fn rt(&self) -> &LocalPoolHandle {
&self.0.rt
self.0.rt()
}

fn endpoint(&self) -> &Endpoint {
Expand Down
17 changes: 3 additions & 14 deletions src/rpc/client/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1015,11 +1015,9 @@ mod tests {

use super::RpcService;
use crate::{
downloader::Downloader,
net_protocol::Blobs,
provider::{CustomEventSender, EventSender},
rpc::client::{blobs, tags},
util::local_pool::LocalPool,
};

type RpcClient = quic_rpc::RpcClient<RpcService>;
Expand All @@ -1029,7 +1027,6 @@ mod tests {
pub struct Node {
router: iroh::protocol::Router,
client: RpcClient,
_local_pool: LocalPool,
_rpc_task: AbortOnDropHandle<()>,
}

Expand Down Expand Up @@ -1067,19 +1064,12 @@ mod tests {
.unwrap_or_else(|| Endpoint::builder().discovery_n0())
.bind()
.await?;
let local_pool = LocalPool::single();
let mut router = Router::builder(endpoint.clone());

// Setup blobs
let downloader =
Downloader::new(store.clone(), endpoint.clone(), local_pool.handle().clone());
let blobs = Blobs::new(
store.clone(),
local_pool.handle().clone(),
events,
downloader,
endpoint.clone(),
);
let blobs = Blobs::builder(store.clone())
.events(events)
.build(&endpoint);
router = router.accept(crate::ALPN, blobs.clone());

// Build the router
Expand All @@ -1096,7 +1086,6 @@ mod tests {
router,
client,
_rpc_task,
_local_pool: local_pool,
})
}
}
Expand Down
8 changes: 3 additions & 5 deletions tests/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,13 @@ use std::{
};

use iroh::Endpoint;
use iroh_blobs::{net_protocol::Blobs, store::GcConfig, util::local_pool::LocalPool};
use iroh_blobs::{net_protocol::Blobs, store::GcConfig};
use testresult::TestResult;

#[tokio::test]
async fn blobs_gc_smoke() -> TestResult<()> {
let pool = LocalPool::default();
let endpoint = Endpoint::builder().bind().await?;
let blobs = Blobs::memory().build(pool.handle(), &endpoint);
let blobs = Blobs::memory().build(&endpoint);
let client = blobs.client();
blobs.start_gc(GcConfig {
period: Duration::from_millis(1),
Expand All @@ -29,9 +28,8 @@ async fn blobs_gc_smoke() -> TestResult<()> {

#[tokio::test]
async fn blobs_gc_protected() -> TestResult<()> {
let pool = LocalPool::default();
let endpoint = Endpoint::builder().bind().await?;
let blobs = Blobs::memory().build(pool.handle(), &endpoint);
let blobs = Blobs::memory().build(&endpoint);
let client = blobs.client();
let h1 = client.add_bytes(b"test".to_vec()).await?;
let protected = Arc::new(Mutex::new(Vec::new()));
Expand Down
Loading
Loading