Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: List catalogs from metasrv #12219

Merged
merged 36 commits into from
Jul 31, 2023
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
ec9939f
feat: List catalogs from metasrv
Xuanwo Jul 26, 2023
d524d4d
try move
Xuanwo Jul 26, 2023
da0bdd9
Revert "try move"
Xuanwo Jul 26, 2023
3ce232c
Revert "feat: List catalogs from metasrv"
Xuanwo Jul 26, 2023
c0d8cb9
Add meta-store
Xuanwo Jul 27, 2023
aba9db3
Add config
Xuanwo Jul 27, 2023
35e63e4
Save work
Xuanwo Jul 27, 2023
4e69180
Merge branch 'main' into catalog-in-metasrv
Xuanwo Jul 27, 2023
071241f
Fix build
Xuanwo Jul 27, 2023
09f4597
Fix typos
Xuanwo Jul 27, 2023
9017879
format toml
Xuanwo Jul 27, 2023
261534a
Implement list_catalogs
Xuanwo Jul 27, 2023
9f71ffb
Implement create catalog
Xuanwo Jul 27, 2023
63b0612
Fix build
Xuanwo Jul 27, 2023
aebb520
Fix test
Xuanwo Jul 27, 2023
50f4c61
Merge branch 'main' into catalog-in-metasrv
Xuanwo Jul 27, 2023
51f1e2d
Merge branch 'main' into catalog-in-metasrv
Xuanwo Jul 28, 2023
4e6ab8e
Fix test
Xuanwo Jul 28, 2023
3178519
Refactor
Xuanwo Jul 28, 2023
7b11055
Merge branch 'main' into catalog-in-metasrv
Xuanwo Jul 28, 2023
fcdcc40
fix cargo lock
Xuanwo Jul 28, 2023
6a20a97
Save work
Xuanwo Jul 29, 2023
2aa2228
Add catalog info in plan
Xuanwo Jul 29, 2023
a64513e
Merge branch 'main' into catalog-in-metasrv
Xuanwo Jul 29, 2023
7ebaafc
Merge branch 'main' into catalog-in-metasrv
Xuanwo Jul 30, 2023
73fa2c6
Fix build
Xuanwo Jul 30, 2023
145dddd
Fix test
Xuanwo Jul 30, 2023
d6d348e
Fix test
Xuanwo Jul 30, 2023
19f3da3
Merge branch 'main' into catalog-in-metasrv
Xuanwo Jul 30, 2023
5a47e35
Merge branch 'main' into catalog-in-metasrv
Xuanwo Jul 31, 2023
15171e2
Merge remote-tracking branch 'origin/main' into catalog-in-metasrv
Xuanwo Jul 31, 2023
9c11187
Use debug for test
Xuanwo Jul 31, 2023
581c669
Fix tenant
Xuanwo Jul 31, 2023
192e513
Merge branch 'main' into catalog-in-metasrv
Xuanwo Jul 31, 2023
5e61d09
fix test
Xuanwo Jul 31, 2023
acdcf9d
Revert "Use debug for test"
Xuanwo Jul 31, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/common/exception/src/exception_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ build_exceptions! {
// The table is not a clustered table.
UnclusteredTable(1118),
UnknownCatalog(11119),
UnknownCatalogType(11120),

// Data Related Errors

Expand Down
11 changes: 10 additions & 1 deletion src/meta/app/src/schema/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use chrono::Utc;

use crate::storage::StorageParams;

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
#[derive(Clone, Copy, PartialEq, Eq, Debug, Hash)]
pub enum CatalogType {
Default = 1,
Hive = 2,
Expand All @@ -46,6 +46,15 @@ pub enum CatalogOption {
Iceberg(IcebergCatalogOption),
}

impl CatalogOption {
pub fn catalog_type(&self) -> CatalogType {
match self {
CatalogOption::Hive(_) => CatalogType::Hive,
CatalogOption::Iceberg(_) => CatalogType::Iceberg,
}
}
}

/// Option for creating a iceberg catalog
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct HiveCatalogOption {
Expand Down
3 changes: 3 additions & 0 deletions src/query/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@ test = false
[dependencies]
common-arrow = { path = "../../common/arrow" }
common-base = { path = "../../common/base" }
common-config = { path = "../config" }
common-exception = { path = "../../common/exception" }
common-expression = { path = "../expression" }
common-io = { path = "../../common/io" }
common-meta-api = { path = "../../meta/api" }
common-meta-app = { path = "../../meta/app" }
common-meta-store = { path = "../../meta/store" }
common-meta-types = { path = "../../meta/types" }
common-pipeline-core = { path = "../pipeline/core" }
common-settings = { path = "../settings" }
Expand Down
12 changes: 11 additions & 1 deletion src/query/catalog/src/catalog/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
// limitations under the License.

use std::any::Any;
use std::fmt::Debug;
use std::sync::Arc;

use common_exception::ErrorCode;
use common_exception::Result;
use common_meta_app::schema::CatalogInfo;
use common_meta_app::schema::CountTablesReply;
use common_meta_app::schema::CountTablesReq;
use common_meta_app::schema::CreateDatabaseReply;
Expand Down Expand Up @@ -86,7 +88,15 @@ pub struct StorageDescription {
}

#[async_trait::async_trait]
pub trait Catalog: DynClone + Send + Sync {
pub trait CatalogCreator: Send + Sync + Debug {
async fn try_create(&self, info: Arc<CatalogInfo>) -> Result<Arc<dyn Catalog>>;
}

#[async_trait::async_trait]
pub trait Catalog: DynClone + Send + Sync + Debug {
/// Catalog itself
fn name(&self) -> String;

/// Database.

// Get the database by name.
Expand Down
181 changes: 141 additions & 40 deletions src/query/catalog/src/catalog/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,65 +13,166 @@
// limitations under the License.

use std::sync::Arc;
use std::sync::OnceLock;

use common_base::base::GlobalInstance;
use common_config::InnerConfig;
use common_exception::ErrorCode;
use common_exception::Result;
use dashmap::mapref::entry::Entry;
use common_meta_api::SchemaApi;
use common_meta_app::schema::CatalogType;
use common_meta_app::schema::CreateCatalogReq;
use common_meta_app::schema::DropCatalogReq;
use common_meta_app::schema::GetCatalogReq;
use common_meta_app::schema::ListCatalogReq;
use common_meta_store::MetaStore;
use common_meta_store::MetaStoreProvider;
use dashmap::DashMap;

use super::Catalog;
use super::CatalogCreator;

pub const CATALOG_DEFAULT: &str = "default";

pub struct CatalogManager {
pub catalogs: DashMap<String, Arc<dyn Catalog>>,
pub meta: MetaStore,
pub tenant: String,

pub default_catalog: OnceLock<Arc<dyn Catalog>>,
pub catalog_creators: DashMap<CatalogType, Arc<dyn CatalogCreator>>,
}

impl CatalogManager {
pub fn get_catalog(&self, catalog_name: &str) -> Result<Arc<dyn Catalog>> {
self.catalogs
.get(catalog_name)
.as_deref()
/// Fetch catalog manager from global instance.
pub fn instance() -> Arc<CatalogManager> {
GlobalInstance::get()
}

/// Init the catalog manager in global instance.
#[async_backtrace::framed]
pub async fn init(conf: &InnerConfig) -> Result<()> {
GlobalInstance::set(Self::try_create(conf).await?);

Ok(())
}

/// Try to create a catalog manager via Config.
#[async_backtrace::framed]
async fn try_create(conf: &InnerConfig) -> Result<Arc<CatalogManager>> {
let meta = {
let provider = Arc::new(MetaStoreProvider::new(conf.meta.to_meta_grpc_client_conf()));

provider.create_meta_store().await?
};

let tenant = conf.query.tenant_id.clone();

let catalog_manager = Self {
meta,
tenant,
default_catalog: OnceLock::new(),
catalog_creators: DashMap::new(),
};

Ok(Arc::new(catalog_manager))
}

pub fn init_default_catalog(&self, default_catalog: Arc<dyn Catalog>) {
self.default_catalog
.set(default_catalog)
.expect("init default catalog must succeed")
}

/// Get default catalog from manager.
///
/// There are some place that we don't have async context, so we provide
/// `get_default_catalog` to allow users fetch default catalog without async.
pub fn get_default_catalog(&self) -> Result<Arc<dyn Catalog>> {
self.default_catalog
.get()
.cloned()
.ok_or_else(|| ErrorCode::BadArguments(format!("no such catalog {}", catalog_name)))
.ok_or_else(|| ErrorCode::BadArguments("default catalog is not initiated".to_string()))
}

pub fn instance() -> Arc<CatalogManager> {
GlobalInstance::get()
/// Register a catalog creator for given name.
pub fn register_catalog(&self, typ: CatalogType, creator: Arc<dyn CatalogCreator>) {
self.catalog_creators.insert(typ, creator);
}

pub fn insert_catalog(
&self,
catalog_name: &str,
catalog: Arc<dyn Catalog>,
if_not_exists: bool,
) -> Result<()> {
// NOTE:
//
// Concurrent write may happen here, should be carefully dealt with.
// The problem occurs when the entry is vacant:
//
// Using `DashMap::entry` can occupy the write lock on the entry,
// ensuring a safe concurrent writing.
//
// If getting with `DashMap::get_mut`, it will unlock the entry and return `None` directly.
// This makes a safe concurrent write hard to implement.
match self.catalogs.entry(catalog_name.to_string()) {
Entry::Occupied(_) => {
if if_not_exists {
Ok(())
} else {
Err(ErrorCode::CatalogAlreadyExists(format!(
"Catalog {} already exists",
catalog_name
)))
}
}
Entry::Vacant(v) => {
v.insert(catalog);
Ok(())
}
/// Get a catalog from manager.
///
/// # NOTES
///
/// DEFAULT catalog is handled specially via `get_default_catalog`. Other catalogs
/// will be fetched from metasrv.
#[async_backtrace::framed]
pub async fn get_catalog(&self, catalog_name: &str) -> Result<Arc<dyn Catalog>> {
if catalog_name == CATALOG_DEFAULT {
return self.get_default_catalog();
}

// Get catalog from metasrv.
let info = self
.meta
.get_catalog(GetCatalogReq::new(&self.tenant, catalog_name))
.await?;

let typ = info.meta.catalog_option.catalog_type();
let creator = self
.catalog_creators
.get(&typ)
.ok_or_else(|| ErrorCode::BadArguments(format!("unknown catalog type: {:?}", typ)))?;

creator.try_create(info).await
}

/// Create a new catalog.
///
/// # NOTES
///
/// Trying to create default catalog will return an error.
#[async_backtrace::framed]
pub async fn create_catalog(&self, req: CreateCatalogReq) -> Result<()> {
if req.catalog_name() == CATALOG_DEFAULT {
return Err(ErrorCode::BadArguments(
"default catalog cannot be created".to_string(),
));
}

let _ = self.meta.create_catalog(req).await;

Ok(())
}

/// Drop a catalog.
///
/// # NOTES
///
/// Trying to drop default catalog will return an error.
#[async_backtrace::framed]
pub async fn drop_catalog(&self, _: DropCatalogReq) -> Result<()> {
todo!()
}

#[async_backtrace::framed]
pub async fn list_catalogs(&self) -> Result<Vec<Arc<dyn Catalog>>> {
let mut catalogs = vec![self.get_default_catalog()?];

let infos = self
.meta
.list_catalogs(ListCatalogReq::new(&self.tenant))
.await?;

for info in infos {
let typ = info.meta.catalog_option.catalog_type();
let creator = self.catalog_creators.get(&typ).ok_or_else(|| {
ErrorCode::UnknownCatalogType(format!("unknown catalog type: {:?}", typ))
})?;

let catalog = creator.try_create(info).await?;
catalogs.push(catalog);
}

Ok(catalogs)
}
}
1 change: 1 addition & 0 deletions src/query/catalog/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ mod interface;
mod manager;

pub use interface::Catalog;
pub use interface::CatalogCreator;
pub use interface::StorageDescription;
pub use manager::CatalogManager;
pub use manager::CATALOG_DEFAULT;
2 changes: 1 addition & 1 deletion src/query/catalog/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ pub trait TableExt: Table {
let table_info = self.get_table_info();
let name = table_info.name.clone();
let tid = table_info.ident.table_id;
let catalog = ctx.get_catalog(table_info.catalog())?;
let catalog = ctx.get_catalog(table_info.catalog()).await?;
let (ident, meta) = catalog.get_table_meta_by_id(tid).await?;
let table_info: TableInfo = TableInfo {
ident,
Expand Down
3 changes: 2 additions & 1 deletion src/query/catalog/src/table_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ pub trait TableContext: Send + Sync {
fn get_query_str(&self) -> String;

fn get_fragment_id(&self) -> usize;
fn get_catalog(&self, catalog_name: &str) -> Result<Arc<dyn Catalog>>;
async fn get_catalog(&self, catalog_name: &str) -> Result<Arc<dyn Catalog>>;
fn get_default_catalog(&self) -> Result<Arc<dyn Catalog>>;
fn get_id(&self) -> String;
fn get_current_catalog(&self) -> String;
fn check_aborting(&self) -> Result<()>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl TableLockHeartbeat {
let shutdown_notify = self.shutdown_notify.clone();

async move {
let catalog = ctx.get_catalog(table_info.catalog())?;
let catalog = ctx.get_catalog(table_info.catalog()).await?;
let mut notified = Box::pin(shutdown_notify.notified());
while !shutdown_flag.load(Ordering::Relaxed) {
let mills = {
Expand Down
2 changes: 1 addition & 1 deletion src/query/ee/src/table_lock/table_lock_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl TableLockHandler for RealTableLockHandler {
ctx: Arc<dyn TableContext>,
table_info: TableInfo,
) -> Result<TableLockHeartbeat> {
let catalog = ctx.get_catalog(table_info.catalog())?;
let catalog = ctx.get_catalog(table_info.catalog()).await?;
let expire_secs = ctx.get_settings().get_table_lock_expire_secs()?;
// get a new table lock revision.
let res = catalog
Expand Down
2 changes: 1 addition & 1 deletion src/query/ee/tests/it/aggregating_index/index_refresh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ async fn create_index(ctx: Arc<QueryContext>, index_name: &str, query: &str) ->
let plan = plan_sql(ctx.clone(), &sql).await?;

if let Plan::CreateIndex(plan) = plan {
let catalog = ctx.get_catalog("default")?;
let catalog = ctx.get_catalog("default").await?;
let create_index_req = CreateIndexReq {
if_not_exists: plan.if_not_exists,
name_ident: IndexNameIdent {
Expand Down
4 changes: 3 additions & 1 deletion src/query/service/src/api/http/v1/tenant_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ pub struct TenantTableInfo {
}

async fn load_tenant_tables(tenant: &str) -> Result<TenantTablesResponse> {
let catalog = CatalogManager::instance().get_catalog(CATALOG_DEFAULT)?;
let catalog = CatalogManager::instance()
.get_catalog(CATALOG_DEFAULT)
.await?;
let databases = catalog.list_databases(tenant).await?;

let mut table_infos: Vec<TenantTableInfo> = vec![];
Expand Down
15 changes: 0 additions & 15 deletions src/query/service/src/catalogs/catalog.rs

This file was deleted.

Loading