Lo DE Segmented Packets (#1016)
* added function for combining segmented packets

* using np.nonzero instead of np.where

* updated for padding identification
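For context on the second bullet, a minimal standalone sketch (not part of this commit) of why np.nonzero is a drop-in replacement for np.where when only a condition is given:

import numpy as np

seq_flgs = np.array([1, 0, 0, 2, 3])
# With only a boolean condition (no x/y arguments), np.where is equivalent
# to np.nonzero; np.nonzero states the "find matching indices" intent directly.
starts_where = np.where((seq_flgs == 1) | (seq_flgs == 3))[0]
starts_nonzero = np.nonzero((seq_flgs == 1) | (seq_flgs == 3))[0]
assert (starts_where == starts_nonzero).all()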
sdhoyt authored Oct 28, 2024
1 parent 04e8a9a commit ce33595
Showing 2 changed files with 142 additions and 0 deletions.
86 changes: 86 additions & 0 deletions imap_processing/lo/l0/lo_science.py
@@ -6,6 +6,7 @@
import xarray as xr

from imap_processing.cdf.imap_cdf_manager import ImapCdfAttributes
from imap_processing.hit.l0.decom_hit import is_sequential
from imap_processing.lo.l0.decompression_tables.decompression_tables import (
CASE_DECODER,
DE_BIT_SHIFT,
@@ -321,3 +322,88 @@ def parse_de_bin(
<< bit_shift
)
return parsed_int


def combine_segmented_packets(dataset: xr.Dataset) -> xr.Dataset:
"""
Combine segmented packets.
If the number of bits needed to pack the direct events exceeds the
maximum number of bits allowed in a packet, the direct events
will be spread across multiple packets. This function will combine
the segmented binary into a single binary string for each epoch.
Parameters
----------
dataset : xr.Dataset
Lo science direct events from packets_to_dataset function.
Returns
-------
dataset : xr.Dataset
Updated dataset with any segmented direct events combined.
"""
seq_flgs = dataset.seq_flgs.values
seq_ctrs = dataset.src_seq_ctr.values

# Find the start and end of each segment of direct events
# 1 = start of a group of segmented packets
# 2 = end of a group of segmented packets
# 3 = unsegmented packet
seg_starts = np.nonzero((seq_flgs == 1) | (seq_flgs == 3))[0]
seg_ends = np.nonzero((seq_flgs == 2) | (seq_flgs == 3))[0]
# Swap the epoch dimension for shcoarse;
# the epoch dimension will be reduced to the
# first epoch in each segment
dataset.coords["shcoarse"] = dataset["shcoarse"]
dataset = dataset.swap_dims({"epoch": "shcoarse"})

# Find the valid groups of segmented packets.
# find_valid_groups returns one boolean per group of segmented packets,
# where True means the group is valid
valid_groups = find_valid_groups(seq_ctrs, seg_starts, seg_ends)

# Combine the segmented packets into a single binary string.
# Mark the segment splits with a comma so the padding bits
# can be identified when parsing the binary
dataset["events"] = [
",".join(dataset["data"].values[start : end + 1])
for start, end in zip(seg_starts, seg_ends)
]
# Drop any groups of segmented packets that aren't sequential
dataset["events"] = dataset["events"].values[valid_groups]

# Update the epoch to the first epoch in the segment
dataset.coords["epoch"] = dataset["epoch"].values[seg_starts]
# Drop the epochs for any groups of segmented packets that aren't sequential
dataset.coords["epoch"] = dataset["epoch"].values[valid_groups]

return dataset


def find_valid_groups(
seq_ctrs: np.ndarray, seg_starts: np.ndarray, seg_ends: np.ndarray
) -> list[np.bool_]:
"""
Find the valid groups of segmented packets.
Parameters
----------
seq_ctrs : np.ndarray
Sequence counters from the CCSDS header.
seg_starts : np.ndarray
Start index of each group of segmented direct event packet.
seg_ends : np.ndarray
End index of each group of segmented direct event packet.
Returns
-------
valid_groups : list[np.bool_]
Valid groups of segmented packets.
"""
# Check if the sequence counters from the CCSDS header are sequential
grouped_seq_ctrs = [
np.array(seq_ctrs[start : end + 1]) for start, end in zip(seg_starts, seg_ends)
]
valid_groups = [is_sequential(seq_ctrs) for seq_ctrs in grouped_seq_ctrs]
return valid_groups
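
A standalone sketch (illustrative only, using made-up values) of how the grouping flags translate into segment boundaries and comma-joined event strings, following the flag convention in the comments above (1 = first segment, 0 = continuation, 2 = last segment, 3 = unsegmented):

import numpy as np

seq_flgs = np.array([1, 0, 2, 3])  # one three-packet group, then one unsegmented packet
data = np.array(["0001", "0010", "0100", "1000"])

# Group starts are flagged 1 or 3; group ends are flagged 2 or 3.
seg_starts = np.nonzero((seq_flgs == 1) | (seq_flgs == 3))[0]  # array([0, 3])
seg_ends = np.nonzero((seq_flgs == 2) | (seq_flgs == 3))[0]    # array([2, 3])

# Each group is joined into one string; the commas preserve the original
# packet boundaries so per-packet padding bits can be located later.
events = [",".join(data[start : end + 1]) for start, end in zip(seg_starts, seg_ends)]
# events == ["0001,0010,0100", "1000"]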
56 changes: 56 additions & 0 deletions imap_processing/tests/lo/test_lo_science.py
@@ -4,6 +4,7 @@

from imap_processing.cdf.imap_cdf_manager import ImapCdfAttributes
from imap_processing.lo.l0.lo_science import (
combine_segmented_packets,
parse_de_bin,
parse_events,
parse_fixed_fields,
@@ -61,6 +62,36 @@ def fake_de_dataset():
return dataset


@pytest.fixture()
def segmented_pkts_fake_data():
dataset = xr.Dataset(
data_vars=dict(
seq_flgs=(["epoch"], np.array([1, 0, 0, 2, 3, 1, 0, 2, 1, 2])),
src_seq_ctr=(["epoch"], np.array([0, 1, 2, 3, 4, 5, 7, 8, 9, 10])),
shcoarse=(["epoch"], np.array([0, 0, 0, 0, 10, 20, 20, 20, 30, 30])),
data=(
["epoch"],
np.array(
[
"0000000001",
"0000000010",
"0000000100",
"0000001000",
"0000010000",
"0000100000",
"0001000000",
"0010000000",
"0100000000",
"1000000000",
]
),
),
),
coords=dict(epoch=(["epoch"], np.array([0, 0, 0, 0, 10, 20, 20, 20, 30, 30]))),
)
return dataset


@pytest.fixture()
def attr_mgr():
attr_mgr = ImapCdfAttributes()
@@ -146,3 +177,28 @@ def test_parse_de_bin(initialized_dataset):
parsed_int = parse_de_bin(initialized_dataset, 0, 4, 0)
# Assert
assert parsed_int == 0


def test_combine_segmented_packets(segmented_pkts_fake_data):
dataset = combine_segmented_packets(segmented_pkts_fake_data)

np.testing.assert_array_equal(
dataset["seq_flgs"].values, np.array([1, 0, 0, 2, 3, 1, 0, 2, 1, 2])
)
np.testing.assert_array_equal(
dataset["src_seq_ctr"].values, np.array([0, 1, 2, 3, 4, 5, 7, 8, 9, 10])
)
np.testing.assert_array_equal(
dataset["shcoarse"].values, np.array([0, 0, 0, 0, 10, 20, 20, 20, 30, 30])
)
np.testing.assert_array_equal(
dataset["events"].values,
np.array(
[
"0000000001,0000000010,0000000100,0000001000",
"0000010000",
"0100000000,1000000000",
]
),
)
np.testing.assert_array_equal(dataset["epoch"].values, np.array([0, 10, 30]))
