diff --git a/.github/workflows/pull_request.yaml b/.github/workflows/pull_request.yaml index 1e55a1c..a030b0c 100644 --- a/.github/workflows/pull_request.yaml +++ b/.github/workflows/pull_request.yaml @@ -67,9 +67,12 @@ jobs: POLICYENGINE_US_DATA_GITHUB_TOKEN: ${{ secrets.POLICYENGINE_US_DATA_GITHUB_TOKEN }} - name: Build datasets run: make data - env: - TEST_LITE: true - name: Run tests run: pytest - name: Test documentation builds - run: make documentation \ No newline at end of file + run: make documentation + - name: Upload ECPS 2024 + uses: actions/upload-artifact@v4 + with: + name: enhanced_cps_2024.h5 + path: policyengine_us_data/storage/enhanced_cps_2024.h5 diff --git a/Makefile b/Makefile index 43de4c5..e2e1c15 100644 --- a/Makefile +++ b/Makefile @@ -32,7 +32,9 @@ documentation: jb clean docs && jb build docs data: + python policyengine_us_data/datasets/acs/acs.py python policyengine_us_data/datasets/cps/cps.py + python policyengine_us_data/datasets/cps/extended_cps.py python policyengine_us_data/datasets/cps/enhanced_cps.py clean: diff --git a/changelog_entry.yaml b/changelog_entry.yaml index e69de29..9257e6f 100644 --- a/changelog_entry.yaml +++ b/changelog_entry.yaml @@ -0,0 +1,6 @@ +- bump: minor + changes: + added: + - Migrate the ACS dataset from the policyengine-us repository. + changed: + - Enhanced CPS now uses a 3-year pooled CPS. 
diff --git a/policyengine_us_data/datasets/__init__.py b/policyengine_us_data/datasets/__init__.py index ba0e5f6..ffde8e4 100644 --- a/policyengine_us_data/datasets/__init__.py +++ b/policyengine_us_data/datasets/__init__.py @@ -5,6 +5,7 @@ CPS_2022, CPS_2023, CPS_2024, + Pooled_3_Year_CPS_2023, CensusCPS_2018, CensusCPS_2019, CensusCPS_2020, @@ -15,5 +16,6 @@ ReweightedCPS_2024, ) from .puf import PUF_2015, PUF_2021, PUF_2024, IRS_PUF_2015 +from .acs import ACS_2022 -DATASETS = [CPS_2022, PUF_2021, CPS_2024, EnhancedCPS_2024] +DATASETS = [CPS_2022, PUF_2021, CPS_2024, EnhancedCPS_2024, ACS_2022] diff --git a/policyengine_us_data/datasets/acs/README.md b/policyengine_us_data/datasets/acs/README.md new file mode 100644 index 0000000..633e04e --- /dev/null +++ b/policyengine_us_data/datasets/acs/README.md @@ -0,0 +1,6 @@ +2022 ACS 1 Year Data Dictionary: +https://www2.census.gov/programs-surveys/acs/tech_docs/pums/data_dict/PUMS_Data_Dictionary_2022.pdf +User Guide: +https://www2.census.gov/programs-surveys/acs/tech_docs/pums/2022ACS_PUMS_User_Guide.pdf +PUMS Documentation: +https://www.census.gov/programs-surveys/acs/microdata/documentation.html diff --git a/policyengine_us_data/datasets/acs/__init__.py b/policyengine_us_data/datasets/acs/__init__.py new file mode 100644 index 0000000..03bd355 --- /dev/null +++ b/policyengine_us_data/datasets/acs/__init__.py @@ -0,0 +1,2 @@ +from .acs import * +from .census_acs import * diff --git a/policyengine_us_data/datasets/acs/acs.py b/policyengine_us_data/datasets/acs/acs.py new file mode 100644 index 0000000..349d9b6 --- /dev/null +++ b/policyengine_us_data/datasets/acs/acs.py @@ -0,0 +1,118 @@ +import logging +from policyengine_core.data import Dataset +import h5py +from policyengine_us_data.datasets.acs.census_acs import CensusACS_2022 +from policyengine_us_data.storage import STORAGE_FOLDER +from pandas import DataFrame +import numpy as np +import pandas as pd + + +class ACS(Dataset): + data_format = Dataset.ARRAYS + 
time_period = None + census_acs = None + + def generate(self) -> None: + """Generates the ACS dataset.""" + + raw_data = self.census_acs(require=True).load() + acs = h5py.File(self.file_path, mode="w") + person, household = [ + raw_data[entity] for entity in ("person", "household") + ] + + self.add_id_variables(acs, person, household) + self.add_person_variables(acs, person, household) + self.add_household_variables(acs, household) + + acs.close() + raw_data.close() + + @staticmethod + def add_id_variables( + acs: h5py.File, + person: DataFrame, + household: DataFrame, + ) -> None: + # Create numeric IDs based on SERIALNO + h_id_to_number = pd.Series( + np.arange(len(household)), index=household["SERIALNO"] + ) + household["household_id"] = h_id_to_number[ + household["SERIALNO"] + ].values + person["household_id"] = h_id_to_number[person["SERIALNO"]].values + person["person_id"] = person.index + 1 + + acs["person_id"] = person["person_id"] + acs["household_id"] = household["household_id"] + acs["spm_unit_id"] = acs["household_id"] + acs["tax_unit_id"] = acs["household_id"] + acs["family_id"] = acs["household_id"] + acs["marital_unit_id"] = acs["household_id"] + acs["person_household_id"] = person["household_id"] + acs["person_spm_unit_id"] = person["household_id"] + acs["person_tax_unit_id"] = person["household_id"] + acs["person_family_id"] = person["household_id"] + acs["person_marital_unit_id"] = person["household_id"] + acs["household_weight"] = household.WGTP + + @staticmethod + def add_person_variables( + acs: h5py.File, person: DataFrame, household: DataFrame + ) -> None: + acs["age"] = person.AGEP + acs["is_male"] = person.SEX == 1 + acs["employment_income"] = person.WAGP + acs["self_employment_income"] = person.SEMP + acs["social_security"] = person.SSP + acs["taxable_private_pension_income"] = person.RETP + person[["rent", "real_estate_taxes"]] = ( + household.set_index("household_id") + .loc[person["household_id"]][["RNTP", "TAXAMT"]] + .values + ) + 
acs["is_household_head"] = person.SPORDER == 1 + factor = person.SPORDER == 1 + person.rent *= factor * 12 + person.real_estate_taxes *= factor + acs["rent"] = person.rent + acs["real_estate_taxes"] = person.real_estate_taxes + acs["tenure_type"] = ( + household.TEN.astype(int) + .map( + { + 1: "OWNED_WITH_MORTGAGE", + 2: "OWNED_OUTRIGHT", + 3: "RENTED", + } + ) + .fillna("NONE") + .astype("S") + ) + + @staticmethod + def add_spm_variables(acs: h5py.File, spm_unit: DataFrame) -> None: + acs["spm_unit_net_income_reported"] = spm_unit.SPM_RESOURCES + acs["spm_unit_spm_threshold"] = spm_unit.SPM_POVTHRESHOLD + + @staticmethod + def add_household_variables(acs: h5py.File, household: DataFrame) -> None: + acs["household_vehicles_owned"] = household.VEH + acs["state_fips"] = acs["household_state_fips"] = household.ST.astype( + int + ) + + +class ACS_2022(ACS): + name = "acs_2022" + label = "ACS 2022" + time_period = 2022 + file_path = STORAGE_FOLDER / "acs_2022.h5" + census_acs = CensusACS_2022 + url = "release://PolicyEngine/policyengine-us-data/release/acs_2022.h5" + + +if __name__ == "__main__": + ACS_2022().generate() diff --git a/policyengine_us_data/datasets/acs/census_acs.py b/policyengine_us_data/datasets/acs/census_acs.py new file mode 100644 index 0000000..842af62 --- /dev/null +++ b/policyengine_us_data/datasets/acs/census_acs.py @@ -0,0 +1,208 @@ +from io import BytesIO +import logging +from typing import List +from zipfile import ZipFile +import pandas as pd +from policyengine_core.data import Dataset +import requests +from tqdm import tqdm +from policyengine_us_data.storage import STORAGE_FOLDER + +logging.getLogger().setLevel(logging.INFO) + +PERSON_COLUMNS = [ + "SERIALNO", # Household ID + "SPORDER", # Person number within household + "PWGTP", # Person weight + "AGEP", # Age + "CIT", # Citizenship + "MAR", # Marital status + "WAGP", # Wage/salary + "SSP", # Social security income + "SSIP", # Supplemental security income + "SEX", # Sex + "SEMP", # 
Self-employment income + "SCHL", # Educational attainment + "RETP", # Retirement income + "PAP", # Public assistance income + "OIP", # Other income + "PERNP", # Total earnings + "PINCP", # Total income + "POVPIP", # Income-to-poverty line percentage + "RAC1P", # Race +] + +HOUSEHOLD_COLUMNS = [ + "SERIALNO", # Household ID + "PUMA", # PUMA area code + "ST", # State code + "ADJHSG", # Adjustment factor for housing dollar amounts + "ADJINC", # Adjustment factor for income + "WGTP", # Household weight + "NP", # Number of persons in household + "BDSP", # Number of bedrooms + "ELEP", # Electricity monthly cost + "FULP", # Fuel monthly cost + "GASP", # Gas monthly cost + "RMSP", # Number of rooms + "RNTP", # Monthly rent + "TEN", # Tenure + "VEH", # Number of vehicles + "FINCP", # Total income + "GRNTP", # Gross rent + "TAXAMT", # Property taxes +] + + +class CensusACS(Dataset): + data_format = Dataset.TABLES + + def generate(self) -> None: + spm_url = f"https://www2.census.gov/programs-surveys/supplemental-poverty-measure/datasets/spm/spm_{self.time_period}_pu.dta" + person_url = f"https://www2.census.gov/programs-surveys/acs/data/pums/{self.time_period}/1-Year/csv_pus.zip" + household_url = f"https://www2.census.gov/programs-surveys/acs/data/pums/{self.time_period}/1-Year/csv_hus.zip" + + with pd.HDFStore(self.file_path, mode="w") as storage: + household = self.process_household_data( + household_url, "psam_hus", HOUSEHOLD_COLUMNS + ) + person = self.process_person_data( + person_url, "psam_pus", PERSON_COLUMNS + ) + person = person[person.SERIALNO.isin(household.SERIALNO)] + household = household[household.SERIALNO.isin(person.SERIALNO)] + storage["household"] = household + storage["person"] = person + + @staticmethod + def process_household_data( + url: str, prefix: str, columns: List[str] + ) -> pd.DataFrame: + req = requests.get(url, stream=True) + with BytesIO() as f: + pbar = tqdm() + for chunk in req.iter_content(chunk_size=1024): + if chunk: + 
pbar.update(len(chunk)) + f.write(chunk) + f.seek(0) + zf = ZipFile(f) + a = pd.read_csv( + zf.open(prefix + "a.csv"), + usecols=columns, + dtype={"SERIALNO": str}, + ) + b = pd.read_csv( + zf.open(prefix + "b.csv"), + usecols=columns, + dtype={"SERIALNO": str}, + ) + res = pd.concat([a, b]).fillna(0) + res.columns = res.columns.str.upper() + + # Ensure correct data types + res["ST"] = res["ST"].astype(int) + + return res + + @staticmethod + def process_person_data( + url: str, prefix: str, columns: List[str] + ) -> pd.DataFrame: + req = requests.get(url, stream=True) + with BytesIO() as f: + pbar = tqdm() + for chunk in req.iter_content(chunk_size=1024): + if chunk: + pbar.update(len(chunk)) + f.write(chunk) + f.seek(0) + zf = ZipFile(f) + a = pd.read_csv( + zf.open(prefix + "a.csv"), + usecols=columns, + dtype={"SERIALNO": str}, + ) + b = pd.read_csv( + zf.open(prefix + "b.csv"), + usecols=columns, + dtype={"SERIALNO": str}, + ) + res = pd.concat([a, b]).fillna(0) + res.columns = res.columns.str.upper() + + # Ensure correct data types + res["SPORDER"] = res["SPORDER"].astype(int) + + return res + + @staticmethod + def create_spm_unit_table( + storage: pd.HDFStore, person: pd.DataFrame + ) -> None: + SPM_UNIT_COLUMNS = [ + "CAPHOUSESUB", + "CAPWKCCXPNS", + "CHILDCAREXPNS", + "EITC", + "ENGVAL", + "EQUIVSCALE", + "FEDTAX", + "FEDTAXBC", + "FICA", + "GEOADJ", + "MEDXPNS", + "NUMADULTS", + "NUMKIDS", + "NUMPER", + "POOR", + "POVTHRESHOLD", + "RESOURCES", + "SCHLUNCH", + "SNAPSUB", + "STTAX", + "TENMORTSTATUS", + "TOTVAL", + "WCOHABIT", + "WICVAL", + "WKXPNS", + "WUI_LT15", + "ID", + ] + spm_table = ( + person[["SPM_" + column for column in SPM_UNIT_COLUMNS]] + .groupby(person.SPM_ID) + .first() + ) + + original_person_table = storage["person"] + original_person_table.to_csv("person.csv") + person.to_csv("spm_person.csv") + + # Ensure SERIALNO is treated as string + JOIN_COLUMNS = ["SERIALNO", "SPORDER"] + original_person_table["SERIALNO"] = original_person_table[ + 
"SERIALNO" + ].astype(str) + original_person_table["SPORDER"] = original_person_table[ + "SPORDER" + ].astype(int) + person["SERIALNO"] = person["SERIALNO"].astype(str) + person["SPORDER"] = person["SPORDER"].astype(int) + + # Add SPM_ID from the SPM person table to the original person table. + combined_person_table = pd.merge( + original_person_table, + person[JOIN_COLUMNS + ["SPM_ID"]], + on=JOIN_COLUMNS, + ) + + storage["person_matched"] = combined_person_table + storage["spm_unit"] = spm_table + + +class CensusACS_2022(CensusACS): + label = "Census ACS (2022)" + name = "census_acs_2022.h5" + file_path = STORAGE_FOLDER / "census_acs_2022.h5" + time_period = 2022 diff --git a/policyengine_us_data/datasets/cps/cps.py b/policyengine_us_data/datasets/cps/cps.py index 8a4494f..74097f9 100644 --- a/policyengine_us_data/datasets/cps/cps.py +++ b/policyengine_us_data/datasets/cps/cps.py @@ -11,6 +11,7 @@ from policyengine_us_data.utils.uprating import ( create_policyengine_uprating_factors_table, ) +from policyengine_us_data.utils import QRF class CPS(Dataset): @@ -28,25 +29,14 @@ def generate(self): if self.raw_cps is None: # Extrapolate from CPS 2023 - cps_2022 = CPS_2023(require=True) - uprating = create_policyengine_uprating_factors_table() - arrays = cps_2022.load_dataset() - for variable in uprating.index.unique(): - if variable in arrays: - current_index = uprating[uprating.index == variable][ - self.time_period - ].values[0] - start_index = uprating[uprating.index == variable][ - 2023 - ].values[0] - growth = current_index / start_index - arrays[variable] = arrays[variable] * growth - + cps_2023 = CPS_2023(require=True) + arrays = cps_2023.load_dataset() + arrays = uprate_cps_data(arrays, 2023, self.time_period) self.save_dataset(arrays) return raw_data = self.raw_cps(require=True).load() - cps = h5py.File(self.file_path, mode="w") + cps = {} ENTITIES = ("person", "tax_unit", "family", "spm_unit", "household") person, tax_unit, family, spm_unit, household = [ @@ 
-59,20 +49,83 @@ def generate(self): add_previous_year_income(self, cps) add_spm_variables(cps, spm_unit) add_household_variables(cps, household) - add_rent(cps, person, household) + add_rent(self, cps, person, household) raw_data.close() - cps.close() + self.save_dataset(cps) -def add_rent(cps: h5py.File, person: DataFrame, household: DataFrame): - is_renting = household.H_TENURE == 2 - AVERAGE_RENT = 1_300 * 12 - # Project down to the first person in the household - person_is_renting = ( - household.set_index("H_SEQ").loc[person.PH_SEQ].H_TENURE.values == 2 - ) - cps["pre_subsidy_rent"] = np.where(person_is_renting, AVERAGE_RENT, 0) +def add_rent(self, cps: h5py.File, person: DataFrame, household: DataFrame): + cps["tenure_type"] = household.H_TENURE.map( + { + 0: "NONE", + 1: "OWNED_WITH_MORTGAGE", + 2: "RENTED", + 3: "NONE", + } + ).astype("S") + self.save_dataset(cps) + + from policyengine_us_data.datasets.acs.acs import ACS_2022 + from policyengine_us import Microsimulation + + acs = Microsimulation(dataset=ACS_2022) + cps_sim = Microsimulation(dataset=self) + + PREDICTORS = [ + "is_household_head", + "age", + "is_male", + "tenure_type", + "employment_income", + "self_employment_income", + "social_security", + "pension_income", + ] + IMPUTATIONS = ["rent", "real_estate_taxes"] + train_df = acs.calculate_dataframe(PREDICTORS + IMPUTATIONS) + train_df.tenure_type = train_df.tenure_type.map( + { + "OWNED_OUTRIGHT": "OWNED_WITH_MORTGAGE", + }, + na_action="ignore", + ).fillna(train_df.tenure_type) + train_df = train_df[train_df.is_household_head].sample(100_000) + inference_df = cps_sim.calculate_dataframe(PREDICTORS) + mask = inference_df.is_household_head.values + inference_df = inference_df[mask] + + qrf = QRF() + print("Training imputation model for rent and real estate taxes.") + qrf.fit(train_df[PREDICTORS], train_df[IMPUTATIONS]) + print("Imputing rent and real estate taxes.") + imputed_values = qrf.predict(inference_df[PREDICTORS]) + print("Imputation 
complete.") + cps["rent"] = np.zeros_like(cps["age"]) + cps["rent"][mask] = imputed_values["rent"] + # Assume zero housing assistance, so imputed rent equals pre-subsidy rent + cps["pre_subsidy_rent"] = cps["rent"] + cps["housing_assistance"] = np.zeros_like( + cps["spm_unit_capped_housing_subsidy_reported"] + ) + cps["real_estate_taxes"] = np.zeros_like(cps["age"]) + cps["real_estate_taxes"][mask] = imputed_values["real_estate_taxes"] + + +def uprate_cps_data(data, from_period, to_period): + uprating = create_policyengine_uprating_factors_table() + for variable in uprating.index.unique(): + if variable in data: + current_index = uprating[uprating.index == variable][ + to_period + ].values[0] + start_index = uprating[uprating.index == variable][ + from_period + ].values[0] + growth = current_index / start_index + data[variable] = data[variable] * growth + + return data def add_id_variables( @@ -105,7 +158,7 @@ def add_id_variables( cps["spm_unit_id"] = spm_unit.SPM_ID cps["person_household_id"] = person.PH_SEQ cps["person_family_id"] = person.PH_SEQ * 10 + person.PF_SEQ - + cps["is_household_head"] = person.P_SEQ == 1 cps["household_weight"] = household.HSUP_WGT / 1e2 # Marital units @@ -283,14 +336,14 @@ def add_personal_income_variables( # Allocate retirement distributions by taxability. for source_with_taxable_fraction in ["401k", "403b", "sep"]: cps[f"taxable_{source_with_taxable_fraction}_distributions"] = ( - cps[f"{source_with_taxable_fraction}_distributions"][...] + cps[f"{source_with_taxable_fraction}_distributions"] * p[ f"taxable_{source_with_taxable_fraction}_distribution_fraction" ] ) cps[f"tax_exempt_{source_with_taxable_fraction}_distributions"] = cps[ f"{source_with_taxable_fraction}_distributions" - ][...] 
* ( + ] * ( 1 - p[ f"taxable_{source_with_taxable_fraction}_distribution_fraction" @@ -430,14 +483,14 @@ def add_spm_variables(cps: h5py.File, spm_unit: DataFrame) -> None: cps[openfisca_variable] = spm_unit[asec_variable] cps["reduced_price_school_meals_reported"] = ( - cps["free_school_meals_reported"][...] * 0 + cps["free_school_meals_reported"] * 0 ) def add_household_variables(cps: h5py.File, household: DataFrame) -> None: cps["state_fips"] = household.GESTFIPS cps["county_fips"] = household.GTCO - state_county_fips = cps["state_fips"][...] * 1e3 + cps["county_fips"][...] + state_county_fips = cps["state_fips"] * 1e3 + cps["county_fips"] # Assign is_nyc here instead of as a variable formula so that it shows up # as toggleable in the webapp. # List county FIPS codes for each NYC county/borough. @@ -570,5 +623,68 @@ class CPS_2024(CPS): url = "release://policyengine/policyengine-us-data/release/cps_2024.h5" +class PooledCPS(Dataset): + data_format = Dataset.ARRAYS + input_datasets: list + time_period: int + + def generate(self): + data = [ + input_dataset(require=True).load_dataset() + for input_dataset in self.input_datasets + ] + time_periods = [dataset.time_period for dataset in self.input_datasets] + data = [ + uprate_cps_data(data, time_period, self.time_period) + for data, time_period in zip(data, time_periods) + ] + + new_data = {} + + for i in range(len(data)): + for variable in data[i]: + data_values = data[i][variable] + if variable not in new_data: + new_data[variable] = data_values + elif "_id" in variable: + previous_max = new_data[variable].max() + new_data[variable] = np.concatenate( + [ + new_data[variable], + data_values + previous_max, + ] + ) + else: + new_data[variable] = np.concatenate( + [ + new_data[variable], + data_values, + ] + ) + + new_data["household_weight"] = new_data["household_weight"] / len( + self.input_datasets + ) + + self.save_dataset(new_data) + + +class Pooled_3_Year_CPS_2023(PooledCPS): + label = "CPS 2023 (3-year 
pooled)" + name = "pooled_3_year_cps_2023" + file_path = STORAGE_FOLDER / "pooled_3_year_cps_2023.h5" + input_datasets = [ + CPS_2021, + CPS_2022, + CPS_2023, + ] + time_period = 2023 + url = "release://PolicyEngine/policyengine-us-data/release/pooled_3_year_cps_2023.h5" + + if __name__ == "__main__": + CPS_2021().generate() + CPS_2022().generate() + CPS_2023().generate() CPS_2024().generate() + Pooled_3_Year_CPS_2023().generate() diff --git a/policyengine_us_data/datasets/cps/enhanced_cps.py b/policyengine_us_data/datasets/cps/enhanced_cps.py index 56ef392..39918c5 100644 --- a/policyengine_us_data/datasets/cps/enhanced_cps.py +++ b/policyengine_us_data/datasets/cps/enhanced_cps.py @@ -99,13 +99,13 @@ def train_previous_year_income_model(): df = sim.calculate_dataframe(VARIABLES + OUTPUTS, 2019, map_to="person") df_train = df[df.previous_year_income_available] - from survey_enhance import Imputation + from policyengine_us_data.utils import QRF - income_last_year = Imputation() + income_last_year = QRF() X = df_train[VARIABLES[1:]] y = df_train[OUTPUTS] - income_last_year.train(X, y) + income_last_year.fit(X, y) return income_last_year @@ -115,7 +115,6 @@ class EnhancedCPS(Dataset): input_dataset: Type[Dataset] start_year: int end_year: int - url = "release://policyengine/policyengine-us-data/release/enhanced_cps_2024.h5" def generate(self): from policyengine_us import Microsimulation @@ -175,6 +174,7 @@ class EnhancedCPS_2024(EnhancedCPS): name = "enhanced_cps_2024" label = "Enhanced CPS 2024" file_path = STORAGE_FOLDER / "enhanced_cps_2024.h5" + url = "release://policyengine/policyengine-us-data/release/enhanced_cps_2024.h5" if __name__ == "__main__": diff --git a/policyengine_us_data/datasets/cps/extended_cps.py b/policyengine_us_data/datasets/cps/extended_cps.py index 970fd6b..6f36f0d 100644 --- a/policyengine_us_data/datasets/cps/extended_cps.py +++ b/policyengine_us_data/datasets/cps/extended_cps.py @@ -1,10 +1,12 @@ from policyengine_core.data import Dataset 
from policyengine_us_data.storage import STORAGE_FOLDER from typing import Type -from .cps import * -from ..puf import * +from policyengine_us_data.datasets.cps.cps import * +from policyengine_us_data.datasets.puf import * import pandas as pd import os +from policyengine_us_data.utils import QRF +import time # These are sorted by magnitude. # First 15 contain 90%. @@ -28,7 +30,6 @@ "qualified_dividend_income", "charitable_cash_donations", "self_employed_pension_contribution_ald", - "real_estate_taxes", "unrecaptured_section_1250_gain", "taxable_unemployment_compensation", "taxable_interest_income", @@ -80,7 +81,6 @@ class ExtendedCPS(Dataset): def generate(self): from policyengine_us import Microsimulation - from survey_enhance import Imputation cps_sim = Microsimulation(dataset=self.cps) puf_sim = Microsimulation(dataset=self.puf) @@ -100,16 +100,20 @@ def generate(self): X = cps_sim.calculate_dataframe(INPUTS) y = pd.DataFrame(columns=IMPUTED_VARIABLES, index=X.index) - model = Imputation() - model.train( + model = QRF() + start = time.time() + model.fit( X_train, y_train, - verbose=True, - sample_weight=puf_sim.calculate( - "household_weight", map_to="person" - ).values, ) - y = model.predict(X, verbose=True) + print( + f"Training imputation models from the PUF took {time.time() - start:.2f} seconds" + ) + start = time.time() + y = model.predict(X) + print( + f"Predicting imputed values took {time.time() - start:.2f} seconds" + ) data = cps_sim.dataset.load_dataset() new_data = {} @@ -151,3 +155,7 @@ class ExtendedCPS_2024(ExtendedCPS): label = "Extended CPS (2024)" file_path = STORAGE_FOLDER / "extended_cps_2024.h5" time_period = 2024 + + +if __name__ == "__main__": + ExtendedCPS_2024().generate() diff --git a/policyengine_us_data/datasets/puf/puf.py b/policyengine_us_data/datasets/puf/puf.py index 9633edb..e4700a5 100644 --- a/policyengine_us_data/datasets/puf/puf.py +++ b/policyengine_us_data/datasets/puf/puf.py @@ -22,14 +22,13 @@ def 
impute_pension_contributions_to_puf(puf_df): ["employment_income", "household_weight", "pre_tax_contributions"] ) - from survey_enhance import Imputation + from policyengine_us_data.utils import QRF - pension_contributions = Imputation() + pension_contributions = QRF() - pension_contributions.train( - X=cps_df[["employment_income"]], - Y=cps_df[["pre_tax_contributions"]], - sample_weight=cps_df["household_weight"], + pension_contributions.fit( + cps_df[["employment_income"]], + cps_df[["pre_tax_contributions"]], ) return pension_contributions.predict( X=puf_df[["employment_income"]], @@ -39,7 +38,7 @@ def impute_pension_contributions_to_puf(puf_df): def impute_missing_demographics( puf: pd.DataFrame, demographics: pd.DataFrame ) -> pd.DataFrame: - from survey_enhance import Imputation + from policyengine_us_data.utils import QRF puf_with_demographics = ( puf[puf.RECID.isin(demographics.RECID)] @@ -63,11 +62,11 @@ def impute_missing_demographics( "XTOT", ] - demographics_from_puf = Imputation() + demographics_from_puf = QRF() - demographics_from_puf.train( - X=puf_with_demographics[NON_DEMOGRAPHIC_VARIABLES], - Y=puf_with_demographics[DEMOGRAPHIC_VARIABLES], + demographics_from_puf.fit( + puf_with_demographics[NON_DEMOGRAPHIC_VARIABLES], + puf_with_demographics[DEMOGRAPHIC_VARIABLES], ) puf_without_demographics = puf[ diff --git a/policyengine_us_data/storage/upload_completed_datasets.py b/policyengine_us_data/storage/upload_completed_datasets.py index e4b9bed..c094c4d 100644 --- a/policyengine_us_data/storage/upload_completed_datasets.py +++ b/policyengine_us_data/storage/upload_completed_datasets.py @@ -26,3 +26,11 @@ "puf_2024.h5", FOLDER / "puf_2024.h5", ) + +upload( + "PolicyEngine", + "policyengine-us-data", + "release", + "acs_2022.h5", + FOLDER / "acs_2022.h5", +) diff --git a/policyengine_us_data/storage/uprating_factors.csv b/policyengine_us_data/storage/uprating_factors.csv index b5374d9..3269bdc 100644 --- 
a/policyengine_us_data/storage/uprating_factors.csv +++ b/policyengine_us_data/storage/uprating_factors.csv @@ -50,8 +50,9 @@ pre_tax_contributions,1.0,1.166,1.148,1.215,1.28,1.318,1.35,1.389,1.428,1.467,1. prior_year_minimum_tax_credit,1.0,1.166,1.148,1.215,1.28,1.318,1.35,1.389,1.428,1.467,1.513,1.561,1.611,1.663,1.718 qualified_dividend_income,1.0,1.2,1.269,1.283,1.325,1.376,1.414,1.445,1.483,1.533,1.624,1.714,1.801,1.885,1.966 qualified_tuition_expenses,1.0,1.166,1.148,1.215,1.28,1.318,1.35,1.389,1.428,1.467,1.513,1.561,1.611,1.663,1.718 -real_estate_taxes,1.0,1.166,1.148,1.215,1.28,1.318,1.35,1.389,1.428,1.467,1.513,1.561,1.611,1.663,1.718 +real_estate_taxes,1.0,1.011,1.083,1.142,1.164,1.161,1.18,1.198,1.219,1.238,1.26,1.282,1.305,1.329,1.353 recapture_of_investment_credit,1.0,1.166,1.148,1.215,1.28,1.318,1.35,1.389,1.428,1.467,1.513,1.561,1.611,1.663,1.718 +rent,1.0,1.011,1.083,1.142,1.164,1.161,1.18,1.198,1.219,1.238,1.26,1.282,1.305,1.329,1.353 rental_income,1.0,0.976,0.961,1.017,1.071,1.104,1.13,1.163,1.196,1.228,1.266,1.307,1.348,1.392,1.438 roth_401k_contributions,1.0,1.166,1.148,1.215,1.28,1.318,1.35,1.389,1.428,1.467,1.513,1.561,1.611,1.663,1.718 roth_ira_contributions,1.0,1.166,1.148,1.215,1.28,1.318,1.35,1.389,1.428,1.467,1.513,1.561,1.611,1.663,1.718 diff --git a/policyengine_us_data/storage/uprating_growth_factors.csv b/policyengine_us_data/storage/uprating_growth_factors.csv index 2829eaa..d020204 100644 --- a/policyengine_us_data/storage/uprating_growth_factors.csv +++ b/policyengine_us_data/storage/uprating_growth_factors.csv @@ -50,8 +50,9 @@ pre_tax_contributions,0,0.166,-0.015,0.058,0.053,0.03,0.024,0.029,0.028,0.027,0. 
prior_year_minimum_tax_credit,0,0.166,-0.015,0.058,0.053,0.03,0.024,0.029,0.028,0.027,0.031,0.032,0.032,0.032,0.033 qualified_dividend_income,0,0.2,0.057,0.011,0.033,0.038,0.028,0.022,0.026,0.034,0.059,0.055,0.051,0.047,0.043 qualified_tuition_expenses,0,0.166,-0.015,0.058,0.053,0.03,0.024,0.029,0.028,0.027,0.031,0.032,0.032,0.032,0.033 -real_estate_taxes,0,0.166,-0.015,0.058,0.053,0.03,0.024,0.029,0.028,0.027,0.031,0.032,0.032,0.032,0.033 +real_estate_taxes,0,0.011,0.071,0.054,0.019,-0.003,0.016,0.015,0.018,0.016,0.018,0.017,0.018,0.018,0.018 recapture_of_investment_credit,0,0.166,-0.015,0.058,0.053,0.03,0.024,0.029,0.028,0.027,0.031,0.032,0.032,0.032,0.033 +rent,0,0.011,0.071,0.054,0.019,-0.003,0.016,0.015,0.018,0.016,0.018,0.017,0.018,0.018,0.018 rental_income,0,-0.024,-0.015,0.058,0.053,0.031,0.024,0.029,0.028,0.027,0.031,0.032,0.031,0.033,0.033 roth_401k_contributions,0,0.166,-0.015,0.058,0.053,0.03,0.024,0.029,0.028,0.027,0.031,0.032,0.032,0.032,0.033 roth_ira_contributions,0,0.166,-0.015,0.058,0.053,0.03,0.024,0.029,0.028,0.027,0.031,0.032,0.032,0.032,0.033 diff --git a/policyengine_us_data/tests/test_datasets/test_acs.py b/policyengine_us_data/tests/test_datasets/test_acs.py new file mode 100644 index 0000000..5c0d612 --- /dev/null +++ b/policyengine_us_data/tests/test_datasets/test_acs.py @@ -0,0 +1,14 @@ +import pytest +from policyengine_us import Microsimulation + + +@pytest.mark.parametrize("year", [2022]) +def test_acs_generates(year: int): + from policyengine_us_data.datasets.acs.acs import ACS_2022 + + dataset_by_year = { + 2022: ACS_2022, + } + + dataset = dataset_by_year[year]() + dataset.generate() # This will generate the dataset diff --git a/policyengine_us_data/utils/__init__.py b/policyengine_us_data/utils/__init__.py index 1ccbd39..d25c6c2 100644 --- a/policyengine_us_data/utils/__init__.py +++ b/policyengine_us_data/utils/__init__.py @@ -2,3 +2,4 @@ from .soi import * from .uprating import * from .loss import * +from .qrf import * diff --git 
a/policyengine_us_data/utils/loss.py b/policyengine_us_data/utils/loss.py index f068011..6f9f30c 100644 --- a/policyengine_us_data/utils/loss.py +++ b/policyengine_us_data/utils/loss.py @@ -184,7 +184,7 @@ def build_loss_matrix(dataset: type, time_period): # Medical expenses, sum of spm thresholds # Child support expenses - CPS_DERIVED_TOTALS_2024 = { + HARD_CODED_TOTALS = { "health_insurance_premiums_without_medicare_part_b": 385e9, "other_medical_expenses": 278e9, "medicare_part_b_premiums": 112e9, @@ -198,9 +198,12 @@ def build_loss_matrix(dataset: type, time_period): # Alimony could be targeted via SOI "alimony_income": 13e9, "alimony_expense": 13e9, + # Rough estimate, not CPS derived + "real_estate_taxes": 400e9, # Rough estimate between 350bn and 600bn total property tax collections + "rent": 735e9, # ACS total uprated by CPI } - for variable_name, target in CPS_DERIVED_TOTALS_2024.items(): + for variable_name, target in HARD_CODED_TOTALS.items(): label = f"census/{variable_name}" loss_matrix[label] = sim.calculate( variable_name, map_to="household" diff --git a/policyengine_us_data/utils/qrf.py b/policyengine_us_data/utils/qrf.py new file mode 100644 index 0000000..13a298f --- /dev/null +++ b/policyengine_us_data/utils/qrf.py @@ -0,0 +1,70 @@ +try: + from quantile_forest import RandomForestQuantileRegressor +except ImportError: + pass +import pandas as pd +import numpy as np +import pickle + + +class QRF: + categorical_columns: list = None + encoded_columns: list = None + output_columns: list = None + + def __init__(self, seed=0, file_path=None): + self.seed = seed + + if file_path is not None: + with open(file_path, "rb") as f: + data = pickle.load(f) + self.seed = data["seed"] + self.categorical_columns = data["categorical_columns"] + self.encoded_columns = data["encoded_columns"] + self.output_columns = data["output_columns"] + self.qrf = data["qrf"] + + def fit(self, X, y, **qrf_kwargs): + self.categorical_columns = 
X.select_dtypes(include=["object"]).columns + X = pd.get_dummies( + X, columns=self.categorical_columns, drop_first=True + ) + self.encoded_columns = X.columns + self.output_columns = y.columns + self.qrf = RandomForestQuantileRegressor( + random_state=self.seed, **qrf_kwargs + ) + self.qrf.fit(X, y) + + def predict(self, X, count_samples=10, mean_quantile=0.5): + X = pd.get_dummies( + X, columns=self.categorical_columns, drop_first=True + ) + X = X[self.encoded_columns] + pred = self.qrf.predict( + X, quantiles=list(np.linspace(0, 1, count_samples)) + ) + random_generator = np.random.default_rng(self.seed) + a = mean_quantile / (1 - mean_quantile) + input_quantiles = ( + random_generator.beta(a, 1, size=len(X)) * count_samples + ) + input_quantiles = input_quantiles.astype(int) + if len(pred.shape) == 2: + predictions = pred[np.arange(len(pred)), input_quantiles] + else: + predictions = pred[np.arange(len(pred)), :, input_quantiles] + return pd.DataFrame(predictions, columns=self.output_columns) + + def save(self, path): + with open(path, "wb") as f: + pickle.dump( + { + "seed": self.seed, + "categorical_columns": self.categorical_columns, + "encoded_columns": self.encoded_columns, + "output_columns": self.output_columns, + "qrf": self.qrf, + }, + f, + ) diff --git a/pyproject.toml b/pyproject.toml index d0fe62e..4fa24f2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,8 +22,8 @@ dependencies = [ dev = [ "black", "pytest", - "policyengine_us==1.71.1", - "survey_enhance", + "policyengine_us @ git+https://github.com/policyengine/policyengine-us@us-rent-data", + "quantile-forest", "torch", "tables", "tabulate",