From e123842862ac91860352afc48a48f19f011e7ab7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Fri, 30 Dec 2022 14:03:40 +0800 Subject: [PATCH] Change: RaftState: add field snapshot_meta Snapshot meta should be part of the `RaftState`. Move it from `Engine` to `RaftState` --- openraft/src/core/raft_core.rs | 10 +---- openraft/src/engine/calc_purge_upto_test.rs | 2 +- openraft/src/engine/engine_impl.rs | 16 +++----- openraft/src/engine/install_snapshot_test.rs | 39 +++++++++----------- openraft/src/engine/update_snapshot_test.rs | 20 +++++----- openraft/src/raft_state.rs | 4 ++ openraft/src/storage/helper.rs | 3 ++ 7 files changed, 44 insertions(+), 50 deletions(-) diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 7c405d351..9130b8a53 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -207,12 +207,6 @@ impl, S: RaftStorage> RaftCore, S: RaftStorage> RaftCore, S: RaftStorage> RaftCore anyhow::Result<()> { if let Some(last_purged) = last_purged { eng.state.log_ids.purge(&last_purged); } - eng.snapshot_meta.last_log_id = snapshot_last_log_id; + eng.state.snapshot_meta.last_log_id = snapshot_last_log_id; let got = eng.calc_purge_upto(); assert_eq!( diff --git a/openraft/src/engine/engine_impl.rs b/openraft/src/engine/engine_impl.rs index eef51af25..ab4941d1a 100644 --- a/openraft/src/engine/engine_impl.rs +++ b/openraft/src/engine/engine_impl.rs @@ -71,9 +71,6 @@ where pub(crate) config: EngineConfig, - /// The metadata of the last snapshot. - pub(crate) snapshot_meta: SnapshotMeta, - /// The state of this raft node. pub(crate) state: RaftState, @@ -93,7 +90,6 @@ where Self { id, config, - snapshot_meta: Default::default(), state: init_state.clone(), metrics_flags: MetricsChangeFlags::default(), commands: vec![], @@ -575,10 +571,10 @@ where let max_keep = self.config.max_in_snapshot_log_to_keep; let batch_size = self.config.purge_batch_size; - let purge_end = self.snapshot_meta.last_log_id.next_index().saturating_sub(max_keep); + let purge_end = self.state.snapshot_meta.last_log_id.next_index().saturating_sub(max_keep); tracing::debug!( - snapshot_last_log_id = debug(self.snapshot_meta.last_log_id), + snapshot_last_log_id = debug(self.state.snapshot_meta.last_log_id), max_keep, "try purge: (-oo, {})", purge_end @@ -586,7 +582,7 @@ where if st.last_purged_log_id().next_index() + batch_size > purge_end { tracing::debug!( - snapshot_last_log_id = debug(self.snapshot_meta.last_log_id), + snapshot_last_log_id = debug(self.state.snapshot_meta.last_log_id), max_keep, last_purged_log_id = display(st.last_purged_log_id().summary()), batch_size, @@ -893,16 +889,16 @@ where pub(crate) fn update_snapshot(&mut self, meta: SnapshotMeta) -> bool { tracing::info!("update_snapshot: {:?}", meta); - if meta.last_log_id <= self.snapshot_meta.last_log_id { + if meta.last_log_id <= self.state.snapshot_meta.last_log_id { tracing::info!( "No need to install a smaller snapshot: current snapshot last_log_id({}), new snapshot last_log_id({})", - self.snapshot_meta.last_log_id.summary(), + self.state.snapshot_meta.last_log_id.summary(), meta.last_log_id.summary() ); return false; } - self.snapshot_meta = meta; + self.state.snapshot_meta = meta; self.metrics_flags.set_data_changed(); true diff --git a/openraft/src/engine/install_snapshot_test.rs b/openraft/src/engine/install_snapshot_test.rs index 3b8f851e6..f5de7d203 100644 --- a/openraft/src/engine/install_snapshot_test.rs +++ b/openraft/src/engine/install_snapshot_test.rs @@ -29,14 +29,7 @@ fn m1234() -> Membership { } fn eng() -> Engine { - let mut eng = Engine:: { - snapshot_meta: SnapshotMeta { - last_log_id: Some(log_id(2, 2)), - last_membership: EffectiveMembership::new(Some(log_id(1, 1)), m12()), - snapshot_id: "1-2-3-4".to_string(), - }, - ..Default::default() - }; + let mut eng = Engine:: { ..Default::default() }; eng.state.committed = Some(log_id(4, 5)); eng.state.log_ids = LogIdList::new(vec![ @@ -46,6 +39,11 @@ fn eng() -> Engine { log_id(4, 6), log_id(4, 8), ]); + eng.state.snapshot_meta = SnapshotMeta { + last_log_id: Some(log_id(2, 2)), + last_membership: EffectiveMembership::new(Some(log_id(1, 1)), m12()), + snapshot_id: "1-2-3-4".to_string(), + }; eng.state.server_state = eng.calc_server_state(); eng @@ -68,7 +66,7 @@ fn test_install_snapshot_lt_last_snapshot() -> anyhow::Result<()> { last_membership: EffectiveMembership::new(Some(log_id(1, 1)), m12()), snapshot_id: "1-2-3-4".to_string(), }, - eng.snapshot_meta + eng.state.snapshot_meta ); assert_eq!( @@ -113,7 +111,7 @@ fn test_install_snapshot_lt_committed() -> anyhow::Result<()> { last_membership: EffectiveMembership::new(Some(log_id(1, 1)), m12()), snapshot_id: "1-2-3-4".to_string(), }, - eng.snapshot_meta + eng.state.snapshot_meta ); assert_eq!( @@ -156,7 +154,7 @@ fn test_install_snapshot_not_conflict() -> anyhow::Result<()> { last_membership: EffectiveMembership::new(Some(log_id(1, 1)), m1234()), snapshot_id: "1-2-3-4".to_string(), }, - eng.snapshot_meta + eng.state.snapshot_meta ); assert_eq!(&[log_id(4, 6), log_id(4, 8)], eng.state.log_ids.key_log_ids()); assert_eq!(Some(log_id(4, 6)), eng.state.committed); @@ -200,14 +198,7 @@ fn test_install_snapshot_conflict() -> anyhow::Result<()> { // Snapshot will be installed, all non-committed log will be deleted. // And there should be no conflicting logs left. let mut eng = { - let mut eng = Engine:: { - snapshot_meta: SnapshotMeta { - last_log_id: Some(log_id(2, 2)), - last_membership: EffectiveMembership::new(Some(log_id(1, 1)), m12()), - snapshot_id: "1-2-3-4".to_string(), - }, - ..Default::default() - }; + let mut eng = Engine:: { ..Default::default() }; eng.state.committed = Some(log_id(2, 3)); eng.state.log_ids = LogIdList::new(vec![ @@ -218,6 +209,12 @@ fn test_install_snapshot_conflict() -> anyhow::Result<()> { log_id(4, 8), ]); + eng.state.snapshot_meta = SnapshotMeta { + last_log_id: Some(log_id(2, 2)), + last_membership: EffectiveMembership::new(Some(log_id(1, 1)), m12()), + snapshot_id: "1-2-3-4".to_string(), + }; + eng.state.server_state = eng.calc_server_state(); eng @@ -235,7 +232,7 @@ fn test_install_snapshot_conflict() -> anyhow::Result<()> { last_membership: EffectiveMembership::new(Some(log_id(1, 1)), m1234()), snapshot_id: "1-2-3-4".to_string(), }, - eng.snapshot_meta + eng.state.snapshot_meta ); assert_eq!(&[log_id(5, 6)], eng.state.log_ids.key_log_ids()); assert_eq!(Some(log_id(5, 6)), eng.state.committed); @@ -292,7 +289,7 @@ fn test_install_snapshot_advance_last_log_id() -> anyhow::Result<()> { last_membership: EffectiveMembership::new(Some(log_id(1, 1)), m1234()), snapshot_id: "1-2-3-4".to_string(), }, - eng.snapshot_meta + eng.state.snapshot_meta ); assert_eq!(&[log_id(100, 100)], eng.state.log_ids.key_log_ids()); assert_eq!(Some(log_id(100, 100)), eng.state.committed); diff --git a/openraft/src/engine/update_snapshot_test.rs b/openraft/src/engine/update_snapshot_test.rs index 5d0a07ea4..dab13b36b 100644 --- a/openraft/src/engine/update_snapshot_test.rs +++ b/openraft/src/engine/update_snapshot_test.rs @@ -25,14 +25,14 @@ fn m1234() -> Membership { } fn eng() -> Engine { - Engine:: { - snapshot_meta: SnapshotMeta { - last_log_id: Some(log_id(2, 2)), - last_membership: EffectiveMembership::new(Some(log_id(1, 1)), m12()), - snapshot_id: "1-2-3-4".to_string(), - }, - ..Default::default() - } + let mut eng = Engine:: { ..Default::default() }; + + eng.state.snapshot_meta = SnapshotMeta { + last_log_id: Some(log_id(2, 2)), + last_membership: EffectiveMembership::new(Some(log_id(1, 1)), m12()), + snapshot_id: "1-2-3-4".to_string(), + }; + eng } #[test] @@ -54,7 +54,7 @@ fn test_update_snapshot_no_update() -> anyhow::Result<()> { last_membership: EffectiveMembership::new(Some(log_id(1, 1)), m12()), snapshot_id: "1-2-3-4".to_string(), }, - eng.snapshot_meta + eng.state.snapshot_meta ); assert_eq!( @@ -90,7 +90,7 @@ fn test_update_snapshot_updated() -> anyhow::Result<()> { last_membership: EffectiveMembership::new(Some(log_id(2, 2)), m1234()), snapshot_id: "1-2-3-4".to_string(), }, - eng.snapshot_meta + eng.state.snapshot_meta ); assert_eq!( diff --git a/openraft/src/raft_state.rs b/openraft/src/raft_state.rs index 64e57f97b..14fda773c 100644 --- a/openraft/src/raft_state.rs +++ b/openraft/src/raft_state.rs @@ -8,6 +8,7 @@ use crate::LogIdOptionExt; use crate::MembershipState; use crate::NodeId; use crate::ServerState; +use crate::SnapshotMeta; use crate::Vote; /// A struct used to represent the raft state which a Raft node needs. @@ -33,6 +34,9 @@ where /// The latest cluster membership configuration found, in log or in state machine. pub membership_state: MembershipState, + /// The metadata of the last snapshot. + pub snapshot_meta: SnapshotMeta, + // -- // -- volatile fields: they are not persisted. // -- diff --git a/openraft/src/storage/helper.rs b/openraft/src/storage/helper.rs index 8cda6adf3..4a848a26c 100644 --- a/openraft/src/storage/helper.rs +++ b/openraft/src/storage/helper.rs @@ -56,6 +56,8 @@ where let log_ids = LogIdList::load_log_ids(last_purged_log_id, last_log_id, self).await?; + let snapshot_meta = self.sto.get_current_snapshot().await?.map(|x| x.meta).unwrap_or_default(); + Ok(RaftState { committed: last_applied, // The initial value for `vote` is the minimal possible value. @@ -63,6 +65,7 @@ where vote: vote.unwrap_or_default(), log_ids, membership_state: mem_state, + snapshot_meta, // -- volatile fields: they are not persisted. internal_server_state: InternalServerState::default(),