
Commit

Merge pull request #24 from podaac/release/0.3.0
Release/0.3.0
skorper authored Mar 17, 2022
2 parents f4d5d9b + d4fd8c5 commit dc973a5
Showing 14 changed files with 201 additions and 32 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/build-pipeline.yml
@@ -127,7 +127,7 @@ jobs:
tag: ${{ env.software_version }}
message: "Version ${{ env.software_version }}"
- name: Publish UMM-S with new version
uses: podaac/cmr-umm-updater@0.1.1
uses: podaac/cmr-umm-updater@0.2.1
if: |
github.ref == 'refs/heads/main' ||
startsWith(github.ref, 'refs/heads/release')
@@ -136,6 +136,7 @@ jobs:
provider: 'POCLOUD'
env: ${{ env.venue }}
version: ${{ env.software_version }}
timeout: 60
env:
cmr_user: ${{secrets.CMR_USER}}
cmr_pass: ${{secrets.CMR_PASS}}
20 changes: 20 additions & 0 deletions CHANGELOG.md
@@ -13,6 +13,26 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
### Security

## [0.3.0]

### Added
- PODAAC-4171
- Add AVHRRMTA_G-NAVO-L2P-v1.0 to associations
- Added shared memory allocation limit in fork process
- PODAAC-4173
- Add AVHRRMTB_G-NAVO-L2P-v1.0 to associations
- [issue 10](https://github.com/podaac/concise/issues/10):
Handle empty granule files.
- [issue-14](https://github.com/podaac/concise/issues/14): Added support for concatenating granules that have different variables
- Added `timeout` option to `cmr-umm-updater`
### Changed
- Upgraded `cmr-umm-updater` to 0.2.1
### Deprecated
### Removed
### Fixed
### Security

## [0.2.0]

### Added
6 changes: 5 additions & 1 deletion cmr/ops_associations.txt
@@ -1,2 +1,6 @@
C1940473819-POCLOUD
C1940475563-POCLOUD
C1940475563-POCLOUD
C2205618215-POCLOUD
C2205618339-POCLOUD
C2205121384-POCLOUD
C2205121394-POCLOUD
6 changes: 5 additions & 1 deletion cmr/uat_associations.txt
@@ -1,2 +1,6 @@
C1234724470-POCLOUD
C1234724471-POCLOUD
C1234724471-POCLOUD
C1240739577-POCLOUD
C1240739709-POCLOUD
C1240739734-POCLOUD
C1240739726-POCLOUD
26 changes: 24 additions & 2 deletions podaac/merger/merge.py
@@ -11,7 +11,20 @@
from podaac.merger.preprocess_worker import run_preprocess


def merge_netcdf_files(input_files, output_file, logger=getLogger(__name__), perf_stats=None, process_count=None):
def is_file_empty(parent_group):
    """
    Test whether every variable in a dataset, including those in child groups, has size 0
    """
    for var in parent_group.variables.values():
        if var.size != 0:
            return False
    for child_group in parent_group.groups.values():
        if not is_file_empty(child_group):
            return False
    return True


def merge_netcdf_files(original_input_files, output_file, logger=getLogger(__name__), perf_stats=None, process_count=None): # pylint: disable=too-many-locals
"""
Main entrypoint to merge implementation. Merges n >= 2 granules together as a single
granule. Named in reference to original Java implementation.
@@ -42,6 +55,15 @@ def merge_netcdf_files(input_files, output_file, logger=getLogger(__name__), per
logger.info('Preprocessing data...')
start = perf_counter()

input_files = []

# Only concatenate files that are not empty
for file in original_input_files:
    with nc.Dataset(file, 'r') as dataset:
        is_empty = is_file_empty(dataset)
        if not is_empty:
            input_files.append(file)

preprocess = run_preprocess(input_files, process_count)
group_list = preprocess['group_list']
max_dims = preprocess['max_dims']
Expand All @@ -59,7 +81,7 @@ def merge_netcdf_files(input_files, output_file, logger=getLogger(__name__), per
# -- merge datasets --
logger.info('Merging datasets...')
start = perf_counter()
run_merge(merged_dataset, input_files, var_info, max_dims, process_count)
run_merge(merged_dataset, input_files, var_info, max_dims, process_count, logger)

perf_stats['merge'] = perf_counter() - start
logger.info('Merging completed: %f', perf_stats['merge'])
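As a usage sketch of the empty-granule filter added above (not part of this commit): assuming the podaac-concise package is installed, a scratch file whose only variable has size 0 should be reported as empty and therefore skipped by `merge_netcdf_files`. The file name and variable name are illustrative.

```python
import netCDF4 as nc

from podaac.merger.merge import is_file_empty

# Create a scratch granule whose only variable has size 0 (illustrative names)
with nc.Dataset("empty_granule.nc", "w") as ds:
    ds.createDimension("obs", 0)                # zero-length (unlimited) dimension
    ds.createVariable("sea_surface_temperature", "f4", ("obs",))

with nc.Dataset("empty_granule.nc", "r") as ds:
    print(is_file_empty(ds))  # True -> the file would be dropped from input_files
```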
81 changes: 66 additions & 15 deletions podaac/merger/merge_worker.py
@@ -3,13 +3,29 @@
import multiprocessing
from multiprocessing.shared_memory import SharedMemory
import queue
import time
import os
import shutil
import netCDF4 as nc
import numpy as np

from podaac.merger.path_utils import resolve_dim, resolve_group


def run_merge(merged_dataset, file_list, var_info, max_dims, process_count):
def shared_memory_size():
"""
try to get the shared memory space size by reading the /dev/shm on linux machines
"""
try:
stat = shutil.disk_usage("/dev/shm")
return stat.total
except FileNotFoundError:
# Get memory size via env or default to 60 MB
default_memory_size = os.getenv("SHARED_MEMORY_SIZE", "60000000")
return int(default_memory_size)


def run_merge(merged_dataset, file_list, var_info, max_dims, process_count, logger):
"""
Automagically run merging in an optimized mode determined by the environment
@@ -33,7 +49,7 @@ def run_merge(merged_dataset, file_list, var_info, max_dims, process_count):
# Merging is bottlenecked at the write process which is single threaded
# so spinning up more than 2 processes for read/write won't scale the
# optimization
_run_multi_core(merged_dataset, file_list, var_info, max_dims, 2)
_run_multi_core(merged_dataset, file_list, var_info, max_dims, 2, logger)


def _run_single_core(merged_dataset, file_list, var_info, max_dims):
@@ -55,18 +71,24 @@ def _run_single_core(merged_dataset, file_list, var_info, max_dims):
with nc.Dataset(file, 'r') as origin_dataset:
origin_dataset.set_auto_maskandscale(False)

for item in var_info.items():
ds_group = resolve_group(origin_dataset, item[0])
merged_group = resolve_group(merged_dataset, item[0])
for var_path, var_meta in var_info.items():
ds_group, var_name = resolve_group(origin_dataset, var_path)
merged_group = resolve_group(merged_dataset, var_path)
ds_var = ds_group.variables.get(var_name)

merged_var = merged_group[0].variables[var_name]

ds_var = ds_group[0].variables[ds_group[1]]
merged_var = merged_group[0].variables[ds_group[1]]
if ds_var is None:
fill_value = var_meta.fill_value
target_shape = tuple(max_dims[f'/{dim}'] for dim in var_meta.dim_order)
merged_var[i] = np.full(target_shape, fill_value)
continue

resized = resize_var(ds_var, item[1], max_dims)
resized = resize_var(ds_var, var_meta, max_dims)
merged_var[i] = resized


def _run_multi_core(merged_dataset, file_list, var_info, max_dims, process_count): # pylint: disable=too-many-locals
def _run_multi_core(merged_dataset, file_list, var_info, max_dims, process_count, logger): # pylint: disable=too-many-locals
"""
Run the variable merge in multi-core mode. This method creates (process_count - 1)
read processes which read data from an origin granule, resize it, then queue it
@@ -88,26 +110,35 @@ def _run_multi_core(merged_dataset, file_list, var_info, max_dims, process_count
process_count : int
Number of worker processes to run (expected >= 2)
"""

logger.info("Running multicore ......")
total_variables = len(file_list) * len(var_info)
logger.info(f"total variables {total_variables}")

# Ensure SharedMemory doesn't get cleaned up before being processed
context = multiprocessing.get_context('forkserver')

with context.Manager() as manager:
in_queue = manager.Queue(len(file_list))
out_queue = manager.Queue((process_count - 1) * len(var_info)) # Store (process_count - 1) granules in buffer
memory_limit = manager.Value('i', 0)
lock = manager.Lock()

logger.info(file_list)
for i, file in enumerate(file_list):
in_queue.put((i, file))

processes = []

logger.info("creating read processes")
for _ in range(process_count - 1):
process = context.Process(target=_run_worker, args=(in_queue, out_queue, max_dims, var_info))
process = context.Process(target=_run_worker, args=(in_queue, out_queue, max_dims, var_info, memory_limit, lock))
processes.append(process)
process.start()

processed_variables = 0

logger.info("Start processing variables in main process")
while processed_variables < total_variables:
try:
i, var_path, shape, memory_name = out_queue.get_nowait()
@@ -121,10 +152,10 @@ def _run_multi_core(merged_dataset, file_list, var_info, max_dims, process_count
resized_arr = np.ndarray(shape, var_meta.datatype, shared_memory.buf)

merged_var[i] = resized_arr # The write operation itself

shared_memory.unlink()
shared_memory.close()

with lock:
memory_limit.value = memory_limit.value - resized_arr.nbytes
processed_variables = processed_variables + 1

for process in processes:
@@ -133,7 +164,7 @@ def _run_multi_core(merged_dataset, file_list, var_info, max_dims, process_count
process.join()


def _run_worker(in_queue, out_queue, max_dims, var_info):
def _run_worker(in_queue, out_queue, max_dims, var_info, memory_limit, lock):
"""
A method to be executed in a separate process which reads variables from a
granule, performs resizing, and queues the processed data up for the writer
@@ -150,7 +181,12 @@ def _run_worker(in_queue, out_queue, max_dims, var_info):
var_info : dict
Dictionary of variable paths and associated VariableInfo
"""

# Use at most 95% of the shared memory size
max_memory_size = round(shared_memory_size() * .95)

while not in_queue.empty():

try:
i, file = in_queue.get_nowait()
except queue.Empty:
Expand All @@ -160,15 +196,30 @@ def _run_worker(in_queue, out_queue, max_dims, var_info):
origin_dataset.set_auto_maskandscale(False)

for var_path, var_meta in var_info.items():

ds_group, var_name = resolve_group(origin_dataset, var_path)
ds_var = ds_group.variables[var_name]
ds_var = ds_group.variables.get(var_name)

if ds_var is None:
fill_value = var_meta.fill_value
target_shape = tuple(max_dims[f'/{dim}'] for dim in var_meta.dim_order)
resized_arr = np.full(target_shape, fill_value)
else:
resized_arr = resize_var(ds_var, var_meta, max_dims)

if resized_arr.nbytes > max_memory_size:
raise RuntimeError(f'Merging failed - MAX MEMORY REACHED: {resized_arr.nbytes}')

resized_arr = resize_var(ds_var, var_meta, max_dims)
# Limit how much memory we allocate to the max memory size
while memory_limit.value + resized_arr.nbytes > max_memory_size and not out_queue.empty():
time.sleep(.5)

# Copy resized array to shared memory
shared_mem = SharedMemory(create=True, size=resized_arr.nbytes)
shared_arr = np.ndarray(resized_arr.shape, resized_arr.dtype, buffer=shared_mem.buf)
np.copyto(shared_arr, resized_arr)
with lock:
memory_limit.value = memory_limit.value + resized_arr.nbytes

out_queue.put((i, var_path, shared_arr.shape, shared_mem.name))
shared_mem.close()
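The shared-memory bookkeeping added to the worker and writer above follows a hand-off pattern that can be sketched on its own: copy an array into a `SharedMemory` block, grow a manager-backed byte counter under a lock, then have the consumer reattach by name, release the block, and shrink the counter. The array contents and the 60 MB limit below are illustrative assumptions; the commit derives its limit from roughly 95% of `/dev/shm`.

```python
import multiprocessing
from multiprocessing.shared_memory import SharedMemory

import numpy as np

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    memory_limit = manager.Value('i', 0)      # bytes currently held in shared memory
    lock = manager.Lock()
    max_memory_size = 60_000_000              # illustrative, like the SHARED_MEMORY_SIZE fallback

    # Producer side (read worker): copy a resized array into shared memory
    resized_arr = np.arange(12, dtype='f4').reshape(3, 4)   # stand-in for resize_var output
    shm = SharedMemory(create=True, size=resized_arr.nbytes)
    np.copyto(np.ndarray(resized_arr.shape, resized_arr.dtype, buffer=shm.buf), resized_arr)
    with lock:
        memory_limit.value += resized_arr.nbytes

    # Consumer side (writer): reattach by name, copy out, then release and decrement
    attached = SharedMemory(name=shm.name)
    merged_chunk = np.ndarray(resized_arr.shape, resized_arr.dtype, buffer=attached.buf).copy()
    attached.close()
    shm.unlink()
    shm.close()
    with lock:
        memory_limit.value -= resized_arr.nbytes

    print(merged_chunk.sum(), memory_limit.value)   # 66.0 0
```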
10 changes: 9 additions & 1 deletion podaac/merger/preprocess_worker.py
@@ -231,7 +231,14 @@ def _run_multi_core(file_list, process_count):
if var_info is None:
var_info = result['var_info']
elif var_info != result['var_info']:
raise RuntimeError('Variable schemas are inconsistent between granules')
if set(var_info.keys()).difference(result['var_info']):
# If not all variables match, only compare variables that intersect
intersecting_vars = set(var_info).intersection(result['var_info'])
if list(
map(var_info.get, intersecting_vars)
) != list(map(result['var_info'].get, intersecting_vars)):
raise RuntimeError('Variable schemas are inconsistent between granules')
var_info.update(result['var_info'])

# The following data requires accumulation methods
merge_max_dims(max_dims, result['max_dims'])
@@ -417,6 +424,7 @@ def get_variable_data(group, var_info, var_metadata):
"""

for var in group.variables.values():

# Generate VariableInfo map
info = VariableInfo(var)
var_path = get_group_path(group, var.name)
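The relaxed consistency check above no longer requires identical variable sets across granules; it only insists that variables present in both granules agree, then merges the maps. A standalone sketch of that idea, using plain dicts and made-up variable names in place of the real `VariableInfo` objects (the commit additionally gates the comparison on a key-set difference):

```python
def merge_schemas(var_info, new_var_info):
    """Merge two {variable path: schema} maps, requiring only shared variables to agree (sketch)."""
    if var_info != new_var_info:
        intersecting_vars = set(var_info).intersection(new_var_info)
        if [var_info[v] for v in intersecting_vars] != [new_var_info[v] for v in intersecting_vars]:
            raise RuntimeError('Variable schemas are inconsistent between granules')
    var_info.update(new_var_info)
    return var_info


# Granule B adds /wind_speed but matches granule A on the shared variable
granule_a = {'/sea_surface_temperature': ('f4', ('time', 'lat', 'lon'))}
granule_b = {'/sea_surface_temperature': ('f4', ('time', 'lat', 'lon')),
             '/wind_speed': ('f4', ('time', 'lat', 'lon'))}
print(merge_schemas(dict(granule_a), granule_b))
```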
3 changes: 3 additions & 0 deletions podaac/merger/variable_info.py
@@ -42,6 +42,9 @@ def __setattr__(self, name, value):

self.__dict__[name] = value

def __str__(self):
return f"name:{self.name} dim_order:{self.dim_order} fill_value:{self.fill_value} datatype:{self.datatype} group_path:{self.group_path}"

def __eq__(self, other):
return (
self.dim_order == other.dim_order and
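The new `__str__` above makes schema mismatches easier to read in logs. A small sketch of constructing a `VariableInfo` from a netCDF4 variable, as `preprocess_worker.get_variable_data` does with `VariableInfo(var)`; the diskless scratch dataset and variable name are illustrative assumptions:

```python
import netCDF4 as nc

from podaac.merger.variable_info import VariableInfo

# Diskless scratch dataset just to obtain a netCDF4 variable object (illustrative)
with nc.Dataset('scratch.nc', 'w', diskless=True) as ds:
    ds.createDimension('time', 4)
    var = ds.createVariable('wind_speed', 'f4', ('time',), fill_value=-9999.0)
    print(VariableInfo(var))   # name/dim_order/fill_value/datatype/group_path via __str__
```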
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "podaac-concise"
version = "0.2.0"
version = "0.3.0-rc.3"
description = "Harmony service that merges granules"
authors = ["podaac-tva <podaac-tva@jpl.nasa.gov>"]
license = "Apache-2.0"
