diff --git a/Cargo.lock b/Cargo.lock
index 98f6bd9a40..1b0528dc9d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1329,9 +1329,9 @@ dependencies = [
 
 [[package]]
 name = "ceresdbproto"
-version = "1.0.21"
+version = "1.0.22"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9ce5c24f3f76b35c35fcacdafcd3072a8363c0b098c0704996b9b58c833c314a"
+checksum = "aabe958e8ea5fd3da1a2449b7d49c8f8dcaa87195a1498bc7a0215794d3f236e"
 dependencies = [
  "prost",
  "protoc-bin-vendored",
diff --git a/Cargo.toml b/Cargo.toml
index 50280b9146..dab8eed2d7 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -93,9 +93,8 @@ bytes = "1"
 bytes_ext = { path = "components/bytes_ext" }
 catalog = { path = "catalog" }
 catalog_impls = { path = "catalog_impls" }
-ceresdbproto = "1.0.21"
+ceresdbproto = "1.0.22"
 codec = { path = "components/codec" }
-notifier = { path = "components/notifier" }
 chrono = "0.4"
 clap = "3.0"
 clru = "0.6.1"
@@ -114,10 +113,6 @@ generic_error = { path = "components/generic_error" }
 hash_ext = { path = "components/hash_ext" }
 hex = "0.4.3"
 hyperloglog = { git = "https://github.com/jedisct1/rust-hyperloglog.git", rev = "425487ce910f26636fbde8c4d640b538431aad50" }
-lz4_flex = { version = "0.11", default-features = false, features = ["frame"] }
-lazy_static = "1.4.0"
-logger = { path = "components/logger" }
-lru = "0.7.6"
 id_allocator = { path = "components/id_allocator" }
 influxql-logical-planner = { git = "https://github.com/CeresDB/influxql.git", rev = "acbd3ad7651f2deb74857155bea892f88926da57", package = "iox_query_influxql" }
 influxql-parser = { git = "https://github.com/CeresDB/influxql.git", rev = "acbd3ad7651f2deb74857155bea892f88926da57", package = "influxdb_influxql_parser" }
@@ -125,10 +120,15 @@ influxql-query = { git = "https://github.com/CeresDB/influxql.git", rev = "acbd3
 influxql-schema = { git = "https://github.com/CeresDB/influxql.git", rev = "acbd3ad7651f2deb74857155bea892f88926da57", package = "schema" }
 interpreters = { path = "interpreters" }
 itertools = "0.10.5"
+lz4_flex = { version = "0.11", default-features = false, features = ["frame"] }
+lazy_static = "1.4.0"
+logger = { path = "components/logger" }
+lru = "0.7.6"
 macros = { path = "components/macros" }
 message_queue = { path = "components/message_queue" }
 meta_client = { path = "meta_client" }
 metric_ext = { path = "components/metric_ext" }
+notifier = { path = "components/notifier" }
 object_store = { path = "components/object_store" }
 panic_ext = { path = "components/panic_ext" }
 partitioned_lock = { path = "components/partitioned_lock" }
diff --git a/catalog_impls/src/volatile.rs b/catalog_impls/src/volatile.rs
index bea44c2668..928375c983 100644
--- a/catalog_impls/src/volatile.rs
+++ b/catalog_impls/src/volatile.rs
@@ -326,20 +326,12 @@ impl Schema for SchemaImpl {
         // Do real create table.
         // Partition table is not stored in ShardTableManager.
         if request.params.partition_info.is_none() {
-            let shard =
-                self.shard_set
-                    .get(request.shard_id)
-                    .with_context(|| schema::CreateTable {
-                        request: request.clone(),
-                        msg: "shard not found".to_string(),
-                    })?;
-
-            // TODO: seems unnecessary?
-            let _ = shard
-                .find_table(&request.params.schema_name, &request.params.table_name)
+            let _ = self
+                .shard_set
+                .get(request.shard_id)
                 .with_context(|| schema::CreateTable {
                     request: request.clone(),
-                    msg: "table not found in shard".to_string(),
+                    msg: "shard not found".to_string(),
                 })?;
         }
 
         let request = request.into_engine_create_request(None, self.schema_id);
diff --git a/cluster/src/shard_operator.rs b/cluster/src/shard_operator.rs
index 691070bb9f..0078080241 100644
--- a/cluster/src/shard_operator.rs
+++ b/cluster/src/shard_operator.rs
@@ -21,6 +21,7 @@ use catalog::{
     },
     table_operator::TableOperator,
 };
+use common_types::table::ShardVersion;
 use generic_error::BoxError;
 use logger::info;
 use snafu::ResultExt;
@@ -219,24 +220,16 @@ impl ShardOperator {
         Ok(())
     }
 
-    pub async fn create_table(&self, ctx: CreateTableContext) -> Result<()> {
-        let shard_info = ctx.updated_table_info.shard_info.clone();
-        let table_info = ctx.updated_table_info.table_info.clone();
+    pub async fn create_table(&self, ctx: CreateTableContext) -> Result<ShardVersion> {
+        let shard_info = &ctx.updated_table_info.shard_info;
+        let table_info = &ctx.updated_table_info.table_info;
 
         info!(
-            "ShardOperator create table sequentially begin, shard_info:{shard_info:?}, table_info:{table_info:?}",
+            "ShardOperator create table sequentially begin, shard_id:{}, table:{}, shard_info:{shard_info:?}, table_info:{table_info:?}",
+            shard_info.id,
+            table_info.name,
         );
 
-        // FIXME: maybe should insert table from cluster after having created table.
-        {
-            let mut data = self.data.write().unwrap();
-            data.try_insert_table(ctx.updated_table_info)
-                .box_err()
-                .with_context(|| CreateTableWithCause {
-                    msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"),
-                })?;
-        }
-
-        // Create the table by operator afterwards.
         let (table_engine, partition_info) = match table_info.partition_info.clone() {
             Some(v) => (ctx.partition_table_engine.clone(), Some(v)),
@@ -275,30 +268,37 @@
             })?;
 
         info!(
-            "ShardOperator create table sequentially finish, shard_info:{shard_info:?}, table_info:{table_info:?}",
+            "ShardOperator table is created by operator, shard_id:{}, table:{}",
+            shard_info.id, table_info.name,
         );
 
-        Ok(())
+        let latest_version = {
+            let mut data = self.data.write().unwrap();
+            data.try_create_table(ctx.updated_table_info.clone())
+                .box_err()
+                .with_context(|| CreateTableWithCause {
+                    msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"),
+                })?
+        };
+
+        info!(
+            "ShardOperator create table sequentially finish, shard_id:{}, shard_version:{}, table:{}",
+            shard_info.id, shard_info.version, table_info.name,
+        );
+
+        Ok(latest_version)
     }
 
-    pub async fn drop_table(&self, ctx: DropTableContext) -> Result<()> {
-        let shard_info = ctx.updated_table_info.shard_info.clone();
-        let table_info = ctx.updated_table_info.table_info.clone();
+    pub async fn drop_table(&self, ctx: DropTableContext) -> Result<ShardVersion> {
+        let shard_info = &ctx.updated_table_info.shard_info;
+        let table_info = &ctx.updated_table_info.table_info;
 
         info!(
-            "ShardOperator drop table sequentially begin, shard_info:{shard_info:?}, table_info:{table_info:?}",
+            "ShardOperator drop table sequentially begin, shard_id:{}, table:{}, shard_info:{shard_info:?}, table_info:{table_info:?}",
+            shard_info.id,
+            table_info.name,
         );
 
-        // FIXME: maybe should insert table from cluster after having dropped table.
-        {
-            let mut data = self.data.write().unwrap();
-            data.try_remove_table(ctx.updated_table_info)
-                .box_err()
-                .with_context(|| DropTableWithCause {
-                    msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"),
-                })?;
-        }
 
-        // Drop the table by operator afterwards.
         let drop_table_request = DropTableRequest {
             catalog_name: ctx.catalog,
@@ -319,31 +319,41 @@
             })?;
 
         info!(
-            "ShardOperator drop table sequentially finish, shard_info:{shard_info:?}, table_info:{table_info:?}",
+            "ShardOperator table is dropped, shard_id:{}, table:{}",
+            shard_info.id, table_info.name,
         );
 
-        Ok(())
+        // Update the shard info after the table is dropped.
+        let latest_version = {
+            let mut data = self.data.write().unwrap();
+            data.try_drop_table(ctx.updated_table_info.clone())
+                .box_err()
+                .with_context(|| DropTableWithCause {
+                    msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"),
+                })?
+        };
+
+        info!(
+            "ShardOperator drop table sequentially finish, latest_version:{latest_version}, shard_id:{}, old_shard_version:{}, table:{}",
+            shard_info.id,
+            shard_info.version,
+            table_info.name,
+        );
+
+        Ok(latest_version)
     }
 
     pub async fn open_table(&self, ctx: OpenTableContext) -> Result<()> {
-        let shard_info = ctx.updated_table_info.shard_info.clone();
-        let table_info = ctx.updated_table_info.table_info.clone();
+        let shard_info = &ctx.updated_table_info.shard_info;
+        let table_info = &ctx.updated_table_info.table_info;
 
         info!(
-            "ShardOperator open table sequentially begin, shard_info:{shard_info:?}, table_info:{table_info:?}",
+            "ShardOperator open table sequentially begin, shard_id:{}, table:{}, shard_info:{shard_info:?}, table_info:{table_info:?}",
+            shard_info.id,
+            table_info.name,
        );
 
-        // FIXME: maybe should insert table from cluster after having opened table.
-        {
-            let mut data = self.data.write().unwrap();
-            data.try_insert_table(ctx.updated_table_info)
-                .box_err()
-                .with_context(|| OpenTableWithCause {
-                    msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"),
-                })?;
-        }
-
-        // Open the table by operator afterwards.
+        // Open the table by operator.
         let open_table_request = OpenTableRequest {
             catalog_name: ctx.catalog,
             schema_name: table_info.schema_name.clone(),
@@ -366,28 +376,34 @@
             })?;
 
         info!(
-            "ShardOperator open table sequentially finish, shard_info:{shard_info:?}, table_info:{table_info:?}",
+            "ShardOperator table is opened by operator, shard_id:{}, table:{}",
+            shard_info.id, table_info.name
        );
 
-        Ok(())
-    }
-
-    pub async fn close_table(&self, ctx: CloseTableContext) -> Result<()> {
-        let shard_info = ctx.updated_table_info.shard_info.clone();
-        let table_info = ctx.updated_table_info.table_info.clone();
-
-        info!("ShardOperator close table sequentially begin, shard_info:{shard_info:?}, table_info:{table_info:?}");
-
-        // FIXME: maybe should remove table from cluster after having closed table.
+        // Update the shard info after the table is opened.
         {
             let mut data = self.data.write().unwrap();
-            data.try_remove_table(ctx.updated_table_info)
+            data.try_open_table(ctx.updated_table_info.clone())
                 .box_err()
-                .with_context(|| CloseTableWithCause {
+                .with_context(|| OpenTableWithCause {
                     msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"),
                 })?;
         }
 
+        info!(
+            "ShardOperator open table sequentially finish, shard_id:{}, table:{}",
+            shard_info.id, table_info.name
+        );
+
+        Ok(())
+    }
+
+    pub async fn close_table(&self, ctx: CloseTableContext) -> Result<()> {
+        let shard_info = &ctx.updated_table_info.shard_info;
+        let table_info = &ctx.updated_table_info.table_info;
+
+        info!("ShardOperator close table sequentially begin, shard_id:{}, table:{}, shard_info:{shard_info:?}, table_info:{table_info:?}", shard_info.id, table_info.name);
+
         // Close the table by catalog manager afterwards.
         let close_table_request = CloseTableRequest {
             catalog_name: ctx.catalog,
@@ -409,7 +425,25 @@
                 msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"),
             })?;
 
-        info!("ShardOperator close table sequentially finish, shard_info:{shard_info:?}, table_info:{table_info:?}");
+        info!(
+            "ShardOperator table is closed by operator, shard_id:{}, table:{}",
+            shard_info.id, table_info.name
+        );
+
+        // Update the shard info after the table is closed.
+        {
+            let mut data = self.data.write().unwrap();
+            data.try_close_table(ctx.updated_table_info.clone())
+                .box_err()
+                .with_context(|| CloseTableWithCause {
+                    msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"),
+                })?;
+        }
+
+        info!(
+            "ShardOperator close table sequentially finish, shard_id:{}, table:{}",
+            shard_info.id, table_info.name
+        );
 
         Ok(())
     }
diff --git a/cluster/src/shard_set.rs b/cluster/src/shard_set.rs
index 4bc0319c3f..ba70af688a 100644
--- a/cluster/src/shard_set.rs
+++ b/cluster/src/shard_set.rs
@@ -14,6 +14,7 @@
 
 use std::{collections::HashMap, sync::Arc};
 
+use common_types::table::ShardVersion;
 use generic_error::BoxError;
 use meta_client::types::{ShardId, ShardInfo, ShardStatus, TableInfo, TablesOfShard};
 use snafu::{ensure, OptionExt, ResultExt};
@@ -125,8 +126,8 @@ impl Shard {
             let mut data = self.data.write().unwrap();
             data.finish_open();
         }
-        // If open failed, shard status is unchanged(`Opening`), so it can be reschduled
-        // to open again.
+        // If open failed, shard status is unchanged(`Opening`), so it can be
+        // rescheduled to open again.
 
         ret
     }
@@ -141,12 +142,12 @@ impl Shard {
         operator.close(ctx).await
     }
 
-    pub async fn create_table(&self, ctx: CreateTableContext) -> Result<()> {
+    pub async fn create_table(&self, ctx: CreateTableContext) -> Result<ShardVersion> {
         let operator = self.operator.lock().await;
         operator.create_table(ctx).await
     }
 
-    pub async fn drop_table(&self, ctx: DropTableContext) -> Result<()> {
+    pub async fn drop_table(&self, ctx: DropTableContext) -> Result<ShardVersion> {
         let operator = self.operator.lock().await;
         operator.drop_table(ctx).await
     }
@@ -166,7 +167,6 @@ pub type ShardRef = Arc<Shard>;
 
 #[derive(Debug, Clone)]
 pub struct UpdatedTableInfo {
-    pub prev_version: u64,
     pub shard_info: ShardInfo,
     pub table_info: TableInfo,
 }
@@ -222,32 +222,50 @@ impl ShardData {
     }
 
     #[inline]
-    fn update_shard_info(&mut self, new_info: ShardInfo) {
-        // TODO: refactor to move status out of ShardInfo
-        self.shard_info.id = new_info.id;
-        self.shard_info.version = new_info.version;
-        self.shard_info.role = new_info.role;
+    fn inc_shard_version(&mut self) {
+        self.shard_info.version += 1;
     }
 
-    pub fn try_insert_table(&mut self, updated_info: UpdatedTableInfo) -> Result<()> {
+    /// Create the table on the shard, whose version will be incremented.
+    #[inline]
+    pub fn try_create_table(&mut self, updated_info: UpdatedTableInfo) -> Result<ShardVersion> {
+        self.try_insert_table(updated_info, true)
+    }
+
+    /// Open the table on the shard, whose version won't change.
+    #[inline]
+    pub fn try_open_table(&mut self, updated_info: UpdatedTableInfo) -> Result<()> {
+        self.try_insert_table(updated_info, false)?;
+
+        Ok(())
+    }
+
+    /// Try to insert the table into the shard.
+    ///
+    /// The shard version may be incremented and the new version will be
+    /// returned.
+    fn try_insert_table(
+        &mut self,
+        updated_info: UpdatedTableInfo,
+        inc_version: bool,
+    ) -> Result<ShardVersion> {
         let UpdatedTableInfo {
-            prev_version: prev_shard_version,
-            shard_info: curr_shard,
+            shard_info: curr_shard_info,
             table_info: new_table,
         } = updated_info;
 
         ensure!(
             !self.is_frozen(),
             UpdateFrozenShard {
-                shard_id: curr_shard.id,
+                shard_id: curr_shard_info.id,
             }
         );
 
         ensure!(
-            self.shard_info.version == prev_shard_version,
+            self.shard_info.version == curr_shard_info.version,
             ShardVersionMismatch {
                 shard_info: self.shard_info.clone(),
-                expect_version: prev_shard_version,
+                expect_version: curr_shard_info.version,
             }
         );
 
@@ -259,32 +277,57 @@
             }
         );
 
-        // Update tables of shard.
-        self.update_shard_info(curr_shard);
+        // Insert the new table into the shard.
         self.tables.push(new_table);
 
+        // Update the shard version if necessary.
+        if inc_version {
+            self.inc_shard_version();
+        }
+
+        Ok(self.shard_info.version)
+    }
+
+    /// Drop the table from the shard, whose version will be incremented.
+    #[inline]
+    pub fn try_drop_table(&mut self, updated_info: UpdatedTableInfo) -> Result<ShardVersion> {
+        self.try_remove_table(updated_info, true)
+    }
+
+    /// Close the table from the shard, whose version won't change.
+    #[inline]
+    pub fn try_close_table(&mut self, updated_info: UpdatedTableInfo) -> Result<()> {
+        self.try_remove_table(updated_info, false)?;
+
         Ok(())
     }
 
-    pub fn try_remove_table(&mut self, updated_info: UpdatedTableInfo) -> Result<()> {
+    /// Try to remove the table from the shard.
+    ///
+    /// The shard version may be incremented and the new version will be
+    /// returned.
+    fn try_remove_table(
+        &mut self,
+        updated_info: UpdatedTableInfo,
+        inc_version: bool,
+    ) -> Result<ShardVersion> {
         let UpdatedTableInfo {
-            prev_version: prev_shard_version,
-            shard_info: curr_shard,
+            shard_info: curr_shard_info,
             table_info: new_table,
         } = updated_info;
 
         ensure!(
             !self.is_frozen(),
             UpdateFrozenShard {
-                shard_id: curr_shard.id,
+                shard_id: curr_shard_info.id,
             }
         );
 
         ensure!(
-            self.shard_info.version == prev_shard_version,
+            self.shard_info.version == curr_shard_info.version,
             ShardVersionMismatch {
                 shard_info: self.shard_info.clone(),
-                expect_version: prev_shard_version,
+                expect_version: curr_shard_info.version,
             }
         );
 
@@ -296,11 +339,15 @@
                 msg: format!("the table to remove is not found, table:{new_table:?}"),
             })?;
 
-        // Update tables of shard.
-        self.update_shard_info(curr_shard);
+        // Remove the table from the shard.
         self.tables.swap_remove(table_idx);
 
-        Ok(())
+        // Update the shard version if necessary.
+        if inc_version {
+            self.inc_shard_version();
+        }
+
+        Ok(self.shard_info.version)
     }
 }
diff --git a/integration_tests/build_meta.sh b/integration_tests/build_meta.sh
index 4e920652de..220e2e1a43 100755
--- a/integration_tests/build_meta.sh
+++ b/integration_tests/build_meta.sh
@@ -5,11 +5,10 @@ set -o nounset
 set -o pipefail
 
 BIN_PATH=${BIN_PATH:-""}
-if [[ -z "${BIN_PATH}" ]]; then
-  echo "Fetch and install ceresmeta-server..."
-  go install github.com/CeresDB/ceresmeta/cmd/ceresmeta-server@HEAD
-  BIN_PATH="$(go env GOPATH)/bin/ceresmeta-server"
-fi
+
+echo "Fetch and install ceresmeta-server..."
+go install -a github.com/CeresDB/ceresmeta/cmd/ceresmeta-server@main
+BIN_PATH="$(go env GOPATH)/bin/ceresmeta-server"
 
 TARGET=$(pwd)/ceresmeta
 mkdir -p ${TARGET}
diff --git a/meta_client/src/types.rs b/meta_client/src/types.rs
index a3f6514653..d9006edbd8 100644
--- a/meta_client/src/types.rs
+++ b/meta_client/src/types.rs
@@ -210,7 +210,7 @@ pub struct ShardInfo {
     pub role: ShardRole,
     pub version: ShardVersion,
     // This status is only used for request ceresdb send to ceresmeta via heartbeat
-    // When ceresdb receive this via open shard request, this field is meanless.
+    // When ceresdb receive this via open shard request, this field is meaningless.
    // TODO: Use different request and response body between ceresdb and
     // ceresmeta.
     pub status: ShardStatus,
diff --git a/server/src/grpc/meta_event_service/mod.rs b/server/src/grpc/meta_event_service/mod.rs
index 51e3157b94..5eb273891a 100644
--- a/server/src/grpc/meta_event_service/mod.rs
+++ b/server/src/grpc/meta_event_service/mod.rs
@@ -38,7 +38,10 @@ use cluster::{
     shard_set::UpdatedTableInfo,
     ClusterRef,
 };
-use common_types::{schema::SchemaEncoder, table::ShardId};
+use common_types::{
+    schema::SchemaEncoder,
+    table::{ShardId, ShardVersion},
+};
 use future_ext::RetryConfig;
 use generic_error::BoxError;
 use logger::{error, info, warn};
@@ -77,7 +80,6 @@ macro_rules! extract_updated_table_info {
             msg: "update shard info is missing",
         })?;
 
-        let prev_version = update_shard_info.prev_version;
         let shard_info = ShardInfo::from(&curr_shard_info);
         let table_info = TableInfo::try_from(table_info)
             .box_err()
@@ -87,7 +89,6 @@ macro_rules! extract_updated_table_info {
             })?;
 
         UpdatedTableInfo {
-            prev_version,
             shard_info,
             table_info,
         }
@@ -186,18 +187,68 @@ macro_rules! handle_request {
     };
 }
 
+macro_rules! handle_request_with_version {
+    ($mod_name: ident, $req_ty: ident, $resp_ty: ident) => {
+        paste! {
+            async fn [<$mod_name _internal>] (
+                &self,
+                request: tonic::Request<$req_ty>,
+            ) -> std::result::Result<tonic::Response<$resp_ty>, tonic::Status> {
+                let instant = Instant::now();
+                let ctx = self.handler_ctx();
+                let handle = self.runtime.spawn(async move {
+                    // FIXME: Data race about the operations on the shards should be taken into
+                    // considerations.
+
+                    let request = request.into_inner();
+                    info!("Receive request from meta, req:{:?}", request);
+
+                    [<handle_ $mod_name>](ctx, request).await
+                });
+
+                let res = handle
+                    .await
+                    .box_err()
+                    .context(ErrWithCause {
+                        code: StatusCode::Internal,
+                        msg: "fail to join task",
+                    });
+
+                let mut resp = $resp_ty::default();
+                match res {
+                    Ok(Ok(latest_shard_version)) => {
+                        resp.header = Some(error::build_ok_header());
+                        resp.latest_shard_version = latest_shard_version;
+                    }
+                    Ok(Err(e)) | Err(e) => {
+                        error!("Fail to process request from meta, err:{}", e);
+                        resp.header = Some(error::build_err_header(e));
+                    }
+                };
+
+                info!("Finish handling request from meta, resp:{:?}", resp);
+
+                META_EVENT_GRPC_HANDLER_DURATION_HISTOGRAM_VEC
+                    .$mod_name
+                    .observe(instant.saturating_elapsed().as_secs_f64());
+                Ok(Response::new(resp))
+            }
+        }
+    };
+}
+
 impl MetaServiceImpl {
     handle_request!(open_shard, OpenShardRequest, OpenShardResponse);
 
     handle_request!(close_shard, CloseShardRequest, CloseShardResponse);
 
-    handle_request!(
+    handle_request_with_version!(
         create_table_on_shard,
         CreateTableOnShardRequest,
         CreateTableOnShardResponse
     );
 
-    handle_request!(
+    handle_request_with_version!(
         drop_table_on_shard,
         DropTableOnShardRequest,
         DropTableOnShardResponse
@@ -424,7 +475,7 @@ async fn handle_close_shard(ctx: HandlerContext, request: CloseShardRequest) ->
 async fn handle_create_table_on_shard(
     ctx: HandlerContext,
     request: CreateTableOnShardRequest,
-) -> Result<()> {
+) -> Result<ShardVersion> {
     let updated_table_info = extract_updated_table_info!(request);
 
     let shard = ctx
@@ -475,7 +526,7 @@ async fn handle_create_table_on_shard(
 async fn handle_drop_table_on_shard(
     ctx: HandlerContext,
     request: DropTableOnShardRequest,
-) -> Result<()> {
+) -> Result<ShardVersion> {
     let updated_table_info = extract_updated_table_info!(request);
 
     let shard = ctx
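The version rule this patch gives `ShardData` can be restated outside the diff as a minimal sketch: create/drop bump the shard version and return the new value, open/close leave it unchanged, and an update from a caller holding a stale version is rejected (the `ShardVersionMismatch` path). The code below is illustrative only and not part of the patch; `MiniShard` and `apply` are hypothetical stand-ins for `ShardData` and its `try_*` methods.

```rust
type ShardVersion = u64;

#[derive(Debug)]
struct MiniShard {
    version: ShardVersion,
    tables: Vec<String>,
}

impl MiniShard {
    /// The caller passes the shard version it last saw; the update is rejected on mismatch.
    fn apply(
        &mut self,
        expected_version: ShardVersion,
        table: &str,
        insert: bool,
        inc_version: bool, // true for create/drop, false for open/close
    ) -> Result<ShardVersion, String> {
        if self.version != expected_version {
            return Err(format!(
                "shard version mismatch, expect:{expected_version}, actual:{}",
                self.version
            ));
        }
        if insert {
            self.tables.push(table.to_string());
        } else {
            self.tables.retain(|t| t.as_str() != table);
        }
        if inc_version {
            self.version += 1;
        }
        // The (possibly unchanged) version is handed back to the caller.
        Ok(self.version)
    }
}

fn main() {
    let mut shard = MiniShard { version: 7, tables: Vec::new() };
    assert_eq!(shard.apply(7, "t1", true, true), Ok(8));  // create: bumps 7 -> 8
    assert_eq!(shard.apply(8, "t2", true, false), Ok(8)); // open: version unchanged
    assert!(shard.apply(7, "t3", true, true).is_err());   // stale caller is rejected
}
```

In the patch itself, that returned value is what `handle_create_table_on_shard` and `handle_drop_table_on_shard` report back to ceresmeta as `latest_shard_version`.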