diff --git a/benchmarks/README.md b/benchmarks/README.md new file mode 100644 index 0000000000..c17926ce55 --- /dev/null +++ b/benchmarks/README.md @@ -0,0 +1,73 @@ +# Benchmarks + +## NYC Taxi Monthly Trips with Precipitation + +For this benchmark, we adapt [an example data science workload](https://github.com/toddwschneider/nyc-taxi-data/blob/c65ad8332a44f49770644b11576c0529b40bbc76/citibike_comparison/analysis/analysis_queries.sql#L1) into a pandas workload that reads from a public S3 bucket and calculates the average trip duration and number of trips based on features like weather conditions, pickup and dropoff location, month, and whether the trip was on a weekday. + +### Dataset + +The New York City Taxi and Limousine Commission's [For Hire Vehicle High Volume dataset](https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page) (FHVHV) consists of over one billion trips taken by "for hire vehicles" including Uber and Lyft. To get the weather on a given day, we use a separate dataset of [Central Park weather observations](https://github.com/toddwschneider/nyc-taxi-data/blob/c65ad8332a44f49770644b11576c0529b40bbc76/data/central_park_weather.csv). The For Hire Vehicle High Volume dataset consists of 1,036,465,968 rows and 24 columns. The Central Park Weather dataset consists of 5,538 rows and 9 columns. + +### Setting + +For this benchmark, we use the full FHVHV dataset stored in Parquet files on S3. The total size of this dataset is 24.7 GiB. The Central Park Weather data ia stored in a single CSV file on S3 and its total size is 514 KiB. + +We compared Bodo's performance on this workload to other systems including [Dask](https://www.dask.org/), [Modin on Ray](https://docs.ray.io/en/latest/ray-more-libs/modin/index.html), and [PySpark](https://spark.apache.org/docs/latest/api/python/index.html) and observed a speedup of 20-240x. The implementations for all of these systems can be found in [`nyc_taxi`](./nyc_taxi/). Versions of the packages used are summarized below. + +| Package | Version | +|----------------|----------------| +| bodo | 2024.10 | +| dask | 2024.9.1 | +| dask-cloudprovider | 2024.9.1 | +| modin | 0.32.0 | +| ray | 2.40.0 | +| spark | 3.5.2 | + +For cluster creation and configuration, we use the [Bodo SDK](https://docs.bodo.ai/2024.12/guides/using_bodo_platform/bodo_platform_sdk_guide/) for Bodo, Dask Cloud Provider for Dask, Ray for Modin, and AWS EMR for Spark. Scripts to configure and launch clusters for each system can be found in the same directory as the implementation. + +Each benchmark is collected on a cluster containing 4 worker instances and 128 physical cores. Dask, Modin, and Spark use 4 `r6i.16xlarge` instances, each consisting of 32 physical cores and 256 GiB of memory. Dask Cloud Provider also allocates an additional `c6i.xlarge` instance for the distributed scheduler which contains 2 cores. Bodo is run on 4 `c6i.16xlarge` instances, each consisting of 32 physical cores and 64 GiB of memory. + +### Results + +The graph below summarizes the total execution time of each system (averaged over 3 runs). Results were last collected on December 12th, 2024. + +Monthly High Volume for Hire Vehicle Trips with Precipitation Benchmark Execution Time + +## Local Benchmark + +You can start to see the benefits of using Bodo from your laptop by running a smaller version of our benchmark locally. To set up, install the required packages using pip: + +``` shell +pip install bodo==2024.12.1 +pip install "dask[dataframe]"==2024.12.0 +pip install "modin[all]"==0.32.0 +pip install pyspark==3.5.3 +pip install boto3 # for S3 download +``` + +To run the entire benchmarks as a script + +``` shell +# From the benchmarks/ directory +./nyc_taxi/run_local.sh +``` + +We use a smaller subset of the [For Hire Vehicle High Volume dataset](https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page) to allow the workload to run locally on an Apple M2 Macbook Pro with 10 cores and 16 GB memory. Even at this smaller scale, Bodo shows a roughly 3x improvement over the next best system (Dask). The results below were collected December 17th, 2024. Note that these numbers might differ based on your specific hardware and operating system. + +| System | Total Execution Time (s) | +|----------------|----------------| +| Bodo | 1.007 | +| Dask | 3.091 | +| Modin/Ray | 13.65 | +| PySpark | 27.27 | + +To see an even bigger difference, try increasing the number of rows read by specifying a different parquet file such as `s3://bodo-example-data/nyc-taxi/fhvhv_tripdata/fhvhv_tripdata_2019-02.parquet`. On this size (~20 million rows), Dask runs out of memory whereas Bodo continue to run: + +``` shell +# From the benchmarks/ directory. +# Run Dask on first parquet file (~20 million rows) +python -m nyc_taxi.local_versions -s dask -d nyc-taxi/fhvhv_tripdata/fhvhv_tripdata_2019-02.parquet + +# Run Bodo on first parquet file (~20 million rows) +python -m nyc_taxi.local_versions -s bodo -d nyc-taxi/fhvhv_tripdata/fhvhv_tripdata_2019-02.parquet +``` \ No newline at end of file diff --git a/benchmarks/__init__.py b/benchmarks/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/benchmarks/img/nyc-taxi-benchmark.png b/benchmarks/img/nyc-taxi-benchmark.png index 3ceb43fc1c..0434992213 100644 Binary files a/benchmarks/img/nyc-taxi-benchmark.png and b/benchmarks/img/nyc-taxi-benchmark.png differ diff --git a/benchmarks/nyc_taxi/bodo/nyc_taxi_precipitation.py b/benchmarks/nyc_taxi/bodo/nyc_taxi_precipitation.py index ef5ad8d668..524ff4a6f0 100644 --- a/benchmarks/nyc_taxi/bodo/nyc_taxi_precipitation.py +++ b/benchmarks/nyc_taxi/bodo/nyc_taxi_precipitation.py @@ -13,19 +13,20 @@ @bodo.jit(cache=True) -def get_monthly_travels_weather(): - start = time.time() +def get_monthly_travels_weather(weather_dataset, hvfhv_dataset): + start_read = time.time() central_park_weather_observations = pd.read_csv( - "s3://bodo-example-data/nyc-taxi/central_park_weather.csv", parse_dates=["DATE"] + weather_dataset, + parse_dates=["DATE"], ) central_park_weather_observations = central_park_weather_observations.rename( columns={"DATE": "date", "PRCP": "precipitation"}, copy=False ) - fhvhv_tripdata = pd.read_parquet("s3://bodo-example-data/nyc-taxi/fhvhv_tripdata/") + fhvhv_tripdata = pd.read_parquet(hvfhv_dataset) end = time.time() - print("Reading Time: ", (end - start)) + print("Reading Time: ", (end - start_read)) - start = time.time() + start_compute = time.time() central_park_weather_observations["date"] = central_park_weather_observations[ "date" @@ -88,14 +89,17 @@ def get_time_bucket(t): copy=False, ) end = time.time() - print("Monthly Taxi Travel Times Computation Time: ", end - start) + print("Monthly Taxi Travel Times Computation Time: ", end - start_compute) - start = time.time() + start_write = time.time() monthly_trips_weather.to_parquet("monthly_trips_weather.pq") end = time.time() - print("Writing time:", (end - start)) + print("Writing time:", (end - start_write)) + print("Total E2E time:", (end - start_read)) return monthly_trips_weather if __name__ == "__main__": - get_monthly_travels_weather() + weather_dataset = "s3://bodo-example-data/nyc-taxi/central_park_weather.csv" + hvfhv_dataset = "s3://bodo-example-data/nyc-taxi/fhvhv/" + get_monthly_travels_weather(weather_dataset, hvfhv_dataset) diff --git a/benchmarks/nyc_taxi/dask/nyc_taxi_precipitation.ipynb b/benchmarks/nyc_taxi/dask/nyc_taxi_precipitation.ipynb index 5dbcfc1d41..f4aeaf0f5e 100644 --- a/benchmarks/nyc_taxi/dask/nyc_taxi_precipitation.ipynb +++ b/benchmarks/nyc_taxi/dask/nyc_taxi_precipitation.ipynb @@ -32,9 +32,317 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 34, "metadata": {}, - "outputs": [], + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Creating scheduler instance\n", + "\n", + "Cloud init\n", + "==========\n", + "\n", + "\n", + "#cloud-config\n", + "\n", + "\n", + "# Bootstrap\n", + "packages:\n", + " - apt-transport-https\n", + " - ca-certificates\n", + " - curl\n", + " - gnupg-agent\n", + " - software-properties-common\n", + " - ubuntu-drivers-common\n", + "\n", + "# Enable ipv4 forwarding, required on CIS hardened machines\n", + "write_files:\n", + " - path: /etc/sysctl.d/enabled_ipv4_forwarding.conf\n", + " content: |\n", + " net.ipv4.conf.all.forwarding=1\n", + "\n", + "# create the docker group\n", + "groups:\n", + " - docker\n", + "\n", + "# Add default auto created user to docker group\n", + "system_info:\n", + " default_user:\n", + " groups: [docker]\n", + "\n", + "\n", + "runcmd:\n", + " \n", + " # Install Docker\n", + " - curl -fsSL https://download.docker.com/linux/ubuntu/gpg | apt-key add -\n", + " - add-apt-repository \"deb [arch=amd64] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable\"\n", + " - apt-get update -y\n", + " - apt-get install -y docker-ce docker-ce-cli containerd.io\n", + " - systemctl start docker\n", + " - systemctl enable docker\n", + " \n", + "\n", + " \n", + "\n", + " \n", + "\n", + " # Run container\n", + " - 'docker run --net=host -e EXTRA_CONDA_PACKAGES=\"s3fs==2024.10.0\" daskdev/dask:latest env DASK_INTERNAL_INHERIT_CONFIG=\"\" python -m distributed.cli.dask_scheduler'\n", + "\n", + " \n", + " # Shutdown when command is done\n", + " - shutdown -h now\n", + " \n", + "Created instance i-0311fa1be461bd2a9 as dask-d7369884-scheduler\n", + "Waiting for scheduler to run at 18.220.177.101:8786\n", + "Scheduler is running\n", + "Creating worker instance\n", + "Creating worker instance\n", + "Creating worker instance\n", + "Creating worker instance\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/Users/scottroutledge/miniforge3/envs/benchmark_dask/lib/python3.10/contextlib.py:142: UserWarning: Creating your cluster is taking a surprisingly long time. This is likely due to pending resources. Hang tight! \n", + " next(self.gen)\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "Cloud init\n", + "==========\n", + "\n", + "\n", + "#cloud-config\n", + "\n", + "\n", + "# Bootstrap\n", + "packages:\n", + " - apt-transport-https\n", + " - ca-certificates\n", + " - curl\n", + " - gnupg-agent\n", + " - software-properties-common\n", + " - ubuntu-drivers-common\n", + "\n", + "# Enable ipv4 forwarding, required on CIS hardened machines\n", + "write_files:\n", + " - path: /etc/sysctl.d/enabled_ipv4_forwarding.conf\n", + " content: |\n", + " net.ipv4.conf.all.forwarding=1\n", + "\n", + "# create the docker group\n", + "groups:\n", + " - docker\n", + "\n", + "# Add default auto created user to docker group\n", + "system_info:\n", + " default_user:\n", + " groups: [docker]\n", + "\n", + "\n", + "runcmd:\n", + " \n", + " # Install Docker\n", + " - curl -fsSL https://download.docker.com/linux/ubuntu/gpg | apt-key add -\n", + " - add-apt-repository \"deb [arch=amd64] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable\"\n", + " - apt-get update -y\n", + " - apt-get install -y docker-ce docker-ce-cli containerd.io\n", + " - systemctl start docker\n", + " - systemctl enable docker\n", + " \n", + "\n", + " \n", + "\n", + " \n", + "\n", + " # Run container\n", + " - 'docker run --net=host -e EXTRA_CONDA_PACKAGES=\"s3fs==2024.10.0\" daskdev/dask:latest env DASK_INTERNAL_INHERIT_CONFIG=\"\" python -m distributed.cli.dask_spec tcp://18.220.177.101:8786 --spec ''{\"cls\": \"dask.distributed.Nanny\", \"opts\": {\"name\": \"dask-d7369884-worker-87177d09\"}}'''\n", + "\n", + " \n", + " # Shutdown when command is done\n", + " - shutdown -h now\n", + " \n", + "\n", + "Cloud init\n", + "==========\n", + "\n", + "\n", + "#cloud-config\n", + "\n", + "\n", + "# Bootstrap\n", + "packages:\n", + " - apt-transport-https\n", + " - ca-certificates\n", + " - curl\n", + " - gnupg-agent\n", + " - software-properties-common\n", + " - ubuntu-drivers-common\n", + "\n", + "# Enable ipv4 forwarding, required on CIS hardened machines\n", + "write_files:\n", + " - path: /etc/sysctl.d/enabled_ipv4_forwarding.conf\n", + " content: |\n", + " net.ipv4.conf.all.forwarding=1\n", + "\n", + "# create the docker group\n", + "groups:\n", + " - docker\n", + "\n", + "# Add default auto created user to docker group\n", + "system_info:\n", + " default_user:\n", + " groups: [docker]\n", + "\n", + "\n", + "runcmd:\n", + " \n", + " # Install Docker\n", + " - curl -fsSL https://download.docker.com/linux/ubuntu/gpg | apt-key add -\n", + " - add-apt-repository \"deb [arch=amd64] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable\"\n", + " - apt-get update -y\n", + " - apt-get install -y docker-ce docker-ce-cli containerd.io\n", + " - systemctl start docker\n", + " - systemctl enable docker\n", + " \n", + "\n", + " \n", + "\n", + " \n", + "\n", + " # Run container\n", + " - 'docker run --net=host -e EXTRA_CONDA_PACKAGES=\"s3fs==2024.10.0\" daskdev/dask:latest env DASK_INTERNAL_INHERIT_CONFIG=\"\" python -m distributed.cli.dask_spec tcp://18.220.177.101:8786 --spec ''{\"cls\": \"dask.distributed.Nanny\", \"opts\": {\"name\": \"dask-d7369884-worker-592c886c\"}}'''\n", + "\n", + " \n", + " # Shutdown when command is done\n", + " - shutdown -h now\n", + " \n", + "\n", + "Cloud init\n", + "==========\n", + "\n", + "\n", + "#cloud-config\n", + "\n", + "\n", + "# Bootstrap\n", + "packages:\n", + " - apt-transport-https\n", + " - ca-certificates\n", + " - curl\n", + " - gnupg-agent\n", + " - software-properties-common\n", + " - ubuntu-drivers-common\n", + "\n", + "# Enable ipv4 forwarding, required on CIS hardened machines\n", + "write_files:\n", + " - path: /etc/sysctl.d/enabled_ipv4_forwarding.conf\n", + " content: |\n", + " net.ipv4.conf.all.forwarding=1\n", + "\n", + "# create the docker group\n", + "groups:\n", + " - docker\n", + "\n", + "# Add default auto created user to docker group\n", + "system_info:\n", + " default_user:\n", + " groups: [docker]\n", + "\n", + "\n", + "runcmd:\n", + " \n", + " # Install Docker\n", + " - curl -fsSL https://download.docker.com/linux/ubuntu/gpg | apt-key add -\n", + " - add-apt-repository \"deb [arch=amd64] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable\"\n", + " - apt-get update -y\n", + " - apt-get install -y docker-ce docker-ce-cli containerd.io\n", + " - systemctl start docker\n", + " - systemctl enable docker\n", + " \n", + "\n", + " \n", + "\n", + " \n", + "\n", + " # Run container\n", + " - 'docker run --net=host -e EXTRA_CONDA_PACKAGES=\"s3fs==2024.10.0\" daskdev/dask:latest env DASK_INTERNAL_INHERIT_CONFIG=\"\" python -m distributed.cli.dask_spec tcp://18.220.177.101:8786 --spec ''{\"cls\": \"dask.distributed.Nanny\", \"opts\": {\"name\": \"dask-d7369884-worker-a09fb848\"}}'''\n", + "\n", + " \n", + " # Shutdown when command is done\n", + " - shutdown -h now\n", + " \n", + "\n", + "Cloud init\n", + "==========\n", + "\n", + "\n", + "#cloud-config\n", + "\n", + "\n", + "# Bootstrap\n", + "packages:\n", + " - apt-transport-https\n", + " - ca-certificates\n", + " - curl\n", + " - gnupg-agent\n", + " - software-properties-common\n", + " - ubuntu-drivers-common\n", + "\n", + "# Enable ipv4 forwarding, required on CIS hardened machines\n", + "write_files:\n", + " - path: /etc/sysctl.d/enabled_ipv4_forwarding.conf\n", + " content: |\n", + " net.ipv4.conf.all.forwarding=1\n", + "\n", + "# create the docker group\n", + "groups:\n", + " - docker\n", + "\n", + "# Add default auto created user to docker group\n", + "system_info:\n", + " default_user:\n", + " groups: [docker]\n", + "\n", + "\n", + "runcmd:\n", + " \n", + " # Install Docker\n", + " - curl -fsSL https://download.docker.com/linux/ubuntu/gpg | apt-key add -\n", + " - add-apt-repository \"deb [arch=amd64] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable\"\n", + " - apt-get update -y\n", + " - apt-get install -y docker-ce docker-ce-cli containerd.io\n", + " - systemctl start docker\n", + " - systemctl enable docker\n", + " \n", + "\n", + " \n", + "\n", + " \n", + "\n", + " # Run container\n", + " - 'docker run --net=host -e EXTRA_CONDA_PACKAGES=\"s3fs==2024.10.0\" daskdev/dask:latest env DASK_INTERNAL_INHERIT_CONFIG=\"\" python -m distributed.cli.dask_spec tcp://18.220.177.101:8786 --spec ''{\"cls\": \"dask.distributed.Nanny\", \"opts\": {\"name\": \"dask-d7369884-worker-f9596efa\"}}'''\n", + "\n", + " \n", + " # Shutdown when command is done\n", + " - shutdown -h now\n", + " \n", + "Created instance i-0e2f3a60157449305 as dask-d7369884-worker-87177d09\n", + "Created instance i-08464c37bec5899d5 as dask-d7369884-worker-f9596efa\n", + "Created instance i-03c7260fe1621977b as dask-d7369884-worker-592c886c\n", + "Created instance i-01836314d011735d9 as dask-d7369884-worker-a09fb848\n" + ] + } + ], "source": [ "env_vars = {\"EXTRA_CONDA_PACKAGES\": \"s3fs==2024.10.0\"}\n", "cluster = EC2Cluster(\n", @@ -42,7 +350,8 @@ " # https://github.com/dask/dask-cloudprovider/issues/249\n", " security=False,\n", " n_workers=4,\n", - " instance_type=\"r6i.16xlarge\",\n", + " scheduler_instance_type=\"c6i.xlarge\",\n", + " worker_instance_type=\"r6i.16xlarge\",\n", " # Region for accessing bodo-example-data\n", " region=\"us-east-2\",\n", " env_vars=env_vars,\n", @@ -52,9 +361,31 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 35, "metadata": {}, - "outputs": [], + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "http://18.220.177.101:8787/status\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/Users/scottroutledge/miniforge3/envs/benchmark_dask/lib/python3.10/site-packages/distributed/client.py:1606: VersionMismatchWarning: Mismatched versions found\n", + "\n", + "+---------+-----------------+-----------------+---------+\n", + "| Package | Client | Scheduler | Workers |\n", + "+---------+-----------------+-----------------+---------+\n", + "| python | 3.10.15.final.0 | 3.10.12.final.0 | None |\n", + "+---------+-----------------+-----------------+---------+\n", + " warnings.warn(version_module.VersionMismatchWarning(msg[0][\"warning\"]))\n" + ] + } + ], "source": [ "client = Client(cluster)\n", "print(client.dashboard_link)" @@ -62,20 +393,20 @@ }, { "cell_type": "code", - "execution_count": 8, + "execution_count": 22, "metadata": {}, "outputs": [], "source": [ "# first parquet file\n", "dataset = [\n", " f\"s3://bodo-example-data/nyc-taxi/fhvhv_tripdata/fhvhv_tripdata_2019-{i:02}.parquet\"\n", - " for i in range(2, 3)\n", + " for i in range(2, 8)\n", "]" ] }, { "cell_type": "code", - "execution_count": 5, + "execution_count": 36, "metadata": {}, "outputs": [], "source": [ @@ -85,7 +416,7 @@ }, { "cell_type": "code", - "execution_count": 6, + "execution_count": 37, "metadata": {}, "outputs": [], "source": [ @@ -175,16 +506,16 @@ }, { "cell_type": "code", - "execution_count": 7, + "execution_count": 39, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ - "Total time for IO and compute: 908.5563251972198\n", - "Total time for IO and compute: 907.8074090480804\n", - "Total time for IO and compute: 929.6480667591095\n" + "Total time for IO and compute: 932.2804353237152\n", + "Total time for IO and compute: 888.8601548671722\n", + "Total time for IO and compute: 885.689935207367\n" ] } ], @@ -198,9 +529,21 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 40, "metadata": {}, - "outputs": [], + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Terminated dask-d7369884-worker-87177d09 (i-0e2f3a60157449305)\n", + "Terminated dask-d7369884-worker-592c886c (i-03c7260fe1621977b)\n", + "Terminated dask-d7369884-worker-f9596efa (i-08464c37bec5899d5)\n", + "Terminated dask-d7369884-worker-a09fb848 (i-01836314d011735d9)\n", + "Terminated dask-d7369884-scheduler (i-0311fa1be461bd2a9)\n" + ] + } + ], "source": [ "client.close()\n", "cluster.close()" diff --git a/benchmarks/nyc_taxi/dask/nyc_taxi_precipitation.py b/benchmarks/nyc_taxi/dask/nyc_taxi_precipitation.py index d5e724311a..b8744f374d 100644 --- a/benchmarks/nyc_taxi/dask/nyc_taxi_precipitation.py +++ b/benchmarks/nyc_taxi/dask/nyc_taxi_precipitation.py @@ -13,24 +13,18 @@ import dask.dataframe as dd from dask.distributed import Client -from dask_cloudprovider.aws import EC2Cluster -def get_monthly_travels_weather(): +def get_monthly_travels_weather(weather_dataset, hvfhv_dataset, storage_options=None): start = time.time() central_park_weather_observations = dd.read_csv( - "s3://bodo-example-data/nyc-taxi/central_park_weather.csv", - parse_dates=["DATE"], - storage_options={"anon": True}, + weather_dataset, parse_dates=["DATE"], storage_options=storage_options ) central_park_weather_observations = central_park_weather_observations.rename( columns={"DATE": "date", "PRCP": "precipitation"} ) - fhvhv_tripdata = dd.read_parquet( - "s3://bodo-example-data/nyc-taxi/fhvhv_tripdata/", - storage_options={"anon": True}, - ) + fhvhv_tripdata = dd.read_parquet(hvfhv_dataset, storage_options=storage_options) central_park_weather_observations["date"] = central_park_weather_observations[ "date" @@ -103,25 +97,48 @@ def get_time_bucket(t): return end - start -def main(): +def local_get_monthly_travels_weather(weather_dataset, hvfhv_dataset): + """Run Dask on local cluster.""" + with Client(): + total_time = get_monthly_travels_weather(weather_dataset, hvfhv_dataset) + print("Total time for IO and compute:", total_time) + + +def ec2_get_monthly_travels_weather(weather_dataset, hvfhv_dataset): + """Run Dask on EC2 cluster.""" + from dask_cloudprovider.aws import EC2Cluster + + # for reading from S3 env_vars = {"EXTRA_CONDA_PACKAGES": "s3fs==2024.10.0"} + + # use an anoymous session to avoid passing credentials to cluster + s3_options = {"anon": True} + with EC2Cluster( # NOTE: Setting security = False to avoid large config size # https://github.com/dask/dask-cloudprovider/issues/249 security=False, n_workers=4, - instance_type="r6i.16xlarge", + scheduler_instance_type="c6i.xlarge", + worker_instance_type="r6i.16xlarge", # Region for accessing bodo-example-data region="us-east-2", env_vars=env_vars, ) as cluster: with Client(cluster) as client: for _ in range(3): - future = client.submit(get_monthly_travels_weather) + future = client.submit( + get_monthly_travels_weather, + weather_dataset, + hvfhv_dataset, + storage_options=s3_options, + ) total_time = future.result() client.restart() print("Total time for IO and compute:", total_time) if __name__ == "__main__": - main() + hvfhv_dataset = "s3://bodo-example-data/nyc-taxi/fhvhv_tripdata/" + weather_dataset = "s3://bodo-example-data/nyc-taxi/central_park_weather.csv" + ec2_get_monthly_travels_weather(weather_dataset, hvfhv_dataset) diff --git a/benchmarks/nyc_taxi/local_versions.py b/benchmarks/nyc_taxi/local_versions.py new file mode 100755 index 0000000000..496bdcf93a --- /dev/null +++ b/benchmarks/nyc_taxi/local_versions.py @@ -0,0 +1,92 @@ +"""Run local version of Bodo, Dask, Modin, or PySpark benchmarks. +Accepts a parquet file to use as the High Volume For Hire Vehicle dataset +(defaults to first 5 million rows of the dataset). For an exhaustive list of +files, see s3://bodo-example-data/nyc-taxi/fhvhv. You can also optionally +specify which system to run the benchmark on. The default is Bodo. To run +all systems at once, use run_local.sh. + +usage: + python run_local.py --dataset FILE --system SYSTEM +""" + +import argparse +import os + +import boto3 +from botocore import UNSIGNED +from botocore.config import Config +from nyc_taxi.bodo.nyc_taxi_precipitation import ( + get_monthly_travels_weather as bodo_get_monthly_travels_weather, +) +from nyc_taxi.dask.nyc_taxi_precipitation import ( + local_get_monthly_travels_weather as dask_get_monthly_travels_weather, +) +from nyc_taxi.modin_ray.nyc_taxi_precipitation import ( + local_get_monthly_travels_weather as modin_get_monthly_travels_weather, +) +from nyc_taxi.spark.spark_nyc_taxi_precipitation import ( + get_monthly_travels_weather as spark_get_monthly_travels_weather, +) + +SMALL_DATASET_PATH_S3 = "nyc-taxi/fhvhv_5M_rows.pq" +WEATHER_DATASET_PATH_S3 = "nyc-taxi/central_park_weather.csv" +BUCKET_NAME = "bodo-example-data" + + +def download_data_s3(path_to_s3: str, local_data_dir: str = "data") -> str: + """Download the dataset from S3 if already exists, skip download.""" + file_name = path_to_s3.split("/", -1)[1] + local_path = os.path.join(local_data_dir, file_name) + + if os.path.exists(local_path): + return local_path + + print("Downloading dataset from S3...") + + s3 = boto3.client("s3", config=Config(signature_version=UNSIGNED)) + + if not os.path.exists(local_data_dir): + os.mkdir(local_data_dir) + + s3.download_file(BUCKET_NAME, path_to_s3, local_path) + return local_path + + +def main(hvfhv_path_s3: str, system: str): + weather_path = download_data_s3(WEATHER_DATASET_PATH_S3) + hvfhv_path = download_data_s3(hvfhv_path_s3) + + get_monthly_travels_weather_impls = { + "bodo": bodo_get_monthly_travels_weather, + "dask": dask_get_monthly_travels_weather, + "modin": modin_get_monthly_travels_weather, + "spark": spark_get_monthly_travels_weather, + } + + get_monthly_travels_weather = get_monthly_travels_weather_impls[system] + + print(f"Running {system.capitalize()}...") + get_monthly_travels_weather(weather_path, hvfhv_path) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument( + "--dataset", + "-d", + required=False, + default=SMALL_DATASET_PATH_S3, + help="Path to parquet file to use for local benchmark. Include everything after s3://bodo-example-data.", + ) + parser.add_argument( + "--system", + "-s", + required=False, + default="bodo", + help="System to run benchmark on. Options are bodo, dask, modin or spark.", + ) + args = parser.parse_args() + hvfhv_path_s3 = args.dataset + system = args.system + + main(hvfhv_path_s3, system) diff --git a/benchmarks/nyc_taxi/modin-ray/README.md b/benchmarks/nyc_taxi/modin_ray/README.md similarity index 100% rename from benchmarks/nyc_taxi/modin-ray/README.md rename to benchmarks/nyc_taxi/modin_ray/README.md diff --git a/benchmarks/nyc_taxi/modin-ray/env.yml b/benchmarks/nyc_taxi/modin_ray/env.yml similarity index 100% rename from benchmarks/nyc_taxi/modin-ray/env.yml rename to benchmarks/nyc_taxi/modin_ray/env.yml diff --git a/benchmarks/nyc_taxi/modin-ray/modin-cluster.yaml b/benchmarks/nyc_taxi/modin_ray/modin-cluster.yaml similarity index 99% rename from benchmarks/nyc_taxi/modin-ray/modin-cluster.yaml rename to benchmarks/nyc_taxi/modin_ray/modin-cluster.yaml index 2cfba616b3..a4669a12a0 100644 --- a/benchmarks/nyc_taxi/modin-ray/modin-cluster.yaml +++ b/benchmarks/nyc_taxi/modin_ray/modin-cluster.yaml @@ -173,6 +173,7 @@ setup_commands: - conda activate modin && pip install -U fsspec>=2022.11.0 s3fs boto3 pyopenssl - echo "conda activate modin" >> ~/.bashrc - echo 'export MODIN_RAY_CLUSTER=True' >> ~/.bashrc + - echo 'conda list' # Custom commands that will be run on the head node after common setup. head_setup_commands: diff --git a/benchmarks/nyc_taxi/modin-ray/nyc_taxi_precipitation.py b/benchmarks/nyc_taxi/modin_ray/nyc_taxi_precipitation.py similarity index 74% rename from benchmarks/nyc_taxi/modin-ray/nyc_taxi_precipitation.py rename to benchmarks/nyc_taxi/modin_ray/nyc_taxi_precipitation.py index 8ad6cb48b9..1d38ec19d8 100644 --- a/benchmarks/nyc_taxi/modin-ray/nyc_taxi_precipitation.py +++ b/benchmarks/nyc_taxi/modin_ray/nyc_taxi_precipitation.py @@ -1,33 +1,27 @@ import time -import ray - -ray.init(address="auto") -cpu_count = ray.cluster_resources()["CPU"] -print("RAY CPU COUNT: ", cpu_count) import modin.pandas as pd +import ray from modin.pandas.io import to_ray -def run_modin(): - start = time.time() +def get_monthly_travels_weather(weather_dataset, hvfhv_dataset): + start_read = time.time() central_park_weather_observations = pd.read_csv( - "s3://bodo-example-data/nyc-taxi/central_park_weather.csv", + weather_dataset, parse_dates=["DATE"], - storage_options={"anon": True}, ) central_park_weather_observations = central_park_weather_observations.rename( columns={"DATE": "date", "PRCP": "precipitation"}, copy=False, ) fhvhv_tripdata = pd.read_parquet( - "s3://bodo-example-data/nyc-taxi/fhvhv_tripdata/", - storage_options={"anon": True}, + hvfhv_dataset, ) end = time.time() - print("Reading Time: ", (end - start)) + print("Reading Time: ", (end - start_read)) - start = time.time() + start_compute = time.time() central_park_weather_observations["date"] = central_park_weather_observations[ "date" @@ -90,16 +84,29 @@ def get_time_bucket(t): copy=False, ) end = time.time() - print("Monthly Taxi Travel Times Computation Time: ", end - start) - print(monthly_trips_weather.head()) + print("Monthly Taxi Travel Times Computation Time: ", end - start_compute) - start = time.time() + start_write = time.time() monthly_trips_weather_ray = to_ray(monthly_trips_weather) monthly_trips_weather_ray.write_parquet("local:///tmp/data/modin_result.pq") end = time.time() - print("Writing time:", (end - start)) + print("Writing time:", (end - start_write)) + print("Total E2E time:", (end - start_read)) return monthly_trips_weather +def local_get_monthly_travels_weather(weather_dataset, hvfhv_dataset): + """Run Modin on local Ray cluster""" + ray.init() + get_monthly_travels_weather(weather_dataset, hvfhv_dataset) + ray.shutdown() + + if __name__ == "__main__": - result = run_modin() + ray.init(address="auto") + cpu_count = ray.cluster_resources()["CPU"] + print("RAY CPU COUNT: ", cpu_count) + + weather_dataset = "s3://bodo-example-data/nyc-taxi/central_park_weather.csv" + hvfhv_dataset = "s3://bodo-example-data/nyc-taxi/fhvhv/" + get_monthly_travels_weather(weather_dataset, hvfhv_dataset) diff --git a/benchmarks/nyc_taxi/modin-ray/run_modin.sh b/benchmarks/nyc_taxi/modin_ray/run_modin.sh similarity index 100% rename from benchmarks/nyc_taxi/modin-ray/run_modin.sh rename to benchmarks/nyc_taxi/modin_ray/run_modin.sh diff --git a/benchmarks/nyc_taxi/modin-ray/scale_cluster.py b/benchmarks/nyc_taxi/modin_ray/scale_cluster.py similarity index 100% rename from benchmarks/nyc_taxi/modin-ray/scale_cluster.py rename to benchmarks/nyc_taxi/modin_ray/scale_cluster.py diff --git a/benchmarks/nyc_taxi/run_local.sh b/benchmarks/nyc_taxi/run_local.sh new file mode 100755 index 0000000000..cac584ac92 --- /dev/null +++ b/benchmarks/nyc_taxi/run_local.sh @@ -0,0 +1,16 @@ +#!/bin/bash + +# Run local benchmark on all systems. +# Note: uses sleep to ensure resources are reset before running the next system. + +python -m nyc_taxi.local_versions -s dask +sleep 4 + +python -m nyc_taxi.local_versions -s bodo +sleep 4 + +python -m nyc_taxi.local_versions -s modin +sleep 4 + +python -m nyc_taxi.local_versions -s spark + diff --git a/benchmarks/nyc_taxi/spark/spark_nyc_taxi_precipitation.py b/benchmarks/nyc_taxi/spark/spark_nyc_taxi_precipitation.py index bbe271af24..1c721e1903 100644 --- a/benchmarks/nyc_taxi/spark/spark_nyc_taxi_precipitation.py +++ b/benchmarks/nyc_taxi/spark/spark_nyc_taxi_precipitation.py @@ -11,7 +11,7 @@ ) -def get_monthly_travels_weather(): +def get_monthly_travels_weather(weather_dataset, hvfhv_dataset): spark = ( SparkSession.builder.appName("MonthlyTravelsWeather") .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.2") @@ -27,7 +27,7 @@ def get_monthly_travels_weather(): # Read in weather data using pandas-on-Spark central_park_weather_observations = ps.read_csv( - "s3a://bodo-example-data/nyc-taxi/central_park_weather.csv", + weather_dataset, ).rename(columns={"DATE": "date", "PRCP": "precipitation"}) central_park_weather_observations["date"] = ps.to_datetime( @@ -36,9 +36,7 @@ def get_monthly_travels_weather(): # Read in trip data using spark, this reads a re-written dataset because spark doesn't support reading the original dataset # due to schema unification issues - fhvhv_tripdata = spark.read.parquet( - "s3a://bodo-example-data/nyc-taxi/fhvhv_tripdata_rewrite/" - ).drop("__index_level_0__") + fhvhv_tripdata = spark.read.parquet(hvfhv_dataset).drop("__index_level_0__") # Convert datetime columns and create necessary features fhvhv_tripdata = ( @@ -113,4 +111,7 @@ def get_time_bucket(t): print("Execution time:", time.time() - start) -get_monthly_travels_weather() +if __name__ == "__main__": + weather_dataset = "s3a://bodo-example-data/nyc-taxi/central_park_weather.csv" + hvfhv_dataset = "s3a://bodo-example-data/nyc-taxi/fhvhv_tripdata_rewrite/" + get_monthly_travels_weather(weather_dataset, hvfhv_dataset)