diff --git a/.config/nextest.toml b/.config/nextest.toml index fa2933367..b3525f4bf 100644 --- a/.config/nextest.toml +++ b/.config/nextest.toml @@ -3,3 +3,4 @@ retries = 0 slow-timeout = { period = "10s", terminate-after = 3 } status-level = "all" final-status-level = "slow" +fail-fast = false diff --git a/crates/curp/proto/common b/crates/curp/proto/common index 7e2813c48..4993bdfae 160000 --- a/crates/curp/proto/common +++ b/crates/curp/proto/common @@ -1 +1 @@ -Subproject commit 7e2813c48513235e87e64b9f23fe933c9a13cec4 +Subproject commit 4993bdfaebd750e8c1fd60471302ed1e30dd9e58 diff --git a/crates/curp/src/client/mod.rs b/crates/curp/src/client/mod.rs index 17ded1a7d..dea988087 100644 --- a/crates/curp/src/client/mod.rs +++ b/crates/curp/src/client/mod.rs @@ -21,11 +21,12 @@ mod state; #[cfg(test)] mod tests; -use std::{collections::HashMap, fmt::Debug, sync::Arc}; +use std::{collections::HashMap, fmt::Debug, ops::Deref, sync::Arc}; use async_trait::async_trait; use curp_external_api::cmd::Command; use futures::{stream::FuturesUnordered, StreamExt}; +use parking_lot::RwLock; use tokio::task::JoinHandle; #[cfg(not(madsim))] use tonic::transport::ClientTlsConfig; @@ -45,6 +46,7 @@ use crate::{ protocol_client::ProtocolClient, ConfChange, FetchClusterRequest, FetchClusterResponse, Member, ProposeId, Protocol, ReadState, }, + tracker::Tracker, }; /// The response of propose command, deserialized from [`crate::rpc::ProposeResponse`] or @@ -119,11 +121,43 @@ pub trait ClientApi { } } +/// Propose id guard, used to ensure the sequence of propose id is recorded. 
+struct ProposeIdGuard<'a> { + /// The propose id + propose_id: ProposeId, + /// The tracker + tracker: &'a RwLock, +} + +impl Deref for ProposeIdGuard<'_> { + type Target = ProposeId; + + fn deref(&self) -> &Self::Target { + &self.propose_id + } +} + +impl<'a> ProposeIdGuard<'a> { + /// Create a new propose id guard + fn new(tracker: &'a RwLock, propose_id: ProposeId) -> Self { + Self { + propose_id, + tracker, + } + } +} + +impl Drop for ProposeIdGuard<'_> { + fn drop(&mut self) { + let _ig = self.tracker.write().record(self.propose_id.1); + } +} + /// This trait override some unrepeatable methods in ClientApi, and a client with this trait will be able to retry. #[async_trait] trait RepeatableClientApi: ClientApi { /// Generate a unique propose id during the retry process. - fn gen_propose_id(&self) -> Result; + fn gen_propose_id(&self) -> Result, Self::Error>; /// Send propose to the whole cluster, `use_fast_path` set to `false` to fallback into ordered /// requests (event the requests are commutative). @@ -352,6 +386,14 @@ impl ClientBuilder { }) } + /// Wait for client id + async fn wait_for_client_id(&self, state: Arc) { + while state.client_id() == 0 { + tokio::time::sleep(*self.config.propose_timeout()).await; + debug!("waiting for client_id"); + } + } + /// Build the client /// /// # Errors @@ -368,8 +410,9 @@ impl ClientBuilder { let client = Retry::new( Unary::new(Arc::clone(&state), self.init_unary_config()), self.init_retry_config(), - Some(self.spawn_bg_tasks(state)), + Some(self.spawn_bg_tasks(Arc::clone(&state))), ); + self.wait_for_client_id(state).await; Ok(client) } } @@ -393,8 +436,9 @@ impl ClientBuilderWithBypass

{ let client = Retry::new( Unary::new(Arc::clone(&state), self.inner.init_unary_config()), self.inner.init_retry_config(), - Some(self.inner.spawn_bg_tasks(state)), + Some(self.inner.spawn_bg_tasks(Arc::clone(&state))), ); + self.inner.wait_for_client_id(state).await; Ok(client) } } diff --git a/crates/curp/src/client/retry.rs b/crates/curp/src/client/retry.rs index 346248c0e..a07d5ea3e 100644 --- a/crates/curp/src/client/retry.rs +++ b/crates/curp/src/client/retry.rs @@ -3,7 +3,7 @@ use std::{ops::SubAssign, time::Duration}; use async_trait::async_trait; use futures::Future; use tokio::task::JoinHandle; -use tracing::warn; +use tracing::{info, warn}; use super::{ClientApi, LeaderStateUpdate, ProposeResponse, RepeatableClientApi}; use crate::{ @@ -110,6 +110,7 @@ pub(super) struct Retry { impl Drop for Retry { fn drop(&mut self) { if let Some(handle) = self.bg_handle.as_ref() { + info!("stopping background task"); handle.abort(); } } @@ -147,14 +148,14 @@ where | CurpError::ShuttingDown(_) | CurpError::InvalidConfig(_) | CurpError::NodeNotExists(_) + | CurpError::ExpiredClientId(_) | CurpError::NodeAlreadyExists(_) | CurpError::LearnerNotCatchUp(_) => { return Err(tonic::Status::from(err)); } // some errors that could have a retry - CurpError::ExpiredClientId(_) - | CurpError::KeyConflict(_) + CurpError::KeyConflict(_) | CurpError::Internal(_) | CurpError::LeaderTransfer(_) => {} @@ -218,7 +219,7 @@ where ) -> Result, tonic::Status> { let propose_id = self.inner.gen_propose_id()?; self.retry::<_, _>(|client| { - RepeatableClientApi::propose(client, propose_id, cmd, token, use_fast_path) + RepeatableClientApi::propose(client, *propose_id, cmd, token, use_fast_path) }) .await } @@ -231,7 +232,7 @@ where let propose_id = self.inner.gen_propose_id()?; self.retry::<_, _>(|client| { let changes_c = changes.clone(); - RepeatableClientApi::propose_conf_change(client, propose_id, changes_c) + RepeatableClientApi::propose_conf_change(client, *propose_id, changes_c) }) .await 
} @@ -239,7 +240,7 @@ where /// Send propose to shutdown cluster async fn propose_shutdown(&self) -> Result<(), tonic::Status> { let propose_id = self.inner.gen_propose_id()?; - self.retry::<_, _>(|client| RepeatableClientApi::propose_shutdown(client, propose_id)) + self.retry::<_, _>(|client| RepeatableClientApi::propose_shutdown(client, *propose_id)) .await } @@ -256,7 +257,7 @@ where let node_client_urls_c = node_client_urls.clone(); RepeatableClientApi::propose_publish( client, - propose_id, + *propose_id, node_id, name_c, node_client_urls_c, diff --git a/crates/curp/src/client/state.rs b/crates/curp/src/client/state.rs index e86b7b1ba..d04109990 100644 --- a/crates/curp/src/client/state.rs +++ b/crates/curp/src/client/state.rs @@ -2,10 +2,12 @@ use std::{ cmp::Ordering, collections::{hash_map::Entry, HashMap, HashSet}, sync::{atomic::AtomicU64, Arc}, + time::Duration, }; use event_listener::Event; use futures::{stream::FuturesUnordered, Future}; +use rand::seq::IteratorRandom; use tokio::sync::RwLock; #[cfg(not(madsim))] use tonic::transport::ClientTlsConfig; @@ -18,7 +20,7 @@ use crate::{ rpc::{ self, connect::{BypassedConnect, ConnectApi}, - CurpError, FetchClusterResponse, Protocol, + CurpError, FetchClusterRequest, FetchClusterResponse, Protocol, }, }; @@ -127,6 +129,28 @@ impl State { } } + /// Choose a random server to try to refresh the state + /// Use when the current leader is missing. + pub(crate) async fn try_refresh_state(&self) -> Result<(), CurpError> { + /// The timeout for refreshing the state + const REFRESH_TIMEOUT: Duration = Duration::from_secs(1); + + let rand_conn = { + let state = self.mutable.read().await; + state + .connects + .values() + .choose(&mut rand::thread_rng()) + .map(Arc::clone) + .ok_or_else(CurpError::wrong_cluster_version)? 
+ }; + let resp = rand_conn + .fetch_cluster(FetchClusterRequest::default(), REFRESH_TIMEOUT) + .await?; + self.check_and_update(&resp.into_inner()).await?; + Ok(()) + } + /// Get the local server connection pub(super) async fn local_connect(&self) -> Option> { let id = self.immutable.local_server?; diff --git a/crates/curp/src/client/stream.rs b/crates/curp/src/client/stream.rs index 3cf790b58..853a7b25d 100644 --- a/crates/curp/src/client/stream.rs +++ b/crates/curp/src/client/stream.rs @@ -30,6 +30,9 @@ pub(super) struct Streaming { config: StreamingConfig, } +/// Prevent lock contention when leader crashed or some unknown errors +const RETRY_DELAY: Duration = Duration::from_millis(100); + impl Streaming { /// Create a stream client pub(super) fn new(state: Arc, config: StreamingConfig) -> Self { @@ -44,8 +47,11 @@ impl Streaming { ) -> Result { loop { let Some(leader_id) = self.state.leader_id().await else { - debug!("cannot find the leader id in state, wait for leadership update"); - self.state.leader_notifier().listen().await; + warn!( + "cannot find leader_id, refreshing state..." + ); + let _ig = self.state.try_refresh_state().await; + tokio::time::sleep(RETRY_DELAY).await; continue; }; if let Some(local_id) = self.state.local_server_id() { @@ -62,9 +68,6 @@ impl Streaming { /// Keep heartbeat pub(super) async fn keep_heartbeat(&self) { - /// Prevent lock contention when leader crashed or some unknown errors - const RETRY_DELAY: Duration = Duration::from_millis(100); - loop { let heartbeat = self.map_remote_leader::<(), _>(|conn| async move { loop { @@ -85,9 +88,12 @@ impl Streaming { ); self.state.leader_notifier().listen().await; } - CurpError::ShuttingDown(_) => { - debug!("shutting down stream client background task"); - break Err(err); + CurpError::RpcTransport(_) => { + warn!( + "got rpc transport error when keep heartbeat, refreshing state..." 
+ ); + let _ig = self.state.try_refresh_state().await; + tokio::time::sleep(RETRY_DELAY).await; } _ => { warn!("got unexpected error {err:?} when keep heartbeat, retrying..."); diff --git a/crates/curp/src/client/tests.rs b/crates/curp/src/client/tests.rs index a29104aec..a1cc4e51a 100644 --- a/crates/curp/src/client/tests.rs +++ b/crates/curp/src/client/tests.rs @@ -680,7 +680,6 @@ async fn test_retry_propose_return_no_retry_error() { #[tokio::test] async fn test_retry_propose_return_retry_error() { for early_err in [ - CurpError::expired_client_id(), CurpError::key_conflict(), CurpError::RpcTransport(()), CurpError::internal("No reason"), diff --git a/crates/curp/src/client/unary.rs b/crates/curp/src/client/unary.rs index d0c7de17d..a5d8d04ca 100644 --- a/crates/curp/src/client/unary.rs +++ b/crates/curp/src/client/unary.rs @@ -1,12 +1,22 @@ -use std::{cmp::Ordering, marker::PhantomData, ops::AddAssign, sync::Arc, time::Duration}; +use std::{ + cmp::Ordering, + marker::PhantomData, + ops::AddAssign, + sync::{atomic::AtomicU64, Arc}, + time::Duration, +}; use async_trait::async_trait; use curp_external_api::cmd::Command; use futures::{Future, StreamExt}; +use parking_lot::RwLock; use tonic::Response; use tracing::{debug, warn}; -use super::{state::State, ClientApi, LeaderStateUpdate, ProposeResponse, RepeatableClientApi}; +use super::{ + state::State, ClientApi, LeaderStateUpdate, ProposeIdGuard, ProposeResponse, + RepeatableClientApi, +}; use crate::{ members::ServerId, quorum, @@ -16,6 +26,7 @@ use crate::{ ProposeRequest, PublishRequest, ReadState, ShutdownRequest, WaitSyncedRequest, }, super_quorum, + tracker::Tracker, }; /// The unary client config @@ -45,6 +56,10 @@ pub(super) struct Unary { state: Arc, /// Unary config config: UnaryConfig, + /// Request tracker + tracker: RwLock, + /// Last sent sequence number + last_sent_seq: AtomicU64, /// marker phantom: PhantomData, } @@ -55,6 +70,8 @@ impl Unary { Self { state, config, + tracker: 
RwLock::new(Tracker::default()), + last_sent_seq: AtomicU64::new(0), phantom: PhantomData, } } @@ -85,7 +102,12 @@ impl Unary { cmd: &C, token: Option<&String>, ) -> Result, CurpError> { - let req = ProposeRequest::new(propose_id, cmd, self.state.cluster_version().await); + let req = ProposeRequest::new( + propose_id, + cmd, + self.state.cluster_version().await, + self.tracker.read().first_incomplete(), + ); let timeout = self.config.propose_timeout; let mut responses = self @@ -191,7 +213,8 @@ impl Unary { /// New a seq num and record it #[allow(clippy::unused_self)] // TODO: implement request tracker fn new_seq_num(&self) -> u64 { - rand::random() + self.last_sent_seq + .fetch_add(1, std::sync::atomic::Ordering::Relaxed) } } @@ -212,7 +235,7 @@ impl ClientApi for Unary { use_fast_path: bool, ) -> Result, CurpError> { let propose_id = self.gen_propose_id()?; - RepeatableClientApi::propose(self, propose_id, cmd, token, use_fast_path).await + RepeatableClientApi::propose(self, *propose_id, cmd, token, use_fast_path).await } /// Send propose configuration changes to the cluster @@ -221,13 +244,13 @@ impl ClientApi for Unary { changes: Vec, ) -> Result, CurpError> { let propose_id = self.gen_propose_id()?; - RepeatableClientApi::propose_conf_change(self, propose_id, changes).await + RepeatableClientApi::propose_conf_change(self, *propose_id, changes).await } /// Send propose to shutdown cluster async fn propose_shutdown(&self) -> Result<(), CurpError> { let propose_id = self.gen_propose_id()?; - RepeatableClientApi::propose_shutdown(self, propose_id).await + RepeatableClientApi::propose_shutdown(self, *propose_id).await } /// Send propose to publish a node id and name @@ -238,8 +261,14 @@ impl ClientApi for Unary { node_client_urls: Vec, ) -> Result<(), Self::Error> { let propose_id = self.gen_propose_id()?; - RepeatableClientApi::propose_publish(self, propose_id, node_id, node_name, node_client_urls) - .await + RepeatableClientApi::propose_publish( + self, + 
*propose_id, + node_id, + node_name, + node_client_urls, + ) + .await } /// Send move leader request @@ -382,10 +411,13 @@ impl ClientApi for Unary { #[async_trait] impl RepeatableClientApi for Unary { /// Generate a unique propose id during the retry process. - fn gen_propose_id(&self) -> Result { + fn gen_propose_id(&self) -> Result, Self::Error> { let client_id = self.state.client_id(); let seq_num = self.new_seq_num(); - Ok(ProposeId(client_id, seq_num)) + Ok(ProposeIdGuard::new( + &self.tracker, + ProposeId(client_id, seq_num), + )) } /// Send propose to the whole cluster, `use_fast_path` set to `false` to fallback into ordered diff --git a/crates/curp/src/rpc/mod.rs b/crates/curp/src/rpc/mod.rs index bcfec2acd..655afe5fe 100644 --- a/crates/curp/src/rpc/mod.rs +++ b/crates/curp/src/rpc/mod.rs @@ -138,11 +138,17 @@ impl FetchClusterResponse { impl ProposeRequest { /// Create a new `Propose` request #[inline] - pub fn new(propose_id: ProposeId, cmd: &C, cluster_version: u64) -> Self { + pub fn new( + propose_id: ProposeId, + cmd: &C, + cluster_version: u64, + first_incomplete: u64, + ) -> Self { Self { propose_id: Some(propose_id.into()), command: cmd.encode(), cluster_version, + first_incomplete, } } diff --git a/crates/curp/src/server/cmd_board.rs b/crates/curp/src/server/cmd_board.rs index f41d7237e..ddfc2a491 100644 --- a/crates/curp/src/server/cmd_board.rs +++ b/crates/curp/src/server/cmd_board.rs @@ -1,3 +1,5 @@ +#![allow(unused)] // TODO remove + use std::{collections::HashMap, sync::Arc}; use event_listener::{Event, EventListener}; @@ -5,7 +7,7 @@ use indexmap::{IndexMap, IndexSet}; use parking_lot::RwLock; use utils::parking_lot_lock::RwLockMap; -use crate::{cmd::Command, rpc::ProposeId}; +use crate::{cmd::Command, rpc::ProposeId, tracker::Tracker}; /// Ref to the cmd board pub(super) type CmdBoardRef = Arc>>; @@ -21,10 +23,10 @@ pub(super) struct CommandBoard { shutdown_notifier: Event, /// Store all notifiers for conf change results conf_notifier: 
HashMap, + /// The result trackers track all cmd, this is used for dedup + pub(super) trackers: HashMap, /// Store all conf change propose ids pub(super) conf_buffer: IndexSet, - /// The cmd has been received before, this is used for dedup - pub(super) sync: IndexSet, /// Store all execution results pub(super) er_buffer: IndexMap>, /// Store all after sync results @@ -38,7 +40,7 @@ impl CommandBoard { er_notifiers: HashMap::new(), asr_notifiers: HashMap::new(), shutdown_notifier: Event::new(), - sync: IndexSet::new(), + trackers: HashMap::new(), er_buffer: IndexMap::new(), asr_buffer: IndexMap::new(), conf_notifier: HashMap::new(), @@ -46,6 +48,18 @@ impl CommandBoard { } } + /// Get the tracker for a client id + pub(super) fn tracker(&mut self, client_id: u64) -> &mut Tracker { + self.trackers + .entry(client_id) + .or_insert_with(Tracker::default) + } + + /// Remove client result tracker from trackers if it is expired + pub(super) fn client_expired(&mut self, client_id: u64) { + let _ig = self.trackers.remove(&client_id); + } + /// Release notifiers pub(super) fn release_notifiers(&mut self) { self.er_notifiers @@ -56,10 +70,11 @@ impl CommandBoard { .for_each(|(_, event)| event.notify(usize::MAX)); } - /// Clear + /// Clear, called when leader retires pub(super) fn clear(&mut self) { self.er_buffer.clear(); self.asr_buffer.clear(); + self.trackers.clear(); self.release_notifiers(); } diff --git a/crates/curp/src/server/curp_node.rs b/crates/curp/src/server/curp_node.rs index 61da4da19..b35e3f132 100644 --- a/crates/curp/src/server/curp_node.rs +++ b/crates/curp/src/server/curp_node.rs @@ -74,15 +74,24 @@ pub(super) struct CurpNode { /// Handlers for clients impl CurpNode { /// Handle `Propose` requests - pub(super) async fn propose(&self, req: ProposeRequest) -> Result { + pub(super) async fn propose( + &self, + req: ProposeRequest, + bypassed: bool, + ) -> Result { if self.curp.is_shutdown() { return Err(CurpError::shutting_down()); } let id = req.propose_id(); 
self.check_cluster_version(req.cluster_version)?; let cmd: Arc = Arc::new(req.cmd()?); + if bypassed { + self.curp.mark_client_id_bypassed(id.0); + } // handle proposal - let sp_exec = self.curp.handle_propose(id, Arc::clone(&cmd))?; + let sp_exec = self + .curp + .handle_propose(id, Arc::clone(&cmd), req.first_incomplete)?; // if speculatively executed, wait for the result and return if sp_exec { @@ -97,8 +106,12 @@ impl CurpNode { pub(super) async fn shutdown( &self, req: ShutdownRequest, + bypassed: bool, ) -> Result { self.check_cluster_version(req.cluster_version)?; + if bypassed { + self.curp.mark_client_id_bypassed(req.propose_id().0); + } self.curp.handle_shutdown(req.propose_id())?; CommandBoard::wait_for_shutdown_synced(&self.cmd_board).await; Ok(ShutdownResponse::default()) @@ -108,9 +121,13 @@ impl CurpNode { pub(super) async fn propose_conf_change( &self, req: ProposeConfChangeRequest, + bypassed: bool, ) -> Result { self.check_cluster_version(req.cluster_version)?; let id = req.propose_id(); + if bypassed { + self.curp.mark_client_id_bypassed(id.0); + } self.curp.handle_propose_conf_change(id, req.changes)?; CommandBoard::wait_for_conf(&self.cmd_board, id).await; let members = self.curp.cluster().all_members_vec(); @@ -118,7 +135,14 @@ impl CurpNode { } /// Handle `Publish` requests - pub(super) fn publish(&self, req: PublishRequest) -> Result { + pub(super) fn publish( + &self, + req: PublishRequest, + bypassed: bool, + ) -> Result { + if bypassed { + self.curp.mark_client_id_bypassed(req.propose_id().0); + } self.curp.handle_publish(req)?; Ok(PublishResponse::default()) } diff --git a/crates/curp/src/server/gc.rs b/crates/curp/src/server/gc.rs index 1349b3c1c..a744692d5 100644 --- a/crates/curp/src/server/gc.rs +++ b/crates/curp/src/server/gc.rs @@ -34,7 +34,6 @@ pub(super) async fn gc_cmd_board( ) { let mut last_check_len_er = 0; let mut last_check_len_asr = 0; - let mut last_check_len_sync = 0; let mut last_check_len_conf = 0; 
#[allow(clippy::arithmetic_side_effects)] // introduced by tokio select loop { @@ -58,12 +57,6 @@ pub(super) async fn gc_cmd_board( last_check_len_asr = board.asr_buffer.len(); } - if last_check_len_sync <= board.sync.len() { - let new_sync = board.sync.split_off(last_check_len_sync); - board.sync = new_sync; - last_check_len_sync = board.sync.len(); - } - if last_check_len_conf <= board.conf_buffer.len() { let new_conf = board.conf_buffer.split_off(last_check_len_conf); board.conf_buffer = new_conf; diff --git a/crates/curp/src/server/lease_manager.rs b/crates/curp/src/server/lease_manager.rs index c16381cfb..cb84abb24 100644 --- a/crates/curp/src/server/lease_manager.rs +++ b/crates/curp/src/server/lease_manager.rs @@ -1,8 +1,9 @@ -use std::{cmp::Reverse, ops::Add, sync::Arc, time::Duration}; +use std::{cmp::Reverse, collections::HashSet, ops::Add, sync::Arc, time::Duration}; use parking_lot::RwLock; use priority_queue::PriorityQueue; use tokio::time::Instant; +use tracing::info; /// Ref to lease manager pub(crate) type LeaseManagerRef = Arc>; @@ -14,7 +15,9 @@ const DEFAULT_LEASE_TTL: Duration = Duration::from_secs(8); pub(crate) struct LeaseManager { /// client_id => expired_at /// expiry queue to check the smallest expired_at - pub(super) expiry_queue: PriorityQueue>, + expiry_queue: PriorityQueue>, + /// Bypassed client ids + bypassed: HashSet, } impl LeaseManager { @@ -22,11 +25,15 @@ impl LeaseManager { pub(crate) fn new() -> Self { Self { expiry_queue: PriorityQueue::new(), + bypassed: HashSet::from([12345]), } } /// Check if the client is alive pub(crate) fn check_alive(&self, client_id: u64) -> bool { + if self.bypassed.contains(&client_id) { + return true; + } if let Some(expired_at) = self.expiry_queue.get(&client_id).map(|(_, v)| v.0) { expired_at > Instant::now() } else { @@ -41,7 +48,7 @@ impl LeaseManager { client_id = rand::random(); } let expiry = Instant::now().add(DEFAULT_LEASE_TTL); - let _ig = self.expiry_queue.push(client_id, 
Reverse(expiry)); + _ = self.expiry_queue.push(client_id, Reverse(expiry)); // gc all expired client id while granting a new client id self.gc_expired(); client_id @@ -53,26 +60,45 @@ impl LeaseManager { if expiry > Instant::now() { return; } - let _ig = self.expiry_queue.pop(); + _ = self.expiry_queue.pop(); } } /// Renew a client id pub(crate) fn renew(&mut self, client_id: u64) { + if self.bypassed.contains(&client_id) { + return; + } let expiry = Instant::now().add(DEFAULT_LEASE_TTL); - let _ig = self + _ = self .expiry_queue .change_priority(&client_id, Reverse(expiry)); } + /// Bypass a client id, the means the client is on the server + pub(crate) fn bypass(&mut self, client_id: u64) { + if self.bypassed.insert(client_id) { + info!("bypassed client_id: {}", client_id); + } + _ = self.expiry_queue.remove(&client_id); + } + /// Clear, called when leader retires pub(crate) fn clear(&mut self) { self.expiry_queue.clear(); + self.bypassed.clear(); + } + + /// Get the online clients count (excluding bypassed clients) + pub(crate) fn online_clients(&self) -> usize { + self.expiry_queue.len() } /// Revoke a lease pub(crate) fn revoke(&mut self, client_id: u64) { - let _ig = self.expiry_queue.remove(&client_id); + _ = self.expiry_queue.remove(&client_id); + _ = self.bypassed.remove(&client_id); + info!("revoked client_id: {}", client_id); } } @@ -88,6 +114,11 @@ mod test { assert!(lm.check_alive(client_id)); lm.revoke(client_id); assert!(!lm.check_alive(client_id)); + + lm.bypass(client_id); + assert!(lm.check_alive(client_id)); + lm.revoke(client_id); + assert!(!lm.check_alive(client_id)); } #[tokio::test] diff --git a/crates/curp/src/server/metrics.rs b/crates/curp/src/server/metrics.rs index 843b8f942..382c3cc0d 100644 --- a/crates/curp/src/server/metrics.rs +++ b/crates/curp/src/server/metrics.rs @@ -117,7 +117,7 @@ impl Metrics { let sp_size = curp.spec_pool().lock().pool.len(); observer.observe_u64(&sp_total, sp_size.numeric_cast(), &[]); - let client_ids = 
curp.lease_manager().read().expiry_queue.len(); + let client_ids = curp.lease_manager().read().online_clients(); observer.observe_u64(&online_clients, client_ids.numeric_cast(), &[]); }, )?; diff --git a/crates/curp/src/server/mod.rs b/crates/curp/src/server/mod.rs index 66470ceec..8d8ee3495 100644 --- a/crates/curp/src/server/mod.rs +++ b/crates/curp/src/server/mod.rs @@ -16,13 +16,14 @@ use crate::{ members::{ClusterInfo, ServerId}, role_change::RoleChange, rpc::{ - AppendEntriesRequest, AppendEntriesResponse, FetchClusterRequest, FetchClusterResponse, - FetchReadStateRequest, FetchReadStateResponse, InstallSnapshotRequest, - InstallSnapshotResponse, LeaseKeepAliveMsg, MoveLeaderRequest, MoveLeaderResponse, - ProposeConfChangeRequest, ProposeConfChangeResponse, ProposeRequest, ProposeResponse, - PublishRequest, PublishResponse, ShutdownRequest, ShutdownResponse, TriggerShutdownRequest, - TriggerShutdownResponse, TryBecomeLeaderNowRequest, TryBecomeLeaderNowResponse, - VoteRequest, VoteResponse, WaitSyncedRequest, WaitSyncedResponse, + connect::Bypass, AppendEntriesRequest, AppendEntriesResponse, FetchClusterRequest, + FetchClusterResponse, FetchReadStateRequest, FetchReadStateResponse, + InstallSnapshotRequest, InstallSnapshotResponse, LeaseKeepAliveMsg, MoveLeaderRequest, + MoveLeaderResponse, ProposeConfChangeRequest, ProposeConfChangeResponse, ProposeRequest, + ProposeResponse, PublishRequest, PublishResponse, ShutdownRequest, ShutdownResponse, + TriggerShutdownRequest, TriggerShutdownResponse, TryBecomeLeaderNowRequest, + TryBecomeLeaderNowResponse, VoteRequest, VoteResponse, WaitSyncedRequest, + WaitSyncedResponse, }, }; @@ -79,9 +80,10 @@ impl crate::rpc::Protocol for Rpc { &self, request: tonic::Request, ) -> Result, tonic::Status> { + let bypassed = request.metadata().is_bypassed(); request.metadata().extract_span(); Ok(tonic::Response::new( - self.inner.propose(request.into_inner()).await?, + self.inner.propose(request.into_inner(), bypassed).await?, )) 
} @@ -90,9 +92,10 @@ impl crate::rpc::Protocol for Rpc { &self, request: tonic::Request, ) -> Result, tonic::Status> { + let bypassed = request.metadata().is_bypassed(); request.metadata().extract_span(); Ok(tonic::Response::new( - self.inner.shutdown(request.into_inner()).await?, + self.inner.shutdown(request.into_inner(), bypassed).await?, )) } @@ -101,9 +104,12 @@ impl crate::rpc::Protocol for Rpc { &self, request: tonic::Request, ) -> Result, tonic::Status> { + let bypassed = request.metadata().is_bypassed(); request.metadata().extract_span(); Ok(tonic::Response::new( - self.inner.propose_conf_change(request.into_inner()).await?, + self.inner + .propose_conf_change(request.into_inner(), bypassed) + .await?, )) } @@ -112,9 +118,10 @@ impl crate::rpc::Protocol for Rpc { &self, request: tonic::Request, ) -> Result, tonic::Status> { + let bypassed = request.metadata().is_bypassed(); request.metadata().extract_span(); Ok(tonic::Response::new( - self.inner.publish(request.into_inner())?, + self.inner.publish(request.into_inner(), bypassed)?, )) } diff --git a/crates/curp/src/server/raw_curp/mod.rs b/crates/curp/src/server/raw_curp/mod.rs index 298677e75..cb89bc45f 100644 --- a/crates/curp/src/server/raw_curp/mod.rs +++ b/crates/curp/src/server/raw_curp/mod.rs @@ -458,6 +458,10 @@ impl RawCurp { } // Curp handlers +// TODO: Tidy up the handlers +// Possible improvements: +// * split metrics collection from CurpError into a separate function +// * split the handlers into separate modules impl RawCurp { /// Handle `propose` request /// Return `true` if the leader speculatively executed the command @@ -465,6 +469,7 @@ impl RawCurp { &self, propose_id: ProposeId, cmd: Arc, + first_incomplete: u64, ) -> Result { debug!("{} gets proposal for cmd({})", self.id(), propose_id); let mut conflict = self.insert_sp(propose_id, Arc::clone(&cmd)); @@ -482,16 +487,13 @@ impl RawCurp { if self.lst.get_transferee().is_some() { return Err(CurpError::LeaderTransfer("leader 
transferring".to_owned())); } - if !self - .ctx - .cb - .map_write(|mut cb_w| cb_w.sync.insert(propose_id)) - { - metrics::get() - .proposals_failed - .add(1, &[KeyValue::new("reason", "duplicated proposal")]); - return Err(CurpError::duplicated()); - } + self.deduplicate(propose_id, Some(first_incomplete)) + .map_err(|e| { + metrics::get() + .proposals_failed + .add(1, &[KeyValue::new("reason", "duplicated proposal")]); + e + })?; // leader also needs to check if the cmd conflicts un-synced commands conflict |= self.insert_ucp(propose_id, Arc::clone(&cmd)); @@ -525,6 +527,7 @@ impl RawCurp { if self.lst.get_transferee().is_some() { return Err(CurpError::LeaderTransfer("leader transferring".to_owned())); } + self.deduplicate(propose_id, None)?; let mut log_w = self.log.write(); let entry = log_w .push(st_r.term, propose_id, EntryData::Shutdown) @@ -561,6 +564,8 @@ impl RawCurp { } self.check_new_config(&conf_changes)?; + self.deduplicate(propose_id, None)?; + let mut conflict = self.insert_sp(propose_id, conf_changes.clone()); conflict |= self.insert_ucp(propose_id, conf_changes.clone()); @@ -600,6 +605,9 @@ impl RawCurp { if self.lst.get_transferee().is_some() { return Err(CurpError::leader_transfer("leader transferring")); } + + self.deduplicate(req.propose_id(), None)?; + let mut log_w = self.log.write(); let entry = log_w.push(st_r.term, req.propose_id(), req).map_err(|e| { metrics::get() @@ -1498,6 +1506,12 @@ impl RawCurp { None } + /// Mark a client id as bypassed + pub(super) fn mark_client_id_bypassed(&self, client_id: u64) { + let mut lm_w = self.ctx.lm.write(); + lm_w.bypass(client_id); + } + /// Get client tls config pub(super) fn client_tls_config(&self) -> Option<&ClientTlsConfig> { self.ctx.client_tls_config.as_ref() @@ -1709,12 +1723,11 @@ impl RawCurp { }) .collect_vec(); - let mut cb_w = self.ctx.cb.write(); + // let mut cb_w = self.ctx.cb.write(); let mut sp_l = self.ctx.sp.lock(); let term = st.term; for entry in recovered_cmds { - let _ig_sync 
= cb_w.sync.insert(entry.id); // may have been inserted before let _ig_spec = sp_l.insert(entry.clone()); // may have been inserted before #[allow(clippy::expect_used)] let entry = log @@ -1915,4 +1928,37 @@ impl RawCurp { conflict_uncommitted }) } + + /// Process deduplication and acknowledge the `first_incomplete` for this client id + fn deduplicate( + &self, + ProposeId(client_id, seq_num): ProposeId, + first_incomplete: Option, + ) -> Result<(), CurpError> { + // deduplication + if self.ctx.lm.read().check_alive(client_id) { + let mut cb_w = self.ctx.cb.write(); + let tracker = cb_w.tracker(client_id); + if tracker.only_record(seq_num) { + // TODO: obtain the previous ER from cmd_board and packed into CurpError::Duplicated as an entry. + return Err(CurpError::duplicated()); + } + if let Some(first_incomplete) = first_incomplete { + let before = tracker.first_incomplete(); + if tracker.must_advance_to(first_incomplete) { + for seq_num_ack in before..first_incomplete { + self.ack(ProposeId(client_id, seq_num_ack)); + } + } + } + } else { + self.ctx.cb.write().client_expired(client_id); + return Err(CurpError::expired_client_id()); + } + Ok(()) + } + + /// Acknowledge the propose id and GC it's cmd board result + #[allow(clippy::unused_self)] // TODO refactor cmd board gc + fn ack(&self, _id: ProposeId) {} } diff --git a/crates/curp/src/server/raw_curp/tests.rs b/crates/curp/src/server/raw_curp/tests.rs index 827213497..40ed584ca 100644 --- a/crates/curp/src/server/raw_curp/tests.rs +++ b/crates/curp/src/server/raw_curp/tests.rs @@ -1,5 +1,3 @@ -use std::{cmp::Reverse, ops::Add, time::Duration}; - use curp_test_utils::{mock_role_change, test_cmd::TestCommand, TEST_CLIENT_ID}; use test_macros::abort_on_panic; use tokio::{ @@ -22,6 +20,7 @@ use crate::{ raw_curp::UncommittedPool, spec_pool::SpeculativePool, }, + tracker::Tracker, LogIndex, }; @@ -76,11 +75,8 @@ impl RawCurp { .unwrap(); let curp_storage = Arc::new(DB::open(&curp_config.engine_cfg).unwrap()); - // 
grant a infinity expiry lease for test client id - lease_manager.write().expiry_queue.push( - TEST_CLIENT_ID, - Reverse(Instant::now().add(Duration::from_nanos(u64::MAX))), - ); + // bypass test client id + lease_manager.write().bypass(TEST_CLIENT_ID); Self::builder() .cluster_info(cluster_info) @@ -106,6 +102,16 @@ impl RawCurp { self.ctx.connects.entry(id).and_modify(|c| *c = connect); } + pub(crate) fn tracker(&self, client_id: u64) -> Tracker { + self.ctx + .cb + .read() + .trackers + .get(&client_id) + .cloned() + .unwrap_or_else(|| unreachable!("cannot find {client_id} in result trackers")) + } + /// Add a new cmd to the log, will return log entry index pub(crate) fn push_cmd(&self, propose_id: ProposeId, cmd: Arc) -> LogIndex { let st_r = self.st.read(); @@ -142,7 +148,7 @@ fn leader_handle_propose_will_succeed() { }; let cmd = Arc::new(TestCommand::default()); assert!(curp - .handle_propose(ProposeId(TEST_CLIENT_ID, 0), cmd) + .handle_propose(ProposeId(TEST_CLIENT_ID, 0), cmd, 0) .unwrap()); } @@ -158,16 +164,16 @@ fn leader_handle_propose_will_reject_conflicted() { let cmd1 = Arc::new(TestCommand::new_put(vec![1], 0)); assert!(curp - .handle_propose(ProposeId(TEST_CLIENT_ID, 0), cmd1) + .handle_propose(ProposeId(TEST_CLIENT_ID, 0), cmd1, 0) .unwrap()); let cmd2 = Arc::new(TestCommand::new_put(vec![1, 2], 1)); - let res = curp.handle_propose(ProposeId(TEST_CLIENT_ID, 1), cmd2); + let res = curp.handle_propose(ProposeId(TEST_CLIENT_ID, 1), cmd2, 1); assert!(matches!(res, Err(CurpError::KeyConflict(_)))); // leader will also reject cmds that conflict un-synced cmds let cmd3 = Arc::new(TestCommand::new_put(vec![2], 1)); - let res = curp.handle_propose(ProposeId(TEST_CLIENT_ID, 2), cmd3); + let res = curp.handle_propose(ProposeId(TEST_CLIENT_ID, 2), cmd3, 2); assert!(matches!(res, Err(CurpError::KeyConflict(_)))); } @@ -182,10 +188,10 @@ fn leader_handle_propose_will_reject_duplicated() { }; let cmd = Arc::new(TestCommand::default()); assert!(curp - 
.handle_propose(ProposeId(TEST_CLIENT_ID, 0), Arc::clone(&cmd)) + .handle_propose(ProposeId(TEST_CLIENT_ID, 0), Arc::clone(&cmd), 0) .unwrap()); - let res = curp.handle_propose(ProposeId(TEST_CLIENT_ID, 0), cmd); + let res = curp.handle_propose(ProposeId(TEST_CLIENT_ID, 0), cmd, 0); assert!(matches!(res, Err(CurpError::Duplicated(_)))); } @@ -208,7 +214,7 @@ fn follower_handle_propose_will_succeed() { curp.update_to_term_and_become_follower(&mut *curp.st.write(), 1); let cmd = Arc::new(TestCommand::new_get(vec![1])); assert!(!curp - .handle_propose(ProposeId(TEST_CLIENT_ID, 0), cmd) + .handle_propose(ProposeId(TEST_CLIENT_ID, 0), cmd, 0) .unwrap()); } @@ -232,11 +238,11 @@ fn follower_handle_propose_will_reject_conflicted() { let cmd1 = Arc::new(TestCommand::new_get(vec![1])); assert!(!curp - .handle_propose(ProposeId(TEST_CLIENT_ID, 0), cmd1) + .handle_propose(ProposeId(TEST_CLIENT_ID, 0), cmd1, 0) .unwrap()); let cmd2 = Arc::new(TestCommand::new_get(vec![1])); - let res = curp.handle_propose(ProposeId(TEST_CLIENT_ID, 1), cmd2); + let res = curp.handle_propose(ProposeId(TEST_CLIENT_ID, 1), cmd2, 1); assert!(matches!(res, Err(CurpError::KeyConflict(_)))); } @@ -798,10 +804,12 @@ fn leader_retires_should_cleanup() { let _ignore = curp.handle_propose( ProposeId(TEST_CLIENT_ID, 0), Arc::new(TestCommand::new_put(vec![1], 0)), + 0, ); let _ignore = curp.handle_propose( ProposeId(TEST_CLIENT_ID, 1), Arc::new(TestCommand::new_get(vec![1])), + 0, ); curp.leader_retires(); @@ -1209,7 +1217,7 @@ fn leader_will_reject_propose_when_transferring() { let propose_id = ProposeId(0, 0); let cmd = Arc::new(TestCommand::new_put(vec![1], 1)); - let res = curp.handle_propose(propose_id, cmd); + let res = curp.handle_propose(propose_id, cmd, 0); assert!(res.is_err()); } diff --git a/crates/curp/tests/it/main.rs b/crates/curp/tests/it/main.rs index b8174b639..9ce91b3b7 100644 --- a/crates/curp/tests/it/main.rs +++ b/crates/curp/tests/it/main.rs @@ -1,5 +1,3 @@ mod common; -mod 
read_state; - mod server; diff --git a/crates/curp/tests/it/read_state.rs b/crates/curp/tests/it/read_state.rs deleted file mode 100644 index f47dd303a..000000000 --- a/crates/curp/tests/it/read_state.rs +++ /dev/null @@ -1,59 +0,0 @@ -use std::time::Duration; - -use curp::{client::ClientApi, rpc::ReadState}; -use curp_test_utils::{ - init_logger, sleep_millis, - test_cmd::{TestCommand, TestCommandResult}, -}; -use test_macros::abort_on_panic; - -use crate::common::curp_group::CurpGroup; - -#[tokio::test(flavor = "multi_thread")] -#[abort_on_panic] -async fn read_state() { - init_logger(); - let group = CurpGroup::new(3).await; - let put_client = group.new_client().await; - let put_cmd = TestCommand::new_put(vec![0], 0).set_exe_dur(Duration::from_millis(100)); - tokio::spawn(async move { - assert_eq!( - put_client - .propose(&put_cmd, None, true) - .await - .unwrap() - .unwrap() - .0, - TestCommandResult::default(), - ); - }); - sleep_millis(10).await; - let get_client = group.new_client().await; - let res = get_client - .fetch_read_state(&TestCommand::new_get(vec![0])) - .await - .unwrap(); - if let ReadState::Ids(v) = res { - assert_eq!(v.inflight_ids.len(), 1); - } else { - unreachable!( - "expected result should be ReadState::Ids(v) where len(v) = 1, but received {:?}", - res - ); - } - - sleep_millis(500).await; - - let res = get_client - .fetch_read_state(&TestCommand::new_get(vec![0])) - .await - .unwrap(); - if let ReadState::CommitIndex(index) = res { - assert_eq!(index, 1); - } else { - unreachable!( - "expected result should be ReadState::CommitIndex({:?}), but received {:?}", - 1, res - ); - } -} diff --git a/crates/curp/tests/it/server.rs b/crates/curp/tests/it/server.rs index edf890f3c..22669963b 100644 --- a/crates/curp/tests/it/server.rs +++ b/crates/curp/tests/it/server.rs @@ -11,6 +11,7 @@ use curp::{ use curp_test_utils::{ init_logger, sleep_millis, sleep_secs, test_cmd::{TestCommand, TestCommandResult, TestCommandType}, + TEST_CLIENT_ID, }; use 
madsim::rand::{thread_rng, Rng}; use test_macros::abort_on_panic; @@ -127,11 +128,12 @@ async fn fast_round_is_slower_than_slow_round() { leader_connect .propose(tonic::Request::new(ProposeRequest { propose_id: Some(ProposeId { - client_id: 0, + client_id: TEST_CLIENT_ID, seq_num: 0, }), command: bincode::serialize(&cmd).unwrap(), cluster_version: 0, + first_incomplete: 0, })) .await .unwrap(); @@ -148,11 +150,12 @@ async fn fast_round_is_slower_than_slow_round() { let resp: ProposeResponse = follower_connect .propose(tonic::Request::new(ProposeRequest { propose_id: Some(ProposeId { - client_id: 0, + client_id: TEST_CLIENT_ID, seq_num: 0, }), command: bincode::serialize(&cmd).unwrap(), cluster_version: 0, + first_incomplete: 0, })) .await .unwrap() @@ -177,11 +180,12 @@ async fn concurrent_cmd_order() { tokio::spawn(async move { c.propose(ProposeRequest { propose_id: Some(ProposeId { - client_id: 0, + client_id: TEST_CLIENT_ID, seq_num: 0, }), command: bincode::serialize(&cmd0).unwrap(), cluster_version: 0, + first_incomplete: 0, }) .await .expect("propose failed"); @@ -191,22 +195,24 @@ async fn concurrent_cmd_order() { let response = leader_connect .propose(ProposeRequest { propose_id: Some(ProposeId { - client_id: 0, + client_id: TEST_CLIENT_ID, seq_num: 1, }), command: bincode::serialize(&cmd1).unwrap(), cluster_version: 0, + first_incomplete: 0, }) .await; assert!(response.is_err()); let response = leader_connect .propose(ProposeRequest { propose_id: Some(ProposeId { - client_id: 0, + client_id: TEST_CLIENT_ID, seq_num: 2, }), command: bincode::serialize(&cmd2).unwrap(), cluster_version: 0, + first_incomplete: 0, }) .await; assert!(response.is_err()); diff --git a/crates/simulation/tests/it/curp/server_recovery.rs b/crates/simulation/tests/it/curp/server_recovery.rs index df251a939..dfab00ffe 100644 --- a/crates/simulation/tests/it/curp/server_recovery.rs +++ b/crates/simulation/tests/it/curp/server_recovery.rs @@ -193,6 +193,7 @@ async fn 
new_leader_will_recover_spec_cmds_cond1() { }), command: bincode::serialize(&cmd1).unwrap(), cluster_version: 0, + first_incomplete: 0, }; for id in group .all_members @@ -304,6 +305,7 @@ async fn old_leader_will_keep_original_states() { }), command: bincode::serialize(&cmd1).unwrap(), cluster_version: 0, + first_incomplete: 0, }; let mut leader1_connect = group.get_connect(&leader1).await; leader1_connect.propose(req1).await.unwrap(); diff --git a/crates/xline/tests/it/lease_test.rs b/crates/xline/tests/it/lease_test.rs index 2eb20274b..ee5feb676 100644 --- a/crates/xline/tests/it/lease_test.rs +++ b/crates/xline/tests/it/lease_test.rs @@ -50,7 +50,7 @@ async fn test_lease_keep_alive() -> Result<(), Box<dyn Error>> { let res = client .lease_client() - .grant(LeaseGrantRequest::new(1)) + .grant(LeaseGrantRequest::new(3)) .await?; let lease_id = res.id; assert!(lease_id > 0); @@ -69,7 +69,7 @@ async fn test_lease_keep_alive() -> Result<(), Box<dyn Error>> { let (mut keeper, mut stream) = c.keep_alive(LeaseKeepAliveRequest::new(lease_id)).await?; let handle = tokio::spawn(async move { loop { - tokio::time::sleep(Duration::from_millis(500)).await; + tokio::time::sleep(Duration::from_millis(1500)).await; let _ = keeper.keep_alive(); if let Ok(Some(r)) = stream.message().await { info!("keep alive response: {:?}", r); @@ -83,7 +83,7 @@ async fn test_lease_keep_alive() -> Result<(), Box<dyn Error>> { assert_eq!(res.kvs[0].value, b"bar"); handle.abort(); - tokio::time::sleep(Duration::from_secs(2)).await; + tokio::time::sleep(Duration::from_secs(6)).await; let res = client.kv_client().range(RangeRequest::new("foo")).await?; assert_eq!(res.kvs.len(), 0);