Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes wcs conversions #226

Merged
merged 10 commits into from
Aug 15, 2024
68 changes: 0 additions & 68 deletions punchbowl/data/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,14 @@
import os.path

import astropy.units as u
import astropy.wcs.wcsapi
import matplotlib as mpl
import numpy as np
from astropy.coordinates import SkyCoord
from astropy.io import fits
from astropy.io.fits import Header
from astropy.nddata import StdDevUncertainty
from astropy.time import Time
from astropy.wcs import WCS
from ndcube import NDCube
from sunpy.coordinates import frames, sun
from sunpy.map import solar_angular_radius

from punchbowl.data.meta import NormalizedMetadata
from punchbowl.data.wcs import calculate_helio_wcs_from_celestial

_ROOT = os.path.abspath(os.path.dirname(__file__))

Expand Down Expand Up @@ -49,67 +42,6 @@ def write_ndcube_to_fits(cube: NDCube,
)


def construct_wcs_header_fields(cube: NDCube) -> Header:
"""
Compute primary and secondary WCS header cards to add to a data object.

Returns
-------
Header

"""
date_obs = Time(cube.meta.datetime)

celestial_wcs_header = cube.wcs.to_header()
output_header = astropy.io.fits.Header()

unused_keys = [
"DATE-OBS",
"DATE-BEG",
"DATE-AVG",
"DATE-END",
"DATE",
"MJD-OBS",
"TELAPSE",
"RSUN_REF",
"TIMESYS",
]

helio_wcs, p_angle = calculate_helio_wcs_from_celestial(
wcs_celestial=cube.wcs, date_obs=date_obs, data_shape=cube.data.shape,
)

helio_wcs_hdul_reference = helio_wcs.to_fits()
helio_wcs_header = helio_wcs_hdul_reference[0].header

for key in unused_keys:
if key in celestial_wcs_header:
del celestial_wcs_header[key]
if key in helio_wcs_header:
del helio_wcs_header[key]

if cube.meta["CTYPE1"] is not None:
for key, value in helio_wcs.to_header().items():
output_header[key] = value
if cube.meta["CTYPE1A"] is not None:
for key, value in celestial_wcs_header.items():
output_header[key + "A"] = value

center_helio_coord = SkyCoord(
helio_wcs.wcs.crval[0] * u.deg,
helio_wcs.wcs.crval[1] * u.deg,
frame=frames.Helioprojective,
obstime=date_obs,
observer="earth",
)

output_header["RSUN_ARC"] = solar_angular_radius(center_helio_coord).value
output_header["SOLAR_EP"] = p_angle.value
output_header["CAR_ROT"] = float(sun.carrington_rotation_number(t=date_obs))

return output_header


def _write_fits(cube: NDCube, filename: str, overwrite: bool = True, uncertainty_quantize_level: float = -2.0) -> None:
_update_statistics(cube)

Expand Down
126 changes: 101 additions & 25 deletions punchbowl/data/tests/test_wcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,70 @@
from sunpy.coordinates import frames

from punchbowl.data.meta import NormalizedMetadata
from punchbowl.data.wcs import calculate_celestial_wcs_from_helio, calculate_helio_wcs_from_celestial, load_trefoil_wcs
from punchbowl.data.wcs import (
calculate_celestial_wcs_from_helio,
calculate_helio_wcs_from_celestial,
calculate_pc_matrix,
load_trefoil_wcs,
)

_ROOT = os.path.abspath(os.path.dirname(__file__))


def test_sun_location():
time_current = Time(datetime.utcnow())

skycoord_sun = astropy.coordinates.get_sun(Time(datetime.utcnow()))
skycoord_sun = astropy.coordinates.get_sun(time_current)

skycoord_origin = SkyCoord(0*u.deg, 0*u.deg,
frame=frames.Helioprojective,
obstime=time_current,
observer='earth')

with frames.Helioprojective.assume_spherical_screen(skycoord_origin.observer):
skycoord_origin_celestial = skycoord_origin.transform_to(GCRS)
# with frames.Helioprojective.assume_spherical_screen(skycoord_origin.observer):
skycoord_origin_celestial = skycoord_origin.transform_to(GCRS)

with frames.Helioprojective.assume_spherical_screen(skycoord_origin.observer):
assert skycoord_origin_celestial.separation(skycoord_sun) < 1 * u.arcsec
assert skycoord_origin.separation(skycoord_sun) < 1 * u.arcsec
# with frames.Helioprojective.assume_spherical_screen(skycoord_origin.observer):
assert skycoord_origin_celestial.separation(skycoord_sun) < 1 * u.arcsec
assert skycoord_origin.separation(skycoord_sun) < 1 * u.arcsec


def test_sun_location_gcrs():
time_current = Time(datetime.utcnow())

skycoord_sun = astropy.coordinates.get_sun(time_current)

skycoord_origin = SkyCoord(0*u.deg, 0*u.deg,
frame=frames.Helioprojective,
obstime=time_current,
observer='earth')

# with frames.Helioprojective.assume_spherical_screen(skycoord_origin.observer):
skycoord_origin_helio = skycoord_sun.transform_to(frames.Helioprojective)

# with frames.Helioprojective.assume_spherical_screen(skycoord_origin.observer):
assert skycoord_origin_helio.separation(skycoord_sun) < 1 * u.arcsec
assert skycoord_origin.separation(skycoord_sun) < 1 * u.arcsec

def test_extract_crota():
pc = calculate_pc_matrix(10*u.deg, [1, 1])
wcs_celestial = WCS({"CRVAL1": 0,
"CRVAL2": 0,
"CRPIX1": 2047.5,
"CRPIX2": 2047.5,
"CDELT1": -0.0225,
"CDELT2": 0.0225,
"CUNIT1": "deg",
"CUNIT2": "deg",
"CTYPE1": "RA---AZP",
"CTYPE2": "DEC--AZP",
"PC1_1": pc[0, 0],
"PC1_2": pc[1, 0],
"PC2_1": pc[0, 1],
"PC2_2": pc[1, 1]
})
assert extract_crota_from_wcs(wcs_celestial) == 10 * u.deg

def test_wcs_many_point_2d_check():
m = NormalizedMetadata.load_template("CTM", "2")
date_obs = Time("2024-01-01T00:00:00", format='isot', scale='utc')
Expand Down Expand Up @@ -67,7 +108,7 @@ def test_wcs_many_point_2d_check():
points_helio = wcs_helio.all_pix2world(input_coords, 0)

output_coords = []

intermediates = []
for c_pix, c_celestial, c_helio in zip(input_coords, points_celestial, points_helio):
skycoord_celestial = SkyCoord(c_celestial[0] * u.deg, c_celestial[1] * u.deg,
frame=GCRS,
Expand All @@ -79,10 +120,12 @@ def test_wcs_many_point_2d_check():
)

intermediate = skycoord_celestial.transform_to(frames.Helioprojective)
intermediates.append(intermediate)
output_coords.append(wcs_helio.all_world2pix(intermediate.data.lon.to(u.deg).value,
intermediate.data.lat.to(u.deg).value, 0))

output_coords = np.array(output_coords)
print(intermediates[0].Tx.deg, intermediates[0].Ty.deg, points_helio[0])
distances = np.linalg.norm(input_coords - output_coords, axis=1)
assert np.mean(distances) < 0.1

Expand Down Expand Up @@ -153,18 +196,33 @@ def test_load_trefoil_wcs():
def test_helio_celestial_wcs():
header = fits.Header.fromtextfile(os.path.join(_ROOT, "example_header.txt"))

date_obs = Time(header['DATE-OBS'])
# date_obs = Time("2021-06-20T00:00:00.000", format='isot', scale='utc')
#
# wcs_helio = WCS({"CRVAL1": 0.0,
# "CRVAL2": 0.0,
# "CRPIX1": 2047.5,
# "CRPIX2": 2047.5,
# "CDELT1": 0.0225,
# "CDELT2": 0.0225,
# "CUNIT1": "deg",
# "CUNIT2": "deg",
# "CTYPE1": "HPLN-ARC",
# "CTYPE2": "HPLT-ARC"})
wcs_helio = WCS(header)
wcs_celestial = WCS(header, key='A')
wcs_celestial = calculate_celestial_wcs_from_helio(wcs_helio, date_obs, (3, 4096, 4096))

wcs_celestial2 = WCS(header, key='A')

date_obs = Time(header['DATE-OBS'], format='isot', scale='utc')
test_loc = EarthLocation.from_geocentric(0, 0, 0, unit=u.m)
test_gcrs = SkyCoord(test_loc.get_gcrs(date_obs))

npoints = 20
input_coords = np.stack([
np.linspace(0, 4096, npoints).astype(int),
np.linspace(0, 4096, npoints).astype(int),
np.ones(npoints, dtype=int),], axis=1)
np.ones(npoints, dtype=int),],
axis=1)

points_celestial = wcs_celestial.all_pix2world(input_coords, 0)
points_helio = wcs_helio.all_pix2world(input_coords, 0)
Expand All @@ -174,25 +232,32 @@ def test_helio_celestial_wcs():
skycoord_celestial = SkyCoord(c_celestial[0] * u.deg, c_celestial[1] * u.deg,
frame=GCRS,
obstime=date_obs,
observer=test_gcrs,
obsgeoloc=test_gcrs.cartesian,
obsgeovel=test_gcrs.velocity.to_cartesian(),
distance=test_gcrs.hcrs.distance
observer="earth",
# observer=test_gcrs,
# obsgeoloc=test_gcrs.cartesian,
# obsgeovel=test_gcrs.velocity.to_cartesian(),
# distance=test_gcrs.hcrs.distance
)

intermediate = skycoord_celestial.transform_to(frames.Helioprojective)
output_coords.append(wcs_helio.all_world2pix(intermediate.data.lon.to(u.deg).value,
intermediate.data.lat.to(u.deg).value, 2, 0))
intermediate = skycoord_celestial.transform_to(frames.Helioprojective(observer='earth', obstime=date_obs))
# final = SkyCoord(intermediate.Tx, intermediate.Ty, frame="helioprojective", observer=intermediate.observer, obstime=intermediate.obstime)
output_coords.append(wcs_helio.all_world2pix(intermediate.Tx.to(u.deg).value,
intermediate.Ty.to(u.deg).value,
2,
0))

output_coords = np.array(output_coords)
distances = np.linalg.norm(input_coords - output_coords, axis=1)

assert np.nanmean(distances) < 0.1

from punchbowl.data.wcs import extract_crota_from_wcs, get_p_angle


def test_back_and_forth_wcs_from_celestial():
date_obs = Time("2024-01-01T00:00:00", format='isot', scale='utc')
sun_radec = get_sun(date_obs)
pc = calculate_pc_matrix(0, [0.0225, 0.0225])
wcs_celestial = WCS({"CRVAL1": sun_radec.ra.to(u.deg).value,
"CRVAL2": sun_radec.dec.to(u.deg).value,
"CRPIX1": 2047.5,
Expand All @@ -201,11 +266,19 @@ def test_back_and_forth_wcs_from_celestial():
"CDELT2": 0.0225,
"CUNIT1": "deg",
"CUNIT2": "deg",
"CTYPE1": "RA---ARC",
"CTYPE2": "DEC--ARC"})

"CTYPE1": "RA---AZP",
"CTYPE2": "DEC--AZP",
"PC1_1": pc[0, 0],
"PC1_2": pc[1, 0],
"PC2_1": pc[0, 1],
"PC2_2": pc[1, 1]
})

print(extract_crota_from_wcs(wcs_celestial))
wcs_helio, p_angle = calculate_helio_wcs_from_celestial(wcs_celestial, date_obs, (10, 10))
print(extract_crota_from_wcs(wcs_helio))
wcs_celestial_recovered = calculate_celestial_wcs_from_helio(wcs_helio, date_obs, (10, 10))
print(extract_crota_from_wcs(wcs_celestial_recovered))

assert np.allclose(wcs_celestial.wcs.crval, wcs_celestial_recovered.wcs.crval)
assert np.allclose(wcs_celestial.wcs.crpix, wcs_celestial_recovered.wcs.crpix)
Expand All @@ -216,8 +289,8 @@ def test_back_and_forth_wcs_from_celestial():
def test_back_and_forth_wcs_from_helio():
date_obs = Time("2024-01-01T00:00:00", format='isot', scale='utc')

wcs_helio = WCS({"CRVAL1": 0.0,
"CRVAL2": 0.0,
wcs_helio = WCS({"CRVAL1": 15.0,
"CRVAL2": 10.0,
"CRPIX1": 2047.5,
"CRPIX2": 2047.5,
"CDELT1": 0.0225,
Expand All @@ -227,8 +300,11 @@ def test_back_and_forth_wcs_from_helio():
"CTYPE1": "HPLN-ARC",
"CTYPE2": "HPLT-ARC"})

wcs_celestial = calculate_celestial_wcs_from_helio(wcs_helio, date_obs, (10, 10))
wcs_helio_recovered, p_angle = calculate_helio_wcs_from_celestial(wcs_celestial, date_obs, (10, 10))
print(extract_crota_from_wcs(wcs_helio))
wcs_celestial = calculate_celestial_wcs_from_helio(wcs_helio.copy(), date_obs, (10, 10))
print(extract_crota_from_wcs(wcs_celestial))
wcs_helio_recovered, p_angle = calculate_helio_wcs_from_celestial(wcs_celestial.copy(), date_obs, (10, 10))
print(extract_crota_from_wcs(wcs_helio_recovered))

assert np.allclose(wcs_helio.wcs.crval, wcs_helio_recovered.wcs.crval)
assert np.allclose(wcs_helio.wcs.crpix, wcs_helio_recovered.wcs.crpix)
Expand Down
Loading
Loading