Regression: Ordering by joined column doesn't return results #8374

Closed
DDtKey opened this issue Nov 30, 2023 · 17 comments
Labels
bug Something isn't working regression Something that used to work no longer does

Comments

@DDtKey
Contributor

DDtKey commented Nov 30, 2023

Describe the bug

After updating to datafusion 33, I noticed wrong behavior in one of our internal tests that sorts by multiple columns.
It worked in datafusion 31.

To Reproduce

MRE with datafusion-cli:

CREATE TABLE users AS VALUES('Alice',50),('Bob',100);
CREATE TABLE employees AS VALUES('Alice','Finance'),('Bob','Marketing');

SELECT u.* FROM users u JOIN employees e ON u."column1" = e."column1" ORDER BY u."column1", e."column2";
0 rows in set. Query took 0.002 seconds.

But at the same time, without ordering by joined column it works:

SELECT u.* FROM users u JOIN employees e ON u."column1" = e."column1" ORDER BY u."column1";
+---------+---------+
| column1 | column2 |
+---------+---------+
| Alice   | 50      |
| Bob     | 100     |
+---------+---------+
2 rows in set. Query took 0.002 seconds.

Expected behavior

The query should return both rows, as it did before.

Additional context

No response

@DDtKey DDtKey added the bug Something isn't working label Nov 30, 2023
@suxiaogang223
Contributor

Hi, could you share more information, such as build features and platform 👀? I ran the same SQL and got the correct result on an M1 Mac, in both debug and release mode.

@suxiaogang223
Contributor

> hi, could you show more information like building features and platform👀? I run the same sql and got the correct result on m1 Mac, both in debug and release mode.

Sorry, the bug can be reproduced on branch-33; I had tested the wrong code (tag 33.0.0-rc1).

@suxiaogang223
Contributor

I ran EXPLAIN on the query:

explain SELECT u.* FROM users u JOIN employees e ON u."column1" = e."column1" ORDER BY u."column1", e."column2";

On branch-33, the result is:

+---------------+--------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                 |
+---------------+--------------------------------------------------------------------------------------+
| logical_plan  | Projection: u.column1, u.column2                                                     |
|               |   Sort: u.column1 ASC NULLS LAST, e.column2 ASC NULLS LAST                           |
|               |     Projection: u.column1, u.column2, e.column2                                      |
|               |       Inner Join: u.column1 = e.column1                                              |
|               |         SubqueryAlias: u                                                             |
|               |           TableScan: users projection=[column1, column2]                             |
|               |         SubqueryAlias: e                                                             |
|               |           TableScan: employees projection=[column1, column2]                         |
| physical_plan | SortPreservingMergeExec: [column1@0 ASC NULLS LAST,column2@1 ASC NULLS LAST]         |
|               |   SortExec: expr=[column1@0 ASC NULLS LAST,column2@1 ASC NULLS LAST]                 |
|               |     ProjectionExec: expr=[column1@0 as column1, column2@1 as column2]                |
|               |       CoalesceBatchesExec: target_batch_size=8192                                    |
|               |         HashJoinExec: mode=Partitioned, join_type=Inner, on=[(column1@0, column1@0)] |
|               |           CoalesceBatchesExec: target_batch_size=8192                                |
|               |             RepartitionExec: partitioning=Hash([column1@0], 8), input_partitions=1   |
|               |               MemoryExec: partitions=1, partition_sizes=[1]                          |
|               |           CoalesceBatchesExec: target_batch_size=8192                                |
|               |             RepartitionExec: partitioning=Hash([column1@0], 8), input_partitions=1   |
|               |               MemoryExec: partitions=1, partition_sizes=[1]                          |
|               |                                                                                      |
+---------------+--------------------------------------------------------------------------------------+
2 rows in set. Query took 0.022 seconds.

On branch-31, the result is:

+---------------+-----------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                          |
+---------------+-----------------------------------------------------------------------------------------------+
| logical_plan  | Projection: u.column1, u.column2                                                              |
|               |   Sort: u.column1 ASC NULLS LAST, e.column2 ASC NULLS LAST                                    |
|               |     Projection: u.column1, u.column2, e.column2                                               |
|               |       Inner Join: u.column1 = e.column1                                                       |
|               |         SubqueryAlias: u                                                                      |
|               |           TableScan: users projection=[column1, column2]                                      |
|               |         SubqueryAlias: e                                                                      |
|               |           TableScan: employees projection=[column1, column2]                                  |
| physical_plan | ProjectionExec: expr=[column1@0 as column1, column2@1 as column2]                             |
|               |   SortPreservingMergeExec: [column1@0 ASC NULLS LAST,column2@2 ASC NULLS LAST]                |
|               |     SortExec: expr=[column1@0 ASC NULLS LAST,column2@2 ASC NULLS LAST]                        |
|               |       ProjectionExec: expr=[column1@0 as column1, column2@1 as column2, column2@3 as column2] |
|               |         CoalesceBatchesExec: target_batch_size=8192                                           |
|               |           HashJoinExec: mode=Partitioned, join_type=Inner, on=[(column1@0, column1@0)]        |
|               |             CoalesceBatchesExec: target_batch_size=8192                                       |
|               |               RepartitionExec: partitioning=Hash([column1@0], 8), input_partitions=8          |
|               |                 RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1          |
|               |                   MemoryExec: partitions=1, partition_sizes=[1]                               |
|               |             CoalesceBatchesExec: target_batch_size=8192                                       |
|               |               RepartitionExec: partitioning=Hash([column1@0], 8), input_partitions=8          |
|               |                 RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1          |
|               |                   MemoryExec: partitions=1, partition_sizes=[1]                               |
|               |                                                                                               |
+---------------+-----------------------------------------------------------------------------------------------+
2 rows in set. Query took 0.019 seconds.

The difference is in the ProjectionExec: on branch-33 the projection wrongly excludes e.column2, so the SortExec can no longer sort by e.column2 (its second sort key, column2@1, now refers to u.column2).

@haohuaijin
Contributor

After doing some research, I found that this error is caused by the ProjectionPushdown rule in the physical optimizer:

| physical_plan after OutputRequirements  | ProjectionExec: expr=[column1@0 as column1, column2@1 as column2]                                                                                                                     |
|                                         |   SortPreservingMergeExec: [column1@0 ASC NULLS LAST,column2@2 ASC NULLS LAST]                                                                                                        |
|                                         |     SortExec: expr=[column1@0 ASC NULLS LAST,column2@2 ASC NULLS LAST]                                                                                                                |
|                                         |       ProjectionExec: expr=[column1@0 as column1, column2@1 as column2, column2@3 as column2]  <-- before: column2@3 (e.column2) is still projected |
|                                         |         CoalesceBatchesExec: target_batch_size=8192                                                                                                                                   |
|                                         |           HashJoinExec: mode=Partitioned, join_type=Inner, on=[(column1@0, column1@0)]                                                                                                |
|                                         |             CoalesceBatchesExec: target_batch_size=8192                                                                                                                               |
|                                         |               RepartitionExec: partitioning=Hash([column1@0], 24), input_partitions=1                                                                                                 |
|                                         |                 MemoryExec: partitions=1, partition_sizes=[1]                                                                                                                         |
|                                         |             CoalesceBatchesExec: target_batch_size=8192                                                                                                                               |
|                                         |               RepartitionExec: partitioning=Hash([column1@0], 24), input_partitions=1                                                                                                 |
|                                         |                 MemoryExec: partitions=1, partition_sizes=[1]                                                                                                                         |
|                                         |                                                                                                                                                                                       |
| physical_plan after PipelineChecker     | SAME TEXT AS ABOVE                                                                                                                                                                    |
| physical_plan after LimitAggregation    | SAME TEXT AS ABOVE                                                                                                                                                                    |
| physical_plan after ProjectionPushdown  | SortPreservingMergeExec: [column1@0 ASC NULLS LAST,column2@1 ASC NULLS LAST]                                                                                                          |
|                                         |   SortExec: expr=[column1@0 ASC NULLS LAST,column2@1 ASC NULLS LAST]                                                                                                                  |
|                                         |     ProjectionExec: expr=[column1@0 as column1, column2@1 as column2]   <-- after: column2@3 (e.column2) has been eliminated |
|                                         |       CoalesceBatchesExec: target_batch_size=8192                                                                                                                                     |
|                                         |         HashJoinExec: mode=Partitioned, join_type=Inner, on=[(column1@0, column1@0)]                                                                                                  |
|                                         |           CoalesceBatchesExec: target_batch_size=8192                                                                                                                                 |
|                                         |             RepartitionExec: partitioning=Hash([column1@0], 24), input_partitions=1                                                                                                   |
|                                         |               MemoryExec: partitions=1, partition_sizes=[1]                                                                                                                           |
|                                         |           CoalesceBatchesExec: target_batch_size=8192                                                                                                                                 |
|                                         |             RepartitionExec: partitioning=Hash([column1@0], 24), input_partitions=1                                                                                                   |
|                                         |               MemoryExec: partitions=1, partition_sizes=[1]    

The reason for this rewrite may be that the code below identifies a column by its name only:

https://github.com/apache/arrow-datafusion/blob/06bbe1298fa8aa042b6a6462e55b2890969d884a/datafusion/core/src/physical_optimizer/projection_pushdown.rs#L866-L872

When column names are identical (as with u.column2 and e.column2 here), the error arises.
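To illustrate why name-only matching goes wrong here, below is a minimal Rust sketch. It uses a hypothetical, simplified Col type and a hypothetical rewrite_by_name function; these are not DataFusion's actual Column type or the projection_pushdown.rs code. It assumes the top-level ProjectionExec [column1@0, column2@1] is being pushed below a SortExec whose second key, column2@2, refers to e.column2 (as in the "after OutputRequirements" plan above).

```rust
// Hypothetical, simplified stand-in for a physical column reference: a name
// plus an index into the input schema. This is NOT DataFusion's `Column` type.
#[derive(Debug, Clone, PartialEq)]
struct Col {
    name: &'static str,
    index: usize,
}

/// Sketch of a name-only lookup: rewrite `column` (an index into the
/// projection's input) to point at the projection's output by taking the
/// first projected column with the same *name*.
fn rewrite_by_name(column: &Col, projection: &[(Col, &'static str)]) -> Option<Col> {
    projection
        .iter()
        .enumerate()
        .find_map(|(out_index, (projected, alias))| {
            (projected.name == column.name).then(|| Col { name: *alias, index: out_index })
        })
}

fn main() {
    // Top-level projection: [column1@0 as column1, column2@1 as column2].
    let projection = [
        (Col { name: "column1", index: 0 }, "column1"),
        (Col { name: "column2", index: 1 }, "column2"),
    ];
    // Sort key column2@2 (e.column2) is NOT part of the projection's output...
    let sort_key = Col { name: "column2", index: 2 };
    // ...but matching by name still "finds" it, yielding column2@1 (u.column2).
    // prints: Some(Col { name: "column2", index: 1 })
    println!("{:?}", rewrite_by_name(&sort_key, &projection));
}
```

Because the lookup succeeds, the sort key silently switches from e.column2 to u.column2, which matches the rewritten plan shown above.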

@DDtKey
Contributor Author

DDtKey commented Nov 30, 2023

> When the column names are identical, the error will arise

Just to clarify: in my tests this also failed with different column names; the MRE simply uses auto-generated column names.

@haohuaijin
Contributor

haohuaijin commented Nov 30, 2023

> Just to clarify: in my tests this failed with different column names as well. Just MRE uses auto column names

@DDtKey could you provide some cases? When the column names are different, I find that it works in datafusion 33:

DataFusion CLI v33.0.0
❯ create table u(a text, b int) as values ('Alice', 50), ('Bob', 100);
0 rows in set. Query took 0.023 seconds.

❯ create table e(c text, d text) as values ('Alice', 'Finance'), ('Bob', 'Marketing');
0 rows in set. Query took 0.000 seconds.

❯ select u.* from u join e on u.a = e.c order by u.a, e.d;
+-------+-----+
| a     | b   |
+-------+-----+
| Alice | 50  |
| Bob   | 100 |
+-------+-----+
2 rows in set. Query took 0.021 seconds.

@DDtKey
Contributor Author

DDtKey commented Nov 30, 2023

Sorry for the confusion, you're right.
It works with different column names (in the ORDER BY), so identical names seem to be the root of the problem.

@Asura7969
Contributor

My initial solution:

.find_map(|(index, (projected_expr, alias))| {
  projected_expr.as_any().downcast_ref::<Column>().and_then(
      |projected_column| {
          (column.index() == projected_column.index()       // added: index comparison
              && column.name().eq(projected_column.name()))
          .then(|| {
              state = RewriteState::RewrittenValid;
              Arc::new(Column::new(alias, index)) as _
          })
      },
  )
})

That is, I added an index comparison to the existing name check. With this change:

DataFusion CLI v33.0.0
❯ CREATE TABLE users AS VALUES('Alice',50),('Bob',100);
0 rows in set. Query took 0.022 seconds.

❯ CREATE TABLE employees AS VALUES('Alice','Finance'),('Bob','Marketing');
0 rows in set. Query took 0.008 seconds.

❯ SELECT u.* FROM users u JOIN employees e ON u."column1" = e."column1" ORDER BY u."column1", e."column2";
+---------+---------+
| column1 | column2 |
+---------+---------+
| Alice   | 50      |
| Bob     | 100     |
+---------+---------+

The result is correct
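The proposed check can be sketched with the same kind of hypothetical, simplified Col model (again not DataFusion's real types; rewrite_by_name_and_index is an illustrative name). We assume here that a failed lookup (None) means the projection is left in place instead of being pushed below the sort; how the real rule handles that case is not shown in this thread.

```rust
// Hypothetical, simplified column reference (name + input-schema index);
// not DataFusion's `Column` type.
#[derive(Debug, Clone, PartialEq)]
struct Col {
    name: &'static str,
    index: usize,
}

/// Sketch of the proposed fix: a projected column only matches `column`
/// if both its input index and its name agree.
fn rewrite_by_name_and_index(column: &Col, projection: &[(Col, &'static str)]) -> Option<Col> {
    projection
        .iter()
        .enumerate()
        .find_map(|(out_index, (projected, alias))| {
            (projected.index == column.index && projected.name == column.name)
                .then(|| Col { name: *alias, index: out_index })
        })
}

fn main() {
    // Top-level projection: [column1@0 as column1, column2@1 as column2].
    let projection = [
        (Col { name: "column1", index: 0 }, "column1"),
        (Col { name: "column2", index: 1 }, "column2"),
    ];
    // column1@0 is projected, so it maps to output position 0.
    // prints: Some(Col { name: "column1", index: 0 })
    println!("{:?}", rewrite_by_name_and_index(&Col { name: "column1", index: 0 }, &projection));
    // column2@2 (e.column2) is not projected: no match, so (by assumption)
    // the projection is not pushed below the sort.
    // prints: None
    println!("{:?}", rewrite_by_name_and_index(&Col { name: "column2", index: 2 }, &projection));
}
```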

@Asura7969
Contributor

Addendum: maybe we should do special processing for SortExec 🤔

@haohuaijin
Contributor

> My initial solution: [...] compare column.index() == projected_column.index() in addition to the column name (snippet quoted above)

Using both name and index (the index being the column's position in the input schema) to identify a column assumes that the plan containing column and the plan containing projected_column have the same input schema. Otherwise, some projections that could be pushed down would no longer be pushed down. And when the schemas are the same, the index alone is enough to identify a column.
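Under that same-schema assumption, the name check becomes redundant. A minimal sketch, again using a hypothetical, simplified Col type and a hypothetical rewrite_by_index function (not DataFusion APIs), and assuming column and the projected columns index into the same input schema:

```rust
// Hypothetical, simplified column reference (name + input-schema index).
#[derive(Debug, Clone, PartialEq)]
struct Col {
    name: &'static str,
    index: usize,
}

/// Index-only lookup. Only valid if `column` and every projected column
/// refer to the SAME input schema; the name is deliberately ignored.
fn rewrite_by_index(column: &Col, projection: &[(Col, &'static str)]) -> Option<Col> {
    projection
        .iter()
        .position(|(projected, _)| projected.index == column.index)
        .map(|out_index| Col { name: projection[out_index].1, index: out_index })
}

fn main() {
    let projection = [
        (Col { name: "column1", index: 0 }, "column1"),
        (Col { name: "column2", index: 1 }, "column2"),
    ];
    // prints: None (column2@2 is not projected, whatever its name)
    println!("{:?}", rewrite_by_index(&Col { name: "column2", index: 2 }, &projection));
}
```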

@DDtKey
Contributor Author

DDtKey commented Dec 27, 2023

Why isn't this issue treated as a regression, while new stable versions keep being released?
This worked up to version 31; since then we have reached 34 and the regression is being ignored 🤔

Note: I'm not talking about bugs in general but about regressions. Unfortunately they occur quite often and they are more dangerous, because they undermine trust in new versions.

This leads to the following situation:
each version requires code adaptation for incompatible changes, but it is not acceptable to use a new version with an obvious regression. So the adaptations are made but cannot be applied (and their number keeps growing).

I may be a little behind on the current DataFusion release policy, but I think any regression should be prioritized and fixed in new stable patch releases. I understand that releases currently just follow a schedule, but perhaps it's time for a stricter release cycle for stable versions? (A question for a separate issue, of course.)

cc @alamb

@alamb alamb changed the title Ordering by joined column doesn't return results Regression: Ordering by joined column doesn't return results Dec 28, 2023
@alamb alamb added the regression Something that used to work no longer does label Dec 28, 2023
@alamb
Contributor

alamb commented Dec 28, 2023

> note: I'm not talking about bugs in general, but about regressions, unfortunately they occur quite often and they are more dangerous, there is no trust in new versions

Thank you for bringing this up. I agree we need to prioritize regressions. I personally missed that this particular bug was a regression and thought it was a pre-existing bug. I have updated the title to reflect this and created a new label for regressions.

cc @andygrove @viirya and @ozankabak

@ozankabak
Contributor

@DDtKey I think some people may have assumed #8485 fixed it (at least I did). You are right that such regressions should get priority and we will prioritize this.

@DDtKey
Contributor Author

DDtKey commented Dec 28, 2023

@ozankabak thanks for pointing to the PR. It looks like I missed that it was merged before the 34.0.0 release (the issue just hadn't been closed yet).

So that was my wrong assumption, sorry. (To be clear, my test still fails, but due to another issue, #7931, which is unrelated to this one; I'm going to check further, since it used to work in 31.)
I tested the MRE, and this case works with the latest stable version.
Though, as mentioned, we may have some underlying issues, they are not related to this one.

@alamb
Contributor

alamb commented Dec 28, 2023

BTW, one of the longer-term discussions I would like to have at #8152 and in other venues (I just haven't had time to write it down yet) is how to improve the overall "process maturity" of DataFusion. As @DDtKey points out, regressions should be prioritized, but at the moment we don't really have a mechanism to do that (or, for example, to hold a release for such regressions) other than relying on one of us to catch them manually.

@DDtKey
Contributor Author

DDtKey commented Jan 4, 2024

Should we close this issue as fixed in 34.0.0 to avoid confusion?

@ozankabak
Copy link
Contributor

@DDtKey sounds good 👍


6 participants