Skip to content

Commit

Permalink
fix: table resolving logic related to pg_catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
J0HN50N133 committed Aug 24, 2024
1 parent 3973d6b commit b1072ad
Show file tree
Hide file tree
Showing 52 changed files with 379 additions and 128 deletions.
40 changes: 40 additions & 0 deletions src/catalog/src/catalog_protocol.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use sql::dialect::{Dialect, MySqlDialect, PostgreSqlDialect};

/// [`CatalogProtocol`] will affect the behaviour of CatalogManager
/// Currently, it mainly affects the table names resolving
/// `pg_catalog` will be only visible under PostgreSQL protocol, and user
/// can access any `pg_catalog` tables with bare table name.
///
/// The `information_schema` will be visible for Mysql and any other protocol.
#[derive(Clone, Copy, PartialEq, Eq)]
pub enum CatalogProtocol {
MySQL,
PostgreSQL,
Other,
}

impl CatalogProtocol {
pub fn from_query_dialect(query_dialect: &dyn Dialect) -> Self {
if query_dialect.is::<MySqlDialect>() {
CatalogProtocol::MySQL
} else if query_dialect.is::<PostgreSqlDialect>() {
CatalogProtocol::PostgreSQL
} else {
CatalogProtocol::Other
}
}
}
70 changes: 55 additions & 15 deletions src/catalog/src/kvbackend/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
use table::table_name::TableName;
use table::TableRef;

use crate::catalog_protocol::CatalogProtocol;
use crate::error::{
CacheNotFoundSnafu, GetTableCacheSnafu, InvalidTableInfoInCatalogSnafu, ListCatalogsSnafu,
ListSchemasSnafu, ListTablesSnafu, Result, TableMetadataManagerSnafu,
Expand Down Expand Up @@ -152,7 +153,11 @@ impl CatalogManager for KvBackendCatalogManager {
Ok(keys)
}

async fn schema_names(&self, catalog: &str) -> Result<Vec<String>> {
async fn schema_names(
&self,
catalog: &str,
catalog_protocol: CatalogProtocol,
) -> Result<Vec<String>> {
let stream = self
.table_metadata_manager
.schema_manager()
Expand All @@ -163,7 +168,7 @@ impl CatalogManager for KvBackendCatalogManager {
.map_err(BoxedError::new)
.context(ListSchemasSnafu { catalog })?;

keys.extend(self.system_catalog.schema_names());
keys.extend(self.system_catalog.schema_names(catalog_protocol));

Ok(keys.into_iter().collect())
}
Expand Down Expand Up @@ -194,8 +199,13 @@ impl CatalogManager for KvBackendCatalogManager {
.context(TableMetadataManagerSnafu)
}

async fn schema_exists(&self, catalog: &str, schema: &str) -> Result<bool> {
if self.system_catalog.schema_exists(schema) {
async fn schema_exists(
&self,
catalog: &str,
schema: &str,
catalog_protocol: CatalogProtocol,
) -> Result<bool> {
if self.system_catalog.schema_exists(schema, catalog_protocol) {
return Ok(true);
}

Expand Down Expand Up @@ -225,6 +235,7 @@ impl CatalogManager for KvBackendCatalogManager {
catalog_name: &str,
schema_name: &str,
table_name: &str,
catalog_protocol: CatalogProtocol,
) -> Result<Option<TableRef>> {
if let Some(table) = self
.system_catalog
Expand All @@ -236,15 +247,29 @@ impl CatalogManager for KvBackendCatalogManager {
let table_cache: TableCacheRef = self.cache_registry.get().context(CacheNotFoundSnafu {
name: "table_cache",
})?;

table_cache
if let Some(table) = table_cache
.get_by_ref(&TableName {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
table_name: table_name.to_string(),
})
.await
.context(GetTableCacheSnafu)
.context(GetTableCacheSnafu)?
{
return Ok(Some(table));
}

if catalog_protocol == CatalogProtocol::PostgreSQL {
// falldown to pg_catalog
if let Some(table) =
self.system_catalog
.table(catalog_name, PG_CATALOG_NAME, table_name)
{
return Ok(Some(table));
}
}

return Ok(None);
}

fn tables<'a>(&'a self, catalog: &'a str, schema: &'a str) -> BoxStream<'a, Result<TableRef>> {
Expand Down Expand Up @@ -320,12 +345,21 @@ struct SystemCatalog {
}

impl SystemCatalog {
// TODO(j0hn50n133): remove the duplicated hard-coded table names logic
fn schema_names(&self) -> Vec<String> {
vec![
INFORMATION_SCHEMA_NAME.to_string(),
PG_CATALOG_NAME.to_string(),
]
fn schema_names(&self, catalog_protocol: CatalogProtocol) -> Vec<String> {
match catalog_protocol {
// pg_catalog only visible under postgres protocol
CatalogProtocol::PostgreSQL => vec![PG_CATALOG_NAME.to_string()],
// information_schema visible for mysql protocol and other
CatalogProtocol::MySQL => {
vec![INFORMATION_SCHEMA_NAME.to_string()]
}
CatalogProtocol::Other => {
vec![
PG_CATALOG_NAME.to_string(),
INFORMATION_SCHEMA_NAME.to_string(),
]
}
}
}

fn table_names(&self, schema: &str) -> Vec<String> {
Expand All @@ -339,8 +373,14 @@ impl SystemCatalog {
}
}

fn schema_exists(&self, schema: &str) -> bool {
schema == INFORMATION_SCHEMA_NAME || schema == PG_CATALOG_NAME
fn schema_exists(&self, schema: &str, catalog_protocol: CatalogProtocol) -> bool {
match catalog_protocol {
CatalogProtocol::PostgreSQL => schema == PG_CATALOG_NAME,
CatalogProtocol::MySQL => schema == INFORMATION_SCHEMA_NAME,
CatalogProtocol::Other => {
schema == PG_CATALOG_NAME || schema == INFORMATION_SCHEMA_NAME
}
}
}

fn table_exists(&self, schema: &str, table: &str) -> bool {
Expand Down
16 changes: 14 additions & 2 deletions src/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ use std::fmt::{Debug, Formatter};
use std::sync::Arc;

use api::v1::CreateTableExpr;
use catalog_protocol::CatalogProtocol;
use futures::future::BoxFuture;
use futures_util::stream::BoxStream;
use table::metadata::TableId;
use table::TableRef;

use crate::error::Result;

pub mod catalog_protocol;
pub mod error;
pub mod kvbackend;
pub mod memory;
Expand All @@ -44,13 +46,22 @@ pub trait CatalogManager: Send + Sync {

async fn catalog_names(&self) -> Result<Vec<String>>;

async fn schema_names(&self, catalog: &str) -> Result<Vec<String>>;
async fn schema_names(
&self,
catalog: &str,
catalog_protocol: CatalogProtocol,
) -> Result<Vec<String>>;

async fn table_names(&self, catalog: &str, schema: &str) -> Result<Vec<String>>;

async fn catalog_exists(&self, catalog: &str) -> Result<bool>;

async fn schema_exists(&self, catalog: &str, schema: &str) -> Result<bool>;
async fn schema_exists(
&self,
catalog: &str,
schema: &str,
catalog_protocol: CatalogProtocol,
) -> Result<bool>;

async fn table_exists(&self, catalog: &str, schema: &str, table: &str) -> Result<bool>;

Expand All @@ -60,6 +71,7 @@ pub trait CatalogManager: Send + Sync {
catalog: &str,
schema: &str,
table_name: &str,
catalog_protocol: CatalogProtocol,
) -> Result<Option<TableRef>>;

/// Returns all tables with a stream by catalog and schema.
Expand Down
37 changes: 32 additions & 5 deletions src/catalog/src/memory/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use futures_util::stream::BoxStream;
use snafu::OptionExt;
use table::TableRef;

use crate::catalog_protocol::CatalogProtocol;
use crate::error::{CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu};
use crate::information_schema::InformationSchemaProvider;
use crate::system_schema::SystemSchemaProvider;
Expand All @@ -53,7 +54,11 @@ impl CatalogManager for MemoryCatalogManager {
Ok(self.catalogs.read().unwrap().keys().cloned().collect())
}

async fn schema_names(&self, catalog: &str) -> Result<Vec<String>> {
async fn schema_names(
&self,
catalog: &str,
_catalog_protocol: CatalogProtocol,
) -> Result<Vec<String>> {
Ok(self
.catalogs
.read()
Expand Down Expand Up @@ -87,7 +92,12 @@ impl CatalogManager for MemoryCatalogManager {
self.catalog_exist_sync(catalog)
}

async fn schema_exists(&self, catalog: &str, schema: &str) -> Result<bool> {
async fn schema_exists(
&self,
catalog: &str,
schema: &str,
_catalog_protocol: CatalogProtocol,
) -> Result<bool> {
self.schema_exist_sync(catalog, schema)
}

Expand All @@ -108,6 +118,7 @@ impl CatalogManager for MemoryCatalogManager {
catalog: &str,
schema: &str,
table_name: &str,
_catalog_protocol: CatalogProtocol,
) -> Result<Option<TableRef>> {
let result = try {
self.catalogs
Expand Down Expand Up @@ -371,6 +382,7 @@ mod tests {
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
NUMBERS_TABLE_NAME,
CatalogProtocol::Other,
)
.await
.unwrap()
Expand All @@ -384,7 +396,12 @@ mod tests {
);

assert!(catalog_list
.table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "not_exists")
.table(
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
"not_exists",
CatalogProtocol::Other
)
.await
.unwrap()
.is_none());
Expand All @@ -411,7 +428,12 @@ mod tests {
};
catalog.register_table_sync(register_table_req).unwrap();
assert!(catalog
.table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name)
.table(
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
table_name,
CatalogProtocol::Other
)
.await
.unwrap()
.is_some());
Expand All @@ -423,7 +445,12 @@ mod tests {
};
catalog.deregister_table_sync(deregister_table_req).unwrap();
assert!(catalog
.table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name)
.table(
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
table_name,
CatalogProtocol::Other
)
.await
.unwrap()
.is_none());
Expand Down
3 changes: 2 additions & 1 deletion src/catalog/src/system_schema/information_schema/columns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use sql::statements;
use store_api::storage::{ScanRequest, TableId};

use super::{InformationTable, COLUMNS};
use crate::catalog_protocol::CatalogProtocol::MySQL;
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
Expand Down Expand Up @@ -257,7 +258,7 @@ impl InformationSchemaColumnsBuilder {
.context(UpgradeWeakCatalogManagerRefSnafu)?;
let predicates = Predicates::from_scan_request(&request);

for schema_name in catalog_manager.schema_names(&catalog_name).await? {
for schema_name in catalog_manager.schema_names(&catalog_name, MySQL).await? {
let mut stream = catalog_manager.tables(&catalog_name, &schema_name);

while let Some(table) = stream.try_next().await? {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};

use super::KEY_COLUMN_USAGE;
use crate::catalog_protocol::CatalogProtocol::MySQL;
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
Expand Down Expand Up @@ -212,7 +213,7 @@ impl InformationSchemaKeyColumnUsageBuilder {
.context(UpgradeWeakCatalogManagerRefSnafu)?;
let predicates = Predicates::from_scan_request(&request);

for schema_name in catalog_manager.schema_names(&catalog_name).await? {
for schema_name in catalog_manager.schema_names(&catalog_name, MySQL).await? {
let mut stream = catalog_manager.tables(&catalog_name, &schema_name);

while let Some(table) = stream.try_next().await? {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use store_api::storage::{RegionId, ScanRequest, TableId};
use table::metadata::{TableInfo, TableType};

use super::PARTITIONS;
use crate::catalog_protocol::CatalogProtocol::MySQL;
use crate::error::{
CreateRecordBatchSnafu, FindPartitionsSnafu, InternalSnafu, Result,
UpgradeWeakCatalogManagerRefSnafu,
Expand Down Expand Up @@ -240,7 +241,7 @@ impl InformationSchemaPartitionsBuilder {

let predicates = Predicates::from_scan_request(&request);

for schema_name in catalog_manager.schema_names(&catalog_name).await? {
for schema_name in catalog_manager.schema_names(&catalog_name, MySQL).await? {
let table_info_stream = catalog_manager
.tables(&catalog_name, &schema_name)
.try_filter_map(|t| async move {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use store_api::storage::{RegionId, ScanRequest, TableId};
use table::metadata::TableType;

use super::REGION_PEERS;
use crate::catalog_protocol::CatalogProtocol::MySQL;
use crate::error::{
CreateRecordBatchSnafu, FindRegionRoutesSnafu, InternalSnafu, Result,
UpgradeWeakCatalogManagerRefSnafu,
Expand Down Expand Up @@ -176,7 +177,7 @@ impl InformationSchemaRegionPeersBuilder {

let predicates = Predicates::from_scan_request(&request);

for schema_name in catalog_manager.schema_names(&catalog_name).await? {
for schema_name in catalog_manager.schema_names(&catalog_name, MySQL).await? {
let table_id_stream = catalog_manager
.tables(&catalog_name, &schema_name)
.try_filter_map(|t| async move {
Expand Down
3 changes: 2 additions & 1 deletion src/catalog/src/system_schema/information_schema/schemata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};

use super::SCHEMATA;
use crate::catalog_protocol::CatalogProtocol::MySQL;
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, TableMetadataManagerSnafu,
UpgradeWeakCatalogManagerRefSnafu,
Expand Down Expand Up @@ -171,7 +172,7 @@ impl InformationSchemaSchemataBuilder {
let table_metadata_manager = utils::table_meta_manager(&self.catalog_manager)?;
let predicates = Predicates::from_scan_request(&request);

for schema_name in catalog_manager.schema_names(&catalog_name).await? {
for schema_name in catalog_manager.schema_names(&catalog_name, MySQL).await? {
let opts = if let Some(table_metadata_manager) = &table_metadata_manager {
table_metadata_manager
.schema_manager()
Expand Down
Loading

0 comments on commit b1072ad

Please sign in to comment.