diff --git a/python_scripts/concat.py b/python_scripts/concat.py
new file mode 100644
index 000000000..7e106bf54
--- /dev/null
+++ b/python_scripts/concat.py
@@ -0,0 +1,147 @@
+#!/usr/bin/env python3
+"""
+Command line program that provides a unified interface for concatenating 3D
+hdf5 data dumps produced by Cholla.
+
+At the moment, we only support fluid quantities. In the future, we could
+support other types of fields.
+"""
+
+import numpy as np
+
+import argparse
+import datetime
+from functools import partial
+import pathlib
+
+import concat_internals
+from concat_2d_data import concat_2d_dataset
+from concat_3d_data import concat_3d_dataset
+
+parser = argparse.ArgumentParser(
+  description = ("Concatenates HDF5 outputs produced by Cholla")
+)
+concat_internals.add_common_cli_args(parser, num_processes_choice = 'omit',
+                                     add_concat_outputs_arg = False)
+
+_2D_kinds = ("proj", "slice", "rot_proj")
+
+parser.add_argument("--kind", nargs = "+", required = True,
+  help = (
+    "specifies the kinds of hdf5 files that are to be concatenated. This "
+    f"can be `3D` or the name of a 2D dataset: `{'`, `'.join(_2D_kinds)}`. "
+    "For a 2D dataset like 'proj', you can append a suffix (e.g. `-xy`, "
+    "`-yz`, `-xz`) or a series of suffixes (e.g. `-xy,yz`) to specify that "
+    "only a subset of the datasets should be concatenated"))
+
+def _try_identify_2D_kind_kwargs(kind):
+  # try to identify the 2D dataset-kind and any associated kwargs for
+  # concat_2d_dataset
+
+  prefix = None
+  for k in _2D_kinds:
+    if kind.startswith(k):
+      prefix = k
+      break
+  else:  # this gets executed if we don't break out of the for-loop
+    return None
+
+  suffix = kind[len(prefix):]
+  tmp = {'concat_xy' : False, 'concat_yz' : False, 'concat_xz' : False}
+  if suffix in ['', '-xy,yz,xz', '-xy,xz,yz', '-yz,xy,xz', '-xz,xy,yz',
+                '-yz,xz,xy', '-xz,yz,xy']:
+    for key in tmp:
+      tmp[key] = True
+  elif suffix == '-xy':
+    tmp['concat_xy'] = True
+  elif suffix == '-xz':
+    tmp['concat_xz'] = True
+  elif suffix == '-yz':
+    tmp['concat_yz'] = True
+  elif suffix in ['-xy,xz', '-xz,xy']:
+    tmp['concat_xy'] = True
+    tmp['concat_xz'] = True
+  elif suffix in ['-xy,yz', '-yz,xy']:
+    tmp['concat_xy'] = True
+    tmp['concat_yz'] = True
+  elif suffix in ['-xz,yz', '-yz,xz']:
+    tmp['concat_xz'] = True
+    tmp['concat_yz'] = True
+  else:
+    raise ValueError(f"{kind} has an invalid suffix")
+  return prefix, tmp
+
+def _handle_kinds_processing(kind_l: list):
+  encountered_3D = False
+  encountered_2D_kind_set = set()
+  kindkw_2D_pairs = []
+
+  for kind in kind_l:
+    if kind == '3D':
+      if encountered_3D:
+        raise ValueError("3D kind appears more than once")
+      encountered_3D = True
+      continue
+    # try to treat kind as a 2D kind
+    pair = _try_identify_2D_kind_kwargs(kind)
+    if pair is not None:
+      if pair[0] in encountered_2D_kind_set:
+        raise ValueError(f"{kind} appears more than once")
+      encountered_2D_kind_set.add(pair[0])
+      kindkw_2D_pairs.append(pair)
+    else:
+      raise ValueError(f"{kind} is an unrecognized dataset-kind")
+  return encountered_3D, kindkw_2D_pairs
+
+def main(args):
+  handle_3D, kindkw_2D_pairs = _handle_kinds_processing(args.kind)
+  assert handle_3D or len(kindkw_2D_pairs) > 0  # sanity-check
+
+  # create a function to build source paths
+  if handle_3D:
+    _temp_pre_extension_suffix = ''
+  else:
+    _temp_pre_extension_suffix = f'_{kindkw_2D_pairs[0][0]}'
+
+  temp_build_source_path = concat_internals.get_source_path_builder(
+    source_directory = args.source_directory,
+    pre_extension_suffix = _temp_pre_extension_suffix,
+    known_output_snap = args.concat_outputs[0])
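Since `--kind` accepts compound spellings like `proj-xy,yz`, a quick sanity check of the parsing helper may help reviewers. This is a sketch, not part of the patch: it assumes `concat.py` is importable (importing it pulls in `h5py` and `numpy` through the other modules) and relies on the `suffix` comparison fix above.

```python
from concat import _try_identify_2D_kind_kwargs

# a suffix selects a subset of the planes for a 2D dataset-kind
prefix, kwargs = _try_identify_2D_kind_kwargs('proj-xy,yz')
assert prefix == 'proj'
assert kwargs == {'concat_xy': True, 'concat_yz': True, 'concat_xz': False}

# no suffix means "concatenate every plane"
prefix, kwargs = _try_identify_2D_kind_kwargs('slice')
assert kwargs == {'concat_xy': True, 'concat_yz': True, 'concat_xz': True}

# anything that isn't a 2D dataset-kind (e.g. '3D') yields None
assert _try_identify_2D_kind_kwargs('3D') is None
```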
+
+  # construct a list of concatenation commands performed at each output
+  command_triples = []
+  if handle_3D:
+    command_triples.append(
+      ('3D',
+       partial(temp_build_source_path, pre_extension_suffix = ''),
+       concat_3d_dataset)
+    )
+  for kind_2D, kwargs in kindkw_2D_pairs:
+    command_triples.append(
+      (kind_2D,
+       partial(temp_build_source_path,
+               pre_extension_suffix = f'_{kind_2D}'),
+       partial(concat_2d_dataset, dataset_kind=kind_2D, **kwargs))
+    )
+
+  for output in args.concat_outputs:
+    print(f"concatenating {output}")
+    for dset_kind, build_source_path, concat_fn in command_triples:
+      t1 = datetime.datetime.now()
+      concat_fn(output_directory=args.output_directory,
+                output_number=output,
+                build_source_path = build_source_path,
+                skip_fields=args.skip_fields,
+                destination_dtype=args.dtype,
+                compression_type=args.compression_type,
+                compression_options=args.compression_opts,
+                chunking=args.chunking)
+      t2 = datetime.datetime.now()
+      print(f'  -> {dset_kind!r}: {(t2 - t1).total_seconds()}')
+
+if __name__ == '__main__':
+  main(parser.parse_args())
\ No newline at end of file
diff --git a/python_scripts/concat_2d_data.py b/python_scripts/concat_2d_data.py
index 9c4e0dd86..6db15c03e 100755
--- a/python_scripts/concat_2d_data.py
+++ b/python_scripts/concat_2d_data.py
@@ -15,17 +15,21 @@
 """
 
 import h5py
-import pathlib
 import numpy as np
+
+import pathlib
+from typing import Optional
+import warnings
+
 import concat_internals
 
 # ==============================================================================
 def concat_2d_dataset(output_directory: pathlib.Path,
-                      num_processes: int,
                       output_number: int,
                       dataset_kind: str,
                       build_source_path,
+                      *,
+                      num_processes: Optional[int] = None,
                       concat_xy: bool = True,
                       concat_yz: bool = True,
                       concat_xz: bool = True,
@@ -42,15 +46,16 @@ def concat_2d_dataset(output_directory: pathlib.Path,
     Parameters
     ----------
     output_directory : pathlib.Path
-        The directory containing the new concatenated files
-    num_processes : int
-        The number of ranks that Cholla was run with
+        The directory containing the new concatenated files
     output_number : int
         The output number to concatenate
     dataset_kind : str
         The type of 2D dataset to concatenate. Can be 'slice', 'proj', or 'rot_proj'.
     build_source_path : callable
         A function used to construct the paths to the files that are to be concatenated.
+    num_processes : int, optional
+        The number of ranks that Cholla was run with. This information is now inferred
+        from the hdf5 file and the parameter will be removed in the future.
     concat_xy : bool
         If True then concatenate the XY slices/projections. Defaults to True.
     concat_yz : bool
@@ -67,36 +72,12 @@ def concat_2d_dataset(output_directory: pathlib.Path,
         What compression settings to use if compressing. Defaults to None.
     chunking : bool or tuple
         Whether or not to use chunking and the chunk size. Defaults to None.
-
-    output_directory: pathlib.Path :
-
-    num_processes: int :
-
-    output_number: int :
-
-    dataset_kind: str :
-
-    concat_xy: bool :
-        (Default value = True)
-    concat_yz: bool :
-        (Default value = True)
-    concat_xz: bool :
-        (Default value = True)
-    skip_fields: list :
-        (Default value = [])
-    destination_dtype: np.dtype :
-        (Default value = None)
-    compression_type: str :
-        (Default value = None)
-    compression_options: str :
-        (Default value = None)
-
-    Returns
-    -------
-
     """
+
+    if num_processes is not None:
+        warnings.warn('the num_processes parameter will be removed')
+
     # Error checking
-    assert num_processes > 1, 'num_processes must be greater than 1'
     assert output_number >= 0, 'output_number must be greater than or equal to 0'
     assert dataset_kind in ['slice', 'proj', 'rot_proj'], '`dataset_kind` can only be one of "slice", "proj", "rot_proj".'
 
@@ -104,7 +85,20 @@ def concat_2d_dataset(output_directory: pathlib.Path,
     destination_file = concat_internals.destination_safe_open(output_directory / f'{output_number}_{dataset_kind}.h5')
 
     # Setup the destination file
-    with h5py.File(build_source_path(proc_id = 0, nfile = output_number), 'r') as source_file:
+    source_fname_0 = build_source_path(proc_id = 0, nfile = output_number)
+    with h5py.File(source_fname_0, 'r') as source_file:
+
+        # determine how many files the data to be concatenated is split across
+        num_files = concat_internals.infer_numfiles_from_header(source_file.attrs)
+
+        if (num_processes is not None) and (num_processes != num_files):
+            raise RuntimeError(
+                f"the header of {source_fname_0!r} implies that the data is "
+                f"split across {num_files} files (rather than {num_processes} files)")
+        elif num_files < 2:
+            raise RuntimeError('it only makes sense to concatenate data split across '
+                               '2 or more files')
+
         # Copy over header
         destination_file = concat_internals.copy_header(source_file, destination_file)
@@ -139,8 +133,10 @@ def concat_2d_dataset(output_directory: pathlib.Path,
                                             compression=compression_type,
                                             compression_opts=compression_options)
 
+
     # Copy data
-    for rank in range(num_processes):
+    for rank in range(num_files):
         # Open source file
         source_file = h5py.File(build_source_path(proc_id = rank, nfile = output_number), 'r')
@@ -240,7 +236,7 @@ def __write_bounds_2d_dataset(source_file: h5py.File, dataset: str) -> tuple:
     from timeit import default_timer
     start = default_timer()
 
-    cli = concat_internals.common_cli()
+    cli = concat_internals.common_cli(num_processes_choice = 'deprecate')
     cli.add_argument('-d', '--dataset-kind', type=str, required=True, help='What kind of 2D dataset to concatenate. Options are "slice", "proj", and "rot_proj"')
     cli.add_argument('--disable-xy', default=True, action='store_false', help='Disables concatenating the XY datasets.')
     cli.add_argument('--disable-yz', default=True, action='store_false', help='Disables concatenating the YZ datasets.')
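For context on the new keyword-only signature, here is a sketch of a programmatic call. The directory names are hypothetical placeholders, and the snippet only runs against a real set of Cholla projection dumps; the `get_source_path_builder` arguments mirror how `concat.py` uses it above.

```python
import pathlib
import concat_internals
from concat_2d_data import concat_2d_dataset

# hypothetical layout: raw dumps under ./raw, concatenated output to ./out;
# '_proj' is the pre-extension suffix that projection dumps carry
build_source_path = concat_internals.get_source_path_builder(
    source_directory = pathlib.Path('./raw'),
    pre_extension_suffix = '_proj',
    known_output_snap = 0)

# concatenate only the xy projection of snapshot 0; the number of source
# files is now inferred from the header of the first file
concat_2d_dataset(output_directory = pathlib.Path('./out'),
                  output_number = 0,
                  dataset_kind = 'proj',
                  build_source_path = build_source_path,
                  concat_xy = True, concat_yz = False, concat_xz = False)
```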
Options are "slice", "proj", and "rot_proj"') cli.add_argument('--disable-xy', default=True, action='store_false', help='Disables concating the XY datasets.') cli.add_argument('--disable-yz', default=True, action='store_false', help='Disables concating the YZ datasets.') diff --git a/python_scripts/concat_3d_data.py b/python_scripts/concat_3d_data.py index 1d5ba8228..4e7865f6f 100755 --- a/python_scripts/concat_3d_data.py +++ b/python_scripts/concat_3d_data.py @@ -14,15 +14,19 @@ import h5py import numpy as np + import pathlib +from typing import Optional +import warnings import concat_internals # ============================================================================== def concat_3d_dataset(output_directory: pathlib.Path, - num_processes: int, output_number: int, build_source_path, + *, + num_processes: Optional[int] = None, skip_fields: list = [], destination_dtype: np.dtype = None, compression_type: str = None, @@ -43,6 +47,9 @@ def concat_3d_dataset(output_directory: pathlib.Path, List of fields to skip concatenating. Defaults to []. build_source_path : callable A function used to construct the paths to the files that are to be concatenated. + num_processes : int, optional + The number of ranks that Cholla was run with. This information is now inferred + from the hdf5 file and the parameter will be removed in the future. destination_dtype : np.dtype The data type of the output datasets. Accepts most numpy types. Defaults to the same as the input datasets. compression_type : str @@ -51,35 +58,32 @@ def concat_3d_dataset(output_directory: pathlib.Path, What compression settings to use if compressing. Defaults to None. chunking : bool or tuple Whether or not to use chunking and the chunk size. Defaults to None. - output_directory: pathlib.Path : - - num_processes: int : - - output_number: int : - - skip_fields: list : - (Default value = []) - destination_dtype: np.dtype : - (Default value = None) - compression_type: str : - (Default value = None) - compression_options: str : - (Default value = None) - - Returns - ------- - """ + if num_processes is not None: + warnings.warn('the num_processes parameter will be removed') + # Error checking - assert num_processes > 1, 'num_processes must be greater than 1' assert output_number >= 0, 'output_number must be greater than or equal to 0' # Open the output file for writing destination_file = concat_internals.destination_safe_open(output_directory / f'{output_number}.h5') # Setup the output file - with h5py.File(build_source_path(proc_id = 0, nfile = output_number), 'r') as source_file: + source_fname_0 = build_source_path(proc_id = 0, nfile = output_number) + with h5py.File(source_fname_0, 'r') as source_file: + + # determine how many files data must be concatenated from + num_files = concat_internals.infer_numfiles_from_header(source_file.attrs) + + if (num_processes is not None) and (num_processes != num_files): + raise RuntimeError( + f"header of {source_fname_0!r} implies that it contains data that is " + f"split across {num_files} files (rather than {num_processes} files).") + elif num_files < 2: + raise RuntimeError('it only makes sense to concatenate data split across ' + '2 or more files') + # Copy header data destination_file = concat_internals.copy_header(source_file, destination_file) @@ -104,7 +108,7 @@ def concat_3d_dataset(output_directory: pathlib.Path, compression_opts=compression_options) # loop over files for a given output - for i in range(0, num_processes): + for i in range(0, num_files): # open the input file for reading 
diff --git a/python_scripts/concat_internals.py b/python_scripts/concat_internals.py
index bc615012e..0990ebcec 100755
--- a/python_scripts/concat_internals.py
+++ b/python_scripts/concat_internals.py
@@ -4,9 +4,17 @@
 """
 
 import h5py
+import numpy as np
+
 import argparse
 import functools
 import pathlib
+import re
+import warnings
+
+# imports for type annotations:
+from collections.abc import Mapping
+from typing import Optional
 
 # ==============================================================================
 def destination_safe_open(filename: pathlib.Path) -> h5py.File:
@@ -39,6 +47,38 @@ def destination_safe_open(filename: pathlib.Path) -> h5py.File:
     return destination_file
 
 # ==============================================================================
+def infer_numfiles_from_header(hdr: Mapping) -> int:
+    """Infers the total number of ranks that Cholla was run with to produce this
+    file. Equivalently, this returns the number of files that must be
+    concatenated.
+
+    Parameters
+    ----------
+    hdr : Mapping
+        ``dict``-like object specifying the core attributes of an hdf5 file. This
+        is commonly the value returned by the ``attrs`` property of a ``h5py.File``
+        instance (but we don't really care about the type).
+
+    Returns
+    -------
+    int
+        The number of files that must be concatenated
+
+    Notes
+    -----
+    In the future, it would be nice to directly encode this information in the
+    file rather than requiring us to infer it.
+    """
+    dims, dims_local = hdr['dims'], hdr['dims_local']
+    assert np.issubdtype(dims.dtype, np.signedinteger)        # sanity check
+    assert np.issubdtype(dims_local.dtype, np.signedinteger)  # sanity check
+
+    blocks_per_ax, remainders = np.divmod(dims, dims_local, dtype = 'i8')
+    assert np.all(blocks_per_ax > 0) and np.all(remainders == 0)  # sanity check
+
+    return int(np.prod(blocks_per_ax))
+
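A tiny illustration of the inference, using a plain dict in place of `h5py` attributes (the numbers are made up): a 256^3 grid decomposed into 128^3 blocks must have been written by 8 ranks.

```python
import numpy as np
from concat_internals import infer_numfiles_from_header

# stand-in for `h5py.File(...).attrs`; a 256^3 grid split into 128^3 blocks
hdr = {'dims':       np.array([256, 256, 256], dtype='i8'),
       'dims_local': np.array([128, 128, 128], dtype='i8')}

assert infer_numfiles_from_header(hdr) == 8  # 2 blocks per axis -> 2**3 files
```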
+
+# ==============================================================================
 def copy_header(source_file: h5py.File, destination_file: h5py.File) -> h5py.File:
     """Copy the attributes of one HDF5 file to another, skipping all fields that are specific to an individual rank
@@ -68,25 +108,70 @@ def copy_header(source_file: h5py.File, destination_file: h5py.File) -> h5py.Fil
     return destination_file
 
 # ==============================================================================
+def _integer_sequence(s: str):
+    # converts an argument string to an integer sequence
+    # -> s can be a range specified as start:stop[:step]. This mirrors the
+    #    semantics of a python slice (at the moment, start and stop are both
+    #    required)
+    # -> s can be a comma separated list
+    # -> s can be a single value
+    m = re.fullmatch(
+        r"(?P<start>[-+]?\d+):(?P<stop>[-+]?\d+)(:(?P<step>[-+]?\d+))?",
+        s)
+    if m is not None:
+        rslts = m.groupdict()
+        step = 1
+        if rslts['step'] is not None:
+            step = int(rslts['step'])
+            if step == 0:
+                raise ValueError(f"The range, {s!r}, has a stepsize of 0")
+        seq = range(int(rslts['start']), int(rslts['stop']), step)
+        if len(seq) == 0:
+            raise ValueError(f"The range, {s!r}, has 0 values")
+        return seq
+    elif re.fullmatch(r"([-+]?\d+)(,[ ]*[-+]?\d+)+", s):
+        return [int(elem) for elem in s.split(',')]
+    try:
+        return [int(s)]
+    except ValueError:
+        raise ValueError(
+            f"{s!r} is invalid. It should be a single int, a comma separated "
+            "list of ints, or a range"
+        ) from None
+
+
 # ==============================================================================
-def common_cli() -> argparse.ArgumentParser:
-    """This function provides the basis for the common CLI amongst the various concatenation scripts. It returns an
-    `argparse.ArgumentParser` object to which additional arguments can be passed before the final `.parse_args()` method
-    is used.
+def _add_snaps_arg(cli, required: bool = False):
+    cli.add_argument(
+        '--snaps', type=_integer_sequence, dest = "concat_outputs",
+        required = required,
+        metavar='(NUM | START:STOP[:STEP] | N1,N2,...)',
+        help = ('Specify output(s) to concatenate. Either a single number '
+                '(e.g. 8), a range (in python slice syntax), or a list (e.g. '
+                '1,2,3)')
+    )
+
+def add_common_cli_args(cli: argparse.ArgumentParser,
+                        num_processes_choice: str,
+                        add_concat_outputs_arg: bool = True):
+    """Add common command-line arguments to an argparse.ArgumentParser instance
+
+    These arguments are shared among the various concatenation scripts.
 
     Parameters
     ----------
-
-    Returns
-    -------
-    argparse.ArgumentParser
-        The common components of the CLI for the concatenation scripts
+    cli : argparse.ArgumentParser
+        Instance that arguments are added to
+    num_processes_choice : str
+        Controls the -n/--num-processes flag. Must be 'use' (required flag),
+        'deprecate' (optional flag), or 'omit' (no flag).
+    add_concat_outputs_arg : bool
+        Whether to add the deprecated -c/--concat-outputs flag alongside its
+        --snaps replacement (in a mutually exclusive group)
     """
 
     # ============================================================================
     def concat_output(raw_argument: str) -> list:
         """Function used to parse the `--concat-output` argument
         """
+        warnings.warn(
+            "The -c/--concat-output flag is now deprecated. Use the --snaps "
+            "flag instead"
+        )
         # Check if the string is empty
         if len(raw_argument) < 1:
             raise ValueError('The --concat-output argument must not be of length zero.')
@@ -159,14 +244,31 @@ def chunk_arg(raw_argument: str) -> tuple:
         return tuple([int(i) for i in cleaned_argument.split(',')])
     # ============================================================================
 
-    # Initialize the CLI
-    cli = argparse.ArgumentParser()
+    if num_processes_choice == 'use':
+        cli.add_argument(
+            '-n', '--num-processes', type=positive_int, required=True,
+            help='The number of processes that were used while running Cholla.')
+    elif num_processes_choice == 'deprecate':
+        cli.add_argument(
+            '-n', '--num-processes', type=positive_int, required=False,
+            default = None,
+            help='DEPRECATED: The number of processes that were used while running Cholla.')
+    elif num_processes_choice != 'omit':
+        raise ValueError('invalid value passed for num_processes_choice')
+
+    if add_concat_outputs_arg:
+        grp = cli.add_mutually_exclusive_group(required=True)
+        grp.add_argument(
+            '-c', '--concat-outputs', type=concat_output,
+            help = 'DEPRECATED (use --snaps instead) Specify outputs to concatenate. Can be a single number (e.g. 8), an inclusive range (e.g. 2-9), or a list (e.g. [1,2,3]).')
+        _add_snaps_arg(grp, required = False)
+    else:
+        _add_snaps_arg(cli, required = True)
 
-    # Required Arguments
+    # Other Required Arguments
     cli.add_argument('-s', '--source-directory', type=pathlib.Path, required=True, help='The path to the directory for the source HDF5 files.')
-    cli.add_argument('-o', '--output-directory', type=pathlib.Path, required=True, help='The path to the directory to write out the concatenated HDF5 files.')
-    cli.add_argument('-n', '--num-processes', type=positive_int, required=True, help='The number of processes that were used')
-    cli.add_argument('-c', '--concat-outputs', type=concat_output, required=True, help='Which outputs to concatenate. Can be a single number (e.g. 8), a range (e.g. 2-9), or a list (e.g. [1,2,3]). Ranges are inclusive')
+    cli.add_argument('-o', '--output-directory', type=pathlib.Path, required=True, help='The path to the directory to write out the concatenated HDF5 files.')
 
     # Optional Arguments
     cli.add_argument('--skip-fields', type=skip_fields, default=[], help='List of fields to skip concatenating. Defaults to empty.')
@@ -175,7 +277,29 @@ def chunk_arg(raw_argument: str) -> tuple:
     cli.add_argument('--compression-opts', type=str, default=None, help='What compression settings to use if compressing. Defaults to None.')
     cli.add_argument('--chunking', type=chunk_arg, default=None, nargs='?', const=True, help='Enable chunking of the output file. Default is `False`. If set without an argument then the chunk size will be automatically chosen or a tuple can be passed to indicate the chunk size desired.')
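To see the shared flags end to end, here is a sketch of how a script might assemble and exercise the CLI; the `-s`/`-o` directory arguments are hypothetical placeholders (argparse does not check that they exist).

```python
import argparse
import concat_internals

parser = argparse.ArgumentParser()
concat_internals.add_common_cli_args(parser, num_processes_choice = 'deprecate',
                                     add_concat_outputs_arg = True)

# '0:10:2' uses python slice semantics, so the stop value is excluded
args = parser.parse_args(['-s', './raw', '-o', './out', '--snaps', '0:10:2'])
assert list(args.concat_outputs) == [0, 2, 4, 6, 8]
```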
+
+def common_cli(num_processes_choice: str = 'use') -> argparse.ArgumentParser:
+    """This function provides the basis for the common CLI amongst the various
+    concatenation scripts.
+
+    It returns a newly constructed `argparse.ArgumentParser` object to which
+    additional arguments can be passed before the final `.parse_args()` method is
+    used.
+
+    Parameters
+    ----------
+    num_processes_choice : str
+        Controls the -n/--num-processes flag (see ``add_common_cli_args``).
+        Must be 'use', 'deprecate', or 'omit'. Defaults to 'use'.
+
+    Returns
+    -------
+    argparse.ArgumentParser
+        The common components of the CLI for the concatenation scripts
+    """
+    # Initialize the CLI
+    cli = argparse.ArgumentParser()
+    add_common_cli_args(cli, num_processes_choice = num_processes_choice,
+                        add_concat_outputs_arg = True)
     return cli
+
 # ==============================================================================
 def _get_source_path(proc_id : int, source_directory : pathlib.Path,