diff --git a/crates/curp/src/client/cluster_state.rs b/crates/curp/src/client/cluster_state.rs new file mode 100644 index 000000000..cdfe72b47 --- /dev/null +++ b/crates/curp/src/client/cluster_state.rs @@ -0,0 +1,113 @@ +use std::{collections::HashMap, sync::Arc}; + +use futures::{stream::FuturesUnordered, Future}; + +use crate::{ + members::ServerId, + rpc::{connect::ConnectApi, CurpError}, +}; + +/// The cluster state +/// +/// The client must discover the cluster info before sending any propose +#[derive(Default, Clone)] +pub(crate) struct ClusterState { + /// Leader id. + leader: ServerId, + /// Term, initialized to 0, calibrated by the server. + term: u64, + /// Cluster version, initialized to 0, calibrated by the server. + cluster_version: u64, + /// Members' connects, keyed by server id, calibrated by the server. + connects: HashMap>, +} + +impl std::fmt::Debug for ClusterState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("State") + .field("leader", &self.leader) + .field("term", &self.term) + .field("cluster_version", &self.cluster_version) + .field("connects", &self.connects.keys()) + .finish() + } +} + +impl ClusterState { + /// Creates a new `ClusterState` + pub(crate) fn new( + leader: ServerId, + term: u64, + cluster_version: u64, + connects: HashMap>, + ) -> Self { + Self { + leader, + term, + cluster_version, + connects, + } + } + + /// Take an async function and map it to the given server, returning `None` + /// if the server cannot be found in the local state + pub(crate) fn map_server>>( + &self, + id: ServerId, + f: impl FnOnce(Arc) -> F, + ) -> Option { + // `connects.get` yields `None` when `id` has no entry in the local + // connects, in which case `f` is never called and `None` is returned + // to the caller. 
+ self.connects.get(&id).map(Arc::clone).map(f) + } + + /// Take an async function and map it to the connect of the leader + /// recorded in the local state + pub(crate) fn map_leader>>( + &self, + f: impl FnOnce(Arc) -> F, + ) -> F { + // The leader id is expected to always have an entry in connects; a + // missing entry is treated as a broken invariant and hits `unreachable!` + // (a panic) instead of being reported to the caller as an error. + f(Arc::clone(self.connects.get(&self.leader).unwrap_or_else( + || unreachable!("leader connect should always exists"), + ))) + } + + /// Take an async function and map it to all servers, returning `FuturesUnordered` + pub(crate) fn for_each_server>( + &self, + f: impl FnMut(Arc) -> F, + ) -> FuturesUnordered { + self.connects.values().map(Arc::clone).map(f).collect() + } + + /// Take an async function and map it to all servers except the leader, returning `FuturesUnordered` + pub(crate) fn for_each_follower>( + &self, + f: impl FnMut(Arc) -> F, + ) -> FuturesUnordered { + self.connects + .iter() + .filter_map(|(id, conn)| (*id != self.leader).then_some(conn)) + .map(Arc::clone) + .map(f) + .collect() + } + + /// Returns the quorum size, computed by applying the given quorum function to the number of connects + /// + /// NOTE: Do not update the cluster in between a `for_each_xxx` and a `get_quorum` call, which may + /// lead to an inconsistent quorum. 
+ pub(crate) fn get_quorum usize>(&self, mut quorum: Q) -> usize { + let cluster_size = self.connects.len(); + quorum(cluster_size) + } + + /// Returns the term of the cluster + pub(crate) fn term(&self) -> u64 { + self.term + } +} diff --git a/crates/curp/src/client/config.rs b/crates/curp/src/client/config.rs new file mode 100644 index 000000000..cc149d966 --- /dev/null +++ b/crates/curp/src/client/config.rs @@ -0,0 +1,66 @@ +use std::time::Duration; + +use tonic::transport::ClientTlsConfig; + +use crate::members::ServerId; + +/// Client config +#[derive(Default, Debug, Clone)] +pub(crate) struct Config { + /// Local server id, should be initialized on startup + local_server: Option, + /// Client tls config + tls_config: Option, + /// The rpc timeout of a propose request + propose_timeout: Duration, + /// The rpc timeout of a 2-RTT request, which usually takes longer than the propose timeout + /// + /// The recommended value is within (propose_timeout, 2 * propose_timeout]. + wait_synced_timeout: Duration, + /// Whether the current client sends requests to a raw curp server + is_raw_curp: bool, +} + +impl Config { + /// Creates a new `Config` + pub(crate) fn new( + local_server: Option, + tls_config: Option, + propose_timeout: Duration, + wait_synced_timeout: Duration, + is_raw_curp: bool, + ) -> Self { + Self { + local_server, + tls_config, + propose_timeout, + wait_synced_timeout, + is_raw_curp, + } + } + + /// Get the local server id + pub(crate) fn local_server(&self) -> Option { + self.local_server + } + + /// Get the client TLS config + pub(crate) fn tls_config(&self) -> Option<&ClientTlsConfig> { + self.tls_config.as_ref() + } + + /// Get the propose timeout + pub(crate) fn propose_timeout(&self) -> Duration { + self.propose_timeout + } + + /// Get the wait synced timeout + pub(crate) fn wait_synced_timeout(&self) -> Duration { + self.wait_synced_timeout + } + + /// Returns the `is_raw_curp` flag of this config + pub(crate) fn is_raw_curp(&self) -> bool { + 
self.is_raw_curp + } +} diff --git a/crates/curp/src/client/fetch.rs b/crates/curp/src/client/fetch.rs new file mode 100644 index 000000000..1b7f4e187 --- /dev/null +++ b/crates/curp/src/client/fetch.rs @@ -0,0 +1,121 @@ +use std::{collections::HashMap, sync::Arc, time::Duration}; + +use curp_external_api::cmd::Command; +use futures::{future, FutureExt, StreamExt}; +use parking_lot::RwLock; +use tonic::Response; +use tracing::warn; +use utils::parking_lot_lock::RwLockMap; + +use crate::{ + quorum, + rpc::{self, connect::ConnectApi, CurpError, FetchClusterRequest, FetchClusterResponse}, +}; + +use super::cluster_state::ClusterState; +use super::config::Config; + +/// Fetch cluster implementation +struct Fetch { + /// The fetch config + config: Config, +} + +impl Fetch { + /// Creates a new `Fetch` + pub(crate) fn new(config: Config) -> Self { + Self { config } + } + + /// Fetches the cluster via the servers in `state` and returns the newly built cluster state + pub(crate) async fn fetch_cluster( + &self, + state: ClusterState, + ) -> Result { + /// Interval to sleep before retrying the fetch + const FETCH_RETRY_INTERVAL: Duration = Duration::from_secs(1); + loop { + let resp = self + .pre_fetch(&state) + .await + .ok_or(CurpError::internal("cluster not available"))?; + let new_members = self.member_addrs(&resp); + let new_connects = self.connect_to(new_members); + let new_state = ClusterState::new( + resp.leader_id + .unwrap_or_else(|| unreachable!("leader id should be Some")) + .into(), + resp.term, + resp.cluster_version, + new_connects, + ); + if self.fetch_term(&new_state).await { + return Ok(new_state); + } + warn!("Fetch cluster failed, sleep for {FETCH_RETRY_INTERVAL:?}"); + tokio::time::sleep(FETCH_RETRY_INTERVAL).await; + } + } + + /// Checks that a quorum of servers reports the same term as `state`. This ensures that the current leader is the latest. 
+ async fn fetch_term(&self, state: &ClusterState) -> bool { + let timeout = self.config.wait_synced_timeout(); + let term = state.term(); + let quorum = state.get_quorum(quorum); + state + .for_each_server(|c| async move { + c.fetch_cluster(FetchClusterRequest { linearizable: true }, timeout) + .await + }) + .filter_map(|r| future::ready(r.ok())) + .map(Response::into_inner) + .filter(move |resp| future::ready(resp.term == term)) + .take(quorum) + .count() + .map(move |t| t >= quorum) + .await + } + + /// Prefetch: sends a fetch cluster request to every server in `state` and returns the + /// response with the greatest term among those with a leader id and a non-empty member list. + async fn pre_fetch(&self, state: &ClusterState) -> Option { + let timeout = self.config.wait_synced_timeout(); + let requests = state.for_each_server(|c| async move { + c.fetch_cluster(FetchClusterRequest { linearizable: true }, timeout) + .await + }); + let responses: Vec<_> = requests + .filter_map(|r| future::ready(r.ok())) + .map(Response::into_inner) + .collect() + .await; + responses + .into_iter() + .filter(|resp| resp.leader_id.is_some()) + .filter(|resp| !resp.members.is_empty()) + .max_by(|x, y| x.term.cmp(&y.term)) + } + + /// Gets the member addresses to connect to (peer urls if `is_raw_curp`, client urls otherwise) + fn member_addrs(&self, resp: &FetchClusterResponse) -> HashMap> { + if self.config.is_raw_curp() { + resp.clone().into_peer_urls() + } else { + resp.clone().into_client_urls() + } + } + + /// Connects to the given addrs, passing along the configured client TLS config + fn connect_to( + &self, + new_members: HashMap>, + ) -> HashMap> { + new_members + .into_iter() + .map(|(id, addrs)| { + let tls_config = self.config.tls_config().cloned(); + (id, rpc::connect(id, addrs, tls_config)) + }) + .collect() + } +} diff --git a/crates/curp/src/client/mod.rs b/crates/curp/src/client/mod.rs index 8cc18ca44..f759f18b9 100644 --- a/crates/curp/src/client/mod.rs +++ b/crates/curp/src/client/mod.rs @@ 
-17,6 +17,18 @@ mod retry; /// State for clients mod state; +#[allow(unused)] +/// State of the cluster +mod cluster_state; + +#[allow(unused)] +/// 
Client cluster fetch implementation +mod fetch; + +#[allow(unused)] +/// Config of the client +mod config; + /// Tests for client #[cfg(test)] mod tests; diff --git a/crates/curp/src/client/unary/mod.rs b/crates/curp/src/client/unary/mod.rs index 90986bdb7..41f510e80 100644 --- a/crates/curp/src/client/unary/mod.rs +++ b/crates/curp/src/client/unary/mod.rs @@ -16,8 +16,8 @@ use tonic::Response; use tracing::{debug, warn}; use super::{ - state::State, ClientApi, LeaderStateUpdate, ProposeIdGuard, ProposeResponse, - RepeatableClientApi, + cluster_state::ClusterState, config::Config, state::State, ClientApi, LeaderStateUpdate, + ProposeIdGuard, ProposeResponse, RepeatableClientApi, }; use crate::{ members::ServerId, @@ -64,6 +64,13 @@ pub(super) struct Unary { last_sent_seq: AtomicU64, /// marker phantom: PhantomData, + + #[allow(dead_code)] + /// Cluster state + cluster_state: RwLock, + #[allow(dead_code)] + /// Client config + client_config: Config, } impl Unary { @@ -75,6 +82,10 @@ impl Unary { tracker: RwLock::new(Tracker::default()), last_sent_seq: AtomicU64::new(0), phantom: PhantomData, + + // TODO: build the cluster state and client config instead of using defaults + cluster_state: RwLock::default(), + client_config: Config::default(), } }