Skip to content

Commit

Permalink
Improve: send AppendEntries response before committing entries
Browse files Browse the repository at this point in the history
When a follower receives an append-entries request that includes a
series of log entries to append and the log id that the leader
has committed, it responds with an append-entries response after
committing and applying the entries.

However, this is not strictly necessary. The follower could simply send
the response as soon as the log entries have been appended and flushed
to disk, without waiting for them to be committed.
  • Loading branch information
drmingdrmer committed Apr 4, 2023
1 parent e20794f commit dbac91d
Show file tree
Hide file tree
Showing 9 changed files with 169 additions and 114 deletions.
8 changes: 4 additions & 4 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -948,11 +948,11 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
) {
tracing::debug!(req = display(req.summary()), func = func_name!());

let resp = self.engine.handle_append_entries_req(&req.vote, req.prev_log_id, req.entries, req.leader_commit);
let is_ok = self.engine.handle_append_entries(&req.vote, req.prev_log_id, req.entries, Some(tx));

self.engine.output.push_command(Command::SendAppendEntriesResult {
send: SendResult::new(Ok(resp), tx),
});
if is_ok {
self.engine.handle_commit_entries(req.leader_commit);
}
}

// TODO: Make this method non-async. It does not need to run any async command in it.
Expand Down
6 changes: 3 additions & 3 deletions openraft/src/display_ext.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! Implement [`std::fmt::Display`] for types such as `Option<T>` and slice `&[T]`.
//! Implement [`fmt::Display`] for types such as `Option<T>` and slice `&[T]`.

use std::fmt;

Expand All @@ -9,7 +9,7 @@ use std::fmt;
pub(crate) struct DisplayOption<'a, T: fmt::Display>(pub &'a Option<T>);

impl<'a, T: fmt::Display> fmt::Display for DisplayOption<'a, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self.0 {
None => {
write!(f, "None")
Expand All @@ -26,7 +26,7 @@ impl<'a, T: fmt::Display> fmt::Display for DisplayOption<'a, T> {
pub(crate) struct DisplaySlice<'a, T: fmt::Display, const MAX: usize = 5>(pub &'a [T]);

impl<'a, T: fmt::Display, const MAX: usize> fmt::Display for DisplaySlice<'a, T, MAX> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let slice = self.0;
let len = slice.len();

Expand Down
58 changes: 45 additions & 13 deletions openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,18 @@ use crate::engine::time_state;
use crate::engine::time_state::TimeState;
use crate::engine::Command;
use crate::engine::EngineOutput;
use crate::engine::SendResult;
use crate::entry::RaftEntry;
use crate::error::ForwardToLeader;
use crate::error::InitializeError;
use crate::error::NotAllowed;
use crate::error::NotInMembers;
use crate::error::RejectAppendEntries;
use crate::internal_server_state::InternalServerState;
use crate::membership::EffectiveMembership;
use crate::node::Node;
use crate::raft::AppendEntriesResponse;
use crate::raft::AppendEntriesTx;
use crate::raft::RaftRespTx;
use crate::raft::VoteRequest;
use crate::raft::VoteResponse;
Expand Down Expand Up @@ -363,36 +366,65 @@ where
///
/// Also clean conflicting entries and update membership state.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn handle_append_entries_req(
pub(crate) fn handle_append_entries(
&mut self,
vote: &Vote<NID>,
prev_log_id: Option<LogId<NID>>,
entries: Vec<Ent>,
leader_committed: Option<LogId<NID>>,
) -> AppendEntriesResponse<NID> {
tx: Option<AppendEntriesTx<NID>>,
) -> bool {
tracing::debug!(
vote = display(vote),
prev_log_id = display(prev_log_id.summary()),
entries = display(DisplaySlice::<_>(&entries)),
leader_committed = display(leader_committed.summary()),
"append-entries request"
);
tracing::debug!(
my_vote = display(self.state.vote_ref()),
my_last_log_id = display(self.state.last_log_id().summary()),
my_committed = display(self.state.committed().summary()),
"local state"
"{}",
func_name!()
);

let res = self.vote_handler().handle_message_vote(vote);
if let Err(rejected) = res {
return rejected.into();
let res = self.append_entries(vote, prev_log_id, entries);
let is_ok = res.is_ok();

if let Some(tx) = tx {
let resp: AppendEntriesResponse<NID> = res.into();
self.output.push_command(Command::SendAppendEntriesResult {
send: SendResult::new(Ok(resp), tx),
});
}
is_ok
}

pub(crate) fn append_entries(
&mut self,
vote: &Vote<NID>,
prev_log_id: Option<LogId<NID>>,
entries: Vec<Ent>,
) -> Result<(), RejectAppendEntries<NID>> {
self.vote_handler().handle_message_vote(vote)?;

// Vote is legal.

let mut fh = self.following_handler();
fh.append_entries(prev_log_id, entries, leader_committed)
fh.ensure_log_consecutive(prev_log_id)?;
fh.append_entries(prev_log_id, entries);

Ok(())
}

/// Commit entries for follower/learner.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn handle_commit_entries(&mut self, leader_committed: Option<LogId<NID>>) {
tracing::debug!(
leader_committed = display(leader_committed.summary()),
my_accepted = display(self.state.accepted().summary()),
my_committed = display(self.state.committed().summary()),
"{}",
func_name!()
);

let mut fh = self.following_handler();
fh.commit_entries(leader_committed);
}

/// Leader steps down(convert to learner) once the membership not containing it is committed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,11 @@ fn eng() -> Engine<u64, (), <UTCfg as RaftTypeConfig>::Entry> {
fn test_follower_append_entries_update_accepted() -> anyhow::Result<()> {
let mut eng = eng();

eng.following_handler().append_entries(
Some(log_id(2, 3)),
vec![
//
blank_ent(3, 4),
blank_ent(3, 5),
],
None,
);
eng.following_handler().append_entries(Some(log_id(2, 3)), vec![
//
blank_ent(3, 4),
blank_ent(3, 5),
]);

assert_eq!(
&[
Expand All @@ -64,14 +60,10 @@ fn test_follower_append_entries_update_accepted() -> anyhow::Result<()> {

// Update again, accept should not decrease.

eng.following_handler().append_entries(
Some(log_id(2, 3)),
vec![
//
blank_ent(3, 4),
],
None,
);
eng.following_handler().append_entries(Some(log_id(2, 3)), vec![
//
blank_ent(3, 4),
]);

assert_eq!(Some(&log_id(3, 5)), eng.state.last_log_id());
assert_eq!(Some(&log_id(3, 5)), eng.state.accepted());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,11 @@ fn test_following_handler_commit_entries_le_accepted() -> anyhow::Result<()> {
);
assert_eq!(
vec![
//
Command::FollowerCommit {
already_committed: Some(log_id(1, 1)),
upto: log_id(2, 3)
}, //
},
],
eng.output.take_commands()
);
Expand Down
53 changes: 29 additions & 24 deletions openraft/src/engine/handler/following_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::engine::Command;
use crate::engine::EngineConfig;
use crate::engine::EngineOutput;
use crate::entry::RaftEntry;
use crate::raft::AppendEntriesResponse;
use crate::error::RejectAppendEntries;
use crate::raft_state::LogStateReader;
use crate::EffectiveMembership;
use crate::LogId;
Expand Down Expand Up @@ -52,16 +52,10 @@ where
///
/// Also clean conflicting entries and update membership state.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn append_entries(
&mut self,
prev_log_id: Option<LogId<NID>>,
entries: Vec<Ent>,
leader_committed: Option<LogId<NID>>,
) -> AppendEntriesResponse<NID> {
pub(crate) fn append_entries(&mut self, prev_log_id: Option<LogId<NID>>, entries: Vec<Ent>) {
tracing::debug!(
prev_log_id = display(prev_log_id.summary()),
entries = display(DisplaySlice::<_>(&entries)),
leader_committed = display(leader_committed.summary()),
"append-entries request"
);
tracing::debug!(
Expand All @@ -74,18 +68,6 @@ where
debug_assert!(x.get_log_id().index == prev_log_id.next_index());
}

if let Some(ref prev) = prev_log_id {
if !self.state.has_log_id(prev) {
let local = self.state.get_log_id(prev.index);
tracing::debug!(local = display(local.summary()), "prev_log_id does not match");

self.truncate_logs(prev.index);
return AppendEntriesResponse::Conflict;
}
}

// else `prev_log_id.is_none()` means replicating logs from the very beginning.

tracing::debug!(
committed = display(self.state.committed().summary()),
entries = display(DisplaySlice::<_>(&entries)),
Expand All @@ -107,10 +89,28 @@ where
}

self.do_append_entries(entries, since);
}

self.commit_entries(leader_committed);
/// Ensures the log to replicate is consecutive to the local log.
///
/// If not, truncate the local log and return an error.
pub(crate) fn ensure_log_consecutive(
&mut self,
prev_log_id: Option<LogId<NID>>,
) -> Result<(), RejectAppendEntries<NID>> {
if let Some(ref prev) = prev_log_id {
if !self.state.has_log_id(prev) {
let local = self.state.get_log_id(prev.index);
tracing::debug!(local = display(DisplayOption(&local)), "prev_log_id does not match");

self.truncate_logs(prev.index);
return Err(RejectAppendEntries::ByConflictingLogId { local, expect: *prev });
}
}

// else `prev_log_id.is_none()` means replicating logs from the very beginning.

AppendEntriesResponse::Success
Ok(())
}

/// Follower/Learner appends `entries[since..]`.
Expand Down Expand Up @@ -143,8 +143,9 @@ where
self.output.push_command(Command::AppendInputEntries { entries });
}

/// Commit entries that are already committed by the leader.
#[tracing::instrument(level = "debug", skip_all)]
fn commit_entries(&mut self, leader_committed: Option<LogId<NID>>) {
pub(crate) fn commit_entries(&mut self, leader_committed: Option<LogId<NID>>) {
let accepted = self.state.accepted().copied();
let committed = std::cmp::min(accepted, leader_committed);

Expand Down Expand Up @@ -232,7 +233,9 @@ where

let m = Arc::new(membership);

// TODO: if effective membership changes, call `update_repliation()`
// TODO: if effective membership changes, call `update_replication()`, if a follower has replication
// streams. Now we don't have replication streams for follower, so it's ok to not call
// `update_replication()`.
let effective_changed = self.state.membership_state.update_committed(m);
if let Some(c) = effective_changed {
self.output.push_command(Command::UpdateMembership { membership: c })
Expand All @@ -247,13 +250,15 @@ where
// There are two special cases in which snapshot last log id does not exists locally:
// Snapshot last log id before the local last-purged-log-id, or after the local last-log-id:
//
// ```
// snapshot ----.
// v
// -----------------------llllllllll--->
//
// snapshot ----.
// v
// ----lllllllllll--------------------->
// ```
//
// In the first case, snapshot-last-log-id <= last-purged-log-id <=
// local-snapshot-last-log-id. Thus snapshot is obsolete and won't be installed.
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ pub(crate) mod time_state;

#[cfg(test)]
mod tests {
mod append_entries_test;
mod command_test;
mod elect_test;
mod handle_append_entries_req_test;
mod handle_vote_req_test;
mod handle_vote_resp_test;
mod initialize_test;
Expand Down
Loading

0 comments on commit dbac91d

Please sign in to comment.