Skip to content

Commit

Permalink
Feature: Engine: add method: purge_log()
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed May 8, 2022
1 parent 2262c79 commit ff898cd
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 7 deletions.
2 changes: 1 addition & 1 deletion openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -893,7 +893,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftRuntime
Command::InstallElectionTimer { .. } => {
self.update_election_timeout();
}
Command::PurgeAppliedLog { .. } => {}
Command::PurgeLog { .. } => {}
Command::DeleteConflictLog { .. } => {}
Command::BuildSnapshot { .. } => {}
Command::SendVote { vote_req } => {
Expand Down
12 changes: 6 additions & 6 deletions openraft/src/engine/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,15 @@ pub(crate) enum Command<NID: NodeId> {
// An already installed timer should be cleared.
InstallElectionTimer {},

#[allow(dead_code)]
PurgeLog {
upto: LogId<NID>,
},

//
// --- Draft unimplemented commands:
//

// TODO:
#[allow(dead_code)]
PurgeAppliedLog {
upto: LogId<NID>,
},
// TODO:
#[allow(dead_code)]
DeleteConflictLog {
Expand All @@ -91,7 +91,7 @@ impl<NID: NodeId> Command<NID> {
Command::SaveVote { .. } => flags.set_data_changed(),
Command::SendVote { .. } => {}
Command::InstallElectionTimer { .. } => {}
Command::PurgeAppliedLog { .. } => flags.set_data_changed(),
Command::PurgeLog { .. } => flags.set_data_changed(),
Command::DeleteConflictLog { .. } => flags.set_data_changed(),
Command::BuildSnapshot { .. } => flags.set_data_changed(),
}
Expand Down
24 changes: 24 additions & 0 deletions openraft/src/engine/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,27 @@ impl<NID: NodeId> Engine<NID> {
self.push_command(Command::MoveInputCursorBy { n: l });
}

/// Purge log entries upto `upto`, inclusive.
#[tracing::instrument(level = "debug", skip(self))]
pub(crate) fn purge_log(&mut self, upto: LogId<NID>) {
let st = &mut self.state;
let log_id = Some(upto);

if log_id <= st.last_purged_log_id {
return;
}

st.log_ids.purge(&upto);

st.last_purged_log_id = log_id;

if st.last_log_id < log_id {
st.last_log_id = log_id;
}

self.push_command(Command::PurgeLog { upto });
}

// --- Draft API ---

// // --- app API ---
Expand All @@ -282,7 +303,10 @@ impl<NID: NodeId> Engine<NID> {
//
// pub(crate) fn handle_append_entries_resp() {}
// pub(crate) fn handle_install_snapshot_resp() {}
}

/// Supporting util
impl<NID: NodeId> Engine<NID> {
/// Update effective membership config if encountering a membership config log entry.
fn try_update_membership<Ent: RaftEntry<NID>>(&mut self, entry: &Ent) {
if let Some(m) = entry.get_membership() {
Expand Down
2 changes: 2 additions & 0 deletions openraft/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ mod log_id_list;
#[cfg(test)]
mod log_id_list_test;
#[cfg(test)]
mod purge_log_test;
#[cfg(test)]
mod testing;

pub(crate) use command::Command;
Expand Down
139 changes: 139 additions & 0 deletions openraft/src/engine/purge_log_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
use crate::engine::Command;
use crate::engine::Engine;
use crate::engine::LogIdList;
use crate::LeaderId;
use crate::LogId;

fn log_id(term: u64, index: u64) -> LogId<u64> {
LogId::<u64> {
leader_id: LeaderId { term, node_id: 1 },
index,
}
}

fn eng() -> Engine<u64> {
let mut eng = Engine::<u64>::default();
eng.state.log_ids = LogIdList::new(vec![log_id(2, 2), log_id(4, 4), log_id(4, 6)]);
eng.state.last_purged_log_id = Some(log_id(2, 2));
eng.state.last_log_id = Some(log_id(4, 6));
eng
}

#[test]
fn test_purge_log_already_purged() -> anyhow::Result<()> {
let mut eng = eng();

eng.purge_log(log_id(1, 1));

assert_eq!(Some(log_id(2, 2)), eng.state.last_purged_log_id,);
assert_eq!(log_id(2, 2), eng.state.log_ids.key_log_ids()[0],);
assert_eq!(Some(log_id(4, 6)), eng.state.last_log_id,);

assert_eq!(0, eng.commands.len());

Ok(())
}

#[test]
fn test_purge_log_equal_prev_last_purged() -> anyhow::Result<()> {
let mut eng = eng();

eng.purge_log(log_id(2, 2));

assert_eq!(Some(log_id(2, 2)), eng.state.last_purged_log_id,);
assert_eq!(log_id(2, 2), eng.state.log_ids.key_log_ids()[0],);
assert_eq!(Some(log_id(4, 6)), eng.state.last_log_id,);

assert_eq!(0, eng.commands.len());

Ok(())
}
#[test]
fn test_purge_log_same_leader_as_prev_last_purged() -> anyhow::Result<()> {
let mut eng = eng();

eng.purge_log(log_id(2, 3));

assert_eq!(Some(log_id(2, 3)), eng.state.last_purged_log_id,);
assert_eq!(log_id(2, 3), eng.state.log_ids.key_log_ids()[0],);
assert_eq!(Some(log_id(4, 6)), eng.state.last_log_id,);

assert_eq!(vec![Command::PurgeLog { upto: log_id(2, 3) }], eng.commands);

Ok(())
}

#[test]
fn test_purge_log_to_last_key_log() -> anyhow::Result<()> {
let mut eng = eng();

eng.purge_log(log_id(4, 4));

assert_eq!(Some(log_id(4, 4)), eng.state.last_purged_log_id,);
assert_eq!(log_id(4, 4), eng.state.log_ids.key_log_ids()[0],);
assert_eq!(Some(log_id(4, 6)), eng.state.last_log_id,);

assert_eq!(vec![Command::PurgeLog { upto: log_id(4, 4) }], eng.commands);

Ok(())
}

#[test]
fn test_purge_log_go_pass_last_key_log() -> anyhow::Result<()> {
let mut eng = eng();

eng.purge_log(log_id(4, 5));

assert_eq!(Some(log_id(4, 5)), eng.state.last_purged_log_id,);
assert_eq!(log_id(4, 5), eng.state.log_ids.key_log_ids()[0],);
assert_eq!(Some(log_id(4, 6)), eng.state.last_log_id,);

assert_eq!(vec![Command::PurgeLog { upto: log_id(4, 5) }], eng.commands);

Ok(())
}

#[test]
fn test_purge_log_to_last_log_id() -> anyhow::Result<()> {
let mut eng = eng();

eng.purge_log(log_id(4, 6));

assert_eq!(Some(log_id(4, 6)), eng.state.last_purged_log_id,);
assert_eq!(log_id(4, 6), eng.state.log_ids.key_log_ids()[0],);
assert_eq!(Some(log_id(4, 6)), eng.state.last_log_id,);

assert_eq!(vec![Command::PurgeLog { upto: log_id(4, 6) }], eng.commands);

Ok(())
}

#[test]
fn test_purge_log_go_pass_last_log_id() -> anyhow::Result<()> {
let mut eng = eng();

eng.purge_log(log_id(4, 7));

assert_eq!(Some(log_id(4, 7)), eng.state.last_purged_log_id,);
assert_eq!(log_id(4, 7), eng.state.log_ids.key_log_ids()[0],);
assert_eq!(Some(log_id(4, 7)), eng.state.last_log_id,);

assert_eq!(vec![Command::PurgeLog { upto: log_id(4, 7) }], eng.commands);

Ok(())
}

#[test]
fn test_purge_log_to_higher_leader_lgo() -> anyhow::Result<()> {
let mut eng = eng();

eng.purge_log(log_id(5, 7));

assert_eq!(Some(log_id(5, 7)), eng.state.last_purged_log_id,);
assert_eq!(log_id(5, 7), eng.state.log_ids.key_log_ids()[0],);
assert_eq!(Some(log_id(5, 7)), eng.state.last_log_id,);

assert_eq!(vec![Command::PurgeLog { upto: log_id(5, 7) }], eng.commands);

Ok(())
}

0 comments on commit ff898cd

Please sign in to comment.