diff --git a/Cargo.lock b/Cargo.lock index e1430bc49..6c41b4507 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -283,6 +283,7 @@ dependencies = [ "utils", "xline", "xline-client", + "xline-test-utils", ] [[package]] diff --git a/benchmark/Cargo.toml b/benchmark/Cargo.toml index acc17e80c..817306fe0 100644 --- a/benchmark/Cargo.toml +++ b/benchmark/Cargo.toml @@ -26,3 +26,4 @@ tracing-subscriber = "0.3.1" utils = { path = "../utils", version = "0.1.0" } xline = { path = "../xline" } xline-client = { path = "../xline-client" } +xline-test-utils = { path = "../xline-test-utils" } diff --git a/benchmark/src/bench_client.rs b/benchmark/src/bench_client.rs index e3a511c91..850ffc41a 100644 --- a/benchmark/src/bench_client.rs +++ b/benchmark/src/bench_client.rs @@ -6,7 +6,7 @@ use thiserror::Error; use xline::server::Command; use xline_client::{ error::XlineClientError as ClientError, - types::kv::{PutRequest, PutResponse}, + types::kv::{PutRequest, PutResponse, RangeRequest, RangeResponse}, Client, ClientOptions, }; @@ -22,16 +22,30 @@ pub(crate) enum BenchClientError { XlineError(#[from] ClientError), } +/// The KV client enum used in benchmark +pub(crate) enum KVClient { + /// Xline client + Xline(Client), + /// Etcd client + Etcd(EtcdClient), +} + +impl Debug for KVClient { + #[inline] + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match *self { + KVClient::Xline(ref client) => write!(f, "Xline({client:?})"), + KVClient::Etcd(ref _client) => write!(f, "Etcd"), + } + } +} + /// Benchmark client pub(crate) struct BenchClient { /// Name of the client name: String, - /// etcd client - etcd_client: EtcdClient, - /// xline client - xline_client: Client, - /// Use xline client to send requests when true - use_curp_client: bool, + /// KV client instance + kv_client: KVClient, } impl Debug for BenchClient { @@ -39,8 +53,7 @@ impl Debug for BenchClient { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Client") .field("name", &self.name) - .field("use_curp_client", &self.use_curp_client) - .field("xline_client", &self.xline_client) + .field("kv_client", &self.kv_client) .finish() } } @@ -73,12 +86,12 @@ impl BenchClient { config: ClientOptions, ) -> Result { let name = String::from("client"); - Ok(Self { - name, - etcd_client: EtcdClient::connect(addrs.clone(), None).await?, - xline_client: Client::connect(addrs, config).await?, - use_curp_client, - }) + let kv_client = if use_curp_client { + KVClient::Xline(Client::connect(addrs, config).await?) + } else { + KVClient::Etcd(EtcdClient::connect(addrs.clone(), None).await?) + }; + Ok(Self { name, kv_client }) } /// Send `PutRequest` by `XlineClient` or `EtcdClient` @@ -91,16 +104,101 @@ impl BenchClient { &mut self, request: PutRequest, ) -> Result { - if self.use_curp_client { - let response = self.xline_client.kv_client().put(request).await?; - Ok(response) - } else { - let opts = from_request_to_option(&request); - let response = self - .etcd_client - .put(request.key(), request.value(), Some(opts)) - .await?; - Ok(response.into()) + match self.kv_client { + KVClient::Xline(ref mut xline_client) => { + let response = xline_client.kv_client().put(request).await?; + Ok(response) + } + KVClient::Etcd(ref mut etcd_client) => { + let opts = from_request_to_option(&request); + let response = etcd_client + .put(request.key(), request.value(), Some(opts)) + .await?; + Ok(response.into()) + } + } + } + + /// Send `RangeRequest` by `XlineClient` or `EtcdClient` + /// + /// # Errors + /// + /// If `XlineClient` or `EtcdClient` failed to send request + #[inline] + #[allow(dead_code)] + pub(crate) async fn get( + &mut self, + request: RangeRequest, + ) -> Result { + match self.kv_client { + KVClient::Xline(ref mut xline_client) => { + let response = xline_client.kv_client().range(request).await?; + Ok(response) + } + KVClient::Etcd(ref mut etcd_client) => { + let response = etcd_client.get(request.key(), None).await?; + Ok(response.into()) + } + } + } +} + +#[cfg(test)] +#[allow(clippy::panic)] +#[allow(clippy::unwrap_used)] +#[allow(clippy::indexing_slicing)] +mod test { + use crate::bench_client::{BenchClient, ClientOptions, PutRequest}; + use xline_client::types::kv::RangeRequest; + use xline_test_utils::Cluster; + + #[tokio::test(flavor = "multi_thread")] + async fn test_new_xline_client() { + // create xline client + let mut cluster = Cluster::new(3).await; + cluster.start().await; + let use_curp_client = true; + let config = ClientOptions::default(); + let mut client = BenchClient::new(cluster.addrs(), use_curp_client, config) + .await + .unwrap(); + //check xline client put value exist + let request = PutRequest::new("put", "123"); + let _put_response = client.put(request).await; + let range_request = RangeRequest::new("put"); + match client.get(range_request).await { + Ok(response) => { + assert_eq!(response.kvs[0].value, b"123"); + } + Err(err) => { + panic!("should not be error: {err}"); + } + } + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_new_etcd_client() { + let mut cluster = Cluster::new(3).await; + cluster.start().await; + let use_curp_client = false; + let config = ClientOptions::default(); + let mut client = match BenchClient::new(cluster.addrs(), use_curp_client, config).await { + Ok(client) => client, + Err(err) => { + panic!("should not be error: {err}"); + } + }; + + let request = PutRequest::new("put", "123"); + let _put_response = client.put(request).await; + let range_request = RangeRequest::new("put"); + match client.get(range_request).await { + Ok(response) => { + assert_eq!(response.kvs[0].value, b"123"); + } + Err(err) => { + panic!("should not be error: {err}"); + } } } }