From 44381b0c776cfbb7dfc7789de27346110776b7f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Sun, 14 Aug 2022 13:04:23 +0800 Subject: [PATCH] Fix: when handling append-entries, if prev_log_id is purged, it should not delete any logs. When handling append-entries, if the local log at `prev_log_id.index` is purged, a follower should not believe it is a **conflict** and should not delete all logs. It will get committed log lost. To fix this issue, use `last_applied` instead of `committed`: `last_applied` is always the committed log id, while `committed` is not persisted and may be smaller than the actually applied, when a follower is restarted. --- openraft/src/core/append_entries.rs | 9 +- openraft/src/defensive.rs | 4 + openraft/tests/append_entries/main.rs | 1 + .../t31_append_prev_is_purged.rs | 97 +++++++++++++++++++ openraft/tests/fixtures/mod.rs | 5 + 5 files changed, 113 insertions(+), 3 deletions(-) create mode 100644 openraft/tests/append_entries/t31_append_prev_is_purged.rs diff --git a/openraft/src/core/append_entries.rs b/openraft/src/core/append_entries.rs index 9fafa28db..9f6b84ad5 100644 --- a/openraft/src/core/append_entries.rs +++ b/openraft/src/core/append_entries.rs @@ -108,7 +108,10 @@ impl, S: RaftStorage> Ra // - keep track of last_log_id, first_log_id, // RaftStorage should only provides the least basic APIs. - self.storage.delete_conflict_logs_since(start).await?; + let res = self.storage.delete_conflict_logs_since(start).await; + tracing::debug!("delete_conflict_logs_since res: {:?}", res); + + res?; self.last_log_id = self.storage.get_log_state().await?.last_log_id; @@ -278,7 +281,7 @@ impl, S: RaftStorage> Ra for i in 0..l { let log_id = entries[i].log_id; - if Some(log_id) <= self.committed { + if Some(log_id) <= self.last_applied { continue; } @@ -312,7 +315,7 @@ impl, S: RaftStorage> Ra }; // Committed entries are always safe and are consistent to a valid leader. - if remote_log_id <= self.committed { + if remote_log_id <= self.last_applied { return Ok(None); } diff --git a/openraft/src/defensive.rs b/openraft/src/defensive.rs index 8696eb72f..2ffd6b75e 100644 --- a/openraft/src/defensive.rs +++ b/openraft/src/defensive.rs @@ -191,6 +191,10 @@ where } async fn defensive_delete_conflict_gt_last_applied(&self, since: LogId) -> Result<(), StorageError> { + if !self.is_defensive() { + return Ok(()); + } + let (last_applied, _) = self.inner().last_applied_state().await?; if Some(since.index) <= last_applied.index() { return Err( diff --git a/openraft/tests/append_entries/main.rs b/openraft/tests/append_entries/main.rs index fc43713d1..1d602dab4 100644 --- a/openraft/tests/append_entries/main.rs +++ b/openraft/tests/append_entries/main.rs @@ -8,6 +8,7 @@ mod fixtures; mod t10_conflict_with_empty_entries; mod t20_append_conflicts; mod t30_append_inconsistent_log; +mod t31_append_prev_is_purged; mod t40_append_updates_membership; mod t50_append_entries_with_bigger_term; mod t60_large_heartbeat; diff --git a/openraft/tests/append_entries/t31_append_prev_is_purged.rs b/openraft/tests/append_entries/t31_append_prev_is_purged.rs new file mode 100644 index 000000000..28aa20b8a --- /dev/null +++ b/openraft/tests/append_entries/t31_append_prev_is_purged.rs @@ -0,0 +1,97 @@ +use std::sync::Arc; + +use anyhow::Result; +use maplit::btreeset; +use openraft::raft::Entry; +use openraft::raft::EntryPayload; +use openraft::AppendEntriesRequest; +use openraft::Config; +use openraft::DefensiveCheck; +use openraft::LogId; +use openraft::Membership; +use openraft::Raft; +use openraft::RaftStorage; + +use crate::fixtures::blank; +use crate::fixtures::RaftRouter; + +/// When handling append-entries, if the local log at `prev_log_id.index` is purged, a follower should not believe it is +/// a **conflict** and should not delete all logs. Which will get committed log lost. +/// +/// Fake a raft node with one log (1,3) and set last-applied to (1,2). +/// Then an append-entries with `prev_log_id=(1,2)` should not be considered as **conflict**. +#[tokio::test(flavor = "multi_thread", worker_threads = 6)] +async fn append_prev_is_purged() -> Result<()> { + let (_log_guard, ut_span) = init_ut!(); + let _ent = ut_span.enter(); + + let config = Arc::new( + Config { + max_applied_log_to_keep: 2, + ..Default::default() + } + .validate()?, + ); + let router = Arc::new(RaftRouter::new(config.clone())); + + tracing::info!("--- fake store: logs: (1,3), last_applied == last_purged == (1,2)"); + let sto0 = { + let sto0 = router.new_store().await; + + // With defensive==true, it will panic. + sto0.set_defensive(false); + + let entries = [ + &Entry { + log_id: LogId { term: 0, index: 0 }, + payload: EntryPayload::Membership(Membership::new_single(btreeset! {0,1})), + }, + &blank(1, 1), + &blank(1, 2), + &blank(1, 3), + ]; + + sto0.append_to_log(&entries).await?; + sto0.apply_to_state_machine(&entries[0..3]).await?; + sto0.purge_logs_upto(LogId::new(1, 2)).await?; + + let logs = sto0.try_get_log_entries(..).await?; + tracing::debug!("logs left after purge: {:?}", logs); + assert_eq!(LogId::new(1, 3), logs[0].log_id); + + sto0 + }; + + tracing::info!("--- new node with faked sto"); + let node0 = { + let config0 = Arc::new( + Config { + max_applied_log_to_keep: 1, + ..Default::default() + } + .validate()?, + ); + let node0 = Raft::new(0, config0.clone(), router.clone(), sto0.clone()); + router.add_raft_node(0, node0.clone(), sto0.clone()).await; + node0 + }; + + tracing::info!("--- append-entries with prev_log_id=(1,2), should not erase any logs"); + { + node0 + .append_entries(AppendEntriesRequest { + term: 1, + leader_id: 1, + prev_log_id: Some(LogId::new(1, 2)), + entries: vec![], + leader_commit: None, + }) + .await?; + + let logs = sto0.try_get_log_entries(..).await?; + tracing::debug!("logs left after append: {:?}", logs); + assert_eq!(LogId::new(1, 3), logs[0].log_id); + } + + Ok(()) +} diff --git a/openraft/tests/fixtures/mod.rs b/openraft/tests/fixtures/mod.rs index 3ba0454e0..7f0c06466 100644 --- a/openraft/tests/fixtures/mod.rs +++ b/openraft/tests/fixtures/mod.rs @@ -270,6 +270,11 @@ impl RaftRouter { rt.insert(id, (node, sto)); } + pub async fn add_raft_node(self: &Arc, id: NodeId, node: MemRaft, sto: Arc) { + let mut rt = self.routing_table.write().await; + rt.insert(id, (node, sto)); + } + /// Remove the target node from the routing table & isolation. pub async fn remove_node(&self, id: NodeId) -> Option<(MemRaft, Arc)> { let mut rt = self.routing_table.write().await;