From 5c51ce5fc901d0fd89d1333a2a104c9e92f62be7 Mon Sep 17 00:00:00 2001 From: CooooolFrog Date: Mon, 30 Oct 2023 11:06:33 +0800 Subject: [PATCH 1/7] add shard version in create/drop table response --- Cargo.lock | 44 ++++++++++++++--------- Cargo.toml | 2 +- cluster/src/shard_set.rs | 11 +++--- server/src/grpc/meta_event_service/mod.rs | 2 -- 4 files changed, 33 insertions(+), 26 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 98f6bd9a40..063dc36ba8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -96,7 +96,7 @@ dependencies = [ "atomic_enum", "base64 0.13.1", "bytes_ext", - "ceresdbproto", + "ceresdbproto 1.0.22", "codec", "common_types", "datafusion", @@ -1302,7 +1302,7 @@ checksum = "8ef195bacb1ca0eb02d6a0562b09852941d01de2b962c7066c922115fab7dcb7" dependencies = [ "arrow 38.0.0", "async-trait", - "ceresdbproto", + "ceresdbproto 1.0.21", "dashmap 5.4.0", "futures 0.3.28", "paste 1.0.12", @@ -1340,6 +1340,18 @@ dependencies = [ "walkdir", ] +[[package]] +name = "ceresdbproto" +version = "1.0.22" +source = "git+https://github.com/ZuLiangWang/ceresdbproto.git?rev=f7abb77b37cb826f6689e883d2f29ed54475e472#f7abb77b37cb826f6689e883d2f29ed54475e472" +dependencies = [ + "prost", + "protoc-bin-vendored", + "tonic 0.8.3", + "tonic-build", + "walkdir", +] + [[package]] name = "cexpr" version = "0.6.0" @@ -1514,7 +1526,7 @@ dependencies = [ "async-trait", "bytes_ext", "catalog", - "ceresdbproto", + "ceresdbproto 1.0.22", "common_types", "etcd-client", "future_ext", @@ -1592,7 +1604,7 @@ dependencies = [ "arrow 43.0.0", "arrow_ext", "bytes_ext", - "ceresdbproto", + "ceresdbproto 1.0.22", "chrono", "datafusion", "hash_ext", @@ -2347,7 +2359,7 @@ dependencies = [ "async-recursion", "async-trait", "catalog", - "ceresdbproto", + "ceresdbproto 1.0.22", "common_types", "datafusion", "datafusion-proto", @@ -3899,7 +3911,7 @@ name = "meta_client" version = "1.2.6-alpha" dependencies = [ "async-trait", - "ceresdbproto", + "ceresdbproto 1.0.22", "common_types", "futures 0.3.28", "generic_error", @@ -4424,7 +4436,7 @@ version = "1.2.6-alpha" dependencies = [ "async-trait", "bytes", - "ceresdbproto", + "ceresdbproto 1.0.22", "chrono", "clru", "crc", @@ -5301,7 +5313,7 @@ dependencies = [ "async-trait", "bytes", "catalog", - "ceresdbproto", + "ceresdbproto 1.0.22", "clru", "cluster", "common_types", @@ -5428,7 +5440,7 @@ dependencies = [ "arrow 43.0.0", "async-trait", "catalog", - "ceresdbproto", + "ceresdbproto 1.0.22", "cluster", "codec", "common_types", @@ -5739,7 +5751,7 @@ version = "1.2.6-alpha" dependencies = [ "arrow_ext", "async-trait", - "ceresdbproto", + "ceresdbproto 1.0.22", "common_types", "futures 0.3.28", "generic_error", @@ -5868,7 +5880,7 @@ name = "router" version = "1.2.6-alpha" dependencies = [ "async-trait", - "ceresdbproto", + "ceresdbproto 1.0.22", "cluster", "common_types", "generic_error", @@ -6243,7 +6255,7 @@ dependencies = [ "async-trait", "bytes_ext", "catalog", - "ceresdbproto", + "ceresdbproto 1.0.22", "clru", "cluster", "common_types", @@ -6769,7 +6781,7 @@ dependencies = [ "async-trait", "bytes_ext", "catalog", - "ceresdbproto", + "ceresdbproto 1.0.22", "codec", "common_types", "futures 0.3.28", @@ -6791,7 +6803,7 @@ dependencies = [ "arrow_ext", "async-trait", "bytes_ext", - "ceresdbproto", + "ceresdbproto 1.0.22", "common_types", "datafusion", "datafusion-proto", @@ -6994,7 +7006,7 @@ dependencies = [ name = "time_ext" version = "1.2.6-alpha" dependencies = [ - "ceresdbproto", + "ceresdbproto 1.0.22", "chrono", "common_types", "macros", @@ -7646,7 +7658,7 @@ version = "1.2.6-alpha" dependencies = [ "async-trait", "bytes_ext", - "ceresdbproto", + "ceresdbproto 1.0.22", "chrono", "codec", "common_types", diff --git a/Cargo.toml b/Cargo.toml index 50280b9146..b563d00379 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -93,7 +93,7 @@ bytes = "1" bytes_ext = { path = "components/bytes_ext" } catalog = { path = "catalog" } catalog_impls = { path = "catalog_impls" } -ceresdbproto = "1.0.21" +ceresdbproto = { git = "https://github.com/ZuLiangWang/ceresdbproto.git", rev = "f7abb77b37cb826f6689e883d2f29ed54475e472" } codec = { path = "components/codec" } notifier = { path = "components/notifier" } chrono = "0.4" diff --git a/cluster/src/shard_set.rs b/cluster/src/shard_set.rs index 4bc0319c3f..09a7f546d4 100644 --- a/cluster/src/shard_set.rs +++ b/cluster/src/shard_set.rs @@ -166,7 +166,6 @@ pub type ShardRef = Arc; #[derive(Debug, Clone)] pub struct UpdatedTableInfo { - pub prev_version: u64, pub shard_info: ShardInfo, pub table_info: TableInfo, } @@ -231,7 +230,6 @@ impl ShardData { pub fn try_insert_table(&mut self, updated_info: UpdatedTableInfo) -> Result<()> { let UpdatedTableInfo { - prev_version: prev_shard_version, shard_info: curr_shard, table_info: new_table, } = updated_info; @@ -244,10 +242,10 @@ impl ShardData { ); ensure!( - self.shard_info.version == prev_shard_version, + self.shard_info.version == curr_shard.version, ShardVersionMismatch { shard_info: self.shard_info.clone(), - expect_version: prev_shard_version, + expect_version: curr_shard.version, } ); @@ -268,7 +266,6 @@ impl ShardData { pub fn try_remove_table(&mut self, updated_info: UpdatedTableInfo) -> Result<()> { let UpdatedTableInfo { - prev_version: prev_shard_version, shard_info: curr_shard, table_info: new_table, } = updated_info; @@ -281,10 +278,10 @@ impl ShardData { ); ensure!( - self.shard_info.version == prev_shard_version, + self.shard_info.version == curr_shard.version, ShardVersionMismatch { shard_info: self.shard_info.clone(), - expect_version: prev_shard_version, + expect_version: curr_shard.version, } ); diff --git a/server/src/grpc/meta_event_service/mod.rs b/server/src/grpc/meta_event_service/mod.rs index 51e3157b94..f968af6497 100644 --- a/server/src/grpc/meta_event_service/mod.rs +++ b/server/src/grpc/meta_event_service/mod.rs @@ -77,7 +77,6 @@ macro_rules! extract_updated_table_info { msg: "update shard info is missing", })?; - let prev_version = update_shard_info.prev_version; let shard_info = ShardInfo::from(&curr_shard_info); let table_info = TableInfo::try_from(table_info) .box_err() @@ -87,7 +86,6 @@ macro_rules! extract_updated_table_info { })?; UpdatedTableInfo { - prev_version, shard_info, table_info, } From 90180b65b8163b01c50deebe9e708e2c9003e676 Mon Sep 17 00:00:00 2001 From: CooooolFrog Date: Mon, 30 Oct 2023 15:02:39 +0800 Subject: [PATCH 2/7] refactor shard version --- catalog_impls/src/volatile.rs | 8 --- cluster/src/shard_operator.rs | 47 +++++++++-------- cluster/src/shard_set.rs | 15 +++--- server/src/grpc/meta_event_service/mod.rs | 63 +++++++++++++++++++++-- 4 files changed, 89 insertions(+), 44 deletions(-) diff --git a/catalog_impls/src/volatile.rs b/catalog_impls/src/volatile.rs index bea44c2668..faabbe122d 100644 --- a/catalog_impls/src/volatile.rs +++ b/catalog_impls/src/volatile.rs @@ -333,14 +333,6 @@ impl Schema for SchemaImpl { request: request.clone(), msg: "shard not found".to_string(), })?; - - // TODO: seems unnecessary? - let _ = shard - .find_table(&request.params.schema_name, &request.params.table_name) - .with_context(|| schema::CreateTable { - request: request.clone(), - msg: "table not found in shard".to_string(), - })?; } let request = request.into_engine_create_request(None, self.schema_id); diff --git a/cluster/src/shard_operator.rs b/cluster/src/shard_operator.rs index 691070bb9f..fa885094fe 100644 --- a/cluster/src/shard_operator.rs +++ b/cluster/src/shard_operator.rs @@ -21,6 +21,7 @@ use catalog::{ }, table_operator::TableOperator, }; +use common_types::table::ShardVersion; use generic_error::BoxError; use logger::info; use snafu::ResultExt; @@ -219,7 +220,7 @@ impl ShardOperator { Ok(()) } - pub async fn create_table(&self, ctx: CreateTableContext) -> Result<()> { + pub async fn create_table(&self, ctx: CreateTableContext) -> Result { let shard_info = ctx.updated_table_info.shard_info.clone(); let table_info = ctx.updated_table_info.table_info.clone(); @@ -227,16 +228,6 @@ impl ShardOperator { "ShardOperator create table sequentially begin, shard_info:{shard_info:?}, table_info:{table_info:?}", ); - // FIXME: maybe should insert table from cluster after having created table. - { - let mut data = self.data.write().unwrap(); - data.try_insert_table(ctx.updated_table_info) - .box_err() - .with_context(|| CreateTableWithCause { - msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"), - })?; - } - // Create the table by operator afterwards. let (table_engine, partition_info) = match table_info.partition_info.clone() { Some(v) => (ctx.partition_table_engine.clone(), Some(v)), @@ -278,10 +269,19 @@ impl ShardOperator { "ShardOperator create table sequentially finish, shard_info:{shard_info:?}, table_info:{table_info:?}", ); - Ok(()) + // FIXME: maybe should insert table from cluster after having created table. + let mut data = self.data.write().unwrap(); + let latest_version = data + .try_insert_table(ctx.updated_table_info) + .box_err() + .with_context(|| CreateTableWithCause { + msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"), + })?; + + Ok(latest_version) } - pub async fn drop_table(&self, ctx: DropTableContext) -> Result<()> { + pub async fn drop_table(&self, ctx: DropTableContext) -> Result { let shard_info = ctx.updated_table_info.shard_info.clone(); let table_info = ctx.updated_table_info.table_info.clone(); @@ -289,16 +289,6 @@ impl ShardOperator { "ShardOperator drop table sequentially begin, shard_info:{shard_info:?}, table_info:{table_info:?}", ); - // FIXME: maybe should insert table from cluster after having dropped table. - { - let mut data = self.data.write().unwrap(); - data.try_remove_table(ctx.updated_table_info) - .box_err() - .with_context(|| DropTableWithCause { - msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"), - })?; - } - // Drop the table by operator afterwards. let drop_table_request = DropTableRequest { catalog_name: ctx.catalog, @@ -322,7 +312,16 @@ impl ShardOperator { "ShardOperator drop table sequentially finish, shard_info:{shard_info:?}, table_info:{table_info:?}", ); - Ok(()) + // FIXME: maybe should insert table from cluster after having dropped table. + let mut data = self.data.write().unwrap(); + let latest_version = data + .try_remove_table(ctx.updated_table_info) + .box_err() + .with_context(|| DropTableWithCause { + msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"), + })?; + + Ok(latest_version) } pub async fn open_table(&self, ctx: OpenTableContext) -> Result<()> { diff --git a/cluster/src/shard_set.rs b/cluster/src/shard_set.rs index 09a7f546d4..22fce9476c 100644 --- a/cluster/src/shard_set.rs +++ b/cluster/src/shard_set.rs @@ -14,6 +14,7 @@ use std::{collections::HashMap, sync::Arc}; +use common_types::table::ShardVersion; use generic_error::BoxError; use meta_client::types::{ShardId, ShardInfo, ShardStatus, TableInfo, TablesOfShard}; use snafu::{ensure, OptionExt, ResultExt}; @@ -141,12 +142,12 @@ impl Shard { operator.close(ctx).await } - pub async fn create_table(&self, ctx: CreateTableContext) -> Result<()> { + pub async fn create_table(&self, ctx: CreateTableContext) -> Result { let operator = self.operator.lock().await; operator.create_table(ctx).await } - pub async fn drop_table(&self, ctx: DropTableContext) -> Result<()> { + pub async fn drop_table(&self, ctx: DropTableContext) -> Result { let operator = self.operator.lock().await; operator.drop_table(ctx).await } @@ -224,11 +225,11 @@ impl ShardData { fn update_shard_info(&mut self, new_info: ShardInfo) { // TODO: refactor to move status out of ShardInfo self.shard_info.id = new_info.id; - self.shard_info.version = new_info.version; + self.shard_info.version = new_info.version + 1; self.shard_info.role = new_info.role; } - pub fn try_insert_table(&mut self, updated_info: UpdatedTableInfo) -> Result<()> { + pub fn try_insert_table(&mut self, updated_info: UpdatedTableInfo) -> Result { let UpdatedTableInfo { shard_info: curr_shard, table_info: new_table, @@ -261,10 +262,10 @@ impl ShardData { self.update_shard_info(curr_shard); self.tables.push(new_table); - Ok(()) + Ok(self.shard_info.version.clone()) } - pub fn try_remove_table(&mut self, updated_info: UpdatedTableInfo) -> Result<()> { + pub fn try_remove_table(&mut self, updated_info: UpdatedTableInfo) -> Result { let UpdatedTableInfo { shard_info: curr_shard, table_info: new_table, @@ -297,7 +298,7 @@ impl ShardData { self.update_shard_info(curr_shard); self.tables.swap_remove(table_idx); - Ok(()) + Ok(self.shard_info.version.clone()) } } diff --git a/server/src/grpc/meta_event_service/mod.rs b/server/src/grpc/meta_event_service/mod.rs index f968af6497..a90bfd0898 100644 --- a/server/src/grpc/meta_event_service/mod.rs +++ b/server/src/grpc/meta_event_service/mod.rs @@ -38,7 +38,10 @@ use cluster::{ shard_set::UpdatedTableInfo, ClusterRef, }; -use common_types::{schema::SchemaEncoder, table::ShardId}; +use common_types::{ + schema::SchemaEncoder, + table::{ShardId, ShardVersion}, +}; use future_ext::RetryConfig; use generic_error::BoxError; use logger::{error, info, warn}; @@ -184,18 +187,68 @@ macro_rules! handle_request { }; } +macro_rules! handle_request_with_version { + ($mod_name: ident, $req_ty: ident, $resp_ty: ident) => { + paste! { + async fn [<$mod_name _internal>] ( + &self, + request: tonic::Request<$req_ty>, + ) -> std::result::Result, tonic::Status> { + let instant = Instant::now(); + let ctx = self.handler_ctx(); + let handle = self.runtime.spawn(async move { + // FIXME: Data race about the operations on the shards should be taken into + // considerations. + + let request = request.into_inner(); + info!("Receive request from meta, req:{:?}", request); + + [](ctx, request).await + }); + + let res = handle + .await + .box_err() + .context(ErrWithCause { + code: StatusCode::Internal, + msg: "fail to join task", + }); + + let mut resp = $resp_ty::default(); + match res { + Ok(Ok(cur_version)) => { + resp.header = Some(error::build_ok_header()); + resp.cur_version = cur_version; + } + Ok(Err(e)) | Err(e) => { + error!("Fail to process request from meta, err:{}", e); + resp.header = Some(error::build_err_header(e)); + } + }; + + info!("Finish handling request from meta, resp:{:?}", resp); + + META_EVENT_GRPC_HANDLER_DURATION_HISTOGRAM_VEC + .$mod_name + .observe(instant.saturating_elapsed().as_secs_f64()); + Ok(Response::new(resp)) + } + } + }; +} + impl MetaServiceImpl { handle_request!(open_shard, OpenShardRequest, OpenShardResponse); handle_request!(close_shard, CloseShardRequest, CloseShardResponse); - handle_request!( + handle_request_with_version!( create_table_on_shard, CreateTableOnShardRequest, CreateTableOnShardResponse ); - handle_request!( + handle_request_with_version!( drop_table_on_shard, DropTableOnShardRequest, DropTableOnShardResponse @@ -422,7 +475,7 @@ async fn handle_close_shard(ctx: HandlerContext, request: CloseShardRequest) -> async fn handle_create_table_on_shard( ctx: HandlerContext, request: CreateTableOnShardRequest, -) -> Result<()> { +) -> Result { let updated_table_info = extract_updated_table_info!(request); let shard = ctx @@ -473,7 +526,7 @@ async fn handle_create_table_on_shard( async fn handle_drop_table_on_shard( ctx: HandlerContext, request: DropTableOnShardRequest, -) -> Result<()> { +) -> Result { let updated_table_info = extract_updated_table_info!(request); let shard = ctx From 3d2ec5db9ebeeb3e35d4dd8c44d8da3478a50706 Mon Sep 17 00:00:00 2001 From: CooooolFrog Date: Mon, 30 Oct 2023 15:40:04 +0800 Subject: [PATCH 3/7] refactor shard version --- catalog_impls/src/volatile.rs | 14 +++++++------- cluster/src/shard_set.rs | 4 ++-- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/catalog_impls/src/volatile.rs b/catalog_impls/src/volatile.rs index faabbe122d..928375c983 100644 --- a/catalog_impls/src/volatile.rs +++ b/catalog_impls/src/volatile.rs @@ -326,13 +326,13 @@ impl Schema for SchemaImpl { // Do real create table. // Partition table is not stored in ShardTableManager. if request.params.partition_info.is_none() { - let shard = - self.shard_set - .get(request.shard_id) - .with_context(|| schema::CreateTable { - request: request.clone(), - msg: "shard not found".to_string(), - })?; + let _ = self + .shard_set + .get(request.shard_id) + .with_context(|| schema::CreateTable { + request: request.clone(), + msg: "shard not found".to_string(), + })?; } let request = request.into_engine_create_request(None, self.schema_id); diff --git a/cluster/src/shard_set.rs b/cluster/src/shard_set.rs index 22fce9476c..70716474e6 100644 --- a/cluster/src/shard_set.rs +++ b/cluster/src/shard_set.rs @@ -262,7 +262,7 @@ impl ShardData { self.update_shard_info(curr_shard); self.tables.push(new_table); - Ok(self.shard_info.version.clone()) + Ok(self.shard_info.clone().version) } pub fn try_remove_table(&mut self, updated_info: UpdatedTableInfo) -> Result { @@ -298,7 +298,7 @@ impl ShardData { self.update_shard_info(curr_shard); self.tables.swap_remove(table_idx); - Ok(self.shard_info.version.clone()) + Ok(self.shard_info.clone().version) } } From 01a10d0d263a8b5a1b92f0c9cabb157a90beac8b Mon Sep 17 00:00:00 2001 From: "xikai.wxk" Date: Mon, 6 Nov 2023 15:59:09 +0800 Subject: [PATCH 4/7] refactor: different logic between create/drop and open/close table of shard --- Cargo.lock | 2 +- Cargo.toml | 2 +- cluster/src/shard_operator.rs | 139 ++++++++++++++-------- cluster/src/shard_set.rs | 95 +++++++++++---- meta_client/src/types.rs | 2 +- server/src/grpc/meta_event_service/mod.rs | 4 +- 6 files changed, 164 insertions(+), 80 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 063dc36ba8..110de76d27 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1343,7 +1343,7 @@ dependencies = [ [[package]] name = "ceresdbproto" version = "1.0.22" -source = "git+https://github.com/ZuLiangWang/ceresdbproto.git?rev=f7abb77b37cb826f6689e883d2f29ed54475e472#f7abb77b37cb826f6689e883d2f29ed54475e472" +source = "git+https://github.com/ZuLiangWang/ceresdbproto.git?rev=b7e1fc49a3defe280b3cc4282f060a46df56ed91#b7e1fc49a3defe280b3cc4282f060a46df56ed91" dependencies = [ "prost", "protoc-bin-vendored", diff --git a/Cargo.toml b/Cargo.toml index b563d00379..57c5211686 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -93,7 +93,7 @@ bytes = "1" bytes_ext = { path = "components/bytes_ext" } catalog = { path = "catalog" } catalog_impls = { path = "catalog_impls" } -ceresdbproto = { git = "https://github.com/ZuLiangWang/ceresdbproto.git", rev = "f7abb77b37cb826f6689e883d2f29ed54475e472" } +ceresdbproto = { git = "https://github.com/ZuLiangWang/ceresdbproto.git", rev = "b7e1fc49a3defe280b3cc4282f060a46df56ed91" } codec = { path = "components/codec" } notifier = { path = "components/notifier" } chrono = "0.4" diff --git a/cluster/src/shard_operator.rs b/cluster/src/shard_operator.rs index fa885094fe..0078080241 100644 --- a/cluster/src/shard_operator.rs +++ b/cluster/src/shard_operator.rs @@ -221,11 +221,13 @@ impl ShardOperator { } pub async fn create_table(&self, ctx: CreateTableContext) -> Result { - let shard_info = ctx.updated_table_info.shard_info.clone(); - let table_info = ctx.updated_table_info.table_info.clone(); + let shard_info = &ctx.updated_table_info.shard_info; + let table_info = &ctx.updated_table_info.table_info; info!( - "ShardOperator create table sequentially begin, shard_info:{shard_info:?}, table_info:{table_info:?}", + "ShardOperator create table sequentially begin, shard_id:{}, table:{}, shard_info:{shard_info:?}, table_info:{table_info:?}", + shard_info.id, + table_info.name, ); // Create the table by operator afterwards. @@ -266,27 +268,35 @@ impl ShardOperator { })?; info!( - "ShardOperator create table sequentially finish, shard_info:{shard_info:?}, table_info:{table_info:?}", + "ShardOperator table is created by operator, shard_id:{}, table:{}", + shard_info.id, table_info.name, ); - // FIXME: maybe should insert table from cluster after having created table. - let mut data = self.data.write().unwrap(); - let latest_version = data - .try_insert_table(ctx.updated_table_info) - .box_err() - .with_context(|| CreateTableWithCause { - msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"), - })?; + let latest_version = { + let mut data = self.data.write().unwrap(); + data.try_create_table(ctx.updated_table_info.clone()) + .box_err() + .with_context(|| CreateTableWithCause { + msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"), + })? + }; + + info!( + "ShardOperator create table sequentially finish, shard_id:{}, shard_version:{}, table:{}", + shard_info.id, shard_info.version, table_info.name, + ); Ok(latest_version) } pub async fn drop_table(&self, ctx: DropTableContext) -> Result { - let shard_info = ctx.updated_table_info.shard_info.clone(); - let table_info = ctx.updated_table_info.table_info.clone(); + let shard_info = &ctx.updated_table_info.shard_info; + let table_info = &ctx.updated_table_info.table_info; info!( - "ShardOperator drop table sequentially begin, shard_info:{shard_info:?}, table_info:{table_info:?}", + "ShardOperator drop table sequentially begin, shard_id:{}, table:{}, shard_info:{shard_info:?}, table_info:{table_info:?}", + shard_info.id, + table_info.name, ); // Drop the table by operator afterwards. @@ -309,40 +319,41 @@ impl ShardOperator { })?; info!( - "ShardOperator drop table sequentially finish, shard_info:{shard_info:?}, table_info:{table_info:?}", + "ShardOperator table is dropped, shard_id:{}, table:{}", + shard_info.id, table_info.name, ); - // FIXME: maybe should insert table from cluster after having dropped table. - let mut data = self.data.write().unwrap(); - let latest_version = data - .try_remove_table(ctx.updated_table_info) - .box_err() - .with_context(|| DropTableWithCause { - msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"), - })?; + // Update the shard info after the table is dropped. + let latest_version = { + let mut data = self.data.write().unwrap(); + data.try_drop_table(ctx.updated_table_info.clone()) + .box_err() + .with_context(|| DropTableWithCause { + msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"), + })? + }; + + info!( + "ShardOperator drop table sequentially finish, latest_version:{latest_version}, shard_id:{}, old_shard_version:{}, table:{}", + shard_info.id, + shard_info.version, + table_info.name, + ); Ok(latest_version) } pub async fn open_table(&self, ctx: OpenTableContext) -> Result<()> { - let shard_info = ctx.updated_table_info.shard_info.clone(); - let table_info = ctx.updated_table_info.table_info.clone(); + let shard_info = &ctx.updated_table_info.shard_info; + let table_info = &ctx.updated_table_info.table_info; info!( - "ShardOperator open table sequentially begin, shard_info:{shard_info:?}, table_info:{table_info:?}", + "ShardOperator open table sequentially begin, shard_id:{}, table:{}, shard_info:{shard_info:?}, table_info:{table_info:?}", + shard_info.id, + table_info.name, ); - // FIXME: maybe should insert table from cluster after having opened table. - { - let mut data = self.data.write().unwrap(); - data.try_insert_table(ctx.updated_table_info) - .box_err() - .with_context(|| OpenTableWithCause { - msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"), - })?; - } - - // Open the table by operator afterwards. + // Open the table by operator. let open_table_request = OpenTableRequest { catalog_name: ctx.catalog, schema_name: table_info.schema_name.clone(), @@ -365,28 +376,34 @@ impl ShardOperator { })?; info!( - "ShardOperator open table sequentially finish, shard_info:{shard_info:?}, table_info:{table_info:?}", + "ShardOperator table is opened by operator, shard_id:{}, table:{}", + shard_info.id, table_info.name ); - Ok(()) - } - - pub async fn close_table(&self, ctx: CloseTableContext) -> Result<()> { - let shard_info = ctx.updated_table_info.shard_info.clone(); - let table_info = ctx.updated_table_info.table_info.clone(); - - info!("ShardOperator close table sequentially begin, shard_info:{shard_info:?}, table_info:{table_info:?}"); - - // FIXME: maybe should remove table from cluster after having closed table. + // Update the shard info after the table is opened. { let mut data = self.data.write().unwrap(); - data.try_remove_table(ctx.updated_table_info) + data.try_open_table(ctx.updated_table_info.clone()) .box_err() - .with_context(|| CloseTableWithCause { + .with_context(|| OpenTableWithCause { msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"), })?; } + info!( + "ShardOperator open table sequentially finish, shard_id:{}, table:{}", + shard_info.id, table_info.name + ); + + Ok(()) + } + + pub async fn close_table(&self, ctx: CloseTableContext) -> Result<()> { + let shard_info = &ctx.updated_table_info.shard_info; + let table_info = &ctx.updated_table_info.table_info; + + info!("ShardOperator close table sequentially begin, shard_id:{}, table:{}, shard_info:{shard_info:?}, table_info:{table_info:?}", shard_info.id, table_info.name); + // Close the table by catalog manager afterwards. let close_table_request = CloseTableRequest { catalog_name: ctx.catalog, @@ -408,7 +425,25 @@ impl ShardOperator { msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"), })?; - info!("ShardOperator close table sequentially finish, shard_info:{shard_info:?}, table_info:{table_info:?}"); + info!( + "ShardOperator table is closed by operator, shard_id:{}, table:{}", + shard_info.id, table_info.name + ); + + // Update the shard info after the table is closed. + { + let mut data = self.data.write().unwrap(); + data.try_close_table(ctx.updated_table_info.clone()) + .box_err() + .with_context(|| CloseTableWithCause { + msg: format!("shard_info:{shard_info:?}, table_info:{table_info:?}"), + })?; + } + + info!( + "ShardOperator close table sequentially finish, shard_id:{}, table:{}", + shard_info.id, table_info.name + ); Ok(()) } diff --git a/cluster/src/shard_set.rs b/cluster/src/shard_set.rs index 70716474e6..ba70af688a 100644 --- a/cluster/src/shard_set.rs +++ b/cluster/src/shard_set.rs @@ -126,8 +126,8 @@ impl Shard { let mut data = self.data.write().unwrap(); data.finish_open(); } - // If open failed, shard status is unchanged(`Opening`), so it can be reschduled - // to open again. + // If open failed, shard status is unchanged(`Opening`), so it can be + // rescheduled to open again. ret } @@ -222,31 +222,50 @@ impl ShardData { } #[inline] - fn update_shard_info(&mut self, new_info: ShardInfo) { - // TODO: refactor to move status out of ShardInfo - self.shard_info.id = new_info.id; - self.shard_info.version = new_info.version + 1; - self.shard_info.role = new_info.role; + fn inc_shard_version(&mut self) { + self.shard_info.version += 1; } - pub fn try_insert_table(&mut self, updated_info: UpdatedTableInfo) -> Result { + /// Create the table on the shard, whose version will be incremented. + #[inline] + pub fn try_create_table(&mut self, updated_info: UpdatedTableInfo) -> Result { + self.try_insert_table(updated_info, true) + } + + /// Open the table on the shard, whose version won't change. + #[inline] + pub fn try_open_table(&mut self, updated_info: UpdatedTableInfo) -> Result<()> { + self.try_insert_table(updated_info, false)?; + + Ok(()) + } + + /// Try to insert the table into the shard. + /// + /// The shard version may be incremented and the new version will be + /// returned. + fn try_insert_table( + &mut self, + updated_info: UpdatedTableInfo, + inc_version: bool, + ) -> Result { let UpdatedTableInfo { - shard_info: curr_shard, + shard_info: curr_shard_info, table_info: new_table, } = updated_info; ensure!( !self.is_frozen(), UpdateFrozenShard { - shard_id: curr_shard.id, + shard_id: curr_shard_info.id, } ); ensure!( - self.shard_info.version == curr_shard.version, + self.shard_info.version == curr_shard_info.version, ShardVersionMismatch { shard_info: self.shard_info.clone(), - expect_version: curr_shard.version, + expect_version: curr_shard_info.version, } ); @@ -258,31 +277,57 @@ impl ShardData { } ); - // Update tables of shard. - self.update_shard_info(curr_shard); + // Insert the new table into the shard. self.tables.push(new_table); - Ok(self.shard_info.clone().version) + // Update the shard version if necessary. + if inc_version { + self.inc_shard_version(); + } + + Ok(self.shard_info.version) } - pub fn try_remove_table(&mut self, updated_info: UpdatedTableInfo) -> Result { + /// Drop the table from the shard, whose version will be incremented. + #[inline] + pub fn try_drop_table(&mut self, updated_info: UpdatedTableInfo) -> Result { + self.try_remove_table(updated_info, true) + } + + /// Close the table from the shard, whose version won't change. + #[inline] + pub fn try_close_table(&mut self, updated_info: UpdatedTableInfo) -> Result<()> { + self.try_remove_table(updated_info, false)?; + + Ok(()) + } + + /// Try to remove the table from the shard. + /// + /// The shard version may be incremented and the new version will be + /// returned. + fn try_remove_table( + &mut self, + updated_info: UpdatedTableInfo, + inc_version: bool, + ) -> Result { let UpdatedTableInfo { - shard_info: curr_shard, + shard_info: curr_shard_info, table_info: new_table, } = updated_info; ensure!( !self.is_frozen(), UpdateFrozenShard { - shard_id: curr_shard.id, + shard_id: curr_shard_info.id, } ); ensure!( - self.shard_info.version == curr_shard.version, + self.shard_info.version == curr_shard_info.version, ShardVersionMismatch { shard_info: self.shard_info.clone(), - expect_version: curr_shard.version, + expect_version: curr_shard_info.version, } ); @@ -294,11 +339,15 @@ impl ShardData { msg: format!("the table to remove is not found, table:{new_table:?}"), })?; - // Update tables of shard. - self.update_shard_info(curr_shard); + // Remove the table from the shard. self.tables.swap_remove(table_idx); - Ok(self.shard_info.clone().version) + // Update the shard version if necessary. + if inc_version { + self.inc_shard_version(); + } + + Ok(self.shard_info.version) } } diff --git a/meta_client/src/types.rs b/meta_client/src/types.rs index a3f6514653..d9006edbd8 100644 --- a/meta_client/src/types.rs +++ b/meta_client/src/types.rs @@ -210,7 +210,7 @@ pub struct ShardInfo { pub role: ShardRole, pub version: ShardVersion, // This status is only used for request ceresdb send to ceresmeta via heartbeat - // When ceresdb receive this via open shard request, this field is meanless. + // When ceresdb receive this via open shard request, this field is meaningless. // TODO: Use different request and response body between ceresdb and // ceresmeta. pub status: ShardStatus, diff --git a/server/src/grpc/meta_event_service/mod.rs b/server/src/grpc/meta_event_service/mod.rs index a90bfd0898..a5cc98cdc9 100644 --- a/server/src/grpc/meta_event_service/mod.rs +++ b/server/src/grpc/meta_event_service/mod.rs @@ -216,9 +216,9 @@ macro_rules! handle_request_with_version { let mut resp = $resp_ty::default(); match res { - Ok(Ok(cur_version)) => { + Ok(Ok(latest_version)) => { resp.header = Some(error::build_ok_header()); - resp.cur_version = cur_version; + resp.latest_version = latest_version; } Ok(Err(e)) | Err(e) => { error!("Fail to process request from meta, err:{}", e); From 7df20d7b58a3b50d726e1afa81315365a4cb9268 Mon Sep 17 00:00:00 2001 From: "xikai.wxk" Date: Wed, 8 Nov 2023 16:59:21 +0800 Subject: [PATCH 5/7] fix: upgrade the ceresdbproto crate --- Cargo.lock | 48 +++++++++-------------- Cargo.toml | 12 +++--- server/src/grpc/meta_event_service/mod.rs | 4 +- 3 files changed, 26 insertions(+), 38 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 110de76d27..1b0528dc9d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -96,7 +96,7 @@ dependencies = [ "atomic_enum", "base64 0.13.1", "bytes_ext", - "ceresdbproto 1.0.22", + "ceresdbproto", "codec", "common_types", "datafusion", @@ -1302,7 +1302,7 @@ checksum = "8ef195bacb1ca0eb02d6a0562b09852941d01de2b962c7066c922115fab7dcb7" dependencies = [ "arrow 38.0.0", "async-trait", - "ceresdbproto 1.0.21", + "ceresdbproto", "dashmap 5.4.0", "futures 0.3.28", "paste 1.0.12", @@ -1327,23 +1327,11 @@ dependencies = [ "uuid", ] -[[package]] -name = "ceresdbproto" -version = "1.0.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ce5c24f3f76b35c35fcacdafcd3072a8363c0b098c0704996b9b58c833c314a" -dependencies = [ - "prost", - "protoc-bin-vendored", - "tonic 0.8.3", - "tonic-build", - "walkdir", -] - [[package]] name = "ceresdbproto" version = "1.0.22" -source = "git+https://github.com/ZuLiangWang/ceresdbproto.git?rev=b7e1fc49a3defe280b3cc4282f060a46df56ed91#b7e1fc49a3defe280b3cc4282f060a46df56ed91" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aabe958e8ea5fd3da1a2449b7d49c8f8dcaa87195a1498bc7a0215794d3f236e" dependencies = [ "prost", "protoc-bin-vendored", @@ -1526,7 +1514,7 @@ dependencies = [ "async-trait", "bytes_ext", "catalog", - "ceresdbproto 1.0.22", + "ceresdbproto", "common_types", "etcd-client", "future_ext", @@ -1604,7 +1592,7 @@ dependencies = [ "arrow 43.0.0", "arrow_ext", "bytes_ext", - "ceresdbproto 1.0.22", + "ceresdbproto", "chrono", "datafusion", "hash_ext", @@ -2359,7 +2347,7 @@ dependencies = [ "async-recursion", "async-trait", "catalog", - "ceresdbproto 1.0.22", + "ceresdbproto", "common_types", "datafusion", "datafusion-proto", @@ -3911,7 +3899,7 @@ name = "meta_client" version = "1.2.6-alpha" dependencies = [ "async-trait", - "ceresdbproto 1.0.22", + "ceresdbproto", "common_types", "futures 0.3.28", "generic_error", @@ -4436,7 +4424,7 @@ version = "1.2.6-alpha" dependencies = [ "async-trait", "bytes", - "ceresdbproto 1.0.22", + "ceresdbproto", "chrono", "clru", "crc", @@ -5313,7 +5301,7 @@ dependencies = [ "async-trait", "bytes", "catalog", - "ceresdbproto 1.0.22", + "ceresdbproto", "clru", "cluster", "common_types", @@ -5440,7 +5428,7 @@ dependencies = [ "arrow 43.0.0", "async-trait", "catalog", - "ceresdbproto 1.0.22", + "ceresdbproto", "cluster", "codec", "common_types", @@ -5751,7 +5739,7 @@ version = "1.2.6-alpha" dependencies = [ "arrow_ext", "async-trait", - "ceresdbproto 1.0.22", + "ceresdbproto", "common_types", "futures 0.3.28", "generic_error", @@ -5880,7 +5868,7 @@ name = "router" version = "1.2.6-alpha" dependencies = [ "async-trait", - "ceresdbproto 1.0.22", + "ceresdbproto", "cluster", "common_types", "generic_error", @@ -6255,7 +6243,7 @@ dependencies = [ "async-trait", "bytes_ext", "catalog", - "ceresdbproto 1.0.22", + "ceresdbproto", "clru", "cluster", "common_types", @@ -6781,7 +6769,7 @@ dependencies = [ "async-trait", "bytes_ext", "catalog", - "ceresdbproto 1.0.22", + "ceresdbproto", "codec", "common_types", "futures 0.3.28", @@ -6803,7 +6791,7 @@ dependencies = [ "arrow_ext", "async-trait", "bytes_ext", - "ceresdbproto 1.0.22", + "ceresdbproto", "common_types", "datafusion", "datafusion-proto", @@ -7006,7 +6994,7 @@ dependencies = [ name = "time_ext" version = "1.2.6-alpha" dependencies = [ - "ceresdbproto 1.0.22", + "ceresdbproto", "chrono", "common_types", "macros", @@ -7658,7 +7646,7 @@ version = "1.2.6-alpha" dependencies = [ "async-trait", "bytes_ext", - "ceresdbproto 1.0.22", + "ceresdbproto", "chrono", "codec", "common_types", diff --git a/Cargo.toml b/Cargo.toml index 57c5211686..dab8eed2d7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -93,9 +93,8 @@ bytes = "1" bytes_ext = { path = "components/bytes_ext" } catalog = { path = "catalog" } catalog_impls = { path = "catalog_impls" } -ceresdbproto = { git = "https://github.com/ZuLiangWang/ceresdbproto.git", rev = "b7e1fc49a3defe280b3cc4282f060a46df56ed91" } +ceresdbproto = "1.0.22" codec = { path = "components/codec" } -notifier = { path = "components/notifier" } chrono = "0.4" clap = "3.0" clru = "0.6.1" @@ -114,10 +113,6 @@ generic_error = { path = "components/generic_error" } hash_ext = { path = "components/hash_ext" } hex = "0.4.3" hyperloglog = { git = "https://github.com/jedisct1/rust-hyperloglog.git", rev = "425487ce910f26636fbde8c4d640b538431aad50" } -lz4_flex = { version = "0.11", default-features = false, features = ["frame"] } -lazy_static = "1.4.0" -logger = { path = "components/logger" } -lru = "0.7.6" id_allocator = { path = "components/id_allocator" } influxql-logical-planner = { git = "https://github.com/CeresDB/influxql.git", rev = "acbd3ad7651f2deb74857155bea892f88926da57", package = "iox_query_influxql" } influxql-parser = { git = "https://github.com/CeresDB/influxql.git", rev = "acbd3ad7651f2deb74857155bea892f88926da57", package = "influxdb_influxql_parser" } @@ -125,10 +120,15 @@ influxql-query = { git = "https://github.com/CeresDB/influxql.git", rev = "acbd3 influxql-schema = { git = "https://github.com/CeresDB/influxql.git", rev = "acbd3ad7651f2deb74857155bea892f88926da57", package = "schema" } interpreters = { path = "interpreters" } itertools = "0.10.5" +lz4_flex = { version = "0.11", default-features = false, features = ["frame"] } +lazy_static = "1.4.0" +logger = { path = "components/logger" } +lru = "0.7.6" macros = { path = "components/macros" } message_queue = { path = "components/message_queue" } meta_client = { path = "meta_client" } metric_ext = { path = "components/metric_ext" } +notifier = { path = "components/notifier" } object_store = { path = "components/object_store" } panic_ext = { path = "components/panic_ext" } partitioned_lock = { path = "components/partitioned_lock" } diff --git a/server/src/grpc/meta_event_service/mod.rs b/server/src/grpc/meta_event_service/mod.rs index a5cc98cdc9..5eb273891a 100644 --- a/server/src/grpc/meta_event_service/mod.rs +++ b/server/src/grpc/meta_event_service/mod.rs @@ -216,9 +216,9 @@ macro_rules! handle_request_with_version { let mut resp = $resp_ty::default(); match res { - Ok(Ok(latest_version)) => { + Ok(Ok(latest_shard_version)) => { resp.header = Some(error::build_ok_header()); - resp.latest_version = latest_version; + resp.latest_shard_version = latest_shard_version; } Ok(Err(e)) | Err(e) => { error!("Fail to process request from meta, err:{}", e); From c0eb5bd75436ee69294602b4e49f7e10f9f46edb Mon Sep 17 00:00:00 2001 From: CooooolFrog Date: Thu, 9 Nov 2023 11:41:16 +0800 Subject: [PATCH 6/7] fix build_meta script --- integration_tests/build_meta.sh | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/integration_tests/build_meta.sh b/integration_tests/build_meta.sh index 4e920652de..6cfb25842f 100755 --- a/integration_tests/build_meta.sh +++ b/integration_tests/build_meta.sh @@ -5,11 +5,10 @@ set -o nounset set -o pipefail BIN_PATH=${BIN_PATH:-""} -if [[ -z "${BIN_PATH}" ]]; then - echo "Fetch and install ceresmeta-server..." - go install github.com/CeresDB/ceresmeta/cmd/ceresmeta-server@HEAD - BIN_PATH="$(go env GOPATH)/bin/ceresmeta-server" -fi + +echo "Fetch and install ceresmeta-server..." +go install -a github.com/CeresDB/ceresmeta/cmd/ceresmeta-server@HEAD +BIN_PATH="$(go env GOPATH)/bin/ceresmeta-server" TARGET=$(pwd)/ceresmeta mkdir -p ${TARGET} From 325ed04cf60635554a4d0e00a2651e64c4f7af23 Mon Sep 17 00:00:00 2001 From: CooooolFrog Date: Thu, 9 Nov 2023 14:17:51 +0800 Subject: [PATCH 7/7] fix build_meta script --- integration_tests/build_meta.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration_tests/build_meta.sh b/integration_tests/build_meta.sh index 6cfb25842f..220e2e1a43 100755 --- a/integration_tests/build_meta.sh +++ b/integration_tests/build_meta.sh @@ -7,7 +7,7 @@ set -o pipefail BIN_PATH=${BIN_PATH:-""} echo "Fetch and install ceresmeta-server..." -go install -a github.com/CeresDB/ceresmeta/cmd/ceresmeta-server@HEAD +go install -a github.com/CeresDB/ceresmeta/cmd/ceresmeta-server@main BIN_PATH="$(go env GOPATH)/bin/ceresmeta-server" TARGET=$(pwd)/ceresmeta