Skip to content

Commit

Permalink
feature: add apply_conf_change method for RawCurp
Browse files Browse the repository at this point in the history
refactor data structures that need to be mutable

Signed-off-by: themanforfree <themanforfree@gmail.com>
  • Loading branch information
themanforfree committed Aug 30, 2023
1 parent 2a034e9 commit b5087f5
Show file tree
Hide file tree
Showing 14 changed files with 501 additions and 126 deletions.
24 changes: 24 additions & 0 deletions curp/proto/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,32 @@ message FetchReadStateResponse {
}
}



message ProposeConfChangeRequest {
enum ConfChangeType {
Add = 0;
AddLearner = 1;
Remove = 2;
Update = 3;
}
message ConfChange {
ConfChangeType change_type = 1;
uint64 node_id = 2;
string address = 3;
}
string id = 1;
repeated ConfChange changes = 2;
}

message ProposeConfChangeResponse {
bool ok = 1;
}


service Protocol {
rpc Propose (ProposeRequest) returns (ProposeResponse);
rpc ProposeConfChange (ProposeConfChangeRequest) returns (ProposeConfChangeResponse);
rpc WaitSynced (WaitSyncedRequest) returns (WaitSyncedResponse);
rpc AppendEntries (AppendEntriesRequest) returns (AppendEntriesResponse);
rpc Vote (VoteRequest) returns (VoteResponse);
Expand Down
16 changes: 16 additions & 0 deletions curp/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,3 +157,19 @@ impl<C: Command> From<SyncError> for CommandSyncError<C> {
Self::Sync(err)
}
}

/// Error type of `apply_conf_change`
#[derive(Error, Debug, Clone, Copy)]
#[allow(clippy::module_name_repetitions)] // this-error generate code false-positive
#[non_exhaustive]
pub enum ApplyConfChangeError {
/// Config is invalid after applying the conf change
#[error("invalid config")]
InvalidConfig,
/// The node to be removed does not exist
#[error("node {0} does not exist")]
NodeNotExists(ServerId),
/// The node to be added already exists
#[error("node {0} already exists")]
NodeAlreadyExists(ServerId),
}
108 changes: 83 additions & 25 deletions curp/src/members.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use dashmap::{mapref::one::Ref, DashMap};
use itertools::Itertools;
use std::{
collections::{hash_map::DefaultHasher, HashMap},
Expand All @@ -8,7 +9,7 @@ use std::{
pub type ServerId = u64;

/// Cluster member
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct Member {
/// Server id of the member
id: ServerId,
Expand All @@ -18,6 +19,26 @@ pub struct Member {
address: String,
}

/// Cluster member
impl Member {
/// Create a new `Member`
#[inline]
pub fn new(id: ServerId, name: impl Into<String>, address: impl Into<String>) -> Self {
Self {
id,
name: name.into(),
address: address.into(),
}
}

/// Get member id
#[must_use]
#[inline]
pub fn id(&self) -> ServerId {
self.id
}
}

/// cluster members information
#[derive(Debug)]
pub struct ClusterInfo {
Expand All @@ -26,7 +47,7 @@ pub struct ClusterInfo {
/// current server id
member_id: ServerId,
/// all members information
members: HashMap<ServerId, Member>,
members: DashMap<ServerId, Member>,
}

impl ClusterInfo {
Expand All @@ -35,10 +56,10 @@ impl ClusterInfo {
/// panic if `all_members` is empty
#[inline]
#[must_use]
pub fn new(all_members: HashMap<String, String>, self_name: &str) -> Self {
pub fn new(all_members_addrs: HashMap<String, String>, self_name: &str) -> Self {
let mut member_id = 0;
let mut members = HashMap::new();
for (name, address) in all_members {
let members = DashMap::new();
for (name, address) in all_members_addrs {
let id = Self::calculate_member_id(&address, "", None);
if name == self_name {
member_id = id;
Expand All @@ -56,47 +77,84 @@ impl ClusterInfo {
cluster_info
}

/// Get all members
#[must_use]
#[inline]
pub fn get_members(&self) -> HashMap<ServerId, Member> {
self.members
.iter()
.map(|t| (t.id, t.value().clone()))
.collect()
}

/// Insert a member
#[inline]
pub fn insert(&self, member: Member) {
_ = self.members.insert(member.id, member);
}
/// Remove a member
#[inline]
pub fn remove(&self, id: &ServerId) {
_ = self.members.remove(id);
}

/// Update a member
#[inline]
pub fn update(&self, id: &ServerId, address: impl Into<String>) {
self.members
.get_mut(id)
.unwrap_or_else(|| unreachable!("member {} not found", id))
.address = address.into();
}

/// Get server address via server id
#[must_use]
#[inline]
pub fn address(&self, id: ServerId) -> Option<&str> {
pub fn address(&self, id: ServerId) -> Option<String> {
self.members
.values()
.iter()
.find(|t| t.id == id)
.map(|t| t.address.as_str())
.map(|t| t.address.clone())
}

/// Get the current member
#[allow(clippy::indexing_slicing)] // self member id must be in members
fn self_member(&self) -> &Member {
&self.members[&self.member_id]
#[allow(clippy::unwrap_used)] // self member id must be in members
fn self_member(&self) -> Ref<'_, u64, Member> {
self.members.get(&self.member_id).unwrap()
}

/// Get the current server address
#[must_use]
#[inline]
pub fn self_address(&self) -> &str {
&self.self_member().address
pub fn self_address(&self) -> String {
self.self_member().address.clone()
}

/// Get the current server id
#[must_use]
#[inline]
pub fn self_name(&self) -> &str {
&self.self_member().name
pub fn self_name(&self) -> String {
self.self_member().name.clone()
}

/// Get peers id
/// Get peers ids
#[must_use]
#[inline]
pub fn peers_ids(&self) -> Vec<ServerId> {
self.members
.values()
.iter()
.filter(|t| t.id != self.member_id)
.map(|t| t.id)
.collect()
}

/// Get all ids
#[must_use]
#[inline]
pub fn all_ids(&self) -> Vec<ServerId> {
self.members.iter().map(|t| t.id).collect()
}

/// Calculate the member id
fn calculate_member_id(address: &str, cluster_name: &str, timestamp: Option<u64>) -> ServerId {
let mut hasher = DefaultHasher::new();
Expand All @@ -111,8 +169,8 @@ impl ClusterInfo {
/// Calculate the cluster id
fn gen_cluster_id(&mut self) {
let mut hasher = DefaultHasher::new();
for id in self.members.keys().sorted() {
hasher.write_u64(*id);
for id in self.members.iter().map(|t| t.id).sorted() {
hasher.write_u64(id);
}
self.cluster_id = hasher.finish();
}
Expand All @@ -134,9 +192,9 @@ impl ClusterInfo {
/// Get peers
#[must_use]
#[inline]
pub fn peers(&self) -> HashMap<ServerId, String> {
pub fn peers_addrs(&self) -> HashMap<ServerId, String> {
self.members
.values()
.iter()
.filter(|t| t.id != self.member_id)
.map(|t| (t.id, t.address.clone()))
.collect()
Expand All @@ -145,9 +203,9 @@ impl ClusterInfo {
/// Get all members
#[must_use]
#[inline]
pub fn all_members(&self) -> HashMap<ServerId, String> {
pub fn all_members_addrs(&self) -> HashMap<ServerId, String> {
self.members
.values()
.iter()
.map(|t| (t.id, t.address.clone()))
.collect()
}
Expand All @@ -166,7 +224,7 @@ impl ClusterInfo {
pub fn get_id_by_name(&self, name: &str) -> Option<ServerId> {
self.members
.iter()
.find_map(|(_, m)| (m.name == name).then_some(m.id))
.find_map(|m| (m.name == name).then_some(m.id))
}
}

Expand Down Expand Up @@ -205,7 +263,7 @@ mod tests {
]);

let node1 = ClusterInfo::new(all_members, "S1");
let peers = node1.peers();
let peers = node1.peers_addrs();
let node1_id = node1.self_id();
let node1_url = node1.self_address();
assert!(!peers.contains_key(&node1_id));
Expand Down
44 changes: 43 additions & 1 deletion curp/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ use serde::{de::DeserializeOwned, Serialize};

pub(crate) use self::proto::{
fetch_read_state_response::ReadState,
propose_conf_change_request::{ConfChange, ConfChangeType},
propose_response::ExeResult,
protocol_server::Protocol,
wait_synced_response::{Success, SyncResult as SyncResultRaw},
AppendEntriesRequest, AppendEntriesResponse, FetchClusterRequest, FetchClusterResponse,
FetchReadStateRequest, FetchReadStateResponse, IdSet, InstallSnapshotRequest,
InstallSnapshotResponse, VoteRequest, VoteResponse, WaitSyncedRequest, WaitSyncedResponse,
InstallSnapshotResponse, ProposeConfChangeRequest, ProposeConfChangeResponse, VoteRequest,
VoteResponse, WaitSyncedRequest, WaitSyncedResponse,
};
pub use self::proto::{
propose_response, protocol_client, protocol_server::ProtocolServer, FetchLeaderRequest,
Expand Down Expand Up @@ -391,3 +393,43 @@ impl FetchReadStateResponse {
}
}
}

#[allow(dead_code)] // TODO: remove
#[allow(clippy::as_conversions)] // those conversions are safe
impl ConfChange {
/// Create a new `ConfChange` to add a node
pub(crate) fn add(node_id: ServerId, address: String) -> Self {
Self {
change_type: ConfChangeType::Add as i32,
node_id,
address,
}
}

/// Create a new `ConfChange` to remove a node
pub(crate) fn remove(node_id: ServerId) -> Self {
Self {
change_type: ConfChangeType::Remove as i32,
node_id,
address: String::new(),
}
}

/// Create a new `ConfChange` to update a node
pub(crate) fn update(node_id: ServerId, address: String) -> Self {
Self {
change_type: ConfChangeType::Update as i32,
node_id,
address,
}
}

/// Create a new `ConfChange` to add a learner node
pub(crate) fn add_learner(node_id: ServerId, address: String) -> Self {
Self {
change_type: ConfChangeType::AddLearner as i32,
node_id,
address,
}
}
}
2 changes: 1 addition & 1 deletion curp/src/server/cmd_worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ struct TaskRx<C: Command>(flume::Receiver<Task<C>>);

/// Send cmd to background execution worker
#[cfg_attr(test, automock)]
pub(super) trait CEEventTxApi<C: Command + 'static>: Send + Sync + 'static {
pub(super) trait CEEventTxApi<C: Command + 'static>: Send + Sync + 'static + Debug {
/// Send cmd to background cmd worker for speculative execution
fn send_sp_exe(&self, entry: Arc<LogEntry<C>>);

Expand Down
Loading

0 comments on commit b5087f5

Please sign in to comment.