From 19d2c792d3ab1a485f352f29336f93e2a50a5f14 Mon Sep 17 00:00:00 2001
From: Vicente Adolfo Bolea Sanchez
Date: Thu, 24 Aug 2023 21:01:28 -0400
Subject: [PATCH] SST,MPI,DP: soft handle peer error

---
Review notes (this section is ignored when the patch is applied):
 - The new "[rank:%d;cohortSize:%d]" trace messages now pass the rank first
   and the cohort size second, matching the order of the format string.
 - The MpiDestroyWriterPerReader trace message now ends with a newline, like
   the other trace messages added by this patch.
 - Wording of the added comments was corrected; no other code was changed.

 source/adios2/toolkit/sst/dp/mpi_dp.c | 52 ++++++++++++++++++++++++---
 1 file changed, 47 insertions(+), 5 deletions(-)

diff --git a/source/adios2/toolkit/sst/dp/mpi_dp.c b/source/adios2/toolkit/sst/dp/mpi_dp.c
index e465773463..1b43831092 100644
--- a/source/adios2/toolkit/sst/dp/mpi_dp.c
+++ b/source/adios2/toolkit/sst/dp/mpi_dp.c
@@ -589,7 +589,7 @@ static int MpiWaitForCompletion(CP_Services Svcs, void *Handle_v)
     }
     else
     {
-        Svcs->verbose(Handle->CPStream, DPTraceVerbose,
+        Svcs->verbose(Handle->CPStream, DPCriticalVerbose,
                       "Remote memory read to rank %d with condition %d has FAILED"
                       "because of "
                       "writer failure\n",
@@ -628,7 +628,7 @@ static void MpiReadRequestHandler(CManager cm, CMConnection conn, void *msg_v, v
     if (!RequestedData)
     {
         PERFSTUBS_TIMER_STOP_FUNC(timer);
-        Svcs->verbose(StreamWR->Stream.CP_Stream, DPPerStepVerbose,
+        Svcs->verbose(StreamWR->Stream.CP_Stream, DPCriticalVerbose,
                       "Failed to read TimeStep %ld, not found\n", ReadRequestMsg->TimeStep);
         return;
     }
@@ -833,11 +833,42 @@ static void MpiNotifyConnFailure(CP_Services Svcs, DP_RS_Stream Stream_v, int Fa
                   FailedPeerRank);
 }
 
+/** MpiDisconnectWriterPerReader.
+ *
+ * This is called whenever a reader disconnects from a writer. This function
+ * simply disconnects the MPI communicators; it does not free any data
+ * structure. We must do it this way since:
+ *
+ * - There is the possibility of the failed peer re-entering the network.
+ * - We must disconnect the MPI port for that particular MPI reader task since
+ *   otherwise the reader task might hang in MPI_Finalize, in the case that
+ *   the failure leads to a graceful application exit.
+ */
+static void MpiDisconnectWriterPerReader(CP_Services Svcs, DP_WSR_Stream WSR_Stream_v)
+{
+    MpiStreamWPR StreamWPR = (MpiStreamWPR)WSR_Stream_v;
+    MpiStreamWR StreamWR = StreamWPR->StreamWR;
+
+    const int CohortSize = StreamWPR->Link.CohortSize;
+
+    Svcs->verbose(StreamWR->Stream.CP_Stream, DPTraceVerbose,
+                  "MpiDisconnectWriterPerReader invoked [rank:%d;cohortSize:%d]\n",
+                  StreamWR->Stream.Rank, CohortSize);
+
+    for (int i = 0; i < CohortSize; i++)
+    {
+        if (StreamWPR->CohortMpiComms[i] != MPI_COMM_NULL)
+        {
+            MPI_Comm_disconnect(&StreamWPR->CohortMpiComms[i]);
+        }
+    }
+}
+
 /**
  * MpiDestroyWriterPerReader.
  *
- * This is called whenever a reader disconnect from a writer. This function
- * also removes the StreamWPR from its own StreamWR.
+ * This is called by the MpiDestroyWriter function. This function will free any resource
+ * allocated to the particular WriterPerReader instance (StreamWPR).
  */
 static void MpiDestroyWriterPerReader(CP_Services Svcs, DP_WSR_Stream WSR_Stream_v)
 {
@@ -846,6 +877,10 @@ static void MpiDestroyWriterPerReader(CP_Services Svcs, DP_WSR_Stream WSR_Stream
 
     const int CohortSize = StreamWPR->Link.CohortSize;
 
+    Svcs->verbose(StreamWR->Stream.CP_Stream, DPTraceVerbose,
+                  "MpiDestroyWriterPerReader invoked [rank:%d;cohortSize:%d]\n",
+                  StreamWR->Stream.Rank, CohortSize);
+
     for (int i = 0; i < CohortSize; i++)
     {
         if (StreamWPR->CohortMpiComms[i] != MPI_COMM_NULL)
@@ -871,6 +906,9 @@ static void MpiDestroyWriter(CP_Services Svcs, DP_WS_Stream WS_Stream_v)
 {
     MpiStreamWR StreamWR = (MpiStreamWR)WS_Stream_v;
 
+    Svcs->verbose(StreamWR->Stream.CP_Stream, DPTraceVerbose,
+                  "MpiDestroyWriter invoked [rank:%d]\n", StreamWR->Stream.Rank);
+
     pthread_mutex_lock(&StreamWR->MutexReaders);
     while (!TAILQ_EMPTY(&StreamWR->Readers))
     {
@@ -900,6 +938,10 @@ static void MpiDestroyWriter(CP_Services Svcs, DP_WS_Stream WS_Stream_v)
 static void MpiDestroyReader(CP_Services Svcs, DP_RS_Stream RS_Stream_v)
 {
     MpiStreamRD StreamRS = (MpiStreamRD)RS_Stream_v;
+
+    Svcs->verbose(StreamRS->Stream.CP_Stream, DPTraceVerbose,
+                  "MpiDestroyReader invoked [rank:%d]\n", StreamRS->Stream.Rank);
+
     const int CohortSize = StreamRS->Link.CohortSize;
 
     for (int i = 0; i < CohortSize; i++)
@@ -930,7 +972,7 @@ extern CP_DP_Interface LoadMpiDP()
         .getPriority = MpiGetPriority,
         .destroyReader = MpiDestroyReader,
         .destroyWriter = MpiDestroyWriter,
-        .destroyWriterPerReader = MpiDestroyWriterPerReader,
+        .destroyWriterPerReader = MpiDisconnectWriterPerReader,
         .notifyConnFailure = MpiNotifyConnFailure,
     };
 