Support limit push down for offset_plan #2566

Merged: 9 commits, May 20, 2022
210 changes: 205 additions & 5 deletions datafusion/core/src/optimizer/limit_push_down.rs
@@ -24,6 +24,7 @@ use crate::logical_plan::plan::Projection;
 use crate::logical_plan::{Limit, TableScan};
 use crate::logical_plan::{LogicalPlan, Union};
 use crate::optimizer::optimizer::OptimizerRule;
+use datafusion_expr::logical_plan::Offset;
 use std::sync::Arc;

 /// Optimization rule that tries to push down LIMIT n
@@ -43,18 +44,24 @@ fn limit_push_down(
     upper_limit: Option<usize>,
     plan: &LogicalPlan,
     _execution_props: &ExecutionProps,
+    is_offset: bool,
 ) -> Result<LogicalPlan> {
     match (plan, upper_limit) {
         (LogicalPlan::Limit(Limit { n, input }), upper_limit) => {
-            let smallest = upper_limit.map(|x| std::cmp::min(x, *n)).unwrap_or(*n);
+            let new_limit: usize = if is_offset {
+                *n + upper_limit.unwrap_or(0)
+            } else {
+                upper_limit.map(|x| std::cmp::min(x, *n)).unwrap_or(*n)
+            };
             Ok(LogicalPlan::Limit(Limit {
-                n: smallest,
+                n: new_limit,
                 // push down limit to plan (minimum of upper limit and current
                 // limit, or their sum when the limit is pushed from an Offset)
                 input: Arc::new(limit_push_down(
                     _optimizer,
-                    Some(smallest),
+                    Some(new_limit),
                     input.as_ref(),
                     _execution_props,
+                    false,
                 )?),
             }))
         }
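
The fetch-count arithmetic above is easy to check in isolation. Below is a minimal standalone sketch (a hypothetical helper, not code from this PR): a limit pushed down from a parent Offset is added to the current limit, while a limit pushed down from another Limit shrinks it to the minimum.

// Sketch of the fetch-count rule in the Limit arm above (hypothetical helper).
fn combined_fetch(current_limit: usize, pushed: Option<usize>, from_offset: bool) -> usize {
    if from_offset {
        // The parent Offset skips `pushed` rows, so the child Limit must
        // produce pushed + current_limit rows for the query to stay correct.
        current_limit + pushed.unwrap_or(0)
    } else {
        // Two stacked limits: only the smaller one matters.
        pushed.map(|p| p.min(current_limit)).unwrap_or(current_limit)
    }
}

fn main() {
    assert_eq!(combined_fetch(1000, Some(10), true), 1010); // OFFSET 10 over LIMIT 1000
    assert_eq!(combined_fetch(1000, Some(10), false), 10); // LIMIT 10 over LIMIT 1000
}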
@@ -95,6 +102,7 @@ fn limit_push_down(
                     upper_limit,
                     input.as_ref(),
                     _execution_props,
+                    false,
                 )?),
                 schema: schema.clone(),
                 alias: alias.clone(),
@@ -119,6 +127,7 @@
                             Some(upper_limit),
                             x,
                             _execution_props,
+                            false,
                         )?),
                     }))
                 })
@@ -129,6 +138,25 @@
                 schema: schema.clone(),
             }))
         }
+        // offset 5 limit 10 then push limit 15 (5 + 10)
+        // Limit should always be Offset's input
+        (LogicalPlan::Offset(Offset { offset, input }), upper_limit) => {
+            let new_limit = if let Some(ul) = upper_limit {
+                ul + *offset
+            } else {
+                *offset
+            };
+            Ok(LogicalPlan::Offset(Offset {
+                offset: *offset,
+                input: Arc::new(limit_push_down(
+                    _optimizer,
+                    Some(new_limit),
+                    input.as_ref(),
+                    _execution_props,
+                    true,
+                )?),
+            }))
+        }
         // For other nodes we can't push down the limit
         // But try to recurse and find other limit nodes to push down
         _ => {
@@ -138,7 +166,9 @@
             let inputs = plan.inputs();
             let new_inputs = inputs
                 .iter()
-                .map(|plan| limit_push_down(_optimizer, None, plan, _execution_props))
+                .map(|plan| {
+                    limit_push_down(_optimizer, None, plan, _execution_props, false)
+                })
                 .collect::<Result<Vec<_>>>()?;

             utils::from_plan(plan, &expr, &new_inputs)
@@ -152,7 +182,7 @@ impl OptimizerRule for LimitPushDown {
         plan: &LogicalPlan,
         execution_props: &ExecutionProps,
     ) -> Result<LogicalPlan> {
-        limit_push_down(self, None, plan, execution_props)
+        limit_push_down(self, None, plan, execution_props, false)
     }

     fn name(&self) -> &str {
@@ -167,6 +197,8 @@ mod test {
         logical_plan::{col, max, LogicalPlan, LogicalPlanBuilder},
         test::*,
     };
+    use datafusion_expr::exists;
+    use datafusion_expr::logical_plan::JoinType;

     fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
         let rule = LimitPushDown::new();
@@ -278,4 +310,172 @@ mod test {

         Ok(())
     }
+
+    #[test]
+    fn limit_pushdown_with_offset_projection_table_provider() -> Result<()> {
+        let table_scan = test_table_scan()?;
+
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .project(vec![col("a")])?
+            .offset(10)?
+            .limit(1000)?
+            .build()?;
+
+        // Should push the limit down to the table provider
+        // when it has a select
+        let expected = "Limit: 1000\
+        \n  Offset: 10\
+        \n    Projection: #test.a\
+        \n      TableScan: test projection=None, limit=1010";
+
+        assert_optimized_plan_eq(&plan, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn limit_pushdown_with_offset_after_limit() -> Result<()> {
+        let table_scan = test_table_scan()?;
+
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .project(vec![col("a")])?
+            .limit(1000)?
+            .offset(10)?
+            .build()?;
+
+        let expected = "Offset: 10\
+        \n  Limit: 1010\
+        \n    Projection: #test.a\
+        \n      TableScan: test projection=None, limit=1010";
+
+        assert_optimized_plan_eq(&plan, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn limit_push_down_with_offset_take_smaller_limit() -> Result<()> {
+        let table_scan = test_table_scan()?;
+
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .offset(10)?
+            .limit(1000)?
+            .limit(10)?
+            .build()?;
+
+        // Should push down the smallest limit
+        // towards the table scan.
+        // This rule doesn't replace multiple limits.
+        let expected = "Limit: 10\
+        \n  Limit: 10\
+        \n    Offset: 10\
+        \n      TableScan: test projection=None, limit=20";
+
+        assert_optimized_plan_eq(&plan, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn limit_doesnt_push_down_with_offset_aggregation() -> Result<()> {
+        let table_scan = test_table_scan()?;
+
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .aggregate(vec![col("a")], vec![max(col("b"))])?
+            .offset(10)?
+            .limit(1000)?
+            .build()?;
+
+        // Limit should *not* push down past an aggregate node
+        let expected = "Limit: 1000\
+        \n  Offset: 10\
+        \n    Aggregate: groupBy=[[#test.a]], aggr=[[MAX(#test.b)]]\
+        \n      TableScan: test projection=None";
+
+        assert_optimized_plan_eq(&plan, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn limit_should_push_down_with_offset_union() -> Result<()> {
+        let table_scan = test_table_scan()?;
+
+        let plan = LogicalPlanBuilder::from(table_scan.clone())
+            .union(LogicalPlanBuilder::from(table_scan).build()?)?
+            .offset(10)?
+            .limit(1000)?
+            .build()?;
+
+        // Limit should push down through union
+        let expected = "Limit: 1000\
+        \n  Offset: 10\
+        \n    Union\
+        \n      Limit: 1010\
+        \n        TableScan: test projection=None, limit=1010\
+        \n      Limit: 1010\
+        \n        TableScan: test projection=None, limit=1010";
+
+        assert_optimized_plan_eq(&plan, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn limit_should_not_push_down_with_offset_join() -> Result<()> {
+        let table_scan_1 = test_table_scan()?;
+        let table_scan_2 = test_table_scan_with_name("test2")?;
+
+        let plan = LogicalPlanBuilder::from(table_scan_1)
+            .join(
+                &LogicalPlanBuilder::from(table_scan_2).build()?,
+                JoinType::Left,
+                (vec!["a"], vec!["a"]),
+            )?
+            .limit(1000)?
+            .offset(10)?
+            .build()?;
+
+        // Limit pushdown is not supported for Join
+        let expected = "Offset: 10\
+        \n  Limit: 1010\
+        \n    Left Join: #test.a = #test2.a\
+        \n      TableScan: test projection=None\
+        \n      TableScan: test2 projection=None";
+
+        assert_optimized_plan_eq(&plan, expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn limit_should_not_push_down_with_offset_sub_query() -> Result<()> {
+        let table_scan_1 = test_table_scan_with_name("test1")?;
+        let table_scan_2 = test_table_scan_with_name("test2")?;
+
+        let subquery = LogicalPlanBuilder::from(table_scan_1)
+            .project(vec![col("a")])?
+            .filter(col("a").eq(col("test1.a")))?
+            .build()?;
+
+        let outer_query = LogicalPlanBuilder::from(table_scan_2)
+            .project(vec![col("a")])?
+            .filter(exists(Arc::new(subquery)))?
+            .limit(100)?
+            .offset(10)?
+            .build()?;
+
+        // Limit pushdown is not supported into a subquery
+        let expected = "Offset: 10\
+        \n  Limit: 110\
+        \n    Filter: EXISTS (Subquery: Filter: #test1.a = #test1.a\
+        \n  Projection: #test1.a\
+        \n    TableScan: test1 projection=None)\
+        \n      Projection: #test2.a\
+        \n        TableScan: test2 projection=None";
+
+        assert_optimized_plan_eq(&outer_query, expected);
+
+        Ok(())
+    }
 }
47 changes: 42 additions & 5 deletions datafusion/core/src/sql/planner.rs
@@ -296,9 +296,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {

         let plan = self.order_by(plan, query.order_by)?;

-        let plan: LogicalPlan = self.offset(plan, query.offset)?;
+        let plan: LogicalPlan = self.limit(plan, query.limit)?;

-        self.limit(plan, query.limit)
+        // making limit the offset's input enables simple limit push down
+        self.offset(plan, query.offset)
Ted-Jiang (Member, Author) commented:

If there is an OFFSET in the SQL, we now put it as the root node of the plan tree, so it is easy to push new_limit down to the limit operator.
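
To illustrate the two plan shapes (a minimal sketch with hypothetical types, not the DataFusion API): for a query with LIMIT 5 OFFSET 3, applying the limit first leaves Offset as the root and Limit as its input, so the optimizer reaches the Offset first and can push offset + limit downward as a single fetch count.

// Hypothetical plan type, for illustration only.
#[derive(Debug)]
enum Plan {
    Scan,
    Limit(usize, Box<Plan>),
    Offset(usize, Box<Plan>),
}

fn main() {
    // Before this change: the limit was applied last, so it sat at the root.
    let before = Plan::Limit(5, Box::new(Plan::Offset(3, Box::new(Plan::Scan))));
    // After this change: Offset is the root and Limit is its input.
    let after = Plan::Offset(3, Box::new(Plan::Limit(5, Box::new(Plan::Scan))));
    println!("before: {:?}\nafter:  {:?}", before, after);
}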

A reviewer (Member) commented:

Don't we need to apply the offset before the limit here?

It looks like we don't currently have a test with both a non-zero offset and a limit. Could you add one here so we can see what the plan looks like?

Per https://www.postgresql.org/docs/current/queries-limit.html: "If both OFFSET and LIMIT appear, then OFFSET rows are skipped before starting to count the LIMIT rows that are returned." I am not sure this change will ensure that.

Ted-Jiang (Member, Author) replied on May 20, 2022:

From the EXPLAIN output in PG:

postgres=# explain analyze  select * from users limit 5 offset 7;
                                                 QUERY PLAN
-------------------------------------------------------------------------------------------------------------
 Limit  (cost=0.17..0.29 rows=5 width=80) (actual time=0.117..0.226 rows=5 loops=1)
   ->  Seq Scan on users  (cost=0.00..239.02 rows=10002 width=80) (actual time=0.013..0.082 rows=12 loops=1)
 Planning Time: 0.060 ms
 Execution Time: 0.350 ms
(4 rows)

I think in PG the limit operator carries the offset as a parameter; in DataFusion we separate them, so we need to apply the offset after a limit of a + b.

Added a test: #2566 (comment)
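
The a + b rule can be sanity-checked with plain iterators, independent of DataFusion (a sketch, not code from this PR): skipping offset rows and then returning limit rows reads at most offset + limit rows from the input, which is exactly the fetch count pushed below the Offset.

// Sanity check: OFFSET 7 LIMIT 5 equals "limit to 7 + 5 rows, then skip 7".
fn main() {
    let rows: Vec<i32> = (0..100).collect();
    let (offset, limit) = (7usize, 5usize);

    // Direct semantics: skip `offset` rows, then take `limit` rows.
    let direct: Vec<i32> = rows.iter().copied().skip(offset).take(limit).collect();

    // Pushed-down form: Limit(offset + limit) below, Offset(offset) on top.
    let pushed: Vec<i32> = rows
        .iter()
        .copied()
        .take(offset + limit) // the pushed-down limit of a + b
        .skip(offset)         // the offset applied above it
        .collect();

    assert_eq!(direct, pushed); // both yield [7, 8, 9, 10, 11]
    println!("{:?}", direct);
}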

     }

     fn set_expr_to_plan(
@@ -2647,6 +2648,9 @@ fn parse_sql_number(n: &str) -> Result<Expr> {
 #[cfg(test)]
 mod tests {
     use crate::datasource::empty::EmptyTable;
+    use crate::execution::context::ExecutionProps;
+    use crate::optimizer::limit_push_down::LimitPushDown;
+    use crate::optimizer::optimizer::OptimizerRule;
     use crate::{assert_contains, logical_plan::create_udf, sql::parser::DFParser};
     use datafusion_expr::{ScalarFunctionImplementation, Volatility};

@@ -4375,6 +4379,16 @@ mod tests {
         assert_eq!(format!("{:?}", plan), expected);
     }

+    fn quick_test_with_limit_pushdown(sql: &str, expected: &str) {
+        let plan = logical_plan(sql).unwrap();
+        let rule = LimitPushDown::new();
+        let optimized_plan = rule
+            .optimize(&plan, &ExecutionProps::new())
+            .expect("failed to optimize plan");
+        let formatted_plan = format!("{:?}", optimized_plan);
+        assert_eq!(formatted_plan, expected);
+    }
+
     struct MockContextProvider {}

     impl ContextProvider for MockContextProvider {
Expand Down Expand Up @@ -4823,10 +4837,10 @@ mod tests {
}

#[test]
fn test_offset_with_limit() {
fn test_zero_offset_with_limit() {
let sql = "select id from person where person.id > 100 LIMIT 5 OFFSET 0;";
let expected = "Limit: 5\
\n Offset: 0\
let expected = "Offset: 0\
\n Limit: 5\
\n Projection: #person.id\
\n Filter: #person.id > Int64(100)\
\n TableScan: person projection=None";
@@ -4847,6 +4861,29 @@
         quick_test(sql, expected);
     }

+    #[test]
+    fn test_offset_after_limit_with_limit_push() {
+        let sql = "select id from person where person.id > 100 LIMIT 5 OFFSET 3;";
+        let expected = "Offset: 3\
+        \n  Limit: 8\
+        \n    Projection: #person.id\
+        \n      Filter: #person.id > Int64(100)\
+        \n        TableScan: person projection=None";
+
+        quick_test_with_limit_pushdown(sql, expected);
+    }
+
+    #[test]
+    fn test_offset_before_limit_with_limit_push() {
+        let sql = "select id from person where person.id > 100 OFFSET 3 LIMIT 5;";
+        let expected = "Offset: 3\
+        \n  Limit: 8\
+        \n    Projection: #person.id\
+        \n      Filter: #person.id > Int64(100)\
+        \n        TableScan: person projection=None";
+        quick_test_with_limit_pushdown(sql, expected);
+    }
+
Comment on lines +4864 to +4886

Ted-Jiang (Member, Author) commented:

Add test

A reviewer (Member) replied:

Thanks! That makes it much clearer

     fn assert_field_not_found(err: DataFusionError, name: &str) {
         match err {
             DataFusionError::SchemaError { .. } => {