Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for shared memory provider of libfabric in SST + simple support for manual data progress via threading #3964

Open
wants to merge 51 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 47 commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
1099d05
First attempt at shm support
franzpoeschel Nov 30, 2023
86809ee
Keep mr_mode
franzpoeschel Dec 8, 2023
9c4e6e4
Hardcode address length lol
franzpoeschel Dec 8, 2023
7aafa45
Tracing output
franzpoeschel Dec 8, 2023
80904b8
Revert "Hardcode address length lol"
franzpoeschel Dec 8, 2023
1cfb02b
Guard against too small address buffers
franzpoeschel Dec 8, 2023
e6f8179
some stuff
franzpoeschel Dec 8, 2023
674dfe4
Use manual progress via thread
franzpoeschel Dec 8, 2023
15be730
Fix wrong memory access mode?
franzpoeschel Dec 8, 2023
7443356
Tentatively working
franzpoeschel Dec 11, 2023
3f37d28
Fixes
franzpoeschel Dec 11, 2023
37d7337
Bit better backoff for background thread
franzpoeschel Dec 11, 2023
de90225
Remove some debugging output
franzpoeschel Dec 11, 2023
aa6cdee
Revert "Tracing output"
franzpoeschel Dec 11, 2023
75be33d
Cleanup
franzpoeschel Dec 11, 2023
df26306
Do not specify data_progress
franzpoeschel Dec 11, 2023
b284dc7
No longer use FI_MR_BASIC
franzpoeschel Dec 11, 2023
0995c7f
Try enqueuing the fi_read() on the thread
franzpoeschel Dec 11, 2023
5b90dd2
Request n items at once
franzpoeschel Dec 12, 2023
3e14cd3
Revert last two commits
franzpoeschel Dec 12, 2023
97f4d36
Memory management
franzpoeschel Dec 12, 2023
9878ba5
Use blocking fi_cq_sread() in the worker thread
franzpoeschel Dec 14, 2023
594b2e6
Batch processing in fi_cq_sread()
franzpoeschel Dec 15, 2023
fe839f2
Yield to scheduler in busy loop
franzpoeschel Dec 15, 2023
6fc2b2e
Add FABRIC_PROVIDER environment variable
franzpoeschel Dec 15, 2023
e347a5e
Use infinite timeout together with signal
franzpoeschel Jan 22, 2024
009972c
Verbose logging
franzpoeschel Jan 17, 2024
3fc8de4
Revert "Verbose logging"
franzpoeschel Jan 22, 2024
cb7808f
Use progress thread on writer side
franzpoeschel Jan 22, 2024
5fd6470
Add missing key destroy
franzpoeschel Jan 23, 2024
dfc9437
Replace sleeping with condition variables
franzpoeschel Jan 23, 2024
756bb2e
Remove sched_yield() logic
franzpoeschel Mar 7, 2024
01fc7a1
Enqueue remote reads in batches
franzpoeschel Mar 7, 2024
a17ab8d
Tmp: Ignore unreachable endpoints in UCX
franzpoeschel Apr 4, 2024
c57a7b2
Only use thread in libfabric DP when needed
franzpoeschel Jul 22, 2024
dbb1ace
Parameterize progress thread in UCX
franzpoeschel Jul 23, 2024
b98059a
Request multithreading support from libfabric
franzpoeschel Jul 23, 2024
3213575
Use progress thread only on writer side in libfabric
franzpoeschel Jul 23, 2024
5b05c2e
More seamless batching for BP5
franzpoeschel Jul 23, 2024
1d37205
Same for BP
franzpoeschel Jul 23, 2024
644915f
Some comments
franzpoeschel Jul 23, 2024
7efc224
Make this configurable via environment variable in libfabric
franzpoeschel Jul 24, 2024
3de00c0
Some cleanup in UCX
franzpoeschel Jul 24, 2024
dc83736
Some error resistance in CXI key retrieval
franzpoeschel Jul 25, 2024
5ee3ec7
Some final fixes
Jul 26, 2024
c5ba305
Add an inline comment
franzpoeschel Jul 29, 2024
4b63824
Enable manual progress without thread on reader side
franzpoeschel Jul 29, 2024
5a89794
Merge branch 'master' into shm
eisenhauer Aug 11, 2024
6498e1e
Format
eisenhauer Aug 11, 2024
7f17c0c
warning
eisenhauer Aug 11, 2024
ba7d6c1
Fix env var handling for CXI key retrieval
franzpoeschel Aug 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 86 additions & 13 deletions source/adios2/engine/sst/SstReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -669,29 +669,70 @@ bool SstReader::VariableMinMax(const VariableBase &Var, const size_t Step, MinMa
return m_BP5Deserializer->VariableMinMax(Var, Step, MinMax);
}

// Issues the remote read that was recorded in `params`.
//
// The request is always made for CurrentStep(); every other argument of
// SstReadRemoteMemory() is taken from `params`. The value returned by
// SstReadRemoteMemory() is handed back unchanged so that the caller can
// later pass it to SstWaitForCompletion().
void *SstReader::performDeferredReadRemoteMemory(DeferredReadRemoteMemory const &params)
{
    // SstReadRemoteMemory() takes the rank as an int, the request stores it as size_t.
    const int writerRank = static_cast<int>(params.rank);
    return SstReadRemoteMemory(m_Input, writerRank, CurrentStep(), params.payloadStart,
                               params.payloadSize, params.buffer, params.dp_info);
}

// Number of remote read requests issued up front by the batched read loops
// in this file; afterwards one further request is issued for each request
// that has been waited on.
constexpr static size_t BATCH_SIZE = 10;

void SstReader::BP5PerformGets()
{
size_t maxReadSize;
auto ReadRequests = m_BP5Deserializer->GenerateReadRequests(true, &maxReadSize);
std::vector<void *> sstReadHandlers;
for (const auto &Req : ReadRequests)
{
std::vector<void *> nextSstReadHandlers;
sstReadHandlers.reserve(BATCH_SIZE);
nextSstReadHandlers.reserve(BATCH_SIZE);

auto iterator = ReadRequests.cbegin();
auto end = ReadRequests.cend();

auto enqueue_next = [&](std::vector<void *>& sstReadHandlers_lambda) {
if (iterator == end)
{
return false;
}
auto const &Req = *iterator;

void *dp_info = NULL;
if (m_CurrentStepMetaData->DP_TimestepInfo)
{
dp_info = m_CurrentStepMetaData->DP_TimestepInfo[Req.WriterRank];
}
auto ret = SstReadRemoteMemory(m_Input, (int)Req.WriterRank, Req.Timestep, Req.StartOffset,
Req.ReadLength, Req.DestinationAddr, dp_info);
sstReadHandlers.push_back(ret);
sstReadHandlers_lambda.push_back(ret);
++iterator;
return true;
};

// Initiate request queue with first BATCH_SIZE requests
for (size_t i = 0; i < BATCH_SIZE; ++i)
{
if (!enqueue_next(sstReadHandlers))
{
break;
}
}
for (const auto &i : sstReadHandlers)

// Drain current request queue
// For each fulfilled request, enqueue the next into the next queue
// poor man's asynchrony
while (!sstReadHandlers.empty())
{
if (SstWaitForCompletion(m_Input, i) != SstSuccess)
nextSstReadHandlers.clear();
for (const auto &i : sstReadHandlers)
{
helper::Throw<std::runtime_error>("Engine", "SstReader", "BP5PerformGets",
"Writer failed before returning data");
if (SstWaitForCompletion(m_Input, i) != SstSuccess)
{
helper::Throw<std::runtime_error>("Engine", "SstReader", "BP5PerformGets",
"Writer failed before returning data");
}
enqueue_next(nextSstReadHandlers);
}
sstReadHandlers.swap(nextSstReadHandlers);
}

m_BP5Deserializer->FinalizeGets(ReadRequests);
Expand All @@ -710,7 +751,7 @@ void SstReader::PerformGets()
}
else if (m_WriterMarshalMethod == SstMarshalBP)
{
std::vector<void *> sstReadHandlers;
std::vector<DeferredReadRemoteMemory> sstReadHandlers;
std::vector<std::vector<char>> buffers;
size_t iter = 0;

Expand Down Expand Up @@ -739,14 +780,46 @@ void SstReader::PerformGets()
ADIOS2_FOREACH_STDTYPE_1ARG(declare_type)
#undef declare_type
}
// wait for all SstRead requests to finish
for (const auto &i : sstReadHandlers)
// run read requests in batches and wait for them to finish
auto iterator = sstReadHandlers.cbegin();
auto end = sstReadHandlers.cend();
std::vector<void *> enqueuedHandlers;
std::vector<void *> nextEnqueuedHandlers;
enqueuedHandlers.reserve(BATCH_SIZE);
nextEnqueuedHandlers.reserve(BATCH_SIZE);

auto enqueue_next = [&](std::vector<void *> &enqueuedHandlers) {
if (iterator == end)
{
return false;
}
enqueuedHandlers.push_back(performDeferredReadRemoteMemory(*iterator));
++iterator;
return true;
};

// Initiate request queue with first BATCH_SIZE requests
for (size_t i = 0; i < BATCH_SIZE; ++i)
{
if (SstWaitForCompletion(m_Input, i) != SstSuccess)
if (!enqueue_next(enqueuedHandlers))
{
helper::Throw<std::runtime_error>("Engine", "SstReader", "PerformGets",
"Writer failed before returning data");
break;
}
}

while (!enqueuedHandlers.empty())
{
nextEnqueuedHandlers.clear();
for (const auto &i : enqueuedHandlers)
{
if (SstWaitForCompletion(m_Input, i) != SstSuccess)
{
helper::Throw<std::runtime_error>("Engine", "SstReader", "PerformGets",
"Writer failed before returning data");
}
enqueue_next(nextEnqueuedHandlers);
}
enqueuedHandlers.swap(nextEnqueuedHandlers);
}

for (const std::string &name : m_BP3Deserializer->m_DeferredVariables)
Expand Down
13 changes: 12 additions & 1 deletion source/adios2/engine/sst/SstReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,19 @@ class SstReader : public Engine
bool VariableMinMax(const VariableBase &, const size_t Step, MinMaxStruct &MinMax);

private:
// Arguments of one SstReadRemoteMemory() call, recorded so that the read can
// be issued later through performDeferredReadRemoteMemory() rather than at the
// point where the request is generated.
// Member order matters: instances are created by positional brace
// initialisation.
struct DeferredReadRemoteMemory
{
// Rank to read from; converted to int when the read is issued.
size_t rank;
// Start of the payload, forwarded unchanged to SstReadRemoteMemory()
// (presumably a byte offset -- TODO confirm).
size_t payloadStart;
// Size of the payload, forwarded unchanged to SstReadRemoteMemory()
// (presumably in bytes -- TODO confirm).
size_t payloadSize;
// Destination for the data. Not owned by this struct; NOTE(review): the
// pointed-to memory presumably has to stay valid until the read has
// completed -- confirm against the callers.
char *buffer;
// Forwarded as the last argument of SstReadRemoteMemory().
void *dp_info;
};
// Issues the SstReadRemoteMemory() call described by the given request for
// CurrentStep() and returns that call's result.
void * performDeferredReadRemoteMemory(DeferredReadRemoteMemory const &);

template <class T>
void ReadVariableBlocksRequests(Variable<T> &variable, std::vector<void *> &sstReadHandlers,
void ReadVariableBlocksRequests(Variable<T> &variable,
std::vector<DeferredReadRemoteMemory> &sstReadHandlers,
std::vector<std::vector<char>> &buffers);

template <class T>
Expand Down
21 changes: 9 additions & 12 deletions source/adios2/engine/sst/SstReader.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace engine

template <class T>
void SstReader::ReadVariableBlocksRequests(Variable<T> &variable,
std::vector<void *> &sstReadHandlers,
std::vector<DeferredReadRemoteMemory> &sstReadHandlers,
std::vector<std::vector<char>> &buffers)
{
PERFSTUBS_SCOPED_TIMER_FUNC();
Expand Down Expand Up @@ -65,9 +65,8 @@ void SstReader::ReadVariableBlocksRequests(Variable<T> &variable,
std::stringstream ss;
ss << "SST Bytes Read from remote rank " << rank;
PERFSTUBS_SAMPLE_COUNTER(ss.str().c_str(), payloadSize);
auto ret = SstReadRemoteMemory(m_Input, (int)rank, CurrentStep(), payloadStart,
payloadSize, buffer, dp_info);
sstReadHandlers.push_back(ret);
sstReadHandlers.push_back(
DeferredReadRemoteMemory{rank, payloadStart, payloadSize, buffer, dp_info});
}
// if remote data buffer is not compressed
else
Expand All @@ -87,10 +86,9 @@ void SstReader::ReadVariableBlocksRequests(Variable<T> &variable,
subStreamInfo.IntersectionBox, m_BP3Deserializer->m_IsRowMajor,
elementOffset))
{
auto ret = SstReadRemoteMemory(m_Input, (int)rank, CurrentStep(),
writerBlockStart, writerBlockSize,
blockInfo.Data + elementOffset, dp_info);
sstReadHandlers.push_back(ret);
sstReadHandlers.push_back(DeferredReadRemoteMemory{
rank, writerBlockStart, writerBlockSize,
reinterpret_cast<char *>(blockInfo.Data + elementOffset), dp_info});
}
// if either input or output is not contiguous memory then
// find all contiguous parts.
Expand All @@ -99,10 +97,9 @@ void SstReader::ReadVariableBlocksRequests(Variable<T> &variable,
// batch all read requests
buffers.emplace_back();
buffers.back().resize(writerBlockSize);
auto ret =
SstReadRemoteMemory(m_Input, (int)rank, CurrentStep(), writerBlockStart,
writerBlockSize, buffers.back().data(), dp_info);
sstReadHandlers.push_back(ret);
sstReadHandlers.push_back(
DeferredReadRemoteMemory{rank, writerBlockStart, writerBlockSize,
buffers.back().data(), dp_info});
}
}
++threadID;
Expand Down
Loading