Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve DataFusion scalability as more cores are added #5999

Closed
andygrove opened this issue Apr 13, 2023 · 21 comments
Closed

Improve DataFusion scalability as more cores are added #5999

andygrove opened this issue Apr 13, 2023 · 21 comments
Labels
enhancement New feature or request performance Make DataFusion faster

Comments

@andygrove
Copy link
Member

Is your feature request related to a problem or challenge?

I ran some benchmarks in constrained Docker containers and found that DataFusion is pretty close to DuckDB speed when running on a single core but does not scale as well as DuckDB when more cores are added.

At 16 cores, DuckDB was ~2.9x faster than DataFusion for this particular test.

Cores DuckDB 0.7.1 DataFusion Python 21.0.0 DuckDB x Faster
1 315,630.9 318,037.8 1.0
2 155,034.5 197,257.3 1.3
4 75,110.0 111,243.1 1.5
8 37,584.9 73,142.3 1.9
16 18,880.3 55,071.8 2.9

Describe the solution you'd like

I would like DataFusion to scale as well as DuckDB as more cores are added.

Describe alternatives you've considered

No response

Additional context

Instructions for creating the Docker images can be found at https://github.com/sql-benchmarks/sqlbench-runners

@andygrove andygrove added enhancement New feature or request performance Make DataFusion faster labels Apr 13, 2023
@alamb
Copy link
Contributor

alamb commented Apr 14, 2023

Possibly related: #5995

@Dandandan
Copy link
Contributor

Suggestion I posted in slack:

I wonder whether there might be some regressions here wrt scalability with the nr of cores - #4219 comes to mind which makes building the hashmap for the build-side done in a single thread for smaller tables
might be checked by changing datafusion.optimizer.hash_join_single_partition_threshold

@Dandandan
Copy link
Contributor

Dandandan commented Apr 14, 2023

For tpch-h q1 (SF=1) the metrics are as follows for the complete plan with metrics is as follows:

SortExec: [l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST], metrics=[output_rows=4, elapsed_compute=25.042µs, spill_count=0, spilled_bytes=0]
  CoalescePartitionsExec, metrics=[output_rows=4, elapsed_compute=3.167µs, spill_count=0, spilled_bytes=0, mem_used=0]
    ProjectionExec: expr=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus, SUM(lineitem.l_quantity)@2 as sum_qty, SUM(lineitem.l_extendedprice)@3 as sum_base_price, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@4 as sum_disc_price, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax)@5 as sum_charge, AVG(lineitem.l_quantity)@6 as avg_qty, AVG(lineitem.l_extendedprice)@7 as avg_price, AVG(lineitem.l_discount)@8 as avg_disc, COUNT(UInt8(1))@9 as count_order], metrics=[output_rows=4, elapsed_compute=8.376µs, spill_count=0, spilled_bytes=0, mem_used=0]
      AggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus], aggr=[SUM(lineitem.l_quantity), SUM(lineitem.l_extendedprice), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), AVG(lineitem.l_quantity), AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(UInt8(1))], metrics=[output_rows=4, elapsed_compute=181.588µs, spill_count=0, spilled_bytes=0, mem_used=0]
        CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=64, elapsed_compute=350.731µs, spill_count=0, spilled_bytes=0, mem_used=0]
          RepartitionExec: partitioning=Hash([Column { name: "l_returnflag", index: 0 }, Column { name: "l_linestatus", index: 1 }], 16), input_partitions=16, metrics=[repart_time=296.707µs, fetch_time=1.652179502s, send_time=110.333µs]
            AggregateExec: mode=Partial, gby=[l_returnflag@5 as l_returnflag, l_linestatus@6 as l_linestatus], aggr=[SUM(lineitem.l_quantity), SUM(lineitem.l_extendedprice), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), AVG(lineitem.l_quantity), AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(UInt8(1))], metrics=[output_rows=64, elapsed_compute=670.144742ms, spill_count=0, spilled_bytes=0, mem_used=0]
              ProjectionExec: expr=[CAST(l_extendedprice@1 AS Decimal128(38, 4)) * CAST(Some(100),23,2 - CAST(l_discount@2 AS Decimal128(23, 2)) AS Decimal128(38, 4)) as CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2))CAST(lineitem.l_discount AS Decimal128(23, 2))lineitem.l_discountDecimal128(Some(100),23,2)CAST(lineitem.l_extendedprice AS Decimal128(38, 4))lineitem.l_extendedprice, l_quantity@0 as l_quantity, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount, l_tax@3 as l_tax, l_returnflag@4 as l_returnflag, l_linestatus@5 as l_linestatus], metrics=[output_rows=5916591, elapsed_compute=162.828798ms, spill_count=0, spilled_bytes=0, mem_used=0]
                CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=5916591, elapsed_compute=29.656446ms, spill_count=0, spilled_bytes=0, mem_used=0]
                  FilterExec: l_shipdate@6 <= 10471, metrics=[output_rows=5916591, elapsed_compute=54.707373ms, spill_count=0, spilled_bytes=0, mem_used=0]
                    ParquetExec: limit=None, partitions={16 groups: [[...]]}, predicate=l_shipdate <= Date32("10471"), pruning_predicate=l_shipdate_min@0 <= 10471, projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], metrics=[output_rows=6001215, elapsed_compute=16ns, spill_count=0, spilled_bytes=0, mem_used=0, row_groups_pruned=0, predicate_evaluation_errors=0, bytes_scanned=44132502, num_predicate_creation_errors=0, pushdown_rows_filtered=0, page_index_rows_filtered=0, pushdown_eval_time=32ns, time_elapsed_scanning_until_data=493.033123ms, time_elapsed_scanning_total=1.643740915s, time_elapsed_processing=290.136543ms, time_elapsed_opening=6.494334ms, page_index_eval_time=32ns]

Relevant metrics:

  • ParquetExec time_elapsed_scanning_until_data=493.033123ms
  • ParquetExec time_elapsed_scanning_total=1.643740915s
  • ParquetExec time_elapsed_processing=290.136543ms
  • ParquetExec time_elapsed_opening=6.494334ms
  • FilterExec elapsed_compute=54.707373ms
  • CoalesceBatchesExec elapsed_compute=29.656446ms
  • ProjectionExec elapsed_compute=162.828798ms
  • AggregateExec elapsed_compute=670.144742ms

@andygrove
Copy link
Member Author

andygrove commented Apr 17, 2023

I ran the benchmarks with a simpler version of q1 with just the table scan and filter:

select
    *
from
	lineitem
where
	l_shipdate <= date '1998-12-01' - interval '113 days';

Here are the results:

Cores DataFusion Python 22.0.0 DuckDB DuckDB x Times Faster
1 14376 15000.9 0.96
2 7547.9 7915.7 0.95
4 4243.9 3850.7 1.10
8 2655.8 1869.3 1.42
16 2096.6 954.9 2.20
32 2252.2 531.7 4.24

@andygrove
Copy link
Member Author

andygrove commented Apr 17, 2023

Here are the results for just the table scan:

select
    *
from
	lineitem;
Cores DataFusion Python 22.0.0 DuckDB DuckDB x Times Faster
1 14501.3 14751 0.98
2 7562.5 8135.8 0.93
4 4508.7 3618.6 1.25
8 2363.2 1837.5 1.29
16 2135.4 888 2.40
32 2355 511.4 4.61

This screenshot shows CPU usage with the DataFusion run on the left and the DuckDB run on the right, both using 32 cores.

Screenshot from 2023-04-17 08-22-12

Here is the directory listing of my lineitem directory:

$ ls -l /mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/
total 2134532
-rw-rw-r-- 1 andy andy 91100218 Feb  3 08:45 part-0.parquet
-rw-rw-r-- 1 andy andy 90998179 Feb  3 08:46 part-10.parquet
-rw-rw-r-- 1 andy andy 91113498 Feb  3 08:46 part-11.parquet
-rw-rw-r-- 1 andy andy 90979631 Feb  3 08:46 part-12.parquet
-rw-rw-r-- 1 andy andy 91042031 Feb  3 08:46 part-13.parquet
-rw-rw-r-- 1 andy andy 91013004 Feb  3 08:46 part-14.parquet
-rw-rw-r-- 1 andy andy 91102199 Feb  3 08:46 part-15.parquet
-rw-rw-r-- 1 andy andy 91113429 Feb  3 08:46 part-16.parquet
-rw-rw-r-- 1 andy andy 91057291 Feb  3 08:46 part-17.parquet
-rw-rw-r-- 1 andy andy 90968073 Feb  3 08:46 part-18.parquet
-rw-rw-r-- 1 andy andy 91085729 Feb  3 08:47 part-19.parquet
-rw-rw-r-- 1 andy andy 91065849 Feb  3 08:45 part-1.parquet
-rw-rw-r-- 1 andy andy 90977725 Feb  3 08:47 part-20.parquet
-rw-rw-r-- 1 andy andy 91092444 Feb  3 08:47 part-21.parquet
-rw-rw-r-- 1 andy andy 91114853 Feb  3 08:47 part-22.parquet
-rw-rw-r-- 1 andy andy 91029929 Feb  3 08:47 part-23.parquet
-rw-rw-r-- 1 andy andy 91139350 Feb  3 08:46 part-2.parquet
-rw-rw-r-- 1 andy andy 91167180 Feb  3 08:46 part-3.parquet
-rw-rw-r-- 1 andy andy 91083996 Feb  3 08:46 part-4.parquet
-rw-rw-r-- 1 andy andy 91024700 Feb  3 08:46 part-5.parquet
-rw-rw-r-- 1 andy andy 91022201 Feb  3 08:46 part-6.parquet
-rw-rw-r-- 1 andy andy 91101885 Feb  3 08:46 part-7.parquet
-rw-rw-r-- 1 andy andy 91198702 Feb  3 08:46 part-8.parquet
-rw-rw-r-- 1 andy andy 91120030 Feb  3 08:46 part-9.parquet

@andygrove
Copy link
Member Author

@Dandandan I added more benchmark results. The issues appear to be related to the ParquetExec.

@Dandandan
Copy link
Contributor

Dandandan commented Apr 18, 2023

I could also replicate the issue of non-perfect scaling with loading the tables in memory.

One thing I noticed is that the current round-robin RepartitionExec doesn't spread the batches evenly over the number of output channels, which can already be seen in MemoryExec itself:

MemoryExec: partitions=32, partition_sizes=[32, 32, 32, 32, 32, 32, 32, 32, 26, 26, 26, 25, 25, 25, 25, 25, 25, 25, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16], metrics=[]

It has a bias for the starting partitions as the outputs always go first to those channels.

@Dandandan
Copy link
Contributor

Also, I wonder if DistributionSender / DistributionReceiver didn't introduce any regressions here FYI @crepererum

@crepererum
Copy link
Contributor

Also, I wonder if DistributionSender / DistributionReceiver didn't introduce any regressions here FYI @crepererum

Definitely possible, even if it's only due to changes it triggers in tokio's scheduler. On the other hand, tokio's scheduler isn't really optimized for the DataFusion use case in the first place (as @tustvold probably pointed out before).

@cristian-ilies-vasile
Copy link

There is a small application written in Rust called odbc2parquet that reads data from databases and exports parquet files.
For testing on dummy data I think it is useful.

https://github.com/pacman82/odbc2parquet

@andygrove
Copy link
Member Author

Here are latest results with DataFusion Python 23, which fixes a bug around how the Tokio runtime was being managed.

Cores DuckDB 0.7.1 DataFusion Python 21.0.0 DataFusion Python 23.0.0 DuckDB x Faster
1 315,631 318,038 218,604 0.7
2 155,035 197,257 142,480 0.9
4 75,110 111,243 93,804 1.2
8 37,585 73,142 58,083 1.5
16 18,880 55,072 45,016 2.4

image

@Dandandan
Copy link
Contributor

Dandandan commented Apr 25, 2023

Interestingly, the speedup seems bigger for fewer cores.

There have been quite a few improvement for Q1 between 21 and 23 for aggregates and casts, that might explain most of the difference.

@alamb
Copy link
Contributor

alamb commented Jul 12, 2023

I believe #6929 from @YjyJeff will perhaps help with this

@andygrove
Copy link
Member Author

Latest results with DF 33

image

@alamb
Copy link
Contributor

alamb commented Nov 27, 2023

I believe that #6937 (basically that the fact we are doing much more work as the cores go up (as each group gets processed in each core) contributes to this problem

@andygrove
Copy link
Member Author

image

@andygrove
Copy link
Member Author

I produced the chart using scripts at https://github.com/sql-benchmarks/sqlbench-runners/blob/main/scripts

@comphead
Copy link
Contributor

@sunchao @viirya cc ^^^^

@andygrove
Copy link
Member Author

The regression between 32 and 33 seems to be caused by my "improvement" to filter statistics, although we saw that others saw big improvements with 33 (such as https://x.com/mim_djo/status/1725510410567307702?s=46&t=8JleG5FR5SAVU1vl35CPLg) so maybe this is environment specific. I will continue to debug this in the next few days.

$ git bisect bad
6fe00ce2e30d2af2b64d3a97b877a96109c215f9 is the first bad commit
commit 6fe00ce2e30d2af2b64d3a97b877a96109c215f9
Author: Andy Grove <andygrove73@gmail.com>
Date:   Sun Nov 12 01:41:15 2023 -0700

    Fix join order for TPCH Q17 & Q18 by improving FilterExec statistics (#8126)

@Dandandan
Copy link
Contributor

I wonder if this is just bad configuration?
I created a PR to update the DF configurations to use the defaults.
datafusion-contrib/sqlbench-runners#31

@andygrove
Copy link
Member Author

@Dandandan was correct. It was the configs that were the issue. Here is a run of just versions 32 and 33 using default configs.

image

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request performance Make DataFusion faster
Projects
None yet
Development

No branches or pull requests

6 participants