Skip to content

Commit

Permalink
refactor: generate inner client from curp server
Browse files Browse the repository at this point in the history
  • Loading branch information
Phoenix500526 committed Jun 6, 2023
1 parent c51eba1 commit 2f0b026
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 40 deletions.
30 changes: 17 additions & 13 deletions curp/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use crate::{

/// Protocol client
pub struct Client<C: Command> {
/// client id
id: ServerId,
/// local server id. Only use in an inner client.
local_server_id: Option<ServerId>,
/// Current leader and term
state: RwLock<State>,
/// All servers's `Connect`
Expand Down Expand Up @@ -94,12 +94,12 @@ where
/// Create a new protocol client based on the addresses
#[inline]
pub async fn new(
self_id: ServerId,
self_id: Option<ServerId>,
addrs: HashMap<ServerId, String>,
timeout: ClientTimeout,
) -> Self {
Self {
id: self_id,
local_server_id: self_id,
state: RwLock::new(State::new()),
connects: rpc::connect(addrs, None).await,
timeout,
Expand Down Expand Up @@ -508,17 +508,21 @@ where
}

/// Fetch the current leader id and term from the curp server where is on the same node.
/// Note that this method should not be invoked by an outside client.
#[inline]
async fn fetch_local_leader_info(&self) -> Result<(Option<ServerId>, u64), ProposeError> {
let resp = self
.connects
.get(self.id.as_str())
.unwrap_or_else(|| unreachable!("self id {} not found", self.id.as_str()))
.fetch_leader(FetchLeaderRequest::new(), *self.timeout.retry_timeout())
.await?
.into_inner();

Ok((resp.leader_id, resp.term))
if let Some(ref local_server) = self.local_server_id {
let resp = self
.connects
.get(local_server)
.unwrap_or_else(|| unreachable!("self id {} not found", local_server))
.fetch_leader(FetchLeaderRequest::new(), *self.timeout.retry_timeout())
.await?
.into_inner();
Ok((resp.leader_id, resp.term))
} else {
unreachable!("The outer client shouldn't invoke fetch_local_leader_info");
}
}

/// Fetch the current leader id without cache
Expand Down
9 changes: 8 additions & 1 deletion curp/src/server/curp_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use tokio::{
time::MissedTickBehavior,
};
use tracing::{debug, error, info, warn};
use utils::config::CurpConfig;
use utils::config::{ClientTimeout, CurpConfig};

use super::{
cmd_board::{CmdBoardRef, CommandBoard},
Expand All @@ -24,6 +24,7 @@ use super::{
storage::{StorageApi, StorageError},
};
use crate::{
client::Client,
cmd::{Command, CommandExecutor},
error::ProposeError,
log_entry::LogEntry,
Expand Down Expand Up @@ -275,6 +276,12 @@ impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {

/// Spawned tasks
impl<C: 'static + Command, RC: RoleChange + 'static> CurpNode<C, RC> {
/// get curp inner client from `CurpNode`
#[inline]
pub(crate) async fn inner_client(&self, client_timeout: ClientTimeout) -> Client<C> {
self.curp.inner_client(client_timeout).await
}

/// Tick periodically
async fn election_task(
curp: Arc<RawCurp<C, RC>>,
Expand Down
12 changes: 11 additions & 1 deletion curp/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@ use tokio::{net::TcpListener, sync::broadcast};
use tokio_stream::wrappers::TcpListenerStream;
use tower::filter::FilterLayer;
use tracing::{info, instrument};
use utils::{config::CurpConfig, tracing::Extract};
use utils::{
config::{ClientTimeout, CurpConfig},
tracing::Extract,
};

use self::curp_node::{CurpError, CurpNode};
use crate::{
client::Client,
cmd::{Command, CommandExecutor},
error::ServerError,
members::ClusterMember,
Expand Down Expand Up @@ -295,6 +299,12 @@ impl<C: Command + 'static, RC: RoleChange + 'static> Rpc<C, RC> {
pub fn leader_rx(&self) -> broadcast::Receiver<Option<ServerId>> {
self.inner.leader_rx()
}

/// Get an inner client
#[inline]
pub async fn inner_client(&self, client_timeout: ClientTimeout) -> Client<C> {
self.inner.inner_client(client_timeout).await
}
}

impl From<CurpError> for tonic::Status {
Expand Down
13 changes: 12 additions & 1 deletion curp/src/server/raw_curp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use tracing::{
log::{log_enabled, Level},
};
use utils::{
config::CurpConfig,
config::{ClientTimeout, CurpConfig},
parking_lot_lock::{MutexMap, RwLockMap},
};

Expand All @@ -39,6 +39,7 @@ use self::{
};
use super::cmd_worker::CEEventTxApi;
use crate::{
client::Client,
cmd::{Command, ProposeId},
error::ProposeError,
log_entry::LogEntry,
Expand Down Expand Up @@ -615,6 +616,16 @@ impl<C: 'static + Command, RC: RoleChange + 'static> RawCurp<C, RC> {
raw_curp
}

/// get curp inner client
pub(crate) async fn inner_client(&self, timeout: ClientTimeout) -> Client<C> {
Client::<C>::new(
Some(self.id().clone()),
self.ctx.cluster_info.all_members(),
timeout,
)
.await
}

/// Create a new `RawCurp`
/// `is_leader` will only take effect when all servers start from a fresh state
#[allow(clippy::too_many_arguments)] // only called once
Expand Down
2 changes: 1 addition & 1 deletion curp/tests/common/curp_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ impl CurpGroup {
}

pub async fn new_client(&self, timeout: ClientTimeout) -> Client<TestCommand> {
Client::<TestCommand>::new("S0".to_owned(), self.all.clone(), timeout).await
Client::<TestCommand>::new(None, self.all.clone(), timeout).await
}

pub fn exe_rxs(
Expand Down
2 changes: 1 addition & 1 deletion xline/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl Client {
let name = String::from("client");
let etcd_client =
EtcdClient::connect(all_members.values().cloned().collect_vec(), None).await?;
let curp_client = CurpClient::new(name.clone(), all_members, timeout).await;
let curp_client = CurpClient::new(None, all_members, timeout).await;
Ok(Self {
name,
curp_client,
Expand Down
2 changes: 1 addition & 1 deletion xline/src/server/lease_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ where
};
return Ok(tonic::Response::new(res));
}
let leader_id = self.client.get_leader_id().await;
let leader_id = self.client.get_leader_id_from_curp().await;
let leader_addr = self.cluster_info.address(&leader_id).unwrap_or_else(|| {
unreachable!(
"The address of leader {} not found in all_members {:?}",
Expand Down
37 changes: 16 additions & 21 deletions xline/src/server/xline_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ use std::{future::Future, net::SocketAddr, sync::Arc, time::Duration};

use anyhow::Result;
use clippy_utilities::{Cast, OverflowArithmetic};
use curp::{
client::Client, members::ClusterMember, server::Rpc, ProtocolServer, SnapshotAllocator,
};
use curp::{members::ClusterMember, server::Rpc, ProtocolServer, SnapshotAllocator};
use event_listener::Event;
use jsonwebtoken::{DecodingKey, EncodingKey};
use tokio::net::TcpListener;
Expand Down Expand Up @@ -276,14 +274,7 @@ impl XlineServer {
Arc::clone(&auth_revision_gen),
key_pair,
)?;
let client = Arc::new(
Client::<Command>::new(
self.cluster_info.self_id().clone(),
self.cluster_info.all_members(),
self.client_timeout,
)
.await,
);

let index_barrier = Arc::new(IndexBarrier::new());
let id_barrier = Arc::new(IdBarrier::new());

Expand All @@ -305,6 +296,19 @@ impl XlineServer {
_ => unimplemented!(),
};

let curp_server = CurpServer::new(
Arc::clone(&self.cluster_info),
self.is_leader,
ce,
snapshot_allocator,
state,
Arc::clone(&self.curp_cfg),
None,
)
.await;

let client = Arc::new(curp_server.inner_client(self.client_timeout).await);

Ok((
KvServer::new(
kv_storage,
Expand All @@ -331,16 +335,7 @@ impl XlineServer {
AuthServer::new(auth_storage, client, self.cluster_info.self_id().clone()),
WatchServer::new(watcher, Arc::clone(&header_gen)),
MaintenanceServer::new(persistent, header_gen),
CurpServer::new(
Arc::clone(&self.cluster_info),
self.is_leader,
ce,
snapshot_allocator,
state,
Arc::clone(&self.curp_cfg),
None,
)
.await,
curp_server,
))
}
}
Expand Down

0 comments on commit 2f0b026

Please sign in to comment.