diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
index 8b107680dbd7..21fe7bd33cca 100644
--- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs
@@ -1006,7 +1006,11 @@ impl TryInto for &LogicalPlan {
                     ))),
                 })
             }
-            LogicalPlan::CreateMemoryTable { name, input } => todo!(),
+            LogicalPlan::CreateMemoryTable { .. } => {
+                return Err(proto_error(format!(
+                    "Error converting CreateMemoryTable. Not yet supported in Ballista"
+                )))
+            }
         }
     }
 }
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index b596163d1fee..6295184208ff 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -241,12 +241,13 @@ impl ExecutionContext {
             }
 
             LogicalPlan::CreateMemoryTable { input, name } => {
-                let physical = Arc::new(DataFrameImpl::new(self.state.clone(), &input));
+                let plan = self.optimize(&input)?;
+                let physical = Arc::new(DataFrameImpl::new(self.state.clone(), &plan));
 
-                let batches: Vec<_> = physical.collect().await?;
+                let batches: Vec<_> = physical.collect_partitioned().await?;
                 let table = Arc::new(MemTable::try_new(
-                    Arc::new(input.schema().as_ref().into()),
-                    vec![batches],
+                    Arc::new(plan.schema().as_ref().into()),
+                    batches,
                 )?);
 
                 self.register_table(name.as_str(), table)?;
diff --git a/datafusion/src/logical_plan/plan.rs b/datafusion/src/logical_plan/plan.rs
index 47cb385906e4..20b62f528c0f 100644
--- a/datafusion/src/logical_plan/plan.rs
+++ b/datafusion/src/logical_plan/plan.rs
@@ -390,12 +390,12 @@ impl LogicalPlan {
             LogicalPlan::Union { inputs, .. } => inputs.iter().collect(),
             LogicalPlan::Explain { plan, .. } => vec![plan],
             LogicalPlan::Analyze { input: plan, .. } => vec![plan],
+            LogicalPlan::CreateMemoryTable { input, .. } => vec![input],
             // plans without inputs
             LogicalPlan::TableScan { .. }
             | LogicalPlan::EmptyRelation { .. }
             | LogicalPlan::Values { .. }
-            | LogicalPlan::CreateExternalTable { .. }
-            | LogicalPlan::CreateMemoryTable { .. } => vec![],
+            | LogicalPlan::CreateExternalTable { .. } => vec![],
         }
     }
 
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index d676615cd3f0..8a7fb47bf8f6 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -792,14 +792,11 @@ impl DefaultPhysicalPlanner {
                 ))
             }
             | LogicalPlan::CreateMemoryTable {..} => {
-                // There is no default plan for "CREATE
-                // TABLE" -- it must be handled at a higher level (so
-                // that the appropriate table can be registered with
-                // the context)
-                Err(DataFusionError::Internal(
-                    "Unsupported logical plan: CreateMemoryTable".to_string(),
-                ))
-
+                // Create a dummy exec.
+                Ok(Arc::new(EmptyExec::new(
+                    false,
+                    SchemaRef::new(Schema::empty()),
+                )))
             }
             LogicalPlan::Explain { .. } => Err(DataFusionError::Internal(
                 "Unsupported logical plan: Explain must be root of the plan".to_string(),
diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs
index d89b51ed9369..cabf666e9d57 100644
--- a/datafusion/src/sql/planner.rs
+++ b/datafusion/src/sql/planner.rs
@@ -48,9 +48,9 @@ use arrow::datatypes::*;
 use hashbrown::HashMap;
 use sqlparser::ast::{
     BinaryOperator, DataType as SQLDataType, DateTimeField, Expr as SQLExpr, FunctionArg,
-    Ident, Join, JoinConstraint, JoinOperator, ObjectName, Query, Select, SelectItem,
-    SetExpr, SetOperator, ShowStatementFilter, TableFactor, TableWithJoins,
-    TrimWhereField, UnaryOperator, Value, Values as SQLValues,
+    HiveDistributionStyle, Ident, Join, JoinConstraint, JoinOperator, ObjectName, Query,
+    Select, SelectItem, SetExpr, SetOperator, ShowStatementFilter, TableFactor,
+    TableWithJoins, TrimWhereField, UnaryOperator, Value, Values as SQLValues,
 };
 use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption};
 use sqlparser::ast::{OrderByExpr, Statement};
@@ -136,8 +136,22 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             Statement::CreateTable {
                 query: Some(query),
                 name,
+                //or_replace: false,
+                columns,
+                constraints,
+                hive_distribution: HiveDistributionStyle::NONE,
+                //hive_formats: None,
+                table_properties,
+                with_options,
+                file_format: None,
+                location: None,
+                like: None,
                 ..
-            } => {
+            } if columns.is_empty()
+                && constraints.is_empty()
+                && table_properties.is_empty()
+                && with_options.is_empty() =>
+            {
                 let plan = self.query_to_plan(query)?;
 
                 Ok(LogicalPlan::CreateMemoryTable {
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index 6cd1d3822bce..f27a550b4e62 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -687,6 +687,30 @@ async fn select_all() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn create_table_as() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+    register_aggregate_simple_csv(&mut ctx).await?;
+
+    let sql = "CREATE TABLE my_table AS SELECT * FROM aggregate_simple";
+    ctx.sql(sql).await.unwrap();
+
+    let sql_all = "SELECT * FROM my_table order by c1 LIMIT 1";
+    let results_all = execute_to_batches(&mut ctx, sql_all).await;
+
+    let expected = vec![
+        "+---------+----------------+------+",
+        "| c1      | c2             | c3   |",
+        "+---------+----------------+------+",
+        "| 0.00001 | 0.000000000001 | true |",
+        "+---------+----------------+------+",
+    ];
+
+    assert_batches_eq!(expected, &results_all);
+
+    Ok(())
+}
+
 #[tokio::test]
 async fn select_distinct() -> Result<()> {
     let mut ctx = ExecutionContext::new();
diff --git a/testing b/testing
index a8f7be380531..b658b087767b 160000
--- a/testing
+++ b/testing
@@ -1 +1 @@
-Subproject commit a8f7be380531758eb7962542a5eb020d8795aa20
+Subproject commit b658b087767b041b2081766814655b4dd5a9a439