Skip to content

Commit

Permalink
Workaround for GatherV >= 2Gb
Browse files Browse the repository at this point in the history
  • Loading branch information
franzpoeschel committed Oct 17, 2023
1 parent 107164d commit 853ff0d
Showing 1 changed file with 75 additions and 8 deletions.
83 changes: 75 additions & 8 deletions source/adios2/toolkit/sst/cp/cp_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -703,12 +703,16 @@ void **CP_consolidateDataToRankZero(SstStream Stream, void *LocalInfo, FFSTypeHa
* and displacements for each rank
*/

// How many ranks participate in one MPI_Gatherv call.
// 8000 ranks corresponds to 1000 nodes on Frontier (presumably 8 ranks per node).
#define ADIOS2_BATCH_SIZE 8000

size_t *Displs = NULL;
char *RecvBuffer = NULL;

if (Stream->Rank == 0)
{
int TotalLen = 0;
size_t TotalLen = 0;
Displs = malloc(Stream->CohortSize * sizeof(*Displs));

Displs[0] = 0;
Expand All @@ -718,19 +722,82 @@ void **CP_consolidateDataToRankZero(SstStream Stream, void *LocalInfo, FFSTypeHa
{
int RoundUp = (RecvCounts[i] + 7) & ~7;
Displs[i] = TotalLen;
TotalLen += RoundUp;
TotalLen += (size_t)RoundUp;
}

RecvBuffer = malloc(TotalLen * sizeof(char));
}

/*
* Now we have the receive buffer, counts, and displacements, and
* can gather the data
*/
// MPI_Comm_split would be simpler, but source/adios2/toolkit/sst/sst_comm.h doesn't provide it,
// and this is only a workaround anyway.

size_t *current_RecvCounts = NULL;
size_t *current_Displs = NULL;
size_t currentOffset = 0;
if (Stream->Rank == 0)
{
current_Displs = malloc(Stream->CohortSize * sizeof(*Displs));
current_RecvCounts = malloc(Stream->CohortSize * sizeof(*RecvCounts));
}

for (size_t current_batch = 0; current_batch * ADIOS2_BATCH_SIZE < Stream->CohortSize;
++current_batch)
{
size_t current_batch_lower_bound = current_batch * ADIOS2_BATCH_SIZE;
size_t current_batch_upper_bound = (current_batch + 1) * ADIOS2_BATCH_SIZE;
if (current_batch_upper_bound > Stream->CohortSize)
{
current_batch_upper_bound = Stream->CohortSize;
}
size_t current_batch_size = current_batch_upper_bound - current_batch_lower_bound;

void *current_RecvBuffer = NULL;

size_t TotalLen = 0;
if (Stream->Rank == 0)
{
memset(current_Displs, 0, Stream->CohortSize * sizeof(*Displs));
memset(current_RecvCounts, 0, Stream->CohortSize * sizeof(*RecvCounts));

memcpy(current_RecvCounts + ADIOS2_BATCH_SIZE * current_batch,
RecvCounts + ADIOS2_BATCH_SIZE * current_batch,
current_batch_size * sizeof(*RecvCounts));

for (size_t i = current_batch_lower_bound; i < current_batch_upper_bound; i++)
{
int RoundUp = (RecvCounts[i] + 7) & ~7;
current_Displs[i] = TotalLen;
TotalLen += (size_t)RoundUp;
}
// All other values in current_Displs are now 0,
// which doesn't matter since the current_RecvCounts are 0 there too
}

size_t current_DataSize = DataSize;
if (Stream->Rank < current_batch_lower_bound || Stream->Rank >= current_batch_upper_bound)
{
current_DataSize = 0;
}

/*
* Now we have the receive buffer, counts, and displacements, and
* can gather the data
*/
// if (Stream->Rank == 0)
// {
// printf("GatherV into offset %ld\n", currentOffset);
// }
SMPI_Gatherv(Buffer, current_DataSize, SMPI_CHAR, RecvBuffer + currentOffset,
current_RecvCounts, current_Displs, SMPI_CHAR, 0, Stream->mpiComm);
currentOffset += TotalLen;
}

if (Stream->Rank == 0)
{
free(current_Displs);
free(current_RecvCounts);
}

SMPI_Gatherv(Buffer, DataSize, SMPI_CHAR, RecvBuffer, RecvCounts, Displs, SMPI_CHAR, 0,
Stream->mpiComm);
free_FFSBuffer(Buf);

if (Stream->Rank == 0)
Expand Down

0 comments on commit 853ff0d

Please sign in to comment.