diff --git a/integration_tests/cases/env/cluster/ddl/partition_table.result b/integration_tests/cases/env/cluster/ddl/partition_table.result
index b00fa25611..6bc2ec6d39 100644
--- a/integration_tests/cases/env/cluster/ddl/partition_table.result
+++ b/integration_tests/cases/env/cluster/ddl/partition_table.result
@@ -94,7 +94,3 @@ DROP TABLE IF EXISTS `partition_table_t`;
 
 affected_rows: 0
 
-SHOW CREATE TABLE partition_table_t;
-
-Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to create plan, query: SHOW CREATE TABLE partition_table_t;. Caused by: Failed to create plan, err:Table not found, table:partition_table_t" })
-
diff --git a/integration_tests/cases/env/cluster/ddl/partition_table.sql b/integration_tests/cases/env/cluster/ddl/partition_table.sql
index 59724d430e..a408cad222 100644
--- a/integration_tests/cases/env/cluster/ddl/partition_table.sql
+++ b/integration_tests/cases/env/cluster/ddl/partition_table.sql
@@ -42,4 +42,5 @@ SELECT * from partition_table_t where name in ("ceresdb5", "ceresdb6", "ceresdb7
 
 DROP TABLE IF EXISTS `partition_table_t`;
 
-SHOW CREATE TABLE partition_table_t;
+-- The route cache will cause the data table to be queried after it is deleted. Refer to #893.
+-- SHOW CREATE TABLE partition_table_t;
diff --git a/meta_client/src/types.rs b/meta_client/src/types.rs
index 8bf9bf92f8..c050a62673 100644
--- a/meta_client/src/types.rs
+++ b/meta_client/src/types.rs
@@ -104,6 +104,12 @@ pub struct TableInfo {
     pub partition_info: Option<PartitionInfo>,
 }
 
+impl TableInfo {
+    pub fn is_partition_table(&self) -> bool {
+        self.partition_info.is_some()
+    }
+}
+
 impl TryFrom for TableInfo {
     type Error = Error;
 
diff --git a/proxy/src/forward.rs b/proxy/src/forward.rs
index e6a66fbf1d..9603dceee9 100644
--- a/proxy/src/forward.rs
+++ b/proxy/src/forward.rs
@@ -350,7 +350,8 @@ mod tests {
     use catalog::consts::DEFAULT_SCHEMA;
     use ceresdbproto::storage::{Route, SqlQueryRequest, SqlQueryResponse};
     use futures::FutureExt;
-    use router::{PartitionTableInfo, Router};
+    use meta_client::types::TableInfo;
+    use router::Router;
     use tonic::IntoRequest;
 
     use super::*;
@@ -392,11 +393,11 @@ mod tests {
             }
         }
 
-        async fn fetch_partition_table_info(
+        async fn fetch_table_info(
             &self,
             _schema: &str,
             _table: &str,
-        ) -> router::Result<Option<PartitionTableInfo>> {
+        ) -> router::Result<Option<TableInfo>> {
             return Ok(None);
         }
     }
diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs
index 5ec760e6b7..57c909a7b9 100644
--- a/proxy/src/lib.rs
+++ b/proxy/src/lib.rs
@@ -240,12 +240,19 @@ impl Proxy {
                 msg: format!("Failed to find table, table_name:{table_name}"),
             })?;
 
-        let partition_table_info_in_meta = self
+        let table_info_in_meta = self
             .router
-            .fetch_partition_table_info(schema_name, table_name)
+            .fetch_table_info(schema_name, table_name)
             .await?;
 
-        match (table, &partition_table_info_in_meta) {
+        if let Some(table_info_in_meta) = &table_info_in_meta {
+            // No need to handle non-partition table.
+            if !table_info_in_meta.is_partition_table() {
+                return Ok(());
+            }
+        }
+
+        match (table, &table_info_in_meta) {
             (Some(table), Some(partition_table_info)) => {
                 // No need to create partition table when table_id match.
                 if table.id().as_u64() == partition_table_info.id {
@@ -285,13 +292,13 @@ impl Proxy {
             (None, Some(_)) => (),
         }
 
-        let partition_table_info = partition_table_info_in_meta.unwrap();
+        let partition_table_info = table_info_in_meta.unwrap();
 
         // If table not exists, open it.
         // Get table_schema from first sub partition table.
         let first_sub_partition_table_name = util::get_sub_partition_name(
             &partition_table_info.name,
-            &partition_table_info.partition_info,
+            partition_table_info.partition_info.as_ref().unwrap(),
             0usize,
         );
         let table = self
@@ -324,7 +331,7 @@ impl Proxy {
             options: table.options,
             state: TableState::Stable,
             shard_id: DEFAULT_SHARD_ID,
-            partition_info: Some(partition_table_info.partition_info),
+            partition_info: partition_table_info.partition_info,
         };
         let create_opts = CreateOptions {
             table_engine: self.instance.partition_table_engine.clone(),
diff --git a/proxy/src/write.rs b/proxy/src/write.rs
index 1aae990e18..ff36f30249 100644
--- a/proxy/src/write.rs
+++ b/proxy/src/write.rs
@@ -64,11 +64,16 @@ impl Proxy {
         ctx: Context,
         req: WriteRequest,
     ) -> Result {
+        let request_id = RequestId::next_id();
+
         let write_context = req.context.clone().context(ErrNoCause {
             msg: "Missing context",
             code: StatusCode::BAD_REQUEST,
         })?;
 
+        self.handle_auto_create_table(request_id, &write_context.database, &req)
+            .await?;
+
         let (write_request_to_local, write_requests_to_forward) =
             self.split_write_request(req).await?;
 
@@ -86,8 +91,11 @@ impl Proxy {
 
         // Write to local.
         if !write_request_to_local.table_requests.is_empty() {
-            let local_handle =
-                async move { Ok(self.write_to_local(ctx, write_request_to_local).await) };
+            let local_handle = async move {
+                Ok(self
+                    .write_to_local(ctx, request_id, write_request_to_local)
+                    .await)
+            };
             futures.push(local_handle.boxed());
         }
 
@@ -232,8 +240,12 @@ impl Proxy {
         }
     }
 
-    async fn write_to_local(&self, ctx: Context, req: WriteRequest) -> Result {
-        let request_id = RequestId::next_id();
+    async fn write_to_local(
+        &self,
+        ctx: Context,
+        request_id: RequestId,
+        req: WriteRequest,
+    ) -> Result {
         let begin_instant = Instant::now();
         let deadline = ctx.timeout.map(|t| begin_instant + t);
         let catalog = self.instance.catalog_manager.default_catalog_name();
@@ -304,59 +316,38 @@ impl Proxy {
             let table_name = &write_table_req.table;
             self.maybe_open_partition_table_if_not_exist(&catalog, &schema, table_name)
                 .await?;
-            let mut table = self.try_get_table(&catalog, &schema, table_name)?;
-
-            match table.clone() {
-                None => {
-                    if auto_create_table {
-                        self.create_table(
-                            request_id,
-                            &catalog,
-                            &schema,
-                            &write_table_req,
-                            &schema_config,
-                            deadline,
-                        )
-                        .await?;
-                        // try to get table again
-                        table = self.try_get_table(&catalog, &schema, table_name)?;
-                    }
-                }
-                Some(t) => {
-                    if auto_create_table {
-                        // The reasons for making the decision to add columns before writing are as
-                        // follows:
-                        // * If judged based on the error message returned, it may cause data that
-                        //   has already been successfully written to be written again and affect
-                        //   the accuracy of the data.
-                        // * Currently, the decision to add columns is made at the request level,
-                        //   not at the row level, so the cost is relatively small.
-                        let table_schema = t.schema();
-                        let columns =
-                            find_new_columns(&table_schema, &schema_config, &write_table_req)?;
-                        if !columns.is_empty() {
-                            self.execute_add_columns_plan(
-                                request_id, &catalog, &schema, t, columns, deadline,
-                            )
-                            .await?;
-                        }
-                    }
+            let table = self
+                .try_get_table(&catalog, &schema, table_name)?
+                .with_context(|| ErrNoCause {
+                    code: StatusCode::BAD_REQUEST,
+                    msg: format!("Table not found, schema:{schema}, table:{table_name}"),
+                })?;
+
+            if auto_create_table {
+                // The reasons for making the decision to add columns before writing are as
+                // follows:
+                // * If judged based on the error message returned, it may cause data that has
+                //   already been successfully written to be written again and affect the
+                //   accuracy of the data.
+                // * Currently, the decision to add columns is made at the request level, not at
+                //   the row level, so the cost is relatively small.
+                let table_schema = table.schema();
+                let columns = find_new_columns(&table_schema, &schema_config, &write_table_req)?;
+                if !columns.is_empty() {
+                    self.execute_add_columns_plan(
+                        request_id,
+                        &catalog,
+                        &schema,
+                        table.clone(),
+                        columns,
+                        deadline,
+                    )
+                    .await?;
                 }
             }
 
-            match table {
-                Some(table) => {
-                    let plan = write_table_request_to_insert_plan(table, write_table_req)?;
-                    plan_vec.push(plan);
-                }
-                None => {
-                    return ErrNoCause {
-                        code: StatusCode::BAD_REQUEST,
-                        msg: format!("Table not found, schema:{schema}, table:{table_name}"),
-                    }
-                    .fail();
-                }
-            }
+            let plan = write_table_request_to_insert_plan(table, write_table_req)?;
+            plan_vec.push(plan);
         }
 
         Ok(plan_vec)
@@ -425,6 +416,50 @@ impl Proxy {
         })
     }
 
+    async fn handle_auto_create_table(
+        &self,
+        request_id: RequestId,
+        schema: &str,
+        req: &WriteRequest,
+    ) -> Result<()> {
+        if !self.auto_create_table {
+            return Ok(());
+        }
+
+        let schema_config = self
+            .schema_config_provider
+            .schema_config(schema)
+            .box_err()
+            .with_context(|| ErrWithCause {
+                code: StatusCode::INTERNAL_SERVER_ERROR,
+                msg: format!("Fail to fetch schema config, schema_name:{schema}"),
+            })?
+            .cloned()
+            .unwrap_or_default();
+
+        // TODO: Consider whether to build tables concurrently when there are too many
+        // tables.
+        for write_table_req in &req.table_requests {
+            let table_info = self
+                .router
+                .fetch_table_info(schema, &write_table_req.table)
+                .await?;
+            if table_info.is_some() {
+                continue;
+            }
+            self.create_table(
+                request_id,
+                self.instance.catalog_manager.default_catalog_name(),
+                schema,
+                write_table_req,
+                &schema_config,
+                None,
+            )
+            .await?;
+        }
+        Ok(())
+    }
+
     async fn create_table(
         &self,
         request_id: RequestId,
diff --git a/router/src/cluster_based.rs b/router/src/cluster_based.rs
index 0aa179f54e..e54da642ef 100644
--- a/router/src/cluster_based.rs
+++ b/router/src/cluster_based.rs
@@ -7,13 +7,12 @@ use ceresdbproto::storage::{Route, RouteRequest};
 use cluster::ClusterRef;
 use common_util::error::BoxError;
 use log::trace;
-use meta_client::types::{RouteTablesRequest, TableInfo};
+use meta_client::types::RouteTablesRequest;
 use moka::future::Cache;
 use snafu::ResultExt;
 
 use crate::{
-    endpoint::Endpoint, OtherWithCause, ParseEndpoint, PartitionTableInfo, Result,
-    RouteCacheConfig, Router,
+    endpoint::Endpoint, OtherWithCause, ParseEndpoint, Result, RouteCacheConfig, Router, TableInfo,
 };
 
 #[derive(Clone, Debug)]
@@ -24,22 +23,16 @@ struct RouteData {
 
 pub struct ClusterBasedRouter {
     cluster: ClusterRef,
-    cache: Option<Cache<String, RouteData>>,
+    cache: Cache<String, RouteData>,
 }
 
 impl ClusterBasedRouter {
     pub fn new(cluster: ClusterRef, cache_config: RouteCacheConfig) -> Self {
-        let cache = if cache_config.enable {
-            Some(
-                Cache::builder()
-                    .time_to_live(cache_config.ttl.0)
-                    .time_to_idle(cache_config.tti.0)
-                    .max_capacity(cache_config.capacity)
-                    .build(),
-            )
-        } else {
-            None
-        };
+        let cache = Cache::builder()
+            .time_to_live(cache_config.ttl.0)
+            .time_to_idle(cache_config.tti.0)
+            .max_capacity(cache_config.capacity)
+            .build();
 
         Self { cluster, cache }
     }
@@ -49,16 +42,12 @@ impl ClusterBasedRouter {
     fn route_from_cache(&self, tables: &[String], routes: &mut Vec<RouteData>) -> Vec<String> {
         let mut miss = vec![];
 
-        if let Some(cache) = &self.cache {
-            for table in tables {
-                if let Some(route) = cache.get(table) {
-                    routes.push(route.clone());
-                } else {
-                    miss.push(table.clone());
-                }
+        for table in tables {
+            if let Some(route) = self.cache.get(table) {
+                routes.push(route.clone());
+            } else {
+                miss.push(table.clone());
             }
-        } else {
-            miss = tables.to_vec();
         }
 
         miss
@@ -108,10 +97,8 @@ impl ClusterBasedRouter {
             };
 
             if let Some(route) = route {
-                if let Some(cache) = &self.cache {
-                    // There may be data race here, and it is acceptable currently.
-                    cache.insert(table_name.clone(), route.clone()).await;
-                }
+                // There may be data race here, and it is acceptable currently.
+                self.cache.insert(table_name.clone(), route.clone()).await;
                 routes.push(route);
             }
         }
@@ -145,11 +132,7 @@ impl Router for ClusterBasedRouter {
             .collect())
     }
 
-    async fn fetch_partition_table_info(
-        &self,
-        schema: &str,
-        table: &str,
-    ) -> Result<Option<PartitionTableInfo>> {
+    async fn fetch_table_info(&self, schema: &str, table: &str) -> Result<Option<TableInfo>> {
         let mut route_data_vec = self
             .route_with_cache(&vec![table.to_string()], schema.to_string())
             .await?;
@@ -159,17 +142,7 @@ impl Router for ClusterBasedRouter {
 
         let route_data = route_data_vec.remove(0);
         let table_info = route_data.table_info;
-        if table_info.partition_info.is_some() {
-            return Ok(Some(PartitionTableInfo {
-                id: table_info.id,
-                name: table_info.name,
-                schema_id: table_info.schema_id,
-                schema_name: table_info.schema_name,
-                partition_info: table_info.partition_info.unwrap(),
-            }));
-        }
-
-        Ok(None)
+        Ok(Some(table_info))
     }
 }
 
@@ -284,7 +257,6 @@ mod tests {
         let mock_cluster = MockClusterImpl {};
 
         let config = RouteCacheConfig {
-            enable: true,
             ttl: ReadableDuration::from(Duration::from_secs(4)),
             tti: ReadableDuration::from(Duration::from_secs(2)),
             capacity: 2,
diff --git a/router/src/lib.rs b/router/src/lib.rs
index 3a388d2025..2b047c5244 100644
--- a/router/src/lib.rs
+++ b/router/src/lib.rs
@@ -1,4 +1,4 @@
-// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
+// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.
 
 pub mod cluster_based;
 pub mod endpoint;
@@ -9,12 +9,11 @@ use std::{sync::Arc, time::Duration};
 use async_trait::async_trait;
 use ceresdbproto::storage::{Route, RouteRequest};
 pub use cluster_based::ClusterBasedRouter;
-use common_types::{schema::SchemaId, table::TableId};
 use common_util::{config::ReadableDuration, define_result};
+use meta_client::types::TableInfo;
 pub use rule_based::{RuleBasedRouter, RuleList};
 use serde::{Deserialize, Serialize};
 use snafu::{Backtrace, Snafu};
-use table_engine::partition::PartitionInfo;
 
 #[derive(Snafu, Debug)]
 #[snafu(visibility(pub))]
@@ -61,29 +60,14 @@ define_result!(Error);
 
 pub type RouterRef = Arc;
 
-#[derive(Clone, Debug)]
-pub struct PartitionTableInfo {
-    pub id: TableId,
-    pub name: String,
-    pub schema_id: SchemaId,
-    pub schema_name: String,
-    pub partition_info: PartitionInfo,
-}
-
 #[async_trait]
 pub trait Router {
     async fn route(&self, req: RouteRequest) -> Result<Vec<Route>>;
-    async fn fetch_partition_table_info(
-        &self,
-        schema: &str,
-        table: &str,
-    ) -> Result<Option<PartitionTableInfo>>;
+    async fn fetch_table_info(&self, schema: &str, table: &str) -> Result<Option<TableInfo>>;
 }
 
 #[derive(Clone, Debug, Deserialize, Serialize)]
 pub struct RouteCacheConfig {
-    // enable route cache, default false
-    enable: bool,
     /// Time to live (TTL) in second.
     ttl: ReadableDuration,
     /// Time to idle (TTI) in second.
@@ -95,7 +79,6 @@ pub struct RouteCacheConfig {
 impl Default for RouteCacheConfig {
     fn default() -> Self {
         Self {
-            enable: false,
             ttl: ReadableDuration::from(Duration::from_secs(5)),
             tti: ReadableDuration::from(Duration::from_secs(5)),
             capacity: 10_000,
diff --git a/router/src/rule_based.rs b/router/src/rule_based.rs
index 7ad772bdad..86afd44711 100644
--- a/router/src/rule_based.rs
+++ b/router/src/rule_based.rs
@@ -1,4 +1,4 @@
-// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
+// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.
 
 //! A router based on rules.
 
@@ -12,9 +12,7 @@ use meta_client::types::ShardId;
 use serde::{Deserialize, Serialize};
 use snafu::{ensure, OptionExt};
 
-use crate::{
-    endpoint::Endpoint, hash, PartitionTableInfo, Result, RouteNotFound, Router, ShardNotFound,
-};
+use crate::{endpoint::Endpoint, hash, Result, RouteNotFound, Router, ShardNotFound, TableInfo};
 
 pub type ShardNodes = HashMap;
 
@@ -173,11 +171,7 @@ impl Router for RuleBasedRouter {
         Ok(Vec::new())
     }
 
-    async fn fetch_partition_table_info(
-        &self,
-        _schema: &str,
-        _table: &str,
-    ) -> Result<Option<PartitionTableInfo>> {
+    async fn fetch_table_info(&self, _schema: &str, _table: &str) -> Result<Option<TableInfo>> {
         return Ok(None);
     }
 }