Skip to content

Commit b29991d

Browse files
feat!: simplify LocalPool handling (#47)
## Description It has been a footgun for a few users, including myself, to make sure to keep around the `LocalPool`. This changes the behaviour to construct a `LocalPool` and keep it around by default. If necessary, in the builder one can provide a custom handle, if there is the need for a custom pool. ## Breaking Changes - remove `net_protocol::Blobs::new`, use the builder instead - remove the `LocalPoolHandle` argument from `net_protocol::Builder::build`
1 parent 5cacccb commit b29991d

12 files changed

+71
-71
lines changed

README.md

+2-9
Original file line numberDiff line numberDiff line change
@@ -32,22 +32,18 @@ Here is a basic example of how to set up `iroh-blobs` with `iroh`:
3232

3333
```rust
3434
use iroh::{protocol::Router, Endpoint};
35-
use iroh_blobs::{net_protocol::Blobs, util::local_pool::LocalPool};
35+
use iroh_blobs::net_protocol::Blobs;
3636

3737
#[tokio::main]
3838
async fn main() -> anyhow::Result<()> {
3939
// create an iroh endpoint that includes the standard discovery mechanisms
4040
// we've built at number0
4141
let endpoint = Endpoint::builder().discovery_n0().bind().await?;
4242

43-
// spawn a local pool with one thread per CPU
44-
// for a single threaded pool use `LocalPool::single`
45-
let local_pool = LocalPool::default();
46-
4743
// create an in-memory blob store
4844
// use `iroh_blobs::net_protocol::Blobs::persistent` to load or create a
4945
// persistent blob store from a path
50-
let blobs = Blobs::memory().build(local_pool.handle(), &endpoint);
46+
let blobs = Blobs::memory().build(&endpoint);
5147

5248
// turn on the "rpc" feature if you need to create blobs and tags clients
5349
let blobs_client = blobs.client();
@@ -60,9 +56,7 @@ async fn main() -> anyhow::Result<()> {
6056
.await?;
6157

6258
// do fun stuff with the blobs protocol!
63-
// make sure not to drop the local_pool before you are finished
6459
router.shutdown().await?;
65-
drop(local_pool);
6660
drop(tags_client);
6761
Ok(())
6862
}
@@ -89,4 +83,3 @@ at your option.
8983
Unless you explicitly state otherwise, any contribution intentionally submitted
9084
for inclusion in this project by you, as defined in the Apache-2.0 license,
9185
shall be dual licensed as above, without any additional terms or conditions.
92-

examples/custom-protocol.rs

+2-5
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,7 @@ use iroh::{
4848
protocol::{ProtocolHandler, Router},
4949
Endpoint, NodeId,
5050
};
51-
use iroh_blobs::{
52-
net_protocol::Blobs, rpc::client::blobs::MemClient, util::local_pool::LocalPool, Hash,
53-
};
51+
use iroh_blobs::{net_protocol::Blobs, rpc::client::blobs::MemClient, Hash};
5452
use tracing_subscriber::{prelude::*, EnvFilter};
5553

5654
#[derive(Debug, Parser)]
@@ -89,8 +87,7 @@ async fn main() -> Result<()> {
8987
// Build a in-memory node. For production code, you'd want a persistent node instead usually.
9088
let endpoint = Endpoint::builder().bind().await?;
9189
let builder = Router::builder(endpoint);
92-
let local_pool = LocalPool::default();
93-
let blobs = Blobs::memory().build(local_pool.handle(), builder.endpoint());
90+
let blobs = Blobs::memory().build(builder.endpoint());
9491
let builder = builder.accept(iroh_blobs::ALPN, blobs.clone());
9592
let blobs_client = blobs.client();
9693

examples/hello-world-fetch.rs

+2-5
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,7 @@ use std::{env, str::FromStr};
77

88
use anyhow::{bail, ensure, Context, Result};
99
use iroh::{protocol::Router, Endpoint};
10-
use iroh_blobs::{
11-
net_protocol::Blobs, ticket::BlobTicket, util::local_pool::LocalPool, BlobFormat,
12-
};
10+
use iroh_blobs::{net_protocol::Blobs, ticket::BlobTicket, BlobFormat};
1311
use tracing_subscriber::{prelude::*, EnvFilter};
1412

1513
// set the RUST_LOG env var to one of {debug,info,warn} to see logging info
@@ -39,8 +37,7 @@ async fn main() -> Result<()> {
3937
// create a new node
4038
let endpoint = Endpoint::builder().bind().await?;
4139
let builder = Router::builder(endpoint);
42-
let local_pool = LocalPool::default();
43-
let blobs = Blobs::memory().build(local_pool.handle(), builder.endpoint());
40+
let blobs = Blobs::memory().build(builder.endpoint());
4441
let builder = builder.accept(iroh_blobs::ALPN, blobs.clone());
4542
let node = builder.spawn().await?;
4643
let blobs_client = blobs.client();

examples/hello-world-provide.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
//! run this example from the project root:
55
//! $ cargo run --example hello-world-provide
66
use iroh::{protocol::Router, Endpoint};
7-
use iroh_blobs::{net_protocol::Blobs, ticket::BlobTicket, util::local_pool::LocalPool};
7+
use iroh_blobs::{net_protocol::Blobs, ticket::BlobTicket};
88
use tracing_subscriber::{prelude::*, EnvFilter};
99

1010
// set the RUST_LOG env var to one of {debug,info,warn} to see logging info
@@ -24,8 +24,7 @@ async fn main() -> anyhow::Result<()> {
2424
// create a new node
2525
let endpoint = Endpoint::builder().bind().await?;
2626
let builder = Router::builder(endpoint);
27-
let local_pool = LocalPool::default();
28-
let blobs = Blobs::memory().build(local_pool.handle(), builder.endpoint());
27+
let blobs = Blobs::memory().build(builder.endpoint());
2928
let builder = builder.accept(iroh_blobs::ALPN, blobs.clone());
3029
let blobs_client = blobs.client();
3130
let node = builder.spawn().await?;

examples/local-swarm-discovery.rs

+2-5
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,7 @@ use iroh::{
1313
discovery::local_swarm_discovery::LocalSwarmDiscovery, protocol::Router, Endpoint, NodeAddr,
1414
PublicKey, RelayMode, SecretKey,
1515
};
16-
use iroh_blobs::{
17-
net_protocol::Blobs, rpc::client::blobs::WrapOption, util::local_pool::LocalPool, Hash,
18-
};
16+
use iroh_blobs::{net_protocol::Blobs, rpc::client::blobs::WrapOption, Hash};
1917
use tracing_subscriber::{prelude::*, EnvFilter};
2018

2119
use self::progress::show_download_progress;
@@ -73,8 +71,7 @@ async fn main() -> anyhow::Result<()> {
7371
.bind()
7472
.await?;
7573
let builder = Router::builder(endpoint);
76-
let local_pool = LocalPool::default();
77-
let blobs = Blobs::memory().build(local_pool.handle(), builder.endpoint());
74+
let blobs = Blobs::memory().build(builder.endpoint());
7875
let builder = builder.accept(iroh_blobs::ALPN, blobs.clone());
7976
let node = builder.spawn().await?;
8077
let blobs_client = blobs.client();

examples/transfer.rs

+2-5
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,16 @@ use iroh_blobs::{
66
net_protocol::Blobs,
77
rpc::client::blobs::{ReadAtLen, WrapOption},
88
ticket::BlobTicket,
9-
util::{local_pool::LocalPool, SetTagOption},
9+
util::SetTagOption,
1010
};
1111

1212
#[tokio::main]
1313
async fn main() -> Result<()> {
1414
// Create an endpoint, it allows creating and accepting
1515
// connections in the iroh p2p world
1616
let endpoint = Endpoint::builder().discovery_n0().bind().await?;
17-
1817
// We initialize the Blobs protocol in-memory
19-
let local_pool = LocalPool::default();
20-
let blobs = Blobs::memory().build(&local_pool, &endpoint);
18+
let blobs = Blobs::memory().build(&endpoint);
2119

2220
// Now we build a router that accepts blobs connections & routes them
2321
// to the blobs protocol.
@@ -85,7 +83,6 @@ async fn main() -> Result<()> {
8583
// Gracefully shut down the node
8684
println!("Shutting down.");
8785
node.shutdown().await?;
88-
local_pool.shutdown().await;
8986

9087
Ok(())
9188
}

src/net_protocol.rs

+49-9
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,12 @@
33
// TODO: reduce API surface and add documentation
44
#![allow(missing_docs)]
55

6-
use std::{collections::BTreeSet, fmt::Debug, ops::DerefMut, sync::Arc};
6+
use std::{
7+
collections::BTreeSet,
8+
fmt::Debug,
9+
ops::{Deref, DerefMut},
10+
sync::Arc,
11+
};
712

813
use anyhow::{bail, Result};
914
use futures_lite::future::Boxed as BoxedFuture;
@@ -17,7 +22,7 @@ use crate::{
1722
provider::EventSender,
1823
store::GcConfig,
1924
util::{
20-
local_pool::{self, LocalPoolHandle},
25+
local_pool::{self, LocalPool, LocalPoolHandle},
2126
SetTagOption,
2227
},
2328
BlobFormat, Hash,
@@ -41,9 +46,26 @@ impl Default for GcState {
4146
}
4247
}
4348

49+
#[derive(Debug)]
50+
enum Rt {
51+
Handle(LocalPoolHandle),
52+
Owned(LocalPool),
53+
}
54+
55+
impl Deref for Rt {
56+
type Target = LocalPoolHandle;
57+
58+
fn deref(&self) -> &Self::Target {
59+
match self {
60+
Self::Handle(ref handle) => handle,
61+
Self::Owned(ref pool) => pool.handle(),
62+
}
63+
}
64+
}
65+
4466
#[derive(Debug)]
4567
pub(crate) struct BlobsInner<S> {
46-
pub(crate) rt: LocalPoolHandle,
68+
rt: Rt,
4769
pub(crate) store: S,
4870
events: EventSender,
4971
pub(crate) downloader: Downloader,
@@ -53,6 +75,12 @@ pub(crate) struct BlobsInner<S> {
5375
pub(crate) batches: tokio::sync::Mutex<BlobBatches>,
5476
}
5577

78+
impl<S> BlobsInner<S> {
79+
pub(crate) fn rt(&self) -> &LocalPoolHandle {
80+
&self.rt
81+
}
82+
}
83+
5684
#[derive(Debug, Clone)]
5785
pub struct Blobs<S> {
5886
pub(crate) inner: Arc<BlobsInner<S>>,
@@ -119,6 +147,7 @@ impl BlobBatches {
119147
pub struct Builder<S> {
120148
store: S,
121149
events: Option<EventSender>,
150+
rt: Option<LocalPoolHandle>,
122151
}
123152

124153
impl<S: crate::store::Store> Builder<S> {
@@ -128,13 +157,23 @@ impl<S: crate::store::Store> Builder<S> {
128157
self
129158
}
130159

160+
/// Set a custom `LocalPoolHandle` to use.
161+
pub fn local_pool(mut self, rt: LocalPoolHandle) -> Self {
162+
self.rt = Some(rt);
163+
self
164+
}
165+
131166
/// Build the Blobs protocol handler.
132-
/// You need to provide a local pool handle and an endpoint.
133-
pub fn build(self, rt: &LocalPoolHandle, endpoint: &Endpoint) -> Blobs<S> {
167+
/// You need to provide a the endpoint.
168+
pub fn build(self, endpoint: &Endpoint) -> Blobs<S> {
169+
let rt = self
170+
.rt
171+
.map(Rt::Handle)
172+
.unwrap_or_else(|| Rt::Owned(LocalPool::default()));
134173
let downloader = Downloader::new(self.store.clone(), endpoint.clone(), rt.clone());
135174
Blobs::new(
136175
self.store,
137-
rt.clone(),
176+
rt,
138177
self.events.unwrap_or_default(),
139178
downloader,
140179
endpoint.clone(),
@@ -148,6 +187,7 @@ impl<S> Blobs<S> {
148187
Builder {
149188
store,
150189
events: None,
190+
rt: None,
151191
}
152192
}
153193
}
@@ -169,9 +209,9 @@ impl Blobs<crate::store::fs::Store> {
169209
}
170210

171211
impl<S: crate::store::Store> Blobs<S> {
172-
pub fn new(
212+
fn new(
173213
store: S,
174-
rt: LocalPoolHandle,
214+
rt: Rt,
175215
events: EventSender,
176216
downloader: Downloader,
177217
endpoint: Endpoint,
@@ -201,7 +241,7 @@ impl<S: crate::store::Store> Blobs<S> {
201241
}
202242

203243
pub fn rt(&self) -> &LocalPoolHandle {
204-
&self.inner.rt
244+
self.inner.rt()
205245
}
206246

207247
pub fn downloader(&self) -> &Downloader {

src/rpc.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ impl<D: crate::store::Store> Handler<D> {
110110
}
111111

112112
fn rt(&self) -> &LocalPoolHandle {
113-
&self.0.rt
113+
self.0.rt()
114114
}
115115

116116
fn endpoint(&self) -> &Endpoint {

src/rpc/client/blobs.rs

+3-14
Original file line numberDiff line numberDiff line change
@@ -1015,11 +1015,9 @@ mod tests {
10151015

10161016
use super::RpcService;
10171017
use crate::{
1018-
downloader::Downloader,
10191018
net_protocol::Blobs,
10201019
provider::{CustomEventSender, EventSender},
10211020
rpc::client::{blobs, tags},
1022-
util::local_pool::LocalPool,
10231021
};
10241022

10251023
type RpcClient = quic_rpc::RpcClient<RpcService>;
@@ -1029,7 +1027,6 @@ mod tests {
10291027
pub struct Node {
10301028
router: iroh::protocol::Router,
10311029
client: RpcClient,
1032-
_local_pool: LocalPool,
10331030
_rpc_task: AbortOnDropHandle<()>,
10341031
}
10351032

@@ -1067,19 +1064,12 @@ mod tests {
10671064
.unwrap_or_else(|| Endpoint::builder().discovery_n0())
10681065
.bind()
10691066
.await?;
1070-
let local_pool = LocalPool::single();
10711067
let mut router = Router::builder(endpoint.clone());
10721068

10731069
// Setup blobs
1074-
let downloader =
1075-
Downloader::new(store.clone(), endpoint.clone(), local_pool.handle().clone());
1076-
let blobs = Blobs::new(
1077-
store.clone(),
1078-
local_pool.handle().clone(),
1079-
events,
1080-
downloader,
1081-
endpoint.clone(),
1082-
);
1070+
let blobs = Blobs::builder(store.clone())
1071+
.events(events)
1072+
.build(&endpoint);
10831073
router = router.accept(crate::ALPN, blobs.clone());
10841074

10851075
// Build the router
@@ -1096,7 +1086,6 @@ mod tests {
10961086
router,
10971087
client,
10981088
_rpc_task,
1099-
_local_pool: local_pool,
11001089
})
11011090
}
11021091
}

tests/blobs.rs

+3-5
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,13 @@ use std::{
55
};
66

77
use iroh::Endpoint;
8-
use iroh_blobs::{net_protocol::Blobs, store::GcConfig, util::local_pool::LocalPool};
8+
use iroh_blobs::{net_protocol::Blobs, store::GcConfig};
99
use testresult::TestResult;
1010

1111
#[tokio::test]
1212
async fn blobs_gc_smoke() -> TestResult<()> {
13-
let pool = LocalPool::default();
1413
let endpoint = Endpoint::builder().bind().await?;
15-
let blobs = Blobs::memory().build(pool.handle(), &endpoint);
14+
let blobs = Blobs::memory().build(&endpoint);
1615
let client = blobs.client();
1716
blobs.start_gc(GcConfig {
1817
period: Duration::from_millis(1),
@@ -29,9 +28,8 @@ async fn blobs_gc_smoke() -> TestResult<()> {
2928

3029
#[tokio::test]
3130
async fn blobs_gc_protected() -> TestResult<()> {
32-
let pool = LocalPool::default();
3331
let endpoint = Endpoint::builder().bind().await?;
34-
let blobs = Blobs::memory().build(pool.handle(), &endpoint);
32+
let blobs = Blobs::memory().build(&endpoint);
3533
let client = blobs.client();
3634
let h1 = client.add_bytes(b"test".to_vec()).await?;
3735
let protected = Arc::new(Mutex::new(Vec::new()));

0 commit comments

Comments
 (0)