feat: support meta event rpc server
ShiKaiWi committed Oct 11, 2022
1 parent 3f1f677 commit 95f1b0c
Showing 7 changed files with 205 additions and 22 deletions.
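
The core of the change, condensed from the server/src/grpc/mod.rs diff below (a sketch assembled from the added lines, not verbatim repository code): the gRPC builder now accepts an optional ClusterRef and registers the meta event RPC service only when one is supplied.

// Sketch assembled from the grpc Builder / RpcServices changes below; not verbatim.
// The meta event service is built only when a cluster is configured.
let meta_rpc_server = cluster.map(|c| {
    MetaEventServiceServer::new(MetaServiceImpl {
        cluster: c,
        instance: instance.clone(),
        runtime: runtimes.meta_runtime.clone(),
    })
});

let mut router = Server::builder().add_service(rpc_server);
if let Some(s) = meta_rpc_server {
    info!("Grpc server serves meta rpc service");
    router = router.add_service(s);
}
router
    .serve_with_shutdown(serve_addr, stop_rx.map(drop))
    .await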
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

6 changes: 3 additions & 3 deletions catalog_impls/src/volatile.rs
@@ -22,7 +22,7 @@ use catalog::{
};
use common_types::schema::SchemaName;
use log::{debug, info};
use meta_client::MetaClientRef;
use meta_client::{types::AllocSchemaIdRequest, MetaClientRef};
use snafu::{ensure, ResultExt};
use table_engine::table::{SchemaId, TableRef};
use tokio::sync::Mutex;
@@ -127,7 +127,7 @@ impl Catalog for CatalogImpl {

let schema_id = self
.meta_client
.alloc_schema_id(cluster::AllocSchemaIdRequest {
.alloc_schema_id(AllocSchemaIdRequest {
name: name.to_string(),
})
.await
@@ -324,7 +324,7 @@ impl Schema for SchemaImpl {

// Request CeresMeta to drop this table.
self.meta_client
.drop_table(cluster::DropTableRequest {
.drop_table(meta_client::types::DropTableRequest {
schema_name: schema_name.to_string(),
name: table_name.to_string(),
id: table.id().as_u64(),
1 change: 1 addition & 0 deletions cluster/Cargo.toml
@@ -11,6 +11,7 @@ workspace = true
analytic_engine = { workspace = true }
async-trait = { workspace = true }
catalog = { workspace = true }
ceresdbproto = { workspace = true }
common_types = { workspace = true }
common_util = { workspace = true }
log = { workspace = true }
23 changes: 21 additions & 2 deletions cluster/src/cluster_impl.rs
@@ -6,6 +6,9 @@ use std::{
};

use async_trait::async_trait;
use ceresdbproto::meta_event::{
CloseShardsRequest, CreateTableOnShardRequest, DropTableOnShardRequest, OpenShardsRequest,
};
use common_util::runtime::{JoinHandle, Runtime};
use log::{error, info, warn};
use meta_client::{
@@ -19,8 +19,8 @@ use tokio::{
};

use crate::{
config::ClusterConfig, table_manager::TableManager, topology::ClusterTopology, Cluster,
ClusterNodesNotFound, ClusterNodesResp, MetaClientFailure, Result,
config::ClusterConfig, table_manager::TableManager, topology::ClusterTopology, CloseShardsOpts,
Cluster, ClusterNodesNotFound, ClusterNodesResp, MetaClientFailure, OpenShardsOpts, Result,
};

/// ClusterImpl is an implementation of [`Cluster`] based on [`MetaClient`].
@@ -206,6 +209,22 @@ impl Cluster for ClusterImpl {
Ok(())
}

async fn open_shards(&self, _req: &OpenShardsRequest, _opts: OpenShardsOpts) -> Result<()> {
todo!();
}

async fn close_shards(&self, _req: &CloseShardsRequest, _opts: CloseShardsOpts) -> Result<()> {
todo!();
}

async fn create_table_on_shard(&self, _req: &CreateTableOnShardRequest) -> Result<()> {
todo!();
}

async fn drop_table_on_shard(&self, _req: &DropTableOnShardRequest) -> Result<()> {
todo!();
}

async fn route_tables(&self, req: &RouteTablesRequest) -> Result<RouteTablesResponse> {
self.inner.route_tables(req).await
}
17 changes: 13 additions & 4 deletions cluster/src/lib.rs
@@ -11,12 +11,11 @@
use std::sync::Arc;

use async_trait::async_trait;
use ceresdbproto::meta_event::{
CloseShardsRequest, CreateTableOnShardRequest, DropTableOnShardRequest, OpenShardsRequest,
};
use common_types::schema::SchemaName;
use common_util::define_result;
pub use meta_client::types::{
AllocSchemaIdRequest, AllocSchemaIdResponse, CreateTableRequest, CreateTableResponse,
DropTableRequest, GetShardTablesRequest,
};
use meta_client::types::{ClusterNodesRef, RouteTablesRequest, RouteTablesResponse, ShardId};
use snafu::{Backtrace, Snafu};

@@ -78,11 +77,21 @@ pub struct ClusterNodesResp {
pub cluster_nodes: ClusterNodesRef,
}

#[derive(Debug, Default)]
pub struct OpenShardsOpts {}

#[derive(Debug, Default)]
pub struct CloseShardsOpts {}

/// Cluster manages tables and shard infos in cluster mode.
#[async_trait]
pub trait Cluster {
async fn start(&self) -> Result<()>;
async fn stop(&self) -> Result<()>;
async fn open_shards(&self, req: &OpenShardsRequest, opts: OpenShardsOpts) -> Result<()>;
async fn close_shards(&self, req: &CloseShardsRequest, opts: CloseShardsOpts) -> Result<()>;
async fn create_table_on_shard(&self, req: &CreateTableOnShardRequest) -> Result<()>;
async fn drop_table_on_shard(&self, req: &DropTableOnShardRequest) -> Result<()>;
async fn route_tables(&self, req: &RouteTablesRequest) -> Result<RouteTablesResponse>;
async fn fetch_nodes(&self) -> Result<ClusterNodesResp>;
}
178 changes: 165 additions & 13 deletions server/src/grpc/mod.rs
@@ -28,7 +28,7 @@ use ceresdbproto::{
WriteResponse,
},
};
use cluster::{config::SchemaConfig, ClusterRef};
use cluster::{config::SchemaConfig, CloseShardsOpts, ClusterRef, OpenShardsOpts};
use common_types::{
column_schema::{self, ColumnSchema},
datum::DatumKind,
@@ -256,7 +256,7 @@ impl<'a, Q> HandlerContext<'a, Q> {
pub struct RpcServices<Q: QueryExecutor + 'static> {
serve_addr: SocketAddr,
rpc_server: StorageServiceServer<StorageServiceImpl<Q>>,
meta_rpc_server: MetaEventServiceServer<MetaServiceImpl<Q>>,
meta_rpc_server: Option<MetaEventServiceServer<MetaServiceImpl<Q>>>,
runtime: Arc<Runtime>,
stop_tx: Option<Sender<()>>,
join_handle: Option<JoinHandle<()>>,
@@ -271,9 +271,14 @@ impl<Q: QueryExecutor + 'static> RpcServices<Q> {
let join_handle = self.runtime.spawn(async move {
info!("Grpc server starts listening on {}", serve_addr);

let serve_res = Server::builder()
.add_service(rpc_server)
.add_service(meta_rpc_server)
let mut router = Server::builder().add_service(rpc_server);

if let Some(s) = meta_rpc_server {
info!("Grpc server serves meta rpc service");
router = router.add_service(s);
};

let serve_res = router
.serve_with_shutdown(serve_addr, stop_rx.map(drop))
.await;

@@ -302,6 +307,7 @@ pub struct Builder<Q> {
runtimes: Option<Arc<EngineRuntimes>>,
instance: Option<InstanceRef<Q>>,
router: Option<RouterRef>,
cluster: Option<ClusterRef>,
schema_config_provider: Option<SchemaConfigProviderRef>,
}

@@ -312,6 +318,7 @@ impl<Q> Builder<Q> {
runtimes: None,
instance: None,
router: None,
cluster: None,
schema_config_provider: None,
}
}
@@ -336,6 +343,12 @@ impl<Q> Builder<Q> {
self
}

// Cluster is an optional field for building [RpcServices].
pub fn cluster(mut self, cluster: Option<ClusterRef>) -> Self {
self.cluster = cluster;
self
}

pub fn schema_config_provider(mut self, provider: SchemaConfigProviderRef) -> Self {
self.schema_config_provider = Some(provider);
self
@@ -351,6 +364,15 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
.schema_config_provider
.context(MissingSchemaConfigProvider)?;

let meta_rpc_server = self.cluster.map(|v| {
let meta_service = MetaServiceImpl {
cluster: v,
instance: instance.clone(),
runtime: runtimes.meta_runtime.clone(),
};
MetaEventServiceServer::new(meta_service)
});

let bg_runtime = runtimes.bg_runtime.clone();
let storage_service = StorageServiceImpl {
router,
@@ -359,7 +381,7 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
schema_config_provider,
};
let rpc_server = StorageServiceServer::new(storage_service);
let meta_rpc_server = MetaEventServiceServer::new(MetaServiceImpl);

let serve_addr = self.endpoint.parse().context(InvalidRpcServeAddr)?;

Ok(RpcServices {
@@ -390,37 +412,167 @@ fn build_ok_header() -> ResponseHeader {
#[derive(Clone)]
struct MetaServiceImpl<Q: QueryExecutor + 'static> {
cluster: ClusterRef,
#[allow(dead_code)]
instance: InstanceRef<Q>,
runtime: Arc<Runtime>,
}

#[async_trait]
impl<Q: QueryExecutor + 'static> MetaEventService for MetaServiceImpl<Q> {
// TODO: use a macro to remove the boilerplate code.
async fn open_shards(
&self,
request: tonic::Request<OpenShardsRequest>,
) -> std::result::Result<tonic::Response<OpenShardsResponse>, tonic::Status> {
todo!()
let cluster = self.cluster.clone();
let handle = self.runtime.spawn(async move {
let request = request.into_inner();
cluster
.open_shards(&request, OpenShardsOpts::default())
.await
.map_err(|e| Box::new(e) as _)
.context(ErrWithCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: "fail to open shards in cluster",
})
});

let res = handle
.await
.map_err(|e| Box::new(e) as _)
.context(ErrWithCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: "fail to join task",
});

let mut resp = OpenShardsResponse::default();
match res {
Ok(Ok(_)) => {
resp.header = Some(build_ok_header());
}
Ok(Err(e)) | Err(e) => {
resp.header = Some(build_err_header(e));
}
};

Ok(tonic::Response::new(resp))
}

async fn close_shards(
&self,
_request: tonic::Request<CloseShardsRequest>,
request: tonic::Request<CloseShardsRequest>,
) -> std::result::Result<tonic::Response<CloseShardsResponse>, tonic::Status> {
todo!()
let cluster = self.cluster.clone();
let handle = self.runtime.spawn(async move {
let request = request.into_inner();
cluster
.close_shards(&request, CloseShardsOpts::default())
.await
.map_err(|e| Box::new(e) as _)
.context(ErrWithCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: "fail to close shards in cluster",
})
});

let res = handle
.await
.map_err(|e| Box::new(e) as _)
.context(ErrWithCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: "fail to join task",
});

let mut resp = CloseShardsResponse::default();
match res {
Ok(Ok(_)) => {
resp.header = Some(build_ok_header());
}
Ok(Err(e)) | Err(e) => {
resp.header = Some(build_err_header(e));
}
};

Ok(tonic::Response::new(resp))
}

async fn create_table_on_shard(
&self,
_request: tonic::Request<CreateTableOnShardRequest>,
request: tonic::Request<CreateTableOnShardRequest>,
) -> std::result::Result<tonic::Response<CreateTableOnShardResponse>, tonic::Status> {
todo!()
let cluster = self.cluster.clone();
let handle = self.runtime.spawn(async move {
let request = request.into_inner();
cluster
.create_table_on_shard(&request)
.await
.map_err(|e| Box::new(e) as _)
.context(ErrWithCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: format!(
"fail to create table on shard in cluster, req:{:?}",
request
),
})
});

let res = handle
.await
.map_err(|e| Box::new(e) as _)
.context(ErrWithCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: "fail to join task",
});

let mut resp = CreateTableOnShardResponse::default();
match res {
Ok(Ok(_)) => {
resp.header = Some(build_ok_header());
}
Ok(Err(e)) | Err(e) => {
resp.header = Some(build_err_header(e));
}
};

Ok(tonic::Response::new(resp))
}

async fn drop_table_on_shard(
&self,
_request: tonic::Request<DropTableOnShardRequest>,
request: tonic::Request<DropTableOnShardRequest>,
) -> std::result::Result<tonic::Response<DropTableOnShardResponse>, tonic::Status> {
todo!()
let cluster = self.cluster.clone();
let handle = self.runtime.spawn(async move {
let request = request.into_inner();
cluster
.drop_table_on_shard(&request)
.await
.map_err(|e| Box::new(e) as _)
.context(ErrWithCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: format!("fail to drop table on shard in cluster, req:{:?}", request),
})
});

let res = handle
.await
.map_err(|e| Box::new(e) as _)
.context(ErrWithCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: "fail to join task",
});

let mut resp = DropTableOnShardResponse::default();
match res {
Ok(Ok(_)) => {
resp.header = Some(build_ok_header());
}
Ok(Err(e)) | Err(e) => {
resp.header = Some(build_err_header(e));
}
};

Ok(tonic::Response::new(resp))
}

async fn split_shard(
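
The four meta event handlers above repeat the same spawn/join/build-response pattern; the in-code TODO notes it could be factored into a macro. Below is a minimal sketch of such a macro (hypothetical, not part of this commit; the name handle_meta_event! is invented, and it reuses the ErrWithCause, StatusCode, build_ok_header and build_err_header items already present in grpc/mod.rs):

// Hypothetical macro collapsing the repeated spawn/join/response pattern of the
// meta event handlers. `$method` is the Cluster trait method to call and `$extra`
// covers the optional Opts argument. The create/drop handlers format the request
// into the error message, which this simplified sketch does not cover.
macro_rules! handle_meta_event {
    ($self:ident, $request:ident, $resp_ty:ty, $err_msg:expr, $method:ident $(, $extra:expr)*) => {{
        let cluster = $self.cluster.clone();
        let handle = $self.runtime.spawn(async move {
            let request = $request.into_inner();
            cluster
                .$method(&request $(, $extra)*)
                .await
                .map_err(|e| Box::new(e) as _)
                .context(ErrWithCause {
                    code: StatusCode::INTERNAL_SERVER_ERROR,
                    msg: $err_msg,
                })
        });

        let res = handle
            .await
            .map_err(|e| Box::new(e) as _)
            .context(ErrWithCause {
                code: StatusCode::INTERNAL_SERVER_ERROR,
                msg: "fail to join task",
            });

        let mut resp = <$resp_ty>::default();
        match res {
            Ok(Ok(_)) => resp.header = Some(build_ok_header()),
            Ok(Err(e)) | Err(e) => resp.header = Some(build_err_header(e)),
        };

        Ok(tonic::Response::new(resp))
    }};
}

// Example usage inside `impl MetaEventService for MetaServiceImpl<Q>`:
async fn open_shards(
    &self,
    request: tonic::Request<OpenShardsRequest>,
) -> std::result::Result<tonic::Response<OpenShardsResponse>, tonic::Status> {
    handle_meta_event!(
        self,
        request,
        OpenShardsResponse,
        "fail to open shards in cluster",
        open_shards,
        OpenShardsOpts::default()
    )
}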
1 change: 1 addition & 0 deletions server/src/server.rs
@@ -261,6 +261,7 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
.runtimes(runtimes)
.instance(instance.clone())
.router(router)
.cluster(self.cluster.clone())
.schema_config_provider(provider)
.build()
.context(BuildGrpcService)?;
