From c3d514418a5d13d4f9740651d078f7ebbb297b35 Mon Sep 17 00:00:00 2001 From: Jordan Santell Date: Wed, 13 Sep 2023 12:58:22 -0700 Subject: [PATCH] feat: Introduce storage migrations via ImportStorage/ExportStorage traits, and other trait-based operations. --- .vscode/settings.json | 5 +- rust/noosphere-common/src/unshared.rs | 2 +- rust/noosphere-storage/Cargo.toml | 20 +- rust/noosphere-storage/examples/bench/main.rs | 6 +- rust/noosphere-storage/src/backup.rs | 157 ++++++++++++ rust/noosphere-storage/src/db.rs | 6 + rust/noosphere-storage/src/fs.rs | 68 +++++ rust/noosphere-storage/src/helpers.rs | 34 --- .../src/implementation/indexed_db.rs | 159 ++++++++---- .../src/implementation/memory.rs | 48 ++-- .../src/implementation/rocks_db.rs | 29 ++- .../src/implementation/sled.rs | 65 ++--- .../src/implementation/tracking.rs | 20 +- rust/noosphere-storage/src/lib.rs | 35 ++- rust/noosphere-storage/src/migration.rs | 235 ++++++++++++++++++ rust/noosphere-storage/src/ops.rs | 37 +++ rust/noosphere-storage/src/retry.rs | 4 +- rust/noosphere-storage/src/storage.rs | 2 + rust/noosphere-storage/src/tap.rs | 10 +- rust/noosphere-storage/src/temp.rs | 81 ++++++ rust/noosphere-storage/src/ucan.rs | 1 + rust/noosphere/src/sphere/builder/recover.rs | 14 +- rust/noosphere/src/storage.rs | 12 +- 23 files changed, 866 insertions(+), 184 deletions(-) create mode 100644 rust/noosphere-storage/src/backup.rs create mode 100644 rust/noosphere-storage/src/fs.rs delete mode 100644 rust/noosphere-storage/src/helpers.rs create mode 100644 rust/noosphere-storage/src/migration.rs create mode 100644 rust/noosphere-storage/src/ops.rs create mode 100644 rust/noosphere-storage/src/temp.rs diff --git a/.vscode/settings.json b/.vscode/settings.json index b41d47713..7f295ee8b 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -12,7 +12,6 @@ }, "rust-analyzer.cargo.features": [ "test-kubo", - "helpers", - "performance" + "helpers" ] -} \ No newline at end of file +} diff --git a/rust/noosphere-common/src/unshared.rs b/rust/noosphere-common/src/unshared.rs index 000b9a1c3..6fc14518c 100644 --- a/rust/noosphere-common/src/unshared.rs +++ b/rust/noosphere-common/src/unshared.rs @@ -1,7 +1,7 @@ use crate::ConditionalSend; use futures_util::Stream; -/// NOTE: This type was adapted from https://github.com/Nullus157/async-compression/blob/main/src/unshared.rs +/// NOTE: This type was adapted from /// Original implementation licensed MIT/Apache 2 /// /// Wraps a type and only allows unique borrowing, the main usecase is to wrap a `!Sync` type and diff --git a/rust/noosphere-storage/Cargo.toml b/rust/noosphere-storage/Cargo.toml index d91db9e40..4907b393b 100644 --- a/rust/noosphere-storage/Cargo.toml +++ b/rust/noosphere-storage/Cargo.toml @@ -21,30 +21,28 @@ readme = "README.md" anyhow = { workspace = true } async-trait = "~0.1" async-stream = { workspace = true } -tokio-stream = { workspace = true } +base64 = "=0.21.2" cid = { workspace = true } -noosphere-common = { version = "0.1.0", path = "../noosphere-common" } -tracing = "~0.1" -ucan = { workspace = true } +instant = { version = "0.1.12", features = ["wasm-bindgen"] } libipld-core = { workspace = true } libipld-cbor = { workspace = true } +noosphere-common = { version = "0.1.0", path = "../noosphere-common" } +rand = { workspace = true } serde = { workspace = true } -base64 = "=0.21.2" +tokio-stream = { workspace = true } +tracing = "~0.1" +ucan = { workspace = true } url = { version = "^2" } +witty-phrase-generator = "~0.2" [dev-dependencies] -witty-phrase-generator = "~0.2" wasm-bindgen-test = { workspace = true } -rand = { workspace = true } noosphere-core-dev = { path = "../noosphere-core", features = ["helpers"], package = "noosphere-core" } noosphere-common = { path = "../noosphere-common", features = ["helpers"] } -instant = { version = "0.1.12", features = ["wasm-bindgen"] } - -[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies] -tempfile = { workspace = true } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] sled = "~0.34" +tempfile = { workspace = true } tokio = { workspace = true, features = ["full"] } rocksdb = { version = "0.21.0", optional = true } diff --git a/rust/noosphere-storage/examples/bench/main.rs b/rust/noosphere-storage/examples/bench/main.rs index 7bcd796f9..3a7deef2c 100644 --- a/rust/noosphere-storage/examples/bench/main.rs +++ b/rust/noosphere-storage/examples/bench/main.rs @@ -132,9 +132,7 @@ impl BenchmarkStorage { ))] let (storage, storage_name) = { ( - noosphere_storage::SledStorage::new(noosphere_storage::SledStorageInit::Path( - storage_path.into(), - ))?, + noosphere_storage::SledStorage::new(&storage_path)?, "SledDbStorage", ) }; @@ -142,7 +140,7 @@ impl BenchmarkStorage { #[cfg(all(not(target_arch = "wasm32"), feature = "rocksdb"))] let (storage, storage_name) = { ( - noosphere_storage::RocksDbStorage::new(storage_path.into())?, + noosphere_storage::RocksDbStorage::new(&storage_path)?, "RocksDbStorage", ) }; diff --git a/rust/noosphere-storage/src/backup.rs b/rust/noosphere-storage/src/backup.rs new file mode 100644 index 000000000..b7fd8215e --- /dev/null +++ b/rust/noosphere-storage/src/backup.rs @@ -0,0 +1,157 @@ +use crate::storage::Storage; +use anyhow::Result; +use async_trait::async_trait; +use noosphere_common::ConditionalSend; +use std::path::{Path, PathBuf}; + +#[cfg(not(target_arch = "wasm32"))] +use crate::FsBackedStorage; + +#[cfg(not(target_arch = "wasm32"))] +fn create_backup_path>(path: P) -> Result { + use instant::SystemTime; + use rand::Rng; + + let mut path = path.as_ref().to_owned(); + let timestamp = SystemTime::UNIX_EPOCH + .elapsed() + .map_err(|_| anyhow::anyhow!("Could not generate timestamp."))? + .as_secs(); + let nonce = rand::thread_rng().gen::(); + path.set_extension(format!("backup.{}-{}", timestamp, nonce)); + Ok(path) +} + +/// [Storage] that can be backed up and restored. +/// [FsBackedStorage] types get a blanket implementation. +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +pub trait BackupStorage: Storage { + /// Backup [Storage] located at `path`, moving to a backup location. + async fn backup + ConditionalSend>(path: P) -> Result; + /// Backup [Storage] at `restore_to`, moving [Storage] from `backup_path` to `restore_to`. + async fn restore + ConditionalSend, Q: AsRef + ConditionalSend>( + backup_path: P, + restore_to: Q, + ) -> Result; + /// List paths to backups for `path`. + async fn list_backups + ConditionalSend>(path: P) -> Result>; +} + +#[cfg(not(target_arch = "wasm32"))] +#[async_trait] +impl BackupStorage for T +where + T: FsBackedStorage, +{ + async fn backup + ConditionalSend>(path: P) -> Result { + let backup_path = create_backup_path(path.as_ref())?; + T::rename(path, &backup_path).await?; + Ok(backup_path) + } + + async fn restore + ConditionalSend, Q: AsRef + ConditionalSend>( + backup_path: P, + restore_to: Q, + ) -> Result { + let restoration_path = restore_to.as_ref().to_owned(); + let original_backup = T::backup(&restoration_path).await?; + T::rename(backup_path, &restoration_path).await?; + Ok(original_backup) + } + + async fn list_backups + ConditionalSend>(path: P) -> Result> { + let mut backups = vec![]; + let matcher = format!( + "{}.backup.", + path.as_ref() + .file_name() + .ok_or_else(|| anyhow::anyhow!("Could not stringify path."))? + .to_str() + .ok_or_else(|| anyhow::anyhow!("Could not stringify path."))? + ); + let parent_dir = path + .as_ref() + .parent() + .ok_or_else(|| anyhow::anyhow!("Could not find storage parent directory."))?; + let mut stream = tokio::fs::read_dir(parent_dir).await?; + while let Ok(Some(entry)) = stream.next_entry().await { + if let Ok(file_name) = entry.file_name().into_string() { + if file_name.starts_with(&matcher) { + backups.push(entry.path()); + } + } + } + Ok(backups) + } +} + +#[cfg(all(not(target_arch = "wasm32"), test))] +mod test { + use crate::{OpenStorage, PreferredPlatformStorage, Store}; + + use super::*; + + #[tokio::test] + pub async fn it_can_backup_storages() -> Result<()> { + noosphere_core_dev::tracing::initialize_tracing(None); + + let temp_dir = tempfile::TempDir::new()?; + let db_source = temp_dir.path().join("db"); + + { + let storage = PreferredPlatformStorage::open(&db_source).await?; + let mut store = storage.get_key_value_store("links").await?; + store.write(b"1", b"1").await?; + } + + let backup_1 = PreferredPlatformStorage::backup(&db_source).await?; + + { + let storage = PreferredPlatformStorage::open(&db_source).await?; + let mut store = storage.get_key_value_store("links").await?; + assert!(store.read(b"1").await?.is_none(), "Backup is a move"); + store.write(b"2", b"2").await?; + } + + let backup_2 = PreferredPlatformStorage::backup(&db_source).await?; + + { + let storage = PreferredPlatformStorage::open(&db_source).await?; + let mut store = storage.get_key_value_store("links").await?; + assert!(store.read(b"1").await?.is_none(), "Backup is a move"); + assert!(store.read(b"2").await?.is_none(), "Backup is a move"); + store.write(b"3", b"3").await?; + } + + let backups = PreferredPlatformStorage::list_backups(&db_source).await?; + assert_eq!(backups.len(), 2); + assert!(backups.contains(&backup_1)); + assert!(backups.contains(&backup_2)); + + let backup_3 = PreferredPlatformStorage::restore(&backup_1, &db_source).await?; + { + let storage = PreferredPlatformStorage::open(&db_source).await?; + let store = storage.get_key_value_store("links").await?; + assert_eq!(store.read(b"1").await?.unwrap(), b"1"); + assert!(store.read(b"2").await?.is_none(), "Backup is a move"); + assert!(store.read(b"3").await?.is_none(), "Backup is a move"); + } + + let backups = PreferredPlatformStorage::list_backups(db_source).await?; + assert_eq!(backups.len(), 2); + assert!( + backups.contains(&backup_3), + "contains backup from restoration." + ); + assert!( + !backups.contains(&backup_1), + "moves backup that was restored." + ); + assert!( + backups.contains(&backup_2), + "contains backups that were untouched." + ); + Ok(()) + } +} diff --git a/rust/noosphere-storage/src/db.rs b/rust/noosphere-storage/src/db.rs index e54501032..f377b8aee 100644 --- a/rust/noosphere-storage/src/db.rs +++ b/rust/noosphere-storage/src/db.rs @@ -18,11 +18,16 @@ use crate::{BlockStore, KeyValueStore, MemoryStore, Storage}; use async_stream::try_stream; +/// Key for the block store in a [SphereDb]'s [Storage]. pub const BLOCK_STORE: &str = "blocks"; +/// Key for the link store in a [SphereDb]'s [Storage]. pub const LINK_STORE: &str = "links"; +/// Key for the version store in a [SphereDb]'s [Storage]. pub const VERSION_STORE: &str = "versions"; +/// Key for the metadata store in a [SphereDb]'s [Storage]. pub const METADATA_STORE: &str = "metadata"; +/// All store keys used by [SphereDb]. pub const SPHERE_DB_STORE_NAMES: &[&str] = &[BLOCK_STORE, LINK_STORE, VERSION_STORE, METADATA_STORE]; @@ -46,6 +51,7 @@ impl SphereDb where S: Storage, { + /// Creates a new [SphereDb] using underlying `storage`. pub async fn new(storage: &S) -> Result> { Ok(SphereDb { block_store: storage.get_block_store(BLOCK_STORE).await?, diff --git a/rust/noosphere-storage/src/fs.rs b/rust/noosphere-storage/src/fs.rs new file mode 100644 index 000000000..392d9aa01 --- /dev/null +++ b/rust/noosphere-storage/src/fs.rs @@ -0,0 +1,68 @@ +use crate::storage::Storage; +use anyhow::Result; +use async_trait::async_trait; +use noosphere_common::ConditionalSend; +use std::path::Path; + +/// [Storage] that is based on a file system. Implementing [FsBackedStorage] +/// provides blanket implementations for other trait-based [Storage] operations. +#[cfg(not(target_arch = "wasm32"))] +#[async_trait] +pub trait FsBackedStorage: Storage + Sized { + /// Deletes the storage located at `path` directory. Returns `Ok(())` if + /// the directory is successfully removed, or if it already does not exist. + async fn delete + ConditionalSend>(path: P) -> Result<()> { + match std::fs::metadata(path.as_ref()) { + Ok(_) => std::fs::remove_dir_all(path.as_ref()).map_err(|e| e.into()), + Err(_) => Ok(()), + } + } + + /// Moves the storage located at `from` to the `to` location. + async fn rename + ConditionalSend, Q: AsRef + ConditionalSend>( + from: P, + to: Q, + ) -> Result<()> { + std::fs::rename(from, to).map_err(|e| e.into()) + } +} + +/// [Storage] that is based on a file system. +#[cfg(target_arch = "wasm32")] +#[async_trait(?Send)] +pub trait FsBackedStorage: Storage + Sized { + /// Deletes the storage located at `path` directory. Returns `Ok(())` if + /// the directory is successfully removed, or if it already does not exist. + async fn delete + ConditionalSend>(path: P) -> Result<()>; + + /// Moves the storage located at `from` to the `to` location. + async fn rename + ConditionalSend, Q: AsRef + ConditionalSend>( + from: P, + to: Q, + ) -> Result<()>; +} + +#[cfg(not(target_arch = "wasm32"))] +#[async_trait] +impl crate::ops::DeleteStorage for T +where + T: FsBackedStorage, +{ + async fn delete + ConditionalSend>(path: P) -> Result<()> { + ::delete(path).await + } +} + +#[cfg(not(target_arch = "wasm32"))] +#[async_trait] +impl crate::ops::RenameStorage for T +where + T: FsBackedStorage, +{ + async fn rename + ConditionalSend, Q: AsRef + ConditionalSend>( + from: P, + to: Q, + ) -> Result<()> { + ::rename(from, to).await + } +} diff --git a/rust/noosphere-storage/src/helpers.rs b/rust/noosphere-storage/src/helpers.rs deleted file mode 100644 index 5c531ef3c..000000000 --- a/rust/noosphere-storage/src/helpers.rs +++ /dev/null @@ -1,34 +0,0 @@ -use crate::Storage; -use anyhow::Result; - -#[cfg(not(target_arch = "wasm32"))] -use crate::{SledStorage, SledStorageInit, SledStore}; - -#[cfg(not(target_arch = "wasm32"))] -pub async fn make_disposable_store() -> Result { - let temp_dir = std::env::temp_dir(); - let temp_name: String = witty_phrase_generator::WPGen::new() - .with_words(3) - .unwrap() - .into_iter() - .map(String::from) - .collect(); - let provider = SledStorage::new(SledStorageInit::Path(temp_dir.join(temp_name)))?; - provider.get_block_store("foo").await -} - -#[cfg(target_arch = "wasm32")] -use crate::{IndexedDbStorage, IndexedDbStore}; - -#[cfg(target_arch = "wasm32")] -pub async fn make_disposable_store() -> Result { - let temp_name: String = witty_phrase_generator::WPGen::new() - .with_words(3) - .unwrap() - .into_iter() - .map(|word| String::from(word)) - .collect(); - - let provider = IndexedDbStorage::new(&temp_name).await?; - provider.get_block_store(crate::db::BLOCK_STORE).await -} diff --git a/rust/noosphere-storage/src/implementation/indexed_db.rs b/rust/noosphere-storage/src/implementation/indexed_db.rs index f80113738..e5934928e 100644 --- a/rust/noosphere-storage/src/implementation/indexed_db.rs +++ b/rust/noosphere-storage/src/implementation/indexed_db.rs @@ -1,17 +1,21 @@ use crate::store::Store; use crate::{db::SPHERE_DB_STORE_NAMES, storage::Storage}; use anyhow::{anyhow, Error, Result}; +use async_stream::try_stream; use async_trait::async_trait; use js_sys::Uint8Array; +use noosphere_common::ConditionalSend; use rexie::{ KeyRange, ObjectStore, Rexie, RexieBuilder, Store as IdbStore, Transaction, TransactionMode, }; use serde::{Deserialize, Serialize}; -use std::{fmt::Debug, rc::Rc}; +use std::{fmt::Debug, path::Path, rc::Rc}; use wasm_bindgen::{JsCast, JsValue}; +/// Current SphereDb migration version. pub const INDEXEDDB_STORAGE_VERSION: u32 = 1; +/// An [IndexedDB](https://web.dev/indexeddb/)-backed implementation for `wasm32-unknown-unknown` targets. #[derive(Clone)] pub struct IndexedDbStorage { db: Rc, @@ -25,6 +29,7 @@ impl Debug for IndexedDbStorage { } impl IndexedDbStorage { + /// Open or create a database with key `db_name`. pub async fn new(db_name: &str) -> Result { Self::configure(INDEXEDDB_STORAGE_VERSION, db_name, SPHERE_DB_STORE_NAMES).await } @@ -70,7 +75,12 @@ impl IndexedDbStorage { let db = Rc::into_inner(self.db) .ok_or_else(|| anyhow!("Could not unwrap inner during database clear."))?; db.close(); - Rexie::delete(&name) + Self::delete(&name).await + } + + /// Deletes database with key `db_name` from origin storage. + pub async fn delete(db_name: &str) -> Result<()> { + Rexie::delete(db_name) .await .map_err(|error| anyhow!("{:?}", error)) } @@ -91,7 +101,32 @@ impl Storage for IndexedDbStorage { } } +#[async_trait(?Send)] +impl crate::ops::OpenStorage for IndexedDbStorage { + async fn open + ConditionalSend>(path: P) -> Result { + IndexedDbStorage::new( + path.as_ref() + .to_str() + .ok_or_else(|| anyhow!("Could not stringify path."))?, + ) + .await + } +} + +#[async_trait(?Send)] +impl crate::ops::DeleteStorage for IndexedDbStorage { + async fn delete + ConditionalSend>(path: P) -> Result<()> { + Self::delete( + path.as_ref() + .to_str() + .ok_or_else(|| anyhow!("Could not stringify path."))?, + ) + .await + } +} + #[derive(Clone)] +/// A [Store] implementation for [IndexedDbStorage]. pub struct IndexedDbStore { db: Rc, store_name: String, @@ -115,87 +150,104 @@ impl IndexedDbStore { Ok(()) } - fn bytes_to_typed_array(bytes: &[u8]) -> Result { - let array = Uint8Array::new_with_length(bytes.len() as u32); - array.copy_from(&bytes); - Ok(JsValue::from(array)) - } - - async fn contains(key: &JsValue, store: &IdbStore) -> Result { + async fn contains(key: &[u8], store: &IdbStore) -> Result { + let key_js = bytes_to_typed_array(key)?; let count = store .count(Some( - &KeyRange::only(key).map_err(|error| anyhow!("{:?}", error))?, + &KeyRange::only(&key_js).map_err(|error| anyhow!("{:?}", error))?, )) .await .map_err(|error| anyhow!("{:?}", error))?; Ok(count > 0) } - async fn read(key: &JsValue, store: &IdbStore) -> Result>> { + async fn read(key: &[u8], store: &IdbStore) -> Result>> { + let key_js = bytes_to_typed_array(key)?; Ok(match IndexedDbStore::contains(&key, &store).await? { - true => Some( + true => Some(typed_array_to_bytes( store - .get(&key) + .get(&key_js) .await - .map_err(|error| anyhow!("{:?}", error))? - .dyn_into::() - .map_err(|error| anyhow!("{:?}", error))? - .to_vec(), - ), + .map_err(|error| anyhow!("{:?}", error))?, + )?), false => None, }) } + + async fn put(key: &[u8], value: &[u8], store: &IdbStore) -> Result<()> { + let key_js = bytes_to_typed_array(key)?; + let value_js = bytes_to_typed_array(value)?; + store + .put(&value_js, Some(&key_js)) + .await + .map_err(|error| anyhow!("{:?}", error))?; + Ok(()) + } + + async fn delete(key: &[u8], store: &IdbStore) -> Result<()> { + let key_js = bytes_to_typed_array(key)?; + store + .delete(&key_js) + .await + .map_err(|error| anyhow!("{:?}", error))?; + Ok(()) + } } #[async_trait(?Send)] impl Store for IndexedDbStore { async fn read(&self, key: &[u8]) -> Result>> { let (store, tx) = self.start_transaction(TransactionMode::ReadOnly)?; - let key = IndexedDbStore::bytes_to_typed_array(key)?; - - let maybe_dag = IndexedDbStore::read(&key, &store).await?; - + let maybe_dag = IndexedDbStore::read(key, &store).await?; IndexedDbStore::finish_transaction(tx).await?; - Ok(maybe_dag) } async fn write(&mut self, key: &[u8], bytes: &[u8]) -> Result>> { let (store, tx) = self.start_transaction(TransactionMode::ReadWrite)?; - - let key = IndexedDbStore::bytes_to_typed_array(key)?; - let value = IndexedDbStore::bytes_to_typed_array(bytes)?; - let old_bytes = IndexedDbStore::read(&key, &store).await?; - - store - .put(&value, Some(&key)) - .await - .map_err(|error| anyhow!("{:?}", error))?; - + IndexedDbStore::put(key, bytes, &store).await?; IndexedDbStore::finish_transaction(tx).await?; - Ok(old_bytes) } async fn remove(&mut self, key: &[u8]) -> Result>> { let (store, tx) = self.start_transaction(TransactionMode::ReadWrite)?; - - let key = IndexedDbStore::bytes_to_typed_array(key)?; - - let old_value = IndexedDbStore::read(&key, &store).await?; - - store - .delete(&key) - .await - .map_err(|error| anyhow!("{:?}", error))?; - + let old_value = IndexedDbStore::read(key, &store).await?; + IndexedDbStore::delete(key, &store).await?; IndexedDbStore::finish_transaction(tx).await?; - Ok(old_value) } } +impl crate::IterableStore for IndexedDbStore { + fn get_all_entries(&self) -> std::pin::Pin>> { + Box::pin(try_stream! { + let (store, tx) = self.start_transaction(TransactionMode::ReadWrite)?; + let limit = 100; + let mut offset = 0; + loop { + let results = store.get_all(None, Some(limit), Some(offset), None).await + .map_err(|error| anyhow!("{:?}", error))?; + let count = results.len(); + if count == 0 { + IndexedDbStore::finish_transaction(tx).await?; + break; + } + + offset += count as u32; + + for (key_js, value_js) in results { + yield ( + typed_array_to_bytes(JsValue::from(Uint8Array::new(&key_js)))?, + Some(typed_array_to_bytes(value_js)?) + ); + } + } + }) + } +} + struct JsError(Error); impl From for JsError { @@ -222,7 +274,7 @@ impl From for Error { } #[derive(Debug, Serialize, Deserialize)] -pub struct StorageEstimate { +struct StorageEstimate { pub quota: u64, pub usage: u64, #[serde(rename = "usageDetails")] @@ -230,7 +282,7 @@ pub struct StorageEstimate { } #[derive(Debug, Serialize, Deserialize)] -pub struct UsageDetails { +struct UsageDetails { #[serde(rename = "indexedDB")] pub indexed_db: u64, } @@ -263,3 +315,16 @@ impl crate::Space for IndexedDbStorage { } } } + +fn bytes_to_typed_array(bytes: &[u8]) -> Result { + let array = Uint8Array::new_with_length(bytes.len() as u32); + array.copy_from(&bytes); + Ok(JsValue::from(array)) +} + +fn typed_array_to_bytes(js_value: JsValue) -> Result> { + Ok(js_value + .dyn_into::() + .map_err(|error| anyhow!("{:?}", error))? + .to_vec()) +} diff --git a/rust/noosphere-storage/src/implementation/memory.rs b/rust/noosphere-storage/src/implementation/memory.rs index 8fd92e692..a45ccca54 100644 --- a/rust/noosphere-storage/src/implementation/memory.rs +++ b/rust/noosphere-storage/src/implementation/memory.rs @@ -1,26 +1,21 @@ use anyhow::{anyhow, Result}; +use async_stream::try_stream; use async_trait::async_trait; use cid::Cid; +use noosphere_common::ConditionalSend; use std::{collections::HashMap, sync::Arc}; use tokio::sync::Mutex; use crate::storage::Storage; use crate::store::Store; -#[cfg_attr(not(target_arch = "wasm32"), async_trait)] -#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] -pub trait StoreContainsCid { - async fn contains_cid(&self, cid: &Cid) -> Result; -} - -#[cfg_attr(not(target_arch = "wasm32"), async_trait)] -#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] -impl StoreContainsCid for S { - async fn contains_cid(&self, cid: &Cid) -> Result { - Ok(self.read(&cid.to_bytes()).await?.is_some()) - } +async fn contains_cid(store: &S, cid: &Cid) -> Result { + Ok(store.read(&cid.to_bytes()).await?.is_some()) } +/// A memory-backed [Storage] implementation. +/// +/// Useful for small, short-lived storages and testing. #[derive(Default, Clone, Debug)] pub struct MemoryStorage { stores: Arc>>, @@ -57,12 +52,23 @@ impl Storage for MemoryStorage { } } +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl crate::ops::OpenStorage for MemoryStorage { + async fn open + ConditionalSend>(_: P) -> Result { + Ok(MemoryStorage::default()) + } +} + +/// [Store] implementation for [MemoryStorage]. #[derive(Clone, Default, Debug)] pub struct MemoryStore { + /// Underlying key-value store. pub entries: Arc, Vec>>>, } impl MemoryStore { + /// Return all [Cid] keys from the store. pub async fn get_stored_cids(&self) -> Vec { self.entries .lock() @@ -75,6 +81,7 @@ impl MemoryStore { .collect() } + /// Returns `Ok` if all entries in this store are replicated by `other`. pub async fn expect_replica_in(&self, other: &S) -> Result<()> { let cids = self.get_stored_cids().await; let mut missing = Vec::new(); @@ -82,7 +89,7 @@ impl MemoryStore { for cid in cids { trace!("Checking for {}", cid); - if !other.contains_cid(&cid).await? { + if !contains_cid(other, &cid).await? { trace!("Not found!"); missing.push(cid); } @@ -101,6 +108,7 @@ impl MemoryStore { Ok(()) } + /// Clones this store, sharing the underlying data. pub async fn fork(&self) -> Self { MemoryStore { entries: Arc::new(Mutex::new(self.entries.lock().await.clone())), @@ -131,7 +139,19 @@ impl Store for MemoryStore { } } -#[cfg(feature = "performance")] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl crate::IterableStore for MemoryStore { + fn get_all_entries(&self) -> std::pin::Pin>> { + Box::pin(try_stream! { + let dags = self.entries.lock().await; + for key in dags.keys() { + yield (key.to_owned(), dags.get(key).cloned()); + } + }) + } +} + #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] impl crate::Space for MemoryStorage { diff --git a/rust/noosphere-storage/src/implementation/rocks_db.rs b/rust/noosphere-storage/src/implementation/rocks_db.rs index cd2dd4bf0..0f74eb3fd 100644 --- a/rust/noosphere-storage/src/implementation/rocks_db.rs +++ b/rust/noosphere-storage/src/implementation/rocks_db.rs @@ -1,6 +1,7 @@ use crate::{storage::Storage, store::Store, SPHERE_DB_STORE_NAMES}; use anyhow::{anyhow, Result}; use async_trait::async_trait; +use noosphere_common::ConditionalSend; use rocksdb::{ColumnFamilyDescriptor, DBWithThreadMode, Options}; use std::{ path::{Path, PathBuf}, @@ -16,11 +17,11 @@ type DbInner = DBWithThreadMode; #[cfg(feature = "rocksdb-multi-thread")] type ColumnType<'a> = Arc>; -/// A RocksDB implementation of [Storage]. +/// A [RocksDB](https://rocksdb.org/) [Storage] implementation. /// /// Caveats: -/// * Values are limited to 4GB(?) [https://github.com/facebook/rocksdb/wiki/Basic-Operations#reads] -/// TODO(#631): Further improvements to the implementation. +/// * [Values are limited to 4GB](https://github.com/facebook/rocksdb/wiki/Basic-Operations#reads)? +/// * TODO(#631): Further improvements to the implementation. #[derive(Clone, Debug)] pub struct RocksDbStorage { db: Arc, @@ -29,9 +30,10 @@ pub struct RocksDbStorage { } impl RocksDbStorage { - pub fn new(path: PathBuf) -> Result { - std::fs::create_dir_all(&path)?; - let canonicalized = path.canonicalize()?; + /// Open or create a database at directory `path`. + pub fn new>(path: P) -> Result { + std::fs::create_dir_all(path.as_ref())?; + let canonicalized = path.as_ref().canonicalize()?; let db = Arc::new(RocksDbStorage::init_db(canonicalized.clone())?); Ok(RocksDbStorage { db, @@ -51,7 +53,7 @@ impl RocksDbStorage { RocksDbStore::new(self.db.clone(), name.to_owned()) } - /// Configures a databasea at `path` and initializes the expected configurations. + /// Configures a database at `path` and initializes the expected configurations. fn init_db>(path: P) -> Result { let mut cfs: Vec = Vec::with_capacity(SPHERE_DB_STORE_NAMES.len()); @@ -83,6 +85,17 @@ impl Storage for RocksDbStorage { } } +#[async_trait] +impl crate::FsBackedStorage for RocksDbStorage {} + +#[async_trait] +impl crate::OpenStorage for RocksDbStorage { + async fn open + ConditionalSend>(path: P) -> Result { + RocksDbStorage::new(path) + } +} + +/// A [Store] implementation for [RocksDbStorage]. #[derive(Clone)] pub struct RocksDbStore { name: String, @@ -90,7 +103,7 @@ pub struct RocksDbStore { } impl RocksDbStore { - pub fn new(db: Arc, name: String) -> Result { + pub(crate) fn new(db: Arc, name: String) -> Result { Ok(RocksDbStore { db, name }) } diff --git a/rust/noosphere-storage/src/implementation/sled.rs b/rust/noosphere-storage/src/implementation/sled.rs index afab87527..06bb2b53e 100644 --- a/rust/noosphere-storage/src/implementation/sled.rs +++ b/rust/noosphere-storage/src/implementation/sled.rs @@ -1,35 +1,28 @@ -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use crate::storage::Storage; use crate::store::Store; use anyhow::Result; +use async_stream::try_stream; use async_trait::async_trait; +use noosphere_common::ConditionalSend; use sled::{Db, Tree}; -pub enum SledStorageInit { - Path(PathBuf), - Db(Db), -} - +/// A [Sled](https://github.com/spacejam/sled) [Storage] implementation. #[derive(Clone, Debug)] pub struct SledStorage { db: Db, #[allow(unused)] - path: Option, + path: PathBuf, } impl SledStorage { - pub fn new(init: SledStorageInit) -> Result { - let mut db_path = None; - let db: Db = match init { - SledStorageInit::Path(path) => { - std::fs::create_dir_all(&path)?; - db_path = Some(path.clone().canonicalize()?); - sled::open(path)? - } - SledStorageInit::Db(db) => db, - }; + /// Open or create a database at directory `path`. + pub fn new>(path: P) -> Result { + std::fs::create_dir_all(path.as_ref())?; + let db_path = path.as_ref().canonicalize()?; + let db = sled::open(&db_path)?; Ok(SledStorage { db, path: db_path }) } @@ -54,13 +47,31 @@ impl Storage for SledStorage { } } +#[async_trait] +impl crate::FsBackedStorage for SledStorage {} + +#[async_trait] +impl crate::OpenStorage for SledStorage { + async fn open + ConditionalSend>(path: P) -> Result { + SledStorage::new(path) + } +} + +#[async_trait] +impl crate::Space for SledStorage { + async fn get_space_usage(&self) -> Result { + self.db.size_on_disk().map_err(|e| e.into()) + } +} + +/// [Store] implementation for [SledStorage]. #[derive(Clone)] pub struct SledStore { db: Tree, } impl SledStore { - pub fn new(db: &Tree) -> Self { + pub(crate) fn new(db: &Tree) -> Self { SledStore { db: db.clone() } } } @@ -98,15 +109,13 @@ impl Store for SledStore { } } -impl Drop for SledStorage { - fn drop(&mut self) { - let _ = self.db.flush(); - } -} - -#[async_trait] -impl crate::Space for SledStorage { - async fn get_space_usage(&self) -> Result { - self.db.size_on_disk().map_err(|e| e.into()) +impl crate::IterableStore for SledStore { + fn get_all_entries(&self) -> std::pin::Pin>> { + Box::pin(try_stream! { + for entry in self.db.iter() { + let (key, value) = entry?; + yield (Vec::from(key.as_ref()), Some(Vec::from(value.as_ref()))); + } + }) } } diff --git a/rust/noosphere-storage/src/implementation/tracking.rs b/rust/noosphere-storage/src/implementation/tracking.rs index ba690d755..d49605ff8 100644 --- a/rust/noosphere-storage/src/implementation/tracking.rs +++ b/rust/noosphere-storage/src/implementation/tracking.rs @@ -5,20 +5,26 @@ use tokio::sync::Mutex; use crate::{store::Store, MemoryStorage, MemoryStore, Storage}; +/// Stats derived from a [TrackingStore]. #[derive(Debug, Clone, Default, PartialEq, Eq)] pub struct StoreStats { + /// Number of reads performed. pub reads: usize, + /// Number of writes performed. pub writes: usize, + /// Number of removes performed. pub removes: usize, + /// Number of bytes read. pub bytes_read: usize, + /// Number of bytes written. pub bytes_written: usize, + /// Number of bytes removed. pub bytes_removed: usize, + /// Number of flushes. pub flushes: usize, } -/// This is a store wrapper that tracks I/O. It is inspired by the testing -/// utility originally created for the Forest HAMT implementation. This wrapper -/// is all runtime overhead and should only be used for testing. +/// A [Store] implementation for [TrackingStorage]. #[derive(Debug, Clone)] pub struct TrackingStore { stats: Arc>, @@ -26,10 +32,12 @@ pub struct TrackingStore { } impl TrackingStore { + /// Returns the current [StoreStats] snapshot. pub async fn to_stats(&self) -> StoreStats { self.stats.lock().await.clone() } + /// Create a new [TrackingStore] wrapping a [Store]. pub fn wrap(store: S) -> Self { TrackingStore { store, @@ -75,12 +83,18 @@ impl Store for TrackingStore { } } +/// A [Storage] wrapper that tracks I/O. +/// +/// It is inspired by the testing utility originally created for +/// the Forest HAMT implementation. This wrapper is all runtime +/// overhead and should only be used for testing. #[derive(Clone, Debug)] pub struct TrackingStorage { storage: S, } impl TrackingStorage { + /// Create a new [TrackingStorage] wrapping a [MemoryStorage]. pub fn wrap(other: MemoryStorage) -> Self { TrackingStorage { storage: other } } diff --git a/rust/noosphere-storage/src/lib.rs b/rust/noosphere-storage/src/lib.rs index 17835e0d8..b998d7411 100644 --- a/rust/noosphere-storage/src/lib.rs +++ b/rust/noosphere-storage/src/lib.rs @@ -1,3 +1,5 @@ +#![warn(missing_docs)] + //! This crate contains generic interfaces and concrete implementations to //! support a common API for data persistance in Noosphere on many different //! platforms. Current platforms include native targets (via disk-persisted K/V @@ -10,34 +12,53 @@ mod block; mod implementation; mod key_value; +mod backup; mod db; mod encoding; +mod fs; +mod migration; +mod ops; mod retry; mod storage; mod store; mod tap; +mod temp; mod ucan; pub use crate::ucan::*; +pub use backup::*; pub use block::*; pub use db::*; pub use encoding::*; +pub use fs::*; pub use implementation::*; pub use key_value::*; +pub use migration::*; +pub use ops::*; pub use retry::*; pub use storage::*; pub use store::*; pub use tap::*; +pub use temp::*; mod space; pub use space::*; #[cfg(test)] -pub mod helpers; +mod inner { + #[cfg(all(not(target_arch = "wasm32"), not(feature = "rocksdb")))] + pub type PreferredPlatformStorage = crate::SledStorage; + #[cfg(all(not(target_arch = "wasm32"), feature = "rocksdb"))] + pub type PreferredPlatformStorage = crate::RocksDbStorage; + #[cfg(target_arch = "wasm32")] + pub type PreferredPlatformStorage = crate::IndexedDbStorage; +} +#[cfg(test)] +pub use inner::*; #[cfg(test)] mod tests { - use crate::{block::BlockStore, helpers::make_disposable_store}; + use crate::{block::BlockStore, PreferredPlatformStorage, Storage, TempStorage, BLOCK_STORE}; use libipld_cbor::DagCborCodec; #[cfg(target_arch = "wasm32")] @@ -47,13 +68,15 @@ mod tests { #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)] #[cfg_attr(not(target_arch = "wasm32"), tokio::test)] - async fn it_can_store_and_retrieve_bytes() { - let mut storage = make_disposable_store().await.unwrap(); + async fn it_can_store_and_retrieve_bytes() -> anyhow::Result<()> { + let storage = TempStorage::::new().await?; + let mut store = storage.get_block_store(BLOCK_STORE).await?; let bytes = b"I love every kind of cat"; - let cid = storage.save::(bytes).await.unwrap(); - let retrieved = storage.load::>(&cid).await.unwrap(); + let cid = store.save::(bytes).await?; + let retrieved = store.load::>(&cid).await?; assert_eq!(retrieved, bytes); + Ok(()) } } diff --git a/rust/noosphere-storage/src/migration.rs b/rust/noosphere-storage/src/migration.rs new file mode 100644 index 000000000..0e1d36599 --- /dev/null +++ b/rust/noosphere-storage/src/migration.rs @@ -0,0 +1,235 @@ +use crate::{Storage, Store, SPHERE_DB_STORE_NAMES}; +use anyhow::Result; +use async_trait::async_trait; +use noosphere_common::ConditionalSync; +use std::pin::Pin; +use tokio_stream::{Stream, StreamExt}; + +#[cfg(not(target_arch = "wasm32"))] +use crate::{BackupStorage, OpenStorage}; + +/// An async stream of key/value pairs from an [IterableStore]. +#[cfg(not(target_arch = "wasm32"))] +pub type IterableStoreStream<'a> = + dyn Stream, Option>)>> + Send + 'a; +/// An async stream of key/value pairs from an [IterableStore]. +#[cfg(target_arch = "wasm32")] +pub type IterableStoreStream<'a> = dyn Stream, Option>)>> + 'a; + +/// A store that can iterate over all of its entries. +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +pub trait IterableStore { + /// Retrieve all key/value pairs from this store as an async stream. + fn get_all_entries(&self) -> Pin>>; +} + +/// [ExportStorage] [Storage] can be imported by an [ImportStorage]. A [Storage] +/// is [ExportStorage] if its `KeyValueStore` also implements [IterableStore]. +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +pub trait ExportStorage +where + Self: Storage, + ::KeyValueStore: IterableStore, +{ + /// Returns all active store names in this [Storage]. + async fn get_all_store_names(&self) -> Result> { + let mut names = vec![]; + names.extend(SPHERE_DB_STORE_NAMES.iter().map(|name| String::from(*name))); + Ok(names) + } +} + +impl ExportStorage for S +where + S: Storage, + S::KeyValueStore: IterableStore, +{ +} + +/// A blanket implementation for [Storage]s to import +/// an [ExportStorage] storage. +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +pub trait ImportStorage<'a, E> +where + Self: Storage, + Self::KeyValueStore: Store, + E: ExportStorage + ConditionalSync + 'a, + ::KeyValueStore: IterableStore, +{ + /// Copy all stores' entries from `exportable` into this [Storage]. + async fn import(&'a mut self, exportable: &E) -> Result<()> { + for store_name in exportable.get_all_store_names().await? { + let mut store = self.get_key_value_store(&store_name).await?; + let export_store = exportable.get_key_value_store(&store_name).await?; + let mut stream = export_store.get_all_entries(); + while let Some((key, value)) = stream.try_next().await? { + if let Some(value) = value { + Store::write(&mut store, key.as_ref(), value.as_ref()).await?; + } + } + } + Ok(()) + } +} + +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +impl<'a, T, E> ImportStorage<'a, E> for T +where + T: Storage, + T::KeyValueStore: Store, + E: ExportStorage + ConditionalSync + 'a, + ::KeyValueStore: IterableStore, +{ +} + +/// Opens the [Storage] at `path` as storage type `S`, creates a new [Storage] +/// of type `T`, copies over all data, and moves the new storage to `path` upon +/// success. +#[cfg(not(target_arch = "wasm32"))] +pub async fn migrate_storage(path: impl AsRef) -> Result +where + for<'a> T: BackupStorage + ImportStorage<'a, S> + OpenStorage, + ::KeyValueStore: Store, + S: BackupStorage + OpenStorage + ConditionalSync, + ::KeyValueStore: IterableStore, +{ + let storage_path = path.as_ref(); + let temp_dir = tempfile::TempDir::new()?; + let temp_path = temp_dir.path(); + { + let mut to_storage = T::open(temp_path).await?; + let from_storage = S::open(storage_path).await?; + to_storage.import(&from_storage).await?; + } + // Note that we use `T: BackupStorage` to restore, which will + // call `T::backup` on `S`. While we ensure that `S: BackupStorage` also, + // and this works for filesystems storages, we may need to rethink backups + // in the context of multiple storage types. + T::restore(temp_path, storage_path).await?; + T::open(storage_path).await +} + +#[cfg(test)] +mod test { + use crate::TempStorage; + + use super::*; + + #[cfg(target_arch = "wasm32")] + use wasm_bindgen_test::{wasm_bindgen_test, wasm_bindgen_test_configure}; + #[cfg(target_arch = "wasm32")] + wasm_bindgen_test_configure!(run_in_browser); + + /// wasm32: IndexedDbStorage -> MemoryStorage + /// native: SledStorage -> MemoryStorage + /// native+rocks: SledStorage -> RocksDbStorage + #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)] + #[cfg_attr(not(target_arch = "wasm32"), tokio::test)] + pub async fn it_can_import_export_storages() -> Result<()> { + noosphere_core_dev::tracing::initialize_tracing(None); + + #[cfg(target_arch = "wasm32")] + type FromStorage = crate::IndexedDbStorage; + #[cfg(not(target_arch = "wasm32"))] + type FromStorage = crate::SledStorage; + + #[cfg(target_arch = "wasm32")] + type ToStorage = crate::MemoryStorage; + #[cfg(all(feature = "rocksdb", not(target_arch = "wasm32")))] + type ToStorage = crate::RocksDbStorage; + #[cfg(all(not(feature = "rocksdb"), not(target_arch = "wasm32")))] + type ToStorage = crate::MemoryStorage; + + let from_storage = TempStorage::::new().await?; + let mut to_storage = TempStorage::::new().await?; + { + let mut store = from_storage.get_key_value_store("links").await?; + for n in 0..500u32 { + let slug = format!("slug-{}", n); + let bytes = vec![n as u8; 10]; + store.write(slug.as_ref(), bytes.as_ref()).await?; + } + } + + to_storage.import(from_storage.as_ref()).await?; + + { + let store = to_storage.get_key_value_store("links").await?; + for n in 0..500u32 { + let slug = format!("slug-{}", n); + let expected_bytes = vec![n as u8; 10]; + + if let Some(bytes) = store.read(slug.as_ref()).await? { + assert_eq!(bytes, expected_bytes); + } else { + panic!("Expected key `{n}` to exist in new db"); + } + } + } + Ok(()) + } + + #[cfg(all(not(target_arch = "wasm32"), feature = "rocksdb"))] + #[tokio::test] + pub async fn it_can_migrate_native_dbs() -> Result<()> { + noosphere_core_dev::tracing::initialize_tracing(None); + let temp_dir = tempfile::TempDir::new()?; + let storage_path = temp_dir.path().join("db"); + + { + let from_storage = crate::SledStorage::new(&storage_path)?; + let mut store = from_storage.get_key_value_store("links").await?; + for n in 0..500u32 { + let slug = format!("slug-{}", n); + let bytes = vec![n as u8; 10]; + store.write(slug.as_ref(), bytes.as_ref()).await?; + } + } + + { + let to_storage: crate::RocksDbStorage = + migrate_storage::(&storage_path).await?; + + let store = to_storage.get_key_value_store("links").await?; + for n in 0..500u32 { + let slug = format!("slug-{}", n); + let expected_bytes = vec![n as u8; 10]; + + if let Some(bytes) = store.read(slug.as_ref()).await? { + assert_eq!(bytes, expected_bytes); + } else { + panic!("Expected key `{n}` to exist in new db"); + } + } + } + + // Ensure we can open via the expected path + { + let storage = crate::RocksDbStorage::open(&storage_path).await?; + let store = storage.get_key_value_store("links").await?; + for n in 0..500u32 { + let slug = format!("slug-{}", n); + let expected_bytes = vec![n as u8; 10]; + + if let Some(bytes) = store.read(slug.as_ref()).await? { + assert_eq!(bytes, expected_bytes); + } else { + panic!("Expected key `{n}` to exist in new db"); + } + } + } + + assert_eq!( + crate::RocksDbStorage::list_backups(&storage_path) + .await? + .len(), + 1, + "Backup of old DB created." + ); + Ok(()) + } +} diff --git a/rust/noosphere-storage/src/ops.rs b/rust/noosphere-storage/src/ops.rs new file mode 100644 index 000000000..3c5380de4 --- /dev/null +++ b/rust/noosphere-storage/src/ops.rs @@ -0,0 +1,37 @@ +use crate::storage::Storage; +use anyhow::Result; +use async_trait::async_trait; +use noosphere_common::ConditionalSend; +use std::path::Path; + +#[cfg(doc)] +use crate::FsBackedStorage; + +/// [Storage] that can be opened via [Path] reference. +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +pub trait OpenStorage: Storage + Sized { + /// Open [Storage] at `path`. + async fn open + ConditionalSend>(path: P) -> Result; +} + +/// [Storage] that can be deleted via [Path] reference. +/// [FsBackedStorage] types get a blanket implementation. +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +pub trait DeleteStorage: Storage + Sized { + /// Delete/clear [Storage] at `path`. + async fn delete + ConditionalSend>(path: P) -> Result<()>; +} + +/// [Storage] that can be moved/renamed via [Path] reference. +/// [FsBackedStorage] types get a blanket implementation. +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +pub trait RenameStorage: Storage + Sized { + /// Rename/move [Storage] at `path`. + async fn rename + ConditionalSend, Q: AsRef + ConditionalSend>( + from: P, + to: Q, + ) -> Result<()>; +} diff --git a/rust/noosphere-storage/src/retry.rs b/rust/noosphere-storage/src/retry.rs index 0d1260d85..d52be91db 100644 --- a/rust/noosphere-storage/src/retry.rs +++ b/rust/noosphere-storage/src/retry.rs @@ -54,8 +54,8 @@ impl Backoff { /// will be made to load the block. /// /// Local [BlockStore] implementations won't benefit a lot from this, but -/// network implementations such as [IpfsStore] can be made more reliable with a -/// modest retry policy (and timeouts will help make sure we don't hang +/// network implementations such as [noosphere_ipfs::IpfsStore](https://docs.rs/noosphere-ipfs/latest/noosphere_ipfs/struct.IpfsStore.html) +/// can be made more reliable with a modest retry policy (and timeouts will help make sure we don't hang /// indefinitely waiting for an implementation like Kubo to get its act /// together). #[derive(Clone)] diff --git a/rust/noosphere-storage/src/storage.rs b/rust/noosphere-storage/src/storage.rs index 86a585596..fb9e86d1c 100644 --- a/rust/noosphere-storage/src/storage.rs +++ b/rust/noosphere-storage/src/storage.rs @@ -14,7 +14,9 @@ use std::fmt::Debug; #[cfg_attr(not(target_arch = "wasm32"), async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] pub trait Storage: Clone + ConditionalSync + Debug { + /// Type of [BlockStore] used in [Storage::get_block_store]. type BlockStore: BlockStore; + /// Type of [KeyValueStore] used in [Storage::get_key_value_store]. type KeyValueStore: KeyValueStore; /// Get a [BlockStore] where all values stored in it are scoped to the given diff --git a/rust/noosphere-storage/src/tap.rs b/rust/noosphere-storage/src/tap.rs index b93684bfa..cb00f0582 100644 --- a/rust/noosphere-storage/src/tap.rs +++ b/rust/noosphere-storage/src/tap.rs @@ -4,10 +4,10 @@ use async_trait::async_trait; use cid::Cid; use tokio::sync::mpsc::{channel, Receiver, Sender}; -/// Wraps any [BlockStore] and "taps" it by cloning any block successfully -/// retrieved from the store and sending over an MPSC channel. This allows an -/// observer to record all the blocks needed to load arbitrarily deep and -/// complex DAGs into memory without orchestrating a dedicated callback for the +/// Instruments a [BlockStore], sending a copy of each block read to a [Receiver]. +/// +/// This allows an observer to record all the blocks needed to load arbitrarily deep +/// and complex DAGs into memory without orchestrating a dedicated callback for the /// DAG implementations to invoke. /// /// Note that the [Receiver] end of the channel will consider the channel open @@ -29,6 +29,8 @@ impl BlockStoreTap where S: BlockStore, { + /// Wraps a [BlockStore], setting the channel capacity to `capacity`, returning + /// the wrapped store and [Receiver]. pub fn new(store: S, capacity: usize) -> (Self, Receiver<(Cid, Vec)>) { let (tx, rx) = channel(capacity); (BlockStoreTap { store, tx }, rx) diff --git a/rust/noosphere-storage/src/temp.rs b/rust/noosphere-storage/src/temp.rs new file mode 100644 index 000000000..67577823c --- /dev/null +++ b/rust/noosphere-storage/src/temp.rs @@ -0,0 +1,81 @@ +use crate::{ops::OpenStorage, storage::Storage}; +use anyhow::Result; +use std::ops::{Deref, DerefMut}; + +#[cfg(doc)] +use crate::MemoryStorage; + +/// An ephemeral [Storage] that does not persist after dropping. +/// Currently, native builds create a temp dir syncing lifetimes, and web +/// builds use a randomly generated database name. +/// In the future, we may have web builds that use +/// a file-system backed Storage, or native builds that do not use +/// the file-system (currently the case with [MemoryStorage]), where +/// a more complex configuration is needed. Mostly used in tests. +pub struct TempStorage +where + S: Storage + OpenStorage, +{ + inner: S, + #[cfg(not(target_arch = "wasm32"))] + _temp_dir: tempfile::TempDir, +} + +impl TempStorage +where + S: Storage + OpenStorage, +{ + /// Create a new [TempStorage], wrapping a new [Storage] + /// that will be cleared after dropping. + pub async fn new() -> Result { + #[cfg(target_arch = "wasm32")] + let key: String = witty_phrase_generator::WPGen::new() + .with_words(3) + .unwrap() + .into_iter() + .map(|word| String::from(word)) + .collect(); + #[cfg(target_arch = "wasm32")] + let inner = S::open(&key).await?; + #[cfg(target_arch = "wasm32")] + let out = Self { inner }; + + #[cfg(not(target_arch = "wasm32"))] + let _temp_dir = tempfile::TempDir::new()?; + #[cfg(not(target_arch = "wasm32"))] + let inner = S::open(_temp_dir.path()).await?; + #[cfg(not(target_arch = "wasm32"))] + let out = Self { _temp_dir, inner }; + + Ok(out) + } +} + +impl Deref for TempStorage +where + S: Storage + OpenStorage, +{ + type Target = S; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl DerefMut for TempStorage +where + S: Storage + OpenStorage, +{ + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + +impl AsRef for TempStorage +where + S: Storage + OpenStorage, +{ + fn as_ref(&self) -> &S { + &self.inner + } +} diff --git a/rust/noosphere-storage/src/ucan.rs b/rust/noosphere-storage/src/ucan.rs index 0b238f57f..fbca2a781 100644 --- a/rust/noosphere-storage/src/ucan.rs +++ b/rust/noosphere-storage/src/ucan.rs @@ -10,6 +10,7 @@ use ucan::store::{UcanStore as UcanStoreTrait, UcanStoreConditionalSend}; use crate::block::BlockStore; +/// [BlockStore] wrapper implementing [ucan::store::UcanStore]. pub struct UcanStore(pub S); #[cfg_attr(not(target_arch = "wasm32"), async_trait)] diff --git a/rust/noosphere/src/sphere/builder/recover.rs b/rust/noosphere/src/sphere/builder/recover.rs index b7810d87f..db88f349b 100644 --- a/rust/noosphere/src/sphere/builder/recover.rs +++ b/rust/noosphere/src/sphere/builder/recover.rs @@ -41,7 +41,7 @@ pub async fn recover_a_sphere( #[cfg(not(target_arch = "wasm32"))] { use crate::storage::StorageLayout; - use rand::Rng; + use noosphere_storage::BackupStorage; use std::path::PathBuf; let storage_layout: StorageLayout = ( @@ -51,21 +51,13 @@ pub async fn recover_a_sphere( ) .try_into()?; - let database_root: PathBuf = storage_layout.into(); + let database_root = PathBuf::from(storage_layout); debug!(?database_root); if database_root.exists() { - let timestamp = std::time::SystemTime::UNIX_EPOCH.elapsed()?; - let nonce = rand::thread_rng().gen::(); - let backup_id = format!("backup.{}-{}", timestamp.as_nanos(), nonce); - - let mut backup_root = database_root.clone(); - backup_root.set_extension(backup_id); - info!("Backing up existing database..."); - - tokio::fs::rename(database_root, backup_root).await?; + crate::platform::PrimitiveStorage::backup(&database_root).await?; } } diff --git a/rust/noosphere/src/storage.rs b/rust/noosphere/src/storage.rs index e9631eaef..2bcf04fac 100644 --- a/rust/noosphere/src/storage.rs +++ b/rust/noosphere/src/storage.rs @@ -50,15 +50,11 @@ impl From for PathBuf { impl StorageLayout { pub(crate) async fn to_storage(&self) -> Result { #[cfg(sled)] - { - noosphere_storage::SledStorage::new(noosphere_storage::SledStorageInit::Path( - PathBuf::from(self), - )) - } + let storage = noosphere_storage::SledStorage::new(PathBuf::from(self)); #[cfg(rocksdb)] - { - noosphere_storage::RocksDbStorage::new(PathBuf::from(self)) - } + let storage = noosphere_storage::RocksDbStorage::new(PathBuf::from(self)); + + storage } }