Merge pull request cholla-hydro#403 from mabruzzo/ergonomic_concat
Ergonomic improvements to concatenation scripts
evaneschneider authored Jul 8, 2024
2 parents cb3d2d5 + aaf784b commit 43f381b
Showing 4 changed files with 345 additions and 74 deletions.
147 changes: 147 additions & 0 deletions python_scripts/concat.py
@@ -0,0 +1,147 @@
#!/usr/bin/env python3
"""
Command line program that provides a unified interface for concatenating 3D
hdf5 data dumps produced by Cholla.
At the moment, we only support fluid quantities. In the future, we could
support other types of fields.
"""

import numpy as np

import argparse
import datetime
from functools import partial
import pathlib

import concat_internals
from concat_2d_data import concat_2d_dataset
from concat_3d_data import concat_3d_dataset

parser = argparse.ArgumentParser(
description = ("Concatenates HDF5 ouputs produced by Cholla")
)
concat_internals.add_common_cli_args(parser, num_processes_choice = 'omit',
add_concat_outputs_arg = False)

_2D_kinds = ("proj", "slice", "rot_proj")

parser.add_argument("--kind", nargs = "+", required = True,
help = (
"specifies the types of hdf5 files that are to be concatenated. This "
f"can be `3D` or the name of a 2D dataset `{'`, `'.join(_2D_kinds)}`. "
"For a 2D dataset like 'proj', you can append a suffix (e.g. `-xy`, "
"`-yz`, `-xz`) or a series of suffixes (e.g. `-xy,yz`) to specify "
"only a subset of the datasets should be concatenated"))

def _try_identify_2D_kind_kwargs(kind):
# try to identify the 2d dataset-kind and any associated kwargs for
# concat_2D_dataset

prefix = None
for k in _2D_kinds:
if kind.startswith(k):
prefix = k
break
else: # this gets executed if we don't break out of the for-loop
return None

suffix = kind[len(prefix):]
tmp = {'concat_xy' : False, 'concat_yz' : False, 'concat_xz' : False}
if suffix in ['', '-xy,yz,xz', '-xy,xz,yz', '-yz,xy,xz', '-xz,xy,yz',
'-yz,xz,xy', '-xz,yz,xy']:
for key in tmp:
tmp[key] = True
elif suffix == '-xy':
tmp['concat_xy'] = True
elif suffix == '-xz':
tmp['concat_xz'] = True
elif suffix == '-yz':
tmp['concat_yz'] = True
elif suffix in ['-xy,xz', '-xz,xy']:
tmp['concat_xy'] = True
tmp['concat_xz'] = True
elif suffix in ['-xy,yz', '-yz,xy']:
tmp['concat_xy'] = True
tmp['concat_yz'] = True
elif suffix in ['-xz,yz', '-yz,xz']:
tmp['concat_xz'] = True
tmp['concat_yz'] = True
else:
raise ValueError(f"{kind} has an invalid suffix")
return prefix, tmp

def _handle_kinds_processing(kind_l: list):
encountered_3D = False
encountered_2D_kind_set = set()
kindkw_2D_pairs = []

for kind in kind_l:
if kind == '3D':
if encountered_3D:
raise ValueError("3D kind appears more than once")
encountered_3D = True
continue
# try to treat kind as 2D kind
pair = _try_identify_2D_kind_kwargs(kind)
if pair is not None:
if pair[0] in encountered_2D_kind_set:
raise ValueError(f"{kind} appears more than once")
encountered_2D_kind_set.add(pair[0])
kindkw_2D_pairs.append(pair)
else:
raise ValueError(f"{kind} is a totally unrecognized dataset-kind")
return encountered_3D, kindkw_2D_pairs

def main(args):
handle_3D, kindkw_2D_pairs = _handle_kinds_processing(args.kind)
assert handle_3D or len(kindkw_2D_pairs) > 0 # sanity-check

# create a function to build_source_paths
if handle_3D:
_temp_pre_extension_suffix = ''
else:
_temp_pre_extension_suffix = f'_{kindkw_2D_pairs[0][0]}'

temp_build_source_path = concat_internals.get_source_path_builder(
source_directory = args.source_directory,
pre_extension_suffix = _temp_pre_extension_suffix,
known_output_snap = args.concat_outputs[0])

# construct a list of concatenation commands performed at each output
command_triples = []
if handle_3D:
command_triples.append(
('3D',
partial(temp_build_source_path, pre_extension_suffix = ''),
concat_3d_dataset)
)
for kind_2D, kwargs in kindkw_2D_pairs:
command_triples.append(
(kind_2D,
partial(temp_build_source_path,
pre_extension_suffix = f'_{kind_2D}'),
partial(concat_2d_dataset, dataset_kind=kind_2D, **kwargs))
)

#raise RuntimeError(repr(command_triples))

for output in args.concat_outputs:
print(f"concatenating {output}")
for dset_kind, build_source_path, concat_fn in command_triples:
t1 = datetime.datetime.now()
concat_fn(output_directory=args.output_directory,
output_number=output,
build_source_path = build_source_path,
skip_fields=args.skip_fields,
destination_dtype=args.dtype,
compression_type=args.compression_type,
compression_options=args.compression_opts,
chunking=args.chunking)
t2 = datetime.datetime.now()
print(f' -> {dset_kind!r}: {(t2 - t1).total_seconds()}')



if __name__ == '__main__':
main(parser.parse_args())
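For orientation, here is a small hypothetical walk-through of the --kind parsing above. The helpers and their return shapes come straight from this file; the input list itself is made up for illustration.

# Hypothetical example (not part of the diff): how the helpers above map
# --kind values onto concatenation commands.
handle_3D, kindkw_2D_pairs = _handle_kinds_processing(['3D', 'proj-xy,yz'])
# handle_3D       -> True
# kindkw_2D_pairs -> [('proj', {'concat_xy': True, 'concat_yz': True, 'concat_xz': False})]
# anything that is not '3D', a 2D dataset name, or a 2D name plus a valid
# suffix (and any kind listed more than once) raises a ValueError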
66 changes: 31 additions & 35 deletions python_scripts/concat_2d_data.py
@@ -15,17 +15,21 @@
"""

import h5py
import pathlib
import numpy as np

import pathlib
from typing import Optional
import warnings

import concat_internals

# ==============================================================================
def concat_2d_dataset(output_directory: pathlib.Path,
num_processes: int,
output_number: int,
dataset_kind: str,
build_source_path,
*,
num_processes: Optional[int] = None,
concat_xy: bool = True,
concat_yz: bool = True,
concat_xz: bool = True,
@@ -42,15 +46,16 @@ def concat_2d_dataset(output_directory: pathlib.Path,
Parameters
----------
output_directory : pathlib.Path
The directory containing the new concatenated files
num_processes : int
The number of ranks that Cholla was run with
The directory containing the new concatenated files
output_number : int
The output number to concatenate
dataset_kind : str
The type of 2D dataset to concatenate. Can be 'slice', 'proj', or 'rot_proj'.
build_source_path : callable
A function used to construct the paths to the files that are to be concatenated.
num_processes : int, optional
The number of ranks that Cholla was run with. This information is now inferred
from the hdf5 file and the parameter will be removed in the future.
concat_xy : bool
If True then concatenate the XY slices/projections. Defaults to True.
concat_yz : bool
@@ -67,44 +72,33 @@ def concat_2d_dataset(output_directory: pathlib.Path,
What compression settings to use if compressing. Defaults to None.
chunking : bool or tuple
Whether or not to use chunking and the chunk size. Defaults to None.
output_directory: pathlib.Path :
num_processes: int :
output_number: int :
dataset_kind: str :
concat_xy: bool :
(Default value = True)
concat_yz: bool :
(Default value = True)
concat_xz: bool :
(Default value = True)
skip_fields: list :
(Default value = [])
destination_dtype: np.dtype :
(Default value = None)
compression_type: str :
(Default value = None)
compression_options: str :
(Default value = None)
Returns
-------
"""

if num_processes is not None:
warnings.warn('the num_processes parameter will be removed')

# Error checking
assert num_processes > 1, 'num_processes must be greater than 1'
assert output_number >= 0, 'output_number must be greater than or equal to 0'
assert dataset_kind in ['slice', 'proj', 'rot_proj'], '`dataset_kind` can only be one of "slice", "proj", "rot_proj".'

# Open destination file
destination_file = concat_internals.destination_safe_open(output_directory / f'{output_number}_{dataset_kind}.h5')

# Setup the destination file
with h5py.File(build_source_path(proc_id = 0, nfile = output_number), 'r') as source_file:
source_fname_0 = build_source_path(proc_id = 0, nfile = output_number)
with h5py.File(source_fname_0, 'r') as source_file:

# determine how many files data must be concatenated from
num_files = concat_internals.infer_numfiles_from_header(source_file.attrs)

if (num_processes is not None) and (num_processes != num_files):
raise RuntimeError(
f"header of {source_fname_0!r} implies that it contains data that is "
f"split across {num_files} files (rather than {num_processes} files).")
elif num_files < 2:
raise RuntimeError('it only makes sense to concatenate data split across '
'2 or more files')

# Copy over header
destination_file = concat_internals.copy_header(source_file, destination_file)

@@ -139,8 +133,10 @@ def concat_2d_dataset(output_directory: pathlib.Path,
compression=compression_type,
compression_opts=compression_options)



# Copy data
for rank in range(num_processes):
for rank in range(num_files):
# Open source file
source_file = h5py.File(build_source_path(proc_id = rank, nfile = output_number), 'r')

@@ -240,7 +236,7 @@ def __write_bounds_2d_dataset(source_file: h5py.File, dataset: str) -> tuple:
from timeit import default_timer
start = default_timer()

cli = concat_internals.common_cli()
cli = concat_internals.common_cli(num_processes_choice = 'deprecate')
cli.add_argument('-d', '--dataset-kind', type=str, required=True, help='What kind of 2D dataset to concatenate. Options are "slice", "proj", and "rot_proj"')
cli.add_argument('--disable-xy', default=True, action='store_false', help='Disables concatenating the XY datasets.')
cli.add_argument('--disable-yz', default=True, action='store_false', help='Disables concatenating the YZ datasets.')
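As a usage sketch, not part of this diff: the keyword-only interface above can be driven the same way main() in concat.py drives it. The directory names and output number below are placeholders, the get_source_path_builder/partial pattern mirrors concat.py, and the remaining keyword arguments fall back to their documented defaults.

import pathlib
from functools import partial

import concat_internals
from concat_2d_data import concat_2d_dataset

source_dir = pathlib.Path('raw')           # placeholder: directory with the per-rank hdf5 files
output_dir = pathlib.Path('concatenated')  # placeholder: destination for the merged file

# build a callable mapping (proc_id, nfile) onto a source-file path,
# mirroring how concat.py constructs it for a 'proj' dataset
build_source_path = partial(
    concat_internals.get_source_path_builder(
        source_directory = source_dir,
        pre_extension_suffix = '_proj',
        known_output_snap = 0),
    pre_extension_suffix = '_proj')

# concatenate only the xy and yz projections of output 0
concat_2d_dataset(output_directory = output_dir,
                  output_number = 0,
                  dataset_kind = 'proj',
                  build_source_path = build_source_path,
                  concat_xy = True,
                  concat_yz = True,
                  concat_xz = False)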
50 changes: 27 additions & 23 deletions python_scripts/concat_3d_data.py
@@ -14,15 +14,19 @@

import h5py
import numpy as np

import pathlib
from typing import Optional
import warnings

import concat_internals

# ==============================================================================
def concat_3d_dataset(output_directory: pathlib.Path,
num_processes: int,
output_number: int,
build_source_path,
*,
num_processes: Optional[int] = None,
skip_fields: list = [],
destination_dtype: np.dtype = None,
compression_type: str = None,
@@ -43,6 +47,9 @@ def concat_3d_dataset(output_directory: pathlib.Path,
List of fields to skip concatenating. Defaults to [].
build_source_path : callable
A function used to construct the paths to the files that are to be concatenated.
num_processes : int, optional
The number of ranks that Cholla was run with. This information is now inferred
from the hdf5 file and the parameter will be removed in the future.
destination_dtype : np.dtype
The data type of the output datasets. Accepts most numpy types. Defaults to the same as the input datasets.
compression_type : str
@@ -51,35 +58,32 @@ def concat_3d_dataset(output_directory: pathlib.Path,
What compression settings to use if compressing. Defaults to None.
chunking : bool or tuple
Whether or not to use chunking and the chunk size. Defaults to None.
output_directory: pathlib.Path :
num_processes: int :
output_number: int :
skip_fields: list :
(Default value = [])
destination_dtype: np.dtype :
(Default value = None)
compression_type: str :
(Default value = None)
compression_options: str :
(Default value = None)
Returns
-------
"""

if num_processes is not None:
warnings.warn('the num_processes parameter will be removed')

# Error checking
assert num_processes > 1, 'num_processes must be greater than 1'
assert output_number >= 0, 'output_number must be greater than or equal to 0'

# Open the output file for writing
destination_file = concat_internals.destination_safe_open(output_directory / f'{output_number}.h5')

# Setup the output file
with h5py.File(build_source_path(proc_id = 0, nfile = output_number), 'r') as source_file:
source_fname_0 = build_source_path(proc_id = 0, nfile = output_number)
with h5py.File(source_fname_0, 'r') as source_file:

# determine how many files data must be concatenated from
num_files = concat_internals.infer_numfiles_from_header(source_file.attrs)

if (num_processes is not None) and (num_processes != num_files):
raise RuntimeError(
f"header of {source_fname_0!r} implies that it contains data that is "
f"split across {num_files} files (rather than {num_processes} files).")
elif num_files < 2:
raise RuntimeError('it only makes sense to concatenate data split across '
'2 or more files')

# Copy header data
destination_file = concat_internals.copy_header(source_file, destination_file)

Expand All @@ -104,7 +108,7 @@ def concat_3d_dataset(output_directory: pathlib.Path,
compression_opts=compression_options)

# loop over files for a given output
for i in range(0, num_processes):
for i in range(0, num_files):
# open the input file for reading
source_file = h5py.File(build_source_path(proc_id = i, nfile = output_number), 'r')

@@ -135,7 +139,7 @@ def concat_3d_dataset(output_directory: pathlib.Path,
from timeit import default_timer
start = default_timer()

cli = concat_internals.common_cli()
cli = concat_internals.common_cli(num_processes_choice = 'deprecate')
args = cli.parse_args()

build_source_path = concat_internals.get_source_path_builder(
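A comparable sketch for the 3D entry point, again hypothetical and modeled on main() in concat.py. Paths and output numbers are placeholders, and num_processes is deliberately left out since the number of source files is now inferred from the hdf5 header.

import pathlib
from functools import partial

import concat_internals
from concat_3d_data import concat_3d_dataset

source_dir = pathlib.Path('raw')           # placeholder
output_dir = pathlib.Path('concatenated')  # placeholder
outputs = [0, 1, 2]                        # placeholder output numbers

# 3D dumps carry no pre-extension suffix, matching what concat.py does
build_source_path = partial(
    concat_internals.get_source_path_builder(
        source_directory = source_dir,
        pre_extension_suffix = '',
        known_output_snap = outputs[0]),
    pre_extension_suffix = '')

for output in outputs:
    concat_3d_dataset(output_directory = output_dir,
                      output_number = output,
                      build_source_path = build_source_path)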
