Skip to content

Commit

Permalink
Add CREATE DATABASE command to SQL (#2094)
Browse files Browse the repository at this point in the history
* Initial commit

* Ballista

* Fix proto

* Ballista fix

* Add test

* Add create schema to test

* Parse catalog in schema ref

* Clippy

* Comment updates
  • Loading branch information
matthewmturner authored Apr 6, 2022
1 parent 38498b7 commit 8b09a5c
Show file tree
Hide file tree
Showing 10 changed files with 154 additions and 14 deletions.
7 changes: 7 additions & 0 deletions ballista/rust/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ message LogicalPlanNode {
LogicalExtensionNode extension = 17;
CreateCatalogSchemaNode create_catalog_schema = 18;
UnionNode union = 19;
CreateCatalogNode create_catalog = 20;
}
}

Expand Down Expand Up @@ -156,6 +157,12 @@ message CreateCatalogSchemaNode {
datafusion.DfSchema schema = 3;
}

// Serialized form of a "create catalog" logical plan; carried in
// LogicalPlanNode as the `create_catalog` variant.
message CreateCatalogNode {
// Name of the catalog to create.
string catalog_name = 1;
// Copied verbatim from the logical plan's `if_not_exists` flag.
bool if_not_exists = 2;
// Schema of the plan node. The Rust deserializer returns an error when this
// field is absent.
datafusion.DfSchema schema = 3;
}

// a node containing data for defining values list. unlike in SQL where it's two dimensional, here
// the list is flattened, and with the field n_cols it can be parsed and partitioned into rows
message ValuesNode {
Expand Down
31 changes: 29 additions & 2 deletions ballista/rust/core/src/serde/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ use datafusion::logical_plan::plan::{
Aggregate, EmptyRelation, Filter, Join, Projection, Sort, Window,
};
use datafusion::logical_plan::{
Column, CreateCatalogSchema, CreateExternalTable, CrossJoin, Expr, JoinConstraint,
Limit, LogicalPlan, LogicalPlanBuilder, Repartition, TableScan, Values,
Column, CreateCatalog, CreateCatalogSchema, CreateExternalTable, CrossJoin, Expr,
JoinConstraint, Limit, LogicalPlan, LogicalPlanBuilder, Repartition, TableScan,
Values,
};
use datafusion::prelude::SessionContext;

Expand Down Expand Up @@ -344,6 +345,19 @@ impl AsLogicalPlan for LogicalPlanNode {
schema: pb_schema.try_into()?,
}))
}
LogicalPlanType::CreateCatalog(create_catalog) => {
let pb_schema = (create_catalog.schema.clone()).ok_or_else(|| {
BallistaError::General(String::from(
"Protobuf deserialization error, CreateCatalogNode was missing required field schema.",
))
})?;

Ok(LogicalPlan::CreateCatalog(CreateCatalog {
catalog_name: create_catalog.catalog_name.clone(),
if_not_exists: create_catalog.if_not_exists,
schema: pb_schema.try_into()?,
}))
}
LogicalPlanType::Analyze(analyze) => {
let input: LogicalPlan =
into_logical_plan!(analyze.input, ctx, extension_codec)?;
Expand Down Expand Up @@ -814,6 +828,19 @@ impl AsLogicalPlan for LogicalPlanNode {
},
)),
}),
LogicalPlan::CreateCatalog(CreateCatalog {
catalog_name,
if_not_exists,
schema: df_schema,
}) => Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::CreateCatalog(
protobuf::CreateCatalogNode {
catalog_name: catalog_name.clone(),
if_not_exists: *if_not_exists,
schema: Some(df_schema.into()),
},
)),
}),
LogicalPlan::Analyze(a) => {
let input = protobuf::LogicalPlanNode::try_from_logical_plan(
a.input.as_ref(),
Expand Down
72 changes: 67 additions & 5 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ use crate::datasource::listing::ListingTableConfig;
use crate::datasource::TableProvider;
use crate::error::{DataFusionError, Result};
use crate::logical_plan::{
CreateCatalogSchema, CreateExternalTable, CreateMemoryTable, DropTable,
FunctionRegistry, LogicalPlan, LogicalPlanBuilder, UNNAMED_TABLE,
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable,
DropTable, FunctionRegistry, LogicalPlan, LogicalPlanBuilder, UNNAMED_TABLE,
};
use crate::optimizer::common_subexpr_eliminate::CommonSubexprEliminate;
use crate::optimizer::filter_push_down::FilterPushDown;
Expand Down Expand Up @@ -330,14 +330,23 @@ impl SessionContext {
}) => {
// sqlparser doesn't accept database / catalog as a separate parameter to CREATE SCHEMA,
// so the catalog is parsed from a qualified `catalog.schema` name; an unqualified
// name falls back to the default catalog
let catalog = self.catalog(DEFAULT_CATALOG).ok_or_else(|| {
let tokens: Vec<&str> = schema_name.split('.').collect();
let (catalog, schema_name) = match tokens.len() {
1 => Ok((DEFAULT_CATALOG, schema_name.as_str())),
2 => Ok((tokens[0], tokens[1])),
_ => Err(DataFusionError::Execution(format!(
"Unable to parse catalog from {}",
schema_name
))),
}?;
// Resolve the catalog named by the (possibly qualified) schema reference.
// Inside the closure `catalog` is still the `&str` name parsed above: the
// new `catalog` binding only comes into scope after this statement, so the
// error reports the catalog that was actually looked up rather than always
// naming the default catalog.
let catalog = self.catalog(catalog).ok_or_else(|| {
    DataFusionError::Execution(format!("Missing '{}' catalog", catalog))
})?;

let schema = catalog.schema(&schema_name);
let schema = catalog.schema(schema_name);

match (if_not_exists, schema) {
(true, Some(_)) => {
Expand All @@ -346,7 +355,7 @@ impl SessionContext {
}
(true, None) | (false, None) => {
let schema = Arc::new(MemorySchemaProvider::new());
catalog.register_schema(&schema_name, schema)?;
catalog.register_schema(schema_name, schema)?;
let plan = LogicalPlanBuilder::empty(false).build()?;
Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
}
Expand All @@ -356,6 +365,33 @@ impl SessionContext {
))),
}
}
LogicalPlan::CreateCatalog(CreateCatalog {
catalog_name,
if_not_exists,
..
}) => {
let catalog = self.catalog(catalog_name.as_str());

match (if_not_exists, catalog) {
(true, Some(_)) => {
let plan = LogicalPlanBuilder::empty(false).build()?;
Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
}
(true, None) | (false, None) => {
let new_catalog = Arc::new(MemoryCatalogProvider::new());
self.state
.write()
.catalog_list
.register_catalog(catalog_name, new_catalog);
let plan = LogicalPlanBuilder::empty(false).build()?;
Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
}
(false, Some(_)) => Err(DataFusionError::Execution(format!(
"Catalog '{:?}' already exists",
catalog_name
))),
}
}

plan => Ok(Arc::new(DataFrame::new(
self.state.clone(),
Expand Down Expand Up @@ -3256,6 +3292,32 @@ mod tests {
Ok(())
}

/// End-to-end check of `CREATE DATABASE`: creates a catalog, a schema inside
/// it, and a table inside that schema, then verifies the table is listed in
/// `information_schema.tables` under the expected catalog and schema.
#[tokio::test]
async fn sql_create_catalog() -> Result<()> {
    // Enable the information schema so the table created below can be
    // looked up through `information_schema.tables`.
    let ctx = SessionContext::with_config(
        SessionConfig::new().with_information_schema(true),
    );

    // Create catalog
    ctx.sql("CREATE DATABASE test").await?.collect().await?;

    // Create schema inside the new catalog
    ctx.sql("CREATE SCHEMA test.abc").await?.collect().await?;

    // Add table to schema
    ctx.sql("CREATE TABLE test.abc.y AS VALUES (1,2,3)")
        .await?
        .collect()
        .await?;

    // Check table exists in schema; errors propagate with `?` like the
    // statements above instead of panicking through `unwrap`.
    let results = ctx
        .sql("SELECT * FROM information_schema.tables WHERE table_catalog='test' AND table_schema='abc' AND table_name = 'y'")
        .await?
        .collect()
        .await?;

    // Sum across all batches so an empty result fails the assertion with a
    // clear message rather than an out-of-bounds index panic.
    let total_rows: usize = results.iter().map(|batch| batch.num_rows()).sum();
    assert_eq!(total_rows, 1);
    Ok(())
}

#[tokio::test]
async fn normalized_column_identifiers() {
// create local execution context
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ pub use expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion};
pub use extension::UserDefinedLogicalNode;
pub use operators::Operator;
pub use plan::{
CreateCatalogSchema, CreateExternalTable, CreateMemoryTable, CrossJoin, DropTable,
EmptyRelation, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType,
PlanVisitor, Repartition, TableScan, Union, Values,
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable,
CrossJoin, DropTable, EmptyRelation, JoinConstraint, JoinType, Limit, LogicalPlan,
Partitioning, PlanType, PlanVisitor, Repartition, TableScan, Union, Values,
};
pub(crate) use plan::{StringifiedPlan, ToStringifiedPlan};
pub use registry::FunctionRegistry;
27 changes: 25 additions & 2 deletions datafusion/core/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,18 @@ pub struct CreateExternalTable {
pub struct CreateCatalogSchema {
/// The schema name, optionally qualified as `catalog.schema`
pub schema_name: String,
/// Do nothing (except issuing a notice) if a schema with the same name already exists
pub if_not_exists: bool,
/// Empty schema
pub schema: DFSchemaRef,
}

/// Creates a catalog (aka "Database").
#[derive(Clone)]
pub struct CreateCatalog {
/// The catalog name
pub catalog_name: String,
/// Do nothing (except issuing a notice) if a catalog with the same name already exists
pub if_not_exists: bool,
/// Empty schema
pub schema: DFSchemaRef,
Expand Down Expand Up @@ -367,6 +378,8 @@ pub enum LogicalPlan {
CreateMemoryTable(CreateMemoryTable),
/// Creates a new catalog schema.
CreateCatalogSchema(CreateCatalogSchema),
/// Creates a new catalog (aka "Database").
CreateCatalog(CreateCatalog),
/// Drops a table.
DropTable(DropTable),
/// Values expression. See
Expand Down Expand Up @@ -414,6 +427,7 @@ impl LogicalPlan {
LogicalPlan::CreateCatalogSchema(CreateCatalogSchema { schema, .. }) => {
schema
}
LogicalPlan::CreateCatalog(CreateCatalog { schema, .. }) => schema,
LogicalPlan::DropTable(DropTable { schema, .. }) => schema,
}
}
Expand Down Expand Up @@ -456,7 +470,8 @@ impl LogicalPlan {
| LogicalPlan::Analyze(Analyze { schema, .. })
| LogicalPlan::EmptyRelation(EmptyRelation { schema, .. })
| LogicalPlan::CreateExternalTable(CreateExternalTable { schema, .. })
| LogicalPlan::CreateCatalogSchema(CreateCatalogSchema { schema, .. }) => {
| LogicalPlan::CreateCatalogSchema(CreateCatalogSchema { schema, .. })
| LogicalPlan::CreateCatalog(CreateCatalog { schema, .. }) => {
vec![schema]
}
LogicalPlan::Limit(Limit { input, .. })
Expand Down Expand Up @@ -512,6 +527,7 @@ impl LogicalPlan {
| LogicalPlan::CreateExternalTable(_)
| LogicalPlan::CreateMemoryTable(_)
| LogicalPlan::CreateCatalogSchema(_)
| LogicalPlan::CreateCatalog(_)
| LogicalPlan::DropTable(_)
| LogicalPlan::CrossJoin(_)
| LogicalPlan::Analyze { .. }
Expand Down Expand Up @@ -548,6 +564,7 @@ impl LogicalPlan {
| LogicalPlan::Values { .. }
| LogicalPlan::CreateExternalTable(_)
| LogicalPlan::CreateCatalogSchema(_)
| LogicalPlan::CreateCatalog(_)
| LogicalPlan::DropTable(_) => vec![],
}
}
Expand Down Expand Up @@ -701,6 +718,7 @@ impl LogicalPlan {
| LogicalPlan::Values(_)
| LogicalPlan::CreateExternalTable(_)
| LogicalPlan::CreateCatalogSchema(_)
| LogicalPlan::CreateCatalog(_)
| LogicalPlan::DropTable(_) => true,
};
if !recurse {
Expand Down Expand Up @@ -1069,6 +1087,11 @@ impl LogicalPlan {
}) => {
write!(f, "CreateCatalogSchema: {:?}", schema_name)
}
LogicalPlan::CreateCatalog(CreateCatalog {
catalog_name, ..
}) => {
write!(f, "CreateCatalog: {:?}", catalog_name)
}
LogicalPlan::DropTable(DropTable {
name, if_exists, ..
}) => {
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/optimizer/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
| LogicalPlan::Analyze { .. }
| LogicalPlan::CreateMemoryTable(_)
| LogicalPlan::CreateCatalogSchema(_)
| LogicalPlan::CreateCatalog(_)
| LogicalPlan::DropTable(_)
| LogicalPlan::Extension { .. } => {
// apply the optimization to all inputs of the plan
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/optimizer/projection_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,7 @@ fn optimize_plan(
| LogicalPlan::CreateExternalTable(_)
| LogicalPlan::CreateMemoryTable(_)
| LogicalPlan::CreateCatalogSchema(_)
| LogicalPlan::CreateCatalog(_)
| LogicalPlan::DropTable(_)
| LogicalPlan::CrossJoin(_)
| LogicalPlan::Extension { .. } => {
Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/src/optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,8 @@ pub fn from_plan(
| LogicalPlan::TableScan { .. }
| LogicalPlan::CreateExternalTable(_)
| LogicalPlan::DropTable(_)
| LogicalPlan::CreateCatalogSchema(_) => {
| LogicalPlan::CreateCatalogSchema(_)
| LogicalPlan::CreateCatalog(_) => {
// All of these plan types have no inputs / exprs so should not be called
assert!(expr.is_empty(), "{:?} should have no exprs", plan);
assert!(inputs.is_empty(), "{:?} should have no inputs", plan);
Expand Down
9 changes: 9 additions & 0 deletions datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -818,6 +818,15 @@ impl DefaultPhysicalPlanner {
"Unsupported logical plan: CreateCatalogSchema".to_string(),
))
}
LogicalPlan::CreateCatalog(_) => {
// There is no default plan for "CREATE DATABASE".
// It must be handled at a higher level (so
// that the catalog can be registered with
// the context)
Err(DataFusionError::Internal(
"Unsupported logical plan: CreateCatalog".to_string(),
))
}
| LogicalPlan::CreateMemoryTable(_) | LogicalPlan::DropTable (_) => {
// Create a dummy exec.
Ok(Arc::new(EmptyExec::new(
Expand Down
11 changes: 10 additions & 1 deletion datafusion/core/src/sql/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::logical_plan::window_frames::{WindowFrame, WindowFrameUnits};
use crate::logical_plan::Expr::Alias;
use crate::logical_plan::{
and, builder::expand_qualified_wildcard, builder::expand_wildcard, col, lit,
normalize_col, union_with_alias, Column, CreateCatalogSchema,
normalize_col, union_with_alias, Column, CreateCatalog, CreateCatalogSchema,
CreateExternalTable as PlanCreateExternalTable, CreateMemoryTable, DFSchema,
DFSchemaRef, DropTable, Expr, LogicalPlan, LogicalPlanBuilder, Operator, PlanType,
ToDFSchema, ToStringifiedPlan,
Expand Down Expand Up @@ -183,6 +183,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
if_not_exists,
schema: Arc::new(DFSchema::empty()),
})),
Statement::CreateDatabase {
db_name,
if_not_exists,
..
} => Ok(LogicalPlan::CreateCatalog(CreateCatalog {
catalog_name: db_name.to_string(),
if_not_exists,
schema: Arc::new(DFSchema::empty()),
})),
Statement::Drop {
object_type: ObjectType::Table,
if_exists,
Expand Down

0 comments on commit 8b09a5c

Please sign in to comment.