diff --git a/src/pyuvdata/uvbeam/uvbeam.py b/src/pyuvdata/uvbeam/uvbeam.py index ec9016d18..2c19fff02 100644 --- a/src/pyuvdata/uvbeam/uvbeam.py +++ b/src/pyuvdata/uvbeam/uvbeam.py @@ -61,6 +61,10 @@ class UVBeam(UVBase): "description": "scipy RectBivariate spline interpolation", "func": "_interp_az_za_rect_spline", }, + "az_za_map_coordinates": { + "description": "scipy map_coordinates interpolation", + "func": "_interp_az_za_map_coordinates", + }, "healpix_simple": { "description": "healpix nearest-neighbor bilinear interpolation", "func": "_interp_healpix_bilinear", @@ -1301,6 +1305,161 @@ def get_lambda(real_lut, imag_lut=None): return tuple(interp_arrays) + def _prepare_freq_interpolation( + self, freq_array, *, freq_interp_kind="cubic", freq_interp_tol=1.0 + ): + """ + Perform frequency interpolation of the beam. + + This helper function handles the frequency interpolation prior to performing + interpolation along the azimuth/zenith angle axes. + """ + if freq_array is not None: + assert isinstance(freq_array, np.ndarray) + interp_arrays = self._interp_freq( + freq_array, kind=freq_interp_kind, tol=freq_interp_tol + ) + if self.antenna_type == "phased_array": + (input_data_array, interp_bandpass, interp_coupling_matrix) = ( + interp_arrays + ) + else: + input_data_array, interp_bandpass = interp_arrays + interp_coupling_matrix = None + input_nfreqs = freq_array.size + else: + input_data_array = self.data_array + input_nfreqs = self.Nfreqs + freq_array = self.freq_array + interp_bandpass = self.bandpass_array[0] + if self.antenna_type == "phased_array": + interp_coupling_matrix = self.coupling_matrix + else: + interp_coupling_matrix = None + + return ( + input_data_array, + interp_bandpass, + interp_coupling_matrix, + input_nfreqs, + freq_array, + ) + + def _check_interpolation_domain(self, az_array, za_array, phi_use, theta_use): + """Check if the interpolation domain is covered by the intrinsic data array.""" + max_axis_diff = max(np.diff(self.axis1_array)[0], np.diff(self.axis2_array)[0]) + za_sq_dist = np.full(len(za_array), np.inf) + az_sq_dist = np.full(len(az_array), np.inf) + if (len(theta_use) + len(phi_use)) > len(za_array): + # If there are fewer interpolation points than grid points, go + # through the grid points one-by-one to spot any outliers + for idx in range(az_array.size): + za_sq_dist[idx] = np.min((theta_use - za_array[idx]) ** 2.0) + az_sq_dist[idx] = np.min((phi_use - az_array[idx]) ** 2.0) + else: + # Otherwise, if we have lots of interpolation points, then it's faster + # to evaluate the grid steps one-by-one. + for theta_val in theta_use: + temp_arr = np.square(za_array - theta_val) + za_sq_dist = np.where(za_sq_dist > temp_arr, temp_arr, za_sq_dist) + + for phi_val in phi_use: + temp_arr = np.square(az_array - phi_val) + az_sq_dist = np.where(az_sq_dist > temp_arr, temp_arr, az_sq_dist) + + if np.any(np.sqrt(az_sq_dist + za_sq_dist) > (max_axis_diff * 2.0)): + raise ValueError( + "at least one interpolation location " + "is outside of the UVBeam pixel coverage." + ) + + def _prepare_coordinate_data(self, input_data_array): + """Prepare coordinate data for interpolation functions.""" + freq_axis = 2 + axis1_diff = np.diff(self.axis1_array)[0] + phi_length = np.abs(self.axis1_array[0] - self.axis1_array[-1]) + axis1_diff + phi_vals, theta_vals = self.axis1_array, self.axis2_array + + if np.isclose(phi_length, 2 * np.pi, atol=axis1_diff): + # phi wraps around, extend array in each direction to improve interpolation + extend_length = 3 + phi_use = np.concatenate( + ( + phi_vals[0] + (np.arange(-extend_length, 0) * axis1_diff), + phi_vals, + phi_vals[-1] + (np.arange(1, extend_length + 1) * axis1_diff), + ) + ) + + low_slice = input_data_array[..., :extend_length] + high_slice = input_data_array[..., -1 * extend_length :] + + data_use = np.concatenate( + (high_slice, input_data_array, low_slice), axis=freq_axis + 2 + ) + else: + phi_use = phi_vals + data_use = input_data_array + + return data_use, phi_use, theta_vals + + def _prepare_polarized_inputs(self, polarizations): + """Prepare inputs for polarized interpolation functions.""" + # Npols is only defined for power beams. For E-field beams need Nfeeds. + if self.beam_type == "power": + # get requested polarization indices + if polarizations is None: + Npol_feeds = self.Npols + pol_inds = np.arange(Npol_feeds) + else: + pols = [ + utils.polstr2num(p, x_orientation=self.x_orientation) + for p in polarizations + ] + pol_inds = [] + for pol in pols: + if pol not in self.polarization_array: + raise ValueError( + "Requested polarization {} not found " + "in self.polarization_array".format(pol) + ) + pol_inds.append(np.where(self.polarization_array == pol)[0][0]) + pol_inds = np.asarray(pol_inds) + Npol_feeds = len(pol_inds) + + else: + Npol_feeds = self.Nfeeds + pol_inds = np.arange(Npol_feeds) + + return Npol_feeds, pol_inds + + def _prepare_basis_vector_array(self, npoints): + """Prepare basis vector array for interpolation functions.""" + if self.basis_vector_array is not None: + if np.any(self.basis_vector_array[0, 1, :] > 0) or np.any( + self.basis_vector_array[1, 0, :] > 0 + ): + # Input basis vectors are not aligned to the native theta/phi + # coordinate system + raise NotImplementedError( + "interpolation for input basis " + "vectors that are not aligned to the " + "native theta/phi coordinate system " + "is not yet supported" + ) + else: + # The basis vector array comes in defined at the rectangular grid. + # Redefine it for the interpolation points + interp_basis_vector = np.zeros( + [self.Naxes_vec, self.Ncomponents_vec, npoints] + ) + interp_basis_vector[0, 0, :] = np.ones(npoints) # theta hat + interp_basis_vector[1, 1, :] = np.ones(npoints) # phi hat + else: + interp_basis_vector = None + + return interp_basis_vector + def _interp_az_za_rect_spline( self, *, @@ -1313,10 +1472,12 @@ def _interp_az_za_rect_spline( reuse_spline=False, spline_opts=None, check_azza_domain: bool = True, - spatial_interp_func="RectBivariateSpline", ): """ - Interpolate in az_za coordinate system with a simple spline. + Interpolate in az_za coordinate system using RectBivariateSpline. + + Uses the :func:`scipy.interpolate.RectBivariateSpline` function to perform + interpolation in the azimuth-zenith angle coordinate system. Parameters ---------- @@ -1341,10 +1502,6 @@ def _interp_az_za_rect_spline( check_azza_domain : bool Whether to check the domain of az/za to ensure that they are covered by the intrinsic data array. Checking them can be quite computationally expensive. - spatial_interp_func : str - The spatial interpolation function to use. Options are 'RectBivariateSpline' - and 'map_coordinates'. If 'map_coordinates' is chosen, the input data must - be on a regular grid. Returns ------- @@ -1368,118 +1525,57 @@ def _interp_az_za_rect_spline( "function" ) - if freq_array is not None: - assert isinstance(freq_array, np.ndarray) - interp_arrays = self._interp_freq( - freq_array, kind=freq_interp_kind, tol=freq_interp_tol - ) - if self.antenna_type == "phased_array": - (input_data_array, interp_bandpass, interp_coupling_matrix) = ( - interp_arrays - ) - else: - input_data_array, interp_bandpass = interp_arrays - input_nfreqs = freq_array.size - else: - input_data_array = self.data_array - input_nfreqs = self.Nfreqs - freq_array = self.freq_array - interp_bandpass = self.bandpass_array[0] - if self.antenna_type == "phased_array": - interp_coupling_matrix = self.coupling_matrix + # Perform initial frequency interpolation to get the data array + ( + input_data_array, + interp_bandpass, + interp_coupling_matrix, + input_nfreqs, + freq_array, + ) = self._prepare_freq_interpolation( + freq_array=freq_array, + freq_interp_kind=freq_interp_kind, + freq_interp_tol=freq_interp_tol, + ) + # If az_array and za_array are not provided, return the interpolated data if az_array is None or za_array is None: interp_arrays = [input_data_array, self.basis_vector_array, interp_bandpass] if self.antenna_type == "phased_array": interp_arrays.append(interp_coupling_matrix) return tuple(interp_arrays) + freq_axis = 2 + + # Check input arrays assert isinstance(az_array, np.ndarray) assert isinstance(za_array, np.ndarray) assert az_array.ndim == 1 assert az_array.shape == za_array.shape - - if spatial_interp_func not in ["RectBivariateSpline", "map_coordinates"]: - raise ValueError( - "interpolator must be 'RectBivariateSpline' or" " 'map_coordinates'" - ) - - npoints = az_array.size - - axis1_diff = np.diff(self.axis1_array)[0] - axis2_diff = np.diff(self.axis2_array)[0] - max_axis_diff = np.max([axis1_diff, axis2_diff]) - - phi_length = np.abs(self.axis1_array[0] - self.axis1_array[-1]) + axis1_diff - - phi_vals, theta_vals = self.axis1_array, self.axis2_array - - freq_axis = 2 assert input_data_array.shape[freq_axis] == input_nfreqs + # Get the data type if np.iscomplexobj(input_data_array): data_type = np.complex128 else: data_type = np.float64 - if np.isclose(phi_length, 2 * np.pi, atol=axis1_diff): - # phi wraps around, extend array in each direction to improve interpolation - extend_length = 3 - phi_use = np.concatenate( - ( - phi_vals[0] + (np.arange(-extend_length, 0) * axis1_diff), - phi_vals, - phi_vals[-1] + (np.arange(1, extend_length + 1) * axis1_diff), - ) - ) + # Prepare the data for interpolation + data_use, phi_use, theta_use = self._prepare_coordinate_data(input_data_array) - low_slice = input_data_array[..., :extend_length] - high_slice = input_data_array[..., -1 * extend_length :] + # Prepate basis functions + interp_basis_vector = self._prepare_basis_vector_array(az_array.size) - data_use = np.concatenate( - (high_slice, input_data_array, low_slice), axis=freq_axis + 2 - ) - else: - phi_use = phi_vals - data_use = input_data_array - - # This is now always true, keeping this here to keep naming convention the same - theta_use = theta_vals - - if spatial_interp_func == "map_coordinates": - axis1_diff = np.diff(phi_use) - axis2_diff = np.diff(theta_use) + # Get number of polarizations and indices + Npol_feeds, pol_inds = self._prepare_polarized_inputs(polarizations) - if not np.allclose(axis1_diff, axis1_diff[0]) or not np.allclose( - axis2_diff, axis2_diff[0] - ): - raise ValueError( - "axis1_array and axis2_array must be evenly spaced for " - "map_coordinates interpolation" - ) + # Check if the interpolation points are within the data array + if check_azza_domain: + self._check_interpolation_domain(az_array, za_array, phi_use, theta_use) - if self.basis_vector_array is not None: - if np.any(self.basis_vector_array[0, 1, :] > 0) or np.any( - self.basis_vector_array[1, 0, :] > 0 - ): - # Input basis vectors are not aligned to the native theta/phi - # coordinate system - raise NotImplementedError( - "interpolation for input basis " - "vectors that are not aligned to the " - "native theta/phi coordinate system " - "is not yet supported" - ) - else: - # The basis vector array comes in defined at the rectangular grid. - # Redefine it for the interpolation points - interp_basis_vector = np.zeros( - [self.Naxes_vec, self.Ncomponents_vec, npoints] - ) - interp_basis_vector[0, 0, :] = np.ones(npoints) # theta hat - interp_basis_vector[1, 1, :] = np.ones(npoints) # phi hat - else: - interp_basis_vector = None + interp_data = np.zeros( + (self.Naxes_vec, Npol_feeds, input_nfreqs, az_array.size), dtype=data_type + ) def get_lambda(real_lut, imag_lut=None, **kwargs): # Returns function objects for interpolation reuse @@ -1490,132 +1586,211 @@ def get_lambda(real_lut, imag_lut=None, **kwargs): real_lut(za, az, **kwargs) + 1j * imag_lut(za, az, **kwargs) ) - # Npols is only defined for power beams. For E-field beams need Nfeeds. - if self.beam_type == "power": - # get requested polarization indices - if polarizations is None: - Npol_feeds = self.Npols - pol_inds = np.arange(Npol_feeds) - else: - pols = [ - utils.polstr2num(p, x_orientation=self.x_orientation) - for p in polarizations - ] - pol_inds = [] - for pol in pols: - if pol not in self.polarization_array: - raise ValueError( - "Requested polarization {} not found " - "in self.polarization_array".format(pol) - ) - pol_inds.append(np.where(self.polarization_array == pol)[0][0]) - pol_inds = np.asarray(pol_inds) - Npol_feeds = len(pol_inds) - - else: - Npol_feeds = self.Nfeeds - pol_inds = np.arange(Npol_feeds) - - if check_azza_domain: - za_sq_dist = np.full(len(za_array), np.inf) - az_sq_dist = np.full(len(az_array), np.inf) - if (len(theta_use) + len(phi_use)) > len(za_array): - # If there are fewer interpolation points than grid points, go - # through the grid points one-by-one to spot any outliers - for idx in range(npoints): - za_sq_dist[idx] = np.min((theta_use - za_array[idx]) ** 2.0) - az_sq_dist[idx] = np.min((phi_use - az_array[idx]) ** 2.0) - else: - # Otherwise, if we have lots of interpolation points, then it's faster - # to evaluate the grid steps one-by-one. - for theta_val in theta_use: - temp_arr = np.square(za_array - theta_val) - za_sq_dist = np.where(za_sq_dist > temp_arr, temp_arr, za_sq_dist) - - for phi_val in phi_use: - temp_arr = np.square(az_array - phi_val) - az_sq_dist = np.where(az_sq_dist > temp_arr, temp_arr, az_sq_dist) - - if np.any(np.sqrt(az_sq_dist + za_sq_dist) > (max_axis_diff * 2.0)): - raise ValueError( - "at least one interpolation location " - "is outside of the UVBeam pixel coverage." - ) - - data_shape = (self.Naxes_vec, Npol_feeds, input_nfreqs, npoints) - interp_data = np.zeros(data_shape, dtype=data_type) - if spline_opts is None or not isinstance(spline_opts, dict): spline_opts = {} if reuse_spline and not hasattr(self, "saved_interp_functions"): int_dict = {} self.saved_interp_functions = int_dict - if spatial_interp_func == "map_coordinates": - _az_array = ( - (az_array - phi_use.min()) - / (phi_use.max() - phi_use.min()) - * (phi_use.size - 1) - ) - _za_array = ( - (za_array - theta_use.min()) - / (theta_use.max() - theta_use.min()) - * (theta_use.size - 1) - ) - for index3 in range(input_nfreqs): freq = freq_array[index3] for index0 in range(self.Naxes_vec): for pol_return_ind, index2 in enumerate(pol_inds): do_interp = True key = (freq, index2, index0) - if spatial_interp_func == "map_coordinates": - interp_data[index0, pol_return_ind, index3, :] = ( - ndimage.map_coordinates( - data_use[index0, index2, index3], - np.array([_za_array, _az_array]), + + if reuse_spline: + if key in self.saved_interp_functions.keys(): + do_interp = False + lut = self.saved_interp_functions[key] + + if do_interp: + data_inds = (index0, index2, index3) + + if np.iscomplexobj(data_use): + # interpolate real and imaginary parts separately + real_lut = interpolate.RectBivariateSpline( + theta_use, + phi_use, + data_use[data_inds].real, **spline_opts, ) - ) - else: + imag_lut = interpolate.RectBivariateSpline( + theta_use, + phi_use, + data_use[data_inds].imag, + **spline_opts, + ) + lut = get_lambda(real_lut, imag_lut, grid=False) + else: + lut = interpolate.RectBivariateSpline( + theta_use, phi_use, data_use[data_inds], **spline_opts + ) + lut = get_lambda(lut, grid=False) + if reuse_spline: - if key in self.saved_interp_functions.keys(): - do_interp = False - lut = self.saved_interp_functions[key] - - if do_interp: - data_inds = (index0, index2, index3) - - if np.iscomplexobj(data_use): - # interpolate real and imaginary parts separately - real_lut = interpolate.RectBivariateSpline( - theta_use, - phi_use, - data_use[data_inds].real, - **spline_opts, - ) - imag_lut = interpolate.RectBivariateSpline( - theta_use, - phi_use, - data_use[data_inds].imag, - **spline_opts, - ) - lut = get_lambda(real_lut, imag_lut, grid=False) - else: - lut = interpolate.RectBivariateSpline( - theta_use, - phi_use, - data_use[data_inds], - **spline_opts, - ) - lut = get_lambda(lut, grid=False) - - if reuse_spline: - self.saved_interp_functions[key] = lut - - interp_data[index0, pol_return_ind, index3, :] = lut( - za_array, az_array + self.saved_interp_functions[key] = lut + + interp_data[index0, pol_return_ind, index3, :] = lut( + za_array, az_array + ) + + interp_arrays = [interp_data, interp_basis_vector, interp_bandpass] + if self.antenna_type == "phased_array": + interp_arrays.append(interp_coupling_matrix) + return tuple(interp_arrays) + + def _interp_az_za_map_coordinates( + self, + *, + az_array, + za_array, + freq_array, + freq_interp_kind="cubic", + freq_interp_tol=1.0, + polarizations=None, + spline_opts=None, + check_azza_domain: bool = True, + reuse_spline: bool = False, + ): + """ + Interpolate in az_za coordinate system using RectBivariateSpline. + + Uses the :func:`scipy.ndimage.map_coordinates` function to perform + interpolation in the azimuth-zenith angle coordinate system. + + Parameters + ---------- + az_array : array_like of floats + Azimuth values to interpolate to in radians, specifying the azimuth + positions for every interpolation point (same length as `za_array`). + za_array : array_like of floats + Zenith values to interpolate to in radians, specifying the zenith + positions for every interpolation point (same length as `az_array`). + freq_array : array_like of floats + Frequency values to interpolate to. + freq_interp_kind : str + Interpolation method to use frequency. + See scipy.interpolate.interp1d for details. + polarizations : list of str + polarizations to interpolate if beam_type is 'power'. + Default is all polarizations in self.polarization_array. + reuse_spline : bool + Save the interpolation functions for reuse. + spline_opts : dict + Options (kx, ky, s) for numpy.RectBivariateSpline. + check_azza_domain : bool + Whether to check the domain of az/za to ensure that they are covered by the + intrinsic data array. Checking them can be quite computationally expensive. + + Returns + ------- + interp_data : array_like of float or complex + The array of interpolated data values, + shape: (Naxes_vec, Nfeeds or Npols, freq_array.size, az_array.size) + interp_basis_vector : array_like of float + The array of interpolated basis vectors, + shape: (Naxes_vec, Ncomponents_vec, az_array.size) + interp_bandpass : array_like of float + The interpolated bandpass. shape: (freq_array.size,) + interp_coupling_matrix : array_like of complex, optional + The interpolated coupling matrix. Only returned if antenna_type is + "phased_array". + shape: (Nelements, Nelements, Nfeeds, Nfeeds, freq_array.size) + + """ + if self.pixel_coordinate_system != "az_za": + raise ValueError( + "pixel_coordinate_system must be 'az_za' to use this interpolation " + "function" + ) + + # Perform initial frequency interpolation to get the data array + ( + input_data_array, + interp_bandpass, + interp_coupling_matrix, + input_nfreqs, + freq_array, + ) = self._prepare_freq_interpolation( + freq_array=freq_array, + freq_interp_kind=freq_interp_kind, + freq_interp_tol=freq_interp_tol, + ) + + if az_array is None or za_array is None: + interp_arrays = [input_data_array, self.basis_vector_array, interp_bandpass] + if self.antenna_type == "phased_array": + interp_arrays.append(interp_coupling_matrix) + return tuple(interp_arrays) + + freq_axis = 2 + + # Check input arrays + assert isinstance(az_array, np.ndarray) + assert isinstance(za_array, np.ndarray) + assert az_array.ndim == 1 + assert az_array.shape == za_array.shape + assert input_data_array.shape[freq_axis] == input_nfreqs + + # Get the data type + if np.iscomplexobj(input_data_array): + data_type = np.complex128 + else: + data_type = np.float64 + + # Prepare the data for interpolation + data_use, phi_use, theta_use = self._prepare_coordinate_data(input_data_array) + + # Check if the interpolation points are evenly-spaced + axis1_diff = np.diff(phi_use) + axis2_diff = np.diff(theta_use) + if not np.allclose(axis1_diff, axis1_diff[0]) or not np.allclose( + axis2_diff, axis2_diff[0] + ): + raise ValueError( + "axis1_array and axis2_array must be evenly spaced for " + "map_coordinates interpolation" + ) + + # Prepare basis functions + interp_basis_vector = self._prepare_basis_vector_array(az_array.size) + + # Get number of polarizations and indices + Npol_feeds, pol_inds = self._prepare_polarized_inputs(polarizations) + + # Check if the interpolation points are within the data array + if check_azza_domain: + self._check_interpolation_domain(az_array, za_array, phi_use, theta_use) + + interp_data = np.zeros( + (self.Naxes_vec, Npol_feeds, input_nfreqs, az_array.size), dtype=data_type + ) + + if spline_opts is None or not isinstance(spline_opts, dict): + spline_opts = {} + + _az_array = ( + (az_array - phi_use.min()) + / (phi_use.max() - phi_use.min()) + * (phi_use.size - 1) + ) + _za_array = ( + (za_array - theta_use.min()) + / (theta_use.max() - theta_use.min()) + * (theta_use.size - 1) + ) + + for index3 in range(input_nfreqs): + for index0 in range(self.Naxes_vec): + for pol_return_ind, index2 in enumerate(pol_inds): + interp_data[index0, pol_return_ind, index3, :] = ( + ndimage.map_coordinates( + data_use[index0, index2, index3], + np.array([_za_array, _az_array]), + **spline_opts, ) + ) interp_arrays = [interp_data, interp_basis_vector, interp_bandpass] if self.antenna_type == "phased_array": @@ -1693,25 +1868,18 @@ def _interp_healpix_bilinear( "simple healpix interpolation requires healpix pixels to be in order." ) - if freq_array is not None: - assert isinstance(freq_array, np.ndarray) - interp_arrays = self._interp_freq( - freq_array, kind=freq_interp_kind, tol=freq_interp_tol - ) - if self.antenna_type == "phased_array": - (input_data_array, interp_bandpass, interp_coupling_matrix) = ( - interp_arrays - ) - else: - input_data_array, interp_bandpass = interp_arrays - input_nfreqs = freq_array.size - else: - input_data_array = self.data_array - input_nfreqs = self.Nfreqs - freq_array = self.freq_array[0] - interp_bandpass = self.bandpass_array[0] - if self.antenna_type == "phased_array": - interp_coupling_matrix = self.coupling_matrix + # Perform initial frequency interpolation to get the data array + ( + input_data_array, + interp_bandpass, + interp_coupling_matrix, + input_nfreqs, + freq_array, + ) = self._prepare_freq_interpolation( + freq_array=freq_array, + freq_interp_kind=freq_interp_kind, + freq_interp_tol=freq_interp_tol, + ) if az_array is None or za_array is None: interp_arrays = [input_data_array, self.basis_vector_array, interp_bandpass] @@ -1719,68 +1887,26 @@ def _interp_healpix_bilinear( interp_arrays.append(interp_coupling_matrix) return tuple(interp_arrays) + # Check input arrays assert isinstance(az_array, np.ndarray) assert isinstance(za_array, np.ndarray) assert az_array.ndim == 1 assert az_array.shape == za_array.shape - npoints = az_array.size - - # Npols is only defined for power beams. For E-field beams need Nfeeds. - if self.beam_type == "power": - # get requested polarization indices - if polarizations is None: - Npol_feeds = self.Npols - pol_inds = np.arange(Npol_feeds) - else: - pols = [ - utils.polstr2num(p, x_orientation=self.x_orientation) - for p in polarizations - ] - pol_inds = [] - for pol in pols: - if pol not in self.polarization_array: - raise ValueError( - f"Requested polarization {pol} not found " - "in self.polarization_array" - ) - pol_inds.append(np.where(self.polarization_array == pol)[0][0]) - pol_inds = np.asarray(pol_inds) - Npol_feeds = len(pol_inds) - else: - Npol_feeds = self.Nfeeds - pol_inds = np.arange(Npol_feeds) + # Get number of polarizations and indices + Npol_feeds, pol_inds = self._prepare_polarized_inputs(polarizations) if np.iscomplexobj(input_data_array): data_type = np.complex128 else: data_type = np.float64 + interp_data = np.zeros( (self.Naxes_vec, Npol_feeds, input_nfreqs, len(az_array)), dtype=data_type ) - if self.basis_vector_array is not None: - if np.any(self.basis_vector_array[0, 1, :] > 0) or np.any( - self.basis_vector_array[1, 0, :] > 0 - ): - """Input basis vectors are not aligned to the native theta/phi - coordinate system""" - raise NotImplementedError( - "interpolation for input basis " - "vectors that are not aligned to the " - "native theta/phi coordinate system " - "is not yet supported" - ) - else: - """The basis vector array comes in defined at the rectangular grid. - Redefine it for the interpolation points""" - interp_basis_vector = np.zeros( - [self.Naxes_vec, self.Ncomponents_vec, npoints] - ) - interp_basis_vector[0, 0, :] = np.ones(npoints) # theta hat - interp_basis_vector[1, 1, :] = np.ones(npoints) # phi hat - else: - interp_basis_vector = None + # Prepare basis functions + interp_basis_vector = self._prepare_basis_vector_array(az_array.size) hp_obj = HEALPix(nside=self.nside, order=self.ordering) lat_array = Angle(np.pi / 2, units.radian) - Angle(za_array, units.radian) @@ -1834,7 +1960,6 @@ def interp( check_extra=True, run_check_acceptability=True, check_azza_domain: bool = True, - spatial_interp_func="RectBivariateSpline", ): """ Interpolate beam to given frequency, az & za locations or Healpix pixel centers. @@ -1846,6 +1971,8 @@ def interp( - "az_za_simple": Uses scipy RectBivariate spline interpolation, can only be used on objects with an "az_za" pixel_coordinate_system. + - "az_za_map_coordinates": Uses scipy map_coordinates interpolation, can only + be used on objects with an "az_za" pixel_coordinate_system. - "healpix_simple": Uses HEALPix nearest-neighbor bilinear interpolation, can only be used on objects with a "healpix" pixel_coordinate_system. @@ -1862,7 +1989,8 @@ def interp( interpolation_function : str, optional Specify the interpolation function to use. Defaults to: "az_za_simple" for objects with the "az_za" pixel_coordinate_system and "healpix_simple" for - objects with the "healpix" pixel_coordinate_system. + objects with the "healpix" pixel_coordinate_system. "az_za_map_coordinates" + is also available for objects with the "az_za" pixel_coordinate_system. freq_interp_kind : str Interpolation method to use frequency. See scipy.interpolate.interp1d for details. Defaults to "cubic" (Note that this is a change. It used to @@ -1898,7 +2026,7 @@ def interp( spline_opts : dict Provide options to numpy.RectBivariateSpline. This includes spline order parameters `kx` and `ky`, and smoothing parameter `s`. - Only applies for `az_za_simple` interpolation. + Applies for `az_za_simple` and `az_za_map_coordinates` interpolation. run_check : bool Only used if new_object is True. Option to check for the existence and proper shapes of required parameters on the new object. @@ -1913,11 +2041,6 @@ def interp( intrinsic data array. Checking them can be quite computationally expensive. Conversely, if the passed az/za are outside of the domain, they will be silently extrapolated and the behavior is not well-defined. - spatial_interp_func : str - The spatial interpolation function to use. Options are 'RectBivariateSpline' - and 'map_coordinates'. If 'map_coordinates' is choosen, input beam must be - evenly spaced in theta and phi. Only applies for `az_za_simple` - interpolation. Returns ------- @@ -2032,7 +2155,6 @@ def interp( extra_keyword_dict["reuse_spline"] = reuse_spline extra_keyword_dict["spline_opts"] = spline_opts extra_keyword_dict["check_azza_domain"] = check_azza_domain - extra_keyword_dict["spatial_interp_func"] = spatial_interp_func interp_arrays = getattr(self, interp_func)( az_array=az_array_use, @@ -2193,6 +2315,8 @@ def to_healpix( - "az_za_simple": Uses scipy RectBivariate spline interpolation, can only be used on objects with an "az_za" pixel_coordinate_system. + - "az_za_map_coordinates": Uses scipy map_coordinates interpolation, can only + be used on objects with an "az_za" pixel_coordinate Parameters ---------- @@ -2201,8 +2325,10 @@ def to_healpix( the nside that gives the closest resolution that is higher than the input resolution. interpolation_function : str, optional - Specify the interpolation function to use. Defaults to to: "az_za_simple" - for objects with the "az_za" pixel_coordinate_system. + Specify the interpolation function to use. Defaults to to: + "az_za_simple" for objects with the "az_za" pixel_coordinate_system. + "az_za_map_coordinates" is also available for objects with the "az_za" + pixel_coordinate_system. run_check : bool Option to check for the existence and proper shapes of required parameters after converting to healpix. diff --git a/tests/uvbeam/test_uvbeam.py b/tests/uvbeam/test_uvbeam.py index e84e6f4bd..f4c6c211a 100644 --- a/tests/uvbeam/test_uvbeam.py +++ b/tests/uvbeam/test_uvbeam.py @@ -1073,7 +1073,7 @@ def test_spatial_interpolation_everyother( za_array=za_interp_vals, freq_array=freq_interp_vals, freq_interp_kind="linear", - spatial_interp_func="map_coordinates", + interpolation_function="az_za_map_coordinates", spline_opts={"order": 1}, ) assert np.allclose(linear_data_array, mp_data_array) @@ -1170,17 +1170,6 @@ def test_spatial_interpolation_errors(cst_power_2freq_cut): az_array=az_interp_vals, za_array=za_interp_vals, return_coupling=True ) - # test error returning coupling matrix for simple antenna_types - with pytest.raises( - ValueError, - match="interpolator must be 'RectBivariateSpline' or 'map_coordinates'", - ): - uvbeam.interp( - az_array=az_interp_vals, - za_array=za_interp_vals, - spatial_interp_func="NotSupportedInterpolator", - ) - # Check that error is raised if the input beam is not on a regular grid # and spatial_interp_func="map_coordinates" is used az_array_uneven = np.array([0.1, 0.5, 0.56, 0.9]) @@ -1202,7 +1191,7 @@ def test_spatial_interpolation_errors(cst_power_2freq_cut): new_beam.interp( az_array=az_interp_vals, za_array=za_interp_vals, - spatial_interp_func="map_coordinates", + interpolation_function="az_za_map_coordinates", )