ENH/MNT: Add a common initial dataset creation routine #687

Merged
5 changes: 4 additions & 1 deletion imap_processing/cdf/utils.py
```diff
@@ -103,7 +103,7 @@ def load_cdf(
     return dataset


-def write_cdf(dataset: xr.Dataset) -> Path:
+def write_cdf(dataset: xr.Dataset, **extra_cdf_kwargs: dict) -> Path:
     """
     Write the contents of "data" to a CDF file using cdflib.xarray_to_cdf.
@@ -118,6 +118,8 @@ def write_cdf(dataset: xr.Dataset) -> Path:
     ----------
     dataset : xarray.Dataset
         The dataset object to convert to a CDF.
+    **extra_cdf_kwargs : dict
+        Additional keyword arguments to pass to the ``xarray_to_cdf`` function.

     Returns
     -------
@@ -166,6 +168,7 @@ def write_cdf(dataset: xr.Dataset) -> Path:
         dataset,
         str(file_path),
         terminate_on_warning=True,
+        **extra_cdf_kwargs,
     )  # Terminate if not ISTP compliant

     return file_path
```
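With the pass-through in place, callers can forward any `cdflib.xarray_to_cdf` option through `write_cdf` without its signature having to enumerate them. A minimal usage sketch (the `istp=False` flag mirrors the updated test at the bottom of this PR; `processed_dataset` is an assumed, fully attributed L1A product):

```python
from imap_processing.cdf.utils import load_cdf, write_cdf

# `processed_dataset` is assumed to come from an L1A processing function and
# to carry the CDF attributes that write_cdf uses to build the output file name.
file_path = write_cdf(processed_dataset, istp=False)  # forwarded to xarray_to_cdf

# Round-trip check with the matching loader from the same module.
dataset = load_cdf(file_path)
```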
26 changes: 15 additions & 11 deletions imap_processing/hi/l1a/hi_l1a.py
```diff
@@ -1,20 +1,22 @@
 """IMAP-HI L1A processing module."""

 import logging
+from pathlib import Path
+from typing import Union

 import xarray as xr

-from imap_processing.hi.l0 import decom_hi
+from imap_processing import imap_module_directory
 from imap_processing.hi.l1a.histogram import create_dataset as hist_create_dataset
 from imap_processing.hi.l1a.housekeeping import process_housekeeping
 from imap_processing.hi.l1a.science_direct_event import science_direct_event
 from imap_processing.hi.utils import HIAPID
-from imap_processing.utils import group_by_apid
+from imap_processing.utils import packet_file_to_datasets

 logger = logging.getLogger(__name__)


-def hi_l1a(packet_file_path: str, data_version: str) -> list[xr.Dataset]:
+def hi_l1a(packet_file_path: Union[str, Path], data_version: str) -> list[xr.Dataset]:
     """
     Will process IMAP raw data to l1a.
@@ -30,30 +32,32 @@ def hi_l1a(packet_file_path: str, data_version: str) -> list[xr.Dataset]:
     processed_data : list[xarray.Dataset]
         List of processed xarray dataset.
     """
-    unpacked_data = decom_hi.decom_packets(packet_file_path)
-
-    # group data by apid
-    grouped_data = group_by_apid(unpacked_data)
+    packet_def_file = (
+        imap_module_directory / "hi/packet_definitions/hi_packet_definition.xml"
+    )
+    datasets_by_apid = packet_file_to_datasets(
+        packet_file=packet_file_path, xtce_packet_definition=packet_def_file
+    )

     # Process science to l1a.
     processed_data = []
-    for apid in grouped_data.keys():
+    for apid in datasets_by_apid:
         if apid == HIAPID.H45_SCI_CNT:
             logger.info(
                 "Processing histogram data for [%s] packets", HIAPID.H45_SCI_CNT.name
             )
-            data = hist_create_dataset(grouped_data[apid])
+            data = hist_create_dataset(datasets_by_apid[apid])
         elif apid == HIAPID.H45_SCI_DE:
             logger.info(
                 "Processing direct event data for [%s] packets", HIAPID.H45_SCI_DE.name
             )
-            data = science_direct_event(grouped_data[apid])
+            data = science_direct_event(datasets_by_apid[apid])
         elif apid == HIAPID.H45_APP_NHK:
             logger.info(
                 "Processing housekeeping data for [%s] packets", HIAPID.H45_APP_NHK.name
             )
-            data = process_housekeeping(grouped_data[apid])
+            data = process_housekeeping(datasets_by_apid[apid])
         else:
             raise RuntimeError(f"Encountered unexpected APID [{apid}]")
```
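For orientation, a sketch of how the common routine is consumed: `packet_file_to_datasets` parses a packet file against an XTCE definition and returns a mapping from APID to an `xarray.Dataset` with one entry per packet along the `epoch` dimension (the input file name here is hypothetical):

```python
from imap_processing import imap_module_directory
from imap_processing.utils import packet_file_to_datasets

packet_def_file = (
    imap_module_directory / "hi/packet_definitions/hi_packet_definition.xml"
)
# Hypothetical raw packet file, for illustration only.
datasets_by_apid = packet_file_to_datasets(
    packet_file="imap_hi_raw.pkts", xtce_packet_definition=packet_def_file
)

for apid, ds in datasets_by_apid.items():
    # Each dataset holds one row per packet; packet fields such as ccsds_met
    # and esa_step become variables along the epoch dimension (per the diffs below).
    print(apid, ds.sizes)
```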
26 changes: 14 additions & 12 deletions imap_processing/hi/l1a/histogram.py
```diff
@@ -2,10 +2,8 @@

 import numpy as np
 import xarray as xr
-from space_packet_parser.parser import Packet

 from imap_processing.cdf.imap_cdf_manager import ImapCdfAttributes
-from imap_processing.cdf.utils import met_to_j2000ns

 # define the names of the 24 counter arrays
 # contained in the histogram packet
@@ -35,30 +33,34 @@
 TOTAL_COUNTERS = ("total_a", "total_b", "total_c", "fee_de_sent", "fee_de_recd")


-def create_dataset(packets: list[Packet]) -> xr.Dataset:
+def create_dataset(input_ds: xr.Dataset) -> xr.Dataset:
     """
     Create dataset for a number of Hi Histogram packets.

     Parameters
     ----------
-    packets : list[space_packet_parser.ParsedPacket]
-        Packet list.
+    input_ds : xarray.Dataset
+        Dataset of packets.

     Returns
     -------
     dataset : xarray.Dataset
         Dataset with all metadata field data in xr.DataArray.
     """
-    dataset = allocate_histogram_dataset(len(packets))
+    dataset = allocate_histogram_dataset(len(input_ds.epoch))

-    # unpack the packets data into the Dataset
-    for i_epoch, packet in enumerate(packets):
-        dataset.epoch.data[i_epoch] = met_to_j2000ns(packet.data["CCSDS_MET"].raw_value)
-        dataset.ccsds_met[i_epoch] = packet.data["CCSDS_MET"].raw_value
-        dataset.esa_stepping_num[i_epoch] = packet.data["ESA_STEP"].raw_value
+    # TODO: Move into the allocate dataset function?
```

> **Contributor** (review comment on the `TODO` above): These changes look fine for now. The `allocate_histogram_dataset` function was really just my way of avoiding having an intermediate data storage. I wanted to go directly from the packet into the `xr.Dataset` arrays. With your new function, this is no longer achieving that goal. I will write a ticket to look at this Hi code and address the TODOs.

```diff
+    dataset["epoch"].data[:] = input_ds["epoch"].data
+    dataset["ccsds_met"].data = input_ds["ccsds_met"].data
+    dataset["esa_stepping_num"].data = input_ds["esa_step"].data

+    # unpack the packets data into the Dataset
+    # (npackets, 24 * 90 * 12)
+    # TODO: Look into avoiding the for-loops below
```

> **Contributor** (review comment on the `TODO` above): Yes... Good recommendation!

```diff
+    # It seems like we could try to reshape the arrays and do some numpy
+    # broadcasting rather than for-loops directly here
+    for i_epoch, counters_binary_data in enumerate(input_ds["counters"].data):
         # unpack 24 arrays of 90 12-bit unsigned integers
-        counters_binary_data = packet.data["COUNTERS"].raw_value
         counter_ints = [
             int(counters_binary_data[i * 12 : (i + 1) * 12], 2) for i in range(90 * 24)
         ]
```
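As a sketch of the reshape-and-broadcast idea from the TODO and review comment above, assuming each packet's `counters` value is a string of `'0'`/`'1'` characters of length 24 * 90 * 12 (which is what the `int(..., 2)` loop implies), the 12-bit unpacking could be vectorized per packet roughly like this:

```python
import numpy as np


def unpack_counters(counters_binary_data: str) -> np.ndarray:
    """Unpack one packet's bit string into a (24, 90) array of counters.

    Sketch only: assumes counters_binary_data is a '0'/'1' string of
    length 24 * 90 * 12, matching the loop in the diff above.
    """
    # View each character as a 0/1 integer, then group into 12-bit words.
    bits = np.frombuffer(counters_binary_data.encode(), dtype=np.uint8) - ord("0")
    bits = bits.reshape(24 * 90, 12)
    # Dot each word with powers of two to evaluate it as a big-endian integer.
    weights = 1 << np.arange(11, -1, -1)
    return (bits * weights).sum(axis=1).astype(np.uint16).reshape(24, 90)
```

On that shape this replaces 2160 Python-level `int()` calls per packet with a few array operations; the remaining per-packet loop could be lifted the same way by stacking the bit arrays along the epoch axis, which is presumably what the ticket mentioned in the review will explore.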
14 changes: 3 additions & 11 deletions imap_processing/hi/l1a/housekeeping.py
```diff
@@ -1,34 +1,26 @@
 """Unpack IMAP-Hi housekeeping data."""

 import xarray as xr
-from space_packet_parser.parser import Packet

 from imap_processing.hi.hi_cdf_attrs import (
     hi_hk_l1a_attrs,
 )
-from imap_processing.utils import create_dataset, update_epoch_to_datetime


-def process_housekeeping(packets: list[Packet]) -> xr.Dataset:
+def process_housekeeping(dataset: xr.Dataset) -> xr.Dataset:
     """
     Create dataset for each metadata field.

     Parameters
     ----------
-    packets : list[space_packet_parser.ParsedPacket]
-        Packet list.
+    dataset : xarray.Dataset
+        Packet input dataset.

     Returns
     -------
     dataset : xarray.Dataset
         Dataset with all metadata field data in xr.DataArray.
     """
-    dataset = create_dataset(
-        packets=packets, spacecraft_time_key="ccsds_met", skip_keys=["INSTR_SPECIFIC"]
-    )
-    # Update epoch to datetime
-    dataset = update_epoch_to_datetime(dataset)
-
     # Add datalevel attrs
     dataset.attrs.update(hi_hk_l1a_attrs.output())
     return dataset
```
13 changes: 6 additions & 7 deletions imap_processing/hi/l1a/science_direct_event.py
```diff
@@ -2,7 +2,6 @@

 import numpy as np
 import xarray as xr
-from space_packet_parser.parser import Packet

 from imap_processing import imap_module_directory
 from imap_processing.cdf.cdf_attribute_manager import CdfAttributeManager
@@ -297,7 +296,7 @@ def create_dataset(de_data_list: list, packet_met_time: list) -> xr.Dataset:
     return dataset


-def science_direct_event(packets_data: list[Packet]) -> xr.Dataset:
+def science_direct_event(packets_data: xr.Dataset) -> xr.Dataset:
     """
     Unpack IMAP-Hi direct event data.

@@ -309,8 +308,8 @@ def science_direct_event(packets_data: xr.Dataset) -> xr.Dataset:

     Parameters
     ----------
-    packets_data : list[space_packet_parser.ParsedPacket]
-        List of packets data.
+    packets_data : xarray.Dataset
+        Packets extracted into a dataset.

     Returns
     -------
@@ -324,14 +323,14 @@ def science_direct_event(packets_data: xr.Dataset) -> xr.Dataset:
     # I am using extend to add another list to the
     # end of the list. This way, I don't need to flatten
     # the list later.
-    for data in packets_data:
+    for i, data in enumerate(packets_data["de_tof"].data):
         # break binary stream data into unit of 48-bits
-        event_48bits_list = break_into_bits_size(data.data["DE_TOF"].raw_value)
+        event_48bits_list = break_into_bits_size(data)
         # parse 48-bits into meaningful data such as metaevent or direct event
         de_data_list.extend([parse_direct_event(event) for event in event_48bits_list])
         # add packet time to packet_met_time
         packet_met_time.extend(
-            [data.data["CCSDS_MET"].raw_value] * len(event_48bits_list)
+            [packets_data["ccsds_met"].data[i]] * len(event_48bits_list)
         )

     # create dataset
```
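For readers outside the diff context, `break_into_bits_size` (defined elsewhere in this module and not shown in this PR) splits each packet's DE_TOF bit stream into 48-bit events before `parse_direct_event` interprets them. A minimal sketch consistent with how the loop above uses it:

```python
def break_into_bits_size(binary_data: str, event_size: int = 48) -> list[str]:
    """Split a '0'/'1' bit string into consecutive 48-bit event chunks.

    Sketch only: assumes len(binary_data) is a multiple of event_size,
    matching the direct-event layout described in science_direct_event.
    """
    return [
        binary_data[i : i + event_size]
        for i in range(0, len(binary_data), event_size)
    ]
```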
30 changes: 9 additions & 21 deletions imap_processing/swapi/l1/swapi_l1.py
```diff
@@ -8,8 +8,6 @@

 from imap_processing import imap_module_directory
 from imap_processing.cdf.global_attrs import ConstantCoordinates
-from imap_processing.cdf.utils import met_to_j2000ns
-from imap_processing.decom import decom_packets
 from imap_processing.swapi.swapi_cdf_attrs import (
     compression_attrs,
     counts_attrs,
@@ -19,12 +17,7 @@
     uncertainty_attrs,
 )
 from imap_processing.swapi.swapi_utils import SWAPIAPID, SWAPIMODE
-from imap_processing.utils import (
-    create_dataset,
-    group_by_apid,
-    sort_by_time,
-    update_epoch_to_datetime,
-)
+from imap_processing.utils import packet_file_to_datasets


 def filter_good_data(full_sweep_sci: xr.Dataset) -> np.ndarray:
@@ -172,7 +165,7 @@ def find_sweep_starts(packets: xr.Dataset) -> np.ndarray:
     #
     # [0 0 0 1 0 0 0 0 0 0 0 0 1 0 0 0 0]  # And all?

-    ione = diff == 1
+    ione = diff == 1e9  # 1 second

     valid = (
         (packets["seq_number"] == 0)[:-11]
@@ -474,10 +467,10 @@ def process_swapi_science(sci_dataset: xr.Dataset, data_version: str) -> xr.Dataset:
     # ===================================================================

     # epoch time. Should be same dimension as number of good sweeps
-    epoch_time = good_sweep_sci["epoch"].data.reshape(total_full_sweeps, 12)[:, 0]
-    epoch_converted_time = met_to_j2000ns(epoch_time)
+    epoch_values = good_sweep_sci["epoch"].data.reshape(total_full_sweeps, 12)[:, 0]

     epoch_time = xr.DataArray(
-        epoch_converted_time,
+        epoch_values,
         name="epoch",
         dims=["epoch"],
         attrs=ConstantCoordinates.EPOCH,
@@ -532,6 +525,7 @@ def process_swapi_science(sci_dataset: xr.Dataset, data_version: str) -> xr.Dataset:
     )

     # L1 quality flags
+    # TODO: Should these be kept in raw format rather than derived into strings?
     dataset["swp_pcem_flags"] = xr.DataArray(
         np.array(pcem_compression_flags, dtype=np.uint8),
         dims=["epoch", "energy"],
@@ -626,25 +620,19 @@ def swapi_l1(file_path: str, data_version: str) -> xr.Dataset:
     xtce_definition = (
         f"{imap_module_directory}/swapi/packet_definitions/swapi_packet_definition.xml"
     )
-    packets = decom_packets(
-        packet_file=file_path, xtce_packet_definition=xtce_definition
+    datasets = packet_file_to_datasets(
+        file_path, xtce_definition, use_derived_value=False
     )
-    grouped_packets = group_by_apid(packets)
     processed_data = []
-    for apid in grouped_packets.keys():
-        sorted_packets = sort_by_time(grouped_packets[apid], "SHCOARSE")
+    for apid, ds_data in datasets.items():
         # Right now, we only process SWP_HK and SWP_SCI
         # other packets are not process in this processing pipeline
         # If appId is science, then the file should contain all data of science appId
-        ds_data = create_dataset(sorted_packets, include_header=False)
-
         if apid == SWAPIAPID.SWP_SCI.value:
             data = process_swapi_science(ds_data, data_version)
             processed_data.append(data)
         if apid == SWAPIAPID.SWP_HK.value:
-            # convert epoch to datetime
-            ds_data = update_epoch_to_datetime(ds_data)
-
             # Add datalevel attrs
             ds_data.attrs.update(swapi_l1_hk_attrs.output())
             ds_data.attrs["Data_version"] = data_version
```
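The `diff == 1e9` change falls out of the common routine: epoch values now arrive already converted to J2000 nanoseconds (note the removed `met_to_j2000ns` call) rather than raw spacecraft seconds, so a one-second packet cadence appears as a difference of 1e9. A small illustration with hypothetical epoch values:

```python
import numpy as np

# Hypothetical epochs in nanoseconds since J2000: three packets at 1 Hz,
# then a gap.
epoch_ns = np.array([0, 1_000_000_000, 2_000_000_000, 5_000_000_000])
diff = np.diff(epoch_ns)
ione = diff == 1e9  # True where consecutive packets are exactly one second apart
print(ione)  # [ True  True False]
```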
2 changes: 1 addition & 1 deletion imap_processing/tests/hi/test_l1a.py
```diff
@@ -57,7 +57,7 @@ def test_app_nhk_decom():
     # TODO: compare with validation data once we have it

     # Write CDF
-    cem_raw_cdf_filepath = write_cdf(processed_data[0])
+    cem_raw_cdf_filepath = write_cdf(processed_data[0], istp=False)

     # TODO: ask Vivek about this date mismatch between the file name
     # and the data. May get resolved when we have good sample data.
```