Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: show create postgresql foreign table #5143

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions src/datatypes/src/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,51 @@ impl ConcreteDataType {
_ => None,
}
}

/// Return the datatype name in postgres type system
pub fn postgres_datatype_name(&self) -> &'static str {
match self {
&ConcreteDataType::Null(_) => "UNKNOWN",
&ConcreteDataType::Boolean(_) => "BOOL",
&ConcreteDataType::Int8(_) | &ConcreteDataType::UInt8(_) => "CHAR",
&ConcreteDataType::Int16(_) | &ConcreteDataType::UInt16(_) => "INT2",
&ConcreteDataType::Int32(_) | &ConcreteDataType::UInt32(_) => "INT4",
&ConcreteDataType::Int64(_) | &ConcreteDataType::UInt64(_) => "INT8",
&ConcreteDataType::Float32(_) => "FLOAT4",
&ConcreteDataType::Float64(_) => "FLOAT8",
&ConcreteDataType::Binary(_) | &ConcreteDataType::Vector(_) => "BYTEA",
&ConcreteDataType::String(_) => "VARCHAR",
&ConcreteDataType::Date(_) => "DATE",
&ConcreteDataType::DateTime(_) | &ConcreteDataType::Timestamp(_) => "TIMESTAMP",
&ConcreteDataType::Time(_) => "TIME",
&ConcreteDataType::Interval(_) => "INTERVAL",
&ConcreteDataType::Decimal128(_) => "NUMERIC",
&ConcreteDataType::Json(_) => "JSON",
ConcreteDataType::List(list) => match list.item_type() {
&ConcreteDataType::Null(_) => "UNKNOWN",
&ConcreteDataType::Boolean(_) => "_BOOL",
&ConcreteDataType::Int8(_) | &ConcreteDataType::UInt8(_) => "_CHAR",
&ConcreteDataType::Int16(_) | &ConcreteDataType::UInt16(_) => "_INT2",
&ConcreteDataType::Int32(_) | &ConcreteDataType::UInt32(_) => "_INT4",
&ConcreteDataType::Int64(_) | &ConcreteDataType::UInt64(_) => "_INT8",
&ConcreteDataType::Float32(_) => "_FLOAT4",
&ConcreteDataType::Float64(_) => "_FLOAT8",
&ConcreteDataType::Binary(_) => "_BYTEA",
&ConcreteDataType::String(_) => "_VARCHAR",
&ConcreteDataType::Date(_) => "_DATE",
&ConcreteDataType::DateTime(_) | &ConcreteDataType::Timestamp(_) => "_TIMESTAMP",
&ConcreteDataType::Time(_) => "_TIME",
&ConcreteDataType::Interval(_) => "_INTERVAL",
&ConcreteDataType::Decimal128(_) => "_NUMERIC",
&ConcreteDataType::Json(_) => "_JSON",
&ConcreteDataType::Duration(_)
| &ConcreteDataType::Dictionary(_)
| &ConcreteDataType::Vector(_)
| &ConcreteDataType::List(_) => "UNKNOWN",
},
&ConcreteDataType::Duration(_) | &ConcreteDataType::Dictionary(_) => "UNKNOWN",
}
}
}

impl From<&ConcreteDataType> for ConcreteDataType {
Expand Down
13 changes: 11 additions & 2 deletions src/operator/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ use set::set_query_timeout;
use snafu::{ensure, OptionExt, ResultExt};
use sql::statements::copy::{CopyDatabase, CopyDatabaseArgument, CopyTable, CopyTableArgument};
use sql::statements::set_variables::SetVariables;
use sql::statements::show::ShowCreateTableVariant;
use sql::statements::statement::Statement;
use sql::statements::OptionMap;
use sql::util::format_raw_object_name;
Expand Down Expand Up @@ -317,8 +318,16 @@ impl StatementExecutor {
.context(TableNotFoundSnafu { table_name: &table })?;
let table_name = TableName::new(catalog, schema, table);

self.show_create_table(table_name, table_ref, query_ctx)
.await
match show.variant {
ShowCreateTableVariant::Original => {
self.show_create_table(table_name, table_ref, query_ctx)
.await
}
ShowCreateTableVariant::PostgresForeignTable => {
self.show_create_table_for_pg(table_name, table_ref, query_ctx)
.await
}
}
}
Statement::ShowCreateFlow(show) => self.show_create_flow(show, query_ctx).await,
Statement::ShowCreateView(show) => self.show_create_view(show, query_ctx).await,
Expand Down
20 changes: 20 additions & 0 deletions src/operator/src/statement/show.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,26 @@ impl StatementExecutor {
.context(ExecuteStatementSnafu)
}

#[tracing::instrument(skip_all)]
pub async fn show_create_table_for_pg(
&self,
table_name: TableName,
table: TableRef,
query_ctx: QueryContextRef,
) -> Result<Output> {
let table_info = table.table_info();
if table_info.table_type != TableType::Base {
return error::ShowCreateTableBaseOnlySnafu {
table_name: table_name.to_string(),
table_type: table_info.table_type,
}
.fail();
}

query::sql::show_create_foreign_table_for_pg(table, query_ctx)
.context(ExecuteStatementSnafu)
}

#[tracing::instrument(skip_all)]
pub async fn show_create_view(
&self,
Expand Down
48 changes: 48 additions & 0 deletions src/query/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use datafusion_expr::{case, col, lit, Expr};
use datatypes::prelude::*;
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, RawSchema, Schema};
use datatypes::vectors::StringVector;
use itertools::Itertools;
use object_store::ObjectStore;
use once_cell::sync::Lazy;
use regex::Regex;
Expand All @@ -61,6 +62,7 @@ use sql::statements::show::{
use sql::statements::statement::Statement;
use sql::statements::OptionMap;
use sqlparser::ast::ObjectName;
use store_api::metric_engine_consts::{is_metric_engine, is_metric_engine_internal_column};
use table::requests::{FILE_TABLE_LOCATION_KEY, FILE_TABLE_PATTERN_KEY};
use table::TableRef;

Expand Down Expand Up @@ -763,6 +765,52 @@ pub fn show_create_table(
Ok(Output::new_with_record_batches(records))
}

pub fn show_create_foreign_table_for_pg(
table: TableRef,
_query_ctx: QueryContextRef,
) -> Result<Output> {
let table_info = table.table_info();

let table_meta = &table_info.meta;
let table_name = &table_info.name;
let schema = &table_info.meta.schema;
let is_metric_engine = is_metric_engine(&table_meta.engine);

let columns = schema
.column_schemas()
.iter()
.filter_map(|c| {
if is_metric_engine && is_metric_engine_internal_column(&c.name) {
None
} else {
Some(format!(
"\"{}\" {}",
c.name,
c.data_type.postgres_datatype_name()
))
}
})
.join(",\n ");

let sql = format!(
r#"CREATE FOREIGN TABLE ft_{} (
{}
)
SERVER greptimedb
OPTIONS (table_name '{}')"#,
table_name, columns, table_name
);

let columns = vec![
Arc::new(StringVector::from(vec![table_name.clone()])) as _,
Arc::new(StringVector::from(vec![sql])) as _,
];
let records = RecordBatches::try_from_columns(SHOW_CREATE_TABLE_OUTPUT_SCHEMA.clone(), columns)
.context(error::CreateRecordBatchSnafu)?;

Ok(Output::new_with_record_batches(records))
}

pub fn show_create_view(
view_name: ObjectName,
definition: &str,
Expand Down
20 changes: 16 additions & 4 deletions src/sql/src/parsers/show_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ use crate::error::{
};
use crate::parser::ParserContext;
use crate::statements::show::{
ShowColumns, ShowCreateDatabase, ShowCreateFlow, ShowCreateTable, ShowCreateView,
ShowDatabases, ShowFlows, ShowIndex, ShowKind, ShowStatus, ShowTableStatus, ShowTables,
ShowVariables, ShowViews,
ShowColumns, ShowCreateDatabase, ShowCreateFlow, ShowCreateTable, ShowCreateTableVariant,
ShowCreateView, ShowDatabases, ShowFlows, ShowIndex, ShowKind, ShowStatus, ShowTableStatus,
ShowTables, ShowVariables, ShowViews,
};
use crate::statements::statement::Statement;

Expand Down Expand Up @@ -146,7 +146,19 @@ impl ParserContext<'_> {
name: table_name.to_string(),
}
);
Ok(Statement::ShowCreateTable(ShowCreateTable { table_name }))
let mut variant = ShowCreateTableVariant::Original;
if self.consume_token("FOR") {
if self.consume_token("POSTGRES_FOREIGN_TABLE") {
variant = ShowCreateTableVariant::PostgresForeignTable;
} else {
self.unsupported(self.peek_token_as_string())?;
}
}

Ok(Statement::ShowCreateTable(ShowCreateTable {
table_name,
variant,
}))
}

fn parse_show_create_flow(&mut self) -> Result<Statement> {
Expand Down
46 changes: 45 additions & 1 deletion src/sql/src/statements/show.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,26 @@ impl Display for ShowCreateDatabase {
#[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut, Serialize)]
pub struct ShowCreateTable {
pub table_name: ObjectName,
pub variant: ShowCreateTableVariant,
}

/// Variant of a show create table
#[derive(Default, Debug, Clone, PartialEq, Eq, Visit, VisitMut, Serialize)]
pub enum ShowCreateTableVariant {
#[default]
Original,
PostgresForeignTable,
}

impl Display for ShowCreateTable {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let table_name = &self.table_name;
write!(f, r#"SHOW CREATE TABLE {table_name}"#)
write!(f, r#"SHOW CREATE TABLE {table_name}"#)?;
if let ShowCreateTableVariant::PostgresForeignTable = self.variant {
write!(f, " FOR POSTGRES_FOREIGN_TABLE")?;
}

Ok(())
}
}

Expand Down Expand Up @@ -344,12 +358,31 @@ mod tests {
Statement::ShowCreateTable(show) => {
let table_name = show.table_name.to_string();
assert_eq!(table_name, "test");
assert_eq!(show.variant, ShowCreateTableVariant::Original);
}
_ => {
unreachable!();
}
}

let sql = "SHOW CREATE TABLE test FOR POSTGRES_FOREIGN_TABLE";
let stmts: Vec<Statement> =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap();
assert_eq!(1, stmts.len());
assert_matches!(&stmts[0], Statement::ShowCreateTable { .. });
match &stmts[0] {
Statement::ShowCreateTable(show) => {
let table_name = show.table_name.to_string();
assert_eq!(table_name, "test");
assert_eq!(show.variant, ShowCreateTableVariant::PostgresForeignTable);
}
_ => {
unreachable!();
}
}
}

#[test]
pub fn test_show_create_missing_table_name() {
let sql = "SHOW CREATE TABLE";
Expand All @@ -361,6 +394,17 @@ mod tests {
.is_err());
}

#[test]
pub fn test_show_create_unknown_for() {
let sql = "SHOW CREATE TABLE t FOR UNKNOWN";
assert!(ParserContext::create_with_dialect(
sql,
&GreptimeDbDialect {},
ParseOptions::default()
)
.is_err());
}

#[test]
pub fn test_show_create_flow() {
let sql = "SHOW CREATE FLOW test";
Expand Down
30 changes: 30 additions & 0 deletions tests/cases/standalone/common/show/show_create.result
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,22 @@ SHOW CREATE TABLE system_metrics;
| | ) |
+----------------+-----------------------------------------------------------+

SHOW CREATE TABLE system_metrics FOR POSTGRES_FOREIGN_TABLE;

+----------------+------------------------------------------+
| Table | Create Table |
+----------------+------------------------------------------+
| system_metrics | CREATE FOREIGN TABLE ft_system_metrics ( |
| | "id" INT4, |
| | "host" VARCHAR, |
| | "cpu" FLOAT8, |
| | "disk" FLOAT4, |
| | "ts" TIMESTAMP |
| | ) |
| | SERVER greptimedb |
| | OPTIONS (table_name 'system_metrics') |
+----------------+------------------------------------------+

DROP TABLE system_metrics;

Affected Rows: 0
Expand Down Expand Up @@ -141,6 +157,20 @@ show create table t1;
| | ) |
+-------+-----------------------------------+

SHOW CREATE TABLE t1 FOR POSTGRES_FOREIGN_TABLE;

+-------+------------------------------+
| Table | Create Table |
+-------+------------------------------+
| t1 | CREATE FOREIGN TABLE ft_t1 ( |
| | "host" VARCHAR, |
| | "ts" TIMESTAMP, |
| | "val" FLOAT8 |
| | ) |
| | SERVER greptimedb |
| | OPTIONS (table_name 't1') |
+-------+------------------------------+

drop table t1;

Affected Rows: 0
Expand Down
4 changes: 4 additions & 0 deletions tests/cases/standalone/common/show/show_create.sql
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ WITH(

SHOW CREATE TABLE system_metrics;

SHOW CREATE TABLE system_metrics FOR POSTGRES_FOREIGN_TABLE;

DROP TABLE system_metrics;

create table table_without_partition (
Expand Down Expand Up @@ -57,6 +59,8 @@ show create table phy;

show create table t1;

SHOW CREATE TABLE t1 FOR POSTGRES_FOREIGN_TABLE;

drop table t1;

drop table phy;
Expand Down
Loading