From 0c2565026d43e19cced04410c86ce1264e1ecd65 Mon Sep 17 00:00:00 2001 From: kikkon Date: Wed, 8 Nov 2023 10:32:08 +0800 Subject: [PATCH] refactor: bench client implementation Signed-off-by: kikkon --- Cargo.lock | 1 + benchmark/Cargo.toml | 1 + benchmark/src/bench_client.rs | 133 ++++++++++++++++++++++++++++------ 3 files changed, 111 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e1430bc49..6c41b4507 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -283,6 +283,7 @@ dependencies = [ "utils", "xline", "xline-client", + "xline-test-utils", ] [[package]] diff --git a/benchmark/Cargo.toml b/benchmark/Cargo.toml index acc17e80c..817306fe0 100644 --- a/benchmark/Cargo.toml +++ b/benchmark/Cargo.toml @@ -26,3 +26,4 @@ tracing-subscriber = "0.3.1" utils = { path = "../utils", version = "0.1.0" } xline = { path = "../xline" } xline-client = { path = "../xline-client" } +xline-test-utils = { path = "../xline-test-utils" } diff --git a/benchmark/src/bench_client.rs b/benchmark/src/bench_client.rs index e3a511c91..e834bdf17 100644 --- a/benchmark/src/bench_client.rs +++ b/benchmark/src/bench_client.rs @@ -4,6 +4,8 @@ use anyhow::Result; use etcd_client::{Client as EtcdClient, PutOptions}; use thiserror::Error; use xline::server::Command; +#[cfg(test)] +use xline_client::types::kv::{RangeRequest, RangeResponse}; use xline_client::{ error::XlineClientError as ClientError, types::kv::{PutRequest, PutResponse}, @@ -22,16 +24,30 @@ pub(crate) enum BenchClientError { XlineError(#[from] ClientError), } +/// The KV client enum used in benchmark +pub(crate) enum KVClient { + /// Xline client + Xline(Client), + /// Etcd client + Etcd(EtcdClient), +} + +impl Debug for KVClient { + #[inline] + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match *self { + KVClient::Xline(ref client) => write!(f, "Xline({client:?})"), + KVClient::Etcd(ref _client) => write!(f, "Etcd"), + } + } +} + /// Benchmark client pub(crate) struct BenchClient { /// Name of the client name: String, - /// etcd client - etcd_client: EtcdClient, - /// xline client - xline_client: Client, - /// Use xline client to send requests when true - use_curp_client: bool, + /// KV client instance + kv_client: KVClient, } impl Debug for BenchClient { @@ -39,8 +55,7 @@ impl Debug for BenchClient { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Client") .field("name", &self.name) - .field("use_curp_client", &self.use_curp_client) - .field("xline_client", &self.xline_client) + .field("kv_client", &self.kv_client) .finish() } } @@ -73,12 +88,12 @@ impl BenchClient { config: ClientOptions, ) -> Result { let name = String::from("client"); - Ok(Self { - name, - etcd_client: EtcdClient::connect(addrs.clone(), None).await?, - xline_client: Client::connect(addrs, config).await?, - use_curp_client, - }) + let kv_client = if use_curp_client { + KVClient::Xline(Client::connect(addrs, config).await?) + } else { + KVClient::Etcd(EtcdClient::connect(addrs.clone(), None).await?) + }; + Ok(Self { name, kv_client }) } /// Send `PutRequest` by `XlineClient` or `EtcdClient` @@ -91,16 +106,86 @@ impl BenchClient { &mut self, request: PutRequest, ) -> Result { - if self.use_curp_client { - let response = self.xline_client.kv_client().put(request).await?; - Ok(response) - } else { - let opts = from_request_to_option(&request); - let response = self - .etcd_client - .put(request.key(), request.value(), Some(opts)) - .await?; - Ok(response.into()) + match self.kv_client { + KVClient::Xline(ref mut xline_client) => { + let response = xline_client.kv_client().put(request).await?; + Ok(response) + } + KVClient::Etcd(ref mut etcd_client) => { + let opts = from_request_to_option(&request); + let response = etcd_client + .put(request.key(), request.value(), Some(opts)) + .await?; + Ok(response.into()) + } + } + } + + /// Send `RangeRequest` by `XlineClient` or `EtcdClient` + /// + /// # Errors + /// + /// If `XlineClient` or `EtcdClient` failed to send request + #[inline] + #[cfg(test)] + pub(crate) async fn get( + &mut self, + request: RangeRequest, + ) -> Result { + match self.kv_client { + KVClient::Xline(ref mut xline_client) => { + let response = xline_client.kv_client().range(request).await?; + Ok(response) + } + KVClient::Etcd(ref mut etcd_client) => { + let response = etcd_client.get(request.key(), None).await?; + Ok(response.into()) + } } } } + +#[cfg(test)] +#[allow(clippy::panic)] +#[allow(clippy::unwrap_used)] +#[allow(clippy::indexing_slicing)] +mod test { + use crate::bench_client::{BenchClient, ClientOptions, PutRequest}; + use xline_client::types::kv::RangeRequest; + use xline_test_utils::Cluster; + + #[tokio::test(flavor = "multi_thread")] + async fn test_new_xline_client() { + // create xline client + let mut cluster = Cluster::new(3).await; + cluster.start().await; + let use_curp_client = true; + let config = ClientOptions::default(); + let mut client = BenchClient::new(cluster.addrs(), use_curp_client, config) + .await + .unwrap(); + //check xline client put value exist + let request = PutRequest::new("put", "123"); + let _put_response = client.put(request).await; + let range_request = RangeRequest::new("put"); + let response = client.get(range_request).await.unwrap(); + assert_eq!(response.kvs[0].value, b"123"); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_new_etcd_client() { + let mut cluster = Cluster::new(3).await; + cluster.start().await; + let use_curp_client = false; + let config = ClientOptions::default(); + let mut client = BenchClient::new(cluster.addrs(), use_curp_client, config) + .await + .unwrap(); + + let request = PutRequest::new("put", "123"); + let _put_response = client.put(request).await; + let range_request = RangeRequest::new("put"); + let response = client.get(range_request).await.unwrap(); + assert_eq!(response.kvs[0].value, b"123"); + } +}