def parse_filename_like(filename_like: str) -> re.Match:
    """
    Parse a filename like string.

    This function is based off of the more strict regex parsing of IMAP science
    product filenames found in the `imap_data_access` package `ScienceFilePath`
    class. This function implements a more relaxed regex that can be used on
    `Logical_source` or `Logical_file_id` found in the CDF file. The required
    components in the input string are `mission`, `instrument`, `data_level`,
    and `descriptor`.

    Parameters
    ----------
    filename_like : str
        A filename like string. This includes `Logical_source` or `Logical_file_id`
        strings.

    Returns
    -------
    match : re.Match
        A dictionary like re.Match object resulting from parsing the input string.
        The named groups are `mission`, `instrument`, `data_level`, `sensor`,
        `descriptor`, `start_date`, `repointing`, `version` and `extension`.
        Optional groups that are absent from the input are None.

    Raises
    ------
    ValueError
        If the regex fails to match the input string.
    """
    regex_str = (
        r"^(?P<mission>imap)_"  # Required mission
        r"(?P<instrument>[^_]+)_"  # Required instrument
        r"(?P<data_level>[^_]+)_"  # Required data level
        r"((?P<sensor>\d{2}sensor)?-)?"  # Optional sensor number
        r"(?P<descriptor>[^_]+)"  # Required descriptor
        r"(_(?P<start_date>\d{8}))?"  # Optional start date
        r"(-repoint(?P<repointing>\d{5}))?"  # Optional repointing field
        r"(?:_v(?P<version>\d{3}))?"  # Optional version
        r"(?:\.(?P<extension>cdf|pkts))?$"  # Optional extension
    )
    match = re.match(regex_str, filename_like)
    if match is None:
        # The two literals are concatenated, so the first one needs the
        # trailing space to keep "fields including" readable.
        raise ValueError(
            "Filename like string did not contain required fields "
            "including mission, instrument, data_level, and descriptor."
        )
    return match
# Use apid to determine what L1B processing function to call # Housekeeping processing - if logical_source_parts[-1].endswith("hk"): + if logical_source_parts["descriptor"].endswith("hk"): # if packet_enum in (HIAPID.H45_APP_NHK, HIAPID.H90_APP_NHK): packet_enum = HIAPID(l1a_dataset["pkt_apid"].data[0]) conversion_table_path = str( @@ -78,7 +84,7 @@ def hi_l1b(l1a_dataset: xr.Dataset, data_version: str) -> xr.Dataset: ) l1b_dataset.attrs.update(ATTR_MGR.get_global_attributes("imap_hi_l1b_hk_attrs")) - elif logical_source_parts[-1].endswith("de"): + elif logical_source_parts["descriptor"].endswith("de"): l1b_dataset = annotate_direct_events(l1a_dataset) else: raise NotImplementedError( @@ -86,12 +92,8 @@ def hi_l1b(l1a_dataset: xr.Dataset, data_version: str) -> xr.Dataset: f"{l1a_dataset.attrs['Logical_source']}" ) # Update global attributes - # TODO: write a function that extracts the sensor from Logical_source - # some functionality can be found in imap_data_access.file_validation but - # only works on full file names - sensor_str = logical_source_parts[-1].split("-")[0] l1b_dataset.attrs["Logical_source"] = l1b_dataset.attrs["Logical_source"].format( - sensor=sensor_str + sensor=logical_source_parts["sensor"] ) # TODO: revisit this l1b_dataset.attrs["Data_version"] = data_version @@ -281,10 +283,7 @@ def compute_hae_coordinates(dataset: xr.Dataset) -> xr.Dataset: ) out_ds = dataset.assign(new_data_vars) et = j2000ns_to_j2000s(out_ds.epoch.values) - # TODO: implement a Hi parser for getting the sensor number - sensor_number = int( - dataset.attrs["Logical_source"].split("_")[-1].split("-")[0][0:2] - ) + sensor_number = parse_sensor_number(dataset.attrs["Logical_source"]) # TODO: For now, we are using SPICE to compute the look direction for each # direct event. 
def parse_sensor_number(full_string: str) -> int:
    """
    Parse the sensor number from a string.

    This function uses regex to match any portion of the input string
    containing "(45|90)sensor".

    Parameters
    ----------
    full_string : str
        A string containing sensor number.

    Returns
    -------
    sensor_number : int
        The integer sensor number. For IMAP-Hi this is 45 or 90.

    Raises
    ------
    ValueError
        If neither "45sensor" nor "90sensor" is found in the input string.
    """
    # The leading `.*` is greedy, so if the string contains more than one
    # "<NN>sensor" token the last one is the one captured.
    regex_str = r".*(?P<sensor_num>(45|90))sensor.*?"
    match = re.match(regex_str, full_string)
    if match is None:
        raise ValueError(
            f"String 'sensor(45|90)' not found in input string: '{full_string}'"
        )
    return int(match["sensor_num"])
@pytest.mark.parametrize(
    "test_str, compare_dict",
    [
        (
            "imap_hi_l1b_45sensor-de",
            {
                "mission": "imap",
                "instrument": "hi",
                "data_level": "l1b",
                "sensor": "45sensor",
                "descriptor": "de",
            },
        ),
        (
            "imap_hi_l1a_hist_20250415_v001",
            {
                "mission": "imap",
                "instrument": "hi",
                "data_level": "l1a",
                "descriptor": "hist",
                "start_date": "20250415",
                "version": "001",
            },
        ),
        (
            "imap_hi_l1c_90sensor-pset_20250415-repoint12345_v001.cdf",
            {
                "mission": "imap",
                "instrument": "hi",
                "data_level": "l1c",
                "sensor": "90sensor",
                "descriptor": "pset",
                "start_date": "20250415",
                "repointing": "12345",
                "version": "001",
                "extension": "cdf",
            },
        ),
        ("foo_hi_l1c_90sensor-pset_20250415_v001.cdf", None),
        ("imap_hi_l1c", None),
    ],
)
def test_parse_filename_like(test_str, compare_dict):
    """Test coverage for parse_filename_like function"""
    # A `None` compare_dict marks an input that is expected to be rejected.
    if compare_dict is not None:
        match = parse_filename_like(test_str)
        for key, value in compare_dict.items():
            assert match[key] == value
    else:
        with pytest.raises(ValueError, match="Filename like string did not contain"):
            _ = parse_filename_like(test_str)


@pytest.mark.parametrize(
    "test_str, expected",
    [
        ("imap_hi_l1b_45sensor-de", 45),
        ("imap_hi_l1c_90sensor-pset_20250415_v001.cdf", 90),
        ("imap_hi_l1c_{number}sensor", None),
    ],
)
def test_parse_sensor_number(test_str, expected):
    """Test coverage for parse_sensor_number function"""
    # An `expected` of None marks an input that is expected to be rejected.
    if expected is not None:
        sensor_number = parse_sensor_number(test_str)
        assert sensor_number == expected
    else:
        # `match` is applied as a regex, so the parentheses AND the pipe must
        # be escaped to require the literal text "sensor(45|90)"; an unescaped
        # pipe would turn the pattern into a much looser alternation.
        expected_msg = r"String 'sensor\(45\|90\)' not found.*"
        with pytest.raises(ValueError, match=expected_msg):
            _ = parse_sensor_number(test_str)