Mag l1a processing (#384)
* Updating mag CDF generation to match new filenames

* PR updates

* Updating test to remove path

* removing bytearray from magl0

* First pass at vector processing and tests

* Updating vectors to use proper timestamps

* Finishing MAG L1A processing and tests

* Switching return to dictionaries instead of tuples

* Adding an additional test

* Updating from PR comments

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Updating comments based on PR

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
maxinelasp and pre-commit-ci[bot] authored Apr 8, 2024
1 parent d9b1565 commit acd380d
Showing 7 changed files with 638 additions and 75 deletions.
67 changes: 14 additions & 53 deletions imap_processing/mag/l0/decom_mag.py
@@ -21,7 +21,7 @@
logger = logging.getLogger(__name__)


-def decom_packets(packet_file_path: str | Path) -> list[MagL0]:
+def decom_packets(packet_file_path: str | Path) -> dict[str, list[MagL0]]:
"""Decom MAG data packets using MAG packet definition.
Parameters
@@ -31,9 +31,9 @@ def decom_packets(packet_file_path: str | Path) -> list[MagL0]:
    Returns
    -------
-    data : list[MagL0]
-        A list of MAG L0 data classes, including both burst and normal packets. (the
-        packet type is defined in each instance of L0.)
+    data_dict : dict[str, list[MagL0]]
+        A dict with 2 keys pointing to lists of MAG L0 data classes. "norm" corresponds
+        to normal mode packets, "burst" corresponds to burst mode packets.
"""
# Define paths
xtce_document = Path(
@@ -43,7 +43,8 @@ def decom_packets(packet_file_path: str | Path) -> list[MagL0]:
    packet_definition = xtcedef.XtcePacketDefinition(xtce_document)
    mag_parser = parser.PacketParser(packet_definition)

-    data_list = []
+    norm_data = []
+    burst_data = []

    with open(packet_file_path, "rb") as binary_data:
        mag_packets = mag_parser.generator(binary_data)
@@ -57,59 +58,17 @@
                else item.raw_value
                for item in packet.data.values()
            ]
-            data_list.append(MagL0(CcsdsData(packet.header), *values))
-
-    return data_list
-
-
-def export_to_xarray(l0_data: list[MagL0]):
-    """Generate xarray files for "raw" MAG CDF files from MagL0 data.
-
-    Mag outputs "RAW" CDF files just after decomming. These have the immediately
-    post-decom data, with raw binary data for the vectors instead of vector values.
-
-    Parameters
-    ----------
-    l0_data: list[MagL0]
-        A list of MagL0 datapoints
-
-    Returns
-    -------
-    norm_data : xr.Dataset
-        xarray dataset for generating normal data CDFs
-    burst_data : xr.Dataset
-        xarray dataset for generating burst data CDFs
-    """
-    # TODO split by mago and magi using primary sensor
-    norm_data = []
-    burst_data = []
-
-    for packet in l0_data:
-        if packet.ccsds_header.PKT_APID == Mode.NORMAL:
-            norm_data.append(packet)
-        if packet.ccsds_header.PKT_APID == Mode.BURST:
-            burst_data.append(packet)
-
-    norm_dataset = None
-    burst_dataset = None
-
-    if len(norm_data) > 0:
-        norm_dataset = generate_dataset(
-            norm_data, mag_cdf_attrs.mag_l1a_norm_raw_attrs.output()
-        )
-    if len(burst_data) > 0:
-        burst_dataset = generate_dataset(
-            burst_data, mag_cdf_attrs.mag_l1a_burst_raw_attrs.output()
-        )
-
-    return norm_dataset, burst_dataset
+            if apid == Mode.NORMAL:
+                norm_data.append(MagL0(CcsdsData(packet.header), *values))
+            else:
+                burst_data.append(MagL0(CcsdsData(packet.header), *values))
+
+    return {"norm": norm_data, "burst": burst_data}


def generate_dataset(l0_data: list[MagL0], dataset_attrs: dict) -> xr.Dataset:
    """
-    Generate a CDF dataset from the sorted L0 MAG data.
-    Used to create 2 similar datasets, for norm and burst data.
+    Generate a CDF dataset from the sorted raw L0 MAG data.

    Parameters
    ----------
@@ -124,6 +83,8 @@ def generate_dataset(l0_data: list[MagL0], dataset_attrs: dict) -> xr.Dataset:
    dataset : xr.Dataset
        xarray dataset with proper CDF attributes and shape.
    """
+    # TODO: Correct CDF attributes from email
+
    vector_data = np.zeros((len(l0_data), len(l0_data[0].VECTORS)))
    shcoarse_data = np.zeros(len(l0_data), dtype="datetime64[ns]")

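With this change, `decom_packets` sorts packets by APID during decom and returns a dict rather than a flat list, so callers no longer need `export_to_xarray` to split the modes apart. A minimal sketch of the new call site (the packet file name is a placeholder, not a file from this repo):

```python
from pathlib import Path

from imap_processing.mag.l0 import decom_mag

packets = decom_mag.decom_packets(Path("mag_packets.pkts"))  # placeholder path

norm_packets = packets["norm"]    # list[MagL0] from normal-mode packets
burst_packets = packets["burst"]  # list[MagL0] from burst-mode packets
```

Returning a dict keyed by mode also keeps the two packet streams labeled, where the old tuple return relied on position.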
27 changes: 19 additions & 8 deletions imap_processing/mag/l0/mag_l0_data.py
@@ -1,8 +1,12 @@
"""Dataclasses for Level 0 MAG data."""

+from __future__ import annotations
+
from dataclasses import dataclass
from enum import IntEnum

+import numpy as np
+
from imap_processing.ccsds.ccsds_data import CcsdsData


@@ -66,9 +70,10 @@ class MagL0:
        Secondary Coarse Time for first vector, seconds
    SEC_FNTM: int
        Secondary Fine Time for first vector, subseconds
-    VECTORS: bin
+    VECTORS: np.ndarray | str
        MAG Science Vectors - divide based on PRI_VECSEC and PUS_SSUBTYPE for vector
-        counts
+        counts. There is a post init call to convert a string into a numpy array -
+        the only place it is a string is in the class initialization.
"""

ccsds_header: CcsdsData
@@ -90,19 +95,25 @@ class MagL0:
    PRI_FNTM: int
    SEC_COARSETM: int
    SEC_FNTM: int
-    VECTORS: bytearray
+    VECTORS: np.ndarray | str

    def __post_init__(self):
        """Convert Vectors attribute from string to bytearray if needed.

        Also convert encoded "VECSEC" (vectors per second) into proper vectors per
        second values
        """
        if isinstance(self.VECTORS, str):
-            # Convert string output from space_packet_parser to bytearray
-            self.VECTORS = bytearray(
-                int(self.VECTORS, 2).to_bytes(len(self.VECTORS) // 8, "big")
-            )
+            # Convert string output from space_packet_parser to numpy array of
+            # big-endian bytes
+            self.VECTORS = np.frombuffer(
+                int(self.VECTORS, 2).to_bytes(len(self.VECTORS) // 8, "big"),
+                dtype=np.dtype(">b"),
+            )

        # Remove buffer from end of vectors. Vector data needs to be in 50 bit chunks,
        # and may have an extra byte at the end from CCSDS padding.
        if len(self.VECTORS) % 2:
            self.VECTORS = self.VECTORS[:-1]

        self.PRI_VECSEC = 2**self.PRI_VECSEC
        self.SEC_VECSEC = 2**self.SEC_VECSEC
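The `__post_init__` conversion is easy to sanity-check in isolation. A small sketch of the same round trip, using a made-up 16-bit field in place of a real (much longer) vector block:

```python
import numpy as np

# space_packet_parser hands VECTORS over as a string of bits.
raw = "0000000111111111"  # 16 illustrative bits

# Same steps as MagL0.__post_init__: bit string -> int -> big-endian bytes
# -> numpy array of signed bytes.
as_bytes = int(raw, 2).to_bytes(len(raw) // 8, "big")
vectors = np.frombuffer(as_bytes, dtype=np.dtype(">b"))
print(vectors)  # [ 1 -1] -- 0x01 and 0xFF read as signed bytes
```

The power-of-two `VECSEC` decode works the same way: a raw field value of 3 becomes `2**3 = 8` vectors per second.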
120 changes: 112 additions & 8 deletions imap_processing/mag/l1a/mag_l1a.py
@@ -3,7 +3,10 @@
import logging

from imap_processing.cdf.utils import write_cdf
+from imap_processing.mag import mag_cdf_attrs
from imap_processing.mag.l0 import decom_mag
+from imap_processing.mag.l0.mag_l0_data import MagL0
+from imap_processing.mag.l1a.mag_l1a_data import MagL1a, TimeTuple

logger = logging.getLogger(__name__)

@@ -17,14 +20,115 @@ def mag_l1a(packet_filepath):
    packet_filepath :
        Packet files for processing
    """
-    mag_l0 = decom_mag.decom_packets(packet_filepath)
+    packets = decom_mag.decom_packets(packet_filepath)

-    mag_norm, mag_burst = decom_mag.export_to_xarray(mag_l0)
+    norm_data = packets["norm"]
+    burst_data = packets["burst"]

-    if mag_norm is not None:
-        file = write_cdf(mag_norm)
-        logger.info(f"Created CDF file at {file}")
+    if norm_data is not None:
+        mag_norm_raw = decom_mag.generate_dataset(
+            norm_data, mag_cdf_attrs.mag_l1a_norm_raw_attrs.output()
+        )
+        file = write_cdf(mag_norm_raw)
+        logger.info(f"Created RAW CDF file at {file}")

-    if mag_burst is not None:
-        file = write_cdf(mag_burst)
-        logger.info(f"Created CDF file at {file}")
+    if burst_data is not None:
+        mag_burst_raw = decom_mag.generate_dataset(
+            burst_data, mag_cdf_attrs.mag_l1a_burst_raw_attrs.output()
+        )
+        file = write_cdf(mag_burst_raw)
+        logger.info(f"Created RAW CDF file at {file}")
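The new `process_packets` function below pulls `MagL1a` and `TimeTuple` from `mag_l1a_data.py`, a file this page does not show. Judging from the call sites (`TimeTuple(mag_l0.PRI_COARSETM, mag_l0.PRI_FNTM)`), `TimeTuple` is presumably a small coarse/fine pair, something like this guessed shape:

```python
from typing import NamedTuple

class TimeTuple(NamedTuple):
    """Guessed shape -- the real definition lives in mag_l1a_data.py."""

    coarse_time: int  # whole seconds, e.g. MagL0.PRI_COARSETM
    fine_time: int    # subseconds, e.g. MagL0.PRI_FNTM
```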


def process_packets(mag_l0_list: list[MagL0]) -> dict[str, list[MagL1a]]:
    """
    Given a list of MagL0 packets, process them into MagO and MagI L1A data classes.

    Parameters
    ----------
    mag_l0_list : list[MagL0]
        List of Mag L0 packets to process

    Returns
    -------
    packet_dict : dict[str, list[MagL1a]]
        Dictionary containing two keys: "mago" which points to a list of mago MagL1a
        objects, and "magi" which points to a list of magi MagL1a objects.
    """
    magi = []
    mago = []

    for mag_l0 in mag_l0_list:
        if mag_l0.COMPRESSION:
            raise NotImplementedError("Unable to process compressed data")

        primary_start_time = TimeTuple(mag_l0.PRI_COARSETM, mag_l0.PRI_FNTM)
        secondary_start_time = TimeTuple(mag_l0.SEC_COARSETM, mag_l0.SEC_FNTM)

        # Seconds of data in this packet is the PUS_SSUBTYPE plus 1
        seconds_per_packet = mag_l0.PUS_SSUBTYPE + 1

        # Now that we know the number of seconds of data in the packet and the
        # data rates of each sensor, we can calculate how much data is in this
        # packet and where the byte boundaries are.

        # VECSEC is already decoded in mag_l0
        total_primary_vectors = seconds_per_packet * mag_l0.PRI_VECSEC
        total_secondary_vectors = seconds_per_packet * mag_l0.SEC_VECSEC

        primary_vectors, secondary_vectors = MagL1a.process_vector_data(
            mag_l0.VECTORS, total_primary_vectors, total_secondary_vectors
        )

        # Primary sensor is MAGO (most common expected case)
        if mag_l0.PRI_SENS == 0:
            mago_l1a = MagL1a(
                True,
                bool(mag_l0.MAGO_ACT),
                primary_start_time,
                mag_l0.PRI_VECSEC,
                total_primary_vectors,
                seconds_per_packet,
                mag_l0.SHCOARSE,
                primary_vectors,
            )

            magi_l1a = MagL1a(
                False,
                bool(mag_l0.MAGI_ACT),
                secondary_start_time,
                mag_l0.SEC_VECSEC,
                total_secondary_vectors,
                seconds_per_packet,
                mag_l0.SHCOARSE,
                secondary_vectors,
            )
        # Primary sensor is MAGI
        else:
            magi_l1a = MagL1a(
                False,
                bool(mag_l0.MAGI_ACT),
                primary_start_time,
                mag_l0.PRI_VECSEC,
                total_primary_vectors,
                seconds_per_packet,
                mag_l0.SHCOARSE,
                primary_vectors,
            )

            mago_l1a = MagL1a(
                True,
                bool(mag_l0.MAGO_ACT),
                secondary_start_time,
                mag_l0.SEC_VECSEC,
                total_secondary_vectors,
                seconds_per_packet,
                mag_l0.SHCOARSE,
                secondary_vectors,
            )

        mago.append(mago_l1a)
        magi.append(magi_l1a)

    return {"mago": mago, "magi": magi}
