Skip to content

Commit

Permalink
feat: implement dedup
Browse files Browse the repository at this point in the history
chore: update curp submodule for dedup
Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com>
  • Loading branch information
bsbds committed Aug 12, 2024
1 parent b767d75 commit d2b93f2
Show file tree
Hide file tree
Showing 22 changed files with 292 additions and 141 deletions.
1 change: 1 addition & 0 deletions .config/nextest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ retries = 0
slow-timeout = { period = "10s", terminate-after = 3 }
status-level = "all"
final-status-level = "slow"
fail-fast = false
2 changes: 1 addition & 1 deletion crates/curp/proto/common
Submodule common updated 1 files
+8 −0 src/curp-command.proto
52 changes: 48 additions & 4 deletions crates/curp/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ mod tests;

#[cfg(madsim)]
use std::sync::atomic::AtomicU64;
use std::{collections::HashMap, fmt::Debug, sync::Arc};
use std::{collections::HashMap, fmt::Debug, ops::Deref, sync::Arc};

use async_trait::async_trait;
use curp_external_api::cmd::Command;
use futures::{stream::FuturesUnordered, StreamExt};
use parking_lot::RwLock;
use tokio::task::JoinHandle;
#[cfg(not(madsim))]
use tonic::transport::ClientTlsConfig;
Expand All @@ -47,6 +48,7 @@ use crate::{
protocol_client::ProtocolClient, ConfChange, FetchClusterRequest, FetchClusterResponse,
Member, ProposeId, Protocol, ReadState,
},
tracker::Tracker,
};

/// The response of propose command, deserialized from [`crate::rpc::ProposeResponse`] or
Expand Down Expand Up @@ -122,11 +124,43 @@ pub trait ClientApi {
}
}

/// Propose id guard, used to ensure the sequence of propose id is recorded.
struct ProposeIdGuard<'a> {
/// The propose id
propose_id: ProposeId,
/// The tracker
tracker: &'a RwLock<Tracker>,
}

impl Deref for ProposeIdGuard<'_> {
type Target = ProposeId;

fn deref(&self) -> &Self::Target {
&self.propose_id
}
}

impl<'a> ProposeIdGuard<'a> {
/// Create a new propose id guard
fn new(tracker: &'a RwLock<Tracker>, propose_id: ProposeId) -> Self {
Self {
propose_id,
tracker,
}
}
}

impl Drop for ProposeIdGuard<'_> {
fn drop(&mut self) {
let _ig = self.tracker.write().record(self.propose_id.1);
}
}

/// This trait override some unrepeatable methods in ClientApi, and a client with this trait will be able to retry.
#[async_trait]
trait RepeatableClientApi: ClientApi {
/// Generate a unique propose id during the retry process.
fn gen_propose_id(&self) -> Result<ProposeId, Self::Error>;
fn gen_propose_id(&self) -> Result<ProposeIdGuard<'_>, Self::Error>;

/// Send propose to the whole cluster, `use_fast_path` set to `false` to fallback into ordered
/// requests (event the requests are commutative).
Expand Down Expand Up @@ -355,6 +389,14 @@ impl ClientBuilder {
})
}

/// Wait for client id
async fn wait_for_client_id(&self, state: Arc<state::State>) {
while state.client_id() == 0 {
tokio::time::sleep(*self.config.propose_timeout()).await;
debug!("waiting for client_id");
}
}

/// Build the client
///
/// # Errors
Expand All @@ -371,8 +413,9 @@ impl ClientBuilder {
let client = Retry::new(
Unary::new(Arc::clone(&state), self.init_unary_config()),
self.init_retry_config(),
Some(self.spawn_bg_tasks(state)),
Some(self.spawn_bg_tasks(Arc::clone(&state))),
);
self.wait_for_client_id(state).await;
Ok(client)
}

Expand Down Expand Up @@ -422,8 +465,9 @@ impl<P: Protocol> ClientBuilderWithBypass<P> {
let client = Retry::new(
Unary::new(Arc::clone(&state), self.inner.init_unary_config()),
self.inner.init_retry_config(),
Some(self.inner.spawn_bg_tasks(state)),
Some(self.inner.spawn_bg_tasks(Arc::clone(&state))),
);
self.inner.wait_for_client_id(state).await;
Ok(client)
}
}
11 changes: 6 additions & 5 deletions crates/curp/src/client/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{ops::SubAssign, time::Duration};
use async_trait::async_trait;
use futures::Future;
use tokio::task::JoinHandle;
use tracing::warn;
use tracing::{info, warn};

use super::{ClientApi, LeaderStateUpdate, ProposeResponse, RepeatableClientApi};
use crate::{
Expand Down Expand Up @@ -110,6 +110,7 @@ pub(super) struct Retry<Api> {
impl<Api> Drop for Retry<Api> {
fn drop(&mut self) {
if let Some(handle) = self.bg_handle.as_ref() {
info!("stopping background task");
handle.abort();
}
}
Expand Down Expand Up @@ -225,7 +226,7 @@ where
) -> Result<ProposeResponse<Self::Cmd>, tonic::Status> {
let propose_id = self.inner.gen_propose_id()?;
self.retry::<_, _>(|client| {
RepeatableClientApi::propose(client, propose_id, cmd, token, use_fast_path)
RepeatableClientApi::propose(client, *propose_id, cmd, token, use_fast_path)
})
.await
}
Expand All @@ -238,15 +239,15 @@ where
let propose_id = self.inner.gen_propose_id()?;
self.retry::<_, _>(|client| {
let changes_c = changes.clone();
RepeatableClientApi::propose_conf_change(client, propose_id, changes_c)
RepeatableClientApi::propose_conf_change(client, *propose_id, changes_c)
})
.await
}

/// Send propose to shutdown cluster
async fn propose_shutdown(&self) -> Result<(), tonic::Status> {
let propose_id = self.inner.gen_propose_id()?;
self.retry::<_, _>(|client| RepeatableClientApi::propose_shutdown(client, propose_id))
self.retry::<_, _>(|client| RepeatableClientApi::propose_shutdown(client, *propose_id))
.await
}

Expand All @@ -263,7 +264,7 @@ where
let node_client_urls_c = node_client_urls.clone();
RepeatableClientApi::propose_publish(
client,
propose_id,
*propose_id,
node_id,
name_c,
node_client_urls_c,
Expand Down
9 changes: 6 additions & 3 deletions crates/curp/src/client/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,12 @@ impl Streaming {
);
self.state.leader_notifier().listen().await;
}
CurpError::ShuttingDown(()) => {
debug!("shutting down stream client background task");
break Err(err);
CurpError::RpcTransport(()) => {
warn!(
"got rpc transport error when keep heartbeat, refreshing state..."
);
let _ig = self.state.try_refresh_state().await;
tokio::time::sleep(RETRY_DELAY).await;
}
CurpError::RpcTransport(()) => {
warn!(
Expand Down
1 change: 0 additions & 1 deletion crates/curp/src/client/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,6 @@ async fn test_retry_propose_return_no_retry_error() {
#[tokio::test]
async fn test_retry_propose_return_retry_error() {
for early_err in [
CurpError::expired_client_id(),
CurpError::key_conflict(),
CurpError::RpcTransport(()),
CurpError::internal("No reason"),
Expand Down
47 changes: 37 additions & 10 deletions crates/curp/src/client/unary.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
use std::{cmp::Ordering, marker::PhantomData, sync::Arc, time::Duration};
use std::{
cmp::Ordering,
marker::PhantomData,
sync::{atomic::AtomicU64, Arc},
time::Duration,
};

use async_trait::async_trait;
use curp_external_api::cmd::Command;
use futures::{future, stream::FuturesUnordered, Future, Stream, StreamExt};
use parking_lot::RwLock;
use tonic::{Response, Status};
use tracing::{debug, warn};

use super::{state::State, ClientApi, LeaderStateUpdate, ProposeResponse, RepeatableClientApi};
use super::{
state::State, ClientApi, LeaderStateUpdate, ProposeIdGuard, ProposeResponse,
RepeatableClientApi,
};
use crate::{
members::ServerId,
quorum,
Expand All @@ -18,6 +27,7 @@ use crate::{
ShutdownRequest,
},
super_quorum,
tracker::Tracker,
};

/// The unary client config
Expand Down Expand Up @@ -48,6 +58,10 @@ pub(super) struct Unary<C: Command> {
state: Arc<State>,
/// Unary config
config: UnaryConfig,
/// Request tracker
tracker: RwLock<Tracker>,
/// Last sent sequence number
last_sent_seq: AtomicU64,
/// marker
phantom: PhantomData<C>,
}
Expand All @@ -58,6 +72,8 @@ impl<C: Command> Unary<C> {
Self {
state,
config,
tracker: RwLock::new(Tracker::default()),
last_sent_seq: AtomicU64::new(0),
phantom: PhantomData,
}
}
Expand Down Expand Up @@ -97,7 +113,8 @@ impl<C: Command> Unary<C> {
/// New a seq num and record it
#[allow(clippy::unused_self)] // TODO: implement request tracker
fn new_seq_num(&self) -> u64 {
rand::random()
self.last_sent_seq
.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
}
}

Expand Down Expand Up @@ -169,7 +186,7 @@ impl<C: Command> ClientApi for Unary<C> {
use_fast_path: bool,
) -> Result<ProposeResponse<C>, CurpError> {
let propose_id = self.gen_propose_id()?;
RepeatableClientApi::propose(self, propose_id, cmd, token, use_fast_path).await
RepeatableClientApi::propose(self, *propose_id, cmd, token, use_fast_path).await
}

/// Send propose configuration changes to the cluster
Expand All @@ -178,13 +195,13 @@ impl<C: Command> ClientApi for Unary<C> {
changes: Vec<ConfChange>,
) -> Result<Vec<Member>, CurpError> {
let propose_id = self.gen_propose_id()?;
RepeatableClientApi::propose_conf_change(self, propose_id, changes).await
RepeatableClientApi::propose_conf_change(self, *propose_id, changes).await
}

/// Send propose to shutdown cluster
async fn propose_shutdown(&self) -> Result<(), CurpError> {
let propose_id = self.gen_propose_id()?;
RepeatableClientApi::propose_shutdown(self, propose_id).await
RepeatableClientApi::propose_shutdown(self, *propose_id).await
}

/// Send propose to publish a node id and name
Expand All @@ -195,8 +212,14 @@ impl<C: Command> ClientApi for Unary<C> {
node_client_urls: Vec<String>,
) -> Result<(), Self::Error> {
let propose_id = self.gen_propose_id()?;
RepeatableClientApi::propose_publish(self, propose_id, node_id, node_name, node_client_urls)
.await
RepeatableClientApi::propose_publish(
self,
*propose_id,
node_id,
node_name,
node_client_urls,
)
.await
}

/// Send move leader request
Expand Down Expand Up @@ -339,10 +362,13 @@ impl<C: Command> ClientApi for Unary<C> {
#[async_trait]
impl<C: Command> RepeatableClientApi for Unary<C> {
/// Generate a unique propose id during the retry process.
fn gen_propose_id(&self) -> Result<ProposeId, Self::Error> {
fn gen_propose_id(&self) -> Result<ProposeIdGuard<'_>, Self::Error> {
let client_id = self.state.client_id();
let seq_num = self.new_seq_num();
Ok(ProposeId(client_id, seq_num))
Ok(ProposeIdGuard::new(
&self.tracker,
ProposeId(client_id, seq_num),
))
}

/// Send propose to the whole cluster, `use_fast_path` set to `false` to fallback into ordered
Expand All @@ -361,6 +387,7 @@ impl<C: Command> RepeatableClientApi for Unary<C> {
self.state.cluster_version().await,
self.state.term().await,
!use_fast_path,
self.tracker.read().first_incomplete(),
);
let record_req = RecordRequest::new::<C>(propose_id, cmd_arc.as_ref());
let superquorum = super_quorum(self.state.connects_len().await);
Expand Down
2 changes: 2 additions & 0 deletions crates/curp/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,15 @@ impl ProposeRequest {
cluster_version: u64,
term: u64,
slow_path: bool,
first_incomplete: u64,
) -> Self {
Self {
propose_id: Some(propose_id.into()),
command: cmd.encode(),
cluster_version,
term,
slow_path,
first_incomplete,
}
}

Expand Down
Loading

0 comments on commit d2b93f2

Please sign in to comment.