Skip to content

Commit

Permalink
[Draft] mpi4py
Browse files Browse the repository at this point in the history
Add support for MPI-parallel I/O in python3 bindings.
  • Loading branch information
ax3l committed Jan 31, 2019
1 parent 207ec33 commit 9414398
Show file tree
Hide file tree
Showing 4 changed files with 193 additions and 11 deletions.
6 changes: 6 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,12 @@ install:
py-numpy ~blas ~lapack ^python@$TRAVIS_PYTHON_VERSION
$CXXSPEC &&
spack load py-numpy ~blas ~lapack ^python@$TRAVIS_PYTHON_VERSION $CXXSPEC;
if [ $USE_MPI == "ON" ]; then
travis_wait spack install
py-mpi4py ^python@$TRAVIS_PYTHON_VERSION
$CXXSPEC &&
spack load py-mpi4py ^python@$TRAVIS_PYTHON_VERSION $CXXSPEC;
fi;
fi
- if [ $USE_HDF5 == "ON" ]; then
travis_wait spack install
Expand Down
40 changes: 30 additions & 10 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -558,10 +558,6 @@ if(openPMD_HAVE_PYTHON)
PDB_OUTPUT_DIRECTORY ${CMAKE_PYTHON_OUTPUT_DIRECTORY}
COMPILE_PDB_OUTPUT_DIRECTORY ${CMAKE_PYTHON_OUTPUT_DIRECTORY}
)

#if(openPMD_HAVE_MPI)
# target_link_libraries(openPMD.py PRIVATE PythonModule::mpi4py)
#endif()
endif()

# tests
Expand All @@ -585,6 +581,7 @@ set(openPMD_EXAMPLE_NAMES
set(openPMD_PYTHON_EXAMPLE_NAMES
2_read_serial
3_write_serial
5_write_parallel
7_extended_write_serial
9_particle_write_serial
)
Expand Down Expand Up @@ -852,19 +849,42 @@ endif()
# Python Examples
if(openPMD_HAVE_PYTHON)
if(EXISTS "${openPMD_BINARY_DIR}/samples/git-sample/")
# Probe at configure time whether the selected Python interpreter can import
# mpi4py. The exit code of the child process is stored in MPI4PY_RETURN
# (0 on success) and is checked below before MPI-parallel Python examples
# are registered as tests.
# OUTPUT_QUIET/ERROR_QUIET: a failing import is an expected outcome here; do
# not leak the Python traceback into the configure log, the status note
# below already reports the result.
execute_process(COMMAND ${PYTHON_EXECUTABLE}
    -m mpi4py
    -c "import mpi4py.MPI"
    RESULT_VARIABLE MPI4PY_RETURN
    OUTPUT_QUIET
    ERROR_QUIET)

if(NOT MPI4PY_RETURN EQUAL 0)
    message(STATUS "Note: mpi4py not found. "
                   "Skipping MPI-parallel Python examples.")
endif()

foreach(examplename ${openPMD_PYTHON_EXAMPLE_NAMES})
add_custom_command(TARGET openPMD.py POST_BUILD
COMMAND ${CMAKE_COMMAND} -E copy
${openPMD_SOURCE_DIR}/examples/${examplename}.py
${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/${examplename}.py
)
if(BUILD_TESTING)
add_test(NAME Example.py.${examplename}
COMMAND ${PYTHON_EXECUTABLE}
${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/${examplename}.py
WORKING_DIRECTORY
${CMAKE_RUNTIME_OUTPUT_DIRECTORY}
)
if(${examplename} MATCHES "^.*_parallel$")
if(openPMD_HAVE_MPI AND MPI4PY_RETURN EQUAL 0)
# see https://mpi4py.readthedocs.io/en/stable/mpi4py.run.html
add_test(NAME Example.py.${examplename}
COMMAND ${MPI_TEST_EXE} ${PYTHON_EXECUTABLE} -m mpi4py
${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/${examplename}.py
WORKING_DIRECTORY ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}
)
else()
continue()
endif()
else()
add_test(NAME Example.py.${examplename}
COMMAND ${PYTHON_EXECUTABLE}
${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/${examplename}.py
WORKING_DIRECTORY
${CMAKE_RUNTIME_OUTPUT_DIRECTORY}
)
endif()
if(WIN32)
string(REGEX REPLACE "/" "\\\\" WIN_BUILD_BASEDIR ${openPMD_BINARY_DIR})
string(REGEX REPLACE "/" "\\\\" WIN_BUILD_BINDIR ${CMAKE_RUNTIME_OUTPUT_DIRECTORY})
Expand Down
80 changes: 80 additions & 0 deletions examples/5_write_parallel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
#!/usr/bin/env python
"""
This file is part of the openPMD-api.
Copyright 2019 openPMD contributors
Authors: Axel Huebl
License: LGPLv3+
"""
import openpmd_api
import numpy as np

# https://mpi4py.readthedocs.io/en/stable/mpi4py.run.html
# on import: calls MPI_Init_thread()
# exit hook: calls MPI_Finalize()
from mpi4py import MPI


if __name__ == "__main__":
    # Example: every MPI rank contributes exactly one element to a shared
    # 1D dataset that is written collectively through one openPMD series.
    # Progress messages are printed by rank 0 only.

    # also works with any other MPI communicator
    comm = MPI.COMM_WORLD

    # allocate a data set to write
    global_data = np.arange(comm.size, dtype=np.double)
    if 0 == comm.rank:
        print("Set up a 1D array with one element per MPI rank ({}) "
              "that will be written to disk".format(comm.size))

    # each rank only holds its own element of the global array
    local_data = np.array([0, ], dtype=np.double)
    local_data[0] = global_data[comm.rank]
    if 0 == comm.rank:
        print("Set up a 1D array with one element, representing the view of "
              "the MPI rank")

    # open file for writing
    series = openpmd_api.Series(
        "../samples/5_parallel_write_py.h5",
        openpmd_api.Access_Type.create,
        comm
    )
    if 0 == comm.rank:
        print("Created an empty series in parallel with {} MPI ranks".format(
              comm.size))

    # scalar record component of the mesh "id" in iteration 1
    # (named mesh_id to avoid shadowing the builtin id())
    mesh_id = series.iterations[1]. \
        meshes["id"][openpmd_api.Mesh_Record_Component.SCALAR]

    datatype = openpmd_api.Datatype.DOUBLE
    # datatype = determineDatatype(local_data)
    # global extent: one entry per MPI rank
    dataset_extent = [comm.size, ]
    dataset = openpmd_api.Dataset(datatype, dataset_extent)

    if 0 == comm.rank:
        print("Created a Dataset of size {} and Datatype {}".format(
              dataset.extent[0], dataset.dtype))

    mesh_id.reset_dataset(dataset)
    if 0 == comm.rank:
        print("Set the global on-disk Dataset properties for the scalar field "
              "id in iteration 1")

    series.flush()
    if 0 == comm.rank:
        print("File structure has been written to disk")

    # this rank's slice of the global dataset: one element at index `rank`
    chunk_offset = [comm.rank, ]
    chunk_extent = [1, ]
    mesh_id.store_chunk(local_data, chunk_offset, chunk_extent)
    if 0 == comm.rank:
        print("Stored a single chunk per MPI rank containing its contribution,"
              " ready to write content to disk")

    series.flush()
    if 0 == comm.rank:
        print("Dataset content has been fully written to disk")

    # The files in 'series' are still open until the object is destroyed, on
    # which it cleanly flushes and closes all open file handles.
    # One can delete the object explicitly (or let it run out of scope) to
    # trigger this.
    del series
78 changes: 77 additions & 1 deletion src/binding/python/Series.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,93 @@
*/
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>
#if openPMD_HAVE_MPI
// re-implemented signatures:
// include <mpi4py/mpi4py.h>
# include <mpi.h>
#endif

#include "openPMD/Series.hpp"
#include <string>

namespace py = pybind11;
using namespace openPMD;

#if openPMD_HAVE_MPI
/** mpi4py communicator wrapper
 *
 * Local re-declaration of the object layout behind an mpi4py communicator,
 * so that the contained MPI_Comm handle can be accessed without including
 * the mpi4py headers (see the commented-out mpi4py include at the top of
 * this file).
 *
 * The members below are intended to mirror the mpi4py 3.0.0 definition
 * referenced in the links; do not reorder or retype them.
 * NOTE(review): compatibility with other mpi4py releases is not verified
 * here - confirm when updating.
 *
 * refs:
 * - https://github.com/mpi4py/mpi4py/blob/3.0.0/src/mpi4py/libmpi.pxd#L35-L36
 * - https://github.com/mpi4py/mpi4py/blob/3.0.0/src/mpi4py/MPI.pxd#L100-L105
 * - installed: include/mpi4py/mpi4py.MPI.h
 */
struct openPMD_PyMPICommObject
{
// CPython object header macro; must stay the first member
PyObject_HEAD
// the MPI communicator handle carried by the Python object
MPI_Comm ob_mpi;
// NOTE(review): presumably mpi4py-internal flags; not read in the visible
// part of this file - kept only to reproduce the layout
unsigned int flags;
};
// the intracommunicator object is treated as having the same layout
using openPMD_PyMPIIntracommObject = openPMD_PyMPICommObject;
#endif


void init_Series(py::module &m) {
py::class_<Series, Attributable>(m, "Series")

.def(py::init<std::string const&, AccessType>())
.def(py::init<std::string const&, AccessType>(),
py::arg("filepath"), py::arg("access_type"))
#if openPMD_HAVE_MPI
.def(py::init([](std::string const& filepath, AccessType at, py::object &comm){
//! @todo perform mpi4py import test and check min-version
//! careful: double MPI_Init risk? only import mpi4py.MPI?
//! required C-API init? probably just checks:
//! refs:
//! - https://bitbucket.org/mpi4py/mpi4py/src/3.0.0/demo/wrap-c/helloworld.c
//! - installed: include/mpi4py/mpi4py.MPI_api.h
// if( import_mpi4py() < 0 ) { here be dragons }

if( comm.ptr() == Py_None )
throw std::runtime_error("Series: MPI communicator cannot be None.");
if( comm.ptr() == nullptr )
throw std::runtime_error("Series: MPI communicator is a nullptr.");

// check type string to see if this is mpi4py
// __str__ (pretty)
// __repr__ (unambiguous)
// mpi4py: <mpi4py.MPI.Intracomm object at 0x7f998e6e28d0>
// pyMPI: ... (todo)
py::str const comm_pystr = py::repr(comm);
std::string const comm_str = comm_pystr.cast<std::string>();
if( comm_str.substr(0, 12) != std::string("<mpi4py.MPI.") )
throw std::runtime_error("Series: comm is not an mpi4py communicator: " +
comm_str);
// only checks same layout, e.g. an `int` in `PyObject` could pass this
if( !py::isinstance< py::class_<openPMD_PyMPIIntracommObject> >(comm.get_type()) )
//! @todo add mpi4py version from above import check to error message
throw std::runtime_error("Series: comm has unexpected type layout in " +
comm_str +
" (Mismatched MPI at compile vs. runtime? "
"Breaking mpi4py release?)");

//! @todo other possible implementations:
// - pyMPI (inactive since 2008?): import mpi; mpi.WORLD

// reimplementation of mpi4py's:
// MPI_Comm* mpiCommPtr = PyMPIComm_Get(comm.ptr());
MPI_Comm* mpiCommPtr = &((openPMD_PyMPIIntracommObject*)(comm.ptr()))->ob_mpi;

if( PyErr_Occurred() )
throw std::runtime_error("Series: MPI communicator access error.");
if( mpiCommPtr == nullptr ) {
throw std::runtime_error("Series: MPI communicator cast failed. "
"(Mismatched MPI at compile vs. runtime?)");
}

return new Series(filepath, at, *mpiCommPtr);
}),
py::arg("filepath"), py::arg("access_type"), py::arg("mpi_communicator")
)
#endif

.def_property_readonly("openPMD", &Series::openPMD)
.def("set_openPMD", &Series::setOpenPMD)
Expand Down

0 comments on commit 9414398

Please sign in to comment.