Skip to content

Commit

Permalink
feature: support add learner node to cluster
Browse files Browse the repository at this point in the history
Signed-off-by: themanforfree <themanforfree@gmail.com>
  • Loading branch information
themanforfree committed Oct 12, 2023
1 parent 601c710 commit edb7a91
Show file tree
Hide file tree
Showing 9 changed files with 273 additions and 106 deletions.
18 changes: 9 additions & 9 deletions curp-test-utils/src/test_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,13 @@ impl PbCodec for ExecuteError {

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct TestCommand {
id: ProposeId,
keys: Vec<u32>,
exe_dur: Duration,
as_dur: Duration,
exe_should_fail: bool,
as_should_fail: bool,
cmd_type: TestCommandType,
pub id: ProposeId,
pub keys: Vec<u32>,
pub exe_dur: Duration,
pub as_dur: Duration,
pub exe_should_fail: bool,
pub as_should_fail: bool,
pub cmd_type: TestCommandType,
}

impl Default for TestCommand {
Expand Down Expand Up @@ -177,7 +177,7 @@ impl From<LogIndexResult> for LogIndex {
}
}

// The `TestCommandResult` is only for internal use, so we donnot have to serialize it to protobuf format
// The `TestCommandResult` is only for internal use, so we do not have to serialize it to protobuf format
impl PbCodec for LogIndexResult {
fn encode(&self) -> Vec<u8> {
bincode::serialize(self).unwrap_or_else(|_| {
Expand Down Expand Up @@ -225,7 +225,7 @@ impl ConflictCheck for TestCommand {
}
}

// The `TestCommand` is only for internal use, so we donnot have to serialize it to protobuf format
// The `TestCommand` is only for internal use, so we do not have to serialize it to protobuf format
impl PbCodec for TestCommand {
fn encode(&self) -> Vec<u8> {
bincode::serialize(self)
Expand Down
2 changes: 1 addition & 1 deletion curp/proto/common
Submodule common updated 1 files
+3 −2 src/message.proto
20 changes: 10 additions & 10 deletions curp/src/members.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,13 @@ impl ClusterInfo {
.collect()
}

/// Get all members vec
#[must_use]
#[inline]
pub fn all_members_vec(&self) -> Vec<Member> {
self.members.iter().map(|t| t.value().clone()).collect()
}

/// Insert a member
#[inline]
pub fn insert(&self, member: Member) {
Expand Down Expand Up @@ -241,18 +248,11 @@ impl ClusterInfo {
.collect()
}

/// Get all members
#[must_use]
#[inline]
pub fn members(&self) -> Vec<Member> {
self.members.iter().map(|t| t.value().clone()).collect()
}

/// Get length of peers
#[must_use]
#[inline]
pub fn members_len(&self) -> usize {
self.members.len()
pub fn voters_len(&self) -> usize {
self.members.iter().filter(|t| !t.is_learner).count()
}

/// Get id by name
Expand Down Expand Up @@ -304,7 +304,7 @@ mod tests {
let node1_url = node1.self_addrs();
assert!(!peers.contains_key(&node1_id));
assert_eq!(peers.len(), 2);
assert_eq!(node1.members_len(), peers.len() + 1);
assert_eq!(node1.voters_len(), peers.len() + 1); // TODO fix test

let peer_urls = peers.values().collect::<Vec<_>>();

Expand Down
6 changes: 4 additions & 2 deletions curp/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,15 @@ impl FetchClusterResponse {
/// Create a new `FetchClusterResponse`
pub(crate) fn new(
leader_id: Option<ServerId>,
all_members: HashMap<ServerId, Member>,
term: u64,
cluster_id: u64,
members: Vec<Member>,
) -> Self {
Self {
leader_id,
members: all_members.into_values().map(Into::into).collect(),
term,
cluster_id,
members,
}
}

Expand Down
98 changes: 82 additions & 16 deletions curp/src/server/curp_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
}
Err(err) => Some(err),
};
let members = self.curp.cluster().members();
let members = self.curp.cluster().all_members_vec();
Ok(ProposeConfChangeResponse {
members,
leader_id,
Expand Down Expand Up @@ -254,8 +254,11 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
_req: FetchClusterRequest,
) -> Result<FetchClusterResponse, CurpError> {
let (leader_id, term) = self.curp.leader();
let all_members = self.curp.cluster().all_members();
Ok(FetchClusterResponse::new(leader_id, all_members, term))
let cluster_id = self.curp.cluster().cluster_id();
let members = self.curp.cluster().all_members_vec();
Ok(FetchClusterResponse::new(
leader_id, term, cluster_id, members,
))
}

/// Handle `InstallSnapshot` stream
Expand Down Expand Up @@ -723,21 +726,24 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
async fn bcast_vote(curp: &RawCurp<C, RC>, vote: Vote) {
debug!("{} broadcasts votes to all servers", curp.id());
let rpc_timeout = curp.cfg().rpc_timeout;
let voters = curp.voters();
let resps = curp
.connects()
.iter()
.map(|c| {
let req = VoteRequest::new(
vote.term,
vote.candidate_id,
vote.last_log_index,
vote.last_log_term,
);
let connect = Arc::clone(c.value());
async move {
let resp = connect.vote(req, rpc_timeout).await;
(connect.id(), resp)
}
.filter_map(|c| {
voters.contains(c.key()).then(|| {
let req = VoteRequest::new(
vote.term,
vote.candidate_id,
vote.last_log_index,
vote.last_log_term,
);
let connect = Arc::clone(c.value());
async move {
let resp = connect.vote(req, rpc_timeout).await;
(connect.id(), resp)
}
})
})
.collect::<FuturesUnordered<_>>()
.filter_map(|(id, resp)| async move {
Expand Down Expand Up @@ -858,7 +864,9 @@ mod tests {
use tracing_test::traced_test;

use super::*;
use crate::{rpc::connect::MockInnerConnectApi, server::cmd_worker::MockCEEventTxApi};
use crate::{
rpc::connect::MockInnerConnectApi, server::cmd_worker::MockCEEventTxApi, ConfChange,
};

#[traced_test]
#[tokio::test]
Expand Down Expand Up @@ -930,4 +938,62 @@ mod tests {
assert!(curp.is_leader());
curp.shutdown_trigger().self_shutdown_and_wait().await;
}

#[traced_test]
#[tokio::test]
async fn vote_will_not_send_to_learner_during_election() {
let curp = {
let exe_tx = MockCEEventTxApi::<TestCommand>::default();
Arc::new(RawCurp::new_test(3, exe_tx, mock_role_change()))
};

let learner_id = 123;
let s1_id = curp.cluster().get_id_by_name("S1").unwrap();
let s2_id = curp.cluster().get_id_by_name("S2").unwrap();

let _ig = curp.apply_conf_change(vec![ConfChange::add_learner(
learner_id,
vec!["address".to_owned()],
)]);

curp.handle_append_entries(1, s2_id, 0, 0, vec![], 0)
.unwrap();

let mut mock_connect1 = MockInnerConnectApi::default();
mock_connect1.expect_vote().returning(|req, _| {
Ok(tonic::Response::new(
VoteResponse::new_accept::<TestCommand>(req.term, vec![]).unwrap(),
))
});
mock_connect1.expect_id().return_const(s1_id);
curp.set_connect(
s1_id,
InnerConnectApiWrapper::new_from_arc(Arc::new(mock_connect1)),
);

let mut mock_connect2 = MockInnerConnectApi::default();
mock_connect2.expect_vote().returning(|req, _| {
Ok(tonic::Response::new(
VoteResponse::new_accept::<TestCommand>(req.term, vec![]).unwrap(),
))
});
mock_connect2.expect_id().return_const(s2_id);
curp.set_connect(
s2_id,
InnerConnectApiWrapper::new_from_arc(Arc::new(mock_connect2)),
);

let mut mock_connect_learner = MockInnerConnectApi::default();
mock_connect_learner
.expect_vote()
.returning(|_, _| panic!("should not send vote to learner"));
curp.set_connect(
learner_id,
InnerConnectApiWrapper::new_from_arc(Arc::new(mock_connect_learner)),
);
tokio::spawn(CurpNode::election_task(Arc::clone(&curp)));
sleep_secs(3).await;
assert!(curp.is_leader());
curp.shutdown_trigger().self_shutdown_and_wait().await;
}
}
Loading

0 comments on commit edb7a91

Please sign in to comment.