Commit
Merge pull request trilinos#8045 from Tech-XCorp/fixDirectoryComm
Zoltan2: Refactor directory to use Teuchos comm
MicheldeMessieres authored Sep 24, 2020
2 parents 7ce6221 + 82f8b7b commit 09c3481
Showing 6 changed files with 76 additions and 149 deletions.
12 changes: 3 additions & 9 deletions packages/zoltan2/core/src/directory/Zoltan2_Directory.hpp
@@ -48,6 +48,7 @@
#define ZOLTAN2_DIRECTORY_H_

#include <Teuchos_DefaultComm.hpp> // currently using Teuchos comm throughout
#include <Teuchos_CommHelpers.hpp>

#ifndef HAVE_MPI
// support mpi serial - directory currently has a mix of Teuchos mpi commands
@@ -214,7 +215,8 @@ class Zoltan2_Directory {
typedef long long mpi_t;
mpi_t nDDEntries = static_cast<mpi_t>(node_map.size());
mpi_t firstIdx;
MPI_Scan(&nDDEntries, &firstIdx, 1, MPI_LONG_LONG, MPI_SUM, getRawComm());
Teuchos::scan(*comm, Teuchos::REDUCE_SUM,
1, &nDDEntries, &firstIdx);
firstIdx -= nDDEntries; // do not include this rank's entries in prefix sum
size_t cnt = 0;
for(size_t n = 0; n < node_map.capacity(); ++n) {
@@ -296,14 +298,6 @@ class Zoltan2_Directory {
Zoltan2_DD_Find_Msg<gid_t,lid_t>* msg) const { return 0; };

private:
MPI_Comm getRawComm() {
#ifdef HAVE_MPI
return Teuchos::getRawMpiComm(*comm);
#else
return MPI_COMM_WORLD;
#endif
}

void rehash_node_map(size_t new_hash_size) {
node_map.rehash(new_hash_size);
}
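The header change above swaps a raw MPI_Scan for Teuchos::scan when computing each rank's first global directory index. A minimal standalone sketch of that exclusive-prefix-sum pattern follows; the function and variable names are illustrative, not part of the commit.

#include <Teuchos_Comm.hpp>
#include <Teuchos_CommHelpers.hpp>

// Teuchos::scan performs an inclusive prefix sum, so subtracting this rank's
// own contribution afterwards yields the exclusive prefix: the number of
// entries owned by lower-ranked processes, i.e. this rank's first global index.
long long firstGlobalIndex(const Teuchos::Comm<int> &comm, long long myCount) {
  long long inclusiveSum = 0;
  Teuchos::scan(comm, Teuchos::REDUCE_SUM, 1, &myCount, &inclusiveSum);
  return inclusiveSum - myCount;
}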
160 changes: 68 additions & 92 deletions packages/zoltan2/core/src/directory/Zoltan2_Directory_Comm.cpp
@@ -330,7 +330,7 @@ Zoltan2_Directory_Comm::Zoltan2_Directory_Comm(

if (MPI_RECV_LIMIT > 0) {

throw std::logic_error("UNTESTED COMM 2"); // needs unit testing
throw std::logic_error("Zoltan2_Directory_Comm.cpp untested refactored code (1)"); // needs unit testing

/* If we have a limit to the number of posted receives we are allowed,
** and our plan has exceeded that, then switch to an MPI_Alltoallv so
@@ -346,9 +346,6 @@ Zoltan2_Directory_Comm::Zoltan2_Directory_Comm(
if (plan_forward->maxed_recvs == 0) {
// See notes in header for MPI_Request
plan_forward->request.resize(plan_forward->nrecvs);
plan_forward->status.resize(plan_forward->nrecvs);
// plan_forward->request = Teuchos::arcp(new MPI_Request[plan_forward->nrecvs], 0, plan_forward->nrecvs, true);
// plan_forward->status = Teuchos::arcp(new MPI_Status[plan_forward->nrecvs], 0, plan_forward->nrecvs, true);
}

nrec = total_recv_size;
@@ -399,8 +396,7 @@ int Zoltan2_Directory_Comm::invert_map(

int nrecvs = 0; /* number of messages I'll receive */

// TODO: Teuchos::scatterImpl(...)
MPI_Scatter(&(counts[0]), 1, MPI_INT, &nrecvs, 1, MPI_INT, 0, getRawComm());
Teuchos::scatter<int, int>(&(counts[0]), 1, &nrecvs, 1, 0, *comm);

int max_nrecvs = 0;
if (my_proc == 0) {
@@ -411,9 +407,7 @@
}
}

// TODO: Teuchos::MpiComm<Ordinal>::broadcast(...)

MPI_Bcast(&max_nrecvs, 1, MPI_INT, 0, getRawComm());
Teuchos::broadcast(*comm, 0, &max_nrecvs);

if(nrecvs > 0) {
lengths_from.resize(nrecvs); /* number of items I'm receiving */
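In the hunk above, rank 0 scatters each process its receive count and then broadcasts the maximum count. A condensed sketch of that pair of collectives; the names are illustrative, and countsOnRoot is assumed to hold one entry per rank on rank 0 (other ranks may pass an empty vector, since, as with MPI_Scatter, only the root's send buffer is read).

#include <algorithm>
#include <vector>
#include <Teuchos_CommHelpers.hpp>

// Rank 0 owns a per-rank counts array; scatter hands each rank its own entry,
// and broadcasting the root's maximum lets every rank size temporary buffers.
void distributeCounts(const Teuchos::Comm<int> &comm,
                      const std::vector<int> &countsOnRoot,
                      int &myCount, int &maxCount) {
  Teuchos::scatter<int, int>(countsOnRoot.data(), 1, &myCount, 1, 0, comm);
  maxCount = 0;
  if (comm.getRank() == 0 && !countsOnRoot.empty()) {
    maxCount = *std::max_element(countsOnRoot.begin(), countsOnRoot.end());
  }
  Teuchos::broadcast(comm, 0, 1, &maxCount);
}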
@@ -422,47 +416,45 @@

if (MPI_RECV_LIMIT == 0 || max_nrecvs <= MPI_RECV_LIMIT) {
// See notes in header for MPI_Request
std::vector<MPI_Request> req(nrecvs);
//Teuchos::ArrayRCP<MPI_Request> req(new MPI_Request[nrecvs], 0, nrecvs, true);
Teuchos::ArrayRCP<Teuchos::RCP<Teuchos::CommRequest<int> > > requests(nrecvs);

/* Note: I'm counting on having a unique tag or some of my incoming */
/* messages might get confused with others. */

// TODO: Teuchos::ireceiveImpl(...)
for (int i=0; i < nrecvs; i++) {
MPI_Irecv(&(lengths_from[0]) + i, 1, MPI_INT, MPI_ANY_SOURCE,
tag, getRawComm(), &(req[i]));
#ifdef HAVE_MPI // Teuchos::ireceive not implemented for Serial - Serial is just for debugging
Teuchos::ArrayRCP<int> single_elem(&lengths_from[i], 0, 1, false);
requests[i] = Teuchos::ireceive(single_elem, MPI_ANY_SOURCE, tag, *comm);
#endif
}

// TODO: Teuchos::sendImpl(...)
for (int i=0; i < nsends+self_msg; i++) {
// MPI_Send takes non-const for buf (1st param)
// Apparently in future versions this will become true const
// Then the const_cast could be removed
MPI_Send(const_cast<int*>(&lengths_to[i]), 1, MPI_INT, procs_to[i], tag,
getRawComm());
#ifdef HAVE_MPI // Teuchos::send not implemented for Serial - Serial is just for debugging
Teuchos::send(&lengths_to[i], 1, procs_to[i], tag, *comm);
#endif
}

for (int i=0; i < nrecvs; i++) {
MPI_Status status;
MPI_Wait(&(req[i]), &status);
procs_from[i] = status.MPI_SOURCE;
#ifdef HAVE_MPI
procs_from[i] = requests[i]->wait()->getSourceRank();
#else
// above Teuchos MPI calls not supported for Serial so manually do the transfer.
// We don't really need Serial for this class but it helps with debugging to have a serial test that can run.
lengths_from[i] = lengths_to[i];
#endif
}

}
else { /* some large HPC machines have a limit on number of posted receives */
throw std::logic_error("UNTESTED COMM 3"); // needs unit testing

Teuchos::ArrayRCP<int> sendbuf(new int[nprocs], 0, nprocs, true);
Teuchos::ArrayRCP<int> recvbuf(new int[nprocs], 0, nprocs, true);

for (int i=0; i < nsends + self_msg; i++) {
sendbuf[procs_to[i]] = lengths_to[i];
}

// TODO: Teuchos
MPI_Alltoall(&(sendbuf[0]), 1, MPI_INT, &(recvbuf[0]), 1, MPI_INT,
getRawComm());
throw std::logic_error("Zoltan2_Directory_Comm.cpp untested refactored code (2)"); // needs unit testing
// Did not refactor this - need Teuchos form but this is not tested code.
// MPI_Alltoall(&(sendbuf[0]), 1, MPI_INT, &(recvbuf[0]), 1, MPI_INT, getRawComm());

for (int i=0, j=0; i < nprocs; i++) {
if (recvbuf[i] > 0){
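The point-to-point branch above (MPI builds) wraps each receive slot in a non-owning Teuchos::ArrayRCP so Teuchos::ireceive can post directly into the std::vector, then recovers the sender from the status returned by wait(). A condensed sketch of that exchange; all names are illustrative and, as in the commit, this path assumes HAVE_MPI.

#include <mpi.h>
#include <vector>
#include <Teuchos_ArrayRCP.hpp>
#include <Teuchos_CommHelpers.hpp>

// Post one ireceive per expected message (any source), send our lengths,
// then wait on each request; the CommStatus from wait() identifies the sender.
void exchangeLengths(const Teuchos::Comm<int> &comm, int tag,
                     const std::vector<int> &lengthsTo,
                     const std::vector<int> &procsTo,
                     std::vector<int> &lengthsFrom,  // presized to nrecvs
                     std::vector<int> &procsFrom) {  // presized to nrecvs
  const int nrecvs = static_cast<int>(lengthsFrom.size());
  Teuchos::ArrayRCP<Teuchos::RCP<Teuchos::CommRequest<int> > > requests(nrecvs);
  for (int i = 0; i < nrecvs; ++i) {
    // Non-owning view (last argument false): Teuchos writes into lengthsFrom[i].
    Teuchos::ArrayRCP<int> slot(&lengthsFrom[i], 0, 1, false);
    requests[i] = Teuchos::ireceive(slot, MPI_ANY_SOURCE, tag, comm);
  }
  for (int i = 0; i < static_cast<int>(procsTo.size()); ++i) {
    Teuchos::send(&lengthsTo[i], 1, procsTo[i], tag, comm);
  }
  for (int i = 0; i < nrecvs; ++i) {
    procsFrom[i] = requests[i]->wait()->getSourceRank();
  }
}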
@@ -587,13 +579,13 @@ int Zoltan2_Directory_Comm::do_post(

/* If not point to point, currently we do synchronous communications */
if (plan->maxed_recvs) {
throw std::logic_error("UNTESTED COMM 4"); // needs unit testing
throw std::logic_error("Zoltan2_Directory_Comm.cpp untested refactored code (3)"); // needs unit testing
return do_all_to_all(plan, send_data, nbytes, recv_data);
}

int my_proc = plan->comm->getRank(); /* processor ID */
if ((plan->nsends + plan->self_msg) && !send_data.size()) {
throw std::logic_error("UNTESTED COMM 5"); // needs unit testing
throw std::logic_error("Zoltan2_Directory_Comm.cpp untested refactored code (4)"); // needs unit testing
size_t sum = 0;
if (plan->sizes_to.size()) { /* Not an error if all sizes_to == 0 */
for (int i = 0; i < (plan->nsends + plan->self_msg); i++) {
@@ -605,7 +597,7 @@ int Zoltan2_Directory_Comm::do_post(
}
}
if ((plan->nrecvs + plan->self_msg) && recv_data == Teuchos::null) {
throw std::logic_error("UNTESTED COMM 6"); // needs unit testing
throw std::logic_error("Zoltan2_Directory_Comm.cpp untested refactored code (5)"); // needs unit testing
size_t sum = 0;
if (plan->sizes_from != Teuchos::null) /* Not an error if all sizes_from == 0 */
for (int i = 0; i < (plan->nrecvs + plan->self_msg); i++)
@@ -630,12 +622,10 @@ int Zoltan2_Directory_Comm::do_post(
int k = 0;
for (int i = 0; i < plan->nrecvs + plan->self_msg; i++) {
if (plan->procs_from[i] != my_proc) {
// TODO: Teuchos::ireceiveImpl(...)
MPI_Irecv((void *)
&plan->getRecvBuff().getRawPtr()[(size_t)(plan->starts_from[i])*(size_t)nbytes],
plan->lengths_from[i] * nbytes,
(MPI_Datatype) MPI_BYTE, plan->procs_from[i], tag,
getRawComm(), &plan->request[k]);
Teuchos::ArrayRCP<char> subview(
&plan->getRecvBuff().getRawPtr()[plan->starts_from[i] * nbytes],
0, plan->lengths_from[i] * nbytes, false);
plan->request[k] = Teuchos::ireceive(subview, plan->procs_from[i], tag, *comm_);
k++;
}
else {
@@ -648,16 +638,15 @@ int Zoltan2_Directory_Comm::do_post(
for (int i = 0; i < plan->nrecvs + plan->self_msg; i++) {
if (plan->procs_from[i] != my_proc) {
if (plan->sizes_from[i]) {
// TODO: Teuchos::ireceiveImpl(...)
MPI_Irecv((void *)
Teuchos::ArrayRCP<char> subview(
&plan->getRecvBuff().getRawPtr()[(size_t)(plan->starts_from_ptr[i])
* (size_t)nbytes],
plan->sizes_from[i] * nbytes,
(MPI_Datatype) MPI_BYTE, plan->procs_from[i],
tag, getRawComm(), &plan->request[k]);
0, plan->sizes_from[i] * nbytes, false);
plan->request[k] = Teuchos::ireceive(subview, plan->procs_from[i], tag, *comm_);
}
else {
plan->request[k] = Teuchos::null;
}
else
plan->request[k] = MPI_REQUEST_NULL;
k++;
}
else {
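For the sized-message case above, the refactor again builds non-owning ArrayRCP views, this time as byte windows into one large receive buffer, so each posted ireceive lands directly in the plan's buffer at the right offset. A small sketch of that view construction; the helper name and parameters are illustrative, not from the commit.

#include <cstddef>
#include <Teuchos_ArrayRCP.hpp>
#include <Teuchos_CommHelpers.hpp>

// Build a non-owning window [offsetBytes, offsetBytes + lengthBytes) over an
// existing receive buffer and post an ireceive into it. The false ownership
// flag keeps the window from ever freeing the underlying buffer.
Teuchos::RCP<Teuchos::CommRequest<int> >
postReceiveWindow(const Teuchos::Comm<int> &comm, int tag,
                  const Teuchos::ArrayRCP<char> &recvBuff,
                  std::size_t offsetBytes, std::size_t lengthBytes,
                  int sourceRank) {
  Teuchos::ArrayRCP<char> window(recvBuff.getRawPtr() + offsetBytes, 0,
                                 lengthBytes, false);
  return Teuchos::ireceive(window, sourceRank, tag, comm);
}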
@@ -704,11 +693,10 @@ int Zoltan2_Directory_Comm::do_post(
int self_num = 0; /* where in send list my_proc appears */
for (int i = proc_index, j = 0; j < nblocks; j++) {
if (plan->procs_to[i] != my_proc) {
// TODO: Teuchos::readySend(...)
MPI_Rsend(
(void *) &send_data[(size_t)(plan->starts_to[i])*(size_t)nbytes],
plan->lengths_to[i] * nbytes, (MPI_Datatype) MPI_BYTE,
plan->procs_to[i], tag, getRawComm());
Teuchos::ArrayRCP<char> subview(
&send_data[plan->starts_to[i] * nbytes],
0, plan->lengths_to[i] * nbytes, false);
Teuchos::readySend(subview.getRawPtr(), static_cast<int>(subview.size()), plan->procs_to[i], tag, *comm_);
}
else {
self_num = i;
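The send side above becomes Teuchos::readySend, which keeps MPI_Rsend's contract: the matching receive must already be posted on the destination rank, which the plan arranges by posting all of its ireceives earlier in do_post (the hunks above). A one-line sketch of the call shape, with illustrative names:

#include <Teuchos_CommHelpers.hpp>

// Ready-mode send of a raw byte block; legal only because the receiver has
// already posted its ireceive for this (source, tag) pair.
void sendBlock(const Teuchos::Comm<int> &comm, int tag,
               const char *block, int blockBytes, int destRank) {
  Teuchos::readySend(block, blockBytes, destRank, tag, comm);
}

Ready mode lets the MPI layer skip the rendezvous handshake, since the receive buffer is guaranteed to exist when the send starts.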
@@ -744,10 +732,8 @@ int Zoltan2_Directory_Comm::do_post(
&send_data[(size_t)(plan->indices_to[j++]) * (size_t)nbytes], nbytes);
offset += nbytes;
}
// TODO: Teuchos::readySend(...)
MPI_Rsend((void *) &(send_buff[0]), plan->lengths_to[i] * nbytes,
(MPI_Datatype) MPI_BYTE, plan->procs_to[i], tag,
getRawComm());
Teuchos::readySend(&send_buff[0], plan->lengths_to[i] * nbytes,
plan->procs_to[i], tag, *comm_);
}
else {
self_num = i;
@@ -774,12 +760,8 @@ int Zoltan2_Directory_Comm::do_post(
for (int i = proc_index, j = 0; j < nblocks; j++) {
if (plan->procs_to[i] != my_proc) {
if (plan->sizes_to[i]) {
// TODO: Teuchos::readySend(...)
MPI_Rsend((void *) &send_data[(size_t)(
plan->starts_to_ptr[i]) * (size_t)nbytes],
plan->sizes_to[i] * nbytes,
(MPI_Datatype) MPI_BYTE, plan->procs_to[i],
tag, getRawComm());
Teuchos::readySend(&send_data[plan->starts_to_ptr[i] * nbytes], plan->sizes_to[i] * nbytes,
plan->procs_to[i], tag, *comm_);
}
}
else
@@ -819,9 +801,8 @@ int Zoltan2_Directory_Comm::do_post(
j++;
}
if (plan->sizes_to[i]) {
// TODO: Teuchos::readySend(...)
MPI_Rsend((void *) &(send_buff[0]), plan->sizes_to[i] * nbytes,
(MPI_Datatype) MPI_BYTE, plan->procs_to[i], tag, getRawComm());
Teuchos::readySend(&send_buff[0], plan->sizes_to[i] * nbytes,
plan->procs_to[i], tag, *comm_);
}
}
else
@@ -875,9 +856,7 @@ int Zoltan2_Directory_Comm::do_wait(
/* Note: since request is in plan, could wait in later routine. */
if (plan->indices_from == Teuchos::null) { /* No copying required */
if (plan->nrecvs > 0) {
// TODO: Teuchos::waitAllImpl(...)
MPI_Waitall(plan->nrecvs, &plan->request[0], &plan->status[0]);
//MPI_Waitall(plan->nrecvs, plan->request.getRawPtr(), plan->status.getRawPtr());
Teuchos::waitAll(*comm_, plan->request());
}
}
else { /* Need to copy into recv_data. */
@@ -917,14 +896,17 @@ int Zoltan2_Directory_Comm::do_wait(
}

for (int jj = 0; jj < plan->nrecvs; jj++) {
MPI_Status status; /* return from Waitany */
int index;
MPI_Waitany(plan->nrecvs, &plan->request[0], &index, &status);
//MPI_Waitany(plan->nrecvs, plan->request.getRawPtr(), &index, &status);

if (index == MPI_UNDEFINED) {
break; /* No more receives */
}
// TODO: The directory was refactored to use the Teuchos comm wrappers, but Teuchos has no waitAny.
// Short term fix: call wait() on each request in order.
// Once a Teuchos::waitAny exists, this can be replaced with the equivalent of
// the old MPI_Waitany form, which is commented out below.
plan->request[jj]->wait();
int index = jj;

// Old form with MPI_Waitany
// MPI_Status status; /* return from Waitany */
// int index;
// MPI_Waitany(plan->nrecvs, &plan->request[0], &index, &status);

if (index >= self_num) {
index++;
@@ -967,7 +949,7 @@ int Zoltan2_Directory_Comm::do_all_to_all(
int nbytes, /* msg size */
Teuchos::ArrayRCP<char> &recv_data) /* array of data I'll own after comm */
{
throw std::logic_error("UNTESTED COMM 10"); // needs unit testing
throw std::logic_error("Zoltan2_Directory_Comm.cpp untested refactored code (6)"); // needs unit testing

int sm = (plan->self_msg > 0) ? 1 : 0;

@@ -1234,13 +1216,9 @@ int Zoltan2_Directory_Comm::do_all_to_all(
}

/* EXCHANGE DATA */
MPI_Alltoallv(&(outbuf[0]), &(outbufCounts[0]), &(outbufOffsets[0]), MPI_BYTE,
&(inbuf[0]), &(inbufCounts[0]), &(inbufOffsets[0]), MPI_BYTE, getRawComm());

// TODO: Restore - can we just copy optimizations - original just set ptr
//if (outbuf != send_data){
// ZOLTAN_FREE(outbuf);
//}
// Did not refactor this - need Teuchos form but this is not tested code.
// MPI_Alltoallv(&(outbuf[0]), &(outbufCounts[0]), &(outbufOffsets[0]), MPI_BYTE,
//   &(inbuf[0]), &(inbufCounts[0]), &(inbufOffsets[0]), MPI_BYTE, getRawComm());

/* WRITE RECEIVED DATA INTO USER'S BUFFER WHERE IT'S EXPECTED */

@@ -1315,7 +1293,7 @@ int Zoltan2_Directory_Comm::do_reverse(

if (plan_forward->plan_reverse->maxed_recvs) {

throw std::logic_error("UNTESTED COMM 11"); // needs unit testing
throw std::logic_error("Zoltan2_Directory_Comm.cpp untested refactored code (7)"); // needs unit testing

/* use MPI_Alltoallv to implement plan->plan_reverse, because comm_do_post
* would post more receives that allowed on this machine
@@ -1382,7 +1360,6 @@ int Zoltan2_Directory_Comm::create_reverse_plan(
// plan_forward->plan_reverse->request = Teuchos::arcp(new MPI_Request[plan_forward->plan_reverse->nrecvs], 0, plan_forward->plan_reverse->nrecvs, true);
// plan_forward->plan_reverse->status = Teuchos::arcp(new MPI_Status[plan_forward->plan_reverse->nrecvs], 0, plan_forward->plan_reverse->nrecvs, true);
plan_forward->plan_reverse->request.resize(plan_forward->plan_reverse->nrecvs);
plan_forward->plan_reverse->status.resize(plan_forward->plan_reverse->nrecvs);
}

int sum_recv_sizes;
@@ -1421,8 +1398,7 @@ int Zoltan2_Directory_Comm::resize(
int has_sizes = (sizes.size() != 0);
int var_sizes; /* items have variable sizes? */

// I think we'll need to do this raw, not with Teuchos
MPI_Allreduce(&has_sizes, &var_sizes, 1, MPI_INT, MPI_LOR, getRawComm());
Teuchos::reduceAll(*comm_, Teuchos::REDUCE_BOR, 1, &has_sizes, &var_sizes);

if (var_sizes && plan->indices_from != Teuchos::null) {
// NEW METHOD
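The resize hunk above turns a raw MPI_Allreduce with MPI_LOR into Teuchos::reduceAll with REDUCE_BOR; for 0/1 integer flags, a bitwise OR gives the same "did any rank set it" answer. A minimal sketch with illustrative names:

#include <Teuchos_CommHelpers.hpp>

// Every rank contributes a 0/1 flag; after the all-reduce every rank knows
// whether any rank set it (bitwise OR of 0/1 values behaves like logical OR).
bool anyRankSet(const Teuchos::Comm<int> &comm, bool localFlag) {
  int sendFlag = localFlag ? 1 : 0;
  int globalFlag = 0;
  Teuchos::reduceAll(comm, Teuchos::REDUCE_BOR, 1, &sendFlag, &globalFlag);
  return globalFlag != 0;
}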
@@ -1659,8 +1635,9 @@ int Zoltan2_Directory_Comm::exchange_sizes(
int self_index_to = -1; /* location of self in procs_to */
for (int i = 0; i < nsends + self_msg; i++) {
if (procs_to[i] != my_proc) {
// TODO: Teuchos::send(...)
MPI_Send((void *) &sizes_to[i], 1, MPI_INT, procs_to[i], tag, getRawComm());
#ifdef HAVE_MPI // Teuchos::send not implemented for Serial - Serial is just for debugging
Teuchos::send(*comm_, 1, &sizes_to[i], procs_to[i]);
#endif
}
else {
self_index_to = i;
@@ -1671,10 +1648,9 @@

for (int i = 0; i < nrecvs + self_msg; i++) {
if (procs_from[i] != my_proc) {
MPI_Status status; /* status of commuication operation */
// TODO: Teuchos::receive(...)
MPI_Recv((void *) &(sizes_from[i]), 1, MPI_INT, procs_from[i],
tag, getRawComm(), &status);
#ifdef HAVE_MPI // Teuchos::receive not implemented for Serial - Serial is just for debugging
Teuchos::receive(*comm_, procs_from[i], 1, &sizes_from[i]);
#endif
}
else {
sizes_from[i] = sizes_to[self_index_to];