diff --git a/memstore/src/lib.rs b/memstore/src/lib.rs index 93f4d9182..f014aba97 100644 --- a/memstore/src/lib.rs +++ b/memstore/src/lib.rs @@ -9,6 +9,8 @@ use std::collections::HashMap; use std::fmt::Debug; use std::io::Cursor; use std::ops::RangeBounds; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; use std::sync::Arc; use std::sync::Mutex; @@ -117,6 +119,11 @@ pub enum BlockOperation { pub struct MemStore { last_purged_log_id: RwLock>>, + /// Saving committed log id is optional in Openraft. + /// + /// This flag switches on the saving for testing purposes. + pub enable_saving_committed: AtomicBool, + committed: RwLock>>, /// The Raft log. Logs are stored in serialized json. @@ -146,6 +153,7 @@ impl MemStore { Self { last_purged_log_id: RwLock::new(None), + enable_saving_committed: AtomicBool::new(true), committed: RwLock::new(None), log, sm, @@ -325,13 +333,23 @@ impl RaftStorage for Arc { } async fn save_committed(&mut self, committed: Option>) -> Result<(), StorageError> { - tracing::debug!(?committed, "save_committed"); + let enabled = self.enable_saving_committed.load(Ordering::Relaxed); + tracing::debug!(?committed, "save_committed, enabled: {}", enabled); + if !enabled { + return Ok(()); + } let mut c = self.committed.write().await; *c = committed; Ok(()) } async fn read_committed(&mut self) -> Result>, StorageError> { + let enabled = self.enable_saving_committed.load(Ordering::Relaxed); + tracing::debug!("read_committed, enabled: {}", enabled); + if !enabled { + return Ok(None); + } + Ok(*self.committed.read().await) } diff --git a/openraft/src/engine/tests/startup_test.rs b/openraft/src/engine/tests/startup_test.rs index 5b8fbd997..9bafe47c6 100644 --- a/openraft/src/engine/tests/startup_test.rs +++ b/openraft/src/engine/tests/startup_test.rs @@ -40,6 +40,7 @@ fn eng() -> Engine { eng } +/// It is a Leader but not yet append any logs. #[test] fn test_startup_as_leader_without_logs() -> anyhow::Result<()> { let mut eng = eng(); diff --git a/tests/tests/fixtures/mod.rs b/tests/tests/fixtures/mod.rs index f7fe6e151..257bab5c5 100644 --- a/tests/tests/fixtures/mod.rs +++ b/tests/tests/fixtures/mod.rs @@ -241,6 +241,9 @@ pub struct TypedRaftRouter { #[allow(clippy::type_complexity)] nodes: Arc>>, + /// Whether to save the committed entries to the RaftLogStorage. + pub enable_saving_committed: bool, + /// Whether to fail a network RPC that is sent from/to a node. /// And it defines what kind of error to return. fail_rpc: Arc>>, @@ -290,6 +293,7 @@ impl Builder { TypedRaftRouter { config: self.config, nodes: Default::default(), + enable_saving_committed: true, fail_rpc: Default::default(), send_delay: Arc::new(AtomicU64::new(send_delay)), append_entries_quota: Arc::new(Mutex::new(None)), @@ -449,7 +453,9 @@ impl TypedRaftRouter { pub fn new_store(&mut self) -> (MemLogStore, MemStateMachine) { let store = Arc::new(MemStore::default()); - Adaptor::new(store) + store.enable_saving_committed.store(self.enable_saving_committed, Ordering::Relaxed); + let (log, sm) = Adaptor::new(store); + (log, sm) } #[tracing::instrument(level = "debug", skip_all)] diff --git a/tests/tests/life_cycle/t50_single_leader_restart_re_apply_logs.rs b/tests/tests/life_cycle/t50_single_leader_restart_re_apply_logs.rs index 0f3666dcb..b29395847 100644 --- a/tests/tests/life_cycle/t50_single_leader_restart_re_apply_logs.rs +++ b/tests/tests/life_cycle/t50_single_leader_restart_re_apply_logs.rs @@ -13,6 +13,9 @@ use crate::fixtures::RaftRouter; /// A single leader should re-apply all logs upon startup, /// because itself is a quorum. +/// +/// This test disables save_committed() to ensure that logs are still re-applied because the leader +/// itself forms a quorum. #[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")] async fn single_leader_restart_re_apply_logs() -> anyhow::Result<()> { let config = Arc::new( @@ -24,6 +27,7 @@ async fn single_leader_restart_re_apply_logs() -> anyhow::Result<()> { ); let mut router = RaftRouter::new(config.clone()); + router.enable_saving_committed = false; tracing::info!("--- bring up cluster of 1 node"); let mut log_index = router.new_cluster(btreeset! {0}, btreeset! {}).await?;