From 0e10478ee9a69da2d064d56d20bb142d1fd49a06 Mon Sep 17 00:00:00 2001 From: Aamir Shafi Date: Mon, 25 Jan 2021 22:06:56 -0500 Subject: [PATCH 1/3] Adding MPI-based communication backend. This uses the MVAPICH2-GDR (http://mvapich.cse.ohio-state.edu) library. There are a couple of benchmarks added to showcase this device too. Details on the communication device can be seen in https://arxiv.org/abs/2101.08878. --- dask-apps/cudf_merge_mpi.py | 594 ++++++++++++++++++++++++++ dask-apps/cupy_sum_mpi.py | 91 ++++ distributed/comm/__init__.py | 1 + distributed/comm/mpi.py | 781 +++++++++++++++++++++++++++++++++++ 4 files changed, 1467 insertions(+) create mode 100644 dask-apps/cudf_merge_mpi.py create mode 100644 dask-apps/cupy_sum_mpi.py create mode 100644 distributed/comm/mpi.py diff --git a/dask-apps/cudf_merge_mpi.py b/dask-apps/cudf_merge_mpi.py new file mode 100644 index 0000000000..d1a1db25be --- /dev/null +++ b/dask-apps/cudf_merge_mpi.py @@ -0,0 +1,594 @@ +""" + +Taken from: https://github.com/rapidsai/dask-cuda/blob/branch-0.18/dask_cuda/benchmarks/local_cudf_merge.py + +This is the cuDF merge benchmarking application. It has been +modified to work for the MVAPICH2-based (http://mvapich.cse.ohio-state.edu) +communication backend for the Dask Distributed library using the +dask-mpi package. + +""" + +import math +from collections import defaultdict +from time import perf_counter as clock + +import numpy + +from dask.base import tokenize +from dask.dataframe.core import new_dd_object +from dask.distributed import Client, performance_report, wait +from dask.utils import format_bytes, format_time, parse_bytes +from dask_mpi import initialize + +import argparse +import os + +from dask.distributed import SSHCluster + +#adjust as per compute system +GPUS_PER_NODE = 1 # number of GPUs in the system +DASK_INTERFACE = 'ib0' # interface to use for communication +THREADS_PER_NODE = 28 # number of threads per node. 
+ +rank = os.environ['MV2_COMM_WORLD_LOCAL_RANK'] +device_id = int(rank) % GPUS_PER_NODE +os.environ["CUDA_VISIBLE_DEVICES"]=str(device_id) + +#from dask_cuda.local_cuda_cluster import LocalCUDACluster + +#from dask_cuda import explicit_comms +#from utils import ( +# get_cluster_options, +# get_scheduler_workers, +# parse_benchmark_args, +# setup_memory_pool, +#) + +# Benchmarking cuDF merge operation based on +# + +def generate_chunk(i_chunk, local_size, num_chunks, chunk_type, frac_match, gpu): + #print("generate_chunk") + # Setting a seed that triggers max amount of comm in the two-GPU case. + if gpu: + import cupy as xp + + import cudf as xdf + else: + import numpy as xp + import pandas as xdf + + xp.random.seed(2 ** 32 - 1) + + chunk_type = chunk_type or "build" + frac_match = frac_match or 1.0 + if chunk_type == "build": + # Build dataframe + # + # "key" column is a unique sample within [0, local_size * num_chunks) + # + # "shuffle" column is a random selection of partitions (used for shuffle) + # + # "payload" column is a random permutation of the chunk_size + + start = local_size * i_chunk + stop = start + local_size + + parts_array = xp.arange(num_chunks, dtype="int64") + suffle_array = xp.repeat(parts_array, math.ceil(local_size / num_chunks)) + + df = xdf.DataFrame( + { + "key": xp.arange(start, stop=stop, dtype="int64"), + "shuffle": xp.random.permutation(suffle_array)[:local_size], + "payload": xp.random.permutation(xp.arange(local_size, dtype="int64")), + } + ) + else: + #print("chunk type is other") + # Other dataframe + # + # "key" column matches values from the build dataframe + # for a fraction (`frac_match`) of the entries. The matching + # entries are perfectly balanced across each partition of the + # "base" dataframe. + # + # "payload" column is a random permutation of the chunk_size + + # Step 1. 
Choose values that DO match + sub_local_size = local_size // num_chunks + sub_local_size_use = max(int(sub_local_size * frac_match), 1) + arrays = [] + for i in range(num_chunks): + bgn = (local_size * i) + (sub_local_size * i_chunk) + end = bgn + sub_local_size + ar = xp.arange(bgn, stop=end, dtype="int64") + arrays.append(xp.random.permutation(ar)[:sub_local_size_use]) + key_array_match = xp.concatenate(tuple(arrays), axis=0) + + # Step 2. Add values that DON'T match + missing_size = local_size - key_array_match.shape[0] + start = local_size * num_chunks + local_size * i_chunk + stop = start + missing_size + key_array_no_match = xp.arange(start, stop=stop, dtype="int64") + + # Step 3. Combine and create the final dataframe chunk (dask_cudf partition) + key_array_combine = xp.concatenate( + (key_array_match, key_array_no_match), axis=0 + ) + + df = xdf.DataFrame( + { + "key": xp.random.permutation(key_array_combine), + "payload": xp.random.permutation(xp.arange(local_size, dtype="int64")), + } + ) + return df + + +def get_random_ddf(chunk_size, num_chunks, frac_match, chunk_type, args): + + parts = [chunk_size for i in range(num_chunks)] + device_type = True if args.type == "gpu" else False + meta = generate_chunk(0, 4, 1, chunk_type, None, device_type) + divisions = [None] * (len(parts) + 1) + + name = "generate-data-" + tokenize(chunk_size, num_chunks, frac_match, chunk_type) + + graph = { + (name, i): ( + generate_chunk, + i, + part, + len(parts), + chunk_type, + frac_match, + device_type, + ) + for i, part in enumerate(parts) + } + + ddf = new_dd_object(graph, name, meta, divisions) + + if chunk_type == "build": + if not args.no_shuffle: + divisions = [i for i in range(num_chunks)] + [num_chunks] + return ddf.set_index("shuffle", divisions=tuple(divisions)) + else: + del ddf["shuffle"] + + return ddf + + +def merge(args, ddf1, ddf2, write_profile): + #print("merge called") + # Lazy merge/join operation + ddf_join = ddf1.merge(ddf2, on=["key"], how="inner") + 
if args.set_index: + ddf_join = ddf_join.set_index("key") + + # Execute the operations to benchmark + if write_profile is not None: + with performance_report(filename=args.profile): + t1 = clock() + wait(ddf_join.persist()) + took = clock() - t1 + else: + t1 = clock() + wait(ddf_join.persist()) + took = clock() - t1 + return took + + +#def merge_explicit_comms(args, ddf1, ddf2): +# t1 = clock() +# wait(explicit_comms.dataframe_merge(ddf1, ddf2, on="key").persist()) +# took = clock() - t1 +# return took + + +def run(client, args, n_workers, write_profile=None): + # Generate random Dask dataframes + ddf_base = get_random_ddf( + args.chunk_size, n_workers, args.frac_match, "build", args + ).persist() + + ddf_other = get_random_ddf( + args.chunk_size, n_workers, args.frac_match, "other", args + ).persist() + + wait(ddf_base) + wait(ddf_other) + client.wait_for_workers(n_workers) + + assert len(ddf_base.dtypes) == 2 + assert len(ddf_other.dtypes) == 2 + data_processed = len(ddf_base) * sum([t.itemsize for t in ddf_base.dtypes]) + data_processed += len(ddf_other) * sum([t.itemsize for t in ddf_other.dtypes]) + + if args.backend == "dask": + took = merge(args, ddf_base, ddf_other, write_profile) + else: + took = None#merge_explicit_comms(args, ddf_base, ddf_other) + + return (data_processed, took) + + +def main(args): + cluster_options = get_cluster_options(args) + Cluster = cluster_options["class"] + cluster_args = cluster_options["args"] + cluster_kwargs = cluster_options["kwargs"] + scheduler_addr = cluster_options["scheduler_addr"] + + if args.sched_addr: + initialize( + interface=DASK_INTERFACE, + protocol=args.protocol, + nanny=False, + nthreads=THREADS_PER_NODE + ) + import time + time.sleep(5) + + client = Client() + print(client) + else: + cluster = Cluster(*cluster_args, **cluster_kwargs) + if args.multi_node: + import time + + # Allow some time for workers to start and connect to scheduler + # TODO: make this a command-line argument? 
+ time.sleep(15) + + client = Client(scheduler_addr if args.multi_node else cluster) + + if args.type == "gpu": + client.run(setup_memory_pool, disable_pool=args.no_rmm_pool) + # Create an RMM pool on the scheduler due to occasional deserialization + # of CUDA objects. May cause issues with InfiniBand otherwise. + client.run_on_scheduler(setup_memory_pool, 1e9, disable_pool=args.no_rmm_pool) + + scheduler_workers = client.run_on_scheduler(get_scheduler_workers) + n_workers = len(scheduler_workers) + + took_list = [] + for _ in range(args.runs - 1): + took_list.append(run(client, args, n_workers, write_profile=None)) + took_list.append( + run(client, args, n_workers, write_profile=args.profile) + ) # Only profiling the last run + + # Collect, aggregate, and print peer-to-peer bandwidths + incoming_logs = client.run(lambda dask_worker: dask_worker.incoming_transfer_log) + bandwidths = defaultdict(list) + total_nbytes = defaultdict(list) + for k, L in incoming_logs.items(): + for d in L: + if d["total"] >= args.ignore_size: + bandwidths[k, d["who"]].append(d["bandwidth"]) + total_nbytes[k, d["who"]].append(d["total"]) + bandwidths = { + (scheduler_workers[w1].name, scheduler_workers[w2].name): [ + "%s/s" % format_bytes(x) for x in numpy.quantile(v, [0.25, 0.50, 0.75]) + ] + for (w1, w2), v in bandwidths.items() + } + total_nbytes = { + (scheduler_workers[w1].name, scheduler_workers[w2].name,): format_bytes(sum(nb)) + for (w1, w2), nb in total_nbytes.items() + } + + t_runs = numpy.empty(len(took_list)) + if args.markdown: + print("```") + print("Merge benchmark") + print("-------------------------------") + print(f"backend | {args.backend}") + print(f"merge type | {args.type}") + print(f"rows-per-chunk | {args.chunk_size}") + print(f"protocol | {args.protocol}") + print(f"device(s) | {args.devs}") + print(f"rmm-pool | {(not args.no_rmm_pool)}") + print(f"frac-match | {args.frac_match}") + if args.protocol == "ucx": + print(f"tcp | {args.enable_tcp_over_ucx}") + 
print(f"ib | {args.enable_infiniband}") + print(f"nvlink | {args.enable_nvlink}") + print(f"data-processed | {format_bytes(took_list[0][0])}") + print("===============================") + print("Wall-clock | Throughput") + print("-------------------------------") + for idx, (data_processed, took) in enumerate(took_list): + throughput = int(data_processed / took) + m = format_time(took) + m += " " * (15 - len(m)) + print(f"{m}| {format_bytes(throughput)}/s") + t_runs[idx] = float(format_bytes(throughput).split(" ")[0]) + print("===============================") + if args.markdown: + print("\n```") + + #if args.plot is not None: + # plot_benchmark(t_runs, args.plot, historical=True) + + if args.backend == "dask": + if args.markdown: + print("
\nWorker-Worker Transfer Rates\n\n```") + print("(w1,w2) | 25% 50% 75% (total nbytes)") + print("-------------------------------") + for (d1, d2), bw in sorted(bandwidths.items()): + fmt = ( + "(%s,%s) | %s %s %s (%s)" + if args.multi_node or args.sched_addr + else "(%02d,%02d) | %s %s %s (%s)" + ) + print(fmt % (d1, d2, bw[0], bw[1], bw[2], total_nbytes[(d1, d2)])) + if args.markdown: + print("```\n
\n") + + if args.multi_node: + client.shutdown() + client.close() + + +def parse_args(): + special_args = [ + { + "name": ["-b", "--backend",], + "choices": ["dask", "explicit-comms"], + "default": "dask", + "type": str, + "help": "The backend to use.", + }, + { + "name": ["-t", "--type",], + "choices": ["cpu", "gpu"], + "default": "gpu", + "type": str, + "help": "Do merge with GPU or CPU dataframes", + }, + { + "name": ["-c", "--chunk-size",], + "default": 1_000_000, + "metavar": "n", + "type": int, + "help": "Chunk size (default 1_000_000)", + }, + { + "name": "--ignore-size", + "default": "1 MiB", + "metavar": "nbytes", + "type": parse_bytes, + "help": "Ignore messages smaller than this (default '1 MB')", + }, + { + "name": "--frac-match", + "default": 0.3, + "type": float, + "help": "Fraction of rows that matches (default 0.3)", + }, + { + "name": "--no-shuffle", + "action": "store_true", + "help": "Don't shuffle the keys of the left (base) dataframe.", + }, + { + "name": "--markdown", + "action": "store_true", + "help": "Write output as markdown", + }, + {"name": "--runs", "default": 3, "type": int, "help": "Number of runs",}, + { + "name": ["-s", "--set-index",], + "action": "store_true", + "help": "Call set_index on the key column to sort the joined dataframe.", + }, + ] + + return parse_benchmark_args( + description="Distributed merge (dask/cudf) benchmark", args_list=special_args + ) + +def parse_benchmark_args(description="Generic dask-cuda Benchmark", args_list=[]): + parser = argparse.ArgumentParser(description=description) + parser.add_argument( + "-d", "--devs", default="0", type=str, help='GPU devices to use (default "0").' + ) + parser.add_argument( + "-p", + "--protocol", + choices=["tcp", "ucx", "mpi"], + default="tcp", + type=str, + help="The communication protocol to use.", + ) + parser.add_argument( + "--profile", + metavar="PATH", + default=None, + type=str, + help="Write dask profile report (E.g. 
dask-report.html)", + ) + parser.add_argument( + "--no-rmm-pool", action="store_true", help="Disable the RMM memory pool" + ) + parser.add_argument( + "--enable-tcp-over-ucx", + action="store_true", + dest="enable_tcp_over_ucx", + help="Enable tcp over ucx.", + ) + parser.add_argument( + "--enable-infiniband", + action="store_true", + dest="enable_infiniband", + help="Enable infiniband over ucx.", + ) + parser.add_argument( + "--enable-nvlink", + action="store_true", + dest="enable_nvlink", + help="Enable NVLink over ucx.", + ) + parser.add_argument( + "--disable-tcp-over-ucx", + action="store_false", + dest="enable_tcp_over_ucx", + help="Disable tcp over ucx.", + ) + parser.add_argument( + "--disable-infiniband", + action="store_false", + dest="enable_infiniband", + help="Disable infiniband over ucx.", + ) + parser.add_argument( + "--disable-nvlink", + action="store_false", + dest="enable_nvlink", + help="Disable NVLink over ucx.", + ) + parser.add_argument( + "--ucx-net-devices", + default=None, + type=str, + help="The device to be used for UCX communication, or 'auto'. " + "Ignored if protocol is 'tcp'", + ) + parser.add_argument( + "--single-node", + action="store_true", + dest="multi_node", + help="Runs a single-node cluster on the current host.", + ) + parser.add_argument( + "--multi-node", + action="store_true", + dest="multi_node", + help="Runs a multi-node cluster on the hosts specified by --hosts.", + ) + parser.add_argument( + "--scheduler-address", + default="Not Needed", # MPI4Dask related modification + type=str, + dest="sched_addr", + help="Scheduler Address -- assumes cluster is created outside of benchmark.", + ) + parser.add_argument( + "--hosts", + default=None, + type=str, + help="Specifies a comma-separated list of IP addresses or hostnames. " + "The list begins with the host where the scheduler will be launched " + "followed by any number of workers, with a minimum of 1 worker. " + "Requires --multi-node, ignored otherwise. 
" + "Usage example: --multi-node --hosts 'dgx12,dgx12,10.10.10.10,dgx13' . " + "In the example, the benchmark is launched with scheduler on host " + "'dgx12' (first in the list), and workers on three hosts being 'dgx12', " + "'10.10.10.10', and 'dgx13'. " + "Note: --devs is currently ignored in multi-node mode and for each host " + "one worker per GPU will be launched.", + ) + + for args in args_list: + name = args.pop("name") + if not isinstance(name, list): + name = [name] + parser.add_argument(*name, **args) + + parser.set_defaults( + enable_tcp_over_ucx=True, enable_infiniband=True, enable_nvlink=True + ) + args = parser.parse_args() + + if args.protocol == "tcp": + args.enable_tcp_over_ucx = False + args.enable_infiniband = False + args.enable_nvlink = False + + if args.multi_node and len(args.hosts.split(",")) < 2: + raise ValueError("--multi-node requires at least 2 hosts") + + return args + + +def get_cluster_options(args): + #print("get_cluster_options") + + if args.multi_node is True: + + #print("get_cluster_options: args.multi_node") + + Cluster = SSHCluster + cluster_args = [args.hosts.split(",")] + scheduler_addr = args.protocol + "://" + cluster_args[0][0] + ":8786" + + worker_options = {} + + # This looks counterintuitive but adding the variable name with + # an empty string is how we can enable CLI booleans currently, + # note that SSHCluster uses the dask-cuda-worker CLI. 
+ if args.enable_tcp_over_ucx: + worker_options["enable_tcp_over_ucx"] = "" + if args.enable_nvlink: + worker_options["enable_nvlink"] = "" + if args.enable_infiniband: + worker_options["enable_infiniband"] = "" + + if args.ucx_net_devices: + worker_options["ucx_net_devices"] = args.ucx_net_devices + + cluster_kwargs = { + "connect_options": {"known_hosts": None}, + "scheduler_options": {"protocol": args.protocol}, + "worker_module": "dask_cuda.dask_cuda_worker", + "worker_options": worker_options, + # "n_workers": len(args.devs.split(",")), + # "CUDA_VISIBLE_DEVICES": args.devs, + } + else: + #print("get_cluster_options: else") + #Cluster = LocalCUDACluster + Cluster = None + scheduler_addr = None + cluster_args = [] + cluster_kwargs = { + "protocol": args.protocol, + "n_workers": len(args.devs.split(",")), + "CUDA_VISIBLE_DEVICES": args.devs, + "ucx_net_devices": args.ucx_net_devices, + "enable_tcp_over_ucx": args.enable_tcp_over_ucx, + "enable_infiniband": args.enable_infiniband, + "enable_nvlink": args.enable_nvlink, + } + + #print("returning cluster object") + return { + "class": Cluster, + "args": cluster_args, + "kwargs": cluster_kwargs, + "scheduler_addr": scheduler_addr, + } + + +def get_scheduler_workers(dask_scheduler=None): + return dask_scheduler.workers + + +def setup_memory_pool(pool_size=None, disable_pool=False): + import cupy + + os.environ['RMM_NO_INITIALIZE'] = 'True' + import rmm + + rmm.reinitialize( + pool_allocator=not disable_pool, devices=0, initial_pool_size=pool_size, + ) + cupy.cuda.set_allocator(rmm.rmm_cupy_allocator) + +if __name__ == "__main__": + main(parse_args()) + diff --git a/dask-apps/cupy_sum_mpi.py b/dask-apps/cupy_sum_mpi.py new file mode 100644 index 0000000000..d2d9c88caf --- /dev/null +++ b/dask-apps/cupy_sum_mpi.py @@ -0,0 +1,91 @@ +""" +This is the sum of cuPy array and its transpose application. 
It has been +modified to work for the MVAPICH2-based (http://mvapich.cse.ohio-state.edu) +communication backend for the Dask Distributed library using the +dask-mpi package. + +""" + +import os +import asyncio +from collections import defaultdict +from time import perf_counter as clock + +import dask.array as da +from dask.distributed import Client, performance_report, wait +from dask.utils import format_bytes, format_time, parse_bytes +from dask_mpi import initialize + +import cupy +import numpy +import time +import sys + +#adjust as per compute system +GPUS_PER_NODE = 1 # number of GPUs in the system +RUNS = 20 # repititions for the benchmark +DASK_INTERFACE = 'ib0' # interface to use for communication +DASK_PROTOCOL = 'mpi' # protocol for Dask Distributed. Options include ['mpi', 'tcp'] +THREADS_PER_NODE = 28 # number of threads per node. + +rank = os.environ['MV2_COMM_WORLD_LOCAL_RANK'] +device_id = int(rank) % GPUS_PER_NODE +os.environ["CUDA_VISIBLE_DEVICES"]=str(device_id) + +async def run(): + async with Client(asynchronous=True) as client: + + print(client) + + scheduler_workers = await client.run_on_scheduler(get_scheduler_workers) + + took_list = [] + + for i in range(RUNS): + start = time.time() + rs = da.random.RandomState(RandomState=cupy.random.RandomState) + a = rs.normal(100, 1, (int(30000), int(30000)), \ + chunks=(int(1000), int(1000))) + x = a + a.T + + xx = await client.compute(x) + + duration = time.time() - start + took_list.append( (duration, a.npartitions) ) + print("Time for iteration", i, ":", duration) + sys.stdout.flush() + + sums = [] + + for (took, npartitions) in took_list: + sums.append(took) + t = format_time(took) + t += " " * (11 - len(t)) + + print ("Average Wall-clock Time: ", format_time(sum(sums)/len(sums))) + + # Collect, aggregate, and print peer-to-peer bandwidths + incoming_logs = await client.run( + lambda dask_worker: dask_worker.incoming_transfer_log + ) + + outgoing_logs = await client.run( + lambda dask_worker: 
dask_worker.outgoing_transfer_log + ) + + +def get_scheduler_workers(dask_scheduler=None): + return dask_scheduler.workers + +if __name__ == "__main__": + + initialize( + interface=DASK_INTERFACE, + protocol=DASK_PROTOCOL, + nanny=False, + nthreads=THREADS_PER_NODE + ) + + time.sleep(5) + + asyncio.run(run()) diff --git a/distributed/comm/__init__.py b/distributed/comm/__init__.py index 2ff679ada3..610b7129c4 100644 --- a/distributed/comm/__init__.py +++ b/distributed/comm/__init__.py @@ -14,6 +14,7 @@ def _register_transports(): + from . import mpi from . import inproc from . import tcp diff --git a/distributed/comm/mpi.py b/distributed/comm/mpi.py new file mode 100644 index 0000000000..1c31a7a89b --- /dev/null +++ b/distributed/comm/mpi.py @@ -0,0 +1,781 @@ +""" +This is an MVAPICH2-based (http://mvapich.cse.ohio-state.edu) communication +backend for the Dask Distributed library. + +This is based on the UCX device. UCX: https://github.com/openucx/ucx +""" + +import logging +import struct +import weakref + +import collections +import dask +import asyncio +import socket + +import random +import time +import sys + +import rmm + +from .addressing import parse_host_port, unparse_host_port +from .core import Comm, Connector, Listener, CommClosedError +from .registry import Backend, backends +from .utils import ensure_concrete_host, to_frames, from_frames +from ..utils import ( + ensure_ip, + get_ip, + get_ipv6, + nbytes, + log_errors, + CancelledError, + parse_bytes, +) + +from mpi4py import MPI + +logger = logging.getLogger(__name__) + +host_array = None +device_array = None +INITIAL_TAG_OFFSET = 100 +TAG_QUOTA_PER_CONNECTION = 200 + +# workers and the scheduler randomly select ports for listening to +# incoming connections between this range +LOWER_PORT_RANGE=30000 +UPPER_PORT_RANGE=40000 + +# This constant is used to split large message into chunks of this size +CHUNK_SIZE = 2 ** 30 + +initialized = False + +# The tag_table dictionary is used to maintain 'tag 
offset' for a pair of +# processes. Imagine a Dask program executing with 4 MPI processes - a scheduler, +# a client, and 2 workers. The MPI.COMM_WORLD communicator is used by all for +# exchanging messages. However, during the connection establishment, all processes +# need to make sure that they have a unique tag range to use with MPI.COMM_WORLD +# for a pair of processes. Otherwise there is a possibility of message interference +# because processes connect with one another dynamically and also there are +# control and data channels. This situation is avoided using this dictionary +# of tags. +tag_table = dict() + +def synchronize_stream(stream=0): + import numba.cuda + + ctx = numba.cuda.current_context() + cu_stream = numba.cuda.driver.drvapi.cu_stream(stream) + stream = numba.cuda.driver.Stream(ctx, cu_stream, None) + stream.synchronize() + +def init_tag_table(): + """ this function initializes tag_table dictionary.""" + global tag_table, INITIAL_TAG_OFFSET + rank = MPI.COMM_WORLD.Get_rank() + size = MPI.COMM_WORLD.Get_size() + + # each process maintains an entry for every other process in the tag_table + for peer in range(size): + if rank != peer: + tag_table.update({peer : INITIAL_TAG_OFFSET}) + + logger.debug(tag_table) + +def init_once(): + global initialized, host_array, device_array + + if initialized is True: + return + + initialized = True + + random.seed(random.randrange(sys.maxsize)) + + name = MPI.Get_processor_name() + init_tag_table() + logger.debug("rank=%s, name=%s", MPI.COMM_WORLD.Get_rank(), name) + + # Find the function, `host_array()`, to use when allocating new host arrays + try: + import numpy + + host_array = lambda n: numpy.empty((n,), dtype="u1") + except ImportError: + host_array = lambda n: bytearray(n) + + # Find the function, `cuda_array()`, to use when allocating new CUDA arrays + try: + import rmm + + if hasattr(rmm, "DeviceBuffer"): + device_array = lambda n: rmm.DeviceBuffer(size=n) + else: # pre-0.11.0 + import numba.cuda + + 
def rmm_device_array(n): + a = rmm.device_array(n, dtype="u1") + weakref.finalize(a, numba.cuda.current_context) + return a + + device_array = rmm_device_array + except ImportError: + try: + import numba.cuda + + def numba_device_array(n): + a = numba.cuda.device_array((n,), dtype="u1") + weakref.finalize(a, numba.cuda.current_context) + return a + + device_array = numba_device_array + except ImportError: + + def device_array(n): + raise RuntimeError( + "In order to send/recv CUDA arrays, Numba or RMM is required" + ) + + pool_size_str = dask.config.get("rmm.pool-size") + if pool_size_str is not None: + pool_size = parse_bytes(pool_size_str) + rmm.reinitialize( + pool_allocator=True, managed_memory=False, initial_pool_size=pool_size + ) + +class MPI4Dask(Comm): + """Comm object using MPI. + + Parameters + ---------- + peer_rank : int + The MPI rank of the peer process + suggested_tag : int + The suggested tag offet to avoid message interference + local_addr : str + The local address, prefixed with `mpi://` + peer_addr : str + The peer address, prefixed with `mpi://` + p_addr : str + The peer address, prefixed with `mpi://` + deserialize : bool, default True + Whether to deserialize data in :meth:`distributed.protocol.loads` + + Notes + ----- + The read-write cycle uses the following pattern: + + Each msg is serialized into a number of "data" frames. We prepend these + real frames with two additional frames + + 1. is_gpus: Boolean indicator for whether the frame should be + received into GPU memory. Packed in '?' format. Unpack with + ``?`` format. + 2. frame_size : Unsigned int describing the size of frame (in bytes) + to receive. Packed in 'Q' format, so a length-0 frame is equivalent + to an unsized frame. Unpacked with ``Q``. + + The expected read cycle is + + 1. Read the frame describing if connection is closing and number of frames + 2. Read the frame describing whether each data frame is gpu-bound + 3. 
Read the frame describing whether each data frame is sized + 4. Read all the data frames. + """ + + def __init__(self, peer_rank: int, suggested_tag: int, local_addr: str, \ + peer_addr: str, p_addr: str, deserialize=True): + + Comm.__init__(self) + self._suggested_tag = suggested_tag + self._peer_rank = peer_rank + if local_addr: + assert local_addr.startswith("mpi") + assert peer_addr.startswith("mpi") + self._local_addr = local_addr + self._peer_addr = peer_addr + self._p_addr = p_addr + self.deserialize = deserialize + self._closed = False + logger.debug("MPI4Dask.__init__ %s", self) + + @property + def suggested_tag(self) -> int: + return self._suggested_tag + + @property + def peer_rank(self) -> int: + return self._peer_rank + + @property + def local_address(self) -> str: + return self._local_addr + + @property + def peer_address(self) -> str: + return self._peer_addr + + async def write( + self, + msg: dict, + serializers=("cuda", "dask", "pickle", "error"), + on_error: str = "message", + ): + logger.debug("write() function is called") + with log_errors(): + if self.closed(): + raise CommClosedError("Endpoint is closed -- unable to send message") + try: + if serializers is None: + serializers = ("cuda", "dask", "pickle", "error") + # msg can also be a list of dicts when sending batched messages + frames = await to_frames( + msg, + serializers=serializers, + on_error=on_error, + allow_offload=self.allow_offload, + ) + nframes = len(frames) + cuda_frames = tuple( + hasattr(f, "__cuda_array_interface__") for f in frames + ) + sizes = tuple(nbytes(f) for f in frames) + cuda_send_frames, send_frames = zip( + *( + (is_cuda, each_frame) + for is_cuda, each_frame in zip(cuda_frames, frames) + if len(each_frame) > 0 + ) + ) + + # Send meta data + + # Send close flag and number of frames (_Bool, int64) + #await self.ep.send(struct.pack("?Q", False, nframes)) + #initialize tag with the assigned tag range + tag = self.suggested_tag + packed_msg = struct.pack("?Q", 
False, nframes) + s_size = struct.calcsize("?Q") + logger.debug("write_1: me=%s, you=%s, tag=%s", MPI.COMM_WORLD.Get_rank(), self.peer_rank, tag) + + await self.mpi_send(packed_msg, s_size, tag) + tag = tag + 1 + + # Send which frames are CUDA (bool) and + # how large each frame is (uint64) + #await self.ep.send( + # struct.pack(nframes * "?" + nframes * "Q", *cuda_frames, *sizes) + #) + packed_msg = struct.pack(nframes * "?" + nframes * "Q", \ + *cuda_frames, *sizes) + s_size = struct.calcsize(nframes * "?" + nframes * "Q") + + await self.mpi_send(packed_msg, s_size, tag) + tag = tag + 1 + logger.debug("write_2: me=%s, you=%s, tag=%s", MPI.COMM_WORLD.Get_rank(), self.peer_rank, tag) + + # Send frames + + # It is necessary to first synchronize the default stream + # before start sending. + + # non-blocking CUDA streams. + if any(cuda_send_frames): + synchronize_stream(0) + + sizes_new = tuple(i for i in sizes if i != 0) # remove all 0's from this list + read_counter = 0 + + for each_frame in send_frames: + #await self.ep.send(each_frame) + s_size = sizes_new[read_counter] + await self.mpi_send(each_frame, s_size, tag) + tag = tag + 1 + read_counter = read_counter + 1 + logger.debug("write_3: me=%s, you=%s, tag=%s, sizes=%s, sizes_new=%s", MPI.COMM_WORLD.Get_rank(), self.peer_rank, tag, sizes, sizes_new) + return sum(sizes_new) + except (Exception): + self.abort() + raise CommClosedError("While writing, the connection was closed") + + # a utility function for sending messages larger than CHUNK_SIZE data + async def mpi_send_large(self, buf, size, _tag): + + me = MPI.COMM_WORLD.Get_rank() + you = self.peer_rank + + logger.debug("mpi_send_large: host=%s, me=%s, you=%s, tag=%s, size=%s, type(buf)=%s", \ + socket.gethostname(), me, you, _tag, size, type(buf)) + + blk_size = CHUNK_SIZE + num_of_blks = int(size / blk_size) + last_blk_size = size % blk_size + + logger.debug("mpi_send_large: blk_size=%s, num_of_blks=%s, last_blk_size=%s", \ + blk_size, num_of_blks, 
last_blk_size) + + num_of_reqs = num_of_blks + + if last_blk_size is not 0: + num_of_reqs = num_of_reqs + 1 + + reqs = [] + + for i in range(num_of_blks): + s_idx = (i) * blk_size + e_idx = (i+1) * blk_size + + if type(buf) == rmm._lib.device_buffer.DeviceBuffer: + # need this if because rmm.DeviceBuffer is not subscriptable + shadow_buf = rmm.DeviceBuffer(ptr=(buf.ptr+s_idx), \ + size=blk_size) + r = MPI.COMM_WORLD.Isend([shadow_buf, MPI.BYTE], dest=you, tag=_tag) + else: + r = MPI.COMM_WORLD.Isend([buf[s_idx:e_idx], blk_size], dest=you, tag=_tag) + _tag = _tag + 1 + reqs.append(r) + + if last_blk_size is not 0: + s_idx = num_of_blks*blk_size + e_idx = s_idx+last_blk_size + + if type(buf) == rmm._lib.device_buffer.DeviceBuffer: + # need this if because rmm.DeviceBuffer is not subscriptable + shadow_buf = rmm.DeviceBuffer(ptr=(buf.ptr+s_idx), \ + size=last_blk_size) + r = MPI.COMM_WORLD.Isend([shadow_buf, MPI.BYTE], dest=you, tag=_tag) + else: + r = MPI.COMM_WORLD.Isend([buf[s_idx:e_idx], last_blk_size], \ + dest=you, tag=_tag) + + _tag = _tag + 1 + reqs.append(r) + + assert len(reqs) == num_of_reqs + + flag = MPI.Request.Testall(reqs) + + while flag is False: + await asyncio.sleep(0) + flag = MPI.Request.Testall(reqs) + + # a utility function for sending messages through MPI + async def mpi_send(self, buf, size, _tag): + + me = MPI.COMM_WORLD.Get_rank() + you = self.peer_rank + rank = MPI.COMM_WORLD.Get_rank() + + if me == 0: + logger.debug("mpi_send: host=%s, suggested_tag=%s, peer_rank=%s, me=%s, you=%s, rank=%s, tag=%s, size=%s, type(buf)=%s", \ + socket.gethostname(), self.suggested_tag, self.peer_rank, me, you, rank, _tag, size, type(buf)) + + if size > CHUNK_SIZE: + # if message size is larger than CHUNK_SIZE, split it into chunks and communicate + logger.debug("mpi_send: host=%s, comm=%s, me=%s, you=%s, rank=%s, tag=%s, size=%s, type(buf)=%s", \ + socket.gethostname(), MPI.COMM_WORLD, me, you, rank, _tag, size, type(buf)) + #if type(buf) == 
cupy.core.core.ndarray: + #h_buf = numpy.empty((size,), dtype="u1") + #h_buf = numpy.frombuffer(buf.tobytes(), dtype="u1") + await self.mpi_send_large(buf, size, _tag) + return + + req = MPI.COMM_WORLD.Isend([buf, size], dest=you, tag=_tag) + + status = req.Test() + + while status is False: + await asyncio.sleep(0) + status = req.Test() + + # a utility function for receiving messages larger than CHUNK_SIZE data + async def mpi_recv_large(self, buf, size, _tag): + + me = MPI.COMM_WORLD.Get_rank() + you = self.peer_rank + + logger.debug("mpi_recv_large: host=%s, me=%s, you=%s, tag=%s, size=%s, type(buf)=%s", \ + socket.gethostname(), me, you, _tag, size, type(buf)) + + blk_size = CHUNK_SIZE + num_of_blks = int(size / blk_size) + last_blk_size = size % blk_size + + logger.debug("mpi_recv_large: blk_size=%s, num_of_blks=%s, last_blk_size=%s", \ + blk_size, num_of_blks, last_blk_size) + + num_of_reqs = num_of_blks + + if last_blk_size is not 0: + num_of_reqs = num_of_reqs + 1 + + reqs = [] + + for i in range(num_of_blks): + s_idx = (i) * blk_size + e_idx = (i+1) * blk_size + + if type(buf) == rmm._lib.device_buffer.DeviceBuffer: + # need this if because rmm.DeviceBuffer is not subscriptable + shadow_buf = rmm.DeviceBuffer(ptr=(buf.ptr+s_idx), \ + size=blk_size) + r = MPI.COMM_WORLD.Irecv([shadow_buf, MPI.BYTE], source=you, tag=_tag) + else: + r = MPI.COMM_WORLD.Irecv([buf[s_idx:e_idx], blk_size], source=you, \ + tag=_tag) + _tag = _tag + 1 + reqs.append(r) + + if last_blk_size is not 0: + s_idx = num_of_blks*blk_size + e_idx = s_idx+last_blk_size + + if type(buf) == rmm._lib.device_buffer.DeviceBuffer: + # need this if because rmm.DeviceBuffer is not subscriptable + shadow_buf = rmm.DeviceBuffer(ptr=(buf.ptr+s_idx), \ + size=last_blk_size) + r = MPI.COMM_WORLD.Irecv([shadow_buf, MPI.BYTE], source=you, tag=_tag) + else: + r = MPI.COMM_WORLD.Irecv([buf[s_idx:e_idx], last_blk_size], \ + source=you, tag=_tag) + _tag = _tag + 1 + reqs.append(r) + + assert len(reqs) == 
num_of_reqs + + flag = MPI.Request.Testall(reqs) + + while flag is False: + await asyncio.sleep(0) + flag = MPI.Request.Testall(reqs) + + # a utility function for receiving messages through MPI + async def mpi_recv(self, buf, size, _tag): + + import numpy as np + rank = MPI.COMM_WORLD.Get_rank() + me = rank + you = self.peer_rank + + if size > CHUNK_SIZE: + + logger.debug("mpi_recv: me=%s, you=%s, tag=%s, size=%s, type(buf)=%s", \ + me, you, _tag, size, type(buf)) + await self.mpi_recv_large(buf, size, _tag) + return + + req = MPI.COMM_WORLD.Irecv([buf, size], source=you, tag=_tag) + + status = req.Test() + + while status is False: + await asyncio.sleep(0) + status = req.Test() + + if you == 0 and isinstance(buf, np.ndarray): + logger.debug("mpi_recv: me=%s, you=%s, tag=%s, size=%s, type(buf)=%s, buf[:10]=%s", \ + me, you, _tag, size, type(buf), bytearray(buf[:10])) + + async def read(self, deserializers=("cuda", "dask", "pickle", "error")): + logger.debug("read() function is called") + with log_errors(): + if self.closed(): + #logger.debug("inside self.closed()") + raise CommClosedError("Endpoint is closed -- unable to read message") + + if deserializers is None: + deserializers = ("cuda", "dask", "pickle", "error") + + try: + # Recv meta data + + # Recv close flag and number of frames (_Bool, int64) + buf_size = struct.calcsize("?Q") + msg = host_array(struct.calcsize("?Q")) + tag = self.suggested_tag + await self.mpi_recv(msg, buf_size, tag) + tag = tag + 1 + (shutdown, nframes) = struct.unpack("?Q", msg) + logger.debug("read_1: me=%s, you=%s, tag=%s", MPI.COMM_WORLD.Get_rank(), self.peer_rank, tag) + + if shutdown: # The writer is closing the connection + raise CancelledError("Connection closed by writer") + + # Recv which frames are CUDA (bool) and + # how large each frame is (uint64) + header_fmt = nframes * "?" 
+ nframes * "Q" + buf_size = struct.calcsize(header_fmt) + header = host_array(struct.calcsize(header_fmt)) + #await self.ep.recv(header) + await self.mpi_recv(header, buf_size, tag) + tag = tag + 1 + logger.debug("read_2: me=%s, you=%s, tag=%s", MPI.COMM_WORLD.Get_rank(), self.peer_rank, tag) + header = struct.unpack(header_fmt, header) + cuda_frames, sizes = header[:nframes], header[nframes:] + except (Exception, CancelledError): + self.abort() + raise CommClosedError("While reading, the connection was closed") + else: + # Recv frames + frames = [ + device_array(each_size) if is_cuda else host_array(each_size) + for is_cuda, each_size in zip(cuda_frames, sizes) + ] + cuda_recv_frames, recv_frames = zip( + *( + (is_cuda, each_frame) + for is_cuda, each_frame in zip(cuda_frames, frames) + if len(each_frame) > 0 + ) + ) + + # It is necessary to first populate `frames` with CUDA arrays + # and synchronize the default stream before starting receiving + # to ensure buffers have been allocated + if any(cuda_recv_frames): + synchronize_stream(0) + + + sizes_new = tuple(i for i in sizes if i != 0) # remove all 0's from this list + assert len(sizes_new) < TAG_QUOTA_PER_CONNECTION - 2 + + logger.debug("read_3: me=%s, you=%s, tag=%s, sizes=%s, sizes_new=%s, rf_len=%s, crf_len=%s", \ + MPI.COMM_WORLD.Get_rank(), self.peer_rank, tag, sizes, sizes_new, len(recv_frames), len(cuda_recv_frames)) + + read_counter = 0 + for each_frame in recv_frames: + #await self.ep.recv(each_frame) + await self.mpi_recv(each_frame, sizes_new[read_counter], tag) + tag = tag + 1 + read_counter = read_counter + 1 + + logger.debug("read_4: me=%s, you=%s, tag=%s", MPI.COMM_WORLD.Get_rank(), self.peer_rank, tag) + + msg = await from_frames( + frames, + deserialize=self.deserialize, + deserializers=deserializers, + allow_offload=self.allow_offload, + ) + logger.debug("read_5: me=%s, you=%s, tag=%s", MPI.COMM_WORLD.Get_rank(), self.peer_rank, tag) + return msg + + async def close(self): + if self._closed 
is False: + self._closed = True + + def abort(self): + if self._closed is False: + self._closed = True + + def closed(self): + return self._closed + + +class MPI4DaskConnector(Connector): + prefix = "mpi://" + comm_class = MPI4Dask + encrypted = False + + async def connect(self, address: str, \ + deserialize=True, \ + **connection_args) -> MPI4Dask: + + rand_num = 0.001 * random.randint(0, 1000) + await asyncio.sleep(rand_num) + logger.debug("%s: connecting to address=%s with delay=%s", \ + socket.gethostname(), address, rand_num) + init_once() + + ip, port = parse_host_port(address) + reader, writer = await asyncio.open_connection(ip, port) + + peer_addr = writer.get_extra_info('peername') + local_addr = writer.get_extra_info('sockname') + local_rank = MPI.COMM_WORLD.Get_rank() + + logger.debug("%s: connect: local_addr=%s, peer_addr=%s", \ + socket.gethostname(), local_addr[0], peer_addr[0]) + + assert reader.at_eof() == False + data = await reader.read(4) + logger.debug("data: %s", data) + peer_rank = int(data.decode()) + + data = str(local_rank).encode() + writer.write(data) + await writer.drain() + + suggested_tag = tag_table.get(peer_rank) + #new_suggested_tag = suggested_tag + TAG_QUOTA_PER_CONNECTION + tag_table.update({peer_rank : + (suggested_tag+TAG_QUOTA_PER_CONNECTION)}) + logger.debug("%s: connect: data=%s, rank=%s, peer_rank=%s, suggested_tag=%s", \ + socket.gethostname(), data, local_rank, peer_rank, suggested_tag) + + return self.comm_class( + peer_rank, + suggested_tag, + local_addr = self.prefix + local_addr[0] + ":" + \ + str(local_addr[1]), + peer_addr = self.prefix + peer_addr[0] + ":" + \ + str(peer_addr[1]), + p_addr = peer_addr[0], + deserialize=deserialize, + ) + +class MPI4DaskListener(Listener): + prefix = MPI4DaskConnector.prefix + comm_class = MPI4DaskConnector.comm_class + encrypted = MPI4DaskConnector.encrypted + + def __init__( + self, + address: str, + comm_handler: None, + deserialize=False, + allow_offload=True, + 
**connection_args + ): + global LOWER_PORT_RANGE, UPPER_PORT_RANGE + + if not address.startswith("mpi"): + address = "mpi://" + address + + logger.debug("%s: MPI4DaskListener.__init__ %s", \ + socket.gethostname(), address) + self.ip, self._input_port = parse_host_port(address, default_port=0) + # choose a random port between LOWER_PORT_RANGE and UPPER_PORT_RANGE + self._input_port = random.randint(LOWER_PORT_RANGE, UPPER_PORT_RANGE) + self.comm_handler = comm_handler + self.deserialize = deserialize + self.allow_offload = allow_offload + self.mpi_server = None + self.connection_args = connection_args + + @property + def port(self): + return self._input_port + + @property + def address(self): + return "mpi://" + self.ip + ":" + str(self.port) + + async def start(self): + + async def serve_forever(reader, writer): + + peer_addr = writer.get_extra_info('peername') + local_addr = writer.get_extra_info('sockname') + logger.debug("%s: listen(): local=%s, peer=%s", \ + socket.gethostname(), local_addr[0], peer_addr[0]) + + local_rank = MPI.COMM_WORLD.Get_rank() + + data = str(local_rank).encode() + writer.write(data) + await writer.drain() + logger.debug("listen(): wrote data") + + assert reader.at_eof() == False + data = await reader.read(4) + peer_rank = int(data.decode()) + logger.debug("listen(): read data") + + suggested_tag = tag_table.get(peer_rank) + #new_suggested_tag = suggested_tag + TAG_QUOTA_PER_CONNECTION + tag_table.update({peer_rank : (suggested_tag + TAG_QUOTA_PER_CONNECTION)}) + logger.debug("%s: listen(): data=%s, rank=%s, peer_rank=%s, suggested_tag=%s", \ + socket.gethostname(), data, local_rank, peer_rank, suggested_tag) + + mpi = MPI4Dask( + peer_rank, + suggested_tag, + local_addr = self.prefix + local_addr[0] + ":" + \ + str(local_addr[1]), + peer_addr = self.prefix + peer_addr[0] + ":" + \ + str(peer_addr[1]), + p_addr = peer_addr[0], + deserialize=self.deserialize, + ) + + mpi.allow_offload = self.allow_offload + + try: + await 
self.on_connection(mpi) + except CommClosedError: + logger.debug("Connection closed before handshake completed") + return + + if self.comm_handler: + logger.debug("%s: calling comm_handler(mpi)", socket.gethostname()) + await self.comm_handler(mpi) + + init_once() + logger.debug("asyncio.start_server()") + coro = asyncio.start_server(serve_forever, \ + None, \ + port=self._input_port, \ + backlog=1024) + task = asyncio.create_task(coro) + self.mpi_server = await task + addr = self.mpi_server.sockets[0].getsockname() + logger.debug(f'Serving on {addr}') + + def stop(self): + self.mpi_server = None + + def get_host_port(self): + # TODO: TCP raises if this hasn't started yet. + return self.ip, self.port + + @property + def listen_address(self): + return self.prefix + unparse_host_port(*self.get_host_port()) + + @property + def contact_address(self): + host, port = self.get_host_port() + host = ensure_concrete_host(host) # TODO: ensure_concrete_host + return self.prefix + unparse_host_port(host, port) + + @property + def bound_address(self): + # TODO: Does this become part of the base API? Kinda hazy, since + # we exclude in for inproc. 
+ return self.get_host_port() + +class MPI4DaskBackend(Backend): + # I / O + + def get_connector(self): + return MPI4DaskConnector() + + def get_listener(self, loc, handle_comm, deserialize, **connection_args): + return MPI4DaskListener(loc, handle_comm, deserialize, **connection_args) + + # Address handling + # This duplicates BaseTCPBackend + + def get_address_host(self, loc): + return parse_host_port(loc)[0] + + def get_address_host_port(self, loc): + return parse_host_port(loc) + + def resolve_address(self, loc): + host, port = parse_host_port(loc) + return unparse_host_port(ensure_ip(host), port) + + def get_local_address_for(self, loc): + host, port = parse_host_port(loc) + host = ensure_ip(host) + if ":" in host: + local_host = get_ipv6(host) + else: + local_host = get_ip(host) + return unparse_host_port(local_host, None) + +backends["mpi"] = MPI4DaskBackend() From 78d1e195fbe447c0df09501e544e39033dc5cd95 Mon Sep 17 00:00:00 2001 From: ZackWRyan Date: Wed, 30 Jun 2021 20:17:45 -0400 Subject: [PATCH 2/3] Initial refactoring and removal of examples --- dask-apps/cudf_merge_mpi.py | 594 ----------------------------------- dask-apps/cupy_sum_mpi.py | 91 ------ distributed/comm/__init__.py | 6 +- distributed/comm/mpi.py | 57 +--- distributed/comm/ucx.py | 50 +-- distributed/comm/utils.py | 66 ++++ 6 files changed, 77 insertions(+), 787 deletions(-) delete mode 100644 dask-apps/cudf_merge_mpi.py delete mode 100644 dask-apps/cupy_sum_mpi.py diff --git a/dask-apps/cudf_merge_mpi.py b/dask-apps/cudf_merge_mpi.py deleted file mode 100644 index d1a1db25be..0000000000 --- a/dask-apps/cudf_merge_mpi.py +++ /dev/null @@ -1,594 +0,0 @@ -""" - -Taken from: https://github.com/rapidsai/dask-cuda/blob/branch-0.18/dask_cuda/benchmarks/local_cudf_merge.py - -This is the cuDF merge benchmarking application. It has been -modified to work for the MVAPICH2-based (http://mvapich.cse.ohio-state.edu) -communication backend for the Dask Distributed library using the -dask-mpi package. 
- -""" - -import math -from collections import defaultdict -from time import perf_counter as clock - -import numpy - -from dask.base import tokenize -from dask.dataframe.core import new_dd_object -from dask.distributed import Client, performance_report, wait -from dask.utils import format_bytes, format_time, parse_bytes -from dask_mpi import initialize - -import argparse -import os - -from dask.distributed import SSHCluster - -#adjust as per compute system -GPUS_PER_NODE = 1 # number of GPUs in the system -DASK_INTERFACE = 'ib0' # interface to use for communication -THREADS_PER_NODE = 28 # number of threads per node. - -rank = os.environ['MV2_COMM_WORLD_LOCAL_RANK'] -device_id = int(rank) % GPUS_PER_NODE -os.environ["CUDA_VISIBLE_DEVICES"]=str(device_id) - -#from dask_cuda.local_cuda_cluster import LocalCUDACluster - -#from dask_cuda import explicit_comms -#from utils import ( -# get_cluster_options, -# get_scheduler_workers, -# parse_benchmark_args, -# setup_memory_pool, -#) - -# Benchmarking cuDF merge operation based on -# - -def generate_chunk(i_chunk, local_size, num_chunks, chunk_type, frac_match, gpu): - #print("generate_chunk") - # Setting a seed that triggers max amount of comm in the two-GPU case. 
- if gpu: - import cupy as xp - - import cudf as xdf - else: - import numpy as xp - import pandas as xdf - - xp.random.seed(2 ** 32 - 1) - - chunk_type = chunk_type or "build" - frac_match = frac_match or 1.0 - if chunk_type == "build": - # Build dataframe - # - # "key" column is a unique sample within [0, local_size * num_chunks) - # - # "shuffle" column is a random selection of partitions (used for shuffle) - # - # "payload" column is a random permutation of the chunk_size - - start = local_size * i_chunk - stop = start + local_size - - parts_array = xp.arange(num_chunks, dtype="int64") - suffle_array = xp.repeat(parts_array, math.ceil(local_size / num_chunks)) - - df = xdf.DataFrame( - { - "key": xp.arange(start, stop=stop, dtype="int64"), - "shuffle": xp.random.permutation(suffle_array)[:local_size], - "payload": xp.random.permutation(xp.arange(local_size, dtype="int64")), - } - ) - else: - #print("chunk type is other") - # Other dataframe - # - # "key" column matches values from the build dataframe - # for a fraction (`frac_match`) of the entries. The matching - # entries are perfectly balanced across each partition of the - # "base" dataframe. - # - # "payload" column is a random permutation of the chunk_size - - # Step 1. Choose values that DO match - sub_local_size = local_size // num_chunks - sub_local_size_use = max(int(sub_local_size * frac_match), 1) - arrays = [] - for i in range(num_chunks): - bgn = (local_size * i) + (sub_local_size * i_chunk) - end = bgn + sub_local_size - ar = xp.arange(bgn, stop=end, dtype="int64") - arrays.append(xp.random.permutation(ar)[:sub_local_size_use]) - key_array_match = xp.concatenate(tuple(arrays), axis=0) - - # Step 2. Add values that DON'T match - missing_size = local_size - key_array_match.shape[0] - start = local_size * num_chunks + local_size * i_chunk - stop = start + missing_size - key_array_no_match = xp.arange(start, stop=stop, dtype="int64") - - # Step 3. 
Combine and create the final dataframe chunk (dask_cudf partition) - key_array_combine = xp.concatenate( - (key_array_match, key_array_no_match), axis=0 - ) - - df = xdf.DataFrame( - { - "key": xp.random.permutation(key_array_combine), - "payload": xp.random.permutation(xp.arange(local_size, dtype="int64")), - } - ) - return df - - -def get_random_ddf(chunk_size, num_chunks, frac_match, chunk_type, args): - - parts = [chunk_size for i in range(num_chunks)] - device_type = True if args.type == "gpu" else False - meta = generate_chunk(0, 4, 1, chunk_type, None, device_type) - divisions = [None] * (len(parts) + 1) - - name = "generate-data-" + tokenize(chunk_size, num_chunks, frac_match, chunk_type) - - graph = { - (name, i): ( - generate_chunk, - i, - part, - len(parts), - chunk_type, - frac_match, - device_type, - ) - for i, part in enumerate(parts) - } - - ddf = new_dd_object(graph, name, meta, divisions) - - if chunk_type == "build": - if not args.no_shuffle: - divisions = [i for i in range(num_chunks)] + [num_chunks] - return ddf.set_index("shuffle", divisions=tuple(divisions)) - else: - del ddf["shuffle"] - - return ddf - - -def merge(args, ddf1, ddf2, write_profile): - #print("merge called") - # Lazy merge/join operation - ddf_join = ddf1.merge(ddf2, on=["key"], how="inner") - if args.set_index: - ddf_join = ddf_join.set_index("key") - - # Execute the operations to benchmark - if write_profile is not None: - with performance_report(filename=args.profile): - t1 = clock() - wait(ddf_join.persist()) - took = clock() - t1 - else: - t1 = clock() - wait(ddf_join.persist()) - took = clock() - t1 - return took - - -#def merge_explicit_comms(args, ddf1, ddf2): -# t1 = clock() -# wait(explicit_comms.dataframe_merge(ddf1, ddf2, on="key").persist()) -# took = clock() - t1 -# return took - - -def run(client, args, n_workers, write_profile=None): - # Generate random Dask dataframes - ddf_base = get_random_ddf( - args.chunk_size, n_workers, args.frac_match, "build", args - 
).persist() - - ddf_other = get_random_ddf( - args.chunk_size, n_workers, args.frac_match, "other", args - ).persist() - - wait(ddf_base) - wait(ddf_other) - client.wait_for_workers(n_workers) - - assert len(ddf_base.dtypes) == 2 - assert len(ddf_other.dtypes) == 2 - data_processed = len(ddf_base) * sum([t.itemsize for t in ddf_base.dtypes]) - data_processed += len(ddf_other) * sum([t.itemsize for t in ddf_other.dtypes]) - - if args.backend == "dask": - took = merge(args, ddf_base, ddf_other, write_profile) - else: - took = None#merge_explicit_comms(args, ddf_base, ddf_other) - - return (data_processed, took) - - -def main(args): - cluster_options = get_cluster_options(args) - Cluster = cluster_options["class"] - cluster_args = cluster_options["args"] - cluster_kwargs = cluster_options["kwargs"] - scheduler_addr = cluster_options["scheduler_addr"] - - if args.sched_addr: - initialize( - interface=DASK_INTERFACE, - protocol=args.protocol, - nanny=False, - nthreads=THREADS_PER_NODE - ) - import time - time.sleep(5) - - client = Client() - print(client) - else: - cluster = Cluster(*cluster_args, **cluster_kwargs) - if args.multi_node: - import time - - # Allow some time for workers to start and connect to scheduler - # TODO: make this a command-line argument? - time.sleep(15) - - client = Client(scheduler_addr if args.multi_node else cluster) - - if args.type == "gpu": - client.run(setup_memory_pool, disable_pool=args.no_rmm_pool) - # Create an RMM pool on the scheduler due to occasional deserialization - # of CUDA objects. May cause issues with InfiniBand otherwise. 
- client.run_on_scheduler(setup_memory_pool, 1e9, disable_pool=args.no_rmm_pool) - - scheduler_workers = client.run_on_scheduler(get_scheduler_workers) - n_workers = len(scheduler_workers) - - took_list = [] - for _ in range(args.runs - 1): - took_list.append(run(client, args, n_workers, write_profile=None)) - took_list.append( - run(client, args, n_workers, write_profile=args.profile) - ) # Only profiling the last run - - # Collect, aggregate, and print peer-to-peer bandwidths - incoming_logs = client.run(lambda dask_worker: dask_worker.incoming_transfer_log) - bandwidths = defaultdict(list) - total_nbytes = defaultdict(list) - for k, L in incoming_logs.items(): - for d in L: - if d["total"] >= args.ignore_size: - bandwidths[k, d["who"]].append(d["bandwidth"]) - total_nbytes[k, d["who"]].append(d["total"]) - bandwidths = { - (scheduler_workers[w1].name, scheduler_workers[w2].name): [ - "%s/s" % format_bytes(x) for x in numpy.quantile(v, [0.25, 0.50, 0.75]) - ] - for (w1, w2), v in bandwidths.items() - } - total_nbytes = { - (scheduler_workers[w1].name, scheduler_workers[w2].name,): format_bytes(sum(nb)) - for (w1, w2), nb in total_nbytes.items() - } - - t_runs = numpy.empty(len(took_list)) - if args.markdown: - print("```") - print("Merge benchmark") - print("-------------------------------") - print(f"backend | {args.backend}") - print(f"merge type | {args.type}") - print(f"rows-per-chunk | {args.chunk_size}") - print(f"protocol | {args.protocol}") - print(f"device(s) | {args.devs}") - print(f"rmm-pool | {(not args.no_rmm_pool)}") - print(f"frac-match | {args.frac_match}") - if args.protocol == "ucx": - print(f"tcp | {args.enable_tcp_over_ucx}") - print(f"ib | {args.enable_infiniband}") - print(f"nvlink | {args.enable_nvlink}") - print(f"data-processed | {format_bytes(took_list[0][0])}") - print("===============================") - print("Wall-clock | Throughput") - print("-------------------------------") - for idx, (data_processed, took) in 
enumerate(took_list): - throughput = int(data_processed / took) - m = format_time(took) - m += " " * (15 - len(m)) - print(f"{m}| {format_bytes(throughput)}/s") - t_runs[idx] = float(format_bytes(throughput).split(" ")[0]) - print("===============================") - if args.markdown: - print("\n```") - - #if args.plot is not None: - # plot_benchmark(t_runs, args.plot, historical=True) - - if args.backend == "dask": - if args.markdown: - print("
\nWorker-Worker Transfer Rates\n\n```") - print("(w1,w2) | 25% 50% 75% (total nbytes)") - print("-------------------------------") - for (d1, d2), bw in sorted(bandwidths.items()): - fmt = ( - "(%s,%s) | %s %s %s (%s)" - if args.multi_node or args.sched_addr - else "(%02d,%02d) | %s %s %s (%s)" - ) - print(fmt % (d1, d2, bw[0], bw[1], bw[2], total_nbytes[(d1, d2)])) - if args.markdown: - print("```\n
\n") - - if args.multi_node: - client.shutdown() - client.close() - - -def parse_args(): - special_args = [ - { - "name": ["-b", "--backend",], - "choices": ["dask", "explicit-comms"], - "default": "dask", - "type": str, - "help": "The backend to use.", - }, - { - "name": ["-t", "--type",], - "choices": ["cpu", "gpu"], - "default": "gpu", - "type": str, - "help": "Do merge with GPU or CPU dataframes", - }, - { - "name": ["-c", "--chunk-size",], - "default": 1_000_000, - "metavar": "n", - "type": int, - "help": "Chunk size (default 1_000_000)", - }, - { - "name": "--ignore-size", - "default": "1 MiB", - "metavar": "nbytes", - "type": parse_bytes, - "help": "Ignore messages smaller than this (default '1 MB')", - }, - { - "name": "--frac-match", - "default": 0.3, - "type": float, - "help": "Fraction of rows that matches (default 0.3)", - }, - { - "name": "--no-shuffle", - "action": "store_true", - "help": "Don't shuffle the keys of the left (base) dataframe.", - }, - { - "name": "--markdown", - "action": "store_true", - "help": "Write output as markdown", - }, - {"name": "--runs", "default": 3, "type": int, "help": "Number of runs",}, - { - "name": ["-s", "--set-index",], - "action": "store_true", - "help": "Call set_index on the key column to sort the joined dataframe.", - }, - ] - - return parse_benchmark_args( - description="Distributed merge (dask/cudf) benchmark", args_list=special_args - ) - -def parse_benchmark_args(description="Generic dask-cuda Benchmark", args_list=[]): - parser = argparse.ArgumentParser(description=description) - parser.add_argument( - "-d", "--devs", default="0", type=str, help='GPU devices to use (default "0").' - ) - parser.add_argument( - "-p", - "--protocol", - choices=["tcp", "ucx", "mpi"], - default="tcp", - type=str, - help="The communication protocol to use.", - ) - parser.add_argument( - "--profile", - metavar="PATH", - default=None, - type=str, - help="Write dask profile report (E.g. 
dask-report.html)", - ) - parser.add_argument( - "--no-rmm-pool", action="store_true", help="Disable the RMM memory pool" - ) - parser.add_argument( - "--enable-tcp-over-ucx", - action="store_true", - dest="enable_tcp_over_ucx", - help="Enable tcp over ucx.", - ) - parser.add_argument( - "--enable-infiniband", - action="store_true", - dest="enable_infiniband", - help="Enable infiniband over ucx.", - ) - parser.add_argument( - "--enable-nvlink", - action="store_true", - dest="enable_nvlink", - help="Enable NVLink over ucx.", - ) - parser.add_argument( - "--disable-tcp-over-ucx", - action="store_false", - dest="enable_tcp_over_ucx", - help="Disable tcp over ucx.", - ) - parser.add_argument( - "--disable-infiniband", - action="store_false", - dest="enable_infiniband", - help="Disable infiniband over ucx.", - ) - parser.add_argument( - "--disable-nvlink", - action="store_false", - dest="enable_nvlink", - help="Disable NVLink over ucx.", - ) - parser.add_argument( - "--ucx-net-devices", - default=None, - type=str, - help="The device to be used for UCX communication, or 'auto'. " - "Ignored if protocol is 'tcp'", - ) - parser.add_argument( - "--single-node", - action="store_true", - dest="multi_node", - help="Runs a single-node cluster on the current host.", - ) - parser.add_argument( - "--multi-node", - action="store_true", - dest="multi_node", - help="Runs a multi-node cluster on the hosts specified by --hosts.", - ) - parser.add_argument( - "--scheduler-address", - default="Not Needed", # MPI4Dask related modification - type=str, - dest="sched_addr", - help="Scheduler Address -- assumes cluster is created outside of benchmark.", - ) - parser.add_argument( - "--hosts", - default=None, - type=str, - help="Specifies a comma-separated list of IP addresses or hostnames. " - "The list begins with the host where the scheduler will be launched " - "followed by any number of workers, with a minimum of 1 worker. " - "Requires --multi-node, ignored otherwise. 
" - "Usage example: --multi-node --hosts 'dgx12,dgx12,10.10.10.10,dgx13' . " - "In the example, the benchmark is launched with scheduler on host " - "'dgx12' (first in the list), and workers on three hosts being 'dgx12', " - "'10.10.10.10', and 'dgx13'. " - "Note: --devs is currently ignored in multi-node mode and for each host " - "one worker per GPU will be launched.", - ) - - for args in args_list: - name = args.pop("name") - if not isinstance(name, list): - name = [name] - parser.add_argument(*name, **args) - - parser.set_defaults( - enable_tcp_over_ucx=True, enable_infiniband=True, enable_nvlink=True - ) - args = parser.parse_args() - - if args.protocol == "tcp": - args.enable_tcp_over_ucx = False - args.enable_infiniband = False - args.enable_nvlink = False - - if args.multi_node and len(args.hosts.split(",")) < 2: - raise ValueError("--multi-node requires at least 2 hosts") - - return args - - -def get_cluster_options(args): - #print("get_cluster_options") - - if args.multi_node is True: - - #print("get_cluster_options: args.multi_node") - - Cluster = SSHCluster - cluster_args = [args.hosts.split(",")] - scheduler_addr = args.protocol + "://" + cluster_args[0][0] + ":8786" - - worker_options = {} - - # This looks counterintuitive but adding the variable name with - # an empty string is how we can enable CLI booleans currently, - # note that SSHCluster uses the dask-cuda-worker CLI. 
- if args.enable_tcp_over_ucx: - worker_options["enable_tcp_over_ucx"] = "" - if args.enable_nvlink: - worker_options["enable_nvlink"] = "" - if args.enable_infiniband: - worker_options["enable_infiniband"] = "" - - if args.ucx_net_devices: - worker_options["ucx_net_devices"] = args.ucx_net_devices - - cluster_kwargs = { - "connect_options": {"known_hosts": None}, - "scheduler_options": {"protocol": args.protocol}, - "worker_module": "dask_cuda.dask_cuda_worker", - "worker_options": worker_options, - # "n_workers": len(args.devs.split(",")), - # "CUDA_VISIBLE_DEVICES": args.devs, - } - else: - #print("get_cluster_options: else") - #Cluster = LocalCUDACluster - Cluster = None - scheduler_addr = None - cluster_args = [] - cluster_kwargs = { - "protocol": args.protocol, - "n_workers": len(args.devs.split(",")), - "CUDA_VISIBLE_DEVICES": args.devs, - "ucx_net_devices": args.ucx_net_devices, - "enable_tcp_over_ucx": args.enable_tcp_over_ucx, - "enable_infiniband": args.enable_infiniband, - "enable_nvlink": args.enable_nvlink, - } - - #print("returning cluster object") - return { - "class": Cluster, - "args": cluster_args, - "kwargs": cluster_kwargs, - "scheduler_addr": scheduler_addr, - } - - -def get_scheduler_workers(dask_scheduler=None): - return dask_scheduler.workers - - -def setup_memory_pool(pool_size=None, disable_pool=False): - import cupy - - os.environ['RMM_NO_INITIALIZE'] = 'True' - import rmm - - rmm.reinitialize( - pool_allocator=not disable_pool, devices=0, initial_pool_size=pool_size, - ) - cupy.cuda.set_allocator(rmm.rmm_cupy_allocator) - -if __name__ == "__main__": - main(parse_args()) - diff --git a/dask-apps/cupy_sum_mpi.py b/dask-apps/cupy_sum_mpi.py deleted file mode 100644 index d2d9c88caf..0000000000 --- a/dask-apps/cupy_sum_mpi.py +++ /dev/null @@ -1,91 +0,0 @@ -""" -This is the sum of cuPy array and its transpose application. 
It has been -modified to work for the MVAPICH2-based (http://mvapich.cse.ohio-state.edu) -communication backend for the Dask Distributed library using the -dask-mpi package. - -""" - -import os -import asyncio -from collections import defaultdict -from time import perf_counter as clock - -import dask.array as da -from dask.distributed import Client, performance_report, wait -from dask.utils import format_bytes, format_time, parse_bytes -from dask_mpi import initialize - -import cupy -import numpy -import time -import sys - -#adjust as per compute system -GPUS_PER_NODE = 1 # number of GPUs in the system -RUNS = 20 # repititions for the benchmark -DASK_INTERFACE = 'ib0' # interface to use for communication -DASK_PROTOCOL = 'mpi' # protocol for Dask Distributed. Options include ['mpi', 'tcp'] -THREADS_PER_NODE = 28 # number of threads per node. - -rank = os.environ['MV2_COMM_WORLD_LOCAL_RANK'] -device_id = int(rank) % GPUS_PER_NODE -os.environ["CUDA_VISIBLE_DEVICES"]=str(device_id) - -async def run(): - async with Client(asynchronous=True) as client: - - print(client) - - scheduler_workers = await client.run_on_scheduler(get_scheduler_workers) - - took_list = [] - - for i in range(RUNS): - start = time.time() - rs = da.random.RandomState(RandomState=cupy.random.RandomState) - a = rs.normal(100, 1, (int(30000), int(30000)), \ - chunks=(int(1000), int(1000))) - x = a + a.T - - xx = await client.compute(x) - - duration = time.time() - start - took_list.append( (duration, a.npartitions) ) - print("Time for iteration", i, ":", duration) - sys.stdout.flush() - - sums = [] - - for (took, npartitions) in took_list: - sums.append(took) - t = format_time(took) - t += " " * (11 - len(t)) - - print ("Average Wall-clock Time: ", format_time(sum(sums)/len(sums))) - - # Collect, aggregate, and print peer-to-peer bandwidths - incoming_logs = await client.run( - lambda dask_worker: dask_worker.incoming_transfer_log - ) - - outgoing_logs = await client.run( - lambda dask_worker: 
dask_worker.outgoing_transfer_log - ) - - -def get_scheduler_workers(dask_scheduler=None): - return dask_scheduler.workers - -if __name__ == "__main__": - - initialize( - interface=DASK_INTERFACE, - protocol=DASK_PROTOCOL, - nanny=False, - nthreads=THREADS_PER_NODE - ) - - time.sleep(5) - - asyncio.run(run()) diff --git a/distributed/comm/__init__.py b/distributed/comm/__init__.py index 610b7129c4..752e9a1ca6 100644 --- a/distributed/comm/__init__.py +++ b/distributed/comm/__init__.py @@ -14,10 +14,14 @@ def _register_transports(): - from . import mpi from . import inproc from . import tcp + try: + from . import mpi + except ImportError: + pass + try: from . import ucx except ImportError: diff --git a/distributed/comm/mpi.py b/distributed/comm/mpi.py index 1c31a7a89b..10dd0d7788 100644 --- a/distributed/comm/mpi.py +++ b/distributed/comm/mpi.py @@ -7,23 +7,16 @@ import logging import struct -import weakref - -import collections -import dask import asyncio import socket - import random -import time import sys - import rmm from .addressing import parse_host_port, unparse_host_port from .core import Comm, Connector, Listener, CommClosedError from .registry import Backend, backends -from .utils import ensure_concrete_host, to_frames, from_frames +from .utils import ensure_concrete_host, to_frames, from_frames, get_array_types, init_rmm_pool from ..utils import ( ensure_ip, get_ip, @@ -99,52 +92,8 @@ def init_once(): init_tag_table() logger.debug("rank=%s, name=%s", MPI.COMM_WORLD.Get_rank(), name) - # Find the function, `host_array()`, to use when allocating new host arrays - try: - import numpy - - host_array = lambda n: numpy.empty((n,), dtype="u1") - except ImportError: - host_array = lambda n: bytearray(n) - - # Find the function, `cuda_array()`, to use when allocating new CUDA arrays - try: - import rmm - - if hasattr(rmm, "DeviceBuffer"): - device_array = lambda n: rmm.DeviceBuffer(size=n) - else: # pre-0.11.0 - import numba.cuda - - def rmm_device_array(n): - a 
= rmm.device_array(n, dtype="u1") - weakref.finalize(a, numba.cuda.current_context) - return a - - device_array = rmm_device_array - except ImportError: - try: - import numba.cuda - - def numba_device_array(n): - a = numba.cuda.device_array((n,), dtype="u1") - weakref.finalize(a, numba.cuda.current_context) - return a - - device_array = numba_device_array - except ImportError: - - def device_array(n): - raise RuntimeError( - "In order to send/recv CUDA arrays, Numba or RMM is required" - ) - - pool_size_str = dask.config.get("rmm.pool-size") - if pool_size_str is not None: - pool_size = parse_bytes(pool_size_str) - rmm.reinitialize( - pool_allocator=True, managed_memory=False, initial_pool_size=pool_size - ) + host_array, device_array = get_array_types() + init_rmm_pool() class MPI4Dask(Comm): """Comm object using MPI. diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index 0d91b404ee..54126dffa4 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -14,7 +14,7 @@ from .addressing import parse_host_port, unparse_host_port from .core import Comm, Connector, Listener, CommClosedError from .registry import Backend, backends -from .utils import ensure_concrete_host, to_frames, from_frames +from .utils import ensure_concrete_host, to_frames, from_frames, get_array_types, init_rmm_pool from ..utils import ( ensure_ip, get_ip, @@ -62,52 +62,8 @@ def init_once(): ucp.init(options=ucx_config, env_takes_precedence=True) - # Find the function, `host_array()`, to use when allocating new host arrays - try: - import numpy - - host_array = lambda n: numpy.empty((n,), dtype="u1") - except ImportError: - host_array = lambda n: bytearray(n) - - # Find the function, `cuda_array()`, to use when allocating new CUDA arrays - try: - import rmm - - if hasattr(rmm, "DeviceBuffer"): - device_array = lambda n: rmm.DeviceBuffer(size=n) - else: # pre-0.11.0 - import numba.cuda - - def rmm_device_array(n): - a = rmm.device_array(n, dtype="u1") - weakref.finalize(a, 
numba.cuda.current_context) - return a - - device_array = rmm_device_array - except ImportError: - try: - import numba.cuda - - def numba_device_array(n): - a = numba.cuda.device_array((n,), dtype="u1") - weakref.finalize(a, numba.cuda.current_context) - return a - - device_array = numba_device_array - except ImportError: - - def device_array(n): - raise RuntimeError( - "In order to send/recv CUDA arrays, Numba or RMM is required" - ) - - pool_size_str = dask.config.get("rmm.pool-size") - if pool_size_str is not None: - pool_size = parse_bytes(pool_size_str) - rmm.reinitialize( - pool_allocator=True, managed_memory=False, initial_pool_size=pool_size - ) + host_array, device_array = get_array_types() + init_rmm_pool() try: from ucp.endpoint_reuse import EndpointReuse diff --git a/distributed/comm/utils.py b/distributed/comm/utils.py index b3ac85feed..bd2bf60d50 100644 --- a/distributed/comm/utils.py +++ b/distributed/comm/utils.py @@ -118,3 +118,69 @@ def ensure_concrete_host(host): return get_ipv6() else: return host + +def get_array_types(): + """ + Find the correct library and object type for declaring new host and + device arrays. 
+ """ + # Find the function, `host_array()`, to use when allocating new host arrays + try: + import numpy + + host_array = lambda n: numpy.empty((n,), dtype="u1") + except ImportError: + host_array = lambda n: bytearray(n) + + # Find the function, `cuda_array()`, to use when allocating new CUDA arrays + try: + import rmm + + if hasattr(rmm, "DeviceBuffer"): + device_array = lambda n: rmm.DeviceBuffer(size=n) + else: # pre-0.11.0 + import numba.cuda + import weakref + + def rmm_device_array(n): + a = rmm.device_array(n, dtype="u1") + weakref.finalize(a, numba.cuda.current_context) + return a + + device_array = rmm_device_array + except ImportError: + try: + import numba.cuda + import weakref + + def numba_device_array(n): + a = numba.cuda.device_array((n,), dtype="u1") + weakref.finalize(a, numba.cuda.current_context) + return a + + device_array = numba_device_array + except ImportError: + def device_array(n): + raise RuntimeError( + "In order to send/recv CUDA arrays, Numba or RMM is required" + ) + + return host_array, device_array + +def init_rmm_pool(): + """ + Initialize an RMM pool based on Dask configuration parameters. + """ + try: + import rmm + + pool_size_str = dask.config.get("rmm.pool-size") + if pool_size_str is not None: + pool_size = parse_bytes(pool_size_str) + rmm.reinitialize( + pool_allocator=True, managed_memory=False, initial_pool_size=pool_size + ) + except ImportError: + raise RuntimeError( + "RMM import failed. RMM library is required to create an RMM pool." 
+ ) From 4e7fb30f85e37f494dd62a1fa279c03ddb07b64c Mon Sep 17 00:00:00 2001 From: ZackWRyan Date: Mon, 5 Jul 2021 01:05:26 -0400 Subject: [PATCH 3/3] fixing unnecessary MPI initialization --- distributed/comm/mpi.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/distributed/comm/mpi.py b/distributed/comm/mpi.py index 10dd0d7788..a720793077 100644 --- a/distributed/comm/mpi.py +++ b/distributed/comm/mpi.py @@ -27,12 +27,11 @@ parse_bytes, ) -from mpi4py import MPI - logger = logging.getLogger(__name__) host_array = None device_array = None +MPI = None INITIAL_TAG_OFFSET = 100 TAG_QUOTA_PER_CONNECTION = 200 @@ -79,13 +78,16 @@ def init_tag_table(): logger.debug(tag_table) def init_once(): - global initialized, host_array, device_array + global initialized, host_array, device_array, MPI if initialized is True: return initialized = True + from mpi4py import MPI as _MPI + MPI = _MPI + random.seed(random.randrange(sys.maxsize)) name = MPI.Get_processor_name()