Skip to content

Commit

Permalink
test: add tests of with_feeds and as_power_beam
Browse files Browse the repository at this point in the history
  • Loading branch information
steven-murray committed Nov 9, 2024
1 parent 2431d74 commit ec7c675
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 8 deletions.
62 changes: 55 additions & 7 deletions src/pyuvdata/beam_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@
import copy
import warnings
from dataclasses import InitVar, asdict, dataclass, replace
from itertools import product
from typing import Literal

import numpy as np
import numpy.typing as npt

from .analytic_beam import AnalyticBeam
from .utils import pol as upol
from .uvbeam import UVBeam

# Other methods we may want to include:
Expand Down Expand Up @@ -75,7 +77,7 @@ def __post_init__(self, include_cross_pols: bool):
if isinstance(self.beam, UVBeam):
if self.beam_type is None or self.beam_type == self.beam.beam_type:
self.beam_type = self.beam.beam_type
elif self.beam_type == "power":
elif self.beam_type == "power" and self.beam.beam_type != "power":
warnings.warn(
"Input beam is an efield UVBeam but beam_type is specified as "
"'power'. Converting efield beam to power."
Expand All @@ -88,11 +90,13 @@ def __post_init__(self, include_cross_pols: bool):
"efield beam, either provide an efield UVBeam or do not "
"specify `beam_type`."
)
elif self.beam_type is None:
self.beam_type = "efield"

@property
def Npols(self):
"""The number of polarizations in the beam."""
return self.beam.Npols
return self.beam.Npols or len(self.polarization_array)

@property
def polarization_array(self):
Expand All @@ -107,7 +111,7 @@ def feed_array(self):
@property
def Nfeeds(self):
"""The number of feeds defined on the beam."""
return self.beam.Nfeeds
return self.beam.Nfeeds or len(self.feed_array)

def clone(self, **kw):
"""Return a new instance with updated parameters."""
Expand All @@ -118,13 +122,20 @@ def as_power_beam(
):
"""Return a new interface instance that is in the power-beam mode.
If already in the power-beam mode, this is a no-op. Note that this might be
slighty unexpected, because the effect of `include_cross_pols` is not accounted
for in this case.
Parameters
----------
include_cross_pols : bool, optional
Whether to include cross-pols in the power beam.
allow_beam_mutation : bool, optional
Whether to allow the underlying beam to be updated in-place.
"""
if self.beam_type == "power":
return self

beam = self.beam if allow_beam_mutation else copy.deepcopy(self.beam)

# We cannot simply use .clone() here, because we need to be able to pass
Expand All @@ -133,13 +144,50 @@ def as_power_beam(
this["beam"] = beam
this["beam_type"] = "power"
this["include_cross_pols"] = include_cross_pols
return BeamInterface(**this)
with warnings.catch_warnings():
# Don't emit the warning that we're converting to power, because that is
# explicitly desired.
warnings.simplefilter("ignore", UserWarning)
return BeamInterface(**this)

def with_feeds(self, feeds, *, maintain_ordering: bool = True):
"""Return a new interface instance with updated feed_array.
def with_feeds(self, feeds):
"""Return a new interface instance with updated feed_array."""
Parameters
----------
feeds : array_like of str
The feeds to keep in the beam. Each value should be a string, e.g. 'n', 'x'.
maintain_ordering : bool, optional
If True, maintain the same polarization ordering as in the beam currently.
If False, change ordering to match the input feeds, which are turned into
pols (if a power beam) by using product(feeds, feeds).
"""
if not self._isuvbeam:
if maintain_ordering:
feeds = [fd for fd in self.feed_array if fd in feeds]
return self.clone(beam=self.beam.clone(feed_array=feeds))
new_beam = self.beam.select(feeds=feeds, inplace=False)
if self.beam_type == "power":
# Down-select polarizations based on the feeds input.
possible_pols = [f1 + f2 for f1, f2 in product(feeds, feeds)]
possible_pol_ints = upol.polstr2num(
possible_pols, x_orientation=self.beam.x_orientation
)
if maintain_ordering:
use_pols = [
p for p in self.beam.polarization_array if p in possible_pol_ints
]
print("use_pols: ", use_pols)
else:
use_pols = [
p for p in possible_pol_ints if p in self.beam.polarization_array
]

new_beam = self.beam.select(polarizations=use_pols, inplace=False)
else:
if maintain_ordering:
feeds = [fd for fd in self.feed_array if fd in feeds]

new_beam = self.beam.select(feeds=feeds, inplace=False)
return self.clone(beam=new_beam)

@property
Expand Down
41 changes: 40 additions & 1 deletion tests/test_beam_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ def test_clone():
@pytest.mark.parametrize("uvbeam", [True, False], ids=["uvbeam", "analytic"])
@pytest.mark.parametrize("allow_mutation", [True, False], ids=["mutate", "nomutate"])
@pytest.mark.parametrize("include_cross_pols", [True, False], ids=["incx", "nox"])
def test_astype(uvbeam: bool, allow_mutation: bool, include_cross_pols: bool):
def test_as_power(uvbeam: bool, allow_mutation: bool, include_cross_pols: bool):
beam = AiryBeam(diameter=14.0)
if uvbeam:
beam = beam.to_uvbeam(freq_array=np.array([1e8]), nside=32)
Expand All @@ -300,12 +300,51 @@ def test_astype(uvbeam: bool, allow_mutation: bool, include_cross_pols: bool):
assert intf.beam.beam_type == "efield"


def test_as_power_noop():
"""Ensure that calling as_power_beam on a power beam is a no-op."""
beam = AiryBeam(diameter=14.0)
intf = BeamInterface(beam, beam_type="power")
intf2 = intf.as_power_beam()
assert intf is intf2


@pytest.mark.parametrize("uvbeam", [True, False])
def test_with_feeds(uvbeam: bool):
beam = AiryBeam(diameter=14.0)
if uvbeam:
beam = beam.to_uvbeam(freq_array=np.array([1e8]), nside=32)

intf = BeamInterface(beam)

intf_feedx = intf.with_feeds(["x"])
assert intf_feedx.feed_array == ["x"]


def test_with_feeds_ordering():
beam = AiryBeam(diameter=14.0)
intf = BeamInterface(beam)

intf_feedx = intf.with_feeds(["y", "x"], maintain_ordering=True)
assert np.all(intf_feedx.feed_array == ["x", "y"])

intf_feedyx = intf.with_feeds(["y", "x"], maintain_ordering=False)
assert np.all(intf_feedyx.feed_array == ["y", "x"])


@pytest.mark.filterwarnings("ignore:Input beam is an efield UVBeam")
@pytest.mark.filterwarnings("ignore:Selected polarizations are not evenly spaced")
def test_with_feeds_ordering_power():
beam = AiryBeam(diameter=14.0).to_uvbeam(freq_array=np.array([1e8]), nside=16)

intf = BeamInterface(beam, beam_type="power")
print(intf.polarization_array)
intf_feedx = intf.with_feeds(["y", "x"], maintain_ordering=True)
assert np.all(intf_feedx.polarization_array == [-5, -6, -7, -8])

intf_feedyx = intf.with_feeds(["y", "x"], maintain_ordering=False)
print(
utils.pol.polnum2str(
intf_feedyx.polarization_array, x_orientation=intf_feedyx.beam.x_orientation
)
)
assert np.all(intf_feedyx.polarization_array == [-6, -8, -7, -5])

0 comments on commit ec7c675

Please sign in to comment.