diff --git a/imap_processing/mag/l1b/mag_l1b.py b/imap_processing/mag/l1b/mag_l1b.py index ea3839e0a..1d0c47d0a 100644 --- a/imap_processing/mag/l1b/mag_l1b.py +++ b/imap_processing/mag/l1b/mag_l1b.py @@ -65,8 +65,8 @@ def mag_l1b_processing(input_dataset: xr.Dataset) -> xr.Dataset: # TODO: There is a time alignment step that will add a lot of complexity. # This needs to be done once we have some SPICE time data. - dims = [["direction"]] - new_dims = [["direction"]] + dims = [["direction"], ["compression"]] + new_dims = [["direction"], ["compression"]] # TODO: This should definitely be loaded from AWS calibration_dataset = load_cdf( Path(__file__).parent / "imap_calibration_mag_20240229_v01.cdf" @@ -78,8 +78,9 @@ def mag_l1b_processing(input_dataset: xr.Dataset) -> xr.Dataset: calibration_matrix = calibration_dataset["MFITOURFI"] l1b_fields = xr.apply_ufunc( - calibrate, + update_vector, input_dataset["vectors"], + input_dataset["compression_flags"], input_core_dims=dims, output_core_dims=new_dims, vectorize=True, @@ -88,13 +89,80 @@ def mag_l1b_processing(input_dataset: xr.Dataset) -> xr.Dataset: ) output_dataset = input_dataset.copy() - output_dataset["vectors"] = l1b_fields + output_dataset["vectors"].data = l1b_fields[0].data # TODO add/update attributes return output_dataset -def calibrate( +def update_vector( + input_vector: np.ndarray, + input_compression: np.ndarray, + calibration_matrix: xr.DataArray, +) -> tuple[np.ndarray, np.ndarray]: + """ + Apply calibration and compression scaling to vector. + + This calls, in sequence, calibrate_vector and rescale_vector to apply L1B processing + to the input vector. + + Parameters + ---------- + input_vector : numpy.ndarray + One input vector to update, looking like (x, y, z, range). + input_compression : numpy.ndarray + Compression flags corresponding to the vector, looking like (is_compressed, + compression_width). + calibration_matrix : xr.DataArray + DataArray containing the full set of calibration matrices, for each range. + Size is ((3, 3, 4)). + + Returns + ------- + tuple[numpy.ndarray, numpy.ndarray] + Updated vector and the same compression flags. + """ + vector = calibrate_vector(input_vector, calibration_matrix) + return rescale_vector(vector, input_compression), input_compression + + +def rescale_vector( + input_vector: np.ndarray, compression_flags: np.ndarray +) -> np.ndarray: + """ + Rescale vector based on compression flags. + + If the first value of compression_flags is zero, this just returns the input_vector + unchanged. Otherwise, the vector is scaled using the compression width, which is + the second part of compression_flags. + + The vector is scaled using the following equation: + M = 2 ^ (16-width) + output_vector = input_vector * M + + Therefore, for a 16 bit width, the same vector is returned. + + Parameters + ---------- + input_vector : numpy.ndarray + One input vector to update, looking like (x, y, z, range). + compression_flags : numpy.ndarray + Compression flags corresponding to the vector, looking like (is_compressed, + compression_width). + + Returns + ------- + output_vector : numpy.ndarray + Updated vector. + """ + if not compression_flags[0]: + return input_vector + else: + factor = float(2 ** (16 - compression_flags[1])) + return input_vector * factor # type: ignore + + +def calibrate_vector( input_vector: np.ndarray, calibration_matrix: xr.DataArray = None ) -> np.ndarray: """ diff --git a/imap_processing/tests/mag/test_mag_l1b.py b/imap_processing/tests/mag/test_mag_l1b.py index 9000def9e..f8cd155d3 100644 --- a/imap_processing/tests/mag/test_mag_l1b.py +++ b/imap_processing/tests/mag/test_mag_l1b.py @@ -12,18 +12,21 @@ def mag_l1a_dataset(): epoch = xr.DataArray(np.arange(20), name="epoch", dims=["epoch"]) direction = xr.DataArray(np.arange(4), name="direction", dims=["direction"]) + compression = xr.DataArray(np.arange(2), name="compression", dims=["compression"]) vectors = xr.DataArray( np.zeros((20, 4)), dims=["epoch", "direction"], coords={"epoch": epoch, "direction": direction}, ) + compression_flags = xr.DataArray(np.zeros((20, 2)), dims=["epoch", "compression"]) vectors[0, :] = np.array([1, 1, 1, 0]) output_dataset = xr.Dataset( - coords={"epoch": epoch, "direction": direction}, + coords={"epoch": epoch, "direction": direction, "compression": compression}, ) output_dataset["vectors"] = vectors + output_dataset["compression_flags"] = compression_flags return output_dataset @@ -76,3 +79,40 @@ def test_cdf_output(): output_path = write_cdf(l1b_dataset) assert Path.exists(output_path) + + +def test_mag_compression_scale(mag_l1a_dataset): + test_calibration = np.array( + [ + [2.2972202, 0.0, 0.0], + [0.00348625, 2.23802879, 0.0], + [-0.00250788, -0.00888437, 2.24950008], + ] + ) + mag_l1a_dataset["vectors"][0, :] = np.array([1, 1, 1, 0]) + mag_l1a_dataset["vectors"][1, :] = np.array([1, 1, 1, 0]) + mag_l1a_dataset["vectors"][2, :] = np.array([1, 1, 1, 0]) + mag_l1a_dataset["vectors"][3, :] = np.array([1, 1, 1, 0]) + + mag_l1a_dataset["compression_flags"][0, :] = np.array([1, 16]) + mag_l1a_dataset["compression_flags"][1, :] = np.array([0, 0]) + mag_l1a_dataset["compression_flags"][2, :] = np.array([1, 18]) + mag_l1a_dataset["compression_flags"][3, :] = np.array([1, 14]) + + mag_l1a_dataset.attrs["Logical_source"] = ["imap_mag_l1a_norm-mago"] + output = mag_l1b(mag_l1a_dataset, "v001") + + calibrated_vectors = np.matmul(np.array([1, 1, 1]), test_calibration) + # 16 bit width is the standard + assert np.allclose(output["vectors"].data[0][:3], calibrated_vectors) + # uncompressed data is uncorrected + assert np.allclose(output["vectors"].data[1][:3], calibrated_vectors) + + # width of 18 should be multiplied by 1/4 + scaled_vectors = calibrated_vectors * 1 / 4 + # should be corrected + assert np.allclose(output["vectors"].data[2][:3], scaled_vectors) + + # width of 14 should be multiplied by 4 + scaled_vectors = calibrated_vectors * 4 + assert np.allclose(output["vectors"].data[3][:3], scaled_vectors)