Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sums of distributions #425

Merged
merged 7 commits into from
Jul 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
255 changes: 254 additions & 1 deletion src/sdr/_probability.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import numpy.typing as npt
import scipy.special

from ._helper import convert_output, export, verify_arraylike
from ._helper import convert_output, export, verify_arraylike, verify_scalar


@export
Expand Down Expand Up @@ -75,3 +75,256 @@ def Qinv(p: npt.ArrayLike) -> npt.NDArray[np.float64]:
x = np.sqrt(2) * scipy.special.erfcinv(2 * p)

return convert_output(x)


@export
def sum_distribution(
    X: scipy.stats.rv_continuous | scipy.stats.rv_histogram,
    n_terms: int,
    p: float = 1e-16,
) -> scipy.stats.rv_histogram:
    r"""
    Numerically calculates the distribution of the sum of $n$ i.i.d. random variables $X_i$.

    Arguments:
        X: The distribution of the i.i.d. random variables $X_i$.
        n_terms: The number $n$ of random variables to sum.
        p: The probability of exceeding the x axis, on either side, for each distribution. This is used to determine
            the bounds on the x axis for the numerical convolution. Smaller values of $p$ will result in more accurate
            analysis, but will require more computation.

    Returns:
        The distribution of the sum $Z = X_1 + X_2 + \ldots + X_n$.

    Notes:
        The PDF of the sum of $n$ independent random variables is the convolution of the PDF of the base distribution.

        $$f_{X_1 + X_2 + \ldots + X_n}(t) = (f_X * f_X * \ldots * f_X)(t)$$

    Examples:
        Compute the distribution of the sum of two normal distributions.

        .. ipython:: python

            X = scipy.stats.norm(loc=-1, scale=0.5)
            n_terms = 2
            x = np.linspace(-6, 2, 1_001)

            @savefig sdr_sum_distribution_1.png
            plt.figure(); \
            plt.plot(x, X.pdf(x), label="X"); \
            plt.plot(x, sdr.sum_distribution(X, n_terms).pdf(x), label="X + X"); \
            plt.hist(X.rvs((100_000, n_terms)).sum(axis=1), bins=101, density=True, histtype="step", label="X + X empirical"); \
            plt.legend(); \
            plt.xlabel("Random variable"); \
            plt.ylabel("Probability density"); \
            plt.title("Sum of two Normal distributions");

        Compute the distribution of the sum of three Rayleigh distributions.

        .. ipython:: python

            X = scipy.stats.rayleigh(scale=1)
            n_terms = 3
            x = np.linspace(0, 10, 1_001)

            @savefig sdr_sum_distribution_2.png
            plt.figure(); \
            plt.plot(x, X.pdf(x), label="X"); \
            plt.plot(x, sdr.sum_distribution(X, n_terms).pdf(x), label="X + X + X"); \
            plt.hist(X.rvs((100_000, n_terms)).sum(axis=1), bins=101, density=True, histtype="step", label="X + X + X empirical"); \
            plt.legend(); \
            plt.xlabel("Random variable"); \
            plt.ylabel("Probability density"); \
            plt.title("Sum of three Rayleigh distributions");

        Compute the distribution of the sum of four Rician distributions.

        .. ipython:: python

            X = scipy.stats.rice(2)
            n_terms = 4
            x = np.linspace(0, 18, 1_001)

            @savefig sdr_sum_distribution_3.png
            plt.figure(); \
            plt.plot(x, X.pdf(x), label="X"); \
            plt.plot(x, sdr.sum_distribution(X, n_terms).pdf(x), label="X + X + X + X"); \
            plt.hist(X.rvs((100_000, n_terms)).sum(axis=1), bins=101, density=True, histtype="step", label="X + X + X + X empirical"); \
            plt.legend(); \
            plt.xlabel("Random variable"); \
            plt.ylabel("Probability density"); \
            plt.title("Sum of four Rician distributions");

    Group:
        probability
    """
    verify_scalar(n_terms, int=True, positive=True)
    verify_scalar(p, float=True, exclusive_min=0, inclusive_max=0.1)

    if n_terms == 1:
        # Nothing to convolve -- return the base distribution unchanged.
        # NOTE(review): this returns X as-is, which may be rv_continuous rather than the
        # annotated rv_histogram -- confirm callers accept either type.
        return X

    # Determine the x axis of each distribution such that the probability of exceeding the x axis, on either side,
    # is p.
    x1_min, x1_max = _x_range(X, p)
    x = np.linspace(x1_min, x1_max, 1_001)
    dx = np.mean(np.diff(x))  # Grid spacing of the sampled PDF

    # Compute the PDF of the base distribution
    f_X = X.pdf(x)

    # The PDF of the sum of n_terms independent random variables is the convolution of the PDF of the base distribution.
    # This is efficiently computed in the frequency domain by exponentiating the FFT. The FFT must be zero-padded
    # enough that the circular convolutions do not wrap around.
    n_fft = scipy.fft.next_fast_len(f_X.size * n_terms)
    f_X_fft = np.fft.fft(f_X, n_fft)
    f_X_fft = f_X_fft**n_terms  # n_terms-fold convolution in time <=> exponentiation in frequency
    f_Y = np.fft.ifft(f_X_fft).real
    # Renormalize so the discrete PDF integrates to 1 (corrects accumulated numerical error)
    f_Y /= f_Y.sum() * dx
    # The support of the n-term sum starts at n_terms times the base distribution's lower bound
    x = np.arange(f_Y.size) * dx + x[0] * (n_terms)

    # Adjust the histograms bins to be on either side of each point. So there is one extra point added.
    x = np.append(x, x[-1] + dx)
    x -= dx / 2

    return scipy.stats.rv_histogram((f_Y, x))


@export
def sum_distributions(
    X: scipy.stats.rv_continuous | scipy.stats.rv_histogram,
    Y: scipy.stats.rv_continuous | scipy.stats.rv_histogram,
    p: float = 1e-16,
) -> scipy.stats.rv_histogram:
    r"""
    Numerically calculates the distribution of the sum of two independent random variables $X$ and $Y$.

    Arguments:
        X: The distribution of the first random variable $X$.
        Y: The distribution of the second random variable $Y$.
        p: The tail probability allowed beyond either edge of the sampled x axis for each distribution. It sets
            the bounds used for the numerical convolution. Smaller values of $p$ yield a more accurate result,
            at the cost of more computation.

    Returns:
        The distribution of the sum $Z = X + Y$.

    Notes:
        The PDF of the sum of two independent random variables is the convolution of their individual PDFs.

        $$f_{X+Y}(t) = (f_X * f_Y)(t)$$

    Examples:
        Compute the distribution of the sum of two normal distributions.

        .. ipython:: python

            X = scipy.stats.norm(loc=-1, scale=0.5)
            Y = scipy.stats.norm(loc=2, scale=1.5)
            x = np.linspace(-5, 10, 1_001)

            @savefig sdr_sum_distributions_1.png
            plt.figure(); \
            plt.plot(x, X.pdf(x), label="X"); \
            plt.plot(x, Y.pdf(x), label="Y"); \
            plt.plot(x, sdr.sum_distributions(X, Y).pdf(x), label="X + Y"); \
            plt.hist(X.rvs(100_000) + Y.rvs(100_000), bins=101, density=True, histtype="step", label="X + Y empirical"); \
            plt.legend(); \
            plt.xlabel("Random variable"); \
            plt.ylabel("Probability density"); \
            plt.title("Sum of two Normal distributions");

        Compute the distribution of the sum of two Rayleigh distributions.

        .. ipython:: python

            X = scipy.stats.rayleigh(scale=1)
            Y = scipy.stats.rayleigh(loc=1, scale=2)
            x = np.linspace(0, 12, 1_001)

            @savefig sdr_sum_distributions_2.png
            plt.figure(); \
            plt.plot(x, X.pdf(x), label="X"); \
            plt.plot(x, Y.pdf(x), label="Y"); \
            plt.plot(x, sdr.sum_distributions(X, Y).pdf(x), label="X + Y"); \
            plt.hist(X.rvs(100_000) + Y.rvs(100_000), bins=101, density=True, histtype="step", label="X + Y empirical"); \
            plt.legend(); \
            plt.xlabel("Random variable"); \
            plt.ylabel("Probability density"); \
            plt.title("Sum of two Rayleigh distributions");

        Compute the distribution of the sum of two Rician distributions.

        .. ipython:: python

            X = scipy.stats.rice(2)
            Y = scipy.stats.rice(3)
            x = np.linspace(0, 12, 1_001)

            @savefig sdr_sum_distributions_3.png
            plt.figure(); \
            plt.plot(x, X.pdf(x), label="X"); \
            plt.plot(x, Y.pdf(x), label="Y"); \
            plt.plot(x, sdr.sum_distributions(X, Y).pdf(x), label="X + Y"); \
            plt.hist(X.rvs(100_000) + Y.rvs(100_000), bins=101, density=True, histtype="step", label="X + Y empirical"); \
            plt.legend(); \
            plt.xlabel("Random variable"); \
            plt.ylabel("Probability density"); \
            plt.title("Sum of two Rician distributions");

    Group:
        probability
    """
    verify_scalar(p, float=True, exclusive_min=0, inclusive_max=0.1)

    # Bound each distribution's x axis so that only probability p lies beyond either edge.
    xa_min, xa_max = _x_range(X, p)
    xb_min, xb_max = _x_range(Y, p)

    # A discrete convolution requires both PDFs to be sampled with the same grid spacing.
    # Use the finer of the two natural spacings (1,000 steps across each support).
    dx = min((xa_max - xa_min) / 1_000, (xb_max - xb_min) / 1_000)
    xa = np.arange(xa_min, xa_max, dx)
    xb = np.arange(xb_min, xb_max, dx)

    # Sample each PDF on its own grid
    f_X = X.pdf(xa)
    f_Y = Y.pdf(xb)

    # f_Z = (f_X * f_Y), approximated discretely; the dx factor keeps the result normalized
    f_Z = np.convolve(f_X, f_Y, mode="full") * dx

    # The support of the convolution begins at the sum of the two lower bounds
    z = np.arange(f_Z.size) * dx + xa[0] + xb[0]

    # Convert the sample points into histogram bin edges centered on each point
    # (one more edge than samples)
    edges = np.append(z, z[-1] + dx) - dx / 2

    return scipy.stats.rv_histogram((f_Z, edges))


def _x_range(X: scipy.stats.rv_continuous, p: float) -> tuple[float, float]:
r"""
Determines the range of x values for a given distribution such that the probability of exceeding the x axis, on
either side, is p.
"""
# Need to have these loops because for very small p, sometimes SciPy will return NaN instead of a valid value.
# The while loops will increase the p value until a valid value is returned.

pp = p
while True:
x_min = X.ppf(pp)
if not np.isnan(x_min):
break
pp *= 10

pp = p
while True:
x_max = X.isf(pp)
if not np.isnan(x_max):
break
pp *= 10

return x_min, x_max
File renamed without changes.
50 changes: 50 additions & 0 deletions tests/probability/test_sum_distribution.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import numpy as np
import scipy.stats

import sdr


def test_normal():
    # Sums of 2, 3, and 4 i.i.d. Normal random variables
    X = scipy.stats.norm(loc=-1, scale=0.5)
    for n in range(2, 5):
        _verify(X, n)


def test_rayleigh():
    # Sums of 2, 3, and 4 i.i.d. Rayleigh random variables
    X = scipy.stats.rayleigh(scale=1)
    for n in range(2, 5):
        _verify(X, n)


def test_rician():
    # Sums of 2, 3, and 4 i.i.d. Rician random variables
    X = scipy.stats.rice(2)
    for n in range(2, 5):
        _verify(X, n)


def _verify(X, n, *, plot=False):
    """
    Verifies sdr.sum_distribution() against an empirical histogram of the sum of n i.i.d. draws from X.

    Arguments:
        X: The frozen SciPy distribution of each i.i.d. term.
        n: The number of terms in the sum.
        plot: If True, plot the numerical and empirical PDFs for visual debugging.

    NOTE(review): rvs() is unseeded, so this check is stochastic -- the 10%-of-peak tolerance below absorbs
    the Monte Carlo noise.
    """
    # Numerically compute the distribution of the n-term sum
    Y = sdr.sum_distribution(X, n)

    # Empirically estimate the PDF from 100k Monte Carlo trials
    y = X.rvs((100_000, n)).sum(axis=1)
    hist, bins = np.histogram(y, bins=101, density=True)
    x = bins[1:] - np.diff(bins) / 2  # Bin centers

    if plot:
        import matplotlib.pyplot as plt

        plt.figure()
        plt.plot(x, X.pdf(x), label="X")
        plt.plot(x, Y.pdf(x), label="X + ... + X")
        plt.hist(y, bins=101, cumulative=False, density=True, histtype="step", label="X + .. + X empirical")
        plt.legend()
        plt.xlabel("Random variable")
        plt.ylabel("Probability density")
        plt.title("Sum of distribution")
        plt.show()

    # The numerical PDF must match the empirical PDF to within 10% of the peak density
    assert np.allclose(Y.pdf(x), hist, atol=np.max(hist) * 0.1)
60 changes: 60 additions & 0 deletions tests/probability/test_sum_distributions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import numpy as np
import scipy.stats

import sdr


def test_normal_normal():
    # Two independent Normal random variables
    _verify(scipy.stats.norm(loc=-1, scale=0.5), scipy.stats.norm(loc=2, scale=1.5))


def test_rayleigh_rayleigh():
    # Two independent Rayleigh random variables
    _verify(scipy.stats.rayleigh(scale=1), scipy.stats.rayleigh(loc=1, scale=2))


def test_rician_rician():
    # Two independent Rician random variables
    _verify(scipy.stats.rice(2), scipy.stats.rice(3))


def test_normal_rayleigh():
    # Independent Normal and Rayleigh random variables
    _verify(scipy.stats.norm(loc=-1, scale=0.5), scipy.stats.rayleigh(loc=2, scale=1.5))


def test_rayleigh_rician():
    # Independent Rayleigh and Rician random variables
    _verify(scipy.stats.rayleigh(scale=1), scipy.stats.rice(3))


def _verify(X, Y, *, plot=False):
    """
    Verifies sdr.sum_distributions() against an empirical histogram of X + Y.

    Arguments:
        X: The frozen SciPy distribution of the first random variable.
        Y: The frozen SciPy distribution of the second random variable.
        plot: If True, plot the numerical and empirical PDFs for visual debugging.

    NOTE(review): rvs() is unseeded, so this check is stochastic -- the 10%-of-peak tolerance below absorbs
    the Monte Carlo noise.
    """
    # Numerically compute the distribution of the sum
    Z = sdr.sum_distributions(X, Y)

    # Empirically estimate the PDF from 100k Monte Carlo trials
    z = X.rvs(100_000) + Y.rvs(100_000)
    hist, bins = np.histogram(z, bins=101, density=True)
    x = bins[1:] - np.diff(bins) / 2  # Bin centers

    if plot:
        import matplotlib.pyplot as plt

        plt.figure()
        plt.plot(x, X.pdf(x), label="X")
        plt.plot(x, Y.pdf(x), label="Y")
        plt.plot(x, Z.pdf(x), label="X + Y")
        plt.hist(z, bins=101, cumulative=False, density=True, histtype="step", label="X + Y empirical")
        plt.legend()
        plt.xlabel("Random variable")
        plt.ylabel("Probability density")
        plt.title("Sum of two distributions")
        plt.show()

    # The numerical PDF must match the empirical PDF to within 10% of the peak density
    assert np.allclose(Z.pdf(x), hist, atol=np.max(hist) * 0.1)
Loading