Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 61 additions & 1 deletion doubleml/irm/iivm.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import numpy as np
from scipy.stats import norm
from sklearn.utils import check_X_y
from sklearn.utils.multiclass import type_of_target

Expand All @@ -12,7 +13,7 @@
_check_score,
_check_trimming,
)
from doubleml.utils._estimation import _dml_cv_predict, _dml_tune, _get_cond_smpls
from doubleml.utils._estimation import _dml_cv_predict, _dml_tune, _get_cond_smpls, _solve_quadratic_inequality
from doubleml.utils._propensity_score import _normalize_ipw, _trimm


Expand Down Expand Up @@ -196,6 +197,23 @@ def __init__(
self.subgroups = subgroups
self._external_predictions_implemented = True

def __str__(self):
parent_str = super().__str__()

# add robust confset
if self.framework is None:
confset_str = ""
else:
confset = self.robust_confset()
formatted_confset = ", ".join([f"[{lower:.4f}, {upper:.4f}]" for lower, upper in confset])
confset_str = (
"\n\n--------------- Additional Information ----------------\n"
+ f"Robust Confidence Set: {formatted_confset}\n"
)

res = parent_str + confset_str
return res

@property
def normalize_ipw(self):
"""
Expand Down Expand Up @@ -550,3 +568,45 @@ def _nuisance_tuning(

def _sensitivity_element_est(self, preds):
pass

def robust_confset(self, level=0.95):
"""
Confidence sets for non-parametric instrumental variable models that are uniformly valid under weak instruments.
These are obtained by inverting a score-like test statistic based on estimated influence function.

Parameters
----------
level : float
The confidence level.
Default is ``0.95``.

Returns
-------
list_confset : List
A list that contains tuples. Each tuple contains the lower and upper
bounds of an interval. The union of this intervals forms the confidence set.
"""

if self.framework is None:
raise ValueError("Apply fit() before robust_confset().")
if not isinstance(level, float):
raise TypeError(f"The confidence level must be of float type. {str(level)} of type {str(type(level))} was passed.")
if (level <= 0) | (level >= 1):
raise ValueError(f"The confidence level must be in (0,1). {str(level)} was passed.")

# compute critical values
alpha = 1 - level
critical_value = norm.ppf(1.0 - alpha / 2)

# We need to find the thetas that solve the equation
# n * np.mean(score(theta))/np.mean(score(theta)**2) <= critical_value**2.
# This is equivalent to solving the equation
# a theta^2 + b theta + c <= 0
# for some a, b, c, which we calculate next, and then solve the equation.
n = self.psi_elements["psi_a"].shape[0]
a = n * np.mean(self.psi_elements["psi_a"]) ** 2 - critical_value**2 * np.mean(np.square(self.psi_elements["psi_a"]))
b = 2 * n * np.mean(self.psi_elements["psi_a"]) * np.mean(
self.psi_elements["psi_b"]
) - 2 * critical_value**2 * np.mean(np.multiply(self.psi_elements["psi_a"], self.psi_elements["psi_b"]))
c = n * np.mean(self.psi_elements["psi_b"]) ** 2 - critical_value**2 * np.mean(np.square(self.psi_elements["psi_b"]))
return _solve_quadratic_inequality(a, b, c)
98 changes: 98 additions & 0 deletions doubleml/irm/tests/test_iivm_unif_confset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import numpy as np
import pytest
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LinearRegression, LogisticRegression

import doubleml as dml


def generate_weak_iv_data(n_samples, instrument_size, true_ATE):
u = np.random.normal(0, 2, size=n_samples)
X = np.random.normal(0, 1, size=n_samples)
Z = np.random.binomial(1, 0.5, size=n_samples)
A = instrument_size * Z + u
A = np.array(A > 0, dtype=int)
Y = true_ATE * A + np.sign(u)
dml_data = dml.DoubleMLData.from_arrays(x=X, y=Y, d=A, z=Z)
return dml_data


@pytest.mark.ci
def test_coverage_robust_confset():
# Test parameters
true_ATE = 0.5
instrument_size = 0.005
n_samples = 1000
n_simulations = 100

np.random.seed(3141)
coverage = []
for _ in range(n_simulations):
data = generate_weak_iv_data(n_samples, instrument_size, true_ATE)

# Set machine learning methods
learner_g = LinearRegression()
classifier_m = LogisticRegression()
classifier_r = RandomForestClassifier(n_estimators=20, max_depth=5)

# Create and fit new model
dml_iivm_obj = dml.DoubleMLIIVM(data, learner_g, classifier_m, classifier_r)
dml_iivm_obj.fit()

# Get confidence set
conf_set = dml_iivm_obj.robust_confset()

# check if conf_set is a list of tuples
assert isinstance(conf_set, list)
assert all(isinstance(x, tuple) and len(x) == 2 for x in conf_set)

# Check if true ATE is in confidence set
ate_in_confset = any(x[0] < true_ATE < x[1] for x in conf_set)
coverage.append(ate_in_confset)

# Calculate coverage rate
coverage_rate = np.mean(coverage)
assert coverage_rate >= 0.9, f"Coverage rate {coverage_rate} is below 0.9"


@pytest.mark.ci
def test_exceptions_robust_confset():
# Test parameters
true_ATE = 0.5
instrument_size = 0.005
n_samples = 1000

np.random.seed(3141)
data = generate_weak_iv_data(n_samples, instrument_size, true_ATE)

# create new model
learner_g = LinearRegression()
classifier_m = LogisticRegression()
classifier_r = RandomForestClassifier(n_estimators=20, max_depth=5)
dml_iivm_obj = dml.DoubleMLIIVM(data, learner_g, classifier_m, classifier_r)

# Check if the robust_confset method raises an exception when called before fitting
msg = r"Apply fit\(\) before robust_confset\(\)."
with pytest.raises(ValueError, match=msg):
dml_iivm_obj.robust_confset()

# Check if str representation of the object is working
str_repr = str(dml_iivm_obj)
assert isinstance(str_repr, str)
assert "Robust" not in str_repr

# Fit the model
dml_iivm_obj.fit()

# Check invalid inputs
msg = "The confidence level must be of float type. 0.95 of type <class 'str'> was passed."
with pytest.raises(TypeError, match=msg):
dml_iivm_obj.robust_confset(level="0.95")
msg = r"The confidence level must be in \(0,1\). 1.5 was passed."
with pytest.raises(ValueError, match=msg):
dml_iivm_obj.robust_confset(level=1.5)

# Check if str representation of the object is working
str_repr = str(dml_iivm_obj)
assert isinstance(str_repr, str)
assert "Robust Confidence Set" in str_repr
50 changes: 50 additions & 0 deletions doubleml/utils/_estimation.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,3 +341,53 @@ def _set_external_predictions(external_predictions, learners, treatment, i_rep):
else:
ext_prediction_dict[learner] = None
return ext_prediction_dict


def _solve_quadratic_inequality(a: float, b: float, c: float):
"""
Solves the quadratic inequation a*x^2 + b*x + c <= 0 and returns the intervals.

Parameters
----------
a : float
Coefficient of x^2.
b : float
Coefficient of x.
c : float
Constant term.

Returns
-------
List[Tuple[float, float]]
A list of intervals where the inequation holds.
"""

# Handle special cases
if abs(a) < 1e-12: # a is effectively zero
if abs(b) < 1e-12: # constant case
return [(-np.inf, np.inf)] if c <= 0 else []
# Linear case:
else:
root = -c / b
return [(-np.inf, root)] if b > 0 else [(root, np.inf)]

# Standard case: quadratic equation
roots = np.polynomial.polynomial.polyroots([c, b, a])
real_roots = np.sort(roots[np.isreal(roots)].real)

if len(real_roots) == 0: # No real roots
if a > 0: # parabola opens upwards, no real roots
return []
else: # parabola opens downwards, always <= 0
return [(-np.inf, np.inf)]
elif len(real_roots) == 1 or np.allclose(real_roots[0], real_roots[1]): # One real root
if a > 0:
return [(real_roots[0], real_roots[0])] # parabola touches x-axis at one point
else:
return [(-np.inf, np.inf)] # parabola is always <= 0
else:
assert len(real_roots) == 2
if a > 0: # happy quadratic (parabola opens upwards)
return [(real_roots[0], real_roots[1])]
else: # sad quadratic (parabola opens downwards)
return [(-np.inf, real_roots[0]), (real_roots[1], np.inf)]
41 changes: 41 additions & 0 deletions doubleml/utils/tests/test_quadratic_inequality.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import numpy as np
import pytest

from doubleml.utils._estimation import _solve_quadratic_inequality


@pytest.mark.parametrize(
"a, b, c, expected",
[
(1, 0, -4, [(-2.0, 2.0)]), # happy quadratic, determinant > 0
(-1, 0, 4, [(-np.inf, -2), (2, np.inf)]), # sad quadratic, determinant > 0
(1, 0, 4, []), # happy quadratic, determinant < 0
(-1, 0, -4, [(-np.inf, np.inf)]), # sad quadratic, determinant < 0
(1, 0, 0, [(0.0, 0.0)]), # happy quadratic, determinant = 0
(-1, 0, 0, [(-np.inf, np.inf)]), # sad quadratic, determinant = 0
(1, 3, -4, [(-4.0, 1.0)]), # happy quadratic, determinant > 0
(-1, 3, 4, [(-np.inf, -1), (4, np.inf)]), # sad quadratic, determinant > 0
(-1, -3, 4, [(-np.inf, -4), (1, np.inf)]), # sad quadratic, determinant > 0
(1, 3, 4, []), # happy quadratic, determinant < 0
(-1, 3, -4, [(-np.inf, np.inf)]), # sad quadratic, determinant < 0
(1, 4, 4, [(-2.0, -2.0)]), # happy quadratic, determinant = 0
(-1, 4, -4, [(-np.inf, np.inf)]), # sad quadratic, determinant = 0
(0, 0, 0, [(-np.inf, np.inf)]), # constant and equal to zero
(0, 0, 1, []), # constant and larger than zero
(0, 1, 0, [(-np.inf, 0.0)]), # increasing linear function
(0, -1, -1, [(-1.0, np.inf)]), # decreasing linear function
],
)
def test_solve_quadratic_inequation(a, b, c, expected):
result = _solve_quadratic_inequality(a, b, c)

assert len(result) == len(expected), f"Expected {len(expected)} intervals but got {len(result)}"

for i, tpl in enumerate(result):
if tpl[0] == -np.inf:
assert np.isinf(tpl[0])
if tpl[1] == np.inf:
assert np.isinf(tpl[1])
else:
assert np.isclose(tpl[0], expected[i][0]), f"Expected {expected[i][0]} but got {tpl[0]}"
assert np.isclose(tpl[1], expected[i][1]), f"Expected {expected[i][1]} but got {tpl[1]}"
Loading