Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: upgrade implementation of GRPC (from grpcio to tonic) #18

Merged
merged 12 commits into from
Nov 10, 2022
Merged
765 changes: 551 additions & 214 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ edition = "2021"

[dependencies]
async-trait = "0.1.57"
grpcio = "0.9.1"
avro-rs = "0.13.0"
dashmap = "5.3.4"
futures = "0.3"
tonic = "0.8.1"
tokio = "1.15"

[dependencies.ceresdbproto]
git = "https://github.com/CeresDB/ceresdbproto.git"
rev = "2c10152d021cd5a26b9c870cdede6a0317adca3d"
rev = "29cb0c6fba76401fd9a4ae5b8cacc9002ad78650"

[dependencies.common_types]
git = "https://github.com/CeresDB/ceresdb.git"
Expand Down
15 changes: 5 additions & 10 deletions src/db_client/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ use std::sync::Arc;

use crate::{
db_client::{cluster::ClusterImpl, standalone::StandaloneImpl, DbClient},
router::RouterImpl,
rpc_client::RpcClientImplBuilder,
rpc_client::RpcClientImplFactory,
RpcConfig, RpcOptions,
};

Expand Down Expand Up @@ -52,17 +51,13 @@ impl Builder {
}

pub fn build(self) -> Arc<dyn DbClient> {
let rpc_client_builder = RpcClientImplBuilder::new(self.grpc_config, self.rpc_opts);
let rpc_client_factory =
Arc::new(RpcClientImplFactory::new(self.grpc_config, self.rpc_opts));

match self.mode {
Mode::Standalone => {
Arc::new(StandaloneImpl::new(rpc_client_builder.build(self.endpoint)))
}
Mode::Standalone => Arc::new(StandaloneImpl::new(rpc_client_factory, self.endpoint)),

Mode::Cluster => {
let router = RouterImpl::new(rpc_client_builder.build(self.endpoint));
Arc::new(ClusterImpl::new(router, rpc_client_builder))
}
Mode::Cluster => Arc::new(ClusterImpl::new(rpc_client_factory, self.endpoint)),
}
}
}
81 changes: 47 additions & 34 deletions src/db_client/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ use std::{collections::HashMap, sync::Arc};
use async_trait::async_trait;
use dashmap::DashMap;
use futures::future::join_all;
use tokio::sync::OnceCell;

use super::{standalone::StandaloneImpl, DbClient};
use super::{direct::DirectInnerClient, DbClient};
use crate::{
errors::ClusterWriteError,
model::{
Expand All @@ -15,29 +16,48 @@ use crate::{
write::{WriteRequest, WriteResponse},
QueryResponse,
},
router::Router,
rpc_client::{RpcClientImpl, RpcClientImplBuilder, RpcContext},
router::{Router, RouterImpl},
rpc_client::{RpcClientFactory, RpcContext},
util::should_refresh,
Error, Result,
};

/// Client for ceresdb of cluster mode.
pub struct ClusterImpl<R: Router> {
router: R,
// Server connection handler pool.
standalone_pool: StandalonePool,
/// Client implementation for ceresdb while using cluster mode.
pub struct ClusterImpl<F: RpcClientFactory> {
factory: Arc<F>,
router_endpoint: String,
router: OnceCell<Box<dyn Router>>,
standalone_pool: DirectClientPool<F>,
}

impl<F: RpcClientFactory> ClusterImpl<F> {
pub fn new(factory: Arc<F>, router_endpoint: String) -> Self {
Self {
factory: factory.clone(),
router_endpoint,
router: OnceCell::new(),
standalone_pool: DirectClientPool::new(factory),
}
}

#[inline]
async fn init_router(&self) -> Result<Box<dyn Router>> {
let router_client = self.factory.build(self.router_endpoint.clone()).await?;
Ok(Box::new(RouterImpl::new(router_client)))
}
}

#[async_trait]
impl<R: Router> DbClient for ClusterImpl<R> {
impl<F: RpcClientFactory> DbClient for ClusterImpl<F> {
async fn query(&self, ctx: &RpcContext, req: &QueryRequest) -> Result<QueryResponse> {
if req.metrics.is_empty() {
return Err(Error::Unknown(
"Metrics in query request can't be empty in cluster mode".to_string(),
));
}
let router_handle = self.router.get_or_try_init(|| self.init_router()).await?;

let endpoint = match self.router.route(&req.metrics, ctx).await {
let endpoint = match router_handle.route(&req.metrics, ctx).await {
Ok(mut eps) => {
if let Some(ep) = eps[0].take() {
ep
Expand All @@ -54,16 +74,17 @@ impl<R: Router> DbClient for ClusterImpl<R> {

let client = self.standalone_pool.get_or_create(&endpoint).clone();

client.query_internal(ctx, req.clone()).await.map_err(|e| {
self.router.evict(&req.metrics);
client.query_internal(ctx, req).await.map_err(|e| {
router_handle.evict(&req.metrics);
e
})
}

async fn write(&self, ctx: &RpcContext, req: &WriteRequest) -> Result<WriteResponse> {
// Get metrics' related endpoints(some may not exist).
let should_routes: Vec<_> = req.write_entries.iter().map(|(m, _)| m.clone()).collect();
let endpoints = self.router.route(&should_routes, ctx).await?;
let router_handle = self.router.get_or_try_init(|| self.init_router()).await?;
let endpoints = router_handle.route(&should_routes, ctx).await?;

// Partition write entries in request according to related endpoints.
let mut no_corresponding_endpoints = Vec::new();
Expand Down Expand Up @@ -99,7 +120,7 @@ impl<R: Router> DbClient for ClusterImpl<R> {
.collect();
let mut futures = Vec::with_capacity(client_req_paris.len());
for (client, req) in client_req_paris {
futures.push(async move { client.write_internal(ctx, req).await })
futures.push(async move { client.write_internal(ctx, &req).await })
}

// Await rpc results and collect results.
Expand Down Expand Up @@ -134,7 +155,7 @@ impl<R: Router> DbClient for ClusterImpl<R> {
})
.flatten()
.collect();
self.router.evict(&evicts);
router_handle.evict(&evicts);

let cluster_error: ClusterWriteError = metrics_result_pairs.into();
if cluster_error.all_ok() {
Expand All @@ -145,39 +166,31 @@ impl<R: Router> DbClient for ClusterImpl<R> {
}
}

impl<R: Router> ClusterImpl<R> {
pub fn new(route_client: R, standalone_builder: RpcClientImplBuilder) -> Self {
Self {
router: route_client,
standalone_pool: StandalonePool::new(standalone_builder),
}
}
}

struct StandalonePool {
pool: DashMap<Endpoint, Arc<StandaloneImpl<RpcClientImpl>>>,
standalone_builder: RpcClientImplBuilder,
/// DirectClientPool is the pool actually holding connections to data nodes.
struct DirectClientPool<F: RpcClientFactory> {
pool: DashMap<Endpoint, Arc<DirectInnerClient<F>>>,
factory: Arc<F>,
}

// TODO better to add gc.
impl StandalonePool {
fn new(standalone_builder: RpcClientImplBuilder) -> Self {
impl<F: RpcClientFactory> DirectClientPool<F> {
fn new(factory: Arc<F>) -> Self {
Self {
pool: DashMap::new(),
standalone_builder,
factory,
}
}

fn get_or_create(&self, endpoint: &Endpoint) -> Arc<StandaloneImpl<RpcClientImpl>> {
fn get_or_create(&self, endpoint: &Endpoint) -> Arc<DirectInnerClient<F>> {
if let Some(c) = self.pool.get(endpoint) {
// If exist in cache, return.
c.value().clone()
} else {
// If not exist, build --> insert --> return.
self.pool
.entry(endpoint.clone())
.or_insert(Arc::new(StandaloneImpl::new(
self.standalone_builder.build(endpoint.to_string()),
.or_insert(Arc::new(DirectInnerClient::new(
self.factory.clone(),
endpoint.to_string(),
)))
.clone()
}
Expand Down
74 changes: 74 additions & 0 deletions src/db_client/direct.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.

use std::sync::Arc;

use tokio::sync::OnceCell;

use crate::{
model::{
convert,
request::QueryRequest,
write::{WriteRequest, WriteResponse},
QueryResponse, Schema,
},
rpc_client::{RpcClient, RpcClientFactory, RpcContext},
Error, Result,
};

/// Inner client for both standalone and cluster modes.
///
/// Now, [`DirectInnerClient`] just wraps [`RpcClient`] simply.
pub(crate) struct DirectInnerClient<F: RpcClientFactory> {
factory: Arc<F>,
endpoint: String,
inner_client: OnceCell<Arc<dyn RpcClient>>,
}

impl<F: RpcClientFactory> DirectInnerClient<F> {
pub fn new(factory: Arc<F>, endpoint: String) -> Self {
DirectInnerClient {
factory,
endpoint,
inner_client: OnceCell::new(),
}
}

#[inline]
async fn init(&self) -> Result<Arc<dyn RpcClient>> {
self.factory.build(self.endpoint.clone()).await
}

pub async fn query_internal(
&self,
ctx: &RpcContext,
req: &QueryRequest,
) -> Result<QueryResponse> {
let client_handle = self.inner_client.get_or_try_init(|| self.init()).await?;
let result_pb = client_handle.as_ref().query(ctx, req.clone().into()).await;

result_pb.and_then(|resp_pb| {
if !resp_pb.schema_content.is_empty() {
convert::parse_queried_rows(&resp_pb.schema_content, &resp_pb.rows)
.map_err(Error::Client)
} else {
Ok(QueryResponse {
schema: Schema::default(),
rows: Vec::new(),
affected_rows: resp_pb.affected_rows,
})
}
})
}

pub async fn write_internal(
&self,
ctx: &RpcContext,
req: &WriteRequest,
) -> Result<WriteResponse> {
let client_handle = self.inner_client.get_or_try_init(|| self.init()).await?;
client_handle
.write(ctx, req.clone().into())
.await
.map(|resp_pb| resp_pb.into())
}
}
1 change: 1 addition & 0 deletions src/db_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

mod builder;
mod cluster;
mod direct;
mod standalone;

use async_trait::async_trait;
Expand Down
65 changes: 19 additions & 46 deletions src/db_client/standalone.rs
Original file line number Diff line number Diff line change
@@ -1,70 +1,43 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.

use std::sync::Arc;

use async_trait::async_trait;

use super::direct::DirectInnerClient;
use crate::{
db_client::DbClient,
model::{
convert,
request::QueryRequest,
write::{WriteRequest, WriteResponse},
QueryResponse, Schema,
QueryResponse,
},
rpc_client::{RpcClient, RpcContext},
Error, Result,
rpc_client::{RpcClientFactory, RpcContext},
Result,
};

/// Client for ceresdb of standalone mode.
///
/// Now, [`StandaloneImpl`] just wraps [`RpcClient`] simply.
archerny marked this conversation as resolved.
Show resolved Hide resolved
pub struct StandaloneImpl<R: RpcClient> {
pub rpc_client: R,
pub struct StandaloneImpl<F: RpcClientFactory> {
inner_client: DirectInnerClient<F>,
archerny marked this conversation as resolved.
Show resolved Hide resolved
}

#[async_trait]
impl<R: RpcClient> DbClient for StandaloneImpl<R> {
async fn query(&self, ctx: &RpcContext, req: &QueryRequest) -> Result<QueryResponse> {
self.query_internal(ctx, req.clone()).await
}

async fn write(&self, ctx: &RpcContext, req: &WriteRequest) -> Result<WriteResponse> {
self.write_internal(ctx, req.clone()).await
impl<F: RpcClientFactory> StandaloneImpl<F> {
pub fn new(factory: Arc<F>, endpoint: String) -> Self {
Self {
inner_client: DirectInnerClient::new(factory, endpoint),
}
}
}

impl<R: RpcClient> StandaloneImpl<R> {
pub fn new(rpc_client: R) -> Self {
Self { rpc_client }
}

pub async fn query_internal(
&self,
ctx: &RpcContext,
req: QueryRequest,
) -> Result<QueryResponse> {
let result_pb = self.rpc_client.query(ctx, &req.into()).await;
result_pb.and_then(|resp_pb| {
if !resp_pb.schema_content.is_empty() {
convert::parse_queried_rows(&resp_pb.schema_content, &resp_pb.rows)
.map_err(Error::Client)
} else {
Ok(QueryResponse {
schema: Schema::default(),
rows: Vec::new(),
affected_rows: resp_pb.affected_rows,
})
}
})
#[async_trait]
impl<F: RpcClientFactory> DbClient for StandaloneImpl<F> {
async fn query(&self, ctx: &RpcContext, req: &QueryRequest) -> Result<QueryResponse> {
self.inner_client.query_internal(ctx, req).await
}

pub async fn write_internal(
&self,
ctx: &RpcContext,
req: WriteRequest,
) -> Result<WriteResponse> {
self.rpc_client
.write(ctx, &req.into())
.await
.map(|resp_pb| resp_pb.into())
async fn write(&self, ctx: &RpcContext, req: &WriteRequest) -> Result<WriteResponse> {
self.inner_client.write_internal(ctx, req).await
}
}
Loading