Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/dedup #690

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .config/nextest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ retries = 0
slow-timeout = { period = "10s", terminate-after = 3 }
status-level = "all"
final-status-level = "slow"
fail-fast = false
2 changes: 1 addition & 1 deletion crates/curp/proto/common
Submodule common updated 1 files
+1 −0 src/curp-command.proto
52 changes: 48 additions & 4 deletions crates/curp/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ mod state;
#[cfg(test)]
mod tests;

use std::{collections::HashMap, fmt::Debug, sync::Arc};
use std::{collections::HashMap, fmt::Debug, ops::Deref, sync::Arc};

use async_trait::async_trait;
use curp_external_api::cmd::Command;
use futures::{stream::FuturesUnordered, StreamExt};
use parking_lot::RwLock;
use tokio::task::JoinHandle;
#[cfg(not(madsim))]
use tonic::transport::ClientTlsConfig;
Expand All @@ -45,6 +46,7 @@ use crate::{
protocol_client::ProtocolClient, ConfChange, FetchClusterRequest, FetchClusterResponse,
Member, ProposeId, Protocol, ReadState,
},
tracker::Tracker,
};

/// The response of propose command, deserialized from [`crate::rpc::ProposeResponse`] or
Expand Down Expand Up @@ -119,11 +121,43 @@ pub trait ClientApi {
}
}

/// Propose id guard, used to ensure the sequence of propose id is recorded.
///
/// On drop, the guard records its id's sequence number (the second field of
/// [`ProposeId`]) in the tracker, so the id is tracked even on early-return
/// or error paths.
// `#[must_use]`: the guard's whole purpose is its `Drop` side effect, so a
// value that is created and immediately dropped is almost certainly a bug.
#[must_use]
struct ProposeIdGuard<'a> {
    /// The propose id
    propose_id: ProposeId,
    /// The tracker that records the id's sequence number on drop
    tracker: &'a RwLock<Tracker>,
}

/// Dereferences to the wrapped [`ProposeId`], so call sites can pass
/// `*guard` anywhere a `ProposeId` value is expected without unwrapping
/// the guard (and thereby losing its drop-time tracking).
impl Deref for ProposeIdGuard<'_> {
    type Target = ProposeId;

    fn deref(&self) -> &Self::Target {
        &self.propose_id
    }
}

impl<'a> ProposeIdGuard<'a> {
    /// Builds a guard that will record `propose_id` in `tracker` when it
    /// goes out of scope.
    fn new(tracker: &'a RwLock<Tracker>, propose_id: ProposeId) -> Self {
        Self { tracker, propose_id }
    }
}

impl Drop for ProposeIdGuard<'_> {
    /// Records the id's sequence number (`ProposeId`'s second field) in the
    /// tracker when the guard is dropped, so the id is accounted for on
    /// every exit path, including early returns and errors.
    fn drop(&mut self) {
        // The return value of `record` is intentionally discarded; only the
        // recording side effect is needed here.
        let _ig = self.tracker.write().record(self.propose_id.1);
    }
}

/// This trait override some unrepeatable methods in ClientApi, and a client with this trait will be able to retry.
#[async_trait]
trait RepeatableClientApi: ClientApi {
/// Generate a unique propose id during the retry process.
fn gen_propose_id(&self) -> Result<ProposeId, Self::Error>;
fn gen_propose_id(&self) -> Result<ProposeIdGuard<'_>, Self::Error>;

/// Send propose to the whole cluster, `use_fast_path` set to `false` to fallback into ordered
/// requests (even the requests are commutative).
Expand Down Expand Up @@ -352,6 +386,14 @@ impl ClientBuilder {
})
}

/// Wait for client id
///
/// Blocks until the background tasks have obtained a non-zero client id
/// from the cluster, polling at the propose-timeout interval.
async fn wait_for_client_id(&self, state: Arc<state::State>) {
    while state.client_id() == 0 {
        // Log before sleeping: in the original ordering the message only
        // appeared after the first full timeout period, leaving the wait
        // invisible in the logs until then.
        debug!("waiting for client_id");
        tokio::time::sleep(*self.config.propose_timeout()).await;
    }
}

/// Build the client
///
/// # Errors
Expand All @@ -368,8 +410,9 @@ impl ClientBuilder {
let client = Retry::new(
Unary::new(Arc::clone(&state), self.init_unary_config()),
self.init_retry_config(),
Some(self.spawn_bg_tasks(state)),
Some(self.spawn_bg_tasks(Arc::clone(&state))),
);
self.wait_for_client_id(state).await;
Ok(client)
}
}
Expand All @@ -393,8 +436,9 @@ impl<P: Protocol> ClientBuilderWithBypass<P> {
let client = Retry::new(
Unary::new(Arc::clone(&state), self.inner.init_unary_config()),
self.inner.init_retry_config(),
Some(self.inner.spawn_bg_tasks(state)),
Some(self.inner.spawn_bg_tasks(Arc::clone(&state))),
);
self.inner.wait_for_client_id(state).await;
Ok(client)
}
}
15 changes: 8 additions & 7 deletions crates/curp/src/client/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{ops::SubAssign, time::Duration};
use async_trait::async_trait;
use futures::Future;
use tokio::task::JoinHandle;
use tracing::warn;
use tracing::{info, warn};

use super::{ClientApi, LeaderStateUpdate, ProposeResponse, RepeatableClientApi};
use crate::{
Expand Down Expand Up @@ -110,6 +110,7 @@ pub(super) struct Retry<Api> {
impl<Api> Drop for Retry<Api> {
fn drop(&mut self) {
if let Some(handle) = self.bg_handle.as_ref() {
info!("stopping background task");
handle.abort();
}
}
Expand Down Expand Up @@ -147,14 +148,14 @@ where
| CurpError::ShuttingDown(_)
| CurpError::InvalidConfig(_)
| CurpError::NodeNotExists(_)
| CurpError::ExpiredClientId(_)
| CurpError::NodeAlreadyExists(_)
| CurpError::LearnerNotCatchUp(_) => {
return Err(tonic::Status::from(err));
}

// some errors that could have a retry
CurpError::ExpiredClientId(_)
| CurpError::KeyConflict(_)
CurpError::KeyConflict(_)
| CurpError::Internal(_)
| CurpError::LeaderTransfer(_) => {}

Expand Down Expand Up @@ -218,7 +219,7 @@ where
) -> Result<ProposeResponse<Self::Cmd>, tonic::Status> {
let propose_id = self.inner.gen_propose_id()?;
self.retry::<_, _>(|client| {
RepeatableClientApi::propose(client, propose_id, cmd, token, use_fast_path)
RepeatableClientApi::propose(client, *propose_id, cmd, token, use_fast_path)
})
.await
}
Expand All @@ -231,15 +232,15 @@ where
let propose_id = self.inner.gen_propose_id()?;
self.retry::<_, _>(|client| {
let changes_c = changes.clone();
RepeatableClientApi::propose_conf_change(client, propose_id, changes_c)
RepeatableClientApi::propose_conf_change(client, *propose_id, changes_c)
})
.await
}

/// Send propose to shutdown cluster
async fn propose_shutdown(&self) -> Result<(), tonic::Status> {
let propose_id = self.inner.gen_propose_id()?;
self.retry::<_, _>(|client| RepeatableClientApi::propose_shutdown(client, propose_id))
self.retry::<_, _>(|client| RepeatableClientApi::propose_shutdown(client, *propose_id))
.await
}

Expand All @@ -256,7 +257,7 @@ where
let node_client_urls_c = node_client_urls.clone();
RepeatableClientApi::propose_publish(
client,
propose_id,
*propose_id,
node_id,
name_c,
node_client_urls_c,
Expand Down
26 changes: 25 additions & 1 deletion crates/curp/src/client/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ use std::{
cmp::Ordering,
collections::{hash_map::Entry, HashMap, HashSet},
sync::{atomic::AtomicU64, Arc},
time::Duration,
};

use event_listener::Event;
use futures::{stream::FuturesUnordered, Future};
use rand::seq::IteratorRandom;
use tokio::sync::RwLock;
#[cfg(not(madsim))]
use tonic::transport::ClientTlsConfig;
Expand All @@ -18,7 +20,7 @@ use crate::{
rpc::{
self,
connect::{BypassedConnect, ConnectApi},
CurpError, FetchClusterResponse, Protocol,
CurpError, FetchClusterRequest, FetchClusterResponse, Protocol,
},
};

Expand Down Expand Up @@ -127,6 +129,28 @@ impl State {
}
}

/// Choose a random server to try to refresh the state
/// Use when the current leader is missing.
pub(crate) async fn try_refresh_state(&self) -> Result<(), CurpError> {
    /// Timeout applied to the fetch-cluster request used to refresh the state
    const REFRESH_TIMEOUT: Duration = Duration::from_secs(1);

    // Pick one known connection at random; error out if we know of no
    // servers. The read guard is a statement-scoped temporary, so it is
    // released before the RPC below is awaited.
    let picked = self
        .mutable
        .read()
        .await
        .connects
        .values()
        .choose(&mut rand::thread_rng())
        .map(Arc::clone)
        .ok_or_else(CurpError::wrong_cluster_version)?;
    let response = picked
        .fetch_cluster(FetchClusterRequest::default(), REFRESH_TIMEOUT)
        .await?;
    self.check_and_update(&response.into_inner()).await?;
    Ok(())
}

/// Get the local server connection
pub(super) async fn local_connect(&self) -> Option<Arc<dyn ConnectApi>> {
let id = self.immutable.local_server?;
Expand Down
22 changes: 14 additions & 8 deletions crates/curp/src/client/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ pub(super) struct Streaming {
config: StreamingConfig,
}

/// Prevent lock contention when leader crashed or some unknown errors
const RETRY_DELAY: Duration = Duration::from_millis(100);

impl Streaming {
/// Create a stream client
pub(super) fn new(state: Arc<State>, config: StreamingConfig) -> Self {
Expand All @@ -44,8 +47,11 @@ impl Streaming {
) -> Result<R, CurpError> {
loop {
let Some(leader_id) = self.state.leader_id().await else {
debug!("cannot find the leader id in state, wait for leadership update");
self.state.leader_notifier().listen().await;
warn!(
"cannot find leader_id, refreshing state..."
);
let _ig = self.state.try_refresh_state().await;
tokio::time::sleep(RETRY_DELAY).await;
continue;
};
if let Some(local_id) = self.state.local_server_id() {
Expand All @@ -62,9 +68,6 @@ impl Streaming {

/// Keep heartbeat
pub(super) async fn keep_heartbeat(&self) {
/// Prevent lock contention when leader crashed or some unknown errors
const RETRY_DELAY: Duration = Duration::from_millis(100);

loop {
let heartbeat = self.map_remote_leader::<(), _>(|conn| async move {
loop {
Expand All @@ -85,9 +88,12 @@ impl Streaming {
);
self.state.leader_notifier().listen().await;
}
CurpError::ShuttingDown(_) => {
debug!("shutting down stream client background task");
break Err(err);
CurpError::RpcTransport(_) => {
warn!(
"got rpc transport error when keep heartbeat, refreshing state..."
);
let _ig = self.state.try_refresh_state().await;
tokio::time::sleep(RETRY_DELAY).await;
}
_ => {
warn!("got unexpected error {err:?} when keep heartbeat, retrying...");
Expand Down
1 change: 0 additions & 1 deletion crates/curp/src/client/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,6 @@ async fn test_retry_propose_return_no_retry_error() {
#[tokio::test]
async fn test_retry_propose_return_retry_error() {
for early_err in [
CurpError::expired_client_id(),
CurpError::key_conflict(),
CurpError::RpcTransport(()),
CurpError::internal("No reason"),
Expand Down
Loading
Loading