diff --git a/src/catalog/src/information_schema.rs b/src/catalog/src/information_schema.rs index 3ece0acee7a2..8d488212fc45 100644 --- a/src/catalog/src/information_schema.rs +++ b/src/catalog/src/information_schema.rs @@ -20,6 +20,7 @@ mod predicate; mod region_peers; mod runtime_metrics; pub mod schemata; +mod table_constraints; mod table_names; pub mod tables; @@ -52,6 +53,7 @@ use crate::information_schema::partitions::InformationSchemaPartitions; use crate::information_schema::region_peers::InformationSchemaRegionPeers; use crate::information_schema::runtime_metrics::InformationSchemaMetrics; use crate::information_schema::schemata::InformationSchemaSchemata; +use crate::information_schema::table_constraints::InformationSchemaTableConstraints; use crate::information_schema::tables::InformationSchemaTables; use crate::CatalogManager; @@ -173,6 +175,10 @@ impl InformationSchemaProvider { KEY_COLUMN_USAGE.to_string(), self.build_table(KEY_COLUMN_USAGE).unwrap(), ); + tables.insert( + TABLE_CONSTRAINTS.to_string(), + self.build_table(TABLE_CONSTRAINTS).unwrap(), + ); // Add memory tables for name in MEMORY_TABLES.iter() { @@ -241,6 +247,10 @@ impl InformationSchemaProvider { self.catalog_name.clone(), self.catalog_manager.clone(), )) as _), + TABLE_CONSTRAINTS => Some(Arc::new(InformationSchemaTableConstraints::new( + self.catalog_name.clone(), + self.catalog_manager.clone(), + )) as _), _ => None, } } diff --git a/src/catalog/src/information_schema/columns.rs b/src/catalog/src/information_schema/columns.rs index ef355585733d..d255b2d1441a 100644 --- a/src/catalog/src/information_schema/columns.rs +++ b/src/catalog/src/information_schema/columns.rs @@ -274,8 +274,8 @@ impl InformationSchemaColumnsBuilder { }; self.add_column( - idx, &predicates, + idx, &catalog_name, &schema_name, &table.table_info().name, @@ -292,8 +292,8 @@ impl InformationSchemaColumnsBuilder { #[allow(clippy::too_many_arguments)] fn add_column( &mut self, - index: usize, predicates: &Predicates, + index: usize, catalog_name: &str, schema_name: &str, table_name: &str, diff --git a/src/catalog/src/information_schema/key_column_usage.rs b/src/catalog/src/information_schema/key_column_usage.rs index d35ba8e6ce2b..f01b3cbe441b 100644 --- a/src/catalog/src/information_schema/key_column_usage.rs +++ b/src/catalog/src/information_schema/key_column_usage.rs @@ -49,6 +49,11 @@ pub const COLUMN_NAME: &str = "column_name"; pub const ORDINAL_POSITION: &str = "ordinal_position"; const INIT_CAPACITY: usize = 42; +/// Primary key constraint name +pub(crate) const PRI_CONSTRAINT_NAME: &str = "PRIMARY"; +/// Time index constraint name +pub(crate) const TIME_INDEX_CONSTRAINT_NAME: &str = "TIME INDEX"; + /// The virtual table implementation for `information_schema.KEY_COLUMN_USAGE`. pub(super) struct InformationSchemaKeyColumnUsage { schema: SchemaRef, @@ -232,7 +237,7 @@ impl InformationSchemaKeyColumnUsageBuilder { self.add_key_column_usage( &predicates, &schema_name, - "TIME INDEX", + TIME_INDEX_CONSTRAINT_NAME, &catalog_name, &schema_name, &table_name, @@ -262,7 +267,7 @@ impl InformationSchemaKeyColumnUsageBuilder { self.add_key_column_usage( &predicates, &schema_name, - "PRIMARY", + PRI_CONSTRAINT_NAME, &catalog_name, &schema_name, &table_name, diff --git a/src/catalog/src/information_schema/table_constraints.rs b/src/catalog/src/information_schema/table_constraints.rs new file mode 100644 index 000000000000..acac899e49eb --- /dev/null +++ b/src/catalog/src/information_schema/table_constraints.rs @@ -0,0 +1,286 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::{Arc, Weak}; + +use arrow_schema::SchemaRef as ArrowSchemaRef; +use common_catalog::consts::INFORMATION_SCHEMA_TABLE_CONSTRAINTS_TABLE_ID; +use common_error::ext::BoxedError; +use common_query::physical_plan::TaskContext; +use common_recordbatch::adapter::RecordBatchStreamAdapter; +use common_recordbatch::{RecordBatch, SendableRecordBatchStream}; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter; +use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream; +use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream; +use datatypes::prelude::{ConcreteDataType, MutableVector}; +use datatypes::scalars::ScalarVectorBuilder; +use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; +use datatypes::value::Value; +use datatypes::vectors::{ConstantVector, StringVector, StringVectorBuilder, VectorRef}; +use futures::TryStreamExt; +use snafu::{OptionExt, ResultExt}; +use store_api::storage::{ScanRequest, TableId}; + +use super::{InformationTable, TABLE_CONSTRAINTS}; +use crate::error::{ + CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu, +}; +use crate::information_schema::key_column_usage::{ + PRI_CONSTRAINT_NAME, TIME_INDEX_CONSTRAINT_NAME, +}; +use crate::information_schema::Predicates; +use crate::CatalogManager; + +/// The `TABLE_CONSTRAINTS` table describes which tables have constraints. +pub(super) struct InformationSchemaTableConstraints { + schema: SchemaRef, + catalog_name: String, + catalog_manager: Weak, +} + +const CONSTRAINT_CATALOG: &str = "constraint_catalog"; +const CONSTRAINT_SCHEMA: &str = "constraint_schema"; +const CONSTRAINT_NAME: &str = "constraint_name"; +const TABLE_SCHEMA: &str = "table_schema"; +const TABLE_NAME: &str = "table_name"; +const CONSTRAINT_TYPE: &str = "constraint_type"; +const ENFORCED: &str = "enforced"; + +const INIT_CAPACITY: usize = 42; + +const TIME_INDEX_CONSTRAINT_TYPE: &str = "TIME INDEX"; +const PRI_KEY_CONSTRAINT_TYPE: &str = "PRIMARY KEY"; + +impl InformationSchemaTableConstraints { + pub(super) fn new(catalog_name: String, catalog_manager: Weak) -> Self { + Self { + schema: Self::schema(), + catalog_name, + catalog_manager, + } + } + + fn schema() -> SchemaRef { + Arc::new(Schema::new(vec![ + ColumnSchema::new( + CONSTRAINT_CATALOG, + ConcreteDataType::string_datatype(), + false, + ), + ColumnSchema::new( + CONSTRAINT_SCHEMA, + ConcreteDataType::string_datatype(), + false, + ), + ColumnSchema::new(CONSTRAINT_NAME, ConcreteDataType::string_datatype(), false), + ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false), + ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false), + ColumnSchema::new(CONSTRAINT_TYPE, ConcreteDataType::string_datatype(), false), + ColumnSchema::new(ENFORCED, ConcreteDataType::string_datatype(), false), + ])) + } + + fn builder(&self) -> InformationSchemaTableConstraintsBuilder { + InformationSchemaTableConstraintsBuilder::new( + self.schema.clone(), + self.catalog_name.clone(), + self.catalog_manager.clone(), + ) + } +} + +impl InformationTable for InformationSchemaTableConstraints { + fn table_id(&self) -> TableId { + INFORMATION_SCHEMA_TABLE_CONSTRAINTS_TABLE_ID + } + + fn table_name(&self) -> &'static str { + TABLE_CONSTRAINTS + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn to_stream(&self, request: ScanRequest) -> Result { + let schema = self.schema.arrow_schema().clone(); + let mut builder = self.builder(); + let stream = Box::pin(DfRecordBatchStreamAdapter::new( + schema, + futures::stream::once(async move { + builder + .make_table_constraints(Some(request)) + .await + .map(|x| x.into_df_record_batch()) + .map_err(Into::into) + }), + )); + Ok(Box::pin( + RecordBatchStreamAdapter::try_new(stream) + .map_err(BoxedError::new) + .context(InternalSnafu)?, + )) + } +} + +struct InformationSchemaTableConstraintsBuilder { + schema: SchemaRef, + catalog_name: String, + catalog_manager: Weak, + + constraint_schemas: StringVectorBuilder, + constraint_names: StringVectorBuilder, + table_schemas: StringVectorBuilder, + table_names: StringVectorBuilder, + constraint_types: StringVectorBuilder, +} + +impl InformationSchemaTableConstraintsBuilder { + fn new( + schema: SchemaRef, + catalog_name: String, + catalog_manager: Weak, + ) -> Self { + Self { + schema, + catalog_name, + catalog_manager, + constraint_schemas: StringVectorBuilder::with_capacity(INIT_CAPACITY), + constraint_names: StringVectorBuilder::with_capacity(INIT_CAPACITY), + table_schemas: StringVectorBuilder::with_capacity(INIT_CAPACITY), + table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY), + constraint_types: StringVectorBuilder::with_capacity(INIT_CAPACITY), + } + } + + /// Construct the `information_schema.table_constraints` virtual table + async fn make_table_constraints( + &mut self, + request: Option, + ) -> Result { + let catalog_name = self.catalog_name.clone(); + let catalog_manager = self + .catalog_manager + .upgrade() + .context(UpgradeWeakCatalogManagerRefSnafu)?; + let predicates = Predicates::from_scan_request(&request); + + for schema_name in catalog_manager.schema_names(&catalog_name).await? { + let mut stream = catalog_manager.tables(&catalog_name, &schema_name).await; + + while let Some(table) = stream.try_next().await? { + let keys = &table.table_info().meta.primary_key_indices; + let schema = table.schema(); + + if schema.timestamp_index().is_some() { + self.add_table_constraint( + &predicates, + &schema_name, + TIME_INDEX_CONSTRAINT_NAME, + &schema_name, + &table.table_info().name, + TIME_INDEX_CONSTRAINT_TYPE, + ); + } + + if !keys.is_empty() { + self.add_table_constraint( + &predicates, + &schema_name, + PRI_CONSTRAINT_NAME, + &schema_name, + &table.table_info().name, + PRI_KEY_CONSTRAINT_TYPE, + ); + } + } + } + + self.finish() + } + + fn add_table_constraint( + &mut self, + predicates: &Predicates, + constraint_schema: &str, + constraint_name: &str, + table_schema: &str, + table_name: &str, + constraint_type: &str, + ) { + let row = [ + (CONSTRAINT_SCHEMA, &Value::from(constraint_schema)), + (CONSTRAINT_NAME, &Value::from(constraint_name)), + (TABLE_SCHEMA, &Value::from(table_schema)), + (TABLE_NAME, &Value::from(table_name)), + (CONSTRAINT_TYPE, &Value::from(constraint_type)), + ]; + + if !predicates.eval(&row) { + return; + } + + self.constraint_schemas.push(Some(constraint_schema)); + self.constraint_names.push(Some(constraint_name)); + self.table_schemas.push(Some(table_schema)); + self.table_names.push(Some(table_name)); + self.constraint_types.push(Some(constraint_type)); + } + + fn finish(&mut self) -> Result { + let rows_num = self.constraint_names.len(); + + let constraint_catalogs = Arc::new(ConstantVector::new( + Arc::new(StringVector::from(vec!["def"])), + rows_num, + )); + let enforceds = Arc::new(ConstantVector::new( + Arc::new(StringVector::from(vec!["YES"])), + rows_num, + )); + + let columns: Vec = vec![ + constraint_catalogs, + Arc::new(self.constraint_schemas.finish()), + Arc::new(self.constraint_names.finish()), + Arc::new(self.table_schemas.finish()), + Arc::new(self.table_names.finish()), + Arc::new(self.constraint_types.finish()), + enforceds, + ]; + + RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu) + } +} + +impl DfPartitionStream for InformationSchemaTableConstraints { + fn schema(&self) -> &ArrowSchemaRef { + self.schema.arrow_schema() + } + + fn execute(&self, _: Arc) -> DfSendableRecordBatchStream { + let schema = self.schema.arrow_schema().clone(); + let mut builder = self.builder(); + Box::pin(DfRecordBatchStreamAdapter::new( + schema, + futures::stream::once(async move { + builder + .make_table_constraints(None) + .await + .map(|x| x.into_df_record_batch()) + .map_err(Into::into) + }), + )) + } +} diff --git a/src/catalog/src/information_schema/table_names.rs b/src/catalog/src/information_schema/table_names.rs index 32faa00e0370..bed3aae60088 100644 --- a/src/catalog/src/information_schema/table_names.rs +++ b/src/catalog/src/information_schema/table_names.rs @@ -41,3 +41,4 @@ pub const SESSION_STATUS: &str = "session_status"; pub const RUNTIME_METRICS: &str = "runtime_metrics"; pub const PARTITIONS: &str = "partitions"; pub const REGION_PEERS: &str = "greptime_region_peers"; +pub const TABLE_CONSTRAINTS: &str = "table_constraints"; diff --git a/src/common/catalog/src/consts.rs b/src/common/catalog/src/consts.rs index ddf834dbc9cd..8834b6239f91 100644 --- a/src/common/catalog/src/consts.rs +++ b/src/common/catalog/src/consts.rs @@ -86,6 +86,8 @@ pub const INFORMATION_SCHEMA_RUNTIME_METRICS_TABLE_ID: u32 = 27; pub const INFORMATION_SCHEMA_PARTITIONS_TABLE_ID: u32 = 28; /// id for information_schema.REGION_PEERS pub const INFORMATION_SCHEMA_REGION_PEERS_TABLE_ID: u32 = 29; +/// id for information_schema.columns +pub const INFORMATION_SCHEMA_TABLE_CONSTRAINTS_TABLE_ID: u32 = 30; /// ----- End of information_schema tables ----- pub const MITO_ENGINE: &str = "mito"; diff --git a/tests/cases/standalone/common/information_schema/table_constraints.result b/tests/cases/standalone/common/information_schema/table_constraints.result new file mode 100644 index 000000000000..d86a12b6b038 --- /dev/null +++ b/tests/cases/standalone/common/information_schema/table_constraints.result @@ -0,0 +1,58 @@ +--- test information_schema.table_constraints ---- +USE INFORMATION_SCHEMA; + +Affected Rows: 0 + +DESC TABLE TABLE_CONSTRAINTS; + ++--------------------+--------+-----+------+---------+---------------+ +| Column | Type | Key | Null | Default | Semantic Type | ++--------------------+--------+-----+------+---------+---------------+ +| constraint_catalog | String | | NO | | FIELD | +| constraint_schema | String | | NO | | FIELD | +| constraint_name | String | | NO | | FIELD | +| table_schema | String | | NO | | FIELD | +| table_name | String | | NO | | FIELD | +| constraint_type | String | | NO | | FIELD | +| enforced | String | | NO | | FIELD | ++--------------------+--------+-----+------+---------+---------------+ + +SELECT * FROM TABLE_CONSTRAINTS ORDER BY TABLE_NAME, CONSTRAINT_NAME; + ++--------------------+-------------------+-----------------+--------------+------------+-----------------+----------+ +| constraint_catalog | constraint_schema | constraint_name | table_schema | table_name | constraint_type | enforced | ++--------------------+-------------------+-----------------+--------------+------------+-----------------+----------+ +| def | public | PRIMARY | public | numbers | PRIMARY KEY | YES | ++--------------------+-------------------+-----------------+--------------+------------+-----------------+----------+ + +CREATE TABLE test(i double, j string, ts timestamp time index, primary key(j)); + +Affected Rows: 0 + +SELECT * FROM TABLE_CONSTRAINTS ORDER BY TABLE_NAME, CONSTRAINT_NAME; + ++--------------------+--------------------+-----------------+--------------------+------------+-----------------+----------+ +| constraint_catalog | constraint_schema | constraint_name | table_schema | table_name | constraint_type | enforced | ++--------------------+--------------------+-----------------+--------------------+------------+-----------------+----------+ +| def | public | PRIMARY | public | numbers | PRIMARY KEY | YES | +| def | information_schema | PRIMARY | information_schema | test | PRIMARY KEY | YES | +| def | information_schema | TIME INDEX | information_schema | test | TIME INDEX | YES | ++--------------------+--------------------+-----------------+--------------------+------------+-----------------+----------+ + +SELECT * FROM TABLE_CONSTRAINTS WHERE TABLE_NAME = 'test' ORDER BY TABLE_NAME, CONSTRAINT_NAME; + ++--------------------+--------------------+-----------------+--------------------+------------+-----------------+----------+ +| constraint_catalog | constraint_schema | constraint_name | table_schema | table_name | constraint_type | enforced | ++--------------------+--------------------+-----------------+--------------------+------------+-----------------+----------+ +| def | information_schema | PRIMARY | information_schema | test | PRIMARY KEY | YES | +| def | information_schema | TIME INDEX | information_schema | test | TIME INDEX | YES | ++--------------------+--------------------+-----------------+--------------------+------------+-----------------+----------+ + +DROP TABLE test; + +Affected Rows: 0 + +USE public; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/information_schema/table_constraints.sql b/tests/cases/standalone/common/information_schema/table_constraints.sql new file mode 100644 index 000000000000..f715e8334ce9 --- /dev/null +++ b/tests/cases/standalone/common/information_schema/table_constraints.sql @@ -0,0 +1,17 @@ +--- test information_schema.table_constraints ---- + +USE INFORMATION_SCHEMA; + +DESC TABLE TABLE_CONSTRAINTS; + +SELECT * FROM TABLE_CONSTRAINTS ORDER BY TABLE_NAME, CONSTRAINT_NAME; + +CREATE TABLE test(i double, j string, ts timestamp time index, primary key(j)); + +SELECT * FROM TABLE_CONSTRAINTS ORDER BY TABLE_NAME, CONSTRAINT_NAME; + +SELECT * FROM TABLE_CONSTRAINTS WHERE TABLE_NAME = 'test' ORDER BY TABLE_NAME, CONSTRAINT_NAME; + +DROP TABLE test; + +USE public; diff --git a/tests/cases/standalone/common/show/show_databases_tables.result b/tests/cases/standalone/common/show/show_databases_tables.result index 1f35687d20ed..61cd6c844317 100644 --- a/tests/cases/standalone/common/show/show_databases_tables.result +++ b/tests/cases/standalone/common/show/show_databases_tables.result @@ -41,6 +41,7 @@ show tables; | schema_privileges | | schemata | | session_status | +| table_constraints | | table_privileges | | tables | | triggers | diff --git a/tests/cases/standalone/common/system/information_schema.result b/tests/cases/standalone/common/system/information_schema.result index e325f8c938e7..8d0a8ca2d3df 100644 --- a/tests/cases/standalone/common/system/information_schema.result +++ b/tests/cases/standalone/common/system/information_schema.result @@ -36,6 +36,7 @@ order by table_schema, table_name; | greptime | information_schema | schema_privileges | LOCAL TEMPORARY | 22 | | | greptime | information_schema | schemata | LOCAL TEMPORARY | 15 | | | greptime | information_schema | session_status | LOCAL TEMPORARY | 26 | | +| greptime | information_schema | table_constraints | LOCAL TEMPORARY | 30 | | | greptime | information_schema | table_privileges | LOCAL TEMPORARY | 23 | | | greptime | information_schema | tables | LOCAL TEMPORARY | 3 | | | greptime | information_schema | triggers | LOCAL TEMPORARY | 24 | | @@ -316,6 +317,13 @@ select * from information_schema.columns order by table_schema, table_name, colu | greptime | information_schema | schemata | sql_path | 5 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | | | greptime | information_schema | session_status | variable_name | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | session_status | variable_value | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | +| greptime | information_schema | table_constraints | constraint_catalog | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | +| greptime | information_schema | table_constraints | constraint_name | 3 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | +| greptime | information_schema | table_constraints | constraint_schema | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | +| greptime | information_schema | table_constraints | constraint_type | 6 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | +| greptime | information_schema | table_constraints | enforced | 7 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | +| greptime | information_schema | table_constraints | table_name | 5 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | +| greptime | information_schema | table_constraints | table_schema | 4 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | table_privileges | grantee | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | table_privileges | is_grantable | 6 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | | | greptime | information_schema | table_privileges | privilege_type | 5 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |