Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Reader-side Profiling #3778

Merged
merged 5 commits into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 61 additions & 1 deletion source/adios2/engine/bp5/BP5Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@
#include "BP5Reader.tcc"

#include "adios2/helper/adiosMath.h" // SetWithinLimit
#include "adios2/toolkit/transport/file/FileFStream.h"
#include <adios2-perfstubs-interface.h>

#include <chrono>
#include <cstdio>
#include <errno.h>
#include <iostream>
#include <mutex>
#include <thread>

Expand All @@ -31,7 +34,8 @@ namespace engine
BP5Reader::BP5Reader(IO &io, const std::string &name, const Mode mode, helper::Comm comm)
: Engine("BP5Reader", io, name, mode, std::move(comm)), m_MDFileManager(io, m_Comm),
m_DataFileManager(io, m_Comm), m_MDIndexFileManager(io, m_Comm),
m_FileMetaMetadataManager(io, m_Comm), m_ActiveFlagFileManager(io, m_Comm), m_Remote()
m_FileMetaMetadataManager(io, m_Comm), m_ActiveFlagFileManager(io, m_Comm), m_Remote(),
m_JSONProfiler(m_Comm)
{
PERFSTUBS_SCOPED_TIMER("BP5Reader::Open");
Init();
Expand Down Expand Up @@ -298,6 +302,7 @@ void BP5Reader::PerformLocalGets()

// TP start = NOW();
PERFSTUBS_SCOPED_TIMER("BP5Reader::PerformGets");
m_JSONProfiler.Start("DataRead");
size_t maxReadSize;

// TP startGenerate = NOW();
Expand All @@ -317,6 +322,10 @@ void BP5Reader::PerformLocalGets()
reqidx = nextRequest;
++nextRequest;
}
if (reqidx <= nRequest)
{
m_JSONProfiler.AddBytes("dataread", ReadRequests[reqidx].ReadLength);
}
return reqidx;
};

Expand Down Expand Up @@ -406,11 +415,13 @@ void BP5Reader::PerformLocalGets()
{
Req.DestinationAddr = buf.data();
}
m_JSONProfiler.AddBytes("dataread", Req.ReadLength);
ReadData(m_DataFileManager, maxOpenFiles, Req.WriterRank, Req.Timestep, Req.StartOffset,
Req.ReadLength, Req.DestinationAddr);
m_BP5Deserializer->FinalizeGet(Req, false);
}
}
m_JSONProfiler.Stop("DataRead");
/*TP end = NOW();
double t1 = DURATION(start, end);
double t2 = DURATION(startRead, end);
Expand Down Expand Up @@ -789,16 +800,19 @@ void BP5Reader::UpdateBuffer(const TimePoint &timeoutInstant, const Seconds &pol

if (actualFileSize >= expectedMinFileSize)
{
m_JSONProfiler.Start("MetaDataRead");
m_Metadata.Resize(fileFilteredSize, "allocating metadata buffer, "
"in call to BP5Reader Open");
size_t mempos = 0;
for (auto p : m_FilteredMetadataInfo)
{
m_JSONProfiler.AddBytes("metadataread", p.second);
m_MDFileManager.ReadFile(m_Metadata.m_Buffer.data() + mempos, p.second,
p.first);
mempos += p.second;
}
m_MDFileAlreadyReadSize = expectedMinFileSize;
m_JSONProfiler.Stop("MetaDataRead");
}
else
{
Expand All @@ -824,12 +838,15 @@ void BP5Reader::UpdateBuffer(const TimePoint &timeoutInstant, const Seconds &pol
if (metametadataFileSize > m_MetaMetaDataFileAlreadyReadSize)
{
const size_t newMMDSize = metametadataFileSize - m_MetaMetaDataFileAlreadyReadSize;
m_JSONProfiler.Start("MetaMetaDataRead");
m_JSONProfiler.AddBytes("metametadataread", newMMDSize);
m_MetaMetadata.Resize(metametadataFileSize, "(re)allocating meta-meta-data buffer, "
"in call to BP5Reader Open");
m_FileMetaMetadataManager.ReadFile(m_MetaMetadata.m_Buffer.data() +
m_MetaMetaDataFileAlreadyReadSize,
newMMDSize, m_MetaMetaDataFileAlreadyReadSize);
m_MetaMetaDataFileAlreadyReadSize += newMMDSize;
m_JSONProfiler.Stop("MetaMetaDataRead");
}
}

Expand Down Expand Up @@ -1194,6 +1211,7 @@ void BP5Reader::DoClose(const int transportIndex)
{
EndStep();
}
FlushProfiler();
m_DataFileManager.CloseFiles();
m_MDFileManager.CloseFiles();
m_MDIndexFileManager.CloseFiles();
Expand All @@ -1204,6 +1222,48 @@ void BP5Reader::DoClose(const int transportIndex)
}
}

#if defined(_WIN32)
#include <windows.h>
#define getpid() GetCurrentProcessId();
#elif defined(__linux) || defined(__APPLE__) || defined(__OpenBSD__) || defined(__FreeBSD__) || \
defined(__NetBSD__) || defined(__DragonFly__) || defined(__CYGWIN__)
#include <sys/types.h>
#include <unistd.h>
#else
#define getpid() (long long)-1;
#endif

void BP5Reader::FlushProfiler()
{

auto LineJSON = m_JSONProfiler.GetRankProfilingJSON({}, {});
const std::vector<char> profilingJSON(m_JSONProfiler.AggregateProfilingJSON(LineJSON));

if (m_RankMPI == 0)
{
std::string profileFileName;
transport::FileFStream profilingJSONStream(m_Comm);
std::string bpBaseName = m_Name;

auto PID = getpid();
std::stringstream PIDstr;
PIDstr << std::hex << PID;
// write profile json in /tmp
profileFileName = "/tmp/" + bpBaseName + "_" + PIDstr.str() + "_profiling.json";

try
{
(void)remove(profileFileName.c_str());
profilingJSONStream.Open(profileFileName, Mode::Write);
profilingJSONStream.Write(profilingJSON.data(), profilingJSON.size());
profilingJSONStream.Close();
}
catch (...)
{ // do nothing
}
}
}

size_t BP5Reader::DoSteps() const { return m_StepsCount; }

void BP5Reader::NotifyEngineNoVarsQuery()
Expand Down
3 changes: 3 additions & 0 deletions source/adios2/engine/bp5/BP5Reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class BP5Reader : public BP5Engine, public Engine
transportman::TransportMan m_ActiveFlagFileManager;
Remote m_Remote;
bool m_WriterIsActive = true;
adios2::profiling::JSONProfiler m_JSONProfiler;

/** used for per-step reads, TODO: to be moved to BP5Deserializer */
size_t m_CurrentStep = 0;
Expand Down Expand Up @@ -200,6 +201,8 @@ class BP5Reader : public BP5Engine, public Engine

void DoClose(const int transportIndex = -1) final;

void FlushProfiler();

void GetSyncCommon(VariableBase &variable, void *data);

void GetDeferredCommon(VariableBase &variable, void *data);
Expand Down
22 changes: 19 additions & 3 deletions source/adios2/toolkit/profiling/iochrono/IOChrono.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ JSONProfiler::JSONProfiler(helper::Comm const &comm) : m_Comm(comm)
AddTimerWatch("PDW");

m_Profiler.m_Bytes.emplace("buffering", 0);
AddTimerWatch("DataRead");
m_Profiler.m_Bytes.emplace("dataread", 0);
AddTimerWatch("MetaDataRead");
m_Profiler.m_Bytes.emplace("metadataread", 0);
AddTimerWatch("MetaMetaDataRead");
m_Profiler.m_Bytes.emplace("metadmetaataread", 0);

m_RankMPI = m_Comm.Rank();
}
Expand Down Expand Up @@ -92,11 +98,21 @@ std::string JSONProfiler::GetRankProfilingJSON(
for (const auto &timerPair : profiler.m_Timers)
{
const profiling::Timer &timer = timerPair.second;
// rankLog += "\"" + timer.m_Process + "_" + timer.GetShortUnits() +
// "\": " + std::to_string(timer.m_ProcessTime) + ", ";
timer.AddToJsonStr(rankLog);
if (timer.m_nCalls > 0)
{
rankLog += "\"" + timer.m_Process + "_" + timer.GetShortUnits() +
"\": " + std::to_string(timer.m_ProcessTime) + ", ";
timer.AddToJsonStr(rankLog);
}
}

size_t DataBytes = m_Profiler.m_Bytes["dataread"];
size_t MetaDataBytes = m_Profiler.m_Bytes["metadataread"];
size_t MetaMetaDataBytes = m_Profiler.m_Bytes["metametadataread"];
rankLog += ", \"databytes\":" + std::to_string(DataBytes);
rankLog += ", \"metadatabytes\":" + std::to_string(MetaDataBytes);
rankLog += ", \"metametadatabytes\":" + std::to_string(MetaMetaDataBytes);

const size_t transportsSize = transportsTypes.size();

for (unsigned int t = 0; t < transportsSize; ++t)
Expand Down
4 changes: 4 additions & 0 deletions source/adios2/toolkit/profiling/iochrono/IOChrono.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ class JSONProfiler

void Start(const std::string process) { m_Profiler.Start(process); };
void Stop(const std::string process) { m_Profiler.Stop(process); };
void AddBytes(const std::string process, size_t bytes)
{
m_Profiler.m_Bytes[process] += bytes;
};

std::string GetRankProfilingJSON(
const std::vector<std::string> &transportsTypes,
Expand Down