Merge adjacent filter rule for optimizer #2026

Closed
jackwener wants to merge 2 commits

jackwener
Member

Which issue does this PR close?

Closes #2016.

explain select c1, c2 from test where c3 = true and c2 = 0.000001;

Before

+---------------+-------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Projection: #test.c1, #test.c2                                                                                                      |
|               |   Filter: #test.c3                                                                                                                  |
|               |     Filter: #test.c2 = Float64(0.000001)                                                                                            |
|               |       TableScan: test projection=Some([0, 1, 2]), filters=[#test.c3, #test.c2 = Float64(0.000001)]                                  |
| physical_plan | ProjectionExec: expr=[c1@0 as c1, c2@1 as c2]                                                                                       |
|               |   CoalesceBatchesExec: target_batch_size=4096                                                                                       |
|               |     FilterExec: c3@2                                                                                                                |
|               |       CoalesceBatchesExec: target_batch_size=4096                                                                                   |
|               |         FilterExec: c2@1 = 0.000001                                                                                                 |
|               |           RepartitionExec: partitioning=RoundRobinBatch(8)                                                                          |
|               |             CsvExec: files=[/home/jakevin/code/arrow-datafusion/datafusion/tests/aggregate_simple.csv], has_header=true, limit=None |
|               |                                                                                                                                     |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------+

After

+---------------+---------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                            |
+---------------+---------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Projection: #test.c1, #test.c2                                                                                                  |
|               |   Filter: #test.c3 AND #test.c2 = Float64(0.000001)                                                                             |
|               |     TableScan: test projection=Some([0, 1, 2]), filters=[#test.c3, #test.c2 = Float64(0.000001)]                                |
| physical_plan | ProjectionExec: expr=[c1@0 as c1, c2@1 as c2]                                                                                   |
|               |   CoalesceBatchesExec: target_batch_size=4096                                                                                   |
|               |     FilterExec: c3@2 AND c2@1 = 0.000001                                                                                        |
|               |       RepartitionExec: partitioning=RoundRobinBatch(8)                                                                          |
|               |         CsvExec: files=[/home/jakevin/code/arrow-datafusion/datafusion/tests/aggregate_simple.csv], has_header=true, limit=None |
|               |                                                                                                                                 |
+---------------+---------------------------------------------------------------------------------------------------------------------------------+

Rationale for this change

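Merge adjacent filters into a single filter whose predicate is the AND of the original predicates, as in the Before and After plans above.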
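
The rewrite can be sketched on a toy plan type. This is only an illustration under stated assumptions: `Plan`, `Expr`, and `merge_adjacent_filters` are hypothetical names invented here, not DataFusion's `LogicalPlan` API or the code in this PR.

```rust
// Toy types for illustration only; these are NOT DataFusion's real types.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    /// An opaque predicate, kept as text (e.g. "c2 = 0.000001").
    Raw(String),
    /// Conjunction of two predicates.
    And(Box<Expr>, Box<Expr>),
}

#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan(String),
    Filter { predicate: Expr, input: Box<Plan> },
    Projection { columns: Vec<String>, input: Box<Plan> },
}

/// Collapse `Filter(a, Filter(b, x))` into `Filter(a AND b, x)`.
fn merge_adjacent_filters(plan: Plan) -> Plan {
    match plan {
        Plan::Filter { predicate, input } => {
            // Rewrite the child first so chains of three or more filters
            // collapse into a single node.
            match merge_adjacent_filters(*input) {
                Plan::Filter { predicate: inner, input: grandchild } => Plan::Filter {
                    predicate: Expr::And(Box::new(predicate), Box::new(inner)),
                    input: grandchild,
                },
                other => Plan::Filter { predicate, input: Box::new(other) },
            }
        }
        Plan::Projection { columns, input } => Plan::Projection {
            columns,
            input: Box::new(merge_adjacent_filters(*input)),
        },
        scan @ Plan::Scan(_) => scan,
    }
}
```

Applied to a projection over `Filter: c3` over `Filter: c2 = 0.000001`, the sketch yields one filter with the outer predicate first, matching the shape of the After plan.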

Are there any user-facing changes?

None

@jackwener jackwener force-pushed the merge branch 2 times, most recently from d45b908 to 33b0e91 Compare March 16, 2022 17:15
@jackwener
Member Author

jackwener commented Mar 16, 2022

@alamb @Dandandan @houqp PTAL❤😃.

@xudong963
Member

I doubt this really brings any gains. Could you please run some benchmarks for this case? Thanks @jackwener

@jackwener
Member Author

jackwener commented Mar 17, 2022

I doubt this really brings any gains. Could you please run some benchmarks for this case? Thanks @jackwener

Let me explain this rule. It is a common, basic optimizer rule: PostgreSQL, MySQL, and CockroachDB all implement it.

For example, in PostgreSQL:

explain select id, firstname from scientist where id > 3 and lastname = 's';

QUERY PLAN
--
Seq Scan on scientist (cost=0.00..17.20 rows=1 width=72)
Filter: ((id > 3) AND ((lastname)::text = 's'::text))

@jackwener
Member Author

jackwener commented Mar 17, 2022

In many cases it doesn't have a big impact on performance, but it does simplify the query plan.

What's more, other rules can benefit from it, such as filter reordering. Adjacent filters can be applied in any order, so once they are merged such a rule only has to look at a single filter operator instead of traversing several plan nodes.
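
As a rough sketch of why a single merged filter is convenient for a reordering rule: the conjunction can be split into its parts, sorted, and rebuilt in one place. Everything here is an assumption made for illustration, including the `Expr` type, the `est_cost` field, and the cheapest-first policy. None of it is DataFusion's API.

```rust
// Toy expression type for illustration only; NOT DataFusion's `Expr`.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    /// A leaf predicate with a hypothetical, made-up cost estimate.
    Pred { text: String, est_cost: u32 },
    And(Box<Expr>, Box<Expr>),
}

/// Flatten nested ANDs into a list of leaf predicates, left to right.
fn split_conjuncts(expr: Expr, out: &mut Vec<Expr>) {
    match expr {
        Expr::And(left, right) => {
            split_conjuncts(*left, out);
            split_conjuncts(*right, out);
        }
        leaf => out.push(leaf),
    }
}

/// Hypothetical cost model: a leaf carries its cost, an AND sums its children.
fn estimated_cost(expr: &Expr) -> u32 {
    match expr {
        Expr::Pred { est_cost, .. } => *est_cost,
        Expr::And(left, right) => estimated_cost(left) + estimated_cost(right),
    }
}

/// Rebuild the conjunction with the cheapest predicates first.
fn reorder_cheapest_first(expr: Expr) -> Expr {
    let mut parts = Vec::new();
    split_conjuncts(expr, &mut parts);
    // Stable sort: predicates with equal cost keep their original order.
    parts.sort_by_key(estimated_cost);
    parts
        .into_iter()
        .reduce(|acc, next| Expr::And(Box::new(acc), Box::new(next)))
        .expect("split_conjuncts always yields at least one predicate")
}
```

With two separate `Filter` nodes, the same reordering would require matching on parent and child plan nodes and swapping them. With one merged predicate it is a local rewrite of a single expression.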

@jackwener
Member Author

This PostgreSQL example shows it more directly.

explain select id, firstname from 
(select id, firstname, lastname from scientist where id > 3 ) t
 where lastname = 's';

QUERY PLAN
--
Seq Scan on scientist (cost=0.00..17.20 rows=1 width=72)
Filter: ((id > 3) AND ((lastname)::text = 's'::text))

@jackwener jackwener changed the title merge adjacent filter rule for optimizer Merge adjacent filter rule for optimizer Mar 17, 2022
@xudong963
Member

Makes sense to me, thank you @jackwener

xudong963
xudong963 previously approved these changes Mar 17, 2022
Member

@xudong963 xudong963 left a comment

LGTM, thanks again @jackwener. I commented on a few minor flaws :)

let new_plan = optimize(plan);

// Apply the optimization to all inputs of the plan
let inputs = &new_plan.inputs();
Member

The & is redundant?

Member

@yjshen yjshen left a comment

Thanks @jackwener for optimizing this. I think we are on the right track, but we could go a step further and remove redundant conditions while combining filters.

Regarding the current tests, I think we could:

  • Add more tests if we want to eliminate unnecessary conditions (as suggested in the code comments), covering both conjunctive and disjunctive filters.
  • Add SQL tests as well.
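
One way to picture removing redundant conditions while combining filters is to flatten both predicates into conjuncts and skip any conjunct already seen. The sketch below is hypothetical: `Expr`, `collect_unique_conjuncts`, and `merge_predicates` are names made up for this example. It handles only the conjunctive case and compares predicates by plain structural equality, so it does not cover disjunctive filters or semantically equal but differently written conditions.

```rust
// Toy expression type for illustration only; NOT DataFusion's `Expr`.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Pred(String),
    And(Box<Expr>, Box<Expr>),
}

/// Append the leaf predicates of `expr` to `out`, skipping exact duplicates.
fn collect_unique_conjuncts(expr: &Expr, out: &mut Vec<Expr>) {
    match expr {
        Expr::And(left, right) => {
            collect_unique_conjuncts(left, out);
            collect_unique_conjuncts(right, out);
        }
        leaf => {
            if !out.contains(leaf) {
                out.push(leaf.clone());
            }
        }
    }
}

/// Combine the predicates of two adjacent filters, dropping repeated conditions.
fn merge_predicates(outer: &Expr, inner: &Expr) -> Expr {
    let mut parts = Vec::new();
    collect_unique_conjuncts(outer, &mut parts);
    collect_unique_conjuncts(inner, &mut parts);
    parts
        .into_iter()
        .reduce(|acc, next| Expr::And(Box::new(acc), Box::new(next)))
        .expect("the outer predicate contributes at least one conjunct")
}
```

For instance, merging `a AND b` with `b` gives `a AND b` rather than `a AND b AND b`.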

@github-actions github-actions bot added the datafusion Changes in the datafusion crate label Mar 18, 2022
@doki23
Contributor

doki23 commented Mar 19, 2022

In my humble opinion, the SQL planner should produce a single filter when it creates the logical plan, rather than us adding an optimizer rule. In other words, I think there may be a bug in the planner.

@xudong963 xudong963 self-requested a review March 19, 2022 10:07
Member

@xudong963 xudong963 left a comment

@doki23's comment prompted me to test this with SQL, and I could not reproduce the problem described in the issue linked to this PR.

@xudong963 xudong963 dismissed their stale review March 19, 2022 10:14

I found some other problems

@xudong963
Member

xudong963 commented Mar 19, 2022

The following is the test code I used for the SQL case.

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::test]
async fn main() -> Result<()> {
    // create local execution context
    let mut ctx = SessionContext::new();

    // register csv file with the execution context
    ctx.register_csv("test", "tests/aggregate_simple.csv", CsvReadOptions::new())
        .await?;

    // build the logical plan for the query (nothing is executed here)
    let plan = ctx.create_logical_plan(
        "select c1, c2 from test where c3 = true and c2 = 0.000001",
    )?;

    // print the plan to inspect how many Filter nodes it contains
    dbg!(plan);

    Ok(())
}

This produced the following plan:

Projection: #test.c1, #test.c2
  Filter: #test.c3 = Boolean(true) AND #test.c2 = Float64(0.000001)
    TableScan: test projection=Non

I also tested with datafusion-cli:

❯ create table t as SELECT * FROM (VALUES (1,true), (2,false)) as t;
0 rows in set. Query took 0.003 seconds.
❯ select * from t;
+---------+---------+
| column1 | column2 |
+---------+---------+
| 1       | true    |
| 2       | false   |
+---------+---------+
2 rows in set. Query took 0.002 seconds.
❯ explain select * from t where column1 = 2 and column2 = true;
+---------------+-------------------------------------------------------------------+
| plan_type     | plan                                                              |
+---------------+-------------------------------------------------------------------+
| logical_plan  | Projection: #t.column1, #t.column2                                |
|               |   Filter: #t.column1 = Int64(2) AND #t.column2                    |
|               |     TableScan: t projection=Some([0, 1])                          |
| physical_plan | ProjectionExec: expr=[column1@0 as column1, column2@1 as column2] |
|               |   CoalesceBatchesExec: target_batch_size=4096                     |
|               |     FilterExec: column1@0 = 2 AND column2@1                       |
|               |       RepartitionExec: partitioning=RoundRobinBatch(12)           |
|               |         MemoryExec: partitions=1, partition_sizes=[1]             |
|               |                                                                   |
+---------------+-------------------------------------------------------------------+
2 rows in set. Query took 0.004 seconds.

Two cases result in adjacent filters in the logical plan:

  • Using the DataFrame API: df.xxx.filter(filter1).filter(filter2);
  • Building the logical plan directly with LogicalPlanBuilder: LogicalPlanBuilder::from(xx).xxx.filter().filter()...

DataFrame users can avoid adjacent filters by combining the predicates before filtering:

    let filter1 = col("b").eq(lit(10));
    let filter2 = col("a").eq(lit("a"));

    // combine both predicates so the plan gets a single Filter node
    let filter = filter1.and(filter2);

    let df = df
        .select_columns(&["a", "b"])?
        .filter(filter)?;

@jackwener
Member Author

❤😃 Thanks @xudong963, @doki23

@alamb
Contributor

alamb commented Mar 20, 2022

For anyone following along, the follow-on work is in #2039 and #2038.

@jackwener jackwener deleted the merge branch November 24, 2022 02:46