Skip to content

Commit

Permalink
HDF5: flush option independent_stores
Browse files Browse the repository at this point in the history
  • Loading branch information
franzpoeschel committed Jun 12, 2024
1 parent a207118 commit fe9cc05
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 8 deletions.
3 changes: 3 additions & 0 deletions include/openPMD/IO/HDF5/HDF5IOHandlerImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*/
#pragma once

#include "openPMD/IO/AbstractIOHandler.hpp"
#include "openPMD/config.hpp"
#if openPMD_HAVE_HDF5
#include "openPMD/IO/AbstractIOHandlerImpl.hpp"
Expand Down Expand Up @@ -109,6 +110,8 @@ class HDF5IOHandlerImpl : public AbstractIOHandlerImpl
hid_t m_H5T_LONG_DOUBLE_80_LE;
hid_t m_H5T_CLONG_DOUBLE_80_LE;

std::future<void> flush(internal::ParsedFlushParams &);

protected:
#if openPMD_HAVE_MPI
/*
Expand Down
74 changes: 72 additions & 2 deletions src/IO/HDF5/HDF5IOHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
* If not, see <http://www.gnu.org/licenses/>.
*/
#include "openPMD/IO/HDF5/HDF5IOHandler.hpp"
#include "openPMD/IO/AbstractIOHandler.hpp"
#include "openPMD/IO/AbstractIOHandlerImpl.hpp"
#include "openPMD/IO/FlushParametersInternal.hpp"
#include "openPMD/IO/HDF5/HDF5IOHandlerImpl.hpp"
#include "openPMD/auxiliary/Environment.hpp"
#include "openPMD/auxiliary/JSON_internal.hpp"
Expand Down Expand Up @@ -2933,6 +2936,73 @@ HDF5IOHandlerImpl::getFile(Writable *writable)
res.id = it2->second;
return std::make_optional(std::move(res));
}

std::future<void> HDF5IOHandlerImpl::flush(internal::ParsedFlushParams &params)
{
std::optional<H5FD_mpio_xfer_t> old_value;
if (params.backendConfig.json().contains("hdf5"))
{
auto hdf5_config = params.backendConfig["hdf5"];

if (hdf5_config.json().contains("independent_stores"))
{
auto independent_stores_json = hdf5_config["independent_stores"];
if (!independent_stores_json.json().is_boolean())
{
throw error::BackendConfigSchema(
{"hdf5", "independent_stores"}, "Requires boolean value.");
}
bool independent_stores =
independent_stores_json.json().get<bool>();
old_value = std::make_optional<H5FD_mpio_xfer_t>();
herr_t status =
H5Pget_dxpl_mpio(m_datasetTransferProperty, &*old_value);
VERIFY(
status >= 0,
"[HDF5] Internal error: Failed to query the global data "
"transfer mode before flushing.");
H5FD_mpio_xfer_t new_value = independent_stores
? H5FD_MPIO_INDEPENDENT
: H5FD_MPIO_COLLECTIVE;
status = H5Pset_dxpl_mpio(m_datasetTransferProperty, new_value);
VERIFY(
status >= 0,
"[HDF5] Internal error: Failed to set the local data "
"transfer mode before flushing.");
}

if (auto shadow = hdf5_config.invertShadow(); shadow.size() > 0)
{
switch (hdf5_config.originallySpecifiedAs)
{
case json::SupportedLanguages::JSON:
std::cerr << "Warning: parts of the backend configuration for "
"HDF5 remain unused:\n"
<< shadow << std::endl;
break;
case json::SupportedLanguages::TOML: {
auto asToml = json::jsonToToml(shadow);
std::cerr << "Warning: parts of the backend configuration for "
"HDF5 remain unused:\n"
<< asToml << std::endl;
break;
}
}
}
}
auto res = AbstractIOHandlerImpl::flush();

if (old_value.has_value())
{
herr_t status = H5Pset_dxpl_mpio(m_datasetTransferProperty, *old_value);
VERIFY(
status >= 0,
"[HDF5] Internal error: Failed to reset the global data "
"transfer mode after flushing.");
}

return res;
}
#endif

#if openPMD_HAVE_HDF5
Expand All @@ -2944,9 +3014,9 @@ HDF5IOHandler::HDF5IOHandler(

HDF5IOHandler::~HDF5IOHandler() = default;

std::future<void> HDF5IOHandler::flush(internal::ParsedFlushParams &)
std::future<void> HDF5IOHandler::flush(internal::ParsedFlushParams &params)
{
return m_impl->flush();
return m_impl->flush(params);
}
#else

Expand Down
11 changes: 6 additions & 5 deletions src/IO/HDF5/ParallelHDF5IOHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,10 @@ ParallelHDF5IOHandler::ParallelHDF5IOHandler(

ParallelHDF5IOHandler::~ParallelHDF5IOHandler() = default;

std::future<void> ParallelHDF5IOHandler::flush(internal::ParsedFlushParams &)
std::future<void>
ParallelHDF5IOHandler::flush(internal::ParsedFlushParams &params)
{
return m_impl->flush();
return m_impl->flush(params);
}

ParallelHDF5IOHandlerImpl::ParallelHDF5IOHandlerImpl(
Expand Down Expand Up @@ -121,14 +122,14 @@ ParallelHDF5IOHandlerImpl::ParallelHDF5IOHandlerImpl(
}

H5FD_mpio_xfer_t xfer_mode = H5FD_MPIO_COLLECTIVE;
auto const hdf5_collective =
auto const hdf5_independent =
auxiliary::getEnvString("OPENPMD_HDF5_INDEPENDENT", "ON");
if (hdf5_collective == "ON")
if (hdf5_independent == "ON")
xfer_mode = H5FD_MPIO_INDEPENDENT;
else
{
VERIFY(
hdf5_collective == "OFF",
hdf5_independent == "OFF",
"[HDF5] Internal error: OPENPMD_HDF5_INDEPENDENT property must be "
"either ON or OFF");
}
Expand Down
4 changes: 3 additions & 1 deletion test/ParallelIOTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,8 @@ TEST_CASE("hdf5_write_test", "[parallel][hdf5]")
"hdf5.dataset.chunks = [1]"));
e["position"]["x"].storeChunk(position_local, {mpi_rank}, {1});

o.flush("hdf5.independent_stores = true");

std::vector<uint64_t> positionOffset_global(mpi_size);
uint64_t posOff{0};
std::generate(
Expand All @@ -344,7 +346,7 @@ TEST_CASE("hdf5_write_test", "[parallel][hdf5]")
e["positionOffset"]["y"].storeChunk(
std::make_unique<float>(3.141592654), {0}, {1});

o.flush();
o.flush("hdf5.independent_stores = false");
}

TEST_CASE("hdf5_write_test_zero_extent", "[parallel][hdf5]")
Expand Down

0 comments on commit fe9cc05

Please sign in to comment.