Skip to content

Commit

Permalink
rewrite approx_median to approx_percentile_cont while planning phase (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
korowa authored and MazterQyou committed Jul 5, 2022
1 parent 119e447 commit a7d1a17
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 181 deletions.
5 changes: 0 additions & 5 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::projection_push_down::ProjectionPushDown;
use crate::optimizer::simplify_expressions::SimplifyExpressions;
use crate::optimizer::single_distinct_to_groupby::SingleDistinctToGroupBy;
use crate::optimizer::to_approx_perc::ToApproxPerc;

use crate::physical_optimizer::coalesce_batches::CoalesceBatches;
use crate::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
Expand Down Expand Up @@ -1151,10 +1150,6 @@ impl SessionState {
Arc::new(FilterPushDown::new()),
Arc::new(LimitPushDown::new()),
Arc::new(SingleDistinctToGroupBy::new()),
// ToApproxPerc must be applied last because
// it rewrites only the function and may interfere with
// other rules
Arc::new(ToApproxPerc::new()),
],
physical_optimizers: vec![
Arc::new(AggregateStatistics::new()),
Expand Down
1 change: 0 additions & 1 deletion datafusion/core/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,4 @@ pub mod optimizer;
pub mod projection_push_down;
pub mod simplify_expressions;
pub mod single_distinct_to_groupby;
pub mod to_approx_perc;
pub mod utils;
161 changes: 0 additions & 161 deletions datafusion/core/src/optimizer/to_approx_perc.rs

This file was deleted.

64 changes: 50 additions & 14 deletions datafusion/core/src/sql/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2040,15 +2040,17 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
window_functions::WindowFunction::AggregateFunction(
aggregate_fun,
) => {
let (aggregate_fun, args) = self.aggregate_fn_to_expr(
aggregate_fun,
function,
schema,
)?;

return Ok(Expr::WindowFunction {
fun: window_functions::WindowFunction::AggregateFunction(
aggregate_fun.clone(),
),
args: self.aggregate_fn_to_expr(
aggregate_fun,
function,
schema,
)?,
),
args,
partition_by,
order_by,
window_frame,
Expand All @@ -2073,7 +2075,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// next, aggregate built-ins
if let Ok(fun) = aggregates::AggregateFunction::from_str(&name) {
let distinct = function.distinct;
let args = self.aggregate_fn_to_expr(fun.clone(), function, schema)?;
let (fun, args) = self.aggregate_fn_to_expr(fun, function, schema)?;
return Ok(Expr::AggregateFunction {
fun,
distinct,
Expand Down Expand Up @@ -2167,9 +2169,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
fun: aggregates::AggregateFunction,
function: sqlparser::ast::Function,
schema: &DFSchema,
) -> Result<Vec<Expr>> {
if fun == aggregates::AggregateFunction::Count {
function
) -> Result<(aggregates::AggregateFunction, Vec<Expr>)> {
let args = match fun {
aggregates::AggregateFunction::Count => function
.args
.into_iter()
.map(|a| match a {
Expand All @@ -2179,10 +2181,24 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => Ok(lit(1_u8)),
_ => self.sql_fn_arg_to_logical_expr(a, schema),
})
.collect::<Result<Vec<Expr>>>()
} else {
self.function_args_to_expr(function.args, schema)
}
.collect::<Result<Vec<Expr>>>()?,
aggregates::AggregateFunction::ApproxMedian => function
.args
.into_iter()
.map(|a| self.sql_fn_arg_to_logical_expr(a, schema))
.chain(iter::once(Ok(lit(0.5_f64))))
.collect::<Result<Vec<Expr>>>()?,
_ => self.function_args_to_expr(function.args, schema)?,
};

let fun = match fun {
aggregates::AggregateFunction::ApproxMedian => {
aggregates::AggregateFunction::ApproxPercentileCont
}
_ => fun,
};

Ok((fun, args))
}

fn sql_interval_to_literal(
Expand Down Expand Up @@ -3572,6 +3588,15 @@ mod tests {
quick_test(sql, expected);
}

#[test]
fn select_approx_median() {
let sql = "SELECT approx_median(age) FROM person";
let expected = "Projection: #APPROXPERCENTILECONT(person.age,Float64(0.5))\
\n Aggregate: groupBy=[[]], aggr=[[APPROXPERCENTILECONT(#person.age, Float64(0.5))]]\
\n TableScan: person projection=None";
quick_test(sql, expected);
}

#[test]
fn select_scalar_func() {
let sql = "SELECT sqrt(age) FROM person";
Expand Down Expand Up @@ -4326,6 +4351,17 @@ mod tests {
quick_test(sql, expected);
}

#[test]
fn approx_median_window() {
let sql =
"SELECT order_id, APPROX_MEDIAN(qty) OVER(PARTITION BY order_id) from orders";
let expected = "\
Projection: #orders.order_id, #APPROXPERCENTILECONT(orders.qty,Float64(0.5)) PARTITION BY [#orders.order_id]\
\n WindowAggr: windowExpr=[[APPROXPERCENTILECONT(#orders.qty, Float64(0.5)) PARTITION BY [#orders.order_id]]]\
\n TableScan: orders projection=None";
quick_test(sql, expected);
}

#[test]
fn select_typedstring() {
let sql = "SELECT date '2020-12-10' AS date FROM person";
Expand Down

0 comments on commit a7d1a17

Please sign in to comment.