Skip to content

Commit

Permalink
refactor: bench client implementation
Browse files Browse the repository at this point in the history
Signed-off-by: kikkon <nian920@outlook.com>
  • Loading branch information
Kikkon committed Nov 12, 2023
1 parent 84c685a commit d363638
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 25 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions benchmark/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ tracing-subscriber = "0.3.1"
utils = { path = "../utils", version = "0.1.0" }
xline = { path = "../xline" }
xline-client = { path = "../xline-client" }
xline-test-utils = { path = "../xline-test-utils" }
148 changes: 123 additions & 25 deletions benchmark/src/bench_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use thiserror::Error;
use xline::server::Command;
use xline_client::{
error::XlineClientError as ClientError,
types::kv::{PutRequest, PutResponse},
types::kv::{PutRequest, PutResponse, RangeRequest, RangeResponse},
Client, ClientOptions,
};

Expand All @@ -22,25 +22,38 @@ pub(crate) enum BenchClientError {
XlineError(#[from] ClientError<Command>),
}

/// The KV client enum used in benchmark
pub(crate) enum KVClient {
/// Xline client
Xline(Client),
/// Etcd client
Etcd(EtcdClient),
}

impl Debug for KVClient {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match *self {
KVClient::Xline(ref client) => write!(f, "Xline({client:?})"),
KVClient::Etcd(ref _client) => write!(f, "Etcd"),
}
}
}

/// Benchmark client
pub(crate) struct BenchClient {
/// Name of the client
name: String,
/// etcd client
etcd_client: EtcdClient,
/// xline client
xline_client: Client,
/// Use xline client to send requests when true
use_curp_client: bool,
/// KV client instance
kv_client: KVClient,
}

impl Debug for BenchClient {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Client")
.field("name", &self.name)
.field("use_curp_client", &self.use_curp_client)
.field("xline_client", &self.xline_client)
.field("kv_client", &self.kv_client)
.finish()
}
}
Expand Down Expand Up @@ -73,12 +86,12 @@ impl BenchClient {
config: ClientOptions,
) -> Result<Self> {
let name = String::from("client");
Ok(Self {
name,
etcd_client: EtcdClient::connect(addrs.clone(), None).await?,
xline_client: Client::connect(addrs, config).await?,
use_curp_client,
})
let kv_client = if use_curp_client {
KVClient::Xline(Client::connect(addrs, config).await?)
} else {
KVClient::Etcd(EtcdClient::connect(addrs.clone(), None).await?)
};
Ok(Self { name, kv_client })
}

/// Send `PutRequest` by `XlineClient` or `EtcdClient`
Expand All @@ -91,16 +104,101 @@ impl BenchClient {
&mut self,
request: PutRequest,
) -> Result<PutResponse, BenchClientError> {
if self.use_curp_client {
let response = self.xline_client.kv_client().put(request).await?;
Ok(response)
} else {
let opts = from_request_to_option(&request);
let response = self
.etcd_client
.put(request.key(), request.value(), Some(opts))
.await?;
Ok(response.into())
match self.kv_client {
KVClient::Xline(ref mut xline_client) => {
let response = xline_client.kv_client().put(request).await?;
Ok(response)
}
KVClient::Etcd(ref mut etcd_client) => {
let opts = from_request_to_option(&request);
let response = etcd_client
.put(request.key(), request.value(), Some(opts))
.await?;
Ok(response.into())
}
}
}

/// Send `RangeRequest` by `XlineClient` or `EtcdClient`
///
/// # Errors
///
/// If `XlineClient` or `EtcdClient` failed to send request
#[inline]
#[allow(dead_code)]
pub(crate) async fn get(
&mut self,
request: RangeRequest,
) -> Result<RangeResponse, BenchClientError> {
match self.kv_client {
KVClient::Xline(ref mut xline_client) => {
let response = xline_client.kv_client().range(request).await?;
Ok(response)
}
KVClient::Etcd(ref mut etcd_client) => {
let response = etcd_client.get(request.key(), None).await?;
Ok(response.into())
}
}
}
}

#[cfg(test)]
#[allow(clippy::panic)]
#[allow(clippy::unwrap_used)]
#[allow(clippy::indexing_slicing)]
mod test {
use crate::bench_client::{BenchClient, ClientOptions, PutRequest};
use xline_client::types::kv::RangeRequest;
use xline_test_utils::Cluster;

#[tokio::test(flavor = "multi_thread")]
async fn test_new_xline_client() {
// create xline client
let mut cluster = Cluster::new(3).await;
cluster.start().await;
let use_curp_client = true;
let config = ClientOptions::default();
let mut client = BenchClient::new(cluster.addrs(), use_curp_client, config)
.await
.unwrap();
//check xline client put value exist
let request = PutRequest::new("put", "123");
let _put_response = client.put(request).await;
let range_request = RangeRequest::new("put");
match client.get(range_request).await {
Ok(response) => {
assert_eq!(response.kvs[0].value, b"123");
}
Err(err) => {
panic!("should not be error: {err}");
}
}
}

#[tokio::test(flavor = "multi_thread")]
async fn test_new_etcd_client() {
let mut cluster = Cluster::new(3).await;
cluster.start().await;
let use_curp_client = false;
let config = ClientOptions::default();
let mut client = match BenchClient::new(cluster.addrs(), use_curp_client, config).await {
Ok(client) => client,
Err(err) => {
panic!("should not be error: {err}");
}
};

let request = PutRequest::new("put", "123");
let _put_response = client.put(request).await;
let range_request = RangeRequest::new("put");
match client.get(range_request).await {
Ok(response) => {
assert_eq!(response.kvs[0].value, b"123");
}
Err(err) => {
panic!("should not be error: {err}");
}
}
}
}

0 comments on commit d363638

Please sign in to comment.