Settings and requirements for polars-gpu version
wence- committed Sep 26, 2024
1 parent cf31c4d commit a670c67
Showing 4 changed files with 114 additions and 9 deletions.
4 changes: 4 additions & 0 deletions Makefile
@@ -53,6 +53,10 @@ run-polars-no-env: ## Run Polars benchmarks
	rm -rf data/tables/scale-$(SCALE_FACTOR)/*.tbl
	python -m queries.polars

.PHONY: run-polars-gpu-no-env
run-polars-gpu-no-env: run-polars-no-env ## Run Polars CPU and GPU benchmarks
	RUN_POLARS_GPU=true CUDA_MODULE_LOADING=EAGER python -m queries.polars

.PHONY: run-duckdb
run-duckdb: .venv ## Run DuckDB benchmarks
	$(VENV_BIN)/python -m queries.duckdb
101 changes: 92 additions & 9 deletions queries/polars/utils.py
@@ -1,4 +1,7 @@
import pathlib
import tempfile
from functools import partial
from typing import Literal

import polars as pl

@@ -60,20 +63,100 @@ def get_part_supp_ds() -> pl.LazyFrame:
    return _scan_ds("partsupp")


def _preload_engine(engine):
    with tempfile.TemporaryDirectory() as tmpdir:
        # The GPU engine has a one-time lazy-loading cost in IO, which we
        # remove from the timings here.
        f = pathlib.Path(tmpdir) / "test.pq"
        df = pl.DataFrame({"a": [1]})
        df.write_parquet(f)
        pl.scan_parquet(f).collect(engine=engine)
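
As an aside, a small illustrative sketch (not part of this commit, and assuming a machine with cudf-polars installed) of the effect being compensated for: only the first GPU collect pays the lazy backend load, which is why run_query below calls _preload_engine before the timed region.

import time

engine = pl.GPUEngine(device=0)
for label in ("first", "second"):
    t0 = time.perf_counter()
    _preload_engine(engine)  # only the first call pays the backend load
    print(f"{label} collect: {time.perf_counter() - t0:.3f}s")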


def obtain_engine_config() -> pl.GPUEngine | Literal["cpu"]:
    if not settings.run.polars_gpu:
        return "cpu"
    import cudf_polars
    import rmm
    from cudf_polars.callback import set_device
    from packaging import version

    if version.parse(cudf_polars.__version__) < version.Version("24.10"):
        import cudf._lib.pylibcudf as plc
    else:
        import pylibcudf as plc

    device = settings.run.polars_gpu_device
    mr_type = settings.run.use_rmm_mr
    with set_device(device):
        # Must make sure to create the memory resource on the requested device
        free_memory, _ = rmm.mr.available_device_memory()
        # Pick an initial pool of around 80% of the free device
        # memory; it must be a multiple of 256
        initial_pool_size = 256 * (int(free_memory * 0.8) // 256)
        if mr_type == "cuda":
            mr = rmm.mr.CudaMemoryResource()
        elif mr_type == "cuda-pool":
            mr = rmm.mr.PoolMemoryResource(
                rmm.mr.CudaMemoryResource(), initial_pool_size=initial_pool_size
            )
        elif mr_type == "cuda-async":
            mr = rmm.mr.CudaAsyncMemoryResource(initial_pool_size=initial_pool_size)
        elif mr_type == "managed":
            mr = rmm.mr.ManagedMemoryResource()
        elif mr_type == "managed-pool":
            mr = rmm.mr.PrefetchResourceAdaptor(
                rmm.mr.PoolMemoryResource(
                    rmm.mr.ManagedMemoryResource(), initial_pool_size=initial_pool_size
                )
            )
        else:
            msg = "Unknown memory resource type"
            raise RuntimeError(msg)
        if mr_type in ("managed", "managed-pool"):
            for typ in [
                "column_view::get_data",
                "mutable_column_view::get_data",
                "gather",
                "hash_join",
            ]:
                plc.experimental.enable_prefetching(typ)

    return pl.GPUEngine(device=device, memory_resource=mr, raise_on_fail=True)
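
To make the pool sizing above concrete, a worked example with an assumed free-memory figure: the pool starts at roughly 80% of free device memory, rounded down to a multiple of 256 bytes.

free_memory = 40 * 1024**3  # assume 40 GiB free on the device
initial_pool_size = 256 * (int(free_memory * 0.8) // 256)
assert initial_pool_size % 256 == 0
print(initial_pool_size / 1024**3)  # 32.0, i.e. 80% of the free 40 GiB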


def run_query(query_number: int, lf: pl.LazyFrame) -> None:
    streaming = settings.run.polars_streaming
    eager = settings.run.polars_eager
    gpu = settings.run.polars_gpu

    if (eager or streaming) and gpu:
        msg = "polars-gpu engine does not support eager or streaming"
        raise ValueError(msg)
    if settings.run.polars_show_plan:
        print(lf.explain(streaming=streaming, optimized=eager))

    engine = obtain_engine_config()
    # Eager load engine backend, so we don't time that.
    _preload_engine(engine)
    query = partial(
        lf.collect, streaming=streaming, no_optimization=eager, engine=engine
    )

    if gpu:
        library_name = f"polars-gpu-{settings.run.use_rmm_mr}"
    elif eager:
        library_name = "polars-eager"
    else:
        library_name = "polars"

    try:
        run_query_generic(
            query,
            query_number,
            library_name,
            library_version=pl.__version__,
            query_checker=check_query_result_pl,
        )
    except Exception as e:
        print(f"q{query_number} FAILED\n{e}")
6 changes: 6 additions & 0 deletions requirements-polars-gpu.txt
@@ -0,0 +1,6 @@
# 1.8.2 introduces plan optimisations that produce inequality joins
# that are not yet implemented in cudf-polars
polars[gpu]<1.8.2
packaging

-r requirements-polars-only.txt
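
A quick sanity check that the pin took effect after installing (a sketch, not part of this commit; packaging is pulled in above):

import polars as pl
from packaging import version

assert version.parse(pl.__version__) < version.Version("1.8.2")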
12 changes: 12 additions & 0 deletions settings.py
@@ -31,6 +31,18 @@ class Run(BaseSettings):
    polars_show_plan: bool = False
    polars_eager: bool = False
    polars_streaming: bool = False
    polars_gpu: bool = False  # Use GPU engine?
    polars_gpu_device: int = 0  # The GPU device to run on for polars GPU
    # Which style of GPU memory resource to use:
    #   cuda         -> cudaMalloc
    #   cuda-pool    -> pool suballocator wrapped around cudaMalloc
    #   managed      -> cudaMallocManaged
    #   managed-pool -> pool suballocator wrapped around cudaMallocManaged
    #   cuda-async   -> cudaMallocAsync (comes with its own pool)
    # See https://docs.rapids.ai/api/rmm/stable/ for details on RMM memory resources
    use_rmm_mr: Literal[
        "cuda", "cuda-pool", "managed", "managed-pool", "cuda-async"
    ] = "cuda-async"

    modin_memory: int = 8_000_000_000  # Tune as needed for optimal performance

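Because Run is a pydantic BaseSettings class, these options can presumably be set through environment variables, which appears to be how the Makefile's RUN_POLARS_GPU=true flag works. A sketch, assuming a RUN_ prefix on the field names:

import os

os.environ["RUN_POLARS_GPU"] = "true"
os.environ["RUN_USE_RMM_MR"] = "managed-pool"  # assumed env spelling
settings = Run()  # Run is the settings class shown above
assert settings.polars_gpu is True
assert settings.use_rmm_mr == "managed-pool"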
