diff --git a/Cargo.lock b/Cargo.lock index 7a9d975e0f..c7138a39a1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1239,6 +1239,7 @@ name = "cluster" version = "1.2.2" dependencies = [ "async-trait", + "catalog", "ceresdbproto", "common_types", "common_util", @@ -1251,6 +1252,7 @@ dependencies = [ "snafu 0.6.10", "table_engine", "tokio", + "wal", ] [[package]] diff --git a/catalog_impls/src/volatile.rs b/catalog_impls/src/volatile.rs index dcf0ecd173..fda87ae401 100644 --- a/catalog_impls/src/volatile.rs +++ b/catalog_impls/src/volatile.rs @@ -20,7 +20,7 @@ use catalog::{ }, Catalog, CatalogRef, CreateSchemaWithCause, }; -use cluster::shard_tables_cache::ShardTablesCache; +use cluster::shard_set::ShardSet; use common_types::schema::SchemaName; use common_util::error::BoxError; use log::{debug, info}; @@ -32,15 +32,15 @@ use tokio::sync::Mutex; /// ManagerImpl manages multiple volatile catalogs. pub struct ManagerImpl { catalogs: HashMap>, - shard_tables_cache: ShardTablesCache, + shard_set: ShardSet, meta_client: MetaClientRef, } impl ManagerImpl { - pub fn new(shard_tables_cache: ShardTablesCache, meta_client: MetaClientRef) -> Self { + pub fn new(shard_set: ShardSet, meta_client: MetaClientRef) -> Self { let mut manager = ManagerImpl { catalogs: HashMap::new(), - shard_tables_cache, + shard_set, meta_client, }; @@ -87,7 +87,7 @@ impl ManagerImpl { let catalog = Arc::new(CatalogImpl { name: catalog_name.clone(), schemas: RwLock::new(HashMap::new()), - shard_tables_cache: self.shard_tables_cache.clone(), + shard_set: self.shard_set.clone(), meta_client: self.meta_client.clone(), }); @@ -107,7 +107,7 @@ struct CatalogImpl { name: String, /// All the schemas belonging to the catalog. schemas: RwLock>, - shard_tables_cache: ShardTablesCache, + shard_set: ShardSet, meta_client: MetaClientRef, } @@ -156,7 +156,7 @@ impl Catalog for CatalogImpl { self.name.to_string(), name.to_string(), SchemaId::from_u32(schema_id), - self.shard_tables_cache.clone(), + self.shard_set.clone(), )); schemas.insert(name.to_string(), schema); @@ -188,7 +188,7 @@ struct SchemaImpl { /// Schema name schema_name: String, schema_id: SchemaId, - shard_tables_cache: ShardTablesCache, + shard_set: ShardSet, /// Tables of schema tables: RwLock>, /// Guard for creating/dropping table @@ -200,13 +200,13 @@ impl SchemaImpl { catalog_name: String, schema_name: String, schema_id: SchemaId, - shard_tables_cache: ShardTablesCache, + shard_set: ShardSet, ) -> Self { Self { catalog_name, schema_name, schema_id, - shard_tables_cache, + shard_set, tables: Default::default(), create_table_mutex: Mutex::new(()), } @@ -303,17 +303,20 @@ impl Schema for SchemaImpl { // Do real create table. // Partition table is not stored in ShardTableManager. if request.partition_info.is_none() { - let _ = self - .shard_tables_cache - .find_table_by_name( - &request.catalog_name, - &request.schema_name, - &request.table_name, - ) + let shard = + self.shard_set + .get(request.shard_id) + .with_context(|| schema::CreateTable { + request: request.clone(), + msg: "shard not found".to_string(), + })?; + + // TODO: seems unnecessary? 
+ let _ = shard + .find_table(&request.schema_name, &request.table_name) .with_context(|| schema::CreateTable { request: request.clone(), - msg: format!("table with shards is not found in the ShardTableManager, catalog_name:{}, schema_name:{}, table_name:{}", - request.catalog_name, request.schema_name, request.table_name), + msg: "table not found in shard".to_string(), })?; } let request = request.into_engine_create_request(None, self.schema_id); diff --git a/cluster/Cargo.toml b/cluster/Cargo.toml index 2acbf9ab5d..6b40f42382 100644 --- a/cluster/Cargo.toml +++ b/cluster/Cargo.toml @@ -12,6 +12,7 @@ workspace = true [dependencies] async-trait = { workspace = true } +catalog = { workspace = true } ceresdbproto = { workspace = true } common_types = { workspace = true } common_util = { workspace = true } @@ -24,3 +25,4 @@ serde_json = { workspace = true } snafu = { workspace = true } table_engine = { workspace = true } tokio = { workspace = true } +wal = { workspace = true } diff --git a/cluster/src/cluster_impl.rs b/cluster/src/cluster_impl.rs index ee88794d4f..549ef330bc 100644 --- a/cluster/src/cluster_impl.rs +++ b/cluster/src/cluster_impl.rs @@ -6,13 +6,6 @@ use std::{ }; use async_trait::async_trait; -use ceresdbproto::{ - meta_event::{ - CloseTableOnShardRequest, CreateTableOnShardRequest, DropTableOnShardRequest, - OpenTableOnShardRequest, UpdateShardInfo, - }, - meta_service::TableInfo as TableInfoPb, -}; use common_types::table::ShardId; use common_util::{ error::BoxError, @@ -23,7 +16,7 @@ use log::{error, info, warn}; use meta_client::{ types::{ GetNodesRequest, GetTablesOfShardsRequest, RouteTablesRequest, RouteTablesResponse, - ShardInfo, TableInfo, TablesOfShard, + ShardInfo, }, MetaClientRef, }; @@ -36,11 +29,10 @@ use tokio::{ use crate::{ config::ClusterConfig, shard_lock_manager::{ShardLockManager, ShardLockManagerRef}, - shard_tables_cache::ShardTablesCache, + shard_set::{Shard, ShardRef, ShardSet}, topology::ClusterTopology, - Cluster, ClusterNodesNotFound, ClusterNodesResp, EtcdClientFailureWithCause, Internal, - InvalidArguments, MetaClientFailure, OpenShard, OpenShardWithCause, Result, ShardNotFound, - TableNotFound, + Cluster, ClusterNodesNotFound, ClusterNodesResp, EtcdClientFailureWithCause, InvalidArguments, + MetaClientFailure, OpenShard, OpenShardWithCause, Result, ShardNotFound, }; /// ClusterImpl is an implementation of [`Cluster`] based [`MetaClient`]. 
@@ -61,7 +53,7 @@ pub struct ClusterImpl { impl ClusterImpl { pub async fn try_new( node_name: String, - shard_tables_cache: ShardTablesCache, + shard_set: ShardSet, meta_client: MetaClientRef, config: ClusterConfig, runtime: Arc, @@ -70,7 +62,7 @@ impl ClusterImpl { return InvalidArguments { msg: e }.fail(); } - let inner = Arc::new(Inner::new(shard_tables_cache, meta_client)?); + let inner = Arc::new(Inner::new(shard_set, meta_client)?); let connect_options = ConnectOptions::from(&config.etcd_client); let etcd_client = etcd_client::Client::connect(&config.etcd_client.server_addrs, Some(connect_options)) @@ -110,7 +102,12 @@ impl ClusterImpl { let handle = self.runtime.spawn(async move { loop { - let shard_infos = inner.shard_tables_cache.all_shard_infos(); + let shards = inner.shard_set.all_shards(); + let mut shard_infos = Vec::with_capacity(shards.len()); + for shard in shards { + let shard_info = shard.shard_info(); + shard_infos.push(shard_info); + } info!("Node heartbeat to meta, shard infos:{:?}", shard_infos); let resp = inner.meta_client.send_heartbeat(shard_infos).await; @@ -163,15 +160,15 @@ impl ClusterImpl { } struct Inner { - shard_tables_cache: ShardTablesCache, + shard_set: ShardSet, meta_client: MetaClientRef, topology: RwLock, } impl Inner { - fn new(shard_tables_cache: ShardTablesCache, meta_client: MetaClientRef) -> Result { + fn new(shard_set: ShardSet, meta_client: MetaClientRef) -> Result { Ok(Self { - shard_tables_cache, + shard_set, meta_client, topology: Default::default(), }) @@ -235,20 +232,21 @@ impl Inner { Ok(resp) } - async fn open_shard(&self, shard_info: &ShardInfo) -> Result { - if let Some(tables_of_shard) = self.shard_tables_cache.get(shard_info.id) { - if tables_of_shard.shard_info.version == shard_info.version { + async fn open_shard(&self, shard_info: &ShardInfo) -> Result { + if let Some(shard) = self.shard_set.get(shard_info.id) { + let cur_shard_info = shard.shard_info(); + if cur_shard_info.version == shard_info.version { info!( "No need to open the exactly same shard again, shard_info:{:?}", shard_info ); - return Ok(tables_of_shard); + return Ok(shard); } ensure!( - tables_of_shard.shard_info.version < shard_info.version, + cur_shard_info.version < shard_info.version, OpenShard { shard_id: shard_info.id, - msg: format!("open a shard with a smaller version, curr_shard_info:{:?}, new_shard_info:{:?}", tables_of_shard.shard_info, shard_info), + msg: format!("open a shard with a smaller version, curr_shard_info:{cur_shard_info:?}, new_shard_info:{shard_info:?}"), } ); } @@ -262,8 +260,8 @@ impl Inner { .get_tables_of_shards(req) .await .box_err() - .context(OpenShardWithCause { - shard_id: shard_info.id, + .with_context(|| OpenShardWithCause { + msg: format!("shard_info:{shard_info:?}"), })?; ensure!( @@ -282,95 +280,24 @@ impl Inner { msg: "shard tables are missing from the response", })?; - self.shard_tables_cache.insert(tables_of_shard.clone()); + let shard_id = tables_of_shard.shard_info.id; + let shard = Arc::new(Shard::new(tables_of_shard)); + self.shard_set.insert(shard_id, shard.clone()); - Ok(tables_of_shard) + Ok(shard) } - fn close_shard(&self, shard_id: ShardId) -> Result { - self.shard_tables_cache - .remove(shard_id) - .with_context(|| ShardNotFound { - msg: format!("close non-existent shard, shard_id:{shard_id}"), - }) + fn shard(&self, shard_id: ShardId) -> Option { + self.shard_set.get(shard_id) } - #[inline] - fn freeze_shard(&self, shard_id: ShardId) -> Result { - self.shard_tables_cache - .freeze(shard_id) + fn 
close_shard(&self, shard_id: ShardId) -> Result { + self.shard_set + .remove(shard_id) .with_context(|| ShardNotFound { - msg: format!("try to freeze a non-existent shard, shard_id:{shard_id}"), + msg: format!("close non-existent shard, shard_id:{shard_id}"), }) } - - fn create_table_on_shard(&self, req: &CreateTableOnShardRequest) -> Result<()> { - self.insert_table_to_shard(req.update_shard_info.clone(), req.table_info.clone()) - } - - fn drop_table_on_shard(&self, req: &DropTableOnShardRequest) -> Result<()> { - self.remove_table_from_shard(req.update_shard_info.clone(), req.table_info.clone()) - } - - fn open_table_on_shard(&self, req: &OpenTableOnShardRequest) -> Result<()> { - self.insert_table_to_shard(req.update_shard_info.clone(), req.table_info.clone()) - } - - fn close_table_on_shard(&self, req: &CloseTableOnShardRequest) -> Result<()> { - self.remove_table_from_shard(req.update_shard_info.clone(), req.table_info.clone()) - } - - fn insert_table_to_shard( - &self, - update_shard_info: Option, - table_info: Option, - ) -> Result<()> { - let update_shard_info = update_shard_info.context(ShardNotFound { - msg: "update shard info is missing", - })?; - let curr_shard_info = update_shard_info.curr_shard_info.context(ShardNotFound { - msg: "current shard info is missing", - })?; - let table_info = table_info.context(TableNotFound { - msg: "table info is missing", - })?; - - self.shard_tables_cache.try_insert_table_to_shard( - update_shard_info.prev_version, - ShardInfo::from(&curr_shard_info), - TableInfo::try_from(table_info) - .box_err() - .context(Internal { - msg: "Failed to parse tableInfo", - })?, - ) - } - - fn remove_table_from_shard( - &self, - update_shard_info: Option, - table_info: Option, - ) -> Result<()> { - let update_shard_info = update_shard_info.context(ShardNotFound { - msg: "update shard info is missing", - })?; - let curr_shard_info = update_shard_info.curr_shard_info.context(ShardNotFound { - msg: "current shard info is missing", - })?; - let table_info = table_info.context(TableNotFound { - msg: "table info is missing", - })?; - - self.shard_tables_cache.try_remove_table_from_shard( - update_shard_info.prev_version, - ShardInfo::from(&curr_shard_info), - TableInfo::try_from(table_info) - .box_err() - .context(Internal { - msg: "Failed to parse tableInfo", - })?, - ) - } } #[async_trait] @@ -406,32 +333,16 @@ impl Cluster for ClusterImpl { Ok(()) } - async fn open_shard(&self, shard_info: &ShardInfo) -> Result { + async fn open_shard(&self, shard_info: &ShardInfo) -> Result { self.inner.open_shard(shard_info).await } - async fn close_shard(&self, shard_id: ShardId) -> Result { - self.inner.close_shard(shard_id) - } - - async fn freeze_shard(&self, shard_id: ShardId) -> Result { - self.inner.freeze_shard(shard_id) - } - - async fn create_table_on_shard(&self, req: &CreateTableOnShardRequest) -> Result<()> { - self.inner.create_table_on_shard(req) - } - - async fn drop_table_on_shard(&self, req: &DropTableOnShardRequest) -> Result<()> { - self.inner.drop_table_on_shard(req) - } - - async fn open_table_on_shard(&self, req: &OpenTableOnShardRequest) -> Result<()> { - self.inner.open_table_on_shard(req) + fn shard(&self, shard_id: ShardId) -> Option { + self.inner.shard(shard_id) } - async fn close_table_on_shard(&self, req: &CloseTableOnShardRequest) -> Result<()> { - self.inner.close_table_on_shard(req) + async fn close_shard(&self, shard_id: ShardId) -> Result { + self.inner.close_shard(shard_id) } async fn route_tables(&self, req: &RouteTablesRequest) -> Result { 
diff --git a/cluster/src/lib.rs b/cluster/src/lib.rs index e91e4cc862..ed6fc34ee9 100644 --- a/cluster/src/lib.rs +++ b/cluster/src/lib.rs @@ -13,23 +13,22 @@ use std::sync::Arc; use async_trait::async_trait; -use ceresdbproto::meta_event::{ - CloseTableOnShardRequest, CreateTableOnShardRequest, DropTableOnShardRequest, - OpenTableOnShardRequest, -}; use common_types::schema::SchemaName; use common_util::{define_result, error::GenericError}; use meta_client::types::{ ClusterNodesRef, RouteTablesRequest, RouteTablesResponse, ShardId, ShardInfo, ShardVersion, - TablesOfShard, }; use shard_lock_manager::ShardLockManagerRef; use snafu::{Backtrace, Snafu}; +use crate::shard_set::ShardRef; + pub mod cluster_impl; pub mod config; pub mod shard_lock_manager; -pub mod shard_tables_cache; +pub mod shard_operation; +pub mod shard_operator; +pub mod shard_set; #[allow(dead_code)] pub mod topology; @@ -67,17 +66,41 @@ pub enum Error { backtrace: Backtrace, }, - #[snafu(display("Fail to open shard, shard_id:{shard_id}, source:{source}."))] - OpenShardWithCause { - shard_id: ShardId, - source: GenericError, - }, + #[snafu(display("Fail to open shard, msg:{msg}, source:{source}."))] + OpenShardWithCause { msg: String, source: GenericError }, - #[snafu(display("Fail to close shard, shard_id:{shard_id}, source:{source}."))] - CloseShardWithCause { - shard_id: ShardId, - source: GenericError, - }, + #[snafu(display("Fail to open shard, msg:{msg}.\nBacktrace:\n{backtrace}"))] + OpenShardNoCause { msg: String, backtrace: Backtrace }, + + #[snafu(display("Fail to close shard, msg:{msg}, source:{source}."))] + CloseShardWithCause { msg: String, source: GenericError }, + + #[snafu(display("Fail to close shard, msg:{msg}.\nBacktrace:\n{backtrace}"))] + CloseShardNoCause { msg: String, backtrace: Backtrace }, + + #[snafu(display("Fail to create table on shard, msg:{msg}, source:{source}."))] + CreateTableWithCause { msg: String, source: GenericError }, + + #[snafu(display("Fail to create table on shard, msg:{msg}.\nBacktrace:\n{backtrace}"))] + CreateTableNoCause { msg: String, backtrace: Backtrace }, + + #[snafu(display("Fail to drop table on shard, msg:{msg}, source:{source}."))] + DropTableWithCause { msg: String, source: GenericError }, + + #[snafu(display("Fail to drop table on shard, msg:{msg}.\nBacktrace:\n{backtrace}"))] + DropTableNoCause { msg: String, backtrace: Backtrace }, + + #[snafu(display("Fail to open table on shard, msg:{msg}, source:{source}."))] + OpenTableWithCause { msg: String, source: GenericError }, + + #[snafu(display("Fail to open table on shard, msg:{msg}.\nBacktrace:\n{backtrace}"))] + OpenTableNoCause { msg: String, backtrace: Backtrace }, + + #[snafu(display("Fail to close table on shard, msg:{msg}, source:{source}."))] + CloseTableWithCause { msg: String, source: GenericError }, + + #[snafu(display("Fail to close table on shard, msg:{msg}.\nBacktrace:\n{backtrace}"))] + CloseTableNoCause { msg: String, backtrace: Backtrace }, #[snafu(display("Shard not found, msg:{msg}.\nBacktrace:\n{backtrace}"))] ShardNotFound { msg: String, backtrace: Backtrace }, @@ -132,19 +155,21 @@ pub struct ClusterNodesResp { pub trait Cluster { async fn start(&self) -> Result<()>; async fn stop(&self) -> Result<()>; - async fn open_shard(&self, shard_info: &ShardInfo) -> Result; - /// Close the shard. + + /// Fetch related information and open shard. + async fn open_shard(&self, shard_info: &ShardInfo) -> Result; + + /// Get shard. /// - /// Return error if the shard is not found. 
- async fn close_shard(&self, req: ShardId) -> Result; - /// Freeze the shard to reject create/drop table on the shard. + /// If target shard has opened in cluster, return it. Otherwise, return + /// None. + fn shard(&self, shard_id: ShardId) -> Option; + + /// Close shard. /// /// Return error if the shard is not found. - async fn freeze_shard(&self, req: ShardId) -> Result; - async fn create_table_on_shard(&self, req: &CreateTableOnShardRequest) -> Result<()>; - async fn drop_table_on_shard(&self, req: &DropTableOnShardRequest) -> Result<()>; - async fn open_table_on_shard(&self, req: &OpenTableOnShardRequest) -> Result<()>; - async fn close_table_on_shard(&self, req: &CloseTableOnShardRequest) -> Result<()>; + async fn close_shard(&self, shard_id: ShardId) -> Result; + async fn route_tables(&self, req: &RouteTablesRequest) -> Result; async fn fetch_nodes(&self) -> Result; fn shard_lock_manager(&self) -> ShardLockManagerRef; diff --git a/server/src/grpc/meta_event_service/shard_operation.rs b/cluster/src/shard_operation.rs similarity index 100% rename from server/src/grpc/meta_event_service/shard_operation.rs rename to cluster/src/shard_operation.rs diff --git a/cluster/src/shard_operator.rs b/cluster/src/shard_operator.rs new file mode 100644 index 0000000000..76e3189bbc --- /dev/null +++ b/cluster/src/shard_operator.rs @@ -0,0 +1,401 @@ +// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. + +use std::collections::HashMap; + +use catalog::{ + schema::{ + CloseOptions, CloseTableRequest, CreateOptions, CreateTableRequest, DropOptions, + DropTableRequest, OpenOptions, OpenTableRequest, TableDef, + }, + table_operator::TableOperator, +}; +use common_util::error::BoxError; +use log::info; +use snafu::ResultExt; +use table_engine::{ + engine::{TableEngineRef, TableState}, + table::TableId, +}; + +use crate::{ + shard_operation::WalRegionCloserRef, + shard_set::{ShardDataRef, UpdatedTableInfo}, + CloseShardWithCause, CloseTableWithCause, CreateTableWithCause, DropTableWithCause, + OpenShardWithCause, OpenTableWithCause, Result, +}; + +pub struct OpenContext { + pub catalog: String, + pub table_engine: TableEngineRef, + pub table_operator: TableOperator, + pub engine: String, +} + +impl std::fmt::Debug for OpenContext { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("OpenContext") + .field("catalog", &self.catalog) + .field("engine", &self.engine) + .finish() + } +} + +pub struct CloseContext { + pub catalog: String, + pub table_engine: TableEngineRef, + pub table_operator: TableOperator, + pub wal_region_closer: WalRegionCloserRef, + pub engine: String, +} + +impl std::fmt::Debug for CloseContext { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("OpenContext") + .field("catalog", &self.catalog) + .field("engine", &self.engine) + .finish() + } +} + +pub struct CreateTableContext { + pub catalog: String, + pub table_engine: TableEngineRef, + pub table_operator: TableOperator, + pub partition_table_engine: TableEngineRef, + pub updated_table_info: UpdatedTableInfo, + pub table_schema: common_types::schema::Schema, + pub options: HashMap, + pub create_if_not_exist: bool, + pub engine: String, +} + +impl std::fmt::Debug for CreateTableContext { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("CreateTableContext") + .field("catalog", &self.catalog) + .field("updated_table_info", &self.updated_table_info) + .field("table_schema", &self.table_schema) + 
.field("options", &self.options) + .field("engine", &self.engine) + .field("create_if_not_exist", &self.create_if_not_exist) + .finish() + } +} + +pub struct AlterContext { + pub catalog: String, + pub table_engine: TableEngineRef, + pub table_operator: TableOperator, + pub updated_table_info: UpdatedTableInfo, + pub engine: String, +} + +impl std::fmt::Debug for AlterContext { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("AlterContext") + .field("catalog", &self.catalog) + .field("updated_table_info", &self.updated_table_info) + .field("engine", &self.engine) + .finish() + } +} + +pub type DropTableContext = AlterContext; +pub type OpenTableContext = AlterContext; +pub type CloseTableContext = AlterContext; + +pub struct ShardOperator { + pub data: ShardDataRef, +} + +impl ShardOperator { + pub async fn open(&self, ctx: OpenContext) -> Result<()> { + let (shard_info, tables) = { + let data = self.data.read().unwrap(); + let shard_info = data.shard_info.clone(); + let tables = data.tables.clone(); + + (shard_info, tables) + }; + + info!("ShardOperator open sequentially begin, shard_id:{shard_info:?}"); + + let table_defs = tables + .into_iter() + .map(|info| TableDef { + catalog_name: ctx.catalog.clone(), + schema_name: info.schema_name, + id: TableId::from(info.id), + name: info.name, + }) + .collect(); + + let open_shard_request = catalog::schema::OpenShardRequest { + shard_id: shard_info.id, + table_defs, + engine: ctx.engine, + }; + let opts = OpenOptions { + table_engine: ctx.table_engine.clone(), + }; + + ctx.table_operator + .open_shard(open_shard_request, opts) + .await + .box_err() + .with_context(|| OpenShardWithCause { + msg: format!("shard_info:{shard_info:?}"), + })?; + + info!("ShardOperator open sequentially finish, shard_id:{shard_info:?}"); + + Ok(()) + } + + pub async fn close(&self, ctx: CloseContext) -> Result<()> { + let (shard_info, tables) = { + let data = self.data.read().unwrap(); + let shard_info = data.shard_info.clone(); + let tables = data.tables.clone(); + + (shard_info, tables) + }; + info!("ShardOperator close sequentially begin, shard_info:{shard_info:?}"); + + { + let mut data = self.data.write().unwrap(); + data.freeze(); + info!("Shard is frozen before closed, shard_id:{}", shard_info.id); + } + + let table_defs = tables + .into_iter() + .map(|info| TableDef { + catalog_name: ctx.catalog.clone(), + schema_name: info.schema_name, + id: TableId::from(info.id), + name: info.name, + }) + .collect(); + let close_shard_request = catalog::schema::CloseShardRequest { + shard_id: shard_info.id, + table_defs, + engine: ctx.engine, + }; + let opts = CloseOptions { + table_engine: ctx.table_engine, + }; + + ctx.table_operator + .close_shard(close_shard_request, opts) + .await + .box_err() + .with_context(|| CloseShardWithCause { + msg: format!("shard_info:{shard_info:?}"), + })?; + + // Try to close wal region + ctx.wal_region_closer + .close_region(shard_info.id) + .await + .with_context(|| CloseShardWithCause { + msg: format!("shard_info:{shard_info:?}"), + })?; + + info!("ShardOperator close sequentially finish, shard_info:{shard_info:?}"); + + Ok(()) + } + + pub async fn create_table(&self, ctx: CreateTableContext) -> Result<()> { + let shard_info = ctx.updated_table_info.shard_info.clone(); + let table_info = ctx.updated_table_info.table_info.clone(); + + info!( + "ShardOperator create table sequentially begin, shard_info:{shard_info:?}, table_info:{table_info:?}", + ); + + // FIXME: maybe should insert table from 
cluster after having created table. + { + let mut data = self.data.write().unwrap(); + data.try_insert_table(ctx.updated_table_info) + .box_err() + .with_context(|| CreateTableWithCause { + msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"), + })?; + } + + // Create the table by operator afterwards. + let (table_engine, partition_info) = match table_info.partition_info.clone() { + Some(v) => (ctx.partition_table_engine.clone(), Some(v)), + None => (ctx.table_engine.clone(), None), + }; + + // Build create table request and options. + let create_table_request = CreateTableRequest { + catalog_name: ctx.catalog, + schema_name: table_info.schema_name.clone(), + table_name: table_info.name.clone(), + table_id: Some(TableId::new(table_info.id)), + table_schema: ctx.table_schema, + engine: ctx.engine, + options: ctx.options, + state: TableState::Stable, + shard_id: shard_info.id, + partition_info, + }; + + let create_opts = CreateOptions { + table_engine, + create_if_not_exists: ctx.create_if_not_exist, + }; + + let _ = ctx + .table_operator + .create_table_on_shard(create_table_request.clone(), create_opts) + .await + .box_err() + .with_context(|| CreateTableWithCause { + msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"), + })?; + + info!( + "ShardOperator create table sequentially finish, shard_info:{shard_info:?}, table_info:{table_info:?}", + ); + + Ok(()) + } + + pub async fn drop_table(&self, ctx: DropTableContext) -> Result<()> { + let shard_info = ctx.updated_table_info.shard_info.clone(); + let table_info = ctx.updated_table_info.table_info.clone(); + + info!( + "ShardOperator drop table sequentially begin, shard_info:{shard_info:?}, table_info:{table_info:?}", + ); + + // FIXME: maybe should insert table from cluster after having dropped table. + { + let mut data = self.data.write().unwrap(); + data.try_remove_table(ctx.updated_table_info) + .box_err() + .with_context(|| DropTableWithCause { + msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"), + })?; + } + + // Drop the table by operator afterwards. + let drop_table_request = DropTableRequest { + catalog_name: ctx.catalog, + schema_name: table_info.schema_name.clone(), + table_name: table_info.name.clone(), + engine: ctx.engine, + }; + let drop_opts = DropOptions { + table_engine: ctx.table_engine, + }; + + ctx.table_operator + .drop_table_on_shard(drop_table_request.clone(), drop_opts) + .await + .box_err() + .with_context(|| DropTableWithCause { + msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"), + })?; + + info!( + "ShardOperator drop table sequentially finish, shard_info:{shard_info:?}, table_info:{table_info:?}", + ); + + Ok(()) + } + + pub async fn open_table(&self, ctx: OpenTableContext) -> Result<()> { + let shard_info = ctx.updated_table_info.shard_info.clone(); + let table_info = ctx.updated_table_info.table_info.clone(); + + info!( + "ShardOperator open table sequentially begin, shard_info:{shard_info:?}, table_info:{table_info:?}", + ); + + // FIXME: maybe should insert table from cluster after having opened table. + { + let mut data = self.data.write().unwrap(); + data.try_insert_table(ctx.updated_table_info) + .box_err() + .with_context(|| OpenTableWithCause { + msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"), + })?; + } + + // Open the table by operator afterwards. 
+ let open_table_request = OpenTableRequest { + catalog_name: ctx.catalog, + schema_name: table_info.schema_name.clone(), + table_name: table_info.name.clone(), + // FIXME: the engine type should not use the default one. + engine: ctx.engine, + shard_id: shard_info.id, + table_id: TableId::new(table_info.id), + }; + let open_opts = OpenOptions { + table_engine: ctx.table_engine, + }; + + ctx.table_operator + .open_table_on_shard(open_table_request.clone(), open_opts) + .await + .box_err() + .with_context(|| OpenTableWithCause { + msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"), + })?; + + info!( + "ShardOperator open table sequentially finish, shard_info:{shard_info:?}, table_info:{table_info:?}", + ); + + Ok(()) + } + + pub async fn close_table(&self, ctx: CloseTableContext) -> Result<()> { + let shard_info = ctx.updated_table_info.shard_info.clone(); + let table_info = ctx.updated_table_info.table_info.clone(); + + info!("ShardOperator close table sequentially begin, shard_info:{shard_info:?}, table_info:{table_info:?}"); + + // FIXME: maybe should remove table from cluster after having closed table. + { + let mut data = self.data.write().unwrap(); + data.try_remove_table(ctx.updated_table_info) + .box_err() + .with_context(|| CloseTableWithCause { + msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"), + })?; + } + + // Close the table by catalog manager afterwards. + let close_table_request = CloseTableRequest { + catalog_name: ctx.catalog, + schema_name: table_info.schema_name.clone(), + table_name: table_info.name.clone(), + table_id: TableId::new(table_info.id), + // FIXME: the engine type should not use the default one. + engine: ctx.engine, + }; + let close_opts = CloseOptions { + table_engine: ctx.table_engine, + }; + + ctx.table_operator + .close_table_on_shard(close_table_request.clone(), close_opts) + .await + .box_err() + .with_context(|| CloseTableWithCause { + msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"), + })?; + + info!("ShardOperator close table sequentially finish, shard_info:{shard_info:?}, table_info:{table_info:?}"); + + Ok(()) + } +} diff --git a/cluster/src/shard_set.rs b/cluster/src/shard_set.rs new file mode 100644 index 0000000000..28b57566ea --- /dev/null +++ b/cluster/src/shard_set.rs @@ -0,0 +1,227 @@ +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. + +use std::{collections::HashMap, sync::Arc}; + +use meta_client::types::{ShardId, ShardInfo, TableInfo, TablesOfShard}; +use snafu::{ensure, OptionExt}; + +use crate::{ + shard_operator::{ + CloseContext, CloseTableContext, CreateTableContext, DropTableContext, OpenContext, + OpenTableContext, ShardOperator, + }, + Result, ShardVersionMismatch, TableAlreadyExists, TableNotFound, UpdateFrozenShard, +}; + +/// Shard set +/// +/// Manage all shards opened on current node +#[derive(Debug, Default, Clone)] +pub struct ShardSet { + inner: Arc>>, +} + +impl ShardSet { + // Fetch all the shard infos. + pub fn all_shards(&self) -> Vec { + let inner = self.inner.read().unwrap(); + inner.values().cloned().collect() + } + + // Get the shard by its id. + pub fn get(&self, shard_id: ShardId) -> Option { + let inner = self.inner.read().unwrap(); + inner.get(&shard_id).cloned() + } + + /// Remove the shard. + pub fn remove(&self, shard_id: ShardId) -> Option { + let mut inner = self.inner.write().unwrap(); + inner.remove(&shard_id) + } + + /// Insert the tables of one shard. 
+ pub fn insert(&self, shard_id: ShardId, shard: ShardRef) { + let mut inner = self.inner.write().unwrap(); + inner.insert(shard_id, shard); + } +} + +/// Shard +/// +/// NOTICE: all write operations on a shard will be performed sequentially. +pub struct Shard { + data: ShardDataRef, + operator: tokio::sync::Mutex, +} + +impl std::fmt::Debug for Shard { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Shard").field("data", &self.data).finish() + } +} + +impl Shard { + pub fn new(tables_of_shard: TablesOfShard) -> Self { + let data = Arc::new(std::sync::RwLock::new(ShardData { + shard_info: tables_of_shard.shard_info, + tables: tables_of_shard.tables, + frozen: false, + })); + + let operator = tokio::sync::Mutex::new(ShardOperator { data: data.clone() }); + + Self { data, operator } + } + + pub fn shard_info(&self) -> ShardInfo { + let data = self.data.read().unwrap(); + data.shard_info.clone() + } + + pub fn find_table(&self, schema_name: &str, table_name: &str) -> Option { + let data = self.data.read().unwrap(); + data.find_table(schema_name, table_name) + } + + pub async fn open(&self, ctx: OpenContext) -> Result<()> { + let operator = self.operator.lock().await; + operator.open(ctx).await + } + + pub async fn close(&self, ctx: CloseContext) -> Result<()> { + let operator = self.operator.lock().await; + operator.close(ctx).await + } + + pub async fn create_table(&self, ctx: CreateTableContext) -> Result<()> { + let operator = self.operator.lock().await; + operator.create_table(ctx).await + } + + pub async fn drop_table(&self, ctx: DropTableContext) -> Result<()> { + let operator = self.operator.lock().await; + operator.drop_table(ctx).await + } + + pub async fn open_table(&self, ctx: OpenTableContext) -> Result<()> { + let operator = self.operator.lock().await; + operator.open_table(ctx).await + } + + pub async fn close_table(&self, ctx: CloseTableContext) -> Result<()> { + let operator = self.operator.lock().await; + operator.close_table(ctx).await + } +} + +pub type ShardRef = Arc; + +#[derive(Debug, Clone)] +pub struct UpdatedTableInfo { + pub prev_version: u64, + pub shard_info: ShardInfo, + pub table_info: TableInfo, +} + +/// Shard data +#[derive(Debug)] +pub struct ShardData { + /// Shard info + pub shard_info: ShardInfo, + + /// Tables in shard + pub tables: Vec, + + /// Flag indicating that further updates are prohibited + pub frozen: bool, +} + +impl ShardData { + pub fn find_table(&self, schema_name: &str, table_name: &str) -> Option { + self.tables + .iter() + .find(|table| table.schema_name == schema_name && table.name == table_name) + .cloned() + } + + pub fn freeze(&mut self) { + self.frozen = true; + } + + pub fn try_insert_table(&mut self, updated_info: UpdatedTableInfo) -> Result<()> { + let UpdatedTableInfo { + prev_version: prev_shard_version, + shard_info: curr_shard, + table_info: new_table, + } = updated_info; + + ensure!( + !self.frozen, + UpdateFrozenShard { + shard_id: curr_shard.id, + } + ); + + ensure!( + self.shard_info.version == prev_shard_version, + ShardVersionMismatch { + shard_info: self.shard_info.clone(), + expect_version: prev_shard_version, + } + ); + + let table = self.tables.iter().find(|v| v.id == new_table.id); + ensure!( + table.is_none(), + TableAlreadyExists { + msg: "the table to insert has already existed", + } + ); + + // Update tables of shard. 
+ self.shard_info = curr_shard; + self.tables.push(new_table); + + Ok(()) + } + + pub fn try_remove_table(&mut self, updated_info: UpdatedTableInfo) -> Result<()> { + let UpdatedTableInfo { + prev_version: prev_shard_version, + shard_info: curr_shard, + table_info: new_table, + } = updated_info; + + ensure!( + !self.frozen, + UpdateFrozenShard { + shard_id: curr_shard.id, + } + ); + + ensure!( + self.shard_info.version == prev_shard_version, + ShardVersionMismatch { + shard_info: self.shard_info.clone(), + expect_version: prev_shard_version, + } + ); + + let table_idx = self + .tables + .iter() + .position(|v| v.id == new_table.id) + .with_context(|| TableNotFound { + msg: format!("the table to remove is not found, table:{new_table:?}"), + })?; + + // Update tables of shard. + self.shard_info = curr_shard; + self.tables.swap_remove(table_idx); + + Ok(()) + } +} + +pub type ShardDataRef = Arc>; diff --git a/cluster/src/shard_tables_cache.rs b/cluster/src/shard_tables_cache.rs deleted file mode 100644 index e38f7c1f92..0000000000 --- a/cluster/src/shard_tables_cache.rs +++ /dev/null @@ -1,279 +0,0 @@ -// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. - -use std::{ - collections::HashMap, - sync::{Arc, RwLock}, -}; - -use meta_client::types::{ShardId, ShardInfo, ShardVersion, TableInfo, TablesOfShard}; -use snafu::{ensure, OptionExt}; - -use crate::{ - Result, ShardNotFound, ShardVersionMismatch, TableAlreadyExists, TableNotFound, - UpdateFrozenShard, -}; - -/// [ShardTablesCache] caches the information about tables and shards, and the -/// relationship between them is: one shard -> multiple tables. -#[derive(Debug, Default, Clone)] -pub struct ShardTablesCache { - inner: Arc>, -} - -#[derive(Debug, Clone)] -pub struct TableWithShards { - pub table_info: TableInfo, - pub shard_infos: Vec, -} - -impl ShardTablesCache { - pub fn find_table_by_name( - &self, - catalog: &str, - schema: &str, - table: &str, - ) -> Option { - self.inner - .read() - .unwrap() - .find_table_by_name(catalog, schema, table) - } - - // Fetch all the shard infos. - pub fn all_shard_infos(&self) -> Vec { - self.inner.read().unwrap().all_shard_infos() - } - - // Get the shard by its id. - pub fn get(&self, shard_id: ShardId) -> Option { - self.inner.read().unwrap().get(shard_id) - } - - /// Remove the shard. - pub fn remove(&self, shard_id: ShardId) -> Option { - self.inner.write().unwrap().remove(shard_id) - } - - /// Insert the tables of one shard. - pub fn insert(&self, tables_of_shard: TablesOfShard) { - self.inner.write().unwrap().insert(tables_of_shard) - } - - /// Freeze the shard. - pub fn freeze(&self, shard_id: ShardId) -> Option { - self.inner.write().unwrap().freeze(shard_id) - } - - /// Try to insert a new table to the shard with a newer version. - /// - /// It will fail if: - /// - the shard doesn't exist, or - /// - the shard version in the cache is not equal to the provided - /// `prev_shard_version`, or - /// - the table already exists. - pub fn try_insert_table_to_shard( - &self, - prev_shard_version: ShardVersion, - curr_shard: ShardInfo, - new_table: TableInfo, - ) -> Result<()> { - self.inner.write().unwrap().try_insert_table_to_shard( - prev_shard_version, - curr_shard, - new_table, - ) - } - - /// Try to remove a table from the shard with a newer version. - /// - /// It will fail if: - /// - the shard doesn't exist, or - /// - the shard version in the cache is not equal to the provided - /// `prev_shard_version`, or - /// - the table doesn't exist. 
- pub fn try_remove_table_from_shard( - &self, - prev_shard_version: ShardVersion, - curr_shard: ShardInfo, - new_table: TableInfo, - ) -> Result<()> { - self.inner.write().unwrap().try_remove_table_from_shard( - prev_shard_version, - curr_shard, - new_table, - ) - } -} - -#[derive(Clone, Debug)] -struct TablesOfShardCacheEntry { - entry: TablesOfShard, - frozen: bool, -} - -#[derive(Debug, Default)] -struct Inner { - // Tables organized by shard. - // TODO: The shard roles should be also taken into considerations. - tables_by_shard: HashMap, -} - -impl Inner { - pub fn find_table_by_name( - &self, - _catalog_name: &str, - schema_name: &str, - table_name: &str, - ) -> Option { - // TODO: We need a more efficient way to find TableWithShards. - let mut table_info = None; - let mut shard_infos = vec![]; - for tables_of_shard in self.tables_by_shard.values() { - if let Some(v) = tables_of_shard - .entry - .tables - .iter() - .find(|table| table.schema_name == schema_name && table.name == table_name) - { - if table_info.is_none() { - table_info = Some(v.clone()); - } - - shard_infos.push(tables_of_shard.entry.shard_info.clone()); - } - } - - table_info.map(|v| TableWithShards { - table_info: v, - shard_infos, - }) - } - - fn all_shard_infos(&self) -> Vec { - self.tables_by_shard - .values() - .map(|v| v.entry.shard_info.clone()) - .collect() - } - - fn get(&self, shard_id: ShardId) -> Option { - self.tables_by_shard.get(&shard_id).map(|v| v.entry.clone()) - } - - fn remove(&mut self, shard_id: ShardId) -> Option { - self.tables_by_shard.remove(&shard_id).map(|v| v.entry) - } - - fn freeze(&mut self, shard_id: ShardId) -> Option { - let tables_of_shard = self.tables_by_shard.get_mut(&shard_id)?; - tables_of_shard.frozen = true; - - Some(tables_of_shard.entry.clone()) - } - - fn insert(&mut self, tables_of_shard: TablesOfShard) { - let shard_id = tables_of_shard.shard_info.id; - let entry = TablesOfShardCacheEntry { - entry: tables_of_shard, - frozen: false, - }; - self.tables_by_shard.insert(shard_id, entry); - } - - fn try_insert_table_to_shard( - &mut self, - prev_shard_version: ShardVersion, - curr_shard: ShardInfo, - new_table: TableInfo, - ) -> Result<()> { - let tables_of_shard = self - .tables_by_shard - .get_mut(&curr_shard.id) - .with_context(|| ShardNotFound { - msg: format!( - "insert table to a non-existent shard, shard_id:{}", - curr_shard.id - ), - })?; - - ensure!( - !tables_of_shard.frozen, - UpdateFrozenShard { - shard_id: curr_shard.id, - } - ); - - ensure!( - tables_of_shard.entry.shard_info.version == prev_shard_version, - ShardVersionMismatch { - shard_info: tables_of_shard.entry.shard_info.clone(), - expect_version: prev_shard_version, - } - ); - - let table = tables_of_shard - .entry - .tables - .iter() - .find(|v| v.id == new_table.id); - ensure!( - table.is_none(), - TableAlreadyExists { - msg: "the table to insert has already existed", - } - ); - - // Update tables of shard. 
- tables_of_shard.entry.shard_info = curr_shard; - tables_of_shard.entry.tables.push(new_table); - - Ok(()) - } - - fn try_remove_table_from_shard( - &mut self, - prev_shard_version: ShardVersion, - curr_shard: ShardInfo, - new_table: TableInfo, - ) -> Result<()> { - let tables_of_shard = self - .tables_by_shard - .get_mut(&curr_shard.id) - .with_context(|| ShardNotFound { - msg: format!( - "remove table from a non-existent shard, shard_id:{}", - curr_shard.id - ), - })?; - - ensure!( - !tables_of_shard.frozen, - UpdateFrozenShard { - shard_id: curr_shard.id, - } - ); - - ensure!( - tables_of_shard.entry.shard_info.version == prev_shard_version, - ShardVersionMismatch { - shard_info: tables_of_shard.entry.shard_info.clone(), - expect_version: prev_shard_version, - } - ); - - let table_idx = tables_of_shard - .entry - .tables - .iter() - .position(|v| v.id == new_table.id) - .with_context(|| TableNotFound { - msg: format!("the table to remove is not found, table:{new_table:?}"), - })?; - - // Update tables of shard. - tables_of_shard.entry.shard_info = curr_shard; - tables_of_shard.entry.tables.swap_remove(table_idx); - - Ok(()) - } -} diff --git a/router/src/cluster_based.rs b/router/src/cluster_based.rs index a9e861631f..78a6ab185b 100644 --- a/router/src/cluster_based.rs +++ b/router/src/cluster_based.rs @@ -162,19 +162,14 @@ impl Router for ClusterBasedRouter { mod tests { use std::{collections::HashMap, sync::Arc, thread::sleep, time::Duration}; - use ceresdbproto::{ - meta_event::{ - CloseTableOnShardRequest, CreateTableOnShardRequest, DropTableOnShardRequest, - OpenTableOnShardRequest, - }, - storage::RequestContext, + use ceresdbproto::storage::RequestContext; + use cluster::{ + shard_lock_manager::ShardLockManagerRef, shard_set::ShardRef, Cluster, ClusterNodesResp, }; - use cluster::{shard_lock_manager::ShardLockManagerRef, Cluster, ClusterNodesResp}; use common_types::table::ShardId; use common_util::config::ReadableDuration; use meta_client::types::{ NodeShard, RouteEntry, RouteTablesResponse, ShardInfo, ShardRole::Leader, TableInfo, - TablesOfShard, }; use super::*; @@ -191,37 +186,15 @@ mod tests { unimplemented!(); } - async fn open_shard(&self, _: &ShardInfo) -> cluster::Result { + async fn open_shard(&self, _: &ShardInfo) -> cluster::Result { unimplemented!(); } - async fn close_shard(&self, _: ShardId) -> cluster::Result { + fn shard(&self, _: ShardId) -> Option { unimplemented!(); } - async fn freeze_shard(&self, _: ShardId) -> cluster::Result { - unimplemented!(); - } - - async fn create_table_on_shard( - &self, - _req: &CreateTableOnShardRequest, - ) -> cluster::Result<()> { - unimplemented!(); - } - - async fn drop_table_on_shard(&self, _req: &DropTableOnShardRequest) -> cluster::Result<()> { - unimplemented!(); - } - - async fn open_table_on_shard(&self, _req: &OpenTableOnShardRequest) -> cluster::Result<()> { - unimplemented!(); - } - - async fn close_table_on_shard( - &self, - _req: &CloseTableOnShardRequest, - ) -> cluster::Result<()> { + async fn close_shard(&self, _: ShardId) -> cluster::Result { unimplemented!(); } diff --git a/server/src/grpc/meta_event_service/mod.rs b/server/src/grpc/meta_event_service/mod.rs index fcabc3c8de..165f15b4c6 100644 --- a/server/src/grpc/meta_event_service/mod.rs +++ b/server/src/grpc/meta_event_service/mod.rs @@ -6,13 +6,7 @@ use std::{sync::Arc, time::Instant}; use analytic_engine::setup::OpenedWals; use async_trait::async_trait; -use catalog::{ - schema::{ - CloseOptions, CloseTableRequest, CreateOptions, CreateTableRequest, 
DropOptions, - DropTableRequest, OpenOptions, OpenTableRequest, TableDef, - }, - table_operator::TableOperator, -}; +use catalog::table_operator::TableOperator; use ceresdbproto::meta_event::{ meta_event_service_server::MetaEventService, ChangeShardRoleRequest, ChangeShardRoleResponse, CloseShardRequest, CloseShardResponse, CloseTableOnShardRequest, CloseTableOnShardResponse, @@ -21,34 +15,67 @@ use ceresdbproto::meta_event::{ OpenShardResponse, OpenTableOnShardRequest, OpenTableOnShardResponse, SplitShardRequest, SplitShardResponse, }; -use cluster::ClusterRef; +use cluster::{ + shard_operation::{WalCloserAdapter, WalRegionCloserRef}, + shard_operator::{ + CloseContext, CloseTableContext, CreateTableContext, DropTableContext, OpenContext, + OpenTableContext, + }, + shard_set::UpdatedTableInfo, + ClusterRef, +}; use common_types::{schema::SchemaEncoder, table::ShardId}; use common_util::{error::BoxError, runtime::Runtime, time::InstantExt}; use log::{error, info, warn}; -use meta_client::types::ShardInfo; +use meta_client::types::{ShardInfo, TableInfo}; use paste::paste; use proxy::instance::InstanceRef; use query_engine::executor::Executor as QueryExecutor; use snafu::{OptionExt, ResultExt}; -use table_engine::{ - engine::{TableEngineRef, TableState}, - partition::PartitionInfo, - table::TableId, - ANALYTIC_ENGINE_TYPE, -}; +use table_engine::{engine::TableEngineRef, ANALYTIC_ENGINE_TYPE}; use tonic::Response; -use self::shard_operation::WalCloserAdapter; use crate::grpc::{ - meta_event_service::{ - error::{ErrNoCause, ErrWithCause, Result, StatusCode}, - shard_operation::WalRegionCloserRef, - }, + meta_event_service::error::{ErrNoCause, ErrWithCause, Result, StatusCode}, metrics::META_EVENT_GRPC_HANDLER_DURATION_HISTOGRAM_VEC, }; mod error; -mod shard_operation; + +macro_rules! extract_updated_table_info { + ($request: expr) => {{ + let update_shard_info = $request.update_shard_info.clone(); + let table_info = $request.table_info.clone(); + + let update_shard_info = update_shard_info.context(ErrNoCause { + code: StatusCode::Internal, + msg: "update shard info is missing", + })?; + let curr_shard_info = update_shard_info.curr_shard_info.context(ErrNoCause { + code: StatusCode::Internal, + msg: "update shard info is missing", + })?; + let table_info = table_info.context(ErrNoCause { + code: StatusCode::Internal, + msg: "update shard info is missing", + })?; + + let prev_version = update_shard_info.prev_version; + let shard_info = ShardInfo::from(&curr_shard_info); + let table_info = TableInfo::try_from(table_info) + .box_err() + .context(ErrWithCause { + code: StatusCode::Internal, + msg: "failed to parse tableInfo", + })?; + + UpdatedTableInfo { + prev_version, + shard_info, + table_info, + } + }}; +} /// Builder for [MetaServiceImpl]. pub struct Builder { @@ -247,9 +274,10 @@ impl HandlerContext { } async fn do_open_shard(ctx: HandlerContext, shard_info: ShardInfo) -> Result<()> { + // Try to lock the shard in node level. 
ctx.acquire_shard_lock(shard_info.id).await?; - let tables_of_shard = ctx + let shard = ctx .cluster .open_shard(&shard_info) .await @@ -259,42 +287,23 @@ async fn do_open_shard(ctx: HandlerContext, shard_info: ShardInfo) -> Result<()> msg: "fail to open shards in cluster", })?; - let catalog_name = &ctx.default_catalog; - let shard_info = tables_of_shard.shard_info; - let table_defs = tables_of_shard - .tables - .into_iter() - .map(|info| TableDef { - catalog_name: catalog_name.clone(), - schema_name: info.schema_name, - id: TableId::from(info.id), - name: info.name, - }) - .collect(); - - let open_shard_request = catalog::schema::OpenShardRequest { - shard_id: shard_info.id, - table_defs, + let open_ctx = OpenContext { + catalog: ctx.default_catalog.clone(), + table_engine: ctx.table_engine.clone(), + table_operator: ctx.table_operator.clone(), + // FIXME: the engine type should not use the default one. engine: ANALYTIC_ENGINE_TYPE.to_string(), }; - let opts = OpenOptions { - table_engine: ctx.table_engine, - }; - ctx.table_operator - .open_shard(open_shard_request, opts) - .await - .box_err() - .context(ErrWithCause { - code: StatusCode::Internal, - msg: "failed to open shard", - }) + shard.open(open_ctx).await.box_err().context(ErrWithCause { + code: StatusCode::Internal, + msg: "fail to open shard", + }) } // TODO: maybe we should encapsulate the logic of handling meta event into a // trait, so that we don't need to expose the logic to the meta event service // implementation. - async fn handle_open_shard(ctx: HandlerContext, request: OpenShardRequest) -> Result<()> { info!("Receive open shard request, request:{request:?}"); let shard_info = ShardInfo::from(&request.shard.context(ErrNoCause { @@ -317,54 +326,27 @@ async fn handle_open_shard(ctx: HandlerContext, request: OpenShardRequest) -> Re } async fn do_close_shard(ctx: &HandlerContext, shard_id: ShardId) -> Result<()> { - let tables_of_shard = - ctx.cluster - .freeze_shard(shard_id) - .await - .box_err() - .context(ErrWithCause { - code: StatusCode::Internal, - msg: "fail to freeze shard before close it in cluster", - })?; - info!("Shard is frozen before closed, shard_id:{shard_id}"); - - let catalog_name = &ctx.default_catalog.clone(); - let shard_info = tables_of_shard.shard_info; - let table_defs = tables_of_shard - .tables - .into_iter() - .map(|info| TableDef { - catalog_name: catalog_name.clone(), - schema_name: info.schema_name, - id: TableId::from(info.id), - name: info.name, - }) - .collect(); - let close_shard_request = catalog::schema::CloseShardRequest { - shard_id: shard_info.id, - table_defs, - engine: ANALYTIC_ENGINE_TYPE.to_string(), - }; - let opts = CloseOptions { + let shard = ctx.cluster.shard(shard_id).with_context(|| ErrNoCause { + code: StatusCode::Internal, + msg: format!("shard not found when closing shard, shard_id:{shard_id}",), + })?; + + let close_ctx = CloseContext { + catalog: ctx.default_catalog.clone(), table_engine: ctx.table_engine.clone(), + table_operator: ctx.table_operator.clone(), + wal_region_closer: ctx.wal_region_closer.clone(), + // FIXME: the engine type should not use the default one. 
+ engine: ANALYTIC_ENGINE_TYPE.to_string(), }; - ctx.table_operator - .close_shard(close_shard_request, opts) + shard + .close(close_ctx) .await .box_err() .context(ErrWithCause { code: StatusCode::Internal, - msg: "failed to close shard", - })?; - - // Try to close wal region - ctx.wal_region_closer - .close_region(shard_id) - .await - .with_context(|| ErrWithCause { - code: StatusCode::Internal, - msg: format!("fail to close wal region, shard_id:{shard_id}"), + msg: "fail to close shard", })?; // Remove the shard from the cluster topology after the shard is closed indeed. @@ -381,7 +363,11 @@ async fn do_close_shard(ctx: &HandlerContext, shard_id: ShardId) -> Result<()> { ctx.release_shard_lock(shard_id).await.map_err(|e| { error!("Failed to release shard lock, shard_id:{shard_id}, err:{e}"); e - }) + })?; + + info!("Shard close sequentially finish, shard_id:{shard_id}"); + + Ok(()) } async fn handle_close_shard(ctx: HandlerContext, request: CloseShardRequest) -> Result<()> { @@ -405,33 +391,19 @@ async fn handle_create_table_on_shard( ctx: HandlerContext, request: CreateTableOnShardRequest, ) -> Result<()> { - ctx.cluster - .create_table_on_shard(&request) - .await - .box_err() - .with_context(|| ErrWithCause { + let updated_table_info = extract_updated_table_info!(request); + + let shard = ctx + .cluster + .shard(updated_table_info.shard_info.id) + .with_context(|| ErrNoCause { code: StatusCode::Internal, - msg: format!("fail to create table on shard in cluster, req:{request:?}"), + msg: format!( + "shard not found when creating table on shard, req:{:?}", + request.clone() + ), })?; - // Create the table by operator afterwards. - let catalog_name = &ctx.default_catalog; - let shard_info = request - .update_shard_info - .context(ErrNoCause { - code: StatusCode::BadRequest, - msg: "update shard info is missing in the CreateTableOnShardRequest", - })? - .curr_shard_info - .context(ErrNoCause { - code: StatusCode::BadRequest, - msg: "current shard info is missing ine CreateTableOnShardRequest", - })?; - let table_info = request.table_info.context(ErrNoCause { - code: StatusCode::BadRequest, - msg: "table info is missing in the CreateTableOnShardRequest", - })?; - // Get information for partition table creating. let table_schema = SchemaEncoder::default() .decode(&request.encoded_schema) @@ -444,189 +416,129 @@ async fn handle_create_table_on_shard( ), })?; - let (table_engine, partition_info) = match table_info.partition_info { - Some(v) => { - let partition_info = Some(PartitionInfo::try_from(v.clone()).box_err().with_context( - || ErrWithCause { - code: StatusCode::BadRequest, - msg: format!("fail to parse partition info, partition_info:{v:?}"), - }, - )?); - (ctx.partition_table_engine.clone(), partition_info) - } - None => (ctx.table_engine.clone(), None), - }; - - // Build create table request and options. 
- let create_table_request = CreateTableRequest { - catalog_name: catalog_name.clone(), - schema_name: table_info.schema_name, - table_name: table_info.name, - table_id: Some(TableId::new(table_info.id)), + let create_table_ctx = CreateTableContext { + catalog: ctx.default_catalog.clone(), + table_engine: ctx.table_engine.clone(), + table_operator: ctx.table_operator.clone(), + partition_table_engine: ctx.partition_table_engine.clone(), + updated_table_info, table_schema, - engine: request.engine, options: request.options, - state: TableState::Stable, - shard_id: shard_info.id, - partition_info, - }; - - let create_opts = CreateOptions { - table_engine, - create_if_not_exists: request.create_if_not_exist, + create_if_not_exist: request.create_if_not_exist, + engine: request.engine, }; - let _ = ctx - .table_operator - .create_table_on_shard(create_table_request.clone(), create_opts) + shard + .create_table(create_table_ctx) .await .box_err() .context(ErrWithCause { code: StatusCode::Internal, - msg: format!("fail to create table with request:{create_table_request:?}"), - })?; - - Ok(()) + msg: "fail to create table on shard", + }) } async fn handle_drop_table_on_shard( ctx: HandlerContext, request: DropTableOnShardRequest, ) -> Result<()> { - ctx.cluster - .drop_table_on_shard(&request) - .await - .box_err() - .with_context(|| ErrWithCause { + let updated_table_info = extract_updated_table_info!(request); + + let shard = ctx + .cluster + .shard(updated_table_info.shard_info.id) + .with_context(|| ErrNoCause { code: StatusCode::Internal, - msg: format!("fail to drop table on shard in cluster, req:{request:?}"), + msg: format!( + "shard not found when dropping table on shard, req:{:?}", + request.clone() + ), })?; - // Drop the table by operator afterwards. - let catalog_name = ctx.default_catalog.clone(); - let table_info = request.table_info.context(ErrNoCause { - code: StatusCode::BadRequest, - msg: "table info is missing in the DropTableOnShardRequest", - })?; - let drop_table_request = DropTableRequest { - catalog_name, - schema_name: table_info.schema_name, - table_name: table_info.name, + let drop_table_ctx = DropTableContext { + catalog: ctx.default_catalog.clone(), + table_engine: ctx.table_engine.clone(), + table_operator: ctx.table_operator.clone(), + updated_table_info, // FIXME: the engine type should not use the default one. engine: ANALYTIC_ENGINE_TYPE.to_string(), }; - let drop_opts = DropOptions { - table_engine: ctx.table_engine, - }; - ctx.table_operator - .drop_table_on_shard(drop_table_request.clone(), drop_opts) + shard + .drop_table(drop_table_ctx) .await .box_err() - .with_context(|| ErrWithCause { + .context(ErrWithCause { code: StatusCode::Internal, - msg: format!("fail to drop table with request:{drop_table_request:?}"), - })?; - - Ok(()) + msg: "fail to drop table on shard", + }) } async fn handle_open_table_on_shard( ctx: HandlerContext, request: OpenTableOnShardRequest, ) -> Result<()> { - ctx.cluster - .open_table_on_shard(&request) - .await - .box_err() - .with_context(|| ErrWithCause { + let updated_table_info = extract_updated_table_info!(request); + + let shard = ctx + .cluster + .shard(updated_table_info.shard_info.id) + .with_context(|| ErrNoCause { code: StatusCode::Internal, - msg: format!("fail to open table on shard in cluster, req:{request:?}"), + msg: format!("shard not found when opening table on shard, request:{request:?}",), })?; - // Open the table by operator afterwards. 
- let catalog_name = ctx.default_catalog.clone(); - let shard_info = request - .update_shard_info - .context(ErrNoCause { - code: StatusCode::BadRequest, - msg: "update shard info is missing in the OpenTableOnShardRequest", - })? - .curr_shard_info - .context(ErrNoCause { - code: StatusCode::BadRequest, - msg: "current shard info is missing ine OpenTableOnShardRequest", - })?; - let table_info = request.table_info.context(ErrNoCause { - code: StatusCode::BadRequest, - msg: "table info is missing in the OpenTableOnShardRequest", - })?; - let open_table_request = OpenTableRequest { - catalog_name, - schema_name: table_info.schema_name, - table_name: table_info.name, + let open_table_ctx = OpenTableContext { + catalog: ctx.default_catalog.clone(), + table_engine: ctx.table_engine.clone(), + table_operator: ctx.table_operator.clone(), + updated_table_info, // FIXME: the engine type should not use the default one. engine: ANALYTIC_ENGINE_TYPE.to_string(), - shard_id: shard_info.id, - table_id: TableId::new(table_info.id), - }; - let open_opts = OpenOptions { - table_engine: ctx.table_engine, }; - ctx.table_operator - .open_table_on_shard(open_table_request.clone(), open_opts) + shard + .open_table(open_table_ctx) .await .box_err() - .with_context(|| ErrWithCause { + .context(ErrWithCause { code: StatusCode::Internal, - msg: format!("fail to open table with request:{open_table_request:?}"), - })?; - - Ok(()) + msg: "fail to open table on shard", + }) } async fn handle_close_table_on_shard( ctx: HandlerContext, request: CloseTableOnShardRequest, ) -> Result<()> { - ctx.cluster - .close_table_on_shard(&request) - .await - .box_err() - .with_context(|| ErrWithCause { - code: StatusCode::Internal, - msg: format!("fail to close table on shard in cluster, req:{request:?}"), - })?; - - // Close the table by catalog manager afterwards. - let catalog_name = &ctx.default_catalog; - let table_info = request.table_info.context(ErrNoCause { - code: StatusCode::BadRequest, - msg: "table info is missing in the CloseTableOnShardRequest", + let updated_table_info = extract_updated_table_info!(request); + let shard_id = updated_table_info.shard_info.id; + + let shard = ctx.cluster.shard(shard_id).with_context(|| ErrNoCause { + code: StatusCode::Internal, + msg: format!( + "shard not found when closing table on shard, req:{:?}", + request.clone() + ), })?; - let close_table_request = CloseTableRequest { - catalog_name: catalog_name.clone(), - schema_name: table_info.schema_name, - table_name: table_info.name, - table_id: TableId::new(table_info.id), + + let close_table_ctx = CloseTableContext { + catalog: ctx.default_catalog.clone(), + table_engine: ctx.table_engine.clone(), + table_operator: ctx.table_operator.clone(), + updated_table_info, // FIXME: the engine type should not use the default one. 
engine: ANALYTIC_ENGINE_TYPE.to_string(), }; - let close_opts = CloseOptions { - table_engine: ctx.table_engine, - }; - ctx.table_operator - .close_table_on_shard(close_table_request.clone(), close_opts) + shard + .close_table(close_table_ctx) .await .box_err() - .with_context(|| ErrWithCause { + .context(ErrWithCause { code: StatusCode::Internal, - msg: format!("fail to close table with request:{close_table_request:?}"), - })?; - - Ok(()) + msg: "fail to close table on shard", + }) } #[async_trait] diff --git a/src/setup.rs b/src/setup.rs index 286649cea5..ac2db96e22 100644 --- a/src/setup.rs +++ b/src/setup.rs @@ -11,9 +11,7 @@ use analytic_engine::{ }; use catalog::{manager::ManagerRef, schema::OpenOptions, table_operator::TableOperator}; use catalog_impls::{table_based::TableBasedManager, volatile, CatalogManagerImpl}; -use cluster::{ - cluster_impl::ClusterImpl, config::ClusterConfig, shard_tables_cache::ShardTablesCache, -}; +use cluster::{cluster_impl::ClusterImpl, config::ClusterConfig, shard_set::ShardSet}; use common_util::runtime; use df_operator::registry::FunctionRegistryImpl; use interpreters::table_manipulator::{catalog_based, meta_based}; @@ -211,11 +209,11 @@ async fn build_with_meta( .await .expect("fail to build meta client"); - let shard_tables_cache = ShardTablesCache::default(); + let shard_set = ShardSet::default(); let cluster = { let cluster_impl = ClusterImpl::try_new( endpoint, - shard_tables_cache.clone(), + shard_set.clone(), meta_client.clone(), cluster_config.clone(), runtimes.meta_runtime.clone(), @@ -240,10 +238,8 @@ async fn build_with_meta( }; let engine_proxy = build_table_engine_proxy(engine_builder).await; - let meta_based_manager_ref = Arc::new(volatile::ManagerImpl::new( - shard_tables_cache, - meta_client.clone(), - )); + let meta_based_manager_ref = + Arc::new(volatile::ManagerImpl::new(shard_set, meta_client.clone())); // Build catalog manager. let catalog_manager = Arc::new(CatalogManagerImpl::new(meta_based_manager_ref));
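
Note on the core refactor in this diff: the flat `ShardTablesCache` is replaced by a `ShardSet` of `Shard` objects. Each `Shard` keeps its metadata (`ShardData`, i.e. the `ShardInfo` plus the table list and a `frozen` flag) behind a `std::sync::RwLock`, and routes every mutating operation (`open`, `close`, `create_table`, `drop_table`, `open_table`, `close_table`) through a `ShardOperator` guarded by a `tokio::sync::Mutex`, so writes on one shard are serialized while reads such as `shard_info()` and `find_table()` stay cheap and synchronous. The sketch below illustrates that locking pattern in isolation; it assumes only `std` and `tokio` (with the `rt` and `macros` features enabled), and the type and method names are simplified stand-ins rather than the actual CeresDB API.

    // Minimal sketch of the Shard locking pattern introduced in this patch:
    // metadata behind a std RwLock, mutations serialized by an async Mutex.
    use std::sync::{Arc, RwLock};

    use tokio::sync::Mutex;

    #[derive(Debug, Default)]
    struct ShardData {
        version: u64,
        tables: Vec<String>,
    }

    struct Shard {
        // Synchronous reads (e.g. heartbeat reporting) only take the read lock.
        data: Arc<RwLock<ShardData>>,
        // Serializes open/close/create/drop style operations on this shard.
        op_lock: Mutex<()>,
    }

    impl Shard {
        fn new() -> Self {
            Self {
                data: Arc::new(RwLock::new(ShardData::default())),
                op_lock: Mutex::new(()),
            }
        }

        /// Cheap synchronous read, analogous to `Shard::find_table`.
        fn find_table(&self, name: &str) -> bool {
            self.data.read().unwrap().tables.iter().any(|t| t.as_str() == name)
        }

        /// Serialized write, analogous to `Shard::create_table`: the async mutex
        /// is held across the whole operation, the std lock only for the brief
        /// metadata update.
        async fn create_table(&self, name: &str, prev_version: u64) -> Result<(), String> {
            let _guard = self.op_lock.lock().await;

            // ... table-engine work would happen here, outside the std lock ...

            let mut data = self.data.write().unwrap();
            if data.version != prev_version {
                return Err(format!(
                    "version mismatch, expect:{prev_version}, curr:{}",
                    data.version
                ));
            }
            data.tables.push(name.to_string());
            data.version += 1;
            Ok(())
        }
    }

    #[tokio::main]
    async fn main() {
        let shard = Shard::new();
        shard.create_table("t1", 0).await.unwrap();
        assert!(shard.find_table("t1"));
        // A stale caller is rejected by the version check, much like
        // `ShardData::try_insert_table` rejects an outdated `prev_version`.
        assert!(shard.create_table("t2", 0).await.is_err());
    }

Holding the async mutex for the whole operation while keeping the std lock scoped to the short metadata update is what backs the notice in `shard_set.rs` that "all write operations on a shard will be performed sequentially", without forcing readers such as the heartbeat loop in `cluster_impl.rs` to wait on in-flight shard operations.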