Individual CUDA object spilling #451

Merged · 64 commits · Jan 6, 2021

Commits
8dc916b
Initial work on highres spilling
madsbk Dec 1, 2020
4e58657
Tracking aliasing device memory objects
madsbk Dec 1, 2020
51c8213
clean up
madsbk Dec 1, 2020
994a8fe
renamed dynamic spilling => object spilling
madsbk Dec 1, 2020
64e02de
removed debugging assert
madsbk Dec 2, 2020
c27b238
Avoid internal comparisons of ProxyObjects
madsbk Dec 2, 2020
bd37c73
obj_pxy_concat(): only proxify when using object spilling
madsbk Dec 2, 2020
64dfdab
Removed use of _obj_pxy_get_meta()
madsbk Dec 2, 2020
40adb1c
sizeof.register_lazy("cupy"): Includes cupy.ndarray handle
madsbk Dec 2, 2020
7addac2
Overwriting the Dask dispatch of Pandas objects
madsbk Dec 3, 2020
7eea004
get_device_memory_objects(): add multi index support
madsbk Dec 3, 2020
c21f62d
Handling make_meta() and get_parallel_type dispatching
madsbk Dec 4, 2020
03fe9b8
Added test of parquet read/write of a proxy object
madsbk Dec 4, 2020
316d0a4
Implemented assignment to a proxied object
madsbk Dec 4, 2020
55eeb79
Renamed __obj_pxy_cache => _obj_pxy_cache
madsbk Dec 4, 2020
597ba34
Added optional arguments to __array__()
madsbk Dec 4, 2020
3b674dc
Implement dispatch of array-concat
madsbk Dec 7, 2020
fb7800d
Implement dispatch of array-tensordot
madsbk Dec 7, 2020
0f6b082
Implement dispatch of array-einsum
madsbk Dec 7, 2020
2e4f627
Implemented unproxify_input_wrapper()
madsbk Dec 8, 2020
3453113
test: _PxyObjTest to use _obj_pxy[]
madsbk Dec 8, 2020
40b294d
Removed ProxyObject.__slots__
madsbk Dec 9, 2020
0631652
proxify(): added subclass=None
madsbk Dec 9, 2020
b10b7fc
Added missing docs
madsbk Dec 9, 2020
314b280
proxify: inherit from cudf._lib.table.Table
madsbk Dec 9, 2020
95be618
Caching device_memory_objects
madsbk Dec 9, 2020
5ad0605
Implemented _obj_pxy_cache_wrapper()
madsbk Dec 10, 2020
c95febe
caching __sizeof__
madsbk Dec 10, 2020
0528028
replacing lists with tuples
madsbk Dec 10, 2020
6f40853
Deserializing CUDA-serialized ProxyObjects in the hostfile
madsbk Dec 10, 2020
eecdfd6
proxify_device_object(ProxyObject): check proxied_id_to_proxy
madsbk Dec 10, 2020
c91b6ee
obj_pxy_cuda_serialize(): shallow copy of obj.
madsbk Dec 11, 2020
bbe7853
Caching __len__()
madsbk Dec 11, 2020
22e71f3
Locking when accessing self.store
madsbk Dec 14, 2020
940e53c
Implemented get_proxied_id_to_proxy()
madsbk Dec 14, 2020
e304d29
Simplified device access info
madsbk Dec 14, 2020
9d422ca
_obj_pxy_serialize(): Avoid doing double work
madsbk Dec 14, 2020
4577a6e
Avoid materializing RangeIndex
madsbk Dec 14, 2020
9e1737c
fixed typo
madsbk Dec 14, 2020
55e80d2
concat_dispatch(): don't set hostfile
madsbk Dec 15, 2020
bd1850c
isort
madsbk Dec 15, 2020
d468ab6
Implemented ProxiesTally
madsbk Dec 15, 2020
44a24c6
Implemented class ProxiesTally.get_unspilled_proxies()
madsbk Dec 15, 2020
d6526e3
ProxiesTally: get its own lock
madsbk Dec 15, 2020
6b37c27
Implemented UnspilledProxies
madsbk Dec 15, 2020
0e7a63c
reformat using correct versions of black and isort
madsbk Dec 16, 2020
fedf8b3
docs
madsbk Dec 16, 2020
055d16d
clean up
madsbk Dec 17, 2020
2ae97f0
Rename: ObjectSpillingHostFile => ProxifyHostFile
madsbk Dec 17, 2020
b8da4dc
removing the object-spilling option
madsbk Dec 17, 2020
17702e1
clean up and docs
madsbk Dec 17, 2020
d25af3a
Fixed typo
madsbk Dec 17, 2020
e21204f
proxify_device_objects(): clean up
madsbk Dec 18, 2020
0609916
get_device_memory_objects(): clean up
madsbk Dec 18, 2020
bcf56e4
Added some typing
madsbk Dec 18, 2020
5b73daf
typo
madsbk Dec 18, 2020
fde45bf
Added CLI help doc for --enable-jit-unspill
madsbk Jan 5, 2021
018db71
test of tensordot and einsum now uses dask.array.
madsbk Jan 5, 2021
bdaeaa6
test_one_item_limit(): testing proxies_tally
madsbk Jan 5, 2021
85d962f
Fixed removal of an "optional"
madsbk Jan 5, 2021
4093550
Fixing typos
madsbk Jan 5, 2021
544417a
rename: _obj_pxy_serialized() => _is_obj_pxy_serialized()
madsbk Jan 5, 2021
d1933d5
removed old TODO list
madsbk Jan 5, 2021
81668b1
test_proxy_object_parquet(): use pyarrow engine
madsbk Jan 5, 2021
6 changes: 4 additions & 2 deletions dask_cuda/cli/dask_cuda_worker.py
@@ -195,8 +195,10 @@
)
@click.option(
"--enable-jit-unspill/--disable-jit-unspill",
default=None, # If not specified, use Dask config
help="Enable just-in-time unspilling",
default=None,
help="Enable just-in-time unspilling. This is experimental and doesn't "
"support memory spilling to disk Please see proxy_object.ProxyObject "
"and proxify_host_file.ProxifyHostFile.",
)
def main(
scheduler,
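
The default=None on the flag above means the CLI falls back to the Dask configuration when neither --enable-jit-unspill nor --disable-jit-unspill is passed. A minimal sketch of that fallback, assuming a config key named "jit-unspill" (the key name is an assumption; it does not appear in this diff):

# Sketch only: resolving the flag's None default against Dask config.
# The "jit-unspill" key name is an assumption, not shown in this diff.
import dask

def resolve_jit_unspill(cli_value):
    if cli_value is None:  # flag not given on the command line
        return dask.config.get("jit-unspill", default=False)
    return cli_value
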
37 changes: 24 additions & 13 deletions dask_cuda/cuda_worker.py
@@ -20,6 +20,7 @@

from .device_host_file import DeviceHostFile
from .initialize import initialize
from .proxify_host_file import ProxifyHostFile
from .utils import (
CPUAffinity,
RMMSetup,
@@ -126,7 +127,6 @@ def del_pid_file():

preload_argv = kwargs.get("preload_argv", [])
kwargs = {"worker_port": None, "listen_address": None}
t = Nanny

if (
not scheduler
@@ -186,8 +186,29 @@ def del_pid_file():
else:
self.jit_unspill = jit_unspill

if self.jit_unspill:
data = lambda i: (
ProxifyHostFile,
{
"device_memory_limit": parse_device_memory_limit(
device_memory_limit, device_index=i
),
Member:

The fact that there's no memory_limit here seems to indicate that there's no capability for host<->disk spilling; is that intended? We also seem to be missing local_directory, which will prevent users from storing things anywhere but the current directory; that seems problematic for certain use cases.

Member:

Ok, I now see you added a comment about memory_limit in LocalCUDACluster docs. Can you do the same in

@click.option(
"--enable-jit-unspill/--disable-jit-unspill",
default=None, # If not specified, use Dask config
help="Enable just-in-time unspilling",
?

The local_directory question is still valid though.

Member Author:

Yes, in this PR we will not support any spilling to disk. I have removed memory_limit and local_directory to make this clear. I don't think local_directory is used for anything other than disk spilling?

It will be up to a future PR to implement disk spilling, which shouldn't be too difficult.

Member:

My understanding was that local_directory was used for more than just spilling, so I went on a hunt. To be fair, it's still unclear to me where in the code local_directory is really used. I did find one other case where it's used, though: uploading files, see for example https://github.com/dask/distributed/blob/607cfd2ce00edd44c99da3273de0763a426dda7d/distributed/tests/test_worker.py#L176-L202 . That isn't to say this is the only other case, but I could neither confirm nor deny whether there are other cases besides spilling and file uploading. Maybe @quasiben or @jakirkham would know.

},
)
else:
data = lambda i: (
DeviceHostFile,
{
"device_memory_limit": parse_device_memory_limit(
device_memory_limit, device_index=i
),
"memory_limit": memory_limit,
"local_directory": local_directory,
},
)

self.nannies = [
t(
Nanny(
scheduler,
scheduler_file=scheduler_file,
nthreads=nthreads,
@@ -217,17 +238,7 @@ def del_pid_file():
cuda_device_index=i,
)
},
data=(
DeviceHostFile,
{
"device_memory_limit": parse_device_memory_limit(
device_memory_limit, device_index=i
),
"memory_limit": memory_limit,
"local_directory": local_directory,
"jit_unspill": self.jit_unspill,
},
),
data=data(i),
**kwargs,
)
for i in range(nprocs)
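
To make the review thread above concrete, here is a rough sketch (not part of the diff) of what the per-device data factory produces when JIT-unspilling is enabled. Only a device memory limit is passed, because ProxifyHostFile supports no spilling to disk yet; the 16 GB figure is an arbitrary example.

# Sketch only: what data(i) evaluates to for one device with jit_unspill enabled.
from dask_cuda.proxify_host_file import ProxifyHostFile

cls, kwargs = ProxifyHostFile, {"device_memory_limit": int(16e9)}
host_file = cls(**kwargs)  # becomes the worker's "data" mapping
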
25 changes: 6 additions & 19 deletions dask_cuda/device_host_file.py
@@ -22,7 +22,7 @@


class DeviceSerialized:
""" Store device object on the host
"""Store device object on the host
This stores a device-side object as
1. A msgpack encodable header
2. A list of `bytes`-like objects (like NumPy arrays)
@@ -56,7 +56,7 @@ def device_deserialize(header, frames):

@nvtx_annotate("SPILL_D2H", color="red", domain="dask_cuda")
def device_to_host(obj: object) -> DeviceSerialized:
header, frames = serialize(obj, serializers=["dask", "pickle"], on_error="raise")
header, frames = serialize(obj, serializers=("dask", "pickle"), on_error="raise")
return DeviceSerialized(header, frames)


@@ -76,7 +76,7 @@ def pxy_obj_device_to_host(obj: object) -> proxy_object.ProxyObject:

# Notice, both the "dask" and the "pickle" serializer will
# spill `obj` to main memory.
return proxy_object.asproxy(obj, serializers=["dask", "pickle"])
return proxy_object.asproxy(obj, serializers=("dask", "pickle"))


@nvtx_annotate("SPILL_H2D", color="green", domain="dask_cuda")
Expand All @@ -87,7 +87,7 @@ def pxy_obj_host_to_device(s: proxy_object.ProxyObject) -> object:


class DeviceHostFile(ZictBase):
""" Manages serialization/deserialization of objects.
"""Manages serialization/deserialization of objects.

Three LRU cache levels are controlled, for device, host and disk.
Each level takes care of serializing objects once its limit has been
@@ -106,16 +106,10 @@ class DeviceHostFile(ZictBase):
implies no spilling to disk.
local_directory: path
Path where to store serialized objects on disk
jit_unspill: bool
If True, enable just-in-time unspilling (see proxy_object.ProxyObject).
"""

def __init__(
self,
device_memory_limit=None,
memory_limit=None,
local_directory=None,
jit_unspill=False,
self, device_memory_limit=None, memory_limit=None, local_directory=None,
):
if local_directory is None:
local_directory = dask.config.get("temporary-directory") or os.getcwd()
@@ -141,14 +135,7 @@ def __init__(

self.device_keys = set()
self.device_func = dict()
if jit_unspill:
self.device_host_func = Func(
pxy_obj_device_to_host, pxy_obj_host_to_device, self.host_buffer
)
else:
self.device_host_func = Func(
device_to_host, host_to_device, self.host_buffer
)
self.device_host_func = Func(device_to_host, host_to_device, self.host_buffer)
self.device_buffer = Buffer(
self.device_func, self.device_host_func, device_memory_limit, weight=weight
)
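
After this change DeviceHostFile no longer takes a jit_unspill argument. A minimal construction sketch (the limits and the path below are arbitrary examples, not taken from the diff):

# Sketch only: the plain three-level (device/host/disk) spilling store.
from dask_cuda.device_host_file import DeviceHostFile

dhf = DeviceHostFile(
    device_memory_limit=int(16e9),   # device -> host spill threshold, in bytes
    memory_limit=int(64e9),          # host -> disk spill threshold, in bytes
    local_directory="/tmp/dask-worker-space",  # hypothetical spill directory
)
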
119 changes: 119 additions & 0 deletions dask_cuda/get_device_memory_objects.py
@@ -0,0 +1,119 @@
from typing import Any, Set

from dask.sizeof import sizeof
from dask.utils import Dispatch

dispatch = Dispatch(name="get_device_memory_objects")


def get_device_memory_objects(obj: Any) -> Set:
""" Find all CUDA device objects in `obj`

Search through `obj` and find all CUDA device objects, which are objects
that either are known to `dispatch` or implement `__cuda_array_interface__`.

Notice, the CUDA device objects must be hashable.

Parameters
----------
obj: Any
Object to search through

Returns
-------
ret: set
Set of CUDA device memory objects
"""
return set(dispatch(obj))


@dispatch.register(object)
def get_device_memory_objects_default(obj):
if hasattr(obj, "_obj_pxy"):
if obj._obj_pxy["serializers"] is None:
return dispatch(obj._obj_pxy["obj"])
else:
return []
if hasattr(obj, "data"):
return dispatch(obj.data)
if hasattr(obj, "_owner") and obj._owner is not None:
return dispatch(obj._owner)
if hasattr(obj, "__cuda_array_interface__"):
return [obj]
return []


@dispatch.register(list)
@dispatch.register(tuple)
@dispatch.register(set)
@dispatch.register(frozenset)
def get_device_memory_objects_python_sequence(seq):
ret = []
for s in seq:
ret.extend(dispatch(s))
return ret


@dispatch.register(dict)
def get_device_memory_objects_python_dict(seq):
ret = []
for s in seq.values():
ret.extend(dispatch(s))
return ret


@dispatch.register_lazy("cupy")
def get_device_memory_objects_register_cupy():
from cupy.cuda.memory import MemoryPointer

@dispatch.register(MemoryPointer)
def get_device_memory_objects_cupy(obj):
return [obj.mem]


@dispatch.register_lazy("cudf")
def get_device_memory_objects_register_cudf():
import cudf.core.dataframe
import cudf.core.index
import cudf.core.multiindex
import cudf.core.series

@dispatch.register(cudf.core.dataframe.DataFrame)
def get_device_memory_objects_cudf_dataframe(obj):

ret = dispatch(obj._index)
for col in obj._data.columns:
ret += dispatch(col)
return ret

@dispatch.register(cudf.core.series.Series)
def get_device_memory_objects_cudf_series(obj):
return dispatch(obj._index) + dispatch(obj._column)

@dispatch.register(cudf.core.index.RangeIndex)
def get_device_memory_objects_cudf_range_index(obj):
# Avoid materializing RangeIndex. This introduces some inaccuracies
# in total device memory usage, but we accept that the memory use
# of RangeIndexes is limited.
return []

@dispatch.register(cudf.core.index.Index)
def get_device_memory_objects_cudf_index(obj):
return dispatch(obj._values)

@dispatch.register(cudf.core.multiindex.MultiIndex)
def get_device_memory_objects_cudf_multiindex(obj):
return dispatch(obj._columns)


@sizeof.register_lazy("cupy")
def register_cupy(): # NB: this overwrites dask.sizeof.register_cupy()
import cupy.cuda.memory

@sizeof.register(cupy.cuda.memory.BaseMemory)
def sizeof_cupy_base_memory(x):
return int(x.size)

@sizeof.register(cupy.ndarray)
def sizeof_cupy_ndarray(x):
return int(x.nbytes)
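
A short usage sketch of the new dispatch (assumes CuPy is installed; not part of the diff). Both references below alias the same device allocation, so the returned set contains a single BaseMemory object:

import cupy
from dask_cuda.get_device_memory_objects import get_device_memory_objects

x = cupy.arange(10)
mems = get_device_memory_objects([x, {"alias": x}])
assert len(mems) == 1  # both entries share one cupy.cuda.memory.BaseMemory
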
33 changes: 21 additions & 12 deletions dask_cuda/local_cuda_cluster.py
@@ -9,6 +9,7 @@

from .device_host_file import DeviceHostFile
from .initialize import initialize
from .proxify_host_file import ProxifyHostFile
from .utils import (
CPUAffinity,
RMMSetup,
@@ -95,7 +96,10 @@ class LocalCUDACluster(LocalCluster):
WARNING: managed memory is currently incompatible with NVLink, trying
to enable both will result in an exception.
jit_unspill: bool
If True, enable just-in-time unspilling (see proxy_object.ProxyObject).
If True, enable just-in-time unspilling. This is experimental and doesn't
support memory spilling to disk. Please see proxy_object.ProxyObject and
proxify_host_file.ProxifyHostFile.


Examples
--------
@@ -198,17 +202,22 @@ def __init__(
self.jit_unspill = jit_unspill

if data is None:
data = (
DeviceHostFile,
{
"device_memory_limit": self.device_memory_limit,
"memory_limit": self.host_memory_limit,
"local_directory": local_directory
or dask.config.get("temporary-directory")
or os.getcwd(),
"jit_unspill": self.jit_unspill,
},
)
if self.jit_unspill:
data = (
ProxifyHostFile,
{"device_memory_limit": self.device_memory_limit,},
)
else:
data = (
DeviceHostFile,
{
"device_memory_limit": self.device_memory_limit,
"memory_limit": self.host_memory_limit,
"local_directory": local_directory
or dask.config.get("temporary-directory")
or os.getcwd(),
},
)

if enable_tcp_over_ucx or enable_infiniband or enable_nvlink:
if protocol is None:
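
Finally, a hedged end-to-end sketch of enabling the new behavior from user code (not part of the diff; the argument values are examples). With jit_unspill=True the workers use ProxifyHostFile, so the device memory limit applies but host memory is not spilled to disk:

from dask.distributed import Client
from dask_cuda import LocalCUDACluster

cluster = LocalCUDACluster(jit_unspill=True, device_memory_limit="10GB")
client = Client(cluster)
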