diff --git a/src/progress.rs b/src/progress.rs index c1b30ca70..b12beabaf 100644 --- a/src/progress.rs +++ b/src/progress.rs @@ -25,7 +25,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use eraftpb::ConfState; +use eraftpb::{ConfState, SnapshotMetadata}; use errors::{Error, Result}; use hashbrown::hash_map::DefaultHashBuilder; use hashbrown::{HashMap, HashSet}; @@ -197,6 +197,44 @@ impl ProgressSet { } } + pub(crate) fn restore_snapmeta( + meta: &SnapshotMetadata, + next_idx: u64, + max_inflight: usize, + ) -> Self { + let mut prs = ProgressSet::new(); + let pr = Progress::new(next_idx, max_inflight); + meta.get_conf_state().get_nodes().iter().for_each(|id| { + prs.progress.insert(*id, pr.clone()); + prs.configuration.voters.insert(*id); + }); + meta.get_conf_state().get_learners().iter().for_each(|id| { + prs.progress.insert(*id, pr.clone()); + prs.configuration.learners.insert(*id); + }); + + if meta.pending_membership_change_index != 0 { + let mut next_configuration = Configuration::with_capacity(0, 0); + meta.get_pending_membership_change() + .get_nodes() + .iter() + .for_each(|id| { + prs.progress.insert(*id, pr.clone()); + next_configuration.voters.insert(*id); + }); + meta.get_pending_membership_change() + .get_learners() + .iter() + .for_each(|id| { + prs.progress.insert(*id, pr.clone()); + next_configuration.learners.insert(*id); + }); + prs.next_configuration = Some(next_configuration); + } + prs.assert_progress_and_configuration_consistent(); + prs + } + /// Returns the status of voters. /// /// **Note:** Do not use this for majority/quorum calculation. The Raft node may be @@ -415,7 +453,15 @@ impl ProgressSet { .progress .keys() .all(|v| self.configuration.learners.contains(v) - || self.configuration.voters.contains(v))); + || self.configuration.voters.contains(v) + || self + .next_configuration + .as_ref() + .map_or(false, |c| c.learners.contains(v)) + || self + .next_configuration + .as_ref() + .map_or(false, |c| c.voters.contains(v)))); assert_eq!( self.voter_ids().len() + self.learner_ids().len(), self.progress.len() diff --git a/src/raft.rs b/src/raft.rs index 5d2b7930b..ce091e46e 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -1988,16 +1988,6 @@ impl Raft { fn restore_raft(&mut self, snap: &Snapshot) -> Option { let meta = snap.get_metadata(); - - if snap.get_metadata().has_pending_membership_change() { - let meta = snap.get_metadata(); - let change = meta.get_pending_membership_change().clone(); - let index = meta.get_pending_membership_change_index(); - let change = ConfChange::from((index, change)); - // We already started this change, so it must be safe. We can't bail here. - self.begin_membership_change(&change).unwrap(); - } - if self.raft_log.match_term(meta.get_index(), meta.get_term()) { info!( "{} [commit: {}, lastindex: {}, lastterm: {}] fast-forwarded commit to \ @@ -2039,68 +2029,21 @@ impl Raft { meta.get_term() ); - let nodes = meta.get_conf_state().get_nodes(); - let learners = meta.get_conf_state().get_learners(); - self.prs = Some(ProgressSet::with_capacity(nodes.len(), learners.len())); - - for &(is_learner, nodes) in &[(false, nodes), (true, learners)] { - for &n in nodes { - let next_index = self.raft_log.last_index() + 1; - let mut matched = 0; - if n == self.id { - matched = next_index - 1; - self.is_learner = is_learner; - } - self.set_progress(n, matched, next_index, is_learner); - info!( - "{} restored progress of {} [{:?}]", - self.tag, - n, - self.prs().get(n) - ); - } + let next_idx = self.raft_log.last_index() + 1; + let mut prs = ProgressSet::restore_snapmeta(meta, next_idx, self.max_inflight); + prs.get_mut(self.id).unwrap().matched = next_idx - 1; + if self.is_learner && prs.configuration().voters().contains(&self.id) { + self.is_learner = false; } - - if meta.has_pending_membership_change() { - let state = meta.get_pending_membership_change(); - let start_index = meta.get_pending_membership_change_index(); - let change = ConfChange::from((start_index, state.clone())); - let config = Configuration::from(state.clone()); - - let (voters, learners) = { - let voters = config - .voters() - .difference(self.prs().configuration().voters()) - .cloned() - .collect::>(); - let learners = config - .learners() - .difference(self.prs().configuration().learners()) - .cloned() - .collect::>(); - (voters, learners) - }; - for &(is_learner, ref nodes) in &[(false, voters), (true, learners)] { - for &n in nodes { - let next_index = self.raft_log.last_index() + 1; - let mut matched = 0; - if n == self.id { - matched = next_index - 1; - self.is_learner = is_learner; - } - self.set_progress(n, matched, next_index, is_learner); - info!( - "{} restored progress of {} [{:?}]", - self.tag, - n, - self.prs().get(n) - ); - } - } - self.begin_membership_change(&change) - .expect("Expected already valid change to still be valid."); + self.prs = Some(prs); + if meta.get_pending_membership_change_index() > 0 { + let cs = meta.get_pending_membership_change().clone(); + let mut conf_change = ConfChange::new(); + conf_change.set_change_type(ConfChangeType::BeginMembershipChange); + conf_change.set_configuration(cs); + conf_change.set_start_index(meta.get_pending_membership_change_index()); + self.pending_membership_change = Some(conf_change); } - None }