Skip to content

Commit

Permalink
wipwipwip
Browse files Browse the repository at this point in the history
  • Loading branch information
vicentebolea committed Jul 18, 2023
1 parent 9b279b6 commit 8dd19d4
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 23 deletions.
20 changes: 0 additions & 20 deletions source/adios2/toolkit/sst/cp/cp_writer.c
Original file line number Diff line number Diff line change
Expand Up @@ -1495,14 +1495,6 @@ static void CloseWSRStream(CManager cm, void *WSR_Stream_v)
"Delayed task Moving Reader stream %p to status %s\n",
CP_WSR_Stream, SSTStreamStatusStr[PeerClosed]);
CP_PeerFailCloseWSReader(CP_WSR_Stream, PeerClosed);

if (strncmp("mpi", ParentStream->ConfigParams->DataTransport, 3) == 0 &&
CP_WSR_Stream->DP_WSR_Stream)
{
CP_WSR_Stream->ParentStream->DP_Interface->destroyWriterPerReader(
&Svcs, CP_WSR_Stream->DP_WSR_Stream);
CP_WSR_Stream->DP_WSR_Stream = NULL;
}
STREAM_MUTEX_UNLOCK(ParentStream);
}

Expand Down Expand Up @@ -1554,18 +1546,6 @@ static void CP_PeerFailCloseWSReader(WS_ReaderInfo CP_WSR_Stream,
CMfree(CMadd_delayed_task(ParentStream->CPInfo->SharedCM->cm, 2, 0,
CloseWSRStream, CP_WSR_Stream));
}
else
{
if (strncmp("mpi", ParentStream->ConfigParams->DataTransport, 3) ==
0 &&
CP_WSR_Stream->DP_WSR_Stream)
{
CP_WSR_Stream->ParentStream->DP_Interface
->destroyWriterPerReader(&Svcs,
CP_WSR_Stream->DP_WSR_Stream);
CP_WSR_Stream->DP_WSR_Stream = NULL;
}
}
}
CP_verbose(ParentStream, PerStepVerbose,
"Moving Reader stream %p to status %s\n", CP_WSR_Stream,
Expand Down
42 changes: 39 additions & 3 deletions source/adios2/toolkit/sst/dp/mpi_dp.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>

#include <pthread.h>
Expand Down Expand Up @@ -243,6 +244,11 @@ static FMStructDescRec MpiWriterContactStructs[] = {

/*****Internal functions*****************************************************/

/**
 * GetUniqueTaskId.
 *
 * Builds a 64-bit task identifier that is unique across hosts by placing
 * the process id in the high 32 bits and the host id in the low 32 bits.
 *
 * @return 64-bit identifier: (pid << 32) | hostid.
 */
static uint64_t GetUniqueTaskId(void)
{
    /* Do the shift in uint64_t: the original form multiplied by (1ll<<32ll)
     * in signed long long, which can overflow (undefined behavior) for
     * large pid values. Unsigned arithmetic is well-defined. */
    return (((uint64_t)(uint32_t)getpid()) << 32) | (uint32_t)gethostid();
}

static void MpiReadReplyHandler(CManager cm, CMConnection conn, void *msg_v,
void *client_Data, attr_list attrs);

Expand Down Expand Up @@ -309,7 +315,7 @@ static DP_RS_Stream MpiInitReader(CP_Services Svcs, void *CP_Stream,
CMFormat F;

Stream->Stream.CP_Stream = CP_Stream;
Stream->Stream.PID = getpid();
Stream->Stream.PID = GetUniqueTaskId();
Stream->Link.Stats = Stats;

SMPI_Comm_rank(comm, &Stream->Stream.Rank);
Expand Down Expand Up @@ -362,7 +368,7 @@ static DP_WS_Stream MpiInitWriter(CP_Services Svcs, void *CP_Stream,
SMPI_Comm_rank(comm, &Stream->Stream.Rank);

Stream->Stream.CP_Stream = CP_Stream;
Stream->Stream.PID = getpid();
Stream->Stream.PID = GetUniqueTaskId();
STAILQ_INIT(&Stream->TimeSteps);
TAILQ_INIT(&Stream->Readers);

Expand Down Expand Up @@ -420,6 +426,8 @@ MpiInitWriterPerReader(CP_Services Svcs, DP_WS_Stream WS_Stream_v,
StreamWPR->CohortMpiComms[i] = MPI_COMM_NULL;
}

printf("WPR init called %d (rank: %d)\n", readerCohortSize, StreamWR->Stream.Rank);

pthread_mutex_lock(&StreamWR->MutexReaders);
TAILQ_INSERT_TAIL(&StreamWR->Readers, StreamWPR, entries);
pthread_mutex_unlock(&StreamWR->MutexReaders);
Expand Down Expand Up @@ -890,6 +898,33 @@ static void MpiDestroyWriterPerReader(CP_Services Svcs,

const int CohortSize = StreamWPR->Link.CohortSize;

printf("WPR soft Destroy called %d (rank: %d)\n", CohortSize, StreamWR->Stream.Rank);

for (int i = 0; i < CohortSize; i++)
{
if (StreamWPR->CohortMpiComms[i] != MPI_COMM_NULL)
{
MPI_Comm_disconnect(&StreamWPR->CohortMpiComms[i]);
}
}
}

/**
 * MpiFullyDestroyWriterPerReader.
 *
 * This is called whenever a reader disconnects from a writer. This function
 * also removes the StreamWPR from its own StreamWR.
 */
static void MpiFullyDestroyWriterPerReader(CP_Services Svcs,
DP_WSR_Stream WSR_Stream_v)
{
MpiStreamWPR StreamWPR = (MpiStreamWPR)WSR_Stream_v;
MpiStreamWR StreamWR = StreamWPR->StreamWR;

const int CohortSize = StreamWPR->Link.CohortSize;

printf("WPR Destroy called %d (rank: %d)\n", CohortSize, StreamWR->Stream.Rank);

for (int i = 0; i < CohortSize; i++)
{
if (StreamWPR->CohortMpiComms[i] != MPI_COMM_NULL)
Expand All @@ -908,6 +943,7 @@ static void MpiDestroyWriterPerReader(CP_Services Svcs,
free(StreamWPR);
}


/**
* MpiDestroyWriter
*/
Expand All @@ -919,7 +955,7 @@ static void MpiDestroyWriter(CP_Services Svcs, DP_WS_Stream WS_Stream_v)
while (!TAILQ_EMPTY(&StreamWR->Readers))
{
MpiStreamWPR Stream = TAILQ_FIRST(&StreamWR->Readers);
MpiDestroyWriterPerReader(Svcs, Stream);
MpiFullyDestroyWriterPerReader(Svcs, Stream);
}
pthread_mutex_unlock(&StreamWR->MutexReaders);

Expand Down

0 comments on commit 8dd19d4

Please sign in to comment.