Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Make rs-wnfs work in multithreaded contexts #372

Merged
merged 9 commits into from
Nov 28, 2023
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ That's the public filesystem, the private filesystem, on the other hand, is a bi
```rust
use anyhow::Result;
use chrono::Utc;
use rand::thread_rng;
use rand_chacha::ChaCha12Rng;
use rand_core::SeedableRng;
use wnfs::{
common::MemoryBlockStore,
private::{
Expand All @@ -230,7 +231,7 @@ async fn main() -> Result<()> {
let store = &MemoryBlockStore::default();

// A random number generator.
let rng = &mut thread_rng();
let rng = &mut ChaCha12Rng::from_entropy();

// Create a private forest.
let forest = &mut HamtForest::new_trusted_rc(rng);
Expand Down
8 changes: 5 additions & 3 deletions wnfs-bench/hamt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ use criterion::{
Criterion, Throughput,
};
use proptest::{arbitrary::any, collection::vec, test_runner::TestRunner};
use std::{cmp, sync::Arc};
use std::cmp;
use wnfs_common::{
async_encode, decode, libipld::cbor::DagCborCodec, utils::Sampleable, BlockStore, Link,
MemoryBlockStore,
async_encode, decode,
libipld::cbor::DagCborCodec,
utils::{Arc, Sampleable},
BlockStore, Link, MemoryBlockStore,
};
use wnfs_hamt::{
diff, merge,
Expand Down
2 changes: 2 additions & 0 deletions wnfs-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ base64 = { version = "0.21", optional = true }
base64-serde = { version = "0.7", optional = true }
bytes = { version = "1.4", features = ["serde"] }
chrono = { version = "0.4", default-features = false, features = ["clock", "std"] }
dashmap = "5.5.3"
futures = "0.3"
libipld = { version = "0.16", features = ["dag-cbor", "derive", "serde-codec"] }
multihash = "0.18"
once_cell = "1.16"
parking_lot = "0.12"
proptest = { version = "1.1", optional = true }
rand_core = "0.6"
serde = { version = "1.0", features = ["rc"] }
Expand Down
25 changes: 15 additions & 10 deletions wnfs-common/src/async_serialize.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use crate::BlockStore;
use crate::{
utils::{Arc, CondSend, CondSync},
BlockStore,
};
use async_trait::async_trait;
use libipld::{error::SerdeError, serde as ipld_serde, Ipld};
use serde::{Serialize, Serializer};
use std::sync::Arc;

//--------------------------------------------------------------------------------------------------
// Macros
Expand All @@ -11,9 +13,10 @@ use std::sync::Arc;
macro_rules! impl_async_serialize {
( $( $ty:ty $( : < $( $generics:ident ),+ > )? ),+ ) => {
$(
#[async_trait(?Send)]
impl $( < $( $generics ),+ > )? AsyncSerialize for $ty $( where $( $generics: Serialize ),+ )? {
async fn async_serialize<S: Serializer, BS: BlockStore>(
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl $( < $( $generics ),+ > )? AsyncSerialize for $ty $( where $( $generics: Serialize + CondSync ),+ )? {
async fn async_serialize<S: Serializer + CondSend, BS: BlockStore>(
&self,
serializer: S,
_: &BS,
Expand All @@ -38,12 +41,13 @@ macro_rules! impl_async_serialize {
///
/// An example of this is the PublicDirectory which can contain links to other IPLD nodes.
/// These links need to be resolved to Cids during serialization if they aren't already.
#[async_trait(?Send)]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
pub trait AsyncSerialize {
/// Serializes the type.
async fn async_serialize<S, B>(&self, serializer: S, store: &B) -> Result<S::Ok, S::Error>
where
S: Serializer,
S: Serializer + CondSend,
B: BlockStore + ?Sized;

/// Serialize with an IPLD serializer.
Expand All @@ -59,11 +63,12 @@ pub trait AsyncSerialize {
// Implementations
//--------------------------------------------------------------------------------------------------

#[async_trait(?Send)]
impl<T: AsyncSerialize> AsyncSerialize for Arc<T> {
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl<T: AsyncSerialize + CondSync> AsyncSerialize for Arc<T> {
async fn async_serialize<S, B>(&self, serializer: S, store: &B) -> Result<S::Ok, S::Error>
where
S: Serializer,
S: Serializer + CondSend,
B: BlockStore + ?Sized,
{
self.as_ref().async_serialize(serializer, store).await
Expand Down
38 changes: 23 additions & 15 deletions wnfs-common/src/blockstore.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use crate::{decode, encode, AsyncSerialize, BlockStoreError, MAX_BLOCK_SIZE};
use crate::{
decode, encode,
utils::{Arc, CondSend, CondSync},
AsyncSerialize, BlockStoreError, MAX_BLOCK_SIZE,
};
use anyhow::{bail, Result};
use async_trait::async_trait;
use bytes::Bytes;
Expand All @@ -8,8 +12,9 @@ use libipld::{
multihash::{Code, MultihashDigest},
serde as ipld_serde, Cid,
};
use parking_lot::Mutex;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{cell::RefCell, collections::HashMap};
use std::collections::HashMap;

//--------------------------------------------------------------------------------------------------
// Constants
Expand Down Expand Up @@ -44,10 +49,11 @@ pub const CODEC_RAW: u64 = 0x55;
//--------------------------------------------------------------------------------------------------

/// For types that implement block store operations like adding, getting content from the store.
#[async_trait(?Send)]
pub trait BlockStore: Sized {
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
pub trait BlockStore: Sized + CondSync {
async fn get_block(&self, cid: &Cid) -> Result<Bytes>;
async fn put_block(&self, bytes: impl Into<Bytes>, codec: u64) -> Result<Cid>;
async fn put_block(&self, bytes: impl Into<Bytes> + CondSend, codec: u64) -> Result<Cid>;

async fn get_deserializable<V>(&self, cid: &Cid) -> Result<V>
where
Expand All @@ -60,15 +66,15 @@ pub trait BlockStore: Sized {

async fn put_serializable<V>(&self, value: &V) -> Result<Cid>
where
V: Serialize,
V: Serialize + CondSync,
{
let bytes = encode(&ipld_serde::to_ipld(value)?, DagCborCodec)?;
self.put_block(bytes, CODEC_DAG_CBOR).await
}

async fn put_async_serializable<V>(&self, value: &V) -> Result<Cid>
where
V: AsyncSerialize,
V: AsyncSerialize + CondSync,
{
let ipld = value.async_serialize_ipld(self).await?;
let bytes = encode(&ipld, DagCborCodec)?;
Expand Down Expand Up @@ -99,11 +105,12 @@ pub trait BlockStore: Sized {
/// An in-memory block store to simulate IPFS.
///
/// IPFS is basically a glorified HashMap.

#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct MemoryBlockStore(
#[serde(serialize_with = "crate::utils::serialize_cid_map")]
#[serde(deserialize_with = "crate::utils::deserialize_cid_map")]
pub(crate) RefCell<HashMap<Cid, Bytes>>,
pub(crate) Arc<Mutex<HashMap<Cid, Bytes>>>,
);

impl MemoryBlockStore {
Expand All @@ -113,13 +120,14 @@ impl MemoryBlockStore {
}
}

#[async_trait(?Send)]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl BlockStore for MemoryBlockStore {
/// Retrieves an array of bytes from the block store with given CID.
async fn get_block(&self, cid: &Cid) -> Result<Bytes> {
let bytes = self
.0
.borrow()
.lock()
.get(cid)
.ok_or(BlockStoreError::CIDNotFound(*cid))?
.clone();
Expand All @@ -128,15 +136,15 @@ impl BlockStore for MemoryBlockStore {
}

/// Stores an array of bytes in the block store.
async fn put_block(&self, bytes: impl Into<Bytes>, codec: u64) -> Result<Cid> {
async fn put_block(&self, bytes: impl Into<Bytes> + CondSend, codec: u64) -> Result<Cid> {
// Convert the bytes into a Bytes object
let bytes: Bytes = bytes.into();

// Try to build the CID from the bytes and codec
let cid = self.create_cid(&bytes, codec)?;

// Insert the bytes into the HashMap using the CID as the key
self.0.borrow_mut().insert(cid, bytes);
self.0.lock().insert(cid, bytes);

Ok(cid)
}
Expand All @@ -149,7 +157,7 @@ impl BlockStore for MemoryBlockStore {
/// Tests the retrieval property of a BlockStore-conforming type.
pub async fn bs_retrieval_test<T>(store: &T) -> Result<()>
where
T: BlockStore + Send + 'static,
T: BlockStore + 'static,
{
// Example objects to insert and remove from the blockstore
let first_bytes = vec![1, 2, 3, 4, 5];
Expand All @@ -173,7 +181,7 @@ where
/// Tests the duplication of a BlockStore-conforming type.
pub async fn bs_duplication_test<T>(store: &T) -> Result<()>
where
T: BlockStore + Send + 'static,
T: BlockStore + 'static,
{
// Example objects to insert and remove from the blockstore
let first_bytes = vec![1, 2, 3, 4, 5];
Expand Down Expand Up @@ -203,7 +211,7 @@ where
/// Tests the serialization of a BlockStore-conforming type.
pub async fn bs_serialization_test<T>(store: &T) -> Result<()>
where
T: BlockStore + Send + Serialize + 'static + for<'de> Deserialize<'de>,
T: BlockStore + Serialize + 'static + for<'de> Deserialize<'de>,
{
// Example objects to insert and remove from the blockstore
let bytes = vec![1, 2, 3, 4, 5];
Expand Down
4 changes: 2 additions & 2 deletions wnfs-common/src/encoding.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{AsyncSerialize, BlockStore};
use crate::{utils::CondSync, AsyncSerialize, BlockStore};
use anyhow::Result;
use libipld::{
codec::{Decode, Encode},
Expand All @@ -24,7 +24,7 @@ where
/// Encodes an async serializable value into DagCbor bytes.
pub async fn async_encode<V, C>(value: &V, store: &impl BlockStore, codec: C) -> Result<Vec<u8>>
where
V: AsyncSerialize,
V: AsyncSerialize + CondSync,
C: Codec,
Ipld: Encode<C>,
{
Expand Down
37 changes: 21 additions & 16 deletions wnfs-common/src/link.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use crate::{traits::IpldEq, AsyncSerialize, BlockStore};
use crate::{
traits::IpldEq,
utils::{Arc, CondSync},
AsyncSerialize, BlockStore,
};
use anyhow::Result;
use async_once_cell::OnceCell;
use async_trait::async_trait;
use libipld::Cid;
use serde::de::DeserializeOwned;
use std::{
fmt::{self, Debug, Formatter},
sync::Arc,
};
use std::fmt::{self, Debug, Formatter};

//--------------------------------------------------------------------------------------------------
// Type Definitions
Expand All @@ -34,7 +35,7 @@ pub enum Link<T> {
// Implementations
//--------------------------------------------------------------------------------------------------

impl<T: RemembersCid> Link<T> {
impl<T: RemembersCid + CondSync> Link<T> {
/// Creates a new `Link` that starts out as a Cid.
pub fn from_cid(cid: Cid) -> Self {
Self::Encoded {
Expand Down Expand Up @@ -172,8 +173,9 @@ impl<T: RemembersCid> Link<T> {
}
}

#[async_trait(?Send)]
impl<T: PartialEq + AsyncSerialize + RemembersCid> IpldEq for Link<T> {
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl<T: PartialEq + AsyncSerialize + RemembersCid + CondSync> IpldEq for Link<T> {
async fn eq(&self, other: &Link<T>, store: &impl BlockStore) -> Result<bool> {
if self == other {
return Ok(true);
Expand Down Expand Up @@ -210,7 +212,7 @@ where
}
}

impl<T: RemembersCid> PartialEq for Link<T>
impl<T: RemembersCid + CondSync> PartialEq for Link<T>
where
T: PartialEq,
{
Expand Down Expand Up @@ -268,7 +270,9 @@ impl<T: RemembersCid> RemembersCid for Arc<T> {

#[cfg(test)]
mod tests {
use crate::{AsyncSerialize, BlockStore, Link, MemoryBlockStore, RemembersCid};
use crate::{
utils::CondSend, AsyncSerialize, BlockStore, Link, MemoryBlockStore, RemembersCid,
};
use ::serde::{Deserialize, Serialize};
use async_once_cell::OnceCell;
use async_trait::async_trait;
Expand All @@ -282,13 +286,14 @@ mod tests {
persisted_as: OnceCell<Cid>,
}

#[async_trait(?Send)]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl AsyncSerialize for Example {
async fn async_serialize<S: Serializer, BS: BlockStore + ?Sized>(
&self,
serializer: S,
_: &BS,
) -> Result<S::Ok, S::Error> {
async fn async_serialize<S, B>(&self, serializer: S, _store: &B) -> Result<S::Ok, S::Error>
where
S: Serializer + CondSend,
B: BlockStore + ?Sized,
{
self.serialize(serializer)
}
}
Expand Down
2 changes: 1 addition & 1 deletion wnfs-common/src/pathnodes.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::Arc;
use crate::utils::Arc;

//--------------------------------------------------------------------------------------------------
// Type Definitions
Expand Down
3 changes: 2 additions & 1 deletion wnfs-common/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use async_trait::async_trait;
//--------------------------------------------------------------------------------------------------

/// Implements deep equality check for two types.
#[async_trait(?Send)]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
pub trait IpldEq {
/// Checks if the two items are deeply equal.
async fn eq(&self, other: &Self, store: &impl BlockStore) -> Result<bool>;
Expand Down
12 changes: 7 additions & 5 deletions wnfs-common/src/utils/common.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use super::Arc;
use crate::HashOutput;
use anyhow::Result;
use bytes::Bytes;
use futures::{AsyncRead, AsyncReadExt};
use libipld::{Cid, IpldCodec};
use parking_lot::Mutex;
use rand_core::CryptoRngCore;
use serde::{Deserialize, Serialize, Serializer};
use std::{cell::RefCell, collections::HashMap};
use std::collections::HashMap;

//--------------------------------------------------------------------------------------------------
// Functions
Expand Down Expand Up @@ -84,14 +86,14 @@ pub fn u64_to_ipld(value: u64) -> Result<IpldCodec> {
}

pub(crate) fn serialize_cid_map<S>(
map: &RefCell<HashMap<Cid, Bytes>>,
map: &Arc<Mutex<HashMap<Cid, Bytes>>>,
serializer: S,
) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let map = map
.borrow()
.lock()
.iter()
.map(|(cid, bytes)| (cid.to_string(), bytes.to_vec()))
.collect::<HashMap<_, _>>();
Expand All @@ -101,7 +103,7 @@ where

pub(crate) fn deserialize_cid_map<'de, D>(
deserializer: D,
) -> Result<RefCell<HashMap<Cid, Bytes>>, D::Error>
) -> Result<Arc<Mutex<HashMap<Cid, Bytes>>>, D::Error>
where
D: serde::Deserializer<'de>,
{
Expand All @@ -114,5 +116,5 @@ where
})
.collect::<Result<_, _>>()?;

Ok(RefCell::new(map))
Ok(Arc::new(Mutex::new(map)))
}
Loading
Loading