diff --git a/src/pyxu/operator/ufunc.py b/src/pyxu/operator/ufunc.py index a3a43af6f..88fa29047 100644 --- a/src/pyxu/operator/ufunc.py +++ b/src/pyxu/operator/ufunc.py @@ -1,5 +1,3 @@ -import types - import numpy as np import scipy.integrate as spi @@ -1034,11 +1032,23 @@ class FSSPulse(pxa.Map): Finite-Support Symmetric function :math:`f: \mathbb{R} \to \mathbb{R}`, element-wise. """ - def __init__(self, dim_shape: pxt.NDArrayShape): + def __init__( + self, + dim_shape: pxt.NDArrayShape, + support: pxt.Real, + ): + r""" + Parameters + ---------- + support: Real + Value :math:`s > 0` such that :math:`f(x) = 0, \; \forall |x| > s`. + """ super().__init__( dim_shape=dim_shape, codim_shape=dim_shape, ) + self._support = float(support) + assert self._support > 0 def support(self) -> pxt.Real: r""" @@ -1047,8 +1057,30 @@ def support(self) -> pxt.Real: s: Real Value :math:`s > 0` such that :math:`f(x) = 0, \; \forall |x| > s`. """ - raise NotImplementedError + return self._support + + @pxrt.enforce_precision(i="arr") + def apply(self, arr: pxt.NDArray) -> pxt.NDArray: + ndi = pxd.NDArrayInfo.from_obj(arr) + if ndi != pxd.NDArrayInfo.DASK: + out = self._apply(arr) + else: # DASK inputs + + def blockwise_apply(arr: pxt.NDArray, cls: pxt.OpC, **kwargs) -> pxt.NDArray: + return cls(**kwargs)._apply(arr) + + cls, kwargs = self._init_args() + out = arr.map_blocks( + func=blockwise_apply, + dtype=arr.dtype, + meta=arr._meta, + # -- Extra blockise_apply() args -- + cls=cls, + **kwargs, + ) + return out + @pxrt.enforce_precision(i="arr") def applyF(self, arr: pxt.NDArray) -> pxt.NDArray: r""" Evaluate :math:`f^{\mathcal{F}}(v)`. @@ -1061,7 +1093,24 @@ def applyF(self, arr: pxt.NDArray) -> pxt.NDArray: \mathcal{F}(f)(v) = \int f(x) e^{-j 2\pi v x} dx """ - raise NotImplementedError + ndi = pxd.NDArrayInfo.from_obj(arr) + if ndi != pxd.NDArrayInfo.DASK: + out = self._applyF(arr) + else: # DASK inputs + + def blockwise_applyF(arr: pxt.NDArray, cls: pxt.OpC, **kwargs) -> pxt.NDArray: + return cls(**kwargs)._applyF(arr) + + cls, kwargs = self._init_args() + out = arr.map_blocks( + func=blockwise_applyF, + dtype=arr.dtype, + meta=arr._meta, + # -- Extra blockwise_applyF() args -- + cls=cls, + **kwargs, + ) + return out def supportF(self, eps: pxt.Real) -> pxt.Real: r""" @@ -1094,7 +1143,7 @@ def energy(f: callable, a: float, b: float) -> float: sF = np.inf else: s = self.support() - E_tot = energy(self.apply, -s, s) + E_tot = energy(self._apply, -s, s) # Coarse-grain search for a max bandwidth in v_step increments. tolerance_reached = False @@ -1102,72 +1151,46 @@ def energy(f: callable, a: float, b: float) -> float: v_max = 0 while not tolerance_reached: v_max += v_step - E = energy(self.applyF, -v_max, v_max) + E = energy(self._applyF, -v_max, v_max) tolerance_reached = E >= tol * E_tot # Fine-grained search for a max bandwidth in [v_max - v_step, v_max] region. v_fine = np.linspace(v_max - v_step, v_max, 100) - E = np.array([energy(self.applyF, -v, v) for v in v_fine]) + E = np.array([energy(self._applyF, -v, v) for v in v_fine]) sF = v_fine[E >= tol * E_tot].min() return sF def argscale(self, scalar: pxt.Real) -> pxt.OpT: - # Redefined to propagate (support[F], applyF,) scalar = float(scalar) assert scalar > 0 - @pxrt.enforce_precision(i="arr") - def g_apply(_, arr: pxt.NDArray) -> pxt.NDArray: - # :math:`g(x) = f(\alpha x)` - op, cst = _._op, _._cst - return op.apply(cst * arr) - - def g_support(_) -> pxt.Real: - op, cst = _._op, _._cst - return op.support() / cst - - @pxrt.enforce_precision(i="arr") - def g_applyF(_, arr: pxt.NDArray) -> pxt.NDArray: - # :math:`g^{F}(v) = f^{F}(v / \alpha) / \alpha` - op, cst = _._op, _._cst - return op.applyF(arr / cst) / cst - - def g_supportF(_, eps: pxt.Real) -> pxt.Real: - op, cst = _._op, _._cst - return op.supportF(eps) * cst - - def g_expr(_) -> tuple: - return ("argscale", _._op, _._cst) - - g = FSSPulse(dim_shape=self.dim_shape) - g._cst = scalar # scale factor - g._op = self # initial pulse - - g.apply = types.MethodType(g_apply, g) - g.support = types.MethodType(g_support, g) - g.applyF = types.MethodType(g_applyF, g) - g.supportF = types.MethodType(g_supportF, g) - g._expr = types.MethodType(g_expr, g) - return g - - def _blockwize_apply(self, arr: pxt.NDArray) -> pxt.NDArray: - # self.apply, applied blockwize on DASK inputs. - out = arr.map_blocks( - func=self.apply, - dtype=arr.dtype, - meta=arr._meta, + cls, kwargs = self._init_args() + kwargs["support"] = kwargs["support"] / scalar + return cls(**kwargs) + + # Internal Helpers -------------------------------------------------------- + def _init_args(self) -> tuple[pxt.OpC, dict]: + # Override in sub-classes with (class-to-instantiate, init_kwargs) + cls = self.__class__ + kwargs = dict( + dim_shape=self.dim_shape, + support=self.support(), ) - return out + return (cls, kwargs) - def _blockwize_applyF(self, arr: pxt.NDArray) -> pxt.NDArray: - # self.applyF, applied blockwize on DASK inputs. - out = arr.map_blocks( - func=self.applyF, - dtype=arr.dtype, - meta=arr._meta, - ) - return out + def _apply(self, arr: pxt.NDArray) -> pxt.NDArray: + # Override in sub-classes with apply() definition for NumPy/CuPy backends. + raise NotImplementedError + + def _applyF(self, arr: pxt.NDArray) -> pxt.NDArray: + # Override in sub-classes with applyF() definition for NumPy/CuPy backends. + raise NotImplementedError + + def __repr__(self) -> str: + klass = self.__class__.__name__ + support = self.support() + return f"{klass}(support={support})" class Dirac(FSSPulse): @@ -1180,28 +1203,29 @@ class Dirac(FSSPulse): * :math:`f^{\mathcal{F}}(v) = 1` """ - def __init__(self, dim_shape: pxt.NDArrayShape): - super().__init__(dim_shape=dim_shape) + def __init__( + self, + dim_shape: pxt.NDArrayShape, + support: pxt.Real = 1e-6, # small value approximating 0 + ): + super().__init__( + dim_shape=dim_shape, + support=support, + ) - @pxrt.enforce_precision(i="arr") - @pxu.redirect(i="arr", DASK=FSSPulse._blockwize_apply) - def apply(self, arr: pxt.NDArray) -> pxt.NDArray: + def supportF(self, eps: pxt.Real) -> pxt.Real: + return np.inf + + # Internal Helpers -------------------------------------------------------- + def _apply(self, arr: pxt.NDArray) -> pxt.NDArray: xp, _ = _get_module(arr) y = xp.zeros_like(arr) y[xp.isclose(arr, 0)] = 1 return y - def support(self) -> pxt.Real: - return 1e-6 # small value approximating 0 - - @pxrt.enforce_precision(i="arr") - @pxu.redirect(i="arr", DASK=FSSPulse._blockwize_applyF) - def applyF(self, arr: pxt.NDArray) -> pxt.NDArray: + def _applyF(self, arr: pxt.NDArray) -> pxt.NDArray: return np.ones_like(arr) - def supportF(self, eps: pxt.Real) -> pxt.Real: - return np.inf - class Box(FSSPulse): r""" @@ -1209,29 +1233,31 @@ class Box(FSSPulse): Notes ----- - * :math:`f(x) = 1_{[-1, 1]}(x)` - * :math:`f^{\mathcal{F}}(v) = 2 \; \text{sinc}(2 v)` + * :math:`f(x) = 1_{[-s, s]}(x)` + * :math:`f^{\mathcal{F}}(v) = 2s \; \text{sinc}(2s v)` """ - def __init__(self, dim_shape: pxt.NDArrayShape): - super().__init__(dim_shape=dim_shape) + def __init__( + self, + dim_shape: pxt.NDArrayShape, + support: pxt.Real = 1, + ): + super().__init__( + dim_shape=dim_shape, + support=support, + ) - @pxrt.enforce_precision(i="arr") - @pxu.redirect(i="arr", DASK=FSSPulse._blockwize_apply) - def apply(self, arr: pxt.NDArray) -> pxt.NDArray: + # Internal Helpers -------------------------------------------------------- + def _apply(self, arr: pxt.NDArray) -> pxt.NDArray: xp, _ = _get_module(arr) y = xp.zeros_like(arr) - y[xp.fabs(arr) <= 1] = 1 + y[xp.fabs(arr) <= self.support()] = 1 return y - def support(self) -> pxt.Real: - return 1.0 - - @pxrt.enforce_precision(i="arr") - @pxu.redirect(i="arr", DASK=FSSPulse._blockwize_applyF) - def applyF(self, arr: pxt.NDArray) -> pxt.NDArray: + def _applyF(self, arr: pxt.NDArray) -> pxt.NDArray: xp, _ = _get_module(arr) - y = 2 * xp.sinc(2 * arr) + scale = 2 * self.support() + y = scale * xp.sinc(scale * arr) return y @@ -1241,29 +1267,33 @@ class Triangle(FSSPulse): Notes ----- - * :math:`f(x) = (1 - |x|) 1_{[-1, 1]}(x)` - * :math:`f^{\mathcal{F}}(v) = \text{sinc}^{2}(v)` + * :math:`f(x) = (1 - |x/s|) 1_{[-s, s]}(x)` + * :math:`f^{\mathcal{F}}(v) = s \text{sinc}^{2}(s v)` """ - def __init__(self, dim_shape: pxt.NDArrayShape): - super().__init__(dim_shape=dim_shape) + def __init__( + self, + dim_shape: pxt.NDArrayShape, + support: pxt.Real = 1, + ): + super().__init__( + dim_shape=dim_shape, + support=support, + ) - @pxrt.enforce_precision(i="arr") - @pxu.redirect(i="arr", DASK=FSSPulse._blockwize_apply) - def apply(self, arr: pxt.NDArray) -> pxt.NDArray: + # Internal Helpers -------------------------------------------------------- + def _apply(self, arr: pxt.NDArray) -> pxt.NDArray: xp, _ = _get_module(arr) - y = xp.clip(1 - xp.fabs(arr), 0, None) + arr = xp.fabs(arr) + arr /= self.support() + y = xp.clip(1 - arr, 0, None) return y - def support(self) -> pxt.Real: - return 1.0 - - @pxrt.enforce_precision(i="arr") - @pxu.redirect(i="arr", DASK=FSSPulse._blockwize_applyF) - def applyF(self, arr: pxt.NDArray) -> pxt.NDArray: + def _applyF(self, arr: pxt.NDArray) -> pxt.NDArray: xp, _ = _get_module(arr) - y = xp.sinc(arr) + y = xp.sinc(self.support() * arr) y **= 2 + y *= self.support() return y @@ -1320,58 +1350,68 @@ class KaiserBessel(FSSPulse): Notes ----- - * :math:`f(x) = \frac{I_{0}(\beta \sqrt{1 - x^{2}})}{I_{0}(\beta)} - 1_{[-1, 1]}(x)` + * :math:`f(x) = \frac{I_{0}(\beta \sqrt{1 - (x/s)^{2}})}{I_{0}(\beta)} + 1_{[-s, s]}(x)` * :math:`f^{\mathcal{F}}(v) = - \frac{2}{I_{0}(\beta)} + \frac{2 s}{I_{0}(\beta)} \frac - {\sinh\left[\sqrt{\beta^{2} - (2 \pi v)^{2}} \right]} - {\sqrt{\beta^{2} - (2 \pi v)^{2}}}` + {\sinh\left[\sqrt{\beta^{2} - (2 \pi s v)^{2}} \right]} + {\sqrt{\beta^{2} - (2 \pi s v)^{2}}}` """ - def __init__(self, dim_shape: pxt.NDArrayShape, beta: pxt.Real): - super().__init__(dim_shape=dim_shape) + def __init__( + self, + dim_shape: pxt.NDArrayShape, + beta: pxt.Real, + support: pxt.Real = 1, + ): + super().__init__( + dim_shape=dim_shape, + support=support, + ) self._beta = float(beta) - assert self._beta >= 0 + assert self._beta > 0 - @pxrt.enforce_precision(i="arr") - @pxu.redirect(i="arr", DASK=FSSPulse._blockwize_apply) - def apply(self, arr: pxt.NDArray) -> pxt.NDArray: + def supportF(self, eps: pxt.Real) -> pxt.Real: + if np.isclose(eps, 0): + # use cut-off frequency: corresponds roughly to eps=1e-10 + sF = self._beta / (2 * np.pi * self.support()) + else: + sF = super().supportF(eps) + return sF + + # Internal Helpers -------------------------------------------------------- + def _init_args(self) -> tuple[pxt.OpC, dict]: + cls = self.__class__ + kwargs = dict( + dim_shape=self.dim_shape, + beta=self._beta, + support=self.support(), + ) + return (cls, kwargs) + + def _apply(self, arr: pxt.NDArray) -> pxt.NDArray: xp, sps = _get_module(arr) y = xp.zeros_like(arr) - mask = xp.fabs(arr) <= 1 - y[mask] = sps.i0(self._beta * xp.sqrt(1 - (arr[mask] ** 2))) + + mask = xp.fabs(arr) <= self.support() + x = arr[mask] ** 2 + x /= self.support() ** 2 + y[mask] = sps.i0(self._beta * xp.sqrt(1 - x)) + y /= sps.i0(self._beta) return y - def support(self) -> pxt.Real: - return 1.0 - - @pxrt.enforce_precision(i="arr") - @pxu.redirect(i="arr", DASK=FSSPulse._blockwize_applyF) - def applyF(self, arr: pxt.NDArray) -> pxt.NDArray: - if np.isclose(self._beta, 0): - y = Box().applyF(arr) - else: - xp, sps = _get_module(arr) + def _applyF(self, arr: pxt.NDArray) -> pxt.NDArray: + xp, sps = _get_module(arr) - a = self._beta**2 - (2 * np.pi * arr) ** 2 - mask = a > 0 - a = xp.sqrt(xp.fabs(a)) + a = self._beta**2 - (2 * np.pi * self.support() * arr) ** 2 + mask = a > 0 + a = xp.sqrt(xp.fabs(a)) - y = xp.zeros_like(arr) - y[mask] = xp.sinh(a[mask]) / a[mask] - y[~mask] = xp.sinc(a[~mask] / np.pi) + y = xp.zeros_like(arr) + y[mask] = xp.sinh(a[mask]) / a[mask] + y[~mask] = xp.sinc(a[~mask] / np.pi) - y *= 2 / sps.i0(self._beta) + y *= (2 * self.support()) / sps.i0(self._beta) return y - - def supportF(self, eps: pxt.Real) -> pxt.Real: - if np.isclose(self._beta, 0): - sF = Box().supportF(eps) - elif np.isclose(eps, 0): - # use cut-off frequency: corresponds roughly to eps=1e-10 - sF = self._beta / (2 * np.pi) - else: - sF = super().supportF(eps) - return sF