Skip to content

Commit

Permalink
Merge pull request #3055 from pnorbert/bp5-AggregatorRatio
Browse files Browse the repository at this point in the history
Added AggregatioRatio option for BP5. Added SetWithinLimits(value, mi…
  • Loading branch information
pnorbert authored Feb 15, 2022
2 parents 35becfd + 1a05ff0 commit 89165bd
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 25 deletions.
4 changes: 1 addition & 3 deletions examples/basics/globalArray/globalArray_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,7 @@ int main(int argc, char *argv[])
adios2::IO io = adios.DeclareIO("Output");
io.SetEngine("BP5");
io.SetParameter("AggregationType", "TwoLevelShm");
io.SetParameter("NumAggregators", "1");
io.SetParameter("NumSubFiles", "1");
io.SetParameter("AsyncWrite", "false");
io.SetParameter("AggregatorRatio", "4");

/*
* Define global array: type, name, global dimensions
Expand Down
1 change: 1 addition & 0 deletions source/adios2/engine/bp5/BP5Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ class BP5Engine
MACRO(verbose, Int, int, 0) \
MACRO(CollectiveMetadata, Bool, bool, true) \
MACRO(NumAggregators, UInt, unsigned int, 0) \
MACRO(AggregatorRatio, UInt, unsigned int, 0) \
MACRO(NumSubFiles, UInt, unsigned int, 999999) \
MACRO(StripeSize, UInt, unsigned int, 4096) \
MACRO(DirectIO, Bool, bool, false) \
Expand Down
55 changes: 33 additions & 22 deletions source/adios2/engine/bp5/BP5Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "adios2/common/ADIOSMacros.h"
#include "adios2/core/IO.h"
#include "adios2/helper/adiosFunctions.h" //CheckIndexRange
#include "adios2/helper/adiosMath.h" // SetWithinLimit
#include "adios2/toolkit/format/buffer/chunk/ChunkV.h"
#include "adios2/toolkit/format/buffer/malloc/MallocV.h"
#include "adios2/toolkit/transport/file/FileFStream.h"
Expand Down Expand Up @@ -581,27 +582,37 @@ void BP5Writer::InitParameters()
m_WriteToBB = !(m_Parameters.BurstBufferPath.empty());
m_DrainBB = m_WriteToBB && m_Parameters.BurstBufferDrain;

if (m_Parameters.NumAggregators > static_cast<unsigned int>(m_Comm.Size()))
unsigned int nproc = (unsigned int)m_Comm.Size();
m_Parameters.NumAggregators =
helper::SetWithinLimit(m_Parameters.NumAggregators, 0U, nproc);
m_Parameters.NumSubFiles =
helper::SetWithinLimit(m_Parameters.NumSubFiles, 0U, nproc);
m_Parameters.AggregatorRatio =
helper::SetWithinLimit(m_Parameters.AggregatorRatio, 0U, nproc);
if (m_Parameters.NumAggregators == 0)
{
m_Parameters.NumAggregators = static_cast<unsigned int>(m_Comm.Size());
}

if (m_Parameters.NumSubFiles > m_Parameters.NumAggregators)
{
m_Parameters.NumSubFiles = m_Parameters.NumAggregators;
if (m_Parameters.AggregatorRatio > 0)
{
m_Parameters.NumAggregators = helper::SetWithinLimit(
nproc / m_Parameters.AggregatorRatio, 0U, nproc);
}
else if (m_Parameters.NumSubFiles > 0)
{
m_Parameters.NumAggregators =
helper::SetWithinLimit(m_Parameters.NumSubFiles, 0U, nproc);
}
}
m_Parameters.NumSubFiles = helper::SetWithinLimit(
m_Parameters.NumSubFiles, 0U, m_Parameters.NumAggregators);

// Limiting to max 64MB page size
m_Parameters.StripeSize =
helper::SetWithinLimit(m_Parameters.StripeSize, 0U, 67108864U);
if (m_Parameters.StripeSize == 0)
{
m_Parameters.StripeSize = 4096;
}

if (m_Parameters.StripeSize > 67108864)
{
// Limiting to max 64MB page size
m_Parameters.StripeSize = 67108864;
}

if (m_Parameters.DirectIO)
{
if (m_Parameters.DirectIOAlignBuffer == 0)
Expand Down Expand Up @@ -799,8 +810,8 @@ uint64_t BP5Writer::CountStepsInMetadataIndex(format::BufferSTL &bufferSTL)

if (currentStep == targetStep)
{
// we need the very first (smallest) write position to each subfile
// Offsets and sizes, 2*FlushCount + 1 per writer
// we need the very first (smallest) write position to each
// subfile Offsets and sizes, 2*FlushCount + 1 per writer
for (uint64_t i = 0; i < m_AppendWriterCount; i++)
{
// first flush/write position will do
Expand All @@ -810,8 +821,8 @@ uint64_t BP5Writer::CountStepsInMetadataIndex(format::BufferSTL &bufferSTL)
position +=
sizeof(uint64_t) * 2 * FlushCount; // no need to read
/* std::cout << "Writer " << i << " subfile " <<
writerToFileMap[i] << " first data loc:" << FirstDataPos <<
std::endl; */
writerToFileMap[i] << " first data loc:" <<
FirstDataPos << std::endl; */
if (FirstDataPos < m_AppendDataPos[writerToFileMap[i]])
{
m_AppendDataPos[writerToFileMap[i]] = FirstDataPos;
Expand Down Expand Up @@ -863,9 +874,7 @@ void BP5Writer::InitAggregator()
<< " aggr size = " << m_AggregatorTwoLevelShm.m_Size
<< " rank = " << m_AggregatorTwoLevelShm.m_Rank
<< " subfile = " << m_AggregatorTwoLevelShm.m_SubStreamIndex
<< " type = " << m_Parameters.AggregationType
<< std::endl;*/
<< " type = " << m_Parameters.AggregationType << std::endl;*/

m_IAmDraining = m_AggregatorTwoLevelShm.m_IsMasterAggregator;
m_IAmWritingData = m_AggregatorTwoLevelShm.m_IsAggregator;
Expand Down Expand Up @@ -1492,7 +1501,8 @@ void BP5Writer::FlushProfiler()
std::string profileFileName;
if (m_DrainBB)
{
// auto bpTargetNames = m_BP4Serializer.GetBPBaseNames({m_Name});
// auto bpTargetNames =
// m_BP4Serializer.GetBPBaseNames({m_Name});
std::vector<std::string> bpTargetNames = {m_Name};
if (fileTransportIdx > -1)
{
Expand All @@ -1509,7 +1519,8 @@ void BP5Writer::FlushProfiler()
else
{
transport::FileFStream profilingJSONStream(m_Comm);
// auto bpBaseNames = m_BP4Serializer.GetBPBaseNames({m_BBName});
// auto bpBaseNames =
// m_BP4Serializer.GetBPBaseNames({m_BBName});
std::vector<std::string> bpBaseNames = {m_Name};
if (fileTransportIdx > -1)
{
Expand Down
10 changes: 10 additions & 0 deletions source/adios2/helper/adiosMath.h
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,16 @@ void GetMinMaxSubblocks(const T *values, const Dims &count,
const BlockDivisionInfo &info, std::vector<T> &MinMaxs,
T &bmin, T &bmax, const unsigned int threads) noexcept;

/**
* @brief Return a value within the min/max limits
* @param value The value to be returned
* @param minValue Minimum limit returned if value is below this
* @param maxValue Maximum limit returned if value is above this
* @return A value withing the limits.
*/
template <class T>
T SetWithinLimit(const T value, const T minValue, const T maxValue);

} // end namespace helper
} // end namespace adios2

Expand Down
8 changes: 8 additions & 0 deletions source/adios2/helper/adiosMath.inl
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,14 @@ std::vector<T> VectorsOp(BinaryOperation op, const std::vector<T> &vector1,
return result;
}

template <class T>
T SetWithinLimit(const T value, const T minValue, const T maxValue)
{
T v = (value < minValue ? minValue : value);
v = (v > maxValue ? maxValue : v);
return v;
}

} // end namespace helper
} // end namespace adios2

Expand Down

0 comments on commit 89165bd

Please sign in to comment.