From 8456635eef729fb4964f86ddcb309ba93781813a Mon Sep 17 00:00:00 2001 From: Daniel Weindl Date: Wed, 11 Dec 2024 09:49:10 +0100 Subject: [PATCH] Refactor priors (#329) Introduces `Prior` and `Distribution` classes for handling PEtab-specific prior distributions, and (PEtab-version-invariant) univariate probability distributions. Supports sampling from them, and evaluating negative log-priors (#312). Later on, this can be extended to noise models for measurements and computing loglikelihoods. This also adds a notebook demonstrating the various prior options which are a common source confusion. Closes #311. :eyes: notebook: https://petab--329.org.readthedocs.build/projects/libpetab-python/en/329/example/distributions.html --- doc/example.rst | 1 + doc/example/distributions.ipynb | 208 +++++++++++++++++++++++++++ doc/modules.rst | 1 + petab/v1/C.py | 3 +- petab/v1/distributions.py | 243 ++++++++++++++++++++++++++++++++ petab/v1/parameters.py | 3 +- petab/v1/priors.py | 212 ++++++++++++++++++++++++++++ petab/v1/sampling.py | 110 ++++----------- tests/v1/test_distributions.py | 87 ++++++++++++ tests/v1/test_priors.py | 94 ++++++++---- 10 files changed, 851 insertions(+), 111 deletions(-) create mode 100644 doc/example/distributions.ipynb create mode 100644 petab/v1/distributions.py create mode 100644 tests/v1/test_distributions.py diff --git a/doc/example.rst b/doc/example.rst index 6fe6dab5..dfe54fb3 100644 --- a/doc/example.rst +++ b/doc/example.rst @@ -10,6 +10,7 @@ The following examples should help to get a better idea of how to use the PEtab example/example_petablint.ipynb example/example_visualization.ipynb + example/distributions.ipynb Examples of systems biology parameter estimation problems specified in PEtab can be found in the `systems biology benchmark model collection `_. diff --git a/doc/example/distributions.ipynb b/doc/example/distributions.ipynb new file mode 100644 index 00000000..86235fe1 --- /dev/null +++ b/doc/example/distributions.ipynb @@ -0,0 +1,208 @@ +{ + "cells": [ + { + "metadata": {}, + "cell_type": "markdown", + "source": [ + "# Prior distributions in PEtab\n", + "\n", + "This notebook gives a brief overview of the prior distributions in PEtab and how they are represented in the PEtab library.\n", + "\n", + "Prior distributions are used to specify the prior knowledge about the parameters.\n", + "Parameter priors are specified in the parameter table. A prior is defined by its type and its parameters.\n", + "Each prior type has a specific set of parameters. For example, the normal distribution has two parameters: the mean and the standard deviation.\n", + "\n", + "There are two types of priors in PEtab - objective priors and initialization priors:\n", + "\n", + "* *Objective priors* are used to specify the prior knowledge about the parameters that are to be estimated. They will enter the objective function of the optimization problem. They are specified in the `objectivePriorType` and `objectivePriorParameters` columns of the parameter table.\n", + "* *Initialization priors* can be used as a hint for the optimization algorithm. They will not enter the objective function. They are specified in the `initializationPriorType` and `initializationPriorParameters` columns of the parameter table.\n", + "\n", + "\n" + ], + "id": "372289411a2aa7b3" + }, + { + "metadata": { + "collapsed": true + }, + "cell_type": "code", + "source": [ + "import matplotlib.pyplot as plt\n", + "import numpy as np\n", + "import seaborn as sns\n", + "\n", + "from petab.v1.C import *\n", + "from petab.v1.priors import Prior\n", + "\n", + "sns.set_style(None)\n", + "\n", + "\n", + "def plot(prior: Prior, ax=None):\n", + " \"\"\"Visualize a distribution.\"\"\"\n", + " if ax is None:\n", + " fig, ax = plt.subplots()\n", + "\n", + " sample = prior.sample(10000)\n", + "\n", + " # pdf\n", + " xmin = min(sample.min(), prior.lb_scaled if prior.bounds is not None else sample.min())\n", + " xmax = max(sample.max(), prior.ub_scaled if prior.bounds is not None else sample.max())\n", + " x = np.linspace(xmin, xmax, 500)\n", + " y = prior.pdf(x)\n", + " ax.plot(x, y, color='red', label='pdf')\n", + "\n", + " sns.histplot(sample, stat='density', ax=ax, label=\"sample\")\n", + "\n", + " # bounds\n", + " if prior.bounds is not None:\n", + " for bound in (prior.lb_scaled, prior.ub_scaled):\n", + " if bound is not None and np.isfinite(bound):\n", + " ax.axvline(bound, color='black', linestyle='--', label='bound')\n", + "\n", + " ax.set_title(str(prior))\n", + " ax.set_xlabel('Parameter value on the parameter scale')\n", + " ax.grid(False)\n", + " handles, labels = ax.get_legend_handles_labels()\n", + " unique_labels = dict(zip(labels, handles))\n", + " ax.legend(unique_labels.values(), unique_labels.keys())\n", + " plt.show()" + ], + "id": "initial_id", + "outputs": [], + "execution_count": null + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": "The basic distributions are the uniform, normal, Laplace, log-normal, and log-laplace distributions:\n", + "id": "db36a4a93622ccb8" + }, + { + "metadata": {}, + "cell_type": "code", + "source": [ + "plot(Prior(UNIFORM, (0, 1)))\n", + "plot(Prior(NORMAL, (0, 1)))\n", + "plot(Prior(LAPLACE, (0, 1)))\n", + "plot(Prior(LOG_NORMAL, (0, 1)))\n", + "plot(Prior(LOG_LAPLACE, (1, 0.5)))" + ], + "id": "4f09e50a3db06d9f", + "outputs": [], + "execution_count": null + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": "If a parameter scale is specified (`parameterScale=lin|log|log10` not a `parameterScale*`-type distribution), the sample is transformed accordingly (but not the distribution parameters):\n", + "id": "dab4b2d1e0f312d8" + }, + { + "metadata": {}, + "cell_type": "code", + "source": [ + "plot(Prior(NORMAL, (10, 2), transformation=LIN))\n", + "plot(Prior(NORMAL, (10, 2), transformation=LOG))\n", + "\n", + "# Note that the log-normal distribution is different from a log-transformed normal distribution:\n", + "plot(Prior(LOG_NORMAL, (10, 2), transformation=LIN))" + ], + "id": "f6192c226f179ef9", + "outputs": [], + "execution_count": null + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": "On the log-transformed parameter scale, `Log*` and `parameterScale*` distributions are equivalent:", + "id": "4281ed48859e6431" + }, + { + "metadata": {}, + "cell_type": "code", + "source": [ + "plot(Prior(LOG_NORMAL, (10, 2), transformation=LOG))\n", + "plot(Prior(PARAMETER_SCALE_NORMAL, (10, 2)))" + ], + "id": "34c95268e8921070", + "outputs": [], + "execution_count": null + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": "Prior distributions can also be defined on the parameter scale by using the types `parameterScaleUniform`, `parameterScaleNormal` or `parameterScaleLaplace`. In these cases, 1) the distribution parameter are interpreted on the transformed parameter scale, and 2) a sample from the given distribution is used directly, without applying any transformation according to `parameterScale` (this implies, that for `parameterScale=lin`, there is no difference between `parameterScaleUniform` and `uniform`):", + "id": "263c9fd31156a4d5" + }, + { + "metadata": {}, + "cell_type": "code", + "source": [ + "plot(Prior(UNIFORM, (0.01, 2), transformation=LOG10))\n", + "plot(Prior(PARAMETER_SCALE_UNIFORM, (0.01, 2), transformation=LOG10))\n", + "\n", + "plot(Prior(UNIFORM, (0.01, 2), transformation=LIN))\n", + "plot(Prior(PARAMETER_SCALE_UNIFORM, (0.01, 2), transformation=LIN))\n" + ], + "id": "5ca940bc24312fc6", + "outputs": [], + "execution_count": null + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": "To prevent the sampled parameters from exceeding the bounds, the sampled parameters are clipped to the bounds. The bounds are defined in the parameter table. Note that the current implementation does not support sampling from a truncated distribution. Instead, the samples are clipped to the bounds. This may introduce unwanted bias, and thus, should only be used with caution (i.e., the bounds should be chosen wide enough):", + "id": "b1a8b17d765db826" + }, + { + "metadata": {}, + "cell_type": "code", + "source": [ + "plot(Prior(NORMAL, (0, 1), bounds=(-4, 4))) # negligible clipping-bias at 4 sigma\n", + "plot(Prior(UNIFORM, (0, 1), bounds=(0.1, 0.9))) # significant clipping-bias" + ], + "id": "4ac42b1eed759bdd", + "outputs": [], + "execution_count": null + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": "Further distribution examples:", + "id": "45ffce1341483f24" + }, + { + "metadata": {}, + "cell_type": "code", + "source": [ + "plot(Prior(NORMAL, (10, 1), bounds=(6, 14), transformation=\"log10\"))\n", + "plot(Prior(PARAMETER_SCALE_NORMAL, (10, 1), bounds=(10**6, 10**14), transformation=\"log10\"))\n", + "plot(Prior(LAPLACE, (10, 2), bounds=(6, 14)))" + ], + "id": "581e1ac431860419", + "outputs": [], + "execution_count": null + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 2 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython2", + "version": "2.7.6" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/doc/modules.rst b/doc/modules.rst index 87a9559d..e933c06f 100644 --- a/doc/modules.rst +++ b/doc/modules.rst @@ -14,6 +14,7 @@ API Reference petab.v1.composite_problem petab.v1.conditions petab.v1.core + petab.v1.distributions petab.v1.lint petab.v1.measurements petab.v1.models diff --git a/petab/v1/C.py b/petab/v1/C.py index a013a0cc..be044a5c 100644 --- a/petab/v1/C.py +++ b/petab/v1/C.py @@ -173,7 +173,8 @@ LOG10 = "log10" #: Supported observable transformations OBSERVABLE_TRANSFORMATIONS = [LIN, LOG, LOG10] - +#: Supported parameter transformations +PARAMETER_SCALES = [LIN, LOG, LOG10] # NOISE MODELS diff --git a/petab/v1/distributions.py b/petab/v1/distributions.py new file mode 100644 index 00000000..418f5b44 --- /dev/null +++ b/petab/v1/distributions.py @@ -0,0 +1,243 @@ +"""Probability distributions used by PEtab.""" +from __future__ import annotations + +import abc + +import numpy as np +from scipy.stats import laplace, norm, uniform + +__all__ = [ + "Distribution", + "Normal", + "Uniform", + "Laplace", +] + + +class Distribution(abc.ABC): + """A univariate probability distribution. + + This class provides a common interface for sampling from and evaluating + the probability density function of a univariate probability distribution. + + The distribution can be transformed by applying a logarithm to the samples + and the PDF. This is useful, e.g., for log-normal distributions. + + :param log: If ``True``, the distribution is transformed to its + corresponding log distribution (e.g., Normal -> LogNormal). + If a float, the distribution is transformed to its corresponding + log distribution with the given base (e.g., Normal -> Log10Normal). + If ``False``, no transformation is applied. + """ + + def __init__(self, log: bool | float = False): + if log is True: + log = np.exp(1) + self._logbase = log + + def _undo_log(self, x: np.ndarray | float) -> np.ndarray | float: + """Undo the log transformation. + + :param x: The sample to transform. + :return: The transformed sample + """ + if self._logbase is False: + return x + return self._logbase**x + + def _apply_log(self, x: np.ndarray | float) -> np.ndarray | float: + """Apply the log transformation. + + :param x: The value to transform. + :return: The transformed value. + """ + if self._logbase is False: + return x + return np.log(x) / np.log(self._logbase) + + def sample(self, shape=None) -> np.ndarray: + """Sample from the distribution. + + :param shape: The shape of the sample. + :return: A sample from the distribution. + """ + sample = self._sample(shape) + return self._undo_log(sample) + + @abc.abstractmethod + def _sample(self, shape=None) -> np.ndarray: + """Sample from the underlying distribution. + + :param shape: The shape of the sample. + :return: A sample from the underlying distribution, + before applying, e.g., the log transformation. + """ + ... + + def pdf(self, x): + """Probability density function at x. + + :param x: The value at which to evaluate the PDF. + :return: The value of the PDF at ``x``. + """ + # handle the log transformation; see also: + # https://en.wikipedia.org/wiki/Probability_density_function#Scalar_to_scalar + chain_rule_factor = ( + (1 / (x * np.log(self._logbase))) if self._logbase else 1 + ) + return self._pdf(self._apply_log(x)) * chain_rule_factor + + @abc.abstractmethod + def _pdf(self, x): + """Probability density function of the underlying distribution at x. + + :param x: The value at which to evaluate the PDF. + :return: The value of the PDF at ``x``. + """ + ... + + @property + def logbase(self) -> bool | float: + """The base of the log transformation. + + If ``False``, no transformation is applied. + """ + return self._logbase + + +class Normal(Distribution): + """A (log-)normal distribution. + + :param loc: The location parameter of the distribution. + :param scale: The scale parameter of the distribution. + :param truncation: The truncation limits of the distribution. + :param log: If ``True``, the distribution is transformed to a log-normal + distribution. If a float, the distribution is transformed to a + log-normal distribution with the given base. + If ``False``, no transformation is applied. + If a transformation is applied, the location and scale parameters + and the truncation limits are the location, scale and truncation limits + of the underlying normal distribution. + """ + + def __init__( + self, + loc: float, + scale: float, + truncation: tuple[float, float] | None = None, + log: bool | float = False, + ): + super().__init__(log=log) + self._loc = loc + self._scale = scale + self._truncation = truncation + + if truncation is not None: + raise NotImplementedError("Truncation is not yet implemented.") + + def __repr__(self): + trunc = f", truncation={self._truncation}" if self._truncation else "" + log = f", log={self._logbase}" if self._logbase else "" + return f"Normal(loc={self._loc}, scale={self._scale}{trunc}{log})" + + def _sample(self, shape=None): + return np.random.normal(loc=self._loc, scale=self._scale, size=shape) + + def _pdf(self, x): + return norm.pdf(x, loc=self._loc, scale=self._scale) + + @property + def loc(self): + """The location parameter of the underlying distribution.""" + return self._loc + + @property + def scale(self): + """The scale parameter of the underlying distribution.""" + return self._scale + + +class Uniform(Distribution): + """A (log-)uniform distribution. + + :param low: The lower bound of the distribution. + :param high: The upper bound of the distribution. + :param log: If ``True``, the distribution is transformed to a log-uniform + distribution. If a float, the distribution is transformed to a + log-uniform distribution with the given base. + If ``False``, no transformation is applied. + If a transformation is applied, the lower and upper bounds are the + lower and upper bounds of the underlying uniform distribution. + """ + + def __init__( + self, + low: float, + high: float, + *, + log: bool | float = False, + ): + super().__init__(log=log) + self._low = low + self._high = high + + def __repr__(self): + log = f", log={self._logbase}" if self._logbase else "" + return f"Uniform(low={self._low}, high={self._high}{log})" + + def _sample(self, shape=None): + return np.random.uniform(low=self._low, high=self._high, size=shape) + + def _pdf(self, x): + return uniform.pdf(x, loc=self._low, scale=self._high - self._low) + + +class Laplace(Distribution): + """A (log-)Laplace distribution. + + :param loc: The location parameter of the distribution. + :param scale: The scale parameter of the distribution. + :param truncation: The truncation limits of the distribution. + :param log: If ``True``, the distribution is transformed to a log-Laplace + distribution. If a float, the distribution is transformed to a + log-Laplace distribution with the given base. + If ``False``, no transformation is applied. + If a transformation is applied, the location and scale parameters + and the truncation limits are the location, scale and truncation limits + of the underlying Laplace distribution. + """ + + def __init__( + self, + loc: float, + scale: float, + truncation: tuple[float, float] | None = None, + log: bool | float = False, + ): + super().__init__(log=log) + self._loc = loc + self._scale = scale + self._truncation = truncation + if truncation is not None: + raise NotImplementedError("Truncation is not yet implemented.") + + def __repr__(self): + trunc = f", truncation={self._truncation}" if self._truncation else "" + log = f", log={self._logbase}" if self._logbase else "" + return f"Laplace(loc={self._loc}, scale={self._scale}{trunc}{log})" + + def _sample(self, shape=None): + return np.random.laplace(loc=self._loc, scale=self._scale, size=shape) + + def _pdf(self, x): + return laplace.pdf(x, loc=self._loc, scale=self._scale) + + @property + def loc(self): + """The location parameter of the underlying distribution.""" + return self._loc + + @property + def scale(self): + """The scale parameter of the underlying distribution.""" + return self._scale diff --git a/petab/v1/parameters.py b/petab/v1/parameters.py index 8f252988..8875c84f 100644 --- a/petab/v1/parameters.py +++ b/petab/v1/parameters.py @@ -524,7 +524,8 @@ def scale( if scale_str == LOG: return np.log(parameter) if scale_str == LOG10: - return np.log10(parameter) + with np.errstate(divide="ignore"): + return np.log10(parameter) raise ValueError(f"Invalid parameter scaling: {scale_str}") diff --git a/petab/v1/priors.py b/petab/v1/priors.py index 52fec20d..f0f37f75 100644 --- a/petab/v1/priors.py +++ b/petab/v1/priors.py @@ -1,5 +1,8 @@ """Functions related to prior handling.""" +from __future__ import annotations + import copy +from typing import Literal import numpy as np import pandas as pd @@ -29,12 +32,221 @@ PARAMETER_SEPARATOR, SIMULATION_CONDITION_ID, TIME, + C, Problem, ) +from .distributions import * +from .parameters import scale, unscale __all__ = ["priors_to_measurements"] +class Prior: + """A PEtab parameter prior. + + Different from the general :class:`Distribution`, this class is used to + represent the prior distribution of a PEtab parameter using the + PEtab-specific options like `parameterScale`, `*PriorType`, + `*PriorParameters`, and `lowerBound` / `upperBounds`. + + :param type_: The type of the distribution. + :param transformation: The transformation to be applied to the sample. + Ignored if `parameter_scale` is `True`. + :param parameters: The parameters of the distribution (unaffected by + `parameter_scale` and `transformation`, but in the case of + `parameterScale*` distribution types, the parameters are assumed to be + on the `parameter_scale` scale). + :param bounds: The untransformed bounds of the sample (lower, upper). + :param transformation: The transformation of the distribution. + """ + + def __init__( + self, + type_: str, + parameters: tuple, + bounds: tuple = None, + transformation: str = C.LIN, + ): + if transformation not in C.PARAMETER_SCALES: + raise ValueError( + f"Unknown parameter transformation: {transformation}" + ) + + if len(parameters) != 2: + raise ValueError( + f"Expected two parameters, got {len(parameters)}: {parameters}" + ) + + if bounds is not None and len(bounds) != 2: + raise ValueError( + "Expected (lowerBound, upperBound), got " + f"{len(bounds)}: {bounds}" + ) + + self._type = type_ + self._parameters = parameters + self._bounds = bounds + self._transformation = transformation + + # create the underlying distribution + match type_, transformation: + case (C.UNIFORM, _) | (C.PARAMETER_SCALE_UNIFORM, C.LIN): + self.distribution = Uniform(*parameters) + case (C.NORMAL, _) | (C.PARAMETER_SCALE_NORMAL, C.LIN): + self.distribution = Normal(*parameters) + case (C.LAPLACE, _) | (C.PARAMETER_SCALE_LAPLACE, C.LIN): + self.distribution = Laplace(*parameters) + case (C.PARAMETER_SCALE_UNIFORM, C.LOG): + self.distribution = Uniform(*parameters, log=True) + case (C.LOG_NORMAL, _) | (C.PARAMETER_SCALE_NORMAL, C.LOG): + self.distribution = Normal(*parameters, log=True) + case (C.LOG_LAPLACE, _) | (C.PARAMETER_SCALE_LAPLACE, C.LOG): + self.distribution = Laplace(*parameters, log=True) + case (C.PARAMETER_SCALE_UNIFORM, C.LOG10): + self.distribution = Uniform(*parameters, log=10) + case (C.PARAMETER_SCALE_NORMAL, C.LOG10): + self.distribution = Normal(*parameters, log=10) + case (C.PARAMETER_SCALE_LAPLACE, C.LOG10): + self.distribution = Laplace(*parameters, log=10) + case _: + raise ValueError( + "Unsupported distribution type / transformation: " + f"{type_} / {transformation}" + ) + + def __repr__(self): + return ( + f"{self.__class__.__name__}(" + f"{self.type!r}, {self.parameters!r}," + f" bounds={self.bounds!r}, transformation={self.transformation!r}," + ")" + ) + + @property + def type(self): + return self._type + + @property + def parameters(self): + return self._parameters + + @property + def bounds(self): + return self._bounds + + @property + def transformation(self): + return self._transformation + + def sample(self, shape=None) -> np.ndarray: + """Sample from the distribution. + + :param shape: The shape of the sample. + :return: A sample from the distribution. + """ + raw_sample = self.distribution.sample(shape) + return self._clip_to_bounds(self._scale_sample(raw_sample)) + + def _scale_sample(self, sample): + """Scale the sample to the parameter space""" + # if self.on_parameter_scale: + # return sample + + return scale(sample, self.transformation) + + def _clip_to_bounds(self, x): + """Clip `x` values to bounds. + + :param x: The values to clip. Assumed to be on the parameter scale. + """ + # TODO: replace this by proper truncation + if self.bounds is None: + return x + + return np.maximum( + np.minimum(self.ub_scaled, x), + self.lb_scaled, + ) + + @property + def lb_scaled(self): + """The lower bound on the parameter scale.""" + return scale(self.bounds[0], self.transformation) + + @property + def ub_scaled(self): + """The upper bound on the parameter scale.""" + return scale(self.bounds[1], self.transformation) + + def pdf(self, x): + """Probability density function at x. + + :param x: The value at which to evaluate the PDF. + ``x`` is assumed to be on the parameter scale. + :return: The value of the PDF at ``x``. Note that the PDF does + currently not account for the clipping at the bounds. + """ + x = unscale(x, self.transformation) + + # scale the PDF to the parameter scale + if self.transformation == C.LIN: + coeff = 1 + elif self.transformation == C.LOG10: + coeff = x * np.log(10) + elif self.transformation == C.LOG: + coeff = x + else: + raise ValueError(f"Unknown transformation: {self.transformation}") + + return self.distribution.pdf(x) * coeff + + def neglogprior(self, x): + """Negative log-prior at x. + + :param x: The value at which to evaluate the negative log-prior. + ``x`` is assumed to be on the parameter scale. + :return: The negative log-prior at ``x``. + """ + return -np.log(self.pdf(x)) + + @staticmethod + def from_par_dict( + d, type_=Literal["initialization", "objective"] + ) -> Prior: + """Create a distribution from a row of the parameter table. + + :param d: A dictionary representing a row of the parameter table. + :param type_: The type of the distribution. + :return: A distribution object. + """ + dist_type = d.get(f"{type_}PriorType", C.PARAMETER_SCALE_UNIFORM) + if not isinstance(dist_type, str) and np.isnan(dist_type): + dist_type = C.PARAMETER_SCALE_UNIFORM + + pscale = d.get(C.PARAMETER_SCALE, C.LIN) + if ( + pd.isna(d[f"{type_}PriorParameters"]) + and dist_type == C.PARAMETER_SCALE_UNIFORM + ): + params = ( + scale(d[C.LOWER_BOUND], pscale), + scale(d[C.UPPER_BOUND], pscale), + ) + else: + params = tuple( + map( + float, + d[f"{type_}PriorParameters"].split(C.PARAMETER_SEPARATOR), + ) + ) + return Prior( + type_=dist_type, + parameters=params, + bounds=(d[C.LOWER_BOUND], d[C.UPPER_BOUND]), + transformation=pscale, + ) + + def priors_to_measurements(problem: Problem): """Convert priors to measurements. diff --git a/petab/v1/sampling.py b/petab/v1/sampling.py index be154f1c..a046879f 100644 --- a/petab/v1/sampling.py +++ b/petab/v1/sampling.py @@ -5,7 +5,6 @@ import numpy as np import pandas as pd -from . import parameters from .C import * # noqa: F403 __all__ = ["sample_from_prior", "sample_parameter_startpoints"] @@ -24,86 +23,14 @@ def sample_from_prior( Returns: Array with sampled values """ + from .priors import Prior + # unpack info p_type, p_params, scaling, bounds = prior - - # define a function to rescale the sampled points to parameter scale - def scale(x): - if scaling == LIN: - return x - if scaling == LOG: - return np.log(x) - if scaling == LOG10: - return np.log10(x) - raise NotImplementedError( - f"Parameter priors on the parameter scale {scaling} are " - "currently not implemented." - ) - - def clip_to_bounds(x: np.array): - """Clip values in array x to bounds""" - return np.maximum(np.minimum(scale(bounds[1]), x), scale(bounds[0])) - - # define lambda functions for each parameter - if p_type == UNIFORM: - sp = scale( - (p_params[1] - p_params[0]) * np.random.random((n_starts,)) - + p_params[0] - ) - - elif p_type == PARAMETER_SCALE_UNIFORM: - sp = (p_params[1] - p_params[0]) * np.random.random( - (n_starts,) - ) + p_params[0] - - elif p_type == NORMAL: - sp = scale( - np.random.normal( - loc=p_params[0], scale=p_params[1], size=(n_starts,) - ) - ) - - elif p_type == LOG_NORMAL: - sp = scale( - np.exp( - np.random.normal( - loc=p_params[0], scale=p_params[1], size=(n_starts,) - ) - ) - ) - - elif p_type == PARAMETER_SCALE_NORMAL: - sp = np.random.normal( - loc=p_params[0], scale=p_params[1], size=(n_starts,) - ) - - elif p_type == LAPLACE: - sp = scale( - np.random.laplace( - loc=p_params[0], scale=p_params[1], size=(n_starts,) - ) - ) - - elif p_type == LOG_LAPLACE: - sp = scale( - np.exp( - np.random.laplace( - loc=p_params[0], scale=p_params[1], size=(n_starts,) - ) - ) - ) - - elif p_type == PARAMETER_SCALE_LAPLACE: - sp = np.random.laplace( - loc=p_params[0], scale=p_params[1], size=(n_starts,) - ) - - else: - raise NotImplementedError( - f"Parameter priors of type {prior[0]} are not implemented." - ) - - return clip_to_bounds(sp) + prior = Prior( + p_type, tuple(p_params), bounds=tuple(bounds), transformation=scaling + ) + return prior.sample(shape=(n_starts,)) def sample_parameter_startpoints( @@ -127,14 +54,27 @@ def sample_parameter_startpoints( Array of sampled starting points with dimensions `n_startpoints` x `n_optimization_parameters` """ + from .priors import Prior + if seed is not None: np.random.seed(seed) - # get types and parameters of priors from dataframe - prior_list = parameters.get_priors_from_df( - parameter_df, mode=INITIALIZATION, parameter_ids=parameter_ids - ) + par_to_estimate = parameter_df.loc[parameter_df[ESTIMATE] == 1] - startpoints = [sample_from_prior(prior, n_starts) for prior in prior_list] + if parameter_ids is not None: + try: + par_to_estimate = par_to_estimate.loc[parameter_ids, :] + except KeyError as e: + missing_ids = set(parameter_ids) - set(par_to_estimate.index) + raise KeyError( + "Parameter table does not contain estimated parameter(s) " + f"{missing_ids}." + ) from e - return np.array(startpoints).T + # get types and parameters of priors from dataframe + return np.array( + [ + Prior.from_par_dict(row, type_="initialization").sample(n_starts) + for row in par_to_estimate.to_dict("records") + ] + ).T diff --git a/tests/v1/test_distributions.py b/tests/v1/test_distributions.py new file mode 100644 index 00000000..9df830fa --- /dev/null +++ b/tests/v1/test_distributions.py @@ -0,0 +1,87 @@ +import numpy as np +import pytest +from numpy.testing import assert_allclose +from scipy.integrate import cumulative_trapezoid +from scipy.stats import ( + kstest, + laplace, + loglaplace, + lognorm, + loguniform, + norm, + uniform, +) + +from petab.v1.distributions import * +from petab.v2.C import * + + +@pytest.mark.parametrize( + "distribution", + [ + Normal(2, 1), + Normal(2, 1, log=True), + Normal(2, 1, log=10), + Uniform(2, 4), + Uniform(-2, 4, log=True), + Uniform(2, 4, log=10), + Laplace(1, 2), + Laplace(1, 0.5, log=True), + ], +) +def test_sample_matches_pdf(distribution): + """Test that the sample matches the PDF.""" + np.random.seed(1) + N_SAMPLES = 10_000 + sample = distribution.sample(N_SAMPLES) + + def cdf(x): + # pdf -> cdf + return cumulative_trapezoid(distribution.pdf(x), x) + + # Kolmogorov-Smirnov test to check if the sample is drawn from the CDF + _, p = kstest(sample, cdf) + + # if p < 0.05: + # import matplotlib.pyplot as plt + # plt.hist(sample, bins=100, density=True) + # x = np.linspace(min(sample), max(sample), 100) + # plt.plot(x, distribution.pdf(x)) + # plt.show() + + assert p > 0.05, (p, distribution) + + # Test samples match scipy CDFs + reference_pdf = None + if isinstance(distribution, Normal) and distribution.logbase is False: + reference_pdf = norm.pdf(sample, distribution.loc, distribution.scale) + elif isinstance(distribution, Uniform) and distribution.logbase is False: + reference_pdf = uniform.pdf( + sample, distribution._low, distribution._high - distribution._low + ) + elif isinstance(distribution, Laplace) and distribution.logbase is False: + reference_pdf = laplace.pdf( + sample, distribution.loc, distribution.scale + ) + elif isinstance(distribution, Normal) and distribution.logbase == np.exp( + 1 + ): + reference_pdf = lognorm.pdf( + sample, scale=np.exp(distribution.loc), s=distribution.scale + ) + elif isinstance(distribution, Uniform) and distribution.logbase == np.exp( + 1 + ): + reference_pdf = loguniform.pdf( + sample, np.exp(distribution._low), np.exp(distribution._high) + ) + elif isinstance(distribution, Laplace) and distribution.logbase == np.exp( + 1 + ): + reference_pdf = loglaplace.pdf( + sample, c=1 / distribution.scale, scale=np.exp(distribution.loc) + ) + if reference_pdf is not None: + assert_allclose( + distribution.pdf(sample), reference_pdf, rtol=1e-10, atol=1e-14 + ) diff --git a/tests/v1/test_priors.py b/tests/v1/test_priors.py index ac07d089..d98879e3 100644 --- a/tests/v1/test_priors.py +++ b/tests/v1/test_priors.py @@ -1,11 +1,13 @@ from copy import deepcopy +from itertools import product from pathlib import Path import benchmark_models_petab import numpy as np import pandas as pd import pytest -from scipy.stats import norm +from scipy.integrate import cumulative_trapezoid +from scipy.stats import kstest import petab.v1 from petab.v1 import ( @@ -14,10 +16,11 @@ OBJECTIVE_PRIOR_TYPE, OBSERVABLE_ID, SIMULATION, + C, get_simulation_conditions, get_simulation_df, ) -from petab.v1.priors import priors_to_measurements +from petab.v1.priors import Prior, priors_to_measurements @pytest.mark.parametrize( @@ -48,6 +51,13 @@ def test_priors_to_measurements(problem_id): strict=True, ) ) + x_unscaled_dict = dict( + zip( + original_problem.x_free_ids, + original_problem.x_nominal_free, + strict=True, + ) + ) # convert priors to measurements petab_problem_measurements = priors_to_measurements(petab_problem_priors) @@ -110,9 +120,13 @@ def test_priors_to_measurements(problem_id): def apply_parameter_values(row): # apply the parameter values to the observable formula for the prior if row[OBSERVABLE_ID].startswith("prior_"): - row[SIMULATION] = x_scaled_dict[ - row[OBSERVABLE_ID].removeprefix("prior_") - ] + parameter_id = row[OBSERVABLE_ID].removeprefix("prior_") + if original_problem.parameter_df.loc[ + parameter_id, OBJECTIVE_PRIOR_TYPE + ].startswith("parameterScale"): + row[SIMULATION] = x_scaled_dict[parameter_id] + else: + row[SIMULATION] = x_unscaled_dict[parameter_id] return row simulated_prior_observables = simulated_prior_observables.apply( @@ -140,29 +154,61 @@ def apply_parameter_values(row): (petab_problem_priors.parameter_df[ESTIMATE] == 1) & petab_problem_priors.parameter_df[OBJECTIVE_PRIOR_TYPE].notna() ] - priors = petab.v1.get_priors_from_df( - petab_problem_priors.parameter_df, - mode="objective", - parameter_ids=parameter_ids, - ) + priors = [ + Prior.from_par_dict( + petab_problem_priors.parameter_df.loc[par_id], type_="objective" + ) + for par_id in parameter_ids + ] prior_contrib = 0 for parameter_id, prior in zip(parameter_ids, priors, strict=True): - prior_type, prior_pars, par_scale, par_bounds = prior - if prior_type == petab.v1.PARAMETER_SCALE_NORMAL: - prior_contrib += norm.logpdf( - x_scaled_dict[parameter_id], - loc=prior_pars[0], - scale=prior_pars[1], - ) - else: - # enable other models, once libpetab has proper support for - # evaluating the prior contribution. until then, two test - # problems should suffice - assert problem_id == "Raimundez_PCB2020" - pytest.skip(f"Prior type {prior_type} not implemented") + prior_contrib -= prior.neglogprior(x_scaled_dict[parameter_id]) assert np.isclose( - llh_priors + prior_contrib, llh_measurements, rtol=1e-3, atol=1e-16 + llh_priors + prior_contrib, llh_measurements, rtol=1e-8, atol=1e-16 ), (llh_priors + prior_contrib, llh_measurements) # check that the tolerance is not too high assert np.abs(prior_contrib) > 1e-8 * np.abs(llh_priors) + + +cases = list( + product( + [ + (C.NORMAL, (10, 1)), + (C.LOG_NORMAL, (2, 1)), + (C.UNIFORM, (1, 2)), + (C.LAPLACE, (20, 2)), + (C.LOG_LAPLACE, (1, 0.5)), + (C.PARAMETER_SCALE_NORMAL, (1, 1)), + (C.PARAMETER_SCALE_LAPLACE, (1, 2)), + (C.PARAMETER_SCALE_UNIFORM, (1, 2)), + ], + C.PARAMETER_SCALES, + ) +) +ids = [f"{prior_args[0]}_{transform}" for prior_args, transform in cases] + + +@pytest.mark.parametrize("prior_args, transform", cases, ids=ids) +def test_sample_matches_pdf(prior_args, transform): + """Test that the sample matches the PDF.""" + np.random.seed(1) + N_SAMPLES = 10_000 + prior = Prior(*prior_args, transformation=transform) + sample = prior.sample(N_SAMPLES) + + # pdf -> cdf + def cdf(x): + return cumulative_trapezoid(prior.pdf(x), x) + + # Kolmogorov-Smirnov test to check if the sample is drawn from the CDF + _, p = kstest(sample, cdf) + + # if p < 0.05: + # import matplotlib.pyplot as plt + # plt.hist(sample, bins=100, density=True) + # x = np.linspace(min(sample), max(sample), 100) + # plt.plot(x, distribution.pdf(x)) + # plt.show() + + assert p > 0.05, (p, prior)