Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

example: Multiprovider downloader #62

Open
wants to merge 47 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
fa8fab4
start downloader engine
rklaehn Jan 29, 2025
23817cc
WIP
rklaehn Jan 29, 2025
090e54b
WIP
rklaehn Jan 29, 2025
8caf034
WIP
rklaehn Jan 30, 2025
e945d1c
WIP
rklaehn Jan 30, 2025
7f3f22f
WIP
rklaehn Jan 30, 2025
9af5698
wip
rklaehn Jan 30, 2025
ebc8454
Add large test
rklaehn Jan 30, 2025
002503f
add planner
rklaehn Jan 31, 2025
f20bc2e
Add fast download start
rklaehn Jan 31, 2025
3885954
DRY
rklaehn Jan 31, 2025
ae5f05c
incremental scenario
rklaehn Jan 31, 2025
e46182e
delayed rebalance
rklaehn Jan 31, 2025
a14ab1f
plug in planner
rklaehn Jan 31, 2025
3bc8b35
newtypes
rklaehn Feb 3, 2025
a94d800
Add and handle PeerDownloadComplete
rklaehn Feb 3, 2025
871b343
WIP
rklaehn Feb 3, 2025
414715f
remame bitmap to bitfield part 1
rklaehn Feb 4, 2025
b39d1f9
rename bitmap to bitfield
rklaehn Feb 4, 2025
94eaf5f
Properly handle dropping downloads
rklaehn Feb 4, 2025
6073d6c
more tests
rklaehn Feb 7, 2025
04712e3
Fix warnings
rklaehn Feb 7, 2025
ff029a6
Add multiprovider example
rklaehn Feb 7, 2025
64d7ea8
Add debug output
rklaehn Feb 7, 2025
730bcf6
Working multiprovider example
rklaehn Feb 7, 2025
5c91fd4
Add way to keep node id stable in multiprovider example provide
rklaehn Feb 10, 2025
5a658f6
Show some progress bars
rklaehn Feb 10, 2025
9ba84c3
Don't use DownloadRequest for internal state
rklaehn Feb 10, 2025
87938af
start moving stuff into separate mods
rklaehn Feb 10, 2025
e3ff68f
Move things into modules
rklaehn Feb 10, 2025
0706098
more moving stuff around
rklaehn Feb 10, 2025
84fbe80
clippy
rklaehn Feb 11, 2025
dbf20de
more clippy
rklaehn Feb 11, 2025
a8015d5
Extend multiprovider example with persistent local storage
rklaehn Feb 11, 2025
d511286
fix off by one bug in the quick hacky valid_ranges
rklaehn Feb 11, 2025
b55267b
Make the test a real test
rklaehn Feb 12, 2025
ea3595a
Unify BitfieldEvent and BitfieldSubscriptionEvent
rklaehn Feb 12, 2025
704c7eb
Unify BitfieldEvents and start adding size (not used yet)
rklaehn Feb 12, 2025
ced9b95
Make use of the size
rklaehn Feb 12, 2025
43b9842
introduce BitfieldUpdate and BitfieldState structs
rklaehn Feb 12, 2025
18d7688
First somewhat working size handling approach
rklaehn Feb 12, 2025
54ec3e3
Even more size refactoring
rklaehn Feb 13, 2025
419f8d8
Move entire multiprovider downloader into an example.
rklaehn Feb 21, 2025
1905089
fmt & clippy
rklaehn Feb 21, 2025
c9146e9
codespell
rklaehn Feb 21, 2025
177dfce
Increase stripe size and add readme
rklaehn Feb 21, 2025
e5d231a
more limitations
rklaehn Feb 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
wip
rklaehn committed Jan 30, 2025
commit 9af5698aadd3d3b8a878b818a87df178468cfab5
36 changes: 29 additions & 7 deletions src/downloader2.rs
Original file line number Diff line number Diff line change
@@ -202,7 +202,9 @@ enum Command {
/// The ranges that were removed
removed: ChunkRanges,
},
/// A chunk was downloaded
/// A chunk was downloaded, but not yet stored
///
/// This can only be used for updating peer stats, not for completing downloads.
ChunksDownloaded {
/// Time when the download was received
time: Duration,
@@ -578,7 +580,10 @@ impl<S: Store, D: ContentDiscovery> DownloaderActor<S, D> {
let store = self.store.clone();
let task = self.local_pool.spawn(move || async move {
info!("Connecting to peer {peer}");
let conn = endpoint.connect(peer, crate::ALPN).await?;
let conn = endpoint.connect(peer, crate::ALPN).await;
info!("Got connection to peer {peer} {}", conn.is_err());
println!("{conn:?}");
let conn = conn?;
let spec = RangeSpec::new(ranges);
let initial = crate::get::fsm::start(conn, GetRequest { hash, ranges: RangeSpecSeq::new([spec]) });
stream_to_db(initial, store, hash, peer, send).await?;
@@ -679,10 +684,26 @@ where
mod tests {
use std::ops::Range;

use crate::net_protocol::Blobs;

use super::*;
use bao_tree::ChunkNum;
use iroh::protocol::Router;
use testresult::TestResult;
use tracing::info;

#[cfg(feature = "rpc")]
async fn make_test_node(data: &[u8]) -> anyhow::Result<(Router, NodeId, Hash)> {
let endpoint = iroh::Endpoint::builder().discovery_n0().bind().await?;
let node_id = endpoint.node_id();
let store = crate::store::mem::Store::new();
let blobs = Blobs::builder(store)
.build(&endpoint);
let hash = blobs.client().add_bytes(bytes::Bytes::copy_from_slice(data)).await?.hash;
let router = iroh::protocol::Router::builder(endpoint)
.accept(crate::ALPN, blobs)
.spawn().await?;
Ok((router, node_id, hash))
}

/// Create chunk ranges from an array of u64 ranges
fn chunk_ranges(ranges: impl IntoIterator<Item = Range<u64>>) -> ChunkRanges {
@@ -734,15 +755,16 @@ mod tests {
}

#[tokio::test]
#[cfg(feature = "rpc")]
async fn downloader_driver_smoke() -> TestResult<()> {
let _ = tracing_subscriber::fmt::try_init();
let peer_a = "1000000000000000000000000000000000000000000000000000000000000000".parse()?;
let hash = "0000000000000000000000000000000000000000000000000000000000000001".parse()?;
let (router, peer, hash) = make_test_node(b"test").await?;
let store = crate::store::mem::Store::new();
let endpoint = iroh::Endpoint::builder().alpns(vec![crate::protocol::ALPN.to_vec()]).bind().await?;
let discovery = StaticContentDiscovery { info: BTreeMap::new(), default: vec![peer_a] };
let endpoint = iroh::Endpoint::builder().alpns(vec![crate::protocol::ALPN.to_vec()]).discovery_n0().bind().await?;
let discovery = StaticContentDiscovery { info: BTreeMap::new(), default: vec![peer] };
let local_pool = LocalPool::single();
let downloader = Downloader::new(endpoint, store, discovery, local_pool);
tokio::time::sleep(Duration::from_secs(2)).await;
let fut = downloader.download(DownloadRequest { hash, ranges: ChunkRanges::all() });
fut.await?;
Ok(())