diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 7ff420458..4164fefaa 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -893,7 +893,7 @@ impl, S: RaftStorage> RaftRuntime Command::InstallElectionTimer { .. } => { self.update_election_timeout(); } - Command::PurgeAppliedLog { .. } => {} + Command::PurgeLog { .. } => {} Command::DeleteConflictLog { .. } => {} Command::BuildSnapshot { .. } => {} Command::SendVote { vote_req } => { diff --git a/openraft/src/engine/command.rs b/openraft/src/engine/command.rs index 19d29a898..107b0922f 100644 --- a/openraft/src/engine/command.rs +++ b/openraft/src/engine/command.rs @@ -58,15 +58,15 @@ pub(crate) enum Command { // An already installed timer should be cleared. InstallElectionTimer {}, + #[allow(dead_code)] + PurgeLog { + upto: LogId, + }, + // // --- Draft unimplemented commands: // - // TODO: - #[allow(dead_code)] - PurgeAppliedLog { - upto: LogId, - }, // TODO: #[allow(dead_code)] DeleteConflictLog { @@ -91,7 +91,7 @@ impl Command { Command::SaveVote { .. } => flags.set_data_changed(), Command::SendVote { .. } => {} Command::InstallElectionTimer { .. } => {} - Command::PurgeAppliedLog { .. } => flags.set_data_changed(), + Command::PurgeLog { .. } => flags.set_data_changed(), Command::DeleteConflictLog { .. } => flags.set_data_changed(), Command::BuildSnapshot { .. } => flags.set_data_changed(), } diff --git a/openraft/src/engine/engine.rs b/openraft/src/engine/engine.rs index b64259d28..10e5c6ff8 100644 --- a/openraft/src/engine/engine.rs +++ b/openraft/src/engine/engine.rs @@ -265,6 +265,27 @@ impl Engine { self.push_command(Command::MoveInputCursorBy { n: l }); } + /// Purge log entries upto `upto`, inclusive. + #[tracing::instrument(level = "debug", skip(self))] + pub(crate) fn purge_log(&mut self, upto: LogId) { + let st = &mut self.state; + let log_id = Some(upto); + + if log_id <= st.last_purged_log_id { + return; + } + + st.log_ids.purge(&upto); + + st.last_purged_log_id = log_id; + + if st.last_log_id < log_id { + st.last_log_id = log_id; + } + + self.push_command(Command::PurgeLog { upto }); + } + // --- Draft API --- // // --- app API --- @@ -282,7 +303,10 @@ impl Engine { // // pub(crate) fn handle_append_entries_resp() {} // pub(crate) fn handle_install_snapshot_resp() {} +} +/// Supporting util +impl Engine { /// Update effective membership config if encountering a membership config log entry. fn try_update_membership>(&mut self, entry: &Ent) { if let Some(m) = entry.get_membership() { diff --git a/openraft/src/engine/mod.rs b/openraft/src/engine/mod.rs index a6491da7b..d42a55047 100644 --- a/openraft/src/engine/mod.rs +++ b/openraft/src/engine/mod.rs @@ -14,6 +14,8 @@ mod log_id_list; #[cfg(test)] mod log_id_list_test; #[cfg(test)] +mod purge_log_test; +#[cfg(test)] mod testing; pub(crate) use command::Command; diff --git a/openraft/src/engine/purge_log_test.rs b/openraft/src/engine/purge_log_test.rs new file mode 100644 index 000000000..381ff9eb2 --- /dev/null +++ b/openraft/src/engine/purge_log_test.rs @@ -0,0 +1,139 @@ +use crate::engine::Command; +use crate::engine::Engine; +use crate::engine::LogIdList; +use crate::LeaderId; +use crate::LogId; + +fn log_id(term: u64, index: u64) -> LogId { + LogId:: { + leader_id: LeaderId { term, node_id: 1 }, + index, + } +} + +fn eng() -> Engine { + let mut eng = Engine::::default(); + eng.state.log_ids = LogIdList::new(vec![log_id(2, 2), log_id(4, 4), log_id(4, 6)]); + eng.state.last_purged_log_id = Some(log_id(2, 2)); + eng.state.last_log_id = Some(log_id(4, 6)); + eng +} + +#[test] +fn test_purge_log_already_purged() -> anyhow::Result<()> { + let mut eng = eng(); + + eng.purge_log(log_id(1, 1)); + + assert_eq!(Some(log_id(2, 2)), eng.state.last_purged_log_id,); + assert_eq!(log_id(2, 2), eng.state.log_ids.key_log_ids()[0],); + assert_eq!(Some(log_id(4, 6)), eng.state.last_log_id,); + + assert_eq!(0, eng.commands.len()); + + Ok(()) +} + +#[test] +fn test_purge_log_equal_prev_last_purged() -> anyhow::Result<()> { + let mut eng = eng(); + + eng.purge_log(log_id(2, 2)); + + assert_eq!(Some(log_id(2, 2)), eng.state.last_purged_log_id,); + assert_eq!(log_id(2, 2), eng.state.log_ids.key_log_ids()[0],); + assert_eq!(Some(log_id(4, 6)), eng.state.last_log_id,); + + assert_eq!(0, eng.commands.len()); + + Ok(()) +} +#[test] +fn test_purge_log_same_leader_as_prev_last_purged() -> anyhow::Result<()> { + let mut eng = eng(); + + eng.purge_log(log_id(2, 3)); + + assert_eq!(Some(log_id(2, 3)), eng.state.last_purged_log_id,); + assert_eq!(log_id(2, 3), eng.state.log_ids.key_log_ids()[0],); + assert_eq!(Some(log_id(4, 6)), eng.state.last_log_id,); + + assert_eq!(vec![Command::PurgeLog { upto: log_id(2, 3) }], eng.commands); + + Ok(()) +} + +#[test] +fn test_purge_log_to_last_key_log() -> anyhow::Result<()> { + let mut eng = eng(); + + eng.purge_log(log_id(4, 4)); + + assert_eq!(Some(log_id(4, 4)), eng.state.last_purged_log_id,); + assert_eq!(log_id(4, 4), eng.state.log_ids.key_log_ids()[0],); + assert_eq!(Some(log_id(4, 6)), eng.state.last_log_id,); + + assert_eq!(vec![Command::PurgeLog { upto: log_id(4, 4) }], eng.commands); + + Ok(()) +} + +#[test] +fn test_purge_log_go_pass_last_key_log() -> anyhow::Result<()> { + let mut eng = eng(); + + eng.purge_log(log_id(4, 5)); + + assert_eq!(Some(log_id(4, 5)), eng.state.last_purged_log_id,); + assert_eq!(log_id(4, 5), eng.state.log_ids.key_log_ids()[0],); + assert_eq!(Some(log_id(4, 6)), eng.state.last_log_id,); + + assert_eq!(vec![Command::PurgeLog { upto: log_id(4, 5) }], eng.commands); + + Ok(()) +} + +#[test] +fn test_purge_log_to_last_log_id() -> anyhow::Result<()> { + let mut eng = eng(); + + eng.purge_log(log_id(4, 6)); + + assert_eq!(Some(log_id(4, 6)), eng.state.last_purged_log_id,); + assert_eq!(log_id(4, 6), eng.state.log_ids.key_log_ids()[0],); + assert_eq!(Some(log_id(4, 6)), eng.state.last_log_id,); + + assert_eq!(vec![Command::PurgeLog { upto: log_id(4, 6) }], eng.commands); + + Ok(()) +} + +#[test] +fn test_purge_log_go_pass_last_log_id() -> anyhow::Result<()> { + let mut eng = eng(); + + eng.purge_log(log_id(4, 7)); + + assert_eq!(Some(log_id(4, 7)), eng.state.last_purged_log_id,); + assert_eq!(log_id(4, 7), eng.state.log_ids.key_log_ids()[0],); + assert_eq!(Some(log_id(4, 7)), eng.state.last_log_id,); + + assert_eq!(vec![Command::PurgeLog { upto: log_id(4, 7) }], eng.commands); + + Ok(()) +} + +#[test] +fn test_purge_log_to_higher_leader_lgo() -> anyhow::Result<()> { + let mut eng = eng(); + + eng.purge_log(log_id(5, 7)); + + assert_eq!(Some(log_id(5, 7)), eng.state.last_purged_log_id,); + assert_eq!(log_id(5, 7), eng.state.log_ids.key_log_ids()[0],); + assert_eq!(Some(log_id(5, 7)), eng.state.last_log_id,); + + assert_eq!(vec![Command::PurgeLog { upto: log_id(5, 7) }], eng.commands); + + Ok(()) +}