diff --git a/Cargo.lock b/Cargo.lock
index 2fe1f0aa6d..53947e6a1e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1004,6 +1004,7 @@ dependencies = [
  "analytic_engine",
  "async-trait",
  "catalog",
+ "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=165cbd489e864fa3fb1ace9c6bef1de25b6dc11a)",
  "common_types 0.1.0",
  "common_util",
  "log",
diff --git a/catalog_impls/src/volatile.rs b/catalog_impls/src/volatile.rs
index 083f8fdb3d..f45a18739f 100644
--- a/catalog_impls/src/volatile.rs
+++ b/catalog_impls/src/volatile.rs
@@ -22,7 +22,7 @@ use catalog::{
 };
 use common_types::schema::SchemaName;
 use log::{debug, info};
-use meta_client::MetaClientRef;
+use meta_client::{types::AllocSchemaIdRequest, MetaClientRef};
 use snafu::{ensure, ResultExt};
 use table_engine::table::{SchemaId, TableRef};
 use tokio::sync::Mutex;
@@ -127,7 +127,7 @@ impl Catalog for CatalogImpl {
 
         let schema_id = self
             .meta_client
-            .alloc_schema_id(cluster::AllocSchemaIdRequest {
+            .alloc_schema_id(AllocSchemaIdRequest {
                 name: name.to_string(),
             })
             .await
@@ -324,7 +324,7 @@ impl Schema for SchemaImpl {
 
         // Request CeresMeta to drop this table.
         self.meta_client
-            .drop_table(cluster::DropTableRequest {
+            .drop_table(meta_client::types::DropTableRequest {
                 schema_name: schema_name.to_string(),
                 name: table_name.to_string(),
                 id: table.id().as_u64(),
diff --git a/cluster/Cargo.toml b/cluster/Cargo.toml
index 7db7d5f4f0..1b4c69a0cb 100644
--- a/cluster/Cargo.toml
+++ b/cluster/Cargo.toml
@@ -11,6 +11,7 @@ workspace = true
 analytic_engine = { workspace = true }
 async-trait = { workspace = true }
 catalog = { workspace = true }
+ceresdbproto = { workspace = true }
 common_types = { workspace = true }
 common_util = { workspace = true }
 log = { workspace = true }
diff --git a/cluster/src/cluster_impl.rs b/cluster/src/cluster_impl.rs
index f35cc24d1f..a123f8eed5 100644
--- a/cluster/src/cluster_impl.rs
+++ b/cluster/src/cluster_impl.rs
@@ -6,6 +6,9 @@ use std::{
 };
 
 use async_trait::async_trait;
+use ceresdbproto::meta_event::{
+    CloseShardsRequest, CreateTableOnShardRequest, DropTableOnShardRequest, OpenShardsRequest,
+};
 use common_util::runtime::{JoinHandle, Runtime};
 use log::{error, info, warn};
 use meta_client::{
@@ -19,8 +22,8 @@ use tokio::{
 };
 
 use crate::{
-    config::ClusterConfig, table_manager::TableManager, topology::ClusterTopology, Cluster,
-    ClusterNodesNotFound, ClusterNodesResp, MetaClientFailure, Result,
+    config::ClusterConfig, table_manager::TableManager, topology::ClusterTopology, CloseShardsOpts,
+    Cluster, ClusterNodesNotFound, ClusterNodesResp, MetaClientFailure, OpenShardsOpts, Result,
 };
 
 /// ClusterImpl is an implementation of [`Cluster`] based [`MetaClient`].
@@ -206,6 +209,22 @@ impl Cluster for ClusterImpl {
         Ok(())
     }
 
+    async fn open_shards(&self, _req: &OpenShardsRequest, _opts: OpenShardsOpts) -> Result<()> {
+        todo!();
+    }
+
+    async fn close_shards(&self, _req: &CloseShardsRequest, _opts: CloseShardsOpts) -> Result<()> {
+        todo!();
+    }
+
+    async fn create_table_on_shard(&self, _req: &CreateTableOnShardRequest) -> Result<()> {
+        todo!();
+    }
+
+    async fn drop_table_on_shard(&self, _req: &DropTableOnShardRequest) -> Result<()> {
+        todo!();
+    }
+
     async fn route_tables(&self, req: &RouteTablesRequest) -> Result<RouteTablesResponse> {
         self.inner.route_tables(req).await
     }
diff --git a/cluster/src/lib.rs b/cluster/src/lib.rs
index 4eceb8a77a..27f0ab7c9b 100644
--- a/cluster/src/lib.rs
+++ b/cluster/src/lib.rs
@@ -11,12 +11,11 @@ use std::sync::Arc;
 
 use async_trait::async_trait;
+use ceresdbproto::meta_event::{
+    CloseShardsRequest, CreateTableOnShardRequest, DropTableOnShardRequest, OpenShardsRequest,
+};
 use common_types::schema::SchemaName;
 use common_util::define_result;
-pub use meta_client::types::{
-    AllocSchemaIdRequest, AllocSchemaIdResponse, CreateTableRequest, CreateTableResponse,
-    DropTableRequest, GetShardTablesRequest,
-};
 use meta_client::types::{ClusterNodesRef, RouteTablesRequest, RouteTablesResponse, ShardId};
 use snafu::{Backtrace, Snafu};
@@ -78,11 +77,21 @@ pub struct ClusterNodesResp {
     pub cluster_nodes: ClusterNodesRef,
 }
 
+#[derive(Debug, Default)]
+pub struct OpenShardsOpts {}
+
+#[derive(Debug, Default)]
+pub struct CloseShardsOpts {}
+
 /// Cluster manages tables and shard infos in cluster mode.
 #[async_trait]
 pub trait Cluster {
     async fn start(&self) -> Result<()>;
     async fn stop(&self) -> Result<()>;
+    async fn open_shards(&self, req: &OpenShardsRequest, opts: OpenShardsOpts) -> Result<()>;
+    async fn close_shards(&self, req: &CloseShardsRequest, opts: CloseShardsOpts) -> Result<()>;
+    async fn create_table_on_shard(&self, req: &CreateTableOnShardRequest) -> Result<()>;
+    async fn drop_table_on_shard(&self, req: &DropTableOnShardRequest) -> Result<()>;
     async fn route_tables(&self, req: &RouteTablesRequest) -> Result<RouteTablesResponse>;
     async fn fetch_nodes(&self) -> Result<ClusterNodesResp>;
 }
diff --git a/server/src/grpc/mod.rs b/server/src/grpc/mod.rs
index bbfc34ea0f..d55158d825 100644
--- a/server/src/grpc/mod.rs
+++ b/server/src/grpc/mod.rs
@@ -28,7 +28,7 @@ use ceresdbproto::{
         WriteResponse,
     },
 };
-use cluster::{config::SchemaConfig, ClusterRef};
+use cluster::{config::SchemaConfig, CloseShardsOpts, ClusterRef, OpenShardsOpts};
 use common_types::{
     column_schema::{self, ColumnSchema},
     datum::DatumKind,
@@ -256,7 +256,7 @@ impl<'a, Q> HandlerContext<'a, Q> {
 pub struct RpcServices<Q: QueryExecutor + 'static> {
     serve_addr: SocketAddr,
     rpc_server: StorageServiceServer<StorageServiceImpl<Q>>,
-    meta_rpc_server: MetaEventServiceServer<MetaServiceImpl<Q>>,
+    meta_rpc_server: Option<MetaEventServiceServer<MetaServiceImpl<Q>>>,
     runtime: Arc<Runtime>,
     stop_tx: Option<Sender<()>>,
     join_handle: Option<JoinHandle<()>>,
@@ -271,9 +271,14 @@ impl RpcServices {
         let join_handle = self.runtime.spawn(async move {
             info!("Grpc server starts listening on {}", serve_addr);
 
-            let serve_res = Server::builder()
-                .add_service(rpc_server)
-                .add_service(meta_rpc_server)
+            let mut router = Server::builder().add_service(rpc_server);
+
+            if let Some(s) = meta_rpc_server {
+                info!("Grpc server serves meta rpc service");
+                router = router.add_service(s);
+            };
+
+            let serve_res = router
                 .serve_with_shutdown(serve_addr, stop_rx.map(drop))
                 .await;
@@ -302,6 +307,7 @@ pub struct Builder {
     runtimes: Option<Arc<EngineRuntimes>>,
     instance: Option<InstanceRef<Q>>,
     router: Option<RouterRef>,
+    cluster: Option<ClusterRef>,
     schema_config_provider: Option<SchemaConfigProviderRef>,
 }
 
@@ -312,6 +318,7 @@ impl Builder {
            runtimes: None,
            instance: None,
            router: None,
+           cluster: None,
            schema_config_provider: None,
        }
    }
@@ -336,6 +343,12 @@ impl Builder {
         self
     }
 
+    // Cluster is an optional field for building [RpcServices].
+    pub fn cluster(mut self, cluster: Option<ClusterRef>) -> Self {
+        self.cluster = cluster;
+        self
+    }
+
     pub fn schema_config_provider(mut self, provider: SchemaConfigProviderRef) -> Self {
         self.schema_config_provider = Some(provider);
         self
@@ -351,6 +364,15 @@ impl Builder {
             .schema_config_provider
             .context(MissingSchemaConfigProvider)?;
 
+        let meta_rpc_server = self.cluster.map(|v| {
+            let meta_service = MetaServiceImpl {
+                cluster: v,
+                instance: instance.clone(),
+                runtime: runtimes.meta_runtime.clone(),
+            };
+            MetaEventServiceServer::new(meta_service)
+        });
+
         let bg_runtime = runtimes.bg_runtime.clone();
         let storage_service = StorageServiceImpl {
             router,
@@ -359,7 +381,7 @@ impl Builder {
             schema_config_provider,
         };
         let rpc_server = StorageServiceServer::new(storage_service);
-        let meta_rpc_server = MetaEventServiceServer::new(MetaServiceImpl);
+
         let serve_addr = self.endpoint.parse().context(InvalidRpcServeAddr)?;
 
         Ok(RpcServices {
@@ -390,37 +412,167 @@ fn build_ok_header() -> ResponseHeader {
 #[derive(Clone)]
 struct MetaServiceImpl<Q: QueryExecutor + 'static> {
     cluster: ClusterRef,
+    #[allow(dead_code)]
     instance: InstanceRef<Q>,
+    runtime: Arc<Runtime>,
 }
 
 #[async_trait]
 impl<Q: QueryExecutor + 'static> MetaEventService for MetaServiceImpl<Q> {
+    // TODO: use macro to remove the boilerplate codes.
     async fn open_shards(
         &self,
         request: tonic::Request<OpenShardsRequest>,
     ) -> std::result::Result<tonic::Response<OpenShardsResponse>, tonic::Status> {
-        todo!()
+        let cluster = self.cluster.clone();
+        let handle = self.runtime.spawn(async move {
+            let request = request.into_inner();
+            cluster
+                .open_shards(&request, OpenShardsOpts::default())
+                .await
+                .map_err(|e| Box::new(e) as _)
+                .context(ErrWithCause {
+                    code: StatusCode::INTERNAL_SERVER_ERROR,
+                    msg: "fail to open shards in cluster",
+                })
+        });
+
+        let res = handle
+            .await
+            .map_err(|e| Box::new(e) as _)
+            .context(ErrWithCause {
+                code: StatusCode::INTERNAL_SERVER_ERROR,
+                msg: "fail to join task",
+            });
+
+        let mut resp = OpenShardsResponse::default();
+        match res {
+            Ok(Ok(_)) => {
+                resp.header = Some(build_ok_header());
+            }
+            Ok(Err(e)) | Err(e) => {
+                resp.header = Some(build_err_header(e));
+            }
+        };
+
+        Ok(tonic::Response::new(resp))
     }
 
     async fn close_shards(
         &self,
-        _request: tonic::Request<CloseShardsRequest>,
+        request: tonic::Request<CloseShardsRequest>,
     ) -> std::result::Result<tonic::Response<CloseShardsResponse>, tonic::Status> {
-        todo!()
+        let cluster = self.cluster.clone();
+        let handle = self.runtime.spawn(async move {
+            let request = request.into_inner();
+            cluster
+                .close_shards(&request, CloseShardsOpts::default())
+                .await
+                .map_err(|e| Box::new(e) as _)
+                .context(ErrWithCause {
+                    code: StatusCode::INTERNAL_SERVER_ERROR,
+                    msg: "fail to close shards in cluster",
+                })
+        });
+
+        let res = handle
+            .await
+            .map_err(|e| Box::new(e) as _)
+            .context(ErrWithCause {
+                code: StatusCode::INTERNAL_SERVER_ERROR,
+                msg: "fail to join task",
+            });
+
+        let mut resp = CloseShardsResponse::default();
+        match res {
+            Ok(Ok(_)) => {
+                resp.header = Some(build_ok_header());
+            }
+            Ok(Err(e)) | Err(e) => {
+                resp.header = Some(build_err_header(e));
+            }
+        };
+
+        Ok(tonic::Response::new(resp))
     }
 
     async fn create_table_on_shard(
         &self,
-        _request: tonic::Request<CreateTableOnShardRequest>,
+        request: tonic::Request<CreateTableOnShardRequest>,
     ) -> std::result::Result<tonic::Response<CreateTableOnShardResponse>, tonic::Status> {
-        todo!()
+        let cluster = self.cluster.clone();
+        let handle = self.runtime.spawn(async move {
+            let request = request.into_inner();
+            cluster
+                .create_table_on_shard(&request)
+                .await
+                .map_err(|e| Box::new(e) as _)
+                .context(ErrWithCause {
+                    code: StatusCode::INTERNAL_SERVER_ERROR,
+                    msg: format!(
+                        "fail to create table on shard in cluster, req:{:?}",
+                        request
+                    ),
+                })
+        });
+
+        let res = handle
+            .await
+            .map_err(|e| Box::new(e) as _)
+            .context(ErrWithCause {
+                code: StatusCode::INTERNAL_SERVER_ERROR,
+                msg: "fail to join task",
+            });
+
+        let mut resp = CreateTableOnShardResponse::default();
+        match res {
+            Ok(Ok(_)) => {
+                resp.header = Some(build_ok_header());
+            }
+            Ok(Err(e)) | Err(e) => {
+                resp.header = Some(build_err_header(e));
+            }
+        };
+
+        Ok(tonic::Response::new(resp))
     }
 
     async fn drop_table_on_shard(
         &self,
-        _request: tonic::Request<DropTableOnShardRequest>,
+        request: tonic::Request<DropTableOnShardRequest>,
     ) -> std::result::Result<tonic::Response<DropTableOnShardResponse>, tonic::Status> {
-        todo!()
+        let cluster = self.cluster.clone();
+        let handle = self.runtime.spawn(async move {
+            let request = request.into_inner();
+            cluster
+                .drop_table_on_shard(&request)
+                .await
+                .map_err(|e| Box::new(e) as _)
+                .context(ErrWithCause {
+                    code: StatusCode::INTERNAL_SERVER_ERROR,
+                    msg: format!("fail to drop table on shard in cluster, req:{:?}", request),
+                })
+        });
+
+        let res = handle
+            .await
+            .map_err(|e| Box::new(e) as _)
+            .context(ErrWithCause {
+                code: StatusCode::INTERNAL_SERVER_ERROR,
+                msg: "fail to join task",
+            });
+
+        let mut resp = DropTableOnShardResponse::default();
+        match res {
+            Ok(Ok(_)) => {
+                resp.header = Some(build_ok_header());
+            }
+            Ok(Err(e)) | Err(e) => {
+                resp.header = Some(build_err_header(e));
+            }
+        };
+
+        Ok(tonic::Response::new(resp))
     }
 
     async fn split_shard(
diff --git a/server/src/server.rs b/server/src/server.rs
index 5f95c2e1fe..3b4b840891 100644
--- a/server/src/server.rs
+++ b/server/src/server.rs
@@ -261,6 +261,7 @@ impl Builder {
             .runtimes(runtimes)
             .instance(instance.clone())
             .router(router)
+            .cluster(self.cluster.clone())
             .schema_config_provider(provider)
             .build()
             .context(BuildGrpcService)?;
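
Not part of the patch: the handlers above repeat the same pattern (clone the cluster, spawn the call on the meta runtime, fold both the task error and the join error into the response header), and the patch itself leaves a `TODO: use macro to remove the boilerplate codes` comment. The sketch below is one way that macro could look; the macro name `handle_meta_event` and its argument shape are assumptions for illustration only, and it relies on the same items the handlers already use (`ErrWithCause`, `StatusCode`, `build_ok_header`, `build_err_header`).

// A minimal sketch, assuming it lives next to the handlers in server/src/grpc/mod.rs.
macro_rules! handle_meta_event {
    // `$self` is the service, `$request` the tonic request parameter, `$resp_ty` the
    // response message type, `$method` the Cluster trait method, `$extra` optional
    // extra arguments (e.g. `OpenShardsOpts::default()`).
    ($self:ident, $request:ident, $resp_ty:ty, $err_msg:expr, $method:ident $(, $extra:expr)*) => {{
        let cluster = $self.cluster.clone();
        let handle = $self.runtime.spawn(async move {
            let $request = $request.into_inner();
            cluster
                .$method(&$request $(, $extra)*)
                .await
                .map_err(|e| Box::new(e) as _)
                .context(ErrWithCause {
                    code: StatusCode::INTERNAL_SERVER_ERROR,
                    msg: $err_msg,
                })
        });

        // Fold both the join error and the cluster error into the response header.
        let res = handle
            .await
            .map_err(|e| Box::new(e) as _)
            .context(ErrWithCause {
                code: StatusCode::INTERNAL_SERVER_ERROR,
                msg: "fail to join task",
            });

        let mut resp = <$resp_ty>::default();
        match res {
            Ok(Ok(_)) => {
                resp.header = Some(build_ok_header());
            }
            Ok(Err(e)) | Err(e) => {
                resp.header = Some(build_err_header(e));
            }
        };

        Ok(tonic::Response::new(resp))
    }};
}

// Hypothetical use inside `open_shards`, replacing its whole body:
// handle_meta_event!(
//     self,
//     request,
//     OpenShardsResponse,
//     "fail to open shards in cluster",
//     open_shards,
//     OpenShardsOpts::default()
// )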