Skip to content

Commit

Permalink
Fix: ProgressEntry::is_log_range_inflight() checks a log range, not a…
Browse files Browse the repository at this point in the history
… log entry

This bug causes replication tries to send pruged log.
  • Loading branch information
drmingdrmer committed Apr 11, 2023
1 parent 88f947a commit 26dc883
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 13 deletions.
2 changes: 1 addition & 1 deletion openraft/src/engine/handler/replication_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ where
// Check if any replication task is going to use the log that are going to purge.
let mut in_use = false;
for (id, prog_entry) in self.leader.progress.iter() {
if prog_entry.is_inflight(&purge_upto) {
if prog_entry.is_log_range_inflight(&purge_upto) {
tracing::debug!("log {} is in use by {}", purge_upto, id);
in_use = true;
}
Expand Down
24 changes: 12 additions & 12 deletions openraft/src/progress/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,15 @@ impl<NID: NodeId> ProgressEntry<NID> {
self
}

/// Return if a log id is inflight sending.
/// Return if a range of log id `..=log_id` is inflight sending.
///
/// `prev_log_id` is never inflight.
pub(crate) fn is_inflight(&self, log_id: &LogId<NID>) -> bool {
pub(crate) fn is_log_range_inflight(&self, upto: &LogId<NID>) -> bool {
match &self.inflight {
Inflight::None => false,
Inflight::Logs { log_id_range, .. } => {
let lid = Some(*log_id);
lid > log_id_range.prev_log_id && lid <= log_id_range.last_log_id
let lid = Some(*upto);
lid > log_id_range.prev_log_id
}
Inflight::Snapshot { last_log_id: _, .. } => false,
}
Expand Down Expand Up @@ -257,19 +257,19 @@ mod tests {
Inflight::logs(Some(log_id(prev_index)), Some(log_id(last_index)))
}
#[test]
fn test_is_inflight() -> anyhow::Result<()> {
fn test_is_log_range_inflight() -> anyhow::Result<()> {
let mut pe = ProgressEntry::empty(20);
assert_eq!(false, pe.is_inflight(&log_id(2)));
assert_eq!(false, pe.is_log_range_inflight(&log_id(2)));

pe.inflight = inflight_logs(2, 4);
assert_eq!(false, pe.is_inflight(&log_id(1)));
assert_eq!(false, pe.is_inflight(&log_id(2)));
assert_eq!(true, pe.is_inflight(&log_id(3)));
assert_eq!(true, pe.is_inflight(&log_id(4)));
assert_eq!(false, pe.is_inflight(&log_id(5)));
assert_eq!(false, pe.is_log_range_inflight(&log_id(1)));
assert_eq!(false, pe.is_log_range_inflight(&log_id(2)));
assert_eq!(true, pe.is_log_range_inflight(&log_id(3)));
assert_eq!(true, pe.is_log_range_inflight(&log_id(4)));
assert_eq!(true, pe.is_log_range_inflight(&log_id(5)));

pe.inflight = Inflight::snapshot(Some(log_id(5)));
assert_eq!(false, pe.is_inflight(&log_id(5)));
assert_eq!(false, pe.is_log_range_inflight(&log_id(5)));

Ok(())
}
Expand Down
4 changes: 4 additions & 0 deletions tests/tests/snapshot/t40_purge_in_snapshot_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ async fn purge_in_snapshot_logs() -> Result<()> {
)
.await?;
let mut sto0 = router.get_storage_handle(&0)?;

// Wait for purge to complete.
sleep(Duration::from_millis(500)).await;

let logs = sto0.try_get_log_entries(..).await?;
assert_eq!(max_keep as usize, logs.len());
}
Expand Down

0 comments on commit 26dc883

Please sign in to comment.