Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updates to MAG L1B to scale based on compression flags #1117

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 73 additions & 5 deletions imap_processing/mag/l1b/mag_l1b.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ def mag_l1b_processing(input_dataset: xr.Dataset) -> xr.Dataset:
# TODO: There is a time alignment step that will add a lot of complexity.
# This needs to be done once we have some SPICE time data.

dims = [["direction"]]
new_dims = [["direction"]]
dims = [["direction"], ["compression"]]
new_dims = [["direction"], ["compression"]]
# TODO: This should definitely be loaded from AWS
calibration_dataset = load_cdf(
Path(__file__).parent / "imap_calibration_mag_20240229_v01.cdf"
Expand All @@ -78,8 +78,9 @@ def mag_l1b_processing(input_dataset: xr.Dataset) -> xr.Dataset:
calibration_matrix = calibration_dataset["MFITOURFI"]

l1b_fields = xr.apply_ufunc(
calibrate,
update_vector,
input_dataset["vectors"],
input_dataset["compression_flags"],
input_core_dims=dims,
output_core_dims=new_dims,
vectorize=True,
Expand All @@ -88,13 +89,80 @@ def mag_l1b_processing(input_dataset: xr.Dataset) -> xr.Dataset:
)

output_dataset = input_dataset.copy()
output_dataset["vectors"] = l1b_fields
output_dataset["vectors"].data = l1b_fields[0].data

# TODO add/update attributes
return output_dataset


def calibrate(
def update_vector(
    input_vector: np.ndarray,
    input_compression: np.ndarray,
    calibration_matrix: xr.DataArray,
) -> tuple[np.ndarray, np.ndarray]:
    """
    Run the full L1B update on one vector: calibration, then compression rescaling.

    The vector is first passed through calibrate_vector using the
    range-appropriate calibration matrix, and the calibrated result is then
    handed to rescale_vector so compressed samples are scaled back to the
    standard 16-bit width.

    Parameters
    ----------
    input_vector : numpy.ndarray
        One input vector to update, looking like (x, y, z, range).
    input_compression : numpy.ndarray
        Compression flags corresponding to the vector, looking like
        (is_compressed, compression_width).
    calibration_matrix : xr.DataArray
        DataArray containing the full set of calibration matrices, one per
        range. Size is ((3, 3, 4)).

    Returns
    -------
    tuple[numpy.ndarray, numpy.ndarray]
        The calibrated-and-rescaled vector, plus the compression flags
        passed through unchanged.
    """
    calibrated = calibrate_vector(input_vector, calibration_matrix)
    rescaled = rescale_vector(calibrated, input_compression)
    return rescaled, input_compression


def rescale_vector(
input_vector: np.ndarray, compression_flags: np.ndarray
) -> np.ndarray:
"""
Rescale vector based on compression flags.

If the first value of compression_flags is zero, this just returns the input_vector
unchanged. Otherwise, the vector is scaled using the compression width, which is
the second part of compression_flags.

The vector is scaled using the following equation:
M = 2 ^ (16-width)
output_vector = input_vector * M

Therefore, for a 16 bit width, the same vector is returned.

Parameters
----------
input_vector : numpy.ndarray
One input vector to update, looking like (x, y, z, range).
compression_flags : numpy.ndarray
Compression flags corresponding to the vector, looking like (is_compressed,
compression_width).

Returns
-------
output_vector : numpy.ndarray
Updated vector.
"""
if not compression_flags[0]:
return input_vector
else:
factor = float(2 ** (16 - compression_flags[1]))
return input_vector * factor # type: ignore


def calibrate_vector(
input_vector: np.ndarray, calibration_matrix: xr.DataArray = None
) -> np.ndarray:
"""
Expand Down
42 changes: 41 additions & 1 deletion imap_processing/tests/mag/test_mag_l1b.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,21 @@
def mag_l1a_dataset():
    """
    Build a minimal MAG-L1A-like dataset for testing.

    The dataset carries 20 epochs of zeroed (x, y, z, range) vectors and
    zeroed (is_compressed, compression_width) flags; only the first vector
    is set to a non-trivial value of (1, 1, 1, 0).
    """
    epoch_coord = xr.DataArray(np.arange(20), name="epoch", dims=["epoch"])
    direction_coord = xr.DataArray(
        np.arange(4), name="direction", dims=["direction"]
    )
    compression_coord = xr.DataArray(
        np.arange(2), name="compression", dims=["compression"]
    )

    vector_data = xr.DataArray(
        np.zeros((20, 4)),
        dims=["epoch", "direction"],
        coords={"epoch": epoch_coord, "direction": direction_coord},
    )
    # Give the first sample a recognizable value; the rest stay zero.
    vector_data[0, :] = np.array([1, 1, 1, 0])

    flag_data = xr.DataArray(np.zeros((20, 2)), dims=["epoch", "compression"])

    dataset = xr.Dataset(
        coords={
            "epoch": epoch_coord,
            "direction": direction_coord,
            "compression": compression_coord,
        },
    )
    dataset["vectors"] = vector_data
    dataset["compression_flags"] = flag_data

    return dataset

Expand Down Expand Up @@ -76,3 +79,40 @@ def test_cdf_output():
output_path = write_cdf(l1b_dataset)

assert Path.exists(output_path)


def test_mag_compression_scale(mag_l1a_dataset):
    """
    Verify L1B rescales calibrated vectors according to compression flags.

    Four identical (1, 1, 1, range=0) vectors are tagged with different
    compression flags; after processing, compressed vectors should be
    multiplied by 2 ** (16 - width) while uncompressed ones are unchanged.
    """
    test_calibration = np.array(
        [
            [2.2972202, 0.0, 0.0],
            [0.00348625, 2.23802879, 0.0],
            [-0.00250788, -0.00888437, 2.24950008],
        ]
    )
    # Identical vectors in the first four epochs.
    for index in range(4):
        mag_l1a_dataset["vectors"][index, :] = np.array([1, 1, 1, 0])

    # (is_compressed, width) per epoch: standard width, uncompressed,
    # wider-than-standard, narrower-than-standard.
    mag_l1a_dataset["compression_flags"][0, :] = np.array([1, 16])
    mag_l1a_dataset["compression_flags"][1, :] = np.array([0, 0])
    mag_l1a_dataset["compression_flags"][2, :] = np.array([1, 18])
    mag_l1a_dataset["compression_flags"][3, :] = np.array([1, 14])

    mag_l1a_dataset.attrs["Logical_source"] = ["imap_mag_l1a_norm-mago"]
    output = mag_l1b(mag_l1a_dataset, "v001")

    expected = np.matmul(np.array([1, 1, 1]), test_calibration)

    # Width 16 is the standard: calibrated values pass through unscaled.
    assert np.allclose(output["vectors"].data[0][:3], expected)
    # Uncompressed data is never rescaled.
    assert np.allclose(output["vectors"].data[1][:3], expected)
    # Width 18 -> factor 2**(16-18) == 1/4.
    assert np.allclose(output["vectors"].data[2][:3], expected * 1 / 4)
    # Width 14 -> factor 2**(16-14) == 4.
    assert np.allclose(output["vectors"].data[3][:3], expected * 4)
Loading