diff --git a/chirho/robust/ops.py b/chirho/robust/ops.py index 3ae74d98..e9acd198 100644 --- a/chirho/robust/ops.py +++ b/chirho/robust/ops.py @@ -19,8 +19,8 @@ def influence_fn( model: Callable[P, Any], guide: Callable[P, Any], functional: Optional[Functional[P, S]] = None, - **linearize_kwargs -) -> Callable[Concatenate[Point[T], P], S]: + **linearize_kwargs, +) -> Callable[Concatenate[Point[T], bool, P], S]: from chirho.robust.internals.linearize import linearize from chirho.robust.internals.predictive import PredictiveFunctional from chirho.robust.internals.utils import make_functional_call @@ -43,7 +43,7 @@ def _fn( points: Point[T], pointwise_influence: bool = False, *args: P.args, - **kwargs: P.kwargs + **kwargs: P.kwargs, ) -> S: param_eif = linearized( points, pointwise_influence=pointwise_influence, *args, **kwargs @@ -53,6 +53,21 @@ def _fn( lambda p: func_target(p, *args, **kwargs), (target_params,), (d,) )[1], in_dims=0, + randomness="different", )(param_eif) return _fn + + +def one_step_correction( + model: Callable[P, Any], + guide: Callable[P, Any], + test_data: Point[T], + functional: Optional[Functional[P, S]] = None, + **influence_kwargs, +) -> Callable[P, S]: + def _one_step(*args, **kwargs) -> S: + eif_fn = influence_fn(model, guide, functional, **influence_kwargs) + return eif_fn(test_data, pointwise_influence=False, *args, **kwargs) + + return _one_step diff --git a/docs/source/automated_dr_learner.ipynb b/docs/source/automated_dr_learner.ipynb index 8631edc6..3c6d2465 100644 --- a/docs/source/automated_dr_learner.ipynb +++ b/docs/source/automated_dr_learner.ipynb @@ -8,32 +8,73 @@ "# Automated doubly robust estimation with ChiRho" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Outline\n", + "\n", + "- [Setup](#setup)\n", + "\n", + "- [Overview: Systematically adjusting for observed confounding](#overview:-systematically-adjusting-for-observed-confounding)\n", + " - [Task: Treatment effect estimation with observational data](#task:-treatment-effect-estimation-with-observational-data)\n", + " - [Challenge: Confounding](#challenge:-confounding)\n", + " - [Assumptions: All confounders observed](#assumptions:-all-confounders-observed)\n", + " - [Intuition: Statistically adjusting for confounding](#intuition:-statistically-adjusting-for-confounding)\n", + "\n", + "- [Causal Probabilistic Program](#causal-probabilistic-program)\n", + " - [Model description](#model-description)\n", + " - [Generating data](#generating-data)\n", + " - [Fit parameters via maximum likelihood](#fit-parameters-via-maximum-likelihood)\n", + "\n", + "- [Causal Query: average treatment effect (ATE)](#causal-query:-average-treatment-effect-\\(ATE\\))\n", + " - [Defining the target functional](#defining-the-target-functional)\n", + " - [Closed form doubly robust correction](#closed-form-doubly-robust-correction)\n", + " - [Computing automated doubly robust correction via Monte Carlo](#computing-automated-doubly-robust-correction-via-monte-carlo)\n", + " - [Results](#results)\n", + "\n", + "- [References](#references)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Setup" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Here, we install the necessary Pytorch, Pyro, and ChiRho dependencies for this example." + ] + }, { "cell_type": "code", - "execution_count": 1, + "execution_count": 38, "metadata": {}, "outputs": [], "source": [ - "import collections\n", + "from typing import Callable, Optional, Tuple\n", + "\n", "import functools\n", + "import torch\n", "import math\n", "import seaborn as sns\n", + "import pandas as pd\n", + "import numpy as np\n", "import matplotlib.pyplot as plt\n", - "from typing import Callable, Optional, Tuple, TypedDict, TypeVar\n", "\n", - "import torch\n", "import pyro\n", "import pyro.distributions as dist\n", - "from pyro.infer.autoguide import AutoNormal\n", "from pyro.infer import Predictive\n", - "from typing import Callable, Dict, List, Optional, Tuple, Union\n", - "from pyro.nn import PyroModule, PyroParam, PyroSample\n", - "from chirho.robust.internals.utils import ParamDict\n", - "from chirho.robust.ops import Point\n", - "from chirho.robust.ops import influence_fn\n", "\n", - "from chirho.observational.handlers import condition\n", - "from chirho.robust.internals.linearize import linearize\n", + "from chirho.counterfactual.handlers import MultiWorldCounterfactual\n", + "from chirho.indexed.ops import IndexSet, gather\n", + "from chirho.interventional.handlers import do\n", + "from chirho.robust.internals.utils import ParamDict\n", + "from chirho.robust.ops import one_step_correction \n", "\n", "pyro.settings.set(module_local_params=True)\n", "\n", @@ -42,12 +83,53 @@ "pyro.set_rng_seed(321) # for reproducibility" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Overview\n", + "\n", + "In this tutorial, we will use ChiRho to estimate the average treatment effect (ATE) from observational data. We will use a simple example to illustrate the basic concepts of doubly robust estimation and how ChiRho can be used to automate the process for more general summaries of interest. \n", + "\n", + "There are five main steps to our doubly robust estimation procedure but only the last step is different from a standard probabilistic programming workflow:\n", + "1. Write model of interest\n", + " - Define probabilistic model of interest using Pyro\n", + "2. Feed in data\n", + " - Observed data used to train the model\n", + "3. Run inference\n", + " - Use Pyro's rich inference library to fit the model to the data\n", + "4. Define target functional\n", + " - This is the model summary of interest (e.g. average treatment effect)\n", + "5. Compute robust estimate\n", + " - Use ChiRho to compute the doubly robust estimate of the target functional\n", + " - Importantly, this step is automated and does not require refitting the model for each new functional\n", + "\n", + "\n", + "Our proposed automated robust inference pipeline is summarized in the figure below.\n", + "\n", + "![fig1](figures/robust_pipeline.png)" + ] + }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ - "# Probabilistic model for the data generating process" + "## Causal Probabilistic Program\n", + "\n", + "### Model Description\n", + "In this example, we will focus on a cannonical model `CausalGLM` consisting of three types of variables: binary treatment (`A`), confounders (`X`), and response (`Y`). For simplicitly, we assume that the response is generated from a generalized linear model with link function $g$. The model is described by the following generative process:\n", + "\n", + "$$\n", + "\\begin{align*}\n", + "X &\\sim \\text{Normal}(0, I_p) \\\\\n", + "A &\\sim \\text{Bernoulli}(\\pi(X)) \\\\\n", + "\\mu &= \\beta_0 + \\beta_1^T X + \\tau A \\\\\n", + "Y &\\sim \\text{ExponentialFamily}(\\text{mean} = g^{-1}(\\mu))\n", + "\\end{align*}\n", + "$$\n", + "\n", + "where $p$ denotes the number of confounders, $\\pi(X)$ is the probability of treatment conditional on confounders $X$, $\\beta_0$ is the intercept, $\\beta_1$ is the confounder effect, and $\\tau$ is the treatment effect." ] }, { @@ -56,23 +138,7 @@ "metadata": {}, "outputs": [], "source": [ - "class DataConditionedModel(PyroModule):\n", - " r\"\"\"\n", - " Helper class for conditioning on data.\n", - " \"\"\"\n", - "\n", - " def __init__(self, model: PyroModule):\n", - " super().__init__()\n", - " self.model = model\n", - "\n", - " def forward(self, D: Point[torch.Tensor]):\n", - " with condition(data=D):\n", - " # Assume first dimension corresponds to # of datapoints\n", - " N = D[next(iter(D))].shape[0]\n", - " return self.model(N=N)\n", - "\n", - "\n", - "class HighDimLinearModel(pyro.nn.PyroModule):\n", + "class CausalGLM(pyro.nn.PyroModule):\n", " def __init__(\n", " self,\n", " p: int,\n", @@ -106,50 +172,239 @@ " return pyro.sample(\"treatment_weight\", dist.Normal(0.0, 1.0))\n", "\n", " def sample_covariate_loc_scale(self):\n", - " loc = pyro.sample(\n", - " \"covariate_loc\", dist.Normal(0.0, 1.0).expand((self.p,)).to_event(1)\n", - " )\n", - " scale = pyro.sample(\n", - " \"covariate_scale\", dist.LogNormal(0, 1).expand((self.p,)).to_event(1)\n", + " return torch.zeros(self.p), torch.ones(self.p)\n", + "\n", + " def forward(self):\n", + " intercept = self.sample_intercept()\n", + " outcome_weights = self.sample_outcome_weights()\n", + " propensity_weights = self.sample_propensity_weights()\n", + " tau = self.sample_treatment_weight()\n", + " x_loc, x_scale = self.sample_covariate_loc_scale()\n", + " X = pyro.sample(\"X\", dist.Normal(x_loc, x_scale).to_event(1))\n", + " A = pyro.sample(\n", + " \"A\",\n", + " dist.Bernoulli(\n", + " logits=torch.einsum(\"...i,...i->...\", X, propensity_weights)\n", + " ),\n", " )\n", - " return loc, scale\n", "\n", - " def forward(self, N: int = 1):\n", + " return pyro.sample(\n", + " \"Y\",\n", + " self.link_fn(\n", + " torch.einsum(\"...i,...i->...\", X, outcome_weights) + A * tau + intercept\n", + " ),\n", + " )" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Next, we will condition on both treatment and confounders to estimate the causal effect of treatment on the outcome. We will use the following causal probabilistic program to do so:" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [], + "source": [ + "class ConditionedCausalGLM(CausalGLM):\n", + " def __init__(\n", + " self,\n", + " X: torch.Tensor,\n", + " A: torch.Tensor,\n", + " Y: torch.Tensor,\n", + " link_fn: Callable[..., dist.Distribution] = lambda mu: dist.Normal(mu, 1.0),\n", + " prior_scale: Optional[float] = None,\n", + " ):\n", + " p = X.shape[1]\n", + " super().__init__(p, link_fn, prior_scale)\n", + " self.X = X\n", + " self.A = A\n", + " self.Y = Y\n", + "\n", + " def forward(self):\n", " intercept = self.sample_intercept()\n", " outcome_weights = self.sample_outcome_weights()\n", " propensity_weights = self.sample_propensity_weights()\n", " tau = self.sample_treatment_weight()\n", " x_loc, x_scale = self.sample_covariate_loc_scale()\n", - " with pyro.plate(\"obs\", N, dim=-1):\n", - " X = pyro.sample(\"X\", dist.Normal(x_loc, x_scale).to_event(1))\n", + " with pyro.plate(\"__train__\", size=self.X.shape[0], dim=-1):\n", + " X = pyro.sample(\"X\", dist.Normal(x_loc, x_scale).to_event(1), obs=self.X)\n", " A = pyro.sample(\n", " \"A\",\n", " dist.Bernoulli(\n", - " logits=torch.einsum(\"...np,...ap->...n\", X, propensity_weights.expand(torch.broadcast_shapes(propensity_weights.shape, (1,) + propensity_weights.shape[-1:])))\n", + " logits=torch.einsum(\"ni,i->n\", self.X, propensity_weights)\n", " ),\n", + " obs=self.A,\n", " )\n", - " return pyro.sample(\n", + " pyro.sample(\n", " \"Y\",\n", " self.link_fn(\n", - " torch.einsum(\"...np,...ap->...n\", X, outcome_weights.expand(torch.broadcast_shapes(outcome_weights.shape, (1,) + outcome_weights.shape[-1:])))\n", + " torch.einsum(\"ni,i->n\", X, outcome_weights)\n", " + A * tau\n", " + intercept\n", " ),\n", - " )\n", - "\n", - "\n", - "class KnownCovariateDistModel(HighDimLinearModel):\n", - " def sample_covariate_loc_scale(self):\n", - " return torch.zeros(self.p), torch.ones(self.p)\n", - "\n", + " obs=self.Y,\n", + " )" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "data": { + "image/svg+xml": [ + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "cluster___train__\n", + "\n", + "__train__\n", + "\n", + "\n", + "\n", + "intercept\n", + "\n", + "intercept\n", + "\n", + "\n", + "\n", + "Y\n", + "\n", + "Y\n", + "\n", + "\n", + "\n", + "intercept->Y\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "outcome_weights\n", + "\n", + "outcome_weights\n", + "\n", + "\n", + "\n", + "outcome_weights->Y\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "propensity_weights\n", + "\n", + "propensity_weights\n", + "\n", + "\n", + "\n", + "A\n", + "\n", + "A\n", + "\n", + "\n", + "\n", + "propensity_weights->A\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "treatment_weight\n", + "\n", + "treatment_weight\n", + "\n", + "\n", + "\n", + "treatment_weight->Y\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "X\n", + "\n", + "X\n", + "\n", + "\n", + "\n", + "X->Y\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "A->Y\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "distribution_description_node\n", + "intercept ~ Normal\n", + "outcome_weights ~ Normal\n", + "propensity_weights ~ Normal\n", + "treatment_weight ~ Normal\n", + "X ~ Normal\n", + "A ~ Bernoulli\n", + "Y ~ Normal\n", + "\n", + "\n", + "\n" + ], + "text/plain": [ + "" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# Visualize the model\n", + "pyro.render_model(\n", + " ConditionedCausalGLM(torch.zeros(1, 1), torch.zeros(1), torch.zeros(1)),\n", + " render_params=True, \n", + " render_distributions=True\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Generating data\n", "\n", - "class BenchmarkLinearModel(HighDimLinearModel):\n", + "For evaluation, we generate `N_datasets` datasets, each with `N` samples. We compare vanilla estimates of the target functional with the double robust estimates of the target functional across the `N_sims` datasets. We use a similar data generating process as in Kennedy (2022)." + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [], + "source": [ + "class GroundTruthModel(CausalGLM):\n", " def __init__(\n", " self,\n", " p: int,\n", - " link_fn: Callable[..., dist.Distribution],\n", " alpha: int,\n", " beta: int,\n", + " link_fn: Callable[..., dist.Distribution] = lambda mu: dist.Normal(mu, 1.0),\n", " treatment_weight: float = 0.0,\n", " ):\n", " super().__init__(p, link_fn)\n", @@ -162,9 +417,6 @@ " outcome_weights[self.beta :] = 0.0\n", " return outcome_weights\n", "\n", - " def sample_treatment_null_weight(self):\n", - " return torch.tensor(0.0)\n", - "\n", " def sample_propensity_weights(self):\n", " propensity_weights = 1 / math.sqrt(self.alpha) * torch.ones(self.p)\n", " propensity_weights[self.alpha :] = 0.0\n", @@ -174,79 +426,106 @@ " return torch.tensor(self.treatment_weight)\n", "\n", " def sample_intercept(self):\n", - " return torch.tensor(0.0)\n", - "\n", - " def sample_covariate_loc_scale(self):\n", - " return torch.zeros(self.p), torch.ones(self.p)\n", + " return torch.tensor(0.0)" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [], + "source": [ + "N_datasets = 100\n", + "simulated_datasets = []\n", "\n", + "# Data configuration\n", + "p = 200\n", + "alpha = 50\n", + "beta = 50\n", + "N_train = 500\n", + "N_test = 500\n", "\n", - "class MLEGuide(torch.nn.Module):\n", - " def __init__(self, mle_est: ParamDict):\n", - " super().__init__()\n", - " self.names = list(mle_est.keys())\n", - " for name, value in mle_est.items():\n", - " setattr(self, name + \"_param\", torch.nn.Parameter(value))\n", + "true_model = GroundTruthModel(p, alpha, beta)\n", "\n", - " def forward(self, *args, **kwargs):\n", - " for name in self.names:\n", - " value = getattr(self, name + \"_param\")\n", - " pyro.sample(name, dist.Delta(value, event_dim=len(value.shape)))" + "for _ in range(N_datasets):\n", + " # Generate data\n", + " D_train = Predictive(\n", + " true_model, num_samples=N_train, return_sites=[\"X\", \"A\", \"Y\"]\n", + " )()\n", + " D_test = Predictive(\n", + " true_model, num_samples=N_test, return_sites=[\"X\", \"A\", \"Y\"]\n", + " )()\n", + " simulated_datasets.append((D_train, D_test))" ] }, { - "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ - "# Closed-form influence function for average treatment effect" + "### Fit parameters via maximum likelihood" ] }, { "cell_type": "code", - "execution_count": 3, + "execution_count": 7, "metadata": {}, "outputs": [], "source": [ - "class ATETestPoint(TypedDict):\n", - " X: torch.Tensor\n", - " A: torch.Tensor\n", - " Y: torch.Tensor\n", + "fitted_params = []\n", + "for i in range(N_datasets):\n", + " # Generate data\n", + " D_train = simulated_datasets[i][0]\n", "\n", + " # Fit model using maximum likelihood\n", + " conditioned_model = ConditionedCausalGLM(\n", + " X=D_train[\"X\"], A=D_train[\"A\"], Y=D_train[\"Y\"]\n", + " )\n", + " \n", + " guide_train = pyro.infer.autoguide.AutoDelta(conditioned_model)\n", + " elbo = pyro.infer.Trace_ELBO()(conditioned_model, guide_train)\n", "\n", - "class ATEParamDict(TypedDict):\n", - " propensity_weights: torch.Tensor\n", - " outcome_weights: torch.Tensor\n", - " treatment_weight: torch.Tensor\n", - " intercept: torch.Tensor\n", + " # initialize parameters\n", + " elbo()\n", + " adam = torch.optim.Adam(elbo.parameters(), lr=0.03)\n", "\n", + " # Do gradient steps\n", + " for _ in range(2000):\n", + " adam.zero_grad()\n", + " loss = elbo()\n", + " loss.backward()\n", + " adam.step()\n", "\n", - "def closed_form_ate_correction(\n", - " X_test: ATETestPoint, theta: ATEParamDict\n", - ") -> Tuple[torch.Tensor, torch.Tensor]:\n", - " X = X_test[\"X\"]\n", - " A = X_test[\"A\"]\n", - " Y = X_test[\"Y\"]\n", - " pi_X = torch.sigmoid(X.mv(theta[\"propensity_weights\"]))\n", - " mu_X = (\n", - " X.mv(theta[\"outcome_weights\"])\n", - " + A * theta[\"treatment_weight\"]\n", - " + theta[\"intercept\"]\n", - " )\n", - " analytic_eif_at_test_pts = (A / pi_X - (1 - A) / (1 - pi_X)) * (Y - mu_X)\n", - " analytic_correction = analytic_eif_at_test_pts.mean()\n", - " return analytic_correction, analytic_eif_at_test_pts" + " theta_hat = {\n", + " k: v.clone().detach().requires_grad_(True) for k, v in guide_train().items()\n", + " }\n", + " fitted_params.append(theta_hat)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Causal Query: Average treatment effect (ATE)\n", + "\n", + "The average treatment effect summarizes, on average, how much the treatment changes the response, $ATE = \\mathbb{E}[Y|do(A=1)] - \\mathbb{E}[Y|do(A=0)]$. The `do` notation indicates that the expectations are taken according to *intervened* versions of the model, with $A$ set to a particular value. Note from our [tutorial](tutorial_i.ipynb) that this is different from conditioning on $A$ in the original `causal_model`, which assumes $X$ and $T$ are dependent.\n", + "\n", + "\n", + "To implement this query in ChiRho, we define the `ATEFunctional` class which take in a `model` and `guide` and returns the average treatment effect by simulating from the posterior predictive distribution of the model and guide." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Defining the target functional" ] }, { "cell_type": "code", - "execution_count": 4, + "execution_count": 8, "metadata": {}, "outputs": [], "source": [ - "from chirho.counterfactual.handlers import MultiWorldCounterfactual\n", - "from chirho.indexed.ops import IndexSet, gather\n", - "from chirho.interventional.handlers import do\n", - "\n", "class ATEFunctional(torch.nn.Module):\n", " def __init__(self, model: Callable, guide: Callable, num_monte_carlo: int = 7):\n", " super().__init__()\n", @@ -255,225 +534,269 @@ " self.num_monte_carlo = num_monte_carlo\n", " \n", " def forward(self, *args, **kwargs):\n", - "\n", " with MultiWorldCounterfactual():\n", " with pyro.plate(\"monte_carlo_functional\", size=self.num_monte_carlo, dim=-2):\n", - " posterior_guide_samples = pyro.poutine.trace(\n", - " self.guide\n", - " ).get_trace(*args, **kwargs)\n", + " posterior_guide_samples = pyro.poutine.trace(self.guide).get_trace(\n", + " *args, **kwargs\n", + " )\n", " model_at_theta = pyro.poutine.replay(\n", " trace=posterior_guide_samples\n", " )(self.model)\n", " with do(actions=dict(A=(torch.tensor(0.0), torch.tensor(1.0)))):\n", - " with pyro.poutine.trace() as tr:\n", - " Ys = model_at_theta(*args, **kwargs)\n", + " Ys = model_at_theta(*args, **kwargs)\n", " Y0 = gather(Ys, IndexSet(A={1}), event_dim=0)\n", " Y1 = gather(Ys, IndexSet(A={2}), event_dim=0)\n", - " return pyro.deterministic(\"ATE\", (Y1 - Y0).mean(dim=-2, keepdim=True).mean(dim=-1, keepdim=True).squeeze())\n" + " ate = (Y1 - Y0).mean(dim=-2, keepdim=True).mean(dim=-1, keepdim=True).squeeze()\n", + " return pyro.deterministic(\"ATE\", ate)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Closed form doubly robust correction\n", + "\n", + "For the average treatment effect functional, there exists a closed-form analytical formula for the doubly robust correction. This formula is derived in Kennedy (2022) and is implemented below:" ] }, { "cell_type": "code", - "execution_count": 5, + "execution_count": 9, "metadata": {}, "outputs": [], "source": [ - "def one_step_correction(\n", - " model: Callable,\n", - " guide: Callable,\n", - " target: Callable,\n", - " test_data: Dict[str, torch.Tensor],\n", - " **influence_kwargs, \n", - ") -> Callable:\n", - " def _one_step(*args, **kwargs):\n", - " eif_fn = influence_fn(model, guide, target, **influence_kwargs)\n", - " batched_eif = torch.func.vmap(lambda data: eif_fn(data, *args, **kwargs), randomness='different')\n", - " return batched_eif(test_data).mean(axis=0)\n", - " # result = 0\n", - " # num_samples = next(iter(test_data.values())).shape[0]\n", - " # for i in range(num_samples):\n", - " # datapoint = {k: v[i] for k, v in test_data.items()}\n", - " # result = result + eif_fn(datapoint, *args, **kwargs) / num_samples\n", - " # return result\n", - " return _one_step" + "# Closed form expression\n", + "def closed_form_doubly_robust_ate_correction(X_test, theta) -> Tuple[torch.Tensor, torch.Tensor]:\n", + " X = X_test[\"X\"]\n", + " A = X_test[\"A\"]\n", + " Y = X_test[\"Y\"]\n", + " pi_X = torch.sigmoid(X.mv(theta[\"propensity_weights\"]))\n", + " mu_X = (\n", + " X.mv(theta[\"outcome_weights\"])\n", + " + A * theta[\"treatment_weight\"]\n", + " + theta[\"intercept\"]\n", + " )\n", + " analytic_eif_at_test_pts = (A / pi_X - (1 - A) / (1 - pi_X)) * (Y - mu_X)\n", + " analytic_correction = analytic_eif_at_test_pts.mean()\n", + " return analytic_correction, analytic_eif_at_test_pts" ] }, { - "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ - "# Empirical evaluation" + "### Computing automated doubly robust correction via Monte Carlo\n", + "\n", + "While the doubly robust correction term is known in closed-form for the average treatment effect functional, our `one_step_correction` function in `ChiRho` works for a wide class of other functionals. We focus on the average treatment effect functional here so that we have a ground truth to compare `one_step_correction` against." ] }, { "cell_type": "code", - "execution_count": 6, + "execution_count": 10, "metadata": {}, "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Run 0/1\n", - "torch.Size([500, 20]) torch.Size([500]) torch.Size([500])\n" - ] - }, { "name": "stderr", "output_type": "stream", "text": [ + "/Users/raj/Desktop/causal_pyro/chirho/robust/internals/predictive.py:127: UserWarning: Since max_plate_nesting is not specified, the first call to NMCLogPredictiveLikelihood will not be seeded properly. See https://github.com/BasisResearch/chirho/pull/408\n", + " warnings.warn(\n", "/opt/homebrew/anaconda3/envs/basis/lib/python3.10/site-packages/torch/nn/functional.py:3195: UserWarning: There is a performance drop because we have not yet implemented the batching rule for aten::fill.Scalar. Please file us an issue on GitHub so that we can prioritize its implementation. (Triggered internally at /Users/runner/work/pytorch/pytorch/pytorch/aten/src/ATen/functorch/BatchedFallback.cpp:84.)\n", " return torch.binary_cross_entropy_with_logits(input, target, weight, pos_weight, reduction_enum)\n" ] } ], "source": [ - "# Since we have access to the true data-generating distribution, we\n", - "# can keep simulating new datasets and retraining the model to get the \n", - "# sampling distribution of each estimator. In practice, one would need to \n", - "# use bootstrapping to approximate the true sampling distribution.\n", - "\n", - "N_runs = 1\n", - "plug_in_estimates = []\n", - "one_step_correction_monte_carlo = []\n", - "one_step_correction_analytic = []\n", - "\n", - "p = 20\n", - "alpha = 5\n", - "beta = 5\n", - "N_train = 500\n", - "N_test = 500\n", - "\n", - "link = lambda mu: dist.Normal(mu, 1.0)\n", - "\n", - "for i in range(N_runs):\n", - " pyro.clear_param_store()\n", - "\n", - " # Generate data\n", - " benchmark_model = BenchmarkLinearModel(p, link, alpha, beta)\n", - "\n", - " D_train = Predictive(\n", - " benchmark_model, num_samples=1, return_sites=[\"X\", \"A\", \"Y\"]\n", - " )(N=N_train)\n", - " D_train = {k: v[0] for k, v in D_train.items()}\n", - " D_test = Predictive(\n", - " benchmark_model, num_samples=N_test, return_sites=[\"X\", \"A\", \"Y\"]\n", - " )(N=1)\n", - " D_test_flat = {k: v[:,0] for k, v in D_test.items()}\n", - "\n", - " print(f\"Run {i}/{N_runs}\")\n", - " model = KnownCovariateDistModel(p, link)\n", - " conditioned_model = DataConditionedModel(model)\n", - " guide_train = pyro.infer.autoguide.AutoDelta(conditioned_model)\n", - " elbo = pyro.infer.Trace_ELBO()(conditioned_model, guide_train)\n", - "\n", - " print(D_train['X'].shape, D_train['A'].shape, D_train['Y'].shape)\n", - " # initialize parameters\n", - " elbo(D_train)\n", - "\n", - " adam = torch.optim.Adam(elbo.parameters(), lr=0.03)\n", - "\n", - " # Do gradient steps\n", - " for step in range(2000):\n", - " adam.zero_grad()\n", - " loss = elbo(D_train)\n", - " loss.backward()\n", - " adam.step()\n", - "\n", - " theta_hat = {k: v.clone().detach().requires_grad_(True) for k, v in guide_train().items()}\n", - " analytic_correction, analytic_eif_at_test_pts = closed_form_ate_correction(D_test_flat, theta_hat)\n", - "\n", - " guide = MLEGuide(theta_hat)\n", - " functional = functools.partial(ATEFunctional, num_monte_carlo=1000)\n", - " one_step_result = one_step_correction(model, guide, functional, D_test, max_plate_nesting=1, num_samples_outer=1000, num_samples_inner=1)(N=1)\n", - " ate_plug_in = functional(model, guide)(N=1)\n", - "\n", - "\n", - "# ATE_plugin =average_treatment_effect(model, theta_hat, n_monte_carlo=10000)\n", - "\n", - "# ATE_correction = one_step_correction(\n", - "# model,\n", - "# theta_hat,\n", - "# average_treatment_effect,\n", - "# D_test,\n", - "# pointwise_influence = False,\n", - "# n_monte_carlo=100*p,\n", - "# )\n", - "\n", - "# ATE_onestep = ATE_plugin + ATE_correction\n", - "\n", - "# analytic_correction, analytic_eif_at_test_pts = closed_form_ate_correction(D_test, theta_hat)\n", + "# Helper class to create a trivial guide that returns the maximum likelihood estimate\n", + "class MLEGuide(torch.nn.Module):\n", + " def __init__(self, mle_est: ParamDict):\n", + " super().__init__()\n", + " self.names = list(mle_est.keys())\n", + " for name, value in mle_est.items():\n", + " setattr(self, name + \"_param\", torch.nn.Parameter(value))\n", "\n", - "# plug_in_estimates.append(ATE_plugin.item())\n", - "# one_step_correction_monte_carlo.append(ATE_correction.item())\n", - "# one_step_correction_analytic.append(analytic_correction.item())\n", + " def forward(self, *args, **kwargs):\n", + " for name in self.names:\n", + " value = getattr(self, name + \"_param\")\n", + " pyro.sample(\n", + " name, pyro.distributions.Delta(value, event_dim=len(value.shape))\n", + " )\n", "\n", - "# plug_in_estimates = torch.tensor(plug_in_estimates)\n", - "# one_step_correction_monte_carlo = torch.tensor(one_step_correction_monte_carlo)\n", - "# one_step_correction_analytic = torch.tensor(one_step_correction_analytic)" + "# Compute doubly robust ATE estimates using both the automated and closed form expressions\n", + "plug_in_ates = []\n", + "analytic_corrections = []\n", + "automated_monte_carlo_corrections = []\n", + "for i in range(N_datasets):\n", + " theta_hat = fitted_params[i]\n", + " D_test = simulated_datasets[i][1]\n", + " mle_guide = MLEGuide(theta_hat)\n", + " functional = functools.partial(ATEFunctional, num_monte_carlo=10000)\n", + " ate_plug_in = functional(CausalGLM(p), mle_guide)()\n", + " analytic_correction, analytic_eif_at_test_pts = closed_form_doubly_robust_ate_correction(D_test, theta_hat)\n", + " automated_monte_carlo_correction = one_step_correction(\n", + " CausalGLM(p), \n", + " mle_guide, \n", + " D_test,\n", + " functional, \n", + " num_samples_outer=max(10000, 100 * p), \n", + " num_samples_inner=1\n", + " )()\n", + "\n", + " plug_in_ates.append(ate_plug_in.detach().item())\n", + " analytic_corrections.append(analytic_correction.detach().item())\n", + " automated_monte_carlo_corrections.append(automated_monte_carlo_correction.detach().item())\n", + "\n", + "plug_in_ates = np.array(plug_in_ates)\n", + "analytic_corrections = np.array(analytic_corrections)\n", + "automated_monte_carlo_corrections = np.array(automated_monte_carlo_corrections)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Results" ] }, { "cell_type": "code", - "execution_count": 7, + "execution_count": 23, "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "tensor(0.5682, grad_fn=)" - ] - }, - "execution_count": 7, - "metadata": {}, - "output_type": "execute_result" - } - ], + "outputs": [], "source": [ - "ate_plug_in + one_step_result" + "results = pd.DataFrame(\n", + " {\n", + " \"plug_in_ate\": plug_in_ates,\n", + " \"analytic_correction\": plug_in_ates + analytic_corrections,\n", + " \"automated_monte_carlo_correction\": plug_in_ates + automated_monte_carlo_corrections,\n", + " }\n", + ")" ] }, { "cell_type": "code", - "execution_count": 8, + "execution_count": 26, "metadata": {}, "outputs": [ { "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
plug_in_ateanalytic_correctionautomated_monte_carlo_correction
count100.00100.00100.00
mean0.330.230.23
std0.100.110.11
min0.09-0.10-0.09
25%0.270.160.17
50%0.330.220.22
75%0.410.300.30
max0.600.480.49
\n", + "
" + ], "text/plain": [ - "(tensor(0.0837, grad_fn=),\n", - " tensor(0.4845, grad_fn=),\n", - " tensor(0.1284, grad_fn=))" + " plug_in_ate analytic_correction automated_monte_carlo_correction\n", + "count 100.00 100.00 100.00\n", + "mean 0.33 0.23 0.23\n", + "std 0.10 0.11 0.11\n", + "min 0.09 -0.10 -0.09\n", + "25% 0.27 0.16 0.17\n", + "50% 0.33 0.22 0.22\n", + "75% 0.41 0.30 0.30\n", + "max 0.60 0.48 0.49" ] }, - "execution_count": 8, + "execution_count": 26, "metadata": {}, "output_type": "execute_result" } ], "source": [ - "ate_plug_in, one_step_result, analytic_correction" + "# The true treatment effect is 0, so a mean estimate closer to zero is better\n", + "results.describe().round(2)" ] }, { "cell_type": "code", - "execution_count": 8, + "execution_count": 24, "metadata": {}, "outputs": [ { "data": { "text/plain": [ - "" + "" ] }, - "execution_count": 8, + "execution_count": 24, "metadata": {}, "output_type": "execute_result" }, { "data": { - "image/png": "", + "image/png": "", "text/plain": [ "
" ] @@ -483,20 +806,21 @@ } ], "source": [ + "# Visualize the results\n", "fig, ax = plt.subplots()\n", "\n", "sns.kdeplot(\n", - " plug_in_estimates, \n", + " results['plug_in_ate'], \n", " label=\"Plug-in\", ax=ax\n", ")\n", "\n", "sns.kdeplot(\n", - " plug_in_estimates + one_step_correction_monte_carlo, \n", + " results['automated_monte_carlo_correction'], \n", " label=\"DR-Monte Carlo\", ax=ax\n", ")\n", "\n", "sns.kdeplot(\n", - " plug_in_estimates + one_step_correction_analytic, \n", + " results['analytic_correction'], \n", " label=\"DR-Analytic\", ax=ax\n", ")\n", "\n", @@ -505,6 +829,42 @@ "sns.despine()\n", "ax.legend(loc=\"upper right\")" ] + }, + { + "cell_type": "code", + "execution_count": 37, + "metadata": {}, + "outputs": [ + { + "data": { + "image/png": "", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "plt.scatter(\n", + " results['automated_monte_carlo_correction'],\n", + " results['analytic_correction'],\n", + ")\n", + "plt.plot(np.linspace(-.1, .5), np.linspace(-.1, .5), color=\"black\", linestyle=\"dashed\")\n", + "plt.xlabel(\"DR-Monte Carlo\")\n", + "plt.ylabel(\"DR-Analytic\")\n", + "sns.despine()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## References\n", + "\n", + "Kennedy, Edward. \"Towards optimal doubly robust estimation of heterogeneous causal effects\", 2022. https://arxiv.org/abs/2004.14497." + ] } ], "metadata": { diff --git a/docs/source/figures/robust_pipeline.png b/docs/source/figures/robust_pipeline.png new file mode 100644 index 00000000..7d05a948 Binary files /dev/null and b/docs/source/figures/robust_pipeline.png differ