Skip to content

Commit

Permalink
renamed dynamic spilling => object spilling
Browse files Browse the repository at this point in the history
  • Loading branch information
madsbk committed Dec 1, 2020
1 parent edf8891 commit 6e817d2
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 24 deletions.
8 changes: 4 additions & 4 deletions dask_cuda/cli/dask_cuda_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,9 @@
help="Enable just-in-time unspilling",
)
@click.option(
"--enable-dynamic-spill/--disable-dynamic-spill",
"--enable-object-spilling/--disable-object-spilling",
default=None, # If not specified, use Dask config
help="Enable dynamic spilling",
help="Enable object spilling",
)
def main(
scheduler,
Expand Down Expand Up @@ -229,7 +229,7 @@ def main(
enable_rdmacm,
net_devices,
enable_jit_unspill,
enable_dynamic_spill,
enable_object_spilling,
**kwargs,
):
if tls_ca_file and tls_cert and tls_key:
Expand Down Expand Up @@ -267,7 +267,7 @@ def main(
enable_rdmacm,
net_devices,
enable_jit_unspill,
enable_dynamic_spill,
enable_object_spilling,
**kwargs,
)

Expand Down
12 changes: 6 additions & 6 deletions dask_cuda/cuda_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from distributed.worker import parse_memory_limit

from .device_host_file import DeviceHostFile
from .dynamic_host_file import DynamicHostFile
from .object_spilling_host_file import ObjectSpillingHostFile
from .initialize import initialize
from .utils import (
CPUAffinity,
Expand Down Expand Up @@ -73,7 +73,7 @@ def __init__(
enable_rdmacm=False,
net_devices=None,
jit_unspill=None,
dynamic_spill=None,
object_spilling=None,
**kwargs,
):
# Required by RAPIDS libraries (e.g., cuDF) to ensure no context
Expand Down Expand Up @@ -184,12 +184,12 @@ def del_pid_file():
else:
self.jit_unspill = jit_unspill

if dynamic_spill is None:
self.dynamic_spill = dask.config.get("dynamic-spill", default=False)
if object_spilling is None:
self.object_spilling = dask.config.get("object-spilling", default=False)
else:
self.dynamic_spill = dynamic_spill
self.object_spilling = object_spilling

hostfile = DynamicHostFile if self.dynamic_spill else DeviceHostFile
hostfile = ObjectSpillingHostFile if self.object_spilling else DeviceHostFile

self.nannies = [
Nanny(
Expand Down
12 changes: 6 additions & 6 deletions dask_cuda/local_cuda_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from distributed.worker import parse_memory_limit

from .device_host_file import DeviceHostFile
from .dynamic_host_file import DynamicHostFile
from .object_spilling_host_file import ObjectSpillingHostFile
from .initialize import initialize
from .utils import (
CPUAffinity,
Expand Down Expand Up @@ -137,7 +137,7 @@ def __init__(
rmm_pool_size=None,
rmm_managed_memory=False,
jit_unspill=None,
dynamic_spill=None,
object_spilling=None,
**kwargs,
):
# Required by RAPIDS libraries (e.g., cuDF) to ensure no context
Expand Down Expand Up @@ -192,12 +192,12 @@ def __init__(
else:
self.jit_unspill = jit_unspill

if dynamic_spill is None:
self.dynamic_spill = dask.config.get("dynamic-spill", default=False)
if object_spilling is None:
self.object_spilling = dask.config.get("object-spilling", default=False)
else:
self.dynamic_spill = dynamic_spill
self.object_spilling = object_spilling

hostfile = DynamicHostFile if self.dynamic_spill else DeviceHostFile
hostfile = ObjectSpillingHostFile if self.object_spilling else DeviceHostFile

if data is None:
data = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from .get_device_memory_objects import get_device_memory_objects


class DynamicHostFile(MutableMapping):
class ObjectSpillingHostFile(MutableMapping):
"""Manages serialization/deserialization of objects.
TODO: Three LRU cache levels are controlled, for device, host and disk.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@
from dask.dataframe.shuffle import shuffle_group
from distributed import Client
import dask_cuda
from dask_cuda.dynamic_host_file import DynamicHostFile
from dask_cuda.object_spilling_host_file import ObjectSpillingHostFile

cupy = pytest.importorskip("cupy")
cupy.cuda.set_allocator(None)
itemsize = cupy.arange(1).nbytes


def test_one_item_limit():
dhf = DynamicHostFile(device_memory_limit=itemsize)
dhf = ObjectSpillingHostFile(device_memory_limit=itemsize)
dhf["k1"] = cupy.arange(1) + 1
dhf["k2"] = cupy.arange(1) + 2

Expand Down Expand Up @@ -48,15 +48,15 @@ def test_one_item_limit():
assert not dhf["k3"][1]._obj_pxy_serialized()


@pytest.mark.parametrize("dynamic_spill", [True, False])
def test_local_cuda_cluster(dynamic_spill):
@pytest.mark.parametrize("object_spilling", [True, False])
def test_local_cuda_cluster(object_spilling):
"""Testing spilling of a proxied cudf dataframe in a local cuda cluster"""
cudf = pytest.importorskip("cudf")
dask_cudf = pytest.importorskip("dask_cudf")

def task(x):
assert isinstance(x, cudf.DataFrame)
if dynamic_spill:
if object_spilling:
# Check that `x` is a proxy object and the proxied DataFrame is serialized
assert type(x) is dask_cuda.proxy_object.ProxyObject
assert x._obj_pxy_get_meta()["serializers"] == ["dask", "pickle"]
Expand All @@ -67,7 +67,7 @@ def task(x):

# Notice, setting `device_memory_limit=1B` to trigger spilling
with dask_cuda.LocalCUDACluster(
n_workers=1, device_memory_limit="1B", dynamic_spill=dynamic_spill
n_workers=1, device_memory_limit="1B", object_spilling=object_spilling
) as cluster:
with Client(cluster):
df = cudf.DataFrame({"a": range(10)})
Expand All @@ -89,7 +89,7 @@ def test_dataframes_share_dev_mem():
# They still share the same underlying device memory
assert view1["a"].data._owner._owner is view2["a"].data._owner._owner

dhf = DynamicHostFile(device_memory_limit=160)
dhf = ObjectSpillingHostFile(device_memory_limit=160)
dhf["v1"] = view1
dhf["v2"] = view2
v1 = dhf["v1"]
Expand Down

0 comments on commit 6e817d2

Please sign in to comment.