From 336f7f73a3ff597ee240c6a2ccc12c951622b25b Mon Sep 17 00:00:00 2001
From: Laura Sandoval
Date: Wed, 13 Mar 2024 16:41:51 -0600
Subject: [PATCH] updates to decom in order to accommodate no duplicate timestamps

---
 imap_processing/cdf/utils.py                  |   3 +-
 .../tests/ultra/unit/test_decom_apid_883.py   |  12 +-
 imap_processing/ultra/l0/decom_ultra.py       | 272 +++++++++++++-----
 imap_processing/ultra/ultra_cdf_attrs.py      |  13 +-
 4 files changed, 216 insertions(+), 84 deletions(-)

diff --git a/imap_processing/cdf/utils.py b/imap_processing/cdf/utils.py
index 301fe8b60..e6ca9442a 100644
--- a/imap_processing/cdf/utils.py
+++ b/imap_processing/cdf/utils.py
@@ -33,7 +33,8 @@ def calc_start_time(shcoarse_time: int):
     """
     # Get the datetime of Jan 1 2010 as the start date
     launch_time = np.datetime64("2010-01-01T00:01:06.184")
-    return launch_time + np.timedelta64(shcoarse_time, "s")
+    time_delta = np.timedelta64(int(shcoarse_time * 1e9), "ns")
+    return launch_time + time_delta


 def write_cdf(data: xr.Dataset, filepath: Path):
diff --git a/imap_processing/tests/ultra/unit/test_decom_apid_883.py b/imap_processing/tests/ultra/unit/test_decom_apid_883.py
index 270db233d..27b48468c 100644
--- a/imap_processing/tests/ultra/unit/test_decom_apid_883.py
+++ b/imap_processing/tests/ultra/unit/test_decom_apid_883.py
@@ -21,17 +21,17 @@ def test_tof_decom(decom_ultra, tof_test_path):

     df = pd.read_csv(tof_test_path, index_col="SequenceCount")

-    np.testing.assert_array_equal(df.Spin, decom_ultra["SPIN"])
-    np.testing.assert_array_equal(df.AbortFlag, decom_ultra["ABORTFLAG"])
-    np.testing.assert_array_equal(df.StartDelay, decom_ultra["STARTDELAY"])
-    assert json.loads(df["P00s"].values[0])[0] == decom_ultra["P00"][0]
+    np.testing.assert_array_equal(df.Spin, decom_ultra["SPIN"].flatten())
+    np.testing.assert_array_equal(df.AbortFlag, decom_ultra["ABORTFLAG"].flatten())
+    np.testing.assert_array_equal(df.StartDelay, decom_ultra["STARTDELAY"].flatten())
+    assert json.loads(df["P00s"].values[0])[0] == decom_ultra["P00"][0][0]

     for count in df.index.get_level_values("SequenceCount").values:
         df_data = df[df.index.get_level_values("SequenceCount") == count].Images.values[
             0
         ]
-        index = decom_ultra["SRC_SEQ_CTR"].index(count)
-        decom_data = decom_ultra["PACKETDATA"][index]
+        rows, cols = np.where(decom_ultra["SRC_SEQ_CTR"] == count)
+        decom_data = decom_ultra["PACKETDATA"][rows[0]][cols[0]]

         df_data_array = np.array(json.loads(df_data)[0])
         np.testing.assert_array_equal(df_data_array, decom_data)
diff --git a/imap_processing/ultra/l0/decom_ultra.py b/imap_processing/ultra/l0/decom_ultra.py
index de1d0ba8e..87be8a2e7 100644
--- a/imap_processing/ultra/l0/decom_ultra.py
+++ b/imap_processing/ultra/l0/decom_ultra.py
@@ -1,9 +1,11 @@
 """Decommutates Ultra CCSDS packets."""
-
+import collections
 import logging
 from collections import defaultdict
 from pathlib import Path

+import numpy as np
+
 from imap_processing import decom
 from imap_processing.ccsds.ccsds_data import CcsdsData
 from imap_processing.ultra.l0.decom_tools import (
@@ -25,8 +27,12 @@ logger = logging.getLogger(__name__)


-def append_params(
-    decom_data: dict, packet, decompressed_data=None, decompressed_key=None
+def append_tof_params(
+    decom_data: dict,
+    packet,
+    decompressed_data: list,
+    data_dict: dict,
+    stacked_dict: dict,
 ):
     """
     Append parsed items to a dictionary, including decompressed data if available.

@@ -39,13 +45,53 @@
         Individual packet.
     decompressed_data : list
         Data that has been decompressed.
-    decompressed_key : str
-        Key for decompressed data.
+    data_dict : dict
+        Dictionary used for stacking in SID dimension.
+    stacked_dict : dict
+        Dictionary used for stacking in time dimension.
+    """
+    # TODO: add error handling to make certain every timestamp has 8 SID values
+    shcoarse = packet.data["SHCOARSE"].derived_value
+
+    for key in packet.data.keys():
+        # Keep appending packet data until SID = 7
+        if key == "PACKETDATA":
+            data_dict[key].append(decompressed_data)
+        # SHCOARSE should be unique
+        elif key == "SHCOARSE" and shcoarse not in decom_data["SHCOARSE"]:
+            decom_data[key].append(packet.data[key].derived_value)
+        # Keep appending all other data until SID = 7
+        else:
+            data_dict[key].append(packet.data[key].derived_value)
+
+    # Append CCSDS fields to the dictionary
+    ccsds_data = CcsdsData(packet.header)
+    append_ccsds_fields(data_dict, ccsds_data)
+
+    # Once "SID" reaches 7, we have all the images and data for the single timestamp
+    if packet.data["SID"].derived_value == 7:
+        for key in packet.data.keys():
+            if key != "SHCOARSE":
+                stacked_dict[key].append(np.stack(data_dict[key]))
+                data_dict[key].clear()
+        for key in packet.header.keys():
+            stacked_dict[key].append(np.stack(data_dict[key]))
+            data_dict[key].clear()
+
+
+def append_params(decom_data: dict, packet):
+    """
+    Append parsed items to a dictionary.
+
+    Parameters
+    ----------
+    decom_data : dict
+        Dictionary to which the data is appended.
+    packet : space_packet_parser.parser.Packet
+        Individual packet.
     """
     for key, item in packet.data.items():
-        decom_data[key].append(
-            decompressed_data if key == decompressed_key else item.derived_value
-        )
+        decom_data[key].append(item.derived_value)

     ccsds_data = CcsdsData(packet.header)
     append_ccsds_fields(decom_data, ccsds_data)
@@ -73,71 +119,155 @@ def decom_ultra_apids(packet_file: Path, xtce: Path, apid: int):
     grouped_data = group_by_apid(packets)
     data = {apid: grouped_data[apid]}

-    decom_data = defaultdict(list)
-
-    # Convert decom_data to defaultdict(list) if it's not already
-    if not isinstance(decom_data, defaultdict):
-        decom_data = defaultdict(list, decom_data)
-
-    for apid in data:
-        if not any(
-            apid in category.apid
-            for category in [
-                ULTRA_EVENTS,
-                ULTRA_AUX,
-                ULTRA_TOF,
-                ULTRA_RATES,
-            ]
-        ):
-            logger.info(f"{apid} is currently not supported")
-            continue
-
-        sorted_packets = sort_by_time(data[apid], "SHCOARSE")
-
-        for packet in sorted_packets:
-            # Here there are multiple images in a single packet,
-            # so we need to loop through each image and decompress it.
-            if apid in ULTRA_EVENTS.apid:
-                decom_data = read_image_raw_events_binary(packet, decom_data)
-                count = packet.data["COUNT"].derived_value
-
-                if count == 0:
-                    append_params(decom_data, packet)
-                else:
-                    for i in range(count):
-                        logging.info(f"Appending image #{i}")
-                        append_params(decom_data, packet)
-
-            elif apid in ULTRA_AUX.apid:
-                append_params(decom_data, packet)
+    # Strategy dict maps APIDs to their respective processing functions
+    strategy_dict = {
+        ULTRA_TOF.apid[0]: process_ultra_tof,
+        ULTRA_EVENTS.apid[0]: process_ultra_events,
+        ULTRA_AUX.apid[0]: process_ultra_aux,
+        ULTRA_RATES.apid[0]: process_ultra_rates,
+    }
+
+    sorted_packets = sort_by_time(data[apid], "SHCOARSE")
+
+    process_function = strategy_dict.get(apid)
+    decom_data = process_function(sorted_packets, defaultdict(list))
+
+    return decom_data
+
+
+def process_ultra_tof(sorted_packets: list, decom_data: collections.defaultdict):
+    """
+    Unpack and decode Ultra TOF packets.
+
+    Parameters
+    ----------
+    sorted_packets : list
+        TOF packets sorted by time.
+    decom_data : collections.defaultdict
+        Empty dictionary.
+
+    Returns
+    -------
+    decom_data : dict
+        A dictionary containing the decoded data.
+    """
+    stacked_dict = defaultdict(list)
+    data_dict = defaultdict(list)
+
+    # For TOF we need to sort by time and then SID
+    sorted_packets = sorted(
+        sorted_packets,
+        key=lambda x: (x.data["SHCOARSE"].raw_value, x.data["SID"].raw_value),
+    )
+
+    for packet in sorted_packets:
+        # Decompress the image data
+        decompressed_data = decompress_image(
+            packet.data["P00"].derived_value,
+            packet.data["PACKETDATA"].raw_value,
+            ULTRA_TOF.width,
+            ULTRA_TOF.mantissa_bit_length,
+        )
+
+        # Append the decompressed data and other derived data
+        # to the dictionary
+        append_tof_params(
+            decom_data,
+            packet,
+            decompressed_data=decompressed_data,
+            data_dict=data_dict,
+            stacked_dict=stacked_dict,
+        )
+
+    # Stack the data to create required dimensions
+    for key in stacked_dict.keys():
+        decom_data[key] = np.stack(stacked_dict[key])
+
+    return decom_data
+
+
+def process_ultra_events(sorted_packets: list, decom_data: dict):
+    """
+    Unpack and decode Ultra EVENTS packets.
-            elif apid in ULTRA_TOF.apid:
-                decompressed_data = decompress_image(
-                    packet.data["P00"].derived_value,
-                    packet.data["PACKETDATA"].raw_value,
-                    ULTRA_TOF.width,
-                    ULTRA_TOF.mantissa_bit_length,
-                )
-
-                append_params(
-                    decom_data,
-                    packet,
-                    decompressed_data=decompressed_data,
-                    decompressed_key="PACKETDATA",
-                )
-
-            elif apid in ULTRA_RATES.apid:
-                decompressed_data = decompress_binary(
-                    packet.data["FASTDATA_00"].raw_value,
-                    ULTRA_RATES.width,
-                    ULTRA_RATES.block,
-                    ULTRA_RATES.len_array,
-                    ULTRA_RATES.mantissa_bit_length,
-                )
-
-                for index in range(ULTRA_RATES.len_array):
-                    decom_data[RATES_KEYS[index]].append(decompressed_data[index])
+
+    Parameters
+    ----------
+    sorted_packets : list
+        EVENTS packets sorted by time.
+    decom_data : collections.defaultdict
+        Empty dictionary.
+
+    Returns
+    -------
+    decom_data : dict
+        A dictionary containing the decoded data.
+    """
+    for packet in sorted_packets:
+        # Here there are multiple images in a single packet,
+        # so we need to loop through each image and decompress it.
+        decom_data = read_image_raw_events_binary(packet, decom_data)
+        count = packet.data["COUNT"].derived_value
+        if count == 0:
+            append_params(decom_data, packet)
+        else:
+            for i in range(count):
+                logging.info(f"Appending image #{i}")
                 append_params(decom_data, packet)

     return decom_data
+
+
+def process_ultra_aux(sorted_packets: list, decom_data: dict):
+    """
+    Unpack and decode Ultra AUX packets.
+
+    Parameters
+    ----------
+    sorted_packets : list
+        AUX packets sorted by time.
+    decom_data : collections.defaultdict
+        Empty dictionary.
+
+    Returns
+    -------
+    decom_data : dict
+        A dictionary containing the decoded data.
+    """
+    for packet in sorted_packets:
+        append_params(decom_data, packet)
+
+    return decom_data
+
+
+def process_ultra_rates(sorted_packets: list, decom_data: dict):
+    """
+    Unpack and decode Ultra RATES packets.
+
+    Parameters
+    ----------
+    sorted_packets : list
+        RATES packets sorted by time.
+    decom_data : collections.defaultdict
+        Empty dictionary.
+
+    Returns
+    -------
+    decom_data : dict
+        A dictionary containing the decoded data.
+ """ + for packet in sorted_packets: + decompressed_data = decompress_binary( + packet.data["FASTDATA_00"].raw_value, + ULTRA_RATES.width, + ULTRA_RATES.block, + ULTRA_RATES.len_array, + ULTRA_RATES.mantissa_bit_length, + ) + + for index in range(ULTRA_RATES.len_array): + decom_data[RATES_KEYS[index]].append(decompressed_data[index]) + + append_params(decom_data, packet) + + return decom_data diff --git a/imap_processing/ultra/ultra_cdf_attrs.py b/imap_processing/ultra/ultra_cdf_attrs.py index 9547d55da..4ecd1d80f 100644 --- a/imap_processing/ultra/ultra_cdf_attrs.py +++ b/imap_processing/ultra/ultra_cdf_attrs.py @@ -32,28 +32,29 @@ instrument_base=ultra_base, ) -ultra_support_attrs = AttrBase( +ultra_support_attrs = ScienceAttrs( validmin=GlobalConstants.INT_FILLVAL, validmax=GlobalConstants.INT_MAXVAL, - display_type="no_plot", + display_type="time_series", fill_val=GlobalConstants.INT_FILLVAL, format="I12", var_type="support_data", label_axis="none", + depend_0="epoch", ) -ultra_metadata_attrs = ScienceAttrs( +ultra_metadata_attrs = AttrBase( validmin=GlobalConstants.INT_FILLVAL, validmax=GlobalConstants.INT_MAXVAL, - depend_0="Epoch", - display_type="no_plot", + display_type="time_series", fill_val=GlobalConstants.INT_FILLVAL, format="I12", + label_axis="none", var_type="metadata", ) # Required attrs for string data type, # meaning array with string. string_base = StringAttrs( - depend_0="Epoch", + depend_0="epoch", )