diff --git a/source/adios2/toolkit/sst/cp/cp_writer.c b/source/adios2/toolkit/sst/cp/cp_writer.c index d7e7e7aa06..4a5d55c94b 100644 --- a/source/adios2/toolkit/sst/cp/cp_writer.c +++ b/source/adios2/toolkit/sst/cp/cp_writer.c @@ -1495,14 +1495,6 @@ static void CloseWSRStream(CManager cm, void *WSR_Stream_v) "Delayed task Moving Reader stream %p to status %s\n", CP_WSR_Stream, SSTStreamStatusStr[PeerClosed]); CP_PeerFailCloseWSReader(CP_WSR_Stream, PeerClosed); - - if (strncmp("mpi", ParentStream->ConfigParams->DataTransport, 3) == 0 && - CP_WSR_Stream->DP_WSR_Stream) - { - CP_WSR_Stream->ParentStream->DP_Interface->destroyWriterPerReader( - &Svcs, CP_WSR_Stream->DP_WSR_Stream); - CP_WSR_Stream->DP_WSR_Stream = NULL; - } STREAM_MUTEX_UNLOCK(ParentStream); } @@ -1554,18 +1546,6 @@ static void CP_PeerFailCloseWSReader(WS_ReaderInfo CP_WSR_Stream, CMfree(CMadd_delayed_task(ParentStream->CPInfo->SharedCM->cm, 2, 0, CloseWSRStream, CP_WSR_Stream)); } - else - { - if (strncmp("mpi", ParentStream->ConfigParams->DataTransport, 3) == - 0 && - CP_WSR_Stream->DP_WSR_Stream) - { - CP_WSR_Stream->ParentStream->DP_Interface - ->destroyWriterPerReader(&Svcs, - CP_WSR_Stream->DP_WSR_Stream); - CP_WSR_Stream->DP_WSR_Stream = NULL; - } - } } CP_verbose(ParentStream, PerStepVerbose, "Moving Reader stream %p to status %s\n", CP_WSR_Stream, diff --git a/source/adios2/toolkit/sst/dp/mpi_dp.c b/source/adios2/toolkit/sst/dp/mpi_dp.c index 7604adf59e..e6b89e21a3 100644 --- a/source/adios2/toolkit/sst/dp/mpi_dp.c +++ b/source/adios2/toolkit/sst/dp/mpi_dp.c @@ -32,6 +32,7 @@ #include #include +#include #include #include @@ -243,6 +244,11 @@ static FMStructDescRec MpiWriterContactStructs[] = { /*****Internal functions*****************************************************/ +static uint64_t GetUniqueTaskId() +{ + return ((uint32_t)getpid() * (1ll<<32ll)) | (uint32_t)gethostid(); +} + static void MpiReadReplyHandler(CManager cm, CMConnection conn, void *msg_v, void *client_Data, attr_list attrs); @@ -309,7 +315,7 @@ static DP_RS_Stream MpiInitReader(CP_Services Svcs, void *CP_Stream, CMFormat F; Stream->Stream.CP_Stream = CP_Stream; - Stream->Stream.PID = getpid(); + Stream->Stream.PID = GetUniqueTaskId(); Stream->Link.Stats = Stats; SMPI_Comm_rank(comm, &Stream->Stream.Rank); @@ -362,7 +368,7 @@ static DP_WS_Stream MpiInitWriter(CP_Services Svcs, void *CP_Stream, SMPI_Comm_rank(comm, &Stream->Stream.Rank); Stream->Stream.CP_Stream = CP_Stream; - Stream->Stream.PID = getpid(); + Stream->Stream.PID = GetUniqueTaskId(); STAILQ_INIT(&Stream->TimeSteps); TAILQ_INIT(&Stream->Readers); @@ -420,6 +426,8 @@ MpiInitWriterPerReader(CP_Services Svcs, DP_WS_Stream WS_Stream_v, StreamWPR->CohortMpiComms[i] = MPI_COMM_NULL; } + printf("WPR init called %d (rank: %d)\n", readerCohortSize, StreamWR->Stream.Rank); + pthread_mutex_lock(&StreamWR->MutexReaders); TAILQ_INSERT_TAIL(&StreamWR->Readers, StreamWPR, entries); pthread_mutex_unlock(&StreamWR->MutexReaders); @@ -890,6 +898,33 @@ static void MpiDestroyWriterPerReader(CP_Services Svcs, const int CohortSize = StreamWPR->Link.CohortSize; + printf("WPR soft Destroy called %d (rank: %d)\n", CohortSize, StreamWR->Stream.Rank); + + for (int i = 0; i < CohortSize; i++) + { + if (StreamWPR->CohortMpiComms[i] != MPI_COMM_NULL) + { + MPI_Comm_disconnect(&StreamWPR->CohortMpiComms[i]); + } + } +} + +/** + * MpiDestroyWriterPerReader. + * + * This is called whenever a reader disconnect from a writer. This function + * also removes the StreamWPR from its own StreamWR. + */ +static void MpiFullyDestroyWriterPerReader(CP_Services Svcs, + DP_WSR_Stream WSR_Stream_v) +{ + MpiStreamWPR StreamWPR = (MpiStreamWPR)WSR_Stream_v; + MpiStreamWR StreamWR = StreamWPR->StreamWR; + + const int CohortSize = StreamWPR->Link.CohortSize; + + printf("WPR Destroy called %d (rank: %d)\n", CohortSize, StreamWR->Stream.Rank); + for (int i = 0; i < CohortSize; i++) { if (StreamWPR->CohortMpiComms[i] != MPI_COMM_NULL) @@ -908,6 +943,7 @@ static void MpiDestroyWriterPerReader(CP_Services Svcs, free(StreamWPR); } + /** * MpiDestroyWriter */ @@ -919,7 +955,7 @@ static void MpiDestroyWriter(CP_Services Svcs, DP_WS_Stream WS_Stream_v) while (!TAILQ_EMPTY(&StreamWR->Readers)) { MpiStreamWPR Stream = TAILQ_FIRST(&StreamWR->Readers); - MpiDestroyWriterPerReader(Svcs, Stream); + MpiFullyDestroyWriterPerReader(Svcs, Stream); } pthread_mutex_unlock(&StreamWR->MutexReaders);