From 26dc8837f4f5afb9be30a32b3eae90235546ca3a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Wed, 12 Apr 2023 00:10:31 +0800 Subject: [PATCH] Fix: ProgressEntry::is_log_range_inflight() checks a log range, not a log entry This bug causes replication tries to send pruged log. --- .../engine/handler/replication_handler/mod.rs | 2 +- openraft/src/progress/entry.rs | 24 +++++++++---------- .../snapshot/t40_purge_in_snapshot_logs.rs | 4 ++++ 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/openraft/src/engine/handler/replication_handler/mod.rs b/openraft/src/engine/handler/replication_handler/mod.rs index a1915332c..171f9f027 100644 --- a/openraft/src/engine/handler/replication_handler/mod.rs +++ b/openraft/src/engine/handler/replication_handler/mod.rs @@ -358,7 +358,7 @@ where // Check if any replication task is going to use the log that are going to purge. let mut in_use = false; for (id, prog_entry) in self.leader.progress.iter() { - if prog_entry.is_inflight(&purge_upto) { + if prog_entry.is_log_range_inflight(&purge_upto) { tracing::debug!("log {} is in use by {}", purge_upto, id); in_use = true; } diff --git a/openraft/src/progress/entry.rs b/openraft/src/progress/entry.rs index d23a973bd..f1762569b 100644 --- a/openraft/src/progress/entry.rs +++ b/openraft/src/progress/entry.rs @@ -73,15 +73,15 @@ impl ProgressEntry { self } - /// Return if a log id is inflight sending. + /// Return if a range of log id `..=log_id` is inflight sending. /// /// `prev_log_id` is never inflight. - pub(crate) fn is_inflight(&self, log_id: &LogId) -> bool { + pub(crate) fn is_log_range_inflight(&self, upto: &LogId) -> bool { match &self.inflight { Inflight::None => false, Inflight::Logs { log_id_range, .. } => { - let lid = Some(*log_id); - lid > log_id_range.prev_log_id && lid <= log_id_range.last_log_id + let lid = Some(*upto); + lid > log_id_range.prev_log_id } Inflight::Snapshot { last_log_id: _, .. } => false, } @@ -257,19 +257,19 @@ mod tests { Inflight::logs(Some(log_id(prev_index)), Some(log_id(last_index))) } #[test] - fn test_is_inflight() -> anyhow::Result<()> { + fn test_is_log_range_inflight() -> anyhow::Result<()> { let mut pe = ProgressEntry::empty(20); - assert_eq!(false, pe.is_inflight(&log_id(2))); + assert_eq!(false, pe.is_log_range_inflight(&log_id(2))); pe.inflight = inflight_logs(2, 4); - assert_eq!(false, pe.is_inflight(&log_id(1))); - assert_eq!(false, pe.is_inflight(&log_id(2))); - assert_eq!(true, pe.is_inflight(&log_id(3))); - assert_eq!(true, pe.is_inflight(&log_id(4))); - assert_eq!(false, pe.is_inflight(&log_id(5))); + assert_eq!(false, pe.is_log_range_inflight(&log_id(1))); + assert_eq!(false, pe.is_log_range_inflight(&log_id(2))); + assert_eq!(true, pe.is_log_range_inflight(&log_id(3))); + assert_eq!(true, pe.is_log_range_inflight(&log_id(4))); + assert_eq!(true, pe.is_log_range_inflight(&log_id(5))); pe.inflight = Inflight::snapshot(Some(log_id(5))); - assert_eq!(false, pe.is_inflight(&log_id(5))); + assert_eq!(false, pe.is_log_range_inflight(&log_id(5))); Ok(()) } diff --git a/tests/tests/snapshot/t40_purge_in_snapshot_logs.rs b/tests/tests/snapshot/t40_purge_in_snapshot_logs.rs index 087c800a3..d218848fe 100644 --- a/tests/tests/snapshot/t40_purge_in_snapshot_logs.rs +++ b/tests/tests/snapshot/t40_purge_in_snapshot_logs.rs @@ -49,6 +49,10 @@ async fn purge_in_snapshot_logs() -> Result<()> { ) .await?; let mut sto0 = router.get_storage_handle(&0)?; + + // Wait for purge to complete. + sleep(Duration::from_millis(500)).await; + let logs = sto0.try_get_log_entries(..).await?; assert_eq!(max_keep as usize, logs.len()); }