Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Evolve BlockStore trait #402

Merged
merged 20 commits into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
323e434
feat: Implement `BlockStore` for derefs, too
matheus23 Feb 13, 2024
e818482
refactor: Use new RPITIT feature for `trait BlockStore`
matheus23 Feb 13, 2024
5e44fdc
refactor: Move `trait PrivateForest` to RPITIT
matheus23 Feb 13, 2024
29917b8
refactor: Use RPITIT in `trait PrivateKey` & `trait ExchangeKey`
matheus23 Feb 13, 2024
c0cbc5d
refactor: Completely remove `async_trait`
matheus23 Feb 14, 2024
92dc8cd
chore: Fix warnings, remove unused `IpldEq` trait
matheus23 Feb 14, 2024
80b90f3
fix: Update rug & enable std feature
matheus23 Feb 14, 2024
19ce569
feat: don't require `std` for `rug`, more efficient `to_bytes_be`
matheus23 Feb 14, 2024
4b67fd5
chore: Fix nightly warning
matheus23 Feb 14, 2024
f121a79
refactor: Blanket-impl for `&B` and `Box<B>` instead of `Deref`
matheus23 Feb 14, 2024
73cb792
refactor: Remove serializable things from `BlockStore`
matheus23 Feb 14, 2024
11d7a67
feat: Add `has_block` to `trait BlockStore`
matheus23 Feb 14, 2024
983ae3f
fix: Update accesskey snapshot
matheus23 Feb 15, 2024
e2a2e2f
fix: Implement `has_block` for `ForeignBlockStore`
matheus23 Feb 15, 2024
32e3528
feat: Add `get_block_keyed` to `trait BlockStore`, fix wasm
matheus23 Feb 15, 2024
adc7823
refactor: Move blockstore interface close to extern
matheus23 Feb 15, 2024
df5b83a
refactor: Use precise error type in `trait BlockStore`
matheus23 Feb 15, 2024
3e2ae56
feat: Return correct error in `ForeignBlockStore::get_block`
matheus23 Feb 15, 2024
500a6c2
refactor: Use `libipld_core::serde::to_ipld` instead of dag-cbor
matheus23 Feb 15, 2024
3d5793e
docs: Add comments explaining use of `boxed_fut`
matheus23 Feb 15, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion wnfs-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,19 @@ categories = [
license = "Apache-2.0"
readme = "README.md"
edition = "2021"
rust-version = "1.75"
repository = "https://github.com/wnfs-wg/rs-wnfs/tree/main/wnfs-common"
homepage = "https://fission.codes"
authors = ["The Fission Authors"]

[dependencies]
anyhow = "1.0"
async-once-cell = "0.5"
async-trait = "0.1"
base64 = { version = "0.21", optional = true }
base64-serde = { version = "0.7", optional = true }
bytes = { version = "1.4", features = ["serde"] }
chrono = { version = "0.4", default-features = false, features = ["clock", "std"] }
cid = "0.10"
dashmap = "5.5.3"
futures = "0.3"
libipld = { version = "0.16", features = ["dag-cbor", "derive", "serde-codec"] }
Expand Down
214 changes: 150 additions & 64 deletions wnfs-common/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,16 @@ use crate::{
utils::{Arc, CondSend, CondSync},
BlockStoreError, MAX_BLOCK_SIZE,
};
use anyhow::{bail, Result};
use async_trait::async_trait;
use bytes::Bytes;
use futures::Future;
use libipld::{
cbor::DagCborCodec,
cid::Version,
multihash::{Code, MultihashDigest},
serde as ipld_serde, Cid,
Cid,
};
use parking_lot::Mutex;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

//--------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -45,38 +44,74 @@ pub const CODEC_DAG_PB: u64 = 0x70;
pub const CODEC_RAW: u64 = 0x55;

//--------------------------------------------------------------------------------------------------
// Type Definitions
// Traits
//--------------------------------------------------------------------------------------------------

/// For types that implement block store operations like adding, getting content from the store.
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
pub trait BlockStore: Sized + CondSync {
async fn get_block(&self, cid: &Cid) -> Result<Bytes>;
async fn put_block(&self, bytes: impl Into<Bytes> + CondSend, codec: u64) -> Result<Cid>;

async fn get_deserializable<V>(&self, cid: &Cid) -> Result<V>
where
V: DeserializeOwned,
{
let bytes = self.get_block(cid).await?;
let ipld = decode(bytes.as_ref(), DagCborCodec)?;
Ok(ipld_serde::from_ipld::<V>(ipld)?)
pub trait BlockStore: CondSync {
/// Retrieve a block from this store via its hash (`Cid`).
///
/// If this store can't find the block, it may raise an error like `BlockNotFound`.
fn get_block(
&self,
cid: &Cid,
) -> impl Future<Output = Result<Bytes, BlockStoreError>> + CondSend;

/// Put some bytes into the blockstore. These bytes should be encoded with the given codec.
///
/// E.g. `CODEC_RAW` for raw bytes blocks, `CODEC_DAG_CBOR` for dag-cbor, etc.
///
/// This codec will determine the codec encoded in the final `Cid` that's returned.
///
/// If the codec is incorrect, this function won't fail, but any tools that depend on the
/// correctness of the codec may fail. (E.g. tools that follow the links of blocks).
///
/// This funciton allows the blockstore to choose the hashing function itself.
/// The hashing function that was chosen will be readable from the `Cid` metadata.
///
/// If you need control over the concrete hashing function that's used, see `put_block_keyed`.
fn put_block(
&self,
bytes: impl Into<Bytes> + CondSend,
codec: u64,
) -> impl Future<Output = Result<Cid, BlockStoreError>> + CondSend {
let bytes = bytes.into();
async move {
let cid = self.create_cid(&bytes, codec)?;
self.put_block_keyed(cid, bytes).await?;
Ok(cid)
}
}

async fn put_serializable<V>(&self, value: &V) -> Result<Cid>
where
V: Serialize + CondSync,
{
let bytes = encode(&ipld_serde::to_ipld(value)?, DagCborCodec)?;
self.put_block(bytes, CODEC_DAG_CBOR).await
}
/// Put a block of data into this blockstore. The block's CID needs to match the CID given.
///
/// It's up to the blockstore whether to check this fact or assume it when this function is called.
///
/// The default implementation of `put_block` will use this function under the hood and use
/// the correct CID provided by the `create_cid` function.
///
/// This is useful to be able to add blocks that were generated from other
/// clients with differently configured hashing functions to this blockstore.
fn put_block_keyed(
&self,
cid: Cid,
bytes: impl Into<Bytes> + CondSend,
) -> impl Future<Output = Result<(), BlockStoreError>> + CondSend;

/// Find out whether a call to `get_block` would return with a result or not.
///
/// This is useful for data exchange protocols to find out what needs to be fetched
/// externally and what doesn't.
fn has_block(
&self,
cid: &Cid,
) -> impl Future<Output = Result<bool, BlockStoreError>> + CondSend;

// This should be the same in all implementations of BlockStore
fn create_cid(&self, bytes: &[u8], codec: u64) -> Result<Cid> {
fn create_cid(&self, bytes: &[u8], codec: u64) -> Result<Cid, BlockStoreError> {
// If there are too many bytes, abandon this task
if bytes.len() > MAX_BLOCK_SIZE {
bail!(BlockStoreError::MaximumBlockSizeExceeded(bytes.len()))
return Err(BlockStoreError::MaximumBlockSizeExceeded(bytes.len()));
}

// Compute the Blake3 hash of the bytes
Expand All @@ -93,6 +128,66 @@ pub trait BlockStore: Sized + CondSync {
// Implementations
//--------------------------------------------------------------------------------------------------

impl<B: BlockStore> BlockStore for &B {
async fn get_block(&self, cid: &Cid) -> Result<Bytes, BlockStoreError> {
(**self).get_block(cid).await
}

async fn put_block(
&self,
bytes: impl Into<Bytes> + CondSend,
codec: u64,
) -> Result<Cid, BlockStoreError> {
(**self).put_block(bytes, codec).await
}

async fn put_block_keyed(
&self,
cid: Cid,
bytes: impl Into<Bytes> + CondSend,
) -> Result<(), BlockStoreError> {
(**self).put_block_keyed(cid, bytes).await
}

async fn has_block(&self, cid: &Cid) -> Result<bool, BlockStoreError> {
(**self).has_block(cid).await
}

fn create_cid(&self, bytes: &[u8], codec: u64) -> Result<Cid, BlockStoreError> {
(**self).create_cid(bytes, codec)
}
}

impl<B: BlockStore> BlockStore for Box<B> {
async fn get_block(&self, cid: &Cid) -> Result<Bytes, BlockStoreError> {
(**self).get_block(cid).await
}

async fn put_block(
&self,
bytes: impl Into<Bytes> + CondSend,
codec: u64,
) -> Result<Cid, BlockStoreError> {
(**self).put_block(bytes, codec).await
}

async fn put_block_keyed(
&self,
cid: Cid,
bytes: impl Into<Bytes> + CondSend,
) -> Result<(), BlockStoreError> {
(**self).put_block_keyed(cid, bytes).await
}

async fn has_block(&self, cid: &Cid) -> Result<bool, BlockStoreError> {
(**self).has_block(cid).await
}

fn create_cid(&self, bytes: &[u8], codec: u64) -> Result<Cid, BlockStoreError> {
(**self).create_cid(bytes, codec)
}
}

/// An in-memory block store to simulate IPFS.
///
/// IPFS is basically a glorified HashMap.
Expand All @@ -111,11 +206,8 @@ impl MemoryBlockStore {
}
}

#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl BlockStore for MemoryBlockStore {
/// Retrieves an array of bytes from the block store with given CID.
async fn get_block(&self, cid: &Cid) -> Result<Bytes> {
async fn get_block(&self, cid: &Cid) -> Result<Bytes, BlockStoreError> {
let bytes = self
.0
.lock()
Expand All @@ -126,18 +218,18 @@ impl BlockStore for MemoryBlockStore {
Ok(bytes)
}

/// Stores an array of bytes in the block store.
async fn put_block(&self, bytes: impl Into<Bytes> + CondSend, codec: u64) -> Result<Cid> {
// Convert the bytes into a Bytes object
let bytes: Bytes = bytes.into();
async fn put_block_keyed(
&self,
cid: Cid,
bytes: impl Into<Bytes> + CondSend,
) -> Result<(), BlockStoreError> {
self.0.lock().insert(cid, bytes.into());

// Try to build the CID from the bytes and codec
let cid = self.create_cid(&bytes, codec)?;

// Insert the bytes into the HashMap using the CID as the key
self.0.lock().insert(cid, bytes);
Ok(())
}

Ok(cid)
async fn has_block(&self, cid: &Cid) -> Result<bool, BlockStoreError> {
Ok(self.0.lock().contains_key(cid))
}
}

Expand All @@ -146,21 +238,18 @@ impl BlockStore for MemoryBlockStore {
//--------------------------------------------------------------------------------------------------

/// Tests the retrieval property of a BlockStore-conforming type.
pub async fn bs_retrieval_test<T>(store: &T) -> Result<()>
where
T: BlockStore + 'static,
{
pub async fn bs_retrieval_test<T>(store: impl BlockStore) -> Result<(), BlockStoreError> {
// Example objects to insert and remove from the blockstore
let first_bytes = vec![1, 2, 3, 4, 5];
let second_bytes = b"hello world".to_vec();

// Insert the objects into the blockstore
let first_cid = store.put_serializable(&first_bytes).await?;
let second_cid = store.put_serializable(&second_bytes).await?;
let first_cid = store.put_block(first_bytes.clone(), CODEC_RAW).await?;
let second_cid = store.put_block(second_bytes.clone(), CODEC_RAW).await?;

// Retrieve the objects from the blockstore
let first_loaded: Vec<u8> = store.get_deserializable(&first_cid).await?;
let second_loaded: Vec<u8> = store.get_deserializable(&second_cid).await?;
let first_loaded = store.get_block(&first_cid).await?;
let second_loaded = store.get_block(&second_cid).await?;

// Assert that the objects are the same as the ones we inserted
assert_eq!(first_loaded, first_bytes);
Expand All @@ -170,24 +259,21 @@ where
}

/// Tests the duplication of a BlockStore-conforming type.
pub async fn bs_duplication_test<T>(store: &T) -> Result<()>
where
T: BlockStore + 'static,
{
pub async fn bs_duplication_test<T>(store: impl BlockStore) -> Result<(), BlockStoreError> {
// Example objects to insert and remove from the blockstore
let first_bytes = vec![1, 2, 3, 4, 5];
let second_bytes = first_bytes.clone();

// Insert the objects into the blockstore
let first_cid = store.put_serializable(&first_bytes).await?;
let second_cid = store.put_serializable(&second_bytes).await?;
let first_cid = store.put_block(first_bytes.clone(), CODEC_RAW).await?;
let second_cid = store.put_block(second_bytes.clone(), CODEC_RAW).await?;

// Assert that the two vecs produced the same CID
assert_eq!(first_cid, second_cid);

// Retrieve the objects from the blockstore
let first_loaded: Vec<u8> = store.get_deserializable(&first_cid).await?;
let second_loaded: Vec<u8> = store.get_deserializable(&second_cid).await?;
let first_loaded = store.get_block(&first_cid).await?;
let second_loaded = store.get_block(&second_cid).await?;

// Assert that the objects are the same as the ones we inserted
assert_eq!(first_loaded, first_bytes);
Expand All @@ -200,22 +286,22 @@ where
}

/// Tests the serialization of a BlockStore-conforming type.
pub async fn bs_serialization_test<T>(store: &T) -> Result<()>
pub async fn bs_serialization_test<T>(store: &T) -> Result<(), BlockStoreError>
where
T: BlockStore + Serialize + 'static + for<'de> Deserialize<'de>,
T: BlockStore + Serialize + for<'de> Deserialize<'de>,
{
// Example objects to insert and remove from the blockstore
let bytes = vec![1, 2, 3, 4, 5];

// Insert the object into the blockstore
let cid = store.put_serializable(&bytes).await?;
let cid = store.put_block(bytes.clone(), CODEC_RAW).await?;

// Serialize the BlockStore
let serial_store: Vec<u8> = encode(&store, DagCborCodec)?;
// Construct a new BlockStore from the Serialized object
let deserial_store: T = decode(&serial_store, DagCborCodec)?;
// Retrieve the object from the blockstore
let loaded: Vec<u8> = deserial_store.get_deserializable(&cid).await?;
let loaded = deserial_store.get_block(&cid).await?;

// Assert that the objects are the same as the ones we inserted
assert_eq!(loaded, bytes);
Expand All @@ -231,9 +317,9 @@ mod tests {
#[async_std::test]
async fn memory_blockstore() -> Result<()> {
let store = &MemoryBlockStore::new();
bs_retrieval_test(store).await?;
bs_duplication_test(store).await?;
bs_serialization_test(store).await?;
bs_retrieval_test::<MemoryBlockStore>(store).await?;
bs_duplication_test::<MemoryBlockStore>(store).await?;
bs_serialization_test::<MemoryBlockStore>(store).await?;
Ok(())
}
}
8 changes: 4 additions & 4 deletions wnfs-common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ pub enum BlockStoreError {
#[error("Cannot find specified CID in block store: {0}")]
CIDNotFound(Cid),

#[error("Cannot find handler for block with CID: {0}")]
BlockHandlerNotFound(Cid),
#[error("CID error during blockstore operation: {0}")]
CIDError(#[from] cid::Error),

#[error("Lock poisoned")]
LockPoisoned,
#[error(transparent)]
Custom(#[from] anyhow::Error),
}
1 change: 0 additions & 1 deletion wnfs-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ mod link;
mod metadata;
mod pathnodes;
mod storable;
mod traits;
pub mod utils;

pub use blockstore::*;
Expand Down
Loading
Loading