Skip to content

Commit

Permalink
Move async file open functionality to transports
Browse files Browse the repository at this point in the history
  • Loading branch information
eisenhauer committed Mar 4, 2020
1 parent 57235fe commit ab62204
Show file tree
Hide file tree
Showing 15 changed files with 203 additions and 117 deletions.
16 changes: 7 additions & 9 deletions source/adios2/engine/bp3/BP3Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,16 +188,14 @@ void BP3Writer::InitTransports()
{
if (m_BP3Serializer.m_Parameters.AsyncTasks)
{
m_FileDataManager.OpenFilesAsync(
bpSubStreamNames, m_OpenMode, m_IO.m_TransportsParameters,
m_BP3Serializer.m_Profiler.m_IsActive);
}
else
{
m_FileDataManager.OpenFiles(bpSubStreamNames, m_OpenMode,
m_IO.m_TransportsParameters,
m_BP3Serializer.m_Profiler.m_IsActive);
for (size_t i = 0; i < m_IO.m_TransportsParameters.size(); ++i)
{
m_IO.m_TransportsParameters[i]["asynctasks"] = "true";
}
}
m_FileDataManager.OpenFiles(bpSubStreamNames, m_OpenMode,
m_IO.m_TransportsParameters,
m_BP3Serializer.m_Profiler.m_IsActive);
}
}

Expand Down
16 changes: 7 additions & 9 deletions source/adios2/engine/bp4/BP4Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,16 +190,14 @@ void BP4Writer::InitTransports()
{
if (m_BP4Serializer.m_Parameters.AsyncTasks)
{
m_FileDataManager.OpenFilesAsync(
bpSubStreamNames, m_OpenMode, m_IO.m_TransportsParameters,
m_BP4Serializer.m_Profiler.m_IsActive);
}
else
{
m_FileDataManager.OpenFiles(bpSubStreamNames, m_OpenMode,
m_IO.m_TransportsParameters,
m_BP4Serializer.m_Profiler.m_IsActive);
for (size_t i = 0; i < m_IO.m_TransportsParameters.size(); ++i)
{
m_IO.m_TransportsParameters[i]["asynctasks"] = "true";
}
}
m_FileDataManager.OpenFiles(bpSubStreamNames, m_OpenMode,
m_IO.m_TransportsParameters,
m_BP4Serializer.m_Profiler.m_IsActive);
}

if (m_BP4Serializer.m_RankMPI == 0)
Expand Down
7 changes: 5 additions & 2 deletions source/adios2/toolkit/transport/Transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,14 @@ class Transport
void InitProfiler(const Mode openMode, const TimeUnit timeUnit);

/**
* Opens transport, required before SetBuffer, Write, Read, Flush, Close
* Opens transport, possibly asynchronously, required before SetBuffer,
* Write, Read, Flush, Close
* @param name
* @param openMode
* @param async
*/
virtual void Open(const std::string &name, const Mode openMode) = 0;
virtual void Open(const std::string &name, const Mode openMode,
const bool async = false) = 0;

/**
* If OS buffered (FILE* or fstream), sets the buffer size
Expand Down
59 changes: 51 additions & 8 deletions source/adios2/toolkit/transport/file/FileFStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,51 @@ FileFStream::FileFStream(helper::Comm const &comm, const bool debugMode)
{
}

void FileFStream::Open(const std::string &name, const Mode openMode)
void FileFStream::WaitForOpen()
{
if (m_IsOpening)
{
if (m_OpenFuture.valid())
{
m_OpenFuture.get();
}
m_IsOpening = false;
CheckFile(
"couldn't open file " + m_Name +
", check permissions or path existence, in call to POSIX open");
m_IsOpen = true;
}
}

void FileFStream::Open(const std::string &name, const Mode openMode,
const bool async)
{
auto lf_AsyncOpenWrite = [&](const std::string &name) -> void {
ProfilerStart("open");
m_FileStream.open(name, std::fstream::out | std::fstream::binary |
std::fstream::trunc);
ProfilerStop("open");
};
m_Name = name;
CheckName();
m_OpenMode = openMode;

switch (m_OpenMode)
{
case (Mode::Write):
ProfilerStart("open");
m_FileStream.open(name, std::fstream::out | std::fstream::binary |
std::fstream::trunc);
ProfilerStop("open");
if (async)
{
m_IsOpening = true;
m_OpenFuture =
std::async(std::launch::async, lf_AsyncOpenWrite, name);
}
else
{
ProfilerStart("open");
m_FileStream.open(name, std::fstream::out | std::fstream::binary |
std::fstream::trunc);
ProfilerStop("open");
}
break;

case (Mode::Append):
Expand All @@ -59,9 +91,13 @@ void FileFStream::Open(const std::string &name, const Mode openMode)
", in call to stream open");
}

CheckFile("couldn't open file " + m_Name +
", check permissions or path existence, in call to fstream open");
m_IsOpen = true;
if (!m_IsOpening)
{
CheckFile(
"couldn't open file " + m_Name +
", check permissions or path existence, in call to fstream open");
m_IsOpen = true;
}
}

void FileFStream::SetBuffer(char *buffer, size_t size)
Expand All @@ -81,6 +117,7 @@ void FileFStream::Write(const char *buffer, size_t size, size_t start)
", in call to fstream write");
};

WaitForOpen();
if (start != MaxSizeT)
{
m_FileStream.seekp(start);
Expand Down Expand Up @@ -117,6 +154,7 @@ void FileFStream::Read(char *buffer, size_t size, size_t start)
", in call to fstream read");
};

WaitForOpen();
if (start != MaxSizeT)
{
m_FileStream.seekg(start);
Expand Down Expand Up @@ -145,6 +183,7 @@ void FileFStream::Read(char *buffer, size_t size, size_t start)

size_t FileFStream::GetSize()
{
WaitForOpen();
const auto currentPosition = m_FileStream.tellg();
m_FileStream.seekg(0, std::ios_base::end);
const std::streampos size = m_FileStream.tellg();
Expand All @@ -159,6 +198,7 @@ size_t FileFStream::GetSize()

void FileFStream::Flush()
{
WaitForOpen();
ProfilerStart("write");
m_FileStream.flush();
ProfilerStart("write");
Expand All @@ -168,6 +208,7 @@ void FileFStream::Flush()

void FileFStream::Close()
{
WaitForOpen();
ProfilerStart("close");
m_FileStream.close();
ProfilerStop("close");
Expand All @@ -186,13 +227,15 @@ void FileFStream::CheckFile(const std::string hint) const

void FileFStream::SeekToEnd()
{
WaitForOpen();
m_FileStream.seekp(0, std::ios_base::end);
CheckFile("couldn't move to the end of file " + m_Name +
", in call to fstream seekp");
}

void FileFStream::SeekToBegin()
{
WaitForOpen();
m_FileStream.seekp(0, std::ios_base::beg);
CheckFile("couldn't move to the beginning of file " + m_Name +
", in call to fstream seekp");
Expand Down
7 changes: 6 additions & 1 deletion source/adios2/toolkit/transport/file/FileFStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#define ADIOS2_TOOLKIT_TRANSPORT_FILE_FILESTREAM_H_

#include <fstream>
#include <future> //std::async, std::future

#include "adios2/common/ADIOSConfig.h"
#include "adios2/helper/adiosComm.h"
Expand All @@ -31,7 +32,8 @@ class FileFStream : public Transport

~FileFStream() = default;

void Open(const std::string &name, const Mode openMode) final;
void Open(const std::string &name, const Mode openMode,
const bool async = false) final;

void SetBuffer(char *buffer, size_t size) final;

Expand All @@ -52,12 +54,15 @@ class FileFStream : public Transport
private:
/** file stream using fstream library */
std::fstream m_FileStream;
bool m_IsOpening = false;
std::future<void> m_OpenFuture;

/**
* Check if m_FileStream is false after an operation
* @param hint exception message
*/
void CheckFile(const std::string hint) const;
void WaitForOpen();
};

} // end namespace transport
Expand Down
61 changes: 52 additions & 9 deletions source/adios2/toolkit/transport/file/FilePOSIX.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,52 @@ FilePOSIX::~FilePOSIX()
}
}

void FilePOSIX::Open(const std::string &name, const Mode openMode)
void FilePOSIX::WaitForOpen()
{
if (m_IsOpening)
{
if (m_OpenFuture.valid())
{
m_FileDescriptor = m_OpenFuture.get();
}
m_IsOpening = false;
CheckFile(
"couldn't open file " + m_Name +
", check permissions or path existence, in call to POSIX open");
m_IsOpen = true;
}
}

void FilePOSIX::Open(const std::string &name, const Mode openMode,
const bool async)
{
auto lf_AsyncOpenWrite = [&](const std::string &name) -> int {
ProfilerStart("open");
int FD = open(m_Name.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0666);
ProfilerStop("open");
return FD;
};

m_Name = name;
CheckName();
m_OpenMode = openMode;
switch (m_OpenMode)
{

case (Mode::Write):
ProfilerStart("open");
m_FileDescriptor =
open(m_Name.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0666);
ProfilerStop("open");
if (async)
{
m_IsOpening = true;
m_OpenFuture =
std::async(std::launch::async, lf_AsyncOpenWrite, name);
}
else
{
ProfilerStart("open");
m_FileDescriptor =
open(m_Name.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0666);
ProfilerStop("open");
}
break;

case (Mode::Append):
Expand All @@ -71,10 +104,13 @@ void FilePOSIX::Open(const std::string &name, const Mode openMode)
", in call to POSIX open");
}

CheckFile("couldn't open file " + m_Name +
", check permissions or path existence, in call to POSIX open");

m_IsOpen = true;
if (!m_IsOpening)
{
CheckFile(
"couldn't open file " + m_Name +
", check permissions or path existence, in call to POSIX open");
m_IsOpen = true;
}
}

void FilePOSIX::Write(const char *buffer, size_t size, size_t start)
Expand Down Expand Up @@ -103,6 +139,7 @@ void FilePOSIX::Write(const char *buffer, size_t size, size_t start)
}
};

WaitForOpen();
if (start != MaxSizeT)
{
const auto newPosition = lseek(m_FileDescriptor, start, SEEK_SET);
Expand Down Expand Up @@ -161,6 +198,8 @@ void FilePOSIX::Read(char *buffer, size_t size, size_t start)
}
};

WaitForOpen();

if (start != MaxSizeT)
{
const auto newPosition = lseek(m_FileDescriptor, start, SEEK_SET);
Expand Down Expand Up @@ -197,6 +236,7 @@ void FilePOSIX::Read(char *buffer, size_t size, size_t start)
size_t FilePOSIX::GetSize()
{
struct stat fileStat;
WaitForOpen();
if (fstat(m_FileDescriptor, &fileStat) == -1)
{
throw std::ios_base::failure("ERROR: couldn't get size of file " +
Expand All @@ -209,6 +249,7 @@ void FilePOSIX::Flush() {}

void FilePOSIX::Close()
{
WaitForOpen();
ProfilerStart("close");
const int status = close(m_FileDescriptor);
ProfilerStop("close");
Expand All @@ -232,6 +273,7 @@ void FilePOSIX::CheckFile(const std::string hint) const

void FilePOSIX::SeekToEnd()
{
WaitForOpen();
const int status = lseek(m_FileDescriptor, 0, SEEK_END);
if (status == -1)
{
Expand All @@ -243,6 +285,7 @@ void FilePOSIX::SeekToEnd()

void FilePOSIX::SeekToBegin()
{
WaitForOpen();
const int status = lseek(m_FileDescriptor, 0, SEEK_SET);
if (status == -1)
{
Expand Down
8 changes: 7 additions & 1 deletion source/adios2/toolkit/transport/file/FilePOSIX.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#ifndef ADIOS2_TOOLKIT_TRANSPORT_FILE_FILEDESCRIPTOR_H_
#define ADIOS2_TOOLKIT_TRANSPORT_FILE_FILEDESCRIPTOR_H_

#include <future> //std::async, std::future

#include "adios2/common/ADIOSConfig.h"
#include "adios2/toolkit/transport/Transport.h"

Expand All @@ -32,7 +34,8 @@ class FilePOSIX : public Transport

~FilePOSIX();

void Open(const std::string &name, const Mode openMode) final;
void Open(const std::string &name, const Mode openMode,
const bool async = false) final;

void Write(const char *buffer, size_t size, size_t start = MaxSizeT) final;

Expand All @@ -52,12 +55,15 @@ class FilePOSIX : public Transport
private:
/** POSIX file handle returned by Open */
int m_FileDescriptor = -1;
bool m_IsOpening = false;
std::future<int> m_OpenFuture;

/**
* Check if m_FileDescriptor is -1 after an operation
* @param hint exception message
*/
void CheckFile(const std::string hint) const;
void WaitForOpen();
};

} // end namespace transport
Expand Down
Loading

0 comments on commit ab62204

Please sign in to comment.