Skip to content

Commit

Permalink
Change: RaftState: add field snapshot_meta
Browse files Browse the repository at this point in the history
Snapshot meta should be part of the `RaftState`.
Move it from `Engine` to `RaftState`
  • Loading branch information
drmingdrmer committed Dec 30, 2022
1 parent e8effe9 commit e123842
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 50 deletions.
10 changes: 2 additions & 8 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,12 +207,6 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
purge_batch_size: self.config.purge_batch_size,
});

// Fetch the most recent snapshot in the system.
if let Some(snapshot) = self.storage.get_current_snapshot().await? {
self.engine.snapshot_meta = snapshot.meta;
self.engine.metrics_flags.set_data_changed();
}

self.engine.startup();
// No output commands

Expand Down Expand Up @@ -639,7 +633,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
current_term: self.engine.state.vote.term,
last_log_index: self.engine.state.last_log_id().map(|id| id.index),
last_applied: self.engine.state.committed,
snapshot: self.engine.snapshot_meta.last_log_id,
snapshot: self.engine.state.snapshot_meta.last_log_id,

// --- cluster ---
state: self.engine.state.server_state,
Expand Down Expand Up @@ -752,7 +746,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,

if !force {
// If we are below the threshold, then there is nothing to do.
if self.engine.state.committed.next_index() - self.engine.snapshot_meta.last_log_id.next_index()
if self.engine.state.committed.next_index() - self.engine.state.snapshot_meta.last_log_id.next_index()
< *threshold
{
return;
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/engine/calc_purge_upto_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ fn test_calc_purge_upto() -> anyhow::Result<()> {
if let Some(last_purged) = last_purged {
eng.state.log_ids.purge(&last_purged);
}
eng.snapshot_meta.last_log_id = snapshot_last_log_id;
eng.state.snapshot_meta.last_log_id = snapshot_last_log_id;
let got = eng.calc_purge_upto();

assert_eq!(
Expand Down
16 changes: 6 additions & 10 deletions openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,6 @@ where

pub(crate) config: EngineConfig,

/// The metadata of the last snapshot.
pub(crate) snapshot_meta: SnapshotMeta<NID, N>,

/// The state of this raft node.
pub(crate) state: RaftState<NID, N>,

Expand All @@ -93,7 +90,6 @@ where
Self {
id,
config,
snapshot_meta: Default::default(),
state: init_state.clone(),
metrics_flags: MetricsChangeFlags::default(),
commands: vec![],
Expand Down Expand Up @@ -575,18 +571,18 @@ where
let max_keep = self.config.max_in_snapshot_log_to_keep;
let batch_size = self.config.purge_batch_size;

let purge_end = self.snapshot_meta.last_log_id.next_index().saturating_sub(max_keep);
let purge_end = self.state.snapshot_meta.last_log_id.next_index().saturating_sub(max_keep);

tracing::debug!(
snapshot_last_log_id = debug(self.snapshot_meta.last_log_id),
snapshot_last_log_id = debug(self.state.snapshot_meta.last_log_id),
max_keep,
"try purge: (-oo, {})",
purge_end
);

if st.last_purged_log_id().next_index() + batch_size > purge_end {
tracing::debug!(
snapshot_last_log_id = debug(self.snapshot_meta.last_log_id),
snapshot_last_log_id = debug(self.state.snapshot_meta.last_log_id),
max_keep,
last_purged_log_id = display(st.last_purged_log_id().summary()),
batch_size,
Expand Down Expand Up @@ -893,16 +889,16 @@ where
pub(crate) fn update_snapshot(&mut self, meta: SnapshotMeta<NID, N>) -> bool {
tracing::info!("update_snapshot: {:?}", meta);

if meta.last_log_id <= self.snapshot_meta.last_log_id {
if meta.last_log_id <= self.state.snapshot_meta.last_log_id {
tracing::info!(
"No need to install a smaller snapshot: current snapshot last_log_id({}), new snapshot last_log_id({})",
self.snapshot_meta.last_log_id.summary(),
self.state.snapshot_meta.last_log_id.summary(),
meta.last_log_id.summary()
);
return false;
}

self.snapshot_meta = meta;
self.state.snapshot_meta = meta;
self.metrics_flags.set_data_changed();

true
Expand Down
39 changes: 18 additions & 21 deletions openraft/src/engine/install_snapshot_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,7 @@ fn m1234() -> Membership<u64, ()> {
}

fn eng() -> Engine<u64, ()> {
let mut eng = Engine::<u64, ()> {
snapshot_meta: SnapshotMeta {
last_log_id: Some(log_id(2, 2)),
last_membership: EffectiveMembership::new(Some(log_id(1, 1)), m12()),
snapshot_id: "1-2-3-4".to_string(),
},
..Default::default()
};
let mut eng = Engine::<u64, ()> { ..Default::default() };

eng.state.committed = Some(log_id(4, 5));
eng.state.log_ids = LogIdList::new(vec![
Expand All @@ -46,6 +39,11 @@ fn eng() -> Engine<u64, ()> {
log_id(4, 6),
log_id(4, 8),
]);
eng.state.snapshot_meta = SnapshotMeta {
last_log_id: Some(log_id(2, 2)),
last_membership: EffectiveMembership::new(Some(log_id(1, 1)), m12()),
snapshot_id: "1-2-3-4".to_string(),
};
eng.state.server_state = eng.calc_server_state();

eng
Expand All @@ -68,7 +66,7 @@ fn test_install_snapshot_lt_last_snapshot() -> anyhow::Result<()> {
last_membership: EffectiveMembership::new(Some(log_id(1, 1)), m12()),
snapshot_id: "1-2-3-4".to_string(),
},
eng.snapshot_meta
eng.state.snapshot_meta
);

assert_eq!(
Expand Down Expand Up @@ -113,7 +111,7 @@ fn test_install_snapshot_lt_committed() -> anyhow::Result<()> {
last_membership: EffectiveMembership::new(Some(log_id(1, 1)), m12()),
snapshot_id: "1-2-3-4".to_string(),
},
eng.snapshot_meta
eng.state.snapshot_meta
);

assert_eq!(
Expand Down Expand Up @@ -156,7 +154,7 @@ fn test_install_snapshot_not_conflict() -> anyhow::Result<()> {
last_membership: EffectiveMembership::new(Some(log_id(1, 1)), m1234()),
snapshot_id: "1-2-3-4".to_string(),
},
eng.snapshot_meta
eng.state.snapshot_meta
);
assert_eq!(&[log_id(4, 6), log_id(4, 8)], eng.state.log_ids.key_log_ids());
assert_eq!(Some(log_id(4, 6)), eng.state.committed);
Expand Down Expand Up @@ -200,14 +198,7 @@ fn test_install_snapshot_conflict() -> anyhow::Result<()> {
// Snapshot will be installed, all non-committed log will be deleted.
// And there should be no conflicting logs left.
let mut eng = {
let mut eng = Engine::<u64, ()> {
snapshot_meta: SnapshotMeta {
last_log_id: Some(log_id(2, 2)),
last_membership: EffectiveMembership::new(Some(log_id(1, 1)), m12()),
snapshot_id: "1-2-3-4".to_string(),
},
..Default::default()
};
let mut eng = Engine::<u64, ()> { ..Default::default() };

eng.state.committed = Some(log_id(2, 3));
eng.state.log_ids = LogIdList::new(vec![
Expand All @@ -218,6 +209,12 @@ fn test_install_snapshot_conflict() -> anyhow::Result<()> {
log_id(4, 8),
]);

eng.state.snapshot_meta = SnapshotMeta {
last_log_id: Some(log_id(2, 2)),
last_membership: EffectiveMembership::new(Some(log_id(1, 1)), m12()),
snapshot_id: "1-2-3-4".to_string(),
};

eng.state.server_state = eng.calc_server_state();

eng
Expand All @@ -235,7 +232,7 @@ fn test_install_snapshot_conflict() -> anyhow::Result<()> {
last_membership: EffectiveMembership::new(Some(log_id(1, 1)), m1234()),
snapshot_id: "1-2-3-4".to_string(),
},
eng.snapshot_meta
eng.state.snapshot_meta
);
assert_eq!(&[log_id(5, 6)], eng.state.log_ids.key_log_ids());
assert_eq!(Some(log_id(5, 6)), eng.state.committed);
Expand Down Expand Up @@ -292,7 +289,7 @@ fn test_install_snapshot_advance_last_log_id() -> anyhow::Result<()> {
last_membership: EffectiveMembership::new(Some(log_id(1, 1)), m1234()),
snapshot_id: "1-2-3-4".to_string(),
},
eng.snapshot_meta
eng.state.snapshot_meta
);
assert_eq!(&[log_id(100, 100)], eng.state.log_ids.key_log_ids());
assert_eq!(Some(log_id(100, 100)), eng.state.committed);
Expand Down
20 changes: 10 additions & 10 deletions openraft/src/engine/update_snapshot_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ fn m1234() -> Membership<u64, ()> {
}

fn eng() -> Engine<u64, ()> {
Engine::<u64, ()> {
snapshot_meta: SnapshotMeta {
last_log_id: Some(log_id(2, 2)),
last_membership: EffectiveMembership::new(Some(log_id(1, 1)), m12()),
snapshot_id: "1-2-3-4".to_string(),
},
..Default::default()
}
let mut eng = Engine::<u64, ()> { ..Default::default() };

eng.state.snapshot_meta = SnapshotMeta {
last_log_id: Some(log_id(2, 2)),
last_membership: EffectiveMembership::new(Some(log_id(1, 1)), m12()),
snapshot_id: "1-2-3-4".to_string(),
};
eng
}

#[test]
Expand All @@ -54,7 +54,7 @@ fn test_update_snapshot_no_update() -> anyhow::Result<()> {
last_membership: EffectiveMembership::new(Some(log_id(1, 1)), m12()),
snapshot_id: "1-2-3-4".to_string(),
},
eng.snapshot_meta
eng.state.snapshot_meta
);

assert_eq!(
Expand Down Expand Up @@ -90,7 +90,7 @@ fn test_update_snapshot_updated() -> anyhow::Result<()> {
last_membership: EffectiveMembership::new(Some(log_id(2, 2)), m1234()),
snapshot_id: "1-2-3-4".to_string(),
},
eng.snapshot_meta
eng.state.snapshot_meta
);

assert_eq!(
Expand Down
4 changes: 4 additions & 0 deletions openraft/src/raft_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::LogIdOptionExt;
use crate::MembershipState;
use crate::NodeId;
use crate::ServerState;
use crate::SnapshotMeta;
use crate::Vote;

/// A struct used to represent the raft state which a Raft node needs.
Expand All @@ -33,6 +34,9 @@ where
/// The latest cluster membership configuration found, in log or in state machine.
pub membership_state: MembershipState<NID, N>,

/// The metadata of the last snapshot.
pub snapshot_meta: SnapshotMeta<NID, N>,

// --
// -- volatile fields: they are not persisted.
// --
Expand Down
3 changes: 3 additions & 0 deletions openraft/src/storage/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,16 @@ where

let log_ids = LogIdList::load_log_ids(last_purged_log_id, last_log_id, self).await?;

let snapshot_meta = self.sto.get_current_snapshot().await?.map(|x| x.meta).unwrap_or_default();

Ok(RaftState {
committed: last_applied,
// The initial value for `vote` is the minimal possible value.
// See: [Conditions for initialization](https://datafuselabs.github.io/openraft/cluster-formation.html#conditions-for-initialization)
vote: vote.unwrap_or_default(),
log_ids,
membership_state: mem_state,
snapshot_meta,

// -- volatile fields: they are not persisted.
internal_server_state: InternalServerState::default(),
Expand Down

0 comments on commit e123842

Please sign in to comment.