diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 32b542fc5..a414c3836 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -948,11 +948,11 @@ impl, S: RaftStorage> RaftCore` and slice `&[T]`. +//! Implement [`fmt::Display`] for types such as `Option` and slice `&[T]`. use std::fmt; @@ -9,7 +9,7 @@ use std::fmt; pub(crate) struct DisplayOption<'a, T: fmt::Display>(pub &'a Option); impl<'a, T: fmt::Display> fmt::Display for DisplayOption<'a, T> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match &self.0 { None => { write!(f, "None") @@ -26,7 +26,7 @@ impl<'a, T: fmt::Display> fmt::Display for DisplayOption<'a, T> { pub(crate) struct DisplaySlice<'a, T: fmt::Display, const MAX: usize = 5>(pub &'a [T]); impl<'a, T: fmt::Display, const MAX: usize> fmt::Display for DisplaySlice<'a, T, MAX> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let slice = self.0; let len = slice.len(); diff --git a/openraft/src/engine/engine_impl.rs b/openraft/src/engine/engine_impl.rs index a31fe88ef..678f7f35a 100644 --- a/openraft/src/engine/engine_impl.rs +++ b/openraft/src/engine/engine_impl.rs @@ -17,15 +17,18 @@ use crate::engine::time_state; use crate::engine::time_state::TimeState; use crate::engine::Command; use crate::engine::EngineOutput; +use crate::engine::SendResult; use crate::entry::RaftEntry; use crate::error::ForwardToLeader; use crate::error::InitializeError; use crate::error::NotAllowed; use crate::error::NotInMembers; +use crate::error::RejectAppendEntries; use crate::internal_server_state::InternalServerState; use crate::membership::EffectiveMembership; use crate::node::Node; use crate::raft::AppendEntriesResponse; +use crate::raft::AppendEntriesTx; use crate::raft::RaftRespTx; use crate::raft::VoteRequest; use crate::raft::VoteResponse; @@ -363,36 +366,65 @@ where /// /// Also clean conflicting entries and update membership state. #[tracing::instrument(level = "debug", skip_all)] - pub(crate) fn handle_append_entries_req( + pub(crate) fn handle_append_entries( &mut self, vote: &Vote, prev_log_id: Option>, entries: Vec, - leader_committed: Option>, - ) -> AppendEntriesResponse { + tx: Option>, + ) -> bool { tracing::debug!( vote = display(vote), prev_log_id = display(prev_log_id.summary()), entries = display(DisplaySlice::<_>(&entries)), - leader_committed = display(leader_committed.summary()), - "append-entries request" - ); - tracing::debug!( my_vote = display(self.state.vote_ref()), my_last_log_id = display(self.state.last_log_id().summary()), - my_committed = display(self.state.committed().summary()), - "local state" + "{}", + func_name!() ); - let res = self.vote_handler().handle_message_vote(vote); - if let Err(rejected) = res { - return rejected.into(); + let res = self.append_entries(vote, prev_log_id, entries); + let is_ok = res.is_ok(); + + if let Some(tx) = tx { + let resp: AppendEntriesResponse = res.into(); + self.output.push_command(Command::SendAppendEntriesResult { + send: SendResult::new(Ok(resp), tx), + }); } + is_ok + } + + pub(crate) fn append_entries( + &mut self, + vote: &Vote, + prev_log_id: Option>, + entries: Vec, + ) -> Result<(), RejectAppendEntries> { + self.vote_handler().handle_message_vote(vote)?; // Vote is legal. let mut fh = self.following_handler(); - fh.append_entries(prev_log_id, entries, leader_committed) + fh.ensure_log_consecutive(prev_log_id)?; + fh.append_entries(prev_log_id, entries); + + Ok(()) + } + + /// Commit entries for follower/learner. + #[tracing::instrument(level = "debug", skip_all)] + pub(crate) fn handle_commit_entries(&mut self, leader_committed: Option>) { + tracing::debug!( + leader_committed = display(leader_committed.summary()), + my_accepted = display(self.state.accepted().summary()), + my_committed = display(self.state.committed().summary()), + "{}", + func_name!() + ); + + let mut fh = self.following_handler(); + fh.commit_entries(leader_committed); } /// Leader steps down(convert to learner) once the membership not containing it is committed. diff --git a/openraft/src/engine/handler/following_handler/append_entries_test.rs b/openraft/src/engine/handler/following_handler/append_entries_test.rs index 8d800a546..88d5a5fd6 100644 --- a/openraft/src/engine/handler/following_handler/append_entries_test.rs +++ b/openraft/src/engine/handler/following_handler/append_entries_test.rs @@ -41,15 +41,11 @@ fn eng() -> Engine::Entry> { fn test_follower_append_entries_update_accepted() -> anyhow::Result<()> { let mut eng = eng(); - eng.following_handler().append_entries( - Some(log_id(2, 3)), - vec![ - // - blank_ent(3, 4), - blank_ent(3, 5), - ], - None, - ); + eng.following_handler().append_entries(Some(log_id(2, 3)), vec![ + // + blank_ent(3, 4), + blank_ent(3, 5), + ]); assert_eq!( &[ @@ -64,14 +60,10 @@ fn test_follower_append_entries_update_accepted() -> anyhow::Result<()> { // Update again, accept should not decrease. - eng.following_handler().append_entries( - Some(log_id(2, 3)), - vec![ - // - blank_ent(3, 4), - ], - None, - ); + eng.following_handler().append_entries(Some(log_id(2, 3)), vec![ + // + blank_ent(3, 4), + ]); assert_eq!(Some(&log_id(3, 5)), eng.state.last_log_id()); assert_eq!(Some(&log_id(3, 5)), eng.state.accepted()); diff --git a/openraft/src/engine/handler/following_handler/commit_entries_test.rs b/openraft/src/engine/handler/following_handler/commit_entries_test.rs index 9ebc39588..dab6eb43e 100644 --- a/openraft/src/engine/handler/following_handler/commit_entries_test.rs +++ b/openraft/src/engine/handler/following_handler/commit_entries_test.rs @@ -97,10 +97,11 @@ fn test_following_handler_commit_entries_le_accepted() -> anyhow::Result<()> { ); assert_eq!( vec![ + // Command::FollowerCommit { already_committed: Some(log_id(1, 1)), upto: log_id(2, 3) - }, // + }, ], eng.output.take_commands() ); diff --git a/openraft/src/engine/handler/following_handler/mod.rs b/openraft/src/engine/handler/following_handler/mod.rs index 4229548ba..f9b5734d0 100644 --- a/openraft/src/engine/handler/following_handler/mod.rs +++ b/openraft/src/engine/handler/following_handler/mod.rs @@ -9,7 +9,7 @@ use crate::engine::Command; use crate::engine::EngineConfig; use crate::engine::EngineOutput; use crate::entry::RaftEntry; -use crate::raft::AppendEntriesResponse; +use crate::error::RejectAppendEntries; use crate::raft_state::LogStateReader; use crate::EffectiveMembership; use crate::LogId; @@ -52,16 +52,10 @@ where /// /// Also clean conflicting entries and update membership state. #[tracing::instrument(level = "debug", skip_all)] - pub(crate) fn append_entries( - &mut self, - prev_log_id: Option>, - entries: Vec, - leader_committed: Option>, - ) -> AppendEntriesResponse { + pub(crate) fn append_entries(&mut self, prev_log_id: Option>, entries: Vec) { tracing::debug!( prev_log_id = display(prev_log_id.summary()), entries = display(DisplaySlice::<_>(&entries)), - leader_committed = display(leader_committed.summary()), "append-entries request" ); tracing::debug!( @@ -74,18 +68,6 @@ where debug_assert!(x.get_log_id().index == prev_log_id.next_index()); } - if let Some(ref prev) = prev_log_id { - if !self.state.has_log_id(prev) { - let local = self.state.get_log_id(prev.index); - tracing::debug!(local = display(local.summary()), "prev_log_id does not match"); - - self.truncate_logs(prev.index); - return AppendEntriesResponse::Conflict; - } - } - - // else `prev_log_id.is_none()` means replicating logs from the very beginning. - tracing::debug!( committed = display(self.state.committed().summary()), entries = display(DisplaySlice::<_>(&entries)), @@ -107,10 +89,28 @@ where } self.do_append_entries(entries, since); + } - self.commit_entries(leader_committed); + /// Ensures the log to replicate is consecutive to the local log. + /// + /// If not, truncate the local log and return an error. + pub(crate) fn ensure_log_consecutive( + &mut self, + prev_log_id: Option>, + ) -> Result<(), RejectAppendEntries> { + if let Some(ref prev) = prev_log_id { + if !self.state.has_log_id(prev) { + let local = self.state.get_log_id(prev.index); + tracing::debug!(local = display(DisplayOption(&local)), "prev_log_id does not match"); + + self.truncate_logs(prev.index); + return Err(RejectAppendEntries::ByConflictingLogId { local, expect: *prev }); + } + } + + // else `prev_log_id.is_none()` means replicating logs from the very beginning. - AppendEntriesResponse::Success + Ok(()) } /// Follower/Learner appends `entries[since..]`. @@ -143,8 +143,9 @@ where self.output.push_command(Command::AppendInputEntries { entries }); } + /// Commit entries that are already committed by the leader. #[tracing::instrument(level = "debug", skip_all)] - fn commit_entries(&mut self, leader_committed: Option>) { + pub(crate) fn commit_entries(&mut self, leader_committed: Option>) { let accepted = self.state.accepted().copied(); let committed = std::cmp::min(accepted, leader_committed); @@ -232,7 +233,9 @@ where let m = Arc::new(membership); - // TODO: if effective membership changes, call `update_repliation()` + // TODO: if effective membership changes, call `update_replication()`, if a follower has replication + // streams. Now we don't have replication streams for follower, so it's ok to not call + // `update_replication()`. let effective_changed = self.state.membership_state.update_committed(m); if let Some(c) = effective_changed { self.output.push_command(Command::UpdateMembership { membership: c }) @@ -247,6 +250,7 @@ where // There are two special cases in which snapshot last log id does not exists locally: // Snapshot last log id before the local last-purged-log-id, or after the local last-log-id: // + // ``` // snapshot ----. // v // -----------------------llllllllll---> @@ -254,6 +258,7 @@ where // snapshot ----. // v // ----lllllllllll---------------------> + // ``` // // In the first case, snapshot-last-log-id <= last-purged-log-id <= // local-snapshot-last-log-id. Thus snapshot is obsolete and won't be installed. diff --git a/openraft/src/engine/mod.rs b/openraft/src/engine/mod.rs index 12ebef788..1896cf6f1 100644 --- a/openraft/src/engine/mod.rs +++ b/openraft/src/engine/mod.rs @@ -38,9 +38,9 @@ pub(crate) mod time_state; #[cfg(test)] mod tests { + mod append_entries_test; mod command_test; mod elect_test; - mod handle_append_entries_req_test; mod handle_vote_req_test; mod handle_vote_resp_test; mod initialize_test; diff --git a/openraft/src/engine/tests/handle_append_entries_req_test.rs b/openraft/src/engine/tests/append_entries_test.rs similarity index 76% rename from openraft/src/engine/tests/handle_append_entries_req_test.rs rename to openraft/src/engine/tests/append_entries_test.rs index 18100a2fb..1f6d22343 100644 --- a/openraft/src/engine/tests/handle_append_entries_req_test.rs +++ b/openraft/src/engine/tests/append_entries_test.rs @@ -11,7 +11,7 @@ use crate::engine::CEngine; use crate::engine::Command; use crate::engine::Engine; use crate::entry::RaftEntry; -use crate::raft::AppendEntriesResponse; +use crate::error::RejectAppendEntries; use crate::raft_state::LogStateReader; use crate::testing::log_id; use crate::utime::UTime; @@ -51,12 +51,12 @@ fn eng() -> CEngine { } #[test] -fn test_handle_append_entries_req_vote_is_rejected() -> anyhow::Result<()> { +fn test_append_entries_vote_is_rejected() -> anyhow::Result<()> { let mut eng = eng(); - let resp = eng.handle_append_entries_req(&Vote::new(1, 1), None, Vec::>::new(), None); + let res = eng.append_entries(&Vote::new(1, 1), None, Vec::>::new()); - assert_eq!(AppendEntriesResponse::HigherVote(Vote::new(2, 1)), resp); + assert_eq!(Err(RejectAppendEntries::ByVote(Vote::new(2, 1))), res); assert_eq!( &[ log_id(1, 1), // @@ -66,7 +66,6 @@ fn test_handle_append_entries_req_vote_is_rejected() -> anyhow::Result<()> { ); assert_eq!(Vote::new(2, 1), *eng.state.vote_ref()); assert_eq!(Some(&log_id(2, 3)), eng.state.last_log_id()); - assert_eq!(Some(&log_id(0, 0)), eng.state.committed()); assert_eq!( MembershipState::new( Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m01())), @@ -81,20 +80,19 @@ fn test_handle_append_entries_req_vote_is_rejected() -> anyhow::Result<()> { } #[test] -fn test_handle_append_entries_req_prev_log_id_is_applied() -> anyhow::Result<()> { +fn test_append_entries_prev_log_id_is_applied() -> anyhow::Result<()> { // An applied log id has to be committed thus let mut eng = eng(); eng.state.vote = UTime::new(Instant::now(), Vote::new(1, 2)); eng.vote_handler().become_leading(); - let resp = eng.handle_append_entries_req( + let res = eng.append_entries( &Vote::new_committed(2, 1), Some(log_id(0, 0)), Vec::>::new(), - None, ); - assert_eq!(AppendEntriesResponse::Success, resp); + assert_eq!(Ok(()), res); assert_eq!( &[ log_id(1, 1), // @@ -104,7 +102,6 @@ fn test_handle_append_entries_req_prev_log_id_is_applied() -> anyhow::Result<()> ); assert_eq!(Vote::new_committed(2, 1), *eng.state.vote_ref()); assert_eq!(Some(&log_id(2, 3)), eng.state.last_log_id()); - assert_eq!(Some(&log_id(0, 0)), eng.state.committed()); assert_eq!( MembershipState::new( Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m01())), @@ -124,17 +121,22 @@ fn test_handle_append_entries_req_prev_log_id_is_applied() -> anyhow::Result<()> } #[test] -fn test_handle_append_entries_req_prev_log_id_conflict() -> anyhow::Result<()> { +fn test_append_entries_prev_log_id_conflict() -> anyhow::Result<()> { let mut eng = eng(); - let resp = eng.handle_append_entries_req( + let res = eng.append_entries( &Vote::new_committed(2, 1), Some(log_id(2, 2)), Vec::>::new(), - None, ); - assert_eq!(AppendEntriesResponse::Conflict, resp); + assert_eq!( + Err(RejectAppendEntries::ByConflictingLogId { + expect: log_id(2, 2), + local: Some(log_id(1, 2)), + }), + res + ); assert_eq!( &[ log_id(1, 1), // @@ -143,7 +145,6 @@ fn test_handle_append_entries_req_prev_log_id_conflict() -> anyhow::Result<()> { ); assert_eq!(Vote::new_committed(2, 1), *eng.state.vote_ref()); assert_eq!(Some(&log_id(1, 1)), eng.state.last_log_id()); - assert_eq!(Some(&log_id(0, 0)), eng.state.committed()); assert_eq!( MembershipState::new( Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m01())), @@ -169,17 +170,15 @@ fn test_handle_append_entries_req_prev_log_id_conflict() -> anyhow::Result<()> { } #[test] -fn test_handle_append_entries_req_prev_log_id_is_committed() -> anyhow::Result<()> { +fn test_append_entries_prev_log_id_is_committed() -> anyhow::Result<()> { let mut eng = eng(); - let resp = eng.handle_append_entries_req( - &Vote::new_committed(2, 1), - Some(log_id(0, 0)), - vec![blank_ent(1, 1), blank_ent(2, 2)], - Some(log_id(1, 1)), - ); + let res = eng.append_entries(&Vote::new_committed(2, 1), Some(log_id(0, 0)), vec![ + blank_ent(1, 1), + blank_ent(2, 2), + ]); - assert_eq!(AppendEntriesResponse::Success, resp); + assert_eq!(Ok(()), res); assert_eq!( &[ log_id(1, 1), // @@ -189,7 +188,6 @@ fn test_handle_append_entries_req_prev_log_id_is_committed() -> anyhow::Result<( ); assert_eq!(Vote::new_committed(2, 1), *eng.state.vote_ref()); assert_eq!(Some(&log_id(2, 2)), eng.state.last_log_id()); - assert_eq!(Some(&log_id(1, 1)), eng.state.committed()); assert_eq!( MembershipState::new( Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m01())), @@ -210,10 +208,6 @@ fn test_handle_append_entries_req_prev_log_id_is_committed() -> anyhow::Result<( Command::AppendInputEntries { entries: vec![blank_ent(2, 2)] }, - Command::FollowerCommit { - already_committed: Some(log_id(0, 0)), - upto: log_id(1, 1) - }, ], eng.output.take_commands() ); @@ -222,19 +216,23 @@ fn test_handle_append_entries_req_prev_log_id_is_committed() -> anyhow::Result<( } #[test] -fn test_handle_append_entries_req_prev_log_id_not_exists() -> anyhow::Result<()> { +fn test_append_entries_prev_log_id_not_exists() -> anyhow::Result<()> { let mut eng = eng(); eng.state.vote = UTime::new(Instant::now(), Vote::new(1, 2)); eng.vote_handler().become_leading(); - let resp = eng.handle_append_entries_req( - &Vote::new_committed(2, 1), - Some(log_id(2, 4)), - vec![blank_ent(2, 5), blank_ent(2, 6)], - Some(log_id(1, 1)), - ); + let res = eng.append_entries(&Vote::new_committed(2, 1), Some(log_id(2, 4)), vec![ + blank_ent(2, 5), + blank_ent(2, 6), + ]); - assert_eq!(AppendEntriesResponse::Conflict, resp); + assert_eq!( + Err(RejectAppendEntries::ByConflictingLogId { + expect: log_id(2, 4), + local: None, + }), + res + ); assert_eq!( &[ log_id(1, 1), // @@ -244,7 +242,6 @@ fn test_handle_append_entries_req_prev_log_id_not_exists() -> anyhow::Result<()> ); assert_eq!(Vote::new_committed(2, 1), *eng.state.vote_ref()); assert_eq!(Some(&log_id(2, 3)), eng.state.last_log_id()); - assert_eq!(Some(&log_id(0, 0)), eng.state.committed()); assert_eq!( MembershipState::new( Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m01())), @@ -264,7 +261,7 @@ fn test_handle_append_entries_req_prev_log_id_not_exists() -> anyhow::Result<()> } #[test] -fn test_handle_append_entries_req_entries_conflict() -> anyhow::Result<()> { +fn test_append_entries_conflict() -> anyhow::Result<()> { // prev_log_id matches, // The second entry in entries conflict. // This request will replace the effective membership. @@ -272,14 +269,12 @@ fn test_handle_append_entries_req_entries_conflict() -> anyhow::Result<()> { // It is no longer a member, change to learner let mut eng = eng(); - let resp = eng.handle_append_entries_req( - &Vote::new_committed(2, 1), - Some(log_id(1, 1)), - vec![blank_ent(1, 2), Entry::new_membership(log_id(3, 3), m34())], - Some(log_id(4, 4)), - ); + let resp = eng.append_entries(&Vote::new_committed(2, 1), Some(log_id(1, 1)), vec![ + blank_ent(1, 2), + Entry::new_membership(log_id(3, 3), m34()), + ]); - assert_eq!(AppendEntriesResponse::Success, resp); + assert_eq!(Ok(()), resp); assert_eq!( &[ log_id(1, 1), // @@ -289,10 +284,9 @@ fn test_handle_append_entries_req_entries_conflict() -> anyhow::Result<()> { ); assert_eq!(Vote::new_committed(2, 1), *eng.state.vote_ref()); assert_eq!(Some(&log_id(3, 3)), eng.state.last_log_id()); - assert_eq!(Some(&log_id(3, 3)), eng.state.committed()); assert_eq!( MembershipState::new( - Arc::new(EffectiveMembership::new(Some(log_id(3, 3)), m34())), + Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m01())), Arc::new(EffectiveMembership::new(Some(log_id(3, 3)), m34())), ), eng.state.membership_state @@ -313,10 +307,6 @@ fn test_handle_append_entries_req_entries_conflict() -> anyhow::Result<()> { Command::AppendInputEntries { entries: vec![Entry::new_membership(log_id(3, 3), m34())] }, - Command::FollowerCommit { - already_committed: Some(log_id(0, 0)), - upto: log_id(3, 3) - }, ], eng.output.take_commands() ); diff --git a/openraft/src/error.rs b/openraft/src/error.rs index 2f652565a..da787a429 100644 --- a/openraft/src/error.rs +++ b/openraft/src/error.rs @@ -467,3 +467,38 @@ impl From> for AppendEntriesResponse { } } } + +#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] +pub(crate) enum RejectAppendEntries { + #[error("reject AppendEntries by a greater vote: {0}")] + ByVote(Vote), + + #[error("reject AppendEntries because of conflicting log-id: {local:?}; expect to be: {expect:?}")] + ByConflictingLogId { + expect: LogId, + local: Option>, + }, +} + +impl From> for RejectAppendEntries { + fn from(r: RejectVoteRequest) -> Self { + match r { + RejectVoteRequest::ByVote(v) => RejectAppendEntries::ByVote(v), + RejectVoteRequest::ByLastLogId(_) => { + unreachable!("the leader should always has a greater last log id") + } + } + } +} + +impl From>> for AppendEntriesResponse { + fn from(r: Result<(), RejectAppendEntries>) -> Self { + match r { + Ok(_) => AppendEntriesResponse::Success, + Err(e) => match e { + RejectAppendEntries::ByVote(v) => AppendEntriesResponse::HigherVote(v), + RejectAppendEntries::ByConflictingLogId { expect: _, local: _ } => AppendEntriesResponse::Conflict, + }, + } + } +}