Performance issue with inner joins on columns of type FixedSizeBinary(16) #5456

Closed
maxburke opened this issue Mar 2, 2023 · 2 comments
Labels: bug (Something isn't working), performance (Make DataFusion faster)

maxburke (Contributor) commented Mar 2, 2023

I've attached two parquet files. Both contain a single column with 131072 rows, generated from Arrow as a single record batch. The fsb16.parquet file contains a column of type FixedSizeBinary(16), and ints.parquet contains a column of type Int64.
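(For reference, a minimal sketch, not from the original report, of how two such files could be generated with the arrow and parquet crates; the column names, values, and writer settings here are assumptions for illustration only.)

use std::{fs::File, sync::Arc};

use arrow::array::{FixedSizeBinaryArray, Int64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    const ROWS: usize = 131_072;

    // ints.parquet: a single Int64 column written as one record batch.
    let int_schema = Arc::new(Schema::new(vec![Field::new("ints", DataType::Int64, false)]));
    let ints = Int64Array::from_iter_values(0..ROWS as i64);
    let batch = RecordBatch::try_new(int_schema.clone(), vec![Arc::new(ints)])?;
    let mut writer = ArrowWriter::try_new(File::create("ints.parquet")?, int_schema, None)?;
    writer.write(&batch)?;
    writer.close()?;

    // fsb16.parquet: a single FixedSizeBinary(16) column written as one record batch.
    let fsb_schema = Arc::new(Schema::new(vec![Field::new(
        "journey_id",
        DataType::FixedSizeBinary(16),
        false,
    )]));
    let values: Vec<[u8; 16]> = (0..ROWS as u128).map(|i| i.to_be_bytes()).collect();
    let journey_ids = FixedSizeBinaryArray::try_from_iter(values.into_iter())?;
    let batch = RecordBatch::try_new(fsb_schema.clone(), vec![Arc::new(journey_ids)])?;
    let mut writer = ArrowWriter::try_new(File::create("fsb16.parquet")?, fsb_schema, None)?;
    writer.write(&batch)?;
    writer.close()?;

    Ok(())
}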

If I do an inner join, the query returns really quickly:

❯ create external table t0 stored as parquet location 'ints.parquet';
❯ select * from t0 inner join t0 as t1 on t0.ints = t1.ints;
+--------+--------+
...[snip]...
+--------+--------+
131072 rows in set. Query took 0.530 seconds.

Here is the plan for the int64 query:

❯ explain select * from t0 inner join t0 as t1 on t0.ints = t1.ints;
+---------------+----------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                             |
+---------------+----------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Projection: t0.ints, t1.ints                                                                                                     |
|               |   Inner Join: t0.ints = t1.ints                                                                                                  |
|               |     TableScan: t0 projection=[ints]                                                                                              |
|               |     SubqueryAlias: t1                                                                                                            |
|               |       TableScan: t0 projection=[ints]                                                                                            |
| physical_plan | ProjectionExec: expr=[ints@0 as ints, ints@1 as ints]                                                                            |
|               |   CoalesceBatchesExec: target_batch_size=8192                                                                                    |
|               |     HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "ints", index: 0 }, Column { name: "ints", index: 0 })] |
|               |       CoalesceBatchesExec: target_batch_size=8192                                                                                |
|               |         RepartitionExec: partitioning=Hash([Column { name: "ints", index: 0 }], 8), input_partitions=8                           |
|               |           RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1                                                   |
|               |             ParquetExec: limit=None, partitions={1 group: [[Users/max/src/ul/services/ulv2/ints.parquet]]}, projection=[ints]    |
|               |       CoalesceBatchesExec: target_batch_size=8192                                                                                |
|               |         RepartitionExec: partitioning=Hash([Column { name: "ints", index: 0 }], 8), input_partitions=8                           |
|               |           RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1                                                   |
|               |             ParquetExec: limit=None, partitions={1 group: [[Users/max/src/ul/services/ulv2/ints.parquet]]}, projection=[ints]    |
|               |                                                                                                                                  |
+---------------+----------------------------------------------------------------------------------------------------------------------------------+

But if I do the same with the FixedSizeBinary(16) file, it runs up a huge working set (170GB+ on my machine) and takes a very long time. In much of my testing it runs out of memory and dies; when it does finish, it takes ~6 minutes (compared to 0.5 seconds with the Int64 columns):

❯ create external table t0 stored as parquet location 'fsb16.parquet';
❯ select * from t0 inner join t0 as t1 on t0.journey_id = t1.journey_id;
+----------------------------------+----------------------------------+
...[snip]...
+----------------------------------+----------------------------------+
358946 rows in set. Query took 356.370 seconds.

Also, I think the results are wrong; the result set should only have 131072 rows, not 358946.

And the FixedSizeBinary(16) query plan; note that the equality condition ends up as a filter on a NestedLoopJoinExec rather than being used as a HashJoinExec key:

❯ explain select * from t0 inner join t0 as t1 on t0.journey_id = t1.journey_id;
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                                 |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Projection: t0.journey_id, t1.journey_id                                                                                                                             |
|               |   Inner Join:  Filter: t0.journey_id = t1.journey_id                                                                                                                 |
|               |     TableScan: t0 projection=[journey_id]                                                                                                                            |
|               |     SubqueryAlias: t1                                                                                                                                                |
|               |       TableScan: t0 projection=[journey_id]                                                                                                                          |
| physical_plan | ProjectionExec: expr=[journey_id@0 as journey_id, journey_id@1 as journey_id]                                                                                        |
|               |   RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1                                                                                               |
|               |     NestedLoopJoinExec: join_type=Inner, filter=BinaryExpr { left: Column { name: "journey_id", index: 0 }, op: Eq, right: Column { name: "journey_id", index: 1 } } |
|               |       ParquetExec: limit=None, partitions={1 group: [[Users/max/src/ul/services/ulv2/1677623589235.parquet]]}, projection=[journey_id]                               |
|               |       ParquetExec: limit=None, partitions={1 group: [[Users/max/src/ul/services/ulv2/1677623589235.parquet]]}, projection=[journey_id]                               |
|               |                                                                                                                                                                      |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------+

fsb16.parquet.gz
ints.parquet.gz
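(Not part of the original report: a minimal programmatic reproduction using the DataFusion Rust API might look like the following sketch, assuming a recent DataFusion release, a tokio runtime, and the attached fsb16.parquet in the working directory.)

use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    // Register the attached file as table t0, then run the self-join on the
    // FixedSizeBinary(16) column.
    ctx.register_parquet("t0", "fsb16.parquet", ParquetReadOptions::default())
        .await?;
    let df = ctx
        .sql("select * from t0 inner join t0 as t1 on t0.journey_id = t1.journey_id")
        .await?;
    df.show_limit(10).await?;
    Ok(())
}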

maxburke added the bug (Something isn't working) label on Mar 2, 2023
maxburke (Contributor, Author) commented Mar 2, 2023

One thing I should mention: I am testing with this patch applied to Arrow, because without it the FixedSizeBinary(16) case is significantly slower still: apache/arrow-rs#3793

jackwener added the performance (Make DataFusion faster) label on Mar 3, 2023
maxburke changed the title from "Performance / correctness issues with inner joins on columns of type FixedSizeBinary(16)" to "Performance issue with inner joins on columns of type FixedSizeBinary(16)" on Mar 3, 2023
maxburke (Contributor, Author) commented Mar 3, 2023

Performance issue fixed by #5461.
