Skip to content

Commit

Permalink
1026 logical source parser (#1044)
Browse files Browse the repository at this point in the history
* Add function for parsing sensor number out of string

* Add function for parsing components of logical source, logical file id or full filename

* Fix order of sensor number string

* parse full sensor string

* Use logical source parser function in l1b and l1c

* Use parse_sensor_number

* move parse_filename_like into cdf.utils

* Add test coverage for parsing of repointing number
  • Loading branch information
subagonsouth authored Oct 29, 2024
1 parent c67d3ee commit 978bae9
Show file tree
Hide file tree
Showing 6 changed files with 166 additions and 16 deletions.
46 changes: 46 additions & 0 deletions imap_processing/cdf/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,49 @@ def write_cdf(
) # Terminate if not ISTP compliant

return file_path


def parse_filename_like(filename_like: str) -> re.Match:
    """
    Parse a filename like string.

    This function is based on the stricter regex parsing of IMAP science
    product filenames found in the `imap_data_access` package `ScienceFilePath`
    class. This function implements a more relaxed regex that can be used on
    `Logical_source` or `Logical_file_id` found in the CDF file. The required
    components in the input string are `mission`, `instrument`, `data_level`,
    and `descriptor`. The `sensor`, `start_date`, `repointing`, `version` and
    `extension` components are optional; the corresponding named group is
    `None` in the returned match when the component is absent.

    Parameters
    ----------
    filename_like : str
        A filename like string. This includes `Logical_source` or
        `Logical_file_id` strings.

    Returns
    -------
    match : re.Match
        A dictionary like re.Match object resulting from parsing the input
        string. Components are accessed by group name, e.g. `match["sensor"]`.

    Raises
    ------
    ValueError
        If the regex fails to match the input string.
    """
    regex_str = (
        r"^(?P<mission>imap)_"  # Required mission
        r"(?P<instrument>[^_]+)_"  # Required instrument
        r"(?P<data_level>[^_]+)_"  # Required data level
        r"((?P<sensor>\d{2}sensor)?-)?"  # Optional sensor number
        r"(?P<descriptor>[^_]+)"  # Required descriptor
        r"(_(?P<start_date>\d{8}))?"  # Optional start date
        r"(-repoint(?P<repointing>\d{5}))?"  # Optional repointing field
        r"(?:_v(?P<version>\d{3}))?"  # Optional version
        r"(?:\.(?P<extension>cdf|pkts))?$"  # Optional extension
    )
    match = re.match(regex_str, filename_like)
    if match is None:
        # Adjacent string literals are concatenated verbatim, so the trailing
        # space on the first fragment is required to keep the words separated.
        raise ValueError(
            "Filename like string did not contain required fields "
            "including mission, instrument, data_level, and descriptor."
        )
    return match
25 changes: 12 additions & 13 deletions imap_processing/hi/l1b/hi_l1b.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,13 @@

from imap_processing import imap_module_directory
from imap_processing.cdf.imap_cdf_manager import ImapCdfAttributes
from imap_processing.hi.utils import HIAPID, HiConstants, create_dataset_variables
from imap_processing.cdf.utils import parse_filename_like
from imap_processing.hi.utils import (
HIAPID,
HiConstants,
create_dataset_variables,
parse_sensor_number,
)
from imap_processing.spice.geometry import SpiceFrame, instrument_pointing
from imap_processing.spice.time import j2000ns_to_j2000s
from imap_processing.utils import convert_raw_to_eu
Expand Down Expand Up @@ -56,12 +62,12 @@ def hi_l1b(l1a_dataset: xr.Dataset, data_version: str) -> xr.Dataset:
logger.info(
f"Running Hi L1B processing on dataset: {l1a_dataset.attrs['Logical_source']}"
)
logical_source_parts = l1a_dataset.attrs["Logical_source"].split("_")
logical_source_parts = parse_filename_like(l1a_dataset.attrs["Logical_source"])
# TODO: apid is not currently stored in all L1A data but should be.
# Use apid to determine what L1B processing function to call

# Housekeeping processing
if logical_source_parts[-1].endswith("hk"):
if logical_source_parts["descriptor"].endswith("hk"):
# if packet_enum in (HIAPID.H45_APP_NHK, HIAPID.H90_APP_NHK):
packet_enum = HIAPID(l1a_dataset["pkt_apid"].data[0])
conversion_table_path = str(
Expand All @@ -78,20 +84,16 @@ def hi_l1b(l1a_dataset: xr.Dataset, data_version: str) -> xr.Dataset:
)

l1b_dataset.attrs.update(ATTR_MGR.get_global_attributes("imap_hi_l1b_hk_attrs"))
elif logical_source_parts[-1].endswith("de"):
elif logical_source_parts["descriptor"].endswith("de"):
l1b_dataset = annotate_direct_events(l1a_dataset)
else:
raise NotImplementedError(
f"No Hi L1B processing defined for file type: "
f"{l1a_dataset.attrs['Logical_source']}"
)
# Update global attributes
# TODO: write a function that extracts the sensor from Logical_source
# some functionality can be found in imap_data_access.file_validation but
# only works on full file names
sensor_str = logical_source_parts[-1].split("-")[0]
l1b_dataset.attrs["Logical_source"] = l1b_dataset.attrs["Logical_source"].format(
sensor=sensor_str
sensor=logical_source_parts["sensor"]
)
# TODO: revisit this
l1b_dataset.attrs["Data_version"] = data_version
Expand Down Expand Up @@ -281,10 +283,7 @@ def compute_hae_coordinates(dataset: xr.Dataset) -> xr.Dataset:
)
out_ds = dataset.assign(new_data_vars)
et = j2000ns_to_j2000s(out_ds.epoch.values)
# TODO: implement a Hi parser for getting the sensor number
sensor_number = int(
dataset.attrs["Logical_source"].split("_")[-1].split("-")[0][0:2]
)
sensor_number = parse_sensor_number(dataset.attrs["Logical_source"])
# TODO: For now, we are using SPICE to compute the look direction for each
# direct event. This will eventually be replaced by the algorithm Paul
# Janzen provided in the Hi Algorithm Document which should be faster
Expand Down
5 changes: 3 additions & 2 deletions imap_processing/hi/l1c/hi_l1c.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import xarray as xr

from imap_processing.cdf.imap_cdf_manager import ImapCdfAttributes
from imap_processing.cdf.utils import parse_filename_like
from imap_processing.hi.utils import full_dataarray

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -64,9 +65,9 @@ def generate_pset_dataset(de_dataset: xr.Dataset) -> xr.Dataset:
pset_dataset : xarray.Dataset
Ready to be written to CDF.
"""
sensor_str = de_dataset.attrs["Logical_source"].split("_")[-1].split("-")[0]
logical_source_parts = parse_filename_like(de_dataset.attrs["Logical_source"])
n_esa_step = de_dataset.esa_step.data.size
pset_dataset = allocate_pset_dataset(n_esa_step, sensor_str)
pset_dataset = allocate_pset_dataset(n_esa_step, logical_source_parts["sensor"])
# TODO: Stored epoch value needs to be consistent across ENA instruments.
# SPDF says this should be the center of the time bin, but instrument
# teams may disagree.
Expand Down
27 changes: 27 additions & 0 deletions imap_processing/hi/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""IMAP-Hi utils functions."""

import re
from collections.abc import Sequence
from dataclasses import dataclass
from enum import IntEnum
Expand Down Expand Up @@ -67,6 +68,32 @@ class HiConstants:
TOF3_BAD_VALUES = (1023,)


def parse_sensor_number(full_string: str) -> int:
    """
    Extract the IMAP-Hi sensor number from a string.

    The input is matched against a regex that looks for "45sensor" or
    "90sensor" anywhere on the first line of the string and captures the
    two-digit number preceding "sensor".

    Parameters
    ----------
    full_string : str
        A string containing the sensor number, e.g. a `Logical_source`.

    Returns
    -------
    sensor_number : int
        The integer sensor number. For IMAP-Hi this is 45 or 90.

    Raises
    ------
    ValueError
        If no "45sensor" or "90sensor" substring is matched in the input.
    """
    found = re.match(r".*(?P<sensor_num>(45|90))sensor.*?", full_string)
    if found is not None:
        return int(found.group("sensor_num"))
    raise ValueError(
        f"String 'sensor(45|90)' not found in input string: '{full_string}'"
    )


def full_dataarray(
name: str,
attrs: dict,
Expand Down
54 changes: 54 additions & 0 deletions imap_processing/tests/cdf/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from imap_processing.cdf.imap_cdf_manager import ImapCdfAttributes
from imap_processing.cdf.utils import (
load_cdf,
parse_filename_like,
write_cdf,
)
from imap_processing.spice.time import met_to_j2000ns
Expand Down Expand Up @@ -107,3 +108,56 @@ def test_parents_injection(test_dataset):
parent_paths = [Path("test_parent1.cdf"), Path("/abc/test_parent2.cdf")]
new_dataset = load_cdf(write_cdf(test_dataset, parent_files=parent_paths))
assert new_dataset.attrs["Parents"] == [p.name for p in parent_paths]


@pytest.mark.parametrize(
    "test_str, compare_dict",
    [
        # Logical_source style string with a sensor prefix on the descriptor
        (
            "imap_hi_l1b_45sensor-de",
            {
                "mission": "imap",
                "instrument": "hi",
                "data_level": "l1b",
                "sensor": "45sensor",
                "descriptor": "de",
            },
        ),
        # Logical_file_id style string: no sensor, no extension
        (
            "imap_hi_l1a_hist_20250415_v001",
            {
                "mission": "imap",
                "instrument": "hi",
                "data_level": "l1a",
                "descriptor": "hist",
                "start_date": "20250415",
                "version": "001",
            },
        ),
        # Full filename including repointing number and extension
        (
            "imap_hi_l1c_90sensor-pset_20250415-repoint12345_v001.cdf",
            {
                "mission": "imap",
                "instrument": "hi",
                "data_level": "l1c",
                "sensor": "90sensor",
                "descriptor": "pset",
                "start_date": "20250415",
                "repointing": "12345",
                "version": "001",
                "extension": "cdf",
            },
        ),
        # Invalid inputs: wrong mission, and missing descriptor
        ("foo_hi_l1c_90sensor-pset_20250415_v001.cdf", None),
        ("imap_hi_l1c", None),
    ],
)
def test_parse_filename_like(test_str, compare_dict):
    """Test coverage for parse_filename_like function.

    A `compare_dict` of None marks an input that is expected to be rejected
    with a ValueError; otherwise every listed group must equal the parsed
    value.
    """
    if not compare_dict:
        with pytest.raises(ValueError, match="Filename like string did not contain"):
            _ = parse_filename_like(test_str)
        return

    parsed = parse_filename_like(test_str)
    for field, expected_value in compare_dict.items():
        assert parsed[field] == expected_value
25 changes: 24 additions & 1 deletion imap_processing/tests/hi/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@
import xarray as xr

from imap_processing.cdf.imap_cdf_manager import ImapCdfAttributes
from imap_processing.hi.utils import HIAPID, create_dataset_variables, full_dataarray
from imap_processing.hi.utils import (
HIAPID,
create_dataset_variables,
full_dataarray,
parse_sensor_number,
)


def test_hiapid():
Expand All @@ -20,6 +25,24 @@ def test_hiapid():
assert hi_apid.sensor == "90sensor"


@pytest.mark.parametrize(
    "test_str, expected",
    [
        ("imap_hi_l1b_45sensor-de", 45),
        ("imap_hi_l1c_90sensor-pset_20250415_v001.cdf", 90),
        # No literal 45/90 in front of "sensor", so parsing must fail
        ("imap_hi_l1c_{number}sensor", None),
    ],
)
def test_parse_sensor_number(test_str, expected):
    """Test coverage for parse_sensor_number function.

    An `expected` value of None marks an input that must raise ValueError.
    """
    if expected:
        sensor_number = parse_sensor_number(test_str)
        assert sensor_number == expected
    else:
        # The "|" must be escaped along with the parentheses: unescaped, it
        # splits the pattern into two alternatives and the check passes when
        # either half is found rather than the full literal message.
        with pytest.raises(
            ValueError, match=r"String 'sensor\(45\|90\)' not found.*"
        ):
            _ = parse_sensor_number(test_str)


@pytest.mark.parametrize(
"name, shape, expected_shape",
[
Expand Down

0 comments on commit 978bae9

Please sign in to comment.