diff --git a/curp/proto/message.proto b/curp/proto/message.proto
index b291d4c39..741952ef0 100644
--- a/curp/proto/message.proto
+++ b/curp/proto/message.proto
@@ -28,6 +28,15 @@ message FetchLeaderResponse {
     uint64 term = 2;
 }
 
+message FetchClusterRequest {
+}
+
+message FetchClusterResponse {
+    optional string leader_id = 1;
+    map<string, string> all_members = 2;
+    uint64 term = 3;
+}
+
 message WaitSyncedRequest {
     bytes id = 1;
 }
@@ -105,7 +114,8 @@ service Protocol {
     rpc WaitSynced (WaitSyncedRequest) returns (WaitSyncedResponse);
     rpc AppendEntries (AppendEntriesRequest) returns (AppendEntriesResponse);
     rpc Vote (VoteRequest) returns (VoteResponse);
-    rpc FetchLeader (FetchLeaderRequest) returns (FetchLeaderResponse);
     rpc InstallSnapshot (stream InstallSnapshotRequest) returns (InstallSnapshotResponse);
+    rpc FetchLeader (FetchLeaderRequest) returns (FetchLeaderResponse);
+    rpc FetchCluster (FetchClusterRequest) returns (FetchClusterResponse);
     rpc FetchReadState (FetchReadStateRequest) returns (FetchReadStateResponse);
 }
diff --git a/curp/src/client.rs b/curp/src/client.rs
index 57c6db0c4..b75e47f84 100644
--- a/curp/src/client.rs
+++ b/curp/src/client.rs
@@ -1,7 +1,8 @@
 use std::{cmp::Ordering, collections::HashMap, fmt::Debug, iter, marker::PhantomData, sync::Arc};
 
 use event_listener::Event;
-use futures::{pin_mut, stream::FuturesUnordered, StreamExt};
+use futures::{future::select_all, pin_mut, stream::FuturesUnordered, StreamExt};
+use itertools::Itertools;
 use parking_lot::RwLock;
 use tokio::time::timeout;
 use tracing::{debug, instrument, warn};
@@ -9,22 +10,197 @@ use utils::{config::ClientTimeout, parking_lot_lock::RwLockMap};
 
 use crate::{
     cmd::{Command, ProposeId},
-    error::{CommandProposeError, CommandSyncError, ProposeError, RpcError, SyncError},
+    error::{
+        ClientBuildError, CommandProposeError, CommandSyncError, ProposeError, RpcError, SyncError,
+    },
     rpc::{
-        self, connect::ConnectApi, FetchLeaderRequest, FetchReadStateRequest, ProposeRequest,
+        self, connect::ConnectApi, protocol_client::ProtocolClient, FetchClusterRequest,
+        FetchClusterResponse, FetchLeaderRequest, FetchReadStateRequest, ProposeRequest,
         ReadState as PbReadState, SyncResult, WaitSyncedRequest,
     },
     LogIndex, ServerId,
 };
 
+/// Client builder
+#[derive(Debug)]
+pub struct Builder<C: Command> {
+    /// Server addresses
+    addrs: Vec<String>,
+    /// Server addresses and ids (used in an inner client)
+    all_members: HashMap<ServerId, String>,
+    /// Local server id (used in an inner client)
+    local_server_id: Option<ServerId>,
+    /// Client timeout
+    timeout: Option<ClientTimeout>,
+    /// Phantom data
+    phantom: PhantomData<C>,
+}
+
+impl<C: Command> Default for Builder<C> {
+    #[inline]
+    #[must_use]
+    fn default() -> Self {
+        Self {
+            addrs: Vec::new(),
+            all_members: HashMap::new(),
+            local_server_id: None,
+            timeout: None,
+            phantom: PhantomData,
+        }
+    }
+}
+
+impl<C> Builder<C>
+where
+    C: Command + 'static,
+{
+    /// Create new client builder
+    #[inline]
+    #[must_use]
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Set addresses of servers
+    /// # Panics
+    /// Panics if `all_members` has already been set
+    #[inline]
+    #[must_use]
+    pub fn addrs(self, addrs: Vec<String>) -> Self {
+        assert!(
+            self.all_members.is_empty(),
+            "Client builder can't set addrs and all_members at the same time"
+        );
+        Self { addrs, ..self }
+    }
+
+    /// Set ids and addresses of all servers
+    /// # Panics
+    /// Panics if `addrs` has already been set
+    #[inline]
+    #[must_use]
+    pub fn all_members(self, all_members: HashMap<ServerId, String>) -> Self {
+        assert!(
+            self.addrs.is_empty(),
+            "Client builder can't set addrs and all_members at the same time"
+        );
+        Self {
+            all_members,
+            ..self
+        }
+    }
+
+    /// Set client timeout
+    #[inline]
+    #[must_use]
+    pub fn timeout(self, timeout: ClientTimeout) -> Self {
+        Self {
+            timeout: Some(timeout),
+            ..self
+        }
+    }
+
+    /// Set local server id. (only used in an inner client)
+    #[inline]
+    #[must_use]
+    pub fn local_server_id(self, local_server_id: ServerId) -> Self {
+        Self {
+            local_server_id: Some(local_server_id),
+            ..self
+        }
+    }
+
+    /// Build client
+    /// # Errors
+    /// Return an error when an rpc error occurs or a required argument is missing
+    #[inline]
+    pub async fn build(self) -> Result<Client<C>, ClientBuildError> {
+        let Some(timeout) = self.timeout else {
+            return Err(ClientBuildError::invalid_aurguments("timeout is required"));
+        };
+        match (self.addrs.is_empty(), self.all_members.is_empty()) {
+            (false, true) => {
+                Self::build_from_addrs(timeout, self.local_server_id, self.addrs).await
+            }
+            (true, false) => {
+                Self::build_from_all_members(
+                    timeout,
+                    self.local_server_id,
+                    self.all_members,
+                    None,
+                    0,
+                )
+                .await
+            }
+            (true, true) => Err(ClientBuildError::invalid_aurguments(
+                "One of addrs and all_members is required",
+            )),
+            (false, false) => Err(ClientBuildError::invalid_aurguments(
+                "Client builder can't set addrs and all_members at the same time",
+            )),
+        }
+    }
+
+    /// Build client from all members
+    async fn build_from_all_members(
+        timeout: ClientTimeout,
+        local_server_id: Option<ServerId>,
+        all_members: HashMap<ServerId, String>,
+        leader_id: Option<ServerId>,
+        term: u64,
+    ) -> Result<Client<C>, ClientBuildError> {
+        let connects = rpc::connect(all_members).await;
+        let client = Client::<C> {
+            local_server_id,
+            state: RwLock::new(State::new(leader_id, term, connects)),
+            timeout,
+            phantom: PhantomData,
+        };
+        Ok(client)
+    }
+
+    /// Build client from addresses; this method will fetch all members from the servers
+    async fn build_from_addrs(
+        timeout: ClientTimeout,
+        local_server_id: Option<ServerId>,
+        addrs: Vec<String>,
+    ) -> Result<Client<C>, ClientBuildError> {
+        let futs = addrs.into_iter().map(|mut addr| {
+            if !addr.starts_with("http://") {
+                addr.insert_str(0, "http://");
+            }
+            let timeout = *timeout.propose_timeout();
+            let fut = async move {
+                let mut protocol_client = ProtocolClient::connect(addr).await?;
+                let mut req = tonic::Request::new(FetchClusterRequest::new());
+                req.set_timeout(timeout);
+                let fetch_cluster_res = protocol_client.fetch_cluster(req).await?.into_inner();
+                Ok::<FetchClusterResponse, ClientBuildError>(fetch_cluster_res)
+            };
+            Box::pin(fut)
+        });
+
+        let res = select_all(futs).await.0?;
+
+        let client = Self::build_from_all_members(
+            timeout,
+            local_server_id,
+            res.all_members,
+            res.leader_id,
+            res.term,
+        )
+        .await?;
+
+        Ok(client)
+    }
+}
+
 /// Protocol client
 pub struct Client<C: Command> {
     /// local server id. Only use in an inner client.
     local_server_id: Option<ServerId>,
     /// Current leader and term
     state: RwLock<State>,
-    /// All servers's `Connect`
-    connects: HashMap<ServerId, Arc<dyn ConnectApi>>,
     /// Curp client timeout settings
     timeout: ClientTimeout,
     /// To keep Command type
@@ -50,15 +226,22 @@ struct State {
     term: u64,
     /// When a new leader is set, notify
     leader_notify: Arc<Event>,
+    /// All servers's `Connect`
+    connects: HashMap<ServerId, Arc<dyn ConnectApi>>,
 }
 
 impl State {
     /// Create the initial client state
-    fn new() -> Self {
+    fn new(
+        leader: Option<ServerId>,
+        term: u64,
+        connects: HashMap<ServerId, Arc<dyn ConnectApi>>,
+    ) -> Self {
         Self {
-            leader: None,
-            term: 0,
+            leader,
+            term,
             leader_notify: Arc::new(Event::new()),
+            connects,
         }
     }
 
@@ -91,20 +274,11 @@ impl<C> Client<C>
 where
     C: Command + 'static,
 {
-    /// Create a new protocol client based on the addresses
+    /// Create a client builder
     #[inline]
-    pub async fn new(
-        self_id: Option<ServerId>,
-        addrs: HashMap<ServerId, String>,
-        timeout: ClientTimeout,
-    ) -> Self {
-        Self {
-            local_server_id: self_id,
-            state: RwLock::new(State::new()),
-            connects: rpc::connect(addrs).await,
-            timeout,
-            phantom: PhantomData,
-        }
+    #[must_use]
+    pub fn builder() -> Builder<C> {
+        Builder::new()
     }
 
     /// The fast round of Curp protocol
@@ -116,9 +290,15 @@ where
     ) -> Result<(Option<<C as Command>::ER>, bool), CommandProposeError<C>> {
         debug!("fast round for cmd({}) started", cmd_arc.id());
         let req = ProposeRequest::new(cmd_arc.as_ref()).map_err(Into::<ProposeError>::into)?;
-        let mut rpcs: FuturesUnordered<_> = self
-            .connects
-            .values()
+
+        let (connects, superquorum) = self.state.map_read(|state_r| {
+            (
+                state_r.connects.values().cloned().collect_vec(),
+                superquorum(state_r.connects.len()),
+            )
+        });
+        let mut rpcs: FuturesUnordered<_> = connects
+            .iter()
             .zip(iter::repeat(req))
             .map(|(connect, req_cloned)| {
                 connect.propose(req_cloned, *self.timeout.propose_timeout())
@@ -127,7 +307,6 @@ where
 
         let mut ok_cnt: usize = 0;
         let mut execute_result: Option<C::ER> = None;
-        let superquorum = superquorum(self.connects.len());
         while let Some(resp_result) = rpcs.next().await {
             let resp = match resp_result {
                 Ok(resp) => resp.into_inner(),
@@ -200,9 +379,9 @@ where
 
         let leader_id = self.get_leader_id().await;
         debug!("wait synced request sent to {}", leader_id);
+
         let resp = match self
-            .connects
-            .get(&leader_id)
+            .get_connect(&leader_id)
            .unwrap_or_else(|| unreachable!("leader {leader_id} not found"))
            .wait_synced(
                WaitSyncedRequest::new(cmd.id()).map_err(Into::<ProposeError>::into)?,
@@ -230,11 +409,12 @@ where
                     term,
                 ))) => {
                     let new_leader = new_leader.and_then(|id| {
-                        let mut state = self.state.write();
-                        (state.term <= term).then(|| {
-                            state.leader = Some(id.clone());
-                            state.term = term;
-                            id
+                        self.state.map_write(|mut state| {
+                            (state.term <= term).then(|| {
+                                state.leader = Some(id.clone());
+                                state.term = term;
+                                id
+                            })
                         })
                     });
                     self.resend_propose(Arc::clone(&cmd), new_leader).await?; // resend the propose to the new leader
@@ -269,8 +449,7 @@ where
         debug!("resend propose to {leader_id}");
 
         let resp = self
-            .connects
-            .get(&leader_id)
+            .get_connect(&leader_id)
            .unwrap_or_else(|| unreachable!("leader {leader_id} not found"))
            .propose(
                ProposeRequest::new(cmd.as_ref())?,
@@ -281,8 +460,7 @@ where
         let resp = match resp {
             Ok(resp) => {
                 let resp = resp.into_inner();
-                #[allow(clippy::pattern_type_mismatch)] // can't satisfy clippy
-                if let Some(rpc::ExeResult::Error(e)) = resp.exe_result.as_ref() {
+                if let Some(rpc::ExeResult::Error(ref e)) = resp.exe_result {
                     let err: ProposeError = bincode::deserialize(e)?;
                     if matches!(err, ProposeError::Duplicated) {
                         return Ok(());
@@ -339,9 +517,9 @@ where
     /// Note: The fetched leader may still be outdated
     async fn fetch_leader(&self) -> ServerId {
         loop {
-            let mut rpcs: FuturesUnordered<_> = self
-                .connects
-                .values()
+            let connects = self.all_connects();
+            let mut rpcs: FuturesUnordered<_> = connects
+                .iter()
                 .map(|connect| async {
                     (
                         connect.id().clone(),
@@ -356,7 +534,7 @@ where
 
             let mut ok_cnt = 0;
             #[allow(clippy::integer_arithmetic)]
-            let majority_cnt = self.connects.len() / 2 + 1;
+            let majority_cnt = connects.len() / 2 + 1;
             while let Some((id, resp)) = rpcs.next().await {
                 let resp = match resp {
                     Ok(resp) => resp.into_inner(),
@@ -421,7 +599,6 @@ where
     /// # Panics
     /// If leader index is out of bound of all the connections, panic
     #[inline]
-    #[allow(clippy::too_many_lines)] // FIXME: split to smaller functions
     #[instrument(skip_all, fields(cmd_id=%cmd.id()))]
     pub async fn propose(&self, cmd: C) -> Result<C::ER, CommandProposeError<C>> {
         let cmd_arc = Arc::new(cmd);
@@ -464,7 +641,6 @@ where
     /// # Panics
     /// If leader index is out of bound of all the connections, panic
     #[inline]
-    #[allow(clippy::else_if_without_else)] // the else is redundant
     #[instrument(skip_all, fields(cmd_id=%cmd.id()))]
     pub async fn propose_indexed(&self, cmd: C) -> Result<(C::ER, C::ASR), CommandProposeError<C>> {
         let cmd_arc = Arc::new(cmd);
@@ -490,8 +666,7 @@ where
         let leader_id = self.get_leader_id().await;
         debug!("fetch read state request sent to {}", leader_id);
         let resp = match self
-            .connects
-            .get(&leader_id)
+            .get_connect(&leader_id)
            .unwrap_or_else(|| unreachable!("leader {leader_id} not found"))
            .fetch_read_state(
                FetchReadStateRequest::new(cmd)?,
@@ -528,8 +703,7 @@ where
     async fn fetch_local_leader_info(&self) -> Result<(Option<ServerId>, u64), RpcError> {
         if let Some(ref local_server) = self.local_server_id {
             let resp = self
-                .connects
-                .get(local_server)
+                .get_connect(local_server)
                .unwrap_or_else(|| unreachable!("self id {} not found", local_server))
                .fetch_leader(FetchLeaderRequest::new(), *self.timeout.retry_timeout())
                .await?
@@ -548,6 +722,16 @@ where
         }
         self.fetch_leader().await
     }
+
+    /// Get the connect by server id
+    fn get_connect(&self, id: &ServerId) -> Option<Arc<dyn ConnectApi>> {
+        self.state.read().connects.get(id).cloned()
+    }
+
+    /// Get all connects
+    fn all_connects(&self) -> Vec<Arc<dyn ConnectApi>> {
+        self.state.read().connects.values().cloned().collect()
+    }
 }
 
 /// Get the superquorum for curp protocol
diff --git a/curp/src/error.rs b/curp/src/error.rs
index a9580b819..7f2e6d017 100644
--- a/curp/src/error.rs
+++ b/curp/src/error.rs
@@ -6,6 +6,42 @@ use thiserror::Error;
 
 use crate::{cmd::ProposeId, ServerId};
 
+/// Error type of client builder
+#[allow(clippy::module_name_repetitions)] // this-error generate code false-positive
+#[derive(Debug, Error)]
+#[non_exhaustive]
+pub enum ClientBuildError {
+    /// Rpc error
+    #[error("Rpc error: {0}")]
+    RpcError(String),
+    /// Invalid arguments
+    #[error("Invalid arguments: {0}")]
+    InvalidArguments(String),
+}
+
+impl ClientBuildError {
+    /// Create a new `ClientBuildError::InvalidArguments`
+    #[inline]
+    #[must_use]
+    pub fn invalid_aurguments(msg: &str) -> Self {
+        Self::InvalidArguments(msg.to_owned())
+    }
+}
+
+impl From<tonic::transport::Error> for ClientBuildError {
+    #[inline]
+    fn from(e: tonic::transport::Error) -> Self {
+        Self::RpcError(e.to_string())
+    }
+}
+
+impl From<tonic::Status> for ClientBuildError {
+    #[inline]
+    fn from(e: tonic::Status) -> Self {
+        Self::RpcError(e.to_string())
+    }
+}
+
 /// Server side error
 #[allow(clippy::module_name_repetitions)] // this-error generate code false-positive
 #[non_exhaustive]
diff --git a/curp/src/rpc/connect.rs b/curp/src/rpc/connect.rs
index 987a570fb..6cf50dbb5 100644
--- a/curp/src/rpc/connect.rs
+++ b/curp/src/rpc/connect.rs
@@ -59,7 +59,7 @@ pub(crate) async fn connect(
 /// Connect interface
 #[cfg_attr(test, automock)]
 #[async_trait]
-pub(crate) trait ConnectApi: Send + Sync + 'static {
+pub(crate) trait ConnectApi: Send + Sync + 'static + Debug {
     /// Get server id
     fn id(&self) -> &ServerId;
 
diff --git a/curp/src/rpc/mod.rs b/curp/src/rpc/mod.rs
index ac4a475d4..a21f0535a 100644
--- a/curp/src/rpc/mod.rs
+++ b/curp/src/rpc/mod.rs
@@ -1,4 +1,4 @@
-use std::sync::Arc;
+use std::{collections::HashMap, sync::Arc};
 
 use serde::{de::DeserializeOwned, Serialize};
 
@@ -7,9 +7,9 @@ pub(crate) use self::proto::{
     propose_response::ExeResult,
     protocol_server::Protocol,
     wait_synced_response::{Success, SyncResult as SyncResultRaw},
-    AppendEntriesRequest, AppendEntriesResponse, FetchReadStateRequest, FetchReadStateResponse,
-    IdSet, InstallSnapshotRequest, InstallSnapshotResponse, VoteRequest, VoteResponse,
-    WaitSyncedRequest, WaitSyncedResponse,
+    AppendEntriesRequest, AppendEntriesResponse, FetchClusterRequest, FetchClusterResponse,
+    FetchReadStateRequest, FetchReadStateResponse, IdSet, InstallSnapshotRequest,
+    InstallSnapshotResponse, VoteRequest, VoteResponse, WaitSyncedRequest, WaitSyncedResponse,
 };
 pub use self::proto::{
     propose_response, protocol_client, protocol_server::ProtocolServer, FetchLeaderRequest,
@@ -59,6 +59,28 @@ impl FetchLeaderResponse {
     }
 }
 
+impl FetchClusterRequest {
+    /// Create a new `FetchClusterRequest`
+    pub(crate) fn new() -> Self {
+        Self {}
+    }
+}
+
+impl FetchClusterResponse {
+    /// Create a new `FetchClusterResponse`
+    pub(crate) fn new(
+        leader_id: Option<ServerId>,
+        all_members: HashMap<ServerId, String>,
+        term: u64,
+    ) -> Self {
+        Self {
+            leader_id,
+            all_members,
+            term,
+        }
+    }
+}
+
 impl ProposeRequest {
     /// Create a new `Propose` request
     pub(crate) fn new<C: Command>(cmd: &C) -> bincode::Result<Self> {
diff --git a/curp/src/server/curp_node.rs b/curp/src/server/curp_node.rs
index 54caac77b..0872b199a 100644
--- a/curp/src/server/curp_node.rs
+++ b/curp/src/server/curp_node.rs
@@ -30,8 +30,9 @@ use crate::{
     members::ClusterMember,
     role_change::RoleChange,
     rpc::{
-        self, connect::ConnectApi, AppendEntriesRequest, AppendEntriesResponse, FetchLeaderRequest,
-        FetchLeaderResponse, FetchReadStateRequest, FetchReadStateResponse, InstallSnapshotRequest,
+        self, connect::ConnectApi, AppendEntriesRequest, AppendEntriesResponse,
+        FetchClusterRequest, FetchClusterResponse, FetchLeaderRequest, FetchLeaderResponse,
+        FetchReadStateRequest, FetchReadStateResponse, InstallSnapshotRequest,
         InstallSnapshotResponse, ProposeRequest, ProposeResponse, VoteRequest, VoteResponse,
         WaitSyncedRequest, WaitSyncedResponse,
     },
@@ -193,6 +194,17 @@ impl CurpNode {
         Ok(FetchLeaderResponse::new(leader_id, term))
     }
 
+    /// Handle `FetchCluster` requests
+    #[allow(clippy::unnecessary_wraps, clippy::needless_pass_by_value)] // To keep type consistent with other request handlers
+    pub(super) fn fetch_cluster(
+        &self,
+        _req: FetchClusterRequest,
+    ) -> Result<FetchClusterResponse, CurpError> {
+        let (leader_id, term) = self.curp.leader();
+        let all_members = self.curp.cluster().all_members();
+        Ok(FetchClusterResponse::new(leader_id, all_members, term))
+    }
+
     /// Install snapshot
     #[allow(clippy::integer_arithmetic)] // can't overflow
     pub(super) async fn install_snapshot(
diff --git a/curp/src/server/mod.rs b/curp/src/server/mod.rs
index 7d2879416..de5994936 100644
--- a/curp/src/server/mod.rs
+++ b/curp/src/server/mod.rs
@@ -18,10 +18,10 @@ use crate::{
     members::ClusterMember,
     role_change::RoleChange,
     rpc::{
-        AppendEntriesRequest, AppendEntriesResponse, FetchLeaderRequest, FetchLeaderResponse,
-        FetchReadStateRequest, FetchReadStateResponse, InstallSnapshotRequest,
-        InstallSnapshotResponse, ProposeRequest, ProposeResponse, ProtocolServer, VoteRequest,
-        VoteResponse, WaitSyncedRequest, WaitSyncedResponse,
+        AppendEntriesRequest, AppendEntriesResponse, FetchClusterRequest, FetchClusterResponse,
+        FetchLeaderRequest, FetchLeaderResponse, FetchReadStateRequest, FetchReadStateResponse,
+        InstallSnapshotRequest, InstallSnapshotResponse, ProposeRequest, ProposeResponse,
+        ProtocolServer, VoteRequest, VoteResponse, WaitSyncedRequest, WaitSyncedResponse,
     },
     ServerId, SnapshotAllocator,
 };
@@ -113,6 +113,16 @@ impl crate::rpc::Protocol for Rp
         ))
     }
 
+    #[instrument(skip_all, name = "curp_fetch_cluster")]
+    async fn fetch_cluster(
+        &self,
+        request: tonic::Request<FetchClusterRequest>,
+    ) -> Result<tonic::Response<FetchClusterResponse>, tonic::Status> {
+        Ok(tonic::Response::new(
+            self.inner.fetch_cluster(request.into_inner())?,
+        ))
+    }
+
     #[instrument(skip_all, name = "curp_install_snapshot")]
     async fn install_snapshot(
         &self,
diff --git a/curp/src/server/raw_curp/mod.rs b/curp/src/server/raw_curp/mod.rs
index bf979b7ef..0212373eb 100644
--- a/curp/src/server/raw_curp/mod.rs
+++ b/curp/src/server/raw_curp/mod.rs
@@ -686,6 +686,11 @@ impl RawCurp {
         self.st.map_read(|st_r| (st_r.leader_id.clone(), st_r.term))
     }
 
+    /// Get cluster info
+    pub(super) fn cluster(&self) -> &ClusterMember {
+        self.ctx.cluster_info.as_ref()
+    }
+
     /// Get self's id
     pub(super) fn id(&self) -> &ServerId {
         self.ctx.cluster_info.self_id()
diff --git a/curp/tests/common/curp_group.rs b/curp/tests/common/curp_group.rs
index 494552927..bf1669bfa 100644
--- a/curp/tests/common/curp_group.rs
+++ b/curp/tests/common/curp_group.rs
@@ -134,7 +134,12 @@ impl CurpGroup {
     }
 
     pub async fn new_client(&self, timeout: ClientTimeout) -> Client<TestCommand> {
-        Client::<TestCommand>::new(None, self.all.clone(), timeout).await
+        Client::builder()
+            .addrs(self.all.values().cloned().collect_vec())
+            .timeout(timeout)
+            .build()
+            .await
+            .unwrap()
     }
 
     pub fn exe_rxs(
diff --git a/curp/tests/server.rs b/curp/tests/server.rs
index 6e3c2d97f..d886c06e4 100644
--- a/curp/tests/server.rs
+++ b/curp/tests/server.rs
@@ -3,6 +3,7 @@
 use std::{sync::Arc, time::Duration};
 
 use clippy_utilities::NumericCast;
+use curp::client::Builder;
 use curp_test_utils::{init_logger, sleep_millis, sleep_secs, test_cmd::TestCommand};
 use madsim::rand::{thread_rng, Rng};
 use test_macros::abort_on_panic;
@@ -11,6 +12,7 @@ use utils::config::ClientTimeout;
 use crate::common::curp_group::{
     proto::propose_response::ExeResult, CurpGroup, ProposeRequest, ProposeResponse,
 };
+
 mod common;
 
 #[tokio::test]
@@ -36,6 +38,22 @@ async fn basic_propose() {
     group.stop();
 }
 
+#[tokio::test]
+#[abort_on_panic]
+async fn fetch_cluster() {
+    init_logger();
+    let group = CurpGroup::new(3).await;
+
+    let all_addrs = group.all.values().cloned().collect::<Vec<_>>();
+    let client_builder = Builder::<TestCommand>::default()
+        .addrs(all_addrs)
+        .timeout(ClientTimeout::default());
+
+    let _client = client_builder.build().await.unwrap();
+
+    group.stop();
+}
+
 #[tokio::test]
 #[abort_on_panic]
 async fn synced_propose() {
diff --git a/xline-client/src/error.rs b/xline-client/src/error.rs
index 66be9c901..ddd85ddfc 100644
--- a/xline-client/src/error.rs
+++ b/xline-client/src/error.rs
@@ -1,3 +1,4 @@
+use curp::error::ClientBuildError;
 use thiserror::Error;
 use xline::server::Command;
 
@@ -26,6 +27,9 @@ pub enum ClientError {
     /// Error in lease client
     #[error("Lease client error: {0}")]
     LeaseError(String),
+    /// Curp client build error
+    #[error("Curp client build error: {0}")]
+    BuildError(#[from] ClientBuildError),
 }
 
 impl From for ClientError {
diff --git a/xline-client/src/lib.rs b/xline-client/src/lib.rs
index e861e4ac6..6497d40b9 100644
--- a/xline-client/src/lib.rs
+++ b/xline-client/src/lib.rs
@@ -216,7 +216,13 @@ impl Client {
             .collect();
         let name = String::from("client");
         let channel = Self::build_channel(all_members.values().cloned().collect()).await?;
-        let curp_client = Arc::new(CurpClient::new(None, all_members, options.curp_timeout).await);
+        let curp_client = Arc::new(
+            CurpClient::builder()
+                .addrs(all_members.into_values().collect())
+                .timeout(options.curp_timeout)
+                .build()
+                .await?,
+        );
         let id_gen = Arc::new(lease_gen::LeaseIdGenerator::new());
         let token = match options.user {
diff --git a/xline/src/client/errors.rs b/xline/src/client/errors.rs
index 00ace766d..055f4ca08 100644
--- a/xline/src/client/errors.rs
+++ b/xline/src/client/errors.rs
@@ -1,3 +1,4 @@
+use curp::error::ClientBuildError;
 use thiserror::Error;
 
 use crate::server::Command;
@@ -18,6 +19,9 @@ pub enum ClientError {
     /// Engine error
     #[error("Engine error {0}")]
     EngineError(#[from] engine::EngineError),
+    /// Build error
+    #[error("Build error {0}")]
+    BuildError(#[from] ClientBuildError),
 }
 
 impl From for ClientError {
diff --git a/xline/src/client/mod.rs b/xline/src/client/mod.rs
index cbed9a5be..2d8f06926 100644
--- a/xline/src/client/mod.rs
+++ b/xline/src/client/mod.rs
@@ -71,7 +71,11 @@ impl Client {
         let name = String::from("client");
         let etcd_client =
             EtcdClient::connect(all_members.values().cloned().collect_vec(), None).await?;
-        let curp_client = CurpClient::new(None, all_members, timeout).await;
+        let curp_client = CurpClient::builder()
+            .addrs(all_members.into_values().collect())
+            .timeout(timeout)
+            .build()
+            .await?;
         Ok(Self {
             name,
             curp_client,
diff --git a/xline/src/server/xline_server.rs b/xline/src/server/xline_server.rs
index f2bc28988..7492d738e 100644
--- a/xline/src/server/xline_server.rs
+++ b/xline/src/server/xline_server.rs
@@ -319,12 +319,12 @@ impl XlineServer {
         };
 
         let client = Arc::new(
-            CurpClient::new(
-                Some(self.cluster_info.self_id().clone()),
-                self.cluster_info.all_members(),
-                self.client_timeout,
-            )
-            .await,
+            CurpClient::builder()
+                .all_members(self.cluster_info.all_members())
+                .local_server_id(self.cluster_info.self_id().clone())
+                .timeout(self.client_timeout)
+                .build()
+                .await?,
         );
         let auto_compactor = if let Some(auto_config_cfg) = *self.compact_cfg.auto_compact_config()