diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 1a1bb913c..9ea49b518 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -175,6 +175,39 @@ jobs: OPENRAFT_STORE_DEFENSIVE: on + # Test external stores that enable different openraft features. + external-stores: + runs-on: ubuntu-latest + + strategy: + fail-fast: false + matrix: + include: + - store: "stores/rocksstore-v2" + + steps: + - name: Setup | Checkout + uses: actions/checkout@v2 + + + - name: Setup | Toolchain + uses: actions-rs/toolchain@v1.0.6 + with: + toolchain: "nightly" + override: true + + + - name: Unit Tests + uses: actions-rs/cargo@v1 + with: + command: test + args: --manifest-path "${{ matrix.store }}/Cargo.toml" + env: + # Parallel tests block each other and result in timeout. + RUST_TEST_THREADS: 2 + RUST_LOG: debug + RUST_BACKTRACE: full + OPENRAFT_STORE_DEFENSIVE: on # Feature "serde" will be enabled if one of the member carge enables # "serde", such as `memstore`, when building cargo workspace. @@ -261,7 +294,7 @@ jobs: shell: bash run: | cargo clippy --no-deps --workspace --all-targets -- -D warnings - cargo clippy --no-deps --workspace --all-targets --all-features -- -D warnings + cargo clippy --no-deps --workspace --all-targets --features "bt,serde,bench,single-term-leader,compat-07" -- -D warnings - name: Build-doc diff --git a/Cargo.toml b/Cargo.toml index 3606976bd..1d751f488 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,4 +47,8 @@ members = [ "rocksstore", "rocksstore-compat07", "sledstore"] -exclude = ["examples/raft-kv-memstore", "examples/raft-kv-rocksdb"] +exclude = [ + "stores/rocksstore-v2", + "examples/raft-kv-memstore", + "examples/raft-kv-rocksdb", +] diff --git a/guide/src/feature-flags.md b/guide/src/feature-flags.md index 6dff6b50e..75989c6d4 100644 --- a/guide/src/feature-flags.md +++ b/guide/src/feature-flags.md @@ -18,3 +18,9 @@ By default openraft enables no features. compat-07 = ["compat", "single-term-leader", "serde", "dep:or07", "compat-07-testing"] compat-07-testing = ["dep:tempdir", "anyhow", "dep:serde_json"] ``` + +- `storage-v2`: enables `RaftLogStorage` and `RaftStateMachine` as the v2 storage + This is a temporary feature flag, and will be removed in the future, when v2 storage is stable. + This feature disables `Adapter`, which is for v1 storage to be used as v2. + V2 storage separates log store and state machine store so that log IO and state machine IO can be parallelized naturally. + \ No newline at end of file diff --git a/openraft/Cargo.toml b/openraft/Cargo.toml index c11c34df8..9123039a1 100644 --- a/openraft/Cargo.toml +++ b/openraft/Cargo.toml @@ -78,6 +78,11 @@ compat = [] compat-07 = ["compat", "serde", "dep:or07", "compat-07-testing"] compat-07-testing = ["dep:tempfile", "anyhow", "dep:serde_json"] +# Allows an application to implement a custom the v2 storage API. +# See `openraft::storage::v2` for more details. +# V2 API are unstable and may change in the future. +storage-v2 = [] + # default = ["single-term-leader"] [package.metadata.docs.rs] diff --git a/openraft/src/storage/mod.rs b/openraft/src/storage/mod.rs index 2a99e5895..0c309d8a6 100644 --- a/openraft/src/storage/mod.rs +++ b/openraft/src/storage/mod.rs @@ -1,6 +1,6 @@ //! The Raft storage interface and data types. 
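+
+// With the `storage-v2` feature enabled, the `Adaptor` bridge below (which lets a v1
+// `RaftStorage` be driven through the v2 API) is compiled out, and applications are
+// expected to implement `RaftLogStorage`/`RaftStateMachine` directly; see `storage::v2`.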
-pub(crate) mod adapter; +#[cfg(not(feature = "storage-v2"))] pub(crate) mod adapter; mod callback; mod helper; mod log_store_ext; @@ -10,7 +10,7 @@ mod v2; use std::fmt::Debug; use std::ops::RangeBounds; -pub use adapter::Adaptor; +#[cfg(not(feature = "storage-v2"))] pub use adapter::Adaptor; use async_trait::async_trait; pub use helper::StorageHelper; pub use log_store_ext::RaftLogReaderExt; diff --git a/openraft/src/storage/v2.rs b/openraft/src/storage/v2.rs index 7937ef234..2a640bdf3 100644 --- a/openraft/src/storage/v2.rs +++ b/openraft/src/storage/v2.rs @@ -23,6 +23,11 @@ pub(crate) mod sealed { /// Seal [`RaftLogStorage`] and [`RaftStateMachine`]. This is to prevent users from implementing /// them before being stable. pub trait Sealed {} + + /// Implement non-public trait [`Sealed`] for all types so that [`RaftLogStorage`] and + /// [`RaftStateMachine`] can be implemented by 3rd party crates. + #[cfg(feature = "storage-v2")] + impl Sealed for T {} } #[async_trait] diff --git a/stores/README.md b/stores/README.md new file mode 100644 index 000000000..fa002e9b1 --- /dev/null +++ b/stores/README.md @@ -0,0 +1,4 @@ +Storage implementations. + +These crates are not members of the workspace, because they enable different features. +E.g., `rocksstore-v2` enables `storage-v2` feature. \ No newline at end of file diff --git a/stores/rocksstore-v2/Cargo.toml b/stores/rocksstore-v2/Cargo.toml new file mode 100644 index 000000000..9e39e1534 --- /dev/null +++ b/stores/rocksstore-v2/Cargo.toml @@ -0,0 +1,35 @@ +[package] +name = "openraft-rocksstore-v2" +description = "A rocksdb based implementation of the `openraft::RaftStorage` trait." +documentation = "https://docs.rs/openraft-rocksstore" +readme = "README.md" + +version = "0.1.0" +edition = "2021" +authors = [ + "drdr xp ", +] +categories = ["algorithms", "asynchronous", "data-structures"] +homepage = "https://github.com/datafuselabs/openraft" +keywords = ["raft", "consensus"] +license = "MIT/Apache-2.0" +repository = "https://github.com/datafuselabs/openraft" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +openraft = { path= "../../openraft", version = "0.8.4", features=["serde", "storage-v2"] } + +rocksdb = "0.20.1" +rand = "*" +byteorder = "1.4.3" +serde = { version = "1.0.114", features = ["derive"] } +serde_json = "1.0.57" +tracing = "0.1.29" + +[dev-dependencies] +async-trait = { version = "0.1.36" } +tempfile = { version = "3.4.0" } + +[package.metadata.docs.rs] +all-features = true diff --git a/stores/rocksstore-v2/README.md b/stores/rocksstore-v2/README.md new file mode 100644 index 000000000..0b86a1dd4 --- /dev/null +++ b/stores/rocksstore-v2/README.md @@ -0,0 +1,6 @@ +# openraft-rocksstore-v2 + +This is an example of v2 storage [`RaftLogStorage`] and [`RaftStateMachine`] implementation +with [`rocksdb`](https://docs.rs/rocksdb/latest/rocksdb/) based on [openraft-0.8](https://github.com/datafuselabs/openraft/tree/release-0.8). + +This crate is built mainly for testing or demonstrating purpose.:) diff --git a/stores/rocksstore-v2/src/lib.rs b/stores/rocksstore-v2/src/lib.rs new file mode 100644 index 000000000..6d652f29c --- /dev/null +++ b/stores/rocksstore-v2/src/lib.rs @@ -0,0 +1,539 @@ +//! This rocks-db backed storage implement the v2 storage API: [`RaftLogStorage`] and +//! [`RaftStateMachine`] traits. Its state machine is pure in-memory store with persisted +//! snapshot. In other words, `applying` a log entry does not flush data to disk at once. +//! 
These data will be flushed to disk when a snapshot is created. +#![deny(unused_crate_dependencies)] +#![deny(unused_qualifications)] + +#[cfg(test)] mod test; + +use std::collections::BTreeMap; +use std::error::Error; +use std::fmt::Debug; +use std::io::Cursor; +use std::ops::RangeBounds; +use std::path::Path; +use std::sync::Arc; + +use byteorder::BigEndian; +use byteorder::ReadBytesExt; +use byteorder::WriteBytesExt; +use openraft::async_trait::async_trait; +use openraft::storage::LogFlushed; +use openraft::storage::LogState; +use openraft::storage::RaftLogStorage; +use openraft::storage::RaftStateMachine; +use openraft::storage::Snapshot; +use openraft::AnyError; +use openraft::BasicNode; +use openraft::Entry; +use openraft::EntryPayload; +use openraft::ErrorVerb; +use openraft::LogId; +use openraft::RaftLogId; +use openraft::RaftLogReader; +use openraft::RaftSnapshotBuilder; +use openraft::SnapshotMeta; +use openraft::StorageError; +use openraft::StorageIOError; +use openraft::StoredMembership; +use openraft::Vote; +use rand::Rng; +use rocksdb::ColumnFamily; +use rocksdb::ColumnFamilyDescriptor; +use rocksdb::Direction; +use rocksdb::Options; +use rocksdb::DB; +use serde::Deserialize; +use serde::Serialize; + +pub type RocksNodeId = u64; + +openraft::declare_raft_types!( + /// Declare the type configuration for `MemStore`. + pub Config: D = RocksRequest, R = RocksResponse, NodeId = RocksNodeId, Node = BasicNode, Entry = Entry +); + +/** + * Here you will set the types of request that will interact with the raft nodes. + * For example the `Set` will be used to write data (key and value) to the raft database. + * The `AddNode` will append a new node to the current existing shared list of nodes. + * You will want to add any request that can write data in all nodes here. + */ +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum RocksRequest { + Set { key: String, value: String }, +} + +/** + * Here you will defined what type of answer you expect from reading the data of a node. + * In this example it will return a optional value from a given key in + * the `RocksRequest.Set`. + * + * TODO: Should we explain how to create multiple `AppDataResponse`? + * + */ +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct RocksResponse { + pub value: Option, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct RocksSnapshot { + pub meta: SnapshotMeta, + + /// The data of the state machine at the time of this snapshot. + pub data: Vec, +} + +#[derive(Debug, Clone)] +#[derive(Default)] +#[derive(Serialize, Deserialize)] +pub struct StateMachine { + pub last_applied_log: Option>, + + pub last_membership: StoredMembership, + + /// Application data. + pub data: BTreeMap, +} + +// TODO: restore state from snapshot +/// State machine in this implementation is a pure in-memory store. +/// It depends on the latest snapshot to restore the state when restarted. 
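+///
+/// A minimal sketch of the restart path (the db path is illustrative):
+///
+/// ```ignore
+/// // `RocksStateMachine::new()` below loads the last persisted snapshot, if any,
+/// // and deserializes it back into the in-memory `StateMachine`.
+/// let (_log_store, sm) = crate::new("/tmp/rocks-v2-demo").await;
+/// // Entries applied after that snapshot still live in the rocksdb log store and
+/// // can be re-applied on top of the restored state.
+/// ```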
+#[derive(Debug, Clone)] +pub struct RocksStateMachine { + db: Arc, + sm: StateMachine, +} + +impl RocksStateMachine { + async fn new(db: Arc) -> RocksStateMachine { + let mut state_machine = Self { + db, + sm: Default::default(), + }; + let snapshot = state_machine.get_current_snapshot().await.unwrap(); + + // Restore previous state from snapshot + if let Some(s) = snapshot { + let prev: StateMachine = serde_json::from_slice(s.snapshot.get_ref()).unwrap(); + state_machine.sm = prev; + } + + state_machine + } +} + +#[derive(Debug, Clone)] +pub struct RocksLogStore { + db: Arc, +} + +type StorageResult = Result>; + +/// converts an id to a byte vector for storing in the database. +/// Note that we're using big endian encoding to ensure correct sorting of keys +fn id_to_bin(id: u64) -> Vec { + let mut buf = Vec::with_capacity(8); + buf.write_u64::(id).unwrap(); + buf +} + +fn bin_to_id(buf: &[u8]) -> u64 { + (&buf[0..8]).read_u64::().unwrap() +} + +/// Meta data of a raft-store. +/// +/// In raft, except logs and state machine, the store also has to store several piece of metadata. +/// This sub mod defines the key-value pairs of these metadata. +mod meta { + use openraft::ErrorSubject; + use openraft::LogId; + + use crate::RocksNodeId; + + /// Defines metadata key and value + pub(crate) trait StoreMeta { + /// The key used to store in rocksdb + const KEY: &'static str; + + /// The type of the value to store + type Value: serde::Serialize + serde::de::DeserializeOwned; + + /// The subject this meta belongs to, and will be embedded into the returned storage error. + fn subject(v: Option<&Self::Value>) -> ErrorSubject; + } + + pub(crate) struct LastPurged {} + pub(crate) struct Vote {} + + impl StoreMeta for LastPurged { + const KEY: &'static str = "last_purged_log_id"; + type Value = LogId; + + fn subject(_v: Option<&Self::Value>) -> ErrorSubject { + ErrorSubject::Store + } + } + impl StoreMeta for Vote { + const KEY: &'static str = "vote"; + type Value = openraft::Vote; + + fn subject(_v: Option<&Self::Value>) -> ErrorSubject { + ErrorSubject::Vote + } + } +} + +impl RocksLogStore { + fn cf_meta(&self) -> &ColumnFamily { + self.db.cf_handle("meta").unwrap() + } + + fn cf_logs(&self) -> &ColumnFamily { + self.db.cf_handle("logs").unwrap() + } + + /// Get a store metadata. + /// + /// It returns `None` if the store does not have such a metadata stored. + fn get_meta(&self) -> Result, StorageError> { + let v = self + .db + .get_cf(self.cf_meta(), M::KEY) + .map_err(|e| StorageIOError::new(M::subject(None), ErrorVerb::Read, AnyError::new(&e)))?; + + let t = match v { + None => None, + Some(bytes) => Some( + serde_json::from_slice(&bytes) + .map_err(|e| StorageIOError::new(M::subject(None), ErrorVerb::Read, AnyError::new(&e)))?, + ), + }; + Ok(t) + } + + /// Save a store metadata. 
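+    ///
+    /// Usage sketch (`log_store` and `vote` are illustrative; `meta::Vote` is the key type
+    /// defined in the `meta` module above):
+    ///
+    /// ```ignore
+    /// log_store.put_meta::<meta::Vote>(&vote)?;    // serialize and write to the "meta" column family
+    /// let v = log_store.get_meta::<meta::Vote>()?; // `None` if it was never written
+    /// ```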
+ fn put_meta(&self, value: &M::Value) -> Result<(), StorageError> { + let json_value = serde_json::to_vec(value) + .map_err(|e| StorageIOError::new(M::subject(Some(value)), ErrorVerb::Write, AnyError::new(&e)))?; + + self.db + .put_cf(self.cf_meta(), M::KEY, json_value) + .map_err(|e| StorageIOError::new(M::subject(Some(value)), ErrorVerb::Write, AnyError::new(&e)))?; + + Ok(()) + } +} + +#[async_trait] +impl RaftLogReader for RocksLogStore { + async fn get_log_state(&mut self) -> StorageResult> { + let last = self.db.iterator_cf(self.cf_logs(), rocksdb::IteratorMode::End).next(); + + let last_log_id = match last { + None => None, + Some(res) => { + let (_log_index, entry_bytes) = res.map_err(read_logs_err)?; + let ent = serde_json::from_slice::>(&entry_bytes).map_err(read_logs_err)?; + Some(ent.log_id) + } + }; + + let last_purged_log_id = self.get_meta::()?; + + let last_log_id = match last_log_id { + None => last_purged_log_id, + Some(x) => Some(x), + }; + + Ok(LogState { + last_purged_log_id, + last_log_id, + }) + } + + async fn try_get_log_entries + Clone + Debug + Send + Sync>( + &mut self, + range: RB, + ) -> StorageResult>> { + let start = match range.start_bound() { + std::ops::Bound::Included(x) => id_to_bin(*x), + std::ops::Bound::Excluded(x) => id_to_bin(*x + 1), + std::ops::Bound::Unbounded => id_to_bin(0), + }; + + let mut res = Vec::new(); + + let it = self.db.iterator_cf(self.cf_logs(), rocksdb::IteratorMode::From(&start, Direction::Forward)); + for item_res in it { + let (id, val) = item_res.map_err(read_logs_err)?; + + let id = bin_to_id(&id); + if !range.contains(&id) { + break; + } + + let entry: Entry<_> = serde_json::from_slice(&val).map_err(read_logs_err)?; + + assert_eq!(id, entry.log_id.index); + + res.push(entry); + } + Ok(res) + } +} + +#[async_trait] +impl RaftSnapshotBuilder>> for RocksStateMachine { + #[tracing::instrument(level = "trace", skip(self))] + async fn build_snapshot( + &mut self, + ) -> Result>>, StorageError> { + let data; + + // Serialize the data of the state machine. + data = serde_json::to_vec(&self.sm).map_err(|e| StorageIOError::read_state_machine(&e))?; + + let last_applied_log = self.sm.last_applied_log; + let last_membership = self.sm.last_membership.clone(); + + // Generate a random snapshot index. 
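+        // The random suffix only distinguishes snapshots built at the same applied log;
+        // the resulting id has the form `{leader_id}-{index}-{snapshot_idx}` (see the
+        // `format!` below).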
+ let snapshot_idx: u64 = rand::thread_rng().gen_range(0..1000); + + let snapshot_id = if let Some(last) = last_applied_log { + format!("{}-{}-{}", last.leader_id, last.index, snapshot_idx) + } else { + format!("--{}", snapshot_idx) + }; + + let meta = SnapshotMeta { + last_log_id: last_applied_log, + last_membership, + snapshot_id, + }; + + let snapshot = RocksSnapshot { + meta: meta.clone(), + data: data.clone(), + }; + + let serialized_snapshot = serde_json::to_vec(&snapshot) + .map_err(|e| StorageIOError::write_snapshot(Some(meta.signature()), AnyError::new(&e)))?; + + self.db + .put_cf(self.db.cf_handle("sm_meta").unwrap(), "snapshot", serialized_snapshot) + .map_err(|e| StorageIOError::write_snapshot(Some(meta.signature()), AnyError::new(&e)))?; + + Ok(Snapshot { + meta, + snapshot: Box::new(Cursor::new(data)), + }) + } +} + +#[async_trait] +impl RaftLogStorage for RocksLogStore { + type LogReader = Self; + + async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { + self.put_meta::(vote)?; + self.db.flush_wal(true).map_err(|e| StorageIOError::write_vote(&e))?; + Ok(()) + } + + async fn read_vote(&mut self) -> Result>, StorageError> { + self.get_meta::() + } + + async fn get_log_reader(&mut self) -> Self::LogReader { + self.clone() + } + + async fn append( + &mut self, + entries: I, + callback: LogFlushed, + ) -> Result<(), StorageError> + where + I: IntoIterator> + Send, + { + for entry in entries { + let id = id_to_bin(entry.log_id.index); + assert_eq!(bin_to_id(&id), entry.log_id.index); + self.db + .put_cf( + self.cf_logs(), + id, + serde_json::to_vec(&entry).map_err(|e| StorageIOError::write_logs(&e))?, + ) + .map_err(|e| StorageIOError::write_logs(&e))?; + } + + self.db.flush_wal(true).map_err(|e| StorageIOError::write_logs(&e))?; + + // If there is error, the callback will be dropped. + callback.log_io_completed(Ok(())); + Ok(()) + } + + async fn truncate(&mut self, log_id: LogId) -> Result<(), StorageError> { + tracing::debug!("truncate: [{:?}, +oo)", log_id); + + let from = id_to_bin(log_id.index); + let to = id_to_bin(0xff_ff_ff_ff_ff_ff_ff_ff); + self.db.delete_range_cf(self.cf_logs(), &from, &to).map_err(|e| StorageIOError::write_logs(&e))?; + + self.db.flush_wal(true).map_err(|e| StorageIOError::write_logs(&e))?; + Ok(()) + } + + async fn purge(&mut self, log_id: LogId) -> Result<(), StorageError> { + tracing::debug!("delete_log: [0, {:?}]", log_id); + + // Write the last-purged log id before purging the logs. + // The logs at and before last-purged log id will be ignored by openraft. + // Therefore there is no need to do it in a transaction. + self.put_meta::(&log_id)?; + + let from = id_to_bin(0); + let to = id_to_bin(log_id.index + 1); + self.db.delete_range_cf(self.cf_logs(), &from, &to).map_err(|e| StorageIOError::write_logs(&e))?; + + // Purging does not need to be persistent. 
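+        // Unlike `append()` and `truncate()`, no `flush_wal` is issued here: if the deletion
+        // (or even the last-purged marker above) is lost in a crash, openraft simply sees a
+        // purge that has not happened yet and will purge the same range again later.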
+ Ok(()) + } +} +#[async_trait] +impl RaftStateMachine for RocksStateMachine { + type SnapshotData = Cursor>; + type SnapshotBuilder = Self; + + async fn applied_state( + &mut self, + ) -> Result<(Option>, StoredMembership), StorageError> { + Ok((self.sm.last_applied_log, self.sm.last_membership.clone())) + } + + async fn apply(&mut self, entries: I) -> Result, StorageError> + where I: IntoIterator> + Send { + let entries_iter = entries.into_iter(); + let mut res = Vec::with_capacity(entries_iter.size_hint().0); + + let sm = &mut self.sm; + + for entry in entries_iter { + tracing::debug!(%entry.log_id, "replicate to sm"); + + sm.last_applied_log = Some(*entry.get_log_id()); + + match entry.payload { + EntryPayload::Blank => res.push(RocksResponse { value: None }), + EntryPayload::Normal(ref req) => match req { + RocksRequest::Set { key, value } => { + sm.data.insert(key.clone(), value.clone()); + res.push(RocksResponse { + value: Some(value.clone()), + }) + } + }, + EntryPayload::Membership(ref mem) => { + sm.last_membership = StoredMembership::new(Some(entry.log_id), mem.clone()); + res.push(RocksResponse { value: None }) + } + }; + } + Ok(res) + } + + async fn get_snapshot_builder(&mut self) -> Self::SnapshotBuilder { + self.clone() + } + + async fn begin_receiving_snapshot(&mut self) -> Result, StorageError> { + Ok(Box::new(Cursor::new(Vec::new()))) + } + + async fn install_snapshot( + &mut self, + meta: &SnapshotMeta, + snapshot: Box, + ) -> Result<(), StorageError> { + tracing::info!( + { snapshot_size = snapshot.get_ref().len() }, + "decoding snapshot for installation" + ); + + let new_snapshot = RocksSnapshot { + meta: meta.clone(), + data: snapshot.into_inner(), + }; + + // Update the state machine. + let updated_state_machine: StateMachine = serde_json::from_slice(&new_snapshot.data) + .map_err(|e| StorageIOError::read_snapshot(Some(new_snapshot.meta.signature()), &e))?; + + self.sm = updated_state_machine; + + // Save snapshot + + let serialized_snapshot = serde_json::to_vec(&new_snapshot) + .map_err(|e| StorageIOError::write_snapshot(Some(meta.signature()), AnyError::new(&e)))?; + + self.db + .put_cf(self.db.cf_handle("sm_meta").unwrap(), "snapshot", serialized_snapshot) + .map_err(|e| StorageIOError::write_snapshot(Some(meta.signature()), AnyError::new(&e)))?; + + self.db.flush_wal(true).map_err(|e| StorageIOError::write_snapshot(Some(meta.signature()), &e))?; + Ok(()) + } + + async fn get_current_snapshot( + &mut self, + ) -> Result>, StorageError> { + let x = self + .db + .get_cf(self.db.cf_handle("sm_meta").unwrap(), "snapshot") + .map_err(|e| StorageIOError::write_snapshot(None, AnyError::new(&e)))?; + + let bytes = match x { + Some(x) => x, + None => return Ok(None), + }; + + let snapshot: RocksSnapshot = + serde_json::from_slice(&bytes).map_err(|e| StorageIOError::write_snapshot(None, AnyError::new(&e)))?; + + let data = snapshot.data.clone(); + + Ok(Some(Snapshot { + meta: snapshot.meta, + snapshot: Box::new(Cursor::new(data)), + })) + } +} + +/// Create a pair of `RocksLogStore` and `RocksStateMachine` that are backed by a same rocks db +/// instance. 
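+///
+/// A minimal usage sketch (the path and variable names are illustrative):
+///
+/// ```ignore
+/// let (log_store, state_machine) = openraft_rocksstore_v2::new("/tmp/rocks-v2").await;
+/// // Both halves share a single `Arc<DB>`; they are handed to openraft separately,
+/// // e.g. `Raft::new(node_id, config, network, log_store, state_machine)`.
+/// ```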
+pub async fn new<P: AsRef<Path>>(db_path: P) -> (RocksLogStore, RocksStateMachine) {
+    let mut db_opts = Options::default();
+    db_opts.create_missing_column_families(true);
+    db_opts.create_if_missing(true);
+
+    let meta = ColumnFamilyDescriptor::new("meta", Options::default());
+    let sm_meta = ColumnFamilyDescriptor::new("sm_meta", Options::default());
+    let logs = ColumnFamilyDescriptor::new("logs", Options::default());
+
+    let db = DB::open_cf_descriptors(&db_opts, db_path, vec![meta, sm_meta, logs]).unwrap();
+
+    let db = Arc::new(db);
+    (RocksLogStore { db: db.clone() }, RocksStateMachine::new(db).await)
+}
+
+fn read_logs_err(e: impl Error + 'static) -> StorageError<RocksNodeId> {
+    StorageError::IO {
+        source: StorageIOError::read_logs(&e),
+    }
+}
diff --git a/stores/rocksstore-v2/src/test.rs b/stores/rocksstore-v2/src/test.rs
new file mode 100644
index 000000000..4b9ae1ab1
--- /dev/null
+++ b/stores/rocksstore-v2/src/test.rs
@@ -0,0 +1,46 @@
+use async_trait::async_trait;
+use openraft::testing::StoreBuilder;
+use openraft::testing::Suite;
+use openraft::StorageError;
+use tempfile::TempDir;
+
+use crate::Config;
+use crate::RocksLogStore;
+use crate::RocksNodeId;
+use crate::RocksStateMachine;
+
+struct RocksBuilder {}
+#[async_trait]
+impl StoreBuilder<Config, RocksLogStore, RocksStateMachine, TempDir> for RocksBuilder {
+    async fn build(&self) -> Result<(TempDir, RocksLogStore, RocksStateMachine), StorageError<RocksNodeId>> {
+        let td = TempDir::new().expect("couldn't create temp dir");
+        let (log_store, sm) = crate::new(td.path()).await;
+        Ok((td, log_store, sm))
+    }
+}
+/// To customize a builder:
+///
+/// ```ignore
+/// use async_trait::async_trait;
+/// use openraft::testing::StoreBuilder;
+/// use crate::ClientRequest;
+/// use crate::ClientResponse;
+///
+/// struct MemStoreBuilder {}
+///
+/// #[async_trait]
+/// impl StoreBuilder<ClientRequest, ClientResponse, MemStore> for MemStoreBuilder {
+///     async fn build(&self) -> MemStore {
+///         MemStore::new().await
+///     }
+/// }
+/// #[test]
+/// pub fn test_mem_store() -> anyhow::Result<()> {
+///     Suite::test_all(MemStoreBuilder {})
+/// }
+/// ```
+#[test]
+pub fn test_rocks_store() -> Result<(), StorageError<RocksNodeId>> {
+    Suite::test_all(RocksBuilder {})?;
+    Ok(())
+}
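+
+// The suite above is what the `external-stores` CI job runs. Because this crate is excluded
+// from the workspace, run it locally by pointing cargo at its manifest:
+//
+//     cargo test --manifest-path stores/rocksstore-v2/Cargo.toml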