Parallelise dummy_outcome_refuter #1200

Open · wants to merge 1 commit into main
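Summary: this PR factors the body of the per-simulation loop in refute_dummy_outcome out into a module-level _refute_once helper and fans the simulations out with joblib.Parallel, adding n_jobs and verbose parameters that are forwarded to joblib. A minimal usage sketch follows; the dataset, column names, and graph are hypothetical and not part of this PR:

import pandas as pd
from dowhy import CausalModel

# Hypothetical data and graph; only the refute_estimate call exercises this PR.
df = pd.read_csv("data.csv")  # assumed to contain treatment "v0" and outcome "y"
model = CausalModel(data=df, treatment="v0", outcome="y", graph="graph.gml")

identified_estimand = model.identify_effect()
estimate = model.estimate_effect(identified_estimand, method_name="backdoor.linear_regression")

# n_jobs=-1 asks joblib to use all available cores; verbose > 0 prints progress.
refutation = model.refute_estimate(
    identified_estimand,
    estimate,
    method_name="dummy_outcome_refuter",
    n_jobs=-1,
    verbose=1,
)
print(refutation)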
dowhy/causal_refuters/dummy_outcome_refuter.py — 316 changes: 175 additions & 141 deletions
@@ -5,6 +5,7 @@

import numpy as np
import pandas as pd
from joblib import Parallel, delayed
from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import LinearRegression
from sklearn.neighbors import KNeighborsRegressor
@@ -241,6 +242,151 @@ def refute_estimate(self, show_progress_bar: bool = False):
        return refutes


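# One complete simulation of the refuter, factored out of refute_dummy_outcome so
# that it is a module-level (and therefore picklable) target for joblib workers.
# It builds the dummy outcome, refits the estimator on the simulated data, and
# returns a list with one estimate per treatment category.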
def _refute_once(
    data: pd.DataFrame,
    estimate: CausalEstimate,
    treatment_name: str,
    outcome_name: str,
    estimator_present: bool,
    unobserved_confounder_values,
    causal_effect_map,
    identified_estimand,
    test_fraction,
    chosen_variables: Optional[List] = None,
    transformation_list: List = DEFAULT_TRANSFORMATION,
    true_causal_effect: Callable = DEFAULT_TRUE_CAUSAL_EFFECT,
    min_data_point_threshold: float = MIN_DATA_POINT_THRESHOLD,
    bucket_size_scale_factor: float = DEFAULT_BUCKET_SCALE_FACTOR,
):
    estimates = []

    if not estimator_present:
        # Warn the user that the specified parameter is not applicable when no estimator is present in the transformation
        if test_fraction != DEFAULT_TEST_FRACTION:
            logger.warning("'test_fraction' is not applicable as there is no base treatment value.")

        # Add an unobserved confounder if provided by the user
        if unobserved_confounder_values is not None:
            data["simulated"] = unobserved_confounder_values
            chosen_variables.append("simulated")
        # We set X_train and outcome_train to None
        validation_df = data
        X_train = None
        outcome_train = None
        X_validation_df = validation_df[chosen_variables]

        X_validation = X_validation_df.values
        outcome_validation = validation_df[outcome_name].values

        # Get the final outcome after running through all the values in the transformation list
        outcome_validation = process_data(
            outcome_name, X_train, outcome_train, X_validation, outcome_validation, transformation_list
        )

        # Check whether the value of the true effect has already been stored.
        # We use None as the key because there is no base category for this refutation.
        if None not in causal_effect_map:
            # As we currently support only one treatment
            causal_effect_map[None] = true_causal_effect(validation_df[treatment_name[0]])

        outcome_validation += causal_effect_map[None]

        new_data = validation_df.assign(dummy_outcome=outcome_validation)

        new_estimator = estimate.estimator.get_new_estimator_object(identified_estimand)
        new_estimator.fit(
            new_data,
            effect_modifier_names=estimate.estimator._effect_modifier_names,
            **new_estimator._fit_params if hasattr(new_estimator, "_fit_params") else {},
        )
        new_effect = new_estimator.estimate_effect(
            new_data,
            control_value=estimate.control_value,
            treatment_value=estimate.treatment_value,
            target_units=estimate.estimator._target_units,
        )
        estimates.append(new_effect.value)

    else:
        groups = preprocess_data_by_treatment(
            data, treatment_name, unobserved_confounder_values, bucket_size_scale_factor, chosen_variables
        )
        group_count = 0

        if len(test_fraction) == 1:
            test_fraction = len(groups) * test_fraction

        for key_train, _ in groups:
            base_train = groups.get_group(key_train).sample(frac=test_fraction[group_count].base)
            train_set = set(tuple(row) for row in base_train.values)
            total_set = set(tuple(row) for row in groups.get_group(key_train).values)
            base_validation = pd.DataFrame(list(total_set.difference(train_set)), columns=base_train.columns)
            X_train_df = base_train[chosen_variables]

            X_train = X_train_df.values
            outcome_train = base_train[outcome_name].values

            validation_df = []
            transformation_list_temp = transformation_list
            validation_df.append(base_validation)

            for key_validation, _ in groups:
                if key_validation != key_train:
                    validation_df.append(
                        groups.get_group(key_validation).sample(frac=test_fraction[group_count].other)
                    )

            validation_df = pd.concat(validation_df)
            X_validation_df = validation_df[chosen_variables]

            X_validation = X_validation_df.values
            outcome_validation = validation_df[outcome_name].values

            # If there are too few data points, run the default transformation: [("zero", ""), ("noise", {"std_dev": 1})]
            if X_train.shape[0] <= min_data_point_threshold:
                transformation_list_temp = DEFAULT_TRANSFORMATION
                logger.warning(
                    "The number of data points in X_train:{} for category:{} is less than threshold:{}".format(
                        X_train.shape[0], key_train, min_data_point_threshold
                    )
                )
                logger.warning(
                    "Therefore, defaulting to the minimal set of transformations:{}".format(transformation_list_temp)
                )

            outcome_validation = process_data(
                outcome_name, X_train, outcome_train, X_validation, outcome_validation, transformation_list_temp
            )

            # Check whether the value of the true effect has already been stored.
            # This ensures that we calculate the causal effect only once.
            # We use key_train because we map data with respect to the base category of the data.
            if key_train not in causal_effect_map:
                # As we currently support only one treatment
                causal_effect_map[key_train] = true_causal_effect(validation_df[treatment_name[0]])

            # Add h(t) to f(W) to get the dummy outcome
            outcome_validation += causal_effect_map[key_train]

            new_data = validation_df.assign(dummy_outcome=outcome_validation)
            new_estimator = estimate.estimator.get_new_estimator_object(identified_estimand)
            new_estimator.fit(
                new_data,
                effect_modifier_names=estimate.estimator._effect_modifier_names,
                **new_estimator._fit_params if hasattr(new_estimator, "_fit_params") else {},
            )
            new_effect = new_estimator.estimate_effect(
                new_data,
                control_value=estimate.control_value,
                treatment_value=estimate.treatment_value,
                target_units=estimate.estimator._target_units,
            )

            estimates.append(new_effect.value)
            group_count += 1

    return estimates


def refute_dummy_outcome(
    data: pd.DataFrame,
    target_estimand: IdentifiedEstimand,
@@ -256,6 +402,8 @@ def refute_dummy_outcome(
    unobserved_confounder_values: Optional[List] = DEFAULT_NEW_DATA_WITH_UNOBSERVED_CONFOUNDING,
    true_causal_effect: Callable = DEFAULT_TRUE_CAUSAL_EFFECT,
    show_progress_bar=False,
    n_jobs: int = 1,
    verbose: int = 0,
    **_,
) -> List[CausalRefutation]:
"""Refute an estimate by replacing the outcome with a simulated variable
Expand Down Expand Up @@ -447,159 +595,45 @@ def refute_dummy_outcome(
# Train and the Validation Datasets. Thus, we run the simulation loop followed by the training and the validation
# loops. Thus, we can get different values everytime we get the estimator.

    # for _ in range( self._num_simulations ):
    for _ in tqdm(
        range(num_simulations),
        colour=CausalRefuter.PROGRESS_BAR_COLOR,
        disable=not show_progress_bar,
        desc="Refuting Estimates: ",
    ):
        estimates = []

        if not estimator_present:

            # Warn the user that the specified parameter is not applicable when no estimator is present in the transformation
            if test_fraction != DEFAULT_TEST_FRACTION:
                logger.warning("'test_fraction' is not applicable as there is no base treatment value.")

            # Add an unobserved confounder if provided by the user
            if unobserved_confounder_values is not None:
                data["simulated"] = unobserved_confounder_values
                chosen_variables.append("simulated")
            # We set X_train and outcome_train to None
            validation_df = data
            X_train = None
            outcome_train = None
            X_validation_df = validation_df[chosen_variables]

            X_validation = X_validation_df.values
            outcome_validation = validation_df[outcome_name].values

            # Get the final outcome after running through all the values in the transformation list
            outcome_validation = process_data(
                outcome_name, X_train, outcome_train, X_validation, outcome_validation, transformation_list
            )

            # Check whether the value of the true effect has already been stored.
            # We use None as the key because there is no base category for this refutation.
            if None not in causal_effect_map:
                # As we currently support only one treatment
                causal_effect_map[None] = true_causal_effect(validation_df[treatment_name[0]])

            outcome_validation += causal_effect_map[None]

            new_data = validation_df.assign(dummy_outcome=outcome_validation)

            new_estimator = estimate.estimator.get_new_estimator_object(identified_estimand)
            new_estimator.fit(
                new_data,
                effect_modifier_names=estimate.estimator._effect_modifier_names,
                **new_estimator._fit_params if hasattr(new_estimator, "_fit_params") else {},
            )
            new_effect = new_estimator.estimate_effect(
                new_data,
                control_value=estimate.control_value,
                treatment_value=estimate.treatment_value,
                target_units=estimate.estimator._target_units,
            )
            estimates.append(new_effect.value)

        else:

            groups = preprocess_data_by_treatment(
                data, treatment_name, unobserved_confounder_values, bucket_size_scale_factor, chosen_variables
            )
            group_count = 0

            if len(test_fraction) == 1:
                test_fraction = len(groups) * test_fraction

            for key_train, _ in groups:
                base_train = groups.get_group(key_train).sample(frac=test_fraction[group_count].base)
                train_set = set(tuple(row) for row in base_train.values)
                total_set = set(tuple(row) for row in groups.get_group(key_train).values)
                base_validation = pd.DataFrame(list(total_set.difference(train_set)), columns=base_train.columns)
                X_train_df = base_train[chosen_variables]

                X_train = X_train_df.values
                outcome_train = base_train[outcome_name].values

                validation_df = []
                transformation_list_temp = transformation_list
                validation_df.append(base_validation)

                for key_validation, _ in groups:
                    if key_validation != key_train:
                        validation_df.append(
                            groups.get_group(key_validation).sample(frac=test_fraction[group_count].other)
                        )

                validation_df = pd.concat(validation_df)
                X_validation_df = validation_df[chosen_variables]

                X_validation = X_validation_df.values
                outcome_validation = validation_df[outcome_name].values

                # If there are too few data points, run the default transformation: [("zero", ""), ("noise", {"std_dev": 1})]
                if X_train.shape[0] <= min_data_point_threshold:
                    transformation_list_temp = DEFAULT_TRANSFORMATION
                    logger.warning(
                        "The number of data points in X_train:{} for category:{} is less than threshold:{}".format(
                            X_train.shape[0], key_train, min_data_point_threshold
                        )
                    )
                    logger.warning(
                        "Therefore, defaulting to the minimal set of transformations:{}".format(
                            transformation_list_temp
                        )
                    )

                outcome_validation = process_data(
                    outcome_name, X_train, outcome_train, X_validation, outcome_validation, transformation_list_temp
                )

                # Check whether the value of the true effect has already been stored.
                # This ensures that we calculate the causal effect only once.
                # We use key_train because we map data with respect to the base category of the data.
                if key_train not in causal_effect_map:
                    # As we currently support only one treatment
                    causal_effect_map[key_train] = true_causal_effect(validation_df[treatment_name[0]])

                # Add h(t) to f(W) to get the dummy outcome
                outcome_validation += causal_effect_map[key_train]

                new_data = validation_df.assign(dummy_outcome=outcome_validation)
                new_estimator = estimate.estimator.get_new_estimator_object(identified_estimand)
                new_estimator.fit(
                    new_data,
                    effect_modifier_names=estimate.estimator._effect_modifier_names,
                    **new_estimator._fit_params if hasattr(new_estimator, "_fit_params") else {},
                )
                new_effect = new_estimator.estimate_effect(
                    new_data,
                    control_value=estimate.control_value,
                    treatment_value=estimate.treatment_value,
                    target_units=estimate.estimator._target_units,
                )

                estimates.append(new_effect.value)
                group_count += 1
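    # Fan the simulations out with joblib: each delayed call runs one complete
    # simulation via _refute_once and returns one estimate per treatment category.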
    sample_estimates = Parallel(n_jobs=n_jobs, verbose=verbose)(
        delayed(_refute_once)(
            data=data,
            estimate=estimate,
            treatment_name=treatment_name,
            outcome_name=outcome_name,
            estimator_present=estimator_present,
            unobserved_confounder_values=unobserved_confounder_values,
            causal_effect_map=causal_effect_map,
            identified_estimand=identified_estimand,
            chosen_variables=chosen_variables,
            transformation_list=transformation_list,
            true_causal_effect=true_causal_effect,
            min_data_point_threshold=min_data_point_threshold,
            bucket_size_scale_factor=bucket_size_scale_factor,
            test_fraction=test_fraction,
        )
        for _ in tqdm(
            range(num_simulations),
            colour=CausalRefuter.PROGRESS_BAR_COLOR,
            disable=not show_progress_bar,
            desc="Refuting Estimates: ",
        )
    )

        simulation_results.append(estimates)
    # simulation_results.append(estimates)

    # We convert to ndarray for ease in indexing.
    # The data is of the form
    #   sim1: cat1 cat2 ... catn
    #   sim2: cat1 cat2 ... catn
    simulation_results = np.array(simulation_results)
    simulation_results = np.array(sample_estimates)

    # print('SIMULATION RESULTS::::: ', simulation_results)
    # Note: We would like the causal estimator to find the true causal estimate that we have specified through
    # this refuter. Let the value of the true causal effect be h(t). In the following section of code, we wish
    # to find out whether h(t) falls in the distribution produced by the refuter.

    if not estimator_present:

        dummy_estimate = CausalEstimate(
            data=None,
            treatment_name=estimate._treatment_name,
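For reference, a self-contained sketch of the Parallel/delayed fan-out pattern this PR uses (the _simulate_once stand-in and its two-category return value are illustrative only). Parallel returns one result per delayed call, so stacking the results into an ndarray gives rows indexed by simulation and columns by treatment category, matching the np.array(sample_estimates) conversion above:

import numpy as np
from joblib import Parallel, delayed

def _simulate_once(seed: int) -> list:
    # Stand-in for _refute_once: returns one estimate per treatment category.
    rng = np.random.default_rng(seed)
    return [rng.normal(), rng.normal()]

# One delayed task per simulation, mirroring the loop in refute_dummy_outcome.
results = Parallel(n_jobs=2)(delayed(_simulate_once)(seed) for seed in range(8))
results = np.array(results)
print(results.shape)  # (8, 2): rows are simulations, columns are categories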