add type annotations and args to docstrings #115

Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
### Changed
- Updated python libraries
- [issue #114](https://github.com/podaac/concise/issues/114): add type annotations
### Deprecated
### Removed
- Removed CMR testing. Now done in [concise-autotest](https://github.com/podaac/concise-autotest) repo
30 changes: 20 additions & 10 deletions podaac/merger/merge.py
@@ -1,5 +1,5 @@
"""Main module containing merge implementation"""

from pathlib import Path
from time import perf_counter
from logging import getLogger
from os import cpu_count
@@ -11,9 +11,9 @@
from podaac.merger.preprocess_worker import run_preprocess


def is_file_empty(parent_group):
def is_file_empty(parent_group: nc.Dataset | nc.Group) -> bool:
"""
Function to test if a all variable size in a dataset is 0
Function to test if any variable size in a dataset is zero
"""

for var in parent_group.variables.values():
@@ -24,17 +24,23 @@ def is_file_empty(parent_group):
return True


def merge_netcdf_files(original_input_files, output_file, granule_urls, logger=getLogger(__name__), perf_stats=None, process_count=None): # pylint: disable=too-many-locals
def merge_netcdf_files(original_input_files: list[Path], # pylint: disable=too-many-locals
output_file: str,
granule_urls,
logger=getLogger(__name__),
perf_stats: dict = None,
process_count: int = None):
"""
Main entrypoint to merge implementation. Merges n >= 2 granules together as a single
granule. Named in reference to original Java implementation.
granule. Named in reference to the original Java implementation.

Parameters
----------
input_files: list
list of string paths to NetCDF4 files to merge
original_input_files: list
list of Paths to NetCDF4 files to merge
output_file: str
output path for merged product
granule_urls
logger: logger
logger object
perf_stats: dict
@@ -113,7 +119,7 @@ def merge_netcdf_files(original_input_files, output_file, granule_urls, logger=g
logger.info('Done!')


def clean_metadata(metadata):
def clean_metadata(metadata: dict) -> None:
"""
Prepares metadata dictionary for insertion by removing inconsistent entries
and performing escaping of attribute names
@@ -141,9 +147,13 @@ def clean_metadata(metadata):
del metadata[key]


def init_dataset(merged_dataset, groups, var_info, max_dims, input_files):
def init_dataset(merged_dataset: nc.Dataset,
groups: list[str],
var_info: dict,
max_dims: dict,
input_files: list[Path]) -> None:
"""
Initialize the dataset utilizing data gathered from preprocessing
Initialize the dataset using data gathered from preprocessing

Parameters
----------
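For quick reference, a minimal usage sketch of the newly annotated entrypoint is shown below. The file names, URLs, and process count are illustrative assumptions for this review, not values taken from the repository or its tests.

from pathlib import Path

from podaac.merger.merge import merge_netcdf_files

# Hypothetical inputs: two downloaded granules plus the URLs they came from.
input_files = [Path('granule_1.nc'), Path('granule_2.nc')]
granule_urls = ['https://example.com/granule_1.nc', 'https://example.com/granule_2.nc']
perf_stats = {}

# Per the new annotations: original_input_files is a list[Path], output_file a str.
merge_netcdf_files(input_files, 'merged_output.nc', granule_urls,
                   perf_stats=perf_stats, process_count=2)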
48 changes: 33 additions & 15 deletions podaac/merger/merge_worker.py
@@ -1,19 +1,20 @@
"""Preprocessing methods and the utilities to automagically run them in single-thread/multiprocess modes"""

import logging
import math
import multiprocessing
from multiprocessing.shared_memory import SharedMemory
import queue
import time
import os
import shutil
import multiprocessing
from multiprocessing.shared_memory import SharedMemory
from pathlib import Path
import netCDF4 as nc
import numpy as np

from podaac.merger.path_utils import resolve_dim, resolve_group


def shared_memory_size():
def shared_memory_size() -> int:
"""
try to get the shared memory space size by reading the /dev/shm on linux machines
"""
@@ -26,16 +27,16 @@ def shared_memory_size():
return int(default_memory_size)
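(Reviewer's sketch: the docstring above describes reading /dev/shm on Linux; one way that lookup could be approximated is shown below. This is an assumption for illustration, not the code in this PR, and the fallback value here is arbitrary.)

import shutil

def shared_memory_size_sketch(default_bytes: int = 60 * 1024 * 1024) -> int:
    # On Linux, the size of the /dev/shm mount approximates available shared memory.
    try:
        return shutil.disk_usage('/dev/shm').total
    except FileNotFoundError:
        # Non-Linux platforms: fall back to a conservative default.
        return default_bytes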


def max_var_memory(file_list, var_info, max_dims):
"""
function to get the maximum shared memory that will be used for variables
def max_var_memory(file_list: list[Path], var_info: dict, max_dims) -> int:
"""Function to get the maximum shared memory that will be used for variables

Parameters
----------
file_list : list
List of file paths to be processed
var_info : dict
Dictionary of variable paths and associated VariableInfo
max_dims
"""

max_var_mem = 0
@@ -57,7 +58,12 @@ def max_var_memory(file_list, var_info, max_dims):
return max_var_mem


def run_merge(merged_dataset, file_list, var_info, max_dims, process_count, logger):
def run_merge(merged_dataset: nc.Dataset,
file_list: list[Path],
var_info: dict,
max_dims: dict,
process_count: int,
logger: logging.Logger):
"""
Automagically run merging in an optimized mode determined by the environment

@@ -73,6 +79,7 @@ def run_merge(merged_dataset, file_list, var_info, max_dims, process_count, logg
Dictionary of dimension paths and maximum dimensions found during preprocessing
process_count : int
Number of worker processes to run (expected >= 1)
logger
"""

if process_count == 1:
@@ -91,7 +98,11 @@ def run_merge(merged_dataset, file_list, var_info, max_dims, process_count, logg
_run_single_core(merged_dataset, file_list, var_info, max_dims, logger)
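(Reviewer's sketch: run_merge's docstring says the merge mode is chosen automatically from the environment. The dispatch could be summarized roughly as below; the shared-memory guard is inferred from the neighboring shared_memory_size and max_var_memory helpers and is not code copied from this PR.)

import os

def choose_merge_mode(process_count, max_var_mem, shm_size):
    # Illustrative only: single-core when one worker is requested or when the
    # largest variable would not fit in shared memory; multi-core otherwise.
    if process_count is None:
        process_count = os.cpu_count()
    if process_count == 1 or max_var_mem >= shm_size:
        return 'single-core'
    return 'multi-core'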


def _run_single_core(merged_dataset, file_list, var_info, max_dims, logger):
def _run_single_core(merged_dataset: nc.Dataset,
file_list: list[Path],
var_info: dict,
max_dims: dict,
logger: logging.Logger):
"""
Run the variable merge in the current thread/single-core mode

@@ -105,6 +116,7 @@ def _run_single_core(merged_dataset, file_list, var_info, max_dims, logger):
Dictionary of variable paths and associated VariableInfo
max_dims : dict
Dictionary of dimension paths and maximum dimensions found during preprocessing
logger
"""

logger.info("Running single core ......")
@@ -129,7 +141,12 @@ def _run_single_core(merged_dataset, file_list, var_info, max_dims, logger):
merged_var[i] = resized


def _run_multi_core(merged_dataset, file_list, var_info, max_dims, process_count, logger): # pylint: disable=too-many-locals
def _run_multi_core(merged_dataset: nc.Dataset, # pylint: disable=too-many-locals
file_list: list[Path],
var_info: dict,
max_dims: dict,
process_count: int,
logger: logging.Logger):
"""
Run the variable merge in multi-core mode. This method creates (process_count - 1)
read processes which read data from an origin granule, resize it, then queue it
@@ -150,6 +167,7 @@ def _run_multi_core(merged_dataset, file_list, var_info, max_dims, process_count
Dictionary of dimension paths and maximum dimensions found during preprocessing
process_count : int
Number of worker processes to run (expected >= 2)
logger
"""

logger.info("Running multicore ......")
Expand Down Expand Up @@ -266,7 +284,7 @@ def _run_worker(in_queue, out_queue, max_dims, var_info, memory_limit, lock):
shared_mem.close()


def _check_exit(processes):
def _check_exit(processes: list):
"""
Ensure that all processes have exited without error by checking their exitcode
if they're no longer running. Processes that have exited properly are removed
@@ -286,7 +304,7 @@ def _check_exit(processes):
raise RuntimeError(f'Merging failed - exit code: {process.exitcode}')


def resize_var(var, var_info, max_dims):
def resize_var(var: nc.Variable, var_info, max_dims: dict) -> np.ndarray:
"""
Resizes a variable's data to the maximum dimensions found in preprocessing.
This method will never downscale a variable and only performs bottom and
@@ -296,8 +314,8 @@ def resize_var(var, var_info, max_dims):
----------
var : nc.Variable
variable to be resized
group_path : str
group path to this variable
var_info
contains a group path to this variable
max_dims : dict
dictionary of maximum dimensions found during preprocessing

@@ -310,7 +328,7 @@ def resize_var(var, var_info, max_dims):
if var.ndim == 0:
return var[:]

# generate ordered array of new widths
# generate an ordered array of new widths
dims = [resolve_dim(max_dims, var_info.group_path, dim.name) - dim.size for dim in var.get_dims()]
widths = [[0, dim] for dim in dims]

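To make the bottom/right padding behavior that resize_var documents concrete, here is a small self-contained sketch. It assumes a plain zero fill; the actual implementation may use the variable's fill value or masked arrays, so treat this as an illustration of the width computation shown above rather than the PR's code.

import numpy as np

def pad_to_max(data: np.ndarray, max_shape: tuple) -> np.ndarray:
    # Pad only at the bottom/right of each axis so existing values never move.
    widths = [(0, target - current) for current, target in zip(data.shape, max_shape)]
    return np.pad(data, widths, mode='constant', constant_values=0)

print(pad_to_max(np.ones((2, 3)), (4, 3)).shape)  # -> (4, 3)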
9 changes: 5 additions & 4 deletions podaac/merger/path_utils.py
@@ -2,9 +2,10 @@
Utilities used throughout the merging implementation to simplify group path resolution
and generation
"""
import netCDF4 as nc


def get_group_path(group, resource):
def get_group_path(group: nc.Group, resource: str) -> str:
"""
Generates a Unix-like path from a group and resource to be accessed

@@ -27,7 +28,7 @@ def get_group_path(group, resource):
return group.path + '/' + resource


def resolve_group(dataset, path):
def resolve_group(dataset: nc.Dataset, path: str):
"""
Resolves a group path into two components: the group and the resource's name

@@ -50,10 +51,10 @@ def resolve_group(dataset, path):
if len(components[0]) > 0:
group = dataset[components[0]]

return (group, components[1])
return group, components[1]


def resolve_dim(dims, group_path, dim_name):
def resolve_dim(dims: dict, group_path: str, dim_name: str):
"""
Attempt to resolve dim name starting from top-most group going down to the root group

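As a quick illustration of the Unix-like path convention these annotated helpers share, a dependency-free sketch follows. The root-group special case is inferred from netCDF4's root group having path '/'; it is an assumption for illustration, not a copy of get_group_path.

def group_path_sketch(group_path: str, resource: str) -> str:
    # Join a group path and a resource name without doubling the root slash.
    if group_path == '/':
        return '/' + resource
    return group_path + '/' + resource

print(group_path_sketch('/', 'time'))         # -> /time
print(group_path_sketch('/group_a', 'time'))  # -> /group_a/time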