From 8f3ba8bec66c7e6cdc2ba4f57a8568820fdf39ea Mon Sep 17 00:00:00 2001 From: Dr Matthieu Wes <69257591+mwest007@users.noreply.github.com> Date: Mon, 12 Aug 2024 18:28:42 -0600 Subject: [PATCH 1/8] Ruff passes --- punchbowl/exceptions.py | 9 ++++-- punchbowl/level1/stray_light.py | 47 ++++++++++++++++------------- punchbowl/level1/vignette.py | 52 ++++++++++++++++++--------------- 3 files changed, 60 insertions(+), 48 deletions(-) diff --git a/punchbowl/exceptions.py b/punchbowl/exceptions.py index 8ef0ac01..c6136041 100644 --- a/punchbowl/exceptions.py +++ b/punchbowl/exceptions.py @@ -4,15 +4,12 @@ class PUNCHBowlError(Exception): class InvalidDataError(PUNCHBowlError): """Invalid data error.""" - class InvalidHeaderError(PUNCHBowlError): """Header is not properly formatted.""" - class MissingMetadataError(PUNCHBowlError): """Metadata missing for processing.""" - class PUNCHBowlWarning(Warning): """Base class for warnings in punchbowl.""" @@ -24,3 +21,9 @@ class NoCalibrationDataWarning(PUNCHBowlWarning): class ExtraMetadataWarning(PUNCHBowlWarning): """Extra metadata found but ignored.""" + +class IncorrectPolarizationState(PUNCHBowlWarning): + """Mismatched polarization state detected but ignored.""" + +class IncorrectTelescope(PUNCHBowlWarning): + """Mismatched telescope detected but ignored.""" diff --git a/punchbowl/level1/stray_light.py b/punchbowl/level1/stray_light.py index 4c7832e5..3b7aa792 100644 --- a/punchbowl/level1/stray_light.py +++ b/punchbowl/level1/stray_light.py @@ -7,30 +7,30 @@ from punchbowl.data import load_ndcube_from_fits from punchbowl.exceptions import InvalidDataError -from punchbowl.exceptions import NoCalibrationDataWarning from punchbowl.exceptions import LargeTimeDeltaWarning - +from punchbowl.exceptions import IncorrectPolarizationState +from punchbowl.exceptions import IncorrectTelescope @task -def remove_stray_light_task(data_object: NDCube, stray_light_path: pathlib) -> NDCube: +def remove_stray_light_task(data_object: NDCube, stray_light_file: pathlib) -> NDCube: """ Prefect task to remove stray light from an image. - Stray light is light in an optical system which was not intended in the + Stray light is light in an optical system which was not intended in the design. The PUNCH instrument stray light will be mapped periodically as part of the - ongoing in-flight calibration effort. The stray light maps will be + ongoing in-flight calibration effort. The stray light maps will be generated directly from the L0 and L1 science data. Separating instrumental - stray light from the F-corona. This has been demonstrated with SOHO/LASCO - and with STEREO/COR2 observations. It requires an instrumental roll to hold - the stray light pattern fixed while the F-corona rotates in the field of + stray light from the F-corona. This has been demonstrated with SOHO/LASCO + and with STEREO/COR2 observations. It requires an instrumental roll to hold + the stray light pattern fixed while the F-corona rotates in the field of view. PUNCH orbital rolls will be used to create similar effects. Uncertainty across the image plane is calculated using a known stray light model and the difference between the calculated stray light and the ground - truth. The uncertainty is convolved with the input uncertainty layer to + truth. The uncertainty is convolved with the input uncertainty layer to produce the output uncertainty layer. @@ -38,8 +38,8 @@ def remove_stray_light_task(data_object: NDCube, stray_light_path: pathlib) -> N ---------- data_object : PUNCHData data to operate on - - stray_light_path: pathlib + + stray_light_file: pathlib path to stray light model to apply to data Returns @@ -51,25 +51,30 @@ def remove_stray_light_task(data_object: NDCube, stray_light_path: pathlib) -> N logger = get_run_logger() logger.info("remove_stray_light started") - if stray_light_path is None: + if stray_light_file is None: data_object.meta.history.add_now("LEVEL1-remove_stray_light", "Stray light correction skipped") - elif not stray_light_path.exists(): - msg = f"File {stray_light_path} does not exist." + + + elif not stray_light_file.exists(): + msg = f"File {stray_light_file} does not exist." raise InvalidDataError(msg) else: - stray_light_model = load_ndcube_from_fits(stray_light_path) + stray_light_model = load_ndcube_from_fits(stray_light_file) - stray_light_function_date = Time(stray_light_function.meta["DATE-OBS"].value) + stray_light_model_date = Time(stray_light_model.meta["DATE-OBS"].value) observation_date = Time(data_object.meta["DATE-OBS"].value) - if abs((stray_light_function_date - observation_date).to('day').value) > 14: - warnings.warn(f"Calibration file {stray_light_file} contains data created greater than 2 weeks from the obsveration", LargeTimeDeltaWarning) + if abs((stray_light_model_date - observation_date).to("day").value) > 14: + msg=f"Calibration file {stray_light_file} contains data created greater than 2 weeks from the obsveration" + warnings.warn(msg,LargeTimeDeltaWarning) if stray_light_model.meta["TELESCOP"].value != data_object.meta["TELESCOP"].value: - warnings.warn(f"Incorrect TELESCOP value within {stray_light_path}", UserWarning) + msg=f"Incorrect TELESCOP value within {stray_light_file}" + warnings.warn(msg, IncorrectTelescope) elif stray_light_model.meta["OBSLAYR1"].value != data_object.meta["OBSLAYR1"].value: - warnings.warn(f"Incorrect polarization state within {stray_light_path}", UserWarning) + msg=f"Incorrect polarization state within {stray_light_file}" + warnings.warn(msg, IncorrectPolarizationState) elif stray_light_model.data.shape != data_object.data.shape: - msg = f"Incorrect stray light function shape within {stray_light_path}" + msg = f"Incorrect stray light function shape within {stray_light_file}" raise InvalidDataError(msg) else: data_object.data[:, :] -= stray_light_model.data[:, :] diff --git a/punchbowl/level1/vignette.py b/punchbowl/level1/vignette.py index b566b1c0..eac7f000 100644 --- a/punchbowl/level1/vignette.py +++ b/punchbowl/level1/vignette.py @@ -9,8 +9,8 @@ from punchbowl.exceptions import InvalidDataError from punchbowl.exceptions import NoCalibrationDataWarning from punchbowl.exceptions import LargeTimeDeltaWarning - - +from punchbowl.exceptions import IncorrectPolarizationState +from punchbowl.exceptions import IncorrectTelescope @task def correct_vignetting_task(data_object: NDCube, vignetting_file: pathlib) -> NDCube: @@ -18,26 +18,26 @@ def correct_vignetting_task(data_object: NDCube, vignetting_file: pathlib) -> ND Prefect task to correct the vignetting of an image. Vignetting is a reduction of an image's brightness or saturation toward the - periphery compared to the image center, created by the optical path. The - Vignetting Module will transform the data through a flat-field correction - map, to cancel out the effects of optical vignetting created by distortions - in the optical path. This module also corrects detector gain variation and + periphery compared to the image center, created by the optical path. The + Vignetting Module will transform the data through a flat-field correction + map, to cancel out the effects of optical vignetting created by distortions + in the optical path. This module also corrects detector gain variation and offset. - Correction maps will be 2048×2048 arrays, to match the input data, and + Correction maps will be 2048*2048 arrays, to match the input data, and built using the starfield brightness pattern. Mathematical Operation: - - I'_{i,j} = I_i,j ⨉ FF_{i,j} - + + I'_{i,j} = I_i,j * FF_{i,j} + Where I_{i,j} is the number of counts in pixel i, j. I'_{i,j} refers to the - modified value. FF_{i,j} is the small-scale flat field factor for pixel i, - j. The correction mapping will take into account the orientation of the + modified value. FF_{i,j} is the small-scale flat field factor for pixel i, + j. The correction mapping will take into account the orientation of the spacecraft and its position in the orbit. - Uncertainty across the image plane is calculated using the modelled - flat-field correction with stim lamp calibration data. Deviations from the - known flat-field are used to calculate the uncertainty in a given pixel. - The uncertainty is convolved with the input uncertainty layer to produce + Uncertainty across the image plane is calculated using the modelled + flat-field correction with stim lamp calibration data. Deviations from the + known flat-field are used to calculate the uncertainty in a given pixel. + The uncertainty is convolved with the input uncertainty layer to produce the output uncertainty layer. @@ -45,7 +45,7 @@ def correct_vignetting_task(data_object: NDCube, vignetting_file: pathlib) -> ND ---------- data_object : PUNCHData data on which to operate - + vignetting_file : pathlib path to vignetting function to apply to input data @@ -60,8 +60,9 @@ def correct_vignetting_task(data_object: NDCube, vignetting_file: pathlib) -> ND if vignetting_file is None: data_object.meta.history.add_now("LEVEL1-correct_vignetting", "Vignetting skipped") - warnings.warn(f"Calibration file {vignetting_file} is unavailable, vignetting correction not applied", NoCalibrationDataWarning) - + msg=f"Calibration file {vignetting_file} is unavailable, vignetting correction not applied" + warnings.warn(msg, NoCalibrationDataWarning) + elif not vignetting_file.exists(): msg = f"File {vignetting_file} does not exist." raise InvalidDataError(msg) @@ -69,15 +70,18 @@ def correct_vignetting_task(data_object: NDCube, vignetting_file: pathlib) -> ND vignetting_function = load_ndcube_from_fits(vignetting_file) vignetting_function_date = Time(vignetting_function.meta["DATE-OBS"].value) observation_date = Time(data_object.meta["DATE-OBS"].value) - if abs((vignetting_function_date - observation_date).to('day').value) > 14: - warnings.warn(f"Calibration file {vignetting_file} contains data created greater than 2 weeks from the obsveration", LargeTimeDeltaWarning) + if abs((vignetting_function_date - observation_date).to("day").value) > 14: + msg=f"Calibration file {vignetting_file} contains data created greater than 2 weeks from the obsveration" + warnings.warn(msg, LargeTimeDeltaWarning) if vignetting_function.meta["TELESCOP"].value != data_object.meta["TELESCOP"].value: - warnings.warn(f"Incorrect TELESCOP value within {vignetting_file}", UserWarning) + msg=f"Incorrect TELESCOP value within {vignetting_file}" + warnings.warn(msg, IncorrectTelescope) elif vignetting_function.meta["OBSLAYR1"].value != data_object.meta["OBSLAYR1"].value: - warnings.warn(f"Incorrect polarization state within {vignetting_file}", UserWarning) + msg=f"Incorrect polarization state within {vignetting_file}" + warnings.warn(msg, IncorrectPolarizationState) elif vignetting_function.data.shape != data_object.data.shape: - msg = f"Incorrect vignetting function shape within {vignetting_file}" + msg=f"Incorrect vignetting function shape within {vignetting_file}" raise InvalidDataError(msg) else: data_object.data[:, :] /= vignetting_function.data[:, :] From 93718ca95201ad509b876f5467933fbd4426db2e Mon Sep 17 00:00:00 2001 From: Dr Matthieu Wes <69257591+mwest007@users.noreply.github.com> Date: Tue, 13 Aug 2024 13:44:29 -0600 Subject: [PATCH 2/8] cleaned up vignetting test --- punchbowl/level1/tests/test_vignette.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/punchbowl/level1/tests/test_vignette.py b/punchbowl/level1/tests/test_vignette.py index 4f1db583..8282a470 100644 --- a/punchbowl/level1/tests/test_vignette.py +++ b/punchbowl/level1/tests/test_vignette.py @@ -10,7 +10,8 @@ from punchbowl.exceptions import InvalidDataError from punchbowl.level1.vignette import correct_vignetting_task from punchbowl.exceptions import LargeTimeDeltaWarning - +from punchbowl.exceptions import IncorrectPolarizationState +from punchbowl.exceptions import IncorrectTelescope THIS_DIRECTORY = pathlib.Path(__file__).parent.resolve() @@ -70,7 +71,7 @@ def test_invalid_polarization_state(sample_ndcube) -> None: vignetting_filename = THIS_DIRECTORY / "data" / "PUNCH_L1_GR1_20240222163425.fits" with disable_run_logger(): - with pytest.warns(UserWarning): + with pytest.warns(IncorrectPolarizationState): corrected_punchdata = correct_vignetting_task.fn(sample_data, vignetting_filename) assert isinstance(corrected_punchdata, NDCube) @@ -86,7 +87,7 @@ def test_invalid_telescope(sample_ndcube) -> None: vignetting_filename = THIS_DIRECTORY / "data" / "PUNCH_L1_GR1_20240222163425.fits" with disable_run_logger(): - with pytest.warns(UserWarning): + with pytest.warns(IncorrectTelescope): corrected_punchdata = correct_vignetting_task.fn(sample_data, vignetting_filename) assert isinstance(corrected_punchdata, NDCube) From 4db668e03a8f3f60ecd89dd6c71d60cbdcd57e18 Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: Thu, 15 Aug 2024 03:42:19 -0600 Subject: [PATCH 3/8] creates working version for simpunch --- punchbowl/data/tests/test_wcs.py | 56 +++++++-- punchbowl/data/wcs.py | 194 ++++++++++++++++++++++++------- 2 files changed, 199 insertions(+), 51 deletions(-) diff --git a/punchbowl/data/tests/test_wcs.py b/punchbowl/data/tests/test_wcs.py index 76d1d342..22bebab0 100644 --- a/punchbowl/data/tests/test_wcs.py +++ b/punchbowl/data/tests/test_wcs.py @@ -12,7 +12,12 @@ from sunpy.coordinates import frames from punchbowl.data.meta import NormalizedMetadata -from punchbowl.data.wcs import calculate_celestial_wcs_from_helio, calculate_helio_wcs_from_celestial, load_trefoil_wcs +from punchbowl.data.wcs import ( + calculate_celestial_wcs_from_helio, + calculate_helio_wcs_from_celestial, + calculate_pc_matrix, + load_trefoil_wcs, +) _ROOT = os.path.abspath(os.path.dirname(__file__)) @@ -35,6 +40,25 @@ def test_sun_location(): assert skycoord_origin.separation(skycoord_sun) < 1 * u.arcsec +def test_extract_crota(): + pc = calculate_pc_matrix(10*u.deg, [1, 1]) + wcs_celestial = WCS({"CRVAL1": 0, + "CRVAL2": 0, + "CRPIX1": 2047.5, + "CRPIX2": 2047.5, + "CDELT1": -0.0225, + "CDELT2": 0.0225, + "CUNIT1": "deg", + "CUNIT2": "deg", + "CTYPE1": "RA---AZP", + "CTYPE2": "DEC--AZP", + "PC1_1": pc[0, 0], + "PC1_2": pc[1, 0], + "PC2_1": pc[0, 1], + "PC2_2": pc[1, 1] + }) + assert extract_crota_from_wcs(wcs_celestial) == 10 * u.deg + def test_wcs_many_point_2d_check(): m = NormalizedMetadata.load_template("CTM", "2") date_obs = Time("2024-01-01T00:00:00", format='isot', scale='utc') @@ -152,11 +176,13 @@ def test_load_trefoil_wcs(): def test_helio_celestial_wcs(): header = fits.Header.fromtextfile(os.path.join(_ROOT, "example_header.txt")) + date_obs = Time(header['DATE-OBS'], format='isot', scale='utc') wcs_helio = WCS(header) - wcs_celestial = WCS(header, key='A') + wcs_celestial = calculate_celestial_wcs_from_helio(wcs_helio, date_obs, (3, 4096, 4096)) + + wcs_celestial2 = WCS(header, key='A') - date_obs = Time(header['DATE-OBS'], format='isot', scale='utc') test_loc = EarthLocation.from_geocentric(0, 0, 0, unit=u.m) test_gcrs = SkyCoord(test_loc.get_gcrs(date_obs)) @@ -189,10 +215,13 @@ def test_helio_celestial_wcs(): assert np.nanmean(distances) < 0.1 +from punchbowl.data.wcs import extract_crota_from_wcs, get_p_angle + def test_back_and_forth_wcs_from_celestial(): date_obs = Time("2024-01-01T00:00:00", format='isot', scale='utc') sun_radec = get_sun(date_obs) + pc = calculate_pc_matrix(0, [0.0225, 0.0225]) wcs_celestial = WCS({"CRVAL1": sun_radec.ra.to(u.deg).value, "CRVAL2": sun_radec.dec.to(u.deg).value, "CRPIX1": 2047.5, @@ -201,11 +230,19 @@ def test_back_and_forth_wcs_from_celestial(): "CDELT2": 0.0225, "CUNIT1": "deg", "CUNIT2": "deg", - "CTYPE1": "RA---ARC", - "CTYPE2": "DEC--ARC"}) - + "CTYPE1": "RA---AZP", + "CTYPE2": "DEC--AZP", + "PC1_1": pc[0, 0], + "PC1_2": pc[1, 0], + "PC2_1": pc[0, 1], + "PC2_2": pc[1, 1] + }) + + print(extract_crota_from_wcs(wcs_celestial)) wcs_helio, p_angle = calculate_helio_wcs_from_celestial(wcs_celestial, date_obs, (10, 10)) + print(extract_crota_from_wcs(wcs_helio)) wcs_celestial_recovered = calculate_celestial_wcs_from_helio(wcs_helio, date_obs, (10, 10)) + print(extract_crota_from_wcs(wcs_celestial_recovered)) assert np.allclose(wcs_celestial.wcs.crval, wcs_celestial_recovered.wcs.crval) assert np.allclose(wcs_celestial.wcs.crpix, wcs_celestial_recovered.wcs.crpix) @@ -227,8 +264,11 @@ def test_back_and_forth_wcs_from_helio(): "CTYPE1": "HPLN-ARC", "CTYPE2": "HPLT-ARC"}) - wcs_celestial = calculate_celestial_wcs_from_helio(wcs_helio, date_obs, (10, 10)) - wcs_helio_recovered, p_angle = calculate_helio_wcs_from_celestial(wcs_celestial, date_obs, (10, 10)) + print(extract_crota_from_wcs(wcs_helio)) + wcs_celestial = calculate_celestial_wcs_from_helio(wcs_helio.copy(), date_obs, (10, 10)) + print(extract_crota_from_wcs(wcs_celestial)) + wcs_helio_recovered, p_angle = calculate_helio_wcs_from_celestial(wcs_celestial.copy(), date_obs, (10, 10)) + print(extract_crota_from_wcs(wcs_helio_recovered)) assert np.allclose(wcs_helio.wcs.crval, wcs_helio_recovered.wcs.crval) assert np.allclose(wcs_helio.wcs.crpix, wcs_helio_recovered.wcs.crpix) diff --git a/punchbowl/data/wcs.py b/punchbowl/data/wcs.py index f1614c7f..93490c6e 100644 --- a/punchbowl/data/wcs.py +++ b/punchbowl/data/wcs.py @@ -3,9 +3,11 @@ import os from datetime import datetime +import astropy.coordinates import astropy.units as u import astropy.wcs.wcsapi import numpy as np +import sunpy.coordinates import sunpy.map from astropy.coordinates import GCRS, EarthLocation, SkyCoord, StokesSymbol, custom_stokes_symbol_mapping, get_sun from astropy.time import Time @@ -13,7 +15,6 @@ from astropy.wcs.utils import add_stokes_axis_to_wcs from sunpy.coordinates import frames from sunpy.coordinates.sun import _sun_north_angle_to_z -from sunpy.map import make_fitswcs_header _ROOT = os.path.abspath(os.path.dirname(__file__)) PUNCH_STOKES_MAPPING = custom_stokes_symbol_mapping({10: StokesSymbol("pB", "polarized brightness"), @@ -55,7 +56,7 @@ def calculate_helio_wcs_from_celestial(wcs_celestial: WCS, geocentric = GCRS(obstime=date_obs) p_angle = _sun_north_angle_to_z(geocentric) - crota = extract_crota_from_wcs(wcs_celestial) + rotation_angle = extract_crota_from_wcs(wcs_celestial).to(u.rad).value - get_p_angle(date_obs).rad new_header = sunpy.map.make_fitswcs_header( data_shape[1:] if is_3d else data_shape, @@ -64,9 +65,10 @@ def calculate_helio_wcs_from_celestial(wcs_celestial: WCS, [wcs_celestial.wcs.crpix[0] - 1, wcs_celestial.wcs.crpix[1] - 1] * u.pixel, ), scale=u.Quantity([cdelt1, cdelt2] * u.arcsec / u.pix), - rotation_angle=-p_angle - crota, + rotation_angle=rotation_angle*u.rad,#-crota - P(date_obs),#-(P(date_obs) + crota), observatory="PUNCH", projection_code=wcs_celestial.wcs.ctype[0][-3:], + unit=u.deg, ) wcs_helio = WCS(new_header) @@ -83,55 +85,161 @@ def get_sun_ra_dec(dt: datetime) -> (float, float): return position.ra.value, position.dec.value -def calculate_celestial_wcs_from_helio(wcs_helio: WCS, date_obs: datetime, data_shape: tuple[int, int]) -> WCS: - """Calculate the celestial WCS from a helio WCS.""" - is_3d = len(data_shape) == 3 - - # we're at the center of the Earth - test_loc = EarthLocation.from_geocentric(0, 0, 0, unit=u.m) - test_gcrs = SkyCoord(test_loc.get_gcrs(date_obs)) - - reference_coord = SkyCoord( - wcs_helio.wcs.crval[0] * u.Unit(wcs_helio.wcs.cunit[0]), - wcs_helio.wcs.crval[1] * u.Unit(wcs_helio.wcs.cunit[1]), - frame="gcrs", - obstime=date_obs, - obsgeoloc=test_gcrs.cartesian, - obsgeovel=test_gcrs.velocity.to_cartesian(), - distance=test_gcrs.hcrs.distance, +def calculate_pc_matrix(crota: float, cdelt: (float, float)) -> np.ndarray: + """ + Calculate a PC matrix given CROTA and CDELT. + + Parameters + ---------- + crota : float + rotation angle from the WCS + cdelt : float + pixel size from the WCS + + Returns + ------- + np.ndarray + PC matrix + """ + return np.array( + [ + [np.cos(crota), np.sin(crota) * (cdelt[1] / cdelt[0])], + [-np.sin(crota) * (cdelt[0] / cdelt[1]), np.cos(crota)], + ], ) - reference_coord_arcsec = reference_coord.transform_to(frames.Helioprojective(observer=test_gcrs)) - - cdelt1 = (np.abs(wcs_helio.wcs.cdelt[0]) * u.deg).to(u.arcsec) - cdelt2 = (np.abs(wcs_helio.wcs.cdelt[1]) * u.deg).to(u.arcsec) - geocentric = GCRS(obstime=date_obs) - p_angle = _sun_north_angle_to_z(geocentric) +def _times_are_equal(time_1: astropy.time.Time, time_2: astropy.time.Time) -> bool: + # Ripped from sunpy, modified + # Checks whether times are equal + if isinstance(time_1, astropy.time.Time) and isinstance(time_2, astropy.time.Time): + # We explicitly perform the check in TAI to avoid possible numerical precision differences + # between a time in UTC and the same time after a UTC->TAI->UTC conversion + return np.all(time_1.tai == time_2.tai) + + # We also deem the times equal if one is None + return time_1 is None or time_2 is None + + +def get_p_angle(time: str="now") -> u.deg: + """ + Get the P angle. + + Return the position (P) angle for the Sun at a specified time, which is the angle between + geocentric north and solar north as seen from Earth, measured eastward from geocentric north. + The range of P is +/-26.3 degrees. + + Parameters + ---------- + time : {parse_time_types} + Time to use in a parse_time-compatible format + + Returns + ------- + out : `~astropy.coordinates.Angle` + The position angle + """ + obstime = sunpy.coordinates.sun.parse_time(time) + + # Define the frame where its Z axis is aligned with geocentric north + geocentric = astropy.coordinates.GCRS(obstime=obstime) + + return sunpy.coordinates.sun._sun_north_angle_to_z(geocentric) # noqa: SLF001 + + +@astropy.coordinates.frame_transform_graph.transform( + astropy.coordinates.FunctionTransform, + sunpy.coordinates.Helioprojective, + astropy.coordinates.GCRS) +def hpc_to_gcrs(HPcoord, GCRSframe): # noqa: ANN201, N803, ANN001 + """Convert helioprojective to GCRS.""" + if not _times_are_equal(HPcoord.obstime, GCRSframe.obstime): + raise ValueError("Obstimes are not equal") # noqa: TRY003, EM101 + obstime = HPcoord.obstime or GCRSframe.obstime + + # Compute the three angles we need + position = astropy.coordinates.get_sun(obstime) + ra, dec = position.ra.rad, position.dec.rad + p = get_p_angle(obstime).rad + + # Prepare rotation matrices for each + p_matrix = np.array([ + [1, 0, 0], + [0, np.cos(p), -np.sin(p)], + [0, np.sin(p), np.cos(p)], + ]) + + ra_matrix = np.array([ + [np.cos(ra), -np.sin(ra), 0], + [np.sin(ra), np.cos(ra), 0], + [0, 0, 1], + ]) + + dec_matrix = np.array([ + [np.cos(-dec), 0, np.sin(-dec)], + [0, 1, 0], + [-np.sin(-dec), 0, np.cos(-dec)], + ]) + + # Compose the matrices + matrix = ra_matrix @ dec_matrix @ p_matrix + + # Extract the input coordinates and negate the HP latitude, + # since it increases in the opposite direction from RA. + if HPcoord._is_2d: # noqa: SLF001 + rep = astropy.coordinates.UnitSphericalRepresentation( + -HPcoord.Tx, HPcoord.Ty) + else: + rep = astropy.coordinates.SphericalRepresentation( + -HPcoord.Tx, HPcoord.Ty, HPcoord.distance) + + # Apply the transformation + rep = rep.to_cartesian() + rep = rep.transform(matrix) + + # Match the input representation. (If the input was UnitSpherical, meaning there's no + # distance coordinate, this drops the distance coordinate.) + rep = rep.represent_as(type(HPcoord.data)) + + # Put the computed coordinates into the output frame + return GCRSframe.realize_frame(rep) - crota = extract_crota_from_wcs(wcs_helio) - new_header = make_fitswcs_header( - data_shape[1:] if is_3d else data_shape, - reference_coord_arcsec, - reference_pixel=u.Quantity( - [wcs_helio.wcs.crpix[0] - 1, wcs_helio.wcs.crpix[1] - 1] * u.pixel, - ), - scale=u.Quantity([cdelt1, cdelt2] * u.arcsec / u.pix), - rotation_angle=-p_angle - crota, - observatory="PUNCH", - projection_code=wcs_helio.wcs.ctype[0][-3:], - ) +def calculate_celestial_wcs_from_helio(wcs_helio: WCS, date_obs: datetime, data_shape: tuple[int, int]) -> WCS: + """Calculate the celestial WCS from a helio WCS.""" + is_3d = len(data_shape) == 3 - wcs_celestial = WCS(new_header) - wcs_celestial.wcs.ctype = "RA---ARC", "DEC--ARC" - sun_location = get_sun_ra_dec(date_obs) - wcs_celestial.wcs.crval = sun_location[0], sun_location[1] - wcs_celestial.wcs.cdelt = wcs_celestial.wcs.cdelt * (-1, 1) + old_crval = SkyCoord(wcs_helio.wcs.crval[0] * u.deg, wcs_helio.wcs.crval[1] * u.deg, + frame="helioprojective", observer="earth", obstime=date_obs) + new_crval = old_crval.transform_to(GCRS) + + rotation_angle = -(extract_crota_from_wcs(wcs_helio).to(u.rad).value + get_p_angle(date_obs).rad) + rotation_angle = extract_crota_from_wcs(wcs_helio).to(u.rad).value + get_p_angle(date_obs).rad + new_pc_matrix = calculate_pc_matrix(rotation_angle, wcs_helio.wcs.cdelt) + + cdelt1 = np.abs(wcs_helio.wcs.cdelt[0]) * u.deg + cdelt2 = np.abs(wcs_helio.wcs.cdelt[1]) * u.deg + + projection_code = wcs_helio.wcs.ctype[0][-3:] + + wcs_celestial = WCS({"CRVAL1": new_crval.ra.to(u.deg).value, + "CRVAL2": new_crval.dec.to(u.deg).value, + "CUNIT1": "deg", + "CUNIT2": "deg", + "CRPIX1": wcs_helio.wcs.crpix[0], + "CRPIX2": wcs_helio.wcs.crpix[1], + "CDELT1": -cdelt1.to(u.deg).value, + "CDELT2": cdelt2.to(u.deg).value, + "CTYPE1": f"RA---{projection_code}", + "CTYPE2": f"DEC--{projection_code}", + "PC1_1": new_pc_matrix[0, 0], + "PC1_2": new_pc_matrix[1, 0], + "PC2_1": new_pc_matrix[0, 1], + "PC2_2": new_pc_matrix[1, 1]}, + ) if is_3d: wcs_celestial = add_stokes_axis_to_wcs(wcs_celestial, 2) - wcs_celestial.array_shape = data_shape return wcs_celestial From 19439f4d5b45a8241c0ba728a06921a63460d817 Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: Thu, 15 Aug 2024 03:44:55 -0600 Subject: [PATCH 4/8] last cleanups --- punchbowl/exceptions.py | 4 +-- punchbowl/level1/stray_light.py | 36 +++++++++++---------- punchbowl/level1/tests/test_vignette.py | 16 +++++---- punchbowl/level1/vignette.py | 43 +++++++++++++------------ 4 files changed, 53 insertions(+), 46 deletions(-) diff --git a/punchbowl/exceptions.py b/punchbowl/exceptions.py index c6136041..4089900a 100644 --- a/punchbowl/exceptions.py +++ b/punchbowl/exceptions.py @@ -22,8 +22,8 @@ class NoCalibrationDataWarning(PUNCHBowlWarning): class ExtraMetadataWarning(PUNCHBowlWarning): """Extra metadata found but ignored.""" -class IncorrectPolarizationState(PUNCHBowlWarning): +class IncorrectPolarizationStateWarning(PUNCHBowlWarning): """Mismatched polarization state detected but ignored.""" -class IncorrectTelescope(PUNCHBowlWarning): +class IncorrectTelescopeWarning(PUNCHBowlWarning): """Mismatched telescope detected but ignored.""" diff --git a/punchbowl/level1/stray_light.py b/punchbowl/level1/stray_light.py index 3b7aa792..1ca3d50c 100644 --- a/punchbowl/level1/stray_light.py +++ b/punchbowl/level1/stray_light.py @@ -1,19 +1,21 @@ import pathlib import warnings +from astropy.time import Time from ndcube import NDCube from prefect import get_run_logger, task -from astropy.time import Time from punchbowl.data import load_ndcube_from_fits -from punchbowl.exceptions import InvalidDataError -from punchbowl.exceptions import LargeTimeDeltaWarning -from punchbowl.exceptions import IncorrectPolarizationState -from punchbowl.exceptions import IncorrectTelescope +from punchbowl.exceptions import ( + IncorrectPolarizationStateWarning, + IncorrectTelescopeWarning, + InvalidDataError, + LargeTimeDeltaWarning, +) @task -def remove_stray_light_task(data_object: NDCube, stray_light_file: pathlib) -> NDCube: +def remove_stray_light_task(data_object: NDCube, stray_light_path: pathlib) -> NDCube: """ Prefect task to remove stray light from an image. @@ -39,7 +41,7 @@ def remove_stray_light_task(data_object: NDCube, stray_light_file: pathlib) -> N data_object : PUNCHData data to operate on - stray_light_file: pathlib + stray_light_path: pathlib path to stray light model to apply to data Returns @@ -51,30 +53,30 @@ def remove_stray_light_task(data_object: NDCube, stray_light_file: pathlib) -> N logger = get_run_logger() logger.info("remove_stray_light started") - if stray_light_file is None: + if stray_light_path is None: data_object.meta.history.add_now("LEVEL1-remove_stray_light", "Stray light correction skipped") - elif not stray_light_file.exists(): - msg = f"File {stray_light_file} does not exist." + elif not stray_light_path.exists(): + msg = f"File {stray_light_path} does not exist." raise InvalidDataError(msg) else: - stray_light_model = load_ndcube_from_fits(stray_light_file) + stray_light_model = load_ndcube_from_fits(stray_light_path) stray_light_model_date = Time(stray_light_model.meta["DATE-OBS"].value) observation_date = Time(data_object.meta["DATE-OBS"].value) if abs((stray_light_model_date - observation_date).to("day").value) > 14: - msg=f"Calibration file {stray_light_file} contains data created greater than 2 weeks from the obsveration" + msg=f"Calibration file {stray_light_path} contains data created greater than 2 weeks from the obsveration" warnings.warn(msg,LargeTimeDeltaWarning) if stray_light_model.meta["TELESCOP"].value != data_object.meta["TELESCOP"].value: - msg=f"Incorrect TELESCOP value within {stray_light_file}" - warnings.warn(msg, IncorrectTelescope) + msg=f"Incorrect TELESCOP value within {stray_light_path}" + warnings.warn(msg, IncorrectTelescopeWarning) elif stray_light_model.meta["OBSLAYR1"].value != data_object.meta["OBSLAYR1"].value: - msg=f"Incorrect polarization state within {stray_light_file}" - warnings.warn(msg, IncorrectPolarizationState) + msg=f"Incorrect polarization state within {stray_light_path}" + warnings.warn(msg, IncorrectPolarizationStateWarning) elif stray_light_model.data.shape != data_object.data.shape: - msg = f"Incorrect stray light function shape within {stray_light_file}" + msg = f"Incorrect stray light function shape within {stray_light_path}" raise InvalidDataError(msg) else: data_object.data[:, :] -= stray_light_model.data[:, :] diff --git a/punchbowl/level1/tests/test_vignette.py b/punchbowl/level1/tests/test_vignette.py index 8282a470..f97cfec0 100644 --- a/punchbowl/level1/tests/test_vignette.py +++ b/punchbowl/level1/tests/test_vignette.py @@ -1,17 +1,19 @@ import pathlib +from datetime import datetime import numpy as np import pytest from ndcube import NDCube from prefect.logging import disable_run_logger -from datetime import datetime from punchbowl.data.tests.test_io import sample_ndcube -from punchbowl.exceptions import InvalidDataError +from punchbowl.exceptions import ( + IncorrectPolarizationStateWarning, + IncorrectTelescopeWarning, + InvalidDataError, + LargeTimeDeltaWarning, +) from punchbowl.level1.vignette import correct_vignetting_task -from punchbowl.exceptions import LargeTimeDeltaWarning -from punchbowl.exceptions import IncorrectPolarizationState -from punchbowl.exceptions import IncorrectTelescope THIS_DIRECTORY = pathlib.Path(__file__).parent.resolve() @@ -71,7 +73,7 @@ def test_invalid_polarization_state(sample_ndcube) -> None: vignetting_filename = THIS_DIRECTORY / "data" / "PUNCH_L1_GR1_20240222163425.fits" with disable_run_logger(): - with pytest.warns(IncorrectPolarizationState): + with pytest.warns(IncorrectPolarizationStateWarning): corrected_punchdata = correct_vignetting_task.fn(sample_data, vignetting_filename) assert isinstance(corrected_punchdata, NDCube) @@ -87,7 +89,7 @@ def test_invalid_telescope(sample_ndcube) -> None: vignetting_filename = THIS_DIRECTORY / "data" / "PUNCH_L1_GR1_20240222163425.fits" with disable_run_logger(): - with pytest.warns(IncorrectTelescope): + with pytest.warns(IncorrectTelescopeWarning): corrected_punchdata = correct_vignetting_task.fn(sample_data, vignetting_filename) assert isinstance(corrected_punchdata, NDCube) diff --git a/punchbowl/level1/vignette.py b/punchbowl/level1/vignette.py index eac7f000..e588e9dd 100644 --- a/punchbowl/level1/vignette.py +++ b/punchbowl/level1/vignette.py @@ -1,19 +1,22 @@ import pathlib import warnings +from astropy.time import Time from ndcube import NDCube from prefect import get_run_logger, task -from astropy.time import Time from punchbowl.data import load_ndcube_from_fits -from punchbowl.exceptions import InvalidDataError -from punchbowl.exceptions import NoCalibrationDataWarning -from punchbowl.exceptions import LargeTimeDeltaWarning -from punchbowl.exceptions import IncorrectPolarizationState -from punchbowl.exceptions import IncorrectTelescope +from punchbowl.exceptions import ( + IncorrectPolarizationStateWarning, + IncorrectTelescopeWarning, + InvalidDataError, + LargeTimeDeltaWarning, + NoCalibrationDataWarning, +) + @task -def correct_vignetting_task(data_object: NDCube, vignetting_file: pathlib) -> NDCube: +def correct_vignetting_task(data_object: NDCube, vignetting_path: pathlib) -> NDCube: """ Prefect task to correct the vignetting of an image. @@ -46,7 +49,7 @@ def correct_vignetting_task(data_object: NDCube, vignetting_file: pathlib) -> ND data_object : PUNCHData data on which to operate - vignetting_file : pathlib + vignetting_path : pathlib path to vignetting function to apply to input data Returns @@ -58,35 +61,35 @@ def correct_vignetting_task(data_object: NDCube, vignetting_file: pathlib) -> ND logger = get_run_logger() logger.info("correct_vignetting started") - if vignetting_file is None: + if vignetting_path is None: data_object.meta.history.add_now("LEVEL1-correct_vignetting", "Vignetting skipped") - msg=f"Calibration file {vignetting_file} is unavailable, vignetting correction not applied" + msg=f"Calibration file {vignetting_path} is unavailable, vignetting correction not applied" warnings.warn(msg, NoCalibrationDataWarning) - elif not vignetting_file.exists(): - msg = f"File {vignetting_file} does not exist." + elif not vignetting_path.exists(): + msg = f"File {vignetting_path} does not exist." raise InvalidDataError(msg) else: - vignetting_function = load_ndcube_from_fits(vignetting_file) + vignetting_function = load_ndcube_from_fits(vignetting_path) vignetting_function_date = Time(vignetting_function.meta["DATE-OBS"].value) observation_date = Time(data_object.meta["DATE-OBS"].value) if abs((vignetting_function_date - observation_date).to("day").value) > 14: - msg=f"Calibration file {vignetting_file} contains data created greater than 2 weeks from the obsveration" + msg=f"Calibration file {vignetting_path} contains data created greater than 2 weeks from the obsveration" warnings.warn(msg, LargeTimeDeltaWarning) if vignetting_function.meta["TELESCOP"].value != data_object.meta["TELESCOP"].value: - msg=f"Incorrect TELESCOP value within {vignetting_file}" - warnings.warn(msg, IncorrectTelescope) + msg=f"Incorrect TELESCOP value within {vignetting_path}" + warnings.warn(msg, IncorrectTelescopeWarning) elif vignetting_function.meta["OBSLAYR1"].value != data_object.meta["OBSLAYR1"].value: - msg=f"Incorrect polarization state within {vignetting_file}" - warnings.warn(msg, IncorrectPolarizationState) + msg=f"Incorrect polarization state within {vignetting_path}" + warnings.warn(msg, IncorrectPolarizationStateWarning) elif vignetting_function.data.shape != data_object.data.shape: - msg=f"Incorrect vignetting function shape within {vignetting_file}" + msg=f"Incorrect vignetting function shape within {vignetting_path}" raise InvalidDataError(msg) else: data_object.data[:, :] /= vignetting_function.data[:, :] data_object.meta.history.add_now("LEVEL1-correct_vignetting", - f"Vignetting corrected using {vignetting_file}") + f"Vignetting corrected using {vignetting_path}") logger.info("correct_vignetting finished") return data_object From ecf3fbf14001e5249b526d7934abd039369d3c90 Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: Thu, 15 Aug 2024 03:42:19 -0600 Subject: [PATCH 5/8] creates working version for simpunch --- punchbowl/data/tests/test_wcs.py | 56 +++++++-- punchbowl/data/wcs.py | 194 ++++++++++++++++++++++++------- 2 files changed, 199 insertions(+), 51 deletions(-) diff --git a/punchbowl/data/tests/test_wcs.py b/punchbowl/data/tests/test_wcs.py index 76d1d342..22bebab0 100644 --- a/punchbowl/data/tests/test_wcs.py +++ b/punchbowl/data/tests/test_wcs.py @@ -12,7 +12,12 @@ from sunpy.coordinates import frames from punchbowl.data.meta import NormalizedMetadata -from punchbowl.data.wcs import calculate_celestial_wcs_from_helio, calculate_helio_wcs_from_celestial, load_trefoil_wcs +from punchbowl.data.wcs import ( + calculate_celestial_wcs_from_helio, + calculate_helio_wcs_from_celestial, + calculate_pc_matrix, + load_trefoil_wcs, +) _ROOT = os.path.abspath(os.path.dirname(__file__)) @@ -35,6 +40,25 @@ def test_sun_location(): assert skycoord_origin.separation(skycoord_sun) < 1 * u.arcsec +def test_extract_crota(): + pc = calculate_pc_matrix(10*u.deg, [1, 1]) + wcs_celestial = WCS({"CRVAL1": 0, + "CRVAL2": 0, + "CRPIX1": 2047.5, + "CRPIX2": 2047.5, + "CDELT1": -0.0225, + "CDELT2": 0.0225, + "CUNIT1": "deg", + "CUNIT2": "deg", + "CTYPE1": "RA---AZP", + "CTYPE2": "DEC--AZP", + "PC1_1": pc[0, 0], + "PC1_2": pc[1, 0], + "PC2_1": pc[0, 1], + "PC2_2": pc[1, 1] + }) + assert extract_crota_from_wcs(wcs_celestial) == 10 * u.deg + def test_wcs_many_point_2d_check(): m = NormalizedMetadata.load_template("CTM", "2") date_obs = Time("2024-01-01T00:00:00", format='isot', scale='utc') @@ -152,11 +176,13 @@ def test_load_trefoil_wcs(): def test_helio_celestial_wcs(): header = fits.Header.fromtextfile(os.path.join(_ROOT, "example_header.txt")) + date_obs = Time(header['DATE-OBS'], format='isot', scale='utc') wcs_helio = WCS(header) - wcs_celestial = WCS(header, key='A') + wcs_celestial = calculate_celestial_wcs_from_helio(wcs_helio, date_obs, (3, 4096, 4096)) + + wcs_celestial2 = WCS(header, key='A') - date_obs = Time(header['DATE-OBS'], format='isot', scale='utc') test_loc = EarthLocation.from_geocentric(0, 0, 0, unit=u.m) test_gcrs = SkyCoord(test_loc.get_gcrs(date_obs)) @@ -189,10 +215,13 @@ def test_helio_celestial_wcs(): assert np.nanmean(distances) < 0.1 +from punchbowl.data.wcs import extract_crota_from_wcs, get_p_angle + def test_back_and_forth_wcs_from_celestial(): date_obs = Time("2024-01-01T00:00:00", format='isot', scale='utc') sun_radec = get_sun(date_obs) + pc = calculate_pc_matrix(0, [0.0225, 0.0225]) wcs_celestial = WCS({"CRVAL1": sun_radec.ra.to(u.deg).value, "CRVAL2": sun_radec.dec.to(u.deg).value, "CRPIX1": 2047.5, @@ -201,11 +230,19 @@ def test_back_and_forth_wcs_from_celestial(): "CDELT2": 0.0225, "CUNIT1": "deg", "CUNIT2": "deg", - "CTYPE1": "RA---ARC", - "CTYPE2": "DEC--ARC"}) - + "CTYPE1": "RA---AZP", + "CTYPE2": "DEC--AZP", + "PC1_1": pc[0, 0], + "PC1_2": pc[1, 0], + "PC2_1": pc[0, 1], + "PC2_2": pc[1, 1] + }) + + print(extract_crota_from_wcs(wcs_celestial)) wcs_helio, p_angle = calculate_helio_wcs_from_celestial(wcs_celestial, date_obs, (10, 10)) + print(extract_crota_from_wcs(wcs_helio)) wcs_celestial_recovered = calculate_celestial_wcs_from_helio(wcs_helio, date_obs, (10, 10)) + print(extract_crota_from_wcs(wcs_celestial_recovered)) assert np.allclose(wcs_celestial.wcs.crval, wcs_celestial_recovered.wcs.crval) assert np.allclose(wcs_celestial.wcs.crpix, wcs_celestial_recovered.wcs.crpix) @@ -227,8 +264,11 @@ def test_back_and_forth_wcs_from_helio(): "CTYPE1": "HPLN-ARC", "CTYPE2": "HPLT-ARC"}) - wcs_celestial = calculate_celestial_wcs_from_helio(wcs_helio, date_obs, (10, 10)) - wcs_helio_recovered, p_angle = calculate_helio_wcs_from_celestial(wcs_celestial, date_obs, (10, 10)) + print(extract_crota_from_wcs(wcs_helio)) + wcs_celestial = calculate_celestial_wcs_from_helio(wcs_helio.copy(), date_obs, (10, 10)) + print(extract_crota_from_wcs(wcs_celestial)) + wcs_helio_recovered, p_angle = calculate_helio_wcs_from_celestial(wcs_celestial.copy(), date_obs, (10, 10)) + print(extract_crota_from_wcs(wcs_helio_recovered)) assert np.allclose(wcs_helio.wcs.crval, wcs_helio_recovered.wcs.crval) assert np.allclose(wcs_helio.wcs.crpix, wcs_helio_recovered.wcs.crpix) diff --git a/punchbowl/data/wcs.py b/punchbowl/data/wcs.py index f1614c7f..93490c6e 100644 --- a/punchbowl/data/wcs.py +++ b/punchbowl/data/wcs.py @@ -3,9 +3,11 @@ import os from datetime import datetime +import astropy.coordinates import astropy.units as u import astropy.wcs.wcsapi import numpy as np +import sunpy.coordinates import sunpy.map from astropy.coordinates import GCRS, EarthLocation, SkyCoord, StokesSymbol, custom_stokes_symbol_mapping, get_sun from astropy.time import Time @@ -13,7 +15,6 @@ from astropy.wcs.utils import add_stokes_axis_to_wcs from sunpy.coordinates import frames from sunpy.coordinates.sun import _sun_north_angle_to_z -from sunpy.map import make_fitswcs_header _ROOT = os.path.abspath(os.path.dirname(__file__)) PUNCH_STOKES_MAPPING = custom_stokes_symbol_mapping({10: StokesSymbol("pB", "polarized brightness"), @@ -55,7 +56,7 @@ def calculate_helio_wcs_from_celestial(wcs_celestial: WCS, geocentric = GCRS(obstime=date_obs) p_angle = _sun_north_angle_to_z(geocentric) - crota = extract_crota_from_wcs(wcs_celestial) + rotation_angle = extract_crota_from_wcs(wcs_celestial).to(u.rad).value - get_p_angle(date_obs).rad new_header = sunpy.map.make_fitswcs_header( data_shape[1:] if is_3d else data_shape, @@ -64,9 +65,10 @@ def calculate_helio_wcs_from_celestial(wcs_celestial: WCS, [wcs_celestial.wcs.crpix[0] - 1, wcs_celestial.wcs.crpix[1] - 1] * u.pixel, ), scale=u.Quantity([cdelt1, cdelt2] * u.arcsec / u.pix), - rotation_angle=-p_angle - crota, + rotation_angle=rotation_angle*u.rad,#-crota - P(date_obs),#-(P(date_obs) + crota), observatory="PUNCH", projection_code=wcs_celestial.wcs.ctype[0][-3:], + unit=u.deg, ) wcs_helio = WCS(new_header) @@ -83,55 +85,161 @@ def get_sun_ra_dec(dt: datetime) -> (float, float): return position.ra.value, position.dec.value -def calculate_celestial_wcs_from_helio(wcs_helio: WCS, date_obs: datetime, data_shape: tuple[int, int]) -> WCS: - """Calculate the celestial WCS from a helio WCS.""" - is_3d = len(data_shape) == 3 - - # we're at the center of the Earth - test_loc = EarthLocation.from_geocentric(0, 0, 0, unit=u.m) - test_gcrs = SkyCoord(test_loc.get_gcrs(date_obs)) - - reference_coord = SkyCoord( - wcs_helio.wcs.crval[0] * u.Unit(wcs_helio.wcs.cunit[0]), - wcs_helio.wcs.crval[1] * u.Unit(wcs_helio.wcs.cunit[1]), - frame="gcrs", - obstime=date_obs, - obsgeoloc=test_gcrs.cartesian, - obsgeovel=test_gcrs.velocity.to_cartesian(), - distance=test_gcrs.hcrs.distance, +def calculate_pc_matrix(crota: float, cdelt: (float, float)) -> np.ndarray: + """ + Calculate a PC matrix given CROTA and CDELT. + + Parameters + ---------- + crota : float + rotation angle from the WCS + cdelt : float + pixel size from the WCS + + Returns + ------- + np.ndarray + PC matrix + """ + return np.array( + [ + [np.cos(crota), np.sin(crota) * (cdelt[1] / cdelt[0])], + [-np.sin(crota) * (cdelt[0] / cdelt[1]), np.cos(crota)], + ], ) - reference_coord_arcsec = reference_coord.transform_to(frames.Helioprojective(observer=test_gcrs)) - - cdelt1 = (np.abs(wcs_helio.wcs.cdelt[0]) * u.deg).to(u.arcsec) - cdelt2 = (np.abs(wcs_helio.wcs.cdelt[1]) * u.deg).to(u.arcsec) - geocentric = GCRS(obstime=date_obs) - p_angle = _sun_north_angle_to_z(geocentric) +def _times_are_equal(time_1: astropy.time.Time, time_2: astropy.time.Time) -> bool: + # Ripped from sunpy, modified + # Checks whether times are equal + if isinstance(time_1, astropy.time.Time) and isinstance(time_2, astropy.time.Time): + # We explicitly perform the check in TAI to avoid possible numerical precision differences + # between a time in UTC and the same time after a UTC->TAI->UTC conversion + return np.all(time_1.tai == time_2.tai) + + # We also deem the times equal if one is None + return time_1 is None or time_2 is None + + +def get_p_angle(time: str="now") -> u.deg: + """ + Get the P angle. + + Return the position (P) angle for the Sun at a specified time, which is the angle between + geocentric north and solar north as seen from Earth, measured eastward from geocentric north. + The range of P is +/-26.3 degrees. + + Parameters + ---------- + time : {parse_time_types} + Time to use in a parse_time-compatible format + + Returns + ------- + out : `~astropy.coordinates.Angle` + The position angle + """ + obstime = sunpy.coordinates.sun.parse_time(time) + + # Define the frame where its Z axis is aligned with geocentric north + geocentric = astropy.coordinates.GCRS(obstime=obstime) + + return sunpy.coordinates.sun._sun_north_angle_to_z(geocentric) # noqa: SLF001 + + +@astropy.coordinates.frame_transform_graph.transform( + astropy.coordinates.FunctionTransform, + sunpy.coordinates.Helioprojective, + astropy.coordinates.GCRS) +def hpc_to_gcrs(HPcoord, GCRSframe): # noqa: ANN201, N803, ANN001 + """Convert helioprojective to GCRS.""" + if not _times_are_equal(HPcoord.obstime, GCRSframe.obstime): + raise ValueError("Obstimes are not equal") # noqa: TRY003, EM101 + obstime = HPcoord.obstime or GCRSframe.obstime + + # Compute the three angles we need + position = astropy.coordinates.get_sun(obstime) + ra, dec = position.ra.rad, position.dec.rad + p = get_p_angle(obstime).rad + + # Prepare rotation matrices for each + p_matrix = np.array([ + [1, 0, 0], + [0, np.cos(p), -np.sin(p)], + [0, np.sin(p), np.cos(p)], + ]) + + ra_matrix = np.array([ + [np.cos(ra), -np.sin(ra), 0], + [np.sin(ra), np.cos(ra), 0], + [0, 0, 1], + ]) + + dec_matrix = np.array([ + [np.cos(-dec), 0, np.sin(-dec)], + [0, 1, 0], + [-np.sin(-dec), 0, np.cos(-dec)], + ]) + + # Compose the matrices + matrix = ra_matrix @ dec_matrix @ p_matrix + + # Extract the input coordinates and negate the HP latitude, + # since it increases in the opposite direction from RA. + if HPcoord._is_2d: # noqa: SLF001 + rep = astropy.coordinates.UnitSphericalRepresentation( + -HPcoord.Tx, HPcoord.Ty) + else: + rep = astropy.coordinates.SphericalRepresentation( + -HPcoord.Tx, HPcoord.Ty, HPcoord.distance) + + # Apply the transformation + rep = rep.to_cartesian() + rep = rep.transform(matrix) + + # Match the input representation. (If the input was UnitSpherical, meaning there's no + # distance coordinate, this drops the distance coordinate.) + rep = rep.represent_as(type(HPcoord.data)) + + # Put the computed coordinates into the output frame + return GCRSframe.realize_frame(rep) - crota = extract_crota_from_wcs(wcs_helio) - new_header = make_fitswcs_header( - data_shape[1:] if is_3d else data_shape, - reference_coord_arcsec, - reference_pixel=u.Quantity( - [wcs_helio.wcs.crpix[0] - 1, wcs_helio.wcs.crpix[1] - 1] * u.pixel, - ), - scale=u.Quantity([cdelt1, cdelt2] * u.arcsec / u.pix), - rotation_angle=-p_angle - crota, - observatory="PUNCH", - projection_code=wcs_helio.wcs.ctype[0][-3:], - ) +def calculate_celestial_wcs_from_helio(wcs_helio: WCS, date_obs: datetime, data_shape: tuple[int, int]) -> WCS: + """Calculate the celestial WCS from a helio WCS.""" + is_3d = len(data_shape) == 3 - wcs_celestial = WCS(new_header) - wcs_celestial.wcs.ctype = "RA---ARC", "DEC--ARC" - sun_location = get_sun_ra_dec(date_obs) - wcs_celestial.wcs.crval = sun_location[0], sun_location[1] - wcs_celestial.wcs.cdelt = wcs_celestial.wcs.cdelt * (-1, 1) + old_crval = SkyCoord(wcs_helio.wcs.crval[0] * u.deg, wcs_helio.wcs.crval[1] * u.deg, + frame="helioprojective", observer="earth", obstime=date_obs) + new_crval = old_crval.transform_to(GCRS) + + rotation_angle = -(extract_crota_from_wcs(wcs_helio).to(u.rad).value + get_p_angle(date_obs).rad) + rotation_angle = extract_crota_from_wcs(wcs_helio).to(u.rad).value + get_p_angle(date_obs).rad + new_pc_matrix = calculate_pc_matrix(rotation_angle, wcs_helio.wcs.cdelt) + + cdelt1 = np.abs(wcs_helio.wcs.cdelt[0]) * u.deg + cdelt2 = np.abs(wcs_helio.wcs.cdelt[1]) * u.deg + + projection_code = wcs_helio.wcs.ctype[0][-3:] + + wcs_celestial = WCS({"CRVAL1": new_crval.ra.to(u.deg).value, + "CRVAL2": new_crval.dec.to(u.deg).value, + "CUNIT1": "deg", + "CUNIT2": "deg", + "CRPIX1": wcs_helio.wcs.crpix[0], + "CRPIX2": wcs_helio.wcs.crpix[1], + "CDELT1": -cdelt1.to(u.deg).value, + "CDELT2": cdelt2.to(u.deg).value, + "CTYPE1": f"RA---{projection_code}", + "CTYPE2": f"DEC--{projection_code}", + "PC1_1": new_pc_matrix[0, 0], + "PC1_2": new_pc_matrix[1, 0], + "PC2_1": new_pc_matrix[0, 1], + "PC2_2": new_pc_matrix[1, 1]}, + ) if is_3d: wcs_celestial = add_stokes_axis_to_wcs(wcs_celestial, 2) - wcs_celestial.array_shape = data_shape return wcs_celestial From 36c2de253dfd56acc81ad775a70f4dfb10c90b93 Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: Thu, 15 Aug 2024 03:52:25 -0600 Subject: [PATCH 6/8] ruff fixes --- punchbowl/data/wcs.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/punchbowl/data/wcs.py b/punchbowl/data/wcs.py index 93490c6e..815f4b63 100644 --- a/punchbowl/data/wcs.py +++ b/punchbowl/data/wcs.py @@ -100,6 +100,7 @@ def calculate_pc_matrix(crota: float, cdelt: (float, float)) -> np.ndarray: ------- np.ndarray PC matrix + """ return np.array( [ @@ -138,6 +139,7 @@ def get_p_angle(time: str="now") -> u.deg: ------- out : `~astropy.coordinates.Angle` The position angle + """ obstime = sunpy.coordinates.sun.parse_time(time) From a355116ebe9c65683f13f94f73b93d3b41f2f56d Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: Thu, 15 Aug 2024 12:08:13 -0600 Subject: [PATCH 7/8] remove old code --- punchbowl/data/io.py | 68 -------------------------------------------- 1 file changed, 68 deletions(-) diff --git a/punchbowl/data/io.py b/punchbowl/data/io.py index 6670938f..578c2255 100644 --- a/punchbowl/data/io.py +++ b/punchbowl/data/io.py @@ -3,21 +3,14 @@ import os.path import astropy.units as u -import astropy.wcs.wcsapi import matplotlib as mpl import numpy as np -from astropy.coordinates import SkyCoord from astropy.io import fits -from astropy.io.fits import Header from astropy.nddata import StdDevUncertainty -from astropy.time import Time from astropy.wcs import WCS from ndcube import NDCube -from sunpy.coordinates import frames, sun -from sunpy.map import solar_angular_radius from punchbowl.data.meta import NormalizedMetadata -from punchbowl.data.wcs import calculate_helio_wcs_from_celestial _ROOT = os.path.abspath(os.path.dirname(__file__)) @@ -49,67 +42,6 @@ def write_ndcube_to_fits(cube: NDCube, ) -def construct_wcs_header_fields(cube: NDCube) -> Header: - """ - Compute primary and secondary WCS header cards to add to a data object. - - Returns - ------- - Header - - """ - date_obs = Time(cube.meta.datetime) - - celestial_wcs_header = cube.wcs.to_header() - output_header = astropy.io.fits.Header() - - unused_keys = [ - "DATE-OBS", - "DATE-BEG", - "DATE-AVG", - "DATE-END", - "DATE", - "MJD-OBS", - "TELAPSE", - "RSUN_REF", - "TIMESYS", - ] - - helio_wcs, p_angle = calculate_helio_wcs_from_celestial( - wcs_celestial=cube.wcs, date_obs=date_obs, data_shape=cube.data.shape, - ) - - helio_wcs_hdul_reference = helio_wcs.to_fits() - helio_wcs_header = helio_wcs_hdul_reference[0].header - - for key in unused_keys: - if key in celestial_wcs_header: - del celestial_wcs_header[key] - if key in helio_wcs_header: - del helio_wcs_header[key] - - if cube.meta["CTYPE1"] is not None: - for key, value in helio_wcs.to_header().items(): - output_header[key] = value - if cube.meta["CTYPE1A"] is not None: - for key, value in celestial_wcs_header.items(): - output_header[key + "A"] = value - - center_helio_coord = SkyCoord( - helio_wcs.wcs.crval[0] * u.deg, - helio_wcs.wcs.crval[1] * u.deg, - frame=frames.Helioprojective, - obstime=date_obs, - observer="earth", - ) - - output_header["RSUN_ARC"] = solar_angular_radius(center_helio_coord).value - output_header["SOLAR_EP"] = p_angle.value - output_header["CAR_ROT"] = float(sun.carrington_rotation_number(t=date_obs)) - - return output_header - - def _write_fits(cube: NDCube, filename: str, overwrite: bool = True, uncertainty_quantize_level: float = -2.0) -> None: _update_statistics(cube) From 7a1acc8355b21e205691a551b133f73189efd9c3 Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: Thu, 15 Aug 2024 12:08:38 -0600 Subject: [PATCH 8/8] finalize wcs conversions --- punchbowl/data/tests/test_wcs.py | 72 ++++++++++++++++++++++++-------- punchbowl/data/wcs.py | 70 +++++++++++++++++++++++++++++-- 2 files changed, 121 insertions(+), 21 deletions(-) diff --git a/punchbowl/data/tests/test_wcs.py b/punchbowl/data/tests/test_wcs.py index 22bebab0..8e973b67 100644 --- a/punchbowl/data/tests/test_wcs.py +++ b/punchbowl/data/tests/test_wcs.py @@ -25,21 +25,38 @@ def test_sun_location(): time_current = Time(datetime.utcnow()) - skycoord_sun = astropy.coordinates.get_sun(Time(datetime.utcnow())) + skycoord_sun = astropy.coordinates.get_sun(time_current) skycoord_origin = SkyCoord(0*u.deg, 0*u.deg, frame=frames.Helioprojective, obstime=time_current, observer='earth') - with frames.Helioprojective.assume_spherical_screen(skycoord_origin.observer): - skycoord_origin_celestial = skycoord_origin.transform_to(GCRS) + # with frames.Helioprojective.assume_spherical_screen(skycoord_origin.observer): + skycoord_origin_celestial = skycoord_origin.transform_to(GCRS) - with frames.Helioprojective.assume_spherical_screen(skycoord_origin.observer): - assert skycoord_origin_celestial.separation(skycoord_sun) < 1 * u.arcsec - assert skycoord_origin.separation(skycoord_sun) < 1 * u.arcsec + # with frames.Helioprojective.assume_spherical_screen(skycoord_origin.observer): + assert skycoord_origin_celestial.separation(skycoord_sun) < 1 * u.arcsec + assert skycoord_origin.separation(skycoord_sun) < 1 * u.arcsec +def test_sun_location_gcrs(): + time_current = Time(datetime.utcnow()) + + skycoord_sun = astropy.coordinates.get_sun(time_current) + + skycoord_origin = SkyCoord(0*u.deg, 0*u.deg, + frame=frames.Helioprojective, + obstime=time_current, + observer='earth') + + # with frames.Helioprojective.assume_spherical_screen(skycoord_origin.observer): + skycoord_origin_helio = skycoord_sun.transform_to(frames.Helioprojective) + + # with frames.Helioprojective.assume_spherical_screen(skycoord_origin.observer): + assert skycoord_origin_helio.separation(skycoord_sun) < 1 * u.arcsec + assert skycoord_origin.separation(skycoord_sun) < 1 * u.arcsec + def test_extract_crota(): pc = calculate_pc_matrix(10*u.deg, [1, 1]) wcs_celestial = WCS({"CRVAL1": 0, @@ -91,7 +108,7 @@ def test_wcs_many_point_2d_check(): points_helio = wcs_helio.all_pix2world(input_coords, 0) output_coords = [] - + intermediates = [] for c_pix, c_celestial, c_helio in zip(input_coords, points_celestial, points_helio): skycoord_celestial = SkyCoord(c_celestial[0] * u.deg, c_celestial[1] * u.deg, frame=GCRS, @@ -103,10 +120,12 @@ def test_wcs_many_point_2d_check(): ) intermediate = skycoord_celestial.transform_to(frames.Helioprojective) + intermediates.append(intermediate) output_coords.append(wcs_helio.all_world2pix(intermediate.data.lon.to(u.deg).value, intermediate.data.lat.to(u.deg).value, 0)) output_coords = np.array(output_coords) + print(intermediates[0].Tx.deg, intermediates[0].Ty.deg, points_helio[0]) distances = np.linalg.norm(input_coords - output_coords, axis=1) assert np.mean(distances) < 0.1 @@ -176,8 +195,20 @@ def test_load_trefoil_wcs(): def test_helio_celestial_wcs(): header = fits.Header.fromtextfile(os.path.join(_ROOT, "example_header.txt")) - date_obs = Time(header['DATE-OBS'], format='isot', scale='utc') + date_obs = Time(header['DATE-OBS']) + # date_obs = Time("2021-06-20T00:00:00.000", format='isot', scale='utc') + # + # wcs_helio = WCS({"CRVAL1": 0.0, + # "CRVAL2": 0.0, + # "CRPIX1": 2047.5, + # "CRPIX2": 2047.5, + # "CDELT1": 0.0225, + # "CDELT2": 0.0225, + # "CUNIT1": "deg", + # "CUNIT2": "deg", + # "CTYPE1": "HPLN-ARC", + # "CTYPE2": "HPLT-ARC"}) wcs_helio = WCS(header) wcs_celestial = calculate_celestial_wcs_from_helio(wcs_helio, date_obs, (3, 4096, 4096)) @@ -190,7 +221,8 @@ def test_helio_celestial_wcs(): input_coords = np.stack([ np.linspace(0, 4096, npoints).astype(int), np.linspace(0, 4096, npoints).astype(int), - np.ones(npoints, dtype=int),], axis=1) + np.ones(npoints, dtype=int),], + axis=1) points_celestial = wcs_celestial.all_pix2world(input_coords, 0) points_helio = wcs_helio.all_pix2world(input_coords, 0) @@ -200,15 +232,19 @@ def test_helio_celestial_wcs(): skycoord_celestial = SkyCoord(c_celestial[0] * u.deg, c_celestial[1] * u.deg, frame=GCRS, obstime=date_obs, - observer=test_gcrs, - obsgeoloc=test_gcrs.cartesian, - obsgeovel=test_gcrs.velocity.to_cartesian(), - distance=test_gcrs.hcrs.distance + observer="earth", + # observer=test_gcrs, + # obsgeoloc=test_gcrs.cartesian, + # obsgeovel=test_gcrs.velocity.to_cartesian(), + # distance=test_gcrs.hcrs.distance ) - intermediate = skycoord_celestial.transform_to(frames.Helioprojective) - output_coords.append(wcs_helio.all_world2pix(intermediate.data.lon.to(u.deg).value, - intermediate.data.lat.to(u.deg).value, 2, 0)) + intermediate = skycoord_celestial.transform_to(frames.Helioprojective(observer='earth', obstime=date_obs)) + # final = SkyCoord(intermediate.Tx, intermediate.Ty, frame="helioprojective", observer=intermediate.observer, obstime=intermediate.obstime) + output_coords.append(wcs_helio.all_world2pix(intermediate.Tx.to(u.deg).value, + intermediate.Ty.to(u.deg).value, + 2, + 0)) output_coords = np.array(output_coords) distances = np.linalg.norm(input_coords - output_coords, axis=1) @@ -253,8 +289,8 @@ def test_back_and_forth_wcs_from_celestial(): def test_back_and_forth_wcs_from_helio(): date_obs = Time("2024-01-01T00:00:00", format='isot', scale='utc') - wcs_helio = WCS({"CRVAL1": 0.0, - "CRVAL2": 0.0, + wcs_helio = WCS({"CRVAL1": 15.0, + "CRVAL2": 10.0, "CRPIX1": 2047.5, "CRPIX2": 2047.5, "CDELT1": 0.0225, diff --git a/punchbowl/data/wcs.py b/punchbowl/data/wcs.py index 815f4b63..d8d783fb 100644 --- a/punchbowl/data/wcs.py +++ b/punchbowl/data/wcs.py @@ -56,7 +56,7 @@ def calculate_helio_wcs_from_celestial(wcs_celestial: WCS, geocentric = GCRS(obstime=date_obs) p_angle = _sun_north_angle_to_z(geocentric) - rotation_angle = extract_crota_from_wcs(wcs_celestial).to(u.rad).value - get_p_angle(date_obs).rad + rotation_angle = extract_crota_from_wcs(wcs_celestial).to(u.rad).value + get_p_angle(date_obs).rad new_header = sunpy.map.make_fitswcs_header( data_shape[1:] if is_3d else data_shape, @@ -206,6 +206,71 @@ def hpc_to_gcrs(HPcoord, GCRSframe): # noqa: ANN201, N803, ANN001 # Put the computed coordinates into the output frame return GCRSframe.realize_frame(rep) +@astropy.coordinates.frame_transform_graph.transform( + astropy.coordinates.FunctionTransform, + astropy.coordinates.GCRS, + sunpy.coordinates.Helioprojective) +def gcrs_to_hpc(GCRScoord, Helioprojective): # noqa: ANN201, N803, ANN001 + """Convert GCRS to HPC.""" + if not _times_are_equal(GCRScoord.obstime, Helioprojective.obstime): + raise ValueError("Obstimes are not equal") # noqa: TRY003, EM101 + obstime = GCRScoord.obstime or Helioprojective.obstime + + # Compute the three angles we need + position = astropy.coordinates.get_sun(obstime) + ra, dec = position.ra.rad, position.dec.rad + p = get_p_angle(obstime).rad + + # Prepare rotation matrices for each + p_matrix = np.array([ + [1, 0, 0], + [0, np.cos(p), -np.sin(p)], + [0, np.sin(p), np.cos(p)], + ]) + + ra_matrix = np.array([ + [np.cos(ra), -np.sin(ra), 0], + [np.sin(ra), np.cos(ra), 0], + [0, 0, 1], + ]) + + dec_matrix = np.array([ + [np.cos(-dec), 0, np.sin(-dec)], + [0, 1, 0], + [-np.sin(-dec), 0, np.cos(-dec)], + ]) + + # Compose the matrices + old_matrix = ra_matrix @ dec_matrix @ p_matrix + matrix = np.linalg.inv(old_matrix) + + # Extract the input coordinates and negate the HP latitude, + # since it increases in the opposite direction from RA. + if isinstance(GCRScoord.data, astropy.coordinates.UnitSphericalRepresentation): + rep = astropy.coordinates.UnitSphericalRepresentation( + GCRScoord.ra, GCRScoord.dec) # , earth_distance(obstime)) + else: + rep = astropy.coordinates.SphericalRepresentation( + GCRScoord.ra, GCRScoord.dec, GCRScoord.distance) + + # Apply the transformation + rep = rep.to_cartesian() + rep = rep.transform(matrix) + + # Match the input representation. (If the input was UnitSpherical, meaning there's no + # distance coordinate, this drops the distance coordinate.) + rep = rep.represent_as(type(GCRScoord.data)) + + if isinstance(rep, astropy.coordinates.UnitSphericalRepresentation): + rep = astropy.coordinates.UnitSphericalRepresentation( + -rep.lon, rep.lat) # , earth_distance(obstime)) + else: + rep = astropy.coordinates.SphericalRepresentation( + -rep.lon, rep.lat, rep.distance) + + # Put the computed coordinates into the output frame + return Helioprojective.realize_frame(rep) + def calculate_celestial_wcs_from_helio(wcs_helio: WCS, date_obs: datetime, data_shape: tuple[int, int]) -> WCS: """Calculate the celestial WCS from a helio WCS.""" @@ -215,8 +280,7 @@ def calculate_celestial_wcs_from_helio(wcs_helio: WCS, date_obs: datetime, data_ frame="helioprojective", observer="earth", obstime=date_obs) new_crval = old_crval.transform_to(GCRS) - rotation_angle = -(extract_crota_from_wcs(wcs_helio).to(u.rad).value + get_p_angle(date_obs).rad) - rotation_angle = extract_crota_from_wcs(wcs_helio).to(u.rad).value + get_p_angle(date_obs).rad + rotation_angle = extract_crota_from_wcs(wcs_helio).to(u.rad).value - get_p_angle(date_obs).rad new_pc_matrix = calculate_pc_matrix(rotation_angle, wcs_helio.wcs.cdelt) cdelt1 = np.abs(wcs_helio.wcs.cdelt[0]) * u.deg