Skip to content

Commit

Permalink
feature: support promote a learner to voter
Browse files Browse the repository at this point in the history
Signed-off-by: themanforfree <themanforfree@gmail.com>
  • Loading branch information
themanforfree committed Oct 12, 2023
1 parent edb7a91 commit 7c95b45
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 27 deletions.
7 changes: 7 additions & 0 deletions curp/src/members.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,13 @@ impl ClusterInfo {
.iter()
.find_map(|m| (m.name == name).then_some(m.id))
}

/// Promote a learner to voter
pub(crate) fn promote(&self, node_id: ServerId) {
if let Some(mut s) = self.members.get_mut(&node_id) {
s.is_learner = false;
}
}
}

#[cfg(test)]
Expand Down
11 changes: 11 additions & 0 deletions curp/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,17 @@ impl ConfChange {
address: vec![],
}
}

/// Create a new `ConfChange` to promote a node
#[must_use]
#[inline]
pub fn promote(node_id: ServerId) -> Self {
Self {
change_type: ConfChangeType::Promote as i32,
node_id,
address: vec![],
}
}
}

impl ProposeConfChangeRequest {
Expand Down
3 changes: 1 addition & 2 deletions curp/src/server/curp_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,6 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
let Ok(change) = change_res else {
break;
};

match change.change_type() {
ConfChangeType::Add | ConfChangeType::AddLearner => {
let connect = match InnerConnectApiWrapper::connect(change.node_id, change.address).await {
Expand Down Expand Up @@ -487,7 +486,7 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
continue;
}
}
ConfChangeType::Promote => {} // TODO: Support Promote
ConfChangeType::Promote => {}
}
}
}
Expand Down
39 changes: 29 additions & 10 deletions curp/src/server/raw_curp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::{
},
};

use clippy_utilities::NumericCast;
use clippy_utilities::{NumericCast, OverflowArithmetic};
use curp_external_api::cmd::ConflictCheck;
use dashmap::DashMap;
use event_listener::Event;
Expand Down Expand Up @@ -80,6 +80,9 @@ pub(super) type UncommittedPoolRef<C> = Arc<Mutex<UncommittedPool<C>>>;
/// Default Size of channel
const CHANGE_CHANNEL_SIZE: usize = 128;

/// Max gap between leader and learner when promoting a learner
const MAX_PROMOTE_GAP: u64 = 500;

/// The curp state machine
#[derive(Debug)]
pub(super) struct RawCurp<C: Command, RC: RoleChange> {
Expand Down Expand Up @@ -310,6 +313,16 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {
Err(ConfChangeError::new_propose(ProposeError::NotLeader)),
);
}
if let Some(change) = conf_change.changes().first() {
if let ConfChangeType::Promote = change.change_type() {
let learner_index = self.lst.get_match_index(change.node_id);
let leader_index = self.log.read().last_log_index();
if leader_index.overflow_sub(learner_index) > MAX_PROMOTE_GAP {
return (info, Err(ConfChangeError::LearnerNotCatchUp(())));
}
}
}

let pool_entry = PoolEntry::from(conf_change.clone());
let mut conflict = self.insert_sp(pool_entry.clone());
conflict |= self.insert_ucp(pool_entry);
Expand Down Expand Up @@ -1247,7 +1260,6 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {
}

/// Check if the new config is valid
#[allow(clippy::unimplemented)] // TODO: remove this when learner promote is implemented
fn check_new_config(&self, conf_change: &ConfChange) -> Result<(), ConfChangeError> {
let mut statuses_ids = self
.lst
Expand Down Expand Up @@ -1282,18 +1294,17 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {
}
let mut all_nodes = HashSet::new();
all_nodes.extend(config.voters());
all_nodes.extend(config.learners());
all_nodes.extend(&config.learners);
if statuses_ids.len() < 3
|| all_nodes != statuses_ids
|| !config.voters().is_disjoint(config.learners())
|| !config.voters().is_disjoint(&config.learners)
{
return Err(ConfChangeError::InvalidConfig(()));
}
Ok(())
}

/// Switch to a new config and return old member infos for fallback
#[allow(clippy::unimplemented)] // TODO: remove this when learner is implemented
fn switch_config(&self, conf_change: ConfChange) -> (Vec<String>, String, bool) {
let node_id = conf_change.node_id;
let fallback_info = match conf_change.change_type() {
Expand Down Expand Up @@ -1333,13 +1344,21 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {
(old_addrs, String::new(), false)
}
ConfChangeType::Promote => {
unimplemented!("learner node is not supported yet");
self.cst.map_lock(|mut cst_l| {
_ = cst_l.config.learners.remove(&node_id);
_ = cst_l.config.insert(node_id, false);
});
self.ctx.cluster_info.promote(node_id);
self.lst.promote(node_id);
(vec![], String::new(), false)
}
};
self.ctx
.change_tx
.send(conf_change)
.unwrap_or_else(|_e| unreachable!("change_rx should not be dropped"));
if self.is_leader() {
self.ctx
.change_tx
.send(conf_change)
.unwrap_or_else(|_e| unreachable!("change_rx should not be dropped"));
}
fallback_info
}

Expand Down
25 changes: 13 additions & 12 deletions curp/src/server/raw_curp/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,10 @@ impl LeaderState {
self.get_status(id).next_index
}

// /// Get `match_index` for server
// pub(super) fn get_match_index(&self, id: ServerId) -> LogIndex {
// self.get_status(id).match_index
// }
// TODO
/// Get `match_index` for server
pub(super) fn get_match_index(&self, id: ServerId) -> LogIndex {
self.get_status(id).match_index
}

/// Update `next_index` for server
pub(super) fn update_next_index(&self, id: ServerId, index: LogIndex) {
Expand All @@ -204,6 +203,13 @@ impl LeaderState {
pub(super) fn iter(&self) -> impl Iterator<Item = RefMulti<'_, ServerId, FollowerStatus>> {
self.statuses.iter()
}

/// Promote a learner to voter
pub(super) fn promote(&self, node_id: ServerId) {
if let Some(mut s) = self.statuses.get_mut(&node_id) {
s.is_learner = false;
}
}
}

impl<C> CandidateState<C> {
Expand Down Expand Up @@ -239,9 +245,9 @@ pub(super) struct MajorityConfig {
#[derive(Debug, Clone)]
pub(super) struct Config {
/// The majority config
majority_config: MajorityConfig,
pub(super) majority_config: MajorityConfig,
/// The learners in the cluster
learners: HashSet<ServerId>,
pub(super) learners: HashSet<ServerId>,
}

impl Config {
Expand All @@ -258,11 +264,6 @@ impl Config {
&self.majority_config.voters
}

/// Get learners set
pub(super) fn learners(&self) -> &HashSet<ServerId> {
&self.learners
}

/// Insert a voter
pub(super) fn insert(&mut self, id: ServerId, is_learner: bool) -> bool {
if is_learner {
Expand Down
24 changes: 22 additions & 2 deletions curp/src/server/raw_curp/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,22 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {
let mut log_w = self.log.write();
log_w.push(st_r.term, cmd).unwrap().index
}

pub(crate) fn check_learner(&self, node_id: ServerId, is_learner: bool) -> bool {
self.lst
.get_all_statuses()
.get(&node_id)
.is_some_and(|f| f.is_learner == is_learner)
&& self
.cluster()
.all_members()
.get(&node_id)
.is_some_and(|m| m.is_learner == is_learner)
&& self.cst.map_lock(|cst_l| {
cst_l.config.learners.contains(&node_id) == is_learner
&& cst_l.config.voters().contains(&1) != is_learner
})
}
}

/*************** tests for propose **************/
Expand Down Expand Up @@ -751,7 +767,7 @@ fn add_node_should_add_new_node_to_curp() {

#[traced_test]
#[test]
fn add_learner_node_should_add_learner_to_curp() {
fn add_learner_node_and_promote_should_success() {
let curp = {
let exe_tx = MockCEEventTxApi::<TestCommand>::default();
Arc::new(RawCurp::new_test(3, exe_tx, mock_role_change()))
Expand All @@ -761,7 +777,11 @@ fn add_learner_node_should_add_learner_to_curp() {
vec!["http://127.0.0.1:4567".to_owned()],
)];
assert!(curp.apply_conf_change(changes).is_ok());
assert!(curp.contains(1));
assert!(curp.check_learner(1, true));

let changes = vec![ConfChange::promote(1)];
assert!(curp.apply_conf_change(changes).is_ok());
assert!(curp.check_learner(1, false));
}

#[traced_test]
Expand Down
2 changes: 1 addition & 1 deletion simulation/src/curp_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use curp::{
client::{Client, ReadState},
cmd::Command,
error::{CommandProposeError, ProposeError},
members::{ClusterInfo, Member, ServerId},
members::{ClusterInfo, ServerId},
server::Rpc,
ConfChangeError, FetchClusterRequest, FetchClusterResponse, LogIndex, Member,
ProposeConfChangeRequest, ProposeConfChangeResponse,
Expand Down

0 comments on commit 7c95b45

Please sign in to comment.