Skip to content

Commit

Permalink
Allow parallel I/O for hydro and particle reading
Browse files Browse the repository at this point in the history
  • Loading branch information
cphyc committed Nov 3, 2023
1 parent a16bda2 commit a8d7767
Showing 1 changed file with 27 additions and 7 deletions.
34 changes: 27 additions & 7 deletions yt/frontends/ramses/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
)
from yt.utilities.io_handler import BaseIOHandler
from yt.utilities.logger import ytLogger as mylog
from yt.utilities.parallel_tools.parallel_analysis_interface import parallel_objects
from yt.utilities.physical_ratios import cm_per_km, cm_per_mpc


Expand Down Expand Up @@ -176,11 +177,12 @@ def _read_fluid_selection(self, chunks, selector, fields, size):

# Set of field types
ftypes = {f[0] for f in fields}
for chunk in chunks:

for chunk in parallel_objects(chunks):
# Gather fields by type to minimize i/o operations
for ft in ftypes:
# Get all the fields of the same type
field_subs = list(filter(lambda f, ft=ft: f[0] == ft, fields))
field_subs = [field for field in fields if field[0] == ft]

# Loop over subsets
for subset in chunk.objs:
Expand Down Expand Up @@ -209,12 +211,17 @@ def _read_fluid_selection(self, chunks, selector, fields, size):
d.max(),
d.size,
)
tr[(ft, f)].append(d)
tr[ft, f].append(np.atleast_1d(d))

d = {}
for field in fields:
d[field] = np.concatenate(tr.pop(field))
tmp = tr.pop(field, None)
if tmp:
d[field] = np.concatenate(tmp)
else:
d[field] = np.empty(0, dtype="=f8")

return d
return self.ds.index.comm.par_combine_object(d, op="cat")

def _read_particle_coords(self, chunks, ptf):
pn = "particle_position_%s"
Expand Down Expand Up @@ -258,7 +265,8 @@ def _read_particle_fields(self, chunks, ptf, selector):
yield (ptype, field), data

else:
for chunk in chunks:
tr = defaultdict(list)
for chunk in parallel_objects(chunks):
for subset in chunk.objs:
rv = self._read_particle_subset(subset, fields)
for ptype, field_list in sorted(ptf.items()):
Expand All @@ -270,7 +278,19 @@ def _read_particle_fields(self, chunks, ptf, selector):
mask = []
for field in field_list:
data = np.asarray(rv.pop((ptype, field))[mask], "=f8")
yield (ptype, field), data
tr[ptype, field].append(np.atleast_1d(data))

d = {}
for ptype, field_list in sorted(ptf.items()):
for field in field_list:
tmp = tr.pop((ptype, field), None)
if tmp:
d[ptype, field] = np.concatenate(tmp)
else:
d[ptype, field] = np.empty(0, dtype="=f8")

d = self.ds.index.comm.par_combine_object(d, op="cat")
yield from d.items()

def _read_particle_subset(self, subset, fields):
"""Read the particle files."""
Expand Down

0 comments on commit a8d7767

Please sign in to comment.