SST,MPI,DP: soft handle peer error
vicentebolea committed Sep 4, 2023
1 parent 89b82b4 commit 458bcfa
56 changes: 49 additions & 7 deletions source/adios2/toolkit/sst/dp/mpi_dp.c
@@ -589,7 +589,7 @@ static int MpiWaitForCompletion(CP_Services Svcs, void *Handle_v)
}
else
{
-Svcs->verbose(Handle->CPStream, DPTraceVerbose,
+Svcs->verbose(Handle->CPStream, DPCriticalVerbose,
"Remote memory read to rank %d with condition %d has FAILED"
"because of "
"writer failure\n",
@@ -628,7 +628,7 @@ static void MpiReadRequestHandler(CManager cm, CMConnection conn, void *msg_v, v
if (!RequestedData)
{
PERFSTUBS_TIMER_STOP_FUNC(timer);
-Svcs->verbose(StreamWR->Stream.CP_Stream, DPPerStepVerbose,
+Svcs->verbose(StreamWR->Stream.CP_Stream, DPCriticalVerbose,
"Failed to read TimeStep %ld, not found\n", ReadRequestMsg->TimeStep);
return;
}
@@ -833,19 +833,54 @@ static void MpiNotifyConnFailure(CP_Services Svcs, DP_RS_Stream Stream_v, int Fa
FailedPeerRank);
}

/** MpiDisconnectWriterPerReader.
*
* This is called whenever a reader disconnects from a writer. The function
* only disconnects the MPI communicators; it does not free any data
* structures. We must do it this way since:
*
* - The failed peer may re-enter the network later.
* - We must disconnect the MPI port for that particular MPI reader task,
* since otherwise the reader task might hang in MPI_Finalize when the
* failure leads to a graceful application exit.
*/
static void MpiDisconnectWriterPerReader(CP_Services Svcs, DP_WSR_Stream WSR_Stream_v)
{
MpiStreamWPR StreamWPR = (MpiStreamWPR)WSR_Stream_v;
MpiStreamWR StreamWR = StreamWPR->StreamWR;

const int CohortSize = StreamWPR->Link.CohortSize;

Svcs->verbose(StreamWR->Stream.CP_Stream, DPTraceVerbose,
"MpiDisconnectWriterPerReader invoked [rank:%d;cohortSize:%d]\n",
StreamWR->Stream.Rank, CohortSize);

for (int i = 0; i < CohortSize; i++)
{
if (StreamWPR->CohortMpiComms[i] != MPI_COMM_NULL)
{
MPI_Comm_disconnect(&StreamWPR->CohortMpiComms[i]);
}
}
}
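
Why MPI_Comm_disconnect rather than MPI_Comm_free here? MPI_Finalize is collective over all connected processes, so a handle that is merely freed leaves the peer connected and can hang a graceful shutdown. Below is a minimal sketch of that distinction for dynamically connected processes; it is not ADIOS2 code, and the "./reader_child" helper binary is a placeholder:

#include <mpi.h>

int main(int argc, char **argv)
{
    MPI_Init(&argc, &argv);

    /* Spawn one helper process over an intercommunicator. */
    MPI_Comm inter;
    MPI_Comm_spawn("./reader_child", MPI_ARGV_NULL, 1, MPI_INFO_NULL, 0,
                   MPI_COMM_SELF, &inter, MPI_ERRCODES_IGNORE);

    /* ... exchange data with the helper ... */

    /* MPI_Comm_free only marks the handle for deallocation; both sides
     * remain "connected" in the MPI sense, and MPI_Finalize waits on all
     * connected processes. MPI_Comm_disconnect completes any pending
     * communication and severs the connection, so each side can finalize
     * independently. It also sets the handle to MPI_COMM_NULL, which is
     * what lets the later full-destroy pass skip communicators that were
     * already disconnected. */
    MPI_Comm_disconnect(&inter);

    MPI_Finalize();
    return 0;
}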

/**
* MpiFullyDestroyWriterPerReader.
*
- * This is called whenever a reader disconnect from a writer. This function
- * also removes the StreamWPR from its own StreamWR.
+ * This is called by the MpiDestroyWriter function. It frees every resource
+ * allocated to the particular WriterPerReader instance (StreamWPR).
*/
-static void MpiDestroyWriterPerReader(CP_Services Svcs, DP_WSR_Stream WSR_Stream_v)
+static void MpiFullyDestroyWriterPerReader(CP_Services Svcs, DP_WSR_Stream WSR_Stream_v)
{
MpiStreamWPR StreamWPR = (MpiStreamWPR)WSR_Stream_v;
MpiStreamWR StreamWR = StreamWPR->StreamWR;

const int CohortSize = StreamWPR->Link.CohortSize;

Svcs->verbose(StreamWR->Stream.CP_Stream, DPTraceVerbose,
"MpiFullyDestroyWriterPerReader invoked [rank:%d;cohortSize:%d]\n",
StreamWR->Stream.Rank, CohortSize);

for (int i = 0; i < CohortSize; i++)
{
if (StreamWPR->CohortMpiComms[i] != MPI_COMM_NULL)
Expand All @@ -871,11 +906,14 @@ static void MpiDestroyWriter(CP_Services Svcs, DP_WS_Stream WS_Stream_v)
{
MpiStreamWR StreamWR = (MpiStreamWR)WS_Stream_v;

Svcs->verbose(StreamWR->Stream.CP_Stream, DPTraceVerbose,
"MpiDestroyWriter invoked [rank:%d]\n", StreamWR->Stream.Rank);

pthread_mutex_lock(&StreamWR->MutexReaders);
while (!TAILQ_EMPTY(&StreamWR->Readers))
{
MpiStreamWPR Stream = TAILQ_FIRST(&StreamWR->Readers);
-MpiDestroyWriterPerReader(Svcs, Stream);
+MpiFullyDestroyWriterPerReader(Svcs, Stream);
}
pthread_mutex_unlock(&StreamWR->MutexReaders);
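
The drain loop above relies on the destroy helper unlinking the popped entry from the queue; otherwise TAILQ_FIRST would return the same element forever. A self-contained sketch of that <sys/queue.h> pattern, with illustrative names rather than ADIOS2's:

#include <stdlib.h>
#include <sys/queue.h>

struct reader_entry
{
    int id;
    TAILQ_ENTRY(reader_entry) link;
};
TAILQ_HEAD(reader_list, reader_entry);

/* Pop-and-free until empty. TAILQ_REMOVE is what guarantees progress,
 * mirroring how the full-destroy helper is expected to unlink the
 * StreamWPR from its StreamWR before freeing it. */
static void drain(struct reader_list *readers)
{
    while (!TAILQ_EMPTY(readers))
    {
        struct reader_entry *e = TAILQ_FIRST(readers);
        TAILQ_REMOVE(readers, e, link);
        free(e);
    }
}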

@@ -900,6 +938,10 @@ static void MpiDestroyWriter(CP_Services Svcs, DP_WS_Stream WS_Stream_v)
static void MpiDestroyReader(CP_Services Svcs, DP_RS_Stream RS_Stream_v)
{
MpiStreamRD StreamRS = (MpiStreamRD)RS_Stream_v;

Svcs->verbose(StreamRS->Stream.CP_Stream, DPTraceVerbose,
"MpiDestroyReader invoked [rank:%d]\n", StreamRS->Stream.Rank);

const int CohortSize = StreamRS->Link.CohortSize;

for (int i = 0; i < CohortSize; i++)
@@ -930,7 +972,7 @@ extern CP_DP_Interface LoadMpiDP()
.getPriority = MpiGetPriority,
.destroyReader = MpiDestroyReader,
.destroyWriter = MpiDestroyWriter,
-.destroyWriterPerReader = MpiDestroyWriterPerReader,
+.destroyWriterPerReader = MpiDisconnectWriterPerReader,
.notifyConnFailure = MpiNotifyConnFailure,
};
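
The rebinding above is the heart of the "soft handle" change: a reader departure now takes the disconnect-only path, and freeing is deferred to writer teardown. Schematically (types collapsed to void * for self-containment; the real CP_DP_Interface in the SST headers carries many more hooks):

/* Two relevant hooks of the data-plane interface, sketched. */
typedef struct
{
    void (*destroyWriterPerReader)(void *Svcs, void *WSR_Stream); /* soft: disconnect only */
    void (*destroyWriter)(void *Svcs, void *WS_Stream);           /* hard: free everything */
} TeardownIfaceSketch;

/* Reader departs (possibly transiently): keep state, drop MPI links. */
static void OnReaderDeparture(TeardownIfaceSketch *dp, void *Svcs, void *WSR)
{
    dp->destroyWriterPerReader(Svcs, WSR);
}

/* Writer shuts down for good: now every StreamWPR is fully freed. */
static void OnWriterClose(TeardownIfaceSketch *dp, void *Svcs, void *WS)
{
    dp->destroyWriter(Svcs, WS);
}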
