diff --git a/openraft/src/config/config.rs b/openraft/src/config/config.rs index 1672b3d22..b15e474d6 100644 --- a/openraft/src/config/config.rs +++ b/openraft/src/config/config.rs @@ -83,75 +83,56 @@ fn parse_snapshot_policy(src: &str) -> Result { #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))] pub struct Config { /// The application specific name of this Raft cluster - #[clap(long, env = "RAFT_CLUSTER_NAME", default_value = "foo")] + #[clap(long, default_value = "foo")] pub cluster_name: String, /// The minimum election timeout in milliseconds - #[clap(long, env = "RAFT_ELECTION_TIMEOUT_MIN", default_value = "150")] + #[clap(long, default_value = "150")] pub election_timeout_min: u64, /// The maximum election timeout in milliseconds - #[clap(long, env = "RAFT_ELECTION_TIMEOUT_MAX", default_value = "300")] + #[clap(long, default_value = "300")] pub election_timeout_max: u64, /// The heartbeat interval in milliseconds at which leaders will send heartbeats to followers - #[clap(long, env = "RAFT_HEARTBEAT_INTERVAL", default_value = "50")] + #[clap(long, default_value = "50")] pub heartbeat_interval: u64, /// The timeout for sending a snapshot segment, in millisecond - #[clap(long, env = "RAFT_INSTALL_SNAPSHOT_TIMEOUT", default_value = "200")] + #[clap(long, default_value = "200")] pub install_snapshot_timeout: u64, /// The maximum number of entries per payload allowed to be transmitted during replication /// /// If this is too low, it will take longer for the nodes to be brought up to /// consistency with the rest of the cluster. - #[clap(long, env = "RAFT_MAX_PAYLOAD_ENTRIES", default_value = "300")] + #[clap(long, default_value = "300")] pub max_payload_entries: u64, /// The distance behind in log replication a follower must fall before it is considered lagging /// /// Once a replication stream transition into line-rate state, the target node will be considered safe to join a /// cluster. - #[clap(long, env = "RAFT_REPLICATION_LAG_THRESHOLD", default_value = "1000")] + #[clap(long, default_value = "1000")] pub replication_lag_threshold: u64, /// The snapshot policy to use for a Raft node. #[clap( long, - env = "RAFT_SNAPSHOT_POLICY", default_value = "since_last:5000", parse(try_from_str=parse_snapshot_policy) )] pub snapshot_policy: SnapshotPolicy, - /// Whether to keep `applied_log`s that are not included by snapshots. - /// - /// If your application may rebuild it's state machine from snapshots, - /// please set this to true. - /// - /// By default, `OpenRaft` purges `applied_log`s from time to time regardless of snapshots, because it assumes once - /// logs are `applied` to the state machine, logs are persisted on disk. - /// - /// If an implementation does not persist data when `RaftStorage::apply_to_state_machine()` returns, and just - /// relies on `snapshot` to rebuild the state machine when the next time it restarts, the application must always - /// set `keep_unsnapshoted_log` to `true`, so that only logs that are already included in a snapshot will be - /// purged. - // - // This is another way to implement: - // #[clap(long, env = "RAFT_KEEP_UNSNAPSHOTED_LOG", - // default_value_t = false, - // value_parser=clap::value_parser!(bool))] - #[clap(long, env = "RAFT_KEEP_UNSNAPSHOTED_LOG")] - pub keep_unsnapshoted_log: bool, - /// The maximum snapshot chunk size allowed when transmitting snapshots (in bytes) - #[clap(long, env = "RAFT_SNAPSHOT_MAX_CHUNK_SIZE", default_value = "3MiB", parse(try_from_str=parse_bytes_with_unit))] + #[clap(long, default_value = "3MiB", parse(try_from_str=parse_bytes_with_unit))] pub snapshot_max_chunk_size: u64, - /// The maximum number of applied logs to keep before purging - #[clap(long, env = "RAFT_MAX_APPLIED_LOG_TO_KEEP", default_value = "1000")] - pub max_applied_log_to_keep: u64, + /// The maximum number of logs to keep that are already included in **snapshot**. + /// + /// Logs that are not in snapshot will never be purged. + #[clap(long, default_value = "1000")] + pub max_in_snapshot_log_to_keep: u64, /// The minimal number of applied logs to purge in a batch. #[clap(long, default_value = "1")] diff --git a/openraft/src/config/config_test.rs b/openraft/src/config/config_test.rs index 05b8732ed..95bdbe3ac 100644 --- a/openraft/src/config/config_test.rs +++ b/openraft/src/config/config_test.rs @@ -56,9 +56,8 @@ fn test_build() -> anyhow::Result<()> { "--max-payload-entries=201", "--replication-lag-threshold=202", "--snapshot-policy=since_last:203", - "--keep-unsnapshoted-log", "--snapshot-max-chunk-size=204", - "--max-applied-log-to-keep=205", + "--max-in-snapshot-log-to-keep=205", "--purge-batch-size=207", ])?; @@ -70,25 +69,13 @@ fn test_build() -> anyhow::Result<()> { assert_eq!(201, config.max_payload_entries); assert_eq!(202, config.replication_lag_threshold); assert_eq!(SnapshotPolicy::LogsSinceLast(203), config.snapshot_policy); - assert_eq!(true, config.keep_unsnapshoted_log); assert_eq!(204, config.snapshot_max_chunk_size); - assert_eq!(205, config.max_applied_log_to_keep); + assert_eq!(205, config.max_in_snapshot_log_to_keep); assert_eq!(207, config.purge_batch_size); Ok(()) } -#[test] -fn test_config_keep_unsnapshoted_log() -> anyhow::Result<()> { - let config = Config::build(&["foo", "--keep-unsnapshoted-log"])?; - assert_eq!(true, config.keep_unsnapshoted_log); - - let config = Config::build(&["foo"])?; - assert_eq!(false, config.keep_unsnapshoted_log); - - Ok(()) -} - #[test] fn test_config_enable_tick() -> anyhow::Result<()> { let config = Config::build(&["foo", "--enable-tick=false"])?; diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 079608bc1..ce33368c8 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -250,9 +250,8 @@ impl, S: RaftStorage> RaftCore, S: RaftStorage> RaftCore { - self.engine.update_snapshot(meta); + self.engine.finish_building_snapshot(meta); self.run_engine_commands::>(&[]).await?; } SnapshotResult::StorageError(sto_err) => { @@ -868,6 +867,8 @@ impl, S: RaftStorage> RaftCore bool { + tracing::debug!( + "snapshot_last_indeix:{}, last_log_index:{}, threshold: {}", + snapshot_last_index, + last_log_index, + threshold + ); // Calculate distance from actor's last log index. let distance_from_line = last_log_index.saturating_sub(*snapshot_last_index); diff --git a/openraft/src/engine/calc_purge_upto_test.rs b/openraft/src/engine/calc_purge_upto_test.rs index b54dc237c..8d94d71d5 100644 --- a/openraft/src/engine/calc_purge_upto_test.rs +++ b/openraft/src/engine/calc_purge_upto_test.rs @@ -24,7 +24,8 @@ fn eng() -> Engine { #[test] fn test_calc_purge_upto() -> anyhow::Result<()> { - // last_purged_log_id, committed, max_keep, want + // last_purged_log_id, last_snapshot_log_id, max_keep, want + // last_applied should not affect the purge let cases = vec![ // (None, None, 0, None), @@ -53,71 +54,23 @@ fn test_calc_purge_upto() -> anyhow::Result<()> { (Some(log_id(1, 2)), Some(log_id(3, 4)), 5, None), ]; - for (last_purged, committed, max_keep, want) in cases { + for (last_purged, snapshot_last_log_id, max_keep, want) in cases { let mut eng = eng(); - eng.config.keep_unsnapshoted_log = false; - eng.config.max_applied_log_to_keep = max_keep; + eng.config.max_in_snapshot_log_to_keep = max_keep; eng.config.purge_batch_size = 1; if let Some(last_purged) = last_purged { eng.state.log_ids.purge(&last_purged); } - eng.state.committed = committed; + eng.snapshot_meta.last_log_id = snapshot_last_log_id; let got = eng.calc_purge_upto(); assert_eq!( want, got, - "case: last_purged: {:?}, last_applied: {:?}, max_keep: {}", - last_purged, committed, max_keep + "case: last_purged: {:?}, snapshot_last_log_id: {:?}, max_keep: {}", + last_purged, snapshot_last_log_id, max_keep ); } Ok(()) } - -#[test] -// in this test, keep_unsnapshoted_log is set to true. -// logs being purged should at most the last that was in the snapshot. -fn test_keep_unsnapshoted() -> anyhow::Result<()> { - let cases = vec![ - // last_deleted, last_applied, last_snapshoted, max_keep, want - // empty test - (None, None, None, 0, None), - (None, None, None, 1, None), - // nothing in snapshot - (Some(log_id(1, 1)), Some(log_id(2, 2)), None, 0, None), - (Some(log_id(1, 1)), Some(log_id(5, 5)), None, 0, None), - // snapshot kept up - (None, Some(log_id(5, 5)), Some(log_id(3, 4)), 0, Some(log_id(3, 4))), - ( - Some(log_id(1, 1)), - Some(log_id(5, 5)), - Some(log_id(5, 5)), - 0, - Some(log_id(5, 5)), - ), - ]; - - for (purged, committed, snapshot, keep, want) in cases { - let mut eng = eng(); - eng.config.keep_unsnapshoted_log = true; - eng.config.max_applied_log_to_keep = keep; - eng.config.purge_batch_size = 1; - - if let Some(last_purged) = purged { - eng.state.log_ids.purge(&last_purged); - } - eng.state.committed = committed; - eng.snapshot_meta.last_log_id = snapshot; - - let got = eng.calc_purge_upto(); - - assert_eq!( - want, got, - "case: purged: {:?}, committed: {:?}, snapshot: {:?}, keep: {}", - purged, committed, snapshot, keep - ) - } - - Ok(()) -} diff --git a/openraft/src/engine/engine_impl.rs b/openraft/src/engine/engine_impl.rs index d57dc5b46..af269fb70 100644 --- a/openraft/src/engine/engine_impl.rs +++ b/openraft/src/engine/engine_impl.rs @@ -33,22 +33,17 @@ use crate::Vote; #[derive(PartialEq, Eq)] pub(crate) struct EngineConfig { /// The maximum number of applied logs to keep before purging. - pub(crate) max_applied_log_to_keep: u64, + pub(crate) max_in_snapshot_log_to_keep: u64, /// The minimal number of applied logs to purge in a batch. pub(crate) purge_batch_size: u64, - - /// whether to keep applied log that are not included in snapshots. - /// false by default - pub(crate) keep_unsnapshoted_log: bool, } impl Default for EngineConfig { fn default() -> Self { Self { - max_applied_log_to_keep: 1000, + max_in_snapshot_log_to_keep: 1000, purge_batch_size: 256, - keep_unsnapshoted_log: false, } } } @@ -433,7 +428,6 @@ where already_committed: prev_committed, upto: committed.unwrap(), }); - self.purge_applied_log(); } } @@ -540,13 +534,13 @@ where } } - /// Purge applied log if needed. + /// Purge logs that are already in snapshot if needed. /// - /// `max_applied_log_to_keep` specifies the number of applied logs to keep. - /// `max_applied_log_to_keep==0` means every applied log can be purged. + /// `max_in_snapshot_log_to_keep` specifies the number of logs already included in snapshot to keep. + /// `max_in_snapshot_log_to_keep==0` means to purge every log stored in snapshot. // NOTE: simple method, not tested. #[tracing::instrument(level = "debug", skip_all)] - pub(crate) fn purge_applied_log(&mut self) { + pub(crate) fn purge_in_snapshot_log(&mut self) { if let Some(purge_upto) = self.calc_purge_upto() { self.purge_log(purge_upto); } @@ -562,20 +556,13 @@ where #[tracing::instrument(level = "debug", skip_all)] pub(crate) fn calc_purge_upto(&mut self) -> Option> { let st = &self.state; - let last_applied = &st.committed; - let max_keep = self.config.max_applied_log_to_keep; + let max_keep = self.config.max_in_snapshot_log_to_keep; let batch_size = self.config.purge_batch_size; - let mut purge_end = last_applied.next_index().saturating_sub(max_keep); - - if self.config.keep_unsnapshoted_log { - let idx = self.snapshot_meta.last_log_id.next_index(); - tracing::debug!("the very last log included in snapshots: {}", idx); - purge_end = idx.min(purge_end); - } + let purge_end = self.snapshot_meta.last_log_id.next_index().saturating_sub(max_keep); tracing::debug!( - last_applied = debug(last_applied), + snapshot_last_log_id = debug(self.snapshot_meta.last_log_id), max_keep, "try purge: (-oo, {})", purge_end @@ -583,7 +570,7 @@ where if st.last_purged_log_id().next_index() + batch_size > purge_end { tracing::debug!( - last_applied = debug(last_applied), + snapshot_last_log_id = debug(self.snapshot_meta.last_log_id), max_keep, last_purged_log_id = display(st.last_purged_log_id().summary()), batch_size, @@ -640,6 +627,7 @@ where // membership.log_id = (10, 5); // local_effective.log_id = (2, 10); if effective.log_id.index() <= m.log_id.index() { + // TODO: if effective membership changes, call `update_repliation()` effective = m; } @@ -743,7 +731,6 @@ where already_committed: prev_committed, upto: self.state.committed.unwrap(), }); - self.purge_applied_log(); } } @@ -801,6 +788,18 @@ where self.purge_log(snap_last_log_id) } + #[tracing::instrument(level = "debug", skip_all)] + pub(crate) fn finish_building_snapshot(&mut self, meta: SnapshotMeta) { + tracing::info!("finish_building_snapshot: {:?}", meta); + + let updated = self.update_snapshot(meta); + if !updated { + return; + } + + self.purge_in_snapshot_log(); + } + /// Update engine state when a new snapshot is built or installed. /// /// Engine records only the metadata of a snapshot. Snapshot data is stored by RaftStorage implementation. diff --git a/openraft/src/engine/follower_commit_entries_test.rs b/openraft/src/engine/follower_commit_entries_test.rs index 5ab97dd13..d85307a5d 100644 --- a/openraft/src/engine/follower_commit_entries_test.rs +++ b/openraft/src/engine/follower_commit_entries_test.rs @@ -4,7 +4,6 @@ use maplit::btreeset; use crate::engine::Command; use crate::engine::Engine; -use crate::engine::LogIdList; use crate::EffectiveMembership; use crate::Entry; use crate::EntryPayload; @@ -175,55 +174,3 @@ fn test_follower_commit_entries_gt_last_entry() -> anyhow::Result<()> { Ok(()) } - -#[test] -fn test_follower_commit_entries_purge_to_committed() -> anyhow::Result<()> { - let mut eng = eng(); - eng.state.log_ids = LogIdList::new([log_id(2, 2), log_id(2, 3)]); - eng.config.max_applied_log_to_keep = 0; - eng.config.purge_batch_size = 1; - - eng.follower_commit_entries(Some(log_id(3, 1)), None, &[blank(2, 3)]); - - assert_eq!(Some(log_id(2, 3)), eng.state.committed); - assert_eq!(Some(log_id(2, 3)), eng.state.last_purged_log_id()); - - assert_eq!( - vec![ - Command::FollowerCommit { - already_committed: Some(log_id(1, 1)), - upto: log_id(2, 3) - }, - Command::PurgeLog { upto: log_id(2, 3) }, - ], - eng.commands - ); - - Ok(()) -} - -#[test] -fn test_follower_commit_entries_purge_to_committed_minus_1() -> anyhow::Result<()> { - let mut eng = eng(); - eng.state.log_ids = LogIdList::new([log_id(1, 1), log_id(2, 3)]); - eng.config.max_applied_log_to_keep = 1; - eng.config.purge_batch_size = 1; - - eng.follower_commit_entries(Some(log_id(3, 1)), None, &[blank(2, 3)]); - - assert_eq!(Some(log_id(2, 3)), eng.state.committed); - assert_eq!(Some(log_id(1, 2)), eng.state.last_purged_log_id()); - - assert_eq!( - vec![ - Command::FollowerCommit { - already_committed: Some(log_id(1, 1)), - upto: log_id(2, 3) - }, - Command::PurgeLog { upto: log_id(1, 2) }, - ], - eng.commands - ); - - Ok(()) -} diff --git a/openraft/src/engine/update_progress_test.rs b/openraft/src/engine/update_progress_test.rs index ca8fb6e98..1cd5ae03f 100644 --- a/openraft/src/engine/update_progress_test.rs +++ b/openraft/src/engine/update_progress_test.rs @@ -4,7 +4,6 @@ use maplit::btreeset; use crate::engine::Command; use crate::engine::Engine; -use crate::engine::LogIdList; use crate::EffectiveMembership; use crate::LeaderId; use crate::LogId; @@ -102,65 +101,3 @@ fn test_update_progress_update_leader_progress() -> anyhow::Result<()> { Ok(()) } - -#[test] -fn test_update_progress_purge_upto_committed() -> anyhow::Result<()> { - let mut eng = eng(); - eng.state.log_ids = LogIdList::new([log_id(2, 0), log_id(2, 5)]); - eng.config.max_applied_log_to_keep = 0; - eng.config.purge_batch_size = 1; - - eng.state.new_leader(); - - // progress: None, (2,1), (2,3); committed: (2,1) - eng.update_progress(3, Some(log_id(1, 2))); - eng.update_progress(2, Some(log_id(2, 1))); - eng.update_progress(3, Some(log_id(2, 3))); - assert_eq!(Some(log_id(2, 1)), eng.state.committed); - assert_eq!( - vec![ - Command::ReplicateCommitted { - committed: Some(log_id(2, 1)) - }, - Command::LeaderCommit { - already_committed: None, - upto: log_id(2, 1) - }, - Command::PurgeLog { upto: log_id(2, 1) }, - ], - eng.commands - ); - - Ok(()) -} - -#[test] -fn test_update_progress_purge_upto_committed_minus_1() -> anyhow::Result<()> { - let mut eng = eng(); - eng.state.log_ids = LogIdList::new([log_id(2, 0), log_id(2, 5)]); - eng.config.max_applied_log_to_keep = 1; - eng.config.purge_batch_size = 1; - - eng.state.new_leader(); - - // progress: None, (2,1), (2,3); committed: (2,1) - eng.update_progress(3, Some(log_id(1, 2))); - eng.update_progress(2, Some(log_id(2, 2))); - eng.update_progress(3, Some(log_id(2, 4))); - assert_eq!(Some(log_id(2, 2)), eng.state.committed); - assert_eq!( - vec![ - Command::ReplicateCommitted { - committed: Some(log_id(2, 2)) - }, - Command::LeaderCommit { - already_committed: None, - upto: log_id(2, 2) - }, - Command::PurgeLog { upto: log_id(2, 1) }, - ], - eng.commands - ); - - Ok(()) -} diff --git a/openraft/src/raft.rs b/openraft/src/raft.rs index 6b79b2991..04d294d25 100644 --- a/openraft/src/raft.rs +++ b/openraft/src/raft.rs @@ -968,7 +968,7 @@ where format!("InstallSnapshot: {}", rpc.summary()) } RaftMsg::BuildingSnapshotResult { result: update } => { - format!("SnapshotUpdate: {:?}", update) + format!("BuildingSnapshotResult: {:?}", update) } RaftMsg::ClientWriteRequest { payload: rpc, .. } => { format!("ClientWriteRequest: {}", rpc.summary()) diff --git a/openraft/tests/append_entries/t90_issue_216_stale_last_log_id.rs b/openraft/tests/append_entries/t90_issue_216_stale_last_log_id.rs index 4431b59c1..a20360006 100644 --- a/openraft/tests/append_entries/t90_issue_216_stale_last_log_id.rs +++ b/openraft/tests/append_entries/t90_issue_216_stale_last_log_id.rs @@ -23,7 +23,7 @@ async fn stale_last_log_id() -> Result<()> { election_timeout_min: 500, election_timeout_max: 1000, max_payload_entries: 1, - max_applied_log_to_keep: 0, + max_in_snapshot_log_to_keep: 0, purge_batch_size: 1, enable_heartbeat: false, ..Default::default() diff --git a/openraft/tests/log_compaction/t10_compaction.rs b/openraft/tests/log_compaction/t10_compaction.rs index 45b231710..a04f39894 100644 --- a/openraft/tests/log_compaction/t10_compaction.rs +++ b/openraft/tests/log_compaction/t10_compaction.rs @@ -38,7 +38,7 @@ async fn compaction() -> Result<()> { let config = Arc::new( Config { snapshot_policy: SnapshotPolicy::LogsSinceLast(snapshot_threshold), - max_applied_log_to_keep: 2, + max_in_snapshot_log_to_keep: 2, purge_batch_size: 1, enable_tick: false, ..Default::default() diff --git a/openraft/tests/membership/t10_add_learner.rs b/openraft/tests/membership/t10_add_learner.rs index edda69fa0..da8192586 100644 --- a/openraft/tests/membership/t10_add_learner.rs +++ b/openraft/tests/membership/t10_add_learner.rs @@ -24,7 +24,7 @@ async fn add_learner_basic() -> Result<()> { let config = Arc::new( Config { replication_lag_threshold: 0, - max_applied_log_to_keep: 2000, // prevent snapshot + max_in_snapshot_log_to_keep: 2000, // prevent snapshot purge_batch_size: 1, enable_tick: false, ..Default::default() diff --git a/openraft/tests/snapshot/main.rs b/openraft/tests/snapshot/main.rs index 68f3dd5d7..676c61de2 100644 --- a/openraft/tests/snapshot/main.rs +++ b/openraft/tests/snapshot/main.rs @@ -7,9 +7,10 @@ mod fixtures; mod t20_api_install_snapshot; mod t20_trigger_snapshot; mod t23_snapshot_chunk_size; -mod t24_snapshot_ge_half_threshold; +mod t24_snapshot_when_lacking_log; mod t25_snapshot_line_rate_to_snapshot; mod t40_after_snapshot_add_learner_and_request_a_log; +mod t40_purge_in_snapshot_logs; mod t41_snapshot_overrides_membership; mod t42_snapshot_uses_prev_snap_membership; mod t43_snapshot_delete_conflict_logs; diff --git a/openraft/tests/snapshot/t24_snapshot_ge_half_threshold.rs b/openraft/tests/snapshot/t24_snapshot_when_lacking_log.rs similarity index 79% rename from openraft/tests/snapshot/t24_snapshot_ge_half_threshold.rs rename to openraft/tests/snapshot/t24_snapshot_when_lacking_log.rs index 19f3aeb97..e67004ad0 100644 --- a/openraft/tests/snapshot/t24_snapshot_ge_half_threshold.rs +++ b/openraft/tests/snapshot/t24_snapshot_when_lacking_log.rs @@ -10,25 +10,20 @@ use openraft::SnapshotPolicy; use crate::fixtures::init_default_ut_tracing; use crate::fixtures::RaftRouter; -/// A leader should create and send snapshot when snapshot is old and is not that old to trigger a snapshot, i.e.: -/// `threshold/2 < leader.last_log_index - snapshot.applied_index < threshold` -/// -/// What does this test do? +/// A leader switch to snapshot replication if a log a follower/learner needs but is already purged. /// /// - build a stable single node cluster. /// - send enough requests to the node that log compaction will be triggered. -/// - send some other log after snapshot created, to make the `leader.last_log_index - snapshot.applied_index` big -/// enough. /// - add learner and assert that they receive the snapshot and logs. #[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")] -async fn snapshot_ge_half_threshold() -> Result<()> { - let snapshot_threshold: u64 = 10; - let log_cnt = snapshot_threshold + 6; +async fn switch_to_snapshot_replication_when_lacking_log() -> Result<()> { + let snapshot_threshold: u64 = 20; + let log_cnt = snapshot_threshold + 11; let config = Arc::new( Config { snapshot_policy: SnapshotPolicy::LogsSinceLast(snapshot_threshold), - max_applied_log_to_keep: 6, + max_in_snapshot_log_to_keep: 0, purge_batch_size: 1, enable_heartbeat: false, ..Default::default() @@ -79,10 +74,15 @@ async fn snapshot_ge_half_threshold() -> Result<()> { log_index += 1; router.wait_for_log(&btreeset![0, 1], Some(log_index), None, "add learner").await?; - let expected_snap = Some((log_index.into(), 1)); router - .wait_for_snapshot(&btreeset![1], LogId::new(LeaderId::new(1, 0), log_index), None, "") + .wait_for_snapshot( + &btreeset![1], + LogId::new(LeaderId::new(1, 0), snapshot_threshold - 1), + None, + "", + ) .await?; + let expected_snap = Some(((snapshot_threshold - 1).into(), 1)); router .assert_storage_state( 1, diff --git a/openraft/tests/snapshot/t40_after_snapshot_add_learner_and_request_a_log.rs b/openraft/tests/snapshot/t40_after_snapshot_add_learner_and_request_a_log.rs index a07f21aa7..6f6cee41a 100644 --- a/openraft/tests/snapshot/t40_after_snapshot_add_learner_and_request_a_log.rs +++ b/openraft/tests/snapshot/t40_after_snapshot_add_learner_and_request_a_log.rs @@ -19,7 +19,7 @@ async fn after_snapshot_add_learner_and_request_a_log() -> Result<()> { let config = Arc::new( Config { snapshot_policy: SnapshotPolicy::LogsSinceLast(snapshot_threshold), - max_applied_log_to_keep: 2, // do not let add-learner log and client-write log to trigger a snapshot. + max_in_snapshot_log_to_keep: 2, // do not let add-learner log and client-write log to trigger a snapshot. purge_batch_size: 1, enable_heartbeat: false, ..Default::default() diff --git a/openraft/tests/snapshot/t40_purge_in_snapshot_logs.rs b/openraft/tests/snapshot/t40_purge_in_snapshot_logs.rs new file mode 100644 index 000000000..d4b4fe556 --- /dev/null +++ b/openraft/tests/snapshot/t40_purge_in_snapshot_logs.rs @@ -0,0 +1,84 @@ +use std::sync::Arc; +use std::time::Duration; + +use anyhow::Result; +use maplit::btreeset; +use openraft::Config; +use openraft::LeaderId; +use openraft::LogId; +use openraft::RaftLogReader; + +use crate::fixtures::init_default_ut_tracing; +use crate::fixtures::RaftRouter; + +/// Leader logs should be deleted upto snapshot.last_log_id-max_in_snapshot_log_to_keep after building snapshot; +/// Follower/learner should delete upto snapshot.last_log_id after installing snapshot. +#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")] +async fn purge_in_snapshot_logs() -> Result<()> { + let max_keep = 2; + + let config = Arc::new( + Config { + max_in_snapshot_log_to_keep: max_keep, + purge_batch_size: 1, + enable_tick: false, + ..Default::default() + } + .validate()?, + ); + + let mut router = RaftRouter::new(config.clone()); + let mut log_index = router.new_nodes_from_single(btreeset! {0}, btreeset! {1}).await?; + + let leader = router.get_raft_handle(&0)?; + let learner = router.get_raft_handle(&1)?; + + tracing::info!("--- build snapshot on leader, check purged log"); + { + log_index += router.client_request_many(0, "0", 10).await?; + leader.trigger_snapshot().await?; + leader + .wait(timeout()) + .snapshot(LogId::new(LeaderId::new(1, 0), log_index), "building 1st snapshot") + .await?; + let mut sto0 = router.get_storage_handle(&0)?; + let logs = sto0.try_get_log_entries(..).await?; + assert_eq!(max_keep as usize, logs.len()); + } + + // Leader: .......15..20 + // Learner: 0..10 + tracing::info!("--- block replication, build another snapshot"); + { + router.isolate_node(1); + + log_index += router.client_request_many(0, "0", 5).await?; + router.wait(&0, timeout()).log(Some(log_index), "write another 5 logs").await?; + + leader.trigger_snapshot().await?; + leader + .wait(timeout()) + .snapshot(LogId::new(LeaderId::new(1, 0), log_index), "building 2nd snapshot") + .await?; + } + + tracing::info!("--- restore replication, install the 2nd snapshot on learner"); + { + router.restore_node(1); + + learner + .wait(timeout()) + .snapshot(LogId::new(LeaderId::new(1, 0), log_index), "learner install snapshot") + .await?; + + let mut sto1 = router.get_storage_handle(&1)?; + let logs = sto1.try_get_log_entries(..).await?; + assert_eq!(0, logs.len()); + } + + Ok(()) +} + +fn timeout() -> Option { + Some(Duration::from_millis(1_000)) +} diff --git a/openraft/tests/snapshot/t41_snapshot_overrides_membership.rs b/openraft/tests/snapshot/t41_snapshot_overrides_membership.rs index 834ca210b..763014e60 100644 --- a/openraft/tests/snapshot/t41_snapshot_overrides_membership.rs +++ b/openraft/tests/snapshot/t41_snapshot_overrides_membership.rs @@ -36,7 +36,7 @@ async fn snapshot_overrides_membership() -> Result<()> { let config = Arc::new( Config { snapshot_policy: SnapshotPolicy::LogsSinceLast(snapshot_threshold), - max_applied_log_to_keep: 0, + max_in_snapshot_log_to_keep: 0, purge_batch_size: 1, enable_heartbeat: false, ..Default::default() @@ -111,6 +111,8 @@ async fn snapshot_overrides_membership() -> Result<()> { tracing::info!("--- add learner to the cluster to receive snapshot, which overrides the learner storage"); { + let snapshot_index = log_index; + router.add_learner(0, 1).await.expect("failed to add new node as learner"); log_index += 1; @@ -118,10 +120,15 @@ async fn snapshot_overrides_membership() -> Result<()> { router.wait_for_log(&btreeset![0, 1], Some(log_index), timeout(), "add learner").await?; router - .wait_for_snapshot(&btreeset![1], LogId::new(LeaderId::new(1, 0), log_index), timeout(), "") + .wait_for_snapshot( + &btreeset![1], + LogId::new(LeaderId::new(1, 0), snapshot_index), + timeout(), + "", + ) .await?; - let expected_snap = Some((log_index.into(), 1)); + let expected_snap = Some((snapshot_index.into(), 1)); router .assert_storage_state( @@ -152,5 +159,5 @@ async fn snapshot_overrides_membership() -> Result<()> { } fn timeout() -> Option { - Some(Duration::from_millis(5000)) + Some(Duration::from_millis(1_000)) } diff --git a/openraft/tests/snapshot/t42_snapshot_uses_prev_snap_membership.rs b/openraft/tests/snapshot/t42_snapshot_uses_prev_snap_membership.rs index 8eb394cb0..7608fb593 100644 --- a/openraft/tests/snapshot/t42_snapshot_uses_prev_snap_membership.rs +++ b/openraft/tests/snapshot/t42_snapshot_uses_prev_snap_membership.rs @@ -36,7 +36,7 @@ async fn snapshot_uses_prev_snap_membership() -> Result<()> { snapshot_policy: SnapshotPolicy::LogsSinceLast(snapshot_threshold), // Use 3, with 1 it triggers a compaction when replicating ent-1, // because ent-0 is removed. - max_applied_log_to_keep: 3, + max_in_snapshot_log_to_keep: 3, purge_batch_size: 1, enable_tick: false, ..Default::default() @@ -67,7 +67,7 @@ async fn snapshot_uses_prev_snap_membership() -> Result<()> { &btreeset![0], LogId::new(LeaderId::new(1, 0), log_index), timeout(), - "snapshot", + "1st snapshot", ) .await?; @@ -87,23 +87,6 @@ async fn snapshot_uses_prev_snap_membership() -> Result<()> { m.effective.membership, "membership " ); - - // TODO(xp): this assertion fails because when change-membership, a append-entries request does not update - // voted_for and does not call save_vote. - // Thus the storage layer does not know about the leader==Some(0). - // Update voted_for whenever a new leader is seen would solve this issue. - // router - // .assert_storage_state( - // 1, - // want, - // Some(0), - // want, - // Some((want.into(), 1, MembershipConfig { - // members: btreeset![0, 1], - // members_after_consensus: None, - // })), - // ) - // .await; } tracing::info!("--- send just enough logs to trigger the 2nd snapshot"); @@ -117,7 +100,7 @@ async fn snapshot_uses_prev_snap_membership() -> Result<()> { &btreeset![0], LogId::new(LeaderId::new(1, 0), log_index), None, - "snapshot", + "2nd snapshot", ) .await?; } diff --git a/openraft/tests/snapshot/t43_snapshot_delete_conflict_logs.rs b/openraft/tests/snapshot/t43_snapshot_delete_conflict_logs.rs index e9100f8f9..e1e95cd52 100644 --- a/openraft/tests/snapshot/t43_snapshot_delete_conflict_logs.rs +++ b/openraft/tests/snapshot/t43_snapshot_delete_conflict_logs.rs @@ -39,7 +39,7 @@ async fn snapshot_delete_conflicting_logs() -> Result<()> { let config = Arc::new( Config { snapshot_policy: SnapshotPolicy::LogsSinceLast(snapshot_threshold), - max_applied_log_to_keep: 0, + max_in_snapshot_log_to_keep: 0, purge_batch_size: 1, enable_heartbeat: false, ..Default::default() diff --git a/openraft/tests/state_machine/main.rs b/openraft/tests/state_machine/main.rs index 601175407..f667e9b3b 100644 --- a/openraft/tests/state_machine/main.rs +++ b/openraft/tests/state_machine/main.rs @@ -9,4 +9,3 @@ mod fixtures; mod t10_total_order_apply; mod t20_state_machine_apply_membership; -mod t40_clean_applied_logs; diff --git a/openraft/tests/state_machine/t40_clean_applied_logs.rs b/openraft/tests/state_machine/t40_clean_applied_logs.rs deleted file mode 100644 index e9835571b..000000000 --- a/openraft/tests/state_machine/t40_clean_applied_logs.rs +++ /dev/null @@ -1,64 +0,0 @@ -use std::sync::Arc; -use std::time::Duration; - -use anyhow::Result; -use maplit::btreeset; -use openraft::Config; -use openraft::RaftLogReader; - -use crate::fixtures::init_default_ut_tracing; -use crate::fixtures::RaftRouter; - -/// Logs should be deleted by raft after applying them, on leader and learner. -/// -/// - assert logs are deleted on leader after applying them. -/// - assert logs are deleted on replication target after installing a snapshot. -#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")] -async fn clean_applied_logs() -> Result<()> { - let config = Arc::new( - Config { - max_applied_log_to_keep: 2, - purge_batch_size: 1, - enable_tick: false, - ..Default::default() - } - .validate()?, - ); - - let mut router = RaftRouter::new(config.clone()); - - let mut log_index = router.new_nodes_from_single(btreeset! {0}, btreeset! {1}).await?; - - let count = (10 - log_index) as usize; - for idx in 0..count { - router.client_request(0, "0", idx as u64).await?; - log_index += 1; - - // raft commit at once with a single leader cluster. - // If we send too fast, logs are removed before forwarding to learner. - // Then it triggers snapshot replication, which is not expected. - router - .wait_for_log( - &btreeset! {0,1}, - Some(log_index), - timeout(), - "client write repliated to all nodes", - ) - .await?; - } - - tracing::info!("--- logs before max_applied_log_to_keep should be cleaned"); - { - for node_id in 0..1 { - let mut sto = router.get_storage_handle(&node_id)?; - let logs = sto.get_log_entries(..).await?; - assert_eq!(2, logs.len(), "node {} should have only {} logs", node_id, 2); - } - } - - Ok(()) -} - -fn timeout() -> Option { - Some(Duration::from_millis(5000)) -}