Skip to content

Commit

Permalink
change: InstallSnapshotRequest: merge last_included{term,index} into …
Browse files Browse the repository at this point in the history
…last_included
  • Loading branch information
drmingdrmer committed Jul 8, 2021
1 parent cf4badd commit 954c67a
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 14 deletions.
16 changes: 8 additions & 8 deletions async-raft/src/core/install_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,15 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
mut snapshot: Box<S::Snapshot>,
) -> RaftResult<()> {
snapshot.as_mut().shutdown().await.map_err(|err| self.map_fatal_storage_error(err.into()))?;
let delete_through = if self.last_log_index > req.last_included_index {
Some(req.last_included_index)
let delete_through = if self.last_log_index > req.last_included.index {
Some(req.last_included.index)
} else {
None
};
self.storage
.finalize_snapshot_installation(
req.last_included_index,
req.last_included_term,
req.last_included.index,
req.last_included.term,
delete_through,
id,
snapshot,
Expand All @@ -162,10 +162,10 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
.map_err(|err| self.map_fatal_storage_error(err))?;
let membership = self.storage.get_membership_config().await.map_err(|err| self.map_fatal_storage_error(err))?;
self.update_membership(membership)?;
self.last_log_index = req.last_included_index;
self.last_log_term = req.last_included_term;
self.last_applied = req.last_included_index;
self.snapshot_index = req.last_included_index;
self.last_log_index = req.last_included.index;
self.last_log_term = req.last_included.term;
self.last_applied = req.last_included.index;
self.snapshot_index = req.last_included.index;
Ok(())
}
}
7 changes: 3 additions & 4 deletions async-raft/src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::metrics::RaftMetrics;
use crate::metrics::Wait;
use crate::AppData;
use crate::AppDataResponse;
use crate::LogId;
use crate::NodeId;
use crate::RaftNetwork;
use crate::RaftStorage;
Expand Down Expand Up @@ -575,10 +576,8 @@ pub struct InstallSnapshotRequest {
pub term: u64,
/// The leader's ID. Useful in redirecting clients.
pub leader_id: u64,
/// The snapshot replaces all log entries up through and including this index.
pub last_included_index: u64,
/// The term of the `last_included_index`.
pub last_included_term: u64,
/// The snapshot replaces all log entries up through and including this log.
pub last_included: LogId,
/// The byte offset where this chunk of data is positioned in the snapshot file.
pub offset: u64,
/// The raw bytes of the snapshot chunk, starting at `offset`.
Expand Down
6 changes: 6 additions & 0 deletions async-raft/src/raft_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ pub struct LogId {
pub index: u64,
}

impl From<(u64, u64)> for LogId {
fn from(v: (u64, u64)) -> Self {
LogId { term: v.0, index: v.1 }
}
}

// An update action with option to update with some value or just ignore this update.
#[derive(Debug, Clone, PartialOrd, PartialEq, Eq, Serialize, Deserialize)]
pub enum Update<T> {
Expand Down
3 changes: 1 addition & 2 deletions async-raft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -856,8 +856,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
let req = InstallSnapshotRequest {
term: self.core.term,
leader_id: self.core.id,
last_included_index: snapshot.index,
last_included_term: snapshot.term,
last_included: (snapshot.term, snapshot.index).into(),
offset,
data: Vec::from(&buf[..nread]),
done,
Expand Down

0 comments on commit 954c67a

Please sign in to comment.