Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Poor reported performance of DataFusion against DuckDB and Hyper #5942

Closed
alamb opened this issue Apr 10, 2023 · 39 comments
Closed

Poor reported performance of DataFusion against DuckDB and Hyper #5942

alamb opened this issue Apr 10, 2023 · 39 comments
Labels
bug Something isn't working help wanted Extra attention is needed performance Make DataFusion faster

Comments

@alamb
Copy link
Contributor

alamb commented Apr 10, 2023

Describe the bug

There is a blog post that reports relatively poor performance of DataFusion compared to DuckDB and Hyper:

https://www.architecture-performance.fr/ap_blog/tpc-h-benchmark-of-hyper-duckdb-and-datafusion-on-parquet-files/

To Reproduce

I would like someone to try and reproduce the DataFusion performance reported in the blog and propose ways to improve the performance of DataFusion (perhaps by enabling some of the options that are off by default)

Expected behavior

No response

Additional context

@Dandandan suggests on slack that with parallelized scan of a parquet file this benchmark may go faster

@alamb alamb added bug Something isn't working performance Make DataFusion faster help wanted Extra attention is needed labels Apr 10, 2023
@andygrove
Copy link
Member

There was a question on slack asking if the Python bindings enable parallel scans.

The Python SessionContext constructor is here. It uses SessionConfig::default if no config is provided, so yes, parallel scans should be enabled if a recent version is being used.

https://github.com/apache/arrow-datafusion-python/blob/main/src/context.rs#L216-L233

@Dandandan
Copy link
Contributor

Looking at https://github.com/apache/arrow-datafusion/blob/main/dev/changelog/19.0.0.md , it should be enabled since 19.0.0, while the article mentions it's using 20.0.0

@Dandandan
Copy link
Contributor

FYI @djfrancesco we are looking into your article.
Is there any code you could share to replicate the results?

@djfrancesco
Copy link

@Dandandan thanks for your interest. I just created a public repo with the code used for the post : https://github.com/aetperf/sql_olap_bench
I am sorry because the code is a little messy... I might have use DataFusion in a wrong way. I tried several config options but without success. I write these blog posts mostly as notes to myself. I would be glad to update the post with better timings. The main command is the following:

python tpch_bench.py -d path_to_the_folder_tpch_100

It uses a function called run_queries_datafusion_on_parquet from the bench_tools module.

@alamb
Copy link
Contributor Author

alamb commented Apr 10, 2023

I am sorry because the code is a little messy... I might have use DataFusion in a wrong way. I tried several config options but without success. I write these blog posts mostly as notes to myself. I would be glad to update the post with better timings. The main command is the following:

No worries -- clearly if you can't get good performance out of DataFusion we have some things to improve (even if it is only UX).

Thank you for the article

@ozankabak
Copy link
Contributor

Anybody has a preliminary idea why the figures look this bad? Mostly config related, algorithmic issues or code structure/performance related?

@alippai
Copy link
Contributor

alippai commented Apr 11, 2023

Related older post (in a sense that trying Datafusion with defaults yields questionable results): #5141

@mingmwang
Copy link
Contributor

Looks like the blog is broken, I can not open it now.

@andygrove
Copy link
Member

andygrove commented Apr 12, 2023

I took a quick look at the code, and have two observations:

  1. The configs being created are not actually being used anywhere i.e. they are not being passed into the context
  2. There is a comment that DataFusion does not support multiple Parquet files for a table, which is not true (unless this isn't available via the Python bindings?), but maybe this means the DataFusion tests are not comparable to the other tests if they are being run against one parquet file per table?

@andygrove
Copy link
Member

andygrove commented Apr 12, 2023

Here is an example of configuring a context in Python:

    runtime = (
        RuntimeConfig().with_disk_manager_os().with_fair_spill_pool(10000000)
    )
    config = (
        SessionConfig()
        .with_create_default_catalog_and_schema(True)
        .with_default_catalog_and_schema("foo", "bar")
        .with_target_partitions(1)
        .with_information_schema(True)
        .with_repartition_joins(False)
        .with_repartition_aggregations(False)
        .with_repartition_windows(False)
        .with_parquet_pruning(False)
        .set("datafusion.execution.parquet.pushdown_filters", "true")
    )

    ctx = SessionContext(config, runtime)

@andygrove
Copy link
Member

I created a quick PR to improve the docs around creating a context and will spend more time on this over the next few days. The UX could be improved.

apache/datafusion-python#321

@djfrancesco
Copy link

Thanks for your help @andygrove! I tried versions 20 and 21 and got this Error when using a SessionConfig instance:

      config.set("datafusion.execution.parquet.pushdown_filters", "true")
      ^^^^^^^^^^
  AttributeError: 'datafusion.SessionConfig' object has no attribute 'set'

Maybe it is a new feature ?

@djfrancesco
Copy link

Sorry @mingmwang we have some troubles with Wordpress sometimes... Here is another version : https://aetperf.github.io/2023/03/30/TPC-H-benchmark-of-Hyper,-DuckDB-and-Datafusion-on-Parquet-files.html

@andygrove
Copy link
Member

Maybe it is a new feature ?

Yes, sorry .. the set feature is not yet released. However, you can also pass a dict into the SessionConfig constructor, like this:

config = (
    SessionConfig({ 'datafusion.execution.parquet.pushdown_filters': 'true' })
    .with_create_default_catalog_and_schema(True)
    .with_default_catalog_and_schema("foo", "bar")
    .with_target_partitions(8)
    .with_information_schema(True)
    .with_repartition_joins(False)
    .with_repartition_aggregations(False)
    .with_repartition_windows(False)
    .with_parquet_pruning(False)
)

@yahoNanJing
Copy link
Contributor

Hi @djfrancesco, how many partitions do you set for each engine? Are they the same? And could you share the cpu usage?

@alamb
Copy link
Contributor Author

alamb commented Apr 13, 2023

FWIW I am running the scripts in https://github.com/aetperf/sql_olap_bench now on my own machine (they are very nicely written @djfrancesco ) and will report back any interesting findings

My first plan is to reproduce the datafusion query performance results using datafusion python and datafusion cli directly and see if they are comparable

@djfrancesco
Copy link

@yahoNanJing I have the default settings regarding partitions, for each engine. Here is a graph of CPU usage in the case of scale factor 10:
image

@alamb
Copy link
Contributor Author

alamb commented Apr 13, 2023

So I have two interesting pieces of information:

  1. I can reproduce the reported performance difference on my 8 core cloud machine

I also see DataFusion using only a single core

+--------------+--------------+----------+-----------------+-------------------+---------------------+--------------------+--------------+----------+-------------+
| l_returnflag | l_linestatus | sum_qty  | sum_base_price  | sum_disc_price    | sum_charge          | avg_qty            | avg_price    | avg_disc | count_order |
+--------------+--------------+----------+-----------------+-------------------+---------------------+--------------------+--------------+----------+-------------+
| A            | F            | 37734107 | 56586554400.73  | 53758257134.8700  | 55909065222.827692  | 25.522005853257337 | 38273.129734 | 0.049985 | 1478493     |
| N            | F            | 991417   | 1487504710.38   | 1413082168.0541   | 1469649223.194375   | 25.516471920522985 | 38284.467760 | 0.050093 | 38854       |
| N            | O            | 74476040 | 111701729697.74 | 106118230307.6056 | 110367043872.497010 | 25.50222676958499  | 38249.117988 | 0.049996 | 2920374     |
| R            | F            | 37719753 | 56568041380.90  | 53741292684.6040  | 55889619119.831932  | 25.50579361269077  | 38250.854626 | 0.050009 | 1478870     |
+--------------+--------------+----------+-----------------+-------------------+---------------------+--------------------+--------------+----------+-------------+
4 rows in set. Query took 2.841 seconds.
❯

However, when I run datafusion against "datafusion created" parquet files from https://github.com/apache/arrow-datafusion/tree/main/benchmarks it is 3x faster though much less fast than hyper (like 100ms vs 928ms)

alamb@aal-dev:~/tpch_data/parquet_data_SF1$ datafusion-cli -f ~/sql_olap_bench/q1.txt
+--------------+--------------+-------------+-----------------+-------------------+---------------------+-----------+--------------+----------+-------------+
| l_returnflag | l_linestatus | sum_qty     | sum_base_price  | sum_disc_price    | sum_charge          | avg_qty   | avg_price    | avg_disc | count_order |
+--------------+--------------+-------------+-----------------+-------------------+---------------------+-----------+--------------+----------+-------------+
| A            | F            | 37734107.00 | 56586554400.73  | 53758257134.8700  | 55909065222.827692  | 25.522005 | 38273.129734 | 0.049985 | 1478493     |
| N            | F            | 991417.00   | 1487504710.38   | 1413082168.0541   | 1469649223.194375   | 25.516471 | 38284.467760 | 0.050093 | 38854       |
| N            | O            | 74476040.00 | 111701729697.74 | 106118230307.6056 | 110367043872.497010 | 25.502226 | 38249.117988 | 0.049996 | 2920374     |
| R            | F            | 37719753.00 | 56568041380.90  | 53741292684.6040  | 55889619119.831932  | 25.505793 | 38250.854626 | 0.050009 | 1478870     |
+--------------+--------------+-------------+-----------------+-------------------+---------------------+-----------+--------------+----------+-------------+
4 rows in set. Query took 0.928 seconds.

@alamb
Copy link
Contributor Author

alamb commented Apr 13, 2023

The explain plan clearly shows DataFusion trying to parallelize the scan:

+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Sort: lineitem.parquet.l_returnflag ASC NULLS LAST, lineitem.parquet.l_linestatus ASC NULLS LAST                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
|               |   Projection: lineitem.parquet.l_returnflag, lineitem.parquet.l_linestatus, SUM(lineitem.parquet.l_quantity) AS sum_qty, SUM(lineitem.parquet.l_extendedprice) AS sum_base_price, SUM(lineitem.parquet.l_extendedprice * Int64(1) - lineitem.parquet.l_discount) AS sum_disc_price, SUM(lineitem.parquet.l_extendedprice * Int64(1) - lineitem.parquet.l_discount * Int64(1) + lineitem.parquet.l_tax) AS sum_charge, AVG(lineitem.parquet.l_quantity) AS avg_qty, AVG(lineitem.parquet.l_extendedprice) AS avg_price, AVG(lineitem.parquet.l_discount) AS avg_disc, COUNT(UInt8(1)) AS count_order                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
|               |     Aggregate: groupBy=[[lineitem.parquet.l_returnflag, lineitem.parquet.l_linestatus]], aggr=[[SUM(lineitem.parquet.l_quantity), SUM(lineitem.parquet.l_extendedprice), SUM(CAST(lineitem.parquet.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.parquet.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))CAST(Decimal128(Some(100),23,2) - CAST(lineitem.parquet.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))Decimal128(Some(100),23,2) - CAST(lineitem.parquet.l_discount AS Decimal128(23, 2))CAST(lineitem.parquet.l_discount AS Decimal128(23, 2))lineitem.parquet.l_discountDecimal128(Some(100),23,2)CAST(lineitem.parquet.l_extendedprice AS Decimal128(38, 4))lineitem.parquet.l_extendedprice AS lineitem.parquet.l_extendedprice * Decimal128(Some(100),23,2) - lineitem.parquet.l_discount) AS SUM(lineitem.parquet.l_extendedprice * Int64(1) - lineitem.parquet.l_discount), SUM(CAST(CAST(lineitem.parquet.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.parquet.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))CAST(Decimal128(Some(100),23,2) - CAST(lineitem.parquet.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))Decimal128(Some(100),23,2) - CAST(lineitem.parquet.l_discount AS Decimal128(23, 2))CAST(lineitem.parquet.l_discount AS Decimal128(23, 2))lineitem.parquet.l_discountDecimal128(Some(100),23,2)CAST(lineitem.parquet.l_extendedprice AS Decimal128(38, 4))lineitem.parquet.l_extendedprice AS lineitem.parquet.l_extendedprice * Decimal128(Some(100),23,2) - lineitem.parquet.l_discount AS Decimal128(38, 6)) * CAST(Decimal128(Some(100),23,2) + CAST(lineitem.parquet.l_tax AS Decimal128(23, 2)) AS Decimal128(38, 6))) AS SUM(lineitem.parquet.l_extendedprice * Int64(1) - lineitem.parquet.l_discount * Int64(1) + lineitem.parquet.l_tax), AVG(lineitem.parquet.l_quantity), AVG(lineitem.parquet.l_extendedprice), AVG(lineitem.parquet.l_discount), COUNT(UInt8(1))]] |
|               |       Projection: CAST(lineitem.parquet.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.parquet.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4)) AS CAST(lineitem.parquet.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.parquet.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))CAST(Decimal128(Some(100),23,2) - CAST(lineitem.parquet.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))Decimal128(Some(100),23,2) - CAST(lineitem.parquet.l_discount AS Decimal128(23, 2))CAST(lineitem.parquet.l_discount AS Decimal128(23, 2))lineitem.parquet.l_discountDecimal128(Some(100),23,2)CAST(lineitem.parquet.l_extendedprice AS Decimal128(38, 4))lineitem.parquet.l_extendedprice, lineitem.parquet.l_quantity, lineitem.parquet.l_extendedprice, lineitem.parquet.l_discount, lineitem.parquet.l_tax, lineitem.parquet.l_returnflag, lineitem.parquet.l_linestatus                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
|               |         Filter: lineitem.parquet.l_shipdate <= Date32("10471")                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
|               |           TableScan: lineitem.parquet projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], partial_filters=[lineitem.parquet.l_shipdate <= Date32("10471")]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |
| physical_plan | SortPreservingMergeExec: [l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |
|               |   SortExec: expr=[l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
|               |     ProjectionExec: expr=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus, SUM(lineitem.parquet.l_quantity)@2 as sum_qty, SUM(lineitem.parquet.l_extendedprice)@3 as sum_base_price, SUM(lineitem.parquet.l_extendedprice * Int64(1) - lineitem.parquet.l_discount)@4 as sum_disc_price, SUM(lineitem.parquet.l_extendedprice * Int64(1) - lineitem.parquet.l_discount * Int64(1) + lineitem.parquet.l_tax)@5 as sum_charge, AVG(lineitem.parquet.l_quantity)@6 as avg_qty, AVG(lineitem.parquet.l_extendedprice)@7 as avg_price, AVG(lineitem.parquet.l_discount)@8 as avg_disc, COUNT(UInt8(1))@9 as count_order]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
|               |       AggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus], aggr=[SUM(lineitem.parquet.l_quantity), SUM(lineitem.parquet.l_extendedprice), SUM(lineitem.parquet.l_extendedprice * Int64(1) - lineitem.parquet.l_discount), SUM(lineitem.parquet.l_extendedprice * Int64(1) - lineitem.parquet.l_discount * Int64(1) + lineitem.parquet.l_tax), AVG(lineitem.parquet.l_quantity), AVG(lineitem.parquet.l_extendedprice), AVG(lineitem.parquet.l_discount), COUNT(UInt8(1))]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
|               |         CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
|               |           RepartitionExec: partitioning=Hash([Column { name: "l_returnflag", index: 0 }, Column { name: "l_linestatus", index: 1 }], 8), input_partitions=8                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |
|               |             AggregateExec: mode=Partial, gby=[l_returnflag@5 as l_returnflag, l_linestatus@6 as l_linestatus], aggr=[SUM(lineitem.parquet.l_quantity), SUM(lineitem.parquet.l_extendedprice), SUM(lineitem.parquet.l_extendedprice * Int64(1) - lineitem.parquet.l_discount), SUM(lineitem.parquet.l_extendedprice * Int64(1) - lineitem.parquet.l_discount * Int64(1) + lineitem.parquet.l_tax), AVG(lineitem.parquet.l_quantity), AVG(lineitem.parquet.l_extendedprice), AVG(lineitem.parquet.l_discount), COUNT(UInt8(1))]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
|               |               ProjectionExec: expr=[CAST(l_extendedprice@1 AS Decimal128(38, 4)) * CAST(Some(100),23,2 - CAST(l_discount@2 AS Decimal128(23, 2)) AS Decimal128(38, 4)) as CAST(lineitem.parquet.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.parquet.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))CAST(Decimal128(Some(100),23,2) - CAST(lineitem.parquet.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))Decimal128(Some(100),23,2) - CAST(lineitem.parquet.l_discount AS Decimal128(23, 2))CAST(lineitem.parquet.l_discount AS Decimal128(23, 2))lineitem.parquet.l_discountDecimal128(Some(100),23,2)CAST(lineitem.parquet.l_extendedprice AS Decimal128(38, 4))lineitem.parquet.l_extendedprice, l_quantity@0 as l_quantity, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount, l_tax@3 as l_tax, l_returnflag@4 as l_returnflag, l_linestatus@5 as l_linestatus]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
|               |                 CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |
|               |                   FilterExec: l_shipdate@6 <= 10471                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
|               |                     ParquetExec: limit=None, partitions={8 groups: [[home/alamb/tpch_data/parquet_data_SF1/lineitem.parquet:0..20797508], [home/alamb/tpch_data/parquet_data_SF1/lineitem.parquet:20797508..41595016], [home/alamb/tpch_data/parquet_data_SF1/lineitem.parquet:41595016..62392524], [home/alamb/tpch_data/parquet_data_SF1/lineitem.parquet:62392524..83190032], [home/alamb/tpch_data/parquet_data_SF1/lineitem.parquet:83190032..103987540], [home/alamb/tpch_data/parquet_data_SF1/lineitem.parquet:103987540..124785048], [home/alamb/tpch_data/parquet_data_SF1/lineitem.parquet:124785048..145582556], [home/alamb/tpch_data/parquet_data_SF1/lineitem.parquet:145582556..166380062]]}, predicate=l_shipdate@10 <= 10471, pruning_predicate=l_shipdate_min@0 <= 10471, projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
|               |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Here is the duckdb generated schema:

❯  create external table lineitem stored as parquet location 'lineitem.parquet';
0 rows in set. Query took 0.003 seconds.
❯ describe lineitem;
+-----------------+-------------------+-------------+
| column_name     | data_type         | is_nullable |
+-----------------+-------------------+-------------+
| l_orderkey      | Int32             | YES         |
| l_partkey       | Int32             | YES         |
| l_suppkey       | Int32             | YES         |
| l_linenumber    | Int32             | YES         |
| l_quantity      | Int32             | YES         |
| l_extendedprice | Decimal128(15, 2) | YES         |
| l_discount      | Decimal128(15, 2) | YES         |
| l_tax           | Decimal128(15, 2) | YES         |
| l_returnflag    | Utf8              | YES         |
| l_linestatus    | Utf8              | YES         |
| l_shipdate      | Date32            | YES         |
| l_commitdate    | Date32            | YES         |
| l_receiptdate   | Date32            | YES         |
| l_shipinstruct  | Utf8              | YES         |
| l_shipmode      | Utf8              | YES         |
| l_comment       | Utf8              | YES         |
+-----------------+-------------------+-------------+
16 rows in set. Query took 0.001 seconds.

Here is the datafusion generated schema:

❯ describe lineitem;
+-----------------+-------------------+-------------+
| column_name     | data_type         | is_nullable |
+-----------------+-------------------+-------------+
| l_orderkey      | Int64             | NO          |
| l_partkey       | Int64             | NO          |
| l_suppkey       | Int64             | NO          |
| l_linenumber    | Int32             | NO          |
| l_quantity      | Decimal128(15, 2) | NO          |
| l_extendedprice | Decimal128(15, 2) | NO          |
| l_discount      | Decimal128(15, 2) | NO          |
| l_tax           | Decimal128(15, 2) | NO          |
| l_returnflag    | Utf8              | NO          |
| l_linestatus    | Utf8              | NO          |
| l_shipdate      | Date32            | NO          |
| l_commitdate    | Date32            | NO          |
| l_receiptdate   | Date32            | NO          |
| l_shipinstruct  | Utf8              | NO          |
| l_shipmode      | Utf8              | NO          |
| l_comment       | Utf8              | NO          |
+-----------------+-------------------+-------------+

@alamb
Copy link
Contributor Author

alamb commented Apr 13, 2023

So it seems to me there are two follow ups to explore:

  1. Why is datafusion only using one core on this single large file?
  2. Why is the time different with the two files?

@alamb
Copy link
Contributor Author

alamb commented Apr 13, 2023

I will write a smaller self contained reproducer for the "slow query on a single file" and maybe someone clever can look at that.

@tustvold
Copy link
Contributor

tustvold commented Apr 13, 2023

I wonder if running parquet-layout against the parquet file might prove insightful.

DataFusion is currently limited to row group level parallelism, and there certainly are parquet writers that write very large row groups which would cause issues for this - apache/arrow#34280. Longer-term I would like to eventually get back to #2504 but that is not likely in the next couple of months.

@alamb
Copy link
Contributor Author

alamb commented Apr 13, 2023

The layout is on: #5995 (comment)

@alippai
Copy link
Contributor

alippai commented Apr 13, 2023

So it seems to me there are two follow ups to explore:

  1. Why is datafusion only using one core on this single large file?
  2. Why is the time different with the two files?
  1. Why Decimal128 is introduced for l_quantity in Datafusion but DuckDB uses int32?

@alamb
Copy link
Contributor Author

alamb commented Apr 13, 2023

Why Decimal128 is introduced for l_quantity in Datafusion but DuckDB uses int32?

I do not know -- the parquet files for these tests are created within duckdb somehow (it looks like maybe they have included the dbgen data generator in their system natively)

@alamb
Copy link
Contributor Author

alamb commented Apr 13, 2023

Filed #5995 with a reproducer for the "one core scanning" issue

@djfrancesco
Copy link

the parquet files for these tests are created within duckdb somehow (it looks like maybe they have included the dbgen data generator in their system natively)

Yes I used the DuckDB tpch extension : https://github.com/duckdb/duckdb/tree/master/extension/tpch
Each table is exported as a Parquet file using a COPY ( ) TO, with row group size 122880 (default in DuckDB) and snappy compression

@tustvold
Copy link
Contributor

tustvold commented Apr 13, 2023

#5997 should help with this, the file range logic was partitioning row groups based on the location of their ColumnMetadata, which is normally written at the end of a ColumnChunk, not their actual data. This causes issues because duckdb appears to not write the un-inlined metadata and instead just writes a file offset of 0, causing all the row groups to be placed in the first partition.

Technically this is probably not spec-compliant of DuckDB, but it's more reliable for us to partition based on where the actual page data is anyway.

@alippai
Copy link
Contributor

alippai commented Apr 13, 2023

#5997 should help with this, the file range logic was partitioning row groups based on the location of their ColumnMetadata, which is normally written at the end of a ColumnChunk, not their actual data. This causes issues because duckdb appears to not write the un-inlined metadata and instead just writes a file offset of 0, causing all the row groups to be placed in the first partition.

Technically this is probably not spec-compliant of DuckDB, but it's more reliable for us to partition based on where the actual page data is anyway.

@Mytherin maybe this is interesting for you

@mingmwang
Copy link
Contributor

So it seems to me there are two follow ups to explore:

  1. Why is datafusion only using one core on this single large file?
  2. Why is the time different with the two files?
  1. Why Decimal128 is introduced for l_quantity in Datafusion but DuckDB uses int32?

I think l_quantity should be of type Decimal. I checked the schema generated by SparkSQL tpch tool.

col_name data_type comment
l_orderkey bigint null
l_partkey bigint null
l_suppkey bigint null
l_linenumber int null
l_quantity decimal(12,2) null
l_extendedprice decimal(12,2) null
l_discount decimal(12,2) null
l_tax decimal(12,2) null
l_returnflag string null
l_linestatus string null
l_commitdate date null
l_receiptdate date null
l_shipinstruct string null
l_shipmode string null
l_comment string null
l_shipdate date null

@djfrancesco
Copy link

I think l_quantity should be of type Decimal. I checked the schema generated by SparkSQL tpch tool.

It is also what is specified in the TPC-H document pdf:

Column Name Datatype Requirements
L_QUANTITY decimal

@alippai
Copy link
Contributor

alippai commented Apr 14, 2023

Thanks, this is good news 🎉

@r4ntix
Copy link
Contributor

r4ntix commented Apr 15, 2023

I wonder if running parquet-layout against the parquet file might prove insightful.

DataFusion is currently limited to row group level parallelism, and there certainly are parquet writers that write very large row groups which would cause issues for this - apache/arrow#34280. Longer-term I would like to eventually get back to #2504 but that is not likely in the next couple of months.

The flexibility of the parquet file causes different Writers to use different file generation strategies. The data in a Parquet file can be spread over the row groups and the pages using any encoding and compression the writer or user wants.

If the physical layout of the parquet file affects the way different query engines scan, should we introduce a standard TPC-H Parquet file and re-run the performance comparison test?

I also saw this issue in this paper: https://dl.gi.de/bitstream/handle/20.500.12116/40316/B3-1.pdf?sequence=1&isAllowed=y

we look at three different Parquet writers to show how much Parquet files differ even though they store the same data. Parquet Writer Comparison:

Generator Rows per Row Group Pages per Row Group File Sizes(SF1,SF10,SF100)
Spark 3,000,000 150 192 MB, 2.1 GB, 20 GB
Spark uncompressed 3,000,000 150 333 MB, 3.3 GB, 33 GB
DuckDB 100,352 1 281 MB, 2.8 GB, 28 GB
Arrow 67,108,864 15 - 1800 189 MB, 2.0 GB, 20 GB

For each generator, we measure the number of rows and the number of pages that are stored per row group. The Spark and DuckDB Parquet writers store a fixed number of elements per page and a fixed number of pages per row group. Since Parquet does not force synchronization between the column chunks, there are writers such as Arrow that do not store the same number of elements per page. Arrow uses a fixed data page size between roughly 0.5MB and 1 MB. For DuckDB and Spark, the page sizes vary from 0.5 MB to 6 MB.

Even though we only cover three different Parquet writers, we have already observed two extremes. DuckDB and Arrow do not take advantage of the hierarchical data layout: DuckDB will only use one page per row group, and Arrow stores the entire dataset in one row group for scale factor 1 and 10 since each row group stores 67 million rows.

@r4ntix
Copy link
Contributor

r4ntix commented Apr 15, 2023

I think l_quantity should be of type Decimal. I checked the schema generated by SparkSQL tpch tool.

It is also what is specified in the TPC-H document pdf:

Column Name Datatype Requirements
L_QUANTITY decimal

In the TPC-H documentation, l_quantity is defined as a decimal type.
However, duckdb incorrectly defines it as an INTEGER type, as can be seen in the duckdb source code: https://github.com/duckdb/duckdb/blob/eaf507009fd06573d74cc5742a8643481883a0ff/extension/tpch/dbgen/dbgen.cpp#L405-L415

const char *LineitemInfo::Columns[] = {"l_orderkey",    "l_partkey",       "l_suppkey",  "l_linenumber",
                                       "l_quantity",    "l_extendedprice", "l_discount", "l_tax",
                                       "l_returnflag",  "l_linestatus",    "l_shipdate", "l_commitdate",
                                       "l_receiptdate", "l_shipinstruct",  "l_shipmode", "l_comment"};
const LogicalType LineitemInfo::Types[] = {
    LogicalType(LogicalTypeId::INTEGER), LogicalType(LogicalTypeId::INTEGER), LogicalType(LogicalTypeId::INTEGER),
    LogicalType(LogicalTypeId::INTEGER), LogicalType(LogicalTypeId::INTEGER), LogicalType::DECIMAL(15, 2),
    LogicalType::DECIMAL(15, 2),         LogicalType::DECIMAL(15, 2),         LogicalType(LogicalTypeId::VARCHAR),
    LogicalType(LogicalTypeId::VARCHAR), LogicalType(LogicalTypeId::DATE),    LogicalType(LogicalTypeId::DATE),
    LogicalType(LogicalTypeId::DATE),    LogicalType(LogicalTypeId::VARCHAR), LogicalType(LogicalTypeId::VARCHAR),
    LogicalType(LogicalTypeId::VARCHAR)};

@tustvold
Copy link
Contributor

tustvold commented Apr 15, 2023

Arrow stores the entire dataset in one row group for scale factor 1 and 10 since each row group stores 67 million rows

FWIW this was a bug and should be fixed by apache/arrow#34280.

DuckDB not making use of the hierarchical layout is perplexing, I wonder if this was an intentional design decision, it certainly isn't in the spirit of the specification

@alamb
Copy link
Contributor Author

alamb commented Apr 16, 2023

DuckDB not making use of the hierarchical layout is perplexing, I wonder if this was an intentional design decision, it certainly isn't in the spirit of the specification

I don't understand this statement

This is my understanding of the difference:

┌──────────────────────────────────┐         ┌──────────────────────────────────┐
│        RowGroup Metadata         │         │        RowGroup Metadata         │
│                                  │         │                                  │
├──────────────────────────────────┤         ├──────────────────────────────────┤
│ ┌───────────────────────────────┐│         │        RowGroup Metadata         │
│ │                               ││         │                                  │
│ └───────────────────────────────┘│         ├──────────────────────────────────┤
│ ┌───────────────────────────────┐│         │        RowGroup Metadata         │
│ │                               ││         │                                  │
│ └───────────────────────────────┘│         └──────────────────────────────────┘
│ ┌───────────────────────────────┐│         ┌──────────────────────────────────┐
│ │                               ││         │ ┌───────────────────────────────┐│
│ └───────────────────────────────┘│         │ │                               ││
│         RowGroup Data            │         │ └───────────────────────────────┘│
└──────────────────────────────────┘         │ ┌───────────────────────────────┐│
┌──────────────────────────────────┐         │ │                               ││
│        RowGroup Metadata         │         │ └───────────────────────────────┘│
│                                  │         │ ┌───────────────────────────────┐│
├──────────────────────────────────┤         │ │                               ││
│ ┌───────────────────────────────┐│         │ └───────────────────────────────┘│
│ │                               ││         │         RowGroup Data            │
│ └───────────────────────────────┘│         └──────────────────────────────────┘
│ ┌───────────────────────────────┐│         ┌──────────────────────────────────┐
│ │                               ││         │ ┌───────────────────────────────┐│
│ └───────────────────────────────┘│         │ │                               ││
│ ┌───────────────────────────────┐│         │ └───────────────────────────────┘│
│ │                               ││         │ ┌───────────────────────────────┐│
│ └───────────────────────────────┘│         │ │                               ││
│         RowGroup Data            │         │ └───────────────────────────────┘│
└──────────────────────────────────┘         │ ┌───────────────────────────────┐│
┌──────────────────────────────────┐         │ │                               ││
│        RowGroup Metadata         │         │ └───────────────────────────────┘│
│                                  │         │         RowGroup Data            │
├──────────────────────────────────┤         └──────────────────────────────────┘
│ ┌───────────────────────────────┐│         ┌──────────────────────────────────┐
│ │                               ││         │ ┌───────────────────────────────┐│
│ └───────────────────────────────┘│         │ │                               ││
│ ┌───────────────────────────────┐│         │ └───────────────────────────────┘│
│ │                               ││         │ ┌───────────────────────────────┐│
│ └───────────────────────────────┘│         │ │                               ││
│ ┌───────────────────────────────┐│         │ └───────────────────────────────┘│
│ │                               ││         │ ┌───────────────────────────────┐│
│ └───────────────────────────────┘│         │ │                               ││
│         RowGroup Data            │         │ └───────────────────────────────┘│
└──────────────────────────────────┘         │         RowGroup Data            │
                                             └──────────────────────────────────┘
                                                                                 
                                                                                 
               Arrow                                        DuckDB               
                                                                                 
                                                                                 

Wouldn't the DuckDB implementation actually potentially be better (as it allows the metadata to be fetched with one contiguous IO.

Perhaps I am mis understanding

@tustvold
Copy link
Contributor

tustvold commented Apr 16, 2023

I don't understand this statement

Was in response to a single page per row group, not the fact it also has a dubious interpretation of the statistics specification. Inlining statistics makes a lot of sense, single page per row group seems to sacrifice a lot

@alamb
Copy link
Contributor Author

alamb commented May 8, 2023

Proposal for making the settings "fast by default": #6287

@alamb
Copy link
Contributor Author

alamb commented Jun 19, 2024

I think we have addressed this particular concern a long time ago -- we can work on additional performance improvements as we find them. CLosing this one as I don't think it is actionable now

@alamb alamb closed this as completed Jun 19, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working help wanted Extra attention is needed performance Make DataFusion faster
Projects
None yet
Development

No branches or pull requests

10 participants