diff --git a/doubleml/irm/iivm.py b/doubleml/irm/iivm.py
index 3f252f2a5..a43c0a035 100644
--- a/doubleml/irm/iivm.py
+++ b/doubleml/irm/iivm.py
@@ -1,4 +1,5 @@
 import numpy as np
+from scipy.stats import norm
 from sklearn.utils import check_X_y
 from sklearn.utils.multiclass import type_of_target
 
@@ -12,7 +13,7 @@
     _check_score,
     _check_trimming,
 )
-from doubleml.utils._estimation import _dml_cv_predict, _dml_tune, _get_cond_smpls
+from doubleml.utils._estimation import _dml_cv_predict, _dml_tune, _get_cond_smpls, _solve_quadratic_inequality
 from doubleml.utils._propensity_score import _normalize_ipw, _trimm
 
@@ -196,6 +197,23 @@ def __init__(
         self.subgroups = subgroups
         self._external_predictions_implemented = True
 
+    def __str__(self):
+        parent_str = super().__str__()
+
+        # add robust confset
+        if self.framework is None:
+            confset_str = ""
+        else:
+            confset = self.robust_confset()
+            formatted_confset = ", ".join([f"[{lower:.4f}, {upper:.4f}]" for lower, upper in confset])
+            confset_str = (
+                "\n\n--------------- Additional Information ----------------\n"
+                + f"Robust Confidence Set: {formatted_confset}\n"
+            )
+
+        res = parent_str + confset_str
+        return res
+
     @property
     def normalize_ipw(self):
         """
@@ -550,3 +568,45 @@ def _nuisance_tuning(
 
     def _sensitivity_element_est(self, preds):
         pass
+
+    def robust_confset(self, level=0.95):
+        """
+        Compute confidence sets for non-parametric instrumental variable models that are uniformly valid
+        under weak instruments. These are obtained by inverting a score-like test statistic based on the
+        estimated influence function.
+
+        Parameters
+        ----------
+        level : float
+            The confidence level.
+            Default is ``0.95``.
+
+        Returns
+        -------
+        list_confset : list
+            A list of tuples. Each tuple contains the lower and upper bounds of an interval.
+            The union of these intervals forms the confidence set.
+        """
+
+        if self.framework is None:
+            raise ValueError("Apply fit() before robust_confset().")
+        if not isinstance(level, float):
+            raise TypeError(f"The confidence level must be of float type. {str(level)} of type {str(type(level))} was passed.")
+        if (level <= 0) | (level >= 1):
+            raise ValueError(f"The confidence level must be in (0,1). {str(level)} was passed.")
+
+        # compute critical value
+        alpha = 1 - level
+        critical_value = norm.ppf(1.0 - alpha / 2)
+
+        # We need to find the thetas that satisfy
+        #     n * np.mean(score(theta)) ** 2 / np.mean(score(theta) ** 2) <= critical_value ** 2.
+        # This is equivalent to solving the quadratic inequality
+        #     a * theta ** 2 + b * theta + c <= 0
+        # for some a, b, c, which we calculate next.
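+        # Derivation sketch: the score is linear in theta,
+        #     score(theta) = psi_a * theta + psi_b,
+        # so expanding n * np.mean(score(theta)) ** 2 - critical_value ** 2 * np.mean(score(theta) ** 2)
+        # and collecting powers of theta yields the coefficients a, b and c below.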
+        n = self.psi_elements["psi_a"].shape[0]
+        a = n * np.mean(self.psi_elements["psi_a"]) ** 2 - critical_value**2 * np.mean(np.square(self.psi_elements["psi_a"]))
+        b = 2 * n * np.mean(self.psi_elements["psi_a"]) * np.mean(
+            self.psi_elements["psi_b"]
+        ) - 2 * critical_value**2 * np.mean(np.multiply(self.psi_elements["psi_a"], self.psi_elements["psi_b"]))
+        c = n * np.mean(self.psi_elements["psi_b"]) ** 2 - critical_value**2 * np.mean(np.square(self.psi_elements["psi_b"]))
+        return _solve_quadratic_inequality(a, b, c)
diff --git a/doubleml/irm/tests/test_iivm_unif_confset.py b/doubleml/irm/tests/test_iivm_unif_confset.py
new file mode 100644
index 000000000..e25188c26
--- /dev/null
+++ b/doubleml/irm/tests/test_iivm_unif_confset.py
@@ -0,0 +1,98 @@
+import numpy as np
+import pytest
+from sklearn.ensemble import RandomForestClassifier
+from sklearn.linear_model import LinearRegression, LogisticRegression
+
+import doubleml as dml
+
+
+def generate_weak_iv_data(n_samples, instrument_size, true_ATE):
+    u = np.random.normal(0, 2, size=n_samples)
+    X = np.random.normal(0, 1, size=n_samples)
+    Z = np.random.binomial(1, 0.5, size=n_samples)
+    A = instrument_size * Z + u
+    A = np.array(A > 0, dtype=int)
+    Y = true_ATE * A + np.sign(u)
+    dml_data = dml.DoubleMLData.from_arrays(x=X, y=Y, d=A, z=Z)
+    return dml_data
+
+
+@pytest.mark.ci
+def test_coverage_robust_confset():
+    # Test parameters
+    true_ATE = 0.5
+    instrument_size = 0.005
+    n_samples = 1000
+    n_simulations = 100
+
+    np.random.seed(3141)
+    coverage = []
+    for _ in range(n_simulations):
+        data = generate_weak_iv_data(n_samples, instrument_size, true_ATE)
+
+        # Set machine learning methods
+        learner_g = LinearRegression()
+        classifier_m = LogisticRegression()
+        classifier_r = RandomForestClassifier(n_estimators=20, max_depth=5)
+
+        # Create and fit new model
+        dml_iivm_obj = dml.DoubleMLIIVM(data, learner_g, classifier_m, classifier_r)
+        dml_iivm_obj.fit()
+
+        # Get confidence set
+        conf_set = dml_iivm_obj.robust_confset()
+
+        # check if conf_set is a list of tuples
+        assert isinstance(conf_set, list)
+        assert all(isinstance(x, tuple) and len(x) == 2 for x in conf_set)
+
+        # Check if true ATE is in confidence set
+        ate_in_confset = any(x[0] < true_ATE < x[1] for x in conf_set)
+        coverage.append(ate_in_confset)
+
+    # Calculate coverage rate
+    coverage_rate = np.mean(coverage)
+    assert coverage_rate >= 0.9, f"Coverage rate {coverage_rate} is below 0.9"
+
+
+@pytest.mark.ci
+def test_exceptions_robust_confset():
+    # Test parameters
+    true_ATE = 0.5
+    instrument_size = 0.005
+    n_samples = 1000
+
+    np.random.seed(3141)
+    data = generate_weak_iv_data(n_samples, instrument_size, true_ATE)
+
+    # create new model
+    learner_g = LinearRegression()
+    classifier_m = LogisticRegression()
+    classifier_r = RandomForestClassifier(n_estimators=20, max_depth=5)
+    dml_iivm_obj = dml.DoubleMLIIVM(data, learner_g, classifier_m, classifier_r)
+
+    # Check if the robust_confset method raises an exception when called before fitting
+    msg = r"Apply fit\(\) before robust_confset\(\)."
+    with pytest.raises(ValueError, match=msg):
+        dml_iivm_obj.robust_confset()
+
+    # Check if str representation of the object is working
+    str_repr = str(dml_iivm_obj)
+    assert isinstance(str_repr, str)
+    assert "Robust" not in str_repr
+
+    # Fit the model
+    dml_iivm_obj.fit()
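+
+    # Note: with an instrument this weak, the inverted test can yield a confidence
+    # set that is a union of disjoint intervals or unbounded, which is why
+    # robust_confset() returns a list of intervals rather than a single one.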
+
+    # Check invalid inputs
+    msg = r"The confidence level must be of float type\. 0\.95 of type <class 'str'> was passed\."
+    with pytest.raises(TypeError, match=msg):
+        dml_iivm_obj.robust_confset(level="0.95")
+    msg = r"The confidence level must be in \(0,1\). 1.5 was passed."
+    with pytest.raises(ValueError, match=msg):
+        dml_iivm_obj.robust_confset(level=1.5)
+
+    # Check if str representation of the object is working
+    str_repr = str(dml_iivm_obj)
+    assert isinstance(str_repr, str)
+    assert "Robust Confidence Set" in str_repr
diff --git a/doubleml/utils/_estimation.py b/doubleml/utils/_estimation.py
index 25e6262d3..f04eb4f8b 100644
--- a/doubleml/utils/_estimation.py
+++ b/doubleml/utils/_estimation.py
@@ -341,3 +341,53 @@ def _set_external_predictions(external_predictions, learners, treatment, i_rep):
         else:
             ext_prediction_dict[learner] = None
     return ext_prediction_dict
+
+
+def _solve_quadratic_inequality(a: float, b: float, c: float):
+    """
+    Solves the quadratic inequality a*x^2 + b*x + c <= 0 and returns the intervals where it holds.
+
+    Parameters
+    ----------
+    a : float
+        Coefficient of x^2.
+    b : float
+        Coefficient of x.
+    c : float
+        Constant term.
+
+    Returns
+    -------
+    List[Tuple[float, float]]
+        A list of intervals on which the inequality holds.
+    """
+
+    # Handle special cases
+    if abs(a) < 1e-12:  # a is effectively zero
+        if abs(b) < 1e-12:  # constant case
+            return [(-np.inf, np.inf)] if c <= 0 else []
+        else:  # linear case
+            root = -c / b
+            return [(-np.inf, root)] if b > 0 else [(root, np.inf)]
+
+    # Standard case: quadratic equation
+    roots = np.polynomial.polynomial.polyroots([c, b, a])
+    real_roots = np.sort(roots[np.isreal(roots)].real)
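+    # For real coefficients, the roots are either two real numbers or a
+    # complex-conjugate pair, so real_roots typically has length 0 or 2; the
+    # length-1 branch below is a defensive guard for borderline numerical cases.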
+
+    if len(real_roots) == 0:  # no real roots
+        if a > 0:  # parabola opens upwards and never reaches zero
+            return []
+        else:  # parabola opens downwards, always <= 0
+            return [(-np.inf, np.inf)]
+    elif len(real_roots) == 1 or np.allclose(real_roots[0], real_roots[1]):  # one real root
+        if a > 0:
+            return [(real_roots[0], real_roots[0])]  # parabola touches x-axis at one point
+        else:
+            return [(-np.inf, np.inf)]  # parabola is always <= 0
+    else:
+        assert len(real_roots) == 2
+        if a > 0:  # happy quadratic (parabola opens upwards)
+            return [(real_roots[0], real_roots[1])]
+        else:  # sad quadratic (parabola opens downwards)
+            return [(-np.inf, real_roots[0]), (real_roots[1], np.inf)]
diff --git a/doubleml/utils/tests/test_quadratic_inequality.py b/doubleml/utils/tests/test_quadratic_inequality.py
new file mode 100644
index 000000000..68faff0fe
--- /dev/null
+++ b/doubleml/utils/tests/test_quadratic_inequality.py
@@ -0,0 +1,41 @@
+import numpy as np
+import pytest
+
+from doubleml.utils._estimation import _solve_quadratic_inequality
+
+
+@pytest.mark.parametrize(
+    "a, b, c, expected",
+    [
+        (1, 0, -4, [(-2.0, 2.0)]),  # happy quadratic, discriminant > 0
+        (-1, 0, 4, [(-np.inf, -2), (2, np.inf)]),  # sad quadratic, discriminant > 0
+        (1, 0, 4, []),  # happy quadratic, discriminant < 0
+        (-1, 0, -4, [(-np.inf, np.inf)]),  # sad quadratic, discriminant < 0
+        (1, 0, 0, [(0.0, 0.0)]),  # happy quadratic, discriminant = 0
+        (-1, 0, 0, [(-np.inf, np.inf)]),  # sad quadratic, discriminant = 0
+        (1, 3, -4, [(-4.0, 1.0)]),  # happy quadratic, discriminant > 0
+        (-1, 3, 4, [(-np.inf, -1), (4, np.inf)]),  # sad quadratic, discriminant > 0
+        (-1, -3, 4, [(-np.inf, -4), (1, np.inf)]),  # sad quadratic, discriminant > 0
+        (1, 3, 4, []),  # happy quadratic, discriminant < 0
+        (-1, 3, -4, [(-np.inf, np.inf)]),  # sad quadratic, discriminant < 0
+        (1, 4, 4, [(-2.0, -2.0)]),  # happy quadratic, discriminant = 0
+        (-1, 4, -4, [(-np.inf, np.inf)]),  # sad quadratic, discriminant = 0
+        (0, 0, 0, [(-np.inf, np.inf)]),  # constant and equal to zero
+        (0, 0, 1, []),  # constant and larger than zero
+        (0, 1, 0, [(-np.inf, 0.0)]),  # increasing linear function
+        (0, -1, -1, [(-1.0, np.inf)]),  # decreasing linear function
+    ],
+)
+def test_solve_quadratic_inequality(a, b, c, expected):
+    result = _solve_quadratic_inequality(a, b, c)
+
+    assert len(result) == len(expected), f"Expected {len(expected)} intervals but got {len(result)}"
+
+    # np.isclose treats matching infinities as close, so the same check covers
+    # both finite and unbounded interval endpoints.
+    for tpl, exp in zip(result, expected):
+        assert np.isclose(tpl[0], exp[0]), f"Expected {exp[0]} but got {tpl[0]}"
+        assert np.isclose(tpl[1], exp[1]), f"Expected {exp[1]} but got {tpl[1]}"
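+
+
+@pytest.mark.parametrize("a, b, c", [(1.0, -1.0, -2.0), (-2.0, 3.0, 1.0)])
+def test_solve_quadratic_inequality_membership(a, b, c):
+    # Complementary sanity check (a sketch added on top of the parametrized
+    # cases above): points sampled inside each returned interval must satisfy
+    # a*x^2 + b*x + c <= 0, up to a small numerical tolerance at the roots.
+    intervals = _solve_quadratic_inequality(a, b, c)
+    assert len(intervals) > 0
+    for lower, upper in intervals:
+        # clip unbounded rays to a finite range before sampling
+        grid = np.linspace(max(lower, -1e3), min(upper, 1e3), 25)
+        assert np.all(a * grid**2 + b * grid + c <= 1e-8)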