Skip to content

Commit

Permalink
refactor: ConflictOpt: merge term and index into log_id: LogId
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Jul 9, 2021
1 parent 58d8e3a commit d0819f1
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 22 deletions.
16 changes: 6 additions & 10 deletions async-raft/src/core/append_entries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
return Ok(AppendEntriesResponse {
term: self.current_term,
success: false,
conflict_opt: Some(ConflictOpt {
term: self.last_log.term,
index: self.last_log.index,
}),
conflict_opt: Some(ConflictOpt { log_id: self.last_log }),
});
}
};
Expand Down Expand Up @@ -136,13 +133,12 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
.map_err(|err| self.map_fatal_storage_error(err))?;
let opt = match old_entries.iter().find(|entry| entry.term == msg.prev_log.term) {
Some(entry) => Some(ConflictOpt {
term: entry.term,
index: entry.index,
}),
None => Some(ConflictOpt {
term: self.last_log.term,
index: self.last_log.index,
log_id: LogId {
term: entry.term,
index: entry.index,
},
}),
None => Some(ConflictOpt { log_id: self.last_log }),
};
if report_metrics {
self.report_metrics(Update::Ignore);
Expand Down
6 changes: 2 additions & 4 deletions async-raft/src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,10 +398,8 @@ pub struct AppendEntriesResponse {
/// which may be new to the cluster.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub struct ConflictOpt {
/// The term of the most recent entry which does not conflict with the received request.
pub term: u64,
/// The index of the most recent entry which does not conflict with the received request.
pub index: u64,
/// The most recent entry which does not conflict with the received request.
pub log_id: LogId,
}

/// A Raft log entry.
Expand Down
12 changes: 6 additions & 6 deletions async-raft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,15 +316,15 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re

// If the returned conflict opt index is greater than last_log_index, then this is a
// logical error, and no action should be taken. This represents a replication failure.
if conflict.index > self.last_log_index {
if conflict.log_id.index > self.last_log_index {
return;
}
self.next_index = conflict.index + 1;
self.matched = (conflict.term, conflict.index).into();
self.next_index = conflict.log_id.index + 1;
self.matched = conflict.log_id;

// If conflict index is 0, we will not be able to fetch that index from storage because
// it will never exist. So instead, we just return, and accept the conflict data.
if conflict.index == 0 {
if conflict.log_id.index == 0 {
self.target_state = TargetReplState::Lagging;
let _ = self.raft_tx.send(ReplicaEvent::UpdateMatchIndex {
target: self.target,
Expand All @@ -336,7 +336,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
// Fetch the entry at conflict index and use the term specified there.
match self
.storage
.get_log_entries(conflict.index, conflict.index + 1)
.get_log_entries(conflict.log_id.index, conflict.log_id.index + 1)
.await
.map(|entries| entries.get(0).map(|entry| entry.term))
{
Expand Down Expand Up @@ -368,7 +368,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
});
match &self.config.snapshot_policy {
SnapshotPolicy::LogsSinceLast(threshold) => {
let diff = self.last_log_index - conflict.index; // NOTE WELL: underflow is guarded against above.
let diff = self.last_log_index - conflict.log_id.index; // NOTE WELL: underflow is guarded against above.
if &diff >= threshold {
// Follower is far behind and needs to receive an InstallSnapshot RPC.
self.target_state = TargetReplState::Snapshotting;
Expand Down
14 changes: 12 additions & 2 deletions async-raft/tests/conflict_with_empty_entries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,12 @@ async fn conflict_with_empty_entries() -> Result<()> {
assert!(!resp.success);
assert!(resp.conflict_opt.is_some());
let c = resp.conflict_opt.unwrap();
assert_eq!(ConflictOpt { term: 0, index: 0 }, c);
assert_eq!(
ConflictOpt {
log_id: LogId { term: 0, index: 0 }
},
c
);

// Feed 2 logs

Expand Down Expand Up @@ -106,7 +111,12 @@ async fn conflict_with_empty_entries() -> Result<()> {
assert!(!resp.success);
assert!(resp.conflict_opt.is_some());
let c = resp.conflict_opt.unwrap();
assert_eq!(ConflictOpt { term: 1, index: 2 }, c);
assert_eq!(
ConflictOpt {
log_id: LogId { term: 1, index: 2 }
},
c
);

Ok(())
}

0 comments on commit d0819f1

Please sign in to comment.