Skip to content

Commit

Permalink
refactor sort exec stream and combine batches
Browse files Browse the repository at this point in the history
  • Loading branch information
Jiayu Liu committed Jun 10, 2021
1 parent 5c88450 commit 6cc7f96
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 94 deletions.
65 changes: 15 additions & 50 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@

//! Serde code to convert from protocol buffers to Rust data structures.

use std::collections::HashMap;
use std::convert::TryInto;
use std::sync::Arc;

use crate::error::BallistaError;
use crate::execution_plans::{ShuffleReaderExec, UnresolvedShuffleExec};
use crate::serde::protobuf::repartition_exec_node::PartitionMethod;
Expand All @@ -45,7 +41,6 @@ use datafusion::physical_plan::planner::DefaultPhysicalPlanner;
use datafusion::physical_plan::window_functions::{
BuiltInWindowFunction, WindowFunction,
};
use datafusion::physical_plan::windows::create_window_expr;
use datafusion::physical_plan::windows::WindowAggExec;
use datafusion::physical_plan::{
coalesce_batches::CoalesceBatchesExec,
Expand All @@ -67,6 +62,9 @@ use datafusion::prelude::CsvReadOptions;
use log::debug;
use protobuf::logical_expr_node::ExprType;
use protobuf::physical_plan_node::PhysicalPlanType;
use std::collections::HashMap;
use std::convert::TryInto;
use std::sync::Arc;

impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
type Error = BallistaError;
Expand Down Expand Up @@ -211,6 +209,7 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {

let catalog_list =
Arc::new(MemoryCatalogList::new()) as Arc<dyn CatalogList>;

let ctx_state = ExecutionContextState {
catalog_list,
scalar_functions: Default::default(),
Expand All @@ -227,53 +226,19 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
.map(|(expr, name)| expr.try_into().map(|expr| (expr, name.clone())))
.collect::<Result<Vec<_>, _>>()?;

let mut physical_window_expr = vec![];

let df_planner = DefaultPhysicalPlanner::default();

for (expr, name) in &window_agg_expr {
match expr {
Expr::WindowFunction {
fun,
args,
partition_by,
order_by,
window_frame,
..
} => {
let arg = df_planner
.create_physical_expr(
&args[0],
&physical_schema,
&ctx_state,
)
.map_err(|e| {
BallistaError::General(format!("{:?}", e))
})?;
if !partition_by.is_empty() {
return Err(BallistaError::NotImplemented("Window function with partition by is not yet implemented".to_owned()));
}
if !order_by.is_empty() {
return Err(BallistaError::NotImplemented("Window function with order by is not yet implemented".to_owned()));
}
if window_frame.is_some() {
return Err(BallistaError::NotImplemented("Window function with window frame is not yet implemented".to_owned()));
}
let window_expr = create_window_expr(
&fun,
&[arg],
&physical_schema,
name.to_owned(),
)?;
physical_window_expr.push(window_expr);
}
_ => {
return Err(BallistaError::General(
"Invalid expression for WindowAggrExec".to_string(),
));
}
}
}
let physical_window_expr = window_agg_expr
.iter()
.map(|(expr, name)| {
df_planner.create_window_expr_with_name(
expr,
name.clone(),
&physical_schema,
&ctx_state,
)
})
.collect::<Result<Vec<_>, _>>()?;

Ok(Arc::new(WindowAggExec::try_new(
physical_window_expr,
Expand Down
3 changes: 3 additions & 0 deletions datafusion/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,9 @@ pub trait WindowExpr: Send + Sync + Debug {
/// Functions which take a single input argument, such as `sum`, return a single [`Expr`],
/// others (e.g. `cov`) return many.
fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;

/// Get the sort key of this window function.
fn sort_key(&self) -> &[Arc<dyn PhysicalExpr>];
}

/// An accumulator represents a stateful object that lives throughout the evaluation of multiple rows and
Expand Down
81 changes: 64 additions & 17 deletions datafusion/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use crate::physical_plan::{
};
use crate::prelude::JoinType;
use crate::scalar::ScalarValue;
use crate::sql::utils::generate_sort_key;
use crate::variable::VarType;
use crate::{
error::{DataFusionError, Result},
Expand Down Expand Up @@ -143,7 +144,12 @@ impl DefaultPhysicalPlanner {
LogicalPlan::Window {
input, window_expr, ..
} => {
// Initially need to perform the aggregate and then merge the partitions
if window_expr.is_empty() {
return Err(DataFusionError::Internal(
"Impossibly got empty window expression".to_owned(),
));
}

let input_exec = self.create_initial_plan(input, ctx_state)?;
let input_schema = input_exec.schema();

Expand Down Expand Up @@ -731,34 +737,59 @@ impl DefaultPhysicalPlanner {
}
}

/// Create a window expression from a logical expression
pub fn create_window_expr(
/// Create a window expression with a name
pub fn create_window_expr_with_name(
&self,
e: &Expr,
logical_input_schema: &DFSchema,
name: String,
physical_input_schema: &Schema,
ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn WindowExpr>> {
// unpack aliased logical expressions, e.g. "sum(col) over () as total"
let (name, e) = match e {
Expr::Alias(sub_expr, alias) => (alias.clone(), sub_expr.as_ref()),
_ => (e.name(logical_input_schema)?, e),
};

match e {
Expr::WindowFunction { fun, args, .. } => {
Expr::WindowFunction {
fun,
args,
partition_by,
order_by,
window_frame,
} => {
let args = args
.iter()
.map(|e| {
self.create_physical_expr(e, physical_input_schema, ctx_state)
})
.collect::<Result<Vec<_>>>()?;
// if !order_by.is_empty() {
// return Err(DataFusionError::NotImplemented(
// "Window function with order by is not yet implemented".to_owned(),
// ));
// }
windows::create_window_expr(fun, &args, physical_input_schema, name)
if !partition_by.is_empty() {
return Err(DataFusionError::NotImplemented(
"Window function with partition by is not yet implemented"
.to_owned(),
));
}
if !order_by.is_empty() {
return Err(DataFusionError::NotImplemented(
"Window function with order by is not yet implemented".to_owned(),
));
}
if window_frame.is_some() {
return Err(DataFusionError::NotImplemented(
"Window function with window frame is not yet implemented"
.to_owned(),
));
}
let sort_key = generate_sort_key(partition_by, order_by)
.iter()
.map(|e| {
self.create_physical_expr(e, physical_input_schema, ctx_state)
})
.collect::<Result<Vec<_>>>()?;
let expr = windows::create_window_expr(
fun,
&args,
&sort_key,
physical_input_schema,
name,
)?;
Ok(expr)
}
other => Err(DataFusionError::Internal(format!(
"Invalid window expression '{:?}'",
Expand All @@ -767,6 +798,22 @@ impl DefaultPhysicalPlanner {
}
}

/// Create a window expression from a logical expression
pub fn create_window_expr(
&self,
e: &Expr,
logical_input_schema: &DFSchema,
physical_input_schema: &Schema,
ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn WindowExpr>> {
// unpack aliased logical expressions, e.g. "sum(col) over () as total"
let (name, e) = match e {
Expr::Alias(sub_expr, alias) => (alias.clone(), sub_expr.as_ref()),
_ => (e.name(logical_input_schema)?, e),
};
self.create_window_expr_with_name(e, name, physical_input_schema, ctx_state)
}

/// Create an aggregate expression from a logical expression
pub fn create_aggregate_expr(
&self,
Expand Down
Loading

0 comments on commit 6cc7f96

Please sign in to comment.