diff --git a/CHANGELOG.md b/CHANGELOG.md index 3c8e867c..1840fe9d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added ### Changed - Updated python libraries + - [issue #114](https://github.com/podaac/concise/issues/114): add type annotations ### Deprecated ### Removed - Removed CMR testing. Now done in [concise-autotest](https://github.com/podaac/concise-autotest) repo diff --git a/podaac/merger/merge.py b/podaac/merger/merge.py index c2cd9edc..44a20503 100644 --- a/podaac/merger/merge.py +++ b/podaac/merger/merge.py @@ -1,5 +1,5 @@ """Main module containing merge implementation""" - +from pathlib import Path from time import perf_counter from logging import getLogger from os import cpu_count @@ -11,9 +11,9 @@ from podaac.merger.preprocess_worker import run_preprocess -def is_file_empty(parent_group): +def is_file_empty(parent_group: nc.Dataset | nc.Group) -> bool: """ - Function to test if a all variable size in a dataset is 0 + Function to test if any variable size in a dataset is zero """ for var in parent_group.variables.values(): @@ -24,17 +24,23 @@ def is_file_empty(parent_group): return True -def merge_netcdf_files(original_input_files, output_file, granule_urls, logger=getLogger(__name__), perf_stats=None, process_count=None): # pylint: disable=too-many-locals +def merge_netcdf_files(original_input_files: list[Path], # pylint: disable=too-many-locals + output_file: str, + granule_urls, + logger=getLogger(__name__), + perf_stats: dict = None, + process_count: int = None): """ Main entrypoint to merge implementation. Merges n >= 2 granules together as a single - granule. Named in reference to original Java implementation. + granule. Named in reference to the original Java implementation. Parameters ---------- - input_files: list - list of string paths to NetCDF4 files to merge + original_input_files: list + list of Paths to NetCDF4 files to merge output_file: str output path for merged product + granule_urls logger: logger logger object perf_stats: dict @@ -113,7 +119,7 @@ def merge_netcdf_files(original_input_files, output_file, granule_urls, logger=g logger.info('Done!') -def clean_metadata(metadata): +def clean_metadata(metadata: dict) -> None: """ Prepares metadata dictionary for insertion by removing inconsistent entries and performing escaping of attribute names @@ -141,9 +147,13 @@ def clean_metadata(metadata): del metadata[key] -def init_dataset(merged_dataset, groups, var_info, max_dims, input_files): +def init_dataset(merged_dataset: nc.Dataset, + groups: list[str], + var_info: dict, + max_dims: dict, + input_files: list[Path]) -> None: """ - Initialize the dataset utilizing data gathered from preprocessing + Initialize the dataset using data gathered from preprocessing Parameters ---------- diff --git a/podaac/merger/merge_worker.py b/podaac/merger/merge_worker.py index 7a16faf5..d43ee66a 100644 --- a/podaac/merger/merge_worker.py +++ b/podaac/merger/merge_worker.py @@ -1,19 +1,20 @@ """Preprocessing methods and the utilities to automagically run them in single-thread/multiprocess modes""" - +import logging import math -import multiprocessing -from multiprocessing.shared_memory import SharedMemory import queue import time import os import shutil +import multiprocessing +from multiprocessing.shared_memory import SharedMemory +from pathlib import Path import netCDF4 as nc import numpy as np from podaac.merger.path_utils import resolve_dim, resolve_group -def shared_memory_size(): +def shared_memory_size() -> int: """ try to get the shared memory space size by reading the /dev/shm on linux machines """ @@ -26,9 +27,8 @@ def shared_memory_size(): return int(default_memory_size) -def max_var_memory(file_list, var_info, max_dims): - """ - function to get the maximum shared memory that will be used for variables +def max_var_memory(file_list: list[Path], var_info: dict, max_dims) -> int: + """Function to get the maximum shared memory that will be used for variables Parameters ---------- @@ -36,6 +36,7 @@ def max_var_memory(file_list, var_info, max_dims): List of file paths to be processed var_info : dict Dictionary of variable paths and associated VariableInfo + max_dims """ max_var_mem = 0 @@ -57,7 +58,12 @@ def max_var_memory(file_list, var_info, max_dims): return max_var_mem -def run_merge(merged_dataset, file_list, var_info, max_dims, process_count, logger): +def run_merge(merged_dataset: nc.Dataset, + file_list: list[Path], + var_info: dict, + max_dims: dict, + process_count: int, + logger: logging.Logger): """ Automagically run merging in an optimized mode determined by the environment @@ -73,6 +79,7 @@ def run_merge(merged_dataset, file_list, var_info, max_dims, process_count, logg Dictionary of dimension paths and maximum dimensions found during preprocessing process_count : int Number of worker processes to run (expected >= 1) + logger """ if process_count == 1: @@ -91,7 +98,11 @@ def run_merge(merged_dataset, file_list, var_info, max_dims, process_count, logg _run_single_core(merged_dataset, file_list, var_info, max_dims, logger) -def _run_single_core(merged_dataset, file_list, var_info, max_dims, logger): +def _run_single_core(merged_dataset: nc.Dataset, + file_list: list[Path], + var_info: dict, + max_dims: dict, + logger: logging.Logger): """ Run the variable merge in the current thread/single-core mode @@ -105,6 +116,7 @@ def _run_single_core(merged_dataset, file_list, var_info, max_dims, logger): Dictionary of variable paths and associated VariableInfo max_dims : dict Dictionary of dimension paths and maximum dimensions found during preprocessing + logger """ logger.info("Running single core ......") @@ -129,7 +141,12 @@ def _run_single_core(merged_dataset, file_list, var_info, max_dims, logger): merged_var[i] = resized -def _run_multi_core(merged_dataset, file_list, var_info, max_dims, process_count, logger): # pylint: disable=too-many-locals +def _run_multi_core(merged_dataset: nc.Dataset, # pylint: disable=too-many-locals + file_list: list[Path], + var_info: dict, + max_dims: dict, + process_count: int, + logger: logging.Logger): """ Run the variable merge in multi-core mode. This method creates (process_count - 1) read processes which read data from an origin granule, resize it, then queue it @@ -150,6 +167,7 @@ def _run_multi_core(merged_dataset, file_list, var_info, max_dims, process_count Dictionary of dimension paths and maximum dimensions found during preprocessing process_count : int Number of worker processes to run (expected >= 2) + logger """ logger.info("Running multicore ......") @@ -266,7 +284,7 @@ def _run_worker(in_queue, out_queue, max_dims, var_info, memory_limit, lock): shared_mem.close() -def _check_exit(processes): +def _check_exit(processes: list): """ Ensure that all processes have exited without error by checking their exitcode if they're no longer running. Processes that have exited properly are removed @@ -286,7 +304,7 @@ def _check_exit(processes): raise RuntimeError(f'Merging failed - exit code: {process.exitcode}') -def resize_var(var, var_info, max_dims): +def resize_var(var: nc.Variable, var_info, max_dims: dict) -> np.ndarray: """ Resizes a variable's data to the maximum dimensions found in preprocessing. This method will never downscale a variable and only performs bottom and @@ -296,8 +314,8 @@ def resize_var(var, var_info, max_dims): ---------- var : nc.Variable variable to be resized - group_path : str - group path to this variable + var_info + contains a group path to this variable max_dims : dict dictionary of maximum dimensions found during preprocessing @@ -310,7 +328,7 @@ def resize_var(var, var_info, max_dims): if var.ndim == 0: return var[:] - # generate ordered array of new widths + # generate an ordered array of new widths dims = [resolve_dim(max_dims, var_info.group_path, dim.name) - dim.size for dim in var.get_dims()] widths = [[0, dim] for dim in dims] diff --git a/podaac/merger/path_utils.py b/podaac/merger/path_utils.py index 9cf7ebbe..8a93c9fe 100644 --- a/podaac/merger/path_utils.py +++ b/podaac/merger/path_utils.py @@ -2,9 +2,10 @@ Utilities used throughout the merging implementation to simplify group path resolution and generation """ +import netCDF4 as nc -def get_group_path(group, resource): +def get_group_path(group: nc.Group, resource: str) -> str: """ Generates a Unix-like path from a group and resource to be accessed @@ -27,7 +28,7 @@ def get_group_path(group, resource): return group.path + '/' + resource -def resolve_group(dataset, path): +def resolve_group(dataset: nc.Dataset, path: str): """ Resolves a group path into two components: the group and the resource's name @@ -50,10 +51,10 @@ def resolve_group(dataset, path): if len(components[0]) > 0: group = dataset[components[0]] - return (group, components[1]) + return group, components[1] -def resolve_dim(dims, group_path, dim_name): +def resolve_dim(dims: dict, group_path: str, dim_name: str): """ Attempt to resolve dim name starting from top-most group going down to the root group diff --git a/podaac/merger/preprocess_worker.py b/podaac/merger/preprocess_worker.py index 9edefabe..4bedba33 100644 --- a/podaac/merger/preprocess_worker.py +++ b/podaac/merger/preprocess_worker.py @@ -1,10 +1,10 @@ """Preprocessing methods and the utilities to automagically run them in single-thread/multiprocess modes""" - +from pathlib import Path import json import queue from copy import deepcopy from datetime import datetime, timezone -from multiprocessing import Manager, Process +from multiprocessing import Manager, Process, Queue import importlib_metadata import netCDF4 as nc @@ -14,7 +14,7 @@ from podaac.merger.variable_info import VariableInfo -def run_preprocess(file_list, process_count, granule_urls): +def run_preprocess(file_list: list[Path], process_count: int, granule_urls: str) -> dict: """ Automagically run preprocessing in an optimized mode determined by the environment @@ -24,6 +24,7 @@ def run_preprocess(file_list, process_count, granule_urls): List of file paths to be processed process_count : int Number of worker processes to run (expected >= 1) + granule_urls """ if process_count == 1: @@ -50,7 +51,7 @@ def merge_max_dims(merged_max_dims, subset_max_dims): merged_max_dims[dim_name] = subset_dim_size -def merge_metadata(merged_metadata, subset_metadata): +def merge_metadata(merged_metadata: dict, subset_metadata: dict) -> None: """ Perform aggregation of metadata. Intended for use in multithreaded mode only @@ -75,7 +76,7 @@ def merge_metadata(merged_metadata, subset_metadata): merged_attrs[attr_name] = False # mark as inconsistent -def construct_history(input_files, granule_urls): +def construct_history(input_files: list[Path], granule_urls: str) -> dict: """ Construct history JSON entry for this concatenation operation https://wiki.earthdata.nasa.gov/display/TRT/In-File+Provenance+Metadata+-+TRT-42 @@ -84,6 +85,7 @@ def construct_history(input_files, granule_urls): ---------- input_files : list List of input files + granule_urls : str Returns ------- @@ -123,7 +125,7 @@ def retrieve_history(dataset): return json.loads(history_json) -def _run_single_core(file_list, granule_urls): +def _run_single_core(file_list: list[Path], granule_urls: str) -> dict: """ Run the granule preprocessing in the current thread/single-core mode @@ -131,6 +133,7 @@ def _run_single_core(file_list, granule_urls): ---------- file_list : list List of file paths to be processed + granule_urls Returns ------- @@ -167,7 +170,9 @@ def _run_single_core(file_list, granule_urls): } -def _run_multi_core(file_list, process_count, granule_urls): +def _run_multi_core(file_list: list[Path], + process_count: int, + granule_urls: str) -> dict: """ Run the granule preprocessing in multi-core mode. This method spins up the number of processes defined by process_count which process granules @@ -181,6 +186,7 @@ def _run_multi_core(file_list, process_count, granule_urls): List of file paths to be processed process_count : int Number of worker processes to run (expected >= 2) + granule_urls Returns ------- @@ -262,11 +268,11 @@ def _run_multi_core(file_list, process_count, granule_urls): } -def _run_worker(in_queue, results): +def _run_worker(in_queue: Queue, results: list[dict]) -> None: """ A method to be executed in a separate process which runs preprocessing on granules from the input queue and stores the results internally. When the queue is empty - (processing is complete), the local results are transfered to the external results + (processing is complete), the local results are transferred to the external results array to be merged by the main process. If the process never processed any granules which is possible if the input queue is underfilled, the process just exits without appending to the array @@ -311,7 +317,12 @@ def _run_worker(in_queue, results): }) -def process_groups(parent_group, group_list, max_dims, group_metadata, var_metadata, var_info): +def process_groups(parent_group: nc.Dataset | nc.Group, + group_list: list, + max_dims: dict, + group_metadata: dict, + var_metadata: dict, + var_info: dict): """ Perform preprocessing of a group and recursively process each child group @@ -345,7 +356,7 @@ def process_groups(parent_group, group_list, max_dims, group_metadata, var_metad process_groups(child_group, group_list, max_dims, group_metadata, var_metadata, var_info) -def get_max_dims(group, max_dims): +def get_max_dims(group: nc.Dataset | nc.Group, max_dims: dict) -> None: """ Aggregates dimensions from each group and creates a dictionary of the largest dimension sizes for each group @@ -365,7 +376,7 @@ def get_max_dims(group, max_dims): max_dims[dim_path] = dim.size -def get_metadata(group, metadata): +def get_metadata(group: nc.Dataset | nc.Group | nc.Variable, metadata: dict) -> None: """ Aggregates metadata from various NetCDF4 objects into a dictionary @@ -386,7 +397,7 @@ def get_metadata(group, metadata): metadata[attr_name] = False # mark as inconsistent -def attr_eq(attr_1, attr_2): +def attr_eq(attr_1, attr_2) -> bool: """ Helper function to check if one attribute value is equal to another (no, a simple == was not working) @@ -408,7 +419,7 @@ def attr_eq(attr_1, attr_2): return True -def get_variable_data(group, var_info, var_metadata): +def get_variable_data(group: nc.Dataset | nc.Group, var_info: dict, var_metadata: dict) -> None: """ Aggregate variable metadata and attributes. Primarily utilized in process_groups