Skip to content

Commit

Permalink
feat: support meta rpc service
Browse files Browse the repository at this point in the history
  • Loading branch information
ShiKaiWi committed Oct 10, 2022
1 parent 92dd1af commit 3f1f677
Showing 1 changed file with 74 additions and 1 deletion.
75 changes: 74 additions & 1 deletion server/src/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ use std::{
use async_trait::async_trait;
use ceresdbproto::{
common::ResponseHeader,
meta_event::{
meta_event_service_server::{MetaEventService, MetaEventServiceServer},
ChangeShardRoleRequest, ChangeShardRoleResponse, CloseShardsRequest, CloseShardsResponse,
CreateTableOnShardRequest, CreateTableOnShardResponse, DropTableOnShardRequest,
DropTableOnShardResponse, MergeShardsRequest, MergeShardsResponse, OpenShardsRequest,
OpenShardsResponse, SplitShardRequest, SplitShardResponse,
},
prometheus::{PrometheusQueryRequest, PrometheusQueryResponse},
storage::{
storage_service_server::{StorageService, StorageServiceServer},
Expand All @@ -21,7 +28,7 @@ use ceresdbproto::{
WriteResponse,
},
};
use cluster::config::SchemaConfig;
use cluster::{config::SchemaConfig, ClusterRef};
use common_types::{
column_schema::{self, ColumnSchema},
datum::DatumKind,
Expand Down Expand Up @@ -249,6 +256,7 @@ impl<'a, Q> HandlerContext<'a, Q> {
pub struct RpcServices<Q: QueryExecutor + 'static> {
serve_addr: SocketAddr,
rpc_server: StorageServiceServer<StorageServiceImpl<Q>>,
meta_rpc_server: MetaEventServiceServer<MetaServiceImpl<Q>>,
runtime: Arc<Runtime>,
stop_tx: Option<Sender<()>>,
join_handle: Option<JoinHandle<()>>,
Expand All @@ -257,13 +265,15 @@ pub struct RpcServices<Q: QueryExecutor + 'static> {
impl<Q: QueryExecutor + 'static> RpcServices<Q> {
pub async fn start(&mut self) -> Result<()> {
let rpc_server = self.rpc_server.clone();
let meta_rpc_server = self.meta_rpc_server.clone();
let serve_addr = self.serve_addr;
let (stop_tx, stop_rx) = oneshot::channel();
let join_handle = self.runtime.spawn(async move {
info!("Grpc server starts listening on {}", serve_addr);

let serve_res = Server::builder()
.add_service(rpc_server)
.add_service(meta_rpc_server)
.serve_with_shutdown(serve_addr, stop_rx.map(drop))
.await;

Expand Down Expand Up @@ -349,11 +359,13 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
schema_config_provider,
};
let rpc_server = StorageServiceServer::new(storage_service);
let meta_rpc_server = MetaEventServiceServer::new(MetaServiceImpl);
let serve_addr = self.endpoint.parse().context(InvalidRpcServeAddr)?;

Ok(RpcServices {
serve_addr,
rpc_server,
meta_rpc_server,
runtime: bg_runtime,
stop_tx: None,
join_handle: None,
Expand All @@ -375,6 +387,67 @@ fn build_ok_header() -> ResponseHeader {
}
}

#[derive(Clone)]
struct MetaServiceImpl<Q: QueryExecutor + 'static> {
cluster: ClusterRef,
instance: InstanceRef<Q>,
}

#[async_trait]
impl<Q: QueryExecutor + 'static> MetaEventService for MetaServiceImpl<Q> {
async fn open_shards(
&self,
request: tonic::Request<OpenShardsRequest>,
) -> std::result::Result<tonic::Response<OpenShardsResponse>, tonic::Status> {
todo!()
}

async fn close_shards(
&self,
_request: tonic::Request<CloseShardsRequest>,
) -> std::result::Result<tonic::Response<CloseShardsResponse>, tonic::Status> {
todo!()
}

async fn create_table_on_shard(
&self,
_request: tonic::Request<CreateTableOnShardRequest>,
) -> std::result::Result<tonic::Response<CreateTableOnShardResponse>, tonic::Status> {
todo!()
}

async fn drop_table_on_shard(
&self,
_request: tonic::Request<DropTableOnShardRequest>,
) -> std::result::Result<tonic::Response<DropTableOnShardResponse>, tonic::Status> {
todo!()
}

async fn split_shard(
&self,
request: tonic::Request<SplitShardRequest>,
) -> std::result::Result<tonic::Response<SplitShardResponse>, tonic::Status> {
info!("Receive split shard request:{:?}", request);
return Err(tonic::Status::new(tonic::Code::Unimplemented, ""));
}

async fn merge_shards(
&self,
request: tonic::Request<MergeShardsRequest>,
) -> std::result::Result<tonic::Response<MergeShardsResponse>, tonic::Status> {
info!("Receive merge shards request:{:?}", request);
return Err(tonic::Status::new(tonic::Code::Unimplemented, ""));
}

async fn change_shard_role(
&self,
request: tonic::Request<ChangeShardRoleRequest>,
) -> std::result::Result<tonic::Response<ChangeShardRoleResponse>, tonic::Status> {
info!("Receive change shard role request:{:?}", request);
return Err(tonic::Status::new(tonic::Code::Unimplemented, ""));
}
}

struct StorageServiceImpl<Q: QueryExecutor + 'static> {
router: Arc<dyn Router + Send + Sync>,
instance: InstanceRef<Q>,
Expand Down

0 comments on commit 3f1f677

Please sign in to comment.