Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mag l1a processing #384

Merged
merged 13 commits into from
Apr 8, 2024
67 changes: 14 additions & 53 deletions imap_processing/mag/l0/decom_mag.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
logger = logging.getLogger(__name__)


def decom_packets(packet_file_path: str | Path) -> list[MagL0]:
def decom_packets(packet_file_path: str | Path) -> dict[str, list[MagL0]]:
"""Decom MAG data packets using MAG packet definition.

Parameters
Expand All @@ -31,9 +31,9 @@ def decom_packets(packet_file_path: str | Path) -> list[MagL0]:

Returns
-------
data : list[MagL0]
A list of MAG L0 data classes, including both burst and normal packets. (the
packet type is defined in each instance of L0.)
data_dict : dict[str, list[MagL0]]
A dict with 2 keys pointing to lists of MAG L0 data classes. "norm" corresponds
to normal mode packets, "burst" corresponds to burst mode packets.
"""
# Define paths
xtce_document = Path(
Expand All @@ -43,7 +43,8 @@ def decom_packets(packet_file_path: str | Path) -> list[MagL0]:
packet_definition = xtcedef.XtcePacketDefinition(xtce_document)
mag_parser = parser.PacketParser(packet_definition)

data_list = []
norm_data = []
burst_data = []

with open(packet_file_path, "rb") as binary_data:
mag_packets = mag_parser.generator(binary_data)
Expand All @@ -57,59 +58,17 @@ def decom_packets(packet_file_path: str | Path) -> list[MagL0]:
else item.raw_value
for item in packet.data.values()
]
data_list.append(MagL0(CcsdsData(packet.header), *values))

return data_list


def export_to_xarray(l0_data: list[MagL0]):
"""Generate xarray files for "raw" MAG CDF files from MagL0 data.

Mag outputs "RAW" CDF files just after decomming. These have the immediately
post-decom data, with raw binary data for the vectors instead of vector values.

Parameters
----------
l0_data: list[MagL0]
A list of MagL0 datapoints

Returns
-------
norm_data : xr.Dataset
xarray dataset for generating burst data CDFs
burst_data : xr.Dataset
xarray dataset for generating burst data CDFs
"""
# TODO split by mago and magi using primary sensor
norm_data = []
burst_data = []
if apid == Mode.NORMAL:
norm_data.append(MagL0(CcsdsData(packet.header), *values))
else:
burst_data.append(MagL0(CcsdsData(packet.header), *values))

for packet in l0_data:
if packet.ccsds_header.PKT_APID == Mode.NORMAL:
norm_data.append(packet)
if packet.ccsds_header.PKT_APID == Mode.BURST:
burst_data.append(packet)

norm_dataset = None
burst_dataset = None

if len(norm_data) > 0:
norm_dataset = generate_dataset(
norm_data, mag_cdf_attrs.mag_l1a_norm_raw_attrs.output()
)
if len(burst_data) > 0:
burst_dataset = generate_dataset(
burst_data, mag_cdf_attrs.mag_l1a_burst_raw_attrs.output()
)

return norm_dataset, burst_dataset
return {"norm": norm_data, "burst": burst_data}


def generate_dataset(l0_data: list[MagL0], dataset_attrs: dict) -> xr.Dataset:
"""
Generate a CDF dataset from the sorted L0 MAG data.

Used to create 2 similar datasets, for norm and burst data.
Generate a CDF dataset from the sorted raw L0 MAG data.

Parameters
----------
Expand All @@ -124,6 +83,8 @@ def generate_dataset(l0_data: list[MagL0], dataset_attrs: dict) -> xr.Dataset:
dataset : xr.Dataset
xarray dataset with proper CDF attributes and shape.
"""
# TODO: Correct CDF attributes from email

vector_data = np.zeros((len(l0_data), len(l0_data[0].VECTORS)))
shcoarse_data = np.zeros(len(l0_data), dtype="datetime64[ns]")

Expand Down
21 changes: 15 additions & 6 deletions imap_processing/mag/l0/mag_l0_data.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
"""Dataclasses for Level 0 MAG data."""

from __future__ import annotations

from dataclasses import dataclass
from enum import IntEnum

import numpy as np

from imap_processing.ccsds.ccsds_data import CcsdsData


Expand Down Expand Up @@ -90,19 +94,24 @@ class MagL0:
PRI_FNTM: int
SEC_COARSETM: int
SEC_FNTM: int
VECTORS: bytearray
VECTORS: np.ndarray | str
maxinelasp marked this conversation as resolved.
Show resolved Hide resolved

def __post_init__(self):
"""Convert Vectors attribute from string to bytearray if needed.

Also convert encoded "VECSEC" (vectors per second) into proper vectors per
second values
"""
if isinstance(self.VECTORS, str):
# Convert string output from space_packet_parser to bytearray
self.VECTORS = bytearray(
int(self.VECTORS, 2).to_bytes(len(self.VECTORS) // 8, "big")
)
# Convert string output from space_packet_parser to numpy array of
# big-endian bytes
self.VECTORS = np.frombuffer(
int(self.VECTORS, 2).to_bytes(len(self.VECTORS) // 8, "big"),
dtype=np.dtype(">b"),
)

# Remove buffer from end of vectors
if len(self.VECTORS) % 2:
maxinelasp marked this conversation as resolved.
Show resolved Hide resolved
self.VECTORS = self.VECTORS[:-1]

self.PRI_VECSEC = 2**self.PRI_VECSEC
self.SEC_VECSEC = 2**self.SEC_VECSEC
120 changes: 112 additions & 8 deletions imap_processing/mag/l1a/mag_l1a.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
import logging

from imap_processing.cdf.utils import write_cdf
from imap_processing.mag import mag_cdf_attrs
from imap_processing.mag.l0 import decom_mag
from imap_processing.mag.l0.mag_l0_data import MagL0
from imap_processing.mag.l1a.mag_l1a_data import MagL1a, TimeTuple

logger = logging.getLogger(__name__)

def mag_l1a(packet_filepath):
    """Process MAG L0 packets into L1A "raw" CDF files.

    Decommutates the packet file, splits the packets into normal and burst
    mode, and writes a RAW CDF file for each mode that actually contains
    data.

    Parameters
    ----------
    packet_filepath :
        Packet file for processing
    """
    packets = decom_mag.decom_packets(packet_filepath)

    norm_data = packets["norm"]
    burst_data = packets["burst"]

    # decom_packets always returns lists (possibly empty), never None, so
    # test for emptiness: generate_dataset indexes l0_data[0] and would
    # fail on an empty list.
    if norm_data:
        mag_norm_raw = decom_mag.generate_dataset(
            norm_data, mag_cdf_attrs.mag_l1a_norm_raw_attrs.output()
        )
        file = write_cdf(mag_norm_raw)
        logger.info(f"Created RAW CDF file at {file}")

    if burst_data:
        mag_burst_raw = decom_mag.generate_dataset(
            burst_data, mag_cdf_attrs.mag_l1a_burst_raw_attrs.output()
        )
        file = write_cdf(mag_burst_raw)
        logger.info(f"Created RAW CDF file at {file}")


def process_packets(mag_l0_list: list[MagL0]) -> dict[str, list[MagL1a]]:
    """
    Given a list of MagL0 packets, process them into MagO and MagI L1A data classes.

    Parameters
    ----------
    mag_l0_list : list[MagL0]
        List of Mag L0 packets to process

    Returns
    -------
    packet_dict : dict[str, list[MagL1a]]
        Dictionary containing two keys: "mago" which points to a list of mago
        MagL1a objects, and "magi" which points to a list of magi MagL1a objects.

    Raises
    ------
    NotImplementedError
        If a packet contains compressed data.
    """
    magi = []
    mago = []

    for mag_l0 in mag_l0_list:
        if mag_l0.COMPRESSION:
            raise NotImplementedError("Unable to process compressed data")

        primary_start_time = TimeTuple(mag_l0.PRI_COARSETM, mag_l0.PRI_FNTM)
        secondary_start_time = TimeTuple(mag_l0.SEC_COARSETM, mag_l0.SEC_FNTM)

        # seconds of data in this packet is the SUBTYPE plus 1
        seconds_per_packet = mag_l0.PUS_SSUBTYPE + 1

        # Knowing the seconds of data in the packet and the data rate of
        # each sensor, we can calculate how many vectors are in the packet
        # and where the byte boundaries are.
        # VECSEC is already decoded in mag_l0.
        total_primary_vectors = seconds_per_packet * mag_l0.PRI_VECSEC
        total_secondary_vectors = seconds_per_packet * mag_l0.SEC_VECSEC

        primary_vectors, secondary_vectors = MagL1a.process_vector_data(
            mag_l0.VECTORS, total_primary_vectors, total_secondary_vectors
        )

        # Each stream is (start_time, vectors_per_second, vector_count,
        # vectors). PRI_SENS == 0 means the primary stream belongs to MAGO
        # (the most common expected case); otherwise it belongs to MAGI.
        primary_stream = (
            primary_start_time,
            mag_l0.PRI_VECSEC,
            total_primary_vectors,
            primary_vectors,
        )
        secondary_stream = (
            secondary_start_time,
            mag_l0.SEC_VECSEC,
            total_secondary_vectors,
            secondary_vectors,
        )
        if mag_l0.PRI_SENS == 0:
            mago_stream, magi_stream = primary_stream, secondary_stream
        else:
            mago_stream, magi_stream = secondary_stream, primary_stream

        mago.append(
            MagL1a(
                True,
                bool(mag_l0.MAGO_ACT),
                mago_stream[0],
                mago_stream[1],
                mago_stream[2],
                seconds_per_packet,
                mag_l0.SHCOARSE,
                mago_stream[3],
            )
        )
        magi.append(
            MagL1a(
                False,
                bool(mag_l0.MAGI_ACT),
                magi_stream[0],
                magi_stream[1],
                magi_stream[2],
                seconds_per_packet,
                mag_l0.SHCOARSE,
                magi_stream[3],
            )
        )

    return {"mago": mago, "magi": magi}
Loading
Loading