diff --git a/Cargo.lock b/Cargo.lock index 2ac2ae90178a9..da4c0f2bd8805 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1814,10 +1814,13 @@ dependencies = [ "chrono", "common-arrow", "common-base", + "common-config", "common-exception", "common-expression", "common-io", + "common-meta-api", "common-meta-app", + "common-meta-store", "common-meta-types", "common-pipeline-core", "common-settings", diff --git a/src/common/exception/src/exception_code.rs b/src/common/exception/src/exception_code.rs index 6c6439b704934..b88f90446c512 100644 --- a/src/common/exception/src/exception_code.rs +++ b/src/common/exception/src/exception_code.rs @@ -146,6 +146,7 @@ build_exceptions! { // The table is not a clustered table. UnclusteredTable(1118), UnknownCatalog(11119), + UnknownCatalogType(11120), // Data Related Errors diff --git a/src/common/storage/src/operator.rs b/src/common/storage/src/operator.rs index 67adb83c88241..f43b300763141 100644 --- a/src/common/storage/src/operator.rs +++ b/src/common/storage/src/operator.rs @@ -404,6 +404,16 @@ impl DataOperator { Ok(()) } + /// Create a new data operator without check. 
+ pub fn try_new(sp: &StorageParams) -> common_exception::Result { + let operator = init_operator(sp)?; + + Ok(DataOperator { + operator, + params: sp.clone(), + }) + } + #[async_backtrace::framed] pub async fn try_create(sp: &StorageParams) -> common_exception::Result { let operator = init_operator(sp)?; diff --git a/src/meta/app/src/schema/catalog.rs b/src/meta/app/src/schema/catalog.rs index c921a941ee017..e0db6d532b64a 100644 --- a/src/meta/app/src/schema/catalog.rs +++ b/src/meta/app/src/schema/catalog.rs @@ -20,7 +20,7 @@ use chrono::Utc; use crate::storage::StorageParams; -#[derive(Clone, Copy, PartialEq, Eq, Debug)] +#[derive(Clone, Copy, PartialEq, Eq, Debug, Hash)] pub enum CatalogType { Default = 1, Hive = 2, @@ -40,12 +40,26 @@ impl Display for CatalogType { /// different options for creating catalogs #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)] pub enum CatalogOption { - // hms_address + /// The default catalog. + /// + /// It's not allowed to create a new default catalog. + Default, + // Catalog option for hive. Hive(HiveCatalogOption), - // Uri location for iceberg + // Catalog option for Iceberg. Iceberg(IcebergCatalogOption), } +impl CatalogOption { + pub fn catalog_type(&self) -> CatalogType { + match self { + CatalogOption::Default => CatalogType::Default, + CatalogOption::Hive(_) => CatalogType::Hive, + CatalogOption::Iceberg(_) => CatalogType::Iceberg, + } + } +} + /// Option for creating a iceberg catalog #[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)] pub struct HiveCatalogOption { @@ -65,6 +79,34 @@ pub struct CatalogInfo { pub meta: CatalogMeta, } +impl CatalogInfo { + /// Get the catalog type via catalog info. + pub fn catalog_type(&self) -> CatalogType { + self.meta.catalog_option.catalog_type() + } + + /// Get the catalog name via catalog info. + pub fn catalog_name(&self) -> &str { + &self.name_ident.catalog_name + } + + /// Create a new default catalog info. 
+ pub fn new_default() -> CatalogInfo { + Self { + id: CatalogId { catalog_id: 0 }, + name_ident: CatalogNameIdent { + // tenant for default catalog is not used. + tenant: "".to_string(), + catalog_name: "default".to_string(), + }, + meta: CatalogMeta { + catalog_option: CatalogOption::Default, + created_on: Default::default(), + }, + } + } +} + #[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)] pub struct CatalogMeta { pub catalog_option: CatalogOption, diff --git a/src/meta/proto-conv/src/catalog_from_to_protobuf_impl.rs b/src/meta/proto-conv/src/catalog_from_to_protobuf_impl.rs index 5db784cfede1d..384340d337012 100644 --- a/src/meta/proto-conv/src/catalog_from_to_protobuf_impl.rs +++ b/src/meta/proto-conv/src/catalog_from_to_protobuf_impl.rs @@ -102,6 +102,11 @@ impl FromToProto for mt::CatalogMeta { ver: VER, min_reader_ver: MIN_READER_VER, option: match self.catalog_option.clone() { + CatalogOption::Default => { + return Err(Incompatible { + reason: "CatalogOption.default is invalid for metsrv".to_string(), + }); + } CatalogOption::Hive(v) => Some(pb::CatalogOption { catalog_option: Some(pb::catalog_option::CatalogOption::Hive( pb::HiveCatalogOption { diff --git a/src/query/catalog/Cargo.toml b/src/query/catalog/Cargo.toml index ea3eeb8c58a0b..c079e749ef504 100644 --- a/src/query/catalog/Cargo.toml +++ b/src/query/catalog/Cargo.toml @@ -11,10 +11,13 @@ test = false [dependencies] common-arrow = { path = "../../common/arrow" } common-base = { path = "../../common/base" } +common-config = { path = "../config" } common-exception = { path = "../../common/exception" } common-expression = { path = "../expression" } common-io = { path = "../../common/io" } +common-meta-api = { path = "../../meta/api" } common-meta-app = { path = "../../meta/app" } +common-meta-store = { path = "../../meta/store" } common-meta-types = { path = "../../meta/types" } common-pipeline-core = { path = "../pipeline/core" } common-settings = { path = "../settings" } 
diff --git a/src/query/catalog/src/catalog/interface.rs b/src/query/catalog/src/catalog/interface.rs index 7dda5adcf17c8..993487a517ba5 100644 --- a/src/query/catalog/src/catalog/interface.rs +++ b/src/query/catalog/src/catalog/interface.rs @@ -13,10 +13,12 @@ // limitations under the License. use std::any::Any; +use std::fmt::Debug; use std::sync::Arc; use common_exception::ErrorCode; use common_exception::Result; +use common_meta_app::schema::CatalogInfo; use common_meta_app::schema::CountTablesReply; use common_meta_app::schema::CountTablesReq; use common_meta_app::schema::CreateDatabaseReply; @@ -85,8 +87,19 @@ pub struct StorageDescription { pub support_cluster_key: bool, } +pub trait CatalogCreator: Send + Sync + Debug { + fn try_create(&self, info: &CatalogInfo) -> Result>; +} + #[async_trait::async_trait] -pub trait Catalog: DynClone + Send + Sync { +pub trait Catalog: DynClone + Send + Sync + Debug { + /// Catalog itself + + // Get the name of the catalog. + fn name(&self) -> String; + // Get the info of the catalog. + fn info(&self) -> CatalogInfo; + /// Database. // Get the database by name. diff --git a/src/query/catalog/src/catalog/manager.rs b/src/query/catalog/src/catalog/manager.rs index 65581c3df09c6..7a307cadd2c49 100644 --- a/src/query/catalog/src/catalog/manager.rs +++ b/src/query/catalog/src/catalog/manager.rs @@ -12,66 +12,229 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::collections::HashMap; use std::sync::Arc; +use chrono::Utc; use common_base::base::GlobalInstance; +use common_config::CatalogConfig; +use common_config::InnerConfig; use common_exception::ErrorCode; use common_exception::Result; -use dashmap::mapref::entry::Entry; -use dashmap::DashMap; +use common_meta_api::SchemaApi; +use common_meta_app::schema::CatalogId; +use common_meta_app::schema::CatalogInfo; +use common_meta_app::schema::CatalogMeta; +use common_meta_app::schema::CatalogNameIdent; +use common_meta_app::schema::CatalogOption; +use common_meta_app::schema::CatalogType; +use common_meta_app::schema::CreateCatalogReq; +use common_meta_app::schema::DropCatalogReq; +use common_meta_app::schema::GetCatalogReq; +use common_meta_app::schema::HiveCatalogOption; +use common_meta_app::schema::ListCatalogReq; +use common_meta_store::MetaStore; +use common_meta_store::MetaStoreProvider; use super::Catalog; +use super::CatalogCreator; pub const CATALOG_DEFAULT: &str = "default"; pub struct CatalogManager { - pub catalogs: DashMap>, + pub meta: MetaStore, + + /// default_catalog is the DEFAULT catalog. + pub default_catalog: Arc, + /// external catalogs is the external catalogs that configured in config. + pub external_catalogs: HashMap>, + + /// catalog_creators is the catalog creators that registered. + pub catalog_creators: HashMap>, } impl CatalogManager { - pub fn get_catalog(&self, catalog_name: &str) -> Result> { - self.catalogs - .get(catalog_name) - .as_deref() - .cloned() - .ok_or_else(|| ErrorCode::BadArguments(format!("no such catalog {}", catalog_name))) - } - + /// Fetch catalog manager from global instance. pub fn instance() -> Arc { GlobalInstance::get() } - pub fn insert_catalog( - &self, - catalog_name: &str, - catalog: Arc, - if_not_exists: bool, + /// Init the catalog manager in global instance. 
+ #[async_backtrace::framed] + pub async fn init( + conf: &InnerConfig, + default_catalog: Arc, + catalog_creators: Vec<(CatalogType, Arc)>, ) -> Result<()> { - // NOTE: - // - // Concurrent write may happen here, should be carefully dealt with. - // The problem occurs when the entry is vacant: - // - // Using `DashMap::entry` can occupy the write lock on the entry, - // ensuring a safe concurrent writing. - // - // If getting with `DashMap::get_mut`, it will unlock the entry and return `None` directly. - // This makes a safe concurrent write hard to implement. - match self.catalogs.entry(catalog_name.to_string()) { - Entry::Occupied(_) => { - if if_not_exists { - Ok(()) - } else { - Err(ErrorCode::CatalogAlreadyExists(format!( - "Catalog {} already exists", - catalog_name - ))) - } - } - Entry::Vacant(v) => { - v.insert(catalog); - Ok(()) - } + GlobalInstance::set(Self::try_create(conf, default_catalog, catalog_creators).await?); + + Ok(()) + } + + /// Try to create a catalog manager via Config. + #[async_backtrace::framed] + async fn try_create( + conf: &InnerConfig, + default_catalog: Arc, + catalog_creators: Vec<(CatalogType, Arc)>, + ) -> Result> { + let meta = { + let provider = Arc::new(MetaStoreProvider::new(conf.meta.to_meta_grpc_client_conf())); + + provider.create_meta_store().await? + }; + + let tenant = conf.query.tenant_id.clone(); + let catalog_creators = HashMap::from_iter(catalog_creators.into_iter()); + + // init external catalogs. 
+ let mut external_catalogs = HashMap::default(); + for (name, ctl_cfg) in conf.catalogs.iter() { + let CatalogConfig::Hive(hive_ctl_cfg) = ctl_cfg; + let creator = catalog_creators.get(&CatalogType::Hive).ok_or_else(|| { + ErrorCode::BadArguments(format!("unknown catalog type: {:?}", CatalogType::Hive)) + })?; + let ctl = creator.try_create(&CatalogInfo { + id: CatalogId { catalog_id: 0 }, + name_ident: CatalogNameIdent { + tenant: tenant.clone(), + catalog_name: name.clone(), + }, + meta: CatalogMeta { + catalog_option: CatalogOption::Hive(HiveCatalogOption { + address: hive_ctl_cfg.address.clone(), + }), + created_on: Utc::now(), + }, + })?; + external_catalogs.insert(name.clone(), ctl); + } + + let catalog_manager = Self { + meta, + default_catalog, + external_catalogs, + catalog_creators, + }; + + Ok(Arc::new(catalog_manager)) + } + + /// Get default catalog from manager. + /// + /// There are some place that we don't have async context, so we provide + /// `get_default_catalog` to allow users fetch default catalog without async. + pub fn get_default_catalog(&self) -> Result> { + Ok(self.default_catalog.clone()) + } + + /// build_catalog builds a catalog from catalog info. + pub fn build_catalog(&self, info: &CatalogInfo) -> Result> { + let typ = info.meta.catalog_option.catalog_type(); + + if typ == CatalogType::Default { + return Ok(self.default_catalog.clone()); + } + + let creator = self + .catalog_creators + .get(&typ) + .ok_or_else(|| ErrorCode::BadArguments(format!("unknown catalog type: {:?}", typ)))?; + + creator.try_create(info) + } + + /// Get a catalog from manager. + /// + /// # NOTES + /// + /// DEFAULT catalog is handled specially via `get_default_catalog`. Other catalogs + /// will be fetched from metasrv. 
+ #[async_backtrace::framed] + pub async fn get_catalog(&self, tenant: &str, catalog_name: &str) -> Result> { + if catalog_name == CATALOG_DEFAULT { + return self.get_default_catalog(); + } + + if let Some(ctl) = self.external_catalogs.get(catalog_name) { + return Ok(ctl.clone()); } + + // Get catalog from metasrv. + let info = self + .meta + .get_catalog(GetCatalogReq::new(tenant, catalog_name)) + .await?; + + self.build_catalog(&info) + } + + /// Create a new catalog. + /// + /// # NOTES + /// + /// Trying to create default catalog will return an error. + #[async_backtrace::framed] + pub async fn create_catalog(&self, req: CreateCatalogReq) -> Result<()> { + if req.catalog_name() == CATALOG_DEFAULT { + return Err(ErrorCode::BadArguments( + "default catalog cannot be created".to_string(), + )); + } + + if self.external_catalogs.get(req.catalog_name()).is_some() { + return Err(ErrorCode::BadArguments( + "catalog already exists that cannot be created".to_string(), + )); + } + + let _ = self.meta.create_catalog(req).await; + + Ok(()) + } + + /// Drop a catalog. + /// + /// # NOTES + /// + /// Trying to drop default catalog will return an error. + #[async_backtrace::framed] + pub async fn drop_catalog(&self, req: DropCatalogReq) -> Result<()> { + let catalog_name = &req.name_ident.catalog_name; + + if catalog_name == CATALOG_DEFAULT { + return Err(ErrorCode::BadArguments( + "default catalog cannot be dropped".to_string(), + )); + } + + if self.external_catalogs.get(catalog_name).is_some() { + return Err(ErrorCode::BadArguments( + "catalog already exists that cannot be dropped".to_string(), + )); + } + + let _ = self.meta.drop_catalog(req).await; + + Ok(()) + } + + #[async_backtrace::framed] + pub async fn list_catalogs(&self, tenant: &str) -> Result>> { + let mut catalogs = vec![self.get_default_catalog()?]; + + // insert external catalogs. + for ctl in self.external_catalogs.values() { + catalogs.push(ctl.clone()); + } + + // fetch catalogs from metasrv. 
+ let infos = self.meta.list_catalogs(ListCatalogReq::new(tenant)).await?; + + for info in infos { + catalogs.push(self.build_catalog(&info)?); + } + + Ok(catalogs) } } diff --git a/src/query/catalog/src/catalog/mod.rs b/src/query/catalog/src/catalog/mod.rs index 436e406c9def1..8befa374491f3 100644 --- a/src/query/catalog/src/catalog/mod.rs +++ b/src/query/catalog/src/catalog/mod.rs @@ -18,6 +18,7 @@ mod interface; mod manager; pub use interface::Catalog; +pub use interface::CatalogCreator; pub use interface::StorageDescription; pub use manager::CatalogManager; pub use manager::CATALOG_DEFAULT; diff --git a/src/query/catalog/src/plan/datasource/datasource_plan.rs b/src/query/catalog/src/plan/datasource/datasource_plan.rs index fb8c10819b660..61751408dac7c 100644 --- a/src/query/catalog/src/plan/datasource/datasource_plan.rs +++ b/src/query/catalog/src/plan/datasource/datasource_plan.rs @@ -17,6 +17,7 @@ use std::collections::BTreeMap; use common_expression::FieldIndex; use common_expression::RemoteExpr; use common_expression::TableSchemaRef; +use common_meta_app::schema::CatalogInfo; use crate::plan::datasource::datasource_info::DataSourceInfo; use crate::plan::PartStatistics; @@ -27,8 +28,7 @@ use crate::table_args::TableArgs; // TODO: Delete the scan plan field, but it depends on plan_parser:L394 #[derive(serde::Serialize, serde::Deserialize, Clone, Debug)] pub struct DataSourcePlan { - // TODO catalog id is better - pub catalog: String, + pub catalog_info: CatalogInfo, pub source_info: DataSourceInfo, pub output_schema: TableSchemaRef, diff --git a/src/query/catalog/src/table.rs b/src/query/catalog/src/table.rs index 41a7cbc4fb204..4ff2692b9b160 100644 --- a/src/query/catalog/src/table.rs +++ b/src/query/catalog/src/table.rs @@ -384,7 +384,7 @@ pub trait TableExt: Table { let table_info = self.get_table_info(); let name = table_info.name.clone(); let tid = table_info.ident.table_id; - let catalog = ctx.get_catalog(table_info.catalog())?; + let catalog = 
ctx.get_catalog(table_info.catalog()).await?; let (ident, meta) = catalog.get_table_meta_by_id(tid).await?; let table_info: TableInfo = TableInfo { ident, diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs index badedb58d34ac..545b33ee66516 100644 --- a/src/query/catalog/src/table_context.rs +++ b/src/query/catalog/src/table_context.rs @@ -108,7 +108,8 @@ pub trait TableContext: Send + Sync { fn get_query_str(&self) -> String; fn get_fragment_id(&self) -> usize; - fn get_catalog(&self, catalog_name: &str) -> Result>; + async fn get_catalog(&self, catalog_name: &str) -> Result>; + fn get_default_catalog(&self) -> Result>; fn get_id(&self) -> String; fn get_current_catalog(&self) -> String; fn check_aborting(&self) -> Result<()>; diff --git a/src/query/ee-features/table-lock/src/table_lock_heartbeat.rs b/src/query/ee-features/table-lock/src/table_lock_heartbeat.rs index 2f421afa0b451..64a63831fbd5b 100644 --- a/src/query/ee-features/table-lock/src/table_lock_heartbeat.rs +++ b/src/query/ee-features/table-lock/src/table_lock_heartbeat.rs @@ -53,7 +53,7 @@ impl TableLockHeartbeat { let shutdown_notify = self.shutdown_notify.clone(); async move { - let catalog = ctx.get_catalog(table_info.catalog())?; + let catalog = ctx.get_catalog(table_info.catalog()).await?; let mut notified = Box::pin(shutdown_notify.notified()); while !shutdown_flag.load(Ordering::Relaxed) { let mills = { diff --git a/src/query/ee/src/table_lock/table_lock_handler.rs b/src/query/ee/src/table_lock/table_lock_handler.rs index 267a4b3d943a8..5646f030540ed 100644 --- a/src/query/ee/src/table_lock/table_lock_handler.rs +++ b/src/query/ee/src/table_lock/table_lock_handler.rs @@ -41,7 +41,7 @@ impl TableLockHandler for RealTableLockHandler { ctx: Arc, table_info: TableInfo, ) -> Result { - let catalog = ctx.get_catalog(table_info.catalog())?; + let catalog = ctx.get_catalog(table_info.catalog()).await?; let expire_secs = 
ctx.get_settings().get_table_lock_expire_secs()?; // get a new table lock revision. let res = catalog diff --git a/src/query/ee/tests/it/aggregating_index/index_refresh.rs b/src/query/ee/tests/it/aggregating_index/index_refresh.rs index 98bd83705e78a..965ac149e2c1b 100644 --- a/src/query/ee/tests/it/aggregating_index/index_refresh.rs +++ b/src/query/ee/tests/it/aggregating_index/index_refresh.rs @@ -61,7 +61,7 @@ async fn create_index(ctx: Arc, index_name: &str, query: &str) -> let plan = plan_sql(ctx.clone(), &sql).await?; if let Plan::CreateIndex(plan) = plan { - let catalog = ctx.get_catalog("default")?; + let catalog = ctx.get_catalog("default").await?; let create_index_req = CreateIndexReq { if_not_exists: plan.if_not_exists, name_ident: IndexNameIdent { diff --git a/src/query/service/src/api/http/v1/tenant_tables.rs b/src/query/service/src/api/http/v1/tenant_tables.rs index c48be60ef3ab7..c2bb8093024b7 100644 --- a/src/query/service/src/api/http/v1/tenant_tables.rs +++ b/src/query/service/src/api/http/v1/tenant_tables.rs @@ -15,7 +15,6 @@ use chrono::DateTime; use chrono::Utc; use common_catalog::catalog::CatalogManager; -use common_catalog::catalog_kind::CATALOG_DEFAULT; use common_config::GlobalConfig; use common_exception::Result; use poem::web::Json; @@ -44,7 +43,7 @@ pub struct TenantTableInfo { } async fn load_tenant_tables(tenant: &str) -> Result { - let catalog = CatalogManager::instance().get_catalog(CATALOG_DEFAULT)?; + let catalog = CatalogManager::instance().get_default_catalog()?; let databases = catalog.list_databases(tenant).await?; let mut table_infos: Vec = vec![]; diff --git a/src/query/service/src/catalogs/catalog.rs b/src/query/service/src/catalogs/catalog.rs deleted file mode 100644 index 6f65196f82369..0000000000000 --- a/src/query/service/src/catalogs/catalog.rs +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in 
compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -pub use common_catalog::catalog::Catalog; diff --git a/src/query/service/src/catalogs/catalog_manager.rs b/src/query/service/src/catalogs/catalog_manager.rs deleted file mode 100644 index 5bccae5dc5bfa..0000000000000 --- a/src/query/service/src/catalogs/catalog_manager.rs +++ /dev/null @@ -1,174 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -use std::sync::Arc; - -use common_base::base::GlobalInstance; -use common_catalog::catalog::Catalog; -pub use common_catalog::catalog::CatalogManager; -use common_catalog::catalog_kind::CATALOG_DEFAULT; -use common_config::CatalogConfig; -use common_config::InnerConfig; -use common_exception::ErrorCode; -use common_exception::Result; -use common_meta_app::schema::CatalogOption; -use common_meta_app::schema::CreateCatalogReq; -use common_meta_app::schema::DropCatalogReq; -use common_meta_app::schema::IcebergCatalogOption; -use common_storage::DataOperator; -#[cfg(feature = "hive")] -use common_storages_hive::HiveCatalog; -use common_storages_iceberg::IcebergCatalog; -use dashmap::DashMap; - -use crate::catalogs::DatabaseCatalog; - -#[async_trait::async_trait] -pub trait CatalogManagerHelper { - async fn init(conf: &InnerConfig) -> Result<()>; - - async fn try_create(conf: &InnerConfig) -> Result>; - - async fn register_build_in_catalogs(&self, conf: &InnerConfig) -> Result<()>; - - fn register_external_catalogs(&self, conf: &InnerConfig) -> Result<()>; - - /// build catalog from sql - async fn create_user_defined_catalog(&self, req: CreateCatalogReq) -> Result<()>; - - fn drop_user_defined_catalog(&self, req: DropCatalogReq) -> Result<()>; -} - -#[async_trait::async_trait] -impl CatalogManagerHelper for CatalogManager { - #[async_backtrace::framed] - async fn init(conf: &InnerConfig) -> Result<()> { - GlobalInstance::set(Self::try_create(conf).await?); - - Ok(()) - } - - #[async_backtrace::framed] - async fn try_create(conf: &InnerConfig) -> Result> { - let catalog_manager = CatalogManager { - catalogs: DashMap::new(), - }; - - catalog_manager.register_build_in_catalogs(conf).await?; - - #[cfg(feature = "hive")] - { - catalog_manager.register_external_catalogs(conf)?; - } - - Ok(Arc::new(catalog_manager)) - } - - #[async_backtrace::framed] - async fn register_build_in_catalogs(&self, conf: &InnerConfig) -> Result<()> { - let default_catalog: Arc = - 
Arc::new(DatabaseCatalog::try_create_with_config(conf.clone()).await?); - self.catalogs - .insert(CATALOG_DEFAULT.to_owned(), default_catalog); - Ok(()) - } - - fn register_external_catalogs(&self, conf: &InnerConfig) -> Result<()> { - // currently, if the `hive` feature is not enabled - // the loop will quit after the first iteration. - // this is expected. - #[allow(clippy::never_loop)] - for (name, ctl) in conf.catalogs.iter() { - match ctl { - CatalogConfig::Hive(ctl) => { - // register hive catalog - #[cfg(not(feature = "hive"))] - { - return Err(ErrorCode::CatalogNotSupported(format!( - "Failed to create catalog {} to {}: Hive catalog is not enabled, please recompile with --features hive", - name, ctl.address - ))); - } - #[cfg(feature = "hive")] - { - let hms_address = ctl.address.clone(); - let hive_catalog = Arc::new(HiveCatalog::try_create(hms_address)?); - self.catalogs.insert(name.to_string(), hive_catalog); - } - } - } - } - Ok(()) - } - - #[async_backtrace::framed] - async fn create_user_defined_catalog(&self, req: CreateCatalogReq) -> Result<()> { - let catalog_option = req.meta.catalog_option; - - // create catalog first - match catalog_option { - // NOTE: - // when compiling without `hive` feature enabled - // `address` will be seem as unused, which is not intentional - #[allow(unused)] - CatalogOption::Hive(cfg) => { - #[cfg(not(feature = "hive"))] - { - Err(ErrorCode::CatalogNotSupported( - "Hive catalog is not enabled, please recompile with --features hive", - )) - } - #[cfg(feature = "hive")] - { - let catalog: Arc = Arc::new(HiveCatalog::try_create(cfg.address)?); - let ctl_name = &req.name_ident.catalog_name; - let if_not_exists = req.if_not_exists; - - self.insert_catalog(ctl_name, catalog, if_not_exists) - } - } - CatalogOption::Iceberg(opt) => { - let IcebergCatalogOption { storage_params: sp } = opt; - - let data_operator = DataOperator::try_create(&sp).await?; - let ctl_name = &req.name_ident.catalog_name; - let catalog: Arc = - 
Arc::new(IcebergCatalog::try_create(ctl_name, data_operator)?); - - let if_not_exists = req.if_not_exists; - self.insert_catalog(ctl_name, catalog, if_not_exists) - } - } - } - - fn drop_user_defined_catalog(&self, req: DropCatalogReq) -> Result<()> { - let name = req.name_ident.catalog_name; - if name == CATALOG_DEFAULT { - return Err(ErrorCode::CatalogNotSupported( - "Dropping the DEFAULT catalog is not allowed", - )); - } - - match self.catalogs.remove(&name) { - Some(_) => Ok(()), - - None if req.if_exists => Ok(()), - - None => Err(ErrorCode::CatalogNotFound(format!( - "Catalog {} has to be exists", - name - ))), - } - } -} diff --git a/src/query/service/src/catalogs/default/database_catalog.rs b/src/query/service/src/catalogs/default/database_catalog.rs index 62d8c6598f014..bb930a112f2ad 100644 --- a/src/query/service/src/catalogs/default/database_catalog.rs +++ b/src/query/service/src/catalogs/default/database_catalog.rs @@ -13,8 +13,11 @@ // limitations under the License. use std::any::Any; +use std::fmt::Debug; +use std::fmt::Formatter; use std::sync::Arc; +use common_catalog::catalog::Catalog; use common_catalog::catalog::StorageDescription; use common_catalog::database::Database; use common_catalog::table_args::TableArgs; @@ -22,6 +25,7 @@ use common_catalog::table_function::TableFunction; use common_config::InnerConfig; use common_exception::ErrorCode; use common_exception::Result; +use common_meta_app::schema::CatalogInfo; use common_meta_app::schema::CountTablesReply; use common_meta_app::schema::CountTablesReq; use common_meta_app::schema::CreateDatabaseReply; @@ -78,7 +82,6 @@ use common_meta_app::schema::VirtualColumnMeta; use common_meta_types::MetaId; use log::info; -use crate::catalogs::catalog::Catalog; use crate::catalogs::default::ImmutableCatalog; use crate::catalogs::default::MutableCatalog; use crate::storages::Table; @@ -98,6 +101,12 @@ pub struct DatabaseCatalog { table_function_factory: Arc, } +impl Debug for DatabaseCatalog { + fn 
fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DefaultCatalog").finish_non_exhaustive() + } +} + impl DatabaseCatalog { pub fn create( immutable_catalog: Arc, @@ -131,6 +140,14 @@ impl Catalog for DatabaseCatalog { self } + fn name(&self) -> String { + "default".to_string() + } + + fn info(&self) -> CatalogInfo { + CatalogInfo::new_default() + } + #[async_backtrace::framed] async fn get_database(&self, tenant: &str, db_name: &str) -> Result> { if tenant.is_empty() { diff --git a/src/query/service/src/catalogs/default/immutable_catalog.rs b/src/query/service/src/catalogs/default/immutable_catalog.rs index 03d5d0b55ac70..a3adbb3fbca58 100644 --- a/src/query/service/src/catalogs/default/immutable_catalog.rs +++ b/src/query/service/src/catalogs/default/immutable_catalog.rs @@ -13,11 +13,15 @@ // limitations under the License. use std::any::Any; +use std::fmt::Debug; +use std::fmt::Formatter; use std::sync::Arc; +use common_catalog::catalog::Catalog; use common_config::InnerConfig; use common_exception::ErrorCode; use common_exception::Result; +use common_meta_app::schema::CatalogInfo; use common_meta_app::schema::CountTablesReply; use common_meta_app::schema::CountTablesReq; use common_meta_app::schema::CreateDatabaseReply; @@ -69,7 +73,6 @@ use common_meta_app::schema::UpsertTableOptionReq; use common_meta_app::schema::VirtualColumnMeta; use common_meta_types::MetaId; -use crate::catalogs::catalog::Catalog; use crate::catalogs::InMemoryMetas; use crate::catalogs::SYS_DB_ID_BEGIN; use crate::catalogs::SYS_TBL_ID_BEGIN; @@ -87,6 +90,12 @@ pub struct ImmutableCatalog { sys_db_meta: Arc, } +impl Debug for ImmutableCatalog { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ImmutableCatalog").finish_non_exhaustive() + } +} + impl ImmutableCatalog { #[async_backtrace::framed] pub async fn try_create_with_config(conf: &InnerConfig) -> Result { @@ -112,6 +121,14 @@ impl Catalog for ImmutableCatalog { self } + fn 
name(&self) -> String { + "default".to_string() + } + + fn info(&self) -> CatalogInfo { + CatalogInfo::new_default() + } + #[async_backtrace::framed] async fn get_database(&self, _tenant: &str, db_name: &str) -> Result> { match db_name { diff --git a/src/query/service/src/catalogs/default/mutable_catalog.rs b/src/query/service/src/catalogs/default/mutable_catalog.rs index 38dbeb2f59230..dd33a6b0fb18d 100644 --- a/src/query/service/src/catalogs/default/mutable_catalog.rs +++ b/src/query/service/src/catalogs/default/mutable_catalog.rs @@ -13,12 +13,16 @@ // limitations under the License. use std::any::Any; +use std::fmt::Debug; +use std::fmt::Formatter; use std::sync::Arc; use chrono::Utc; +use common_catalog::catalog::Catalog; use common_config::InnerConfig; use common_exception::Result; use common_meta_api::SchemaApi; +use common_meta_app::schema::CatalogInfo; use common_meta_app::schema::CountTablesReply; use common_meta_app::schema::CountTablesReq; use common_meta_app::schema::CreateDatabaseReply; @@ -88,7 +92,6 @@ use common_meta_types::MetaId; use log::info; use super::catalog_context::CatalogContext; -use crate::catalogs::catalog::Catalog; use crate::databases::Database; use crate::databases::DatabaseContext; use crate::databases::DatabaseFactory; @@ -107,6 +110,12 @@ pub struct MutableCatalog { tenant: String, } +impl Debug for MutableCatalog { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("MutableCatalog").finish_non_exhaustive() + } +} + impl MutableCatalog { /// The component hierarchy is layered as: /// ```text @@ -178,6 +187,14 @@ impl Catalog for MutableCatalog { self } + fn name(&self) -> String { + "default".to_string() + } + + fn info(&self) -> CatalogInfo { + CatalogInfo::new_default() + } + #[async_backtrace::framed] async fn get_database(&self, tenant: &str, db_name: &str) -> Result> { let db_info = self diff --git a/src/query/service/src/catalogs/mod.rs b/src/query/service/src/catalogs/mod.rs index 
2d12ad6dc9d21..1ab276ff74539 100644 --- a/src/query/service/src/catalogs/mod.rs +++ b/src/query/service/src/catalogs/mod.rs @@ -12,13 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod catalog; - -mod catalog_manager; pub mod default; -pub use catalog::Catalog; -pub use catalog_manager::CatalogManager; -pub use catalog_manager::CatalogManagerHelper; +pub use common_catalog::catalog::Catalog; #[cfg(feature = "hive")] pub use common_storages_hive as hive; pub use default::table_id_ranges::*; diff --git a/src/query/service/src/global_services.rs b/src/query/service/src/global_services.rs index 7ed60ea647369..4f55c5e2b1187 100644 --- a/src/query/service/src/global_services.rs +++ b/src/query/service/src/global_services.rs @@ -12,17 +12,22 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use common_base::base::GlobalInstance; use common_base::runtime::GlobalIORuntime; use common_base::runtime::GlobalQueryRuntime; +use common_catalog::catalog::CatalogCreator; use common_catalog::catalog::CatalogManager; use common_config::GlobalConfig; use common_config::InnerConfig; use common_exception::Result; +use common_meta_app::schema::CatalogType; use common_profile::QueryProfileManager; use common_sharing::ShareEndpointManager; use common_storage::DataOperator; use common_storage::ShareTableConfig; +use common_storages_iceberg::IcebergCreator; use common_tracing::GlobalLogger; use common_users::RoleCacheManager; use common_users::UserApiProvider; @@ -30,7 +35,7 @@ use storages_common_cache_manager::CacheManager; use crate::api::DataExchangeManager; use crate::auth::AuthMgr; -use crate::catalogs::CatalogManagerHelper; +use crate::catalogs::DatabaseCatalog; use crate::clusters::ClusterDiscovery; use crate::servers::http::v1::HttpQueryManager; use crate::sessions::SessionManager; @@ -67,7 +72,30 @@ impl GlobalServices { )?; 
CacheManager::init(&config.cache, &config.query.tenant_id)?; - CatalogManager::init(&config).await?; + + // TODO(xuanwo): + // + // This part is a bit complex because catalog are used widely in different + // crates that we don't have a good place for different kinds of creators. + // + // Maybe we can do some refactor to simplify the logic here. + { + // Init default catalog. + let default_catalog = DatabaseCatalog::try_create_with_config(config.clone()).await?; + + let mut catalog_creator: Vec<(CatalogType, Arc)> = vec![]; + catalog_creator.push((CatalogType::Iceberg, Arc::new(IcebergCreator))); + // Register hive catalog. + #[cfg(feature = "hive")] + { + use common_storages_hive::HiveCreator; + + catalog_creator.push((CatalogType::Hive, Arc::new(HiveCreator))); + } + + CatalogManager::init(&config, Arc::new(default_catalog), catalog_creator).await?; + } + HttpQueryManager::init(&config).await?; DataExchangeManager::init()?; SessionManager::init(&config)?; diff --git a/src/query/service/src/interpreters/access/privilege_access.rs b/src/query/service/src/interpreters/access/privilege_access.rs index d0fa8ecb4b23f..6bf64d3f055d4 100644 --- a/src/query/service/src/interpreters/access/privilege_access.rs +++ b/src/query/service/src/interpreters/access/privilege_access.rs @@ -544,7 +544,7 @@ impl AccessChecker for PrivilegeAccess { session .validate_privilege( &GrantObject::Table( - plan.catalog_name.to_string(), + plan.catalog_info.catalog_name().to_string(), plan.database_name.to_string(), plan.table_name.to_string(), ), diff --git a/src/query/service/src/interpreters/common/grant.rs b/src/query/service/src/interpreters/common/grant.rs index fdc062ce77fef..040cd7be6abf8 100644 --- a/src/query/service/src/interpreters/common/grant.rs +++ b/src/query/service/src/interpreters/common/grant.rs @@ -37,7 +37,7 @@ pub async fn validate_grant_object_exists( return Ok(()); } - let catalog = ctx.get_catalog(catalog_name)?; + let catalog = ctx.get_catalog(catalog_name).await?; 
if !catalog .exists_table(tenant.as_str(), database_name, table_name) .await? @@ -49,7 +49,7 @@ pub async fn validate_grant_object_exists( } } GrantObject::Database(catalog_name, database_name) => { - let catalog = ctx.get_catalog(catalog_name)?; + let catalog = ctx.get_catalog(catalog_name).await?; if !catalog .exists_database(tenant.as_str(), database_name) .await? diff --git a/src/query/service/src/interpreters/interpreter_catalog_create.rs b/src/query/service/src/interpreters/interpreter_catalog_create.rs index 94b1f157b8f35..78ac2c6cf8889 100644 --- a/src/query/service/src/interpreters/interpreter_catalog_create.rs +++ b/src/query/service/src/interpreters/interpreter_catalog_create.rs @@ -24,7 +24,6 @@ use common_storages_fuse::TableContext; use log::debug; use super::Interpreter; -use crate::catalogs::CatalogManagerHelper; use crate::pipelines::PipelineBuildResult; use crate::sessions::QueryContext; @@ -61,7 +60,7 @@ impl Interpreter for CreateCatalogInterpreter { let catalog_manager = CatalogManager::instance(); catalog_manager - .create_user_defined_catalog(self.plan.clone().into()) + .create_catalog(self.plan.clone().into()) .await?; Ok(PipelineBuildResult::create()) diff --git a/src/query/service/src/interpreters/interpreter_catalog_drop.rs b/src/query/service/src/interpreters/interpreter_catalog_drop.rs index e619cecc2c62c..01835eb177c09 100644 --- a/src/query/service/src/interpreters/interpreter_catalog_drop.rs +++ b/src/query/service/src/interpreters/interpreter_catalog_drop.rs @@ -22,7 +22,6 @@ use common_storages_fuse::TableContext; use log::debug; use super::Interpreter; -use crate::catalogs::CatalogManagerHelper; use crate::pipelines::PipelineBuildResult; use crate::sessions::QueryContext; @@ -49,7 +48,7 @@ impl Interpreter for DropCatalogInterpreter { debug!("ctx.id" = self.ctx.get_id().as_str(); "drop_catalog_execute"); let mgr = CatalogManager::instance(); - mgr.drop_user_defined_catalog(self.plan.clone().into())?; + 
mgr.drop_catalog(self.plan.clone().into()).await?; Ok(PipelineBuildResult::create()) } diff --git a/src/query/service/src/interpreters/interpreter_cluster_key_alter.rs b/src/query/service/src/interpreters/interpreter_cluster_key_alter.rs index b6fb243f344f6..c39e49895ef73 100644 --- a/src/query/service/src/interpreters/interpreter_cluster_key_alter.rs +++ b/src/query/service/src/interpreters/interpreter_cluster_key_alter.rs @@ -43,7 +43,7 @@ impl Interpreter for AlterTableClusterKeyInterpreter { async fn execute2(&self) -> Result { let plan = &self.plan; let tenant = self.ctx.get_tenant(); - let catalog = self.ctx.get_catalog(&plan.catalog)?; + let catalog = self.ctx.get_catalog(&plan.catalog).await?; let table = catalog .get_table(tenant.as_str(), &plan.database, &plan.table) diff --git a/src/query/service/src/interpreters/interpreter_cluster_key_drop.rs b/src/query/service/src/interpreters/interpreter_cluster_key_drop.rs index 064724588e137..b012df07b9438 100644 --- a/src/query/service/src/interpreters/interpreter_cluster_key_drop.rs +++ b/src/query/service/src/interpreters/interpreter_cluster_key_drop.rs @@ -43,7 +43,7 @@ impl Interpreter for DropTableClusterKeyInterpreter { async fn execute2(&self) -> Result { let plan = &self.plan; let tenant = self.ctx.get_tenant(); - let catalog = self.ctx.get_catalog(&plan.catalog)?; + let catalog = self.ctx.get_catalog(&plan.catalog).await?; let table = catalog .get_table(tenant.as_str(), &plan.database, &plan.table) diff --git a/src/query/service/src/interpreters/interpreter_copy.rs b/src/query/service/src/interpreters/interpreter_copy.rs index 84c3ba8b0c0a5..02211a1431ea4 100644 --- a/src/query/service/src/interpreters/interpreter_copy.rs +++ b/src/query/service/src/interpreters/interpreter_copy.rs @@ -153,7 +153,11 @@ impl CopyInterpreter { ) -> Result> { let ctx = self.ctx.clone(); let to_table = ctx - .get_table(&plan.catalog_name, &plan.database_name, &plan.table_name) + .get_table( + 
plan.catalog_info.catalog_name(), + &plan.database_name, + &plan.table_name, + ) .await?; let table_ctx: Arc = self.ctx.clone(); let files = plan.collect_files(&table_ctx).await?; @@ -168,7 +172,7 @@ impl CopyInterpreter { stage_table .read_plan_with_catalog( self.ctx.clone(), - plan.catalog_name.to_string(), + plan.catalog_info.catalog_name().to_string(), None, None, false, @@ -184,7 +188,7 @@ impl CopyInterpreter { // TODO(leiysky): we reuse the id of exchange here, // which is not correct. We should generate a new id for insert. plan_id: 0, - catalog_name: plan.catalog_name.clone(), + catalog_info: plan.catalog_info.clone(), database_name: plan.database_name.clone(), table_name: plan.table_name.clone(), required_values_schema: plan.required_values_schema.clone(), @@ -212,7 +216,7 @@ impl CopyInterpreter { // which is not correct. We should generate a new id plan_id: 0, ignore_result: select_interpreter.get_ignore_result(), - catalog_name: plan.catalog_name.clone(), + catalog_info: plan.catalog_info.clone(), database_name: plan.database_name.clone(), table_name: plan.table_name.clone(), required_source_schema: plan.required_source_schema.clone(), @@ -251,7 +255,7 @@ impl CopyInterpreter { stage_table .read_plan_with_catalog( ctx.clone(), - plan.catalog_name.to_string(), + plan.catalog_info.catalog_name().to_string(), None, None, false, @@ -271,7 +275,7 @@ impl CopyInterpreter { &self, plan: &CopyIntoTablePlan, ) -> Result { - let catalog = plan.catalog_name.as_str(); + let catalog = plan.catalog_info.catalog_name(); let database = plan.database_name.as_str(); let table = plan.table_name.as_str(); @@ -380,7 +384,7 @@ impl CopyInterpreter { distributed_plan: &CopyPlanType, ) -> Result { let ( - catalog_name, + catalog_info, database_name, table_name, stage_info, @@ -391,7 +395,7 @@ impl CopyInterpreter { ); let mut build_res = match distributed_plan { CopyPlanType::DistributedCopyIntoTableFromStage(plan) => { - catalog_name = plan.catalog_name.clone(); + 
catalog_info = plan.catalog_info.clone(); database_name = plan.database_name.clone(); table_name = plan.table_name.clone(); stage_info = plan.stage_table_info.stage_info.clone(); @@ -414,7 +418,7 @@ impl CopyInterpreter { build_distributed_pipeline(&self.ctx, &exchange_plan, false).await? } CopyPlanType::CopyIntoTableFromQuery(plan) => { - catalog_name = plan.catalog_name.clone(); + catalog_info = plan.catalog_info.clone(); database_name = plan.database_name.clone(); table_name = plan.table_name.clone(); stage_info = plan.stage_table_info.stage_info.clone(); @@ -437,7 +441,7 @@ impl CopyInterpreter { }; let to_table = self .ctx - .get_table(&catalog_name, &database_name, &table_name) + .get_table(catalog_info.catalog_name(), &database_name, &table_name) .await?; // commit. diff --git a/src/query/service/src/interpreters/interpreter_database_create.rs b/src/query/service/src/interpreters/interpreter_database_create.rs index cf9e1cf365c47..3cdf91d827837 100644 --- a/src/query/service/src/interpreters/interpreter_database_create.rs +++ b/src/query/service/src/interpreters/interpreter_database_create.rs @@ -102,7 +102,7 @@ impl Interpreter for CreateDatabaseInterpreter { let tenant = self.plan.tenant.clone(); let quota_api = UserApiProvider::instance().get_tenant_quota_api_client(&tenant)?; let quota = quota_api.get_quota(MatchSeq::GE(0)).await?.data; - let catalog = self.ctx.get_catalog(&self.plan.catalog)?; + let catalog = self.ctx.get_catalog(&self.plan.catalog).await?; let databases = catalog.list_databases(&tenant).await?; if quota.max_databases != 0 && databases.len() >= quota.max_databases as usize { return Err(ErrorCode::TenantQuotaExceeded(format!( diff --git a/src/query/service/src/interpreters/interpreter_database_drop.rs b/src/query/service/src/interpreters/interpreter_database_drop.rs index 43a95b74ffad1..8e2fd300aff6c 100644 --- a/src/query/service/src/interpreters/interpreter_database_drop.rs +++ 
b/src/query/service/src/interpreters/interpreter_database_drop.rs @@ -42,7 +42,7 @@ impl Interpreter for DropDatabaseInterpreter { #[async_backtrace::framed] async fn execute2(&self) -> Result { - let catalog = self.ctx.get_catalog(&self.plan.catalog)?; + let catalog = self.ctx.get_catalog(&self.plan.catalog).await?; let resp = catalog.drop_database(self.plan.clone().into()).await?; if let Some(spec_vec) = resp.spec_vec { let mut share_table_into = Vec::with_capacity(spec_vec.len()); diff --git a/src/query/service/src/interpreters/interpreter_database_rename.rs b/src/query/service/src/interpreters/interpreter_database_rename.rs index aff9f029d1c35..7ca067c5fad9a 100644 --- a/src/query/service/src/interpreters/interpreter_database_rename.rs +++ b/src/query/service/src/interpreters/interpreter_database_rename.rs @@ -44,7 +44,7 @@ impl Interpreter for RenameDatabaseInterpreter { #[async_backtrace::framed] async fn execute2(&self) -> Result { for entity in &self.plan.entities { - let catalog = self.ctx.get_catalog(&entity.catalog)?; + let catalog = self.ctx.get_catalog(&entity.catalog).await?; let tenant = self.plan.tenant.clone(); catalog .rename_database(RenameDatabaseReq { diff --git a/src/query/service/src/interpreters/interpreter_database_show_create.rs b/src/query/service/src/interpreters/interpreter_database_show_create.rs index c012caf512c47..f58cb41f6f29e 100644 --- a/src/query/service/src/interpreters/interpreter_database_show_create.rs +++ b/src/query/service/src/interpreters/interpreter_database_show_create.rs @@ -53,7 +53,7 @@ impl Interpreter for ShowCreateDatabaseInterpreter { #[async_backtrace::framed] async fn execute2(&self) -> Result { let tenant = self.ctx.get_tenant(); - let catalog = self.ctx.get_catalog(&self.plan.catalog)?; + let catalog = self.ctx.get_catalog(&self.plan.catalog).await?; let db = catalog .get_database(tenant.as_str(), &self.plan.database) .await?; diff --git a/src/query/service/src/interpreters/interpreter_database_undrop.rs 
b/src/query/service/src/interpreters/interpreter_database_undrop.rs index b1b61493e96a8..f909ae899d826 100644 --- a/src/query/service/src/interpreters/interpreter_database_undrop.rs +++ b/src/query/service/src/interpreters/interpreter_database_undrop.rs @@ -42,7 +42,7 @@ impl Interpreter for UndropDatabaseInterpreter { #[async_backtrace::framed] async fn execute2(&self) -> Result { let catalog_name = self.plan.catalog.as_str(); - let catalog = self.ctx.get_catalog(catalog_name)?; + let catalog = self.ctx.get_catalog(catalog_name).await?; catalog.undrop_database(self.plan.clone().into()).await?; Ok(PipelineBuildResult::create()) } diff --git a/src/query/service/src/interpreters/interpreter_delete.rs b/src/query/service/src/interpreters/interpreter_delete.rs index 61bb6bf685557..879384f6874b0 100644 --- a/src/query/service/src/interpreters/interpreter_delete.rs +++ b/src/query/service/src/interpreters/interpreter_delete.rs @@ -27,6 +27,7 @@ use common_expression::DataBlock; use common_expression::RemoteExpr; use common_expression::ROW_ID_COL_NAME; use common_functions::BUILTIN_FUNCTIONS; +use common_meta_app::schema::CatalogInfo; use common_meta_app::schema::TableInfo; use common_sql::binder::ColumnBindingBuilder; use common_sql::executor::cast_expr_to_non_null_boolean; @@ -109,10 +110,11 @@ impl Interpreter for DeleteInterpreter { .try_lock(self.ctx.clone(), table_info.clone()) .await?; + let catalog = self.ctx.get_catalog(catalog_name).await?; + let catalog_info = catalog.info(); + // refresh table. - let tbl = self - .ctx - .get_catalog(catalog_name)? 
+ let tbl = catalog .get_table(self.ctx.get_tenant().as_str(), db_name, tbl_name) .await?; @@ -237,7 +239,7 @@ impl Interpreter for DeleteInterpreter { fuse_table.get_table_info().clone(), col_indices, snapshot, - self.plan.catalog_name.clone(), + catalog_info, is_distributed, query_row_id_col, )?; @@ -272,7 +274,7 @@ impl DeleteInterpreter { table_info: TableInfo, col_indices: Vec, snapshot: TableSnapshot, - catalog_name: String, + catalog_info: CatalogInfo, is_distributed: bool, query_row_id_col: bool, ) -> Result { @@ -280,7 +282,7 @@ impl DeleteInterpreter { parts: partitions, filter, table_info: table_info.clone(), - catalog_name: catalog_name.clone(), + catalog_info: catalog_info.clone(), col_indices, query_row_id_col, })); @@ -298,7 +300,7 @@ impl DeleteInterpreter { input: Box::new(root), snapshot, table_info, - catalog_name, + catalog_info, }))) } } diff --git a/src/query/service/src/interpreters/interpreter_index_create.rs b/src/query/service/src/interpreters/interpreter_index_create.rs index 81b5700e46b87..b3e44a710f80b 100644 --- a/src/query/service/src/interpreters/interpreter_index_create.rs +++ b/src/query/service/src/interpreters/interpreter_index_create.rs @@ -66,7 +66,7 @@ impl Interpreter for CreateIndexInterpreter { )); } - let catalog = self.ctx.get_catalog(&catalog)?; + let catalog = self.ctx.get_catalog(&catalog).await?; let create_index_req = CreateIndexReq { if_not_exists: self.plan.if_not_exists, diff --git a/src/query/service/src/interpreters/interpreter_index_drop.rs b/src/query/service/src/interpreters/interpreter_index_drop.rs index b8c055be3c706..d9f915ac95b1e 100644 --- a/src/query/service/src/interpreters/interpreter_index_drop.rs +++ b/src/query/service/src/interpreters/interpreter_index_drop.rs @@ -55,7 +55,10 @@ impl Interpreter for DropIndexInterpreter { )?; let index_name = self.plan.index.clone(); - let catalog = self.ctx.get_catalog(&self.ctx.get_current_catalog())?; + let catalog = self + .ctx + 
.get_catalog(&self.ctx.get_current_catalog()) + .await?; let drop_index_req = DropIndexReq { if_exists: self.plan.if_exists, name_ident: IndexNameIdent { tenant, index_name }, diff --git a/src/query/service/src/interpreters/interpreter_index_refresh.rs b/src/query/service/src/interpreters/interpreter_index_refresh.rs index 07868252f0b6b..b5ad1302e390e 100644 --- a/src/query/service/src/interpreters/interpreter_index_refresh.rs +++ b/src/query/service/src/interpreters/interpreter_index_refresh.rs @@ -327,7 +327,7 @@ impl Interpreter for RefreshIndexInterpreter { } async fn modify_last_update(ctx: Arc, req: UpdateIndexReq) -> Result<()> { - let catalog = ctx.get_catalog(&ctx.get_current_catalog())?; + let catalog = ctx.get_catalog(&ctx.get_current_catalog()).await?; let handler = get_agg_index_handler(); let _ = handler.do_update_index(catalog, req).await?; Ok(()) diff --git a/src/query/service/src/interpreters/interpreter_insert.rs b/src/query/service/src/interpreters/interpreter_insert.rs index 4aecc20120e3e..14cfecc72909b 100644 --- a/src/query/service/src/interpreters/interpreter_insert.rs +++ b/src/query/service/src/interpreters/interpreter_insert.rs @@ -207,7 +207,8 @@ impl Interpreter for InsertInterpreter { _ => unreachable!(), }; - let catalog = self.plan.catalog.clone(); + let catalog = self.ctx.get_catalog(&self.plan.catalog).await?; + let catalog_info = catalog.info(); let insert_select_plan = match select_plan { PhysicalPlan::Exchange(ref mut exchange) => { @@ -219,7 +220,7 @@ impl Interpreter for InsertInterpreter { // which is not correct. We should generate a new id for insert. plan_id: exchange.plan_id, input, - catalog, + catalog_info, table_info: table1.get_table_info().clone(), select_schema: plan.schema(), select_column_bindings, @@ -236,7 +237,7 @@ impl Interpreter for InsertInterpreter { // which is not correct. We should generate a new id for insert. 
plan_id: other_plan.get_id(), input: Box::new(other_plan), - catalog, + catalog_info, table_info: table1.get_table_info().clone(), select_schema: plan.schema(), select_column_bindings, diff --git a/src/query/service/src/interpreters/interpreter_table_add_column.rs b/src/query/service/src/interpreters/interpreter_table_add_column.rs index 6125a3de376f0..70bde53aff748 100644 --- a/src/query/service/src/interpreters/interpreter_table_add_column.rs +++ b/src/query/service/src/interpreters/interpreter_table_add_column.rs @@ -57,7 +57,8 @@ impl Interpreter for AddTableColumnInterpreter { let tbl = self .ctx - .get_catalog(catalog_name)? + .get_catalog(catalog_name) + .await? .get_table(self.ctx.get_tenant().as_str(), db_name, tbl_name) .await .ok(); @@ -77,7 +78,7 @@ impl Interpreter for AddTableColumnInterpreter { ))); } - let catalog = self.ctx.get_catalog(catalog_name)?; + let catalog = self.ctx.get_catalog(catalog_name).await?; let mut new_table_meta = table.get_table_info().meta.clone(); let field = self.plan.field.clone(); if field.computed_expr().is_some() { diff --git a/src/query/service/src/interpreters/interpreter_table_create.rs b/src/query/service/src/interpreters/interpreter_table_create.rs index 54dec6adb8fe4..fea9915653c07 100644 --- a/src/query/service/src/interpreters/interpreter_table_create.rs +++ b/src/query/service/src/interpreters/interpreter_table_create.rs @@ -109,7 +109,7 @@ impl Interpreter for CreateTableInterpreter { let quota_api = UserApiProvider::instance().get_tenant_quota_api_client(&tenant)?; let quota = quota_api.get_quota(MatchSeq::GE(0)).await?.data; let engine = self.plan.engine; - let catalog = self.ctx.get_catalog(self.plan.catalog.as_str())?; + let catalog = self.ctx.get_catalog(self.plan.catalog.as_str()).await?; if quota.max_tables_per_database > 0 { // Note: // max_tables_per_database is a config quota. Default is 0. 
@@ -154,7 +154,7 @@ impl CreateTableInterpreter { #[async_backtrace::framed] async fn create_table_as_select(&self, select_plan: Box) -> Result { let tenant = self.ctx.get_tenant(); - let catalog = self.ctx.get_catalog(&self.plan.catalog)?; + let catalog = self.ctx.get_catalog(&self.plan.catalog).await?; // TODO: maybe the table creation and insertion should be a transaction, but it may require create_table support 2pc. let reply = catalog.create_table(self.build_request(None)?).await?; @@ -191,7 +191,7 @@ impl CreateTableInterpreter { #[async_backtrace::framed] async fn create_table(&self) -> Result { - let catalog = self.ctx.get_catalog(self.plan.catalog.as_str())?; + let catalog = self.ctx.get_catalog(self.plan.catalog.as_str()).await?; let mut stat = None; if !GlobalConfig::instance().query.management_mode { if let Some(snapshot_loc) = self.plan.options.get(OPT_KEY_SNAPSHOT_LOCATION) { diff --git a/src/query/service/src/interpreters/interpreter_table_drop.rs b/src/query/service/src/interpreters/interpreter_table_drop.rs index 6c06a1cbd4928..abe100dd5c0a9 100644 --- a/src/query/service/src/interpreters/interpreter_table_drop.rs +++ b/src/query/service/src/interpreters/interpreter_table_drop.rs @@ -68,7 +68,7 @@ impl Interpreter for DropTableInterpreter { &self.plan.database, &self.plan.table, &self.plan.database, &self.plan.table ))); } - let catalog = self.ctx.get_catalog(catalog_name)?; + let catalog = self.ctx.get_catalog(catalog_name).await?; let resp = catalog .drop_table_by_id(DropTableByIdReq { diff --git a/src/query/service/src/interpreters/interpreter_table_drop_column.rs b/src/query/service/src/interpreters/interpreter_table_drop_column.rs index 433f07ce26fd2..4849b913ea6cd 100644 --- a/src/query/service/src/interpreters/interpreter_table_drop_column.rs +++ b/src/query/service/src/interpreters/interpreter_table_drop_column.rs @@ -56,7 +56,8 @@ impl Interpreter for DropTableColumnInterpreter { let tbl_name = self.plan.table.as_str(); let table = self 
.ctx - .get_catalog(catalog_name)? + .get_catalog(catalog_name) + .await? .get_table(self.ctx.get_tenant().as_str(), db_name, tbl_name) .await?; @@ -86,7 +87,7 @@ impl Interpreter for DropTableColumnInterpreter { )?; } - let catalog = self.ctx.get_catalog(catalog_name)?; + let catalog = self.ctx.get_catalog(catalog_name).await?; let mut new_table_meta = table.get_table_info().meta.clone(); new_table_meta.drop_column(&self.plan.column)?; diff --git a/src/query/service/src/interpreters/interpreter_table_modify_column.rs b/src/query/service/src/interpreters/interpreter_table_modify_column.rs index 20b9ab06d2ba6..e7b85aafa3e4d 100644 --- a/src/query/service/src/interpreters/interpreter_table_modify_column.rs +++ b/src/query/service/src/interpreters/interpreter_table_modify_column.rs @@ -153,6 +153,9 @@ impl ModifyTableColumnInterpreter { .try_lock(self.ctx.clone(), table_info.clone()) .await?; + let catalog = self.ctx.get_catalog(table_info.catalog()).await?; + let catalog_info = catalog.info(); + let fuse_table = FuseTable::try_from_table(table.as_ref())?; let prev_snapshot_id = match fuse_table.read_table_snapshot().await { Ok(snapshot) => snapshot.map(|snapshot| snapshot.snapshot_id), @@ -236,7 +239,7 @@ impl ModifyTableColumnInterpreter { PhysicalPlan::DistributedInsertSelect(Box::new(DistributedInsertSelect { plan_id: select_plan.get_id(), input: Box::new(select_plan), - catalog: self.plan.catalog.clone(), + catalog_info, table_info: new_table.get_table_info().clone(), select_schema: Arc::new(Arc::new(schema).into()), select_column_bindings, @@ -352,7 +355,8 @@ impl Interpreter for ModifyTableColumnInterpreter { let tbl = self .ctx - .get_catalog(catalog_name)? + .get_catalog(catalog_name) + .await? 
.get_table(self.ctx.get_tenant().as_str(), db_name, tbl_name) .await .ok(); @@ -377,7 +381,7 @@ impl Interpreter for ModifyTableColumnInterpreter { ))); } - let catalog = self.ctx.get_catalog(catalog_name)?; + let catalog = self.ctx.get_catalog(catalog_name).await?; let table_meta = table.get_table_info().meta.clone(); // NOTICE: if we support modify column data type, diff --git a/src/query/service/src/interpreters/interpreter_table_optimize.rs b/src/query/service/src/interpreters/interpreter_table_optimize.rs index f161c34d0c4b4..29330016149bc 100644 --- a/src/query/service/src/interpreters/interpreter_table_optimize.rs +++ b/src/query/service/src/interpreters/interpreter_table_optimize.rs @@ -84,7 +84,7 @@ impl OptimizeTableInterpreter { && matches!(target, CompactTarget::Blocks); // check if the table is locked. - let catalog = self.ctx.get_catalog(&self.plan.catalog)?; + let catalog = self.ctx.get_catalog(&self.plan.catalog).await?; let reply = catalog .list_table_lock_revs(table.get_table_info().ident.table_id) .await?; @@ -179,7 +179,8 @@ async fn purge( // currently, context caches the table, we have to "refresh" // the table by using the catalog API directly let table = ctx - .get_catalog(&plan.catalog)? + .get_catalog(&plan.catalog) + .await? .get_table(ctx.get_tenant().as_str(), &plan.database, &plan.table) .await?; diff --git a/src/query/service/src/interpreters/interpreter_table_recluster.rs b/src/query/service/src/interpreters/interpreter_table_recluster.rs index 03b73a1f1f806..66a5f72ad6ba1 100644 --- a/src/query/service/src/interpreters/interpreter_table_recluster.rs +++ b/src/query/service/src/interpreters/interpreter_table_recluster.rs @@ -84,12 +84,13 @@ impl Interpreter for ReclusterTableInterpreter { loop { let table = self .ctx - .get_catalog(&plan.catalog)? + .get_catalog(&plan.catalog) + .await? .get_table(tenant.as_str(), &plan.database, &plan.table) .await?; // check if the table is locked. 
- let catalog = self.ctx.get_catalog(&self.plan.catalog)?; + let catalog = self.ctx.get_catalog(&self.plan.catalog).await?; let reply = catalog .list_table_lock_revs(table.get_table_info().ident.table_id) .await?; diff --git a/src/query/service/src/interpreters/interpreter_table_rename.rs b/src/query/service/src/interpreters/interpreter_table_rename.rs index c826ac6489def..2d89b8926a9c4 100644 --- a/src/query/service/src/interpreters/interpreter_table_rename.rs +++ b/src/query/service/src/interpreters/interpreter_table_rename.rs @@ -46,7 +46,7 @@ impl Interpreter for RenameTableInterpreter { // TODO check privileges // You must have ALTER and DROP privileges for the original table, // and CREATE and INSERT privileges for the new table. - let catalog = self.ctx.get_catalog(&self.plan.catalog)?; + let catalog = self.ctx.get_catalog(&self.plan.catalog).await?; catalog .rename_table(RenameTableReq { if_exists: self.plan.if_exists, diff --git a/src/query/service/src/interpreters/interpreter_table_rename_column.rs b/src/query/service/src/interpreters/interpreter_table_rename_column.rs index 14297337bca5e..875c1bc36407b 100644 --- a/src/query/service/src/interpreters/interpreter_table_rename_column.rs +++ b/src/query/service/src/interpreters/interpreter_table_rename_column.rs @@ -58,7 +58,8 @@ impl Interpreter for RenameTableColumnInterpreter { let tbl = self .ctx - .get_catalog(catalog_name)? + .get_catalog(catalog_name) + .await? 
.get_table(self.ctx.get_tenant().as_str(), db_name, tbl_name) .await .ok(); @@ -78,7 +79,7 @@ impl Interpreter for RenameTableColumnInterpreter { ))); } - let catalog = self.ctx.get_catalog(catalog_name)?; + let catalog = self.ctx.get_catalog(catalog_name).await?; let mut new_table_meta = table.get_table_info().meta.clone(); is_valid_column(&self.plan.new_column)?; diff --git a/src/query/service/src/interpreters/interpreter_table_revert.rs b/src/query/service/src/interpreters/interpreter_table_revert.rs index 7b5011de8e93e..23b6aab5cee90 100644 --- a/src/query/service/src/interpreters/interpreter_table_revert.rs +++ b/src/query/service/src/interpreters/interpreter_table_revert.rs @@ -43,7 +43,7 @@ impl Interpreter for RevertTableInterpreter { #[async_backtrace::framed] async fn execute2(&self) -> Result { let tenant = self.ctx.get_tenant(); - let catalog = self.ctx.get_catalog(self.plan.catalog.as_str())?; + let catalog = self.ctx.get_catalog(self.plan.catalog.as_str()).await?; let table = catalog .get_table(tenant.as_str(), &self.plan.database, &self.plan.table) diff --git a/src/query/service/src/interpreters/interpreter_table_set_options.rs b/src/query/service/src/interpreters/interpreter_table_set_options.rs index 9670941a57390..c6d823cfc41fa 100644 --- a/src/query/service/src/interpreters/interpreter_table_set_options.rs +++ b/src/query/service/src/interpreters/interpreter_table_set_options.rs @@ -84,7 +84,7 @@ impl Interpreter for SetOptionsInterpreter { } options_map.insert(key, Some(table_option.1.clone())); } - let catalog = self.ctx.get_catalog(self.plan.catalog.as_str())?; + let catalog = self.ctx.get_catalog(self.plan.catalog.as_str()).await?; let database = self.plan.database.as_str(); let table = self.plan.table.as_str(); let tbl = catalog diff --git a/src/query/service/src/interpreters/interpreter_table_show_create.rs b/src/query/service/src/interpreters/interpreter_table_show_create.rs index 21574103bf630..b61483dde7d8d 100644 --- 
a/src/query/service/src/interpreters/interpreter_table_show_create.rs +++ b/src/query/service/src/interpreters/interpreter_table_show_create.rs @@ -58,7 +58,7 @@ impl Interpreter for ShowCreateTableInterpreter { #[async_backtrace::framed] async fn execute2(&self) -> Result { let tenant = self.ctx.get_tenant(); - let catalog = self.ctx.get_catalog(self.plan.catalog.as_str())?; + let catalog = self.ctx.get_catalog(self.plan.catalog.as_str()).await?; let table = catalog .get_table(tenant.as_str(), &self.plan.database, &self.plan.table) diff --git a/src/query/service/src/interpreters/interpreter_table_undrop.rs b/src/query/service/src/interpreters/interpreter_table_undrop.rs index 53a371f4df3ca..b4cc5dcdad638 100644 --- a/src/query/service/src/interpreters/interpreter_table_undrop.rs +++ b/src/query/service/src/interpreters/interpreter_table_undrop.rs @@ -42,7 +42,7 @@ impl Interpreter for UndropTableInterpreter { #[async_backtrace::framed] async fn execute2(&self) -> Result { let catalog_name = self.plan.catalog.as_str(); - let catalog = self.ctx.get_catalog(catalog_name)?; + let catalog = self.ctx.get_catalog(catalog_name).await?; catalog.undrop_table(self.plan.clone().into()).await?; Ok(PipelineBuildResult::create()) diff --git a/src/query/service/src/interpreters/interpreter_update.rs b/src/query/service/src/interpreters/interpreter_update.rs index 4222cbad8c274..1040b105d4171 100644 --- a/src/query/service/src/interpreters/interpreter_update.rs +++ b/src/query/service/src/interpreters/interpreter_update.rs @@ -84,7 +84,8 @@ impl Interpreter for UpdateInterpreter { // refresh table. let tbl = self .ctx - .get_catalog(catalog_name)? + .get_catalog(catalog_name) + .await? 
.get_table(self.ctx.get_tenant().as_str(), db_name, tbl_name) .await?; diff --git a/src/query/service/src/interpreters/interpreter_vacuum_drop_tables.rs b/src/query/service/src/interpreters/interpreter_vacuum_drop_tables.rs index 10bbc82872592..3c38cc87c6ecf 100644 --- a/src/query/service/src/interpreters/interpreter_vacuum_drop_tables.rs +++ b/src/query/service/src/interpreters/interpreter_vacuum_drop_tables.rs @@ -75,7 +75,7 @@ impl Interpreter for VacuumDropTablesInterpreter { None => ctx.get_settings().get_retention_period()? as i64, }; let retention_time = chrono::Utc::now() - chrono::Duration::hours(hours); - let catalog = self.ctx.get_catalog(self.plan.catalog.as_str())?; + let catalog = self.ctx.get_catalog(self.plan.catalog.as_str()).await?; // if database if empty, vacuum all tables let filter = if self.plan.database.is_empty() { TableInfoFilter::AllDroppedTables(Some(retention_time)) diff --git a/src/query/service/src/interpreters/interpreter_view_alter.rs b/src/query/service/src/interpreters/interpreter_view_alter.rs index e6d98f486d047..d198146b50dd8 100644 --- a/src/query/service/src/interpreters/interpreter_view_alter.rs +++ b/src/query/service/src/interpreters/interpreter_view_alter.rs @@ -50,7 +50,7 @@ impl Interpreter for AlterViewInterpreter { #[async_backtrace::framed] async fn execute2(&self) -> Result { // drop view - let catalog = self.ctx.get_catalog(&self.plan.catalog)?; + let catalog = self.ctx.get_catalog(&self.plan.catalog).await?; if let Ok(tbl) = catalog .get_table(&self.plan.tenant, &self.plan.database, &self.plan.view_name) .await diff --git a/src/query/service/src/interpreters/interpreter_view_create.rs b/src/query/service/src/interpreters/interpreter_view_create.rs index 19a84a7c5a76e..03e3c12b68c6c 100644 --- a/src/query/service/src/interpreters/interpreter_view_create.rs +++ b/src/query/service/src/interpreters/interpreter_view_create.rs @@ -50,7 +50,7 @@ impl Interpreter for CreateViewInterpreter { #[async_backtrace::framed] 
async fn execute2(&self) -> Result { - let catalog = self.ctx.get_catalog(&self.plan.catalog)?; + let catalog = self.ctx.get_catalog(&self.plan.catalog).await?; let tenant = self.ctx.get_tenant(); let table_function = catalog.list_table_functions(); if catalog diff --git a/src/query/service/src/interpreters/interpreter_view_drop.rs b/src/query/service/src/interpreters/interpreter_view_drop.rs index bda8d69e90266..51dac672b8b1b 100644 --- a/src/query/service/src/interpreters/interpreter_view_drop.rs +++ b/src/query/service/src/interpreters/interpreter_view_drop.rs @@ -71,7 +71,7 @@ impl Interpreter for DropViewInterpreter { ))); } - let catalog = self.ctx.get_catalog(&self.plan.catalog)?; + let catalog = self.ctx.get_catalog(&self.plan.catalog).await?; catalog .drop_table_by_id(DropTableByIdReq { if_exists: self.plan.if_exists, diff --git a/src/query/service/src/interpreters/interpreter_virtual_columns_alter.rs b/src/query/service/src/interpreters/interpreter_virtual_columns_alter.rs index 32932722c50c3..ca6a0450c2b51 100644 --- a/src/query/service/src/interpreters/interpreter_virtual_columns_alter.rs +++ b/src/query/service/src/interpreters/interpreter_virtual_columns_alter.rs @@ -68,7 +68,7 @@ impl Interpreter for AlterVirtualColumnsInterpreter { .await?; let table_id = table.get_id(); - let catalog = self.ctx.get_catalog(&catalog_name)?; + let catalog = self.ctx.get_catalog(&catalog_name).await?; let update_virtual_column_req = UpdateVirtualColumnReq { name_ident: VirtualColumnNameIdent { tenant, table_id }, diff --git a/src/query/service/src/interpreters/interpreter_virtual_columns_create.rs b/src/query/service/src/interpreters/interpreter_virtual_columns_create.rs index ddc1736415a09..f131fa8d7f05e 100644 --- a/src/query/service/src/interpreters/interpreter_virtual_columns_create.rs +++ b/src/query/service/src/interpreters/interpreter_virtual_columns_create.rs @@ -68,7 +68,7 @@ impl Interpreter for CreateVirtualColumnsInterpreter { .await?; let table_id = 
table.get_id(); - let catalog = self.ctx.get_catalog(&catalog_name)?; + let catalog = self.ctx.get_catalog(&catalog_name).await?; let create_virtual_column_req = CreateVirtualColumnReq { name_ident: VirtualColumnNameIdent { tenant, table_id }, diff --git a/src/query/service/src/interpreters/interpreter_virtual_columns_drop.rs b/src/query/service/src/interpreters/interpreter_virtual_columns_drop.rs index 2a12268231fc4..cbabd2a66569a 100644 --- a/src/query/service/src/interpreters/interpreter_virtual_columns_drop.rs +++ b/src/query/service/src/interpreters/interpreter_virtual_columns_drop.rs @@ -68,7 +68,7 @@ impl Interpreter for DropVirtualColumnsInterpreter { .await?; let table_id = table.get_id(); - let catalog = self.ctx.get_catalog(&catalog_name)?; + let catalog = self.ctx.get_catalog(&catalog_name).await?; let drop_virtual_column_req = DropVirtualColumnReq { name_ident: VirtualColumnNameIdent { tenant, table_id }, diff --git a/src/query/service/src/interpreters/interpreter_virtual_columns_generate.rs b/src/query/service/src/interpreters/interpreter_virtual_columns_generate.rs index 89f5e90a411e2..5898d2ebc23d1 100644 --- a/src/query/service/src/interpreters/interpreter_virtual_columns_generate.rs +++ b/src/query/service/src/interpreters/interpreter_virtual_columns_generate.rs @@ -66,7 +66,7 @@ impl Interpreter for GenerateVirtualColumnsInterpreter { .ctx .get_table(&catalog_name, &db_name, &tbl_name) .await?; - let catalog = self.ctx.get_catalog(&catalog_name)?; + let catalog = self.ctx.get_catalog(&catalog_name).await?; let list_virtual_columns_req = ListVirtualColumnsReq { tenant, diff --git a/src/query/service/src/pipelines/pipeline_builder.rs b/src/query/service/src/pipelines/pipeline_builder.rs index 1f487a1b7f9ca..0374cb2ca19ff 100644 --- a/src/query/service/src/pipelines/pipeline_builder.rs +++ b/src/query/service/src/pipelines/pipeline_builder.rs @@ -244,8 +244,12 @@ impl PipelineBuilder { &mut self.main_pipeline, copy_plan.ignore_result, )?; - let 
catalog = self.ctx.get_catalog(©_plan.catalog_name)?; - let to_table = catalog.get_table_by_info(©_plan.table_info)?; + + let to_table = self.ctx.build_table_by_table_info( + ©_plan.catalog_info, + ©_plan.table_info, + None, + )?; build_append_data_pipeline( self.ctx.clone(), &mut self.main_pipeline, @@ -260,8 +264,11 @@ impl PipelineBuilder { &mut self, distributed_plan: &DistributedCopyIntoTableFromStage, ) -> Result<()> { - let catalog = self.ctx.get_catalog(&distributed_plan.catalog_name)?; - let to_table = catalog.get_table_by_info(&distributed_plan.table_info)?; + let to_table = self.ctx.build_table_by_table_info( + &distributed_plan.catalog_info, + &distributed_plan.table_info, + None, + )?; let stage_table_info = distributed_plan.stage_table_info.clone(); let stage_table = StageTable::try_create(stage_table_info)?; stage_table.set_block_thresholds(distributed_plan.thresholds); @@ -294,7 +301,7 @@ impl PipelineBuilder { fn build_delete_partial(&mut self, delete: &DeletePartial) -> Result<()> { let table = self.ctx - .build_table_by_table_info(&delete.catalog_name, &delete.table_info, None)?; + .build_table_by_table_info(&delete.catalog_info, &delete.table_info, None)?; let table = FuseTable::try_from_table(table.as_ref())?; table.add_deletion_source( self.ctx.clone(), @@ -328,7 +335,7 @@ impl PipelineBuilder { self.build_pipeline(&delete.input)?; let table = self.ctx - .build_table_by_table_info(&delete.catalog_name, &delete.table_info, None)?; + .build_table_by_table_info(&delete.catalog_info, &delete.table_info, None)?; let table = FuseTable::try_from_table(table.as_ref())?; let ctx: Arc = self.ctx.clone(); table.chain_mutation_pipes( @@ -1480,10 +1487,11 @@ impl PipelineBuilder { })?; } - let table = self - .ctx - .get_catalog(&insert_select.catalog)? 
- .get_table_by_info(&insert_select.table_info)?; + let table = self.ctx.build_table_by_table_info( + &insert_select.catalog_info, + &insert_select.table_info, + None, + )?; let source_schema = insert_schema; build_fill_missing_columns_pipeline( diff --git a/src/query/service/src/procedures/systems/clustering_information.rs b/src/query/service/src/procedures/systems/clustering_information.rs index 62364a0c417b9..444951722dbe0 100644 --- a/src/query/service/src/procedures/systems/clustering_information.rs +++ b/src/query/service/src/procedures/systems/clustering_information.rs @@ -51,7 +51,8 @@ impl OneBlockProcedure for ClusteringInformationProcedure { let table_name = args[1].clone(); let tenant_id = ctx.get_tenant(); let tbl = ctx - .get_catalog(&ctx.get_current_catalog())? + .get_catalog(&ctx.get_current_catalog()) + .await? .get_table( tenant_id.as_str(), database_name.as_str(), diff --git a/src/query/service/src/procedures/systems/fuse_block.rs b/src/query/service/src/procedures/systems/fuse_block.rs index 49ae68106edc0..189b06deaf9af 100644 --- a/src/query/service/src/procedures/systems/fuse_block.rs +++ b/src/query/service/src/procedures/systems/fuse_block.rs @@ -56,7 +56,8 @@ impl OneBlockProcedure for FuseBlockProcedure { }; let tenant_id = ctx.get_tenant(); let tbl = ctx - .get_catalog(&ctx.get_current_catalog())? + .get_catalog(&ctx.get_current_catalog()) + .await? .get_table( tenant_id.as_str(), database_name.as_str(), diff --git a/src/query/service/src/procedures/systems/fuse_column.rs b/src/query/service/src/procedures/systems/fuse_column.rs index b6fa87bd982bb..3dd6826259247 100644 --- a/src/query/service/src/procedures/systems/fuse_column.rs +++ b/src/query/service/src/procedures/systems/fuse_column.rs @@ -56,7 +56,8 @@ impl OneBlockProcedure for FuseColumnProcedure { }; let tenant_id = ctx.get_tenant(); let tbl = ctx - .get_catalog(&ctx.get_current_catalog())? + .get_catalog(&ctx.get_current_catalog()) + .await? 
.get_table( tenant_id.as_str(), database_name.as_str(), diff --git a/src/query/service/src/procedures/systems/fuse_segment.rs b/src/query/service/src/procedures/systems/fuse_segment.rs index a47318110b4da..2d7a05434a630 100644 --- a/src/query/service/src/procedures/systems/fuse_segment.rs +++ b/src/query/service/src/procedures/systems/fuse_segment.rs @@ -57,7 +57,8 @@ impl OneBlockProcedure for FuseSegmentProcedure { let tenant_id = ctx.get_tenant(); let tbl = ctx - .get_catalog(&ctx.get_current_catalog())? + .get_catalog(&ctx.get_current_catalog()) + .await? .get_table( tenant_id.as_str(), database_name.as_str(), diff --git a/src/query/service/src/procedures/systems/fuse_snapshot.rs b/src/query/service/src/procedures/systems/fuse_snapshot.rs index 63b3eab0e6d2b..e9456e85aecd6 100644 --- a/src/query/service/src/procedures/systems/fuse_snapshot.rs +++ b/src/query/service/src/procedures/systems/fuse_snapshot.rs @@ -52,7 +52,8 @@ impl OneBlockProcedure for FuseSnapshotProcedure { let table_name = args[1].clone(); let tenant_id = ctx.get_tenant(); let tbl = ctx - .get_catalog(&ctx.get_current_catalog())? + .get_catalog(&ctx.get_current_catalog()) + .await? .get_table( tenant_id.as_str(), database_name.as_str(), diff --git a/src/query/service/src/servers/flight_sql/flight_sql_service/catalog.rs b/src/query/service/src/servers/flight_sql/flight_sql_service/catalog.rs index 9b3946d401962..8686f5a0eee37 100644 --- a/src/query/service/src/servers/flight_sql/flight_sql_service/catalog.rs +++ b/src/query/service/src/servers/flight_sql/flight_sql_service/catalog.rs @@ -55,13 +55,14 @@ impl CatalogInfoProvider { let catalogs: Vec<(String, Arc)> = if let Some(catalog_name) = catalog_name { vec![( catalog_name.clone(), - catalog_mgr.get_catalog(&catalog_name)?, + catalog_mgr.get_catalog(&tenant, &catalog_name).await?, )] } else { catalog_mgr - .catalogs + .list_catalogs(&tenant) + .await? 
.iter() - .map(|r| (r.key().to_string(), r.value().clone())) + .map(|r| (r.name(), r.clone())) .collect() }; diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index 167fe5b738526..7fa35ad324a10 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -50,6 +50,7 @@ use common_meta_app::principal::OnErrorMode; use common_meta_app::principal::RoleInfo; use common_meta_app::principal::StageFileFormatType; use common_meta_app::principal::UserInfo; +use common_meta_app::schema::CatalogInfo; use common_meta_app::schema::GetTableCopiedFileReq; use common_meta_app::schema::TableInfo; use common_pipeline_core::InputError; @@ -128,14 +129,16 @@ impl QueryContext { }) } - // Build fuse/system normal table by table info. + /// Build fuse/system normal table by table info. + /// + /// TODO(xuanwo): we should support build table via table info in the future. pub fn build_table_by_table_info( &self, - catalog_name: &str, + catalog_info: &CatalogInfo, table_info: &TableInfo, table_args: Option, ) -> Result> { - let catalog = self.get_catalog(catalog_name)?; + let catalog = self.shared.catalog_manager.build_catalog(catalog_info)?; match table_args { None => catalog.get_table_by_info(table_info), Some(table_args) => Ok(catalog @@ -149,7 +152,7 @@ impl QueryContext { // 's3://' here is a s3 external stage, and build it to the external table. 
fn build_external_by_table_info( &self, - _catalog: &str, + _catalog: &CatalogInfo, table_info: &StageTableInfo, _table_args: Option, ) -> Result> { @@ -159,7 +162,9 @@ impl QueryContext { #[async_backtrace::framed] pub async fn set_current_database(&self, new_database_name: String) -> Result<()> { let tenant_id = self.get_tenant(); - let catalog = self.get_catalog(self.get_current_catalog().as_str())?; + let catalog = self + .get_catalog(self.get_current_catalog().as_str()) + .await?; match catalog .get_database(tenant_id.as_str(), &new_database_name) .await @@ -254,12 +259,16 @@ impl TableContext for QueryContext { /// This method builds a `dyn Table`, which provides table specific io methods the plan needs. fn build_table_from_source_plan(&self, plan: &DataSourcePlan) -> Result> { match &plan.source_info { - DataSourceInfo::TableSource(table_info) => { - self.build_table_by_table_info(&plan.catalog, table_info, plan.tbl_args.clone()) - } - DataSourceInfo::StageSource(stage_info) => { - self.build_external_by_table_info(&plan.catalog, stage_info, plan.tbl_args.clone()) - } + DataSourceInfo::TableSource(table_info) => self.build_table_by_table_info( + &plan.catalog_info, + table_info, + plan.tbl_args.clone(), + ), + DataSourceInfo::StageSource(stage_info) => self.build_external_by_table_info( + &plan.catalog_info, + stage_info, + plan.tbl_args.clone(), + ), DataSourceInfo::Parquet2Source(table_info) => Parquet2Table::from_info(table_info), DataSourceInfo::ParquetSource(table_info) => ParquetTable::from_info(table_info), DataSourceInfo::ResultScanSource(table_info) => ResultScan::from_info(table_info), @@ -387,10 +396,16 @@ impl TableContext for QueryContext { self.fragment_id.fetch_add(1, Ordering::Release) } - fn get_catalog(&self, catalog_name: &str) -> Result> { + #[async_backtrace::framed] + async fn get_catalog(&self, catalog_name: &str) -> Result> { self.shared .catalog_manager - .get_catalog(catalog_name.as_ref()) + .get_catalog(&self.get_tenant(), 
catalog_name.as_ref()) + .await + } + + fn get_default_catalog(&self) -> Result> { + self.shared.catalog_manager.get_default_catalog() } fn get_id(&self) -> String { @@ -613,7 +628,7 @@ impl TableContext for QueryContext { max_files: Option, ) -> Result> { let tenant = self.get_tenant(); - let catalog = self.get_catalog(catalog_name)?; + let catalog = self.get_catalog(catalog_name).await?; let table = catalog .get_table(&tenant, database_name, table_name) .await?; diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs index 2d9a5b712d47b..c1b65f2beca5b 100644 --- a/src/query/service/src/sessions/query_ctx_shared.rs +++ b/src/query/service/src/sessions/query_ctx_shared.rs @@ -22,6 +22,7 @@ use std::time::SystemTime; use common_base::base::Progress; use common_base::runtime::Runtime; +use common_catalog::catalog::CatalogManager; use common_catalog::table_context::MaterializedCtesBlocks; use common_catalog::table_context::StageAttachment; use common_exception::ErrorCode; @@ -39,7 +40,6 @@ use parking_lot::Mutex; use parking_lot::RwLock; use uuid::Uuid; -use crate::catalogs::CatalogManager; use crate::clusters::Cluster; use crate::pipelines::executor::PipelineExecutor; use crate::sessions::query_affect::QueryAffect; @@ -267,7 +267,7 @@ impl QueryContextShared { ) -> Result> { let tenant = self.get_tenant(); let table_meta_key = (catalog.to_string(), database.to_string(), table.to_string()); - let catalog = self.catalog_manager.get_catalog(catalog)?; + let catalog = self.catalog_manager.get_catalog(&tenant, catalog).await?; let cache_table = catalog.get_table(tenant.as_str(), database, table).await?; let mut tables_refs = self.tables_refs.lock(); diff --git a/src/query/service/src/table_functions/openai/ai_to_sql.rs b/src/query/service/src/table_functions/openai/ai_to_sql.rs index d384398b1afbd..ea8838e0780fe 100644 --- a/src/query/service/src/table_functions/openai/ai_to_sql.rs +++ 
b/src/query/service/src/table_functions/openai/ai_to_sql.rs @@ -189,7 +189,7 @@ impl AsyncSource for GPT2SQLSource { // SELECT let database = self.ctx.get_current_database(); let tenant = self.ctx.get_tenant(); - let catalog = self.ctx.get_catalog(CATALOG_DEFAULT)?; + let catalog = self.ctx.get_catalog(CATALOG_DEFAULT).await?; let mut template = vec![]; template.push("### Postgres SQL tables, with their properties:".to_string()); diff --git a/src/query/service/src/test_kits/table_test_fixture.rs b/src/query/service/src/test_kits/table_test_fixture.rs index dd378d2f36861..5a541177d1126 100644 --- a/src/query/service/src/test_kits/table_test_fixture.rs +++ b/src/query/service/src/test_kits/table_test_fixture.rs @@ -121,6 +121,7 @@ impl TestFixture { }, }; ctx.get_catalog("default") + .await .unwrap() .create_database(plan.into()) .await @@ -505,7 +506,8 @@ impl TestFixture { pub async fn latest_default_table(&self) -> Result> { self.ctx - .get_catalog(CATALOG_DEFAULT)? + .get_catalog(CATALOG_DEFAULT) + .await? .get_table( self.default_tenant().as_str(), self.default_db_name().as_str(), diff --git a/src/query/service/tests/it/storages/fuse/operations/alter_table.rs b/src/query/service/tests/it/storages/fuse/operations/alter_table.rs index f17f6ba42e001..077865f9bd826 100644 --- a/src/query/service/tests/it/storages/fuse/operations/alter_table.rs +++ b/src/query/service/tests/it/storages/fuse/operations/alter_table.rs @@ -51,7 +51,7 @@ async fn check_segment_column_ids( expected_column_ids: Option>, expected_column_min_max: Option>, ) -> Result<()> { - let catalog = fixture.ctx().get_catalog("default")?; + let catalog = fixture.ctx().get_catalog("default").await?; // get the latest tbl let table = catalog .get_table( @@ -193,7 +193,8 @@ async fn test_fuse_table_optimize_alter_table() -> Result<()> { // get the latest tbl let table = fixture .ctx() - .get_catalog(&catalog_name)? + .get_catalog(&catalog_name) + .await? 
.get_table( fixture.default_tenant().as_str(), fixture.default_db_name().as_str(), diff --git a/src/query/service/tests/it/storages/fuse/operations/analyze.rs b/src/query/service/tests/it/storages/fuse/operations/analyze.rs index bf7f623217a28..bc9498c5ca5a1 100644 --- a/src/query/service/tests/it/storages/fuse/operations/analyze.rs +++ b/src/query/service/tests/it/storages/fuse/operations/analyze.rs @@ -70,7 +70,9 @@ async fn test_fuse_snapshot_analyze_and_truncate() -> Result<()> { // truncate table { let ctx = fixture.ctx(); - let catalog = ctx.get_catalog(fixture.default_catalog_name().as_str())?; + let catalog = ctx + .get_catalog(fixture.default_catalog_name().as_str()) + .await?; let table = catalog .get_table(ctx.get_tenant().as_str(), &db, &tbl) .await?; @@ -81,7 +83,9 @@ async fn test_fuse_snapshot_analyze_and_truncate() -> Result<()> { // optimize after truncate table, ts file location will become None { let ctx = fixture.ctx(); - let catalog = ctx.get_catalog(fixture.default_catalog_name().as_str())?; + let catalog = ctx + .get_catalog(fixture.default_catalog_name().as_str()) + .await?; let table = catalog .get_table(ctx.get_tenant().as_str(), &db, &tbl) .await?; diff --git a/src/query/service/tests/it/storages/fuse/operations/commit.rs b/src/query/service/tests/it/storages/fuse/operations/commit.rs index f7d43bc6ffc9b..d7f4b3bf65f17 100644 --- a/src/query/service/tests/it/storages/fuse/operations/commit.rs +++ b/src/query/service/tests/it/storages/fuse/operations/commit.rs @@ -40,6 +40,7 @@ use common_meta_app::principal::FileFormatParams; use common_meta_app::principal::OnErrorMode; use common_meta_app::principal::RoleInfo; use common_meta_app::principal::UserInfo; +use common_meta_app::schema::CatalogInfo; use common_meta_app::schema::CountTablesReply; use common_meta_app::schema::CountTablesReq; use common_meta_app::schema::CreateDatabaseReply; @@ -218,7 +219,7 @@ async fn test_commit_to_meta_server() -> Result<()> { let fixture = 
TestFixture::new().await; fixture.create_default_table().await?; let ctx = fixture.ctx(); - let catalog = ctx.get_catalog("default")?; + let catalog = ctx.get_catalog("default").await?; let table = fixture.latest_default_table().await?; let fuse_table = FuseTable::try_from_table(table.as_ref())?; @@ -423,7 +424,11 @@ impl TableContext for CtxDelegation { todo!() } - fn get_catalog(&self, _catalog_name: &str) -> Result> { + async fn get_catalog(&self, _catalog_name: &str) -> Result> { + Ok(self.catalog.clone()) + } + + fn get_default_catalog(&self) -> Result> { Ok(self.catalog.clone()) } @@ -584,7 +589,7 @@ impl TableContext for CtxDelegation { } } -#[derive(Clone)] +#[derive(Clone, Debug)] struct FakedCatalog { cat: Arc, error_injection: Option, @@ -592,6 +597,14 @@ struct FakedCatalog { #[async_trait::async_trait] impl Catalog for FakedCatalog { + fn name(&self) -> String { + "FakedCatalog".to_string() + } + + fn info(&self) -> CatalogInfo { + self.cat.info() + } + async fn get_database(&self, _tenant: &str, _db_name: &str) -> Result> { todo!() } diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs index fc7ad26d2dff2..4de0dc5efcbc4 100644 --- a/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs +++ b/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs @@ -61,7 +61,9 @@ async fn test_compact() -> Result<()> { execute_command(ctx.clone(), qry.as_str()).await?; // compact - let catalog = ctx.get_catalog(fixture.default_catalog_name().as_str())?; + let catalog = ctx + .get_catalog(fixture.default_catalog_name().as_str()) + .await?; let table = catalog .get_table(ctx.get_tenant().as_str(), &db_name, &tbl_name) .await?; @@ -76,7 +78,9 @@ async fn test_compact() -> Result<()> { } // compact - let catalog = ctx.get_catalog(fixture.default_catalog_name().as_str())?; + let 
catalog = ctx + .get_catalog(fixture.default_catalog_name().as_str()) + .await?; let table = catalog .get_table(ctx.get_tenant().as_str(), &db_name, &tbl_name) .await?; diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs index 189d4fc70bcf4..78444d882521b 100644 --- a/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs +++ b/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs @@ -67,7 +67,7 @@ async fn test_compact_segment_normal_case() -> Result<()> { let qry = "create table t(c int) block_per_segment=10"; execute_command(ctx.clone(), qry).await?; - let catalog = ctx.get_catalog("default")?; + let catalog = ctx.get_catalog("default").await?; let num_inserts = 9; append_rows(ctx.clone(), num_inserts).await?; @@ -109,7 +109,7 @@ async fn test_compact_segment_resolvable_conflict() -> Result<()> { let create_tbl_command = "create table t(c int) block_per_segment=10"; execute_command(ctx.clone(), create_tbl_command).await?; - let catalog = ctx.get_catalog("default")?; + let catalog = ctx.get_catalog("default").await?; let num_inserts = 9; append_rows(ctx.clone(), num_inserts).await?; @@ -166,7 +166,7 @@ async fn test_compact_segment_unresolvable_conflict() -> Result<()> { let create_tbl_command = "create table t(c int) block_per_segment=10"; execute_command(ctx.clone(), create_tbl_command).await?; - let catalog = ctx.get_catalog("default")?; + let catalog = ctx.get_catalog("default").await?; let num_inserts = 9; append_rows(ctx.clone(), num_inserts).await?; diff --git a/src/query/service/tests/it/storages/fuse/operations/table_analyze.rs b/src/query/service/tests/it/storages/fuse/operations/table_analyze.rs index 594195b8bbe6b..c0fcc6c948c80 100644 --- a/src/query/service/tests/it/storages/fuse/operations/table_analyze.rs +++ 
b/src/query/service/tests/it/storages/fuse/operations/table_analyze.rs @@ -47,7 +47,7 @@ async fn test_table_modify_column_ndv_statistics() -> Result<()> { let create_tbl_command = "create table t(c int)"; execute_command(ctx.clone(), create_tbl_command).await?; - let catalog = ctx.get_catalog("default")?; + let catalog = ctx.get_catalog("default").await?; let num_inserts = 3; append_rows(ctx.clone(), num_inserts).await?; diff --git a/src/query/service/tests/it/storages/fuse/pruning.rs b/src/query/service/tests/it/storages/fuse/pruning.rs index 3216c8925ca97..2b27124a8f0ef 100644 --- a/src/query/service/tests/it/storages/fuse/pruning.rs +++ b/src/query/service/tests/it/storages/fuse/pruning.rs @@ -109,7 +109,7 @@ async fn test_block_pruner() -> Result<()> { let _ = interpreter.execute(ctx.clone()).await?; // get table - let catalog = ctx.get_catalog("default")?; + let catalog = ctx.get_catalog("default").await?; let table = catalog .get_table( fixture.default_tenant().as_str(), diff --git a/src/query/sql/src/executor/format.rs b/src/query/sql/src/executor/format.rs index 0e3133d59d7bd..0fb9db44fbf46 100644 --- a/src/query/sql/src/executor/format.rs +++ b/src/query/sql/src/executor/format.rs @@ -231,14 +231,20 @@ fn distributed_copy_into_table_from_stage( ) -> Result> { Ok(FormatTreeNode::new(format!( "copy into table {}.{}.{} from {:?}", - plan.catalog_name, plan.database_name, plan.table_name, plan.source + plan.catalog_info.catalog_name(), + plan.database_name, + plan.table_name, + plan.source ))) } fn copy_into_table_from_query(plan: &CopyIntoTableFromQuery) -> Result> { Ok(FormatTreeNode::new(format!( "copy into table {}.{}.{} from {:?}", - plan.catalog_name, plan.database_name, plan.table_name, plan.input + plan.catalog_info.catalog_name(), + plan.database_name, + plan.table_name, + plan.input ))) } diff --git a/src/query/sql/src/executor/physical_plan.rs b/src/query/sql/src/executor/physical_plan.rs index 34b6f8d19c775..7084ab2793db8 100644 --- 
a/src/query/sql/src/executor/physical_plan.rs +++ b/src/query/sql/src/executor/physical_plan.rs @@ -36,6 +36,7 @@ use common_expression::Scalar; use common_expression::TableSchemaRef; use common_functions::aggregates::AggregateFunctionFactory; use common_functions::BUILTIN_FUNCTIONS; +use common_meta_app::schema::CatalogInfo; use common_meta_app::schema::TableInfo; use common_storage::StageFileInfo; use storages_common_table_meta::meta::TableSnapshot; @@ -785,7 +786,7 @@ impl UnionAll { #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub struct DistributedCopyIntoTableFromStage { pub plan_id: u32, - pub catalog_name: String, + pub catalog_info: CatalogInfo, pub database_name: String, pub table_name: String, // ... into table() .. -> @@ -816,7 +817,7 @@ impl DistributedCopyIntoTableFromStage { #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub struct CopyIntoTableFromQuery { pub plan_id: u32, - pub catalog_name: String, + pub catalog_info: CatalogInfo, pub database_name: String, pub table_name: String, @@ -852,7 +853,7 @@ pub struct DistributedInsertSelect { pub plan_id: u32, pub input: Box, - pub catalog: String, + pub catalog_info: CatalogInfo, pub table_info: TableInfo, pub insert_schema: DataSchemaRef, pub select_schema: DataSchemaRef, @@ -892,7 +893,7 @@ pub struct DeletePartial { pub parts: Partitions, pub filter: RemoteExpr, pub table_info: TableInfo, - pub catalog_name: String, + pub catalog_info: CatalogInfo, pub col_indices: Vec, pub query_row_id_col: bool, } @@ -914,7 +915,7 @@ pub struct DeleteFinal { pub input: Box, pub snapshot: TableSnapshot, pub table_info: TableInfo, - pub catalog_name: String, + pub catalog_info: CatalogInfo, } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] diff --git a/src/query/sql/src/executor/physical_plan_builder.rs b/src/query/sql/src/executor/physical_plan_builder.rs index 411076a1ddff4..3c25e10ac5f0e 100644 --- a/src/query/sql/src/executor/physical_plan_builder.rs +++ 
b/src/query/sql/src/executor/physical_plan_builder.rs @@ -513,7 +513,7 @@ impl PhysicalPlanBuilder { RelOperator::DummyTableScan(_) => { let catalogs = CatalogManager::instance(); let table = catalogs - .get_catalog(CATALOG_DEFAULT)? + .get_default_catalog()? .get_table(self.ctx.get_tenant().as_str(), "system", "one") .await?; diff --git a/src/query/sql/src/executor/physical_plan_visitor.rs b/src/query/sql/src/executor/physical_plan_visitor.rs index ce4772f34376c..c522d72beb08e 100644 --- a/src/query/sql/src/executor/physical_plan_visitor.rs +++ b/src/query/sql/src/executor/physical_plan_visitor.rs @@ -337,7 +337,7 @@ pub trait PhysicalPlanReplacer { DistributedInsertSelect { plan_id: plan.plan_id, input: Box::new(input), - catalog: plan.catalog.clone(), + catalog_info: plan.catalog_info.clone(), table_info: plan.table_info.clone(), select_schema: plan.select_schema.clone(), insert_schema: plan.insert_schema.clone(), diff --git a/src/query/sql/src/executor/table_read_plan.rs b/src/query/sql/src/executor/table_read_plan.rs index c6dc1971949c2..5be19e65e6135 100644 --- a/src/query/sql/src/executor/table_read_plan.rs +++ b/src/query/sql/src/executor/table_read_plan.rs @@ -84,6 +84,8 @@ impl ToReadDataSourcePlan for dyn Table { internal_columns: Option>, dry_run: bool, ) -> Result { + let catalog_info = ctx.get_catalog(&catalog).await?.info(); + let (statistics, parts) = if let Some(PushDownInfo { filter: Some(RemoteExpr::Constant { @@ -235,7 +237,7 @@ impl ToReadDataSourcePlan for dyn Table { // TODO pass in catalog name Ok(DataSourcePlan { - catalog, + catalog_info, source_info, output_schema, parts, diff --git a/src/query/sql/src/planner/binder/copy.rs b/src/query/sql/src/planner/binder/copy.rs index 9573e401ff892..310196300929f 100644 --- a/src/query/sql/src/planner/binder/copy.rs +++ b/src/query/sql/src/planner/binder/copy.rs @@ -94,6 +94,9 @@ impl<'a> Binder { pattern: stmt.pattern.clone(), }; + let catalog = self.ctx.get_catalog(&catalog_name).await?; + let 
catalog_info = catalog.info(); + let table = self .ctx .get_table(&catalog_name, &database_name, &table_name) @@ -113,7 +116,7 @@ impl<'a> Binder { let stage_schema = infer_table_schema(&required_values_schema)?; let plan = CopyIntoTablePlan { - catalog_name, + catalog_info, database_name, table_name, validation_mode, @@ -175,6 +178,9 @@ impl<'a> Binder { let validation_mode = ValidationMode::from_str(stmt.validation_mode.as_str()) .map_err(ErrorCode::SyntaxException)?; + let catalog = self.ctx.get_catalog(&catalog_name).await?; + let catalog_info = catalog.info(); + let table = self .ctx .get_table(&catalog_name, &database_name, &table_name) @@ -190,7 +196,7 @@ impl<'a> Binder { let stage_schema = infer_table_schema(&required_values_schema)?; let plan = CopyIntoTablePlan { - catalog_name, + catalog_info, database_name, table_name, validation_mode, @@ -327,6 +333,9 @@ impl<'a> Binder { files: stmt.files.clone(), }; + let catalog = self.ctx.get_catalog(&catalog_name).await?; + let catalog_info = catalog.info(); + let table = self .ctx .get_table(&catalog_name, &database_name, &table_name) @@ -341,7 +350,7 @@ impl<'a> Binder { ); let plan = CopyIntoTablePlan { - catalog_name, + catalog_info, database_name, table_name, required_source_schema: required_values_schema.clone(), @@ -456,13 +465,16 @@ impl<'a> Binder { .await? 
}; + let catalog = self.ctx.get_catalog(&catalog_name).await?; + let catalog_info = catalog.info(); + let (mut stage_info, files_info) = self.bind_attachment(attachment).await?; stage_info.copy_options.purge = true; let stage_schema = infer_table_schema(&data_schema)?; let plan = CopyIntoTablePlan { - catalog_name, + catalog_info, database_name, table_name, required_source_schema: data_schema.clone(), diff --git a/src/query/sql/src/planner/binder/ddl/column.rs b/src/query/sql/src/planner/binder/ddl/column.rs index 1df152893fdd0..e16a424cee539 100644 --- a/src/query/sql/src/planner/binder/ddl/column.rs +++ b/src/query/sql/src/planner/binder/ddl/column.rs @@ -43,11 +43,11 @@ impl Binder { None => self.ctx.get_current_catalog(), Some(ident) => { let catalog = normalize_identifier(ident, &self.name_resolution_ctx).name; - self.ctx.get_catalog(&catalog)?; + self.ctx.get_catalog(&catalog).await?; catalog } }; - let catalog = self.ctx.get_catalog(&catalog_name)?; + let catalog = self.ctx.get_catalog(&catalog_name).await?; let database = match database { None => self.ctx.get_current_database(), Some(ident) => { diff --git a/src/query/sql/src/planner/binder/ddl/index.rs b/src/query/sql/src/planner/binder/ddl/index.rs index 3386c011ddd7e..f5ed85602fd9a 100644 --- a/src/query/sql/src/planner/binder/ddl/index.rs +++ b/src/query/sql/src/planner/binder/ddl/index.rs @@ -129,7 +129,10 @@ impl Binder { } let index_name = self.normalize_object_identifier(index); - let catalog = self.ctx.get_catalog(&self.ctx.get_current_catalog())?; + let catalog = self + .ctx + .get_catalog(&self.ctx.get_current_catalog()) + .await?; let get_index_req = GetIndexReq { name_ident: IndexNameIdent { tenant: self.ctx.get_tenant(), diff --git a/src/query/sql/src/planner/binder/ddl/table.rs b/src/query/sql/src/planner/binder/ddl/table.rs index 8e248061b66ae..b708483745f98 100644 --- a/src/query/sql/src/planner/binder/ddl/table.rs +++ b/src/query/sql/src/planner/binder/ddl/table.rs @@ -358,7 +358,8 @@ impl 
Binder { Some(ident) => { let database = normalize_identifier(ident, &self.name_resolution_ctx).name; self.ctx - .get_catalog(&ctl_name)? + .get_catalog(&ctl_name) + .await? .get_database(&self.ctx.get_tenant(), &database) .await?; Ok(database) @@ -500,7 +501,7 @@ impl Binder { // // Later, when database id is kept, let say in `TableInfo`, we can // safely eliminate this "FUSE" constant and the table meta option entry. - let catalog = self.ctx.get_catalog(&catalog)?; + let catalog = self.ctx.get_catalog(&catalog).await?; let db = catalog .get_database(&self.ctx.get_tenant(), &database) .await?; diff --git a/src/query/sql/src/planner/binder/table.rs b/src/query/sql/src/planner/binder/table.rs index 5a98a0fe38c7d..8ac98606648f2 100644 --- a/src/query/sql/src/planner/binder/table.rs +++ b/src/query/sql/src/planner/binder/table.rs @@ -505,10 +505,10 @@ impl Binder { .await } else { // Other table functions always reside is default catalog - let table_meta: Arc = self - .catalogs - .get_catalog(CATALOG_DEFAULT)? - .get_table_function(&func_name.name, table_args)?; + let table_meta: Arc = + self.catalogs + .get_default_catalog()? 
+ .get_table_function(&func_name.name, table_args)?; let table = table_meta.as_table(); let table_alias_name = if let Some(table_alias) = alias { Some( @@ -941,7 +941,7 @@ impl Binder { travel_point: &Option, ) -> Result> { // Resolve table with catalog - let catalog = self.catalogs.get_catalog(catalog_name)?; + let catalog = self.catalogs.get_catalog(tenant, catalog_name).await?; let mut table_meta = catalog.get_table(tenant, database_name, table_name).await?; if let Some(tp) = travel_point { @@ -1002,7 +1002,7 @@ impl Binder { catalog_name: &str, table_id: MetaId, ) -> Result> { - let catalog = self.catalogs.get_catalog(catalog_name)?; + let catalog = self.catalogs.get_catalog(tenant, catalog_name).await?; let index_metas = catalog .list_indexes(ListIndexesReq::new(tenant, Some(table_id))) .await?; diff --git a/src/query/sql/src/planner/plans/copy.rs b/src/query/sql/src/planner/plans/copy.rs index 35165cdbfe706..9dc6125e6fedb 100644 --- a/src/query/sql/src/planner/plans/copy.rs +++ b/src/query/sql/src/planner/plans/copy.rs @@ -24,6 +24,7 @@ use common_exception::Result; use common_expression::DataSchemaRef; use common_expression::Scalar; use common_meta_app::principal::StageInfo; +use common_meta_app::schema::CatalogInfo; use common_storage::init_stage_operator; use common_storage::StageFileInfo; use log::info; @@ -78,7 +79,7 @@ impl CopyIntoTableMode { #[derive(Clone)] pub struct CopyIntoTablePlan { - pub catalog_name: String, + pub catalog_info: CatalogInfo, pub database_name: String, pub table_name: String, @@ -147,7 +148,7 @@ impl CopyIntoTablePlan { ctx.set_status_info("begin filtering out copied files"); let files = ctx .filter_out_copied_files( - &self.catalog_name, + self.catalog_info.catalog_name(), &self.database_name, &self.table_name, &all_source_file_infos, @@ -176,7 +177,7 @@ impl CopyIntoTablePlan { impl Debug for CopyIntoTablePlan { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { let CopyIntoTablePlan { - catalog_name, + catalog_info, 
database_name, table_name, validation_mode, @@ -187,7 +188,8 @@ impl Debug for CopyIntoTablePlan { } = self; write!( f, - "Copy into {catalog_name:}.{database_name:}.{table_name:}" + "Copy into {:}.{database_name:}.{table_name:}", + catalog_info.catalog_name() )?; write!(f, ", validation_mode: {validation_mode:?}")?; write!(f, ", from: {stage_table_info:?}")?; diff --git a/src/query/storages/fuse/src/operations/commit.rs b/src/query/storages/fuse/src/operations/commit.rs index 6def2df4ac841..5574ccc17d9af 100644 --- a/src/query/storages/fuse/src/operations/commit.rs +++ b/src/query/storages/fuse/src/operations/commit.rs @@ -203,7 +203,7 @@ impl FuseTable { new_table_meta.updated_on = Utc::now(); // 2. prepare the request - let catalog = ctx.get_catalog(table_info.catalog())?; + let catalog = ctx.get_catalog(table_info.catalog()).await?; let table_id = table_info.ident.table_id; let table_version = table_info.ident.seq; diff --git a/src/query/storages/fuse/src/operations/gc.rs b/src/query/storages/fuse/src/operations/gc.rs index cbd5e60cce571..7931e30b506e0 100644 --- a/src/query/storages/fuse/src/operations/gc.rs +++ b/src/query/storages/fuse/src/operations/gc.rs @@ -75,7 +75,7 @@ impl FuseTable { let mut dry_run_purge_files = vec![]; let mut purged_snapshot_count = 0; - let catalog = ctx.get_catalog(&ctx.get_current_catalog())?; + let catalog = ctx.get_catalog(&ctx.get_current_catalog()).await?; let table_agg_index_ids = catalog .list_indexes_by_table_id(ListIndexesByIdReq { tenant: ctx.get_tenant(), diff --git a/src/query/storages/fuse/src/operations/recluster.rs b/src/query/storages/fuse/src/operations/recluster.rs index 3e96abd19d671..9e7c1ab93bb89 100644 --- a/src/query/storages/fuse/src/operations/recluster.rs +++ b/src/query/storages/fuse/src/operations/recluster.rs @@ -166,9 +166,10 @@ impl FuseTable { PruningStatistics::default(), )?; let table_info = self.get_table_info(); + let catalog_info = ctx.get_catalog(table_info.catalog()).await?.info(); let 
description = statistics.get_description(&table_info.desc); let plan = DataSourcePlan { - catalog: table_info.catalog().to_string(), + catalog_info, source_info: DataSourceInfo::TableSource(table_info.clone()), output_schema: table_info.schema(), parts, diff --git a/src/query/storages/fuse/src/operations/revert.rs b/src/query/storages/fuse/src/operations/revert.rs index 29987b882ab07..bb8afcfc4d992 100644 --- a/src/query/storages/fuse/src/operations/revert.rs +++ b/src/query/storages/fuse/src/operations/revert.rs @@ -45,7 +45,7 @@ impl FuseTable { // 3. prepare the request // using the CURRENT version as the base table version let base_version = self.table_info.ident.seq; - let catalog = ctx.get_catalog(&table_info.meta.catalog)?; + let catalog = ctx.get_catalog(&table_info.meta.catalog).await?; let table_id = table_info.ident.table_id; let req = UpdateTableMetaReq { table_id, diff --git a/src/query/storages/fuse/src/operations/truncate.rs b/src/query/storages/fuse/src/operations/truncate.rs index a84dbb813838d..2685d8c6093f9 100644 --- a/src/query/storages/fuse/src/operations/truncate.rs +++ b/src/query/storages/fuse/src/operations/truncate.rs @@ -67,7 +67,7 @@ impl FuseTable { let table_id = self.table_info.ident.table_id; let table_version = self.table_info.ident.seq; - let catalog = ctx.get_catalog(self.table_info.catalog())?; + let catalog = ctx.get_catalog(self.table_info.catalog()).await?; // commit table meta to meta server. 
// `truncate_table` is not supposed to be retry-able, thus we use diff --git a/src/query/storages/fuse/src/table_functions/clustering_information/clustering_information_table.rs b/src/query/storages/fuse/src/table_functions/clustering_information/clustering_information_table.rs index 7c73b50dd9018..58e280d0e6504 100644 --- a/src/query/storages/fuse/src/table_functions/clustering_information/clustering_information_table.rs +++ b/src/query/storages/fuse/src/table_functions/clustering_information/clustering_information_table.rs @@ -167,7 +167,8 @@ impl AsyncSource for ClusteringInformationSource { let tenant_id = self.ctx.get_tenant(); let tbl = self .ctx - .get_catalog(CATALOG_DEFAULT)? + .get_catalog(CATALOG_DEFAULT) + .await? .get_table( tenant_id.as_str(), self.arg_database_name.as_str(), diff --git a/src/query/storages/fuse/src/table_functions/fuse_blocks/fuse_block_table.rs b/src/query/storages/fuse/src/table_functions/fuse_blocks/fuse_block_table.rs index b95d052afad5b..07ed354fe9ac5 100644 --- a/src/query/storages/fuse/src/table_functions/fuse_blocks/fuse_block_table.rs +++ b/src/query/storages/fuse/src/table_functions/fuse_blocks/fuse_block_table.rs @@ -180,7 +180,8 @@ impl AsyncSource for FuseBlockSource { let tenant_id = self.ctx.get_tenant(); let tbl = self .ctx - .get_catalog(CATALOG_DEFAULT)? + .get_catalog(CATALOG_DEFAULT) + .await? .get_table( tenant_id.as_str(), self.arg_database_name.as_str(), diff --git a/src/query/storages/fuse/src/table_functions/fuse_columns/fuse_column_table.rs b/src/query/storages/fuse/src/table_functions/fuse_columns/fuse_column_table.rs index 3ef5adefd112c..3e5d73697b4a8 100644 --- a/src/query/storages/fuse/src/table_functions/fuse_columns/fuse_column_table.rs +++ b/src/query/storages/fuse/src/table_functions/fuse_columns/fuse_column_table.rs @@ -180,7 +180,8 @@ impl AsyncSource for FuseColumnSource { let tenant_id = self.ctx.get_tenant(); let tbl = self .ctx - .get_catalog(CATALOG_DEFAULT)? 
+ .get_catalog(CATALOG_DEFAULT) + .await? .get_table( tenant_id.as_str(), self.arg_database_name.as_str(), diff --git a/src/query/storages/fuse/src/table_functions/fuse_segments/fuse_segment_table.rs b/src/query/storages/fuse/src/table_functions/fuse_segments/fuse_segment_table.rs index 1676246dafddc..bc48faeae228b 100644 --- a/src/query/storages/fuse/src/table_functions/fuse_segments/fuse_segment_table.rs +++ b/src/query/storages/fuse/src/table_functions/fuse_segments/fuse_segment_table.rs @@ -180,7 +180,8 @@ impl AsyncSource for FuseSegmentSource { let tenant_id = self.ctx.get_tenant(); let tbl = self .ctx - .get_catalog(CATALOG_DEFAULT)? + .get_catalog(CATALOG_DEFAULT) + .await? .get_table( tenant_id.as_str(), self.arg_database_name.as_str(), diff --git a/src/query/storages/fuse/src/table_functions/fuse_snapshots/fuse_snapshot_table.rs b/src/query/storages/fuse/src/table_functions/fuse_snapshots/fuse_snapshot_table.rs index fb5b7757ea280..c31ec1c63ea44 100644 --- a/src/query/storages/fuse/src/table_functions/fuse_snapshots/fuse_snapshot_table.rs +++ b/src/query/storages/fuse/src/table_functions/fuse_snapshots/fuse_snapshot_table.rs @@ -182,7 +182,8 @@ impl AsyncSource for FuseSnapshotSource { let tenant_id = self.ctx.get_tenant(); let tbl = self .ctx - .get_catalog(CATALOG_DEFAULT)? + .get_catalog(CATALOG_DEFAULT) + .await? .get_table( tenant_id.as_str(), self.arg_database_name.as_str(), diff --git a/src/query/storages/fuse/src/table_functions/fuse_statistics/fuse_statistic_table.rs b/src/query/storages/fuse/src/table_functions/fuse_statistics/fuse_statistic_table.rs index 2eef9ac2919c1..e15124fba8db5 100644 --- a/src/query/storages/fuse/src/table_functions/fuse_statistics/fuse_statistic_table.rs +++ b/src/query/storages/fuse/src/table_functions/fuse_statistics/fuse_statistic_table.rs @@ -178,7 +178,8 @@ impl AsyncSource for FuseStatisticSource { let tenant_id = self.ctx.get_tenant(); let tbl = self .ctx - .get_catalog(CATALOG_DEFAULT)? 
+ .get_catalog(CATALOG_DEFAULT) + .await? .get_table( tenant_id.as_str(), self.arg_database_name.as_str(), diff --git a/src/query/storages/hive/hive/src/hive_catalog.rs b/src/query/storages/hive/hive/src/hive_catalog.rs index ca86eb9c5d5c6..378e63e970407 100644 --- a/src/query/storages/hive/hive/src/hive_catalog.rs +++ b/src/query/storages/hive/hive/src/hive_catalog.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use common_base::base::tokio; use common_catalog::catalog::Catalog; +use common_catalog::catalog::CatalogCreator; use common_catalog::catalog::StorageDescription; use common_catalog::database::Database; use common_catalog::table::Table; @@ -27,6 +28,8 @@ use common_exception::Result; use common_hive_meta_store::Partition; use common_hive_meta_store::TThriftHiveMetastoreSyncClient; use common_hive_meta_store::ThriftHiveMetastoreSyncClient; +use common_meta_app::schema::CatalogInfo; +use common_meta_app::schema::CatalogOption; use common_meta_app::schema::CountTablesReply; use common_meta_app::schema::CountTablesReq; use common_meta_app::schema::CreateDatabaseReply; @@ -85,15 +88,37 @@ use crate::hive_table::HiveTable; pub const HIVE_CATALOG: &str = "hive"; -#[derive(Clone)] +#[derive(Debug)] +pub struct HiveCreator; + +impl CatalogCreator for HiveCreator { + fn try_create(&self, info: &CatalogInfo) -> Result> { + let opt = match &info.meta.catalog_option { + CatalogOption::Hive(opt) => opt, + _ => unreachable!( + "trying to create hive catalog from other catalog, must be an internal bug" + ), + }; + + let catalog: Arc = + Arc::new(HiveCatalog::try_create(info.clone(), &opt.address)?); + + Ok(catalog) + } +} + +#[derive(Clone, Debug)] pub struct HiveCatalog { + info: CatalogInfo, + /// address of hive meta store service client_address: String, } impl HiveCatalog { - pub fn try_create(hms_address: impl Into) -> Result { + pub fn try_create(info: CatalogInfo, hms_address: impl Into) -> Result { Ok(HiveCatalog { + info, client_address: hms_address.into(), }) } @@ 
-244,6 +269,14 @@ impl Catalog for HiveCatalog { self } + fn name(&self) -> String { + self.info.name_ident.catalog_name.clone() + } + + fn info(&self) -> CatalogInfo { + self.info.clone() + } + #[minitrace::trace] #[async_backtrace::framed] async fn get_database(&self, _tenant: &str, db_name: &str) -> Result> { diff --git a/src/query/storages/hive/hive/src/hive_table.rs b/src/query/storages/hive/hive/src/hive_table.rs index 5550bdabe5ad1..92ca5fbd257df 100644 --- a/src/query/storages/hive/hive/src/hive_table.rs +++ b/src/query/storages/hive/hive/src/hive_table.rs @@ -392,7 +392,7 @@ impl HiveTable { partition_keys: Vec, filter_expression: Option>, ) -> Result)>> { - let hive_catalog = ctx.get_catalog(CATALOG_HIVE)?; + let hive_catalog = ctx.get_catalog(CATALOG_HIVE).await?; let hive_catalog = hive_catalog.as_any().downcast_ref::().unwrap(); // todo may use get_partition_names_ps to filter diff --git a/src/query/storages/hive/hive/src/lib.rs b/src/query/storages/hive/hive/src/lib.rs index bdb5f467d8833..ffe7aacd9d909 100644 --- a/src/query/storages/hive/hive/src/lib.rs +++ b/src/query/storages/hive/hive/src/lib.rs @@ -34,6 +34,7 @@ mod utils; pub use hive_block_filter::HiveBlockFilter; pub use hive_blocks::HiveBlocks; pub use hive_catalog::HiveCatalog; +pub use hive_catalog::HiveCreator; pub use hive_file_splitter::HiveFileSplitter; pub use hive_meta_data_reader::MetaDataReader; pub use hive_parquet_block_reader::filter_hive_partition_from_partition_keys; diff --git a/src/query/storages/iceberg/src/catalog.rs b/src/query/storages/iceberg/src/catalog.rs index b176949533f15..3fa7e0de58618 100644 --- a/src/query/storages/iceberg/src/catalog.rs +++ b/src/query/storages/iceberg/src/catalog.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use async_trait::async_trait; use common_catalog::catalog::Catalog; +use common_catalog::catalog::CatalogCreator; use common_catalog::catalog::StorageDescription; use common_catalog::database::Database; use common_catalog::table::Table; @@ 
-24,6 +25,8 @@ use common_catalog::table_args::TableArgs; use common_catalog::table_function::TableFunction; use common_exception::ErrorCode; use common_exception::Result; +use common_meta_app::schema::CatalogInfo; +use common_meta_app::schema::CatalogOption; use common_meta_app::schema::CountTablesReply; use common_meta_app::schema::CountTablesReq; use common_meta_app::schema::CreateDatabaseReply; @@ -83,16 +86,37 @@ use crate::database::IcebergDatabase; pub const ICEBERG_CATALOG: &str = "iceberg"; +#[derive(Debug)] +pub struct IcebergCreator; + +impl CatalogCreator for IcebergCreator { + fn try_create(&self, info: &CatalogInfo) -> Result> { + let opt = match &info.meta.catalog_option { + CatalogOption::Iceberg(opt) => opt, + _ => unreachable!( + "trying to create iceberg catalog from other catalog, must be an internal bug" + ), + }; + + let data_operator = DataOperator::try_new(&opt.storage_params)?; + let catalog: Arc = + Arc::new(IcebergCatalog::try_create(info.clone(), data_operator)?); + + Ok(catalog) + } +} + /// `Catalog` for a external iceberg storage /// /// - Metadata of databases are saved in meta store /// - Instances of `Database` are created from reading subdirectories of /// Iceberg table /// - Table metadata are saved in external Iceberg storage -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct IcebergCatalog { - /// name of this iceberg table - name: String, + /// info of this iceberg table. 
+ info: CatalogInfo, + /// underlying storage access operator operator: DataOperator, } @@ -113,11 +137,8 @@ impl IcebergCatalog { /// Such catalog will be seen as an `flatten` catalogs, /// a `default` database will be generated directly #[minitrace::trace] - pub fn try_create(name: &str, operator: DataOperator) -> Result { - Ok(Self { - name: name.to_string(), - operator, - }) + pub fn try_create(info: CatalogInfo, operator: DataOperator) -> Result { + Ok(Self { info, operator }) } /// list read databases @@ -147,6 +168,13 @@ impl IcebergCatalog { #[async_trait] impl Catalog for IcebergCatalog { + fn name(&self) -> String { + self.info.name_ident.catalog_name.clone() + } + fn info(&self) -> CatalogInfo { + self.info.clone() + } + #[minitrace::trace] #[async_backtrace::framed] async fn get_database(&self, _tenant: &str, db_name: &str) -> Result> { @@ -167,7 +195,9 @@ impl Catalog for IcebergCatalog { let db_root = DataOperator::try_create(&db_sp).await?; Ok(Arc::new(IcebergDatabase::create( - &self.name, db_name, db_root, + &self.name(), + db_name, + db_root, ))) } diff --git a/src/query/storages/iceberg/src/lib.rs b/src/query/storages/iceberg/src/lib.rs index 2d05f3094ccad..0d13293011305 100644 --- a/src/query/storages/iceberg/src/lib.rs +++ b/src/query/storages/iceberg/src/lib.rs @@ -97,4 +97,5 @@ mod table; mod table_source; pub use catalog::IcebergCatalog; +pub use catalog::IcebergCreator; pub use catalog::ICEBERG_CATALOG; diff --git a/src/query/storages/system/src/catalogs_table.rs b/src/query/storages/system/src/catalogs_table.rs index 4d8f9245b962e..56d42e9f9c34a 100644 --- a/src/query/storages/system/src/catalogs_table.rs +++ b/src/query/storages/system/src/catalogs_table.rs @@ -47,16 +47,17 @@ impl AsyncSystemTable for CatalogsTable { #[async_backtrace::framed] async fn get_full_data( &self, - _ctx: Arc, + ctx: Arc, _push_downs: Option, ) -> Result { - let cm = CatalogManager::instance(); + let mgr = CatalogManager::instance(); - let catalog_names: Vec> = 
cm - .catalogs - .iter() - .map(|x| x.key().as_bytes().to_vec()) - .collect(); + let catalog_names = mgr + .list_catalogs(&ctx.get_tenant()) + .await? + .into_iter() + .map(|v| v.name().into_bytes()) + .collect::>(); Ok(DataBlock::new_from_columns(vec![StringType::from_data( catalog_names, diff --git a/src/query/storages/system/src/columns_table.rs b/src/query/storages/system/src/columns_table.rs index b802d481fba43..d52b7724c3789 100644 --- a/src/query/storages/system/src/columns_table.rs +++ b/src/query/storages/system/src/columns_table.rs @@ -150,7 +150,7 @@ impl ColumnsTable { push_downs: Option, ) -> Result> { let tenant = ctx.get_tenant(); - let catalog = ctx.get_catalog(CATALOG_DEFAULT)?; + let catalog = ctx.get_catalog(CATALOG_DEFAULT).await?; let mut tables = Vec::new(); let mut databases = Vec::new(); diff --git a/src/query/storages/system/src/databases_table.rs b/src/query/storages/system/src/databases_table.rs index d05e01066b637..1260a3e1a5f12 100644 --- a/src/query/storages/system/src/databases_table.rs +++ b/src/query/storages/system/src/databases_table.rs @@ -59,9 +59,10 @@ impl AsyncSystemTable for DatabasesTable { let tenant = ctx.get_tenant(); let catalogs = CatalogManager::instance(); let catalogs: Vec<(String, Arc)> = catalogs - .catalogs + .list_catalogs(&tenant) + .await? 
.iter() - .map(|e| (e.key().clone(), e.value().clone())) + .map(|e| (e.name(), e.clone())) .collect(); let mut catalog_names = vec![]; diff --git a/src/query/storages/system/src/engines_table.rs b/src/query/storages/system/src/engines_table.rs index 0daddb06cf1b3..31422124eae84 100644 --- a/src/query/storages/system/src/engines_table.rs +++ b/src/query/storages/system/src/engines_table.rs @@ -51,7 +51,7 @@ impl AsyncSystemTable for EnginesTable { _push_downs: Option, ) -> Result { // TODO passing catalog name - let table_engine_descriptors = ctx.get_catalog(CATALOG_DEFAULT)?.get_table_engines(); + let table_engine_descriptors = ctx.get_catalog(CATALOG_DEFAULT).await?.get_table_engines(); let mut engine_name = Vec::with_capacity(table_engine_descriptors.len()); let mut engine_comment = Vec::with_capacity(table_engine_descriptors.len()); for descriptor in &table_engine_descriptors { diff --git a/src/query/storages/system/src/indexes_table.rs b/src/query/storages/system/src/indexes_table.rs index 3bf3597139ddf..3d02f48a13941 100644 --- a/src/query/storages/system/src/indexes_table.rs +++ b/src/query/storages/system/src/indexes_table.rs @@ -53,7 +53,7 @@ impl AsyncSystemTable for IndexesTable { _push_downs: Option, ) -> Result { let tenant = ctx.get_tenant(); - let catalog = ctx.get_catalog(CATALOG_DEFAULT)?; + let catalog = ctx.get_catalog(CATALOG_DEFAULT).await?; let indexes = catalog .list_indexes(ListIndexesReq { tenant, diff --git a/src/query/storages/system/src/table_functions_table.rs b/src/query/storages/system/src/table_functions_table.rs index d0a4850727477..3c3d8b07406bf 100644 --- a/src/query/storages/system/src/table_functions_table.rs +++ b/src/query/storages/system/src/table_functions_table.rs @@ -42,7 +42,7 @@ impl SyncSystemTable for TableFunctionsTable { } fn get_full_data(&self, ctx: Arc) -> Result { - let func_names = ctx.get_catalog("default")?.list_table_functions(); + let func_names = ctx.get_default_catalog()?.list_table_functions(); let names = 
func_names.iter().map(|s| s.as_str()).collect::>(); Ok(DataBlock::new_from_columns(vec![StringType::from_data( names, diff --git a/src/query/storages/system/src/tables_table.rs b/src/query/storages/system/src/tables_table.rs index 830e56dc91b14..46f30a8bb0c13 100644 --- a/src/query/storages/system/src/tables_table.rs +++ b/src/query/storages/system/src/tables_table.rs @@ -108,9 +108,10 @@ where TablesTable: HistoryAware let tenant = ctx.get_tenant(); let catalog_mgr = CatalogManager::instance(); let ctls: Vec<(String, Arc)> = catalog_mgr - .catalogs + .list_catalogs(&tenant) + .await? .iter() - .map(|e| (e.key().to_string(), e.value().clone())) + .map(|e| (e.name(), e.clone())) .collect(); let mut catalogs = vec![]; diff --git a/tests/sqllogictests/suites/base/05_ddl/05_0001_ddl_create_database b/tests/sqllogictests/suites/base/05_ddl/05_0001_ddl_create_database index 8c911209f60bc..6096f7e5ddced 100644 --- a/tests/sqllogictests/suites/base/05_ddl/05_0001_ddl_create_database +++ b/tests/sqllogictests/suites/base/05_ddl/05_0001_ddl_create_database @@ -27,7 +27,7 @@ CREATE DATABASE system statement error 1002 DROP DATABASE system -statement error 1006 +statement error 11119 CREATE DATABASE catalog_not_exist.t statement ok @@ -59,6 +59,6 @@ CREATE SCHEMA system statement error 1002 DROP SCHEMA system -statement error 1006 +statement error 11119 CREATE SCHEMA catalog_not_exist.t