Skip to content

Commit

Permalink
wipwipwip
Browse files Browse the repository at this point in the history
  • Loading branch information
vicentebolea committed Apr 14, 2023
1 parent 240690a commit f6b1907
Showing 1 changed file with 12 additions and 26 deletions.
38 changes: 12 additions & 26 deletions source/adios2/toolkit/sst/dp/mpi_dp.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#define MPI_DP_CONTACT_STRING_LEN 64
#define QUOTE(name) #name
#define MACRO_TO_STR(name) QUOTE(name)
#define MPI_MAX_COHORT_SIZE 1024

static pthread_once_t OnceMpiInitializer = PTHREAD_ONCE_INIT;

Expand Down Expand Up @@ -107,6 +108,7 @@ typedef struct _MpiStreamWR
TAILQ_HEAD(ReadersListHead, _MpiStreamWPR) Readers;
pthread_rwlock_t LockTS;
pthread_mutex_t MutexReaders;
MPI_Comm *CohortMpiComms;
} * MpiStreamWR;

/**
Expand All @@ -121,8 +123,7 @@ typedef struct _MpiStreamWPR
struct _MpiStreamWR *StreamWR;

struct _MpiWriterContactInfo MyContactInfo;
struct _MpiReaderContactInfo *CohortReaderInfo;
MPI_Comm *CohortMpiComms;
MPI_Comm CohortMpiComms;
char MpiPortName[MPI_MAX_PORT_NAME];

TAILQ_ENTRY(_MpiStreamWPR) entries;
Expand Down Expand Up @@ -363,6 +364,8 @@ static DP_WS_Stream MpiInitWriter(CP_Services Svcs, void *CP_Stream,

Stream->Stream.CP_Stream = CP_Stream;
Stream->Stream.PID = getpid();
Stream->CohortMpiComms = malloc(sizeof(MPI_Comm) * MPI_MAX_COHORT_SIZE);
memset(Stream->CohortMpiComms, MPI_COMM_NULL, MPI_MAX_COHORT_SIZE);
STAILQ_INIT(&Stream->TimeSteps);
TAILQ_INIT(&Stream->Readers);

Expand Down Expand Up @@ -397,29 +400,17 @@ MpiInitWriterPerReader(CP_Services Svcs, DP_WS_Stream WS_Stream_v,
{
MpiStreamWR StreamWR = (MpiStreamWR)WS_Stream_v;
MpiStreamWPR StreamWPR = calloc(sizeof(struct _MpiStreamWPR), 1);
MpiReaderContactInfo *providedReaderInfo =
(MpiReaderContactInfo *)providedReaderInfo_v;

MPI_Open_port(MPI_INFO_NULL, StreamWPR->MpiPortName);

StreamWPR->StreamWR = StreamWR; /* pointer to writer struct */
StreamWPR->Link.PeerCohort = PeerCohort;
StreamWPR->Link.CohortSize = readerCohortSize;
StreamWPR->CohortMpiComms = MPI_COMM_NULL;

Svcs->verbose(StreamWR->Stream.CP_Stream, DPTraceVerbose,
"MPI dataplane WriterPerReader to be initialized\n");

/* * Copy of writer contact information (original will not be preserved) */
StreamWPR->CohortReaderInfo =
malloc(sizeof(struct _MpiReaderContactInfo) * readerCohortSize);
StreamWPR->CohortMpiComms = malloc(sizeof(MPI_Comm) * readerCohortSize);
for (int i = 0; i < readerCohortSize; i++)
{
memcpy(&StreamWPR->CohortReaderInfo[i], providedReaderInfo[i],
sizeof(struct _MpiReaderContactInfo));
StreamWPR->CohortMpiComms[i] = MPI_COMM_NULL;
}

pthread_mutex_lock(&StreamWR->MutexReaders);
TAILQ_INSERT_TAIL(&StreamWR->Readers, StreamWPR, entries);
pthread_mutex_unlock(&StreamWR->MutexReaders);
Expand Down Expand Up @@ -462,6 +453,7 @@ static void MpiProvideWriterDataToReader(CP_Services Svcs,
/* * Copy of writer contact information (original will not be preserved) */
StreamRS->CohortWriterInfo =
malloc(sizeof(struct _MpiWriterContactInfo) * writerCohortSize);

StreamRS->CohortMpiComms = malloc(sizeof(MPI_Comm) * writerCohortSize);
for (int i = 0; i < writerCohortSize; i++)
{
Expand Down Expand Up @@ -679,7 +671,7 @@ static void MpiReadRequestHandler(CManager cm, CMConnection conn, void *msg_v,
&ReadReplyMsg);

// Send the actual Data using MPI
MPI_Comm *comm = &StreamWPR->CohortMpiComms[ReadRequestMsg->RequestingRank];
MPI_Comm *comm = &StreamWR->CohortMpiComms[ReadRequestMsg->RequestingRank];
MPI_Errhandler worldErrHandler;
MPI_Comm_get_errhandler(MPI_COMM_WORLD, &worldErrHandler);
MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
Expand Down Expand Up @@ -888,19 +880,12 @@ static void MpiDestroyWriterPerReader(CP_Services Svcs,
MpiStreamWPR StreamWPR = (MpiStreamWPR)WSR_Stream_v;
MpiStreamWR StreamWR = StreamWPR->StreamWR;

const int CohortSize = StreamWPR->Link.CohortSize;

for (int i = 0; i < CohortSize; i++)
if (StreamWPR->CohortMpiComms != MPI_COMM_NULL)
{
if (StreamWPR->CohortMpiComms[i] != MPI_COMM_NULL)
{
MPI_Comm_disconnect(&StreamWPR->CohortMpiComms[i]);
}
MPI_Comm_disconnect(&StreamWPR->CohortMpiComms);
}
MPI_Close_port(StreamWPR->MpiPortName);

free(StreamWPR->CohortReaderInfo);
free(StreamWPR->CohortMpiComms);
MPI_Close_port(StreamWPR->MpiPortName);

pthread_mutex_lock(&StreamWR->MutexReaders);
TAILQ_REMOVE(&StreamWR->Readers, StreamWPR, entries);
Expand Down Expand Up @@ -935,6 +920,7 @@ static void MpiDestroyWriter(CP_Services Svcs, DP_WS_Stream WS_Stream_v)

pthread_mutex_destroy(&StreamWR->MutexReaders);
pthread_rwlock_destroy(&StreamWR->LockTS);
free(StreamWR->CohortMpiComms);
free(StreamWR);
}

Expand Down

0 comments on commit f6b1907

Please sign in to comment.