optimize/simplify the literal data type and remove unnecessary cast、try_cast #3031

Closed
5 of 6 tasks
liukun4515 opened this issue Aug 4, 2022 · 15 comments · Fixed by #4634

@liukun4515
Contributor

liukun4515 commented Aug 4, 2022

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Schema:

c1  decimal(18,0)

select * from table where c1 = 20;
select * from table where c1 in (12,2);

We get a SQL plan like the one below:

    .....
       ....
         cast(c1 as decimal(20,0)), cast(20 as decimal(20,0))

In DataFusion, the literals 20, 12, and 2 have type INT64. When decimal(18,0) is compared with INT64, the coerced common type is decimal(20,0), according to the rule get_comparison_common_decimal_type.
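To make the arithmetic concrete, here is a minimal sketch of how such a common type could be computed. It is not the actual get_comparison_common_decimal_type code: `DecimalType`, `common_decimal_type`, and `INT64_AS_DECIMAL` are hypothetical names, and treating INT64 as decimal(20,0) is an assumption inferred from the result quoted above.

```rust
/// A decimal type written as decimal(precision, scale).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct DecimalType {
    pub precision: u8,
    pub scale: u8,
}

/// Largest precision a 128-bit decimal can hold.
pub const MAX_PRECISION: u8 = 38;

/// Assumption: INT64 is viewed as decimal(20,0) when compared with a decimal.
pub const INT64_AS_DECIMAL: DecimalType = DecimalType { precision: 20, scale: 0 };

/// Hypothetical helper: pick a type wide enough for both sides of a comparison.
/// It keeps the larger scale and the larger number of integer digits,
/// then caps the precision at MAX_PRECISION.
pub fn common_decimal_type(a: DecimalType, b: DecimalType) -> DecimalType {
    let scale = a.scale.max(b.scale);
    // saturating_sub guards against a malformed type whose scale exceeds its precision.
    let integer_digits = a
        .precision
        .saturating_sub(a.scale)
        .max(b.precision.saturating_sub(b.scale));
    let precision = integer_digits.saturating_add(scale).min(MAX_PRECISION);
    DecimalType { precision, scale }
}
```

Under these assumptions, `common_decimal_type(DecimalType { precision: 18, scale: 0 }, INT64_AS_DECIMAL)` yields decimal(20,0), which is why both the column and the literal end up wrapped in a cast.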

We should optimize this the way Spark does (see the linked Spark issue); doing so removes unnecessary cast/try_cast in many cases that involve literals.
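The rewrite being asked for can be sketched on a toy expression tree. Everything below (`Expr`, `fits_in_precision`, `unwrap_cast_in_comparison`) is hypothetical and handles only scale-0 decimals and equality; it is an illustration of the idea, not DataFusion's API or the rule that was eventually merged.

```rust
/// Toy expression tree; every decimal here has scale 0.
#[derive(Debug, Clone, PartialEq)]
pub enum Expr {
    /// A column of type decimal(precision, 0).
    Column { name: String, precision: u8 },
    /// An INT64 literal.
    Int64(i64),
    /// A decimal(precision, 0) literal.
    Decimal { value: i128, precision: u8 },
    /// A cast to decimal(precision, 0).
    Cast { expr: Box<Expr>, precision: u8 },
    /// An equality comparison.
    Eq(Box<Expr>, Box<Expr>),
}

/// True if `value` has at most `precision` decimal digits.
fn fits_in_precision(value: i64, precision: u8) -> bool {
    match 10i128.checked_pow(u32::from(precision)) {
        Some(limit) => i128::from(value).abs() < limit,
        // 10^precision exceeds i128, so every i64 fits.
        None => true,
    }
}

/// Rewrites `cast(col) = cast(int_literal)` into `col = decimal_literal`
/// when the literal fits the column's own type; otherwise returns the input unchanged.
pub fn unwrap_cast_in_comparison(expr: &Expr) -> Expr {
    if let Expr::Eq(left, right) = expr {
        if let (Expr::Cast { expr: l, .. }, Expr::Cast { expr: r, .. }) = (&**left, &**right) {
            if let (Expr::Column { precision, .. }, Expr::Int64(v)) = (&**l, &**r) {
                if fits_in_precision(*v, *precision) {
                    let literal = Expr::Decimal { value: i128::from(*v), precision: *precision };
                    return Expr::Eq(l.clone(), Box::new(literal));
                }
            }
        }
    }
    expr.clone()
}
```

Applied to `cast(c1 as decimal(20,0)) = cast(20 as decimal(20,0))` with c1 of type decimal(18,0), the sketch returns `c1 = 20` with the literal typed as decimal(18,0), so no per-row cast of the column remains.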

Describe the solution you'd like
A clear and concise description of what you want to happen.

Describe alternatives you've considered
A clear and concise description of any alternative solutions or features you've considered.

Additional context
Here is the plan for implementing the rule:

@liukun4515 liukun4515 added the enhancement New feature or request label Aug 4, 2022
@liukun4515 liukun4515 self-assigned this Aug 4, 2022
@liukun4515
Contributor Author

Optimize the decimal value from arrow-rs apache/arrow-rs#2313

@alamb
Contributor

alamb commented Aug 17, 2022

Here is a self contained reproducer for anyone following along:

❯ create table foo as select column1 as d from (values (1), (2));
+---+
| d |
+---+
| 1 |
| 2 |
+---+
2 rows in set. Query took 0.005 seconds.
❯ create table bar as select cast (d as decimal) as d from foo;
+--------------+
| d            |
+--------------+
| 1.0000000000 |
| 2.0000000000 |
+--------------+
2 rows in set. Query took 0.005 seconds.
❯ explain select * from bar where d = 1.4;
+---------------+-----------------------------------------------------------------------------------+
| plan_type     | plan                                                                              |
+---------------+-----------------------------------------------------------------------------------+
| logical_plan  | Projection: #bar.d                                                                |
|               |   Filter: #bar.d = Float64(1.4)                                                   |
|               |     TableScan: bar projection=[d]                                                 |
| physical_plan | ProjectionExec: expr=[d@0 as d]                                                   |
|               |   CoalesceBatchesExec: target_batch_size=4096                                     |
|               |     FilterExec: CAST(d@0 AS Decimal128(38, 15)) = CAST(1.4 AS Decimal128(38, 15)) |
|               |       RepartitionExec: partitioning=RoundRobinBatch(16)                           |
|               |         MemoryExec: partitions=1, partition_sizes=[1]                             |
|               |                                                                                   |
+---------------+-----------------------------------------------------------------------------------+
2 rows in set. Query took 0.005 seconds.

The FilterExec line above should not have the CAST operations in it.

@alamb
Contributor

alamb commented Aug 17, 2022

And we can see that, as @liukun4515 says, the casting is added in the physical plan (rather than the logical plan):

❯ explain verbose select * from bar where d = 1.4;
+-------------------------------------------------------+-----------------------------------------------------------------------------------+
| plan_type                                             | plan                                                                              |
+-------------------------------------------------------+-----------------------------------------------------------------------------------+
| initial_logical_plan                                  | Projection: #bar.d                                                                |
|                                                       |   Filter: #bar.d = Float64(1.4)                                                   |
|                                                       |     TableScan: bar                                                                |
| logical_plan after simplify_expressions               | SAME TEXT AS ABOVE                                                                |
| logical_plan after decorrelate_where_exists           | SAME TEXT AS ABOVE                                                                |
| logical_plan after decorrelate_where_in               | SAME TEXT AS ABOVE                                                                |
| logical_plan after decorrelate_scalar_subquery        | SAME TEXT AS ABOVE                                                                |
| logical_plan after subquery_filter_to_join            | SAME TEXT AS ABOVE                                                                |
| logical_plan after eliminate_filter                   | SAME TEXT AS ABOVE                                                                |
| logical_plan after common_sub_expression_eliminate    | SAME TEXT AS ABOVE                                                                |
| logical_plan after eliminate_limit                    | SAME TEXT AS ABOVE                                                                |
| logical_plan after projection_push_down               | Projection: #bar.d                                                                |
|                                                       |   Filter: #bar.d = Float64(1.4)                                                   |
|                                                       |     TableScan: bar projection=[d]                                                 |
| logical_plan after rewrite_disjunctive_predicate      | SAME TEXT AS ABOVE                                                                |
| logical_plan after reduce_outer_join                  | SAME TEXT AS ABOVE                                                                |
| logical_plan after filter_push_down                   | SAME TEXT AS ABOVE                                                                |
| logical_plan after limit_push_down                    | SAME TEXT AS ABOVE                                                                |
| logical_plan after SingleDistinctAggregationToGroupBy | SAME TEXT AS ABOVE                                                                |
| logical_plan                                          | Projection: #bar.d                                                                |
|                                                       |   Filter: #bar.d = Float64(1.4)                                                   |
|                                                       |     TableScan: bar projection=[d]                                                 |
| initial_physical_plan                                 | ProjectionExec: expr=[d@0 as d]                                                   |
|                                                       |   FilterExec: CAST(d@0 AS Decimal128(38, 15)) = CAST(1.4 AS Decimal128(38, 15))   |
|                                                       |     MemoryExec: partitions=1, partition_sizes=[1]                                 |
|                                                       |                                                                                   |
| physical_plan after aggregate_statistics              | SAME TEXT AS ABOVE                                                                |
| physical_plan after hash_build_probe_order            | SAME TEXT AS ABOVE                                                                |
| physical_plan after coalesce_batches                  | ProjectionExec: expr=[d@0 as d]                                                   |
|                                                       |   CoalesceBatchesExec: target_batch_size=4096                                     |
|                                                       |     FilterExec: CAST(d@0 AS Decimal128(38, 15)) = CAST(1.4 AS Decimal128(38, 15)) |
|                                                       |       MemoryExec: partitions=1, partition_sizes=[1]                               |
|                                                       |                                                                                   |
| physical_plan after repartition                       | ProjectionExec: expr=[d@0 as d]                                                   |
|                                                       |   CoalesceBatchesExec: target_batch_size=4096                                     |
|                                                       |     FilterExec: CAST(d@0 AS Decimal128(38, 15)) = CAST(1.4 AS Decimal128(38, 15)) |
|                                                       |       RepartitionExec: partitioning=RoundRobinBatch(16)                           |
|                                                       |         MemoryExec: partitions=1, partition_sizes=[1]                             |
|                                                       |                                                                                   |
| physical_plan after add_merge_exec                    | SAME TEXT AS ABOVE                                                                |
| physical_plan                                         | ProjectionExec: expr=[d@0 as d]                                                   |
|                                                       |   CoalesceBatchesExec: target_batch_size=4096                                     |
|                                                       |     FilterExec: CAST(d@0 AS Decimal128(38, 15)) = CAST(1.4 AS Decimal128(38, 15)) |
|                                                       |       RepartitionExec: partitioning=RoundRobinBatch(16)                           |
|                                                       |         MemoryExec: partitions=1, partition_sizes=[1]                             |
|                                                       |                                                                                   |
+-------------------------------------------------------+-----------------------------------------------------------------------------------+

@alamb
Contributor

alamb commented Aug 17, 2022

I agree the ideal outcome would be for the logical plan to contain the casts. I think type coercion / casting happens in the physical planner because of the idea that different implementations of physical exprs might have different comparison rules (for example, a system that could compare i32 and i64 natively via a dedicated kernel).

I am not sure if this ability has ever been used in practice, but I am also not sure it would be a simple thing to change now.

I suggest you investigate special casing the creation of PhysicalExprs for cast: https://github.com/apache/arrow-datafusion/blob/89bcfc4827a84c37abdf6476ec164611b270492d/datafusion/physical-expr/src/planner.rs#L172-L181

Basically, special case it: if the argument is a literal, do the cast at plan creation time. It wouldn't be super general, but I think it would work for you in this case.
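A rough sketch of that special case, using toy types rather than DataFusion's real PhysicalExpr machinery: `DataType`, `Scalar`, `PhysExpr`, `cast_scalar`, and `create_cast` are all hypothetical names, and only the INT64-to-decimal direction is modeled.

```rust
/// Toy target types for a cast.
#[derive(Debug, Clone, PartialEq)]
pub enum DataType {
    Int64,
    Decimal { precision: u8, scale: u8 },
}

/// Toy scalar values; a decimal stores its unscaled integer value.
#[derive(Debug, Clone, PartialEq)]
pub enum Scalar {
    Int64(i64),
    Decimal { value: i128, precision: u8, scale: u8 },
}

/// Toy physical expression.
#[derive(Debug, Clone, PartialEq)]
pub enum PhysExpr {
    Column(usize),
    Literal(Scalar),
    Cast { expr: Box<PhysExpr>, to: DataType },
}

/// Casts a scalar eagerly; returns None when the cast is unsupported or would overflow.
fn cast_scalar(value: &Scalar, to: &DataType) -> Option<Scalar> {
    match (value, to) {
        (Scalar::Int64(v), DataType::Int64) => Some(Scalar::Int64(*v)),
        (Scalar::Int64(v), DataType::Decimal { precision, scale }) => {
            // Scale the integer up by 10^scale to get the unscaled decimal value.
            let factor = 10i128.checked_pow(u32::from(*scale))?;
            let unscaled = i128::from(*v).checked_mul(factor)?;
            let limit = 10i128.checked_pow(u32::from(*precision))?;
            if unscaled.abs() < limit {
                Some(Scalar::Decimal { value: unscaled, precision: *precision, scale: *scale })
            } else {
                None
            }
        }
        _ => None,
    }
}

/// Builds a cast expression, folding it into a literal when the input is a literal.
pub fn create_cast(expr: PhysExpr, to: DataType) -> PhysExpr {
    if let PhysExpr::Literal(value) = &expr {
        if let Some(folded) = cast_scalar(value, &to) {
            return PhysExpr::Literal(folded);
        }
    }
    // Non-literal input, or a cast we cannot evaluate now: keep the runtime cast.
    PhysExpr::Cast { expr: Box::new(expr), to }
}
```

With this shape, a literal is cast once at plan creation, while a column reference still gets a runtime cast. It removes only the literal-side cast; the cast on the column side would remain.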

@alamb
Contributor

alamb commented Aug 17, 2022

Another alternative might be to follow the model of ConstEvaluator (which operates on Exprs) and apply it to physical expressions 🤔
https://github.com/apache/arrow-datafusion/blob/3eb55e9a0510d872f6f7765b1a5f17db46486e45/datafusion/optimizer/src/simplify_expressions.rs#L397-L415

You would likely need changes to PhysicalExpr similar to what @iajoiner and I were discussing on #3014 (comment)
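As a loose illustration of that idea, the sketch below folds constant subtrees of a toy physical expression bottom-up. It does not reproduce ConstEvaluator or the PhysicalExpr trait; `PhysExpr` and `fold_constants` are hypothetical, and integer addition stands in for any operator that can be evaluated at plan time.

```rust
/// Toy physical expression with a single binary operator.
#[derive(Debug, Clone, PartialEq)]
pub enum PhysExpr {
    Column(usize),
    Literal(i64),
    Add(Box<PhysExpr>, Box<PhysExpr>),
}

/// Folds constant subtrees bottom-up: children are rewritten first, and a node
/// whose children are all literals is replaced by its value.
pub fn fold_constants(expr: PhysExpr) -> PhysExpr {
    match expr {
        PhysExpr::Add(left, right) => match (fold_constants(*left), fold_constants(*right)) {
            (PhysExpr::Literal(a), PhysExpr::Literal(b)) => match a.checked_add(b) {
                Some(sum) => PhysExpr::Literal(sum),
                // On overflow, leave the node in place so the error surfaces at run time.
                None => PhysExpr::Add(
                    Box::new(PhysExpr::Literal(a)),
                    Box::new(PhysExpr::Literal(b)),
                ),
            },
            (l, r) => PhysExpr::Add(Box::new(l), Box::new(r)),
        },
        // Columns and literals are already in their simplest form.
        leaf => leaf,
    }
}
```

For example, `col@0 + (1 + 2)` folds to `col@0 + 3`: the constant subtree is evaluated once during planning, and the column reference is left alone.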

@liukun4515
Contributor Author

liukun4515 commented Aug 17, 2022

The cast is added during the creation of the physical expr/physical plan.
It follows a general rule for coercing binary comparisons,
like below:

INT32 < INT64 -> INT64

DECIMAL(10,2) < DOUBLE -> Other decimal data type.

I think it is all in the comparison_binary_numeric_coercion function.

This is just the general rule, and it works well in all cases.
But in many use cases we just use a literal in a filter expr or another condition, as in this issue; the new optimizer rule should handle this case the way Spark does.

I have filed a draft PR which adds a logical optimizer rule to do this, but it may not be ready until tomorrow because I need to review some of the plan changes myself first. I think the rule can work well for us. #3185

@alamb

@alamb
Contributor

alamb commented Aug 17, 2022

The cast is added in the creation of physical expr/physical plan.

Yes I agree

It sounds like you are proposing moving the coercion to the logical planning phase -- while I am not opposed to doing so I do think it is likely a large change which I think we'll want to run by other maintainers. But getting a PR up to do the change is the first step

I do think it is worth considering special casing just the casting in the physical planner (as the physical planner is what is adding the casts in the first place)

@andygrove
Member

+1 for optimization rules to perform type coercion in the logical plan. I am running into some issues around this myself and am seeing quite a lot of inconsistencies in how we currently handle this.

@liukun4515
Contributor Author

+1 for optimization rules to perform type coercion in the logical plan. I am running into some issues around this myself and am seeing quite a lot of inconsistencies in how we currently handle this.

Do you mean that you wish to migrate the type coercion from physical plan creation to logical plan creation?

But this issue just adds a rule to optimize the expr and improve performance; it will not make any changes to the generation of the physical plan or physical expr.

@liukun4515
Contributor Author

The cast is added in the creation of physical expr/physical plan.

Yes I agree

It sounds like you are proposing moving the coercion to the logical planning phase -- while I am not opposed to doing so I do think it is likely a large change which I think we'll want to run by other maintainers. But getting a PR up to do the change is the first step

From what I know of other database systems, generating a physical expr does not change the original expr; it just transforms the logical expr into a physical expr.

I remember we discussed this a long time ago.

But this issue is not meant to do that; it is only meant to optimize the case described at the beginning of this issue.

I do think it is worth considering special casing just the casting in the physical planner (as the physical planner is what is adding the casts in the first place)

We can open a new issue to discuss why and how to migrate the type coercion from physical planning to logical planning, and what benefit we would get from the work.

@alamb

@alamb
Contributor

alamb commented Aug 18, 2022

We can open a new issue to discuss why and how to migrate the type coercion from physical planning to logical planning, and what benefit we would get from the work.

Makes sense to me 👍 thank you @liukun4515

To be clear, I think moving coercion to logical planning is a good idea, but I think it will be a non-trivial change that will take some time. Thus, I was suggesting the "special case physical expr planning" idea as a way to speed up casting in the short term.

@alamb
Contributor

alamb commented Sep 19, 2022

@liukun4515 I wonder if this ticket is now complete?

@liukun4515
Contributor Author

@liukun4515 I wonder if this ticket is now complete?

There are some follow-up optimization tasks.
I will list them and then close this issue.

@alamb
Contributor

alamb commented Dec 14, 2022

ping

@liukun4515
Contributor Author

We can close it after #4634 is merged.

3 participants