Skip to content

Commit

Permalink
[Draft] mpi4py
Browse files Browse the repository at this point in the history
Add support for MPI-parallel I/O in python3 bindings.
  • Loading branch information
ax3l committed Jan 30, 2019
1 parent 207ec33 commit b1b0905
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 7 deletions.
29 changes: 23 additions & 6 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,11 @@ if(openPMD_HAVE_PYTHON)
INTERFACE pybind11::pybind11)
endif()

# TODO: Check if mpi4py is available
# ref: https://github.com/ornladios/ADIOS2/blob/v2.3.0/cmake/FindPythonModule.cmake
# find_package(PythonModule REQUIRED COMPONENTS mpi4py mpi4py/mpi4py.h)
# find_package(PythonModule REQUIRED COMPONENTS numpy)


# Targets #####################################################################
#
Expand Down Expand Up @@ -585,6 +590,7 @@ set(openPMD_EXAMPLE_NAMES
set(openPMD_PYTHON_EXAMPLE_NAMES
2_read_serial
3_write_serial
5_write_parallel
7_extended_write_serial
9_particle_write_serial
)
Expand Down Expand Up @@ -859,12 +865,23 @@ if(openPMD_HAVE_PYTHON)
${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/${examplename}.py
)
if(BUILD_TESTING)
add_test(NAME Example.py.${examplename}
COMMAND ${PYTHON_EXECUTABLE}
${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/${examplename}.py
WORKING_DIRECTORY
${CMAKE_RUNTIME_OUTPUT_DIRECTORY}
)
if(${examplename} MATCHES "^.*_parallel$")
if(openPMD_HAVE_MPI)
# see https://mpi4py.readthedocs.io/en/stable/mpi4py.run.html
add_test(NAME Example.py.${examplename}
COMMAND ${MPI_TEST_EXE} ${PYTHON_EXECUTABLE} -m mpi4py
${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/${examplename}.py
WORKING_DIRECTORY ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}
)
endif()
else()
add_test(NAME Example.py.${examplename}
COMMAND ${PYTHON_EXECUTABLE}
${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/${examplename}.py
WORKING_DIRECTORY
${CMAKE_RUNTIME_OUTPUT_DIRECTORY}
)
endif()
if(WIN32)
string(REGEX REPLACE "/" "\\\\" WIN_BUILD_BASEDIR ${openPMD_BINARY_DIR})
string(REGEX REPLACE "/" "\\\\" WIN_BUILD_BINDIR ${CMAKE_RUNTIME_OUTPUT_DIRECTORY})
Expand Down
80 changes: 80 additions & 0 deletions examples/5_write_parallel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
#!/usr/bin/env python
"""
This file is part of the openPMD-api.
Copyright 2019 openPMD contributors
Authors: Axel Huebl
License: LGPLv3+
"""
import openpmd_api
import numpy as np

# https://mpi4py.readthedocs.io/en/stable/mpi4py.run.html
# on import: calls MPI_Init_thread()
# exit hook: calls MPI_Finalize()
from mpi4py import MPI


if __name__ == "__main__":
# also works with any other MPI communicator
comm = MPI.COMM_WORLD

# allocate a data set to write
global_data = np.arange(comm.size, dtype=np.double)
if 0 == comm.rank:
print("Set up a 1D array with one element per MPI rank ({}) "
"that will be written to disk".format(comm.size))

local_data = np.array([0, ], dtype=np.double)
local_data[0] = global_data[comm.rank]
if 0 == comm.rank:
print("Set up a 1D array with one element, representing the view of "
"the MPI rank")

# open file for writing
series = openpmd_api.Series(
"../samples/5_parallel_write_py.h5",
openpmd_api.Access_Type.create,
comm
)
if 0 == comm.rank:
print("Created an empty series in parallel with {} MPI ranks".format(
comm.size))

id = series.iterations[1]. \
meshes["id"][openpmd_api.Mesh_Record_Component.SCALAR]

datatype = openpmd_api.Datatype.DOUBLE
# datatype = determineDatatype(local_data)
dataset_extent = [comm.size, ]
dataset = openpmd_api.Dataset(datatype, dataset_extent)

if 0 == comm.rank:
print("Created a Dataset of size {} and Datatype {}".format(
dataset.extent[0], dataset.dtype))

id.reset_dataset(dataset)
if 0 == comm.rank:
print("Set the global on-disk Dataset properties for the scalar field "
"id in iteration 1")

series.flush()
if 0 == comm.rank:
print("File structure has been written to disk")

chunk_offset = [comm.rank, ]
chunk_extent = [1, ]
id.store_chunk(local_data, chunk_offset, chunk_extent)
if 0 == comm.rank:
print("Stored a single chunk per MPI rank containing its contribution,"
" ready to write content to disk")

series.flush()
if 0 == comm.rank:
print("Dataset content has been fully written to disk")

# The files in 'series' are still open until the object is destroyed, on
# which it cleanly flushes and closes all open file handles.
# One can delete the object explicitly (or let it run out of scope) to
# trigger this.
del series
65 changes: 64 additions & 1 deletion src/binding/python/Series.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,80 @@
*/
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>
#if openPMD_HAVE_MPI
// include <mpi4py/mpi4py.h>
# include <mpi.h>
#endif

#include "openPMD/Series.hpp"
#include <string>

namespace py = pybind11;
using namespace openPMD;

/** mpi4py communicator wrapper
*
* refs:
* - https://github.com/mpi4py/mpi4py/blob/3.0.0/src/mpi4py/libmpi.pxd#L35-L36
* - https://github.com/mpi4py/mpi4py/blob/3.0.0/src/mpi4py/MPI.pxd#L100-L105
* - installed: include/mpi4py/mpi4py.MPI.h
*/
struct openPMD_PyMPICommObject
{
PyObject_HEAD
MPI_Comm* ob_mpi;
unsigned int flags;
};
using openPMD_PyMPIIntracommObject = openPMD_PyMPICommObject;


void init_Series(py::module &m) {
py::class_<Series, Attributable>(m, "Series")

.def(py::init<std::string const&, AccessType>())
.def(py::init<std::string const&, AccessType>(),
py::arg("filepath"), py::arg("access_type"))
#if openPMD_HAVE_MPI
.def(py::init([](std::string const& filepath, AccessType at, py::object &comm){
//! @todo perform mpi4py import test?
//! required C-API init?
//! https://bitbucket.org/mpi4py/mpi4py/src/3.0.0/demo/wrap-c/helloworld.c
// if( import_mpi4py() < 0 ) { here be dragons }

if( comm.ptr() == Py_None )
throw std::runtime_error("Series: MPI communicator cannot be None.");
if( comm.ptr() == nullptr )
throw std::runtime_error("Series: MPI communicator is a nullptr.");

// check type string to see if this is mpi4py
// __str__ (pretty)
// __repr__ (unambiguous)
// mpi4py: <mpi4py.MPI.Intracomm object at 0x7f998e6e28d0>
// pyMPI: ... (todo)
py::str const comm_pystr = py::repr(comm);
std::string const comm_str = std::string(comm_pystr);
if( comm_str.substr(0, 12) != std::string("<mpi4py.MPI.") )
throw std::runtime_error("Series: comm is not an mpi4py communicator: " +
comm_str);

//! @todo other possible implementations:
// - pyMPI (inactive since 2008?): import mpi; mpi.WORLD

// reimplementation of mpi4py's:
// MPI_Comm* mpiCommPtr = PyMPIComm_Get(comm.ptr());
MPI_Comm* mpiCommPtr = (MPI_Comm*)&((openPMD_PyMPIIntracommObject*)(comm.ptr()))->ob_mpi;

if( PyErr_Occurred() )
throw std::runtime_error("Series: MPI Communicator access error.");
if( mpiCommPtr == nullptr ) {
throw std::runtime_error("Series: MPI communicator cast failed. "
"(Mismatched MPI at compile vs. runtime?)");
}

return new Series(filepath, at, *mpiCommPtr);
}),
py::arg("filepath"), py::arg("access_type"), py::arg("mpi_communicator")
)
#endif

.def_property_readonly("openPMD", &Series::openPMD)
.def("set_openPMD", &Series::setOpenPMD)
Expand Down

0 comments on commit b1b0905

Please sign in to comment.