diff --git a/Cargo.lock b/Cargo.lock index d9d36aaf5f..11c461bef9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4767,6 +4767,22 @@ dependencies = [ "regex", ] +[[package]] +name = "partition_table_engine" +version = "1.1.0" +dependencies = [ + "async-stream", + "async-trait", + "common_types", + "common_util", + "futures 0.3.28", + "lazy_static", + "prometheus 0.12.0", + "remote_engine_client", + "snafu 0.6.10", + "table_engine", +] + [[package]] name = "paste" version = "0.1.18" @@ -5987,6 +6003,7 @@ dependencies = [ "moka", "serde", "snafu 0.6.10", + "table_engine", "tokio", "twox-hash", ] @@ -6382,6 +6399,7 @@ dependencies = [ "logger", "meta_client", "opensrv-mysql", + "partition_table_engine", "paste 1.0.12", "pprof 0.11.1", "profile", @@ -6390,6 +6408,7 @@ dependencies = [ "prometheus-static-metric", "prost", "query_engine", + "remote_engine_client", "router", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index fafe7ed45c..ee04bab348 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,7 @@ members = [ "integration_tests/sdk/rust", "interpreters", "meta_client", + "partition_table_engine", "query_engine", "remote_engine_client", "router", @@ -91,6 +92,7 @@ interpreters = { path = "interpreters" } itertools = "0.10.5" meta_client = { path = "meta_client" } object_store = { path = "components/object_store" } +partition_table_engine = { path = "partition_table_engine" } parquet_ext = { path = "components/parquet_ext" } parquet = { version = "36.0.0" } paste = "1.0" diff --git a/analytic_engine/src/engine.rs b/analytic_engine/src/engine.rs index cc4c0e0e47..1038a05c99 100644 --- a/analytic_engine/src/engine.rs +++ b/analytic_engine/src/engine.rs @@ -1,4 +1,4 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. //! 
Implements the TableEngine trait @@ -7,21 +7,17 @@ use std::sync::Arc; use async_trait::async_trait; use common_util::error::BoxError; use log::info; -use snafu::{OptionExt, ResultExt}; +use snafu::ResultExt; use table_engine::{ engine::{ Close, CloseTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest, Result, - TableEngine, Unexpected, UnexpectedNoCause, + TableEngine, }, table::{SchemaId, TableRef}, ANALYTIC_ENGINE_TYPE, }; -use crate::{ - instance::InstanceRef, - space::SpaceId, - table::{partition::PartitionTableImpl, TableImpl}, -}; +use crate::{instance::InstanceRef, space::SpaceId, table::TableImpl}; /// TableEngine implementation pub struct TableEngineImpl { @@ -76,30 +72,14 @@ impl TableEngine for TableEngineImpl { let space_table = self.instance.create_table(space_id, request).await?; - let table_impl: TableRef = match &space_table.table_data().partition_info { - None => Arc::new(TableImpl::new( - self.instance.clone(), - ANALYTIC_ENGINE_TYPE.to_string(), - space_id, - space_table.table_data().id, - space_table.table_data().clone(), - space_table, - )), - Some(_v) => Arc::new( - PartitionTableImpl::new( - self.instance - .remote_engine - .clone() - .context(UnexpectedNoCause { - msg: "remote engine not found", - })?, - ANALYTIC_ENGINE_TYPE.to_string(), - space_table, - ) - .box_err() - .context(Unexpected)?, - ), - }; + let table_impl: TableRef = Arc::new(TableImpl::new( + self.instance.clone(), + ANALYTIC_ENGINE_TYPE.to_string(), + space_id, + space_table.table_data().id, + space_table.table_data().clone(), + space_table, + )); Ok(table_impl) } @@ -128,30 +108,14 @@ impl TableEngine for TableEngineImpl { None => return Ok(None), }; - let table_impl: TableRef = match &space_table.table_data().partition_info { - None => Arc::new(TableImpl::new( - self.instance.clone(), - ANALYTIC_ENGINE_TYPE.to_string(), - space_id, - space_table.table_data().id, - space_table.table_data().clone(), - space_table, - )), - Some(_v) => Arc::new( - PartitionTableImpl::new( - self.instance - .remote_engine - .clone() - .context(UnexpectedNoCause { - msg: "remote engine is empty", - })?, - ANALYTIC_ENGINE_TYPE.to_string(), - space_table, - ) - .box_err() - .context(Unexpected)?, - ), - }; + let table_impl = Arc::new(TableImpl::new( + self.instance.clone(), + ANALYTIC_ENGINE_TYPE.to_string(), + space_id, + space_table.table_data().id, + space_table.table_data().clone(), + space_table, + )); Ok(Some(table_impl)) } diff --git a/analytic_engine/src/instance/create.rs b/analytic_engine/src/instance/create.rs index 8cb0ef9524..a0c299b872 100644 --- a/analytic_engine/src/instance/create.rs +++ b/analytic_engine/src/instance/create.rs @@ -1,4 +1,4 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. //! Create table logic of instance @@ -113,7 +113,6 @@ impl Instance { table_name: table_data.name.clone(), schema: table_data.schema(), opts: table_data.table_options().as_ref().clone(), - partition_info: table_data.partition_info.clone(), }); MetaUpdateRequest { shard_info: table_data.shard_info, diff --git a/analytic_engine/src/instance/mod.rs b/analytic_engine/src/instance/mod.rs index 179d59b073..9e17bc0fb7 100644 --- a/analytic_engine/src/instance/mod.rs +++ b/analytic_engine/src/instance/mod.rs @@ -1,4 +1,4 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. //! A table engine instance //! 
@@ -27,7 +27,7 @@ use common_util::{define_result, runtime::Runtime}; use log::info; use mem_collector::MemUsageCollector; use snafu::{ResultExt, Snafu}; -use table_engine::{engine::EngineRuntimes, remote::RemoteEngineRef}; +use table_engine::engine::EngineRuntimes; use wal::manager::{WalLocation, WalManagerRef}; use crate::{ @@ -177,7 +177,6 @@ pub struct Instance { /// Options for scanning sst pub(crate) scan_options: ScanOptions, pub(crate) iter_options: Option<IterOptions>, - pub(crate) remote_engine: Option<RemoteEngineRef>, } impl Instance { diff --git a/analytic_engine/src/instance/open.rs b/analytic_engine/src/instance/open.rs index 941c04eeb2..1e226f06c1 100644 --- a/analytic_engine/src/instance/open.rs +++ b/analytic_engine/src/instance/open.rs @@ -10,7 +10,7 @@ use std::{ use common_types::schema::IndexInWriterSchema; use log::{debug, error, info, trace, warn}; use snafu::ResultExt; -use table_engine::{engine::OpenTableRequest, remote::RemoteEngineRef}; +use table_engine::engine::OpenTableRequest; use tokio::sync::oneshot; use wal::{ log_batch::LogEntry, @@ -54,7 +54,6 @@ impl Instance { wal_manager: WalManagerRef, store_picker: ObjectStorePickerRef, sst_factory: SstFactoryRef, - remote_engine_ref: Option<RemoteEngineRef>, ) -> Result<Arc<Self>> { let space_store = Arc::new(SpaceStore { spaces: RwLock::new(Spaces::default()), @@ -111,7 +110,6 @@ impl Instance { .map(|v| v.as_byte() as usize), iter_options, scan_options, - remote_engine: remote_engine_ref, }); Ok(instance) diff --git a/analytic_engine/src/manifest/details.rs b/analytic_engine/src/manifest/details.rs index 89f6e20e7c..2a6308c330 100644 --- a/analytic_engine/src/manifest/details.rs +++ b/analytic_engine/src/manifest/details.rs @@ -689,17 +689,13 @@ where mod tests { use std::{path::PathBuf, sync::Arc, vec}; - use bytes::Bytes; use common_types::{ column_schema, datum::DatumKind, schema, schema::Schema, table::DEFAULT_SHARD_ID, }; use common_util::{runtime, runtime::Runtime, tests::init_log_for_test}; use futures::future::BoxFuture; use object_store::LocalFileSystem; - use table_engine::{ partition::{HashPartitionInfo, PartitionDefinition, PartitionInfo}, table::{SchemaId, TableId, TableSeqGenerator}, }; + use table_engine::table::{SchemaId, TableId, TableSeqGenerator}; use wal::rocks_impl::manager::Builder as WalBuilder; use super::*; @@ -831,23 +827,6 @@ mod tests { table_name, schema: common_types::tests::build_schema(), opts: TableOptions::default(), - partition_info: None, }) } - - fn meta_update_add_table_with_partition_info( - &self, - table_id: TableId, - partition_info: Option<PartitionInfo>, - ) -> MetaUpdate { - let table_name = Self::table_name_from_id(table_id); - MetaUpdate::AddTable(AddTableMeta { - space_id: self.schema_id.as_u32(), - table_id, - table_name, - schema: common_types::tests::build_schema(), - opts: TableOptions::default(), - partition_info, }) } @@ -897,7 +876,6 @@ mod tests { async fn add_table_with_manifest( &self, table_id: TableId, - partition_info: Option<PartitionInfo>, manifest_data_builder: &mut TableManifestDataBuilder, manifest: &ManifestImpl, ) { @@ -905,8 +883,7 @@ mod tests { shard_id: DEFAULT_SHARD_ID, }; - let add_table = self.meta_update_add_table_with_partition_info(table_id, partition_info); + let add_table = self.meta_update_add_table(table_id); let update_req = { MetaUpdateRequest { shard_info, @@ -967,7 +944,7 @@ mod tests { manifest_data_builder: &mut TableManifestDataBuilder, ) { let manifest = self.open_manifest().await; - self.add_table_with_manifest(table_id, None, manifest_data_builder, &manifest) + 
self.add_table_with_manifest(table_id, manifest_data_builder, &manifest) .await; } @@ -1128,26 +1105,11 @@ mod tests { runtime.block_on(async move { let table_id = ctx.alloc_table_id(); - let default_version = 0; - let partition_info = Some(PartitionInfo::Hash(HashPartitionInfo { - version: default_version, - definitions: vec![PartitionDefinition { - name: "p0".to_string(), - origin_name: Some("region0".to_string()), - }], - expr: Bytes::from("test"), - linear: false, - })); let location = WalLocation::new(DEFAULT_SHARD_ID as u64, table_id.as_u64()); let mut manifest_data_builder = TableManifestDataBuilder::default(); let manifest = ctx.open_manifest().await; - ctx.add_table_with_manifest( - table_id, - partition_info, - &mut manifest_data_builder, - &manifest, - ) - .await; + ctx.add_table_with_manifest(table_id, &mut manifest_data_builder, &manifest) + .await; manifest .maybe_do_snapshot(ctx.schema_id.as_u32(), table_id, location, true) @@ -1188,7 +1150,7 @@ }; let mut manifest_data_builder = TableManifestDataBuilder::default(); let manifest = ctx.open_manifest().await; - ctx.add_table_with_manifest(table_id, None, &mut manifest_data_builder, &manifest) + ctx.add_table_with_manifest(table_id, &mut manifest_data_builder, &manifest) .await; for i in 0..500 { diff --git a/analytic_engine/src/manifest/meta_update.rs b/analytic_engine/src/manifest/meta_update.rs index adcf0f7a5f..3068e5c7bf 100644 --- a/analytic_engine/src/manifest/meta_update.rs +++ b/analytic_engine/src/manifest/meta_update.rs @@ -1,4 +1,4 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. //! Update to meta @@ -13,7 +13,7 @@ use common_types::{ use common_util::define_result; use prost::Message; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; -use table_engine::{partition::PartitionInfo, table::TableId}; +use table_engine::table::TableId; use wal::log_batch::{Payload, PayloadDecoder}; use crate::{ @@ -42,11 +42,6 @@ pub enum Error { #[snafu(display("Failed to convert schema, err:{}", source))] ConvertSchema { source: common_types::schema::Error }, - #[snafu(display("Failed to convert partition info, err:{}", source))] - ConvertPartitionInfo { - source: table_engine::partition::Error, - }, - #[snafu(display("Empty table schema.\nBacktrace:\n{}", backtrace))] EmptyTableSchema { backtrace: Backtrace }, @@ -155,19 +150,18 @@ pub struct AddTableMeta { pub schema: Schema, // Options needed to persist pub opts: TableOptions, - pub partition_info: Option<PartitionInfo>, } impl From<AddTableMeta> for manifest_pb::AddTableMeta { fn from(v: AddTableMeta) -> Self { - let partition_info = v.partition_info.map(|v| v.into()); manifest_pb::AddTableMeta { space_id: v.space_id, table_id: v.table_id.as_u64(), table_name: v.table_name, schema: Some(schema_pb::TableSchema::from(&v.schema)), options: Some(manifest_pb::TableOptions::from(v.opts)), - partition_info, + // Deprecated. + partition_info: None, } } } @@ -178,12 +172,6 @@ impl TryFrom<manifest_pb::AddTableMeta> for AddTableMeta { fn try_from(src: manifest_pb::AddTableMeta) -> Result<Self> { let table_schema = src.schema.context(EmptyTableSchema)?; let opts = src.options.context(EmptyTableOptions)?; - let partition_info = match src.partition_info { - Some(partition_info) => { - Some(PartitionInfo::try_from(partition_info).context(ConvertPartitionInfo)?)
- } - None => None, - }; Ok(Self { space_id: src.space_id, @@ -191,7 +179,6 @@ table_name: src.table_name, schema: Schema::try_from(table_schema).context(ConvertSchema)?, opts: TableOptions::try_from(opts).context(ConvertTableOptions)?, - partition_info, }) } } diff --git a/analytic_engine/src/setup.rs b/analytic_engine/src/setup.rs index a7c32a66e1..d74ef1c40a 100644 --- a/analytic_engine/src/setup.rs +++ b/analytic_engine/src/setup.rs @@ -1,4 +1,4 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. //! Setup the analytic engine @@ -16,13 +16,8 @@ use object_store::{ prefix::StoreWithPrefix, LocalFileSystem, ObjectStoreRef, }; -use remote_engine_client::RemoteEngineImpl; -use router::RouterRef; use snafu::{Backtrace, ResultExt, Snafu}; -use table_engine::{ - engine::{EngineRuntimes, TableEngineRef}, - remote::RemoteEngineRef, -}; +use table_engine::engine::{EngineRuntimes, TableEngineRef}; use table_kv::{memory::MemoryImpl, obkv::ObkvImpl, TableKv}; use wal::{ manager::{self, WalManagerRef}, @@ -110,7 +105,6 @@ const DISK_CACHE_DIR_NAME: &str = "sst_cache"; #[derive(Clone)] pub struct EngineBuilder<'a> { pub config: &'a Config, - pub router: Option<RouterRef>, pub engine_runtimes: Arc<EngineRuntimes>, pub opened_wals: OpenedWals, } @@ -132,7 +126,6 @@ impl<'a> EngineBuilder<'a> { self.opened_wals.data_wal, Arc::new(manifest), Arc::new(opened_storages), - self.router, ) .await?; Ok(Arc::new(TableEngineImpl::new(instance))) } @@ -362,17 +355,7 @@ async fn open_instance( wal_manager: WalManagerRef, manifest: ManifestRef, store_picker: ObjectStorePickerRef, - router: Option<RouterRef>, ) -> Result<InstanceRef> { - let remote_engine_ref: Option<RemoteEngineRef> = if let Some(v) = router { - Some(Arc::new(RemoteEngineImpl::new( - config.remote_engine_client.clone(), - v, - ))) - } else { - None - }; - let meta_cache: Option<MetaCacheRef> = config .sst_meta_cache_cap .map(|cap| Arc::new(MetaCache::new(cap))); @@ -389,7 +372,6 @@ async fn open_instance( wal_manager, store_picker, Arc::new(FactoryImpl::default()), - remote_engine_ref, ) .await .context(OpenInstance)?; diff --git a/analytic_engine/src/table/data.rs b/analytic_engine/src/table/data.rs index cd1599be53..29c80f52d6 100644 --- a/analytic_engine/src/table/data.rs +++ b/analytic_engine/src/table/data.rs @@ -27,7 +27,7 @@ use common_util::define_result; use log::{debug, info}; use object_store::Path; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; -use table_engine::{engine::CreateTableRequest, partition::PartitionInfo, table::TableId}; +use table_engine::{engine::CreateTableRequest, table::TableId}; use crate::{ instance::write_worker::{choose_worker, WorkerLocal, WriteHandle}, @@ -146,9 +146,6 @@ pub struct TableData { /// Shard id pub shard_info: TableShardInfo, - - /// Partition info - pub partition_info: Option<PartitionInfo>, } impl fmt::Debug for TableData { @@ -164,7 +161,6 @@ .field("last_file_id", &self.last_file_id) .field("dropped", &self.dropped.load(Ordering::Relaxed)) .field("shard_info", &self.shard_info) - .field("partition_info", &self.partition_info) .finish() } } @@ -219,7 +215,6 @@ impl TableData { dropped: AtomicBool::new(false), metrics, shard_info: TableShardInfo::new(request.shard_id), - partition_info: request.partition_info, }) } @@ -256,7 +251,6 @@ dropped: AtomicBool::new(false), metrics, shard_info: TableShardInfo::new(shard_id), - partition_info: add_meta.partition_info, }) } diff --git a/analytic_engine/src/table/metrics.rs 
b/analytic_engine/src/table/metrics.rs index 585708bea2..95c16c7e7a 100644 --- a/analytic_engine/src/table/metrics.rs +++ b/analytic_engine/src/table/metrics.rs @@ -1,4 +1,4 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. //! Metrics of table. @@ -101,23 +101,6 @@ lazy_static! { exponential_buckets(0.01, 2.0, 13).unwrap() ).unwrap(); - // Buckets: 0, 0.01, .., 0.01 * 2^12 - pub static ref PARTITION_TABLE_WRITE_DURATION_HISTOGRAM: HistogramVec = register_histogram_vec!( - "partition_table_write_duration", - "Histogram for write duration of the partition table in seconds", - &["type"], - exponential_buckets(0.01, 2.0, 13).unwrap() - ) - .unwrap(); - - // Buckets: 0, 0.01, .., 0.01 * 2^12 - pub static ref PARTITION_TABLE_PARTITIONED_READ_DURATION_HISTOGRAM: HistogramVec = register_histogram_vec!( - "partition_table_partitioned_read_duration", - "Histogram for partitioned read duration of the partition table in seconds", - &["type"], - exponential_buckets(0.01, 2.0, 13).unwrap() - ) - .unwrap(); // End of histograms. } diff --git a/analytic_engine/src/table/mod.rs b/analytic_engine/src/table/mod.rs index d00637644d..77da11679f 100644 --- a/analytic_engine/src/table/mod.rs +++ b/analytic_engine/src/table/mod.rs @@ -1,4 +1,4 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. //! Table implementation @@ -32,7 +32,6 @@ use crate::{ pub mod data; pub mod metrics; -pub mod partition; pub mod sst_util; pub mod version; pub mod version_edit; @@ -103,7 +102,7 @@ impl Table for TableImpl { } fn partition_info(&self) -> Option<PartitionInfo> { - self.table_data.partition_info.clone() + None } fn engine_type(&self) -> &str { diff --git a/analytic_engine/src/tests/util.rs b/analytic_engine/src/tests/util.rs index cce946ad89..e788f75dc0 100644 --- a/analytic_engine/src/tests/util.rs +++ b/analytic_engine/src/tests/util.rs @@ -130,7 +130,6 @@ impl TestContext { let engine_builder = EngineBuilder { config: &self.config, - router: None, engine_runtimes: self.runtimes.clone(), opened_wals: opened_wals.clone(), }; diff --git a/catalog/src/schema.rs b/catalog/src/schema.rs index f29ec84fc5..5885a35a1c 100644 --- a/catalog/src/schema.rs +++ b/catalog/src/schema.rs @@ -191,6 +191,9 @@ pub struct CreateTableRequest { pub schema_name: String, /// Table name pub table_name: String, + /// Table id + // TODO: remove this field + pub table_id: Option<TableId>, /// Table schema pub table_schema: common_types::schema::Schema, /// Table engine type @@ -208,9 +211,11 @@ impl CreateTableRequest { pub fn into_engine_create_request( self, - table_id: TableId, + table_id: Option<TableId>, schema_id: SchemaId, ) -> engine::CreateTableRequest { + let table_id = self.table_id.unwrap_or(table_id.unwrap_or(TableId::MIN)); + engine::CreateTableRequest { catalog_name: self.catalog_name, schema_name: self.schema_name, diff --git a/catalog_impls/src/table_based.rs b/catalog_impls/src/table_based.rs index 09f09ef2a1..6254a2df02 100644 --- a/catalog_impls/src/table_based.rs +++ b/catalog_impls/src/table_based.rs @@ -739,7 +739,7 @@ impl Schema for SchemaImpl { // Create table let table_id = self.alloc_table_id(&request.table_name).await?; - let request = request.into_engine_create_request(table_id, self.schema_id); + let request = request.into_engine_create_request(Some(table_id), self.schema_id); let table_name = request.table_name.clone(); 
let table = opts .table_engine @@ -905,6 +905,7 @@ mod tests { catalog_name: DEFAULT_CATALOG.to_string(), schema_name: schema.name().to_string(), table_name: table_name.to_string(), + table_id: None, table_schema: common_types::tests::build_schema(), engine: ANALYTIC_ENGINE_TYPE.to_string(), options: HashMap::new(), diff --git a/catalog_impls/src/volatile.rs b/catalog_impls/src/volatile.rs index 3e41ec9cd4..dcf0ecd173 100644 --- a/catalog_impls/src/volatile.rs +++ b/catalog_impls/src/volatile.rs @@ -301,20 +301,22 @@ impl Schema for SchemaImpl { } // Do real create table. - let table_with_shards = self - .shard_tables_cache - .find_table_by_name( - &request.catalog_name, - &request.schema_name, - &request.table_name, - ) - .with_context(|| schema::CreateTable { - request: request.clone(), - msg: "table with shards is not found in the ShardTableManager", - })?; - - let request = request - .into_engine_create_request(table_with_shards.table_info.id.into(), self.schema_id); + // Partition table is not stored in ShardTableManager. + if request.partition_info.is_none() { + let _ = self + .shard_tables_cache + .find_table_by_name( + &request.catalog_name, + &request.schema_name, + &request.table_name, + ) + .with_context(|| schema::CreateTable { + request: request.clone(), + msg: format!("table with shards is not found in the ShardTableManager, catalog_name:{}, schema_name:{}, table_name:{}", + request.catalog_name, request.schema_name, request.table_name), + })?; + } + let request = request.into_engine_create_request(None, self.schema_id); // Table engine is able to handle duplicate table creation. let table = opts diff --git a/cluster/src/cluster_impl.rs b/cluster/src/cluster_impl.rs index a2d43ebb1e..9008aceb7f 100644 --- a/cluster/src/cluster_impl.rs +++ b/cluster/src/cluster_impl.rs @@ -1,4 +1,4 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. use std::{ sync::{Arc, Mutex, RwLock}, @@ -33,7 +33,7 @@ use tokio::{ use crate::{ config::ClusterConfig, shard_tables_cache::ShardTablesCache, topology::ClusterTopology, - Cluster, ClusterNodesNotFound, ClusterNodesResp, MetaClientFailure, OpenShard, + Cluster, ClusterNodesNotFound, ClusterNodesResp, Internal, MetaClientFailure, OpenShard, OpenShardWithCause, Result, ShardNotFound, TableNotFound, }; @@ -287,7 +287,11 @@ impl Inner { self.shard_tables_cache.try_insert_table_to_shard( update_shard_info.prev_version, ShardInfo::from(curr_shard_info), - TableInfo::from(table_info), + TableInfo::try_from(table_info) + .box_err() + .context(Internal { + msg: "Failed to parse table info", + })?, ) } @@ -309,7 +313,11 @@ impl Inner { self.shard_tables_cache.try_remove_table_from_shard( update_shard_info.prev_version, ShardInfo::from(curr_shard_info), - TableInfo::from(table_info), + TableInfo::try_from(table_info) + .box_err() + .context(Internal { + msg: "Failed to parse table info", + })?, ) } } diff --git a/cluster/src/lib.rs b/cluster/src/lib.rs index 004aadf178..5aa2ed09ff 100644 --- a/cluster/src/lib.rs +++ b/cluster/src/lib.rs @@ -1,4 +1,4 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. //! Cluster sub-crate includes several functionalities for supporting CeresDB //! server running in the distributed mode. 
Including: @@ -34,6 +34,9 @@ pub mod topology; #[derive(Debug, Snafu)] #[snafu(visibility = "pub")] pub enum Error { + #[snafu(display("{msg}, err:{source}"))] + Internal { msg: String, source: GenericError }, + #[snafu(display("Build meta client failed, err:{}.", source))] BuildMetaClient { source: meta_client::Error }, diff --git a/docs/example-cluster-0.toml b/docs/example-cluster-0.toml index d767846d31..da3fbe7631 100644 --- a/docs/example-cluster-0.toml +++ b/docs/example-cluster-0.toml @@ -1,12 +1,14 @@ [node] addr = "ceresdb0" +[logger] +level = "info" + [server] bind_addr = "0.0.0.0" http_port = 5440 grpc_port = 8831 mysql_port = 3307 -log_level = "info" deploy_mode = "Cluster" [server.forward] diff --git a/docs/example-cluster-1.toml b/docs/example-cluster-1.toml index c0b0f0f123..6a04569cbd 100644 --- a/docs/example-cluster-1.toml +++ b/docs/example-cluster-1.toml @@ -1,12 +1,14 @@ [node] addr = "ceresdb1" +[logger] +level = "info" + [server] bind_addr = "0.0.0.0" http_port = 5441 grpc_port = 8832 mysql_port = 13307 -log_level = "info" deploy_mode = "Cluster" [server.forward] diff --git a/integration_tests/cases/env/cluster/05_ddl/partition_table.result b/integration_tests/cases/env/cluster/05_ddl/partition_table.result index 5580ee1321..17ae305ebf 100644 --- a/integration_tests/cases/env/cluster/05_ddl/partition_table.result +++ b/integration_tests/cases/env/cluster/05_ddl/partition_table.result @@ -19,7 +19,7 @@ affected_rows: 0 SHOW CREATE TABLE partition_table_t; Table,Create Table, -String("partition_table_t"),String("CREATE TABLE `partition_table_t` (`tsid` uint64 NOT NULL, `t` timestamp NOT NULL, `name` string TAG, `id` int TAG, `value` double NOT NULL, PRIMARY KEY(tsid,t), TIMESTAMP KEY(t)) PARTITION BY KEY(name) PARTITIONS 4 ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='false', num_rows_per_row_group='8192', segment_duration='', storage_format='AUTO', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')"), +String("partition_table_t"),String("CREATE TABLE `partition_table_t` (`tsid` uint64 NOT NULL, `t` timestamp NOT NULL, `name` string TAG, `id` int TAG, `value` double NOT NULL, PRIMARY KEY(tsid,t), TIMESTAMP KEY(t)) PARTITION BY KEY(name) PARTITIONS 4 ENGINE=Analytic WITH(enable_ttl='false')"), SHOW CREATE TABLE __partition_table_t_0; diff --git a/integration_tests/cases/env/local/ddl/create_tables.result b/integration_tests/cases/env/local/ddl/create_tables.result index 78d5115ee7..98439cbfc2 100644 --- a/integration_tests/cases/env/local/ddl/create_tables.result +++ b/integration_tests/cases/env/local/ddl/create_tables.result @@ -48,7 +48,7 @@ affected_rows: 0 CREATE TABLE `05_create_tables_t`(c1 int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic; -Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute interpreter, sql: CREATE TABLE `05_create_tables_t`(c1 int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;. 
Caused by: Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to operate table, err:Failed to operate table, msg:failed to create table on shard, request:CreateTableRequest { catalog_name: \"ceresdb\", schema_name: \"public\", table_name: \"05_create_tables_t\", table_schema: Schema { timestamp_index: 1, tsid_index: Some(0), column_schemas: ColumnSchemas { columns: [ColumnSchema { id: 1, name: \"tsid\", data_type: UInt64, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"tsid\", default_value: None }, ColumnSchema { id: 2, name: \"t\", data_type: Timestamp, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"t\", default_value: None }, ColumnSchema { id: 3, name: \"c1\", data_type: Int32, is_nullable: true, is_tag: false, comment: \"\", escaped_name: \"c1\", default_value: None }] }, version: 1, primary_key_indexes: [0, 1] }, engine: \"Analytic\", options: {}, state: Stable, shard_id: 0, partition_info: None }, err:Failed to create table, table already exists, table:05_create_tables_t." }) +Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute interpreter, sql: CREATE TABLE `05_create_tables_t`(c1 int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;. Caused by: Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to operate table, err:Failed to operate table, msg:failed to create table on shard, request:CreateTableRequest { catalog_name: \"ceresdb\", schema_name: \"public\", table_name: \"05_create_tables_t\", table_id: None, table_schema: Schema { timestamp_index: 1, tsid_index: Some(0), column_schemas: ColumnSchemas { columns: [ColumnSchema { id: 1, name: \"tsid\", data_type: UInt64, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"tsid\", default_value: None }, ColumnSchema { id: 2, name: \"t\", data_type: Timestamp, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"t\", default_value: None }, ColumnSchema { id: 3, name: \"c1\", data_type: Int32, is_nullable: true, is_tag: false, comment: \"\", escaped_name: \"c1\", default_value: None }] }, version: 1, primary_key_indexes: [0, 1] }, engine: \"Analytic\", options: {}, state: Stable, shard_id: 0, partition_info: None }, err:Failed to create table, table already exists, table:05_create_tables_t." }) create table `05_create_tables_t2`(a int, b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic with (enable_ttl='false'); @@ -67,11 +67,11 @@ Int32(4), create table `05_create_tables_t2`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic; -Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute interpreter, sql: create table `05_create_tables_t2`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;. 
Caused by: Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to operate table, err:Failed to operate table, msg:failed to create table on shard, request:CreateTableRequest { catalog_name: \"ceresdb\", schema_name: \"public\", table_name: \"05_create_tables_t2\", table_schema: Schema { timestamp_index: 1, tsid_index: Some(0), column_schemas: ColumnSchemas { columns: [ColumnSchema { id: 1, name: \"tsid\", data_type: UInt64, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"tsid\", default_value: None }, ColumnSchema { id: 2, name: \"t\", data_type: Timestamp, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"t\", default_value: None }, ColumnSchema { id: 3, name: \"a\", data_type: Int32, is_nullable: true, is_tag: false, comment: \"\", escaped_name: \"a\", default_value: None }, ColumnSchema { id: 4, name: \"b\", data_type: Int32, is_nullable: true, is_tag: false, comment: \"\", escaped_name: \"b\", default_value: None }] }, version: 1, primary_key_indexes: [0, 1] }, engine: \"Analytic\", options: {}, state: Stable, shard_id: 0, partition_info: None }, err:Failed to create table, table already exists, table:05_create_tables_t2." }) +Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute interpreter, sql: create table `05_create_tables_t2`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;. Caused by: Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to operate table, err:Failed to operate table, msg:failed to create table on shard, request:CreateTableRequest { catalog_name: \"ceresdb\", schema_name: \"public\", table_name: \"05_create_tables_t2\", table_id: None, table_schema: Schema { timestamp_index: 1, tsid_index: Some(0), column_schemas: ColumnSchemas { columns: [ColumnSchema { id: 1, name: \"tsid\", data_type: UInt64, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"tsid\", default_value: None }, ColumnSchema { id: 2, name: \"t\", data_type: Timestamp, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"t\", default_value: None }, ColumnSchema { id: 3, name: \"a\", data_type: Int32, is_nullable: true, is_tag: false, comment: \"\", escaped_name: \"a\", default_value: None }, ColumnSchema { id: 4, name: \"b\", data_type: Int32, is_nullable: true, is_tag: false, comment: \"\", escaped_name: \"b\", default_value: None }] }, version: 1, primary_key_indexes: [0, 1] }, engine: \"Analytic\", options: {}, state: Stable, shard_id: 0, partition_info: None }, err:Failed to create table, table already exists, table:05_create_tables_t2." }) create table `05_create_tables_t2`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic; -Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute interpreter, sql: create table `05_create_tables_t2`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;. 
Caused by: Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to operate table, err:Failed to operate table, msg:failed to create table on shard, request:CreateTableRequest { catalog_name: \"ceresdb\", schema_name: \"public\", table_name: \"05_create_tables_t2\", table_schema: Schema { timestamp_index: 1, tsid_index: Some(0), column_schemas: ColumnSchemas { columns: [ColumnSchema { id: 1, name: \"tsid\", data_type: UInt64, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"tsid\", default_value: None }, ColumnSchema { id: 2, name: \"t\", data_type: Timestamp, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"t\", default_value: None }, ColumnSchema { id: 3, name: \"a\", data_type: Int32, is_nullable: true, is_tag: false, comment: \"\", escaped_name: \"a\", default_value: None }, ColumnSchema { id: 4, name: \"b\", data_type: Int32, is_nullable: true, is_tag: false, comment: \"\", escaped_name: \"b\", default_value: None }] }, version: 1, primary_key_indexes: [0, 1] }, engine: \"Analytic\", options: {}, state: Stable, shard_id: 0, partition_info: None }, err:Failed to create table, table already exists, table:05_create_tables_t2." }) +Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute interpreter, sql: create table `05_create_tables_t2`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;. Caused by: Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to operate table, err:Failed to operate table, msg:failed to create table on shard, request:CreateTableRequest { catalog_name: \"ceresdb\", schema_name: \"public\", table_name: \"05_create_tables_t2\", table_id: None, table_schema: Schema { timestamp_index: 1, tsid_index: Some(0), column_schemas: ColumnSchemas { columns: [ColumnSchema { id: 1, name: \"tsid\", data_type: UInt64, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"tsid\", default_value: None }, ColumnSchema { id: 2, name: \"t\", data_type: Timestamp, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"t\", default_value: None }, ColumnSchema { id: 3, name: \"a\", data_type: Int32, is_nullable: true, is_tag: false, comment: \"\", escaped_name: \"a\", default_value: None }, ColumnSchema { id: 4, name: \"b\", data_type: Int32, is_nullable: true, is_tag: false, comment: \"\", escaped_name: \"b\", default_value: None }] }, version: 1, primary_key_indexes: [0, 1] }, engine: \"Analytic\", options: {}, state: Stable, shard_id: 0, partition_info: None }, err:Failed to create table, table already exists, table:05_create_tables_t2." }) create table `05_create_tables_t3`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic; diff --git a/integration_tests/src/database.rs b/integration_tests/src/database.rs index 8752ae4782..5e85b88442 100644 --- a/integration_tests/src/database.rs +++ b/integration_tests/src/database.rs @@ -1,4 +1,4 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. 
use std::{ collections::HashMap, @@ -17,12 +17,8 @@ use ceresdb_client::{ RpcContext, }; use reqwest::{ClientBuilder, Url}; -use sql::{ - ast::{Statement, TableName}, - parser::Parser, -}; +use sql::{frontend, parser::Parser}; use sqlness::{Database, QueryContext}; -use sqlparser::ast::{SetExpr, Statement as SqlStatement, TableFactor}; const SERVER_GRPC_ENDPOINT_ENV: &str = "CERESDB_SERVER_GRPC_ENDPOINT"; const SERVER_HTTP_ENDPOINT_ENV: &str = "CERESDB_SERVER_HTTP_ENDPOINT"; @@ -275,8 +271,8 @@ impl CeresDB { database: Some("public".to_string()), timeout: None, }; - - let table_name = Self::parse_table_name(&query); + let statements = Parser::parse_sql(&query).unwrap(); + let table_name = frontend::parse_table_name(&statements); let query_req = match table_name { Some(table_name) => Request { @@ -302,58 +298,4 @@ impl CeresDB { Err(e) => format!("Failed to execute query, err: {e:?}"), }) } - - fn parse_table_name(query: &str) -> Option<String> { - let statements = Parser::parse_sql(query).unwrap(); - - match &statements[0] { - Statement::Standard(s) => match *s.clone() { - SqlStatement::Insert { table_name, .. } => { - Some(TableName::from(table_name).to_string()) - } - SqlStatement::Explain { statement, .. } => { - if let SqlStatement::Query(q) = *statement { - match *q.body { - SetExpr::Select(select) => { - if select.from.len() != 1 { - None - } else if let TableFactor::Table { name, .. } = - &select.from[0].relation - { - Some(TableName::from(name.clone()).to_string()) - } else { - None - } - } - _ => None, - } - } else { - None - } - } - SqlStatement::Query(q) => match *q.body { - SetExpr::Select(select) => { - if select.from.len() != 1 { - None - } else if let TableFactor::Table { name, .. } = &select.from[0].relation { - Some(TableName::from(name.clone()).to_string()) - } else { - None - } - } - _ => None, - }, - _ => None, - }, - Statement::Create(s) => Some(s.table_name.to_string()), - Statement::Drop(s) => Some(s.table_name.to_string()), - Statement::Describe(s) => Some(s.table_name.to_string()), - Statement::AlterModifySetting(s) => Some(s.table_name.to_string()), - Statement::AlterAddColumn(s) => Some(s.table_name.to_string()), - Statement::ShowCreate(s) => Some(s.table_name.to_string()), - Statement::ShowTables(_s) => None, - Statement::ShowDatabases => None, - Statement::Exists(s) => Some(s.table_name.to_string()), - } - } } diff --git a/interpreters/src/table_manipulator/catalog_based.rs b/interpreters/src/table_manipulator/catalog_based.rs index ac76db5c3a..2bdc7eae6d 100644 --- a/interpreters/src/table_manipulator/catalog_based.rs +++ b/interpreters/src/table_manipulator/catalog_based.rs @@ -56,6 +56,7 @@ impl TableManipulator for TableManipulatorImpl { catalog_name: default_catalog.to_string(), schema_name: default_schema.to_string(), table_name: table.clone(), + table_id: None, table_schema, engine, options, diff --git a/meta_client/src/lib.rs b/meta_client/src/lib.rs index e20ad4c735..13bb5f9c85 100644 --- a/meta_client/src/lib.rs +++ b/meta_client/src/lib.rs @@ -1,4 +1,4 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. 
use std::sync::Arc; @@ -18,6 +18,9 @@ pub mod types; #[derive(Debug, Snafu)] #[snafu(visibility = "pub")] pub enum Error { + #[snafu(display("{msg}, err:{source}"))] + Convert { msg: String, source: GenericError }, + #[snafu(display("Missing shard info, msg:{}.\nBacktrace:\n{}", msg, backtrace))] MissingShardInfo { msg: String, backtrace: Backtrace }, diff --git a/meta_client/src/meta_impl.rs b/meta_client/src/meta_impl.rs index dec07ee510..ebc9b7bd0d 100644 --- a/meta_client/src/meta_impl.rs +++ b/meta_client/src/meta_impl.rs @@ -1,4 +1,4 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. use std::sync::Arc; @@ -153,7 +153,7 @@ impl MetaClient for MetaClientImpl { info!("Meta client finish dropping table, resp:{:?}", pb_resp); check_response_header(&pb_resp.header)?; - Ok(DropTableResponse::from(pb_resp)) + DropTableResponse::try_from(pb_resp) } async fn get_tables_of_shards( diff --git a/meta_client/src/types.rs b/meta_client/src/types.rs index 7b60691102..c5dfc95ed4 100644 --- a/meta_client/src/types.rs +++ b/meta_client/src/types.rs @@ -1,4 +1,4 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. use std::{collections::HashMap, sync::Arc}; @@ -8,12 +8,12 @@ use common_types::{ schema::{SchemaId, SchemaName}, table::{TableId, TableName}, }; -use common_util::config::ReadableDuration; +use common_util::{config::ReadableDuration, error::BoxError}; use serde::Deserialize; -use snafu::OptionExt; +use snafu::{OptionExt, ResultExt}; use table_engine::partition::PartitionInfo; -use crate::{Error, MissingShardInfo, MissingTableInfo, Result}; +use crate::{Convert, Error, MissingShardInfo, MissingTableInfo, Result}; pub type ClusterNodesRef = Arc<Vec<NodeShard>>; #[derive(Debug, Clone)] @@ -87,16 +87,29 @@ pub struct TableInfo { pub name: String, pub schema_id: SchemaId, pub schema_name: String, + pub partition_info: Option<PartitionInfo>, } -impl From<meta_service_pb::TableInfo> for TableInfo { - fn from(pb_table_info: meta_service_pb::TableInfo) -> Self { - TableInfo { +impl TryFrom<meta_service_pb::TableInfo> for TableInfo { + type Error = Error; + + fn try_from(pb_table_info: meta_service_pb::TableInfo) -> Result<Self> { + let partition_info = pb_table_info + .partition_info + .map(|v| { + PartitionInfo::try_from(v).box_err().context(Convert { + msg: "Failed to parse partition info", + }) + }) + .transpose()?; + + Ok(TableInfo { id: pb_table_info.id, name: pb_table_info.name, schema_id: pb_table_info.schema_id, schema_name: pb_table_info.schema_name, - } + partition_info, + }) } } @@ -274,8 +287,8 @@ impl TryFrom<meta_service_pb::TablesOfShard> for TablesOfShard { tables: pb_tables_of_shard .tables .into_iter() - .map(Into::into) - .collect(), + .map(TryInto::<TableInfo>::try_into) + .collect::<Result<Vec<_>>>()?, }) } } @@ -340,7 +353,7 @@ impl TryFrom<meta_service_pb::CreateTableResponse> for CreateTableResponse { })?; Ok(Self { - created_table: TableInfo::from(pb_table_info), + created_table: TableInfo::try_from(pb_table_info)?, shard_info: ShardInfo::from(pb_shard_info), }) } @@ -363,11 +376,13 @@ impl From<DropTableRequest> for meta_service_pb::DropTableRequest { } } -impl From<meta_service_pb::DropTableResponse> for DropTableResponse { - fn from(pb_resp: meta_service_pb::DropTableResponse) -> Self { - Self { - dropped_table: pb_resp.dropped_table.map(TableInfo::from), - } +impl TryFrom<meta_service_pb::DropTableResponse> for DropTableResponse { + type Error = Error; + + fn try_from(pb_resp: meta_service_pb::DropTableResponse) -> Result<Self> { + Ok(Self { + dropped_table: pb_resp.dropped_table.map(TableInfo::try_from).transpose()?, + }) + } } @@ -385,7 +400,7 @@ pub struct 
NodeShard { #[derive(Debug, Clone)] pub struct RouteEntry { - pub table: TableInfo, + pub table_info: TableInfo, pub node_shards: Vec<NodeShard>, } @@ -441,7 +456,7 @@ impl TryFrom<meta_service_pb::RouteEntry> for RouteEntry { msg: "table info is missing in route entry", })?; Ok(RouteEntry { - table: TableInfo::from(table_info), + table_info: TableInfo::try_from(table_info)?, node_shards, }) } diff --git a/partition_table_engine/Cargo.toml b/partition_table_engine/Cargo.toml new file mode 100644 index 0000000000..302857d9e6 --- /dev/null +++ b/partition_table_engine/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "partition_table_engine" + +[package.version] +workspace = true + +[package.authors] +workspace = true + +[package.edition] +workspace = true + +[dependencies] +async-stream = { workspace = true } +async-trait = { workspace = true } +common_types = { workspace = true } +common_util = { workspace = true } +futures = { workspace = true } +lazy_static = { workspace = true } +prometheus = { workspace = true } +remote_engine_client = { workspace = true } +snafu = { workspace = true } +table_engine = { workspace = true } diff --git a/partition_table_engine/src/error.rs b/partition_table_engine/src/error.rs new file mode 100644 index 0000000000..1b18e79e46 --- /dev/null +++ b/partition_table_engine/src/error.rs @@ -0,0 +1,12 @@ +// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. + +use common_util::{define_result, error::GenericError}; +use snafu::Snafu; +define_result!(Error); + +#[derive(Snafu, Debug)] +#[snafu(visibility(pub))] +pub enum Error { + #[snafu(display("Internal error, message:{}, err:{}", msg, source))] + Internal { msg: String, source: GenericError }, +} diff --git a/partition_table_engine/src/lib.rs b/partition_table_engine/src/lib.rs new file mode 100644 index 0000000000..db5b89644d --- /dev/null +++ b/partition_table_engine/src/lib.rs @@ -0,0 +1,78 @@ +// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. + +//! Partition table engine implementations + +mod error; +mod metrics; +mod partition; + +use std::sync::Arc; + +use async_trait::async_trait; +use common_util::error::BoxError; +use snafu::{OptionExt, ResultExt}; +use table_engine::{ + engine::{ + CloseTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest, Result, + TableEngine, Unexpected, UnexpectedNoCause, + }, + remote::RemoteEngineRef, + table::TableRef, + PARTITION_TABLE_ENGINE_TYPE, +}; + +use crate::partition::{PartitionTableImpl, TableData}; + +/// Partition table engine implementation. 
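+/// +/// Usage sketch (illustrative doc example, not part of this patch; assumes a pre-built `RemoteEngineRef` named `remote_engine`): +/// ```ignore +/// let engine = PartitionTableEngine::new(remote_engine); +/// assert_eq!(engine.engine_type(), PARTITION_TABLE_ENGINE_TYPE); +/// ```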
+pub struct PartitionTableEngine { + remote_engine_ref: RemoteEngineRef, +} + +impl PartitionTableEngine { + pub fn new(remote_engine_ref: RemoteEngineRef) -> Self { + Self { remote_engine_ref } + } +} + +#[async_trait] +impl TableEngine for PartitionTableEngine { + fn engine_type(&self) -> &str { + PARTITION_TABLE_ENGINE_TYPE + } + + async fn close(&self) -> Result<()> { + Ok(()) + } + + async fn create_table(&self, request: CreateTableRequest) -> Result<TableRef> { + let table_data = TableData { + catalog_name: request.catalog_name, + schema_name: request.schema_name, + table_name: request.table_name, + table_id: request.table_id, + table_schema: request.table_schema, + partition_info: request.partition_info.context(UnexpectedNoCause { + msg: "partition info not found", + })?, + options: request.options, + engine_type: request.engine, + }; + Ok(Arc::new( + PartitionTableImpl::new(table_data, self.remote_engine_ref.clone()) + .box_err() + .context(Unexpected)?, + )) + } + + async fn drop_table(&self, _request: DropTableRequest) -> Result<bool> { + Ok(true) + } + + async fn open_table(&self, _request: OpenTableRequest) -> Result<Option<TableRef>> { + Ok(None) + } + + async fn close_table(&self, _request: CloseTableRequest) -> Result<()> { + Ok(()) + } +} diff --git a/partition_table_engine/src/metrics.rs b/partition_table_engine/src/metrics.rs new file mode 100644 index 0000000000..d19978bdf3 --- /dev/null +++ b/partition_table_engine/src/metrics.rs @@ -0,0 +1,24 @@ +// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. + +use lazy_static::lazy_static; +use prometheus::{exponential_buckets, register_histogram_vec, HistogramVec}; + +lazy_static! { + // Buckets: 0, 0.01, .., 0.01 * 2^12 + pub static ref PARTITION_TABLE_WRITE_DURATION_HISTOGRAM: HistogramVec = register_histogram_vec!( + "partition_table_write_duration", + "Histogram for write duration of the partition table in seconds", + &["type"], + exponential_buckets(0.01, 2.0, 13).unwrap() + ) + .unwrap(); + + // Buckets: 0, 0.01, .., 0.01 * 2^12 + pub static ref PARTITION_TABLE_PARTITIONED_READ_DURATION_HISTOGRAM: HistogramVec = register_histogram_vec!( + "partition_table_partitioned_read_duration", + "Histogram for partitioned read duration of the partition table in seconds", + &["type"], + exponential_buckets(0.01, 2.0, 13).unwrap() + ) + .unwrap(); +} diff --git a/analytic_engine/src/table/partition.rs b/partition_table_engine/src/partition.rs similarity index 79% rename from analytic_engine/src/table/partition.rs rename to partition_table_engine/src/partition.rs index c68dfff2e0..bb06ab9533 100644 --- a/analytic_engine/src/table/partition.rs +++ b/partition_table_engine/src/partition.rs @@ -1,4 +1,4 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. //! 
Distributed Table implementation @@ -11,7 +11,7 @@ use common_types::{ }; use common_util::error::BoxError; use futures::future::try_join_all; -use snafu::{ensure, ResultExt}; +use snafu::ResultExt; use table_engine::{ partition::{ format_sub_partition_table_name, rule::df_adapter::DfPartitionRuleAdapter, PartitionInfo, @@ -30,58 +30,44 @@ use table_engine::{ }, }; -use crate::{ - space::SpaceAndTable, - table::metrics::{ - PARTITION_TABLE_PARTITIONED_READ_DURATION_HISTOGRAM, - PARTITION_TABLE_WRITE_DURATION_HISTOGRAM, - }, +use crate::metrics::{ + PARTITION_TABLE_PARTITIONED_READ_DURATION_HISTOGRAM, PARTITION_TABLE_WRITE_DURATION_HISTOGRAM, }; +#[derive(Debug)] +pub struct TableData { + pub catalog_name: String, + pub schema_name: String, + pub table_name: String, + pub table_id: TableId, + pub table_schema: Schema, + pub partition_info: PartitionInfo, + pub options: HashMap<String, String>, + pub engine_type: String, +} + /// Table trait implementation pub struct PartitionTableImpl { - /// Space table - space_table: SpaceAndTable, - /// Remote engine + table_data: TableData, remote_engine: RemoteEngineRef, - /// Engine type - engine_type: String, } impl PartitionTableImpl { - pub fn new( - remote_engine: RemoteEngineRef, - engine_type: String, - space_table: SpaceAndTable, - ) -> Result<Self> { - ensure!( - space_table.table_data().partition_info.is_some(), - UnexpectedWithMsg { - msg: "partition table partition info can't be empty" - } - ); + pub fn new(table_data: TableData, remote_engine: RemoteEngineRef) -> Result<Self> { Ok(Self { - space_table, + table_data, remote_engine, - engine_type, }) } fn get_sub_table_ident(&self, id: usize) -> TableIdentifier { - let partition_name = self - .space_table - .table_data() - .partition_info - .as_ref() - .map(|v| v.get_definitions()[id].name.clone()) - .unwrap(); + let partition_name = self.table_data.partition_info.get_definitions()[id] + .name + .clone(); TableIdentifier { - catalog: self.space_table.space().context.catalog_name.clone(), - schema: self.space_table.space().context.schema_name.clone(), - table: format_sub_partition_table_name( - &self.space_table.table_data().name, - &partition_name, - ), + catalog: self.table_data.catalog_name.clone(), + schema: self.table_data.schema_name.clone(), + table: format_sub_partition_table_name(&self.table_data.table_name, &partition_name), } } } @@ -89,8 +75,7 @@ impl PartitionTableImpl { impl fmt::Debug for PartitionTableImpl { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("PartitionTableImpl") - .field("space_id", &self.space_table.space().id) - .field("table_id", &self.space_table.table_data().id) + .field("table_data", &self.table_data) .finish() } } @@ -98,31 +83,32 @@ impl fmt::Debug for PartitionTableImpl { #[async_trait] impl Table for PartitionTableImpl { fn name(&self) -> &str { - &self.space_table.table_data().name + &self.table_data.table_name } fn id(&self) -> TableId { - self.space_table.table_data().id + self.table_data.table_id } fn schema(&self) -> Schema { - self.space_table.table_data().schema() + self.table_data.table_schema.clone() } + // TODO: get options from the sub partition table via the remote engine fn options(&self) -> HashMap<String, String> { - self.space_table.table_data().table_options().to_raw_map() + self.table_data.options.clone() } fn partition_info(&self) -> Option<PartitionInfo> { - self.space_table.table_data().partition_info.clone() + Some(self.table_data.partition_info.clone()) } fn engine_type(&self) -> &str { - &self.engine_type + &self.table_data.engine_type } fn stats(&self) -> TableStats 
{ - self.space_table.table_data().metrics.table_stats() + TableStats::default() } async fn write(&self, request: WriteRequest) -> Result<usize> { @@ -137,7 +123,7 @@ impl Table for PartitionTableImpl { } .fail()?, Some(partition_info) => { - DfPartitionRuleAdapter::new(partition_info, &self.space_table.table_data().schema()) + DfPartitionRuleAdapter::new(partition_info, &self.table_data.table_schema) .box_err() .context(CreatePartitionRule)? } @@ -222,7 +208,7 @@ impl Table for PartitionTableImpl { } .fail()?, Some(partition_info) => { - DfPartitionRuleAdapter::new(partition_info, &self.space_table.table_data().schema()) + DfPartitionRuleAdapter::new(partition_info, &self.table_data.table_schema) .box_err() .context(CreatePartitionRule)? } diff --git a/remote_engine_client/src/lib.rs b/remote_engine_client/src/lib.rs index 0dcba650eb..1682d419d6 100644 --- a/remote_engine_client/src/lib.rs +++ b/remote_engine_client/src/lib.rs @@ -18,7 +18,7 @@ use std::{ use async_trait::async_trait; use common_types::{record_batch::RecordBatch, schema::RecordSchema}; use common_util::error::BoxError; -use config::Config; +pub use config::Config; use futures::{Stream, StreamExt}; use router::RouterRef; use snafu::ResultExt; diff --git a/router/Cargo.toml b/router/Cargo.toml index 4df5a99ca7..2a7aaa5922 100644 --- a/router/Cargo.toml +++ b/router/Cargo.toml @@ -21,5 +21,6 @@ meta_client = { workspace = true } moka = { version = "0.10", features = ["future"] } serde = { workspace = true } snafu = { workspace = true } +table_engine = { workspace = true } tokio = { workspace = true } twox-hash = "1.6" diff --git a/router/src/cluster_based.rs b/router/src/cluster_based.rs index 38f31ab53e..5129fe7a53 100644 --- a/router/src/cluster_based.rs +++ b/router/src/cluster_based.rs @@ -1,4 +1,4 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. //! A router based on the [`cluster::Cluster`]. @@ -6,15 +6,24 @@ use async_trait::async_trait; use ceresdbproto::storage::{Route, RouteRequest}; use cluster::ClusterRef; use common_util::error::BoxError; -use meta_client::types::RouteTablesRequest; +use meta_client::types::{RouteTablesRequest, TableInfo}; use moka::future::Cache; use snafu::ResultExt; -use crate::{endpoint::Endpoint, OtherWithCause, ParseEndpoint, Result, RouteCacheConfig, Router}; +use crate::{ + endpoint::Endpoint, OtherWithCause, ParseEndpoint, PartitionTableInfo, Result, + RouteCacheConfig, Router, +}; + +#[derive(Clone)] +struct RouteData { + table_info: TableInfo, + endpoint: Option<Endpoint>, +} pub struct ClusterBasedRouter { cluster: ClusterRef, - cache: Option<Cache<String, Route>>, + cache: Option<Cache<String, RouteData>>, } impl ClusterBasedRouter { @@ -36,50 +45,37 @@ impl ClusterBasedRouter { /// route table from local cache, return cached routes and tables which are /// not in cache - fn route_from_cache(&self, tables: Vec<String>, routes: &mut Vec<Route>) -> Vec<String> { + fn route_from_cache(&self, tables: &[String], routes: &mut Vec<RouteData>) -> Vec<String> { let mut miss = vec![]; if let Some(cache) = &self.cache { for table in tables { - if let Some(route) = cache.get(&table) { + if let Some(route) = cache.get(table) { routes.push(route.clone()); } else { miss.push(table.clone()); } } } else { - miss = tables; + miss = tables.to_vec(); } miss } -} - -/// Make a route according to the table name and the raw endpoint. 
-fn make_route(table_name: &str, endpoint: &str) -> Result<Route> { - let endpoint: Endpoint = endpoint.parse().context(ParseEndpoint { endpoint })?; - - Ok(Route { - table: table_name.to_string(), - endpoint: Some(endpoint.into()), - }) -} - -#[async_trait] -impl Router for ClusterBasedRouter { - async fn route(&self, req: RouteRequest) -> Result<Vec<Route>> { - let req_ctx = req.context.unwrap(); + async fn route_with_cache( + &self, + tables: &Vec<String>, + database: String, + ) -> Result<Vec<RouteData>> { // Firstly route table from local cache. - let mut routes = Vec::with_capacity(req.tables.len()); - let miss = self.route_from_cache(req.tables, &mut routes); - + let mut routes = Vec::with_capacity(tables.len()); + let miss = self.route_from_cache(tables, &mut routes); if miss.is_empty() { return Ok(routes); } - let route_tables_req = RouteTablesRequest { - schema_name: req_ctx.database, + schema_name: database, table_names: miss, }; @@ -96,16 +92,69 @@ impl Router for ClusterBasedRouter { for (table_name, route_entry) in route_resp.entries { for node_shard in route_entry.node_shards { if node_shard.shard_info.is_leader() { - let route = make_route(&table_name, &node_shard.endpoint)?; + let route = make_route(route_entry.table_info, &node_shard.endpoint)?; if let Some(cache) = &self.cache { // There may be data race here, and it is acceptable currently. cache.insert(table_name.clone(), route.clone()).await; } routes.push(route); + break; } } } - return Ok(routes); + Ok(routes) + } +} + +/// Make a route according to the table_info and the raw endpoint. +fn make_route(table_info: TableInfo, endpoint: &str) -> Result<RouteData> { + let endpoint: Endpoint = endpoint.parse().context(ParseEndpoint { endpoint })?; + + Ok(RouteData { + table_info, + endpoint: Some(endpoint), + }) +} + +#[async_trait] +impl Router for ClusterBasedRouter { + async fn route(&self, req: RouteRequest) -> Result<Vec<Route>> { + let req_ctx = req.context.unwrap(); + let route_data_vec = self.route_with_cache(&req.tables, req_ctx.database).await?; + Ok(route_data_vec + .into_iter() + .map(|v| Route { + table: v.table_info.name, + endpoint: v.endpoint.map(Into::into), + }) + .collect()) + } + + async fn fetch_partition_table_info( + &self, + schema: &str, + table: &str, + ) -> Result<Option<PartitionTableInfo>> { + let mut route_data_vec = self + .route_with_cache(&vec![table.to_string()], schema.to_string()) + .await?; + if route_data_vec.is_empty() { + return Ok(None); + } + + let route_data = route_data_vec.remove(0); + let table_info = route_data.table_info; + if table_info.partition_info.is_some() { + return Ok(Some(PartitionTableInfo { + id: table_info.id, + name: table_info.name, + schema_id: table_info.schema_id, + schema_name: table_info.schema_name, + partition_info: table_info.partition_info.unwrap(), })); + } + + Ok(None) } } @@ -180,11 +229,12 @@ mod tests { entries.insert( table.clone(), RouteEntry { - table: TableInfo { + table_info: TableInfo { id: 0, name: table.clone(), schema_name: String::from("public"), schema_id: 0, + partition_info: None, }, node_shards: vec![NodeShard { endpoint: String::from("127.0.0.1:8831"), @@ -237,7 +287,7 @@ assert_eq!(result.unwrap().len(), 2); let mut routes = Vec::with_capacity(tables.len()); - let miss = router.route_from_cache(tables, &mut routes); + let miss = router.route_from_cache(&tables, &mut routes); assert_eq!(routes.len(), 2); assert_eq!(miss.len(), 0); sleep(Duration::from_secs(1)); // try to get table1 let tables = vec![table1.to_string()]; let mut routes = Vec::with_capacity(tables.len()); - 
let miss = router.route_from_cache(tables, &mut routes); + let miss = router.route_from_cache(&tables, &mut routes); assert_eq!(routes.len(), 1); - assert_eq!(routes[0].table, table1.to_string()); + assert_eq!(routes[0].table_info.name, table1.to_string()); assert_eq!(miss.len(), 0); // sleep 1.5s, table2 will be evicted, and table1 in cache sleep(Duration::from_millis(1500)); let tables = vec![table1.to_string(), table2.to_string()]; let mut routes = Vec::with_capacity(tables.len()); - let miss = router.route_from_cache(tables, &mut routes); + let miss = router.route_from_cache(&tables, &mut routes); assert_eq!(routes.len(), 1); - assert_eq!(routes[0].table, table1.to_string()); + assert_eq!(routes[0].table_info.name, table1.to_string()); assert_eq!(miss.len(), 1); assert_eq!(miss[0], table2.to_string()); } diff --git a/router/src/lib.rs b/router/src/lib.rs index e2fa1e752f..3a388d2025 100644 --- a/router/src/lib.rs +++ b/router/src/lib.rs @@ -9,10 +9,12 @@ use std::{sync::Arc, time::Duration}; use async_trait::async_trait; use ceresdbproto::storage::{Route, RouteRequest}; pub use cluster_based::ClusterBasedRouter; +use common_types::{schema::SchemaId, table::TableId}; use common_util::{config::ReadableDuration, define_result}; pub use rule_based::{RuleBasedRouter, RuleList}; use serde::{Deserialize, Serialize}; use snafu::{Backtrace, Snafu}; +use table_engine::partition::PartitionInfo; #[derive(Snafu, Debug)] #[snafu(visibility(pub))] @@ -59,9 +61,23 @@ define_result!(Error); pub type RouterRef = Arc; +#[derive(Clone, Debug)] +pub struct PartitionTableInfo { + pub id: TableId, + pub name: String, + pub schema_id: SchemaId, + pub schema_name: String, + pub partition_info: PartitionInfo, +} + #[async_trait] pub trait Router { async fn route(&self, req: RouteRequest) -> Result>; + async fn fetch_partition_table_info( + &self, + schema: &str, + table: &str, + ) -> Result>; } #[derive(Clone, Debug, Deserialize, Serialize)] diff --git a/router/src/rule_based.rs b/router/src/rule_based.rs index 608e0fbb9b..7ad772bdad 100644 --- a/router/src/rule_based.rs +++ b/router/src/rule_based.rs @@ -12,7 +12,9 @@ use meta_client::types::ShardId; use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt}; -use crate::{endpoint::Endpoint, hash, Result, RouteNotFound, Router, ShardNotFound}; +use crate::{ + endpoint::Endpoint, hash, PartitionTableInfo, Result, RouteNotFound, Router, ShardNotFound, +}; pub type ShardNodes = HashMap; @@ -170,4 +172,12 @@ impl Router for RuleBasedRouter { Ok(Vec::new()) } + + async fn fetch_partition_table_info( + &self, + _schema: &str, + _table: &str, + ) -> Result> { + return Ok(None); + } } diff --git a/server/Cargo.toml b/server/Cargo.toml index ba0e26c999..2c5dcd0615 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -34,6 +34,7 @@ log = { workspace = true } logger = { workspace = true } meta_client = { workspace = true } opensrv-mysql = "0.1.0" +partition_table_engine = { workspace = true } paste = { workspace = true } pprof = { version = "0.11.1", features = ["flamegraph"] } profile = { workspace = true } @@ -42,6 +43,7 @@ prometheus = { workspace = true } prometheus-static-metric = { workspace = true } prost = { workspace = true } query_engine = { workspace = true } +remote_engine_client = { workspace = true } router = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } diff --git a/server/src/grpc/meta_event_service/mod.rs b/server/src/grpc/meta_event_service/mod.rs index 15d6a8f068..fb15127622 100644 --- 
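The new `fetch_partition_table_info` trait method is purely additive: a router with no notion of partition tables can satisfy it by answering `Ok(None)`, exactly as `RuleBasedRouter` does above. A self-contained sketch of that shape, with simplified stand-in types and `async-trait` plus `tokio` assumed as dependencies:

```rust
use std::sync::Arc;

use async_trait::async_trait;

// Hypothetical, trimmed-down stand-in for the real PartitionTableInfo.
#[derive(Clone, Debug)]
struct PartitionTableInfo {
    name: String,
}

type Result<T> = std::result::Result<T, String>;

#[async_trait]
trait Router: Send + Sync {
    // Routers that cannot resolve partition tables simply answer `Ok(None)`,
    // mirroring the rule-based implementation in the diff above.
    async fn fetch_partition_table_info(
        &self,
        schema: &str,
        table: &str,
    ) -> Result<Option<PartitionTableInfo>>;
}

struct StaticRouter;

#[async_trait]
impl Router for StaticRouter {
    async fn fetch_partition_table_info(
        &self,
        _schema: &str,
        _table: &str,
    ) -> Result<Option<PartitionTableInfo>> {
        Ok(None)
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    let router: Arc<dyn Router> = Arc::new(StaticRouter);
    assert!(router
        .fetch_partition_table_info("public", "t")
        .await?
        .is_none());
    Ok(())
}
```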
diff --git a/server/src/grpc/meta_event_service/mod.rs b/server/src/grpc/meta_event_service/mod.rs
index 15d6a8f068..fb15127622 100644
--- a/server/src/grpc/meta_event_service/mod.rs
+++ b/server/src/grpc/meta_event_service/mod.rs
@@ -176,6 +176,7 @@ impl MetaServiceImpl {
             .to_string(),
             table_operator: TableOperator::new(self.instance.catalog_manager.clone()),
             table_engine: self.instance.table_engine.clone(),
+            partition_table_engine: self.instance.partition_table_engine.clone(),
             wal_region_closer: self.wal_region_closer.clone(),
         }
     }
@@ -187,6 +188,7 @@ struct HandlerContext {
     default_catalog: String,
     table_operator: TableOperator,
     table_engine: TableEngineRef,
+    partition_table_engine: TableEngineRef,
     wal_region_closer: WalRegionCloserRef,
 }

@@ -314,7 +316,7 @@ async fn handle_create_table_on_shard(
         code: StatusCode::BadRequest,
         msg: "current shard info is missing in the CreateTableOnShardRequest",
     })?;
-    let table = request.table_info.context(ErrNoCause {
+    let table_info = request.table_info.context(ErrNoCause {
         code: StatusCode::BadRequest,
         msg: "table info is missing in the CreateTableOnShardRequest",
     })?;
@@ -331,23 +333,25 @@ async fn handle_create_table_on_shard(
         ),
     })?;

-    let partition_info = match table.partition_info {
-        Some(v) => Some(
-            PartitionInfo::try_from(v.clone())
-                .box_err()
-                .with_context(|| ErrWithCause {
+    let (table_engine, partition_info) = match table_info.partition_info {
+        Some(v) => {
+            let partition_info = Some(PartitionInfo::try_from(v.clone()).box_err().with_context(
+                || ErrWithCause {
                     code: StatusCode::BadRequest,
                     msg: format!("fail to parse partition info, partition_info:{v:?}"),
-                })?,
-        ),
-        None => None,
+                },
+            )?);
+            (ctx.partition_table_engine.clone(), partition_info)
+        }
+        None => (ctx.table_engine.clone(), None),
     };

     // Build create table request and options.
     let create_table_request = CreateTableRequest {
         catalog_name: catalog_name.clone(),
-        schema_name: table.schema_name,
-        table_name: table.name,
+        schema_name: table_info.schema_name,
+        table_name: table_info.name,
+        table_id: Some(TableId::new(table_info.id)),
         table_schema,
         engine: request.engine,
         options: request.options,
@@ -355,8 +359,9 @@ async fn handle_create_table_on_shard(
         shard_id: shard_info.id,
         partition_info,
     };
+
     let create_opts = CreateOptions {
-        table_engine: ctx.table_engine,
+        table_engine,
         create_if_not_exists: request.create_if_not_exist,
     };
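The key change in `handle_create_table_on_shard` is the dispatch: a request that carries partition info is handed to the partition table engine, everything else to the regular engine. A hedged sketch of that selection, with hypothetical stand-in types for the real `TableEngineRef`:

```rust
use std::sync::Arc;

// Hypothetical trait standing in for table_engine::engine::TableEngine.
trait TableEngine {
    fn engine_type(&self) -> &'static str;
}

struct AnalyticEngine;
impl TableEngine for AnalyticEngine {
    fn engine_type(&self) -> &'static str {
        "Analytic"
    }
}

struct PartitionTableEngine;
impl TableEngine for PartitionTableEngine {
    fn engine_type(&self) -> &'static str {
        "PartitionTable"
    }
}

// Mirrors the dispatch above: partition info present means the partition
// table engine owns the table; otherwise the normal engine does.
fn choose_engine(
    table_engine: Arc<dyn TableEngine>,
    partition_table_engine: Arc<dyn TableEngine>,
    partition_info: Option<&str>, // stand-in for the decoded PartitionInfo
) -> Arc<dyn TableEngine> {
    match partition_info {
        Some(_) => partition_table_engine,
        None => table_engine,
    }
}

fn main() {
    let engine = choose_engine(
        Arc::new(AnalyticEngine),
        Arc::new(PartitionTableEngine),
        Some("hash"),
    );
    assert_eq!(engine.engine_type(), "PartitionTable");
}
```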
diff --git a/server/src/instance.rs b/server/src/instance.rs
index 5044a29e0b..5518a340c9 100644
--- a/server/src/instance.rs
+++ b/server/src/instance.rs
@@ -1,4 +1,4 @@
-// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
+// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

 //! Instance contains shared states of service

@@ -7,7 +7,7 @@ use std::sync::Arc;
 use catalog::manager::ManagerRef;
 use df_operator::registry::FunctionRegistryRef;
 use interpreters::table_manipulator::TableManipulatorRef;
-use table_engine::engine::TableEngineRef;
+use table_engine::{engine::TableEngineRef, remote::RemoteEngineRef};

 use crate::limiter::Limiter;

@@ -18,10 +18,12 @@ pub struct Instance<Q> {
     pub catalog_manager: ManagerRef,
     pub query_executor: Q,
     pub table_engine: TableEngineRef,
+    pub partition_table_engine: TableEngineRef,
     // User defined functions registry.
     pub function_registry: FunctionRegistryRef,
     pub limiter: Limiter,
     pub table_manipulator: TableManipulatorRef,
+    pub remote_engine_ref: RemoteEngineRef,
 }

 /// A reference counted instance pointer
diff --git a/server/src/proxy/forward.rs b/server/src/proxy/forward.rs
index a54e71e365..b29b302470 100644
--- a/server/src/proxy/forward.rs
+++ b/server/src/proxy/forward.rs
@@ -349,7 +349,7 @@ mod tests {
     use catalog::consts::DEFAULT_SCHEMA;
     use ceresdbproto::storage::{Route, SqlQueryRequest, SqlQueryResponse};
     use futures::FutureExt;
-    use router::Router;
+    use router::{PartitionTableInfo, Router};
     use tonic::IntoRequest;

     use super::*;
@@ -390,6 +390,14 @@ mod tests {
                 }]),
             }
         }
+
+        async fn fetch_partition_table_info(
+            &self,
+            _schema: &str,
+            _table: &str,
+        ) -> router::Result<Option<PartitionTableInfo>> {
+            return Ok(None);
+        }
     }

     struct MockClientBuilder;
diff --git a/server/src/proxy/grpc/sql_query.rs b/server/src/proxy/grpc/sql_query.rs
index 83650b2c94..ca8aff418c 100644
--- a/server/src/proxy/grpc/sql_query.rs
+++ b/server/src/proxy/grpc/sql_query.rs
@@ -1,10 +1,11 @@
-// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.
+// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

 //! Query handler

 use std::{sync::Arc, time::Instant};

 use arrow_ext::ipc::{CompressOptions, CompressionMethod, RecordBatchesEncoder};
+use catalog::schema::{CreateOptions, CreateTableRequest, DropOptions, DropTableRequest};
 use ceresdbproto::{
     common::ResponseHeader,
     storage::{
@@ -12,7 +13,7 @@ use ceresdbproto::{
         ArrowPayload, SqlQueryRequest, SqlQueryResponse,
     },
 };
-use common_types::{record_batch::RecordBatch, request_id::RequestId};
+use common_types::{record_batch::RecordBatch, request_id::RequestId, table::DEFAULT_SHARD_ID};
 use common_util::{error::BoxError, time::InstantExt};
 use futures::{stream, stream::BoxStream, FutureExt, StreamExt};
 use http::StatusCode;
@@ -20,11 +21,19 @@ use interpreters::{context::Context as InterpreterContext, factory::Factory, int
 use log::{error, info, warn};
 use query_engine::executor::Executor as QueryExecutor;
 use router::endpoint::Endpoint;
-use snafu::{ensure, ResultExt};
+use snafu::{ensure, OptionExt, ResultExt};
 use sql::{
+    frontend,
     frontend::{Context as SqlContext, Frontend},
     provider::CatalogMetaProvider,
 };
+use table_engine::{
+    engine::TableState,
+    partition::{format_sub_partition_table_name, PartitionInfo},
+    remote::model::{GetTableInfoRequest, TableIdentifier},
+    table::TableId,
+    PARTITION_TABLE_ENGINE_TYPE,
+};
 use tokio::sync::mpsc;
 use tokio_stream::wrappers::ReceiverStream;
 use tonic::{transport::Channel, IntoRequest};
@@ -77,7 +86,10 @@ impl Proxy {
         req: SqlQueryRequest,
     ) -> Result<SqlQueryResponse> {
         let req = match self.maybe_forward_sql_query(&req).await {
-            Some(resp) => return resp,
+            Some(resp) => match resp {
+                ForwardResult::Forwarded(resp) => return resp,
+                ForwardResult::Original => req,
+            },
             None => req,
         };

@@ -90,14 +102,14 @@ impl Proxy {
         ctx: Context,
         req: SqlQueryRequest,
     ) -> Result<BoxStream<'static, SqlQueryResponse>> {
-        let req = match self
-            .clone()
-            .maybe_forward_stream_sql_query(req.clone())
-            .await
-        {
-            Some(resp) => return resp,
+        let req = match self.clone().maybe_forward_stream_sql_query(&req).await {
+            Some(resp) => match resp {
+                ForwardResult::Forwarded(resp) => return resp,
+                ForwardResult::Original => req,
+            },
             None => req,
         };
+
         let (tx, rx) = mpsc::channel(STREAM_QUERY_CHANNEL_LEN);
         let runtime = ctx.runtime.clone();
         let resp_compress_min_length = self.resp_compress_min_length;
@@ -134,7 +146,7 @@ impl Proxy {
     async fn maybe_forward_sql_query(
         &self,
         req: &SqlQueryRequest,
-    ) -> Option<Result<SqlQueryResponse>> {
+    ) -> Option<ForwardResult<SqlQueryResponse, Error>> {
         if req.tables.len() != 1 {
             warn!("Unable to forward sql query without exactly one table, req:{req:?}",);

@@ -166,11 +178,9 @@ impl Proxy {
             Box::new(query) as _
         };

-        match self.forwarder.forward(forward_req, do_query).await {
-            Ok(forward_res) => match forward_res {
-                ForwardResult::Forwarded(v) => Some(v),
-                ForwardResult::Original => None,
-            },
+        let forward_result = self.forwarder.forward(forward_req, do_query).await;
+        match forward_result {
+            Ok(forward_res) => Some(forward_res),
             Err(e) => {
                 error!("Failed to forward sql req but the error is ignored, err:{e}");
                 None
@@ -180,8 +190,8 @@ impl Proxy {

     async fn maybe_forward_stream_sql_query(
         self: Arc<Self>,
-        req: SqlQueryRequest,
-    ) -> Option<Result<BoxStream<'static, SqlQueryResponse>>> {
+        req: &SqlQueryRequest,
+    ) -> Option<ForwardResult<BoxStream<'static, SqlQueryResponse>, Error>> {
         if req.tables.len() != 1 {
             warn!("Unable to forward sql query without exactly one table, req:{req:?}",);

@@ -228,11 +238,10 @@ impl Proxy {
             Box::new(query) as _
         };

-        match self.forwarder.forward(forward_req, do_query).await {
-            Ok(forward_res) => match forward_res {
-                ForwardResult::Forwarded(v) => Some(v),
-                ForwardResult::Original => None,
-            },
+        let forward_result = self.forwarder.forward(forward_req, do_query).await;
+
+        match forward_result {
+            Ok(forward_res) => Some(forward_res),
             Err(e) => {
                 error!("Failed to forward stream sql req but the error is ignored, err:{e}");
                 None
@@ -294,6 +303,13 @@ impl Proxy {
         }
     );

+        // Open partition table if needed.
+        let table_name = frontend::parse_table_name(&stmts);
+        if let Some(table_name) = table_name {
+            self.maybe_open_partition_table_if_not_exist(catalog, schema, &table_name)
+                .await?;
+        }
+
         // Create logical plan
         // Note: Remember to store sql in error when creating logical plan
         let plan = frontend
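The helper defined in the next hunk boils down to a three-way decision driven by the routed table id: keep the local table, drop a stale copy and recreate it, or create it fresh. A compact sketch of just that decision logic, with a hypothetical `Reconcile` enum and ids simplified to `u64`:

```rust
// Hypothetical condensed form of the decision made below: compare the id of
// the locally opened table (if any) with the id reported by the route layer.
#[derive(Debug, PartialEq)]
enum Reconcile {
    Keep,
    DropThenCreate,
    Create,
}

fn reconcile(local_table_id: Option<u64>, routed_table_id: u64) -> Reconcile {
    match local_table_id {
        Some(id) if id == routed_table_id => Reconcile::Keep,
        Some(_) => Reconcile::DropThenCreate, // stale local copy, rebuild it
        None => Reconcile::Create,            // never opened on this node
    }
}

fn main() {
    assert_eq!(reconcile(Some(42), 42), Reconcile::Keep);
    assert_eq!(reconcile(Some(7), 42), Reconcile::DropThenCreate);
    assert_eq!(reconcile(None, 42), Reconcile::Create);
}
```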
@@ -375,6 +391,140 @@ impl Proxy {

         Ok(output)
     }
+
+    async fn maybe_open_partition_table_if_not_exist(
+        &self,
+        catalog_name: &str,
+        schema_name: &str,
+        table_name: &str,
+    ) -> Result<()> {
+        let partition_table_info = self
+            .router
+            .fetch_partition_table_info(schema_name, table_name)
+            .await?;
+        if partition_table_info.is_none() {
+            return Ok(());
+        }
+
+        let partition_table_info = partition_table_info.unwrap();
+
+        let catalog = self
+            .instance
+            .catalog_manager
+            .catalog_by_name(catalog_name)
+            .box_err()
+            .with_context(|| ErrWithCause {
+                code: StatusCode::INTERNAL_SERVER_ERROR,
+                msg: format!("Failed to find catalog, catalog_name:{catalog_name}"),
+            })?
+            .with_context(|| ErrNoCause {
+                code: StatusCode::BAD_REQUEST,
+                msg: format!("Catalog not found, catalog_name:{catalog_name}"),
+            })?;
+
+        // TODO: support create schema if not exist
+        let schema = catalog
+            .schema_by_name(schema_name)
+            .box_err()
+            .with_context(|| ErrWithCause {
+                code: StatusCode::INTERNAL_SERVER_ERROR,
+                msg: format!("Failed to find schema, schema_name:{schema_name}"),
+            })?
+            .context(ErrNoCause {
+                code: StatusCode::BAD_REQUEST,
+                msg: format!("Schema not found, schema_name:{schema_name}"),
+            })?;
+
+        let table = schema
+            .table_by_name(&partition_table_info.name)
+            .box_err()
+            .with_context(|| ErrWithCause {
+                code: StatusCode::INTERNAL_SERVER_ERROR,
+                msg: format!(
+                    "Failed to find table, table_name:{}",
+                    partition_table_info.name
+                ),
+            })?;
+
+        if let Some(table) = table {
+            if table.id().as_u64() == partition_table_info.id {
+                return Ok(());
+            }
+
+            // Drop partition table if table id not match.
+            let opts = DropOptions {
+                table_engine: self.instance.partition_table_engine.clone(),
+            };
+            schema
+                .drop_table(
+                    DropTableRequest {
+                        catalog_name: catalog_name.to_string(),
+                        schema_name: schema_name.to_string(),
+                        table_name: table_name.to_string(),
+                        engine: PARTITION_TABLE_ENGINE_TYPE.to_string(),
+                    },
+                    opts,
+                )
+                .await
+                .box_err()
+                .with_context(|| ErrWithCause {
+                    code: StatusCode::INTERNAL_SERVER_ERROR,
+                    msg: format!("Failed to drop partition table, table_name:{table_name}"),
+                })?;
+        }
+
+        // If table not exists, open it.
+        // Get table_schema from first sub partition table.
+        let first_sub_partition_table_name = get_sub_partition_name(
+            &partition_table_info.name,
+            &partition_table_info.partition_info,
+            0usize,
+        );
+        let table = self
+            .instance
+            .remote_engine_ref
+            .get_table_info(GetTableInfoRequest {
+                table: TableIdentifier {
+                    catalog: catalog_name.to_string(),
+                    schema: schema_name.to_string(),
+                    table: first_sub_partition_table_name,
+                },
+            })
+            .await
+            .box_err()
+            .with_context(|| ErrWithCause {
+                code: StatusCode::INTERNAL_SERVER_ERROR,
+                msg: "Failed to get table",
+            })?;
+
+        // Partition table is a virtual table, so we need to create it manually.
+        // Partition info is stored in ceresmeta, so we need to use create_table_request
+        // to create it.
+        let create_table_request = CreateTableRequest {
+            catalog_name: catalog_name.to_string(),
+            schema_name: schema_name.to_string(),
+            table_name: partition_table_info.name,
+            table_id: Some(TableId::new(partition_table_info.id)),
+            table_schema: table.table_schema,
+            engine: table.engine,
+            options: Default::default(),
+            state: TableState::Stable,
+            shard_id: DEFAULT_SHARD_ID,
+            partition_info: Some(partition_table_info.partition_info),
+        };
+        let create_opts = CreateOptions {
+            table_engine: self.instance.partition_table_engine.clone(),
+            create_if_not_exists: true,
+        };
+        schema
+            .create_table(create_table_request.clone(), create_opts)
+            .await
+            .box_err()
+            .with_context(|| ErrWithCause {
+                code: StatusCode::INTERNAL_SERVER_ERROR,
+                msg: format!("Failed to create table, request:{create_table_request:?}"),
+            })?;
+        Ok(())
+    }
 }

 // TODO(chenxiang): Output can have both `rows` and `affected_rows`
@@ -498,3 +648,8 @@ impl QueryResponseWriter {
         Ok(resp)
     }
 }
+
+fn get_sub_partition_name(table_name: &str, partition_info: &PartitionInfo, id: usize) -> String {
+    let partition_name = partition_info.get_definitions()[id].name.clone();
+    format_sub_partition_table_name(table_name, &partition_name)
+}
diff --git a/server/src/proxy/mod.rs b/server/src/proxy/mod.rs
index c7ce5b104b..e1157498ed 100644
--- a/server/src/proxy/mod.rs
+++ b/server/src/proxy/mod.rs
@@ -62,6 +62,7 @@ impl Proxy {
             })?,
         );
         let hotspot_recorder = Arc::new(HotspotRecorder::new(hotspot_config, runtime));
+
         Ok(Self {
             router,
             instance,
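The `get_sub_partition_name` helper above resolves a sub partition table's name from the partition definitions. A minimal sketch of the naming scheme; the `"__"` prefix value is an assumption here, the real constant lives in `table_engine::partition` as `PARTITION_TABLE_PREFIX`:

```rust
// Assumed prefix value; the actual constant is PARTITION_TABLE_PREFIX in
// table_engine::partition and may differ.
const PARTITION_TABLE_PREFIX: &str = "__";

fn format_sub_partition_table_name(table_name: &str, partition_name: &str) -> String {
    format!("{PARTITION_TABLE_PREFIX}{table_name}_{partition_name}")
}

fn is_sub_partition_table(table_name: &str) -> bool {
    table_name.starts_with(PARTITION_TABLE_PREFIX)
}

fn main() {
    let sub = format_sub_partition_table_name("metrics", "p0");
    assert_eq!(sub, "__metrics_p0"); // holds only under the assumed prefix
    assert!(is_sub_partition_table(&sub));
    assert!(!is_sub_partition_table("metrics"));
}
```

Prefixing rather than suffixing keeps `is_sub_partition_table` a cheap `starts_with` check on the hot path.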
diff --git a/server/src/server.rs b/server/src/server.rs
index ee7d778969..8ee38c6b54 100644
--- a/server/src/server.rs
+++ b/server/src/server.rs
@@ -1,4 +1,4 @@
-// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
+// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

 //! Server

@@ -11,7 +11,9 @@ use df_operator::registry::FunctionRegistryRef;
 use interpreters::table_manipulator::TableManipulatorRef;
 use log::{info, warn};
 use logger::RuntimeLevel;
+use partition_table_engine::PartitionTableEngine;
 use query_engine::executor::Executor as QueryExecutor;
+use remote_engine_client::RemoteEngineImpl;
 use router::{endpoint::Endpoint, RouterRef};
 use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
 use table_engine::engine::{EngineRuntimes, TableEngineRef};
@@ -174,6 +176,7 @@ impl Server {
 #[must_use]
 pub struct Builder {
     server_config: ServerConfig,
+    remote_engine_client_config: remote_engine_client::config::Config,
     node_addr: String,
     config_content: Option<String>,
     engine_runtimes: Option<Arc<EngineRuntimes>>,
@@ -195,6 +198,7 @@ impl Builder {
     pub fn new(config: ServerConfig) -> Self {
         Self {
             server_config: config,
+            remote_engine_client_config: remote_engine_client::Config::default(),
             node_addr: "".to_string(),
             config_content: None,
             engine_runtimes: None,
@@ -300,15 +304,25 @@ impl Builder {
         let table_manipulator = self.table_manipulator.context(MissingTableManipulator)?;
         let function_registry = self.function_registry.context(MissingFunctionRegistry)?;
         let opened_wals = self.opened_wals.context(MissingWals)?;
+        let router = self.router.context(MissingRouter)?;
+
+        let remote_engine_ref = Arc::new(RemoteEngineImpl::new(
+            self.remote_engine_client_config.clone(),
+            router.clone(),
+        ));
+
+        let partition_table_engine = Arc::new(PartitionTableEngine::new(remote_engine_ref.clone()));

         let instance = {
             let instance = Instance {
                 catalog_manager,
                 query_executor,
                 table_engine,
+                partition_table_engine,
                 function_registry,
                 limiter: self.limiter,
                 table_manipulator,
+                remote_engine_ref,
             };
             InstanceRef::new(instance)
         };
@@ -328,7 +342,6 @@ impl Builder {
         let engine_runtimes = self.engine_runtimes.context(MissingEngineRuntimes)?;
         let log_runtime = self.log_runtime.context(MissingLogRuntime)?;
         let config_content = self.config_content.expect("Missing config content");
-        let router = self.router.clone().context(MissingRouter)?;

         let provider = self
             .schema_config_provider
@@ -359,7 +372,6 @@ impl Builder {
             .build()
             .context(BuildMysqlService)?;

-        let router = self.router.context(MissingRouter)?;
         let rpc_services = grpc::Builder::new()
             .endpoint(
diff --git a/sql/src/frontend.rs b/sql/src/frontend.rs
index 7ee9e8374a..a8c840495d 100644
--- a/sql/src/frontend.rs
+++ b/sql/src/frontend.rs
@@ -1,4 +1,4 @@
-// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
+// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

 //! Frontend

@@ -11,10 +11,11 @@ use common_types::request_id::RequestId;
 use common_util::error::{BoxError, GenericError};
 use influxql_parser::statement::Statement as InfluxqlStatement;
 use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
+use sqlparser::ast::{SetExpr, Statement as SqlStatement, TableFactor};
 use table_engine::table;

 use crate::{
-    ast::Statement,
+    ast::{Statement, TableName},
     parser::Parser,
     plan::Plan,
     planner::Planner,
@@ -193,3 +194,91 @@ impl<P: MetaProvider> Frontend<P> {
         .context(CreatePlan)
     }
 }
+
+pub fn parse_table_name(statements: &StatementVec) -> Option<String> {
+    match &statements[0] {
+        Statement::Standard(s) => match *s.clone() {
+            SqlStatement::Insert { table_name, .. } => {
+                Some(TableName::from(table_name).to_string())
+            }
+            SqlStatement::Explain { statement, .. } => {
+                if let SqlStatement::Query(q) = *statement {
+                    match *q.body {
+                        SetExpr::Select(select) => {
+                            if select.from.len() != 1 {
+                                None
+                            } else if let TableFactor::Table { name, .. } = &select.from[0].relation
+                            {
+                                Some(TableName::from(name.clone()).to_string())
+                            } else {
+                                None
+                            }
+                        }
+                        // TODO: return unsupported error rather than none.
+                        _ => None,
+                    }
+                } else {
+                    None
+                }
+            }
+            SqlStatement::Query(q) => match *q.body {
+                SetExpr::Select(select) => {
+                    if select.from.len() != 1 {
+                        None
+                    } else if let TableFactor::Table { name, .. } = &select.from[0].relation {
+                        Some(TableName::from(name.clone()).to_string())
+                    } else {
+                        None
+                    }
+                }
+                _ => None,
+            },
+            _ => None,
+        },
+        Statement::Create(s) => Some(s.table_name.to_string()),
+        Statement::Drop(s) => Some(s.table_name.to_string()),
+        Statement::Describe(s) => Some(s.table_name.to_string()),
+        Statement::AlterModifySetting(s) => Some(s.table_name.to_string()),
+        Statement::AlterAddColumn(s) => Some(s.table_name.to_string()),
+        Statement::ShowCreate(s) => Some(s.table_name.to_string()),
+        Statement::ShowTables(_s) => None,
+        Statement::ShowDatabases => None,
+        Statement::Exists(s) => Some(s.table_name.to_string()),
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::{frontend::parse_table_name, parser::Parser};
+
+    #[test]
+    fn test_parse_table_name() {
+        let table = "test_parse_table_name";
+        let test_cases = vec![
+            format!("INSERT INTO {table} (t, name, value) VALUES (1651737067000, 'ceresdb', 100)"),
+            format!("INSERT INTO `{table}` (t, name, value) VALUES (1651737067000,'ceresdb', 100)"),
+            format!("select * from {table}"),
+            format!("select * from `{table}`"),
+            format!("explain select * from {table}"),
+            format!("explain select * from `{table}`"),
+            format!("CREATE TABLE {table} (`name`string TAG,`value` double NOT NULL, `t` timestamp NOT NULL, TIMESTAMP KEY(t))"),
+            format!("CREATE TABLE `{table}` (`name`string TAG,`value` double NOT NULL, `t` timestamp NOT NULL, TIMESTAMP KEY(t))"),
+            format!("drop table {table}"),
+            format!("drop table `{table}`"),
+            format!("describe table {table}"),
+            format!("describe table `{table}`"),
+            format!("alter table {table} modify setting enable_ttl='false'"),
+            format!("alter table `{table}` modify setting enable_ttl='false'"),
+            format!("alter table {table} add column c1 int"),
+            format!("alter table `{table}` add column c1 int"),
+            format!("show create table {table}"),
+            format!("show create table `{table}`"),
+            format!("exists table {table}"),
+            format!("exists table `{table}`"),
+        ];
+        for sql in test_cases {
+            let statements = Parser::parse_sql(&sql).unwrap();
+            assert_eq!(parse_table_name(&statements), Some(table.to_string()));
+        }
+    }
+}
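`parse_table_name` above walks the statement AST by hand. The same traversal can be tried directly against the `sqlparser` crate; a sketch of the plain `SELECT` case, assuming a recent `sqlparser` release where `Query::body` is a `Box<SetExpr>`:

```rust
use sqlparser::{
    ast::{SetExpr, Statement, TableFactor},
    dialect::GenericDialect,
    parser::Parser,
};

// Extract the single table referenced by a plain SELECT, mirroring the
// Query/Select/TableFactor traversal in parse_table_name above.
fn table_of_select(sql: &str) -> Option<String> {
    let stmts = Parser::parse_sql(&GenericDialect {}, sql).ok()?;
    match stmts.first()? {
        Statement::Query(q) => match q.body.as_ref() {
            SetExpr::Select(select) if select.from.len() == 1 => {
                match &select.from[0].relation {
                    TableFactor::Table { name, .. } => Some(name.to_string()),
                    _ => None, // subqueries, joins, etc. are not resolved here
                }
            }
            _ => None,
        },
        _ => None,
    }
}

fn main() {
    assert_eq!(table_of_select("SELECT * FROM demo"), Some("demo".to_string()));
    assert_eq!(table_of_select("SELECT 1"), None);
}
```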
diff --git a/src/setup.rs b/src/setup.rs
index c2cae015d6..a8180c2a93 100644
--- a/src/setup.rs
+++ b/src/setup.rs
@@ -225,7 +225,6 @@ async fn build_with_meta(
         .expect("Failed to setup analytic engine");
     let engine_builder = EngineBuilder {
         config: &config.analytic,
-        router: Some(router.clone()),
         engine_runtimes: runtimes.clone(),
         opened_wals: opened_wals.clone(),
     };
@@ -265,7 +264,6 @@ async fn build_without_meta(
         .expect("Failed to setup analytic engine");
     let engine_builder = EngineBuilder {
         config: &config.analytic,
-        router: None,
         engine_runtimes: runtimes.clone(),
         opened_wals: opened_wals.clone(),
     };
diff --git a/table_engine/src/engine.rs b/table_engine/src/engine.rs
index 443bbd2c8b..d54cb6a957 100644
--- a/table_engine/src/engine.rs
+++ b/table_engine/src/engine.rs
@@ -27,7 +27,7 @@ pub enum Error {
     #[snafu(display("Table already exists, table:{}.\nBacktrace:\n{}", table, backtrace))]
     TableExists { table: String, backtrace: Backtrace },

-    #[snafu(display("Invalid arguments, err:{}", source))]
+    #[snafu(display("Invalid arguments, table:{table}, err:{source}"))]
     InvalidArguments { table: String, source: GenericError },

     #[snafu(display("Failed to write meta data, err:{}", source))]
@@ -36,7 +36,7 @@ pub enum Error {
     #[snafu(display("Unexpected error, err:{}", source))]
     Unexpected { source: GenericError },

-    #[snafu(display("Unexpected error, :msg{}", msg))]
+    #[snafu(display("Unexpected error, msg:{}", msg))]
     UnexpectedNoCause { msg: String },

     #[snafu(display(
diff --git a/table_engine/src/lib.rs b/table_engine/src/lib.rs
index 9874c6c239..f8ddf0fb42 100644
--- a/table_engine/src/lib.rs
+++ b/table_engine/src/lib.rs
@@ -1,4 +1,4 @@
-// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
+// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

 //! Table engine facade, provides read/write interfaces of table

@@ -20,3 +20,4 @@ pub const OPTION_KEY_ENABLE_TTL: &str = "enable_ttl";

 pub const MEMORY_ENGINE_TYPE: &str = "Memory";
 pub const ANALYTIC_ENGINE_TYPE: &str = "Analytic";
+pub const PARTITION_TABLE_ENGINE_TYPE: &str = "PartitionTable";
diff --git a/table_engine/src/partition/mod.rs b/table_engine/src/partition/mod.rs
index 68f3bc23a2..4e3d45548b 100644
--- a/table_engine/src/partition/mod.rs
+++ b/table_engine/src/partition/mod.rs
@@ -1,4 +1,4 @@
-// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
+// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

 //! Partitioned table supports

@@ -232,10 +232,12 @@ impl TryFrom for PartitionInfo {
     }
 }

+#[inline]
 pub fn format_sub_partition_table_name(table_name: &str, partition_name: &str) -> String {
     format!("{PARTITION_TABLE_PREFIX}{table_name}_{partition_name}")
 }

+#[inline]
 pub fn is_sub_partition_table(table_name: &str) -> bool {
     table_name.starts_with(PARTITION_TABLE_PREFIX)
 }
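The `engine.rs` hunk above also fixes a transposed display string (`:msg{}` becomes `msg:{}`). A tiny sketch of what the corrected snafu attribute renders, assuming the snafu crate (any 0.6/0.7-era version with the same derive syntax):

```rust
use snafu::Snafu;

// With the corrected attribute the rendered message reads "msg:...",
// not the garbled ":msg..." produced by the old string.
#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Unexpected error, msg:{}", msg))]
    UnexpectedNoCause { msg: String },
}

fn main() {
    let err = Error::UnexpectedNoCause {
        msg: "remote engine not found".to_string(),
    };
    assert_eq!(
        err.to_string(),
        "Unexpected error, msg:remote engine not found"
    );
}
```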