Skip to content

Commit

Permalink
Optimize, retain partitions, make more robust, add test
Browse files Browse the repository at this point in the history
  • Loading branch information
Dandandan committed Nov 5, 2021
1 parent 45655d7 commit de57fae
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 20 deletions.
7 changes: 6 additions & 1 deletion ballista/rust/core/src/serde/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1006,7 +1006,12 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
))),
})
}
LogicalPlan::CreateMemoryTable { name, input } => todo!(),
LogicalPlan::CreateMemoryTable { .. } => {
return Err(proto_error(format!(
"Error converting CreateMemoryTable. Not yet supported in Ballista",
listing_table.options().format
)))
}
}
}
}
Expand Down
9 changes: 5 additions & 4 deletions datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,12 +241,13 @@ impl ExecutionContext {
}

LogicalPlan::CreateMemoryTable { input, name } => {
let physical = Arc::new(DataFrameImpl::new(self.state.clone(), &input));
let plan = self.optimize(&input)?;
let physical = Arc::new(DataFrameImpl::new(self.state.clone(), &plan));

let batches: Vec<_> = physical.collect().await?;
let batches: Vec<_> = physical.collect_partitioned().await?;
let table = Arc::new(MemTable::try_new(
Arc::new(input.schema().as_ref().into()),
vec![batches],
Arc::new(plan.schema().as_ref().into()),
batches,
)?);
self.register_table(name.as_str(), table)?;

Expand Down
4 changes: 2 additions & 2 deletions datafusion/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,12 +390,12 @@ impl LogicalPlan {
LogicalPlan::Union { inputs, .. } => inputs.iter().collect(),
LogicalPlan::Explain { plan, .. } => vec![plan],
LogicalPlan::Analyze { input: plan, .. } => vec![plan],
LogicalPlan::CreateMemoryTable { input, .. } => vec![input],
// plans without inputs
LogicalPlan::TableScan { .. }
| LogicalPlan::EmptyRelation { .. }
| LogicalPlan::Values { .. }
| LogicalPlan::CreateExternalTable { .. }
| LogicalPlan::CreateMemoryTable { .. } => vec![],
| LogicalPlan::CreateExternalTable { .. } => vec![],
}
}

Expand Down
13 changes: 5 additions & 8 deletions datafusion/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -792,14 +792,11 @@ impl DefaultPhysicalPlanner {
))
}
| LogicalPlan::CreateMemoryTable {..} => {
// There is no default plan for "CREATE
// TABLE" -- it must be handled at a higher level (so
// that the appropriate table can be registered with
// the context)
Err(DataFusionError::Internal(
"Unsupported logical plan: CreateMemoryTable".to_string(),
))

// Create a dummy exec.
Ok(Arc::new(EmptyExec::new(
false,
SchemaRef::new(Schema::empty()),
)))
}
LogicalPlan::Explain { .. } => Err(DataFusionError::Internal(
"Unsupported logical plan: Explain must be root of the plan".to_string(),
Expand Down
22 changes: 18 additions & 4 deletions datafusion/src/sql/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ use arrow::datatypes::*;
use hashbrown::HashMap;
use sqlparser::ast::{
BinaryOperator, DataType as SQLDataType, DateTimeField, Expr as SQLExpr, FunctionArg,
Ident, Join, JoinConstraint, JoinOperator, ObjectName, Query, Select, SelectItem,
SetExpr, SetOperator, ShowStatementFilter, TableFactor, TableWithJoins,
TrimWhereField, UnaryOperator, Value, Values as SQLValues,
HiveDistributionStyle, Ident, Join, JoinConstraint, JoinOperator, ObjectName, Query,
Select, SelectItem, SetExpr, SetOperator, ShowStatementFilter, TableFactor,
TableWithJoins, TrimWhereField, UnaryOperator, Value, Values as SQLValues,
};
use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption};
use sqlparser::ast::{OrderByExpr, Statement};
Expand Down Expand Up @@ -136,8 +136,22 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
Statement::CreateTable {
query: Some(query),
name,
//or_replace: false,
columns,
constraints,
hive_distribution: HiveDistributionStyle::NONE,
//hive_formats: None,
table_properties,
with_options,
file_format: None,
location: None,
like: None,
..
} => {
} if columns.is_empty()
&& constraints.is_empty()
&& table_properties.is_empty()
&& with_options.is_empty() =>
{
let plan = self.query_to_plan(query)?;

Ok(LogicalPlan::CreateMemoryTable {
Expand Down
24 changes: 24 additions & 0 deletions datafusion/tests/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,30 @@ async fn select_all() -> Result<()> {
Ok(())
}

/// Runs a `CREATE TABLE ... AS SELECT` statement against the
/// `aggregate_simple` CSV table and checks that the newly created
/// `my_table` can then be queried.
#[tokio::test]
async fn create_table_as() -> Result<()> {
    let mut ctx = ExecutionContext::new();
    register_aggregate_simple_csv(&mut ctx).await?;

    // Propagate a failure through the test's `Result` instead of panicking,
    // consistent with the `?` used for the CSV registration above.
    let sql = "CREATE TABLE my_table AS SELECT * FROM aggregate_simple";
    ctx.sql(sql).await?;

    let sql_all = "SELECT * FROM my_table order by c1 LIMIT 1";
    let results_all = execute_to_batches(&mut ctx, sql_all).await;

    // Every row must be padded to the column widths given by the border
    // rows, otherwise the line-by-line comparison below cannot match.
    let expected = vec![
        "+---------+----------------+------+",
        "| c1      | c2             | c3   |",
        "+---------+----------------+------+",
        "| 0.00001 | 0.000000000001 | true |",
        "+---------+----------------+------+",
    ];

    assert_batches_eq!(expected, &results_all);

    Ok(())
}

#[tokio::test]
async fn select_distinct() -> Result<()> {
let mut ctx = ExecutionContext::new();
Expand Down

0 comments on commit de57fae

Please sign in to comment.