issue: Inconsistent communication between nodes #6584

Closed
YarShev opened this issue Jul 6, 2023 · 27 comments · Fixed by #6591

Comments

@YarShev

YarShev commented Jul 6, 2023

Hi there,

I am a developer of Modin and unidist. We use MPI as a backend in unidist, and mpich can be used as one of the implementations in that backend. Modin uses unidist to distribute computation.

We have recently been looking at one of our benchmarks, HM, which does data processing and uses Modin for that purpose. Modin distributes the computation with unidist on MPI.

We have checked the benchmark on a single node with mpich, Open MPI, and Intel MPI, and everything works well. Then we tried running the benchmark on a cluster of two nodes: Open MPI and Intel MPI work well, but mpich fails.

How does unidist work? There is a root process (rank 0) and multiple workers (ranks 1, ..., N) that sit in an infinite loop and wait for tasks to execute. The root sends out tasks to the workers, which start executing them. The workers can also communicate with each other to send/receive the data needed for task execution.
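
For reference, a heavily simplified sketch of this root/worker pattern in mpi4py (this is not unidist's actual code; the task payload, tags, and shutdown protocol below are made up for illustration):

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
TASK_TAG, STOP_TAG = 1, 2  # hypothetical tags

if rank == 0:
    # Root: hand out a task, then a termination signal, to every worker.
    reqs = []
    for worker in range(1, comm.Get_size()):
        reqs.append(comm.isend({"op": "do-something"}, dest=worker, tag=TASK_TAG))
        reqs.append(comm.isend(None, dest=worker, tag=STOP_TAG))
    MPI.Request.waitall(reqs)
else:
    # Worker: loop and execute tasks until the termination signal arrives.
    status = MPI.Status()
    while True:
        task = comm.recv(source=0, tag=MPI.ANY_TAG, status=status)
        if status.Get_tag() == STOP_TAG:
            break
        # ... execute `task`; workers may also isend/irecv data to/from each other ...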

How does HM fail? The root process gets to the end of the benchmark, sends out a termination signal to the workers, and finishes execution. The workers themselves have asynchronous pending requests from Isends issued during the run. Once the workers receive the termination signal, they gradually finish all remaining tasks; in particular, they call Wait on every request to make sure there is no pending isend operation, and also call recv and irecv to receive/cancel communication. However, all of a sudden, a worker on one node receives wrong data from a worker on the other node. We see the error in the workers.

I tried using mpich from conda and got the error on the unidist side because mpi.recv returns the wrong data, i.e., we expected to receive different data. There is also the following output from the MPI internals:

Abort(409119631) on node 1 (rank 1 in comm 0): Fatal error in internal_Wait: Other MPI error, error stack:
internal_Wait(89).............: MPI_Wait(request=0x7f31971a5910, status=0x1) failed
MPIR_Wait(948)................:
MPIR_Wait_state(886)..........:
MPID_Progress_wait(335).......:
MPIDI_progress_test(158)......:
MPIDI_OFI_handle_cq_error(625): OFI poll failed (ofi_events.c:627:MPIDI_OFI_handle_cq_error:Input/output error)

I also tried mpich built from source and only got the error on the unidist side (mpi.recv returns the wrong data, i.e., we expected to receive different data); there is no output from the MPI internals.

I would really appreciate it if you could help with this issue. Maybe a similar issue with internode communication using isend has already been reported.

Thanks in advance.

@hzhou
Contributor

hzhou commented Jul 6, 2023

@YarShev Could you guide me on how to set up the environment for reproducing the issue?

@YarShev
Author

YarShev commented Jul 11, 2023

@hzhou,

Sure, here are the steps you can follow.

  1. Create a cluster consisting of two nodes. I use i3.metal instances on AWS, which have the following characteristics: 72 vCPUs (x86_64), 512 GiB of memory, 15200 GB of SSD storage, and 25 Gigabit network performance.

Note that the benchmark consumes ~270 GB of RAM on each node.

  2. Make sure that the nodes can connect to each other via ssh.

  3. Install conda.

$ wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
$ sh Miniconda3-latest-Linux-x86_64.sh
  4. Create a conda environment and install the necessary packages.
$ conda create -n mpich-test python=3.8
$ conda activate mpich-test
$ pip install modin
$ conda install -c conda-forge mpi4py mpich
$ pip install msgpack
$ pip install unidist
  5. Clone the repo containing the benchmark.
$ git clone https://github.com/intel-ai/timedf.git
$ cd timedf
  6. Make the following change in setup.py so that unnecessary packages are not installed and already installed packages are not polluted.
$ git diff
diff --git a/setup.py b/setup.py
index 99ff536..7c62da6 100644
--- a/setup.py
+++ b/setup.py
@@ -30,7 +30,7 @@ setup(
         *find_packages(include=["timedf*"]),
         *find_namespace_packages(include=["timedf_benchmarks.*"]),
     ],
-    install_requires=parse_reqs("base.txt"),
+    # install_requires=parse_reqs("base.txt"),
     extras_require={"reporting": reporting_reqs, "all": all_reqs},
     python_requires=">=3.8",
     entry_points={
  7. Install the benchmark repo as a package.
$ pip install -e .
  8. Load the datasets.
$ pip install kaggle
$ benchmark-load hm_fashion_recs ./datasets/hm_fashion_recs
  9. Run the benchmark.
$ mpiexec -n 1 -env UNIDIST_CPUS 44 -env MODIN_CPUS 44 -env UNIDIST_MPI_HOSTS host1,host2 python timedf/scripts/benchmark_run.py hm_fashion_recs -data_file ./datasets/hm_fashion_recs/ -pandas_mode Modin_on_unidist_mpi -verbosity 1 -no_ml
  10. After each run, an hm_tmpdir folder is created. Make sure to remove it before the next run.

If you have any questions or problems reproducing the issue, please let me know. You can also contact me directly via piterok123@gmail.com.

@YarShev
Author

YarShev commented Jul 11, 2023

Note that we run the benchmark using mpiexec -n 1; the other workers are spawned dynamically via MPI.Comm.Spawn. I also checked whether the issue persists using mpiexec -n <N>, i.e., without dynamic spawn, and got the same error.
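
For context, dynamic spawning in mpi4py looks roughly like the following sketch (the worker script name and process count here are hypothetical, not the ones unidist actually uses):

import sys
from mpi4py import MPI

# Launched from the single root process started by `mpiexec -n 1`;
# returns an intercommunicator connecting the root to the spawned workers.
intercomm = MPI.COMM_SELF.Spawn(sys.executable, args=["worker.py"], maxprocs=4)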

The full command for running the benchmark without dynamic spawn:

$ mpiexec -n 46 -env UNIDIST_CPUS 44 -env MODIN_CPUS 44 -host host1,host2 -env UNIDIST_IS_MPI_SPAWN_WORKERS False python timedf/scripts/benchmark_run.py hm_fashion_recs -data_file ./datasets/hm_fashion_recs/ -pandas_mode Modin_on_unidist_mpi -verbosity 1 -no_ml

@YarShev
Author

YarShev commented Jul 12, 2023

We have been able to reproduce the issue using mpi4py code only.

from mpi4py import MPI
import time
import socket

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

hostname = socket.gethostname()
host = socket.gethostbyname(hostname)
with open('out.txt', 'w') as f:
    print(f"host: {host}", file=f)

tag = 1
if rank == 0:
    dest_rank = 1
    r1 = comm.isend(1.00001, dest=dest_rank, tag=tag)
    r2 = comm.isend(2.0002, dest=dest_rank, tag=tag)
    r1.wait()
    r2.wait()
elif rank == 1:
    source_rank = 0
    backoff = 0.01
    while not comm.Iprobe(source=source_rank, tag=tag):
        time.sleep(backoff)
    r1 = comm.irecv(source=source_rank, tag=tag)
    r1.cancel()
    r1.wait()
    r2 = comm.irecv(source=source_rank, tag=tag)
    while True:
        status, data = r2.test()
        if status:
            with open('out2.txt', 'w') as f:
                print(f"data: {data}", file=f)
            break
        else:
            time.sleep(backoff)

In the second recv we expect to receive 2.0002 but actually receive 1.00001. The issue happens across two nodes; on a single node it works well and we receive 2.0002 as expected. There is no such issue with Intel MPI and Open MPI.

@YarShev
Author

YarShev commented Jul 12, 2023

The command to run is the following.

$ mpiexec -n 2 -host host1,host2 python mpich-repr.py

@hzhou
Contributor

hzhou commented Jul 12, 2023

First of all, you should check the status after r1.cancel(). Second, if r1 is successfully cancelled, then the first message (1.00001) is still in the queue, shouldn't r2 receive it?
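
For reference, in mpi4py the outcome of the cancel can be inspected through a Status object, roughly like this (a sketch building on the reproducer above, so the MPI import and the r1 request are assumed):

status = MPI.Status()
r1.cancel()
r1.wait(status=status)  # completes the request whether or not the cancel succeeded
if status.Is_cancelled():
    pass  # the receive was cancelled; the first message stays queued
else:
    pass  # the cancel did not take effect; the first message was consumed by r1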

@YarShev
Author

YarShev commented Jul 12, 2023

If the user cancels a communication request, why should he receive the data from the cancelled request? Seems wrong to me.

@hzhou
Contributor

hzhou commented Jul 12, 2023

If you successfully cancelled an MPI_Irecv, it would be as if the MPI_Irecv never issued -- isn't that the meaning of cancel? If you don't mean to "cancel", you may simply "free" (MPI_Request_free) an active request and MPI library will complete the request when the message arrives and free the resource afterward.

EDIT: to clarify, you are not canceling the "request". You are canceling the operation represented by the request. A request object is simply a handle representing an asynchronous operation.

@YarShev
Author

YarShev commented Jul 12, 2023

If you successfully cancelled an MPI_Irecv, it would be as if the MPI_Irecv never issued -- isn't that the meaning of cancel?

I just read the standard again and yes, this is probably the case. I wonder whether it is possible to cancel an irecv at all, so that a subsequent irecv gets the data from a subsequent isend rather than from the previous one, which was canceled on the receiver side.

@hzhou
Contributor

hzhou commented Jul 12, 2023

I just read the standard again and yes, this is probably the case. I wonder whether it is possible to cancel an irecv at all, so that a subsequent irecv gets the data from a subsequent isend rather than from the previous one, which was canceled on the receiver side.

You don't need to cancel the irecv; you can just free the request.
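
In terms of the reproducer above, that would mean replacing the cancel/wait pair with a free, roughly like this (a sketch, not verified here; as discussed below, the freed receive still completes internally and consumes the first message once it arrives):

r1 = comm.irecv(source=source_rank, tag=tag)
r1.Free()  # hand the request back to MPI; the library completes it when the message arrives
r2 = comm.irecv(source=source_rank, tag=tag)
data = r2.wait()  # now expected to deliver the second message (2.0002)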

@YarShev
Author

YarShev commented Jul 12, 2023

In that case the communication finishes successfully, i.e., the receiver will eventually get the data, which may bloat memory a little if the data is huge. Is there a way to avoid this, i.e., as I said, fully cancel the communication request?

@hzhou
Contributor

hzhou commented Jul 12, 2023

I see. You may issue an irecv with a small buffer if you don't care about data truncation.

EDIT: I think you can even issue the irecv with a NULL buffer and a count equal to 0.

@YarShev
Author

YarShev commented Jul 12, 2023

I don't quite get you. If I execute the following code, I get the error below. Please elaborate on the point you are making.

from mpi4py import MPI
import numpy

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    data = numpy.arange(10000000, dtype='i')
    r = comm.Isend([data, MPI.INT], dest=1, tag=77)
    r.Wait()
elif rank == 1:
    data = numpy.empty(1, dtype='i')
    r = comm.Irecv([data, MPI.INT], source=0, tag=77)
    r.Wait()

Traceback (most recent call last):
  File "mpich-repr.py", line 14, in <module>
    r.Wait()
  File "mpi4py/MPI/Request.pyx", line 37, in mpi4py.MPI.Request.Wait
mpi4py.MPI.Exception: Message truncated, error stack:
PMPI_Wait(216): MPI_Wait(request=0x7f7576fc3f50, status=0x1) failed
MPIR_Wait(112):
do_cts(548)...: Message truncated; 40000000 bytes received but buffer size is 4

@hzhou
Contributor

hzhou commented Jul 12, 2023

The Wait will return a "truncation" error. You can set the error handler to MPI_ERRORS_RETURN and ignore MPI_ERR_TRUNCATE. But since you don't care about receiving the message, you don't need to Wait for it. You can simply free the request.

EDIT: I think that means you can just remove r.Wait(). Python should call MPI_Request_free during garbage collection.

@YarShev
Author

YarShev commented Jul 12, 2023

Let me try this on the HM benchmark and get back to you.

@YarShev
Author

YarShev commented Jul 12, 2023

Before running HM, I tried this simple example and it hangs. Is that some side effect of MPI_Request_free? If I change it to Wait, everything works well.

from mpi4py import MPI
import numpy

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    data = numpy.arange(1000000, dtype='i')
    r = comm.Isend([data, MPI.INT], dest=1, tag=77)
    print("before")
    r.Wait()
    print("after")
elif rank == 1:
    data = numpy.empty(1000000, dtype='i')
    r = comm.Irecv([data, MPI.INT], source=0, tag=77)
    r.Free()
    print(data)

EDIT: Intel MPI just crashed.

@hzhou
Contributor

hzhou commented Jul 12, 2023

I'll check your example later. Meanwhile, let's consult an mpi4py expert --
@dalcinl Can I borrow some of your insight?

@hzhou
Contributor

hzhou commented Jul 13, 2023

Reference example in C:

#include <stdio.h>
#include <mpi.h>

int main(int argc, char** argv)
{
    int tag = 10;
    MPI_Comm comm = MPI_COMM_WORLD;

    MPI_Init(NULL, NULL);

    int mpi_size, mpi_id;
    MPI_Comm_size(MPI_COMM_WORLD, &mpi_size);
    MPI_Comm_rank(MPI_COMM_WORLD, &mpi_id);

    if (mpi_id == 0) {
        int send_buf[10];
        MPI_Request send_req;
        MPI_Isend(send_buf, 10, MPI_INT, 1, tag, comm, &send_req);
        MPI_Wait(&send_req, MPI_STATUS_IGNORE);
        puts("send 10 ints done.");
    } else if (mpi_id == 1) {
        MPI_Request recv_req;
        MPI_Irecv(NULL, 0, MPI_INT, 0, tag, comm, &recv_req);
        printf("recv_req = %x\n", recv_req);
        MPI_Request_free(&recv_req);
    }

    MPI_Finalize();
    return 0;
}

@YarShev
Author

YarShev commented Jul 13, 2023

Does it hang in C too?

@dalcinl
Contributor

dalcinl commented Jul 13, 2023

@hzhou This is a modification to your C code

#include <stdio.h>
#include <mpi.h>

#define N 1000000
int send_buf[N];

int main(int argc, char** argv)
{
    int tag = 10;
    MPI_Comm comm = MPI_COMM_WORLD;

    MPI_Init(NULL, NULL);

    int mpi_size, mpi_id;
    MPI_Comm_size(MPI_COMM_WORLD, &mpi_size);
    MPI_Comm_rank(MPI_COMM_WORLD, &mpi_id);

    if (mpi_id == 0) {
        MPI_Request send_req;
        MPI_Isend(send_buf, N, MPI_INT, 1, tag, comm, &send_req);
        MPI_Wait(&send_req, MPI_STATUS_IGNORE);
        puts("send 10 ints done.");
    } else if (mpi_id == 1) {
        MPI_Request recv_req;
        MPI_Irecv(NULL, 0, MPI_INT, 0, tag, comm, &recv_req);
        printf("recv_req = %x\n", recv_req);
        MPI_Request_free(&recv_req);
    }
    MPI_Finalize();
    printf("[%d] done\n", mpi_id);
    return 0;
}

I'm running it on macOS Ventura with Homebrew MPICH 4.1.2 (ch4:ofi).
With N=1M as above, there is a deadlock after wait in process 0.
With N=10 as in your original code, the deadlock is gone.

This looks definitely like an issue in MPICH (eager vs. rendezvous protocol implementations?).

@YarShev Maybe you can work around the message truncation error in the usual Python way:

try:
    r.Wait()
except MPI.Exception as exc:
    if exc.error_class == MPI.ERR_TRUNCATE:
        pass  # ignore the expected truncation error
    else:
        raise # but do not ignore other MPI errors

@YarShev
Author

YarShev commented Jul 13, 2023

@dalcinl, thanks for the workaround, but it seems a little clumsy to me. Moreover, we have to cancel multiple recvs in a row. Still, this workaround looks like the only way for now to completely cancel the communication request. That said, we will probably wait for a fix in order to use request.Free.

@hzhou
Contributor

hzhou commented Jul 13, 2023

This looks definitely like an issue in MPICH (eager vs. rendezvous protocol implementations?).

Indeed. The issue is that process 1 exits before finishing the rendezvous protocol with process 0. Adding an MPI_Barrier before MPI_Finalize can serve as a workaround.

The proper fix is probably to check for pending requests and try to complete them, up to a timeout, in MPI_Finalize. Ref. #6513.

@hzhou
Contributor

hzhou commented Jul 13, 2023

@YarShev If you can make sure the receiving processes do not exit before the sender processes finish sending, just freeing the request should work. Depending on your application, you may simply add a Barrier before exiting. If a collective Barrier is not practical, you can arrange for the sender to send a small "ok-to-exit" message to the receiver and have the receiver wait for that message before exiting. Both approaches ensure the process finishes all progress before exiting.
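
In mpi4py terms, either option could look roughly like the following sketch (the tag value and the sender_rank/receiver_rank names are hypothetical):

# Option 1: collective; every process reaches this point before exiting.
comm.Barrier()

# Option 2: point-to-point handshake, if a collective is not practical.
OK_TO_EXIT_TAG = 99  # hypothetical tag
if rank == sender_rank:
    # after all outstanding Isends have been waited on
    comm.send(None, dest=receiver_rank, tag=OK_TO_EXIT_TAG)
elif rank == receiver_rank:
    comm.recv(source=sender_rank, tag=OK_TO_EXIT_TAG)  # only now is it safe to exit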

hzhou changed the title from "Inconsistent communication between nodes" to "issue: Inconsistent communication between nodes" on Jul 13, 2023
@YarShev
Author

YarShev commented Jul 13, 2023

For now, we just allow the remaining operations between workers to complete when rank 0 exits the program. Looking forward to seeing a fix for request.Free.

@YarShev
Author

YarShev commented Aug 21, 2023

@hzhou, which upcoming release will include the fix for this issue?

@hzhou
Contributor

hzhou commented Aug 21, 2023

@hzhou, which upcoming release will include the fix for this issue?

MPICH 4.2b1; it is planned for release this November.

@YarShev
Author

YarShev commented Aug 21, 2023

I see, thanks!
