[BSE-4385] Add locally runnable benchmarks #65

Merged

merged 10 commits on Dec 17, 2024

Changes from 4 commits
63 changes: 63 additions & 0 deletions benchmarks/README.md
@@ -0,0 +1,63 @@
# Benchmarks

## Monthly High Volume for Hire Vehicle Trips with Precipitation
Collaborator:

I think the name of the benchmark doesn't need to be tied to the dataset name. The same code runs on other types of NYC Taxi data with minor changes.

Suggested change
## Monthly High Volume for Hire Vehicle Trips with Precipitation
## NYC Taxi Monthly Trips with Precipitation


For this benchmark, we adapt a [SQL query](https://github.com/toddwschneider/nyc-taxi-data/blob/c65ad8332a44f49770644b11576c0529b40bbc76/citibike_comparison/analysis/analysis_queries.sql#L1) into a pandas workload that reads from a public S3 bucket and calculates the average trip duration and number of trips based on features like weather conditions, pickup and dropoff location, month, and whether the trip was on a weekday.
Collaborator:

SQL is actually a bit negative in this context. I think the original code was in R and was rewritten into SQL later.

Suggested change
For this benchmark, we adapt a [SQL query](https://github.com/toddwschneider/nyc-taxi-data/blob/c65ad8332a44f49770644b11576c0529b40bbc76/citibike_comparison/analysis/analysis_queries.sql#L1) into a pandas workload that reads from a public S3 bucket and calculates the average trip duration and number of trips based on features like weather conditions, pickup and dropoff location, month, and whether the trip was on a weekday.
For this benchmark, we adapt an [example data science workload](https://github.com/toddwschneider/nyc-taxi-data/blob/c65ad8332a44f49770644b11576c0529b40bbc76/citibike_comparison/analysis/analysis_queries.sql#L1) into a pandas workload that reads from a public S3 bucket and calculates the average trip duration and number of trips based on features like weather conditions, pickup and dropoff location, month, and whether the trip was on a weekday.
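
To make the shape of the workload concrete, here is a minimal pandas sketch of the steps described above: read the weather CSV and the trip Parquet data, derive date/month/weekday/duration features, join on date, then group and aggregate. The FHVHV column names used here (`pickup_datetime`, `dropoff_datetime`, `PULocationID`, `DOLocationID`) and the precipitation threshold are illustrative assumptions, not the exact code added in this PR.

``` python
# Minimal sketch of the workload, not the benchmark implementation itself.
# Column names and the precipitation threshold are assumptions for illustration.
import pandas as pd


def monthly_trips_weather_sketch(trips_path: str, weather_path: str) -> pd.DataFrame:
    weather = pd.read_csv(weather_path, parse_dates=["DATE"]).rename(
        columns={"DATE": "date", "PRCP": "precipitation"}
    )
    trips = pd.read_parquet(trips_path)

    # Derive the features used as group keys.
    trips["date"] = trips["pickup_datetime"].dt.normalize()
    trips["month"] = trips["pickup_datetime"].dt.month
    trips["weekday"] = trips["pickup_datetime"].dt.dayofweek < 5
    trips["trip_duration"] = (
        trips["dropoff_datetime"] - trips["pickup_datetime"]
    ).dt.total_seconds() / 60

    # Attach precipitation for each trip date and bucket it into a simple flag.
    merged = trips.merge(weather[["date", "precipitation"]], on="date", how="left")
    merged["rainy"] = merged["precipitation"].fillna(0) > 0.1

    # Average trip duration and trip count per feature combination.
    return (
        merged.groupby(["PULocationID", "DOLocationID", "month", "weekday", "rainy"])
        .agg(avg_duration=("trip_duration", "mean"), trip_count=("trip_duration", "size"))
        .reset_index()
    )
```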


### Dataset

The New York City Taxi and Limousine Commission's [For Hire Vehicle High Volume dataset](https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page)(FHVHV) consists of over one billion trips taken by "for hire vehicles" including Uber and Lyft. To get the weather on a given day, we use a separate dataset of [Central Park weather observations](https://github.com/toddwschneider/nyc-taxi-data/blob/c65ad8332a44f49770644b11576c0529b40bbc76/data/central_park_weather.csv). The For Hire Vehicle High Volume dataset consists of 1,036,465,968 rows and 24 columns. The Central Park Weather dataset consists of 5,538 rows and 9 columns.
Collaborator:

Just a space needed:

Suggested change
The New York City Taxi and Limousine Commission's [For Hire Vehicle High Volume dataset](https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page)(FHVHV) consists of over one billion trips taken by "for hire vehicles" including Uber and Lyft. To get the weather on a given day, we use a separate dataset of [Central Park weather observations](https://github.com/toddwschneider/nyc-taxi-data/blob/c65ad8332a44f49770644b11576c0529b40bbc76/data/central_park_weather.csv). The For Hire Vehicle High Volume dataset consists of 1,036,465,968 rows and 24 columns. The Central Park Weather dataset consists of 5,538 rows and 9 columns.
The New York City Taxi and Limousine Commission's [For Hire Vehicle High Volume dataset](https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page) (FHVHV) consists of over one billion trips taken by "for hire vehicles" including Uber and Lyft. To get the weather on a given day, we use a separate dataset of [Central Park weather observations](https://github.com/toddwschneider/nyc-taxi-data/blob/c65ad8332a44f49770644b11576c0529b40bbc76/data/central_park_weather.csv). The For Hire Vehicle High Volume dataset consists of 1,036,465,968 rows and 24 columns. The Central Park Weather dataset consists of 5,538 rows and 9 columns.


### Setting

For this benchmark, we used the full FHVHV dataset stored in parquet files on S3. The total size of this dataset was 24.7 GiB. The Central Park Weather data was stored in a single csv file on S3 and it's total size was 514 KiB.
Collaborator:

Use active voice and other minor changes:

Suggested change
For this benchmark, we used the full FHVHV dataset stored in parquet files on S3. The total size of this dataset was 24.7 GiB. The Central Park Weather data was stored in a single csv file on S3 and it's total size was 514 KiB.
For this benchmark, we use the full FHVHV dataset stored in Parquet files on S3. The total size of this dataset is 24.7 GiB. The Central Park Weather data is stored in a single CSV file on S3 and its total size is 514 KiB.


We compared Bodo's performance on this workload to other systems including [Dask](https://www.dask.org/), [Modin on Ray](https://docs.ray.io/en/latest/ray-more-libs/modin/index.html), and [Pyspark](https://spark.apache.org/docs/latest/api/python/index.html) and observed a speedup of 20-240x. The implementations for all of these systems can be found in [`nyc_taxi`](./nyc_taxi/). Versions of the packages used are summarized below.
Collaborator:

Suggested change
We compared Bodo's performance on this workload to other systems including [Dask](https://www.dask.org/), [Modin on Ray](https://docs.ray.io/en/latest/ray-more-libs/modin/index.html), and [Pyspark](https://spark.apache.org/docs/latest/api/python/index.html) and observed a speedup of 20-240x. The implementations for all of these systems can be found in [`nyc_taxi`](./nyc_taxi/). Versions of the packages used are summarized below.
We compared Bodo's performance on this workload to other systems including [Dask](https://www.dask.org/), [Modin on Ray](https://docs.ray.io/en/latest/ray-more-libs/modin/index.html), and [PySpark](https://spark.apache.org/docs/latest/api/python/index.html) and observed a speedup of 20-240x. The implementations for all of these systems can be found in [`nyc_taxi`](./nyc_taxi/). Versions of the packages used are summarized below.


| Package | Version |
|----------------|----------------|
| bodo | 2024.10 |
| dask | 2024.9.1 |
| dask-cloudprovider | 2024.9.1 |
| modin | 0.32.0 |
| ray | 2.40.0 |
| spark | 3.5.2 |
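
If you want to roughly reproduce this environment locally, the pins below are one way to express the table; the mapping of the `spark` row to the `pyspark` distribution and the extras used are assumptions, not a tested requirements file.

``` shell
# Approximate pins matching the table above; package names/extras are assumptions.
pip install bodo==2024.10 "dask[dataframe]==2024.9.1" dask-cloudprovider==2024.9.1 \
    "modin[all]==0.32.0" ray==2.40.0 pyspark==3.5.2
```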

For cluster creation and configuration, we used the [Bodo SDK](https://docs.bodo.ai/2024.12/guides/using_bodo_platform/bodo_platform_sdk_guide/) for Bodo, Dask Cloudprovider for Dask, Ray for Modin, and AWS EMR for Spark. Scripts to configure and launch clusters for each system can be found in the same directory as the implementation.
Collaborator:

Suggested change
For cluster creation and configuration, we used the [Bodo SDK](https://docs.bodo.ai/2024.12/guides/using_bodo_platform/bodo_platform_sdk_guide/) for Bodo, Dask Cloudprovider for Dask, Ray for Modin, and AWS EMR for Spark. Scripts to configure and launch clusters for each system can be found in the same directory as the implementation.
For cluster creation and configuration, we used the [Bodo SDK](https://docs.bodo.ai/2024.12/guides/using_bodo_platform/bodo_platform_sdk_guide/) for Bodo, Dask Cloud Provider for Dask, Ray for Modin, and AWS EMR for Spark. Scripts to configure and launch clusters for each system can be found in the same directory as the implementation.


Each benchmark was collected on a cluster containing 4 worker instances and 128 physical cores. Dask, Modin, and Spark used 4 `r6i.16xlarge` instances, each consisting of 32 physical cores and 256 GiB of memory. Dask Cloudprovider also allocated an additional `r6i.16xlarge` instance for the scheduler. Bodo was run on 4 `c6i.16xlarge` instances, each consisting of 32 physical cores and 64 GiB of memory.
Collaborator:

The cluster configuration should be the same for all systems. It's OK if Dask uses a small extra instance for the scheduler.
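
As a hedged illustration of the Dask side of that setup, a `dask_cloudprovider` cluster along the lines of the configuration described above might look like the sketch below; the values mirror the prose (4 `r6i.16xlarge` workers plus a scheduler instance), the region is an assumption, and the real launch scripts live next to each implementation.

``` python
# Sketch of a Dask Cloud Provider cluster matching the described configuration.
# The region is an assumption; see nyc_taxi/dask/ for the actual benchmark script.
from dask.distributed import Client
from dask_cloudprovider.aws import EC2Cluster

with EC2Cluster(
    n_workers=4,
    instance_type="r6i.16xlarge",
    region="us-east-1",  # assumption, not stated in the benchmark prose
) as cluster:
    with Client(cluster) as client:
        ...  # submit the workload, as in nyc_taxi/dask/nyc_taxi_precipitation.py
```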


### Results

The graph below summarizes the total execution time of each system (averaged over 3 runs). Results were last collected on December 12th, 2024.

<img src="./img/nyc_taxi.png" alt="Monthly High Volume for Hire Vehicle Trips with Precipitation Benchmark Execution Time" title="Monthly High Volume for Hire Vehicle Trips with Precipitation Average Execution Time" width="30%">
Collaborator:

Let's use the same file path in the top-level README.md as well.


## Local Benchmark

You can start to see the benefits of using Bodo from your laptop by running a smaller version of our benchmark locally. To set up, install the required packages using pip:

``` shell
pip install bodo
pip install "dask[dataframe]"
pip install "modin[all]"
pip install pyspark
```
Then run the benchmarks!

``` shell
# (from the benchmarks/ directory)
python -m nyc_taxi.run_local
```
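
To time a single system instead of all of them, `run_local.py` also exposes a `--system`/`-s` flag (it defaults to `all`). For example:

``` shell
# (from the benchmarks/ directory) run only one system on the default 5M-row sample
python -m nyc_taxi.run_local -s bodo
python -m nyc_taxi.run_local -s dask
```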

We used a smaller subset of the [For Hire Vehicle High Volume dataset](https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page) to allow the workload to run locally on an Apple M2 Macbook Pro with 10 cores and 16 GB memory. Even at this smaller scale, Bodo shows a modest improvement over the next best system (Dask).
Collaborator:

What is "modest improvement"? We don't want to be modest when presenting benchmarks :)

Contributor Author:

So Bodo is roughly 3x better than Dask or even regular pandas in this case. I wanted to highlight that for smaller data sizes these solutions can be roughly equivalent, and then in the next paragraph introduce the result that shows Dask OOMing, which further supports what we are claiming to be good at.

Contributor:

I think this is a really good result and we should showcase it as such, including a chart that we can share/brag about.

Contributor Author:

Will open a follow-up to create the chart. Did we want to create a chart for OOM too? Also, should we do a comparison with pandas here as well, since it is pretty competitive on smaller datasets?

Contributor:

If we can, yes, let's not be shy about creating a chart to show this.


Contributor Author:

Not sure if we want to include hard numbers here because they could change based on things like network speed, etc. I think the better comparison for the local benchmark is being able to handle more data (the script lets you play with different data sizes pretty easily). Later we could graph memory usage or the maximum number of rows processed for different systems.

Contributor:

I'd still show the hard numbers, even with a caveat.

Collaborator:

Maybe some hard numbers here would be useful too. We can position them properly.

To see an even bigger difference, try increasing the number of rows read by specifying a different parquet file such as `s3://bodo-example-data/nyc-taxi/fhvhv_tripdata/fhvhv_tripdata_2019-02.parquet`. At this size (~20 million rows), Dask runs out of memory whereas Bodo continues to run:

``` shell
# run Dask on the first parquet file (~20 million rows)
python -m nyc_taxi.run_local -s dask -d s3://bodo-example-data/nyc-taxi/fhvhv_tripdata/fhvhv_tripdata_2019-02.parquet

# run Bodo on the first parquet file (~20 million rows)
python -m nyc_taxi.run_local -s bodo -d s3://bodo-example-data/nyc-taxi/fhvhv_tripdata/fhvhv_tripdata_2019-02.parquet
```
Empty file added benchmarks/__init__.py
Empty file.
Binary file added benchmarks/img/nyc_taxi.png
11 changes: 7 additions & 4 deletions benchmarks/nyc_taxi/bodo/nyc_taxi_precipitation.py
@@ -13,15 +13,17 @@


@bodo.jit(cache=True)
def get_monthly_travels_weather():
def get_monthly_travels_weather(hvfhv_dataset):
start = time.time()
central_park_weather_observations = pd.read_csv(
"s3://bodo-example-data/nyc-taxi/central_park_weather.csv", parse_dates=["DATE"]
"s3://bodo-example-data/nyc-taxi/central_park_weather.csv",
parse_dates=["DATE"],
storage_options={"anon": True},
)
central_park_weather_observations = central_park_weather_observations.rename(
columns={"DATE": "date", "PRCP": "precipitation"}, copy=False
)
fhvhv_tripdata = pd.read_parquet("s3://bodo-example-data/nyc-taxi/fhvhv_tripdata/")
fhvhv_tripdata = pd.read_parquet(hvfhv_dataset, storage_options={"anon": True})
end = time.time()
print("Reading Time: ", (end - start))

@@ -98,4 +100,5 @@ def get_time_bucket(t):


if __name__ == "__main__":
get_monthly_travels_weather()
hvfhv_dataset = "s3://bodo-example-data/nyc-taxi/fhvhv/"
get_monthly_travels_weather(hvfhv_dataset)
22 changes: 16 additions & 6 deletions benchmarks/nyc_taxi/dask/nyc_taxi_precipitation.py
@@ -13,10 +13,9 @@

import dask.dataframe as dd
from dask.distributed import Client
from dask_cloudprovider.aws import EC2Cluster


def get_monthly_travels_weather():
def get_monthly_travels_weather(hvfhv_dataset):
start = time.time()
central_park_weather_observations = dd.read_csv(
"s3://bodo-example-data/nyc-taxi/central_park_weather.csv",
@@ -28,7 +27,7 @@ def get_monthly_travels_weather():
)

fhvhv_tripdata = dd.read_parquet(
"s3://bodo-example-data/nyc-taxi/fhvhv_tripdata/",
hvfhv_dataset,
storage_options={"anon": True},
)

@@ -103,7 +102,17 @@ def get_time_bucket(t):
return end - start


def main():
def local_get_monthly_travels_weather(hvfhv_dataset):
"""Run Dask on local cluster."""
with Client():
total_time = get_monthly_travels_weather(hvfhv_dataset)
print("Total time for IO and compute:", total_time)


def ec2_get_monthly_travels_weather(hvfhv_dataset):
"""Run Dask on EC2 cluster."""
from dask_cloudprovider.aws import EC2Cluster

env_vars = {"EXTRA_CONDA_PACKAGES": "s3fs==2024.10.0"}
with EC2Cluster(
# NOTE: Setting security = False to avoid large config size
@@ -117,11 +126,12 @@ def main():
) as cluster:
with Client(cluster) as client:
for _ in range(3):
future = client.submit(get_monthly_travels_weather)
future = client.submit(get_monthly_travels_weather, hvfhv_dataset)
total_time = future.result()
client.restart()
print("Total time for IO and compute:", total_time)


if __name__ == "__main__":
main()
hvfhv_dataset = "s3://bodo-example-data/nyc-taxi/fhvhv_tripdata/"
ec2_get_monthly_travels_weather(hvfhv_dataset)
@@ -173,6 +173,7 @@ setup_commands:
- conda activate modin && pip install -U fsspec>=2022.11.0 s3fs boto3 pyopenssl
- echo "conda activate modin" >> ~/.bashrc
- echo 'export MODIN_RAY_CLUSTER=True' >> ~/.bashrc
- echo 'conda list'

# Custom commands that will be run on the head node after common setup.
head_setup_commands:
@@ -1,15 +1,11 @@
import time

import ray

ray.init(address="auto")
cpu_count = ray.cluster_resources()["CPU"]
print("RAY CPU COUNT: ", cpu_count)
import modin.pandas as pd
import ray
from modin.pandas.io import to_ray


def run_modin():
def get_monthly_travels_weather(hvfhv_dataset):
start = time.time()
central_park_weather_observations = pd.read_csv(
"s3://bodo-example-data/nyc-taxi/central_park_weather.csv",
@@ -21,7 +17,7 @@ def run_modin():
copy=False,
)
fhvhv_tripdata = pd.read_parquet(
"s3://bodo-example-data/nyc-taxi/fhvhv_tripdata/",
hvfhv_dataset,
storage_options={"anon": True},
)
end = time.time()
@@ -91,7 +87,6 @@ def get_time_bucket(t):
)
end = time.time()
print("Monthly Taxi Travel Times Computation Time: ", end - start)
print(monthly_trips_weather.head())

start = time.time()
monthly_trips_weather_ray = to_ray(monthly_trips_weather)
@@ -101,5 +96,17 @@ def get_time_bucket(t):
return monthly_trips_weather


def local_get_monthly_travels_weather(hvfhv_dataset):
"""Run Modin on local Ray cluster"""
ray.init()
get_monthly_travels_weather(hvfhv_dataset)
ray.shutdown()


if __name__ == "__main__":
result = run_modin()
ray.init(address="auto")
cpu_count = ray.cluster_resources()["CPU"]
print("RAY CPU COUNT: ", cpu_count)

hvfhv_dataset = "s3://bodo-example-data/nyc-taxi/fhvhv/"
get_monthly_travels_weather(hvfhv_dataset)
78 changes: 78 additions & 0 deletions benchmarks/nyc_taxi/run_local.py
@@ -0,0 +1,78 @@
"""Run local version of Bodo, Dask, Modin, and Pyspark benchmarks
Collaborator:

Suggested change
"""Run local version of Bodo, Dask, Modin, and Pyspark benchmarks
"""Run local version of Bodo, Dask, Modin, and PySpark benchmarks

Accepts a list of files to use as the High Volume For Hire Vehicle dataset
(defaults to the first 5 million rows of the dataset). For an exhaustive list of
files, see s3://bodo-example-data/nyc-taxi/fhvhv. You can also optionally
specify the system to run, e.g. just bodo, or "all" to run on all systems.

usage:
python run_local.py --dataset FILE --system SYSTEM
"""

import argparse

from .bodo.nyc_taxi_precipitation import (
get_monthly_travels_weather as bodo_get_monthly_travels_weather,
)
from .dask.nyc_taxi_precipitation import (
local_get_monthly_travels_weather as dask_get_monthly_travels_weather,
)
from .modin_ray.nyc_taxi_precipitation import (
local_get_monthly_travels_weather as modin_get_monthly_travels_weather,
)
from .spark.spark_nyc_taxi_precipitation import (
get_monthly_travels_weather as spark_get_monthly_travels_weather,
)

hvfhv_small = "s3://bodo-example-data/nyc-taxi/fhvhv_5M_rows.pq"


def get_spark_paths(files: str | list[str]) -> str:
"""Gets the equivalent path for spark to use."""
if isinstance(files, str):
return files.replace("s3://", "s3a://").replace("fhvhv/", "fhvhv-rewrite/")

assert len(files) == 1, "Spark benchmark expects a single path argument."

return files[0].replace("s3://", "s3a://").replace("fhvhv/", "fhvhv-rewrite/")
Contributor Author:

PySpark has slightly different syntax for reading multiple parquet files, so we'd have to modify the script to allow this. I think it's fine to just cap it at one, since it can't handle more than one parquet file from this dataset anyway (runs out of memory).
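
For context, Spark's `DataFrameReader.parquet` does accept multiple path arguments, so lifting the cap would mostly mean passing the rewritten paths through; a rough sketch is below. The file names are illustrative placeholders, and the out-of-memory caveat above still applies.

``` python
# Sketch only: passing several Parquet paths to Spark if the single-path cap were lifted.
# The file names below are illustrative placeholders, not validated dataset paths.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MultiPathSketch").getOrCreate()
fhvhv_tripdata = spark.read.parquet(
    "s3a://bodo-example-data/nyc-taxi/fhvhv-rewrite/part-0.parquet",
    "s3a://bodo-example-data/nyc-taxi/fhvhv-rewrite/part-1.parquet",
)
```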



def main(dataset: str, system: str):
if system == "dask" or system == "all":
print("Running Dask...")
dask_get_monthly_travels_weather(dataset)

if system == "modin" or system == "all":
print("Running Modin...")
modin_get_monthly_travels_weather(dataset)

if system == "spark" or system == "all":
print("Running Spark...")
spark_get_monthly_travels_weather(get_spark_paths(dataset))

if system == "bodo" or system == "all":
print("Running Bodo...")
bodo_get_monthly_travels_weather(dataset)


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--dataset",
"-d",
nargs="*",
required=False,
default=hvfhv_small,
help="Path to parquet file(s) to use for local benchmark.",
)
parser.add_argument(
"--system",
"-s",
required=False,
default="all",
help="System to run benchmark on, 'all' runs all systems.",
)
args = parser.parse_args()
dataset = args.dataset
system = args.system

main(dataset, system)
10 changes: 5 additions & 5 deletions benchmarks/nyc_taxi/spark/spark_nyc_taxi_precipitation.py
@@ -11,7 +11,7 @@
)


def get_monthly_travels_weather():
def get_monthly_travels_weather(hvfhv_dataset):
spark = (
SparkSession.builder.appName("MonthlyTravelsWeather")
.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.2")
@@ -36,9 +36,7 @@ def get_monthly_travels_weather():

# Read in trip data using Spark; this reads a re-written dataset because Spark doesn't support reading the original dataset
# due to schema unification issues
fhvhv_tripdata = spark.read.parquet(
"s3a://bodo-example-data/nyc-taxi/fhvhv_tripdata_rewrite/"
).drop("__index_level_0__")
fhvhv_tripdata = spark.read.parquet(hvfhv_dataset).drop("__index_level_0__")

# Convert datetime columns and create necessary features
fhvhv_tripdata = (
@@ -113,4 +111,6 @@ def get_time_bucket(t):
print("Execution time:", time.time() - start)


get_monthly_travels_weather()
if __name__ == "__main__":
hvfhv_dataset = "s3a://bodo-example-data/nyc-taxi/fhvhv_tripdata_rewrite/"
get_monthly_travels_weather(hvfhv_dataset)