ENH/MNT: Add a common initial dataset creation routine
This adds a common interface for returning xarray datasets from
a packet file. There is one dataset per apid, sorted by time, with
derived values automatically converted.
greglucas committed Jul 8, 2024
1 parent 2488862 commit 19df700
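A minimal sketch of the new call pattern, pieced together from the call sites in this diff (the file paths below are placeholders):

    from imap_processing.utils import packet_file_to_datasets

    # Returns a dict of xarray.Dataset objects keyed by APID; each dataset
    # is sorted by time and has derived (e.g. enum/string) values converted.
    datasets = packet_file_to_datasets(
        "path/to/packets.pkts",  # placeholder packet file
        "path/to/packet_definition.xml",  # placeholder XTCE definition
    )
    for apid, dataset in datasets.items():
        print(apid, dataset.sizes)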
Showing 6 changed files with 128 additions and 61 deletions.
7 changes: 5 additions & 2 deletions imap_processing/cdf/utils.py
@@ -103,7 +103,7 @@ def load_cdf(
return dataset


-def write_cdf(dataset: xr.Dataset) -> xr.Dataset:
+def write_cdf(dataset: xr.Dataset, **extra_cdf_kwargs: dict) -> Path:
"""
Write the contents of "data" to a CDF file using cdflib.xarray_to_cdf.
@@ -118,6 +118,8 @@ def write_cdf(dataset: xr.Dataset) -> xr.Dataset:
----------
dataset : xarray.Dataset
The dataset object to convert to a CDF.
+    **extra_cdf_kwargs : dict
+        Additional keyword arguments to pass to the ``xarray_to_cdf`` function.
Returns
-------
@@ -149,7 +151,7 @@ def write_cdf(dataset: xr.Dataset) -> xr.Dataset:
version=version,
repointing=repointing,
)
-    file_path = science_file.construct_path()
+    file_path: Path = science_file.construct_path()
if not file_path.parent.exists():
logger.info(
"The directory does not exist, creating directory %s", file_path.parent
@@ -166,6 +168,7 @@ def write_cdf(dataset: xr.Dataset) -> xr.Dataset:
dataset,
str(file_path),
terminate_on_warning=True,
+        **extra_cdf_kwargs,
) # Terminate if not ISTP compliant

return file_path
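For reference, a sketch of how the new pass-through is meant to be used, mirroring the test change later in this diff and assuming `hk_dataset` is an xarray.Dataset already prepared for CDF output:

    from imap_processing.cdf.utils import write_cdf

    # Extra keyword arguments are forwarded to cdflib's xarray_to_cdf,
    # e.g. istp=False to skip ISTP compliance checks for housekeeping data.
    cdf_path = write_cdf(hk_dataset, istp=False)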
2 changes: 1 addition & 1 deletion imap_processing/cli.py
@@ -305,7 +305,7 @@ def download_dependencies(self) -> list:
)
return file_list

-    def upload_products(self, products: list[list[Path]]) -> None:
+    def upload_products(self, products: list[Path]) -> None:
"""
Upload data products to the IMAP SDC.
35 changes: 11 additions & 24 deletions imap_processing/swapi/l1/swapi_l1.py
@@ -8,8 +8,6 @@

from imap_processing import imap_module_directory
from imap_processing.cdf.global_attrs import ConstantCoordinates
-from imap_processing.cdf.utils import met_to_j2000ns
-from imap_processing.decom import decom_packets
from imap_processing.swapi.swapi_cdf_attrs import (
compression_attrs,
counts_attrs,
@@ -19,12 +17,7 @@
uncertainty_attrs,
)
from imap_processing.swapi.swapi_utils import SWAPIAPID, SWAPIMODE
-from imap_processing.utils import (
-    create_dataset,
-    group_by_apid,
-    sort_by_time,
-    update_epoch_to_datetime,
-)
+from imap_processing.utils import packet_file_to_datasets


def filter_good_data(full_sweep_sci: xr.Dataset) -> np.ndarray:
@@ -60,7 +53,7 @@ def filter_good_data(full_sweep_sci: xr.Dataset) -> np.ndarray:
# TODO: change comparison to SWAPIMODE.HVSCI once we have
# some HVSCI data
# MODE should be HVSCI
-    mode_indices = (mode == SWAPIMODE.HVENG).all(axis=1)
+    mode_indices = (mode == SWAPIMODE.HVENG.value).all(axis=1)
bad_data_indices = sweep_indices & plan_id_indices & mode_indices

# TODO: add checks for checksum
@@ -172,7 +165,7 @@ def find_sweep_starts(packets: xr.Dataset) -> np.ndarray:
#
# [0 0 0 1 0 0 0 0 0 0 0 0 1 0 0 0 0] # And all?

-    ione = diff == 1
+    ione = diff == 1e9  # 1 second

valid = (
(packets["seq_number"] == 0)[:-11]
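The `diff == 1e9` comparison works because epochs coming out of `packet_file_to_datasets` are J2000 nanoseconds; a small self-contained illustration with made-up epoch values:

    import numpy as np

    # Two packets one second apart now differ by 1e9 ns rather than 1 s.
    epoch_ns = np.array([0, 1_000_000_000, 2_000_000_000, 5_000_000_000])
    ione = np.diff(epoch_ns) == 1e9
    print(ione)  # [ True  True False]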
@@ -475,7 +468,8 @@ def process_swapi_science(sci_dataset: xr.Dataset, data_version: str) -> xr.Dataset:

# epoch time. Should be same dimension as number of good sweeps
epoch_time = good_sweep_sci["epoch"].data.reshape(total_full_sweeps, 12)[:, 0]
-    epoch_converted_time = met_to_j2000ns(epoch_time)
+    # epoch_converted_time = met_to_j2000ns(epoch_time)
+    epoch_converted_time = epoch_time
epoch_time = xr.DataArray(
epoch_converted_time,
name="epoch",
@@ -532,8 +526,9 @@ def process_swapi_science(sci_dataset: xr.Dataset, data_version: str) -> xr.Dataset:
)

# L1 quality flags
+    # TODO: Should these be kept in raw format rather than derived into strings?
dataset["swp_pcem_flags"] = xr.DataArray(
-        np.array(pcem_compression_flags, dtype=np.uint8),
+        np.array(pcem_compression_flags == "RAW", dtype=np.uint8),
dims=["epoch", "energy"],
attrs=dataclasses.replace(
compression_attrs,
@@ -543,7 +538,7 @@ def process_swapi_science(sci_dataset: xr.Dataset, data_version: str) -> xr.Dataset:
).output(),
)
dataset["swp_scem_flags"] = xr.DataArray(
-        np.array(scem_compression_flags, dtype=np.uint8),
+        np.array(scem_compression_flags == "RAW", dtype=np.uint8),
dims=["epoch", "energy"],
attrs=dataclasses.replace(
compression_attrs,
@@ -553,7 +548,7 @@ def process_swapi_science(sci_dataset: xr.Dataset, data_version: str) -> xr.Dataset:
).output(),
)
dataset["swp_coin_flags"] = xr.DataArray(
-        np.array(coin_compression_flags, dtype=np.uint8),
+        np.array(coin_compression_flags == "RAW", dtype=np.uint8),
dims=["epoch", "energy"],
attrs=dataclasses.replace(
compression_attrs,
@@ -626,25 +621,17 @@ def swapi_l1(file_path: str, data_version: str) -> xr.Dataset:
xtce_definition = (
f"{imap_module_directory}/swapi/packet_definitions/swapi_packet_definition.xml"
)
-    packets = decom_packets(
-        packet_file=file_path, xtce_packet_definition=xtce_definition
-    )
-    grouped_packets = group_by_apid(packets)
+    datasets = packet_file_to_datasets(file_path, xtce_definition)
processed_data = []
-    for apid in grouped_packets.keys():
-        sorted_packets = sort_by_time(grouped_packets[apid], "SHCOARSE")
+    for apid, ds_data in datasets.items():
# Right now, we only process SWP_HK and SWP_SCI
# other packets are not processed in this processing pipeline
# If appId is science, then the file should contain all data of science appId
-        ds_data = create_dataset(sorted_packets, include_header=False)

if apid == SWAPIAPID.SWP_SCI.value:
data = process_swapi_science(ds_data, data_version)
processed_data.append(data)
if apid == SWAPIAPID.SWP_HK.value:
-            # convert epoch to datetime
-            ds_data = update_epoch_to_datetime(ds_data)

# Add datalevel attrs
ds_data.attrs.update(swapi_l1_hk_attrs.output())
ds_data.attrs["Data_version"] = data_version
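The flag changes above account for derived values now arriving as strings; a sketch of the string-to-uint8 conversion, with hypothetical flag values ("COMPRESSED" is a made-up stand-in for the non-raw label):

    import numpy as np

    # Compression flags are derived as strings by the packet parsing, so the
    # boolean comparison against "RAW" is cast to the uint8 array stored in L1.
    pcem_compression_flags = np.array([["RAW", "COMPRESSED"], ["RAW", "RAW"]])
    flags = np.array(pcem_compression_flags == "RAW", dtype=np.uint8)
    print(flags)  # [[1 0], [1 1]]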
12 changes: 6 additions & 6 deletions imap_processing/swapi/swapi_utils.py
@@ -5,7 +5,7 @@
other SWAPI processing modules.
"""

-from enum import IntEnum
+from enum import Enum, IntEnum


class SWAPIAPID(IntEnum):
@@ -16,10 +16,10 @@ class SWAPIAPID(IntEnum):
SWP_AUT = 1192


-class SWAPIMODE(IntEnum):
+class SWAPIMODE(Enum):
"""Create ENUM for MODE."""

-    LVENG = 0
-    LVSCI = 1
-    HVENG = 2
-    HVSCI = 3
+    LVENG = "LVENG"
+    LVSCI = "LVSCI"
+    HVENG = "HVENG"
+    HVSCI = "HVSCI"
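Because SWAPIMODE is now a string-valued Enum rather than an IntEnum, comparisons against dataset contents go through `.value` (see the `filter_good_data` change above); a minimal sketch:

    import numpy as np

    from imap_processing.swapi.swapi_utils import SWAPIMODE

    # mode values come out of the dataset as strings now, so compare to .value
    mode = np.array(["HVENG", "HVENG", "LVSCI"])
    print(mode == SWAPIMODE.HVENG.value)  # [ True  True False]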
43 changes: 18 additions & 25 deletions imap_processing/tests/swapi/test_swapi_l1.py
@@ -4,7 +4,6 @@

from imap_processing import imap_module_directory
from imap_processing.cdf.utils import met_to_j2000ns, write_cdf
-from imap_processing.decom import decom_packets
from imap_processing.swapi.l1.swapi_l1 import (
SWAPIAPID,
decompress_count,
@@ -15,21 +14,18 @@
process_sweep_data,
swapi_l1,
)
-from imap_processing.utils import create_dataset, group_by_apid, sort_by_time
+from imap_processing.utils import packet_file_to_datasets


@pytest.fixture(scope="session")
def decom_test_data():
"""Read test data from file"""
-    test_folder_path = "tests/swapi/l0_data"
-    packet_files = list(imap_module_directory.glob(f"{test_folder_path}/*.pkts"))
+    test_file = "tests/swapi/l0_data/imap_swapi_l0_raw_20231012_v001.pkts"
+    packet_files = imap_module_directory / test_file
packet_definition = (
f"{imap_module_directory}/swapi/packet_definitions/swapi_packet_definition.xml"
)
-    data_list = []
-    for packet_file in packet_files:
-        data_list.extend(decom_packets(packet_file, packet_definition))
-    return data_list
+    return packet_file_to_datasets(packet_files, packet_definition)


def test_filter_good_data():
Expand All @@ -40,7 +36,7 @@ def test_filter_good_data():
{
"plan_id_science": xr.DataArray(np.full((total_sweeps * 12), 1)),
"sweep_table": xr.DataArray(np.repeat(np.arange(total_sweeps), 12)),
"mode": xr.DataArray(np.full((total_sweeps * 12), 2)),
"mode": xr.DataArray(np.full((total_sweeps * 12), "HVENG")),
},
coords={"epoch": np.arange(total_sweeps * 12)},
)
@@ -49,17 +45,15 @@ def test_filter_good_data():
bad_data_indices = filter_good_data(ds)
assert len(bad_data_indices) == 36

-    # Check for bad MODE data.
-    # This test returns this indices because MODE has 0, 1 values
-    # for the first two sweeps.
+    # Check for bad MODE data, only HVENG is "good"
# TODO: update test when we update MODE from HVENG to HVSCI
ds["mode"] = xr.DataArray(np.repeat(np.arange(total_sweeps), 12))
ds["mode"] = xr.DataArray(np.repeat(["LVENG", "LVSCI", "HVENG"], 12))
bad_data_indices = filter_good_data(ds)
np.testing.assert_array_equal(bad_data_indices, np.arange(24, 36))

# Check for bad sweep_table data.
# Reset MODE data and create first sweep to be mixed value
ds["mode"] = xr.DataArray(np.full((total_sweeps * 12), 2))
ds["mode"] = xr.DataArray(np.full((total_sweeps * 12), "HVENG"))
ds["sweep_table"][:12] = np.arange(0, 12)
np.testing.assert_array_equal(filter_good_data(ds), np.arange(12, 36))

Expand Down Expand Up @@ -87,7 +81,9 @@ def test_find_sweep_starts():
"""Test for find sweep starts"""
time = np.arange(26)
sequence_number = time % 12
ds = xr.Dataset({"seq_number": sequence_number}, coords={"epoch": time})
ds = xr.Dataset(
{"seq_number": sequence_number}, coords={"epoch": met_to_j2000ns(time)}
)

start_indices = find_sweep_starts(ds)
np.testing.assert_array_equal(start_indices, [0, 12])
Expand All @@ -107,18 +103,17 @@ def test_get_full_indices():
"""Test for correct full sweep indices"""
time = np.arange(26)
sequence_number = time % 12
ds = xr.Dataset({"seq_number": sequence_number}, coords={"epoch": time})
ds = xr.Dataset(
{"seq_number": sequence_number}, coords={"epoch": met_to_j2000ns(time)}
)

sweep_indices = get_indices_of_full_sweep(ds)
np.testing.assert_array_equal(sweep_indices, np.arange(0, 24))


def test_swapi_algorithm(decom_test_data):
"""Test SWAPI L1 algorithm"""
-    grouped_data = group_by_apid(decom_test_data)
-    science_data = grouped_data[SWAPIAPID.SWP_SCI]
-    sorted_packets = sort_by_time(science_data, "SHCOARSE")
-    ds_data = create_dataset(sorted_packets, include_header=False)
+    ds_data = decom_test_data[SWAPIAPID.SWP_SCI]
full_sweep_indices = get_indices_of_full_sweep(ds_data)
full_sweep_sci = ds_data.isel({"epoch": full_sweep_indices})
total_packets = len(full_sweep_sci["seq_number"].data)
@@ -208,10 +203,7 @@ def test_swapi_algorithm(decom_test_data):

def test_process_swapi_science(decom_test_data):
"""Test process swapi science"""
-    grouped_data = group_by_apid(decom_test_data)
-    science_data = grouped_data[SWAPIAPID.SWP_SCI]
-    sorted_packets = sort_by_time(science_data, "SHCOARSE")
-    ds_data = create_dataset(sorted_packets, include_header=False)
+    ds_data = decom_test_data[SWAPIAPID.SWP_SCI]
processed_data = process_swapi_science(ds_data, data_version="001")

# Test dataset dimensions
@@ -333,5 +325,6 @@ def test_swapi_l1_cdf():

# hk cdf file
cdf_filename = "imap_swapi_l1_hk_20100101_v001.cdf"
-    cdf_path = write_cdf(processed_data[1])
+    # Ignore ISTP checks for HK data
+    cdf_path = write_cdf(processed_data[1], istp=False)
assert cdf_path.name == cdf_filename