Skip to content

Commit

Permalink
Merge pull request #2324 from JasonRuonanWang/ssc-async
Browse files Browse the repository at this point in the history
[SSC] Hide data transfer from application activities
  • Loading branch information
JasonRuonanWang authored Jun 16, 2020
2 parents bca1632 + ecf78fb commit 1fbe2cc
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 32 deletions.
45 changes: 28 additions & 17 deletions source/adios2/engine/ssc/SscReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,12 @@ void SscReader::GetOneSidedPostPush()
{
TAU_SCOPED_TIMER_FUNC();
MPI_Win_post(m_MpiAllWritersGroup, 0, m_MpiWin);
MPI_Win_wait(m_MpiWin);
}

void SscReader::GetOneSidedFencePush()
{
TAU_SCOPED_TIMER_FUNC();
MPI_Win_fence(0, m_MpiWin);
MPI_Win_fence(0, m_MpiWin);
}

void SscReader::GetOneSidedPostPull()
Expand All @@ -75,7 +73,6 @@ void SscReader::GetOneSidedPostPull()
MPI_Get(m_Buffer.data() + i.second.first, i.second.second, MPI_CHAR,
i.first, 0, i.second.second, MPI_CHAR, m_MpiWin);
}
MPI_Win_complete(m_MpiWin);
}

void SscReader::GetOneSidedFencePull()
Expand All @@ -87,21 +84,17 @@ void SscReader::GetOneSidedFencePull()
MPI_Get(m_Buffer.data() + i.second.first, i.second.second, MPI_CHAR,
i.first, 0, i.second.second, MPI_CHAR, m_MpiWin);
}
MPI_Win_fence(0, m_MpiWin);
}

void SscReader::GetTwoSided()
{
TAU_SCOPED_TIMER_FUNC();
std::vector<MPI_Request> requests;
for (const auto &i : m_AllReceivingWriterRanks)
{
requests.emplace_back();
m_MpiRequests.emplace_back();
MPI_Irecv(m_Buffer.data() + i.second.first, i.second.second, MPI_CHAR,
i.first, 0, m_StreamComm, &requests.back());
i.first, 0, m_StreamComm, &m_MpiRequests.back());
}
MPI_Status statuses[requests.size()];
MPI_Waitall(requests.size(), requests.data(), statuses);
}

StepStatus SscReader::BeginStep(const StepMode stepMode,
Expand All @@ -121,23 +114,25 @@ StepStatus SscReader::BeginStep(const StepMode stepMode,
++m_CurrentStep;
if (m_MpiMode == "twosided")
{
GetTwoSided();
MPI_Status statuses[m_MpiRequests.size()];
MPI_Waitall(m_MpiRequests.size(), m_MpiRequests.data(), statuses);
m_MpiRequests.clear();
}
else if (m_MpiMode == "onesidedfencepush")
{
GetOneSidedFencePush();
MPI_Win_fence(0, m_MpiWin);
}
else if (m_MpiMode == "onesidedpostpush")
{
GetOneSidedPostPush();
MPI_Win_wait(m_MpiWin);
}
else if (m_MpiMode == "onesidedfencepull")
{
GetOneSidedFencePull();
MPI_Win_fence(0, m_MpiWin);
}
else if (m_MpiMode == "onesidedpostpull")
{
GetOneSidedPostPull();
MPI_Win_complete(m_MpiWin);
}
}

Expand Down Expand Up @@ -227,10 +222,26 @@ void SscReader::EndStep()
MPI_Win_create(m_Buffer.data(), m_Buffer.size(), 1, MPI_INFO_NULL,
m_StreamComm, &m_MpiWin);
}
if (m_Verbosity >= 5)

if (m_MpiMode == "twosided")
{
std::cout << "SscReader::EndStep, World Rank " << m_StreamRank
<< ", Reader Rank " << m_ReaderRank << std::endl;
GetTwoSided();
}
else if (m_MpiMode == "onesidedfencepush")
{
GetOneSidedFencePush();
}
else if (m_MpiMode == "onesidedpostpush")
{
GetOneSidedPostPush();
}
else if (m_MpiMode == "onesidedfencepull")
{
GetOneSidedFencePull();
}
else if (m_MpiMode == "onesidedpostpull")
{
GetOneSidedPostPull();
}
}

Expand Down
1 change: 1 addition & 0 deletions source/adios2/engine/ssc/SscReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class SscReader : public Engine
MPI_Group m_MpiAllWritersGroup;
MPI_Comm m_StreamComm;
std::string m_MpiMode = "twosided";
std::vector<MPI_Request> m_MpiRequests;

int m_StreamRank;
int m_StreamSize;
Expand Down
58 changes: 43 additions & 15 deletions source/adios2/engine/ssc/SscWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ StepStatus SscWriter::BeginStep(StepMode mode, const float timeoutSeconds)
{
TAU_SCOPED_TIMER_FUNC();

MpiWait();

if (m_InitialStep)
{
m_InitialStep = false;
Expand Down Expand Up @@ -89,7 +91,7 @@ void SscWriter::PutOneSidedPostPush()
MPI_Put(m_Buffer.data(), m_Buffer.size(), MPI_CHAR, i.first,
i.second.first, m_Buffer.size(), MPI_CHAR, m_MpiWin);
}
MPI_Win_complete(m_MpiWin);
m_NeedWait = true;
}

void SscWriter::PutOneSidedFencePush()
Expand All @@ -101,35 +103,33 @@ void SscWriter::PutOneSidedFencePush()
MPI_Put(m_Buffer.data(), m_Buffer.size(), MPI_CHAR, i.first,
i.second.first, m_Buffer.size(), MPI_CHAR, m_MpiWin);
}
MPI_Win_fence(0, m_MpiWin);
m_NeedWait = true;
}

void SscWriter::PutOneSidedPostPull()
{
TAU_SCOPED_TIMER_FUNC();
MPI_Win_post(m_MpiAllReadersGroup, 0, m_MpiWin);
MPI_Win_wait(m_MpiWin);
m_NeedWait = true;
}

void SscWriter::PutOneSidedFencePull()
{
TAU_SCOPED_TIMER_FUNC();
MPI_Win_fence(0, m_MpiWin);
MPI_Win_fence(0, m_MpiWin);
m_NeedWait = true;
}

void SscWriter::PutTwoSided()
{
TAU_SCOPED_TIMER_FUNC();
std::vector<MPI_Request> requests;
for (const auto &i : m_AllSendingReaderRanks)
{
requests.emplace_back();
m_MpiRequests.emplace_back();
MPI_Isend(m_Buffer.data(), m_Buffer.size(), MPI_CHAR, i.first, 0,
m_StreamComm, &requests.back());
m_StreamComm, &m_MpiRequests.back());
}
MPI_Status statuses[requests.size()];
MPI_Waitall(requests.size(), requests.data(), statuses);
m_NeedWait = true;
}

void SscWriter::EndStep()
Expand All @@ -147,7 +147,8 @@ void SscWriter::EndStep()
SyncWritePattern();
MPI_Win_create(m_Buffer.data(), m_Buffer.size(), 1, MPI_INFO_NULL,
m_StreamComm, &m_MpiWin);
PutOneSidedPostPull();
MPI_Win_post(m_MpiAllReadersGroup, 0, m_MpiWin);
MPI_Win_wait(m_MpiWin);
MPI_Win_free(&m_MpiWin);
SyncReadPattern();
MPI_Win_create(m_Buffer.data(), m_Buffer.size(), 1, MPI_INFO_NULL,
Expand Down Expand Up @@ -182,6 +183,36 @@ void SscWriter::Flush(const int transportIndex) { TAU_SCOPED_TIMER_FUNC(); }

// PRIVATE

void SscWriter::MpiWait()
{
if (m_NeedWait)
{
if (m_MpiMode == "twosided")
{
MPI_Status statuses[m_MpiRequests.size()];
MPI_Waitall(m_MpiRequests.size(), m_MpiRequests.data(), statuses);
m_MpiRequests.clear();
}
else if (m_MpiMode == "onesidedfencepush")
{
MPI_Win_fence(0, m_MpiWin);
}
else if (m_MpiMode == "onesidedpostpush")
{
MPI_Win_complete(m_MpiWin);
}
else if (m_MpiMode == "onesidedfencepull")
{
MPI_Win_fence(0, m_MpiWin);
}
else if (m_MpiMode == "onesidedpostpull")
{
MPI_Win_wait(m_MpiWin);
}
m_NeedWait = false;
}
}

void SscWriter::SyncMpiPattern()
{
TAU_SCOPED_TIMER_FUNC();
Expand Down Expand Up @@ -380,11 +411,8 @@ ADIOS2_FOREACH_STDTYPE_1ARG(declare_type)
void SscWriter::DoClose(const int transportIndex)
{
TAU_SCOPED_TIMER_FUNC();
if (m_Verbosity >= 5)
{
std::cout << "SscWriter::DoClose, World Rank " << m_StreamRank
<< ", Writer Rank " << m_WriterRank << std::endl;
}

MpiWait();

m_Buffer[0] = 1;

Expand Down
3 changes: 3 additions & 0 deletions source/adios2/engine/ssc/SscWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ class SscWriter : public Engine
MPI_Group m_MpiAllReadersGroup;
MPI_Comm m_StreamComm;
std::string m_MpiMode = "twosided";
bool m_NeedWait = false;
std::vector<MPI_Request> m_MpiRequests;

int m_StreamRank;
int m_StreamSize;
Expand All @@ -70,6 +72,7 @@ class SscWriter : public Engine
void PutOneSidedFencePull();
void PutOneSidedPostPull();
void PutTwoSided();
void MpiWait();

#define declare_type(T) \
void DoPutSync(Variable<T> &, const T *) final; \
Expand Down

0 comments on commit 1fbe2cc

Please sign in to comment.