Release/0.3.0 #24

Merged · 16 commits · Mar 17, 2022
3 changes: 2 additions & 1 deletion .github/workflows/build-pipeline.yml
@@ -127,7 +127,7 @@ jobs:
tag: ${{ env.software_version }}
message: "Version ${{ env.software_version }}"
- name: Publish UMM-S with new version
uses: podaac/cmr-umm-updater@0.1.1
uses: podaac/cmr-umm-updater@0.2.1
if: |
github.ref == 'refs/heads/main' ||
startsWith(github.ref, 'refs/heads/release')
@@ -136,6 +136,7 @@ jobs:
provider: 'POCLOUD'
env: ${{ env.venue }}
version: ${{ env.software_version }}
timeout: 60
env:
cmr_user: ${{secrets.CMR_USER}}
cmr_pass: ${{secrets.CMR_PASS}}
20 changes: 20 additions & 0 deletions CHANGELOG.md
@@ -13,6 +13,26 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
### Security

## [0.3.0]

### Added
- PODAAC-4171
- Add AVHRRMTA_G-NAVO-L2P-v1.0 to associations
- Added shared memory allocation limit in the fork process
- PODAAC-4173
- Add AVHRRMTB_G-NAVO-L2P-v1.0 to associations
- [issue 10](https://github.com/podaac/concise/issues/10): Handle empty granule files.
- [issue-14](https://github.com/podaac/concise/issues/14): Added support for concatenating granules that have different variables
- Added `timeout` option to `cmr-umm-updater`
### Changed
- Upgraded `cmr-umm-updater` to 0.2.1
### Deprecated
### Removed
### Fixed
### Security

## [0.2.0]

### Added
6 changes: 5 additions & 1 deletion cmr/ops_associations.txt
@@ -1,2 +1,6 @@
C1940473819-POCLOUD
C1940475563-POCLOUD
C1940475563-POCLOUD
C2205618215-POCLOUD
C2205618339-POCLOUD
C2205121384-POCLOUD
C2205121394-POCLOUD
6 changes: 5 additions & 1 deletion cmr/uat_associations.txt
@@ -1,2 +1,6 @@
C1234724470-POCLOUD
C1234724471-POCLOUD
C1234724471-POCLOUD
C1240739577-POCLOUD
C1240739709-POCLOUD
C1240739734-POCLOUD
C1240739726-POCLOUD
26 changes: 24 additions & 2 deletions podaac/merger/merge.py
@@ -11,7 +11,20 @@
from podaac.merger.preprocess_worker import run_preprocess


def merge_netcdf_files(input_files, output_file, logger=getLogger(__name__), perf_stats=None, process_count=None):
def is_file_empty(parent_group):
    """
    Return True only if every variable in the dataset, including all nested groups, has size 0.
    """
    for var in parent_group.variables.values():
        if var.size != 0:
            return False
    for child_group in parent_group.groups.values():
        if not is_file_empty(child_group):
            return False
    return True


def merge_netcdf_files(original_input_files, output_file, logger=getLogger(__name__), perf_stats=None, process_count=None): # pylint: disable=too-many-locals
"""
Main entrypoint to merge implementation. Merges n >= 2 granules together as a single
granule. Named in reference to original Java implementation.
@@ -42,6 +55,15 @@ def merge_netcdf_files(input_files, output_file, logger=getLogger(__name__), per
logger.info('Preprocessing data...')
start = perf_counter()

    input_files = []

    # Only concatenate files that are not empty
    for file in original_input_files:
        with nc.Dataset(file, 'r') as dataset:
            if not is_file_empty(dataset):
                input_files.append(file)

preprocess = run_preprocess(input_files, process_count)
group_list = preprocess['group_list']
max_dims = preprocess['max_dims']
@@ -59,7 +81,7 @@ def merge_netcdf_files(input_files, output_file, logger=getLogger(__name__), per
# -- merge datasets --
logger.info('Merging datasets...')
start = perf_counter()
run_merge(merged_dataset, input_files, var_info, max_dims, process_count)
run_merge(merged_dataset, input_files, var_info, max_dims, process_count, logger)

perf_stats['merge'] = perf_counter() - start
logger.info('Merging completed: %f', perf_stats['merge'])
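Taken together, the `merge.py` changes mean callers no longer need to pre-filter their inputs: `merge_netcdf_files` now takes the raw list (`original_input_files`) and drops any granule whose variables are all size 0 before preprocessing. A minimal usage sketch under that assumption; the file paths and logger name below are hypothetical, not from the PR:

```python
# Hedged usage sketch of the updated entry point; paths are hypothetical.
from logging import getLogger

from podaac.merger.merge import merge_netcdf_files

granules = ['granule_a.nc', 'granule_b.nc', 'possibly_empty.nc']  # hypothetical inputs
perf_stats = {}  # timing entries such as perf_stats['merge'] are recorded here

merge_netcdf_files(
    granules,                  # original_input_files; empty granules are skipped
    'merged_output.nc',        # output_file
    logger=getLogger('concise'),
    perf_stats=perf_stats,
    process_count=2,           # optional; passed through to the merge workers
)
```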
81 changes: 66 additions & 15 deletions podaac/merger/merge_worker.py
@@ -3,13 +3,29 @@
import multiprocessing
from multiprocessing.shared_memory import SharedMemory
import queue
import time
import os
import shutil
import netCDF4 as nc
import numpy as np

from podaac.merger.path_utils import resolve_dim, resolve_group


def run_merge(merged_dataset, file_list, var_info, max_dims, process_count):
def shared_memory_size():
"""
    Determine the shared memory size by reading /dev/shm on Linux machines; fall back to the SHARED_MEMORY_SIZE environment variable or a 60 MB default.
"""
try:
stat = shutil.disk_usage("/dev/shm")
return stat.total
except FileNotFoundError:
# Get memory size via env or default to 60 MB
default_memory_size = os.getenv("SHARED_MEMORY_SIZE", "60000000")
return int(default_memory_size)


def run_merge(merged_dataset, file_list, var_info, max_dims, process_count, logger):
"""
Automagically run merging in an optimized mode determined by the environment

@@ -33,7 +49,7 @@ def run_merge(merged_dataset, file_list, var_info, max_dims, process_count):
# Merging is bottlenecked at the write process which is single threaded
# so spinning up more than 2 processes for read/write won't scale the
# optimization
_run_multi_core(merged_dataset, file_list, var_info, max_dims, 2)
_run_multi_core(merged_dataset, file_list, var_info, max_dims, 2, logger)


def _run_single_core(merged_dataset, file_list, var_info, max_dims):
@@ -55,18 +71,24 @@ def _run_single_core(merged_dataset, file_list, var_info, max_dims):
with nc.Dataset(file, 'r') as origin_dataset:
origin_dataset.set_auto_maskandscale(False)

for item in var_info.items():
ds_group = resolve_group(origin_dataset, item[0])
merged_group = resolve_group(merged_dataset, item[0])
for var_path, var_meta in var_info.items():
ds_group, var_name = resolve_group(origin_dataset, var_path)
merged_group = resolve_group(merged_dataset, var_path)
ds_var = ds_group.variables.get(var_name)

merged_var = merged_group[0].variables[var_name]

ds_var = ds_group[0].variables[ds_group[1]]
merged_var = merged_group[0].variables[ds_group[1]]
if ds_var is None:
fill_value = var_meta.fill_value
target_shape = tuple(max_dims[f'/{dim}'] for dim in var_meta.dim_order)
merged_var[i] = np.full(target_shape, fill_value)
continue

resized = resize_var(ds_var, item[1], max_dims)
resized = resize_var(ds_var, var_meta, max_dims)
merged_var[i] = resized


def _run_multi_core(merged_dataset, file_list, var_info, max_dims, process_count): # pylint: disable=too-many-locals
def _run_multi_core(merged_dataset, file_list, var_info, max_dims, process_count, logger): # pylint: disable=too-many-locals
"""
Run the variable merge in multi-core mode. This method creates (process_count - 1)
read processes which read data from an origin granule, resize it, then queue it
@@ -88,26 +110,35 @@ def _run_multi_core(merged_dataset, file_list, var_info, max_dims, process_count
process_count : int
Number of worker processes to run (expected >= 2)
"""

logger.info("Running multicore ......")
total_variables = len(file_list) * len(var_info)
logger.info(f"total variables {total_variables}")

# Ensure SharedMemory doesn't get cleaned up before being processed
context = multiprocessing.get_context('forkserver')

with context.Manager() as manager:
in_queue = manager.Queue(len(file_list))
out_queue = manager.Queue((process_count - 1) * len(var_info)) # Store (process_count - 1) granules in buffer
memory_limit = manager.Value('i', 0)
lock = manager.Lock()

        logger.info('Input files: %s', file_list)
for i, file in enumerate(file_list):
in_queue.put((i, file))

processes = []

logger.info("creating read processes")
for _ in range(process_count - 1):
process = context.Process(target=_run_worker, args=(in_queue, out_queue, max_dims, var_info))
process = context.Process(target=_run_worker, args=(in_queue, out_queue, max_dims, var_info, memory_limit, lock))
processes.append(process)
process.start()

processed_variables = 0

logger.info("Start processing variables in main process")
while processed_variables < total_variables:
try:
i, var_path, shape, memory_name = out_queue.get_nowait()
@@ -121,10 +152,10 @@ def _run_multi_core(merged_dataset, file_list, var_info, max_dims, process_count
resized_arr = np.ndarray(shape, var_meta.datatype, shared_memory.buf)

merged_var[i] = resized_arr # The write operation itself

shared_memory.unlink()
shared_memory.close()

with lock:
memory_limit.value = memory_limit.value - resized_arr.nbytes
processed_variables = processed_variables + 1

for process in processes:
@@ -133,7 +164,7 @@ def _run_multi_core(merged_dataset, file_list, var_info, max_dims, process_count
process.join()


def _run_worker(in_queue, out_queue, max_dims, var_info):
def _run_worker(in_queue, out_queue, max_dims, var_info, memory_limit, lock):
"""
A method to be executed in a separate process which reads variables from a
granule, performs resizing, and queues the processed data up for the writer
@@ -150,7 +181,12 @@ def _run_worker(in_queue, out_queue, max_dims, var_info):
var_info : dict
Dictionary of variable paths and associated VariableInfo
"""

    # Use at most 95% of the available shared memory
max_memory_size = round(shared_memory_size() * .95)

while not in_queue.empty():

try:
i, file = in_queue.get_nowait()
except queue.Empty:
@@ -160,15 +196,30 @@ def _run_worker(in_queue, out_queue, max_dims, var_info):
origin_dataset.set_auto_maskandscale(False)

for var_path, var_meta in var_info.items():

ds_group, var_name = resolve_group(origin_dataset, var_path)
ds_var = ds_group.variables[var_name]
ds_var = ds_group.variables.get(var_name)

if ds_var is None:
fill_value = var_meta.fill_value
target_shape = tuple(max_dims[f'/{dim}'] for dim in var_meta.dim_order)
resized_arr = np.full(target_shape, fill_value)
else:
resized_arr = resize_var(ds_var, var_meta, max_dims)

if resized_arr.nbytes > max_memory_size:
raise RuntimeError(f'Merging failed - MAX MEMORY REACHED: {resized_arr.nbytes}')

resized_arr = resize_var(ds_var, var_meta, max_dims)
                # Cap total shared-memory allocations at max_memory_size; wait for the writer to free space
while memory_limit.value + resized_arr.nbytes > max_memory_size and not out_queue.empty():
time.sleep(.5)

# Copy resized array to shared memory
shared_mem = SharedMemory(create=True, size=resized_arr.nbytes)
shared_arr = np.ndarray(resized_arr.shape, resized_arr.dtype, buffer=shared_mem.buf)
np.copyto(shared_arr, resized_arr)
with lock:
memory_limit.value = memory_limit.value + resized_arr.nbytes

out_queue.put((i, var_path, shared_arr.shape, shared_mem.name))
shared_mem.close()
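The new worker logic keeps the total bytes held in SharedMemory segments under roughly 95% of the `/dev/shm` capacity (or a `SHARED_MEMORY_SIZE` override), pausing readers until the writer frees space. A simplified standalone sketch of that budgeting pattern follows; `shm_capacity`, `reserve`, `release`, and `writer_caught_up` are hypothetical names, not the PR's API:

```python
# Simplified sketch of the shared-memory budgeting used by the merge workers;
# reserve()/release()/writer_caught_up are hypothetical helper names.
import os
import shutil
import time
from multiprocessing import Manager


def shm_capacity():
    """Size of /dev/shm, or the SHARED_MEMORY_SIZE env override, or ~60 MB."""
    try:
        return shutil.disk_usage('/dev/shm').total
    except FileNotFoundError:
        return int(os.getenv('SHARED_MEMORY_SIZE', '60000000'))


MAX_BYTES = round(shm_capacity() * .95)  # keep 5% headroom, as the worker does


def reserve(memory_counter, lock, nbytes, writer_caught_up):
    """Block until nbytes fits under the budget, then record the allocation."""
    if nbytes > MAX_BYTES:
        raise RuntimeError(f'allocation of {nbytes} bytes exceeds the shared-memory budget')
    while memory_counter.value + nbytes > MAX_BYTES and not writer_caught_up():
        time.sleep(.5)  # wait for the writer to drain queued arrays
    with lock:
        memory_counter.value += nbytes


def release(memory_counter, lock, nbytes):
    """Return nbytes to the budget after the writer copies the data out."""
    with lock:
        memory_counter.value -= nbytes


if __name__ == '__main__':
    manager = Manager()
    counter, lock = manager.Value('i', 0), manager.Lock()
    reserve(counter, lock, 8 * 1024 * 1024, writer_caught_up=lambda: True)
    release(counter, lock, 8 * 1024 * 1024)
```

As in the PR, the counter lives in a Manager so the read workers and the writer process share one running total.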
10 changes: 9 additions & 1 deletion podaac/merger/preprocess_worker.py
@@ -231,7 +231,14 @@ def _run_multi_core(file_list, process_count):
if var_info is None:
var_info = result['var_info']
elif var_info != result['var_info']:
raise RuntimeError('Variable schemas are inconsistent between granules')
if set(var_info.keys()).difference(result['var_info']):
# If not all variables match, only compare variables that intersect
intersecting_vars = set(var_info).intersection(result['var_info'])
if list(
map(var_info.get, intersecting_vars)
) != list(map(result['var_info'].get, intersecting_vars)):
raise RuntimeError('Variable schemas are inconsistent between granules')
var_info.update(result['var_info'])

# The following data requires accumulation methods
merge_max_dims(max_dims, result['max_dims'])
@@ -417,6 +424,7 @@ def get_variable_data(group, var_info, var_metadata):
"""

for var in group.variables.values():

# Generate VariableInfo map
info = VariableInfo(var)
var_path = get_group_path(group, var.name)
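The preprocess change relaxes the previous all-or-nothing schema check: when granules expose different variable sets, only the variables they share must match, and the accumulated `var_info` becomes the union of everything seen. A minimal dict-based sketch of that rule, not the exact implementation; the tuple metadata below is a hypothetical stand-in for `VariableInfo`:

```python
# Hedged sketch of the relaxed schema check; metadata tuples stand in for VariableInfo.
def merge_schema(accumulated, incoming):
    """Merge incoming variable metadata; variables present in both must agree exactly."""
    shared = set(accumulated) & set(incoming)
    if any(accumulated[name] != incoming[name] for name in shared):
        raise RuntimeError('Variable schemas are inconsistent between granules')
    accumulated.update(incoming)  # keep the union of variables across granules
    return accumulated


granule_a = {'/lat': ('float32', ('nj', 'ni')),
             '/sea_surface_temperature': ('int16', ('time', 'nj', 'ni'))}
granule_b = {'/lat': ('float32', ('nj', 'ni')),
             '/wind_speed': ('int16', ('time', 'nj', 'ni'))}

schema = merge_schema(dict(granule_a), granule_b)
assert set(schema) == {'/lat', '/sea_surface_temperature', '/wind_speed'}
```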
3 changes: 3 additions & 0 deletions podaac/merger/variable_info.py
@@ -42,6 +42,9 @@ def __setattr__(self, name, value):

self.__dict__[name] = value

def __str__(self):
return f"name:{self.name} dim_order:{self.dim_order} fill_value:{self.fill_value} datatype:{self.datatype} group_path:{self.group_path}"

def __eq__(self, other):
return (
self.dim_order == other.dim_order and
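The new `__str__` gives `VariableInfo` a readable one-line form, which helps when logging why two granules' schemas differ. A small sketch of the string it produces, using hypothetical field values plugged into the same format:

```python
# Hypothetical field values rendered with the format used by VariableInfo.__str__.
name, dim_order, fill_value, datatype, group_path = (
    'sea_surface_temperature', ('time', 'nj', 'ni'), -32768, 'int16', '/')

print(f"name:{name} dim_order:{dim_order} fill_value:{fill_value} "
      f"datatype:{datatype} group_path:{group_path}")
# -> name:sea_surface_temperature dim_order:('time', 'nj', 'ni') fill_value:-32768 datatype:int16 group_path:/
```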
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "podaac-concise"
version = "0.2.0"
version = "0.3.0-rc.3"
description = "Harmony service that merges granules"
authors = ["podaac-tva <podaac-tva@jpl.nasa.gov>"]
license = "Apache-2.0"
4 binary files not shown.