Skip to content

Commit

Permalink
Feature: add Raft::install_complete_snapshot() to install a snapshot
Browse files Browse the repository at this point in the history
Using this method, the application provides a full snapshot to
Openraft, which is then used to install and replace the state machine.
It is entirely the responsibility of the application to acquire a
snapshot through any means: be it in chunks, as a stream, or via shared
storage solutions like S3.

This method necessitates that the caller supplies a valid `Vote` to
confirm the legitimacy of the leader, mirroring the requirements of
other Raft protocol APIs such as `append_entries` and `vote`.

- Part of #606
  • Loading branch information
drmingdrmer committed Feb 11, 2024
1 parent fad4d87 commit c1aa1b5
Show file tree
Hide file tree
Showing 11 changed files with 275 additions and 4 deletions.
3 changes: 3 additions & 0 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1134,6 +1134,9 @@ where

self.handle_install_snapshot_request(rpc, tx);
}
RaftMsg::InstallCompleteSnapshot { vote, snapshot, tx } => {
self.engine.handle_install_complete_snapshot(vote, snapshot, tx);
}
RaftMsg::CheckIsLeaderRequest { tx } => {
if self.engine.state.is_leader(&self.engine.config.id) {
self.handle_check_is_leader_request(tx).await;
Expand Down
11 changes: 11 additions & 0 deletions openraft/src/core/raft_msg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ use crate::type_config::alias::NodeOf;
use crate::ChangeMembers;
use crate::MessageSummary;
use crate::RaftTypeConfig;
use crate::Snapshot;
use crate::Vote;

pub(crate) mod external_command;

Expand Down Expand Up @@ -67,6 +69,12 @@ where C: RaftTypeConfig
tx: InstallSnapshotTx<C::NodeId>,
},

InstallCompleteSnapshot {
vote: Vote<C::NodeId>,
snapshot: Snapshot<C>,
tx: ResultSender<InstallSnapshotResponse<C::NodeId>>,
},

ClientWriteRequest {
app_data: C::D,
tx: ClientWriteTx<C>,
Expand Down Expand Up @@ -114,6 +122,9 @@ where C: RaftTypeConfig
RaftMsg::InstallSnapshot { rpc, .. } => {
format!("InstallSnapshot: {}", rpc.summary())
}
RaftMsg::InstallCompleteSnapshot { vote, snapshot, .. } => {
format!("InstallCompleteSnapshot: vote: {}, snapshot: {}", vote, snapshot)
}
RaftMsg::ClientWriteRequest { .. } => "ClientWriteRequest".to_string(),
RaftMsg::CheckIsLeaderRequest { .. } => "CheckIsLeaderRequest".to_string(),
RaftMsg::Initialize { members, .. } => {
Expand Down
24 changes: 21 additions & 3 deletions openraft/src/core/sm/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ where C: RaftTypeConfig
Command::new(payload)
}

pub(crate) fn install_complete_snapshot(snapshot: Snapshot<C>) -> Self {
let payload = CommandPayload::InstallCompleteSnapshot { snapshot };
Command::new(payload)
}

pub(crate) fn cancel_snapshot(snapshot_meta: SnapshotMeta<C::NodeId, C::Node>) -> Self {
let payload = CommandPayload::FinalizeSnapshot {
install: false,
Expand Down Expand Up @@ -103,14 +108,18 @@ where C: RaftTypeConfig
BuildSnapshot,

/// Get the latest built snapshot.
GetSnapshot { tx: ResultSender<Option<Snapshot<C>>> },
GetSnapshot {
tx: ResultSender<Option<Snapshot<C>>>,
},

/// Receive a chunk of snapshot.
///
/// If it is the final chunk, the snapshot stream will be closed and saved.
///
/// Installing a snapshot includes two steps: ReceiveSnapshotChunk and FinalizeSnapshot.
ReceiveSnapshotChunk { req: InstallSnapshotRequest<C> },
ReceiveSnapshotChunk {
req: InstallSnapshotRequest<C>,
},

/// After receiving all chunks, finalize the snapshot by installing it or discarding it,
/// if the snapshot is stale(the snapshot last log id is smaller than the local committed).
Expand All @@ -120,8 +129,14 @@ where C: RaftTypeConfig
snapshot_meta: SnapshotMeta<C::NodeId, C::Node>,
},

InstallCompleteSnapshot {
snapshot: Snapshot<C>,
},

/// Apply the log entries to the state machine.
Apply { entries: Vec<C::Entry> },
Apply {
entries: Vec<C::Entry>,
},
}

impl<C> Debug for CommandPayload<C>
Expand All @@ -137,6 +152,9 @@ where C: RaftTypeConfig
CommandPayload::FinalizeSnapshot { install, snapshot_meta } => {
write!(f, "FinalizeSnapshot: install:{} {:?}", install, snapshot_meta)
}
CommandPayload::InstallCompleteSnapshot { snapshot } => {
write!(f, "InstallCompleteSnapshot: meta: {:?}", snapshot.meta)
}
CommandPayload::Apply { entries } => write!(f, "Apply: {}", DisplaySlice::<_>(entries)),
}
}
Expand Down
11 changes: 11 additions & 0 deletions openraft/src/core/sm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,17 @@ where
let res = CommandResult::new(cmd.seq, Ok(Response::InstallSnapshot(resp)));
let _ = self.resp_tx.send(Notify::sm(res));
}
CommandPayload::InstallCompleteSnapshot { snapshot } => {
tracing::info!("{}: install complete snapshot", func_name!());

let meta = snapshot.meta.clone();
self.state_machine.install_snapshot(&meta, snapshot.snapshot).await?;

tracing::info!("Done install complete snapshot, meta: {}", meta);

let res = CommandResult::new(cmd.seq, Ok(Response::InstallSnapshot(Some(meta))));
let _ = self.resp_tx.send(Notify::sm(res));
}
CommandPayload::Apply { entries } => {
let resp = self.apply(entries).await?;
let res = CommandResult::new(cmd.seq, Ok(Response::Apply(resp)));
Expand Down
2 changes: 2 additions & 0 deletions openraft/src/engine/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ where
AppendEntries(ValueSender<Result<AppendEntriesResponse<NID>, Infallible>>),
ReceiveSnapshotChunk(ValueSender<Result<(), InstallSnapshotError>>),
InstallSnapshot(ValueSender<Result<InstallSnapshotResponse<NID>, InstallSnapshotError>>),
InstallCompleteSnapshot(ValueSender<Result<InstallSnapshotResponse<NID>, Infallible>>),
Initialize(ValueSender<Result<(), InitializeError<NID, N>>>),
}

Expand All @@ -251,6 +252,7 @@ where
Respond::AppendEntries(x) => x.send(),
Respond::ReceiveSnapshotChunk(x) => x.send(),
Respond::InstallSnapshot(x) => x.send(),
Respond::InstallCompleteSnapshot(x) => x.send(),
Respond::Initialize(x) => x.send(),
}
}
Expand Down
37 changes: 37 additions & 0 deletions openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use crate::LogIdOptionExt;
use crate::Membership;
use crate::RaftLogId;
use crate::RaftTypeConfig;
use crate::Snapshot;
use crate::SnapshotMeta;
use crate::Vote;

Expand Down Expand Up @@ -479,6 +480,42 @@ where C: RaftTypeConfig
Some(())
}

/// Install a completely received snapshot on a follower.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn handle_install_complete_snapshot(
&mut self,
vote: Vote<C::NodeId>,
snapshot: Snapshot<C>,
tx: ResultSender<InstallSnapshotResponse<C::NodeId>>,
) {
tracing::info!(vote = display(vote), snapshot = display(&snapshot), "{}", func_name!());

let vote_res = self.vote_handler().accept_vote(&vote, tx, |state, _rejected| {
Ok(InstallSnapshotResponse {
vote: *state.vote_ref(),
})
});

let Some(tx) = vote_res else {
return;
};

let mut fh = self.following_handler();
fh.install_complete_snapshot(snapshot);
let res = Ok(InstallSnapshotResponse {
vote: *self.state.vote_ref(),
});

self.output.push_command(Command::Respond {
// When there is an error, there may still be queued IO, we need to run them before sending back
// response.
when: Some(Condition::StateMachineCommand {
command_seq: self.output.last_sm_seq(),
}),
resp: Respond::new(res, tx),
});
}

#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn install_snapshot(&mut self, req: InstallSnapshotRequest<C>) -> Result<(), InstallSnapshotError> {
tracing::info!(req = display(req.summary()), "{}", func_name!());
Expand Down
54 changes: 54 additions & 0 deletions openraft/src/engine/handler/following_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::MessageSummary;
use crate::RaftLogId;
use crate::RaftState;
use crate::RaftTypeConfig;
use crate::Snapshot;
use crate::SnapshotMeta;
use crate::SnapshotSegmentId;
use crate::StoredMembership;
Expand Down Expand Up @@ -347,6 +348,59 @@ where C: RaftTypeConfig
self.log_handler().purge_log();
}

/// Follower/Learner handles install-full-snapshot.
///
/// Refer to [`snapshot_replication`](crate::docs::protocol::replication::snapshot_replication)
/// for the reason the following workflow is needed.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn install_complete_snapshot(&mut self, snapshot: Snapshot<C>) {
let meta = &snapshot.meta;
tracing::info!("install_complete_snapshot: meta:{:?}", meta);

let snap_last_log_id = meta.last_log_id;

if snap_last_log_id.as_ref() <= self.state.committed() {
tracing::info!(
"No need to install snapshot; snapshot last_log_id({}) <= committed({})",
snap_last_log_id.summary(),
self.state.committed().summary()
);
return;
}

// snapshot_last_log_id can not be None
let snap_last_log_id = snap_last_log_id.unwrap();

// 1. Truncate all logs if conflict.
// 2. Install snapshot.
// 3. Purge logs the snapshot covers.

let mut snap_handler = self.snapshot_handler();
let updated = snap_handler.update_snapshot(meta.clone());
if !updated {
return;
}

let local = self.state.get_log_id(snap_last_log_id.index);
if let Some(local) = local {
if local != snap_last_log_id {
// Conflict, delete all non-committed logs.
self.truncate_logs(self.state.committed().next_index());
}
}

self.state.update_accepted(Some(snap_last_log_id));
self.state.committed = Some(snap_last_log_id);
self.update_committed_membership(EffectiveMembership::new_from_stored_membership(
meta.last_membership.clone(),
));

self.output.push_command(Command::from(sm::Command::install_complete_snapshot(snapshot)));

self.state.purge_upto = Some(snap_last_log_id);
self.log_handler().purge_log();
}

/// Find the last 2 membership entries in a list of entries.
///
/// A follower/learner reverts the effective membership to the previous one,
Expand Down
18 changes: 18 additions & 0 deletions openraft/src/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ use crate::RaftState;
pub use crate::RaftTypeConfig;
use crate::Snapshot;
use crate::StorageHelper;
use crate::Vote;

/// Define types for a Raft type configuration.
///
Expand Down Expand Up @@ -361,6 +362,23 @@ where C: RaftTypeConfig
self.call_core(RaftMsg::ExternalCommand { cmd }, rx).await
}

/// Install a completely received snapshot to the state machine.
///
/// This method is used to implement a totally application defined snapshot transmission.
/// The application receives a snapshot from the leader, in chunks or a stream, and
/// then rebuild a snapshot, then pass the snapshot to Raft to install.
#[tracing::instrument(level = "debug", skip_all)]
pub async fn install_complete_snapshot(
&self,
vote: Vote<C::NodeId>,
snapshot: Snapshot<C>,
) -> Result<InstallSnapshotResponse<C::NodeId>, RaftError<C::NodeId>> {
tracing::debug!("Raft::install_complete_snapshot()");

let (tx, rx) = oneshot::channel();
self.call_core(RaftMsg::InstallCompleteSnapshot { vote, snapshot, tx }, rx).await
}

/// Submit an InstallSnapshot RPC to this Raft node.
///
/// These RPCs are sent by the cluster leader in order to bring a new node or a slow node
Expand Down
10 changes: 9 additions & 1 deletion openraft/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ where
}

/// The data associated with the current snapshot.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct Snapshot<C>
where C: RaftTypeConfig
{
Expand All @@ -118,6 +118,14 @@ where C: RaftTypeConfig
}
}

impl<C> fmt::Display for Snapshot<C>
where C: RaftTypeConfig
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Snapshot{{meta: {}}}", self.meta)
}
}

/// The state about logs.
///
/// Invariance: last_purged_log_id <= last_applied <= last_log_id
Expand Down
1 change: 1 addition & 0 deletions tests/tests/client_api/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ mod t10_client_writes;
mod t11_client_reads;
mod t12_trigger_purge_log;
mod t13_get_snapshot;
mod t13_install_complete_snapshot;
mod t13_trigger_snapshot;
mod t16_with_raft_state;
mod t50_lagging_network_write;
Expand Down
Loading

0 comments on commit c1aa1b5

Please sign in to comment.