Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(frontend): show create table #7152

Merged
merged 6 commits into from
Jan 3, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion e2e_test/ddl/show.slt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
statement ok
create table t3 (v1 int, v2 int, v3 int);
create table if not exists t3 (v1 int, v2 int, v3 int) with (appendonly = 'true');

statement ok
create materialized view mv3 as select sum(v1) as sum_v1 from t3;
Expand Down Expand Up @@ -73,6 +73,14 @@ query T
show sources;
----

# Show create objects.

# The `WITH` options should be preserved, and the `IF NOT EXISTS` clause should be omitted.
query TT
show create table t3;
----
public.t3 CREATE TABLE t3 (v1 INT, v2 INT, v3 INT) WITH (appendonly = 'true')

query TT
show create materialized view mv3;
----
Expand Down
13 changes: 4 additions & 9 deletions src/frontend/planner_test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use risingwave_frontend::session::SessionImpl;
use risingwave_frontend::test_utils::{create_proto_file, get_explain_output, LocalFrontend};
use risingwave_frontend::{
build_graph, explain_stream_graph, Binder, FrontendOpts, OptimizerContext, OptimizerContextRef,
PlanRef, Planner, WithOptions,
PlanRef, Planner,
};
use risingwave_sqlparser::ast::{ExplainOptions, ObjectName, Statement};
use risingwave_sqlparser::parser::Parser;
Expand Down Expand Up @@ -294,11 +294,8 @@ impl TestCase {
) -> Result<Option<TestCaseResult>> {
let statements = Parser::parse_sql(sql).unwrap();
for stmt in statements {
let handler_args = HandlerArgs {
session: session.clone(),
sql: Arc::from(sql),
with_options: WithOptions::try_from(&stmt)?,
};
// TODO: `sql` may contain multiple statements here.
let handler_args = HandlerArgs::new(session.clone(), &stmt, sql)?;
match stmt.clone() {
Statement::Query(_)
| Statement::Insert { .. }
Expand All @@ -312,9 +309,7 @@ impl TestCase {
..Default::default()
};
let context = OptimizerContext::new(
session.clone(),
Arc::from(sql),
WithOptions::try_from(&stmt)?,
HandlerArgs::new(session.clone(), &stmt, sql)?,
explain_options,
);
let ret = self.apply_query(&stmt, context.into())?;
Expand Down
7 changes: 6 additions & 1 deletion src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ pub struct TableCatalog {
/// is not supported yet and expected to be `[0..columns.len()]`
pub value_indices: Vec<usize>,

/// Definition of the materialized view.
/// The full `CREATE TABLE` or `CREATE MATERIALIZED VIEW` definition of the table.
yezizp2012 marked this conversation as resolved.
Show resolved Hide resolved
pub definition: String,

pub handle_pk_conflict: bool,
Expand Down Expand Up @@ -240,6 +240,11 @@ impl TableCatalog {
)
}

/// Returns the SQL statement that can be used to create this table.
pub fn create_sql(&self) -> String {
self.definition.clone()
}

pub fn to_prost(&self, schema_id: SchemaId, database_id: DatabaseId) -> ProstTable {
ProstTable {
id: self.id.table_id,
Expand Down
5 changes: 5 additions & 0 deletions src/frontend/src/catalog/view_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,9 @@ impl ViewCatalog {
pub fn name(&self) -> &str {
&self.name
}

/// Returns the SQL statement that can be used to create this view.
pub fn create_sql(&self) -> String {
format!("CREATE VIEW {} AS {}", self.name, self.sql)
}
}
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ pub async fn handle_create_index(
}
}

let context = OptimizerContext::new_with_handler_args(handler_args);
let context = OptimizerContext::from_handler_args(handler_args);
let (plan, index_table, index) = gen_create_index_plan(
&session,
context.into(),
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/handler/create_mv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ pub fn gen_create_mv_plan(

let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;

let definition = query.to_string();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this previous method, WITH options cannot be preserved. That'll be problematic for sinks.

let definition = context.normalized_sql().to_owned();

let bound = {
let mut binder = Binder::new(session);
Expand Down Expand Up @@ -123,7 +123,7 @@ pub async fn handle_create_mv(
session.check_relation_name_duplicated(name.clone())?;

let (table, graph) = {
let context = OptimizerContext::new_with_handler_args(handler_args);
let context = OptimizerContext::from_handler_args(handler_args);
let (plan, table) = gen_create_mv_plan(&session, context.into(), query, name, columns)?;
let graph = build_graph(plan);

Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ pub fn gen_sink_plan(
let (sink_database_id, sink_schema_id) =
session.get_database_and_schema_id_for_create(sink_schema_name)?;

let definition = query.to_string();
let definition = context.normalized_sql().to_owned();

let bound = {
let mut binder = Binder::new(session);
Expand Down Expand Up @@ -136,7 +136,7 @@ pub async fn handle_create_sink(
session.check_relation_name_duplicated(stmt.sink_name.clone())?;

let (sink, graph) = {
let context = OptimizerContext::new_with_handler_args(handle_args);
let context = OptimizerContext::from_handler_args(handle_args);
let (plan, sink) = gen_sink_plan(&session, context.into(), stmt)?;

(sink, build_graph(plan))
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ pub async fn handle_create_source(
// TODO(Yuanxin): This should be removed after unsupporting `CREATE MATERIALIZED SOURCE`.
if is_materialized {
let (graph, table) = {
let context = OptimizerContext::new_with_handler_args(handler_args);
let context = OptimizerContext::from_handler_args(handler_args);
let (plan, table) =
gen_materialize_plan(context.into(), source.clone(), session.user_id())?;
let graph = build_graph(plan);
Expand Down
6 changes: 4 additions & 2 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ pub(crate) fn gen_create_table_plan(
columns: Vec<ColumnDef>,
constraints: Vec<TableConstraint>,
) -> Result<(PlanRef, Option<ProstSource>, ProstTable)> {
let definition = context.normalized_sql().to_owned();
let (column_descs, pk_column_id_from_columns) = bind_sql_columns(columns)?;
gen_create_table_plan_without_bind(
session,
Expand All @@ -229,6 +230,7 @@ pub(crate) fn gen_create_table_plan(
column_descs,
pk_column_id_from_columns,
constraints,
definition,
)
}

Expand All @@ -239,6 +241,7 @@ pub(crate) fn gen_create_table_plan_without_bind(
column_descs: Vec<ColumnDesc>,
pk_column_id_from_columns: Option<ColumnId>,
constraints: Vec<TableConstraint>,
definition: String,
) -> Result<(PlanRef, Option<ProstSource>, ProstTable)> {
let (columns, pk_column_ids, row_id_index) =
bind_sql_table_constraints(column_descs, pk_column_id_from_columns, constraints)?;
Expand All @@ -249,7 +252,6 @@ pub(crate) fn gen_create_table_plan_without_bind(
true => DmlFlag::AppendOnly,
false => DmlFlag::All,
};
let definition = context.sql().to_owned(); // TODO: use formatted SQL

let db_name = session.database();
let (schema_name, name) = Binder::resolve_schema_qualified_name(db_name, table_name)?;
Expand Down Expand Up @@ -421,7 +423,7 @@ pub async fn handle_create_table(
}

let (graph, source, table) = {
let context = OptimizerContext::new_with_handler_args(handler_args);
let context = OptimizerContext::from_handler_args(handler_args);
let (plan, source, table) = gen_create_table_plan(
&session,
context.into(),
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/handler/create_table_as.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,15 @@ pub async fn handle_create_as(
});

let (graph, source, table) = {
let context = OptimizerContext::new_with_handler_args(handler_args.clone());
let context = OptimizerContext::from_handler_args(handler_args.clone());
let (plan, source, table) = gen_create_table_plan_without_bind(
&session,
context.into(),
table_name.clone(),
column_descs,
None,
vec![],
"".to_owned(), // TODO: support `SHOW CREATE TABLE` for `CREATE TABLE AS`
)?;
let graph = build_graph(plan);

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub async fn handle_create_view(

// plan the query to validate it and resolve dependencies
let (dependent_relations, schema) = {
let context = OptimizerContext::new_with_handler_args(handler_args);
let context = OptimizerContext::from_handler_args(handler_args);
let (plan, _mode, schema) = super::query::gen_batch_query_plan(
&session,
context.into(),
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/describe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ pub fn handle_describe(handler_args: HandlerArgs, table_name: ObjectName) -> Res
// TODO: recover the original user statement
Ok(PgResponse::new_for_stream(
StatementType::DESCRIBE_TABLE,
Some(rows.len() as i32),
None,
rows.into(),
vec![
PgFieldDescriptor::new(
Expand Down
9 changes: 2 additions & 7 deletions src/frontend/src/handler/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,7 @@ pub fn handle_explain(
options: ExplainOptions,
analyze: bool,
) -> Result<RwPgResponse> {
let context = OptimizerContext::new(
handler_args.session,
handler_args.sql,
handler_args.with_options,
options.clone(),
);
let context = OptimizerContext::new(handler_args, options.clone());

if analyze {
return Err(ErrorCode::NotImplemented("explain analyze".to_string(), 4856.into()).into());
Expand Down Expand Up @@ -170,7 +165,7 @@ pub fn handle_explain(

Ok(PgResponse::new_for_stream(
StatementType::EXPLAIN,
Some(rows.len() as i32),
None,
rows.into(),
vec![PgFieldDescriptor::new(
"QUERY PLAN".to_owned(),
Expand Down
62 changes: 55 additions & 7 deletions src/frontend/src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ use pgwire::pg_response::{PgResponse, RowSetResult};
use pgwire::pg_server::BoxedError;
use pgwire::types::Row;
use risingwave_common::error::{ErrorCode, Result};
use risingwave_sqlparser::ast::{DropStatement, ObjectType, Statement};
use risingwave_sqlparser::ast::{
CreateSinkStatement, CreateSourceStatement, DropStatement, ObjectType, Statement,
};

use self::util::DataChunkToRowSetAdapter;
use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
Expand Down Expand Up @@ -90,21 +92,67 @@ impl From<Vec<Row>> for PgResponseStream {
#[derive(Clone)]
pub struct HandlerArgs {
pub session: Arc<SessionImpl>,
pub sql: Arc<str>,
pub sql: String,
pub normalized_sql: String,
pub with_options: WithOptions,
}

impl HandlerArgs {
pub fn new(session: Arc<SessionImpl>, stmt: &Statement, sql: &str) -> Result<Self> {
Ok(Self {
session,
sql: sql.into(),
with_options: WithOptions::try_from(stmt)?,
normalized_sql: Self::normalize_sql(stmt),
})
}

/// Get normalized SQL from the statement.
///
/// - Generally, the normalized SQL is the unparsed (and formatted) result of the statement.
/// - For `CREATE` statements, the clauses like `OR REPLACE` and `IF NOT EXISTS` are removed to
/// make it suitable for the `SHOW CREATE` statements.
fn normalize_sql(stmt: &Statement) -> String {
let mut stmt = stmt.clone();
match &mut stmt {
Statement::CreateView { or_replace, .. } => {
*or_replace = false;
}
Statement::CreateTable {
or_replace,
if_not_exists,
..
} => {
*or_replace = false;
*if_not_exists = false;
}
Statement::CreateIndex { if_not_exists, .. } => {
*if_not_exists = false;
}
Statement::CreateSource {
stmt: CreateSourceStatement { if_not_exists, .. },
..
} => {
*if_not_exists = false;
}
Statement::CreateSink {
stmt: CreateSinkStatement { if_not_exists, .. },
} => {
*if_not_exists = false;
}
_ => {}
}
stmt.to_string()
}
}

pub async fn handle(
session: Arc<SessionImpl>,
stmt: Statement,
sql: &str,
format: bool,
) -> Result<RwPgResponse> {
let handler_args = HandlerArgs {
session,
sql: sql.into(),
with_options: WithOptions::try_from(&stmt)?,
};
let handler_args = HandlerArgs::new(session, &stmt, sql)?;
match stmt {
Statement::Explain {
statement,
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ pub async fn handle_query(

// Subblock to make sure PlanRef (an Rc) is dropped before `await` below.
let (query, query_mode, output_schema) = {
let context = OptimizerContext::new_with_handler_args(handler_args);
let context = OptimizerContext::from_handler_args(handler_args);
let (plan, query_mode, schema) = gen_batch_query_plan(&session, context.into(), stmt)?;

tracing::trace!(
Expand Down
Loading