From 44197776e18a5a4c8e98c834a8f7b46bfe0306cc Mon Sep 17 00:00:00 2001
From: Corentin Cadiou
Date: Tue, 31 Oct 2023 18:30:44 +0000
Subject: [PATCH 01/12] Fast seek through file

---
 yt/frontends/ramses/io_utils.pyx | 56 +++++++++++++++++++++++++-------
 1 file changed, 44 insertions(+), 12 deletions(-)

diff --git a/yt/frontends/ramses/io_utils.pyx b/yt/frontends/ramses/io_utils.pyx
index 7becee63486..3e57a79184c 100644
--- a/yt/frontends/ramses/io_utils.pyx
+++ b/yt/frontends/ramses/io_utils.pyx
@@ -14,6 +14,10 @@ ctypedef np.int32_t INT32_t
 ctypedef np.int64_t INT64_t
 ctypedef np.float64_t DOUBLE_t
 
+cdef INT32_SIZE = sizeof(np.int32_t)
+cdef INT64_SIZE = sizeof(np.int64_t)
+cdef DOUBLE_SIZE = sizeof(np.float64_t)
+
 @cython.cpow(True)
 @cython.boundscheck(False)
 @cython.wraparound(False)
@@ -134,6 +138,9 @@ cpdef read_offset(FortranFile f, INT64_t min_level, INT64_t domain_id, INT64_t n
 
     return offset, level_count
 
+cdef inline int skip_len(int Nskip, int record_len):
+    return Nskip * (record_len * DOUBLE_SIZE + INT64_SIZE)
+
 @cython.cpow(True)
 @cython.boundscheck(False)
 @cython.wraparound(False)
@@ -154,16 +161,27 @@ def fill_hydro(FortranFile f,
     cdef dict tmp
     cdef str field
     cdef INT64_t twotondim
-    cdef int ilevel, icpu, ifield, nfields, nlevels, nc, ncpu_selected
-    cdef np.ndarray[np.uint8_t, ndim=1] mask
+    cdef int ilevel, icpu, nlevels, nc, ncpu_selected, nfields_selected
 
     twotondim = 2**ndim
-    nfields = len(all_fields)
+    nfields_selected = len(fields)
     nlevels = offsets.shape[1]
     ncpu_selected = len(cpu_enumerator)
 
-    mask = np.array([(field in fields) for field in all_fields], dtype=np.uint8)
+    cdef np.int64_t[:] jumps = np.zeros(nfields_selected + 1, dtype=np.int64)
+    cdef int jump_len
+
+    jump_len = 0
+    j = 0
+    for i, field in enumerate(all_fields):
+        if field in fields:
+            jumps[j] = jump_len
+            j += 1
+            jump_len = 0
+        else:
+            jump_len += 1
+    jumps[j] = jump_len
 
     # Loop over levels
     for ilevel in range(nlevels):
@@ -176,19 +194,33 @@ def fill_hydro(FortranFile f,
             if offset == -1:
                 continue
             f.seek(offset)
-            tmp = {}
             # Initialize temporary data container for io
             # note: we use Fortran ordering to reflect the in-file ordering
-            for field in all_fields:
-                tmp[field] = np.empty((nc, twotondim), dtype="float64", order='F')
+            buffer = np.empty((nc, twotondim, nfields_selected), dtype="float64", order='F')
+
+            jump_len = 0
             for i in range(twotondim):
                 # Read the selected fields
-                for ifield in range(nfields):
-                    if not mask[ifield]:
-                        f.skip()
-                    else:
-                        tmp[all_fields[ifield]][:, i] = f.read_vector('d') # i-th cell
+                for j in range(nfields_selected):
+                    jump_len += jumps[j]
+                    if jump_len > 0:
+                        f.seek(skip_len(jump_len, nc), 1)
+                        jump_len = 0
+                    buffer[:, i, j] = f.read_vector('d')
+
+            jump_len += jumps[nfields_selected]
+
+            # In principle, we may be left with some fields to skip
+            # but since we're doing an absolute seek above,
+            # we don't need to do anything here
+            ## if jump_len > 0:
+            ##     f.seek(skip_len(jump_len, nc), 1)
+
+            # Alias buffer into dictionary
+            tmp = {}
+            for i, field in enumerate(fields):
+                tmp[field] = buffer[:, :, i]
+
             if ncpu_selected > 1:
                 oct_handler.fill_level_with_domain(
                     ilevel, levels, cell_inds, file_inds, domains, tr, tmp,
                     domain=icpu+1)

From bad7f6686da2ef9044e4fbf78a76fc6a2f7df718 Mon Sep 17 00:00:00 2001
From: Corentin Cadiou
Date: Tue, 31 Oct 2023 18:41:13 +0000
Subject: [PATCH 02/12] Make performance-critical loop C-only

---
 yt/frontends/ramses/io_utils.pyx      |  6 +++--
 yt/utilities/cython_fortran_utils.pxd |  1 +
 yt/utilities/cython_fortran_utils.pyx | 32 +++++++++++++++++++++++++++
 3 files changed, 37 insertions(+), 2 deletions(-)

diff --git a/yt/frontends/ramses/io_utils.pyx b/yt/frontends/ramses/io_utils.pyx
index 3e57a79184c..5f8fbc284f9 100644
--- a/yt/frontends/ramses/io_utils.pyx
+++ b/yt/frontends/ramses/io_utils.pyx
@@ -138,7 +138,7 @@ cpdef read_offset(FortranFile f, INT64_t min_level, INT64_t domain_id, INT64_t n
 
     return offset, level_count
 
-cdef inline int skip_len(int Nskip, int record_len):
+cdef inline int skip_len(int Nskip, int record_len) noexcept:
     return Nskip * (record_len * DOUBLE_SIZE + INT64_SIZE)
 
 @cython.cpow(True)
@@ -162,6 +162,7 @@ def fill_hydro(FortranFile f,
     cdef str field
     cdef INT64_t twotondim
     cdef int ilevel, icpu, nlevels, nc, ncpu_selected, nfields_selected
+    cdef int i, j
 
     twotondim = 2**ndim
     nfields_selected = len(fields)
@@ -171,6 +172,7 @@ def fill_hydro(FortranFile f,
 
     cdef np.int64_t[:] jumps = np.zeros(nfields_selected + 1, dtype=np.int64)
     cdef int jump_len
+    cdef np.ndarray[np.float64_t, ndim=3] buffer
 
     jump_len = 0
     j = 0
@@ -206,7 +208,7 @@ def fill_hydro(FortranFile f,
                     if jump_len > 0:
                         f.seek(skip_len(jump_len, nc), 1)
                         jump_len = 0
-                    buffer[:, i, j] = f.read_vector('d')
+                    f.read_vector_inplace('d', &buffer[0, i, j])
 
             jump_len += jumps[nfields_selected]
 
diff --git a/yt/utilities/cython_fortran_utils.pxd b/yt/utilities/cython_fortran_utils.pxd
index ab6c2a0dc59..71c35fe1210 100644
--- a/yt/utilities/cython_fortran_utils.pxd
+++ b/yt/utilities/cython_fortran_utils.pxd
@@ -13,6 +13,7 @@ cdef class FortranFile:
     cdef INT64_t get_size(self, str dtype)
     cpdef INT32_t read_int(self) except? -1
     cpdef np.ndarray read_vector(self, str dtype)
+    cdef int read_vector_inplace(self, str dtype, void *data)
    cpdef INT64_t tell(self) except -1
     cpdef INT64_t seek(self, INT64_t pos, INT64_t whence=*) except -1
     cpdef void close(self)
diff --git a/yt/utilities/cython_fortran_utils.pyx b/yt/utilities/cython_fortran_utils.pyx
index aa26add5c16..a1edbc70f4b 100644
--- a/yt/utilities/cython_fortran_utils.pyx
+++ b/yt/utilities/cython_fortran_utils.pyx
@@ -142,6 +142,38 @@ cdef class FortranFile:
 
         return data
 
+    cdef int read_vector_inplace(self, str dtype, void *data):
+        """Reads a record from the file and return it as numpy array.
+
+        Parameters
+        ----------
+        dtype : str
+            This is the datatype (from the struct module) that we should read.
+        data : void*
+            The pointer to the data to be read.
+
+        """
+        cdef INT32_t s1, s2, size
+
+        if self._closed:
+            raise ValueError("I/O operation on closed file.")
+
+        size = self.get_size(dtype)
+
+        fread(&s1, INT32_SIZE, 1, self.cfile)
+
+        # Check record is compatible with data type
+        if s1 % size != 0:
+            raise ValueError('Size obtained (%s) does not match with the expected '
+                             'size (%s) of multi-item record' % (s1, size))
+
+        fread(data, size, s1 // size, self.cfile)
+        fread(&s2, INT32_SIZE, 1, self.cfile)
+
+        if s1 != s2:
+            raise IOError('Sizes do not agree in the header and footer for '
+                          'this record - check header dtype')
+
     cpdef INT32_t read_int(self) except? -1:
         """Reads a single int32 from the file and return it.
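The two patches above replace per-field read-or-skip calls with a single computed seek. The arithmetic relies on the layout of Fortran unformatted records: each record is its payload bracketed by two 4-byte length markers, so Nskip records of record_len float64 values span Nskip * (record_len * 8 + 8) bytes — the two markers together account for the INT64_SIZE term in skip_len. A minimal standalone sketch of that arithmetic (plain Python; the file name and the five-field, three-cell layout are made up for illustration):

    import struct

    import numpy as np

    INT32_SIZE = 4   # one Fortran record marker
    DOUBLE_SIZE = 8  # one float64 payload item

    def write_record(f, values):
        # A Fortran unformatted record: length marker, payload, length marker.
        payload = np.asarray(values, dtype=np.float64).tobytes()
        f.write(struct.pack("i", len(payload)))
        f.write(payload)
        f.write(struct.pack("i", len(payload)))

    def skip_len(Nskip, record_len):
        # Byte span of Nskip records holding record_len doubles each.
        return Nskip * (record_len * DOUBLE_SIZE + 2 * INT32_SIZE)

    with open("fake_hydro.dat", "wb") as f:
        for ifield in range(5):                   # five fields ...
            write_record(f, [float(ifield)] * 3)  # ... of three cells each

    with open("fake_hydro.dat", "rb") as f:
        f.seek(skip_len(3, 3), 1)                 # hop over fields 0-2 in one call
        size, = struct.unpack("i", f.read(INT32_SIZE))
        data = np.frombuffer(f.read(size), dtype=np.float64)
        print(data)                               # [3. 3. 3.]

A single seek per run of unselected fields is exactly what the jumps array precomputes: jumps[j] counts how many records to hop before the j-th selected field is read.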
From ada5ddab4b82bcb9708b7a05a3260d723a35b69d Mon Sep 17 00:00:00 2001
From: Corentin Cadiou
Date: Tue, 31 Oct 2023 18:51:55 +0000
Subject: [PATCH 03/12] Also make reading offset faster

---
 yt/frontends/ramses/field_handlers.py |  2 +-
 yt/frontends/ramses/io_utils.pyx      | 15 ++++++++-------
 2 files changed, 9 insertions(+), 8 deletions(-)

diff --git a/yt/frontends/ramses/field_handlers.py b/yt/frontends/ramses/field_handlers.py
index 10860b5afd5..edd0f612f7b 100644
--- a/yt/frontends/ramses/field_handlers.py
+++ b/yt/frontends/ramses/field_handlers.py
@@ -272,7 +272,7 @@ def offset(self):
             self.domain.domain_id,
             self.parameters["nvar"],
             self.domain.amr_header,
-            skip_len=nvars * 8,
+            Nskip=nvars * 8,
         )
 
         self._offset = offset
diff --git a/yt/frontends/ramses/io_utils.pyx b/yt/frontends/ramses/io_utils.pyx
index 5f8fbc284f9..831bc4e67c4 100644
--- a/yt/frontends/ramses/io_utils.pyx
+++ b/yt/frontends/ramses/io_utils.pyx
@@ -87,12 +87,16 @@ def read_amr(FortranFile f, dict headers,
 
     return max_level
 
+
+cdef inline int skip_len(int Nskip, int record_len) noexcept:
+    return Nskip * (record_len * DOUBLE_SIZE + INT64_SIZE)
+
 @cython.cpow(True)
 @cython.boundscheck(False)
 @cython.wraparound(False)
 @cython.cdivision(True)
 @cython.nonecheck(False)
-cpdef read_offset(FortranFile f, INT64_t min_level, INT64_t domain_id, INT64_t nvar, dict headers, int skip_len):
+cpdef read_offset(FortranFile f, INT64_t min_level, INT64_t domain_id, INT64_t nvar, dict headers, int Nskip):
     cdef np.ndarray[np.int64_t, ndim=2] offset, level_count
     cdef INT64_t ndim, twotondim, nlevelmax, n_levels, nboundary, ncpu, ncpu_and_bound
@@ -108,8 +112,8 @@ cpdef read_offset(FortranFile f, INT64_t min_level, INT64_t domain_id, INT64_t n
     ncpu_and_bound = nboundary + ncpu
     twotondim = 2**ndim
 
-    if skip_len == -1:
-        skip_len = twotondim * nvar
+    if Nskip == -1:
+        Nskip = twotondim * nvar
 
     # It goes: level, CPU, 8-variable (1 oct)
     offset = np.full((ncpu_and_bound, n_levels), -1, dtype=np.int64)
@@ -134,13 +138,10 @@ cpdef read_offset(FortranFile f, INT64_t min_level, INT64_t domain_id, INT64_t n
             if ilevel >= min_level:
                 offset_view[icpu, ilevel - min_level] = f.tell()
                 level_count_view[icpu, ilevel - min_level] = file_ncache
-            f.skip(skip_len)
+            f.seek(skip_len(Nskip, file_ncache), 1)
 
     return offset, level_count
 
-cdef inline int skip_len(int Nskip, int record_len) noexcept:
-    return Nskip * (record_len * DOUBLE_SIZE + INT64_SIZE)
-
 @cython.cpow(True)
 @cython.boundscheck(False)
 @cython.wraparound(False)

From 50711d0f60d537975d30fd9d23317884b66e244e Mon Sep 17 00:00:00 2001
From: Corentin Cadiou
Date: Tue, 31 Oct 2023 19:06:21 +0000
Subject: [PATCH 04/12] Make performance-critical functions cpdef

---
 yt/geometry/oct_container.pxd | 23 +++++++++++++++++++
 yt/geometry/oct_container.pyx | 39 ++++++++++++++++++-----------------
 2 files changed, 45 insertions(+), 17 deletions(-)

diff --git a/yt/geometry/oct_container.pxd b/yt/geometry/oct_container.pxd
index 5ac9f9318a7..3891f8fdab0 100644
--- a/yt/geometry/oct_container.pxd
+++ b/yt/geometry/oct_container.pxd
@@ -82,6 +82,29 @@ cdef class OctreeContainer:
     cdef public object fill_style
     cdef public int max_level
 
+    cpdef void fill_level(
+        self,
+        const int level,
+        const np.uint8_t[:] levels,
+        const np.uint8_t[:] cell_inds,
+        const np.int64_t[:] file_inds,
+        dict dest_fields,
+        dict source_fields,
+        np.int64_t offset = ?
+    )
+    cpdef int fill_level_with_domain(
+        self,
+        const int level,
+        const np.uint8_t[:] levels,
+        const np.uint8_t[:] cell_inds,
+        const np.int64_t[:] file_inds,
+        const np.int32_t[:] domains,
+        dict dest_fields,
+        dict source_fields,
+        const np.int32_t domain,
+        np.int64_t offset = ?
+    )
+
 cdef class SparseOctreeContainer(OctreeContainer):
     cdef OctKey *root_nodes
     cdef void *tree_root
diff --git a/yt/geometry/oct_container.pyx b/yt/geometry/oct_container.pyx
index 7d3118cb252..4b09d360e51 100644
--- a/yt/geometry/oct_container.pyx
+++ b/yt/geometry/oct_container.pyx
@@ -742,12 +742,16 @@ cdef class OctreeContainer:
     @cython.boundscheck(False)
     @cython.wraparound(False)
     @cython.cdivision(True)
-    def fill_level(self, int level,
-                   np.ndarray[np.uint8_t, ndim=1] levels,
-                   np.ndarray[np.uint8_t, ndim=1] cell_inds,
-                   np.ndarray[np.int64_t, ndim=1] file_inds,
-                   dest_fields, source_fields,
-                   np.int64_t offset = 0):
+    cpdef void fill_level(
+        self,
+        const int level,
+        const np.uint8_t[:] levels,
+        const np.uint8_t[:] cell_inds,
+        const np.int64_t[:] file_inds,
+        dict dest_fields,
+        dict source_fields,
+        np.int64_t offset = 0
+    ):
         cdef np.ndarray[np.float64_t, ndim=2] source
         cdef np.ndarray[np.float64_t, ndim=1] dest
         cdef int i, lvl
@@ -824,17 +828,18 @@ cdef class OctreeContainer:
     @cython.boundscheck(False)
     @cython.wraparound(False)
     @cython.cdivision(True)
-    def fill_level_with_domain(
-        self, int level,
-        np.uint8_t[:] levels,
-        np.uint8_t[:] cell_inds,
-        np.int64_t[:] file_inds,
-        np.int32_t[:] domains,
-        dict dest_fields,
-        dict source_fields,
-        np.int32_t domain,
-        np.int64_t offset = 0
-    ):
+    cpdef int fill_level_with_domain(
+        self,
+        const int level,
+        const np.uint8_t[:] levels,
+        const np.uint8_t[:] cell_inds,
+        const np.int64_t[:] file_inds,
+        const np.int32_t[:] domains,
+        dict dest_fields,
+        dict source_fields,
+        const np.int32_t domain,
+        np.int64_t offset = 0
+    ):
         """Similar to fill_level but accepts a domain argument.
 
         This is particularly useful for frontends that have buffer zones at CPU boundaries.
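For reference, fill_level_with_domain — now exposed as a cpdef method with typed memoryviews so the compiled hydro reader can call it without Python-level dispatch — scatters file-ordered values into level-ordered destination arrays, keeping only the cells owned by one domain. A rough pure-Python rendition of the idea (the per-field dict loop and the offset convention of the real method are elided, and the index arrays below are invented):

    import numpy as np

    def fill_level_with_domain(level, levels, cell_inds, file_inds, domains,
                               dest, source, domain):
        # Keep cells that sit on `level` AND belong to `domain`; cells from
        # other domains are the CPU-boundary buffer zones being skipped.
        count = 0
        for i in range(levels.shape[0]):
            if levels[i] != level or domains[i] != domain:
                continue
            dest[i] = source[file_inds[i], cell_inds[i]]
            count += 1
        return count

    levels = np.array([1, 1, 1], dtype=np.uint8)
    cell_inds = np.array([0, 3, 7], dtype=np.uint8)  # cell within its oct
    file_inds = np.array([0, 0, 1], dtype=np.int64)  # oct's row in the file
    domains = np.array([1, 2, 1], dtype=np.int32)
    source = np.arange(16, dtype=np.float64).reshape(2, 8)  # (noct, 2**ndim)
    dest = np.full(3, np.nan)

    print(fill_level_with_domain(1, levels, cell_inds, file_inds, domains,
                                 dest, source, domain=1), dest)
    # 2 [ 0. nan 15.]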
""" cdef INT32_t s1, s2, size From 57fef2fd35af61d1738ad2458c87502f4150ee21 Mon Sep 17 00:00:00 2001 From: Corentin Cadiou Date: Wed, 1 Nov 2023 11:27:36 +0000 Subject: [PATCH 06/12] Avoid repetitive allocations --- yt/frontends/ramses/io_utils.pyx | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/yt/frontends/ramses/io_utils.pyx b/yt/frontends/ramses/io_utils.pyx index 831bc4e67c4..76b59d1cc47 100644 --- a/yt/frontends/ramses/io_utils.pyx +++ b/yt/frontends/ramses/io_utils.pyx @@ -14,9 +14,9 @@ ctypedef np.int32_t INT32_t ctypedef np.int64_t INT64_t ctypedef np.float64_t DOUBLE_t -cdef INT32_SIZE = sizeof(np.int32_t) -cdef INT64_SIZE = sizeof(np.int64_t) -cdef DOUBLE_SIZE = sizeof(np.float64_t) +cdef int INT32_SIZE = sizeof(np.int32_t) +cdef int INT64_SIZE = sizeof(np.int64_t) +cdef int DOUBLE_SIZE = sizeof(np.float64_t) @cython.cpow(True) @cython.boundscheck(False) @@ -163,7 +163,7 @@ def fill_hydro(FortranFile f, cdef str field cdef INT64_t twotondim cdef int ilevel, icpu, nlevels, nc, ncpu_selected, nfields_selected - cdef int i, j + cdef int i, j, ii twotondim = 2**ndim nfields_selected = len(fields) @@ -171,7 +171,9 @@ def fill_hydro(FortranFile f, nlevels = offsets.shape[1] ncpu_selected = len(cpu_enumerator) - cdef np.int64_t[:] jumps = np.zeros(nfields_selected + 1, dtype=np.int64) + cdef np.int64_t[::1] cpu_list = np.asarray(cpu_enumerator, dtype=np.int64) + + cdef np.int64_t[::1] jumps = np.zeros(nfields_selected + 1, dtype=np.int64) cdef int jump_len cdef np.ndarray[np.float64_t, ndim=3] buffer @@ -186,10 +188,12 @@ def fill_hydro(FortranFile f, jump_len += 1 jumps[j] = jump_len + cdef int nc_largest = 0 # Loop over levels for ilevel in range(nlevels): # Loop over cpu domains - for icpu in cpu_enumerator: + for ii in range(ncpu_selected): + icpu = cpu_list[ii] nc = level_count[icpu, ilevel] if nc == 0: continue @@ -199,7 +203,9 @@ def fill_hydro(FortranFile f, f.seek(offset) # Initialize temporary data container for io # note: we use Fortran ordering to reflect the in-file ordering - buffer = np.empty((nc, twotondim, nfields_selected), dtype="float64", order='F') + if nc > nc_largest: + nc_largest = nc + buffer = np.empty((nc, twotondim, nfields_selected), dtype="float64", order='F') jump_len = 0 for i in range(twotondim): From d33e2b130c47a2325bbc7342fde9b84bba905403 Mon Sep 17 00:00:00 2001 From: Corentin Cadiou Date: Wed, 1 Nov 2023 17:23:19 +0000 Subject: [PATCH 07/12] Make comment more obvious --- yt/frontends/ramses/io_utils.pyx | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/yt/frontends/ramses/io_utils.pyx b/yt/frontends/ramses/io_utils.pyx index 76b59d1cc47..cb58ad98c57 100644 --- a/yt/frontends/ramses/io_utils.pyx +++ b/yt/frontends/ramses/io_utils.pyx @@ -220,8 +220,8 @@ def fill_hydro(FortranFile f, jump_len += jumps[nfields_selected] # In principle, we may be left with some fields to skip - # but since we're doing an absolute seek above, - # we don't need to do anything here + # but since we're doing an absolute seek at the beginning of + # the loop on CPUs, we can spare one seek here ## if jump_len > 0: ## f.seek(skip_len(jump_len, nc), 1) From a16bda2a47ea639dfdb76233a3e4204164e8e236 Mon Sep 17 00:00:00 2001 From: Corentin Cadiou Date: Wed, 1 Nov 2023 17:31:24 +0000 Subject: [PATCH 08/12] Making the skip_len nogil, just to be safe --- yt/frontends/ramses/io_utils.pyx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yt/frontends/ramses/io_utils.pyx b/yt/frontends/ramses/io_utils.pyx index 
cb58ad98c57..335176eff2e 100644 --- a/yt/frontends/ramses/io_utils.pyx +++ b/yt/frontends/ramses/io_utils.pyx @@ -88,7 +88,7 @@ def read_amr(FortranFile f, dict headers, return max_level -cdef inline int skip_len(int Nskip, int record_len) noexcept: +cdef inline int skip_len(int Nskip, int record_len) noexcept nogil: return Nskip * (record_len * DOUBLE_SIZE + INT64_SIZE) @cython.cpow(True) From a8d7767fd3fac06e04365c0cdf35fdaa4cd3a956 Mon Sep 17 00:00:00 2001 From: Corentin Cadiou Date: Fri, 3 Nov 2023 11:06:06 +0000 Subject: [PATCH 09/12] Allow parallel I/O for hydro and particle reading --- yt/frontends/ramses/io.py | 34 +++++++++++++++++++++++++++------- 1 file changed, 27 insertions(+), 7 deletions(-) diff --git a/yt/frontends/ramses/io.py b/yt/frontends/ramses/io.py index a74a24cf1fa..90c7733993d 100644 --- a/yt/frontends/ramses/io.py +++ b/yt/frontends/ramses/io.py @@ -14,6 +14,7 @@ ) from yt.utilities.io_handler import BaseIOHandler from yt.utilities.logger import ytLogger as mylog +from yt.utilities.parallel_tools.parallel_analysis_interface import parallel_objects from yt.utilities.physical_ratios import cm_per_km, cm_per_mpc @@ -176,11 +177,12 @@ def _read_fluid_selection(self, chunks, selector, fields, size): # Set of field types ftypes = {f[0] for f in fields} - for chunk in chunks: + + for chunk in parallel_objects(chunks): # Gather fields by type to minimize i/o operations for ft in ftypes: # Get all the fields of the same type - field_subs = list(filter(lambda f, ft=ft: f[0] == ft, fields)) + field_subs = [field for field in fields if field[0] == ft] # Loop over subsets for subset in chunk.objs: @@ -209,12 +211,17 @@ def _read_fluid_selection(self, chunks, selector, fields, size): d.max(), d.size, ) - tr[(ft, f)].append(d) + tr[ft, f].append(np.atleast_1d(d)) + d = {} for field in fields: - d[field] = np.concatenate(tr.pop(field)) + tmp = tr.pop(field, None) + if tmp: + d[field] = np.concatenate(tmp) + else: + d[field] = np.empty(0, dtype="=f8") - return d + return self.ds.index.comm.par_combine_object(d, op="cat") def _read_particle_coords(self, chunks, ptf): pn = "particle_position_%s" @@ -258,7 +265,8 @@ def _read_particle_fields(self, chunks, ptf, selector): yield (ptype, field), data else: - for chunk in chunks: + tr = defaultdict(list) + for chunk in parallel_objects(chunks): for subset in chunk.objs: rv = self._read_particle_subset(subset, fields) for ptype, field_list in sorted(ptf.items()): @@ -270,7 +278,19 @@ def _read_particle_fields(self, chunks, ptf, selector): mask = [] for field in field_list: data = np.asarray(rv.pop((ptype, field))[mask], "=f8") - yield (ptype, field), data + tr[ptype, field].append(np.atleast_1d(data)) + + d = {} + for ptype, field_list in sorted(ptf.items()): + for field in field_list: + tmp = tr.pop((ptype, field), None) + if tmp: + d[ptype, field] = np.concatenate(tmp) + else: + d[ptype, field] = np.empty(0, dtype="=f8") + + d = self.ds.index.comm.par_combine_object(d, op="cat") + yield from d.items() def _read_particle_subset(self, subset, fields): """Read the particle files.""" From 27a8aa3eb19238bbc5176bcc7e5691bee7236ec1 Mon Sep 17 00:00:00 2001 From: Corentin Cadiou Date: Fri, 3 Nov 2023 13:43:36 +0000 Subject: [PATCH 10/12] Parallelize at the level of the domains --- yt/frontends/ramses/data_structures.py | 31 +++++++++++++++---- yt/frontends/ramses/io.py | 5 ++- yt/frontends/ramses/io_utils.pyx | 9 ++++-- yt/geometry/oct_container.pxd | 12 +++++++ yt/geometry/oct_container.pyx | 17 ++++++---- .../parallel_analysis_interface.py | 
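The gather-then-combine pattern above deserves a note: every rank must contribute an entry for every field, even when it read nothing, because the concatenating reduction is collective and np.concatenate([]) raises. A schematic of the local half of that pattern (plain dicts stand in for yt's chunk objects, and the field name is invented):

    from collections import defaultdict

    import numpy as np

    def read_locally(my_chunks, fields):
        # Accumulate whatever this rank read ...
        tr = defaultdict(list)
        for chunk in my_chunks:
            for field in fields:
                tr[field].append(np.atleast_1d(np.asarray(chunk[field])))
        # ... then flatten to one array per field, empty if nothing was read,
        # so that a par_combine_object(d, op="cat") lines up on every rank.
        d = {}
        for field in fields:
            tmp = tr.pop(field, None)
            if tmp:
                d[field] = np.concatenate(tmp)
            else:
                d[field] = np.empty(0, dtype="=f8")
        return d

    chunks = [{"density": [1.0, 2.0]}, {"density": [3.0]}]
    print(read_locally(chunks, ["density"]))  # {'density': array([1., 2., 3.])}
    print(read_locally([], ["density"]))      # {'density': array([], dtype=float64)}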
From 27a8aa3eb19238bbc5176bcc7e5691bee7236ec1 Mon Sep 17 00:00:00 2001
From: Corentin Cadiou
Date: Fri, 3 Nov 2023 13:43:36 +0000
Subject: [PATCH 10/12] Parallelize at the level of the domains

---
 yt/frontends/ramses/data_structures.py       | 31 +++++++++++++++-----
 yt/frontends/ramses/io.py                    |  5 ++-
 yt/frontends/ramses/io_utils.pyx             |  9 ++++--
 yt/geometry/oct_container.pxd                | 12 +++++++
 yt/geometry/oct_container.pyx                | 17 ++++++-----
 .../parallel_analysis_interface.py           | 30 ++++++++++++++++--
 6 files changed, 85 insertions(+), 19 deletions(-)

diff --git a/yt/frontends/ramses/data_structures.py b/yt/frontends/ramses/data_structures.py
index 3c44ecfa6d7..cbf8d1baafb 100644
--- a/yt/frontends/ramses/data_structures.py
+++ b/yt/frontends/ramses/data_structures.py
@@ -18,6 +18,7 @@
 from yt.utilities.cython_fortran_utils import FortranFile as fpu
 from yt.utilities.lib.cosmology_time import friedman
 from yt.utilities.on_demand_imports import _f90nml as f90nml
+from yt.utilities.parallel_tools.parallel_analysis_interface import parallel_objects
 from yt.utilities.physical_constants import kb, mp
 from yt.utilities.physical_ratios import cm_per_mpc
 
@@ -548,19 +549,37 @@ def _initialize_oct_handler(self):
         if self.ds._bbox is not None:
             cpu_list = get_cpu_list(self.dataset, self.dataset._bbox)
         else:
-            cpu_list = range(self.dataset["ncpu"])
+            cpu_list = list(range(self.dataset["ncpu"]))
+
+        if len(cpu_list) < self.comm.size:
+            mylog.error(
+                "The number of MPI tasks is larger than the number of files to read "
+                "on disk. This is not supported. Try running with at most "
+                f"{len(cpu_list)} MPI tasks."
+            )
+            self.comm.comm.Abort(2)
+
+        # Important note: we need to use the method="sequential"
+        # so that the first MPI task gets domains 1, 2, ..., n
+        # and the second task gets domains n+1, n+2, ..., 2n, etc.
+        self.domains = [
+            RAMSESDomainFile(self.dataset, i + 1)
+            for i in parallel_objects(cpu_list, method="sequential")
+        ]
+        print(self.domains)
 
-        self.domains = [RAMSESDomainFile(self.dataset, i + 1) for i in cpu_list]
         total_octs = sum(
             dom.local_oct_count for dom in self.domains  # + dom.ngridbound.sum()
         )
+
+        max_level_loc = max(dom.max_level for dom in self.domains)
+
         force_max_level, convention = self.ds._force_max_level
         if convention == "yt":
             force_max_level += self.ds.min_level + 1
-        self.max_level = min(
-            force_max_level, max(dom.max_level for dom in self.domains)
-        )
-        self.num_grids = total_octs
+
+        max_level_loc = min(force_max_level, max_level_loc)
+        self.max_level = self.comm.mpi_allreduce(max_level_loc, op="max")
+        self.num_grids = self.comm.mpi_allreduce(total_octs, op="sum")
 
 def _detect_output_fields(self):
        dsl = set()
diff --git a/yt/frontends/ramses/io.py b/yt/frontends/ramses/io.py
index 90c7733993d..f6e503dd793 100644
--- a/yt/frontends/ramses/io.py
+++ b/yt/frontends/ramses/io.py
@@ -14,7 +14,6 @@
 )
 from yt.utilities.io_handler import BaseIOHandler
 from yt.utilities.logger import ytLogger as mylog
-from yt.utilities.parallel_tools.parallel_analysis_interface import parallel_objects
 from yt.utilities.physical_ratios import cm_per_km, cm_per_mpc
 
 
@@ -178,7 +177,7 @@ def _read_fluid_selection(self, chunks, selector, fields, size):
         # Set of field types
         ftypes = {f[0] for f in fields}
 
-        for chunk in parallel_objects(chunks):
+        for chunk in chunks:
             # Gather fields by type to minimize i/o operations
             for ft in ftypes:
                 # Get all the fields of the same type
@@ -266,7 +265,7 @@ def _read_particle_fields(self, chunks, ptf, selector):
 
         else:
             tr = defaultdict(list)
-            for chunk in parallel_objects(chunks):
+            for chunk in chunks:
                 for subset in chunk.objs:
                     rv = self._read_particle_subset(subset, fields)
                     for ptype, field_list in sorted(ptf.items()):
diff --git a/yt/frontends/ramses/io_utils.pyx b/yt/frontends/ramses/io_utils.pyx
index 335176eff2e..46ed160b28d 100644
--- a/yt/frontends/ramses/io_utils.pyx
+++ b/yt/frontends/ramses/io_utils.pyx
@@ -80,8 +80,13 @@ def read_amr(FortranFile f, dict headers,
                     f.skip(skip_len)
                 # Note that we're adding *grids*, not individual cells.
                 if ilevel >= min_level:
-                    n = oct_handler.add(icpu + 1, ilevel - min_level, pos[:ng, :],
-                                        count_boundary = 1)
+                    n = oct_handler.add(
+                        icpu + 1,
+                        ilevel - min_level,
+                        pos[:ng, :],
+                        skip_boundary = 1,
+                        count_boundary = 1,
+                    )
                     if n > 0:
                         max_level = max(ilevel - min_level, max_level)
diff --git a/yt/geometry/oct_container.pxd b/yt/geometry/oct_container.pxd
index 3891f8fdab0..8b1a7be4595 100644
--- a/yt/geometry/oct_container.pxd
+++ b/yt/geometry/oct_container.pxd
@@ -82,6 +82,18 @@ cdef class OctreeContainer:
     cdef public object fill_style
     cdef public int max_level
 
+    cpdef int add(
+        self,
+        const int curdom,
+        const int curlevel,
+        const np.float64_t[:, ::1] pos,
+        int skip_boundary = ?,
+        int count_boundary = ?,
+        np.uint64_t[::1] levels = ?,
+        np.int64_t[::1] file_inds = ?,
+        np.uint8_t[::1] domain_inds = ?,
+    )
+
     cpdef void fill_level(
         self,
         const int level,
diff --git a/yt/geometry/oct_container.pyx b/yt/geometry/oct_container.pyx
index 4b09d360e51..6c7b8bea33d 100644
--- a/yt/geometry/oct_container.pyx
+++ b/yt/geometry/oct_container.pyx
@@ -575,12 +575,17 @@ cdef class OctreeContainer:
     @cython.boundscheck(False)
     @cython.wraparound(False)
     @cython.cdivision(True)
-    def add(self, int curdom, int curlevel,
-            np.ndarray[np.float64_t, ndim=2] pos,
-            int skip_boundary = 1,
-            int count_boundary = 0,
-            np.ndarray[np.uint64_t, ndim=1, cast=True] levels = None
-            ):
+    cpdef int add(
+        self,
+        const int curdom,
+        const int curlevel,
+        const np.float64_t[:, ::1] pos,
+        int skip_boundary = 1,
+        int count_boundary = 0,
+        np.uint64_t[::1] levels = None,
+        np.int64_t[::1] file_inds = None,
+        np.uint8_t[::1] domain_inds = None
+    ):
         # In this function, if we specify curlevel = -1, then we query the
         # (optional) levels array for the oct level.
         cdef int no, p, i
diff --git a/yt/utilities/parallel_tools/parallel_analysis_interface.py b/yt/utilities/parallel_tools/parallel_analysis_interface.py
index bbaad9316da..e3942e394c9 100644
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -5,6 +5,7 @@
 import traceback
 from functools import wraps
 from io import StringIO
+from typing import Literal, Union
 
 import numpy as np
 from more_itertools import always_iterable
@@ -466,7 +467,14 @@ class ResultsStorage:
     result_id = None
 
 
-def parallel_objects(objects, njobs=0, storage=None, barrier=True, dynamic=False):
+def parallel_objects(
+    objects,
+    njobs=0,
+    storage=None,
+    barrier=True,
+    dynamic=False,
+    method: Union[Literal["strided"], Literal["sequential"]] = "strided",
+):
     r"""This function dispatches components of an iterable to different
     processors.
 
@@ -503,6 +511,11 @@ def parallel_objects(objects, njobs=0, storage=None, barrier=True, dynamic=False
         This requires one dedicated processor; if this is enabled with a set
         of 128 processors available, only 127 will be available to iterate
         over objects as one will be load balancing the rest.
+    method : str
+        This governs how the objects are distributed to processors. The
+        default, "strided", will assign objects to processors in a round-robin
+        fashion. The other option, "sequential", will assign objects to
+        processors in a consecutive fashion.
 
 
     Examples
@@ -558,7 +571,20 @@ def parallel_objects(objects, njobs=0, storage=None, barrier=True, dynamic=False
         to_share = {}
     # If our objects object is slice-aware, like time series data objects are,
     # this will prevent intermediate objects from being created.
-    oiter = itertools.islice(enumerate(objects), my_new_id, None, njobs)
+    if method == "strided":
+        oiter = itertools.islice(enumerate(objects), my_new_id, None, njobs)
+    elif method == "sequential":
+        chunk_size = len(objects) // njobs
+        remainder = len(objects) % njobs
+
+        if my_new_id < remainder:
+            istart = my_new_id * (chunk_size + 1)
+            iend = istart + chunk_size + 1
+        else:
+            istart = (chunk_size + 1) * remainder + (my_new_id - remainder) * chunk_size
+            iend = istart + chunk_size
+
+        oiter = itertools.islice(enumerate(objects), istart, iend, 1)
     for result_id, obj in oiter:
         if storage is not None:
             rstore = ResultsStorage()
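The "sequential" branch added to parallel_objects is plain blocked distribution: the first len(objects) % njobs ranks receive one extra object, and every rank's slice is contiguous, which is what lets rank 0 own domains 1..n, rank 1 own n+1..2n, and so on. The same arithmetic extracted into a standalone helper so it can be checked in isolation (the helper's name is ours):

    def sequential_slice(n_objects, njobs, rank):
        chunk_size, remainder = divmod(n_objects, njobs)
        if rank < remainder:
            # the first `remainder` ranks absorb the leftover objects
            istart = rank * (chunk_size + 1)
            iend = istart + chunk_size + 1
        else:
            istart = (chunk_size + 1) * remainder + (rank - remainder) * chunk_size
            iend = istart + chunk_size
        return istart, iend

    # Ten domains over three ranks: contiguous, balanced slices.
    for rank in range(3):
        print(rank, sequential_slice(10, 3, rank))
    # 0 (0, 4)
    # 1 (4, 7)
    # 2 (7, 10)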
From 2bb07abc779853c2a0598bfacb4a7fe43b978a9d Mon Sep 17 00:00:00 2001
From: Corentin Cadiou
Date: Fri, 3 Nov 2023 14:21:11 +0000
Subject: [PATCH 11/12] Gather npart/levels

---
 yt/frontends/ramses/data_structures.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/yt/frontends/ramses/data_structures.py b/yt/frontends/ramses/data_structures.py
index cbf8d1baafb..94359652013 100644
--- a/yt/frontends/ramses/data_structures.py
+++ b/yt/frontends/ramses/data_structures.py
@@ -641,6 +641,7 @@ def _chunk_io(self, dobj, cache=True, local_only=False):
 
     def _initialize_level_stats(self):
         levels = sum(dom.level_count for dom in self.domains)
+        levels = self.comm.mpi_allreduce(levels, op="sum")
         desc = {"names": ["numcells", "level"], "formats": ["int64"] * 2}
         max_level = self.dataset.min_level + self.dataset.max_level + 2
         self.level_stats = blankRecordArray(desc, max_level)
@@ -663,6 +664,8 @@ def _get_particle_type_counts(self):
             count = fh.local_particle_count
             npart[fh.ptype] += count
 
+        npart = self.comm.par_combine_object(npart, op="sum")
+
         return npart
 
     def print_stats(self):

From c9fc44d32bb0abffe5d6e18e86774babbd7c9f25 Mon Sep 17 00:00:00 2001
From: Corentin Cadiou
Date: Fri, 3 Nov 2023 14:21:38 +0000
Subject: [PATCH 12/12] Transport with as close a type as possible

---
 yt/frontends/ramses/data_structures.py       |  1 -
 .../parallel_analysis_interface.py           | 40 +++++++++++++-------
 2 files changed, 28 insertions(+), 13 deletions(-)

diff --git a/yt/frontends/ramses/data_structures.py b/yt/frontends/ramses/data_structures.py
index 94359652013..15042099f7f 100644
--- a/yt/frontends/ramses/data_structures.py
+++ b/yt/frontends/ramses/data_structures.py
@@ -566,7 +566,6 @@ def _initialize_oct_handler(self):
             RAMSESDomainFile(self.dataset, i + 1)
             for i in parallel_objects(cpu_list, method="sequential")
         ]
-        print(self.domains)
 
         total_octs = sum(
             dom.local_oct_count for dom in self.domains  # + dom.ngridbound.sum()
         )
diff --git a/yt/utilities/parallel_tools/parallel_analysis_interface.py b/yt/utilities/parallel_tools/parallel_analysis_interface.py
index e3942e394c9..4ded4c52d73 100644
--- a/yt/utilities/parallel_tools/parallel_analysis_interface.py
+++ b/yt/utilities/parallel_tools/parallel_analysis_interface.py
@@ -5,7 +5,7 @@
 import traceback
 from functools import wraps
 from io import StringIO
-from typing import Literal, Union
+from typing import Any, Literal, Union
 
 import numpy as np
 from more_itertools import always_iterable
@@ -166,6 +166,15 @@ def get_mpi_type(dtype):
     return val
 
 
+def get_transport_dtype(dtype: type) -> tuple[Any, str]:
+    mpi_type = get_mpi_type(dtype)
+    if mpi_type is None:
+        # Fallback to char
+        return MPI.CHAR, "c"
+    else:
+        return mpi_type, dtype.str
+
+
 class ObjectIterator:
     """
     This is a generalized class that accepts a list of objects and then
@@ -749,7 +758,6 @@ class Communicator:
     comm = None
     _grids = None
     _distributed = None
-    __tocast = "c"
 
     def __init__(self, comm=None):
         self.comm = comm
@@ -1127,10 +1135,12 @@ def merge_quadtree_buffers(self, qt, merge_style):
 
     def send_array(self, arr, dest, tag=0):
         if not isinstance(arr, np.ndarray):
-            self.comm.send((None, None), dest=dest, tag=tag)
+            self.comm.send((None, None, None), dest=dest, tag=tag)
             self.comm.send(arr, dest=dest, tag=tag)
             return
-        tmp = arr.view(self.__tocast)  # Cast to CHAR
+        mpi_type, transport_dtype = get_transport_dtype(arr.dtype)
+        tmp = arr.view(transport_dtype)
+
         # communicate type and shape and optionally units
         if isinstance(arr, YTArray):
             unit_metadata = (str(arr.units), arr.units.registry.lut)
@@ -1140,13 +1150,17 @@ def send_array(self, arr, dest, tag=0):
                 unit_metadata += ("YTArray",)
         else:
             unit_metadata = ()
-        self.comm.send((arr.dtype.str, arr.shape) + unit_metadata, dest=dest, tag=tag)
-        self.comm.Send([arr, MPI.CHAR], dest=dest, tag=tag)
+        self.comm.send(
+            (arr.dtype.str, arr.shape, transport_dtype) + unit_metadata,
+            dest=dest,
+            tag=tag,
+        )
+        self.comm.Send([arr, mpi_type], dest=dest, tag=tag)
         del tmp
 
     def recv_array(self, source, tag=0):
         metadata = self.comm.recv(source=source, tag=tag)
-        dt, ne = metadata[:2]
+        dt, ne, transport_dt = metadata[:3]
         if ne is None and dt is None:
             return self.comm.recv(source=source, tag=tag)
         arr = np.empty(ne, dtype=dt)
@@ -1156,8 +1170,9 @@ def recv_array(self, source, tag=0):
-            arr = ImageArray(arr, units=metadata[2], registry=registry)
+            arr = ImageArray(arr, units=metadata[3], registry=registry)
         else:
-            arr = YTArray(arr, metadata[2], registry=registry)
-        tmp = arr.view(self.__tocast)
-        self.comm.Recv([tmp, MPI.CHAR], source=source, tag=tag)
+            arr = YTArray(arr, metadata[3], registry=registry)
+        mpi_type = get_mpi_type(transport_dt)
+        tmp = arr.view(transport_dt)
+        self.comm.Recv([tmp, mpi_type], source=source, tag=tag)
         return arr
 
     def alltoallv_array(self, send, total_size, offsets, sizes):
@@ -1170,7 +1185,8 @@ def alltoallv_array(self, send, total_size, offsets, sizes):
             recv = np.array(recv)
             return recv
         offset = offsets[self.comm.rank]
-        tmp_send = send.view(self.__tocast)
+        mpi_type, transport_dtype = get_transport_dtype(send.dtype)
+        tmp_send = send.view(transport_dtype)
         recv = np.empty(total_size, dtype=send.dtype)
         if isinstance(send, YTArray):
             # We assume send.units is consistent with the units
@@ -1183,9 +1199,9 @@ def alltoallv_array(self, send, total_size, offsets, sizes):
         dtr = send.dtype.itemsize / tmp_send.dtype.itemsize  # > 1
         roff = [off * dtr for off in offsets]
         rsize = [siz * dtr for siz in sizes]
-        tmp_recv = recv.view(self.__tocast)
+        tmp_recv = recv.view(transport_dtype)
         self.comm.Allgatherv(
-            (tmp_send, tmp_send.size, MPI.CHAR), (tmp_recv, (rsize, roff), MPI.CHAR)
+            (tmp_send, tmp_send.size, mpi_type), (tmp_recv, (rsize, roff), mpi_type)
         )
         return recv
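The payoff of get_transport_dtype is that arrays travel as their native MPI datatype whenever one exists instead of being reinterpreted as bytes; among other things this divides the element counts handed to MPI — which are 32-bit integers in most implementations — by the item size. A minimal mpi4py round trip in the same spirit; this is a sketch, not yt's Communicator, and the explicit dtype table below is our own stand-in for yt's get_mpi_type:

    # Run with e.g.: mpirun -np 2 python roundtrip.py
    import numpy as np
    from mpi4py import MPI

    comm = MPI.COMM_WORLD

    # numpy dtype -> MPI datatype (tiny subset; anything else goes as bytes)
    _TYPE_MAP = {np.dtype("float64"): MPI.DOUBLE, np.dtype("int64"): MPI.LONG}

    def get_transport_dtype(dtype):
        mpi_type = _TYPE_MAP.get(dtype)
        if mpi_type is None:
            return MPI.CHAR, "c"  # byte-wise fallback, as before the patch
        return mpi_type, dtype.str

    if comm.rank == 0:
        arr = np.arange(8, dtype="float64")
        mpi_type, transport = get_transport_dtype(arr.dtype)
        # small pickled header first, then the typed buffer
        comm.send((arr.dtype.str, arr.shape, transport), dest=1)
        comm.Send([arr.view(transport), mpi_type], dest=1)
    elif comm.rank == 1:
        dt, shape, transport = comm.recv(source=0)
        arr = np.empty(shape, dtype=dt)
        mpi_type, _ = get_transport_dtype(np.dtype(transport))
        comm.Recv([arr.view(transport), mpi_type], source=0)
        print(arr)  # [0. 1. 2. 3. 4. 5. 6. 7.]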