Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add dask-expr benchmarking post #174

Open
wants to merge 2 commits into
base: gh-pages
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 134 additions & 0 deletions _posts/2023-10-05-dask-expr-tpch-dask.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
---
layout: post
date: 2023-10-05
tags: [dask, query optimizer, performance, benchmarks]
author: JPatrick Hoefler
theme: twitter
canonical_url: https://blog.coiled.io/blog/dask-expr-tpch-dask.html
---

# TPC-H Benchmarks for Query Optimization with Dask Expressions

<a href="/images/dask_expr/tpch-comparison-dask-confidence.png">
<img src="/images/dask_expr/tpch-comparison-dask-confidence.png"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. This figure is never referred to in the text, did I miss that?
    I think there should be a small paragraph or caption explaining what the figure shows. This figure was also the least intuitive for me to understand out of the three, so I think it's especially important to explain. (i.e. what is the x-axis, it's the confidence threshold of what? Of the wall clock time? That's the tab that is selected, but the graph itself doesn't have the axis labelled with what it actually measured)
  2. This figure and text really belong in the results section. I think it's here because you want the readers to see something pretty when they click on the blog post. I think it's ok to have a copy of a pretty figure used as an illustration up the top (but you still need the figure and text to exist in the results section below).

width="70%"
alt=""></a>

[Dask-expr](https://github.com/dask-contrib/dask-expr) is an ongoing effort to add a
[logical query optimization layer](https://medium.com/coiled-hq/high-level-query-optimization-in-dask-995640564ed7) to Dask DataFrames.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought this was the canonical link for that post: https://blog.coiled.io/blog/dask-expr-introduction.html

We now have the first benchmark results to share that were run against the current DataFrame
implementation.

Dask-expr is up to 3 times faster and more memory efficient in its current state than the status quo.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"status quo" is ambiguous here. Can we update this sentence to be more specific?

The first paragraph in the results section suggests it should be "compared to conventional dask dataframes" (instead of "compared to the previous version of dask-expr" or "compared to other software tools designed for this task", etc.). Is that correct?

This is a very promising result for our future development efforts.

## Results

We are comparing Dask 2023.09.02 with the main branch of Dask-expr. Both implementations
will use the [P2P shuffling algorithm](https://medium.com/coiled-hq/shuffling-large-data-at-constant-memory-in-dask-bb683e92d70b). The results were produced on 100GB of data, e.g. scale
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's probably best to link to articles directly at https://blog.coiled.io, rather than the Medium mirror.

100 for the [TPC-H](https://www.tpc.org/tpch/) benchmarks. The cluster had 20 workers with 16GB each.

<a href="/images/dask_expr/tpch-comparison-dask.png">
<img src="/images/dask_expr/tpch-comparison-dask.png"
width="70%"
alt="Runtime on a per-query basis of Dask DataFrame and Dask-expr"></a>

We can see that Dask-expr performs better on every single query, up to a 3-times improvement
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of your figures show impressive improvements, very clear and convincing.

compared to the status quo.

Dask-expr reorders the query that was given by the user to only execute things that
are strictly necessary. It will, for example, drop unnecessary columns and thus reduce
the amount of data that is required. We can see that the biggest improvements are for
the queries that contain merge operations. These queries write a lot of data to disk
and send it over the network. Reducing the size of the data that is transferred has
a big impact on the performance.

The reordering results in a reduced memory footprint as well. This is especially helpful
for users that struggle with memory pressure on their clusters.

<a href="/images/dask_expr/tpch-comparison-dask-memory.png">
<img src="/images/dask_expr/tpch-comparison-dask-memory.png"
width="70%"
alt="Memory usage on a per query basis of Dask DataFrame and Dask-expr"></a>

We did another run on 1TB of data, which showed similar results but with a few notable differences:

- Getting Dask-expr to compute the results successfully was very easy. 20 workers very sufficient to
compute the results, which is the same number of workers as we used for the 100GB benchmarks.
- The original version needed a bigger cluster with 50 workers and with 32GB memory each.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- The original version needed a bigger cluster with 50 workers and with 32GB memory each.
- The original version using conventional dask dataframes needed a bigger cluster with 50 workers and with 32GB memory each.

- Some queries didn't complete the original version at all, because there were memory spikes on
single clusters. Dask-expr was able to complete all queries successfully.

Summarizing, getting these queries to complete was significantly easier with Dask-expr.

These results show us that we are on the right course and motivates
us to improve the performance of Dask-expr further. There is still a lot of untapped potential.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should "Dask-expr" be capitalized like this in the middle of a sentence?


## About the benchmarks

The TPC-H benchmarks are a set of queries that are commonly used to compare performance of
different databases. Thus, they are very `merge` and `groupby` heavy. Historically, the current
implementation didn't perform very well on these types of queries. The
[new P2P shuffling algorithm](https://medium.com/coiled-hq/shuffling-large-data-at-constant-memory-in-dask-bb683e92d70b) that was introduced earlier this year and now Dask-expr improves
the performance of Dask a lot.

These queries only capture a small part of the [Dask DataFrame API](https://docs.dask.org/en/stable/dataframe.html) but are helpful to compare
performance.

The full implementation of all queries is available [here](https://github.com/coiled/benchmarks/blob/main/tests/benchmarks/test_tpch.py).
A single example query is copied below, so you can get a sense of what they look like:

```python
var1 = datetime.strptime("1995-03-15", "%Y-%m-%d")
var2 = "BUILDING"

lineitem_ds = read_data("lineitem")
orders_ds = read_data("orders")
cutomer_ds = read_data("customer")

flineitem = lineitem_ds[lineitem_ds.l_shipdate > var1]
forders = orders_ds[orders_ds.o_orderdate < var1]
fcustomer = cutomer_ds[cutomer_ds.c_mktsegment == var2]

jn1 = fcustomer.merge(forders, left_on="c_custkey", right_on="o_custkey")
jn2 = jn1.merge(flineitem, left_on="o_orderkey", right_on="l_orderkey")
jn2["revenue"] = jn2.l_extendedprice * (1 - jn2.l_discount)
total = jn2.groupby(["l_orderkey", "o_orderdate", "o_shippriority"])[
"revenue"
].sum()
total.reset_index().compute().sort_values(["revenue"], ascending=False).head(10)[
["l_orderkey", "revenue", "o_orderdate", "o_shippriority"]
]
```

## What's next

We have some additional steps planned over the next couple of weeks. These include:

- Benchmarking Dask expressions against Spark
- Scaling the benchmarks to 1TB, 10TB and 100TB.

We are hoping that we can get additional insights into our implementation and are hoping to identify
bottlenecks.

## Can I use Dask-expr?

Though Dask-expr is still under active development, the project is in a state where users can
try it out. The API coverage is pretty good already. We are still adding new
optimizations, so performance will improve in the future.

You can try it out with:

```python
pip install dask-expr
```

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'd be good to repeat the link to Dask-expr here, where users can find the documentation to help them get started.

We don't expect that there exists any hard-to-find-bugs. We did not
reimplement any of the algorithms from Dask DataFrame, so we don't expect hard to identify issues
or an unexpected loss of data.

## Conclusion

Dask Expressions provides a faster and more memory efficient implementation of Dask DataFrames.
The project is still under active development, but it already outperforms the current status quo
by a significant margin on the TPC-H benchmarks.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added images/dask_expr/tpch-comparison-dask-memory.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added images/dask_expr/tpch-comparison-dask.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading