From ba5116cb36c13485e29c715ae6081d7975ca868b Mon Sep 17 00:00:00 2001
From: Peter Andreas Entschev
Date: Thu, 3 Jun 2021 15:01:11 -0700
Subject: [PATCH 1/8] Support passing worker_class to LocalCUDACluster

---
 dask_cuda/local_cuda_cluster.py | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/dask_cuda/local_cuda_cluster.py b/dask_cuda/local_cuda_cluster.py
index dc3c70d13..34e009115 100644
--- a/dask_cuda/local_cuda_cluster.py
+++ b/dask_cuda/local_cuda_cluster.py
@@ -190,6 +190,7 @@ def __init__(
         rmm_log_directory=None,
         jit_unspill=None,
         log_spilling=False,
+        worker_class=None,
         **kwargs,
     ):
         # Required by RAPIDS libraries (e.g., cuDF) to ensure no context
@@ -306,6 +307,13 @@
             cuda_device_index=0,
         )
 
+        from functools import partial
+
+        worker_class = partial(
+            LoggedNanny if log_spilling is True else Nanny,
+            worker_class=worker_class,
+        )
+
         super().__init__(
             n_workers=0,
             threads_per_worker=threads_per_worker,
@@ -314,7 +322,7 @@
             data=data,
             local_directory=local_directory,
             protocol=protocol,
-            worker_class=LoggedNanny if log_spilling is True else Nanny,
+            worker_class=worker_class,
             config={
                 "ucx": get_ucx_config(
                     enable_tcp_over_ucx=enable_tcp_over_ucx,

From 13bebd3d49744a5162d0482d1afd7335c03162c8 Mon Sep 17 00:00:00 2001
From: Peter Andreas Entschev
Date: Thu, 3 Jun 2021 15:07:24 -0700
Subject: [PATCH 2/8] Add MockWorker util class that can be used for certain tests

---
 dask_cuda/utils.py | 20 +++++++++++++++++++-
 1 file changed, 19 insertions(+), 1 deletion(-)

diff --git a/dask_cuda/utils.py b/dask_cuda/utils.py
index 253a41dfc..58ca1875a 100644
--- a/dask_cuda/utils.py
+++ b/dask_cuda/utils.py
@@ -9,7 +9,7 @@
 import pynvml
 import toolz
 
-from distributed import wait
+from distributed import Worker, wait
 from distributed.utils import parse_bytes
 
 try:
@@ -530,3 +530,21 @@ def parse_device_memory_limit(device_memory_limit, device_index=0):
         return parse_bytes(device_memory_limit)
     else:
         return int(device_memory_limit)
+
+
+class MockWorker(Worker):
+    def __init__(self, *args, **kwargs):
+        import distributed
+
+        self._device_get_count = distributed.diagnostics.nvml.device_get_count
+        distributed.diagnostics.nvml.device_get_count = MockWorker.device_get_count
+        super().__init__(*args, **kwargs)
+
+    def __del__(self):
+        import distributed
+
+        distributed.diagnostics.nvml.device_get_count = self._device_get_count
+
+    @staticmethod
+    def device_get_count():
+        return 0

From 4caa3d27d01301df55928fea0973c25e51683369 Mon Sep 17 00:00:00 2001
From: Peter Andreas Entschev
Date: Thu, 3 Jun 2021 15:08:31 -0700
Subject: [PATCH 3/8] Use MockWorker class for LocalCUDACluster CUDA_VISIBLE_DEVICES tests

---
 dask_cuda/tests/test_local_cuda_cluster.py | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/dask_cuda/tests/test_local_cuda_cluster.py b/dask_cuda/tests/test_local_cuda_cluster.py
index 0237bfaed..fd7cd06b5 100644
--- a/dask_cuda/tests/test_local_cuda_cluster.py
+++ b/dask_cuda/tests/test_local_cuda_cluster.py
@@ -10,6 +10,7 @@
 
 from dask_cuda import CUDAWorker, LocalCUDACluster, utils
 from dask_cuda.initialize import initialize
+from dask_cuda.utils import MockWorker
 
 _driver_version = rmm._cuda.gpu.driverGetVersion()
 _runtime_version = rmm._cuda.gpu.runtimeGetVersion()
@@ -51,10 +52,13 @@ def get_visible_devices():
 # than 8 but as long as the test passes the errors can be ignored.
@gen_test(timeout=20) async def test_with_subset_of_cuda_visible_devices(): - os.environ["CUDA_VISIBLE_DEVICES"] = "2,3,6,7" + os.environ["CUDA_VISIBLE_DEVICES"] = "2,3,6,8" try: async with LocalCUDACluster( - scheduler_port=0, asynchronous=True, device_memory_limit=1 + scheduler_port=0, + asynchronous=True, + device_memory_limit=1, + worker_class=MockWorker, ) as cluster: async with Client(cluster, asynchronous=True) as client: assert len(cluster.workers) == 4 @@ -71,7 +75,7 @@ def get_visible_devices(): 2, 3, 6, - 7, + 8, } finally: del os.environ["CUDA_VISIBLE_DEVICES"] From 6a9e86b54a699eb7a53e2823ad4524ab48bfb924 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Thu, 3 Jun 2021 15:13:21 -0700 Subject: [PATCH 4/8] Add --worker-class support for dask-cuda-worker --- dask_cuda/cli/dask_cuda_worker.py | 12 ++++++++++++ dask_cuda/cuda_worker.py | 2 ++ 2 files changed, 14 insertions(+) diff --git a/dask_cuda/cli/dask_cuda_worker.py b/dask_cuda/cli/dask_cuda_worker.py index 5b62aae73..9c313667a 100755 --- a/dask_cuda/cli/dask_cuda_worker.py +++ b/dask_cuda/cli/dask_cuda_worker.py @@ -8,6 +8,7 @@ from distributed.cli.utils import check_python_3, install_signal_handlers from distributed.preloading import validate_preload_argv from distributed.security import Security +from distributed.utils import import_term from ..cuda_worker import CUDAWorker @@ -248,6 +249,12 @@ ``proxy_object.ProxyObject`` and ``proxify_host_file.ProxifyHostFile`` for more info.""", ) +@click.option( + "--worker-class", + default=None, + help="""Use a different class than Distributed's default (``distributed.Worker``) + to spawn ``distributed.Nanny``.""", +) def main( scheduler, host, @@ -277,6 +284,7 @@ def main( enable_rdmacm, net_devices, enable_jit_unspill, + worker_class, **kwargs, ): if tls_ca_file and tls_cert and tls_key: @@ -293,6 +301,9 @@ def main( "unsupported one. 
Scheduler address: %s" % scheduler ) + if worker_class is not None: + worker_class = import_term(worker_class) + worker = CUDAWorker( scheduler, host, @@ -320,6 +331,7 @@ def main( enable_rdmacm, net_devices, enable_jit_unspill, + worker_class, **kwargs, ) diff --git a/dask_cuda/cuda_worker.py b/dask_cuda/cuda_worker.py index 557caadda..69f6a39a0 100644 --- a/dask_cuda/cuda_worker.py +++ b/dask_cuda/cuda_worker.py @@ -74,6 +74,7 @@ def __init__( enable_rdmacm=False, net_devices=None, jit_unspill=None, + worker_class=None, **kwargs, ): # Required by RAPIDS libraries (e.g., cuDF) to ensure no context @@ -235,6 +236,7 @@ def del_pid_file(): ) }, data=data(i), + worker_class=worker_class, **kwargs, ) for i in range(nprocs) From f7042dcdcd2e79f09754e176b9620b752906e4c5 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Thu, 3 Jun 2021 15:17:32 -0700 Subject: [PATCH 5/8] Use MockWorker class for dask-cuda-worker CUDA_VISIBLE_DEVICES tests --- dask_cuda/tests/test_dask_cuda_worker.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dask_cuda/tests/test_dask_cuda_worker.py b/dask_cuda/tests/test_dask_cuda_worker.py index 9ca9d8efe..e246a5f3f 100644 --- a/dask_cuda/tests/test_dask_cuda_worker.py +++ b/dask_cuda/tests/test_dask_cuda_worker.py @@ -34,6 +34,8 @@ def test_cuda_visible_devices_and_memory_limit_and_nthreads(loop): # noqa: F811 "--nthreads", str(nthreads), "--no-dashboard", + "--worker-class", + "dask_cuda.utils.MockWorker", ] ): with Client("127.0.0.1:9359", loop=loop) as client: From b3d0ae4079d006b078bfc9acfbf60dfa4ce76704 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Mon, 7 Jun 2021 08:24:36 -0700 Subject: [PATCH 6/8] Use device 0 first in CUDA_VISIBLE_DEVICES This is done to ensure the Scheduler and Nanny are started on the first device, thus avoiding the need to mock those. 
--- dask_cuda/tests/test_dask_cuda_worker.py | 4 ++-- dask_cuda/tests/test_local_cuda_cluster.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/dask_cuda/tests/test_dask_cuda_worker.py b/dask_cuda/tests/test_dask_cuda_worker.py index e246a5f3f..b653cff5c 100644 --- a/dask_cuda/tests/test_dask_cuda_worker.py +++ b/dask_cuda/tests/test_dask_cuda_worker.py @@ -19,7 +19,7 @@ def test_cuda_visible_devices_and_memory_limit_and_nthreads(loop): # noqa: F811 - os.environ["CUDA_VISIBLE_DEVICES"] = "2,3,7,8" + os.environ["CUDA_VISIBLE_DEVICES"] = "0,3,7,8" nthreads = 4 try: with popen(["dask-scheduler", "--port", "9359", "--no-dashboard"]): @@ -46,7 +46,7 @@ def get_visible_devices(): # verify 4 workers with the 4 expected CUDA_VISIBLE_DEVICES result = client.run(get_visible_devices) - expected = {"2,3,7,8": 1, "3,7,8,2": 1, "7,8,2,3": 1, "8,2,3,7": 1} + expected = {"0,3,7,8": 1, "3,7,8,0": 1, "7,8,0,3": 1, "8,0,3,7": 1} for v in result.values(): del expected[v] diff --git a/dask_cuda/tests/test_local_cuda_cluster.py b/dask_cuda/tests/test_local_cuda_cluster.py index fd7cd06b5..da9cfb053 100644 --- a/dask_cuda/tests/test_local_cuda_cluster.py +++ b/dask_cuda/tests/test_local_cuda_cluster.py @@ -52,7 +52,7 @@ def get_visible_devices(): # than 8 but as long as the test passes the errors can be ignored. 
@gen_test(timeout=20) async def test_with_subset_of_cuda_visible_devices(): - os.environ["CUDA_VISIBLE_DEVICES"] = "2,3,6,8" + os.environ["CUDA_VISIBLE_DEVICES"] = "0,3,6,8" try: async with LocalCUDACluster( scheduler_port=0, @@ -72,7 +72,7 @@ def get_visible_devices(): assert all(len(v.split(",")) == 4 for v in result.values()) for i in range(4): assert {int(v.split(",")[i]) for v in result.values()} == { - 2, + 0, 3, 6, 8, From 9bc5c09996c03469454a66c3394b0f5ff268a414 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Mon, 7 Jun 2021 12:20:15 -0700 Subject: [PATCH 7/8] Use MockWorker for more tests --- dask_cuda/tests/test_local_cuda_cluster.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dask_cuda/tests/test_local_cuda_cluster.py b/dask_cuda/tests/test_local_cuda_cluster.py index da9cfb053..78f975601 100644 --- a/dask_cuda/tests/test_local_cuda_cluster.py +++ b/dask_cuda/tests/test_local_cuda_cluster.py @@ -110,7 +110,7 @@ async def test_ucx_protocol_type_error(): @gen_test(timeout=20) async def test_n_workers(): async with LocalCUDACluster( - CUDA_VISIBLE_DEVICES="0,1", asynchronous=True + CUDA_VISIBLE_DEVICES="0,1", worker_class=MockWorker, asynchronous=True ) as cluster: assert len(cluster.workers) == 2 assert len(cluster.worker_spec) == 2 @@ -125,7 +125,7 @@ async def test_threads_per_worker(): @gen_test(timeout=20) async def test_all_to_all(): async with LocalCUDACluster( - CUDA_VISIBLE_DEVICES="0,1", asynchronous=True + CUDA_VISIBLE_DEVICES="0,1", worker_class=MockWorker, asynchronous=True ) as cluster: async with Client(cluster, asynchronous=True) as client: workers = list(client.scheduler_info()["workers"]) From 5a175f0b20ef1107a563209f119e9dced191fde6 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Mon, 7 Jun 2021 12:20:40 -0700 Subject: [PATCH 8/8] Add docstrings to MockWorker class --- dask_cuda/utils.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/dask_cuda/utils.py b/dask_cuda/utils.py index 
58ca1875a..dd8554870 100644 --- a/dask_cuda/utils.py +++ b/dask_cuda/utils.py @@ -533,6 +533,13 @@ def parse_device_memory_limit(device_memory_limit, device_index=0): class MockWorker(Worker): + """Mock Worker class preventing NVML from getting used by SystemMonitor. + + By preventing the Worker from initializing NVML in the SystemMonitor, we can + mock test multiple devices in `CUDA_VISIBLE_DEVICES` behavior with single-GPU + machines. + """ + def __init__(self, *args, **kwargs): import distributed