Skip to content

Commit

Permalink
String parameter SelectSteps added to BP5 so that a reader only sees …
Browse files Browse the repository at this point in the history
…the selected steps. The selection can contain multiple range selections in the form of start:end:step form, as a space-separated list. Unlimited selections can be defined using 'n' or 'N' for the end part of the range. Steps start from 0. For example: "0:n:3 2:n:3 1:n:3" is a valid selection spec for getting all steps. Memory requirement is std::vector<bool>(M) where M is the largest number in the spec (unlimited spec uses 16 bytes per unlimited range). There is NO protection from users specifying 64bit large numbers and run out of memory. Note that a reader always sees the available steps as step 0, 1, 2 and so on. This feature picks which steps in a file/stream will become visible.
  • Loading branch information
pnorbert committed Feb 5, 2022
1 parent fb70532 commit aa0b54c
Show file tree
Hide file tree
Showing 10 changed files with 668 additions and 63 deletions.
1 change: 1 addition & 0 deletions source/adios2/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ add_library(adios2_core
helper/adiosXMLUtil.cpp
helper/adiosYAML.cpp
helper/adiosLog.cpp
helper/adiosRangeFilter.cpp

#engine derived classes
engine/bp3/BP3Reader.cpp engine/bp3/BP3Reader.tcc
Expand Down
9 changes: 8 additions & 1 deletion source/adios2/engine/bp5/BP5Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,13 @@ class BP5Engine
{
public:
int m_RankMPI = 0;
/* metadata index table*/
/* metadata index table
0: pos in memory for step (after filtered read)
1: size of metadata
2: flush count
3: pos in index where data offsets are enumerated
4: abs. pos in metadata File for step
*/
std::unordered_map<uint64_t, std::vector<uint64_t>> m_MetadataIndexTable;

struct Minifooter
Expand Down Expand Up @@ -129,6 +135,7 @@ class BP5Engine
MACRO(MaxShmSize, SizeBytes, size_t, DefaultMaxShmSize) \
MACRO(BufferVType, BufferVType, int, (int)BufferVType::ChunkVType) \
MACRO(AppendAfterSteps, Int, int, INT_MAX) \
MACRO(SelectSteps, String, std::string, (char *)(intptr_t)0) \
MACRO(ReaderShortCircuitReads, Bool, bool, false)

struct BP5Params
Expand Down
177 changes: 115 additions & 62 deletions source/adios2/engine/bp5/BP5Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,10 @@ void BP5Reader::Init()
ParseParams(m_IO, m_Parameters);
m_ReaderIsRowMajor = (m_IO.m_ArrayOrder == ArrayOrdering::RowMajor);
InitTransports();
if (!m_Parameters.SelectSteps.empty())
{
m_SelectedSteps.ParseSelection(m_Parameters.SelectSteps);
}

/* Do a collective wait for the file(s) to appear within timeout.
Make sure every process comes to the same conclusion */
Expand Down Expand Up @@ -486,8 +490,8 @@ void BP5Reader::InitBuffer(const TimePoint &timeoutInstant,
const Seconds &pollSeconds,
const Seconds &timeoutSeconds)
{
/* Put all metadata in buffer and parse in random access mode */
size_t newIdxSize = 0;
// Put all metadata in buffer
if (m_Comm.Rank() == 0)
{
/* Read metadata index table into memory */
Expand All @@ -500,7 +504,30 @@ void BP5Reader::InitBuffer(const TimePoint &timeoutInstant,
"in call to BPFileReader Open");
m_MDIndexFileManager.ReadFile(m_MetadataIndex.m_Buffer.data(),
metadataIndexFileSize);
}
m_MDIndexFileAlreadyReadSize = metadataIndexFileSize;
newIdxSize = metadataIndexFileSize;
}

newIdxSize = m_Comm.BroadcastValue(newIdxSize, 0);

if (newIdxSize > 0)
{
// broadcast metadata index buffer to all ranks from zero
m_Comm.BroadcastVector(m_MetadataIndex.m_Buffer);

/* Parse metadata index table */
ParseMetadataIndex(m_MetadataIndex, 0, true, false);
// now we are sure the index header has been parsed,
// first step parsing done
// m_FilteredMetadataInfo is created
m_IdxHeaderParsed = true;
}

if (newIdxSize > 0)
{
if (m_Comm.Rank() == 0)
{
/* Read metametadata into memory */
const size_t metametadataFileSize =
m_FileMetaMetadataManager.GetFileSize(0);
Expand All @@ -510,34 +537,43 @@ void BP5Reader::InitBuffer(const TimePoint &timeoutInstant,
m_FileMetaMetadataManager.ReadFile(m_MetaMetadata.m_Buffer.data(),
metametadataFileSize);

size_t fileSize = 0;
fileSize = m_MDFileManager.GetFileSize(0);
#ifdef NOTDEF
size_t fileFilteredSize = 0;
for (auto p : m_FilteredMetadataInfo)
{
fileFilteredSize += p.second;
}

/* Read metadata file into memory but first make sure
* it has the content that the index table refers to */
uint64_t expectedMinFileSize =
MetadataExpectedMinFileSize(m_Name, true);
auto p = m_FilteredMetadataInfo.back();
uint64_t expectedMinFileSize = p.first + p.second;
size_t actualFileSize = 0;
do
{
fileSize = m_MDFileManager.GetFileSize(0);
if (fileSize >= expectedMinFileSize)
actualFileSize = m_MDFileManager.GetFileSize(0);
if (actualFileSize >= expectedMinFileSize)
{
break;
}
} while (SleepOrQuit(timeoutInstant, pollSeconds));

if (fileSize >= expectedMinFileSize)
if (actualFileSize >= expectedMinFileSize)
{
#endif
m_Metadata.Resize(
fileSize,
"allocating metadata buffer, in call to BP5Reader Open");

m_MDFileManager.ReadFile(m_Metadata.m_Buffer.data(), fileSize);
m_MDFileAlreadyReadSize = fileSize;
m_MDIndexFileAlreadyReadSize = metadataIndexFileSize;
newIdxSize = metadataIndexFileSize;
#ifdef NOTDEF
m_Metadata.Resize(fileFilteredSize,
"allocating metadata buffer, "
"in call to BP5Reader Open");

size_t mempos = 0;
for (auto p : m_FilteredMetadataInfo)
{
/*std::cout << "Read metadata pos = " << p.first
<< " size = " << p.second
<< " to mempos = " << mempos << std::endl;*/
m_MDFileManager.ReadFile(
m_Metadata.m_Buffer.data() + mempos, p.second, p.first);
mempos += p.second;
}
m_MDFileAlreadyReadSize = expectedMinFileSize;
}
else
{
Expand All @@ -547,44 +583,28 @@ void BP5Reader::InitBuffer(const TimePoint &timeoutInstant,
"has not contained enough data within "
"the specified timeout of " +
std::to_string(timeoutSeconds.count()) +
" seconds. index size = " +
std::to_string(metadataIndexFileSize) +
" metadata size = " + std::to_string(fileSize) +
" seconds. index size = " + std::to_string(newIdxSize) +
" metadata size = " + std::to_string(actualFileSize) +
" expected size = " + std::to_string(expectedMinFileSize) +
". One reason could be if the reader finds old data while "
". One reason could be if the reader finds old data "
"while "
"the writer is creating the new files.");
}
#endif
}
}

newIdxSize = m_Comm.BroadcastValue(newIdxSize, 0);

if (newIdxSize > 0)
{
// broadcast buffer to all ranks from zero
m_Comm.BroadcastVector(m_Metadata.m_Buffer);

// broadcast metadata index buffer to all ranks from zero
m_Comm.BroadcastVector(m_MetadataIndex.m_Buffer);

// broadcast metadata index buffer to all ranks from zero
m_Comm.BroadcastVector(m_MetaMetadata.m_Buffer);

/* Parse metadata index table */
ParseMetadataIndex(m_MetadataIndex, 0, true, false);
// now we are sure the index header has been parsed, first step parsing
// done

m_BP5Deserializer =
new format::BP5Deserializer(m_WriterIsRowMajor, m_ReaderIsRowMajor,
(m_OpenMode == Mode::ReadRandomAccess));
m_BP5Deserializer->m_Engine = this;

InstallMetaMetaData(m_MetaMetadata);

m_IdxHeaderParsed = true;

if (m_OpenMode == Mode::ReadRandomAccess)
{
for (size_t Step = 0; Step < m_MetadataIndexTable.size(); Step++)
Expand Down Expand Up @@ -672,9 +692,13 @@ void BP5Reader::ParseMetadataIndex(format::BufferSTL &bufferSTL,
}

// Read each record now
uint64_t currentStep = 0;
uint64_t absStepInFile = 0;
uint64_t lastMapStep = 0;
uint64_t lastWriterCount = 0;
uint64_t MetadataPosTotalSkip = 0;
m_FilteredMetadataInfo.clear();
uint64_t minfo_pos = 0;
uint64_t minfo_size = 0;
do
{
std::vector<uint64_t> ptrs;
Expand All @@ -687,9 +711,15 @@ void BP5Reader::ParseMetadataIndex(format::BufferSTL &bufferSTL,
const uint64_t hasWriterMap = helper::ReadValue<uint64_t>(
buffer, position, m_Minifooter.IsLittleEndian);

if (!absStepInFile)
{
minfo_pos = MetadataPos; // initialize minfo_pos properly
MetadataPosTotalSkip = MetadataPos;
}

if (hasWriterMap)
{
auto p = m_WriterMap.emplace(currentStep, WriterMapStruct());
auto p = m_WriterMap.emplace(m_StepsCount, WriterMapStruct());
auto &s = p.first->second;
s.WriterCount = helper::ReadValue<uint64_t>(
buffer, position, m_Minifooter.IsLittleEndian);
Expand All @@ -705,40 +735,63 @@ void BP5Reader::ParseMetadataIndex(format::BufferSTL &bufferSTL,
buffer, position, m_Minifooter.IsLittleEndian);
s.RankToSubfile.push_back(subfileIdx);
}
lastMapStep = currentStep;
lastMapStep = m_StepsCount;
lastWriterCount = s.WriterCount;
}
m_WriterMapIndex.push_back(lastMapStep);

ptrs.push_back(MetadataPos);
ptrs.push_back(MetadataSize);
ptrs.push_back(FlushCount);
ptrs.push_back(position);
m_MetadataIndexTable[currentStep] = ptrs;
#ifdef DUMPDATALOCINFO
for (uint64_t i = 0; i < m_WriterCount; i++)
if (m_SelectedSteps.IsSelected(absStepInFile))
{
size_t DataPosPos = ptrs[3];
std::cout << "Writer " << i << " data at ";
for (uint64_t j = 0; j < FlushCount; j++)
m_WriterMapIndex.push_back(lastMapStep);

// pos in metadata in memory
ptrs.push_back(MetadataPos - MetadataPosTotalSkip);
ptrs.push_back(MetadataSize);
ptrs.push_back(FlushCount);
ptrs.push_back(position);
ptrs.push_back(MetadataPos); // absolute pos in file before read
m_MetadataIndexTable[m_StepsCount] = ptrs;
#ifdef DUMPDATALOCINFO
for (uint64_t i = 0; i < m_WriterCount; i++)
{
size_t DataPosPos = ptrs[3];
std::cout << "Writer " << i << " data at ";
for (uint64_t j = 0; j < FlushCount; j++)
{
const uint64_t DataPos = helper::ReadValue<uint64_t>(
buffer, DataPosPos, m_Minifooter.IsLittleEndian);
const uint64_t DataSize = helper::ReadValue<uint64_t>(
buffer, DataPosPos, m_Minifooter.IsLittleEndian);
std::cout << "loc:" << DataPos << " siz:" << DataSize
<< "; ";
}
const uint64_t DataPos = helper::ReadValue<uint64_t>(
buffer, DataPosPos, m_Minifooter.IsLittleEndian);
const uint64_t DataSize = helper::ReadValue<uint64_t>(
buffer, DataPosPos, m_Minifooter.IsLittleEndian);
std::cout << "loc:" << DataPos << " siz:" << DataSize << "; ";
std::cout << "loc:" << DataPos << std::endl;
}
const uint64_t DataPos = helper::ReadValue<uint64_t>(
buffer, DataPosPos, m_Minifooter.IsLittleEndian);
std::cout << "loc:" << DataPos << std::endl;
}
#endif
minfo_size += MetadataSize;
m_StepsCount++;
}
else
{
MetadataPosTotalSkip += MetadataSize;
if (minfo_size > 0)
{
m_FilteredMetadataInfo.push_back(
std::make_pair(minfo_pos, minfo_size));
}
minfo_pos = MetadataPos;
minfo_size = 0;
}

// skip over the writer -> data file offset records
position += sizeof(uint64_t) * lastWriterCount * ((2 * FlushCount) + 1);
m_StepsCount++;
currentStep++;
absStepInFile++;
} while (!oneStepOnly && position < buffer.size());
if (minfo_size > 0)
{
m_FilteredMetadataInfo.push_back(std::make_pair(minfo_pos, minfo_size));
}
}

void BP5Reader::DoGetAbsoluteSteps(const VariableBase &variable,
Expand Down
7 changes: 7 additions & 0 deletions source/adios2/engine/bp5/BP5Reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "adios2/core/Engine.h"
#include "adios2/engine/bp5/BP5Engine.h"
#include "adios2/helper/adiosComm.h"
#include "adios2/helper/adiosRangeFilter.h"
#include "adios2/toolkit/format/bp5/BP5Deserializer.h"
#include "adios2/toolkit/transportman/TransportMan.h"

Expand Down Expand Up @@ -96,6 +97,12 @@ class BP5Reader : public BP5Engine, public Engine
bool m_FirstStep = true;
bool m_IdxHeaderParsed = false; // true after first index parsing

/** used to filter steps */
helper::RangeFilter m_SelectedSteps;

// offset/size pairs to read sections of metadata from file in InitBuffer
std::vector<std::pair<uint64_t, uint64_t>> m_FilteredMetadataInfo;

Minifooter m_Minifooter;

void Init();
Expand Down
Loading

0 comments on commit aa0b54c

Please sign in to comment.