updates to decom in order to accommodate no duplicate timestamps
laspsandoval committed Mar 13, 2024
1 parent f075c84 commit 336f7f7
Showing 4 changed files with 216 additions and 84 deletions.
3 changes: 2 additions & 1 deletion imap_processing/cdf/utils.py
@@ -33,7 +33,8 @@ def calc_start_time(shcoarse_time: int):
"""
# Get the datetime of Jan 1 2010 as the start date
launch_time = np.datetime64("2010-01-01T00:01:06.184")
return launch_time + np.timedelta64(shcoarse_time, "s")
time_delta = np.timedelta64(int(shcoarse_time * 1e9), "ns")
return launch_time + time_delta


def write_cdf(data: xr.Dataset, filepath: Path):
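For reference, a minimal standalone sketch of the new conversion (assuming only numpy and the same launch epoch as the module): converting SHCOARSE to integer nanoseconds keeps sub-second precision, which the old `np.timedelta64(shcoarse_time, "s")` call could not carry because `np.timedelta64` only accepts an integer count.

```python
import numpy as np

# Same 2010-01-01T00:01:06.184 launch epoch used in imap_processing.cdf.utils.
launch_time = np.datetime64("2010-01-01T00:01:06.184")

def calc_start_time_sketch(shcoarse_time: float) -> np.datetime64:
    # Convert seconds (possibly fractional) to integer nanoseconds first,
    # since np.timedelta64 requires an integer count.
    time_delta = np.timedelta64(int(shcoarse_time * 1e9), "ns")
    return launch_time + time_delta

print(calc_start_time_sketch(0))          # the launch epoch itself
print(calc_start_time_sketch(458453.25))  # fractional seconds survive as 250 ms
```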
12 changes: 6 additions & 6 deletions imap_processing/tests/ultra/unit/test_decom_apid_883.py
@@ -21,17 +21,17 @@ def test_tof_decom(decom_ultra, tof_test_path):

    df = pd.read_csv(tof_test_path, index_col="SequenceCount")

-    np.testing.assert_array_equal(df.Spin, decom_ultra["SPIN"])
-    np.testing.assert_array_equal(df.AbortFlag, decom_ultra["ABORTFLAG"])
-    np.testing.assert_array_equal(df.StartDelay, decom_ultra["STARTDELAY"])
-    assert json.loads(df["P00s"].values[0])[0] == decom_ultra["P00"][0]
+    np.testing.assert_array_equal(df.Spin, decom_ultra["SPIN"].flatten())
+    np.testing.assert_array_equal(df.AbortFlag, decom_ultra["ABORTFLAG"].flatten())
+    np.testing.assert_array_equal(df.StartDelay, decom_ultra["STARTDELAY"].flatten())
+    assert json.loads(df["P00s"].values[0])[0] == decom_ultra["P00"][0][0]

    for count in df.index.get_level_values("SequenceCount").values:
        df_data = df[df.index.get_level_values("SequenceCount") == count].Images.values[
            0
        ]
-        index = decom_ultra["SRC_SEQ_CTR"].index(count)
-        decom_data = decom_ultra["PACKETDATA"][index]
+        rows, cols = np.where(decom_ultra["SRC_SEQ_CTR"] == count)
+        decom_data = decom_ultra["PACKETDATA"][rows[0]][cols[0]]
        df_data_array = np.array(json.loads(df_data)[0])

        np.testing.assert_array_equal(df_data_array, decom_data)
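Because the TOF products are now stacked per timestamp, `SRC_SEQ_CTR` is a 2-D (epoch × SID) array rather than a flat list, which is why the test switches from `list.index()` to `np.where` and flattens the 2-D fields before comparing. A toy lookup with made-up values:

```python
import numpy as np

# Hypothetical 2 epochs x 8 SID slots of sequence counters.
src_seq_ctr = np.array([[10, 11, 12, 13, 14, 15, 16, 17],
                        [18, 19, 20, 21, 22, 23, 24, 25]])
# Matching stack of images: one small 2x2 "image" per (epoch, SID) slot.
packetdata = np.arange(2 * 8 * 2 * 2).reshape(2, 8, 2, 2)

count = 20
rows, cols = np.where(src_seq_ctr == count)   # row = epoch index, col = SID index
image = packetdata[rows[0]][cols[0]]          # the image for that sequence count
print(rows[0], cols[0])  # 1 2
print(image.shape)       # (2, 2)
```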
272 changes: 201 additions & 71 deletions imap_processing/ultra/l0/decom_ultra.py
@@ -1,9 +1,11 @@
"""Decommutates Ultra CCSDS packets."""

+import collections
import logging
from collections import defaultdict
from pathlib import Path

+import numpy as np

from imap_processing import decom
from imap_processing.ccsds.ccsds_data import CcsdsData
from imap_processing.ultra.l0.decom_tools import (
@@ -25,8 +27,12 @@
logger = logging.getLogger(__name__)


-def append_params(
-    decom_data: dict, packet, decompressed_data=None, decompressed_key=None
+def append_tof_params(
+    decom_data: dict,
+    packet,
+    decompressed_data: list,
+    data_dict: dict,
+    stacked_dict: dict,
 ):
    """
    Append parsed items to a dictionary, including decompressed data if available.
@@ -39,13 +45,53 @@
        Individual packet.
    decompressed_data : list
        Data that has been decompressed.
-    decompressed_key : str
-        Key for decompressed data.
+    data_dict : dict
+        Dictionary used for stacking in SID dimension.
+    stacked_dict : dict
+        Dictionary used for stacking in time dimension.
    """
    # TODO: add error handling to make certain every timestamp has 8 SID values
    shcoarse = packet.data["SHCOARSE"].derived_value

    for key in packet.data.keys():
        # Keep appending packet data until SID = 7
        if key == "PACKETDATA":
            data_dict[key].append(decompressed_data)
        # SHCOARSE should be unique
        elif key == "SHCOARSE" and shcoarse not in decom_data["SHCOARSE"]:
            decom_data[key].append(packet.data[key].derived_value)
        # Keep appending all other data until SID = 7
        else:
            data_dict[key].append(packet.data[key].derived_value)

    # Append CCSDS fields to the dictionary
    ccsds_data = CcsdsData(packet.header)
    append_ccsds_fields(data_dict, ccsds_data)

    # Once "SID" reaches 7, we have all the images and data for the single timestamp
    if packet.data["SID"].derived_value == 7:
        for key in packet.data.keys():
            if key != "SHCOARSE":
                stacked_dict[key].append(np.stack(data_dict[key]))
                data_dict[key].clear()


def append_params(decom_data: dict, packet):
    """
    Append parsed items to a dictionary.

    Parameters
    ----------
    decom_data : dict
        Dictionary to which the data is appended.
    packet : space_packet_parser.parser.Packet
        Individual packet.
    """
    for key, item in packet.data.items():
-        decom_data[key].append(
-            decompressed_data if key == decompressed_key else item.derived_value
-        )
+        decom_data[key].append(item.derived_value)

    ccsds_data = CcsdsData(packet.header)
    append_ccsds_fields(decom_data, ccsds_data)
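A stripped-down sketch of the accumulation pattern above (fake packets, only a PACKETDATA-like field, no CCSDS handling): per-SID entries collect in `data_dict`, each completed timestamp is stacked into `stacked_dict`, and `decom_data` keeps one SHCOARSE per timestamp.

```python
import numpy as np
from collections import defaultdict

decom_data = defaultdict(list)    # one entry per unique SHCOARSE
data_dict = defaultdict(list)     # accumulates the 8 SID slices of one timestamp
stacked_dict = defaultdict(list)  # one stacked block per timestamp

fake_packets = [
    {"SHCOARSE": 100, "SID": sid, "PACKETDATA": np.full((2, 3), sid)}
    for sid in range(8)
]

for packet in fake_packets:
    if packet["SHCOARSE"] not in decom_data["SHCOARSE"]:
        decom_data["SHCOARSE"].append(packet["SHCOARSE"])  # keep timestamps unique
    data_dict["PACKETDATA"].append(packet["PACKETDATA"])

    # Once SID reaches 7 the timestamp is complete: stack its 8 images.
    if packet["SID"] == 7:
        stacked_dict["PACKETDATA"].append(np.stack(data_dict["PACKETDATA"]))
        data_dict["PACKETDATA"].clear()

# Final stacking (as in process_ultra_tof) gives (n_epochs, 8, rows, cols).
packetdata = np.stack(stacked_dict["PACKETDATA"])
print(packetdata.shape)        # (1, 8, 2, 3)
print(decom_data["SHCOARSE"])  # [100]
```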
@@ -73,71 +119,155 @@ def decom_ultra_apids(packet_file: Path, xtce: Path, apid: int):
    grouped_data = group_by_apid(packets)
    data = {apid: grouped_data[apid]}

-    decom_data = defaultdict(list)
-
-    # Convert decom_data to defaultdict(list) if it's not already
-    if not isinstance(decom_data, defaultdict):
-        decom_data = defaultdict(list, decom_data)
-
-    for apid in data:
-        if not any(
-            apid in category.apid
-            for category in [
-                ULTRA_EVENTS,
-                ULTRA_AUX,
-                ULTRA_TOF,
-                ULTRA_RATES,
-            ]
-        ):
-            logger.info(f"{apid} is currently not supported")
-            continue
-
-        sorted_packets = sort_by_time(data[apid], "SHCOARSE")
-
-        for packet in sorted_packets:
-            # Here there are multiple images in a single packet,
-            # so we need to loop through each image and decompress it.
-            if apid in ULTRA_EVENTS.apid:
-                decom_data = read_image_raw_events_binary(packet, decom_data)
-                count = packet.data["COUNT"].derived_value
-
-                if count == 0:
-                    append_params(decom_data, packet)
-                else:
-                    for i in range(count):
-                        logging.info(f"Appending image #{i}")
-                        append_params(decom_data, packet)
-
-            elif apid in ULTRA_AUX.apid:
-                append_params(decom_data, packet)
+    # Strategy dict maps APIDs to their respective processing functions
+    strategy_dict = {
+        ULTRA_TOF.apid[0]: process_ultra_tof,
+        ULTRA_EVENTS.apid[0]: process_ultra_events,
+        ULTRA_AUX.apid[0]: process_ultra_aux,
+        ULTRA_RATES.apid[0]: process_ultra_rates,
+    }
+
+    sorted_packets = sort_by_time(data[apid], "SHCOARSE")
+
+    process_function = strategy_dict.get(apid)
+    decom_data = process_function(sorted_packets, defaultdict(list))

    return decom_data
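A minimal sketch of the dispatch pattern (toy processor functions; 883 mirrors the TOF test APID, 880 is a made-up AUX APID). In the lines shown above, an APID missing from `strategy_dict` would simply come back as `None` from `.get()`.

```python
from collections import defaultdict

# Toy processors standing in for process_ultra_tof / process_ultra_aux.
def process_tof(packets, decom_data):
    decom_data["kind"] = "tof"
    return decom_data

def process_aux(packets, decom_data):
    decom_data["kind"] = "aux"
    return decom_data

strategy_dict = {883: process_tof, 880: process_aux}

apid = 883
process_function = strategy_dict.get(apid)  # None if the APID is unsupported
decom_data = process_function([], defaultdict(list))
print(decom_data["kind"])  # tof
```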


def process_ultra_tof(sorted_packets: list, decom_data: collections.defaultdict):
    """
    Unpack and decode Ultra TOF packets.

    Parameters
    ----------
    sorted_packets : list
        TOF packets sorted by time.
    decom_data : collections.defaultdict
        Empty dictionary.

    Returns
    -------
    decom_data : dict
        A dictionary containing the decoded data.
    """
    stacked_dict = defaultdict(list)
    data_dict = defaultdict(list)

    # For TOF we need to sort by time and then SID
    sorted_packets = sorted(
        sorted_packets,
        key=lambda x: (x.data["SHCOARSE"].raw_value, x.data["SID"].raw_value),
    )

    for packet in sorted_packets:
        # Decompress the image data
        decompressed_data = decompress_image(
            packet.data["P00"].derived_value,
            packet.data["PACKETDATA"].raw_value,
            ULTRA_TOF.width,
            ULTRA_TOF.mantissa_bit_length,
        )

        # Append the decompressed data and other derived data
        # to the dictionary
        append_tof_params(
            decom_data,
            packet,
            decompressed_data=decompressed_data,
            data_dict=data_dict,
            stacked_dict=stacked_dict,
        )

    # Stack the data to create required dimensions
    for key in stacked_dict.keys():
        decom_data[key] = np.stack(stacked_dict[key])

    return decom_data
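The two-key sort above guarantees the eight SID frames of each timestamp are contiguous before the SID == 7 stacking trigger fires. A toy version of that sort key, using plain tuples in place of packets:

```python
# Each tuple stands in for (SHCOARSE raw_value, SID raw_value) of a packet.
packets = [(200, 3), (100, 7), (200, 0), (100, 0), (100, 3)]

sorted_packets = sorted(packets, key=lambda p: (p[0], p[1]))
print(sorted_packets)
# [(100, 0), (100, 3), (100, 7), (200, 0), (200, 3)]
```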


-            elif apid in ULTRA_TOF.apid:
-                decompressed_data = decompress_image(
-                    packet.data["P00"].derived_value,
-                    packet.data["PACKETDATA"].raw_value,
-                    ULTRA_TOF.width,
-                    ULTRA_TOF.mantissa_bit_length,
-                )
-
-                append_params(
-                    decom_data,
-                    packet,
-                    decompressed_data=decompressed_data,
-                    decompressed_key="PACKETDATA",
-                )
-
-            elif apid in ULTRA_RATES.apid:
-                decompressed_data = decompress_binary(
-                    packet.data["FASTDATA_00"].raw_value,
-                    ULTRA_RATES.width,
-                    ULTRA_RATES.block,
-                    ULTRA_RATES.len_array,
-                    ULTRA_RATES.mantissa_bit_length,
-                )
-
-                for index in range(ULTRA_RATES.len_array):
-                    decom_data[RATES_KEYS[index]].append(decompressed_data[index])


def process_ultra_events(sorted_packets: list, decom_data: dict):
    """
    Unpack and decode Ultra EVENTS packets.

    Parameters
    ----------
    sorted_packets : list
        EVENTS packets sorted by time.
    decom_data : collections.defaultdict
        Empty dictionary.

    Returns
    -------
    decom_data : dict
        A dictionary containing the decoded data.
    """
    for packet in sorted_packets:
        # Here there are multiple images in a single packet,
        # so we need to loop through each image and decompress it.
        decom_data = read_image_raw_events_binary(packet, decom_data)
        count = packet.data["COUNT"].derived_value

        if count == 0:
            append_params(decom_data, packet)
        else:
            for i in range(count):
                logging.info(f"Appending image #{i}")
                append_params(decom_data, packet)

    return decom_data
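For EVENTS packets, `read_image_raw_events_binary` appears to add one entry per event, so `append_params` is repeated COUNT times to keep the per-packet fields aligned row-for-row with the per-event fields. A toy illustration (hypothetical field names and values):

```python
from collections import defaultdict

decom_data = defaultdict(list)

# Pretend one packet carried 3 events (COUNT == 3) and a spin number of 42.
count = 3
decom_data["EVENT_ID"].extend([101, 102, 103])  # stand-in for read_image_raw_events_binary

# append_params runs once when COUNT == 0, otherwise once per event.
for _ in range(count or 1):
    decom_data["SPIN"].append(42)

print(len(decom_data["EVENT_ID"]), len(decom_data["SPIN"]))  # 3 3
```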


def process_ultra_aux(sorted_packets: list, decom_data: dict):
    """
    Unpack and decode Ultra AUX packets.

    Parameters
    ----------
    sorted_packets : list
        AUX packets sorted by time.
    decom_data : collections.defaultdict
        Empty dictionary.

    Returns
    -------
    decom_data : dict
        A dictionary containing the decoded data.
    """
    for packet in sorted_packets:
        append_params(decom_data, packet)

    return decom_data


def process_ultra_rates(sorted_packets: list, decom_data: dict):
    """
    Unpack and decode Ultra RATES packets.

    Parameters
    ----------
    sorted_packets : list
        RATES packets sorted by time.
    decom_data : collections.defaultdict
        Empty dictionary.

    Returns
    -------
    decom_data : dict
        A dictionary containing the decoded data.
    """
    for packet in sorted_packets:
        decompressed_data = decompress_binary(
            packet.data["FASTDATA_00"].raw_value,
            ULTRA_RATES.width,
            ULTRA_RATES.block,
            ULTRA_RATES.len_array,
            ULTRA_RATES.mantissa_bit_length,
        )

        for index in range(ULTRA_RATES.len_array):
            decom_data[RATES_KEYS[index]].append(decompressed_data[index])

        append_params(decom_data, packet)

    return decom_data
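`decompress_binary` returns one value per entry of `RATES_KEYS`, and the loop fans those values out into per-key lists so each rate becomes its own variable downstream. A toy fan-out with made-up key names and values:

```python
from collections import defaultdict

decom_data = defaultdict(list)

# Stand-ins for RATES_KEYS and one packet's decompressed FASTDATA_00 values.
rates_keys = ["START_RF", "START_LF", "STOP_TOP", "STOP_BOTTOM"]
decompressed_data = [5, 9, 2, 7]

for index in range(len(rates_keys)):
    decom_data[rates_keys[index]].append(decompressed_data[index])

print(dict(decom_data))
# {'START_RF': [5], 'START_LF': [9], 'STOP_TOP': [2], 'STOP_BOTTOM': [7]}
```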
13 changes: 7 additions & 6 deletions imap_processing/ultra/ultra_cdf_attrs.py
@@ -32,28 +32,29 @@
    instrument_base=ultra_base,
)

-ultra_support_attrs = AttrBase(
+ultra_support_attrs = ScienceAttrs(
    validmin=GlobalConstants.INT_FILLVAL,
    validmax=GlobalConstants.INT_MAXVAL,
-    display_type="no_plot",
+    display_type="time_series",
    fill_val=GlobalConstants.INT_FILLVAL,
    format="I12",
    var_type="support_data",
    label_axis="none",
+    depend_0="epoch",
)

-ultra_metadata_attrs = ScienceAttrs(
+ultra_metadata_attrs = AttrBase(
    validmin=GlobalConstants.INT_FILLVAL,
    validmax=GlobalConstants.INT_MAXVAL,
-    depend_0="Epoch",
-    display_type="no_plot",
+    display_type="time_series",
    fill_val=GlobalConstants.INT_FILLVAL,
    format="I12",
    label_axis="none",
    var_type="metadata",
)

# Required attrs for string data type,
# meaning array with string.
string_base = StringAttrs(
-    depend_0="Epoch",
+    depend_0="epoch",
)
