# [BSE-4385] Add locally runnable benchmarks #65

Merged 10 commits on Dec 17, 2024.
**`benchmarks/README.md`** (73 additions, 0 deletions)
# Benchmarks

## NYC Taxi Monthly Trips with Precipitation

For this benchmark, we adapt [an example data science workload](https://github.com/toddwschneider/nyc-taxi-data/blob/c65ad8332a44f49770644b11576c0529b40bbc76/citibike_comparison/analysis/analysis_queries.sql#L1) into a pandas workload that reads from a public S3 bucket and calculates the average trip duration and number of trips based on features like weather conditions, pickup and dropoff location, month, and whether the trip was on a weekday.
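
As a rough illustration of the workload's shape, a simplified pandas version might look like the sketch below. This is an illustration, not the benchmark implementation: the trip column names (`pickup_datetime`, `trip_time`, `PULocationID`, `DOLocationID`) follow the FHVHV schema, the weather frame is assumed to already carry `date` and `precipitation` columns, and the real code computes a richer feature set (see [`nyc_taxi`](./nyc_taxi/)).

``` python
# Illustrative sketch only -- not the benchmark implementation. Column names
# follow the FHVHV schema; `weather` is assumed to already have "date" and
# "precipitation" columns (as the real code renames them).
import pandas as pd


def monthly_travels_weather_sketch(trips: pd.DataFrame, weather: pd.DataFrame) -> pd.DataFrame:
    # Attach each trip's daily precipitation.
    trips = trips.assign(date=trips["pickup_datetime"].dt.date)
    merged = trips.merge(weather[["date", "precipitation"]], on="date")
    # Derive the grouping features described above.
    merged["month"] = merged["pickup_datetime"].dt.month
    merged["weekday"] = merged["pickup_datetime"].dt.dayofweek < 5
    # Average trip duration and trip count per feature combination.
    return (
        merged.groupby(["PULocationID", "DOLocationID", "month", "weekday", "precipitation"])
        .agg(avg_duration=("trip_time", "mean"), num_trips=("trip_time", "size"))
        .reset_index()
    )
```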

### Dataset

The New York City Taxi and Limousine Commission's [For Hire Vehicle High Volume dataset](https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page) (FHVHV) consists of over one billion trips taken by "for hire vehicles" such as Uber and Lyft. To get the weather on a given day, we use a separate dataset of [Central Park weather observations](https://github.com/toddwschneider/nyc-taxi-data/blob/c65ad8332a44f49770644b11576c0529b40bbc76/data/central_park_weather.csv). The FHVHV dataset consists of 1,036,465,968 rows and 24 columns; the Central Park weather dataset consists of 5,538 rows and 9 columns.

### Setting

For this benchmark, we use the full FHVHV dataset stored in Parquet files on S3; its total size is 24.7 GiB. The Central Park weather data is stored in a single CSV file on S3 with a total size of 514 KiB.

We compared Bodo's performance on this workload to other systems including [Dask](https://www.dask.org/), [Modin on Ray](https://docs.ray.io/en/latest/ray-more-libs/modin/index.html), and [PySpark](https://spark.apache.org/docs/latest/api/python/index.html) and observed a speedup of 20-240x. The implementations for all of these systems can be found in [`nyc_taxi`](./nyc_taxi/). Versions of the packages used are summarized below.

| Package | Version |
|----------------|----------------|
| bodo | 2024.10 |
| dask | 2024.9.1 |
| dask-cloudprovider | 2024.9.1 |
| modin | 0.32.0 |
| ray | 2.40.0 |
| spark | 3.5.2 |

For cluster creation and configuration, we use the [Bodo SDK](https://docs.bodo.ai/2024.12/guides/using_bodo_platform/bodo_platform_sdk_guide/) for Bodo, Dask Cloud Provider for Dask, Ray for Modin, and AWS EMR for Spark. Scripts to configure and launch clusters for each system can be found in the same directory as the implementation.

Each benchmark is collected on a cluster containing 4 worker instances and 128 physical cores. Dask, Modin, and Spark use 4 `r6i.16xlarge` instances, each consisting of 32 physical cores and 256 GiB of memory. Dask Cloud Provider also allocates an additional `c6i.xlarge` instance for the distributed scheduler which contains 2 cores. Bodo is run on 4 `c6i.16xlarge` instances, each consisting of 32 physical cores and 64 GiB of memory.
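
As an illustration of the Dask setup, a comparable cluster might be launched with Dask Cloud Provider roughly as follows. This is a hedged sketch, not the benchmark's actual launch script: the instance types mirror the description above, while the region and remaining options are assumptions (the real configuration lives in the scripts next to each implementation).

``` python
# Hedged sketch of launching a comparable Dask cluster with Dask Cloud
# Provider; not the benchmark's actual launch script.
from dask.distributed import Client
from dask_cloudprovider.aws import EC2Cluster

cluster = EC2Cluster(
    n_workers=4,                           # 4 worker instances, as above
    worker_instance_type="r6i.16xlarge",   # 32 physical cores / 256 GiB each
    scheduler_instance_type="c6i.xlarge",  # separate 2-core scheduler node
    region="us-east-1",                    # assumption
)
client = Client(cluster)
```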

### Results

The graph below summarizes the total execution time of each system (averaged over 3 runs). Results were last collected on December 12th, 2024.

<img src="./img/nyc-taxi-benchmark.png" alt="Monthly High Volume for Hire Vehicle Trips with Precipitation Benchmark Execution Time" title="Monthly High Volume for Hire Vehicle Trips with Precipitation Average Execution Time" width="30%">

## Local Benchmark

You can start to see the benefits of using Bodo by running a smaller version of our benchmark locally on your laptop. To set up, install the required packages using pip:

``` shell
pip install bodo==2024.12.1
pip install "dask[dataframe]"==2024.12.0
pip install "modin[all]"==0.32.0
pip install pyspark==3.5.3
pip install boto3 # for S3 download
```
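
The `boto3` dependency suggests the local script downloads its inputs from the public bucket. A hedged sketch of that download is below; the local filename is an assumption, and `run_local.sh` may handle this differently.

``` python
# Hedged sketch of fetching an input file from the public S3 bucket with
# boto3; run_local.sh's actual download logic may differ.
import boto3
from botocore import UNSIGNED
from botocore.config import Config

# The bucket is public, so use unsigned (anonymous) requests.
s3 = boto3.client("s3", config=Config(signature_version=UNSIGNED))
s3.download_file(
    "bodo-example-data",                  # bucket
    "nyc-taxi/central_park_weather.csv",  # key
    "central_park_weather.csv",           # local destination (assumption)
)
```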

To run the entire benchmark as a script:

``` shell
# From the benchmarks/ directory
./nyc_taxi/run_local.sh
```

We use a smaller subset of the [For Hire Vehicle High Volume dataset](https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page) so the workload can run locally on an Apple M2 MacBook Pro with 10 cores and 16 GB of memory. Even at this smaller scale, Bodo shows a roughly 3x improvement over the next best system (Dask). The results below were collected on December 17th, 2024. Note that these numbers may differ based on your specific hardware and operating system.

| System | Total Execution Time (s) |
|----------------|----------------|
| Bodo | 1.007 |
| Dask | 3.091 |
| Modin/Ray | 13.65 |
| PySpark | 27.27 |

> **Contributor Author:** Not sure if we want to include hard numbers here, because they could change based on things like network speed. I think the better comparison for the local benchmark is the ability to handle more data (the script lets you play with different data sizes pretty easily). Later we could graph memory usage or the maximum number of rows processed for different systems.

> **Contributor:** I'd still show the hard numbers, even with a caveat.

> **Collaborator:** Maybe some hard numbers here would be useful too. We can position them properly.

To see an even bigger difference, try increasing the number of rows read by specifying a different Parquet file, such as `s3://bodo-example-data/nyc-taxi/fhvhv_tripdata/fhvhv_tripdata_2019-02.parquet`. At this size (~20 million rows), Dask runs out of memory whereas Bodo continues to run:

``` shell
# From the benchmarks/ directory.
# Run Dask on first parquet file (~20 million rows)
python -m nyc_taxi.local_versions -s dask -d nyc-taxi/fhvhv_tripdata/fhvhv_tripdata_2019-02.parquet

# Run Bodo on first parquet file (~20 million rows)
python -m nyc_taxi.local_versions -s bodo -d nyc-taxi/fhvhv_tripdata/fhvhv_tripdata_2019-02.parquet
```
**`benchmarks/__init__.py`** (new empty file)
**`benchmarks/img/nyc-taxi-benchmark.png`** (binary file modified)
**`benchmarks/nyc_taxi/bodo/nyc_taxi_precipitation.py`** (14 additions, 10 deletions)


``` diff
@@ -13,19 +13,20 @@
 
 
 @bodo.jit(cache=True)
-def get_monthly_travels_weather():
-    start = time.time()
+def get_monthly_travels_weather(weather_dataset, hvfhv_dataset):
+    start_read = time.time()
     central_park_weather_observations = pd.read_csv(
-        "s3://bodo-example-data/nyc-taxi/central_park_weather.csv", parse_dates=["DATE"]
+        weather_dataset,
+        parse_dates=["DATE"],
     )
     central_park_weather_observations = central_park_weather_observations.rename(
         columns={"DATE": "date", "PRCP": "precipitation"}, copy=False
     )
-    fhvhv_tripdata = pd.read_parquet("s3://bodo-example-data/nyc-taxi/fhvhv_tripdata/")
+    fhvhv_tripdata = pd.read_parquet(hvfhv_dataset)
     end = time.time()
-    print("Reading Time: ", (end - start))
+    print("Reading Time: ", (end - start_read))
 
-    start = time.time()
+    start_compute = time.time()
 
     central_park_weather_observations["date"] = central_park_weather_observations[
         "date"
@@ -88,14 +89,17 @@ def get_time_bucket(t):
         copy=False,
     )
     end = time.time()
-    print("Monthly Taxi Travel Times Computation Time: ", end - start)
+    print("Monthly Taxi Travel Times Computation Time: ", end - start_compute)
 
-    start = time.time()
+    start_write = time.time()
     monthly_trips_weather.to_parquet("monthly_trips_weather.pq")
     end = time.time()
-    print("Writing time:", (end - start))
+    print("Writing time:", (end - start_write))
+    print("Total E2E time:", (end - start_read))
     return monthly_trips_weather
 
 
 if __name__ == "__main__":
-    get_monthly_travels_weather()
+    weather_dataset = "s3://bodo-example-data/nyc-taxi/central_park_weather.csv"
+    hvfhv_dataset = "s3://bodo-example-data/nyc-taxi/fhvhv/"
+    get_monthly_travels_weather(weather_dataset, hvfhv_dataset)
```
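
The decorator in this diff is the core of the Bodo implementation: ordinary pandas code wrapped in `bodo.jit` is compiled and run in parallel. Below is a minimal, hedged sketch of the same pattern; it is an illustration rather than the benchmark code, and the `mean_precipitation` function is hypothetical (the CSV path and columns come from the weather dataset described above).

``` python
# Minimal sketch of the bodo.jit pattern used above -- a hypothetical example,
# not the benchmark implementation.
import bodo
import pandas as pd


@bodo.jit(cache=True)
def mean_precipitation(path):
    # Bodo compiles this pandas code and parallelizes the read and reduction.
    df = pd.read_csv(path, parse_dates=["DATE"])
    return df["PRCP"].mean()


if __name__ == "__main__":
    print(mean_precipitation("s3://bodo-example-data/nyc-taxi/central_park_weather.csv"))
```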