Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[target.wasm32-unknown-unknown]
runner = "wasm-bindgen-test-runner"
rustflags = ['--cfg', 'getrandom_backend="wasm_js"']
37 changes: 37 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -277,3 +277,40 @@ jobs:
- uses: actions/checkout@v5
- run: pip install --user codespell[toml]
- run: codespell --ignore-words-list=ans,atmost,crate,inout,ratatui,ser,stayin,swarmin,worl --skip=CHANGELOG.md

wasm_build:
name: Build & test wasm32
runs-on: ubuntu-latest
env:
RUSTFLAGS: '--cfg getrandom_backend="wasm_js"'
steps:
- name: Checkout sources
uses: actions/checkout@v4

- name: Install Node.js
uses: actions/setup-node@v4
with:
node-version: 20

- name: Install stable toolchain
uses: dtolnay/rust-toolchain@stable

- name: Add wasm target
run: rustup target add wasm32-unknown-unknown

- name: Install wasm-tools
uses: bytecodealliance/actions/wasm-tools/setup@v1

- name: Install wasm-pack
uses: taiki-e/install-action@v2
with:
tool: wasm-bindgen,wasm-pack

- name: wasm32 build
run: cargo build --target wasm32-unknown-unknown --no-default-features

# If the Wasm file contains any 'import "env"' declarations, then
# some non-Wasm-compatible code made it into the final code.
- name: Ensure no 'import "env"' in wasm
run: |
! wasm-tools print --skeleton target/wasm32-unknown-unknown/debug/iroh_blobs.wasm | grep 'import "env"'
19 changes: 7 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 22 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@ bao-tree = { version = "0.15.1", features = ["experimental-mixed", "tokio_fsm",
bytes = { version = "1", features = ["serde"] }
derive_more = { version = "2.0.1", features = ["from", "try_from", "into", "debug", "display", "deref", "deref_mut"] }
futures-lite = "2.6.0"
quinn = { package = "iroh-quinn", version = "0.14.0" }
quinn = { package = "iroh-quinn", version = "0.14.0", optional = true }
n0-future = "0.3.0"
n0-snafu = "0.2.2"
range-collections = { version = "0.4.6", features = ["serde"] }
smallvec = { version = "1", features = ["serde", "const_new"] }
snafu = "0.8.5"
tokio = { version = "1.43.0", features = ["full"] }
tokio-util = { version = "0.7.13", features = ["full"] }
tokio = { version = "1.43.0", default-features = false, features = ["sync"] }
tracing = "0.1.41"
iroh-io = "0.6.1"
rand = "0.9.2"
Expand All @@ -36,12 +35,12 @@ chrono = "0.4.39"
nested_enum_utils = "0.2.1"
ref-cast = "1.0.24"
arrayvec = "0.7.6"
iroh = "0.94"
iroh = { version = "0.94", default-features = false }
self_cell = "1.1.0"
genawaiter = { version = "0.99.1", features = ["futures03"] }
iroh-base = "0.94"
iroh-tickets = "0.1"
irpc = { version = "0.10.0", features = ["rpc", "quinn_endpoint_setup", "spans", "stream", "derive"], default-features = false }
irpc = { version = "0.10.0", features = ["spans", "stream", "derive", "varint-util"], default-features = false }
iroh-metrics = { version = "0.36" }
redb = { version = "2.6.3", optional = true }
reflink-copy = { version = "0.1.24", optional = true }
Expand All @@ -64,8 +63,24 @@ iroh = { version = "0.94", features = ["discovery-local-network"]}
async-compression = { version = "0.4.30", features = ["lz4", "tokio"] }
concat_const = "0.2.0"

[build-dependencies]
cfg_aliases = "0.2.1"

[features]
hide-proto-docs = []
metrics = []
default = ["hide-proto-docs", "fs-store"]
fs-store = ["dep:redb", "dep:reflink-copy"]
default = ["hide-proto-docs", "fs-store", "rpc"]
fs-store = ["dep:redb", "dep:reflink-copy", "bao-tree/fs"]
rpc = ["dep:quinn", "irpc/rpc", "irpc/quinn_endpoint_setup"]

[[example]]
name = "expiring-tags"
required-features = ["fs-store"]

[[example]]
name = "random_store"
required-features = ["fs-store"]

[patch.crates-io]
irpc = { git = "https://github.com/n0-computer/irpc", branch = "main" }
bao-tree = { git = "https://github.com/n0-computer/bao-tree", branch = "main" }
9 changes: 9 additions & 0 deletions build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
use cfg_aliases::cfg_aliases;

fn main() {
// Setup cfg aliases
cfg_aliases! {
// Convenience aliases
wasm_browser: { all(target_family = "wasm", target_os = "unknown") },
}
}
6 changes: 3 additions & 3 deletions examples/expiring-tags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,17 +122,17 @@ async fn print_store_info(store: &Store) -> anyhow::Result<()> {
}

async fn info_task(store: Store) -> anyhow::Result<()> {
tokio::time::sleep(Duration::from_secs(1)).await;
n0_future::time::sleep(Duration::from_secs(1)).await;
loop {
print_store_info(&store).await?;
tokio::time::sleep(Duration::from_secs(5)).await;
n0_future::time::sleep(Duration::from_secs(5)).await;
}
}

async fn delete_expired_tags_task(store: Store, prefix: &str) -> anyhow::Result<()> {
loop {
delete_expired_tags(&store, prefix, false).await?;
tokio::time::sleep(Duration::from_secs(5)).await;
n0_future::time::sleep(Duration::from_secs(5)).await;
}
}

Expand Down
2 changes: 1 addition & 1 deletion examples/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ fn throttle(delay_ms: u64) -> EventSender {
);
// we could compute the delay from the size of the data to have a fixed rate.
// but the size is almost always 16 KiB (16 chunks).
tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
n0_future::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
msg.tx.send(Ok(())).await.ok();
});
}
Expand Down
16 changes: 11 additions & 5 deletions src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@
//!
//! You can also [`connect`](Store::connect) to a remote store that is listening
//! to rpc requests.
use std::{io, net::SocketAddr, ops::Deref};
use std::{io, ops::Deref};

use bao_tree::io::EncodeError;
use iroh::Endpoint;
use irpc::rpc::{listen, RemoteService};
use n0_snafu::SpanTrace;
use nested_enum_utils::common_fields;
use proto::{Request, ShutdownRequest, SyncDbRequest};
use proto::{ShutdownRequest, SyncDbRequest};
use ref_cast::RefCast;
use serde::{Deserialize, Serialize};
use snafu::{Backtrace, IntoError, Snafu};
Expand Down Expand Up @@ -128,6 +127,7 @@ impl From<irpc::Error> for ExportBaoError {
irpc::Error::OneshotRecv(e) => OneshotRecvSnafu.into_error(e),
irpc::Error::Send(e) => SendSnafu.into_error(e),
irpc::Error::Request(e) => RequestSnafu.into_error(e),
#[cfg(feature = "rpc")]
irpc::Error::Write(e) => ExportBaoIoSnafu.into_error(e.into()),
}
}
Expand Down Expand Up @@ -220,6 +220,7 @@ impl From<irpc::channel::mpsc::RecvError> for Error {
}
}

#[cfg(feature = "rpc")]
impl From<irpc::rpc::WriteError> for Error {
fn from(e: irpc::rpc::WriteError) -> Self {
Self::Io(e.into())
Expand Down Expand Up @@ -298,16 +299,21 @@ impl Store {
}

/// Connect to a remote store as a rpc client.
pub fn connect(endpoint: quinn::Endpoint, addr: SocketAddr) -> Self {
#[cfg(feature = "rpc")]
pub fn connect(endpoint: quinn::Endpoint, addr: std::net::SocketAddr) -> Self {
let sender = irpc::Client::quinn(endpoint, addr);
Store::from_sender(sender)
}

/// Listen on a quinn endpoint for incoming rpc connections.
#[cfg(feature = "rpc")]
pub async fn listen(self, endpoint: quinn::Endpoint) {
use irpc::rpc::RemoteService;

use self::proto::Request;
let local = self.client.as_local().unwrap().clone();
let handler = Request::remote_handler(local);
listen::<Request>(endpoint, handler).await
irpc::rpc::listen::<Request>(endpoint, handler).await
}

pub async fn sync_db(&self) -> RequestResult<()> {
Expand Down
9 changes: 4 additions & 5 deletions src/api/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@ use anyhow::bail;
use genawaiter::sync::Gen;
use iroh::{Endpoint, EndpointId};
use irpc::{channel::mpsc, rpc_requests};
use n0_future::{future, stream, BufferedStreamExt, Stream, StreamExt};
use n0_future::{future, stream, task::JoinSet, BufferedStreamExt, Stream, StreamExt};
use rand::seq::SliceRandom;
use serde::{de::Error, Deserialize, Serialize};
use tokio::task::JoinSet;
use tracing::instrument::Instrument;

use super::Store;
Expand All @@ -31,7 +30,7 @@ pub struct Downloader {
client: irpc::Client<SwarmProtocol>,
}

#[rpc_requests(message = SwarmMsg, alias = "Msg")]
#[rpc_requests(message = SwarmMsg, alias = "Msg", rpc_feature = "rpc")]
#[derive(Debug, Serialize, Deserialize)]
enum SwarmProtocol {
#[rpc(tx = mpsc::Sender<DownloadProgressItem>)]
Expand All @@ -42,7 +41,7 @@ struct DownloaderActor {
store: Store,
pool: ConnectionPool,
tasks: JoinSet<()>,
running: HashSet<tokio::task::Id>,
running: HashSet<n0_future::task::Id>,
}

#[derive(Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -342,7 +341,7 @@ impl Downloader {
pub fn new(store: &Store, endpoint: &Endpoint) -> Self {
let (tx, rx) = tokio::sync::mpsc::channel::<SwarmMsg>(32);
let actor = DownloaderActor::new(store.clone(), endpoint.clone());
tokio::spawn(actor.run(rx));
n0_future::task::spawn(actor.run(rx));
Self { client: tx.into() }
}

Expand Down
2 changes: 1 addition & 1 deletion src/api/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl HashSpecific for CreateTagMsg {
}
}

#[rpc_requests(message = Command, alias = "Msg")]
#[rpc_requests(message = Command, alias = "Msg", rpc_feature = "rpc")]
#[derive(Debug, Serialize, Deserialize)]
pub enum Request {
#[rpc(tx = mpsc::Sender<super::Result<Hash>>)]
Expand Down
3 changes: 2 additions & 1 deletion src/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
//! [iroh]: https://docs.rs/iroh
use std::{
fmt::{self, Debug},
time::{Duration, Instant},
time::Duration,
};

use anyhow::Result;
use bao_tree::{io::fsm::BaoContentItem, ChunkNum};
use fsm::RequestCounters;
use n0_future::time::Instant;
use n0_snafu::SpanTrace;
use nested_enum_utils::common_fields;
use serde::{Deserialize, Serialize};
Expand Down
14 changes: 4 additions & 10 deletions src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,13 @@
//! Note that while using this API directly is fine, the standard way
//! to provide data is to just register a [`crate::BlobsProtocol`] protocol
//! handler with an [`iroh::Endpoint`](iroh::protocol::Router).
use std::{
fmt::Debug,
future::Future,
io,
time::{Duration, Instant},
};
use std::{fmt::Debug, future::Future, io, time::Duration};

use anyhow::Result;
use bao_tree::ChunkRanges;
use iroh::endpoint::{self, VarInt};
use iroh::endpoint::{self, ConnectionError, VarInt};
use iroh_io::{AsyncStreamReader, AsyncStreamWriter};
use n0_future::StreamExt;
use quinn::ConnectionError;
use n0_future::{time::Instant, StreamExt};
use serde::{Deserialize, Serialize};
use snafu::Snafu;
use tokio::select;
Expand Down Expand Up @@ -309,7 +303,7 @@ pub async fn handle_connection(
while let Ok(pair) = StreamPair::accept(&connection, progress.clone()).await {
let span = debug_span!("stream", stream_id = %pair.stream_id());
let store = store.clone();
tokio::spawn(handle_stream(pair, store).instrument(span));
n0_future::task::spawn(handle_stream(pair, store).instrument(span));
}
progress
.connection_closed(|| ConnectionClosed { connection_id })
Expand Down
Loading
Loading