Skip to content

Commit

Permalink
Fix: When a Leader starts up, it should re-apply all logs
Browse files Browse the repository at this point in the history
When a node starts up as the Leader, it now re-applies all logs at once.

Previously:
- New Leader only updated IO progress
- Committed log ID remained unchanged

Now:
- New Leader updates IO progress
- Triggers update of committed log ID

- Fix: #1246
  • Loading branch information
drmingdrmer committed Sep 13, 2024
1 parent c28ad9b commit 681d04d
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 9 deletions.
3 changes: 3 additions & 0 deletions openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,9 @@ where C: RaftTypeConfig

self.state.accept_io(IOId::new_log_io(vote.into_committed(), last_log_id));

// No need to submit UpdateIOProgress command,
// IO progress is updated by the new blank log

self.leader_handler()
.unwrap()
.leader_append_entries(vec![C::Entry::new_blank(LogId::<C::NodeId>::default())]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::engine::ReplicationProgress;
use crate::entry::RaftEntry;
use crate::log_id_range::LogIdRange;
use crate::progress::entry::ProgressEntry;
use crate::raft_state::IOId;
use crate::replication::request::Replicate;
use crate::testing::log_id;
use crate::type_config::alias::EntryOf;
Expand Down Expand Up @@ -57,6 +58,10 @@ fn test_become_leader() -> anyhow::Result<()> {
assert_eq!(ServerState::Leader, eng.state.server_state);

assert_eq!(eng.output.take_commands(), vec![
Command::UpdateIOProgress {
when: None,
io_id: IOId::new_log_io(Vote::new(2, 1).into_committed(), None)
},
Command::RebuildReplicationStreams {
targets: vec![ReplicationProgress(0, ProgressEntry::empty(0))]
},
Expand Down
5 changes: 5 additions & 0 deletions openraft/src/engine/handler/vote_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,11 @@ where C: RaftTypeConfig

self.state.accept_io(IOId::new_log_io(leader_vote, last_log_id));

self.output.push_command(Command::UpdateIOProgress {
when: None,
io_id: IOId::new_log_io(leader_vote, last_log_id),
});

self.server_state_handler().update_server_state_if_changed();

let mut rh = self.replication_handler();
Expand Down
12 changes: 10 additions & 2 deletions openraft/src/engine/tests/startup_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::entry::RaftEntry;
use crate::log_id_range::LogIdRange;
use crate::progress::entry::ProgressEntry;
use crate::progress::Inflight;
use crate::raft_state::IOId;
use crate::replication::request::Replicate;
use crate::testing::log_id;
use crate::type_config::TypeConfigExt;
Expand Down Expand Up @@ -44,6 +45,7 @@ fn eng() -> Engine<UTConfig> {
eng
}

/// It is a Leader but not yet append any logs.
#[test]
fn test_startup_as_leader_without_logs() -> anyhow::Result<()> {
let mut eng = eng();
Expand All @@ -67,7 +69,10 @@ fn test_startup_as_leader_without_logs() -> anyhow::Result<()> {
assert_eq!(leader.last_log_id(), Some(&log_id(2, 2, 4)));
assert_eq!(
vec![
//
Command::UpdateIOProgress {
when: None,
io_id: IOId::new_log_io(Vote::new(2, 2).into_committed(), Some(log_id(1, 1, 3)))
},
Command::RebuildReplicationStreams {
targets: vec![ReplicationProgress(3, ProgressEntry {
matching: None,
Expand Down Expand Up @@ -115,7 +120,10 @@ fn test_startup_as_leader_with_proposed_logs() -> anyhow::Result<()> {
assert_eq!(leader.last_log_id(), Some(&log_id(1, 2, 6)));
assert_eq!(
vec![
//
Command::UpdateIOProgress {
when: None,
io_id: IOId::new_log_io(Vote::new(1, 2).into_committed(), Some(log_id(1, 2, 6)))
},
Command::RebuildReplicationStreams {
targets: vec![ReplicationProgress(3, ProgressEntry {
matching: None,
Expand Down
7 changes: 1 addition & 6 deletions openraft/src/proposer/leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ where

let last_log_id = last_leader_log_id.last().copied();

let mut leader = Self {
let leader = Self {
transfer_to: None,
committed_vote: vote,
next_heartbeat: C::now(),
Expand All @@ -130,11 +130,6 @@ where
clock_progress: VecProgress::new(quorum_set, learner_ids, || None),
};

// Update progress for this Leader.
// Note that Leader not being a voter is allowed.
let leader_node_id = vote.leader_id().voted_for().unwrap();
let _ = leader.progress.update(&leader_node_id, ProgressEntry::new(last_log_id));

leader
}

Expand Down
20 changes: 19 additions & 1 deletion stores/memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use std::collections::HashMap;
use std::fmt::Debug;
use std::io::Cursor;
use std::ops::RangeBounds;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Mutex;

Expand Down Expand Up @@ -139,6 +141,11 @@ impl BlockConfig {
pub struct MemLogStore {
last_purged_log_id: RwLock<Option<LogId<MemNodeId>>>,

/// Saving committed log id is optional in Openraft.
///
/// This flag switches on the saving for testing purposes.
pub enable_saving_committed: AtomicBool,

committed: RwLock<Option<LogId<MemNodeId>>>,

/// The Raft log. Logs are stored in serialized json.
Expand All @@ -157,6 +164,7 @@ impl MemLogStore {

Self {
last_purged_log_id: RwLock::new(None),
enable_saving_committed: AtomicBool::new(true),
committed: RwLock::new(None),
log,
block,
Expand Down Expand Up @@ -350,13 +358,23 @@ impl RaftLogStorage<TypeConfig> for Arc<MemLogStore> {
}

async fn save_committed(&mut self, committed: Option<LogId<MemNodeId>>) -> Result<(), StorageError<TypeConfig>> {
tracing::debug!(?committed, "save_committed");
let enabled = self.enable_saving_committed.load(Ordering::Relaxed);
tracing::debug!(?committed, "save_committed, enabled: {}", enabled);
if !enabled {
return Ok(());
}
let mut c = self.committed.write().await;
*c = committed;
Ok(())
}

async fn read_committed(&mut self) -> Result<Option<LogId<MemNodeId>>, StorageError<TypeConfig>> {
let enabled = self.enable_saving_committed.load(Ordering::Relaxed);
tracing::debug!("read_committed, enabled: {}", enabled);
if !enabled {
return Ok(None);
}

Ok(*self.committed.read().await)
}

Expand Down
5 changes: 5 additions & 0 deletions tests/tests/fixtures/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,9 @@ pub struct TypedRaftRouter {
#[allow(clippy::type_complexity)]
nodes: Arc<Mutex<BTreeMap<MemNodeId, (MemRaft, MemLogStore, MemStateMachine)>>>,

/// Whether to save the committed entries to the RaftLogStorage.
pub enable_saving_committed: bool,

/// Whether to fail a network RPC that is sent from/to a node.
/// And it defines what kind of error to return.
fail_rpc: Arc<Mutex<HashMap<(MemNodeId, Direction), RPCErrorType>>>,
Expand Down Expand Up @@ -315,6 +318,7 @@ impl Builder {
TypedRaftRouter {
config: self.config,
nodes: Default::default(),
enable_saving_committed: true,
fail_rpc: Default::default(),
send_delay: Arc::new(AtomicU64::new(send_delay)),
append_entries_quota: Arc::new(Mutex::new(None)),
Expand Down Expand Up @@ -474,6 +478,7 @@ impl TypedRaftRouter {

pub fn new_store(&mut self) -> (MemLogStore, MemStateMachine) {
let (log, sm) = openraft_memstore::new_mem_store();
log.enable_saving_committed.store(self.enable_saving_committed, Ordering::Relaxed);
(log, sm)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ use crate::fixtures::RaftRouter;

/// A single leader should re-apply all logs upon startup,
/// because itself is a quorum.
///
/// This test disables save_committed() to ensure that logs are still re-applied because the leader
/// itself forms a quorum.
#[tracing::instrument]
#[test_harness::test(harness = ut_harness)]
async fn single_leader_restart_re_apply_logs() -> anyhow::Result<()> {
Expand All @@ -25,6 +28,7 @@ async fn single_leader_restart_re_apply_logs() -> anyhow::Result<()> {
);

let mut router = RaftRouter::new(config.clone());
router.enable_saving_committed = false;

tracing::info!("--- bring up cluster of 1 node");
let mut log_index = router.new_cluster(btreeset! {0}, btreeset! {}).await?;
Expand Down

0 comments on commit 681d04d

Please sign in to comment.