From b1b09050321aba0eeebb534f8351e23fec913ff1 Mon Sep 17 00:00:00 2001 From: Axel Huebl Date: Sun, 27 Jan 2019 17:08:05 +0100 Subject: [PATCH] [Draft] mpi4py Add support for MPI-parallel I/O in python3 bindings. --- CMakeLists.txt | 29 ++++++++++--- examples/5_write_parallel.py | 80 +++++++++++++++++++++++++++++++++++ src/binding/python/Series.cpp | 65 +++++++++++++++++++++++++++- 3 files changed, 167 insertions(+), 7 deletions(-) create mode 100644 examples/5_write_parallel.py diff --git a/CMakeLists.txt b/CMakeLists.txt index 7f3130c6b6..ef291711e5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -276,6 +276,11 @@ if(openPMD_HAVE_PYTHON) INTERFACE pybind11::pybind11) endif() +# TODO: Check if mpi4py is available +# ref: https://github.com/ornladios/ADIOS2/blob/v2.3.0/cmake/FindPythonModule.cmake +# find_package(PythonModule REQUIRED COMPONENTS mpi4py mpi4py/mpi4py.h) +# find_package(PythonModule REQUIRED COMPONENTS numpy) + # Targets ##################################################################### # @@ -585,6 +590,7 @@ set(openPMD_EXAMPLE_NAMES set(openPMD_PYTHON_EXAMPLE_NAMES 2_read_serial 3_write_serial + 5_write_parallel 7_extended_write_serial 9_particle_write_serial ) @@ -859,12 +865,23 @@ if(openPMD_HAVE_PYTHON) ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/${examplename}.py ) if(BUILD_TESTING) - add_test(NAME Example.py.${examplename} - COMMAND ${PYTHON_EXECUTABLE} - ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/${examplename}.py - WORKING_DIRECTORY - ${CMAKE_RUNTIME_OUTPUT_DIRECTORY} - ) + if(${examplename} MATCHES "^.*_parallel$") + if(openPMD_HAVE_MPI) + # see https://mpi4py.readthedocs.io/en/stable/mpi4py.run.html + add_test(NAME Example.py.${examplename} + COMMAND ${MPI_TEST_EXE} ${PYTHON_EXECUTABLE} -m mpi4py + ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/${examplename}.py + WORKING_DIRECTORY ${CMAKE_RUNTIME_OUTPUT_DIRECTORY} + ) + endif() + else() + add_test(NAME Example.py.${examplename} + COMMAND ${PYTHON_EXECUTABLE} + ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/${examplename}.py + WORKING_DIRECTORY + ${CMAKE_RUNTIME_OUTPUT_DIRECTORY} + ) + endif() if(WIN32) string(REGEX REPLACE "/" "\\\\" WIN_BUILD_BASEDIR ${openPMD_BINARY_DIR}) string(REGEX REPLACE "/" "\\\\" WIN_BUILD_BINDIR ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}) diff --git a/examples/5_write_parallel.py b/examples/5_write_parallel.py new file mode 100644 index 0000000000..fb551d288b --- /dev/null +++ b/examples/5_write_parallel.py @@ -0,0 +1,80 @@ +#!/usr/bin/env python +""" +This file is part of the openPMD-api. + +Copyright 2019 openPMD contributors +Authors: Axel Huebl +License: LGPLv3+ +""" +import openpmd_api +import numpy as np + +# https://mpi4py.readthedocs.io/en/stable/mpi4py.run.html +# on import: calls MPI_Init_thread() +# exit hook: calls MPI_Finalize() +from mpi4py import MPI + + +if __name__ == "__main__": + # also works with any other MPI communicator + comm = MPI.COMM_WORLD + + # allocate a data set to write + global_data = np.arange(comm.size, dtype=np.double) + if 0 == comm.rank: + print("Set up a 1D array with one element per MPI rank ({}) " + "that will be written to disk".format(comm.size)) + + local_data = np.array([0, ], dtype=np.double) + local_data[0] = global_data[comm.rank] + if 0 == comm.rank: + print("Set up a 1D array with one element, representing the view of " + "the MPI rank") + + # open file for writing + series = openpmd_api.Series( + "../samples/5_parallel_write_py.h5", + openpmd_api.Access_Type.create, + comm + ) + if 0 == comm.rank: + print("Created an empty series in parallel with {} MPI ranks".format( + comm.size)) + + id = series.iterations[1]. \ + meshes["id"][openpmd_api.Mesh_Record_Component.SCALAR] + + datatype = openpmd_api.Datatype.DOUBLE + # datatype = determineDatatype(local_data) + dataset_extent = [comm.size, ] + dataset = openpmd_api.Dataset(datatype, dataset_extent) + + if 0 == comm.rank: + print("Created a Dataset of size {} and Datatype {}".format( + dataset.extent[0], dataset.dtype)) + + id.reset_dataset(dataset) + if 0 == comm.rank: + print("Set the global on-disk Dataset properties for the scalar field " + "id in iteration 1") + + series.flush() + if 0 == comm.rank: + print("File structure has been written to disk") + + chunk_offset = [comm.rank, ] + chunk_extent = [1, ] + id.store_chunk(local_data, chunk_offset, chunk_extent) + if 0 == comm.rank: + print("Stored a single chunk per MPI rank containing its contribution," + " ready to write content to disk") + + series.flush() + if 0 == comm.rank: + print("Dataset content has been fully written to disk") + + # The files in 'series' are still open until the object is destroyed, on + # which it cleanly flushes and closes all open file handles. + # One can delete the object explicitly (or let it run out of scope) to + # trigger this. + del series diff --git a/src/binding/python/Series.cpp b/src/binding/python/Series.cpp index 4dd6e61265..8ce5cc5c71 100644 --- a/src/binding/python/Series.cpp +++ b/src/binding/python/Series.cpp @@ -20,17 +20,80 @@ */ #include #include +#if openPMD_HAVE_MPI +// include +# include +#endif #include "openPMD/Series.hpp" +#include namespace py = pybind11; using namespace openPMD; +/** mpi4py communicator wrapper + * + * refs: + * - https://github.com/mpi4py/mpi4py/blob/3.0.0/src/mpi4py/libmpi.pxd#L35-L36 + * - https://github.com/mpi4py/mpi4py/blob/3.0.0/src/mpi4py/MPI.pxd#L100-L105 + * - installed: include/mpi4py/mpi4py.MPI.h + */ +struct openPMD_PyMPICommObject +{ + PyObject_HEAD + MPI_Comm* ob_mpi; + unsigned int flags; +}; +using openPMD_PyMPIIntracommObject = openPMD_PyMPICommObject; + void init_Series(py::module &m) { py::class_(m, "Series") - .def(py::init()) + .def(py::init(), + py::arg("filepath"), py::arg("access_type")) +#if openPMD_HAVE_MPI + .def(py::init([](std::string const& filepath, AccessType at, py::object &comm){ + //! @todo perform mpi4py import test? + //! required C-API init? + //! https://bitbucket.org/mpi4py/mpi4py/src/3.0.0/demo/wrap-c/helloworld.c + // if( import_mpi4py() < 0 ) { here be dragons } + + if( comm.ptr() == Py_None ) + throw std::runtime_error("Series: MPI communicator cannot be None."); + if( comm.ptr() == nullptr ) + throw std::runtime_error("Series: MPI communicator is a nullptr."); + + // check type string to see if this is mpi4py + // __str__ (pretty) + // __repr__ (unambiguous) + // mpi4py: + // pyMPI: ... (todo) + py::str const comm_pystr = py::repr(comm); + std::string const comm_str = std::string(comm_pystr); + if( comm_str.substr(0, 12) != std::string("ob_mpi; + + if( PyErr_Occurred() ) + throw std::runtime_error("Series: MPI Communicator access error."); + if( mpiCommPtr == nullptr ) { + throw std::runtime_error("Series: MPI communicator cast failed. " + "(Mismatched MPI at compile vs. runtime?)"); + } + + return new Series(filepath, at, *mpiCommPtr); + }), + py::arg("filepath"), py::arg("access_type"), py::arg("mpi_communicator") + ) +#endif .def_property_readonly("openPMD", &Series::openPMD) .def("set_openPMD", &Series::setOpenPMD)