Skip to content

Commit

Permalink
change: RaftStorage::finalize_snapshot_installation is no more resp…
Browse files Browse the repository at this point in the history
…onsible to delete logs included in snapshot

A RaftStorage should be as simple and intuitive as possible.

One should be able to correctly impl a RaftStorage without reading the
guide but just by guessing what a trait method should do.

RaftCore is able to do the job of deleting logs that are included in
the state machine, RaftStorage should just do what is asked.
  • Loading branch information
drmingdrmer committed Sep 13, 2021
1 parent 74b1652 commit 46bb3b1
Show file tree
Hide file tree
Showing 5 changed files with 9 additions and 19 deletions.
5 changes: 5 additions & 0 deletions async-raft/src/core/install_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
// --------------------------------------------------------------------> time
// ```

// TODO(xp): do not install if self.last_applied >= snapshot.meta.last_applied

let changes = self
.storage
.finalize_snapshot_installation(&req.meta, snapshot)
Expand All @@ -207,6 +209,9 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
// If you have any question about this, let me know: drdr.xp at gmail.com

if let Some(last_applied) = changes.last_applied {
// Applied logs are not needed.
self.storage.delete_logs_from(..=last_applied.index).await.map_err(|e| self.map_storage_error(e))?;

// snapshot is installed
self.last_applied = last_applied;

Expand Down
1 change: 1 addition & 0 deletions async-raft/src/raft_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ pub enum Update<T> {
/// E.g. when applying a log to state machine, or installing a state machine from snapshot.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StateMachineChanges {
// TODO(xp): it does not need to be an Option
pub last_applied: Option<LogId>,
pub is_snapshot: bool,
}
11 changes: 1 addition & 10 deletions async-raft/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,19 +237,10 @@ where

/// Finalize the installation of a snapshot which has finished streaming from the cluster leader.
///
/// Delete all entries in the log through `meta.last_log_id.index`.
///
/// Write a new snapshot pointer to the log at the given `meta.last_log_id.index`. The snapshot pointer should be
/// constructed via the `Entry::new_snapshot_pointer` constructor and the other parameters
/// provided to this method.
///
/// All other snapshots should be deleted at this point.
///
/// ### snapshot
/// A snapshot created from an earlier call to `created_snapshot` which provided the snapshot.
/// By the time ownership of the snapshot object is returned here, its
/// `AsyncWriteExt.shutdown()` method will have been called, so no additional writes should be
/// made to the snapshot.
/// A snapshot created from an earlier call to `begin_receiving_snapshot` which provided the snapshot.
///
/// Errors returned from this method will cause Raft to go into shutdown.
async fn finalize_snapshot_installation(
Expand Down
9 changes: 0 additions & 9 deletions memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -802,15 +802,6 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
tracing::debug!("JSON SNAP DATA:{}", y);
}

// Update log.
{
let mut log = self.log.write().await;

// Remove logs that are included in the snapshot.
// Leave at least one log or the replication can not find out the mismatched log.
*log = log.split_off(&meta.last_log_id.index);
}

// Update the state machine.
{
let new_sm: MemStoreStateMachine = serde_json::from_slice(&new_snapshot.data).map_err(|e| {
Expand Down
2 changes: 2 additions & 0 deletions memstore/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ where
run_fut(Suite::apply_single(builder))?;
run_fut(Suite::apply_multi(builder))?;

// TODO(xp): test: finalized_snapshot, do_log_compaction, begin_receiving_snapshot, get_current_snapshot

Ok(())
}

Expand Down

0 comments on commit 46bb3b1

Please sign in to comment.