ENH/MNT: Add a common initial dataset creation routine (#687)
* ENH/MNT: Add a common initial dataset creation routine

This adds a common interface for returning xarray datasets from
a packet file. There is one dataset per apid, sorted by time, with
derived values automatically converted.

* Update Hi and SWAPI to use this new feature
greglucas authored Jul 22, 2024
1 parent 2ba60f1 commit 1e5692b
Showing 9 changed files with 147 additions and 111 deletions.
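
The new interface in a nutshell: a minimal usage sketch, assuming a hypothetical input file, with keyword names taken from the Hi call site below.

from imap_processing import imap_module_directory
from imap_processing.utils import packet_file_to_datasets

# Hypothetical input file; the XTCE path matches the Hi call site below.
packet_def = imap_module_directory / "hi/packet_definitions/hi_packet_definition.xml"
datasets_by_apid = packet_file_to_datasets(
    packet_file="path/to/raw_packets.pkts",
    xtce_packet_definition=packet_def,
)
# One dataset per APID, sorted by time, with derived values converted.
for apid, ds in datasets_by_apid.items():
    print(apid, ds.sizes)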
5 changes: 4 additions & 1 deletion imap_processing/cdf/utils.py
@@ -103,7 +103,7 @@ def load_cdf(
     return dataset


-def write_cdf(dataset: xr.Dataset) -> Path:
+def write_cdf(dataset: xr.Dataset, **extra_cdf_kwargs: dict) -> Path:
     """
     Write the contents of "data" to a CDF file using cdflib.xarray_to_cdf.
@@ -118,6 +118,8 @@ def write_cdf(dataset: xr.Dataset) -> Path:
     ----------
     dataset : xarray.Dataset
         The dataset object to convert to a CDF.
+    **extra_cdf_kwargs : dict
+        Additional keyword arguments to pass to the ``xarray_to_cdf`` function.

     Returns
     -------
@@ -166,6 +168,7 @@ def write_cdf(dataset: xr.Dataset) -> Path:
         dataset,
         str(file_path),
         terminate_on_warning=True,
+        **extra_cdf_kwargs,
     )  # Terminate if not ISTP compliant

     return file_path
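
Usage note: the new **extra_cdf_kwargs are forwarded verbatim to cdflib's xarray_to_cdf, so callers can tune the writer per dataset. A minimal sketch, mirroring the istp=False override exercised in test_l1a.py at the bottom of this commit:

# Sketch: relax ISTP handling for a single output file
file_path = write_cdf(dataset, istp=False)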
26 changes: 15 additions & 11 deletions imap_processing/hi/l1a/hi_l1a.py
@@ -1,20 +1,22 @@
 """IMAP-HI L1A processing module."""

 import logging
+from pathlib import Path
+from typing import Union

 import xarray as xr

-from imap_processing.hi.l0 import decom_hi
+from imap_processing import imap_module_directory
 from imap_processing.hi.l1a.histogram import create_dataset as hist_create_dataset
 from imap_processing.hi.l1a.housekeeping import process_housekeeping
 from imap_processing.hi.l1a.science_direct_event import science_direct_event
 from imap_processing.hi.utils import HIAPID
-from imap_processing.utils import group_by_apid
+from imap_processing.utils import packet_file_to_datasets

 logger = logging.getLogger(__name__)


-def hi_l1a(packet_file_path: str, data_version: str) -> list[xr.Dataset]:
+def hi_l1a(packet_file_path: Union[str, Path], data_version: str) -> list[xr.Dataset]:
     """
     Will process IMAP raw data to l1a.
@@ -30,30 +32,32 @@ def hi_l1a(packet_file_path: str, data_version: str) -> list[xr.Dataset]:
     processed_data : list[xarray.Dataset]
         List of processed xarray dataset.
     """
-    unpacked_data = decom_hi.decom_packets(packet_file_path)
-
-    # group data by apid
-    grouped_data = group_by_apid(unpacked_data)
+    packet_def_file = (
+        imap_module_directory / "hi/packet_definitions/hi_packet_definition.xml"
+    )
+    datasets_by_apid = packet_file_to_datasets(
+        packet_file=packet_file_path, xtce_packet_definition=packet_def_file
+    )

     # Process science to l1a.
     processed_data = []
-    for apid in grouped_data.keys():
+    for apid in datasets_by_apid:
         if apid == HIAPID.H45_SCI_CNT:
             logger.info(
                 "Processing histogram data for [%s] packets", HIAPID.H45_SCI_CNT.name
             )
-            data = hist_create_dataset(grouped_data[apid])
+            data = hist_create_dataset(datasets_by_apid[apid])
         elif apid == HIAPID.H45_SCI_DE:
             logger.info(
                 "Processing direct event data for [%s] packets", HIAPID.H45_SCI_DE.name
             )
-
-            data = science_direct_event(grouped_data[apid])
+            data = science_direct_event(datasets_by_apid[apid])
         elif apid == HIAPID.H45_APP_NHK:
             logger.info(
                 "Processing housekeeping data for [%s] packets", HIAPID.H45_APP_NHK.name
             )
-            data = process_housekeeping(grouped_data[apid])
+            data = process_housekeeping(datasets_by_apid[apid])
         else:
             raise RuntimeError(f"Encountered unexpected APID [{apid}]")
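Side note on the dispatch above: it relies on the integer keys of datasets_by_apid comparing equal to HIAPID members, which holds if HIAPID is an int-valued enum. A self-contained sketch with illustrative, not actual, APID values:

from enum import IntEnum

class HIAPID(IntEnum):  # hypothetical values for illustration only
    H45_APP_NHK = 754
    H45_SCI_CNT = 769
    H45_SCI_DE = 770

datasets_by_apid = {769: "sci_cnt dataset", 754: "nhk dataset"}  # stand-ins
for apid in datasets_by_apid:
    if apid == HIAPID.H45_SCI_CNT:  # plain int == IntEnum member
        print("histogram packets")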
26 changes: 14 additions & 12 deletions imap_processing/hi/l1a/histogram.py
@@ -2,10 +2,8 @@

 import numpy as np
 import xarray as xr
-from space_packet_parser.parser import Packet

 from imap_processing.cdf.imap_cdf_manager import ImapCdfAttributes
-from imap_processing.cdf.utils import met_to_j2000ns

 # define the names of the 24 counter arrays
 # contained in the histogram packet
@@ -35,30 +33,34 @@
 TOTAL_COUNTERS = ("total_a", "total_b", "total_c", "fee_de_sent", "fee_de_recd")


-def create_dataset(packets: list[Packet]) -> xr.Dataset:
+def create_dataset(input_ds: xr.Dataset) -> xr.Dataset:
     """
     Create dataset for a number of Hi Histogram packets.

     Parameters
     ----------
-    packets : list[space_packet_parser.ParsedPacket]
-        Packet list.
+    input_ds : xarray.Dataset
+        Dataset of packets.

     Returns
     -------
     dataset : xarray.Dataset
         Dataset with all metadata field data in xr.DataArray.
     """
-    dataset = allocate_histogram_dataset(len(packets))
+    dataset = allocate_histogram_dataset(len(input_ds.epoch))

-    # unpack the packets data into the Dataset
-    for i_epoch, packet in enumerate(packets):
-        dataset.epoch.data[i_epoch] = met_to_j2000ns(packet.data["CCSDS_MET"].raw_value)
-        dataset.ccsds_met[i_epoch] = packet.data["CCSDS_MET"].raw_value
-        dataset.esa_stepping_num[i_epoch] = packet.data["ESA_STEP"].raw_value
+    # TODO: Move into the allocate dataset function?
+    dataset["epoch"].data[:] = input_ds["epoch"].data
+    dataset["ccsds_met"].data = input_ds["ccsds_met"].data
+    dataset["esa_stepping_num"].data = input_ds["esa_step"].data

+    # unpack the packets data into the Dataset
+    # (npackets, 24 * 90 * 12)
+    # TODO: Look into avoiding the for-loops below
+    # It seems like we could try to reshape the arrays and do some numpy
+    # broadcasting rather than for-loops directly here
+    for i_epoch, counters_binary_data in enumerate(input_ds["counters"].data):
         # unpack 24 arrays of 90 12-bit unsigned integers
-        counters_binary_data = packet.data["COUNTERS"].raw_value
         counter_ints = [
             int(counters_binary_data[i * 12 : (i + 1) * 12], 2) for i in range(90 * 24)
         ]
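On the TODO above about avoiding the for-loop: assuming counters arrives per packet as a '0'/'1' string of 24 * 90 twelve-bit fields, a vectorized alternative could look like this (a sketch, not the committed code):

import numpy as np

def unpack_12bit_counters(counters_binary_data: str) -> np.ndarray:
    """Sketch: turn one packet's bit string into a (24, 90) uint16 array."""
    bits = np.frombuffer(counters_binary_data.encode(), dtype=np.uint8) - ord("0")
    bits = bits.reshape(24 * 90, 12)      # one row per 12-bit counter
    weights = 1 << np.arange(11, -1, -1)  # MSB-first bit weights
    return (bits * weights).sum(axis=1).astype(np.uint16).reshape(24, 90)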
14 changes: 3 additions & 11 deletions imap_processing/hi/l1a/housekeeping.py
@@ -1,34 +1,26 @@
 """Unpack IMAP-Hi housekeeping data."""

 import xarray as xr
-from space_packet_parser.parser import Packet

 from imap_processing.hi.hi_cdf_attrs import (
     hi_hk_l1a_attrs,
 )
-from imap_processing.utils import create_dataset, update_epoch_to_datetime


-def process_housekeeping(packets: list[Packet]) -> xr.Dataset:
+def process_housekeeping(dataset: xr.Dataset) -> xr.Dataset:
     """
     Create dataset for each metadata field.

     Parameters
     ----------
-    packets : list[space_packet_parser.ParsedPacket]
-        Packet list.
+    dataset : xarray.Dataset
+        Packet input dataset.

     Returns
     -------
     dataset : xarray.Dataset
         Dataset with all metadata field data in xr.DataArray.
     """
-    dataset = create_dataset(
-        packets=packets, spacecraft_time_key="ccsds_met", skip_keys=["INSTR_SPECIFIC"]
-    )
-    # Update epoch to datetime
-    dataset = update_epoch_to_datetime(dataset)
-
     # Add datalevel attrs
     dataset.attrs.update(hi_hk_l1a_attrs.output())
     return dataset
13 changes: 6 additions & 7 deletions imap_processing/hi/l1a/science_direct_event.py
@@ -2,7 +2,6 @@

 import numpy as np
 import xarray as xr
-from space_packet_parser.parser import Packet

 from imap_processing import imap_module_directory
 from imap_processing.cdf.cdf_attribute_manager import CdfAttributeManager
@@ -297,7 +296,7 @@ def create_dataset(de_data_list: list, packet_met_time: list) -> xr.Dataset:
     return dataset


-def science_direct_event(packets_data: list[Packet]) -> xr.Dataset:
+def science_direct_event(packets_data: xr.Dataset) -> xr.Dataset:
     """
     Unpack IMAP-Hi direct event data.
@@ -309,8 +308,8 @@ def science_direct_event(packets_data: list[Packet]) -> xr.Dataset:
     Parameters
     ----------
-    packets_data : list[space_packet_parser.ParsedPacket]
-        List of packets data.
+    packets_data : xarray.Dataset
+        Packets extracted into a dataset.

     Returns
     -------
@@ -324,14 +323,14 @@ def science_direct_event(packets_data: list[Packet]) -> xr.Dataset:
     # I am using extend to add another list to the
     # end of the list. This way, I don't need to flatten
     # the list later.
-    for data in packets_data:
+    for i, data in enumerate(packets_data["de_tof"].data):
         # break binary stream data into unit of 48-bits
-        event_48bits_list = break_into_bits_size(data.data["DE_TOF"].raw_value)
+        event_48bits_list = break_into_bits_size(data)
         # parse 48-bits into meaningful data such as metaevent or direct event
         de_data_list.extend([parse_direct_event(event) for event in event_48bits_list])
         # add packet time to packet_met_time
         packet_met_time.extend(
-            [data.data["CCSDS_MET"].raw_value] * len(event_48bits_list)
+            [packets_data["ccsds_met"].data[i]] * len(event_48bits_list)
         )

     # create dataset
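For context, break_into_bits_size is defined elsewhere in this module; its role here is to split one packet's DE_TOF bit string into 48-bit direct-event units. A minimal sketch of that contract, assuming a '0'/'1' string input:

def break_into_bits_size(binary_stream: str, unit_bits: int = 48) -> list:
    """Sketch: split a '0'/'1' string into consecutive 48-bit chunks."""
    return [
        binary_stream[i : i + unit_bits]
        for i in range(0, len(binary_stream), unit_bits)
    ]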
30 changes: 9 additions & 21 deletions imap_processing/swapi/l1/swapi_l1.py
@@ -8,8 +8,6 @@

 from imap_processing import imap_module_directory
 from imap_processing.cdf.global_attrs import ConstantCoordinates
-from imap_processing.cdf.utils import met_to_j2000ns
-from imap_processing.decom import decom_packets
 from imap_processing.swapi.swapi_cdf_attrs import (
     compression_attrs,
     counts_attrs,
@@ -19,12 +17,7 @@
     uncertainty_attrs,
 )
 from imap_processing.swapi.swapi_utils import SWAPIAPID, SWAPIMODE
-from imap_processing.utils import (
-    create_dataset,
-    group_by_apid,
-    sort_by_time,
-    update_epoch_to_datetime,
-)
+from imap_processing.utils import packet_file_to_datasets


 def filter_good_data(full_sweep_sci: xr.Dataset) -> np.ndarray:
@@ -172,7 +165,7 @@ def find_sweep_starts(packets: xr.Dataset) -> np.ndarray:
     #
     # [0 0 0 1 0 0 0 0 0 0 0 0 1 0 0 0 0]  # And all?

-    ione = diff == 1
+    ione = diff == 1e9  # 1 second

     valid = (
         (packets["seq_number"] == 0)[:-11]
@@ -474,10 +467,10 @@ def process_swapi_science(sci_dataset: xr.Dataset, data_version: str) -> xr.Dataset:
     # ===================================================================

     # epoch time. Should be same dimension as number of good sweeps
-    epoch_time = good_sweep_sci["epoch"].data.reshape(total_full_sweeps, 12)[:, 0]
-    epoch_converted_time = met_to_j2000ns(epoch_time)
+    epoch_values = good_sweep_sci["epoch"].data.reshape(total_full_sweeps, 12)[:, 0]

     epoch_time = xr.DataArray(
-        epoch_converted_time,
+        epoch_values,
         name="epoch",
         dims=["epoch"],
         attrs=ConstantCoordinates.EPOCH,
@@ -532,6 +525,7 @@ def process_swapi_science(sci_dataset: xr.Dataset, data_version: str) -> xr.Dataset:
     )

     # L1 quality flags
+    # TODO: Should these be kept in raw format rather than derived into strings?
     dataset["swp_pcem_flags"] = xr.DataArray(
         np.array(pcem_compression_flags, dtype=np.uint8),
         dims=["epoch", "energy"],
@@ -626,25 +620,19 @@ def swapi_l1(file_path: str, data_version: str) -> xr.Dataset:
     xtce_definition = (
         f"{imap_module_directory}/swapi/packet_definitions/swapi_packet_definition.xml"
     )
-    packets = decom_packets(
-        packet_file=file_path, xtce_packet_definition=xtce_definition
+    datasets = packet_file_to_datasets(
+        file_path, xtce_definition, use_derived_value=False
     )
-    grouped_packets = group_by_apid(packets)
     processed_data = []
-    for apid in grouped_packets.keys():
-        sorted_packets = sort_by_time(grouped_packets[apid], "SHCOARSE")
+    for apid, ds_data in datasets.items():
         # Right now, we only process SWP_HK and SWP_SCI
         # other packets are not processed in this processing pipeline
         # If appId is science, then the file should contain all data of science appId
-        ds_data = create_dataset(sorted_packets, include_header=False)
-
         if apid == SWAPIAPID.SWP_SCI.value:
             data = process_swapi_science(ds_data, data_version)
             processed_data.append(data)
         if apid == SWAPIAPID.SWP_HK.value:
-            # convert epoch to datetime
-            ds_data = update_epoch_to_datetime(ds_data)
-
             # Add datalevel attrs
             ds_data.attrs.update(swapi_l1_hk_attrs.output())
             ds_data.attrs["Data_version"] = data_version
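Why ione = diff == 1e9 above: packet_file_to_datasets returns epoch as J2000 nanoseconds, so consecutive packets one second apart now differ by 1e9 rather than by 1 MET second. A small sketch of the contiguity test under that assumption:

import numpy as np

# Assumed: epochs in nanoseconds, nominal 1 s packet cadence
epoch_ns = np.array([0, 1_000_000_000, 2_000_000_000, 5_000_000_000])
diff = np.diff(epoch_ns)
ione = diff == 1e9  # True only where packets are exactly 1 s apart
print(ione)  # [ True  True False]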
2 changes: 1 addition & 1 deletion imap_processing/tests/hi/test_l1a.py
@@ -57,7 +57,7 @@ def test_app_nhk_decom():
     # TODO: compare with validation data once we have it

     # Write CDF
-    cem_raw_cdf_filepath = write_cdf(processed_data[0])
+    cem_raw_cdf_filepath = write_cdf(processed_data[0], istp=False)

     # TODO: ask Vivek about this date mismatch between the file name
     # and the data. May get resolved when we have good sample data.