From 5e67189fe975eabc317b7474ca28167d3ccff409 Mon Sep 17 00:00:00 2001
From: drabbit17
Date: Fri, 9 Apr 2021 00:37:28 +0100
Subject: [PATCH] Further refactoring

The refactoring should make it possible to test both the distribution
parametrization and the sampled values as needed, as well as to add any
other test in the future. More details in PR #4608.
---
 pymc3/tests/test_distributions_random.py | 335 +++++++++++++++--------
 1 file changed, 225 insertions(+), 110 deletions(-)

diff --git a/pymc3/tests/test_distributions_random.py b/pymc3/tests/test_distributions_random.py
index 00655b70166..853f4f65ec4 100644
--- a/pymc3/tests/test_distributions_random.py
+++ b/pymc3/tests/test_distributions_random.py
@@ -11,11 +11,12 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
+import functools
 import itertools
 import sys
 
 from contextlib import ExitStack as does_not_raise
+from typing import Any, Callable, Dict, List
 
 import aesara
 import numpy as np
@@ -419,181 +420,295 @@ class TestMoyal(BaseTestCases.BaseTestCase):
     params = {"mu": 0.0, "sigma": 1.0}
 
 
-class TestCorrectParametrizationMappingPymcToScipy(SeededTest):
-    @staticmethod
-    def get_inputs_from_apply_node_outputs(outputs):
+class TestRandomVariableDistribution(SeededTest):
+    def _run_distribution_checks(
+        self, params: Dict[str, Any], tests_to_run: List[Callable]
+    ) -> None:
+        params["pymc_dist_output"] = params["pymc_dist"].dist(
+            **params["pymc_dist_params"],
+            size=params.get("size", 15),
+            rng=aesara.shared(self.get_random_state()),
+        )
+        if "expected_dist" in params:
+            params["expected_dist_outcome"] = params["expected_dist"](
+                **params["expected_dist_params"], size=params.get("size", 15)
+            )
+        for test in tests_to_run:
+            test(**params)
+
+    def _compare_pymc_dist_with_expected(
+        self, pymc_dist_output, expected_dist_outcome, decimal: int = 6, **kwargs
+    ) -> None:
+        assert_array_almost_equal(pymc_dist_output.eval(), expected_dist_outcome, decimal=decimal)
+
+    def _pymc_params_match_rv_op(
+        self, pymc_dist_output, expected_rv_op_params: Dict[str, Any], decimal: int = 6, **kwargs
+    ):
+        aesara_dist_inputs = self._get_inputs_from_apply_node_outputs(pymc_dist_output)[3:]
+        assert len(expected_rv_op_params) == len(aesara_dist_inputs)
+        for (expected_name, expected_value), actual_variable in zip(
+            expected_rv_op_params.items(), aesara_dist_inputs
+        ):
+            assert_almost_equal(expected_value, actual_variable.eval(), decimal=decimal)
+
+    def _get_inputs_from_apply_node_outputs(self, outputs) -> List[Any]:
         parents = outputs.get_parents()
         if not parents:
             raise Exception("Parent Apply node missing for output")
         # I am assuming there will always only be 1 Apply parent node in this context
         return parents[0].inputs
 
-    def _compare_pymc_sampling_with_aesara(
-        self,
-        pymc_dist,
-        pymc_params,
-        expected_aesara_params,
-        size=15,
-        decimal=6,
-        sampling_aesara_params=None,
-    ):
-        pymc_params["size"] = size
-        pymc_dist_output = pymc_dist.dist(rng=aesara.shared(self.get_random_state()), **pymc_params)
-        self._pymc_params_match_aesara_rv(pymc_dist_output, expected_aesara_params, decimal)
-        if sampling_aesara_params is None:
-            sampling_aesara_params = expected_aesara_params
-        self._pymc_sample_matches_aeasara_rv(
-            pymc_dist_output, pymc_dist, sampling_aesara_params, size, decimal
-        )
+    def _get_numpy_distribution(self, name: str) -> Callable:
+        return functools.partial(getattr(np.random.RandomState, name), self.get_random_state())
 
-    def _pymc_params_match_aesara_rv(self, pymc_dist_output, expected_aesara_params, decimal=6):
-        aesera_dist_inputs = self.get_inputs_from_apply_node_outputs(pymc_dist_output)[3:]
-        assert len(expected_aesara_params) == len(aesera_dist_inputs)
-        for (expected_name, expected_value), actual_variable in zip(
-            expected_aesara_params.items(), aesera_dist_inputs
-        ):
-            assert_almost_equal(expected_value, actual_variable.eval(), decimal=decimal)
-
-    def _pymc_sample_matches_aeasara_rv(
-        self, pymc_dist_output, pymc_dist, expected_aesara_params, size, decimal
-    ):
-        sample = pymc_dist.rv_op(
-            *[p for p in expected_aesara_params.values()],
-            size=size,
-            rng=aesara.shared(self.get_random_state()),
+    def test_gumbel(self):
+        params = {
+            "pymc_dist": pm.Gumbel,
+            "pymc_dist_params": {"mu": 1.5, "beta": 3.0},
+            "expected_rv_op_params": {"mu": 1.5, "beta": 3.0},
+            "expected_dist": st.gumbel_r.rvs,
+            "expected_dist_params": {"loc": 1.5, "scale": 3.0},
+            "size": 15,
+            "random_state": self.get_random_state(),
+        }
+        self._run_distribution_checks(
+            params, [self._pymc_params_match_rv_op, self._compare_pymc_dist_with_expected]
         )
-        assert_array_almost_equal(pymc_dist_output.eval(), sample.eval(), decimal=decimal)
 
     def test_normal(self):
-        params = {"mu": 5.0, "sigma": 10.0}
-        self._compare_pymc_sampling_with_aesara(pm.Normal, params, params.copy())
+        params = {
+            "pymc_dist": pm.Normal,
+            "pymc_dist_params": {"mu": 5.0, "sigma": 10.0},
+            "expected_rv_op_params": {"mu": 5.0, "sigma": 10.0},
+            "expected_dist": self._get_numpy_distribution("normal"),
+            "expected_dist_params": {"loc": 5.0, "scale": 10.0},
+            "size": 15,
+        }
+        self._run_distribution_checks(
+            params, [self._pymc_params_match_rv_op, self._compare_pymc_dist_with_expected]
+        )
 
     def test_uniform(self):
-        params = {"lower": 0.5, "upper": 1.5}
-        self._compare_pymc_sampling_with_aesara(pm.Uniform, params, params.copy())
+        params = {
+            "pymc_dist": pm.Uniform,
+            "pymc_dist_params": {"lower": 0.5, "upper": 1.5},
+            "expected_rv_op_params": {"lower": 0.5, "upper": 1.5},
+        }
+        self._run_distribution_checks(params, [self._pymc_params_match_rv_op])
 
     def test_half_normal(self):
-        params, expected_aesara_params = {"sigma": 10.0}, {"mean": 0, "sigma": 10.0}
-        self._compare_pymc_sampling_with_aesara(
-            pm.HalfNormal,
-            params,
-            expected_aesara_params,
-        )
+        params = {
+            "pymc_dist": pm.HalfNormal,
+            "pymc_dist_params": {"sigma": 10.0},
+            "expected_rv_op_params": {"mean": 0, "sigma": 10.0},
+        }
+        self._run_distribution_checks(params, [self._pymc_params_match_rv_op])
 
     def test_beta_alpha_beta(self):
-        params = {"alpha": 2.0, "beta": 5.0}
-        self._compare_pymc_sampling_with_aesara(pm.Beta, params, params.copy())
+        params = {
+            "pymc_dist": pm.Beta,
+            "pymc_dist_params": {"alpha": 2.0, "beta": 5.0},
+            "expected_rv_op_params": {"alpha": 2.0, "beta": 5.0},
+        }
+        self._run_distribution_checks(params, [self._pymc_params_match_rv_op])
 
     def test_beta_mu_sigma(self):
-        params = {"mu": 0.5, "sigma": 0.25}
+        params = {
+            "pymc_dist": pm.Beta,
+            "pymc_dist_params": {"mu": 0.5, "sigma": 0.25},
+        }
         expected_alpha, expected_beta = pm.Beta.get_alpha_beta(
-            mu=params["mu"], sigma=params["sigma"]
+            mu=params["pymc_dist_params"]["mu"], sigma=params["pymc_dist_params"]["sigma"]
         )
-        expected_params = {"alpha": expected_alpha, "beta": expected_beta}
-        self._compare_pymc_sampling_with_aesara(pm.Beta, params, expected_params)
+        params["expected_rv_op_params"] = {"alpha": expected_alpha, "beta": expected_beta}
+        self._run_distribution_checks(params, [self._pymc_params_match_rv_op])
 
     def test_exponential(self):
-        params = {"lam": 10.0}
-        expected_params = {"lam": 1.0 / params["lam"]}
-        self._compare_pymc_sampling_with_aesara(pm.Exponential, params, expected_params)
+        params = {
+            "pymc_dist": pm.Exponential,
+            "pymc_dist_params": {"lam": 10.0},
+            "expected_rv_op_params": {"lam": 1.0 / 10.0},
+        }
+        self._run_distribution_checks(params, [self._pymc_params_match_rv_op])
 
     def test_cauchy(self):
-        params = {"alpha": 2.0, "beta": 5.0}
-        self._compare_pymc_sampling_with_aesara(pm.Cauchy, params, params.copy())
+        params = {
+            "pymc_dist": pm.Cauchy,
+            "pymc_dist_params": {"alpha": 2.0, "beta": 5.0},
+            "expected_rv_op_params": {"alpha": 2.0, "beta": 5.0},
+        }
+        self._run_distribution_checks(params, [self._pymc_params_match_rv_op])
 
     def test_half_cauchy(self):
-        params = {"beta": 5.0}
-        expected_params = {"alpha": 0.0, "beta": 5.0}
-        self._compare_pymc_sampling_with_aesara(pm.HalfCauchy, params, expected_params)
+        params = {
+            "pymc_dist": pm.HalfCauchy,
+            "pymc_dist_params": {"beta": 5.0},
+            "expected_rv_op_params": {"alpha": 0.0, "beta": 5.0},
+        }
+        self._run_distribution_checks(params, [self._pymc_params_match_rv_op])
 
     def test_gamma_alpha_beta(self):
-        params = {"alpha": 2.0, "beta": 5.0}
-        expected_params = {"alpha": params["alpha"], "beta": 1 / params["beta"]}
-        sampling_aesara_params = {"alpha": params["alpha"], "beta": params["beta"]}
-        self._compare_pymc_sampling_with_aesara(
-            pm.Gamma, params, expected_params, sampling_aesara_params=sampling_aesara_params
-        )
+        params = {
+            "pymc_dist": pm.Gamma,
+            "pymc_dist_params": {"alpha": 2.0, "beta": 5.0},
+            "expected_rv_op_params": {"alpha": 2.0, "beta": 1 / 5.0},
+        }
+        self._run_distribution_checks(params, [self._pymc_params_match_rv_op])
 
     def test_gamma_mu_sigma(self):
-        params = {"mu": 0.5, "sigma": 0.25}
+        params = {
+            "pymc_dist": pm.Gamma,
+            "pymc_dist_params": {"mu": 0.5, "sigma": 0.25},
+        }
         expected_alpha, expected_beta = pm.Gamma.get_alpha_beta(
-            mu=params["mu"], sigma=params["sigma"]
-        )
-        expected_params = {"alpha": expected_alpha, "beta": 1 / expected_beta}
-        sampling_aesara_params = {"alpha": expected_alpha, "beta": expected_beta}
-        self._compare_pymc_sampling_with_aesara(
-            pm.Gamma, params, expected_params, sampling_aesara_params=sampling_aesara_params
+            mu=params["pymc_dist_params"]["mu"], sigma=params["pymc_dist_params"]["sigma"]
         )
+        params["expected_rv_op_params"] = {"alpha": expected_alpha, "beta": 1 / expected_beta}
+        self._run_distribution_checks(params, [self._pymc_params_match_rv_op])
 
     def test_inverse_gamma_alpha_beta(self):
-        params = {"alpha": 2.0, "beta": 5.0}
-        self._compare_pymc_sampling_with_aesara(pm.InverseGamma, params, params.copy())
+        params = {
+            "pymc_dist": pm.InverseGamma,
+            "pymc_dist_params": {"alpha": 2.0, "beta": 5.0},
+            "expected_rv_op_params": {"alpha": 2.0, "beta": 5.0},
+        }
+        self._run_distribution_checks(params, [self._pymc_params_match_rv_op])
 
     def test_inverse_gamma_mu_sigma(self):
-        params = {"mu": 0.5, "sigma": 0.25}
+        params = {
+            "pymc_dist": pm.InverseGamma,
+            "pymc_dist_params": {"mu": 0.5, "sigma": 0.25},
+        }
         expected_alpha, expected_beta = pm.InverseGamma._get_alpha_beta(
-            alpha=None, beta=None, mu=params["mu"], sigma=params["sigma"]
+            alpha=None,
+            beta=None,
+            mu=params["pymc_dist_params"]["mu"],
+            sigma=params["pymc_dist_params"]["sigma"],
         )
-        expected_params = {"alpha": expected_alpha, "beta": expected_beta}
-        self._compare_pymc_sampling_with_aesara(pm.InverseGamma, params, expected_params)
+        params["expected_rv_op_params"] = {"alpha": expected_alpha, "beta": expected_beta}
+        self._run_distribution_checks(params, [self._pymc_params_match_rv_op])
 
     def test_binomial(self):
-        params = {"n": 100, "p": 0.33}
-        self._compare_pymc_sampling_with_aesara(pm.Binomial, params, params.copy())
+        params = {
+            "pymc_dist": pm.Binomial,
+            "pymc_dist_params": {"n": 100, "p": 0.33},
+            "expected_rv_op_params": {"n": 100, "p": 0.33},
+        }
+        self._run_distribution_checks(params, [self._pymc_params_match_rv_op])
 
     def test_negative_binomial(self):
-        params = {"n": 100, "p": 0.33}
-        self._compare_pymc_sampling_with_aesara(pm.NegativeBinomial, params, params.copy())
+        params = {
+            "pymc_dist": pm.NegativeBinomial,
+            "pymc_dist_params": {"n": 100, "p": 0.33},
+            "expected_rv_op_params": {"n": 100, "p": 0.33},
+        }
+        self._run_distribution_checks(params, [self._pymc_params_match_rv_op])
 
     def test_negative_binomial_mu_sigma(self):
-        params = {"mu": 5.0, "alpha": 8.0}
+        params = {
+            "pymc_dist": pm.NegativeBinomial,
+            "pymc_dist_params": {"mu": 5.0, "alpha": 8.0},
+        }
         expected_n, expected_p = pm.NegativeBinomial.get_n_p(
-            mu=params["mu"], alpha=params["alpha"], n=None, p=None
+            mu=params["pymc_dist_params"]["mu"],
+            alpha=params["pymc_dist_params"]["alpha"],
+            n=None,
+            p=None,
         )
-        expected_params = {"n": expected_n, "p": expected_p}
-        self._compare_pymc_sampling_with_aesara(pm.NegativeBinomial, params, expected_params)
+        params["expected_rv_op_params"] = {"n": expected_n, "p": expected_p}
+        self._run_distribution_checks(params, [self._pymc_params_match_rv_op])
 
     def test_bernoulli(self):
-        params = {"p": 0.33}
-        self._compare_pymc_sampling_with_aesara(pm.Bernoulli, params, params.copy())
+        params = {
+            "pymc_dist": pm.Bernoulli,
+            "pymc_dist_params": {"p": 0.33},
+            "expected_rv_op_params": {"p": 0.33},
+        }
+        self._run_distribution_checks(params, [self._pymc_params_match_rv_op])
 
     def test_bernoulli_logit_p(self):
-        params = {"logit_p": 1.0}
-        bernoulli_sample = pm.Bernoulli.dist(logit_p=params["logit_p"])
+        params = {
+            "pymc_dist": pm.Bernoulli,
+            "pymc_dist_params": {"logit_p": 1.0},
+            # no "expected_rv_op_params": evaluating a logit_p draw is expected to raise
+        }
+        bernoulli_sample = pm.Bernoulli.dist(logit_p=params["pymc_dist_params"]["logit_p"])
         with pytest.raises(ValueError):
             bernoulli_sample.eval()
 
     def test_poisson(self):
-        params = {"mu": 4.0}
-        self._compare_pymc_sampling_with_aesara(pm.Poisson, params, params.copy())
+        params = {
+            "pymc_dist": pm.Poisson,
+            "pymc_dist_params": {"mu": 4.0},
+            "expected_rv_op_params": {"mu": 4.0},
+        }
+        self._run_distribution_checks(params, [self._pymc_params_match_rv_op])
 
     def test_mv_normal_distribution(self):
-        params = {"mu": np.array([1.0, 2.0]), "cov": np.array([[2.0, 0.0], [0.0, 3.5]])}
-        self._compare_pymc_sampling_with_aesara(pm.MvNormal, params, params.copy())
+        params = {
+            "pymc_dist": pm.MvNormal,
+            "pymc_dist_params": {
+                "mu": np.array([1.0, 2.0]),
+                "cov": np.array([[2.0, 0.0], [0.0, 3.5]]),
+            },
+            "expected_rv_op_params": {
+                "mu": np.array([1.0, 2.0]),
+                "cov": np.array([[2.0, 0.0], [0.0, 3.5]]),
+            },
+        }
+        self._run_distribution_checks(params, [self._pymc_params_match_rv_op])
 
     def test_mv_normal_distribution_chol(self):
-        params = {"mu": np.array([1.0, 2.0]), "chol": np.array([[2.0, 0.0], [0.0, 3.5]])}
-        expected_cov = quaddist_matrix(chol=params["chol"])
-        expected_params = {"mu": np.array([1.0, 2.0]), "cov": expected_cov.eval()}
-        self._compare_pymc_sampling_with_aesara(pm.MvNormal, params, expected_params)
+        params = {
+            "pymc_dist": pm.MvNormal,
+            "pymc_dist_params": {
+                "mu": np.array([1.0, 2.0]),
+                "chol": np.array([[2.0, 0.0], [0.0, 3.5]]),
+            },
+        }
params["expected_rv_op_params"] = { + "mu": np.array([1.0, 2.0]), + "cov": quaddist_matrix(chol=params["pymc_dist_params"]["chol"]).eval(), + } + self._run_distribution_checks(params, [self._pymc_params_match_rv_op]) def test_mv_normal_distribution_tau(self): - params = {"mu": np.array([1.0, 2.0]), "tau": np.array([[2.0, 0.0], [0.0, 3.5]])} - expected_cov = quaddist_matrix(tau=params["tau"]) - expected_params = {"mu": np.array([1.0, 2.0]), "cov": expected_cov.eval()} - self._compare_pymc_sampling_with_aesara(pm.MvNormal, params, expected_params) + params = { + "pymc_dist": pm.MvNormal, + "pymc_dist_params": { + "mu": np.array([1.0, 2.0]), + "tau": np.array([[2.0, 0.0], [0.0, 3.5]]), + }, + } + params["expected_rv_op_params"] = { + "mu": np.array([1.0, 2.0]), + "cov": quaddist_matrix(tau=params["pymc_dist_params"]["tau"]).eval(), + } + self._run_distribution_checks(params, [self._pymc_params_match_rv_op]) def test_dirichlet(self): - params = {"a": np.array([1.0, 2.0])} - self._compare_pymc_sampling_with_aesara(pm.Dirichlet, params, params.copy()) + params = { + "pymc_dist": pm.Dirichlet, + "pymc_dist_params": {"a": np.array([1.0, 2.0])}, + "expected_rv_op_params": {"a": np.array([1.0, 2.0])}, + } + self._run_distribution_checks(params, [self._pymc_params_match_rv_op]) def test_multinomial(self): - params = {"n": 85, "p": np.array([0.28, 0.62, 0.10])} - self._compare_pymc_sampling_with_aesara(pm.Multinomial, params, params.copy()) + params = { + "pymc_dist": pm.Multinomial, + "pymc_dist_params": {"n": 85, "p": np.array([0.28, 0.62, 0.10])}, + "expected_rv_op_params": {"n": 85, "p": np.array([0.28, 0.62, 0.10])}, + } + self._run_distribution_checks(params, [self._pymc_params_match_rv_op]) def test_categorical(self): - params = {"p": np.array([0.28, 0.62, 0.10])} - self._compare_pymc_sampling_with_aesara(pm.Categorical, params, params.copy()) + params = { + "pymc_dist": pm.Categorical, + "pymc_dist_params": {"p": np.array([0.28, 0.62, 0.10])}, + "expected_rv_op_params": {"p": np.array([0.28, 0.62, 0.10])}, + } + self._run_distribution_checks(params, [self._pymc_params_match_rv_op]) class TestScalarParameterSamples(SeededTest):