Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for create table as via MemTable #1243

Merged
merged 8 commits into from
Nov 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ballista/rust/core/src/serde/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1006,6 +1006,9 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
))),
})
}
LogicalPlan::CreateMemoryTable { .. } => Err(proto_error(
"Error converting CreateMemoryTable. Not yet supported in Ballista",
)),
}
}
}
Expand Down
30 changes: 24 additions & 6 deletions datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@ use crate::{
catalog::{CatalogList, MemoryCatalogList},
information_schema::CatalogWithInformationSchema,
},
datasource::file_format::{
avro::AvroFormat,
csv::CsvFormat,
parquet::{ParquetFormat, DEFAULT_PARQUET_EXTENSION},
FileFormat,
},
datasource::listing::{ListingOptions, ListingTable},
datasource::{
file_format::{
avro::AvroFormat,
csv::CsvFormat,
parquet::{ParquetFormat, DEFAULT_PARQUET_EXTENSION},
FileFormat,
},
MemTable,
},
logical_plan::{PlanType, ToStringifiedPlan},
optimizer::eliminate_limit::EliminateLimit,
physical_optimizer::{
Expand Down Expand Up @@ -237,6 +240,21 @@ impl ExecutionContext {
Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan)))
}

LogicalPlan::CreateMemoryTable { input, name } => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is pretty cool

let plan = self.optimize(&input)?;
let physical = Arc::new(DataFrameImpl::new(self.state.clone(), &plan));

let batches: Vec<_> = physical.collect_partitioned().await?;
let table = Arc::new(MemTable::try_new(
Arc::new(plan.schema().as_ref().into()),
batches,
)?);
self.register_table(name.as_str(), table)?;

let plan = LogicalPlanBuilder::empty(false).build()?;
Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan)))
}

plan => Ok(Arc::new(DataFrameImpl::new(
self.state.clone(),
&self.optimize(&plan)?,
Expand Down
15 changes: 15 additions & 0 deletions datafusion/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,13 @@ pub enum LogicalPlan {
/// Whether the CSV file contains a header
has_header: bool,
},
/// Creates an in memory table.
CreateMemoryTable {
/// The table name
name: String,
/// The logical plan
input: Arc<LogicalPlan>,
},
/// Values expression. See
/// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
/// documentation for more details.
Expand Down Expand Up @@ -264,6 +271,7 @@ impl LogicalPlan {
LogicalPlan::Analyze { schema, .. } => schema,
LogicalPlan::Extension { node } => node.schema(),
LogicalPlan::Union { schema, .. } => schema,
LogicalPlan::CreateMemoryTable { input, .. } => input.schema(),
}
}

Expand Down Expand Up @@ -308,6 +316,7 @@ impl LogicalPlan {
LogicalPlan::Limit { input, .. }
| LogicalPlan::Repartition { input, .. }
| LogicalPlan::Sort { input, .. }
| LogicalPlan::CreateMemoryTable { input, .. }
| LogicalPlan::Filter { input, .. } => input.all_schemas(),
}
}
Expand Down Expand Up @@ -354,6 +363,7 @@ impl LogicalPlan {
| LogicalPlan::EmptyRelation { .. }
| LogicalPlan::Limit { .. }
| LogicalPlan::CreateExternalTable { .. }
| LogicalPlan::CreateMemoryTable { .. }
| LogicalPlan::CrossJoin { .. }
| LogicalPlan::Analyze { .. }
| LogicalPlan::Explain { .. }
Expand All @@ -380,6 +390,7 @@ impl LogicalPlan {
LogicalPlan::Union { inputs, .. } => inputs.iter().collect(),
LogicalPlan::Explain { plan, .. } => vec![plan],
LogicalPlan::Analyze { input: plan, .. } => vec![plan],
LogicalPlan::CreateMemoryTable { input, .. } => vec![input],
// plans without inputs
LogicalPlan::TableScan { .. }
| LogicalPlan::EmptyRelation { .. }
Expand Down Expand Up @@ -517,6 +528,7 @@ impl LogicalPlan {
true
}
LogicalPlan::Limit { input, .. } => input.accept(visitor)?,
LogicalPlan::CreateMemoryTable { input, .. } => input.accept(visitor)?,
LogicalPlan::Extension { node } => {
for input in node.inputs() {
if !input.accept(visitor)? {
Expand Down Expand Up @@ -846,6 +858,9 @@ impl LogicalPlan {
LogicalPlan::CreateExternalTable { ref name, .. } => {
write!(f, "CreateExternalTable: {:?}", name)
}
LogicalPlan::CreateMemoryTable { ref name, .. } => {
write!(f, "CreateMemoryTable: {:?}", name)
}
LogicalPlan::Explain { .. } => write!(f, "Explain"),
LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
LogicalPlan::Union { .. } => write!(f, "Union"),
Expand Down
1 change: 1 addition & 0 deletions datafusion/src/optimizer/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
| LogicalPlan::CreateExternalTable { .. }
| LogicalPlan::Explain { .. }
| LogicalPlan::Analyze { .. }
| LogicalPlan::CreateMemoryTable { .. }
| LogicalPlan::Extension { .. } => {
// apply the optimization to all inputs of the plan
let expr = plan.expressions();
Expand Down
1 change: 1 addition & 0 deletions datafusion/src/optimizer/constant_folding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ impl OptimizerRule for ConstantFolding {
| LogicalPlan::Aggregate { .. }
| LogicalPlan::Repartition { .. }
| LogicalPlan::CreateExternalTable { .. }
| LogicalPlan::CreateMemoryTable { .. }
| LogicalPlan::Values { .. }
| LogicalPlan::Extension { .. }
| LogicalPlan::Sort { .. }
Expand Down
1 change: 1 addition & 0 deletions datafusion/src/optimizer/projection_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,7 @@ fn optimize_plan(
| LogicalPlan::Values { .. }
| LogicalPlan::Sort { .. }
| LogicalPlan::CreateExternalTable { .. }
| LogicalPlan::CreateMemoryTable { .. }
| LogicalPlan::CrossJoin { .. }
| LogicalPlan::Extension { .. } => {
let expr = plan.expressions();
Expand Down
6 changes: 6 additions & 0 deletions datafusion/src/optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,12 @@ pub fn from_plan(
n: *n,
input: Arc::new(inputs[0].clone()),
}),
LogicalPlan::CreateMemoryTable { name, .. } => {
Ok(LogicalPlan::CreateMemoryTable {
input: Arc::new(inputs[0].clone()),
name: name.clone(),
})
}
LogicalPlan::Extension { node } => Ok(LogicalPlan::Extension {
node: node.from_template(expr, inputs),
}),
Expand Down
9 changes: 8 additions & 1 deletion datafusion/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -782,7 +782,7 @@ impl DefaultPhysicalPlanner {

Ok(Arc::new(GlobalLimitExec::new(input, limit)))
}
LogicalPlan::CreateExternalTable { .. } => {
LogicalPlan::CreateExternalTable { .. }=> {
// There is no default plan for "CREATE EXTERNAL
// TABLE" -- it must be handled at a higher level (so
// that the appropriate table can be registered with
Expand All @@ -791,6 +791,13 @@ impl DefaultPhysicalPlanner {
"Unsupported logical plan: CreateExternalTable".to_string(),
))
}
| LogicalPlan::CreateMemoryTable {..} => {
// Create a dummy exec.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it might make more sense here to throw an error about "unsupported logical plan" -- if we return an EmptyExec that will result in a plan that does nothing (aka doesn't actually create any tables / table providers) but doesn't error which might be surprising

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried that before, and it failed I think somehow at the planning stage?

Ok(Arc::new(EmptyExec::new(
false,
SchemaRef::new(Schema::empty()),
)))
}
LogicalPlan::Explain { .. } => Err(DataFusionError::Internal(
"Unsupported logical plan: Explain must be root of the plan".to_string(),
)),
Expand Down
36 changes: 33 additions & 3 deletions datafusion/src/sql/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ use arrow::datatypes::*;
use hashbrown::HashMap;
use sqlparser::ast::{
BinaryOperator, DataType as SQLDataType, DateTimeField, Expr as SQLExpr, FunctionArg,
Ident, Join, JoinConstraint, JoinOperator, ObjectName, Query, Select, SelectItem,
SetExpr, SetOperator, ShowStatementFilter, TableFactor, TableWithJoins,
TrimWhereField, UnaryOperator, Value, Values as SQLValues,
HiveDistributionStyle, Ident, Join, JoinConstraint, JoinOperator, ObjectName, Query,
Select, SelectItem, SetExpr, SetOperator, ShowStatementFilter, TableFactor,
TableWithJoins, TrimWhereField, UnaryOperator, Value, Values as SQLValues,
};
use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption};
use sqlparser::ast::{OrderByExpr, Statement};
Expand Down Expand Up @@ -133,6 +133,36 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
} => self.explain_statement_to_plan(*verbose, *analyze, statement),
Statement::Query(query) => self.query_to_plan(query),
Statement::ShowVariable { variable } => self.show_variable_to_plan(variable),
Statement::CreateTable {
query: Some(query),
name,
or_replace: false,
columns,
constraints,
hive_distribution: HiveDistributionStyle::NONE,
hive_formats: _hive_formats,
table_properties,
with_options,
file_format: None,
location: None,
like: None,
temporary: _temporary,
external: false,
if_not_exists: false,
without_rowid: _without_row_id,
} if columns.is_empty()
&& constraints.is_empty()
&& table_properties.is_empty()
&& with_options.is_empty() =>
{
let plan = self.query_to_plan(query)?;

Ok(LogicalPlan::CreateMemoryTable {
name: name.to_string(),
input: Arc::new(plan),
})
}

Statement::ShowColumns {
extended,
full,
Expand Down
24 changes: 24 additions & 0 deletions datafusion/tests/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,30 @@ async fn select_all() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn create_table_as() -> Result<()> {
let mut ctx = ExecutionContext::new();
register_aggregate_simple_csv(&mut ctx).await?;

let sql = "CREATE TABLE my_table AS SELECT * FROM aggregate_simple";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So cool ❤️

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one thing you can try is to create table as select from values lists

ctx.sql(sql).await.unwrap();

let sql_all = "SELECT * FROM my_table order by c1 LIMIT 1";
let results_all = execute_to_batches(&mut ctx, sql_all).await;

let expected = vec![
"+---------+----------------+------+",
"| c1 | c2 | c3 |",
"+---------+----------------+------+",
"| 0.00001 | 0.000000000001 | true |",
"+---------+----------------+------+",
];

assert_batches_eq!(expected, &results_all);

Ok(())
}

#[tokio::test]
async fn select_distinct() -> Result<()> {
let mut ctx = ExecutionContext::new();
Expand Down