Skip to content

Commit

Permalink
feat: support meta event service (#293)
Browse files Browse the repository at this point in the history
* feat: support meta rpc service

* feat: support meta event rpc server
  • Loading branch information
ShiKaiWi authored Oct 11, 2022
1 parent 1de1a25 commit 1a88eb9
Show file tree
Hide file tree
Showing 7 changed files with 268 additions and 12 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions catalog_impls/src/volatile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use catalog::{
};
use common_types::schema::SchemaName;
use log::{debug, info};
use meta_client::MetaClientRef;
use meta_client::{types::AllocSchemaIdRequest, MetaClientRef};
use snafu::{ensure, ResultExt};
use table_engine::table::{SchemaId, TableRef};
use tokio::sync::Mutex;
Expand Down Expand Up @@ -127,7 +127,7 @@ impl Catalog for CatalogImpl {

let schema_id = self
.meta_client
.alloc_schema_id(cluster::AllocSchemaIdRequest {
.alloc_schema_id(AllocSchemaIdRequest {
name: name.to_string(),
})
.await
Expand Down Expand Up @@ -324,7 +324,7 @@ impl Schema for SchemaImpl {

// Request CeresMeta to drop this table.
self.meta_client
.drop_table(cluster::DropTableRequest {
.drop_table(meta_client::types::DropTableRequest {
schema_name: schema_name.to_string(),
name: table_name.to_string(),
id: table.id().as_u64(),
Expand Down
1 change: 1 addition & 0 deletions cluster/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ workspace = true
analytic_engine = { workspace = true }
async-trait = { workspace = true }
catalog = { workspace = true }
ceresdbproto = { workspace = true }
common_types = { workspace = true }
common_util = { workspace = true }
log = { workspace = true }
Expand Down
23 changes: 21 additions & 2 deletions cluster/src/cluster_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ use std::{
};

use async_trait::async_trait;
use ceresdbproto::meta_event::{
CloseShardsRequest, CreateTableOnShardRequest, DropTableOnShardRequest, OpenShardsRequest,
};
use common_util::runtime::{JoinHandle, Runtime};
use log::{error, info, warn};
use meta_client::{
Expand All @@ -19,8 +22,8 @@ use tokio::{
};

use crate::{
config::ClusterConfig, table_manager::TableManager, topology::ClusterTopology, Cluster,
ClusterNodesNotFound, ClusterNodesResp, MetaClientFailure, Result,
config::ClusterConfig, table_manager::TableManager, topology::ClusterTopology, CloseShardsOpts,
Cluster, ClusterNodesNotFound, ClusterNodesResp, MetaClientFailure, OpenShardsOpts, Result,
};

/// ClusterImpl is an implementation of [`Cluster`] based [`MetaClient`].
Expand Down Expand Up @@ -206,6 +209,22 @@ impl Cluster for ClusterImpl {
Ok(())
}

async fn open_shards(&self, _req: &OpenShardsRequest, _opts: OpenShardsOpts) -> Result<()> {
todo!();
}

async fn close_shards(&self, _req: &CloseShardsRequest, _opts: CloseShardsOpts) -> Result<()> {
todo!();
}

async fn create_table_on_shard(&self, _req: &CreateTableOnShardRequest) -> Result<()> {
todo!();
}

async fn drop_table_on_shard(&self, _req: &DropTableOnShardRequest) -> Result<()> {
todo!();
}

async fn route_tables(&self, req: &RouteTablesRequest) -> Result<RouteTablesResponse> {
self.inner.route_tables(req).await
}
Expand Down
17 changes: 13 additions & 4 deletions cluster/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,11 @@
use std::sync::Arc;

use async_trait::async_trait;
use ceresdbproto::meta_event::{
CloseShardsRequest, CreateTableOnShardRequest, DropTableOnShardRequest, OpenShardsRequest,
};
use common_types::schema::SchemaName;
use common_util::define_result;
pub use meta_client::types::{
AllocSchemaIdRequest, AllocSchemaIdResponse, CreateTableRequest, CreateTableResponse,
DropTableRequest, GetShardTablesRequest,
};
use meta_client::types::{ClusterNodesRef, RouteTablesRequest, RouteTablesResponse, ShardId};
use snafu::{Backtrace, Snafu};

Expand Down Expand Up @@ -78,11 +77,21 @@ pub struct ClusterNodesResp {
pub cluster_nodes: ClusterNodesRef,
}

#[derive(Debug, Default)]
pub struct OpenShardsOpts {}

#[derive(Debug, Default)]
pub struct CloseShardsOpts {}

/// Cluster manages tables and shard infos in cluster mode.
#[async_trait]
pub trait Cluster {
async fn start(&self) -> Result<()>;
async fn stop(&self) -> Result<()>;
async fn open_shards(&self, req: &OpenShardsRequest, opts: OpenShardsOpts) -> Result<()>;
async fn close_shards(&self, req: &CloseShardsRequest, opts: CloseShardsOpts) -> Result<()>;
async fn create_table_on_shard(&self, req: &CreateTableOnShardRequest) -> Result<()>;
async fn drop_table_on_shard(&self, req: &DropTableOnShardRequest) -> Result<()>;
async fn route_tables(&self, req: &RouteTablesRequest) -> Result<RouteTablesResponse>;
async fn fetch_nodes(&self) -> Result<ClusterNodesResp>;
}
Loading

0 comments on commit 1a88eb9

Please sign in to comment.