From a8d7767fd3fac06e04365c0cdf35fdaa4cd3a956 Mon Sep 17 00:00:00 2001 From: Corentin Cadiou Date: Fri, 3 Nov 2023 11:06:06 +0000 Subject: [PATCH] Allow parallel I/O for hydro and particle reading --- yt/frontends/ramses/io.py | 34 +++++++++++++++++++++++++++------- 1 file changed, 27 insertions(+), 7 deletions(-) diff --git a/yt/frontends/ramses/io.py b/yt/frontends/ramses/io.py index a74a24cf1fa..90c7733993d 100644 --- a/yt/frontends/ramses/io.py +++ b/yt/frontends/ramses/io.py @@ -14,6 +14,7 @@ ) from yt.utilities.io_handler import BaseIOHandler from yt.utilities.logger import ytLogger as mylog +from yt.utilities.parallel_tools.parallel_analysis_interface import parallel_objects from yt.utilities.physical_ratios import cm_per_km, cm_per_mpc @@ -176,11 +177,12 @@ def _read_fluid_selection(self, chunks, selector, fields, size): # Set of field types ftypes = {f[0] for f in fields} - for chunk in chunks: + + for chunk in parallel_objects(chunks): # Gather fields by type to minimize i/o operations for ft in ftypes: # Get all the fields of the same type - field_subs = list(filter(lambda f, ft=ft: f[0] == ft, fields)) + field_subs = [field for field in fields if field[0] == ft] # Loop over subsets for subset in chunk.objs: @@ -209,12 +211,17 @@ def _read_fluid_selection(self, chunks, selector, fields, size): d.max(), d.size, ) - tr[(ft, f)].append(d) + tr[ft, f].append(np.atleast_1d(d)) + d = {} for field in fields: - d[field] = np.concatenate(tr.pop(field)) + tmp = tr.pop(field, None) + if tmp: + d[field] = np.concatenate(tmp) + else: + d[field] = np.empty(0, dtype="=f8") - return d + return self.ds.index.comm.par_combine_object(d, op="cat") def _read_particle_coords(self, chunks, ptf): pn = "particle_position_%s" @@ -258,7 +265,8 @@ def _read_particle_fields(self, chunks, ptf, selector): yield (ptype, field), data else: - for chunk in chunks: + tr = defaultdict(list) + for chunk in parallel_objects(chunks): for subset in chunk.objs: rv = self._read_particle_subset(subset, fields) for ptype, field_list in sorted(ptf.items()): @@ -270,7 +278,19 @@ def _read_particle_fields(self, chunks, ptf, selector): mask = [] for field in field_list: data = np.asarray(rv.pop((ptype, field))[mask], "=f8") - yield (ptype, field), data + tr[ptype, field].append(np.atleast_1d(data)) + + d = {} + for ptype, field_list in sorted(ptf.items()): + for field in field_list: + tmp = tr.pop((ptype, field), None) + if tmp: + d[ptype, field] = np.concatenate(tmp) + else: + d[ptype, field] = np.empty(0, dtype="=f8") + + d = self.ds.index.comm.par_combine_object(d, op="cat") + yield from d.items() def _read_particle_subset(self, subset, fields): """Read the particle files."""