Skip to content
This repository has been archived by the owner on Sep 21, 2024. It is now read-only.

Commit

Permalink
feat: Introduce Importable/Exportable Storage traits.
Browse files Browse the repository at this point in the history
  • Loading branch information
jsantell committed Sep 13, 2023
1 parent 7155f86 commit 4facbd9
Show file tree
Hide file tree
Showing 7 changed files with 217 additions and 36 deletions.
4 changes: 1 addition & 3 deletions rust/noosphere-storage/examples/bench/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,7 @@ impl BenchmarkStorage {
))]
let (storage, storage_name) = {
(
noosphere_storage::SledStorage::new(noosphere_storage::SledStorageInit::Path(
storage_path.into(),
))?,
noosphere_storage::SledStorage::new(storage_path.into())?,
"SledDbStorage",
)
};
Expand Down
176 changes: 176 additions & 0 deletions rust/noosphere-storage/src/exportable.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
use crate::{Storage, Store, SPHERE_DB_STORE_NAMES};
use anyhow::Result;
use async_trait::async_trait;
use noosphere_common::ConditionalSync;
use std::pin::Pin;
use tokio_stream::{Stream, StreamExt};

#[cfg(target_arch = "wasm32")]
pub type IterableStream<'a> = Pin<Box<dyn Stream<Item = Result<(Vec<u8>, Option<Vec<u8>>)>> + 'a>>;
#[cfg(not(target_arch = "wasm32"))]
pub type IterableStream<'a> =
Pin<Box<dyn Stream<Item = Result<(Vec<u8>, Option<Vec<u8>>)>> + Send + 'a>>;

#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
pub trait Iterable: Store {
fn get_all_entries<'a>(&'a self) -> IterableStream<'a>;
}

/// A blanket implementation for [Storage]s that can be
/// imported via [Importable::import].
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
pub trait Exportable
where
Self: Storage,
<Self as Storage>::KeyValueStore: Iterable,
{
async fn get_all_store_names(&self) -> Result<Vec<String>> {
let mut names = vec![];
names.extend(SPHERE_DB_STORE_NAMES.iter().map(|name| String::from(*name)));
Ok(names)
}
}

impl<S> Exportable for S
where
S: Storage,
S::KeyValueStore: Iterable,
{
}

/// A blanket implementation for all [Storage]s to import
/// an [Exportable] storage.
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
pub trait Importable<'a, E>
where
Self: Storage,
Self::KeyValueStore: Store,
E: Exportable + ConditionalSync + 'a,
<E as Storage>::KeyValueStore: Iterable,
{
async fn import(&'a mut self, exportable: E) -> Result<()> {
for store_name in exportable.get_all_store_names().await? {
let mut store = self.get_key_value_store(&store_name).await?;
let export_store = exportable.get_key_value_store(&store_name).await?;
{
let mut stream = export_store.get_all_entries();
while let Some((key, value)) = stream.try_next().await? {
if let Some(value) = value {
Store::write(&mut store, key.as_ref(), value.as_ref()).await?;
}
}
}
drop(export_store)
}
Ok(())
}
}

#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl<'a, T, E> Importable<'a, E> for T
where
T: Storage,
T::KeyValueStore: Store,
E: Exportable + ConditionalSync + 'a,
<E as Storage>::KeyValueStore: Iterable,
{
}

#[cfg(test)]
mod test {
use super::*;
use std::path::PathBuf;

#[cfg(target_arch = "wasm32")]
use wasm_bindgen_test::{wasm_bindgen_test, wasm_bindgen_test_configure};
#[cfg(target_arch = "wasm32")]
wasm_bindgen_test_configure!(run_in_browser);

struct TempPath {
#[cfg(target_arch = "wasm32")]
pub path: String,
#[cfg(not(target_arch = "wasm32"))]
pub path: PathBuf,
#[cfg(not(target_arch = "wasm32"))]
#[allow(unused)]
temp_dir: tempfile::TempDir,
}

impl TempPath {
pub fn new() -> Result<Self> {
#[cfg(target_arch = "wasm32")]
let path: String = witty_phrase_generator::WPGen::new()
.with_words(3)
.unwrap()
.into_iter()
.map(|word| String::from(word))
.collect();

#[cfg(not(target_arch = "wasm32"))]
let temp_dir = tempfile::TempDir::new()?;
#[cfg(not(target_arch = "wasm32"))]
let path = temp_dir.path().to_owned();

#[cfg(target_arch = "wasm32")]
let obj = Self { path };
#[cfg(not(target_arch = "wasm32"))]
let obj = Self { temp_dir, path };

Ok(obj)
}
}

/// wasm32: MemoryStorage -> IndexedDbStorage
/// native: SledStorage -> MemoryStorage
/// native+rocks: SledStorage -> RocksDbStorage
#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
#[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
pub async fn it_can_migrate_to_new_storage_backend() -> Result<()> {
noosphere_core::tracing::initialize_tracing(None);

#[allow(unused)]
let from_temp_path = TempPath::new()?;
#[allow(unused)]
let to_temp_path = TempPath::new()?;

#[cfg(target_arch = "wasm32")]
let from_storage = crate::MemoryStorage::default();
#[cfg(not(target_arch = "wasm32"))]
let from_storage = crate::SledStorage::new(from_temp_path.path.clone())?;

#[cfg(target_arch = "wasm32")]
let mut to_storage = crate::IndexedDbStorage::new(&to_temp_path.path).await?;
#[cfg(all(feature = "rocksdb", not(target_arch = "wasm32")))]
let mut to_storage = crate::RocksDbStorage::new(to_temp_path.path.clone())?;
#[cfg(all(not(feature = "rocksdb"), not(target_arch = "wasm32")))]
let mut to_storage = crate::MemoryStorage::default();

{
let mut store = from_storage.get_key_value_store("links").await?;
for n in 0..10 {
let bytes = vec![n; 10];
store.write(&[n], bytes.as_ref()).await?;
}
}

to_storage.import(from_storage).await?;

{
let store = to_storage.get_key_value_store("links").await?;
for n in 0..10 {
let expected_bytes = vec![n; 10];

if let Some(bytes) = store.read(&[n]).await? {
assert_eq!(bytes, expected_bytes);
} else {
panic!("Expected key `{n}` to exist in new db");
}
}
}
Ok(())
}
}
4 changes: 2 additions & 2 deletions rust/noosphere-storage/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::Storage;
use anyhow::Result;

#[cfg(not(target_arch = "wasm32"))]
use crate::{SledStorage, SledStorageInit, SledStore};
use crate::{SledStorage, SledStore};

#[cfg(not(target_arch = "wasm32"))]
pub async fn make_disposable_store() -> Result<SledStore> {
Expand All @@ -13,7 +13,7 @@ pub async fn make_disposable_store() -> Result<SledStore> {
.into_iter()
.map(String::from)
.collect();
let provider = SledStorage::new(SledStorageInit::Path(temp_dir.join(temp_name)))?;
let provider = SledStorage::new(temp_dir.join(temp_name))?;
provider.get_block_store("foo").await
}

Expand Down
14 changes: 14 additions & 0 deletions rust/noosphere-storage/src/implementation/memory.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use anyhow::{anyhow, Result};
use async_stream::try_stream;
use async_trait::async_trait;
use cid::Cid;
use std::{collections::HashMap, sync::Arc};
Expand Down Expand Up @@ -131,6 +132,19 @@ impl Store for MemoryStore {
}
}

#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl crate::Iterable for MemoryStore {
fn get_all_entries<'a>(&'a self) -> crate::IterableStream<'a> {
Box::pin(try_stream! {
let dags = self.entries.lock().await;
for key in dags.keys() {
yield (key.to_owned(), dags.get(key).cloned());
}
})
}
}

#[cfg(feature = "performance")]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
Expand Down
41 changes: 18 additions & 23 deletions rust/noosphere-storage/src/implementation/sled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,32 +4,22 @@ use crate::storage::Storage;
use crate::store::Store;

use anyhow::Result;
use async_stream::try_stream;
use async_trait::async_trait;
use sled::{Db, Tree};

pub enum SledStorageInit {
Path(PathBuf),
Db(Db),
}

#[derive(Clone, Debug)]
pub struct SledStorage {
db: Db,
#[allow(unused)]
path: Option<PathBuf>,
path: PathBuf,
}

impl SledStorage {
pub fn new(init: SledStorageInit) -> Result<Self> {
let mut db_path = None;
let db: Db = match init {
SledStorageInit::Path(path) => {
std::fs::create_dir_all(&path)?;
db_path = Some(path.clone().canonicalize()?);
sled::open(path)?
}
SledStorageInit::Db(db) => db,
};
pub fn new(path: PathBuf) -> Result<Self> {
std::fs::create_dir_all(&path)?;
let db_path = path.canonicalize()?;
let db = sled::open(&db_path)?;

Ok(SledStorage { db, path: db_path })
}
Expand Down Expand Up @@ -104,16 +94,21 @@ impl Drop for SledStorage {
}
}

impl crate::Iterable for SledStore {
fn get_all_entries<'a>(&'a self) -> crate::IterableStream<'a> {
Box::pin(try_stream! {
for entry in self.db.iter() {
let (key, value) = entry?;
yield (Vec::from(key.as_ref()), Some(Vec::from(value.as_ref())));
}
})
}
}

#[cfg(feature = "performance")]
#[async_trait]
impl crate::Space for SledStorage {
async fn get_space_usage(&self) -> Result<u64> {
if let Some(path) = &self.path {
crate::get_dir_size(path).await
} else {
Err(anyhow::anyhow!(
"Could not calculate storage space, requires usage of a path constructor."
))
}
crate::get_dir_size(&self.path).await
}
}
2 changes: 2 additions & 0 deletions rust/noosphere-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ mod key_value;

mod db;
mod encoding;
mod exportable;
mod retry;
mod storage;
mod store;
Expand All @@ -22,6 +23,7 @@ pub use crate::ucan::*;
pub use block::*;
pub use db::*;
pub use encoding::*;
pub use exportable::*;
pub use implementation::*;
pub use key_value::*;
pub use retry::*;
Expand Down
12 changes: 4 additions & 8 deletions rust/noosphere/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,11 @@ impl From<StorageLayout> for PathBuf {
impl StorageLayout {
pub async fn to_storage(&self) -> Result<PrimitiveStorage> {
#[cfg(sled)]
{
noosphere_storage::SledStorage::new(noosphere_storage::SledStorageInit::Path(
PathBuf::from(self),
))
}
let storage = noosphere_storage::SledStorage::new(PathBuf::from(self));
#[cfg(rocksdb)]
{
noosphere_storage::RocksDbStorage::new(PathBuf::from(self))
}
let storage = noosphere_storage::RocksDbStorage::new(PathBuf::from(self));

storage
}
}

Expand Down

0 comments on commit 4facbd9

Please sign in to comment.