diff --git a/imap_processing/cdf/utils.py b/imap_processing/cdf/utils.py
index 52d67c6f8..dca6cf2da 100644
--- a/imap_processing/cdf/utils.py
+++ b/imap_processing/cdf/utils.py
@@ -103,7 +103,7 @@ def load_cdf(
     return dataset
 
 
-def write_cdf(dataset: xr.Dataset) -> xr.Dataset:
+def write_cdf(dataset: xr.Dataset, **extra_cdf_kwargs: dict) -> Path:
     """
     Write the contents of "data" to a CDF file using cdflib.xarray_to_cdf.
 
@@ -118,6 +118,8 @@ def write_cdf(dataset: xr.Dataset) -> xr.Dataset:
     ----------
     dataset : xarray.Dataset
         The dataset object to convert to a CDF.
+    **extra_cdf_kwargs : dict
+        Additional keyword arguments to pass to the ``xarray_to_cdf`` function.
 
     Returns
     -------
@@ -149,7 +151,7 @@ def write_cdf(dataset: xr.Dataset) -> xr.Dataset:
         version=version,
         repointing=repointing,
     )
-    file_path = science_file.construct_path()
+    file_path: Path = science_file.construct_path()
     if not file_path.parent.exists():
         logger.info(
             "The directory does not exist, creating directory %s", file_path.parent
@@ -166,6 +168,7 @@ def write_cdf(dataset: xr.Dataset) -> xr.Dataset:
         dataset,
         str(file_path),
         terminate_on_warning=True,
+        **extra_cdf_kwargs,
     )  # Terminate if not ISTP compliant
 
     return file_path
diff --git a/imap_processing/cli.py b/imap_processing/cli.py
index 95919868c..315413b50 100644
--- a/imap_processing/cli.py
+++ b/imap_processing/cli.py
@@ -305,7 +305,7 @@ def download_dependencies(self) -> list:
         )
         return file_list
 
-    def upload_products(self, products: list[list[Path]]) -> None:
+    def upload_products(self, products: list[Path]) -> None:
         """
         Upload data products to the IMAP SDC.
 
diff --git a/imap_processing/swapi/l1/swapi_l1.py b/imap_processing/swapi/l1/swapi_l1.py
index 1becd7d87..f89752e28 100644
--- a/imap_processing/swapi/l1/swapi_l1.py
+++ b/imap_processing/swapi/l1/swapi_l1.py
@@ -8,8 +8,6 @@
 
 from imap_processing import imap_module_directory
 from imap_processing.cdf.global_attrs import ConstantCoordinates
-from imap_processing.cdf.utils import met_to_j2000ns
-from imap_processing.decom import decom_packets
 from imap_processing.swapi.swapi_cdf_attrs import (
     compression_attrs,
     counts_attrs,
@@ -19,12 +17,7 @@
     uncertainty_attrs,
 )
 from imap_processing.swapi.swapi_utils import SWAPIAPID, SWAPIMODE
-from imap_processing.utils import (
-    create_dataset,
-    group_by_apid,
-    sort_by_time,
-    update_epoch_to_datetime,
-)
+from imap_processing.utils import packet_file_to_datasets
 
 
 def filter_good_data(full_sweep_sci: xr.Dataset) -> np.ndarray:
@@ -60,7 +53,7 @@ def filter_good_data(full_sweep_sci: xr.Dataset) -> np.ndarray:
     # TODO: change comparison to SWAPIMODE.HVSCI once we have
     # some HVSCI data
     # MODE should be HVSCI
-    mode_indices = (mode == SWAPIMODE.HVENG).all(axis=1)
+    mode_indices = (mode == SWAPIMODE.HVENG.value).all(axis=1)
     bad_data_indices = sweep_indices & plan_id_indices & mode_indices
 
     # TODO: add checks for checksum
@@ -172,7 +165,7 @@ def find_sweep_starts(packets: xr.Dataset) -> np.ndarray:
     #
     #   [0 0 0 1 0 0 0 0 0 0 0 0 1 0 0 0 0]
     # And all?
-    ione = diff == 1
+    ione = diff == 1e9  # 1 second
 
     valid = (
         (packets["seq_number"] == 0)[:-11]
@@ -475,7 +468,8 @@ def process_swapi_science(sci_dataset: xr.Dataset, data_version: str) -> xr.Data
 
     # epoch time. Should be same dimension as number of good sweeps
     epoch_time = good_sweep_sci["epoch"].data.reshape(total_full_sweeps, 12)[:, 0]
-    epoch_converted_time = met_to_j2000ns(epoch_time)
+    # epoch_converted_time = met_to_j2000ns(epoch_time)
+    epoch_converted_time = epoch_time
     epoch_time = xr.DataArray(
         epoch_converted_time,
         name="epoch",
@@ -532,8 +526,9 @@ def process_swapi_science(sci_dataset: xr.Dataset, data_version: str) -> xr.Data
     )
 
     # L1 quality flags
+    # TODO: Should these be kept in raw format rather than derived into strings?
     dataset["swp_pcem_flags"] = xr.DataArray(
-        np.array(pcem_compression_flags, dtype=np.uint8),
+        np.array(pcem_compression_flags == "RAW", dtype=np.uint8),
         dims=["epoch", "energy"],
         attrs=dataclasses.replace(
             compression_attrs,
@@ -543,7 +538,7 @@ def process_swapi_science(sci_dataset: xr.Dataset, data_version: str) -> xr.Data
         ).output(),
     )
     dataset["swp_scem_flags"] = xr.DataArray(
-        np.array(scem_compression_flags, dtype=np.uint8),
+        np.array(scem_compression_flags == "RAW", dtype=np.uint8),
         dims=["epoch", "energy"],
         attrs=dataclasses.replace(
             compression_attrs,
@@ -553,7 +548,7 @@ def process_swapi_science(sci_dataset: xr.Dataset, data_version: str) -> xr.Data
         ).output(),
     )
     dataset["swp_coin_flags"] = xr.DataArray(
-        np.array(coin_compression_flags, dtype=np.uint8),
+        np.array(coin_compression_flags == "RAW", dtype=np.uint8),
         dims=["epoch", "energy"],
         attrs=dataclasses.replace(
             compression_attrs,
@@ -626,25 +621,17 @@ def swapi_l1(file_path: str, data_version: str) -> xr.Dataset:
     xtce_definition = (
         f"{imap_module_directory}/swapi/packet_definitions/swapi_packet_definition.xml"
     )
-    packets = decom_packets(
-        packet_file=file_path, xtce_packet_definition=xtce_definition
-    )
-    grouped_packets = group_by_apid(packets)
+    datasets = packet_file_to_datasets(file_path, xtce_definition)
     processed_data = []
-    for apid in grouped_packets.keys():
-        sorted_packets = sort_by_time(grouped_packets[apid], "SHCOARSE")
+    for apid, ds_data in datasets.items():
         # Right now, we only process SWP_HK and SWP_SCI
         # other packets are not process in this processing pipeline
         # If appId is science, then the file should contain all data of science appId
-        ds_data = create_dataset(sorted_packets, include_header=False)
         if apid == SWAPIAPID.SWP_SCI.value:
             data = process_swapi_science(ds_data, data_version)
             processed_data.append(data)
 
         if apid == SWAPIAPID.SWP_HK.value:
-            # convert epoch to datetime
-            ds_data = update_epoch_to_datetime(ds_data)
-
             # Add datalevel attrs
             ds_data.attrs.update(swapi_l1_hk_attrs.output())
             ds_data.attrs["Data_version"] = data_version
diff --git a/imap_processing/swapi/swapi_utils.py b/imap_processing/swapi/swapi_utils.py
index 1f48fae85..b621c12d4 100644
--- a/imap_processing/swapi/swapi_utils.py
+++ b/imap_processing/swapi/swapi_utils.py
@@ -5,7 +5,7 @@
 other SWAPI processing modules.
""" -from enum import IntEnum +from enum import Enum, IntEnum class SWAPIAPID(IntEnum): @@ -16,10 +16,10 @@ class SWAPIAPID(IntEnum): SWP_AUT = 1192 -class SWAPIMODE(IntEnum): +class SWAPIMODE(Enum): """Create ENUM for MODE.""" - LVENG = 0 - LVSCI = 1 - HVENG = 2 - HVSCI = 3 + LVENG = "LVENG" + LVSCI = "LVSCI" + HVENG = "HVENG" + HVSCI = "HVSCI" diff --git a/imap_processing/tests/swapi/test_swapi_l1.py b/imap_processing/tests/swapi/test_swapi_l1.py index 12efc71c3..4a0a9fcc0 100644 --- a/imap_processing/tests/swapi/test_swapi_l1.py +++ b/imap_processing/tests/swapi/test_swapi_l1.py @@ -4,7 +4,6 @@ from imap_processing import imap_module_directory from imap_processing.cdf.utils import met_to_j2000ns, write_cdf -from imap_processing.decom import decom_packets from imap_processing.swapi.l1.swapi_l1 import ( SWAPIAPID, decompress_count, @@ -15,21 +14,18 @@ process_sweep_data, swapi_l1, ) -from imap_processing.utils import create_dataset, group_by_apid, sort_by_time +from imap_processing.utils import packet_file_to_datasets @pytest.fixture(scope="session") def decom_test_data(): """Read test data from file""" - test_folder_path = "tests/swapi/l0_data" - packet_files = list(imap_module_directory.glob(f"{test_folder_path}/*.pkts")) + test_file = "tests/swapi/l0_data/imap_swapi_l0_raw_20231012_v001.pkts" + packet_files = imap_module_directory / test_file packet_definition = ( f"{imap_module_directory}/swapi/packet_definitions/swapi_packet_definition.xml" ) - data_list = [] - for packet_file in packet_files: - data_list.extend(decom_packets(packet_file, packet_definition)) - return data_list + return packet_file_to_datasets(packet_files, packet_definition) def test_filter_good_data(): @@ -40,7 +36,7 @@ def test_filter_good_data(): { "plan_id_science": xr.DataArray(np.full((total_sweeps * 12), 1)), "sweep_table": xr.DataArray(np.repeat(np.arange(total_sweeps), 12)), - "mode": xr.DataArray(np.full((total_sweeps * 12), 2)), + "mode": xr.DataArray(np.full((total_sweeps * 12), "HVENG")), }, coords={"epoch": np.arange(total_sweeps * 12)}, ) @@ -49,17 +45,15 @@ def test_filter_good_data(): bad_data_indices = filter_good_data(ds) assert len(bad_data_indices) == 36 - # Check for bad MODE data. - # This test returns this indices because MODE has 0, 1 values - # for the first two sweeps. + # Check for bad MODE data, only HVENG is "good" # TODO: update test when we update MODE from HVENG to HVSCI - ds["mode"] = xr.DataArray(np.repeat(np.arange(total_sweeps), 12)) + ds["mode"] = xr.DataArray(np.repeat(["LVENG", "LVSCI", "HVENG"], 12)) bad_data_indices = filter_good_data(ds) np.testing.assert_array_equal(bad_data_indices, np.arange(24, 36)) # Check for bad sweep_table data. 
     # Reset MODE data and create first sweep to be mixed value
-    ds["mode"] = xr.DataArray(np.full((total_sweeps * 12), 2))
+    ds["mode"] = xr.DataArray(np.full((total_sweeps * 12), "HVENG"))
     ds["sweep_table"][:12] = np.arange(0, 12)
 
     np.testing.assert_array_equal(filter_good_data(ds), np.arange(12, 36))
@@ -87,7 +81,9 @@ def test_find_sweep_starts():
     """Test for find sweep starts"""
     time = np.arange(26)
     sequence_number = time % 12
-    ds = xr.Dataset({"seq_number": sequence_number}, coords={"epoch": time})
+    ds = xr.Dataset(
+        {"seq_number": sequence_number}, coords={"epoch": met_to_j2000ns(time)}
+    )
 
     start_indices = find_sweep_starts(ds)
     np.testing.assert_array_equal(start_indices, [0, 12])
@@ -107,7 +103,9 @@ def test_get_full_indices():
     """Test for correct full sweep indices"""
     time = np.arange(26)
     sequence_number = time % 12
-    ds = xr.Dataset({"seq_number": sequence_number}, coords={"epoch": time})
+    ds = xr.Dataset(
+        {"seq_number": sequence_number}, coords={"epoch": met_to_j2000ns(time)}
+    )
 
     sweep_indices = get_indices_of_full_sweep(ds)
     np.testing.assert_array_equal(sweep_indices, np.arange(0, 24))
@@ -115,10 +113,7 @@ def test_get_full_indices():
 
 def test_swapi_algorithm(decom_test_data):
     """Test SWAPI L1 algorithm"""
-    grouped_data = group_by_apid(decom_test_data)
-    science_data = grouped_data[SWAPIAPID.SWP_SCI]
-    sorted_packets = sort_by_time(science_data, "SHCOARSE")
-    ds_data = create_dataset(sorted_packets, include_header=False)
+    ds_data = decom_test_data[SWAPIAPID.SWP_SCI]
    full_sweep_indices = get_indices_of_full_sweep(ds_data)
     full_sweep_sci = ds_data.isel({"epoch": full_sweep_indices})
     total_packets = len(full_sweep_sci["seq_number"].data)
@@ -208,10 +203,7 @@ def test_process_swapi_science(decom_test_data):
     """Test process swapi science"""
-    grouped_data = group_by_apid(decom_test_data)
-    science_data = grouped_data[SWAPIAPID.SWP_SCI]
-    sorted_packets = sort_by_time(science_data, "SHCOARSE")
-    ds_data = create_dataset(sorted_packets, include_header=False)
+    ds_data = decom_test_data[SWAPIAPID.SWP_SCI]
     processed_data = process_swapi_science(ds_data, data_version="001")
 
     # Test dataset dimensions
@@ -333,5 +325,6 @@ def test_swapi_l1_cdf():
 
     # hk cdf file
     cdf_filename = "imap_swapi_l1_hk_20100101_v001.cdf"
-    cdf_path = write_cdf(processed_data[1])
+    # Ignore ISTP checks for HK data
+    cdf_path = write_cdf(processed_data[1], istp=False)
     assert cdf_path.name == cdf_filename
diff --git a/imap_processing/utils.py b/imap_processing/utils.py
index 0553df428..36de8fa2d 100644
--- a/imap_processing/utils.py
+++ b/imap_processing/utils.py
@@ -3,12 +3,13 @@
 import collections
 import dataclasses
 import logging
+from pathlib import Path
 from typing import Optional
 
 import numpy as np
 import pandas as pd
 import xarray as xr
-from space_packet_parser.parser import Packet
+from space_packet_parser import parser, xtcedef
 
 from imap_processing.cdf.global_attrs import ConstantCoordinates
 from imap_processing.cdf.utils import met_to_j2000ns
@@ -142,7 +143,7 @@ def convert_raw_to_eu(
 
 
 def create_dataset(
-    packets: list[Packet],
+    packets: list[parser.Packet],
     spacecraft_time_key: str = "shcoarse",
     include_header: bool = True,
     skip_keys: Optional[list[str]] = None,
@@ -254,3 +255,86 @@ def update_epoch_to_datetime(dataset: xr.Dataset) -> xr.Dataset:
     dataset = dataset.assign_coords(epoch=epoch)
 
     return dataset
+
+
+def packet_file_to_datasets(
+    packet_file: str | Path, xtce_packet_definition: str
+) -> dict[int, xr.Dataset]:
+    """
+    Convert a packet file to xarray datasets.
+
+    The packet file can contain multiple apids and these will be separated
+    into distinct datasets, one per apid. The datasets will contain the
+    ``derived_value``s of the data fields, and the ``raw_value``s if no
+    ``derived_value`` is available. If there are conversions in the XTCE
+    packet definition, the ``derived_value`` will be the converted value.
+    The dimension of the dataset will be the time field in J2000 nanoseconds.
+
+    Parameters
+    ----------
+    packet_file : str or pathlib.Path
+        Path to the data packet file, including filename.
+    xtce_packet_definition : str
+        Path to the XTCE definition file, including filename.
+
+    Returns
+    -------
+    datasets : dict
+        Mapping from apid to xarray dataset, one dataset per apid.
+    """
+    # Set up containers to store our data
+    # We are getting a packet file that may contain multiple apids
+    # Each apid has consistent data fields, so we want to create a
+    # dataset per apid.
+    # {apid1: dataset1, apid2: dataset2, ...}
+    data_dict: dict[int, dict] = dict()
+    attribute_dict: dict[int, dict] = dict()
+
+    # Set up the parser from the input packet definition
+    packet_definition = xtcedef.XtcePacketDefinition(xtce_packet_definition)
+    packet_parser = parser.PacketParser(packet_definition)
+
+    with open(packet_file, "rb") as binary_data:
+        packet_generator = packet_parser.generator(binary_data)
+        for packet in packet_generator:
+            apid = packet.header["PKT_APID"].raw_value
+            if apid not in data_dict:
+                # This is the first packet for this APID
+                data_dict[apid] = collections.defaultdict(list)
+                attribute_dict[apid] = collections.defaultdict(dict)
+
+                # Fill in the attributes
+                for key, value in packet.data.items():
+                    attribute_dict[apid][key]["short_description"] = (
+                        value.short_description
+                    )
+                    attribute_dict[apid][key]["long_description"] = (
+                        value.long_description
+                    )
+                    # TODO: Additional info from XTCE like units
+
+            # TODO: Include the header too? (packet.header | packet.data)
+            for key, value in packet.data.items():
+                # TODO: Do we want derived_value or raw_value?
+                # Right now we use the derived value if there is one, otherwise the raw value
+                # Maybe we want these to be separated into l1a (raw) and l1b (derived)?
+                data_dict[apid][key].append(value.derived_value or value.raw_value)
+
+    dataset_by_apid = {}
+    # Convert each apid's data to an xarray dataset
+    for apid, data in data_dict.items():
+        # The time key is always the first key in the data dictionary on IMAP
+        time_key = next(iter(data.keys()))
+        # Convert to J2000 time and use that as our primary dimension
+        time_data = met_to_j2000ns(data[time_key])
+        ds = xr.Dataset(
+            {key.lower(): ("epoch", val) for key, val in data.items()},
+            coords={"epoch": time_data},
+        )
+        for key, value in attribute_dict[apid].items():
+            ds[key.lower()].attrs.update(value)
+        ds = ds.sortby("epoch")
+
+        dataset_by_apid[apid] = ds
+
+    return dataset_by_apid
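
For reference, a minimal usage sketch of the new ``packet_file_to_datasets`` helper together with the ``write_cdf`` keyword passthrough, mirroring the updated test fixture above. The packet file is the repository's SWAPI L0 test file; as in the test, a dataset still needs its CDF attributes applied before writing:

    from imap_processing import imap_module_directory
    from imap_processing.cdf.utils import write_cdf
    from imap_processing.swapi.swapi_utils import SWAPIAPID
    from imap_processing.utils import packet_file_to_datasets

    packet_file = (
        imap_module_directory / "tests/swapi/l0_data/imap_swapi_l0_raw_20231012_v001.pkts"
    )
    xtce_definition = (
        f"{imap_module_directory}/swapi/packet_definitions/swapi_packet_definition.xml"
    )

    # One xarray.Dataset per APID, keyed by APID and sorted along the
    # "epoch" (J2000 nanoseconds) dimension
    datasets = packet_file_to_datasets(packet_file, xtce_definition)
    hk_dataset = datasets[SWAPIAPID.SWP_HK]

    # Extra keyword arguments are forwarded to cdflib.xarray_to_cdf;
    # here the ISTP compliance checks are skipped, as in the HK test above
    # (hk_dataset must have its CDF attributes set before this call)
    cdf_path = write_cdf(hk_dataset, istp=False)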