Skip to content

Commit

Permalink
change: simplify membership change
Browse files Browse the repository at this point in the history
- Change: if leadership is lost, the cluster is left with the **joint**
  config.
  A caller that does not receive a response to the change-membership
  request should always re-send it to ensure the membership config is
  applied.

- Change: remove joint-uniform logic from RaftCore, which brings a lot
  of complexity to the raft impl. This logic is now done in Raft (which
  is a shell to control RaftCore).

- Change: RaftCore.membership is changed to `ActiveMembership`, which
  includes a log id and a membership config.
  This change lets raft check whether a membership is committed by
  comparing its log index with the committed index.

- Change: when adding an existing non-voter, it returns an `Ok` value
  instead of an `Err`.

- Change: add arg `blocking` to `add_non_voter` and `change_membership`.
  A blocking `change_membership` still waits for the two config-change
  logs to commit.
  `blocking` only indicates whether to wait for replication to the
  non-voter to be up to date.

- Change: remove `non_voters`. Merge it into `nodes`.
  Now both voters and non-voters share the same replication handle.

- Change: remove field `ReplicationState.is_ready_to_join`, it
  can be just calculated when needed.

- Change: remove `is_stepping_down`, `membership.contains()` is quite
  enough.

- Change: remove `consensus_state`.
  • Loading branch information
drmingdrmer committed Sep 27, 2021
1 parent a5793df commit 7f34793
Show file tree
Hide file tree
Showing 27 changed files with 1,022 additions and 723 deletions.
281 changes: 107 additions & 174 deletions async-raft/src/core/admin.rs

Large diffs are not rendered by default.

9 changes: 7 additions & 2 deletions async-raft/src/core/append_entries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::raft::AppendEntriesResponse;
use crate::raft::ConflictOpt;
use crate::raft::Entry;
use crate::raft::EntryPayload;
use crate::ActiveMembership;
use crate::AppData;
use crate::AppDataResponse;
use crate::LogId;
Expand Down Expand Up @@ -378,13 +379,17 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
let last_conf_change = entries
.iter()
.filter_map(|ent| match &ent.payload {
EntryPayload::ConfigChange(conf) => Some(conf),
EntryPayload::ConfigChange(conf) => Some(ActiveMembership {
log_id: ent.log_id,
membership: conf.membership.clone(),
}),
_ => None,
})
.last();

if let Some(conf) = last_conf_change {
tracing::debug!({membership=?conf}, "applying new membership config received from leader");
self.update_membership(conf.membership.clone())?;
self.update_membership(conf)?;
};

// Replicate entries to log (same as append, but in follower mode).
Expand Down
226 changes: 102 additions & 124 deletions async-raft/src/core/client.rs

Large diffs are not rendered by default.

131 changes: 50 additions & 81 deletions async-raft/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ mod append_entries;
mod client;
mod install_snapshot;
pub(crate) mod replication;
#[cfg(test)]
mod replication_state_test;
mod vote;

use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::collections::HashSet;
use std::sync::Arc;

use futures::future::AbortHandle;
Expand Down Expand Up @@ -60,14 +60,29 @@ use crate::RaftStorage;
use crate::StorageError;
use crate::Update;

/// The currently active membership config.
///
/// It includes:
/// - the id of the log that sets this membership config,
/// - and the config.
///
/// An active config is just the last seen config in raft spec.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ActiveMembership {
/// The id of the log that applies this membership config
pub log_id: LogId,

/// The membership config carried by the log entry identified by `log_id`.
pub membership: MembershipConfig,
}

/// The core type implementing the Raft protocol.
pub struct RaftCore<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> {
/// This node's ID.
id: NodeId,
/// This node's runtime config.
config: Arc<Config>,
/// The cluster's current membership configuration.
membership: MembershipConfig,
membership: ActiveMembership,
/// The `RaftNetwork` implementation.
network: Arc<N>,
/// The `RaftStorage` implementation.
Expand Down Expand Up @@ -152,7 +167,10 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
let this = Self {
id,
config,
membership,
membership: ActiveMembership {
log_id: LogId::default(),
membership,
},
network,
storage,
target_state: State::Follower,
Expand Down Expand Up @@ -185,7 +203,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
self.last_log_id = state.last_log_id;
self.current_term = state.hard_state.current_term;
self.voted_for = state.hard_state.voted_for;
self.membership = state.membership;
self.membership = state.last_membership.clone();
self.last_applied = state.last_applied;
// NOTE: this is repeated here for clarity. It is unsafe to initialize the node's commit
// index to any other value. The commit index must be determined by a leader after
Expand All @@ -199,8 +217,8 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
}

let has_log = self.last_log_id.index != u64::MIN;
let single = self.membership.members.len() == 1;
let is_voter = self.membership.contains(&self.id);
let single = self.membership.membership.members.len() == 1;
let is_voter = self.membership.membership.contains(&self.id);

self.target_state = match (has_log, single, is_voter) {
// A restarted raft that already received some logs but was not yet added to a cluster.
Expand Down Expand Up @@ -292,7 +310,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
/// Update core's target state, ensuring all invariants are upheld.
#[tracing::instrument(level = "trace", skip(self))]
fn set_target_state(&mut self, target_state: State) {
if target_state == State::Follower && !self.membership.contains(&self.id) {
if target_state == State::Follower && !self.membership.membership.contains(&self.id) {
self.target_state = State::NonVoter;
} else {
self.target_state = target_state;
Expand Down Expand Up @@ -375,7 +393,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra

/// Update the node's current membership config & save hard state.
#[tracing::instrument(level = "trace", skip(self))]
fn update_membership(&mut self, cfg: MembershipConfig) -> RaftResult<()> {
fn update_membership(&mut self, cfg: ActiveMembership) -> RaftResult<()> {
// If the given config does not contain this node's ID, it means one of the following:
//
// - the node is currently a non-voter and is replicating an old config to which it has
Expand All @@ -384,9 +402,9 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
// transition to the non-voter state as a signal for when it is safe to shutdown a node
// being removed.
self.membership = cfg;
if !self.membership.contains(&self.id) {
if !self.membership.membership.contains(&self.id) {
self.set_target_state(State::NonVoter);
} else if self.target_state == State::NonVoter && self.membership.members.contains(&self.id) {
} else if self.target_state == State::NonVoter && self.membership.membership.members.contains(&self.id) {
// The node is a NonVoter and the new config has it configured as a normal member.
// Transition to follower.
self.set_target_state(State::Follower);
Expand Down Expand Up @@ -613,12 +631,9 @@ impl State {
/// Volatile state specific to the Raft leader.
struct LeaderState<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> {
pub(super) core: &'a mut RaftCore<D, R, N, S>,

/// A mapping of node IDs to the replication state of the target node.
pub(super) nodes: BTreeMap<NodeId, ReplicationState<D>>,
/// A mapping of new nodes (non-voters) which are being synced in order to join the cluster.
pub(super) non_voters: BTreeMap<NodeId, NonVoterReplicationState<D>>,
/// A bool indicating if this node will be stepping down after committing the current config change.
pub(super) is_stepping_down: bool,

/// The metrics about a leader
pub leader_metrics: LeaderMetrics,
Expand All @@ -631,29 +646,18 @@ struct LeaderState<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: Raf

/// A buffer of client requests which have been appended locally and are awaiting to be committed to the cluster.
pub(super) awaiting_committed: Vec<ClientRequestEntry<D, R>>,

/// A field tracking the cluster's current consensus state, which is used for dynamic membership.
pub(super) consensus_state: ConsensusState,
}

impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> LeaderState<'a, D, R, N, S> {
/// Create a new instance.
pub(self) fn new(core: &'a mut RaftCore<D, R, N, S>) -> Self {
let consensus_state = if core.membership.is_in_joint_consensus() {
ConsensusState::Joint { is_committed: false }
} else {
ConsensusState::Uniform
};
let (replication_tx, replication_rx) = mpsc::unbounded_channel();
Self {
core,
nodes: BTreeMap::new(),
non_voters: BTreeMap::new(),
is_stepping_down: false,
leader_metrics: LeaderMetrics::default(),
replication_tx,
replication_rx,
consensus_state,
awaiting_committed: Vec::new(),
}
}
Expand All @@ -665,13 +669,14 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
let targets = self
.core
.membership
.membership
.all_nodes()
.into_iter()
.filter(|elem| elem != &self.core.id)
.collect::<Vec<_>>();

for target in targets {
let state = self.spawn_replication_stream(target);
let state = self.spawn_replication_stream(target, None);
self.nodes.insert(target, state);
}

Expand All @@ -691,9 +696,6 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
for node in self.nodes.values() {
let _ = node.replstream.repl_tx.send((RaftEvent::Terminate, tracing::debug_span!("CH")));
}
for node in self.non_voters.values() {
let _ = node.state.replstream.repl_tx.send((RaftEvent::Terminate, tracing::debug_span!("CH")));
}
return Ok(());
}

Expand Down Expand Up @@ -728,13 +730,13 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
tracing::info!("leader recv from rx_api: Initialize");
self.core.reject_init_with_config(tx);
}
RaftMsg::AddNonVoter{id, tx} => {
RaftMsg::AddNonVoter{id, tx, blocking} => {
tracing::info!("leader recv from rx_api: AddNonVoter, {}", id);
self.add_member(id, tx);
self.add_member(id, tx, blocking);
}
RaftMsg::ChangeMembership{members, tx} => {
RaftMsg::ChangeMembership{members, blocking, tx} => {
tracing::info!("leader recv from rx_api: ChangeMembership, {:?}", members);
self.change_membership(members, tx).await;
self.change_membership(members, blocking, tx).await;
}
}
},
Expand Down Expand Up @@ -767,60 +769,27 @@ struct ReplicationState<D: AppData> {
pub matched: LogId,
pub remove_after_commit: Option<u64>,
pub replstream: ReplicationStream<D>,
}

/// The same as `ReplicationState`, except for non-voters.
struct NonVoterReplicationState<D: AppData> {
/// The replication stream state.
pub state: ReplicationState<D>,
/// A bool indicating if this non-voters is ready to join the cluster.
pub is_ready_to_join: bool,

/// The response channel to use for when this node has successfully synced with the cluster.
pub tx: Option<ResponseTx>,
}

/// A state enum used by Raft leaders to navigate the joint consensus protocol.
pub enum ConsensusState {
/// The cluster is preparring to go into joint consensus, but the leader is still syncing
/// some non-voters to prepare them for cluster membership.
NonVoterSync {
/// The set of non-voters nodes which are still being synced.
awaiting: HashSet<NodeId>,
/// The full membership change which has been proposed.
members: BTreeSet<NodeId>,

/// The response channel to use once the consensus state is back into uniform state.
tx: ResponseTx,
},
/// The cluster is in a joint consensus state and is syncing new nodes.
Joint {
/// A bool indicating if the associated config which started this joint consensus has yet been comitted.
///
/// NOTE: when a new leader is elected, it will initialize this value to false, and then
/// update this value to true once the new leader's blank payload has been committed.
is_committed: bool,
},
/// The cluster consensus is uniform; not in a joint consensus state.
Uniform,
}
impl<D> ReplicationState<D>
where D: AppData
{
// TODO(xp): make this a method of Config?

impl ConsensusState {
/// Check the current state to determine if it is in joint consensus, and if it is safe to finalize the joint
/// consensus.
///
/// The return value will be true if:
/// 1. this object currently represents a joint consensus state.
/// 2. the corresponding config for this consensus state has been committed to the cluster.
pub fn is_joint_consensus_safe_to_finalize(&self) -> bool {
match self {
ConsensusState::Joint { is_committed } => *is_committed,
_ => false,
}
/// Return true if the distance behind last_log_id is smaller than the threshold to join.
pub fn is_line_rate(&self, last_log_id: &LogId, config: &Config) -> bool {
is_matched_upto_date(&self.matched, last_log_id, config)
}
}

///////////////////////////////////////////////////////////////////////////////////////////////////
/// Return `true` when `matched` trails `last_log_id` by no more than
/// `config.replication_lag_threshold` log indexes.
///
/// The gap is computed with a saturating subtraction, so a `matched` index that is
/// ahead of `last_log_id` counts as a gap of zero.
pub fn is_matched_upto_date(matched: &LogId, last_log_id: &LogId, config: &Config) -> bool {
    let lag = last_log_id.index.saturating_sub(matched.index);
    config.replication_lag_threshold >= lag
}

/// Volatile state specific to a Raft node in candidate state.
struct CandidateState<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> {
Expand Down Expand Up @@ -857,8 +826,8 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>

// Setup initial state per term.
self.votes_granted_old = 1; // We must vote for ourselves per the Raft spec.
self.votes_needed_old = ((self.core.membership.members.len() / 2) + 1) as u64; // Just need a majority.
if let Some(nodes) = &self.core.membership.members_after_consensus {
self.votes_needed_old = ((self.core.membership.membership.members.len() / 2) + 1) as u64; // Just need a majority.
if let Some(nodes) = &self.core.membership.membership.members_after_consensus {
self.votes_granted_new = 1; // We must vote for ourselves per the Raft spec.
self.votes_needed_new = ((nodes.len() / 2) + 1) as u64; // Just need a majority.
}
Expand Down
Loading

0 comments on commit 7f34793

Please sign in to comment.