
MPI4Dask - MVAPICH2-GDR based Communication Backend for the Dask Distributed Library #4461

Open. Wants to merge 4 commits into base: main.
Conversation

@aamirshafi (Author)

This document outlines the installation and usage of MPI4Dask, which provides an MVAPICH2-GDR-based MPI communication backend for the Dask Distributed library. MPI4Dask is implemented as a set of enhancements to the Dask Distributed package.

A paper titled "Efficient MPI-based Communication for GPU-Accelerated Dask Applications" describes MPI4Dask and its performance against existing communication devices in detail.

Install MPI4Dask

We start off with the installation of MPI4Dask.

Installation Pre-Requisites

Install Miniconda, which is a free minimal installer for the conda package manager. The following commands download and install the latest version of Miniconda3 for Python 3.x.

wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
chmod a+x Miniconda3-latest-Linux-x86_64.sh
./Miniconda3-latest-Linux-x86_64.sh

After installing the conda package manager, create a new conda environment named mpi4dask (suggested name).

conda create --name mpi4dask
conda activate mpi4dask

Make sure that the relevant/required modules are loaded in the environment. One of the required modules is the CUDA toolkit (10.2 in this case). A list of all loaded modules on the system can be seen as follows; the CUDA toolkit version may of course differ on the user's system.

module list
Currently Loaded Modulefiles:
  1) cuda/10.2

Install the pre-requisite libraries, including build tools, Python (version 3.8 here), Cython, the CUDA toolkit (version 10.2 here), Dask packages, RAPIDS libraries, and array packages. Users may install specific versions of these packages. These packages are installed from three conda channels (conda-forge, rapidsai, and nvidia) as follows:

conda install -c conda-forge -c rapidsai -c nvidia automake make libtool pkg-config libhwloc psutil python=3.8 setuptools cython cudatoolkit=10.2 cupy dask-cudf dask distributed numpy rmm

Install MVAPICH2-GDR

Download the MVAPICH2-GDR library. Detailed installation instructions are available from here. Please become familiar with the advanced configuration options of the MVAPICH2-GDR software; this is important for getting maximum performance from the hardware. This guide assumes that an environment variable $MPILIB points to the installation directory of the MVAPICH2-GDR software.

export MPILIB=/path/to/MVAPICH2-GDR/install/directory

It is important to make sure that MVAPICH2-GDR is the MPI library used with MPI4Dask. Multiple installed MPI libraries can result in errors. To avoid such situations, make sure that the correct version of the MVAPICH2-GDR library is available on the $PATH variable. This can be checked as follows.

$MPILIB/bin/mpiname -a

If $PATH is set correctly, the output of the following command should match that of the previous one.

mpiname -a

Install mpi4py

Download and install the mpi4py library. Details about mpi4py can be seen here. Download the software as follows.

git clone https://github.com/mpi4py/mpi4py.git
cd mpi4py

As part of the installation, the configuration file (mpi.cfg) in the root directory of the mpi4py software needs to be updated to point to the MVAPICH2-GDR library. It is recommended to add a new section for MVAPICH2-GDR to the mpi.cfg file, as follows:

# ------------
[MVAPICH2-GDR]
mpi_dir              = /path/to/MVAPICH2-GDR/install/directory
mpicc                = %(mpi_dir)s/bin/mpicc
mpicxx               = %(mpi_dir)s/bin/mpicxx
include_dirs         = %(mpi_dir)s/include
library_dirs         = %(mpi_dir)s/lib
runtime_library_dirs = %(library_dirs)s

After updating the mpi.cfg configuration file, install mpi4py with the following commands:

python setup.py build --mpi=MVAPICH2-GDR
pip install .
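
As a quick sanity check, the following snippet (a minimal sketch, not part of MPI4Dask) can confirm which MPI library mpi4py was built against; the reported strings should mention MVAPICH2:

# Sanity check that mpi4py is linked against the intended MPI library.
from mpi4py import MPI

# Standard mpi4py APIs; the exact strings depend on the MPI installation.
print(MPI.Get_library_version())  # should mention MVAPICH2(-GDR)
print(MPI.get_vendor())           # e.g. ('MVAPICH2', (2, 3, ...))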

Install Dask-MPI

Download and install the Dask-MPI package from its GitHub repository. MPI4Dask relies on Dask-MPI to start the Dask scheduler, client, and workers. Details about the Dask-MPI package can be seen here. It is recommended not to install Dask-MPI through the conda package manager, as it installs some conflicting packages as dependencies.

git clone https://github.com/dask/dask-mpi.git
cd dask-mpi
python setup.py build
pip install .

Download MPI4Dask

MPI4Dask has been shared as a pull request. The enhancements to the Dask Distributed library can be summarized as follows.

  1. The main enhancement to the distributed package is the new distributed/comm/mpi.py file
  2. The dask-apps folder contains two representative Dask test/benchmarking applications that exercise the MPI backend

Install MPI4Dask

MPI4Dask can be installed using the following commands:

python setup.py build
pip install .

At this point, MPI4Dask and all of its dependencies should be installed. Verify this by running the following commands:

conda list
conda list | grep distributed
conda list | grep dask

Basic Usage Instructions

The MPI4Dask package contains two sample applications that can be used to test and benchmark the performance of the MPI backend. These applications are: 1) Sum of CuPy Array and its Transpose, and 2) cuDF Merge.

Sum of CuPy Array and its Transpose

This benchmark creates a CuPy array, distributes its chunks across Dask workers, and adds the distributed array to its transpose. The benchmark was originally taken from the dask-cuda package repository.

The next step is to start execution of the Dask program using Dask-MPI.

Write the hosts file. This file contains the names of the GPU-equipped compute nodes that will execute the parallel Dask program.

cd dask-apps
vim hosts
.. write name of the compute nodes ..
cat hosts
machine1
machine2
machine3
machine4

Note that users need to customize the hosts file as per their environment. This benchmark can be executed as follows:

LD_PRELOAD=$MPILIB/lib/libmpi.so $MPILIB/bin/mpirun_rsh -export-all -np 4 -hostfile hosts MV2_USE_CUDA=1 MV2_USE_GDRCOPY=1 MV2_GPUDIRECT_GDRCOPY_LIB=/opt/gdrcopy2.0/lib64/libgdrapi.so MV2_CPU_BINDING_LEVEL=SOCKET MV2_CPU_BINDING_POLICY=SCATTER python cupy_sum_mpi.py

This command starts the execution of the Dask application using MPI, enabled by the Dask-MPI package. A total of 4 MPI processes are started: processes 0 and 1 assume the roles of the Dask scheduler and client, respectively, and all processes with ranks greater than 1 become Dask workers. Here, processes 2 and 3 are Dask workers. The LD_PRELOAD environment variable is required for Python applications to run correctly with the MVAPICH2-GDR library.

Details on various options, passed to the MVAPICH2-GDR library, are as follows:

  • -np 4 --- specifies the total number of parallel processes started by the MPI library
  • -hostfile hosts --- specifies the file (``hosts'') containing the names of the machines on which to start the parallel processes
  • MV2_USE_CUDA=1 --- enables GPU-to-GPU communication support in the MVAPICH2-GDR library
  • MV2_USE_GDRCOPY=1 --- turns on the GDRCopy protocol
  • MV2_GPUDIRECT_GDRCOPY_LIB --- specifies the path to the GDRCopy dynamic library
  • MV2_CPU_BINDING_LEVEL=SOCKET MV2_CPU_BINDING_POLICY=SCATTER --- selects a socket-level scatter process binding policy

This is not an exhaustive list of options that can be passed to the MVAPICH2-GDR library. Details on these --- and other flags for the MVAPICH2-GDR library --- can be seen here. As mentioned earlier, it is important to configure the MVAPICH2-GDR installation correctly to ensure optimal performance.

The cupy_sum_mpi.py script has some configuration parameters that users can modify in the source code; a minimal sketch of how such a script can be structured is shown after the list. These include:

  • DASK_PROTOCOL = 'mpi' --- specifies the protocol used by the Dask Distributed library. Options include 'mpi', 'tcp', and 'ucx'
  • DASK_INTERFACE = 'ib0' --- specifies the interface used by the Dask Distributed library
  • GPUS_PER_NODE = 1 --- specifies the number of GPUs in the system
  • THREADS_PER_NODE = 28 --- specifies the number of threads (cores) in the system
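
The following sketch (not the PR's cupy_sum_mpi.py) illustrates how a Dask-MPI-launched CuPy sum-of-transpose script can be structured. The dask_mpi.initialize() keyword arguments shown here are examples and may differ across Dask-MPI versions.

# Minimal sketch (not the PR's cupy_sum_mpi.py) of a Dask-MPI launched
# CuPy sum-of-transpose computation; parameter values are illustrative.
import cupy
import dask.array as da
from dask.distributed import Client
from dask_mpi import initialize

DASK_PROTOCOL = "mpi"    # 'mpi', 'tcp', or 'ucx'
DASK_INTERFACE = "ib0"

# Dask-MPI: rank 0 becomes the scheduler, rank 1 runs this client code,
# and all remaining ranks become workers.
initialize(interface=DASK_INTERFACE, protocol=DASK_PROTOCOL, nthreads=1)

client = Client()

# Build a chunked, CuPy-backed Dask array and add it to its transpose.
rs = da.random.RandomState(RandomState=cupy.random.RandomState)
x = rs.random((20000, 20000), chunks=(2000, 2000))
result = (x + x.T).sum().compute()
print("sum(x + x.T) =", result)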

cuDF Merge

cuDF DataFrames are table-like data structures stored in GPU memory. In this application, a merge operation is carried out on multiple cuDF data frames. The benchmark was originally taken from the dask-cuda package repository.

This application can be executed as follows.

cd dask-apps
LD_PRELOAD=$MPILIB/lib/libmpi.so $MPILIB/bin/mpirun_rsh -export-all -np 4 -hostfile hosts MV2_USE_CUDA=1 MV2_USE_GDRCOPY=1 MV2_GPUDIRECT_GDRCOPY_LIB=/opt/gdrcopy2.0/lib64/libgdrapi.so MV2_CPU_BINDING_LEVEL=SOCKET MV2_CPU_BINDING_POLICY=SCATTER python cudf_merge_mpi.py --type gpu --protocol mpi --runs 20 --chunk-size 1_000_000_00

The cudf_merge_mpi.py script accepts some runtime arguments. Some of the important ones include:

  • --type gpu --- specifies that the data frame is GPU-based. Currently only the GPU type has been tested
  • --protocol mpi --- specifies the communication backend used by Dask Distributed. Options include 'mpi', 'tcp', and 'ucx'
  • --runs 20 --- specifies the number of repetitions for the experiment
  • --chunk-size 1_000_000_00 --- specifies the chunk size for the data frame

In addition, the cudf_merge_mpi.py script has some configuration parameters that can be modified by the users; a minimal sketch of the merge at the heart of such a benchmark is shown after the list. These include:

  • DASK_INTERFACE = 'ib0' --- specifies the interface used by the Dask Distributed library
  • GPUS_PER_NODE = 1 --- specifies the number of GPUs in the system
  • THREADS_PER_NODE = 28 --- specifies the number of threads (cores) in the system
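
The sketch below (not the PR's cudf_merge_mpi.py) shows the core of a distributed cuDF merge; it assumes a Dask cluster has already been started, for example via Dask-MPI as in the CuPy example above.

# Minimal sketch (not the PR's cudf_merge_mpi.py) of a distributed cuDF merge.
import numpy as np
import cudf
import dask_cudf
from dask.distributed import Client

client = Client()  # connect to the already-running Dask-MPI cluster

n = 1_000_000
left = dask_cudf.from_cudf(
    cudf.DataFrame({"key": np.arange(n), "payload_x": np.random.rand(n)}),
    npartitions=8,
)
right = dask_cudf.from_cudf(
    cudf.DataFrame({"key": np.arange(n), "payload_y": np.random.rand(n)}),
    npartitions=8,
)

# The merge shuffles data between workers, exercising the communication backend.
merged = left.merge(right, on="key")
print("rows in merged frame:", len(merged))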

MPI4Dask builds on the MVAPICH2-GDR (http://mvapich.cse.ohio-state.edu) library. A couple of benchmarks have been added to showcase this device as well. Details on the communication device can be seen at https://arxiv.org/abs/2101.08878.
@quasiben (Member)

I'm sorry, I don't see a PR/Link/Repo for MPI4Dask. Can you please add this?

@aamirshafi (Author)

@quasiben https://github.com/aamirshafi/distributed/tree/mpi-comm-device. Does that answer your question?

@aamirshafi (Author)

@hashmij FYI.

@quasiben (Member)

@aamirshafi, sorry, I guess I missed the code changes when the PR was first submitted.

@jakirkham (Member)

It might be worth comparing notes with the implementation in PR ( #4142 )

Base automatically changed from master to main March 8, 2021 19:04
@aamirshafi (Author)

Sure @jakirkham. I can have a go at comparing notes with PR (#4142).

@ianthomas23: you are welcome to chime in please.

It will be good to have either this PR (MPI4Dask) or Ian's MPI device merged into distributed so we can plan future implementations based on it.

MPI4Dask has been implemented and tested using the MVAPICH2 family of MPI libraries.

MPI4Dask is based on the UCX communication device and hopefully that makes it easier to review.

Dependency on dask-mpi and Initializing MPI Library

Both PRs depend on dask-mpi for starting the scheduler, client, and workers. The native MPI library has already been initialized by the dask-mpi package.

Connection Establishment

One major difference between the two implementations is managing connection establishment in the MPI communication device.

The API dictated by the Dask Distributed communication device is based on the client-server model. MPI, at the application level, does not follow this model; all processes are started together. Neither implementation uses MPI's Dynamic Process Management (DPM) feature. For this reason, both devices support only a static number of workers; dynamic addition and removal of workers is not supported. This feature can be added in the future.

In this PR (MPI4Dask), we have used out-of-band connection establishment using asyncio.open_connection() and asyncio.start_server(). Every time a worker/client/scheduler makes or accepts a connection to its peer, a core.Comm object is created and returned. This is done in, for example, the connect() method:

async def connect(self, address: str, deserialize=True, **connection_args) -> MPI4Dask:

The newly created core.Comm object reads and stores the MPI rank of its peer. This is important because every time a message is communicated through this object, MPI's send and receive methods require this peer rank within MPI's COMM_WORLD communicator.
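
The following is a minimal sketch (not the actual MPI4Dask code) of the out-of-band rank exchange described above; the function name connect_handshake is hypothetical.

# Hypothetical sketch: the connecting side opens a TCP connection with
# asyncio, the two sides exchange their MPI ranks, and the peer rank is then
# cached on the comm object for later MPI sends/receives.
import asyncio
from mpi4py import MPI

async def connect_handshake(host: str, port: int) -> int:
    reader, writer = await asyncio.open_connection(host, port)
    my_rank = MPI.COMM_WORLD.Get_rank()
    writer.write(my_rank.to_bytes(4, "big"))
    await writer.drain()
    peer_rank = int.from_bytes(await reader.readexactly(4), "big")
    writer.close()
    await writer.wait_closed()
    # Later Isend/Irecv calls on this connection use dest/source=peer_rank.
    return peer_rank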

Ian's code does not use any out-of-band connection establishment. I had a quick glance and, as far as I can tell, the first message received with the source wildcard MPI.ANY_SOURCE helps determine which comm object is connected to which remote process (its rank). This is obtained from the status object.

Message Sending/Receiving

Both PRs use the COMM_WORLD communicator for all communication. MPI4Dask uses mpi4py's Send() and Recv() methods, while Ian's MPI code uses mpi4py's send() and recv() functions. mpi4py's Send()/Recv() methods communicate buffer-like data, whereas send()/recv() communicate Python objects. Communicating Python objects involves serialization/de-serialization of data (pickle/unpickle) and hence carries overhead. Ian's code has comments about that:

# Various MPI message types to experiment with; not expected to be in
# production code.
class MessageType(Enum):
    PythonObjects = 1  # Pickled/unpickled using mpi4py
    Frames = 2         # Byte string using to_frames and from_frames
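
To illustrate the distinction (this example is from neither PR), upper-case Send()/Recv() transfer raw buffers while lower-case send()/recv() pickle and unpickle Python objects:

# Standalone mpi4py illustration; run with two ranks, e.g. mpirun -np 2 python demo.py
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
if comm.Get_rank() == 0:
    buf = np.arange(10, dtype="u1")
    comm.Send([buf, MPI.BYTE], dest=1, tag=0)            # buffer-like send, no pickling
    comm.send({"any": "python object"}, dest=1, tag=1)   # pickled object send
elif comm.Get_rank() == 1:
    buf = np.empty(10, dtype="u1")
    comm.Recv([buf, MPI.BYTE], source=0, tag=0)
    obj = comm.recv(source=0, tag=1)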

Large Messages with MPI

While developing MPI4Dask, we observed that Dask can exchange really large messages (multiple GBs). MPI point-to-point communication uses an int count to express the amount of data being sent and received. Hence the data sent/received in a single MPI call cannot be larger than the largest positive value that fits in an int. For this reason, we had to implement chunking of data when a message is larger than an adjustable threshold.

Ian's MPI device does not handle that because it has not been tested for such applications.

The functions mpi_send_large and mpi_recv_large, in MPI4Dask, exist because of this.

async def mpi_recv_large(self, buf, size, _tag):
    me = MPI.COMM_WORLD.Get_rank()
    you = self.peer_rank
    logger.debug("mpi_recv_large: host=%s, me=%s, you=%s, tag=%s, size=%s, type(buf)=%s",
                 socket.gethostname(), me, you, _tag, size, type(buf))

    blk_size = CHUNK_SIZE
    num_of_blks = int(size / blk_size)
    last_blk_size = size % blk_size
    logger.debug("mpi_recv_large: blk_size=%s, num_of_blks=%s, last_blk_size=%s",
                 blk_size, num_of_blks, last_blk_size)

    num_of_reqs = num_of_blks
    if last_blk_size != 0:
        num_of_reqs = num_of_reqs + 1

    reqs = []

    # Post a non-blocking receive for every full-sized chunk.
    for i in range(num_of_blks):
        s_idx = i * blk_size
        e_idx = (i + 1) * blk_size
        if type(buf) == rmm._lib.device_buffer.DeviceBuffer:
            # need this if because rmm.DeviceBuffer is not subscriptable
            shadow_buf = rmm.DeviceBuffer(ptr=(buf.ptr + s_idx), size=blk_size)
            r = MPI.COMM_WORLD.Irecv([shadow_buf, MPI.BYTE], source=you, tag=_tag)
        else:
            r = MPI.COMM_WORLD.Irecv([buf[s_idx:e_idx], blk_size], source=you, tag=_tag)
        _tag = _tag + 1
        reqs.append(r)

    # Post a receive for the trailing partial chunk, if any.
    if last_blk_size != 0:
        s_idx = num_of_blks * blk_size
        e_idx = s_idx + last_blk_size
        if type(buf) == rmm._lib.device_buffer.DeviceBuffer:
            # need this if because rmm.DeviceBuffer is not subscriptable
            shadow_buf = rmm.DeviceBuffer(ptr=(buf.ptr + s_idx), size=last_blk_size)
            r = MPI.COMM_WORLD.Irecv([shadow_buf, MPI.BYTE], source=you, tag=_tag)
        else:
            r = MPI.COMM_WORLD.Irecv([buf[s_idx:e_idx], last_blk_size], source=you, tag=_tag)
        _tag = _tag + 1
        reqs.append(r)

    assert len(reqs) == num_of_reqs

    # Poll until all chunk receives complete, yielding to the event loop in between.
    flag = MPI.Request.Testall(reqs)
    while not flag:
        await asyncio.sleep(0)
        flag = MPI.Request.Testall(reqs)

Dask-based GPU Applications

MPI4Dask was tested with GPU-based applications.

As far as I can tell, Ian's device currently does not have support for GPU-based applications.

Polling Interval

Overall, the MPI send and receive co-routines work as follows:

request = comm.Isend([buf, size], dest, tag)
status = request.Test()

while not status:
    await asyncio.sleep(0)
    status = request.Test()

There is a discussion on the polling interval at PR (#4142). We have not found this to be an overhead. Essentially, await asyncio.sleep(0) yields the CPU to other tasks that the asyncio event loop may run. Removing this line, or replacing it with a blocking sleep(0), will obviously lead to a deadlock. I do realize, though, that there might be applications where this polling interval becomes an important configuration parameter. More investigation with applications is needed to figure out the impact.

Ian's MPI device has 5 or 10 ms polling interval.

Tags per Connection

MPI uses tags to segregate messages communicated on a connection. Here, a connection refers to a point-to-point connection between a specific sender and receiver; tags are used to differentiate the messages that flow over it.

In the MPI backends, one connection is used to send multiple messages. There can also be multiple connections between MPI processes (workers/clients/scheduler). Hence, to keep these concurrent messages between a pair of workers (for example) from interfering with one another, it is important to assign unique tags.

Both MPI backends assign tags dynamically every time a new connection is built. MPI4Dask assigns a fixed number of tags per connection, defined here:

TAG_QUOTA_PER_CONNECTION = 200

The reason for doing this is related to the chunking implementation for large messages. It needs a better solution in the future, since one connection may attempt to send more messages than this constant allows.
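
As a hypothetical illustration of the fixed-quota scheme (not the exact MPI4Dask code), each new connection could reserve a contiguous block of tags:

# Hypothetical illustration: every new connection reserves the next block of
# TAG_QUOTA_PER_CONNECTION tags, so its messages cannot collide with another
# connection's tags between the same pair of ranks.
TAG_QUOTA_PER_CONNECTION = 200
_next_base_tag = 0

def allocate_tag_block() -> int:
    global _next_base_tag
    base = _next_base_tag
    _next_base_tag += TAG_QUOTA_PER_CONNECTION
    # Messages on this connection use tags in [base, base + TAG_QUOTA_PER_CONNECTION).
    return base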

Ian's code has a different tag generation mechanism that I do not fully understand.

MPI4Dask Performance

A couple of application-level comparisons for MPI4Dask can be seen here:

These evaluations were done on an internal cluster (at OSU) and on TACC Frontera. Graphs for both can be seen at the links above.

A paper with these results can be seen here.

There are also some performance numbers comparing UCX-Py with async send/recv co-routines on top of MPI. Figures 5 and 6 in the MPI4Dask paper have those results.

@jakirkham (Member)

Thanks for writing that up Aamir! 😄 Let's see what Ian adds to that 🙂

Just to touch on a few things, generally I think there is interest in having some sort of MPI based communication layer for Dask. So I think there is agreement on that. Ideally this is something that both yourself, Ian, and colleagues of both of you are happy with. Hoping this discussion can help us bridge these two implementations and determine what that final result looks like.

It's interesting that you have made use of asyncio to handle connection establishment. We currently use Tornado for this on TCP, but have recently been looking into removing that and using asyncio directly ( #4513 ). A question that comes from this is whether we can establish a general framework for different communication protocols to use that is more asyncio-centric. Again we are still early in thinking about this problem. So would be curious to hear any thoughts you or others may have on this. Feel free to chime in on that issue as well

IIUC you are referring to Dask's serialization mechanisms to_frames and from_frames to handle object serialization and collect out-of-band frames for transmission. Is that right? If so, yes using this is preferable. We've been making some improvements to cut down overhead here where possible ( mainly by handling more stuff as part of MsgPack serialization #4531 ). There are likely more improvements to come. Though would expect communication using to_frames and from_frames is agnostic to these changes. Just some information that may be of interest to you.

Thanks for pointing that out. So how are you handling GPU-based communication? Does this use NVLink and IB? Do you need to check whether something is on device? Also worth noting there's a special "cuda" serializer (see usage here and framework here) that will keep objects on device as part of serialization. The frames will then be device buffers. Depending on how MPI is handling the serialization it may be possible to avoid synchronization (for example we often use RMM to back GPU memory, which is stream ordered).

Interesting, would be curious to know more about the applications Ian had in mind where polling overhead is an issue. Something else to point out here is we added support for using uvloop for the event loop ( #4448 ). So if one has uvloop installed and enables it in the Distributed config, uvloop will be used for the event loop. Not sure if this is relevant for the overhead Ian observed, but thought it worth mentioning.

Anyways not sure how relevant that context is for you or Ian, but I hope that sheds some light on what is going on under-the-hood in Dask. Hope that helps.

Excited to see this work progress! 😄

@aamirshafi (Author)

Thanks @jakirkham for your response.

Just to touch on a few things, generally I think there is interest in having some sort of MPI based communication layer for Dask. So I think there is agreement on that. Ideally this is something that both yourself, Ian, and colleagues of both of you are happy with. Hoping this discussion can help us bridge these two implementations and determine what that final result looks like.

We are happy to contribute.

IIUC you are referring to Dask's serialization mechanisms to_frames and from_frames to handle object serialization and collect out-of-band frames for transmission. Is that right? If so, yes using this is preferable. We've been making some improvements to cut down overhead here where possible ( mainly by handling more stuff as part of MsgPack serialization #4531 ). There are likely more improvements to come. Though would expect communication using to_frames and from_frames is agnostic to these changes. Just some information that may be of interest to you.

Both implementations make use of to_frames and from_frames to serialize/de-serialize messages. In that sense, these devices should be able to make use of new developments (#4531).

Out-of-band in the context of MPI4Dask meant that we used the connection provided by asyncio.open_connection() and asyncio.start_server() to exchange messages needed for initial connection setup. In-band messages for MPI4Dask are obviously data exchanged using MPI's send and receive methods.

Another related point was that mpi4py provides two variants of send and receive methods. The first variant begins with capital letters (like Send() and Recv()) and the second variant begins with small letters (like send() and recv()). The second variant is used to communicate data to/from Python objects and hence naturally has overheads associated with serialization/de-serialization. My point was not related to to_frames and from_frames -- both implementations support that.

Thanks for pointing that out. So how are you handling GPU-based communication? Does this use NVLink and IB? Do you need to check whether something is on device?

For the native MPI library, we have used the MVAPICH2-GDR library, which supports GPUDirect RDMA, CUDA IPC, and GDRCopy. This library figures out the optimal communication path (NVLink or IB) between GPUs.

Also worth noting there's a special "cuda" serializer (see usage here and framework here) that will keep objects on device as part of serialization. The frames will then be device buffers. Depending on how MPI is handling the serialization it may be possible to avoid synchronization (for example we often use RMM to back GPU memory, which is stream ordered).

MPI4Dask is using this mechanism since we used UCX device as the starting point and tried to make minimal changes to it.

Interesting, would be curious to know more about the applications Ian had in mind where polling overhead is an issue. Something else to point out here is we added support for using uvloop for the event loop ( #4448 ). So if one has uvloop installed and enables it in the Distributed config, uvloop will be used for the event loop. Not sure if this is relevant for the overhead Ian observed, but thought it worth mentioning.

Anyways not sure how relevant that context is for you or Ian, but I hope that sheds some light on what is going on under-the-hood in Dask. Hope that helps.

Thanks for your comments.

Excited to see this work progress! 😄

@ianthomas23

@jakirkham and @aamirshafi I did post a comment on my PR a few weeks ago that you may not have read but is relevant here. Since writing that PR I have moved on to other things and all of my available open source contribution time is spent on those (non-dask) projects. So I am happy to answer a few questions, but I don't envisage returning to this work.

Firstly I should say that my implementation was something that I wrote in one week sitting on my sofa to prove that it was possible. There is no corresponding academic paper and I had no co-workers to help me. It certainly isn't as polished, documented or tested as this PR.

MPI4Dask is based on the UCX communication device and hopefully that makes it easier to review.

Mine is based on the TCP comms code.

Ian's code does not use any out-of-band connection establishment.

If I understand you correctly then I think mine works in exactly the same way, just like TCP that it is based on.

Ian's MPI code uses mpi4py's send() and recv() functions

That's right, so I am using mpi4py's pickling and unpickling. I have the option of using dask's to_frames and from_frames (like TCP comms) but that proved to be slower.

Ian's MPI device does not handle that because it has not been tested for such applications.

Correct, not tested using big data sets requiring large messages.

As far as I can tell, Ian's device currently does not have support for GPU-based applications.

Correct, mine is "plain vanilla" MPI.

Ian's MPI device has 5 or 10 ms polling interval.

I don't know where these numbers come from. Sending has a 1 ms polling interval (but would probably need to be tuned):

# To identify the completion of asynchronous MPI sends and receives, need to
# keep polling them. This is the (minimum) time between polls, in seconds.
_polling_time = 0.001

Receiving is either blocking where this is acceptable, or non-blocking with a polling interval that starts at 1 ms and then doubles each time

yield from asyncio.sleep(self._sleep_time).__await__()
self._sleep_time *= 2 # Backoff.

I have a debug option to record the time taken between receiving an MPI message by the single MessageReceiver and its consumption by the appropriate recipient. In my tests this was usually less than a millisecond, but occasionally would be ~15 ms, which would definitely need investigation.
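
A hypothetical sketch of the backoff-based polling described above (not Ian's actual code) might look like this:

# Hypothetical sketch: poll a non-blocking MPI request, doubling the sleep
# interval between polls.
import asyncio
from mpi4py import MPI

async def wait_with_backoff(request: MPI.Request, initial_sleep: float = 0.001) -> None:
    sleep_time = initial_sleep
    while not request.Test():
        await asyncio.sleep(sleep_time)
        sleep_time *= 2  # Backoff.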

Ian's code has a different tag generation mechanism that I do not fully understand.

MPIConnectors obtain a unique tag (from that MPI rank's point of view) from the TagGenerator and pass this to the MPIListener so that both ends use the same tag. Hence the combination of (tag, other_rank) is unique from each MPI rank's point of view.

I hope this helps clarify!

@aamirshafi (Author)

@jakirkham can you please advise on next steps to have this merged? Any particular requirements?

@@ -0,0 +1,594 @@
"""
Member:

Could we move this somewhere else? Maybe dask-examples would make sense?

Author:

Yes we can do that. The only reason to put them here was to have sample code for the MPI backend.

My colleague @ZackWRyan will help with this PR.


I'll remove both of these examples from this PR, we can make a separate PR on dask-examples to go along with this one later.

@@ -0,0 +1,91 @@
"""
Member:

Same question here

Author:

This can be moved as well.

@ZackWRyan

@@ -14,6 +14,7 @@


def _register_transports():
from . import mpi
Member:

This should probably be in a try...except... like ucx below as mpi4py is likely not always available. Also would suggest moving this next to ucx below since inproc and tcp are always available and should probably be registered first
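
A sketch of the suggested guard (mirroring how the ucx import is handled; this is illustrative, not the PR's code) might look like:

def _register_transports():
    from . import inproc, tcp  # always available; register these first

    try:
        from . import ucx
    except ImportError:
        pass

    try:
        from . import mpi  # requires mpi4py, which may not be installed
    except ImportError:
        pass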

Author (@aamirshafi, May 27, 2021):

Makes sense.

@ZackWRyan

Comment on lines +67 to +73
def synchronize_stream(stream=0):
    import numba.cuda

    ctx = numba.cuda.current_context()
    cu_stream = numba.cuda.driver.drvapi.cu_stream(stream)
    stream = numba.cuda.driver.Stream(ctx, cu_stream, None)
    stream.synchronize()
Member:

Does MPI need to synchronize on the stream before sending? Many things with RMM and cuDF are stream-ordered. So this might not be needed if MPI respects that

Author (@aamirshafi, May 28, 2021):

We are investigating and will update here.

@ZackWRyan


logger.debug(tag_table)

def init_once():
Member:

Since a lot of this repeats code in the UCX module, maybe it makes sense to refactor all the common stuff into a utility function that both can call on initialization

Author (@aamirshafi, May 27, 2021):

Yes that can be done as well. The common code inside init_once is mostly related to initializing host_array and device_array.

@ZackWRyan


initialized = True

random.seed(random.randrange(sys.maxsize))
Member (@jakirkham, May 21, 2021):

Why is this needed? Also what happens if a user changes the seed after startup? Could this module track its own random object if it needs a particular seed for some reason, to make things a bit more robust?

Author (@aamirshafi, May 27, 2021):

This is related to a followup comment. The MPI backend currently has a random wait before connecting to other workers. This should not be needed as far as I can tell. We are investigating this further and will get back here.

@ZackWRyan


Initial tests are showing that the random seed and connection delay are indeed not necessary, once more results come back and we're confident it still works I'll make the commit to remove them.

s_idx = (i) * blk_size
e_idx = (i+1) * blk_size

if type(buf) == rmm._lib.device_buffer.DeviceBuffer:
Member:

Why do we need to check the type? This also seems to be reaching into a private module

Author (@aamirshafi, May 27, 2021):

MPI's point-to-point functions cannot send more than 2 GB - 1 bytes at a time when using MPI.BYTE, because the count is a signed integer specifying the number of bytes sent/received. For this reason, the MPI device needs to chunk the data into blocks and communicate them separately.

This chunking works as long as the datatype communicated is subscriptable (like CuPy, for example). Subscriptable here means using the : operator to index into specific regions of the buffer. However, if the data being communicated is an rmm._lib.device_buffer.DeviceBuffer, it is not subscriptable and hence there is a special case in the code. In this case, the device directly initializes a shadow buffer (of type rmm.DeviceBuffer) and communicates the next chunk.

Is there a better way to do this?

@ZackWRyan


A brief search didn't find any other way around the special case for RMM buffers. One thought would be to convert RMM's buffer to some other type of buffer (e.g. PyBuffer, which is subscriptable/sliceable), but as far as I can tell, the other buffer types that I considered require extra communication and/or fully transferring the data back to CPU memory. That would have a negative impact on performance, so I'd advise just leaving it like this.


if type(buf) == rmm._lib.device_buffer.DeviceBuffer:
# need this if because rmm.DeviceBuffer is not subscriptable
shadow_buf = rmm.DeviceBuffer(ptr=(buf.ptr+s_idx), \
Member:

Any reason not to use device_array here as that is defined above?

Author:

Same reason as above. If there's a suggestion to improve, please let us know.

@ZackWRyan


To add to what Aamir said, to replace subscripting/slicing for RMM buffers it's necessary to create these 'shadow' buffers that are actually just chunks of the larger RMM buffer, and in order to do that without copying we need to specify a pointer to an intermediate spot in the buffer. device_array doesn't allow us to specify a pointer, only a size, partially because that device_array might actually be a Numba array (not RMM), and as far as I can tell, Numba doesn't support creating device buffers at specific pointer locations.

Comment on lines +587 to +588
rand_num = 0.001 * random.randint(0, 1000)
await asyncio.sleep(rand_num)
Member:

Why is a random sleep needed?

Author (@aamirshafi, May 27, 2021):

I explained this earlier. We should be able to avoid this sleep. I will update here on this.

@ZackWRyan

# we exclude in for inproc.
return self.get_host_port()

class MPI4DaskBackend(Backend):
Member:

Is it possible to create an abstract base class that handles MPI and TCP's needs? This should avoid the duplication here

Author (@aamirshafi, May 27, 2021):

This duplicated code comes from the UCX device. We can come back to this after addressing the other comments.

@ZackWRyan

@aamirshafi requested a review from fjetter as a code owner on January 23, 2024.
@sanjeev-one

Bump on this. Can MPI be used for Dask workers to communicate with one another?

@GPUtester (Collaborator)

Can one of the admins verify this patch?

Admins can comment ok to test to allow this one PR to run or add to allowlist to allow all future PRs from the same author to run.
