From 853ff0d1c92dc984fd571071282896f04d2c4844 Mon Sep 17 00:00:00 2001 From: Franz Poeschel Date: Tue, 17 Oct 2023 14:55:13 +0200 Subject: [PATCH] Workaround for GatherV >= 2Gb --- source/adios2/toolkit/sst/cp/cp_common.c | 83 +++++++++++++++++++++--- 1 file changed, 75 insertions(+), 8 deletions(-) diff --git a/source/adios2/toolkit/sst/cp/cp_common.c b/source/adios2/toolkit/sst/cp/cp_common.c index f37124f202..f3d1a11e8a 100644 --- a/source/adios2/toolkit/sst/cp/cp_common.c +++ b/source/adios2/toolkit/sst/cp/cp_common.c @@ -703,12 +703,16 @@ void **CP_consolidateDataToRankZero(SstStream Stream, void *LocalInfo, FFSTypeHa * and displacements for each rank */ +// How many ranks participate in one MPI_Gatherv call +// 1000 nodes on Frontier +#define ADIOS2_BATCH_SIZE 8000 + size_t *Displs = NULL; char *RecvBuffer = NULL; if (Stream->Rank == 0) { - int TotalLen = 0; + size_t TotalLen = 0; Displs = malloc(Stream->CohortSize * sizeof(*Displs)); Displs[0] = 0; @@ -718,19 +722,82 @@ void **CP_consolidateDataToRankZero(SstStream Stream, void *LocalInfo, FFSTypeHa { int RoundUp = (RecvCounts[i] + 7) & ~7; Displs[i] = TotalLen; - TotalLen += RoundUp; + TotalLen += (size_t)RoundUp; } RecvBuffer = malloc(TotalLen * sizeof(char)); } - /* - * Now we have the receive buffer, counts, and displacements, and - * can gather the data - */ + // MPI_Comm_Split would be simpler but source/adios2/toolkit/sst/sst_comm.h doesnt have it and + // this is a workaround only anyway + + size_t *current_RecvCounts = NULL; + size_t *current_Displs = NULL; + size_t currentOffset = 0; + if (Stream->Rank == 0) + { + current_Displs = malloc(Stream->CohortSize * sizeof(*Displs)); + current_RecvCounts = malloc(Stream->CohortSize * sizeof(*RecvCounts)); + } + + for (size_t current_batch = 0; current_batch * ADIOS2_BATCH_SIZE < Stream->CohortSize; + ++current_batch) + { + size_t current_batch_lower_bound = current_batch * ADIOS2_BATCH_SIZE; + size_t current_batch_upper_bound = (current_batch + 1) * ADIOS2_BATCH_SIZE; + if (current_batch_upper_bound > Stream->CohortSize) + { + current_batch_upper_bound = Stream->CohortSize; + } + size_t current_batch_size = current_batch_upper_bound - current_batch_lower_bound; + + void *current_RecvBuffer = NULL; + + size_t TotalLen = 0; + if (Stream->Rank == 0) + { + memset(current_Displs, 0, Stream->CohortSize * sizeof(*Displs)); + memset(current_RecvCounts, 0, Stream->CohortSize * sizeof(*RecvCounts)); + + memcpy(current_RecvCounts + ADIOS2_BATCH_SIZE * current_batch, + RecvCounts + ADIOS2_BATCH_SIZE * current_batch, + current_batch_size * sizeof(*RecvCounts)); + + for (size_t i = current_batch_lower_bound; i < current_batch_upper_bound; i++) + { + int RoundUp = (RecvCounts[i] + 7) & ~7; + current_Displs[i] = TotalLen; + TotalLen += (size_t)RoundUp; + } + // All other values in current_Displs are now 0 + // which doesnt matter since the current_RecvCounts are 0 there too + } + + size_t current_DataSize = DataSize; + if (Stream->Rank < current_batch_lower_bound || Stream->Rank >= current_batch_upper_bound) + { + current_DataSize = 0; + } + + /* + * Now we have the receive buffer, counts, and displacements, and + * can gather the data + */ + // if (Stream->Rank == 0) + // { + // printf("GatherV into offset %ld\n", currentOffset); + // } + SMPI_Gatherv(Buffer, current_DataSize, SMPI_CHAR, RecvBuffer + currentOffset, + current_RecvCounts, current_Displs, SMPI_CHAR, 0, Stream->mpiComm); + currentOffset += TotalLen; + } + + if (Stream->Rank == 0) + { + free(current_Displs); + free(current_RecvCounts); + } - SMPI_Gatherv(Buffer, DataSize, SMPI_CHAR, RecvBuffer, RecvCounts, Displs, SMPI_CHAR, 0, - Stream->mpiComm); free_FFSBuffer(Buf); if (Stream->Rank == 0)