From d9146dd5a75ca9c8cf0cfb8196f7f1284b68dfac Mon Sep 17 00:00:00 2001 From: PavelMakarchuk Date: Mon, 9 Sep 2024 23:03:28 +0200 Subject: [PATCH 01/38] Migrate ACS from policyengine-us Fixes #31 --- policyengine_us_data/datasets/acs/__init__.py | 2 + policyengine_us_data/datasets/acs/acs.py | 118 +++++++++++ policyengine_us_data/datasets/acs/raw_acs.py | 198 ++++++++++++++++++ 3 files changed, 318 insertions(+) create mode 100644 policyengine_us_data/datasets/acs/__init__.py create mode 100644 policyengine_us_data/datasets/acs/acs.py create mode 100644 policyengine_us_data/datasets/acs/raw_acs.py diff --git a/policyengine_us_data/datasets/acs/__init__.py b/policyengine_us_data/datasets/acs/__init__.py new file mode 100644 index 0000000..5656048 --- /dev/null +++ b/policyengine_us_data/datasets/acs/__init__.py @@ -0,0 +1,2 @@ +from policyengine_us.data.datasets.acs.raw_acs import RawACS +from policyengine_us.data.datasets.acs.acs import ACS diff --git a/policyengine_us_data/datasets/acs/acs.py b/policyengine_us_data/datasets/acs/acs.py new file mode 100644 index 0000000..e5b1646 --- /dev/null +++ b/policyengine_us_data/datasets/acs/acs.py @@ -0,0 +1,118 @@ +import logging +from policyengine_core.data import PublicDataset +import h5py +from policyengine_us.data.datasets.acs.raw_acs import RawACS +from policyengine_us.data.storage import policyengine_us_MICRODATA_FOLDER +from pandas import DataFrame + + +class ACS(PublicDataset): + name = "acs" + is_openfisca_compatible = True + label = "ACS" + folder_path = policyengine_us_MICRODATA_FOLDER + + def generate(self, year: int) -> None: + """Generates the ACS dataset. + + Args: + year (int): The year of the raw ACS to use. + """ + + # Prepare raw ACS tables + year = int(year) + if year in self.years: + self.remove(year) + if year not in RawACS.years: + RawACS.generate(year) + + raw_data = RawACS.load(year) + acs = h5py.File(ACS.file(year), mode="w") + + person, spm_unit, household = [ + raw_data[entity] for entity in ("person", "spm_unit", "household") + ] + # Add primary and foreign keys + + household.SERIALNO = household.SERIALNO.astype(int) + person.SERIALNO = person.SERIALNO.astype(int) + person.SPORDER = person.SPORDER.astype(int) + person.SPM_ID = person.SPM_ID.astype(int) + spm_unit.SPM_ID = spm_unit.SPM_ID.astype(int) + + logging.info( + f"Persons with a linked household {person.SERIALNO.isin(household.SERIALNO).mean():.1%}" + ) + person = person[person.SERIALNO.isin(household.SERIALNO)] + logging.info( + f"Households with a linked person {household.SERIALNO.isin(person.SERIALNO).mean():.1%}" + ) + household = household[household.SERIALNO.isin(person.SERIALNO)] + logging.info( + f"SPM units with a linked person {spm_unit.SPM_ID.isin(person.SPM_ID).mean():.1%}" + ) + spm_unit = spm_unit[spm_unit.SPM_ID.isin(person.SPM_ID)] + + add_id_variables(acs, person, spm_unit, household) + add_person_variables(acs, person) + add_spm_variables(acs, spm_unit) + add_household_variables(acs, household) + + raw_data.close() + acs.close() + + +ACS = ACS() + + +def add_id_variables( + acs: h5py.File, + person: DataFrame, + spm_unit: DataFrame, + household: DataFrame, +) -> None: + """Add basic ID and weight variables. + + Args: + acs (h5py.File): The ACS dataset file. + person (DataFrame): The person table of the ACS. + spm_unit (DataFrame): The SPM unit table created from the person table + of the ACS. + household (DataFrame): The household table of the ACS. 
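+
+    Note: person IDs are constructed below as SERIALNO * 100 + SPORDER, so
+    each person's ID embeds the serial number of their household.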
+ """ + acs["person_id"] = person.SERIALNO * 1e2 + person.SPORDER + acs["person_spm_unit_id"] = person.SPM_ID + acs["spm_unit_id"] = spm_unit.SPM_ID + # ACS doesn't have tax units. + acs["tax_unit_id"] = spm_unit.SPM_ID + # Until we add a family table, we'll use the person table. + acs["family_id"] = spm_unit.SPM_ID + acs["person_household_id"] = person.SERIALNO + acs["person_tax_unit_id"] = person.SPM_ID + acs["person_family_id"] = person.SPM_ID + acs["household_id"] = household.SERIALNO + + # TODO: add marital unit IDs - using person IDs for now + acs["person_marital_unit_id"] = person.SERIALNO + acs["marital_unit_id"] = person.SERIALNO.unique() + + # Add weights + acs["person_weight"] = person.PWGTP + acs["household_weight"] = household.WGTP + + +def add_person_variables(acs: h5py.File, person: DataFrame) -> None: + acs["age"] = person.AGEP + acs["employment_income"] = person.WAGP + acs["self_employment_income"] = person.SEMP + acs["total_income"] = person.PINCP + + +def add_spm_variables(acs: h5py.File, spm_unit: DataFrame) -> None: + acs["spm_unit_net_income_reported"] = spm_unit.SPM_RESOURCES + acs["spm_unit_spm_threshold"] = spm_unit.SPM_POVTHRESHOLD + + +def add_household_variables(acs: h5py.File, household: DataFrame) -> None: + acs["household_vehicles_owned"] = household.VEH + acs["state_fips"] = acs["household_state_fips"] = household.ST diff --git a/policyengine_us_data/datasets/acs/raw_acs.py b/policyengine_us_data/datasets/acs/raw_acs.py new file mode 100644 index 0000000..5104cd6 --- /dev/null +++ b/policyengine_us_data/datasets/acs/raw_acs.py @@ -0,0 +1,198 @@ +from io import BytesIO +import logging +from typing import List +from zipfile import ZipFile +import pandas as pd +from policyengine_core.data import PublicDataset +import requests +from tqdm import tqdm +from policyengine_us.data.storage import policyengine_us_MICRODATA_FOLDER + + +logging.getLogger().setLevel(logging.INFO) + +PERSON_COLUMNS = [ + "SERIALNO", # Household ID + "SPORDER", # Person number within household + "PWGTP", # Person weight + "AGEP", # Age + "CIT", # Citizenship + "MAR", # Marital status + "WAGP", # Wage/salary + "SSP", # Social security income + "SSIP", # Supplemental security income + "SEX", # Sex + "SEMP", # Self-employment income + "SCHL", # Educational attainment + "RETP", # Retirement income + "PAP", # Public assistance income + "OIP", # Other income + "PERNP", # Total earnings + "PINCP", # Total income + "POVPIP", # Income-to-poverty line percentage + "RAC1P", # Race +] + +HOUSEHOLD_COLUMNS = [ + "SERIALNO", # Household ID + "PUMA", # PUMA area code + "ST", # State code + "ADJHSG", # Adjustment factor for housing dollar amounts + "ADJINC", # Adjustment factor for income + "WGTP", # Household weight + "NP", # Number of persons in household + "BDSP", # Number of bedrooms + "ELEP", # Electricity monthly cost + "FULP", # Fuel monthly cost + "GASP", # Gas monthly cost + "RMSP", # Number of rooms + "RNTP", # Monthly rent + "TEN", # Tenure + "VEH", # Number of vehicles + "FINCP", # Total income + "GRNTP", # Gross rent +] + + +class RawACS(PublicDataset): + name = "raw_acs" + label = "Raw ACS" + is_openfisca_compatible = False + folder_path = policyengine_us_MICRODATA_FOLDER + + def generate(self, year: int) -> None: + year = int(year) + if year in self.years: + self.remove(year) + + spm_url = f"https://www2.census.gov/programs-surveys/supplemental-poverty-measure/datasets/spm/spm_{year}_pu.dta" + person_url = f"https://www2.census.gov/programs-surveys/acs/data/pums/{year}/1-Year/csv_pus.zip" + 
household_url = f"https://www2.census.gov/programs-surveys/acs/data/pums/{year}/1-Year/csv_hus.zip" + + # The data dictionary for 2019 can be found here: https://www2.census.gov/programs-surveys/acs/tech_docs/pums/data_dict/PUMS_Data_Dictionary_2019.pdf + + try: + with pd.HDFStore(RawACS.file(year)) as storage: + # Household file + logging.info(f"Downloading household file") + household = concat_zipped_csvs( + household_url, "psam_hus", HOUSEHOLD_COLUMNS + ) + # Remove group quarters (zero weight) + household = household[ + ~household.SERIALNO.str.contains("2019GQ") + ] + household["SERIALNO"] = household["SERIALNO"].apply( + lambda x: int(x.replace("2019HU", "")) + ) + storage["household"] = household + # Person file + logging.info(f"Downloading person file") + person = concat_zipped_csvs( + person_url, "psam_pus", PERSON_COLUMNS + ) + person = person[~person.SERIALNO.str.contains("2019GQ")] + person["SERIALNO"] = person["SERIALNO"].apply( + lambda x: int(x.replace("2019HU", "")) + ) + storage["person"] = person + # SPM unit file + logging.info(f"Downloading SPM unit file") + spm_person = pd.read_stata(spm_url).fillna(0) + spm_person.columns = spm_person.columns.str.upper() + create_spm_unit_table(storage, spm_person) + except Exception as e: + RawACS.remove(year) + logging.error( + f"Attempted to extract and save the CSV files, but encountered an error: {e}" + ) + raise e + + +RawACS = RawACS() + + +def concat_zipped_csvs( + url: str, prefix: str, columns: List[str] +) -> pd.DataFrame: + """Downloads the ACS microdata, which is a zip file containing two halves in CSV format. + + Args: + url (str): The URL of the data server. + prefix (str): The prefix of the filenames, before a/b.csv. + columns (List[str]): The columns to filter (avoids hitting memory limits). + + Returns: + pd.DataFrame: The concatenated DataFrame. + """ + req = requests.get(url, stream=True) + with BytesIO() as f: + pbar = tqdm() + for chunk in req.iter_content(chunk_size=1024): + if chunk: # filter out keep-alive new chunks + pbar.update(len(chunk)) + f.write(chunk) + f.seek(0) + zf = ZipFile(f) + logging.info(f"Loading the first half of the dataset") + a = pd.read_csv(zf.open(prefix + "a.csv"), usecols=columns) + logging.info(f"Loading the second half of the dataset") + b = pd.read_csv(zf.open(prefix + "b.csv"), usecols=columns) + logging.info(f"Concatenating datasets") + res = pd.concat([a, b]).fillna(0) + res.columns = res.columns.str.upper() + return res + + +def create_spm_unit_table(storage: pd.HDFStore, person: pd.DataFrame) -> None: + SPM_UNIT_COLUMNS = [ + "CAPHOUSESUB", + "CAPWKCCXPNS", + "CHILDCAREXPNS", + "EITC", + "ENGVAL", + "EQUIVSCALE", + "FEDTAX", + "FEDTAXBC", + "FICA", + "GEOADJ", + "MEDXPNS", + "NUMADULTS", + "NUMKIDS", + "NUMPER", + "POOR", + "POVTHRESHOLD", + "RESOURCES", + "SCHLUNCH", + "SNAPSUB", + "STTAX", + "TENMORTSTATUS", + "TOTVAL", + "WCOHABIT", + "WICVAL", + "WKXPNS", + "WUI_LT15", + "ID", + ] + spm_table = ( + person[["SPM_" + column for column in SPM_UNIT_COLUMNS]] + .groupby(person.SPM_ID) + .first() + ) + + original_person_table = storage["person"] + # Ensure that join keys are the same type. + JOIN_COLUMNS = ["SERIALNO", "SPORDER"] + original_person_table[JOIN_COLUMNS] = original_person_table[ + JOIN_COLUMNS + ].astype(int) + person[JOIN_COLUMNS] = person[JOIN_COLUMNS].astype(int) + # Add SPM_ID from the SPM person table to the original person table. 
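+    # pd.merge defaults to an inner join, so person rows without a matching
+    # (SERIALNO, SPORDER) pair in the SPM research file are dropped here.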
+ combined_person_table = pd.merge( + original_person_table, + person[JOIN_COLUMNS + ["SPM_ID"]], + on=JOIN_COLUMNS, + ) + + storage["person"] = combined_person_table + storage["spm_unit"] = spm_table From 5065de3cd42f61f90573c8b57322a322fd796a0b Mon Sep 17 00:00:00 2001 From: PavelMakarchuk Date: Fri, 13 Sep 2024 01:31:39 +0200 Subject: [PATCH 02/38] populate acs --- acs.py | 3 + .../data_storage/upload_completed_datasets.py | 8 + policyengine_us_data/datasets/__init__.py | 3 +- policyengine_us_data/datasets/acs/__init__.py | 4 +- policyengine_us_data/datasets/acs/acs.py | 180 ++++++-------- policyengine_us_data/datasets/acs/raw_acs.py | 227 +++++++++--------- 6 files changed, 201 insertions(+), 224 deletions(-) create mode 100644 acs.py diff --git a/acs.py b/acs.py new file mode 100644 index 0000000..aabb704 --- /dev/null +++ b/acs.py @@ -0,0 +1,3 @@ +from policyengine_us_data import ACS_2022 + +ACS_2022().generate() diff --git a/policyengine_us_data/data_storage/upload_completed_datasets.py b/policyengine_us_data/data_storage/upload_completed_datasets.py index 335a99f..130359c 100644 --- a/policyengine_us_data/data_storage/upload_completed_datasets.py +++ b/policyengine_us_data/data_storage/upload_completed_datasets.py @@ -26,3 +26,11 @@ "puf_2024.h5", FOLDER / "puf_2015.h5", ) + +upload( + "PolicyEngine", + "policyengine-us-data", + "release", + "acs_2022.h5", + FOLDER / "acs_2022.h5", +) diff --git a/policyengine_us_data/datasets/__init__.py b/policyengine_us_data/datasets/__init__.py index 1aec253..a49fd1b 100644 --- a/policyengine_us_data/datasets/__init__.py +++ b/policyengine_us_data/datasets/__init__.py @@ -1,4 +1,5 @@ from .cps import * from .puf import * +from .acs import * -DATASETS = [CPS_2022, PUF_2021, CPS_2024, EnhancedCPS_2024] +DATASETS = [CPS_2022, PUF_2021, CPS_2024, EnhancedCPS_2024, ACS_2022] diff --git a/policyengine_us_data/datasets/acs/__init__.py b/policyengine_us_data/datasets/acs/__init__.py index 5656048..5e08156 100644 --- a/policyengine_us_data/datasets/acs/__init__.py +++ b/policyengine_us_data/datasets/acs/__init__.py @@ -1,2 +1,2 @@ -from policyengine_us.data.datasets.acs.raw_acs import RawACS -from policyengine_us.data.datasets.acs.acs import ACS +from .acs import * +from .raw_acs import * \ No newline at end of file diff --git a/policyengine_us_data/datasets/acs/acs.py b/policyengine_us_data/datasets/acs/acs.py index e5b1646..3ab8b7c 100644 --- a/policyengine_us_data/datasets/acs/acs.py +++ b/policyengine_us_data/datasets/acs/acs.py @@ -1,118 +1,86 @@ import logging -from policyengine_core.data import PublicDataset +from policyengine_core.data import Dataset import h5py -from policyengine_us.data.datasets.acs.raw_acs import RawACS -from policyengine_us.data.storage import policyengine_us_MICRODATA_FOLDER +from policyengine_us_data.datasets.acs.raw_acs import RawACS +from policyengine_us_data.data_storage import STORAGE_FOLDER from pandas import DataFrame +import os -class ACS(PublicDataset): +class ACS(Dataset): name = "acs" - is_openfisca_compatible = True label = "ACS" - folder_path = policyengine_us_MICRODATA_FOLDER - - def generate(self, year: int) -> None: - """Generates the ACS dataset. - - Args: - year (int): The year of the raw ACS to use. 
- """ - - # Prepare raw ACS tables - year = int(year) - if year in self.years: - self.remove(year) - if year not in RawACS.years: - RawACS.generate(year) - - raw_data = RawACS.load(year) - acs = h5py.File(ACS.file(year), mode="w") - + data_format = Dataset.ARRAYS + time_period = None + + def __init__(self): + super().__init__() + self.raw_acs = RawACS() + + def generate(self) -> None: + """Generates the ACS dataset.""" + if self.time_period is None: + raise ValueError("time_period must be set in child classes") + + if os.path.exists(self.file_path): + os.remove(self.file_path) + + if self.time_period not in self.raw_acs.years: + self.raw_acs.generate(self.time_period) + + raw_data = self.raw_acs.load(self.time_period) + acs = h5py.File(self.file_path, mode="w") person, spm_unit, household = [ raw_data[entity] for entity in ("person", "spm_unit", "household") ] - # Add primary and foreign keys - - household.SERIALNO = household.SERIALNO.astype(int) - person.SERIALNO = person.SERIALNO.astype(int) - person.SPORDER = person.SPORDER.astype(int) - person.SPM_ID = person.SPM_ID.astype(int) - spm_unit.SPM_ID = spm_unit.SPM_ID.astype(int) - - logging.info( - f"Persons with a linked household {person.SERIALNO.isin(household.SERIALNO).mean():.1%}" - ) - person = person[person.SERIALNO.isin(household.SERIALNO)] - logging.info( - f"Households with a linked person {household.SERIALNO.isin(person.SERIALNO).mean():.1%}" - ) - household = household[household.SERIALNO.isin(person.SERIALNO)] - logging.info( - f"SPM units with a linked person {spm_unit.SPM_ID.isin(person.SPM_ID).mean():.1%}" - ) - spm_unit = spm_unit[spm_unit.SPM_ID.isin(person.SPM_ID)] - - add_id_variables(acs, person, spm_unit, household) - add_person_variables(acs, person) - add_spm_variables(acs, spm_unit) - add_household_variables(acs, household) - - raw_data.close() + + self.add_id_variables(acs, person, spm_unit, household) + self.add_person_variables(acs, person) + self.add_spm_variables(acs, spm_unit) + self.add_household_variables(acs, household) + acs.close() - -ACS = ACS() - - -def add_id_variables( - acs: h5py.File, - person: DataFrame, - spm_unit: DataFrame, - household: DataFrame, -) -> None: - """Add basic ID and weight variables. - - Args: - acs (h5py.File): The ACS dataset file. - person (DataFrame): The person table of the ACS. - spm_unit (DataFrame): The SPM unit table created from the person table - of the ACS. - household (DataFrame): The household table of the ACS. - """ - acs["person_id"] = person.SERIALNO * 1e2 + person.SPORDER - acs["person_spm_unit_id"] = person.SPM_ID - acs["spm_unit_id"] = spm_unit.SPM_ID - # ACS doesn't have tax units. - acs["tax_unit_id"] = spm_unit.SPM_ID - # Until we add a family table, we'll use the person table. 
- acs["family_id"] = spm_unit.SPM_ID - acs["person_household_id"] = person.SERIALNO - acs["person_tax_unit_id"] = person.SPM_ID - acs["person_family_id"] = person.SPM_ID - acs["household_id"] = household.SERIALNO - - # TODO: add marital unit IDs - using person IDs for now - acs["person_marital_unit_id"] = person.SERIALNO - acs["marital_unit_id"] = person.SERIALNO.unique() - - # Add weights - acs["person_weight"] = person.PWGTP - acs["household_weight"] = household.WGTP - - -def add_person_variables(acs: h5py.File, person: DataFrame) -> None: - acs["age"] = person.AGEP - acs["employment_income"] = person.WAGP - acs["self_employment_income"] = person.SEMP - acs["total_income"] = person.PINCP - - -def add_spm_variables(acs: h5py.File, spm_unit: DataFrame) -> None: - acs["spm_unit_net_income_reported"] = spm_unit.SPM_RESOURCES - acs["spm_unit_spm_threshold"] = spm_unit.SPM_POVTHRESHOLD - - -def add_household_variables(acs: h5py.File, household: DataFrame) -> None: - acs["household_vehicles_owned"] = household.VEH - acs["state_fips"] = acs["household_state_fips"] = household.ST + @staticmethod + def add_id_variables( + acs: h5py.File, + person: DataFrame, + spm_unit: DataFrame, + household: DataFrame, + ) -> None: + acs["person_id"] = person.SERIALNO * 1e2 + person.SPORDER + acs["person_spm_unit_id"] = person.SPM_ID + acs["spm_unit_id"] = spm_unit.SPM_ID + acs["tax_unit_id"] = spm_unit.SPM_ID + acs["family_id"] = spm_unit.SPM_ID + acs["person_household_id"] = person.SERIALNO + acs["person_tax_unit_id"] = person.SPM_ID + acs["person_family_id"] = person.SPM_ID + acs["household_id"] = household.SERIALNO + acs["person_marital_unit_id"] = person.SERIALNO + acs["marital_unit_id"] = person.SERIALNO.unique() + acs["person_weight"] = person.PWGTP + acs["household_weight"] = household.WGTP + + @staticmethod + def add_person_variables(acs: h5py.File, person: DataFrame) -> None: + acs["age"] = person.AGEP + acs["employment_income"] = person.WAGP + acs["self_employment_income"] = person.SEMP + acs["total_income"] = person.PINCP + + @staticmethod + def add_spm_variables(acs: h5py.File, spm_unit: DataFrame) -> None: + acs["spm_unit_net_income_reported"] = spm_unit.SPM_RESOURCES + acs["spm_unit_spm_threshold"] = spm_unit.SPM_POVTHRESHOLD + + @staticmethod + def add_household_variables(acs: h5py.File, household: DataFrame) -> None: + acs["household_vehicles_owned"] = household.VEH + acs["state_fips"] = acs["household_state_fips"] = household.ST + +class ACS_2022(ACS): + name = "acs_2022" + label = "ACS 2022" + time_period = 2022 + file_path = STORAGE_FOLDER / "acs_2022.h5" diff --git a/policyengine_us_data/datasets/acs/raw_acs.py b/policyengine_us_data/datasets/acs/raw_acs.py index 5104cd6..c714f47 100644 --- a/policyengine_us_data/datasets/acs/raw_acs.py +++ b/policyengine_us_data/datasets/acs/raw_acs.py @@ -3,11 +3,10 @@ from typing import List from zipfile import ZipFile import pandas as pd -from policyengine_core.data import PublicDataset +from policyengine_core.data import Dataset import requests from tqdm import tqdm -from policyengine_us.data.storage import policyengine_us_MICRODATA_FOLDER - +from policyengine_us_data.data_storage import STORAGE_FOLDER logging.getLogger().setLevel(logging.INFO) @@ -54,11 +53,16 @@ ] -class RawACS(PublicDataset): +class RawACS(Dataset): name = "raw_acs" label = "Raw ACS" - is_openfisca_compatible = False - folder_path = policyengine_us_MICRODATA_FOLDER + data_format = Dataset.TABLES + years = [] # This will be populated as datasets are generated + file_path = 
STORAGE_FOLDER / "raw_acs_{year}.h5" + + @staticmethod + def file(year: int): + return STORAGE_FOLDER / f"raw_acs_{year}.h5" def generate(self, year: int) -> None: year = int(year) @@ -69,130 +73,123 @@ def generate(self, year: int) -> None: person_url = f"https://www2.census.gov/programs-surveys/acs/data/pums/{year}/1-Year/csv_pus.zip" household_url = f"https://www2.census.gov/programs-surveys/acs/data/pums/{year}/1-Year/csv_hus.zip" - # The data dictionary for 2019 can be found here: https://www2.census.gov/programs-surveys/acs/tech_docs/pums/data_dict/PUMS_Data_Dictionary_2019.pdf - try: - with pd.HDFStore(RawACS.file(year)) as storage: - # Household file + with pd.HDFStore(self.file(year)) as storage: logging.info(f"Downloading household file") - household = concat_zipped_csvs( + household = self.concat_zipped_csvs( household_url, "psam_hus", HOUSEHOLD_COLUMNS ) - # Remove group quarters (zero weight) - household = household[ - ~household.SERIALNO.str.contains("2019GQ") - ] - household["SERIALNO"] = household["SERIALNO"].apply( - lambda x: int(x.replace("2019HU", "")) - ) storage["household"] = household - # Person file + logging.info(f"Downloading person file") - person = concat_zipped_csvs( + person = self.concat_zipped_csvs( person_url, "psam_pus", PERSON_COLUMNS ) - person = person[~person.SERIALNO.str.contains("2019GQ")] - person["SERIALNO"] = person["SERIALNO"].apply( - lambda x: int(x.replace("2019HU", "")) - ) storage["person"] = person - # SPM unit file + logging.info(f"Downloading SPM unit file") spm_person = pd.read_stata(spm_url).fillna(0) spm_person.columns = spm_person.columns.str.upper() - create_spm_unit_table(storage, spm_person) + self.create_spm_unit_table(storage, spm_person) + + self.years.append(year) # Add the year to the list of available years + logging.info(f"Successfully generated Raw ACS data for {year}") except Exception as e: - RawACS.remove(year) + self.remove(year) logging.error( f"Attempted to extract and save the CSV files, but encountered an error: {e}" ) raise e - -RawACS = RawACS() - - -def concat_zipped_csvs( - url: str, prefix: str, columns: List[str] -) -> pd.DataFrame: - """Downloads the ACS microdata, which is a zip file containing two halves in CSV format. - - Args: - url (str): The URL of the data server. - prefix (str): The prefix of the filenames, before a/b.csv. - columns (List[str]): The columns to filter (avoids hitting memory limits). - - Returns: - pd.DataFrame: The concatenated DataFrame. 
- """ - req = requests.get(url, stream=True) - with BytesIO() as f: - pbar = tqdm() - for chunk in req.iter_content(chunk_size=1024): - if chunk: # filter out keep-alive new chunks - pbar.update(len(chunk)) - f.write(chunk) - f.seek(0) - zf = ZipFile(f) - logging.info(f"Loading the first half of the dataset") - a = pd.read_csv(zf.open(prefix + "a.csv"), usecols=columns) - logging.info(f"Loading the second half of the dataset") - b = pd.read_csv(zf.open(prefix + "b.csv"), usecols=columns) - logging.info(f"Concatenating datasets") - res = pd.concat([a, b]).fillna(0) - res.columns = res.columns.str.upper() - return res - - -def create_spm_unit_table(storage: pd.HDFStore, person: pd.DataFrame) -> None: - SPM_UNIT_COLUMNS = [ - "CAPHOUSESUB", - "CAPWKCCXPNS", - "CHILDCAREXPNS", - "EITC", - "ENGVAL", - "EQUIVSCALE", - "FEDTAX", - "FEDTAXBC", - "FICA", - "GEOADJ", - "MEDXPNS", - "NUMADULTS", - "NUMKIDS", - "NUMPER", - "POOR", - "POVTHRESHOLD", - "RESOURCES", - "SCHLUNCH", - "SNAPSUB", - "STTAX", - "TENMORTSTATUS", - "TOTVAL", - "WCOHABIT", - "WICVAL", - "WKXPNS", - "WUI_LT15", - "ID", - ] - spm_table = ( - person[["SPM_" + column for column in SPM_UNIT_COLUMNS]] - .groupby(person.SPM_ID) - .first() - ) - - original_person_table = storage["person"] - # Ensure that join keys are the same type. - JOIN_COLUMNS = ["SERIALNO", "SPORDER"] - original_person_table[JOIN_COLUMNS] = original_person_table[ - JOIN_COLUMNS - ].astype(int) - person[JOIN_COLUMNS] = person[JOIN_COLUMNS].astype(int) - # Add SPM_ID from the SPM person table to the original person table. - combined_person_table = pd.merge( - original_person_table, - person[JOIN_COLUMNS + ["SPM_ID"]], - on=JOIN_COLUMNS, - ) - - storage["person"] = combined_person_table - storage["spm_unit"] = spm_table + @staticmethod + def concat_zipped_csvs( + url: str, prefix: str, columns: List[str] + ) -> pd.DataFrame: + req = requests.get(url, stream=True) + with BytesIO() as f: + pbar = tqdm() + for chunk in req.iter_content(chunk_size=1024): + if chunk: + pbar.update(len(chunk)) + f.write(chunk) + f.seek(0) + zf = ZipFile(f) + logging.info(f"Loading the first half of the dataset") + a = pd.read_csv(zf.open(prefix + "a.csv"), usecols=columns) + logging.info(f"Loading the second half of the dataset") + b = pd.read_csv(zf.open(prefix + "b.csv"), usecols=columns) + logging.info(f"Concatenating datasets") + res = pd.concat([a, b]).fillna(0) + res.columns = res.columns.str.upper() + return res + + @staticmethod + def create_spm_unit_table(storage: pd.HDFStore, person: pd.DataFrame) -> None: + SPM_UNIT_COLUMNS = [ + "CAPHOUSESUB", + "CAPWKCCXPNS", + "CHILDCAREXPNS", + "EITC", + "ENGVAL", + "EQUIVSCALE", + "FEDTAX", + "FEDTAXBC", + "FICA", + "GEOADJ", + "MEDXPNS", + "NUMADULTS", + "NUMKIDS", + "NUMPER", + "POOR", + "POVTHRESHOLD", + "RESOURCES", + "SCHLUNCH", + "SNAPSUB", + "STTAX", + "TENMORTSTATUS", + "TOTVAL", + "WCOHABIT", + "WICVAL", + "WKXPNS", + "WUI_LT15", + "ID", + ] + spm_table = ( + person[["SPM_" + column for column in SPM_UNIT_COLUMNS]] + .groupby(person.SPM_ID) + .first() + ) + + original_person_table = storage["person"] + + # Convert SERIALNO to string in both DataFrames + JOIN_COLUMNS = ["SERIALNO", "SPORDER"] + original_person_table[JOIN_COLUMNS] = original_person_table[JOIN_COLUMNS].astype(int) + person[JOIN_COLUMNS] = person[JOIN_COLUMNS].astype(int) + + # Add SPM_ID from the SPM person table to the original person table. 
+ combined_person_table = pd.merge( + original_person_table, + person[JOIN_COLUMNS + ["SPM_ID"]], + on=JOIN_COLUMNS + ) + + storage["person"] = combined_person_table + storage["spm_unit"] = spm_table + + def load(self, year: int) -> dict: + if not self.file(year).exists(): + raise FileNotFoundError(f"Raw ACS data for {year} not found. Please generate it first.") + + with pd.HDFStore(self.file(year), mode="r") as store: + return { + "person": store["person"], + "household": store["household"], + "spm_unit": store["spm_unit"], + } + + def remove(self, year: int) -> None: + if self.file(year).exists(): + self.file(year).unlink() + if year in self.years: + self.years.remove(year) From b9e0f227ded4622968f4df6e00ff1ec43747aa05 Mon Sep 17 00:00:00 2001 From: "Github Actions[bot]" Date: Thu, 12 Sep 2024 23:31:58 +0000 Subject: [PATCH 03/38] Update PolicyEngine US data --- CHANGELOG.md | 12 ++++++++++++ changelog.yaml | 9 ++++++++- changelog_entry.yaml | 6 ------ 3 files changed, 20 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 625a96f..89eab24 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,8 +5,20 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [1.1.0] - 2024-09-12 23:31:57 + +### Changed + +- Improved logging +- Updated required Python version +- Removed setuptools_scm + ## [1.0.0] - 2024-09-09 17:29:10 ### Added - Initialized changelogging + + + +[1.1.0]: https://github.com/PolicyEngine/policyengine-us-data/compare/1.0.0...1.1.0 diff --git a/changelog.yaml b/changelog.yaml index ae72ed7..24cf8e4 100644 --- a/changelog.yaml +++ b/changelog.yaml @@ -2,4 +2,11 @@ added: - Initialized changelogging date: 2024-09-09 17:29:10 - version: 1.0.0 \ No newline at end of file + version: 1.0.0 +- bump: minor + changes: + changed: + - Improved logging + - Updated required Python version + - Removed setuptools_scm + date: 2024-09-12 23:31:57 diff --git a/changelog_entry.yaml b/changelog_entry.yaml index 00a9a58..e69de29 100644 --- a/changelog_entry.yaml +++ b/changelog_entry.yaml @@ -1,6 +0,0 @@ -- bump: minor - changes: - changed: - - Improved logging - - Updated required Python version - - Removed setuptools_scm \ No newline at end of file From 74b53bf9a8fa4350f60da3ad3ece1fab06aca7d0 Mon Sep 17 00:00:00 2001 From: PavelMakarchuk Date: Fri, 13 Sep 2024 01:36:24 +0200 Subject: [PATCH 04/38] format --- policyengine_us_data/datasets/acs/__init__.py | 2 +- policyengine_us_data/datasets/acs/acs.py | 15 ++++++------ policyengine_us_data/datasets/acs/raw_acs.py | 24 ++++++++++++------- 3 files changed, 25 insertions(+), 16 deletions(-) diff --git a/policyengine_us_data/datasets/acs/__init__.py b/policyengine_us_data/datasets/acs/__init__.py index 5e08156..d44317b 100644 --- a/policyengine_us_data/datasets/acs/__init__.py +++ b/policyengine_us_data/datasets/acs/__init__.py @@ -1,2 +1,2 @@ from .acs import * -from .raw_acs import * \ No newline at end of file +from .raw_acs import * diff --git a/policyengine_us_data/datasets/acs/acs.py b/policyengine_us_data/datasets/acs/acs.py index 3ab8b7c..8b290c4 100644 --- a/policyengine_us_data/datasets/acs/acs.py +++ b/policyengine_us_data/datasets/acs/acs.py @@ -11,34 +11,34 @@ class ACS(Dataset): name = "acs" label = "ACS" data_format = Dataset.ARRAYS - time_period = None + time_period = None def __init__(self): super().__init__() - self.raw_acs = 
RawACS() + self.raw_acs = RawACS() def generate(self) -> None: """Generates the ACS dataset.""" if self.time_period is None: raise ValueError("time_period must be set in child classes") - + if os.path.exists(self.file_path): os.remove(self.file_path) - + if self.time_period not in self.raw_acs.years: self.raw_acs.generate(self.time_period) - + raw_data = self.raw_acs.load(self.time_period) acs = h5py.File(self.file_path, mode="w") person, spm_unit, household = [ raw_data[entity] for entity in ("person", "spm_unit", "household") ] - + self.add_id_variables(acs, person, spm_unit, household) self.add_person_variables(acs, person) self.add_spm_variables(acs, spm_unit) self.add_household_variables(acs, household) - + acs.close() @staticmethod @@ -79,6 +79,7 @@ def add_household_variables(acs: h5py.File, household: DataFrame) -> None: acs["household_vehicles_owned"] = household.VEH acs["state_fips"] = acs["household_state_fips"] = household.ST + class ACS_2022(ACS): name = "acs_2022" label = "ACS 2022" diff --git a/policyengine_us_data/datasets/acs/raw_acs.py b/policyengine_us_data/datasets/acs/raw_acs.py index c714f47..cf599ce 100644 --- a/policyengine_us_data/datasets/acs/raw_acs.py +++ b/policyengine_us_data/datasets/acs/raw_acs.py @@ -92,7 +92,9 @@ def generate(self, year: int) -> None: spm_person.columns = spm_person.columns.str.upper() self.create_spm_unit_table(storage, spm_person) - self.years.append(year) # Add the year to the list of available years + self.years.append( + year + ) # Add the year to the list of available years logging.info(f"Successfully generated Raw ACS data for {year}") except Exception as e: self.remove(year) @@ -124,7 +126,9 @@ def concat_zipped_csvs( return res @staticmethod - def create_spm_unit_table(storage: pd.HDFStore, person: pd.DataFrame) -> None: + def create_spm_unit_table( + storage: pd.HDFStore, person: pd.DataFrame + ) -> None: SPM_UNIT_COLUMNS = [ "CAPHOUSESUB", "CAPWKCCXPNS", @@ -161,17 +165,19 @@ def create_spm_unit_table(storage: pd.HDFStore, person: pd.DataFrame) -> None: ) original_person_table = storage["person"] - + # Convert SERIALNO to string in both DataFrames JOIN_COLUMNS = ["SERIALNO", "SPORDER"] - original_person_table[JOIN_COLUMNS] = original_person_table[JOIN_COLUMNS].astype(int) + original_person_table[JOIN_COLUMNS] = original_person_table[ + JOIN_COLUMNS + ].astype(int) person[JOIN_COLUMNS] = person[JOIN_COLUMNS].astype(int) - + # Add SPM_ID from the SPM person table to the original person table. combined_person_table = pd.merge( original_person_table, person[JOIN_COLUMNS + ["SPM_ID"]], - on=JOIN_COLUMNS + on=JOIN_COLUMNS, ) storage["person"] = combined_person_table @@ -179,8 +185,10 @@ def create_spm_unit_table(storage: pd.HDFStore, person: pd.DataFrame) -> None: def load(self, year: int) -> dict: if not self.file(year).exists(): - raise FileNotFoundError(f"Raw ACS data for {year} not found. Please generate it first.") - + raise FileNotFoundError( + f"Raw ACS data for {year} not found. Please generate it first." 
+ ) + with pd.HDFStore(self.file(year), mode="r") as store: return { "person": store["person"], From 0ba70a69a32c794083d8bd0cbb0849ff435db7f7 Mon Sep 17 00:00:00 2001 From: PavelMakarchuk Date: Fri, 13 Sep 2024 12:25:17 +0200 Subject: [PATCH 05/38] data fix --- policyengine_us_data/datasets/acs/acs.py | 24 ++++--- policyengine_us_data/datasets/acs/raw_acs.py | 69 ++++++++++++-------- 2 files changed, 57 insertions(+), 36 deletions(-) diff --git a/policyengine_us_data/datasets/acs/acs.py b/policyengine_us_data/datasets/acs/acs.py index 8b290c4..98b46a0 100644 --- a/policyengine_us_data/datasets/acs/acs.py +++ b/policyengine_us_data/datasets/acs/acs.py @@ -4,9 +4,9 @@ from policyengine_us_data.datasets.acs.raw_acs import RawACS from policyengine_us_data.data_storage import STORAGE_FOLDER from pandas import DataFrame +import numpy as np import os - class ACS(Dataset): name = "acs" label = "ACS" @@ -48,17 +48,21 @@ def add_id_variables( spm_unit: DataFrame, household: DataFrame, ) -> None: - acs["person_id"] = person.SERIALNO * 1e2 + person.SPORDER + # Create numeric IDs based on SERIALNO + person['numeric_id'] = person['SERIALNO'].astype('category').cat.codes + household['numeric_id'] = household['SERIALNO'].astype('category').cat.codes + + acs["person_id"] = person['numeric_id'] * 100 + person.SPORDER.astype(int) acs["person_spm_unit_id"] = person.SPM_ID - acs["spm_unit_id"] = spm_unit.SPM_ID - acs["tax_unit_id"] = spm_unit.SPM_ID - acs["family_id"] = spm_unit.SPM_ID - acs["person_household_id"] = person.SERIALNO + acs["spm_unit_id"] = spm_unit.index + acs["tax_unit_id"] = spm_unit.index # Using SPM unit as proxy for tax unit + acs["family_id"] = spm_unit.index # Using SPM unit as proxy for family + acs["person_household_id"] = person['numeric_id'] acs["person_tax_unit_id"] = person.SPM_ID acs["person_family_id"] = person.SPM_ID - acs["household_id"] = household.SERIALNO - acs["person_marital_unit_id"] = person.SERIALNO - acs["marital_unit_id"] = person.SERIALNO.unique() + acs["household_id"] = household['numeric_id'] + acs["person_marital_unit_id"] = person['numeric_id'] + acs["marital_unit_id"] = np.unique(person['numeric_id']) acs["person_weight"] = person.PWGTP acs["household_weight"] = household.WGTP @@ -77,7 +81,7 @@ def add_spm_variables(acs: h5py.File, spm_unit: DataFrame) -> None: @staticmethod def add_household_variables(acs: h5py.File, household: DataFrame) -> None: acs["household_vehicles_owned"] = household.VEH - acs["state_fips"] = acs["household_state_fips"] = household.ST + acs["state_fips"] = acs["household_state_fips"] = household.ST.astype(int) class ACS_2022(ACS): diff --git a/policyengine_us_data/datasets/acs/raw_acs.py b/policyengine_us_data/datasets/acs/raw_acs.py index cf599ce..f5be4b7 100644 --- a/policyengine_us_data/datasets/acs/raw_acs.py +++ b/policyengine_us_data/datasets/acs/raw_acs.py @@ -52,7 +52,6 @@ "GRNTP", # Gross rent ] - class RawACS(Dataset): name = "raw_acs" label = "Raw ACS" @@ -76,15 +75,11 @@ def generate(self, year: int) -> None: try: with pd.HDFStore(self.file(year)) as storage: logging.info(f"Downloading household file") - household = self.concat_zipped_csvs( - household_url, "psam_hus", HOUSEHOLD_COLUMNS - ) + household = self.process_household_data(household_url, "psam_hus", HOUSEHOLD_COLUMNS) storage["household"] = household logging.info(f"Downloading person file") - person = self.concat_zipped_csvs( - person_url, "psam_pus", PERSON_COLUMNS - ) + person = self.process_person_data(person_url, "psam_pus", PERSON_COLUMNS) storage["person"] = 
person logging.info(f"Downloading SPM unit file") @@ -92,21 +87,39 @@ def generate(self, year: int) -> None: spm_person.columns = spm_person.columns.str.upper() self.create_spm_unit_table(storage, spm_person) - self.years.append( - year - ) # Add the year to the list of available years + self.years.append(year) logging.info(f"Successfully generated Raw ACS data for {year}") except Exception as e: self.remove(year) - logging.error( - f"Attempted to extract and save the CSV files, but encountered an error: {e}" - ) + logging.error(f"Error generating Raw ACS data for {year}: {e}") raise e @staticmethod - def concat_zipped_csvs( - url: str, prefix: str, columns: List[str] - ) -> pd.DataFrame: + def process_household_data(url: str, prefix: str, columns: List[str]) -> pd.DataFrame: + req = requests.get(url, stream=True) + with BytesIO() as f: + pbar = tqdm() + for chunk in req.iter_content(chunk_size=1024): + if chunk: + pbar.update(len(chunk)) + f.write(chunk) + f.seek(0) + zf = ZipFile(f) + logging.info(f"Loading the first half of the household dataset") + a = pd.read_csv(zf.open(prefix + "a.csv"), usecols=columns, dtype={'SERIALNO': str}) + logging.info(f"Loading the second half of the household dataset") + b = pd.read_csv(zf.open(prefix + "b.csv"), usecols=columns, dtype={'SERIALNO': str}) + logging.info(f"Concatenating household datasets") + res = pd.concat([a, b]).fillna(0) + res.columns = res.columns.str.upper() + + # Ensure correct data types + res['ST'] = res['ST'].astype(int) + + return res + + @staticmethod + def process_person_data(url: str, prefix: str, columns: List[str]) -> pd.DataFrame: req = requests.get(url, stream=True) with BytesIO() as f: pbar = tqdm() @@ -116,13 +129,17 @@ def concat_zipped_csvs( f.write(chunk) f.seek(0) zf = ZipFile(f) - logging.info(f"Loading the first half of the dataset") - a = pd.read_csv(zf.open(prefix + "a.csv"), usecols=columns) - logging.info(f"Loading the second half of the dataset") - b = pd.read_csv(zf.open(prefix + "b.csv"), usecols=columns) - logging.info(f"Concatenating datasets") + logging.info(f"Loading the first half of the person dataset") + a = pd.read_csv(zf.open(prefix + "a.csv"), usecols=columns, dtype={'SERIALNO': str}) + logging.info(f"Loading the second half of the person dataset") + b = pd.read_csv(zf.open(prefix + "b.csv"), usecols=columns, dtype={'SERIALNO': str}) + logging.info(f"Concatenating person datasets") res = pd.concat([a, b]).fillna(0) res.columns = res.columns.str.upper() + + # Ensure correct data types + res['SPORDER'] = res['SPORDER'].astype(int) + return res @staticmethod @@ -166,12 +183,12 @@ def create_spm_unit_table( original_person_table = storage["person"] - # Convert SERIALNO to string in both DataFrames + # Ensure SERIALNO is treated as string JOIN_COLUMNS = ["SERIALNO", "SPORDER"] - original_person_table[JOIN_COLUMNS] = original_person_table[ - JOIN_COLUMNS - ].astype(int) - person[JOIN_COLUMNS] = person[JOIN_COLUMNS].astype(int) + original_person_table['SERIALNO'] = original_person_table['SERIALNO'].astype(str) + original_person_table['SPORDER'] = original_person_table['SPORDER'].astype(int) + person['SERIALNO'] = person['SERIALNO'].astype(str) + person['SPORDER'] = person['SPORDER'].astype(int) # Add SPM_ID from the SPM person table to the original person table. 
combined_person_table = pd.merge( From 380d6b2999ab29fb38d329f93debe7604ec901f3 Mon Sep 17 00:00:00 2001 From: PavelMakarchuk Date: Fri, 13 Sep 2024 18:21:41 +0200 Subject: [PATCH 06/38] test --- acs.py | 3 - policyengine_us_data/datasets/acs/acs.py | 27 +++++--- policyengine_us_data/datasets/acs/raw_acs.py | 65 ++++++++++++++----- .../tests/test_datasets/test_acs.py | 52 +++++++++++++++ 4 files changed, 117 insertions(+), 30 deletions(-) delete mode 100644 acs.py create mode 100644 policyengine_us_data/tests/test_datasets/test_acs.py diff --git a/acs.py b/acs.py deleted file mode 100644 index aabb704..0000000 --- a/acs.py +++ /dev/null @@ -1,3 +0,0 @@ -from policyengine_us_data import ACS_2022 - -ACS_2022().generate() diff --git a/policyengine_us_data/datasets/acs/acs.py b/policyengine_us_data/datasets/acs/acs.py index 98b46a0..d10cba2 100644 --- a/policyengine_us_data/datasets/acs/acs.py +++ b/policyengine_us_data/datasets/acs/acs.py @@ -7,6 +7,7 @@ import numpy as np import os + class ACS(Dataset): name = "acs" label = "ACS" @@ -49,20 +50,26 @@ def add_id_variables( household: DataFrame, ) -> None: # Create numeric IDs based on SERIALNO - person['numeric_id'] = person['SERIALNO'].astype('category').cat.codes - household['numeric_id'] = household['SERIALNO'].astype('category').cat.codes + person["numeric_id"] = person["SERIALNO"].astype("category").cat.codes + household["numeric_id"] = ( + household["SERIALNO"].astype("category").cat.codes + ) - acs["person_id"] = person['numeric_id'] * 100 + person.SPORDER.astype(int) + acs["person_id"] = person["numeric_id"] * 100 + person.SPORDER.astype( + int + ) acs["person_spm_unit_id"] = person.SPM_ID acs["spm_unit_id"] = spm_unit.index - acs["tax_unit_id"] = spm_unit.index # Using SPM unit as proxy for tax unit + acs["tax_unit_id"] = ( + spm_unit.index + ) # Using SPM unit as proxy for tax unit acs["family_id"] = spm_unit.index # Using SPM unit as proxy for family - acs["person_household_id"] = person['numeric_id'] + acs["person_household_id"] = person["numeric_id"] acs["person_tax_unit_id"] = person.SPM_ID acs["person_family_id"] = person.SPM_ID - acs["household_id"] = household['numeric_id'] - acs["person_marital_unit_id"] = person['numeric_id'] - acs["marital_unit_id"] = np.unique(person['numeric_id']) + acs["household_id"] = household["numeric_id"] + acs["person_marital_unit_id"] = person["numeric_id"] + acs["marital_unit_id"] = np.unique(person["numeric_id"]) acs["person_weight"] = person.PWGTP acs["household_weight"] = household.WGTP @@ -81,7 +88,9 @@ def add_spm_variables(acs: h5py.File, spm_unit: DataFrame) -> None: @staticmethod def add_household_variables(acs: h5py.File, household: DataFrame) -> None: acs["household_vehicles_owned"] = household.VEH - acs["state_fips"] = acs["household_state_fips"] = household.ST.astype(int) + acs["state_fips"] = acs["household_state_fips"] = household.ST.astype( + int + ) class ACS_2022(ACS): diff --git a/policyengine_us_data/datasets/acs/raw_acs.py b/policyengine_us_data/datasets/acs/raw_acs.py index f5be4b7..483c4a0 100644 --- a/policyengine_us_data/datasets/acs/raw_acs.py +++ b/policyengine_us_data/datasets/acs/raw_acs.py @@ -52,6 +52,7 @@ "GRNTP", # Gross rent ] + class RawACS(Dataset): name = "raw_acs" label = "Raw ACS" @@ -75,11 +76,15 @@ def generate(self, year: int) -> None: try: with pd.HDFStore(self.file(year)) as storage: logging.info(f"Downloading household file") - household = self.process_household_data(household_url, "psam_hus", HOUSEHOLD_COLUMNS) + household = 
self.process_household_data( + household_url, "psam_hus", HOUSEHOLD_COLUMNS + ) storage["household"] = household logging.info(f"Downloading person file") - person = self.process_person_data(person_url, "psam_pus", PERSON_COLUMNS) + person = self.process_person_data( + person_url, "psam_pus", PERSON_COLUMNS + ) storage["person"] = person logging.info(f"Downloading SPM unit file") @@ -95,7 +100,9 @@ def generate(self, year: int) -> None: raise e @staticmethod - def process_household_data(url: str, prefix: str, columns: List[str]) -> pd.DataFrame: + def process_household_data( + url: str, prefix: str, columns: List[str] + ) -> pd.DataFrame: req = requests.get(url, stream=True) with BytesIO() as f: pbar = tqdm() @@ -106,20 +113,30 @@ def process_household_data(url: str, prefix: str, columns: List[str]) -> pd.Data f.seek(0) zf = ZipFile(f) logging.info(f"Loading the first half of the household dataset") - a = pd.read_csv(zf.open(prefix + "a.csv"), usecols=columns, dtype={'SERIALNO': str}) + a = pd.read_csv( + zf.open(prefix + "a.csv"), + usecols=columns, + dtype={"SERIALNO": str}, + ) logging.info(f"Loading the second half of the household dataset") - b = pd.read_csv(zf.open(prefix + "b.csv"), usecols=columns, dtype={'SERIALNO': str}) + b = pd.read_csv( + zf.open(prefix + "b.csv"), + usecols=columns, + dtype={"SERIALNO": str}, + ) logging.info(f"Concatenating household datasets") res = pd.concat([a, b]).fillna(0) res.columns = res.columns.str.upper() - + # Ensure correct data types - res['ST'] = res['ST'].astype(int) - + res["ST"] = res["ST"].astype(int) + return res @staticmethod - def process_person_data(url: str, prefix: str, columns: List[str]) -> pd.DataFrame: + def process_person_data( + url: str, prefix: str, columns: List[str] + ) -> pd.DataFrame: req = requests.get(url, stream=True) with BytesIO() as f: pbar = tqdm() @@ -130,16 +147,24 @@ def process_person_data(url: str, prefix: str, columns: List[str]) -> pd.DataFra f.seek(0) zf = ZipFile(f) logging.info(f"Loading the first half of the person dataset") - a = pd.read_csv(zf.open(prefix + "a.csv"), usecols=columns, dtype={'SERIALNO': str}) + a = pd.read_csv( + zf.open(prefix + "a.csv"), + usecols=columns, + dtype={"SERIALNO": str}, + ) logging.info(f"Loading the second half of the person dataset") - b = pd.read_csv(zf.open(prefix + "b.csv"), usecols=columns, dtype={'SERIALNO': str}) + b = pd.read_csv( + zf.open(prefix + "b.csv"), + usecols=columns, + dtype={"SERIALNO": str}, + ) logging.info(f"Concatenating person datasets") res = pd.concat([a, b]).fillna(0) res.columns = res.columns.str.upper() - + # Ensure correct data types - res['SPORDER'] = res['SPORDER'].astype(int) - + res["SPORDER"] = res["SPORDER"].astype(int) + return res @staticmethod @@ -185,10 +210,14 @@ def create_spm_unit_table( # Ensure SERIALNO is treated as string JOIN_COLUMNS = ["SERIALNO", "SPORDER"] - original_person_table['SERIALNO'] = original_person_table['SERIALNO'].astype(str) - original_person_table['SPORDER'] = original_person_table['SPORDER'].astype(int) - person['SERIALNO'] = person['SERIALNO'].astype(str) - person['SPORDER'] = person['SPORDER'].astype(int) + original_person_table["SERIALNO"] = original_person_table[ + "SERIALNO" + ].astype(str) + original_person_table["SPORDER"] = original_person_table[ + "SPORDER" + ].astype(int) + person["SERIALNO"] = person["SERIALNO"].astype(str) + person["SPORDER"] = person["SPORDER"].astype(int) # Add SPM_ID from the SPM person table to the original person table. 
combined_person_table = pd.merge( diff --git a/policyengine_us_data/tests/test_datasets/test_acs.py b/policyengine_us_data/tests/test_datasets/test_acs.py new file mode 100644 index 0000000..c346866 --- /dev/null +++ b/policyengine_us_data/tests/test_datasets/test_acs.py @@ -0,0 +1,52 @@ +import pytest +from policyengine_us import Microsimulation + + +@pytest.mark.parametrize("year", [2022]) +def test_acs_generates(year: int): + from policyengine_us_data.datasets.acs.acs import ACS_2022 + + dataset_by_year = { + 2022: ACS_2022, + } + + dataset = dataset_by_year[year]() + dataset.generate() # This will generate the dataset + + +@pytest.mark.parametrize("year", [2022]) +def test_acs_loads(year: int): + from policyengine_us_data.datasets.acs.acs import ACS_2022 + + dataset_by_year = { + 2022: ACS_2022, + } + + dataset = dataset_by_year[year]() + dataset.generate() # Ensure the dataset is generated before loading + + sim = Microsimulation(dataset=dataset) + + assert not sim.calculate("household_net_income").isna().any() + + +@pytest.mark.parametrize("year", [2022]) +def test_acs_has_all_tables(year: int): + from policyengine_us_data.datasets.acs.acs import ACS_2022 + + dataset_by_year = { + 2022: ACS_2022, + } + + dataset = dataset_by_year[year]() + dataset.generate() # Ensure the dataset is generated before checking tables + + TABLES = [ + "person", + "household", + "spm_unit", + ] + + for table in TABLES: + df = dataset.load(table) + assert len(df) > 0 From c72d8131563f054ae1f11e419cc83bb81d148327 Mon Sep 17 00:00:00 2001 From: PavelMakarchuk Date: Fri, 13 Sep 2024 18:23:22 +0200 Subject: [PATCH 07/38] changelog --- changelog_entry.yaml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/changelog_entry.yaml b/changelog_entry.yaml index e69de29..a43c7b1 100644 --- a/changelog_entry.yaml +++ b/changelog_entry.yaml @@ -0,0 +1,4 @@ +- bump: minor + changes: + added: + - Migrate the ACS Dataset to the repository. From 9d2e3401d79b8a491db0ad7a47026a3f8ab71fe9 Mon Sep 17 00:00:00 2001 From: "Github Actions[bot]" Date: Fri, 13 Sep 2024 16:23:46 +0000 Subject: [PATCH 08/38] Update PolicyEngine US data --- CHANGELOG.md | 7 +++++++ changelog.yaml | 5 +++++ changelog_entry.yaml | 4 ---- pyproject.toml | 2 +- 4 files changed, 13 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1250833..8d50467 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [1.2.0] - 2024-09-13 16:23:45 + +### Added + +- Migrate the ACS Dataset to the repository. 
+ ## [1.1.1] - 2024-09-11 16:40:10 ### Fixed @@ -35,6 +41,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 +[1.2.0]: https://github.com/PolicyEngine/policyengine-us-data/compare/1.1.1...1.2.0 [1.1.1]: https://github.com/PolicyEngine/policyengine-us-data/compare/1.1.0...1.1.1 [1.1.0]: https://github.com/PolicyEngine/policyengine-us-data/compare/1.0.0...1.1.0 [1.0.0]: https://github.com/PolicyEngine/policyengine-us-data/compare/1.0.0...1.0.0 diff --git a/changelog.yaml b/changelog.yaml index d054267..5d72266 100644 --- a/changelog.yaml +++ b/changelog.yaml @@ -21,3 +21,8 @@ - Run publish to PyPI GitHub Actions job only on push - Fix changelog GitHub Actions job date: 2024-09-11 16:40:10 +- bump: minor + changes: + added: + - Migrate the ACS Dataset to the repository. + date: 2024-09-13 16:23:45 diff --git a/changelog_entry.yaml b/changelog_entry.yaml index a43c7b1..e69de29 100644 --- a/changelog_entry.yaml +++ b/changelog_entry.yaml @@ -1,4 +0,0 @@ -- bump: minor - changes: - added: - - Migrate the ACS Dataset to the repository. diff --git a/pyproject.toml b/pyproject.toml index 1b4ac41..76b5121 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "policyengine_us_data" -version = "1.1.1" +version = "1.2.0" description = "A package to create representative microdata for the US." readme = "README.md" authors = [ From 040ea97f07575da8b6aa65dce565b55731b810f7 Mon Sep 17 00:00:00 2001 From: PavelMakarchuk Date: Sat, 14 Sep 2024 21:19:41 +0400 Subject: [PATCH 09/38] remove extra --- .../tests/test_datasets/test_acs.py | 38 ------------------- 1 file changed, 38 deletions(-) diff --git a/policyengine_us_data/tests/test_datasets/test_acs.py b/policyengine_us_data/tests/test_datasets/test_acs.py index c346866..5c0d612 100644 --- a/policyengine_us_data/tests/test_datasets/test_acs.py +++ b/policyengine_us_data/tests/test_datasets/test_acs.py @@ -12,41 +12,3 @@ def test_acs_generates(year: int): dataset = dataset_by_year[year]() dataset.generate() # This will generate the dataset - - -@pytest.mark.parametrize("year", [2022]) -def test_acs_loads(year: int): - from policyengine_us_data.datasets.acs.acs import ACS_2022 - - dataset_by_year = { - 2022: ACS_2022, - } - - dataset = dataset_by_year[year]() - dataset.generate() # Ensure the dataset is generated before loading - - sim = Microsimulation(dataset=dataset) - - assert not sim.calculate("household_net_income").isna().any() - - -@pytest.mark.parametrize("year", [2022]) -def test_acs_has_all_tables(year: int): - from policyengine_us_data.datasets.acs.acs import ACS_2022 - - dataset_by_year = { - 2022: ACS_2022, - } - - dataset = dataset_by_year[year]() - dataset.generate() # Ensure the dataset is generated before checking tables - - TABLES = [ - "person", - "household", - "spm_unit", - ] - - for table in TABLES: - df = dataset.load(table) - assert len(df) > 0 From 2390120366340f73490a6413a9b450e676fa4be9 Mon Sep 17 00:00:00 2001 From: PavelMakarchuk Date: Sat, 14 Sep 2024 21:22:28 +0400 Subject: [PATCH 10/38] chagelog --- CHANGELOG.md | 6 ------ changelog.yaml | 7 +------ changelog_entry.yaml | 4 ++++ 3 files changed, 5 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8d50467..1af3c8b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,12 +5,6 @@ All notable changes to this project will be documented in this file. 
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## [1.2.0] - 2024-09-13 16:23:45 - -### Added - -- Migrate the ACS Dataset to the repository. - ## [1.1.1] - 2024-09-11 16:40:10 ### Fixed diff --git a/changelog.yaml b/changelog.yaml index 5d72266..662e4c5 100644 --- a/changelog.yaml +++ b/changelog.yaml @@ -20,9 +20,4 @@ - Added GitHub Actions test job to PR and push - Run publish to PyPI GitHub Actions job only on push - Fix changelog GitHub Actions job - date: 2024-09-11 16:40:10 -- bump: minor - changes: - added: - - Migrate the ACS Dataset to the repository. - date: 2024-09-13 16:23:45 + date: 2024-09-11 16:40:10 \ No newline at end of file diff --git a/changelog_entry.yaml b/changelog_entry.yaml index e69de29..a43c7b1 100644 --- a/changelog_entry.yaml +++ b/changelog_entry.yaml @@ -0,0 +1,4 @@ +- bump: minor + changes: + added: + - Migrate the ACS Dataset to the repository. From 9c8ecd517dc78df608c0bd8a3fe0b14106adb7b0 Mon Sep 17 00:00:00 2001 From: "Github Actions[bot]" Date: Sat, 14 Sep 2024 17:22:51 +0000 Subject: [PATCH 11/38] Update PolicyEngine US data --- CHANGELOG.md | 6 ++++++ changelog.yaml | 7 ++++++- changelog_entry.yaml | 4 ---- 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1af3c8b..936d35e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [1.2.0] - 2024-09-14 17:22:50 + +### Added + +- Migrate the ACS Dataset to the repository. + ## [1.1.1] - 2024-09-11 16:40:10 ### Fixed diff --git a/changelog.yaml b/changelog.yaml index 662e4c5..5e54278 100644 --- a/changelog.yaml +++ b/changelog.yaml @@ -20,4 +20,9 @@ - Added GitHub Actions test job to PR and push - Run publish to PyPI GitHub Actions job only on push - Fix changelog GitHub Actions job - date: 2024-09-11 16:40:10 \ No newline at end of file + date: 2024-09-11 16:40:10 +- bump: minor + changes: + added: + - Migrate the ACS Dataset to the repository. + date: 2024-09-14 17:22:50 diff --git a/changelog_entry.yaml b/changelog_entry.yaml index a43c7b1..e69de29 100644 --- a/changelog_entry.yaml +++ b/changelog_entry.yaml @@ -1,4 +0,0 @@ -- bump: minor - changes: - added: - - Migrate the ACS Dataset to the repository. 
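At this point in the series the migrated dataset is usable end to end. A
minimal sketch, assuming policyengine-us is installed alongside this package
(it mirrors the acs.py helper deleted in PATCH 06 and the smoke test trimmed
down in PATCH 09):

    from policyengine_us import Microsimulation
    from policyengine_us_data.datasets.acs.acs import ACS_2022

    dataset = ACS_2022()
    dataset.generate()  # downloads the raw ACS and SPM files, writes acs_2022.h5

    sim = Microsimulation(dataset=dataset)
    assert not sim.calculate("household_net_income").isna().any()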
From a292087660dfbd8eba01a29511247f61eec2be6e Mon Sep 17 00:00:00 2001 From: PavelMakarchuk Date: Sun, 15 Sep 2024 16:38:48 +0400 Subject: [PATCH 12/38] readme file --- policyengine_us_data/datasets/acs/README.md | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 policyengine_us_data/datasets/acs/README.md diff --git a/policyengine_us_data/datasets/acs/README.md b/policyengine_us_data/datasets/acs/README.md new file mode 100644 index 0000000..633e04e --- /dev/null +++ b/policyengine_us_data/datasets/acs/README.md @@ -0,0 +1,6 @@ +2022 ACS 1 Year Data Dictionary: +https://www2.census.gov/programs-surveys/acs/tech_docs/pums/data_dict/PUMS_Data_Dictionary_2022.pdf +User Guide: +https://www2.census.gov/programs-surveys/acs/tech_docs/pums/2022ACS_PUMS_User_Guide.pdf +PUMS Documentation: +https://www.census.gov/programs-surveys/acs/microdata/documentation.html From 553f63fb097af2c8a15ebaf36fce04ebd3d7ba0f Mon Sep 17 00:00:00 2001 From: PavelMakarchuk Date: Tue, 17 Sep 2024 13:23:29 +0400 Subject: [PATCH 13/38] property tax --- policyengine_us_data/datasets/acs/raw_acs.py | 1 + 1 file changed, 1 insertion(+) diff --git a/policyengine_us_data/datasets/acs/raw_acs.py b/policyengine_us_data/datasets/acs/raw_acs.py index 483c4a0..6a6b0f5 100644 --- a/policyengine_us_data/datasets/acs/raw_acs.py +++ b/policyengine_us_data/datasets/acs/raw_acs.py @@ -50,6 +50,7 @@ "VEH", # Number of vehicles "FINCP", # Total income "GRNTP", # Gross rent + "TAXAMT", # Property taxes ] From 96013e92a4afb08614db95d80a4b2280227e4b1f Mon Sep 17 00:00:00 2001 From: PavelMakarchuk Date: Tue, 17 Sep 2024 13:24:01 +0400 Subject: [PATCH 14/38] changelog --- changelog_entry.yaml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/changelog_entry.yaml b/changelog_entry.yaml index e69de29..9be2475 100644 --- a/changelog_entry.yaml +++ b/changelog_entry.yaml @@ -0,0 +1,4 @@ +- bump: minor + changes: + added: + - Migrate ACS Dataset. From ed627e872a2cc1c2e4744d5c853b7e8dffbaab4c Mon Sep 17 00:00:00 2001 From: "Github Actions[bot]" Date: Tue, 17 Sep 2024 09:24:25 +0000 Subject: [PATCH 15/38] Update PolicyEngine US data --- CHANGELOG.md | 7 +++++++ changelog.yaml | 5 +++++ changelog_entry.yaml | 4 ---- pyproject.toml | 2 +- 4 files changed, 13 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9ff7a10..32da5de 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [1.3.0] - 2024-09-17 09:24:23 + +### Added + +- Migrate ACS Dataset. + ## [1.2.0] - 2024-09-12 19:47:01 ### Added @@ -47,6 +53,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 +[1.3.0]: https://github.com/PolicyEngine/policyengine-us-data/compare/1.2.0...1.3.0 [1.2.0]: https://github.com/PolicyEngine/policyengine-us-data/compare/1.1.1...1.2.0 [1.1.1]: https://github.com/PolicyEngine/policyengine-us-data/compare/1.1.0...1.1.1 [1.1.0]: https://github.com/PolicyEngine/policyengine-us-data/compare/1.0.0...1.1.0 diff --git a/changelog.yaml b/changelog.yaml index 0f90ecc..a065b6e 100644 --- a/changelog.yaml +++ b/changelog.yaml @@ -30,3 +30,8 @@ changed: - Fixed upload script's use of tqdm date: 2024-09-12 19:47:01 +- bump: minor + changes: + added: + - Migrate ACS Dataset. 
+ date: 2024-09-17 09:24:23 diff --git a/changelog_entry.yaml b/changelog_entry.yaml index 9be2475..e69de29 100644 --- a/changelog_entry.yaml +++ b/changelog_entry.yaml @@ -1,4 +0,0 @@ -- bump: minor - changes: - added: - - Migrate ACS Dataset. diff --git a/pyproject.toml b/pyproject.toml index 76b5121..727e56d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "policyengine_us_data" -version = "1.2.0" +version = "1.3.0" description = "A package to create representative microdata for the US." readme = "README.md" authors = [ From 6d48d19118db8067c770cc701e12ded3757a6442 Mon Sep 17 00:00:00 2001 From: PavelMakarchuk Date: Tue, 17 Sep 2024 13:27:09 +0400 Subject: [PATCH 16/38] format --- policyengine_us_data/datasets/acs/raw_acs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/policyengine_us_data/datasets/acs/raw_acs.py b/policyengine_us_data/datasets/acs/raw_acs.py index 6a6b0f5..9a1ebff 100644 --- a/policyengine_us_data/datasets/acs/raw_acs.py +++ b/policyengine_us_data/datasets/acs/raw_acs.py @@ -50,7 +50,7 @@ "VEH", # Number of vehicles "FINCP", # Total income "GRNTP", # Gross rent - "TAXAMT", # Property taxes + "TAXAMT", # Property taxes ] From 317de2134f69a0613ab653e8c29c63bfb849c1fd Mon Sep 17 00:00:00 2001 From: PavelMakarchuk Date: Tue, 17 Sep 2024 19:26:52 +0400 Subject: [PATCH 17/38] changelog --- changelog_entry.yaml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/changelog_entry.yaml b/changelog_entry.yaml index e69de29..8ce036a 100644 --- a/changelog_entry.yaml +++ b/changelog_entry.yaml @@ -0,0 +1,4 @@ +- bump: minor + changes: + added: + - Migrate the ACS from the US repository. From 8914b9e20a71c0d1b361fd9ae2acdb0669533f41 Mon Sep 17 00:00:00 2001 From: Nikhil Woodruff Date: Thu, 19 Sep 2024 12:22:08 +0100 Subject: [PATCH 18/38] Pool 3 CPS years Fixes #66 --- policyengine_us_data/datasets/cps/cps.py | 93 +++++++++++++++++++++--- 1 file changed, 82 insertions(+), 11 deletions(-) diff --git a/policyengine_us_data/datasets/cps/cps.py b/policyengine_us_data/datasets/cps/cps.py index 8a4494f..75140cf 100644 --- a/policyengine_us_data/datasets/cps/cps.py +++ b/policyengine_us_data/datasets/cps/cps.py @@ -29,18 +29,8 @@ def generate(self): # Extrapolate from CPS 2023 cps_2022 = CPS_2023(require=True) - uprating = create_policyengine_uprating_factors_table() arrays = cps_2022.load_dataset() - for variable in uprating.index.unique(): - if variable in arrays: - current_index = uprating[uprating.index == variable][ - self.time_period - ].values[0] - start_index = uprating[uprating.index == variable][ - 2023 - ].values[0] - growth = current_index / start_index - arrays[variable] = arrays[variable] * growth + arrays = uprate_cps_data(arrays, 2023, self.time_period) self.save_dataset(arrays) return @@ -65,6 +55,22 @@ def generate(self): cps.close() +def uprate_cps_data(data, from_period, to_period): + uprating = create_policyengine_uprating_factors_table() + for variable in uprating.index.unique(): + if variable in data: + current_index = uprating[uprating.index == variable][ + to_period + ].values[0] + start_index = uprating[uprating.index == variable][ + from_period + ].values[0] + growth = current_index / start_index + data[variable] = data[variable] * growth + + return data + + def add_rent(cps: h5py.File, person: DataFrame, household: DataFrame): is_renting = household.H_TENURE == 2 AVERAGE_RENT = 1_300 * 12 @@ -570,5 +576,70 @@ class CPS_2024(CPS): url = 
"release://policyengine/policyengine-us-data/release/cps_2024.h5" +class PooledCPS(Dataset): + data_format = Dataset.TIME_PERIOD_ARRAYS + input_datasets: list + time_period: int + + def generate(self): + data = [ + input_dataset(require=True).load_dataset() + for input_dataset in self.input_datasets + ] + time_periods = [dataset.time_period for dataset in self.input_datasets] + data = [ + uprate_cps_data(data, time_period, self.time_period) + for data, time_period in zip(data, time_periods) + ] + + new_data = {} + + for i in range(len(data)): + for variable in data[i]: + new_data[variable] = {} + for time_period in data[i][variable]: + data_values = data[i][variable][time_period] + if time_period not in new_data[variable]: + new_data[self.time_period] = data_values + elif "_id" in variable: + previous_max = new_data[time_period].max() + new_data[self.time_period] = np.concatenate( + [ + new_data[variable][time_period], + data_values + previous_max, + ] + ) + else: + new_data[self.time_period] = np.concatenate( + [ + new_data[variable][time_period], + data_values, + ] + ) + + for time_period in new_data["household_weight"]: + new_data["household_weight"][time_period] = new_data[ + "household_weight" + ][time_period] / len(self.input_datasets) + + self.save_dataset(new_data) + + +class Pooled_3_Year_CPS_2024(PooledCPS): + label = "CPS 2024 (3-year pooled)" + name = "pooled_3_year_cps_2024" + file_path = STORAGE_FOLDER / "pooled_3_year_cps_2024.h5" + input_datasets = [ + CPS_2021, + CPS_2022, + CPS_2023, + ] + time_period = 2024 + + if __name__ == "__main__": + CPS_2021().generate() + CPS_2022().generate() + CPS_2023().generate() CPS_2024().generate() + Pooled_3_Year_CPS_2024().generate() From 43e3bb78aecb007548af812b33bb7f0573617ebd Mon Sep 17 00:00:00 2001 From: Nikhil Woodruff Date: Thu, 19 Sep 2024 13:17:39 +0100 Subject: [PATCH 19/38] Upload ECPS result in PRs --- .github/workflows/pull_request.yaml | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/.github/workflows/pull_request.yaml b/.github/workflows/pull_request.yaml index 6d39d38..fa1aa78 100644 --- a/.github/workflows/pull_request.yaml +++ b/.github/workflows/pull_request.yaml @@ -60,9 +60,12 @@ jobs: POLICYENGINE_US_DATA_GITHUB_TOKEN: ${{ secrets.POLICYENGINE_US_DATA_GITHUB_TOKEN }} - name: Build datasets run: make data - env: - TEST_LITE: true - name: Run tests run: pytest - name: Test documentation builds - run: make documentation \ No newline at end of file + run: make documentation + - name: Upload ECPS 2024 + uses: actions/upload-artifact@v4 + with: + name: enhanced_cps_2024.h5 + path: policyengine_us_data/storage/enhanced_cps_2024.h5 From 247230ce7b4b328032cfe6e2e457ca79353c25bb Mon Sep 17 00:00:00 2001 From: Nikhil Woodruff Date: Thu, 19 Sep 2024 13:17:51 +0100 Subject: [PATCH 20/38] Feed into ECPS --- policyengine_us_data/datasets/__init__.py | 1 + policyengine_us_data/datasets/cps/cps.py | 49 +++++++++---------- .../datasets/cps/extended_cps.py | 2 +- 3 files changed, 25 insertions(+), 27 deletions(-) diff --git a/policyengine_us_data/datasets/__init__.py b/policyengine_us_data/datasets/__init__.py index ba0e5f6..58e64af 100644 --- a/policyengine_us_data/datasets/__init__.py +++ b/policyengine_us_data/datasets/__init__.py @@ -5,6 +5,7 @@ CPS_2022, CPS_2023, CPS_2024, + Pooled_3_Year_CPS_2024, CensusCPS_2018, CensusCPS_2019, CensusCPS_2020, diff --git a/policyengine_us_data/datasets/cps/cps.py b/policyengine_us_data/datasets/cps/cps.py index 75140cf..7549f99 100644 --- 
a/policyengine_us_data/datasets/cps/cps.py +++ b/policyengine_us_data/datasets/cps/cps.py @@ -577,7 +577,7 @@ class CPS_2024(CPS): class PooledCPS(Dataset): - data_format = Dataset.TIME_PERIOD_ARRAYS + data_format = Dataset.ARRAYS input_datasets: list time_period: int @@ -596,31 +596,28 @@ def generate(self): for i in range(len(data)): for variable in data[i]: - new_data[variable] = {} - for time_period in data[i][variable]: - data_values = data[i][variable][time_period] - if time_period not in new_data[variable]: - new_data[self.time_period] = data_values - elif "_id" in variable: - previous_max = new_data[time_period].max() - new_data[self.time_period] = np.concatenate( - [ - new_data[variable][time_period], - data_values + previous_max, - ] - ) - else: - new_data[self.time_period] = np.concatenate( - [ - new_data[variable][time_period], - data_values, - ] - ) - - for time_period in new_data["household_weight"]: - new_data["household_weight"][time_period] = new_data[ - "household_weight" - ][time_period] / len(self.input_datasets) + data_values = data[i][variable] + if variable not in new_data: + new_data[variable] = data_values + elif "_id" in variable: + previous_max = new_data[variable].max() + new_data[variable] = np.concatenate( + [ + new_data[variable], + data_values + previous_max, + ] + ) + else: + new_data[variable] = np.concatenate( + [ + new_data[variable], + data_values, + ] + ) + + new_data["household_weight"] = new_data["household_weight"] / len( + self.input_datasets + ) self.save_dataset(new_data) diff --git a/policyengine_us_data/datasets/cps/extended_cps.py b/policyengine_us_data/datasets/cps/extended_cps.py index 970fd6b..80a7989 100644 --- a/policyengine_us_data/datasets/cps/extended_cps.py +++ b/policyengine_us_data/datasets/cps/extended_cps.py @@ -145,7 +145,7 @@ def generate(self): class ExtendedCPS_2024(ExtendedCPS): - cps = CPS_2024 + cps = Pooled_3_Year_CPS_2024 puf = PUF_2024 name = "extended_cps_2024" label = "Extended CPS (2024)" From 84ac3256210e7b338c7df955a2fca9efc3ababe2 Mon Sep 17 00:00:00 2001 From: Nikhil Woodruff Date: Thu, 19 Sep 2024 13:20:31 +0100 Subject: [PATCH 21/38] Bump version and ECPS file --- CHANGELOG.md | 7 +++++++ changelog.yaml | 5 +++++ policyengine_us_data/datasets/cps/cps.py | 1 + policyengine_us_data/datasets/cps/enhanced_cps.py | 4 ++-- pyproject.toml | 2 +- 5 files changed, 16 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f010375..a16fb28 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [1.5.0] - 2024-09-19 13:19:37 + +### Changed + +- Enhanced CPS now uses a 3-year pooled CPS. 
+ ## [1.4.3] - 2024-09-18 20:57:03 ### Changed @@ -93,6 +99,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 +[1.5.0]: https://github.com/PolicyEngine/policyengine-us-data/compare/1.4.3...1.5.0 [1.4.3]: https://github.com/PolicyEngine/policyengine-us-data/compare/1.4.2...1.4.3 [1.4.2]: https://github.com/PolicyEngine/policyengine-us-data/compare/1.4.1...1.4.2 [1.4.1]: https://github.com/PolicyEngine/policyengine-us-data/compare/1.4.0...1.4.1 diff --git a/changelog.yaml b/changelog.yaml index 5daadf1..c1d0ca3 100644 --- a/changelog.yaml +++ b/changelog.yaml @@ -67,3 +67,8 @@ changed: - Fixed CI/CD push script date: 2024-09-18 20:57:03 +- bump: minor + changes: + changed: + - Enhanced CPS now uses a 3-year pooled CPS. + date: 2024-09-19 13:19:37 diff --git a/policyengine_us_data/datasets/cps/cps.py b/policyengine_us_data/datasets/cps/cps.py index 7549f99..4329eed 100644 --- a/policyengine_us_data/datasets/cps/cps.py +++ b/policyengine_us_data/datasets/cps/cps.py @@ -632,6 +632,7 @@ class Pooled_3_Year_CPS_2024(PooledCPS): CPS_2023, ] time_period = 2024 + url = "release://PolicyEngine/policyengine-us-data/release/pooled_3_year_cps_2024.h5" if __name__ == "__main__": diff --git a/policyengine_us_data/datasets/cps/enhanced_cps.py b/policyengine_us_data/datasets/cps/enhanced_cps.py index 56ef392..cc88a0e 100644 --- a/policyengine_us_data/datasets/cps/enhanced_cps.py +++ b/policyengine_us_data/datasets/cps/enhanced_cps.py @@ -115,7 +115,6 @@ class EnhancedCPS(Dataset): input_dataset: Type[Dataset] start_year: int end_year: int - url = "release://policyengine/policyengine-us-data/release/enhanced_cps_2024.h5" def generate(self): from policyengine_us import Microsimulation @@ -174,7 +173,8 @@ class EnhancedCPS_2024(EnhancedCPS): end_year = 2024 name = "enhanced_cps_2024" label = "Enhanced CPS 2024" - file_path = STORAGE_FOLDER / "enhanced_cps_2024.h5" + file_path = STORAGE_FOLDER / "enhanced_cps_2024_v_1_5_0.h5" + url = "release://policyengine/policyengine-us-data/release/enhanced_cps_2024_v_1_5_0.h5" if __name__ == "__main__": diff --git a/pyproject.toml b/pyproject.toml index 9a4dec3..622f7f7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "policyengine_us_data" -version = "1.4.3" +version = "1.5.0" description = "A package to create representative microdata for the US." readme = "README.md" authors = [ From 0338cb9554382dcb15d04f99f6a1eda71da38850 Mon Sep 17 00:00:00 2001 From: PavelMakarchuk Date: Thu, 19 Sep 2024 18:23:05 +0400 Subject: [PATCH 22/38] changelog --- changelog_entry.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog_entry.yaml b/changelog_entry.yaml index 8ce036a..6f5e4c0 100644 --- a/changelog_entry.yaml +++ b/changelog_entry.yaml @@ -1,4 +1,4 @@ - bump: minor changes: added: - - Migrate the ACS from the US repository. + - Migrate the ACS from the US-repository. 
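Patches 18 through 21 above introduce and wire up the 3-year pooled CPS: each input year is uprated to the target period, records are stacked, entity IDs are offset so they stay unique across years, and household weights are divided by the number of pooled years so the population is not multiple-counted. A minimal self-contained sketch of that combination step, with toy arrays and made-up growth factors standing in for the real CPS data and the per-variable uprating factors table:

    import numpy as np

    def pool_datasets(datasets: list, growth_factors: list) -> dict:
        """Stack several uprated survey years into one arrays-format dataset."""
        pooled = {}
        n = len(datasets)
        for data, factor in zip(datasets, growth_factors):
            for variable, values in data.items():
                if variable.endswith("_weight"):
                    # Downweight so the pooled file still sums to one population.
                    values = values / n
                elif "_id" in variable:
                    # Offset IDs so records from different years never collide.
                    offset = pooled[variable].max() + 1 if variable in pooled else 0
                    values = values + offset
                else:
                    # Uprate dollar amounts (the real code looks up a growth
                    # factor per variable rather than one factor per year).
                    values = values * factor
                pooled[variable] = (
                    np.concatenate([pooled[variable], values])
                    if variable in pooled
                    else values
                )
        return pooled

    cps_2021 = {
        "household_id": np.array([1, 2]),
        "household_weight": np.array([1_000.0, 1_200.0]),
        "employment_income": np.array([30_000.0, 45_000.0]),
    }
    cps_2022 = {
        "household_id": np.array([1, 2]),
        "household_weight": np.array([1_100.0, 900.0]),
        "employment_income": np.array([32_000.0, 50_000.0]),
    }
    pooled = pool_datasets([cps_2021, cps_2022], growth_factors=[1.10, 1.05])
    assert len(np.unique(pooled["household_id"])) == 4
    assert pooled["household_weight"].sum() == 2_100.0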
From 42fdd242ac64091ac1e23c4a96adb084f546acc7 Mon Sep 17 00:00:00 2001 From: Nikhil Woodruff Date: Thu, 19 Sep 2024 15:34:19 +0100 Subject: [PATCH 23/38] Move back to old ECPS --- Makefile | 1 + policyengine_us_data/datasets/cps/enhanced_cps.py | 4 ++-- policyengine_us_data/datasets/cps/extended_cps.py | 5 ++++- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/Makefile b/Makefile index 43de4c5..cc003af 100644 --- a/Makefile +++ b/Makefile @@ -33,6 +33,7 @@ documentation: data: python policyengine_us_data/datasets/cps/cps.py + python policyengine_us_data/datasets/cps/extended_cps.py python policyengine_us_data/datasets/cps/enhanced_cps.py clean: diff --git a/policyengine_us_data/datasets/cps/enhanced_cps.py b/policyengine_us_data/datasets/cps/enhanced_cps.py index cc88a0e..195b46e 100644 --- a/policyengine_us_data/datasets/cps/enhanced_cps.py +++ b/policyengine_us_data/datasets/cps/enhanced_cps.py @@ -173,8 +173,8 @@ class EnhancedCPS_2024(EnhancedCPS): end_year = 2024 name = "enhanced_cps_2024" label = "Enhanced CPS 2024" - file_path = STORAGE_FOLDER / "enhanced_cps_2024_v_1_5_0.h5" - url = "release://policyengine/policyengine-us-data/release/enhanced_cps_2024_v_1_5_0.h5" + file_path = STORAGE_FOLDER / "enhanced_cps_2024.h5" + url = "release://policyengine/policyengine-us-data/release/enhanced_cps_2024.h5" if __name__ == "__main__": diff --git a/policyengine_us_data/datasets/cps/extended_cps.py b/policyengine_us_data/datasets/cps/extended_cps.py index 80a7989..69bc6a1 100644 --- a/policyengine_us_data/datasets/cps/extended_cps.py +++ b/policyengine_us_data/datasets/cps/extended_cps.py @@ -145,9 +145,12 @@ def generate(self): class ExtendedCPS_2024(ExtendedCPS): - cps = Pooled_3_Year_CPS_2024 + cps = CPS_2024 puf = PUF_2024 name = "extended_cps_2024" label = "Extended CPS (2024)" file_path = STORAGE_FOLDER / "extended_cps_2024.h5" time_period = 2024 + +if __name__ == "__main__": + ExtendedCPS_2024().generate() From abf512e9c45315d4a44010dc4b155026a0faf886 Mon Sep 17 00:00:00 2001 From: PavelMakarchuk Date: Thu, 19 Sep 2024 18:50:17 +0400 Subject: [PATCH 24/38] init --- policyengine_us_data/datasets/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/policyengine_us_data/datasets/__init__.py b/policyengine_us_data/datasets/__init__.py index aaa49f7..c0002cd 100644 --- a/policyengine_us_data/datasets/__init__.py +++ b/policyengine_us_data/datasets/__init__.py @@ -15,5 +15,6 @@ ReweightedCPS_2024, ) from .puf import PUF_2015, PUF_2021, PUF_2024, IRS_PUF_2015 +from .acs import ACS_2022 DATASETS = [CPS_2022, PUF_2021, CPS_2024, EnhancedCPS_2024, ACS_2022] From 33251a905d0d3f92e668e0cdf42fde2828d35979 Mon Sep 17 00:00:00 2001 From: PavelMakarchuk Date: Thu, 19 Sep 2024 19:09:45 +0400 Subject: [PATCH 25/38] storage --- policyengine_us_data/datasets/acs/acs.py | 2 +- policyengine_us_data/datasets/acs/raw_acs.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/policyengine_us_data/datasets/acs/acs.py b/policyengine_us_data/datasets/acs/acs.py index d10cba2..1833abd 100644 --- a/policyengine_us_data/datasets/acs/acs.py +++ b/policyengine_us_data/datasets/acs/acs.py @@ -2,7 +2,7 @@ from policyengine_core.data import Dataset import h5py from policyengine_us_data.datasets.acs.raw_acs import RawACS -from policyengine_us_data.data_storage import STORAGE_FOLDER +from policyengine_us_data.storage import STORAGE_FOLDER from pandas import DataFrame import numpy as np import os diff --git a/policyengine_us_data/datasets/acs/raw_acs.py 
b/policyengine_us_data/datasets/acs/raw_acs.py index 9a1ebff..5fbb0a7 100644 --- a/policyengine_us_data/datasets/acs/raw_acs.py +++ b/policyengine_us_data/datasets/acs/raw_acs.py @@ -6,7 +6,7 @@ from policyengine_core.data import Dataset import requests from tqdm import tqdm -from policyengine_us_data.data_storage import STORAGE_FOLDER +from policyengine_us_data.storage import STORAGE_FOLDER logging.getLogger().setLevel(logging.INFO) From 8af92c3023c0ffb85e7f6fe409feb19ccb5186e3 Mon Sep 17 00:00:00 2001 From: Nikhil Woodruff Date: Thu, 19 Sep 2024 16:37:40 +0100 Subject: [PATCH 26/38] Fix imports --- policyengine_us_data/datasets/cps/extended_cps.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/policyengine_us_data/datasets/cps/extended_cps.py b/policyengine_us_data/datasets/cps/extended_cps.py index 69bc6a1..93bf55f 100644 --- a/policyengine_us_data/datasets/cps/extended_cps.py +++ b/policyengine_us_data/datasets/cps/extended_cps.py @@ -1,8 +1,8 @@ from policyengine_core.data import Dataset from policyengine_us_data.storage import STORAGE_FOLDER from typing import Type -from .cps import * -from ..puf import * +from policyengine_us_data.datasets.cps.cps import * +from policyengine_us_data.datasets.puf import * import pandas as pd import os @@ -152,5 +152,6 @@ class ExtendedCPS_2024(ExtendedCPS): file_path = STORAGE_FOLDER / "extended_cps_2024.h5" time_period = 2024 + if __name__ == "__main__": ExtendedCPS_2024().generate() From 80be6b95c8c0bb0c01ad4a6c671b02c99cebd3e6 Mon Sep 17 00:00:00 2001 From: Nikhil Woodruff Date: Fri, 20 Sep 2024 10:33:19 +0100 Subject: [PATCH 27/38] Move versioning back --- CHANGELOG.md | 7 ------- changelog.yaml | 5 ----- changelog_entry.yaml | 4 ++++ pyproject.toml | 2 +- 4 files changed, 5 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a16fb28..f010375 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,12 +5,6 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## [1.5.0] - 2024-09-19 13:19:37 - -### Changed - -- Enhanced CPS now uses a 3-year pooled CPS. - ## [1.4.3] - 2024-09-18 20:57:03 ### Changed @@ -99,7 +93,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 -[1.5.0]: https://github.com/PolicyEngine/policyengine-us-data/compare/1.4.3...1.5.0 [1.4.3]: https://github.com/PolicyEngine/policyengine-us-data/compare/1.4.2...1.4.3 [1.4.2]: https://github.com/PolicyEngine/policyengine-us-data/compare/1.4.1...1.4.2 [1.4.1]: https://github.com/PolicyEngine/policyengine-us-data/compare/1.4.0...1.4.1 diff --git a/changelog.yaml b/changelog.yaml index c1d0ca3..5daadf1 100644 --- a/changelog.yaml +++ b/changelog.yaml @@ -67,8 +67,3 @@ changed: - Fixed CI/CD push script date: 2024-09-18 20:57:03 -- bump: minor - changes: - changed: - - Enhanced CPS now uses a 3-year pooled CPS. - date: 2024-09-19 13:19:37 diff --git a/changelog_entry.yaml b/changelog_entry.yaml index e69de29..000a32f 100644 --- a/changelog_entry.yaml +++ b/changelog_entry.yaml @@ -0,0 +1,4 @@ +- bump: minor + changes: + changed: + - Enhanced CPS now uses a 3-year pooled CPS. 
\ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 622f7f7..9a4dec3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "policyengine_us_data" -version = "1.5.0" +version = "1.4.3" description = "A package to create representative microdata for the US." readme = "README.md" authors = [ From 7edbccc4918486772cfc8cbfdd23826dd6f91620 Mon Sep 17 00:00:00 2001 From: Nikhil Woodruff Date: Fri, 20 Sep 2024 10:37:15 +0100 Subject: [PATCH 28/38] Add URL for ACS 2022 --- policyengine_us_data/datasets/acs/acs.py | 1 + 1 file changed, 1 insertion(+) diff --git a/policyengine_us_data/datasets/acs/acs.py b/policyengine_us_data/datasets/acs/acs.py index 1833abd..6d5d743 100644 --- a/policyengine_us_data/datasets/acs/acs.py +++ b/policyengine_us_data/datasets/acs/acs.py @@ -98,3 +98,4 @@ class ACS_2022(ACS): label = "ACS 2022" time_period = 2022 file_path = STORAGE_FOLDER / "acs_2022.h5" + url = "release://PolicyEngine/policyengine-us-data/release/acs_2022.h5" From 95d798017182fa2bb795c9c287a1193f0c7fabba Mon Sep 17 00:00:00 2001 From: Nikhil Woodruff Date: Sun, 22 Sep 2024 21:47:37 +0100 Subject: [PATCH 29/38] Add QRF rewrite and full imputations --- policyengine_us_data/datasets/acs/__init__.py | 2 +- policyengine_us_data/datasets/acs/acs.py | 103 ++++++++++-------- .../acs/{raw_acs.py => census_acs.py} | 92 +++++----------- policyengine_us_data/datasets/cps/cps.py | 75 ++++++++++--- .../datasets/cps/extended_cps.py | 10 +- policyengine_us_data/utils/__init__.py | 1 + policyengine_us_data/utils/qrf.py | 64 +++++++++++ 7 files changed, 211 insertions(+), 136 deletions(-) rename policyengine_us_data/datasets/acs/{raw_acs.py => census_acs.py} (66%) create mode 100644 policyengine_us_data/utils/qrf.py diff --git a/policyengine_us_data/datasets/acs/__init__.py b/policyengine_us_data/datasets/acs/__init__.py index d44317b..03bd355 100644 --- a/policyengine_us_data/datasets/acs/__init__.py +++ b/policyengine_us_data/datasets/acs/__init__.py @@ -1,2 +1,2 @@ from .acs import * -from .raw_acs import * +from .census_acs import * diff --git a/policyengine_us_data/datasets/acs/acs.py b/policyengine_us_data/datasets/acs/acs.py index 6d5d743..5acd111 100644 --- a/policyengine_us_data/datasets/acs/acs.py +++ b/policyengine_us_data/datasets/acs/acs.py @@ -1,84 +1,96 @@ import logging from policyengine_core.data import Dataset import h5py -from policyengine_us_data.datasets.acs.raw_acs import RawACS +from policyengine_us_data.datasets.acs.census_acs import CensusACS_2022 from policyengine_us_data.storage import STORAGE_FOLDER from pandas import DataFrame import numpy as np -import os +import pandas as pd class ACS(Dataset): - name = "acs" - label = "ACS" data_format = Dataset.ARRAYS time_period = None - - def __init__(self): - super().__init__() - self.raw_acs = RawACS() + census_acs = None def generate(self) -> None: """Generates the ACS dataset.""" - if self.time_period is None: - raise ValueError("time_period must be set in child classes") - - if os.path.exists(self.file_path): - os.remove(self.file_path) - - if self.time_period not in self.raw_acs.years: - self.raw_acs.generate(self.time_period) - raw_data = self.raw_acs.load(self.time_period) + raw_data = self.census_acs(require=True).load() acs = h5py.File(self.file_path, mode="w") - person, spm_unit, household = [ - raw_data[entity] for entity in ("person", "spm_unit", "household") + person, household = [ + raw_data[entity] for entity in ("person", "household") ] - 
self.add_id_variables(acs, person, spm_unit, household) - self.add_person_variables(acs, person) - self.add_spm_variables(acs, spm_unit) + self.add_id_variables(acs, person, household) + self.add_person_variables(acs, person, household) self.add_household_variables(acs, household) acs.close() + raw_data.close() @staticmethod def add_id_variables( acs: h5py.File, person: DataFrame, - spm_unit: DataFrame, household: DataFrame, ) -> None: # Create numeric IDs based on SERIALNO - person["numeric_id"] = person["SERIALNO"].astype("category").cat.codes - household["numeric_id"] = ( - household["SERIALNO"].astype("category").cat.codes + h_id_to_number = pd.Series( + np.arange(len(household)), index=household["SERIALNO"] ) - - acs["person_id"] = person["numeric_id"] * 100 + person.SPORDER.astype( - int - ) - acs["person_spm_unit_id"] = person.SPM_ID - acs["spm_unit_id"] = spm_unit.index - acs["tax_unit_id"] = ( - spm_unit.index - ) # Using SPM unit as proxy for tax unit - acs["family_id"] = spm_unit.index # Using SPM unit as proxy for family - acs["person_household_id"] = person["numeric_id"] - acs["person_tax_unit_id"] = person.SPM_ID - acs["person_family_id"] = person.SPM_ID - acs["household_id"] = household["numeric_id"] - acs["person_marital_unit_id"] = person["numeric_id"] - acs["marital_unit_id"] = np.unique(person["numeric_id"]) - acs["person_weight"] = person.PWGTP + household["household_id"] = h_id_to_number[ + household["SERIALNO"] + ].values + person["household_id"] = h_id_to_number[person["SERIALNO"]].values + person["person_id"] = person.index + 1 + + acs["person_id"] = person["person_id"] + acs["household_id"] = household["household_id"] + acs["spm_unit_id"] = acs["household_id"] + acs["tax_unit_id"] = acs["household_id"] + acs["family_id"] = acs["household_id"] + acs["marital_unit_id"] = acs["household_id"] + acs["person_household_id"] = person["household_id"] + acs["person_spm_unit_id"] = person["household_id"] + acs["person_tax_unit_id"] = person["household_id"] + acs["person_family_id"] = person["household_id"] + acs["person_marital_unit_id"] = person["household_id"] acs["household_weight"] = household.WGTP @staticmethod - def add_person_variables(acs: h5py.File, person: DataFrame) -> None: + def add_person_variables( + acs: h5py.File, person: DataFrame, household: DataFrame + ) -> None: acs["age"] = person.AGEP + acs["is_male"] = person.SEX == 1 acs["employment_income"] = person.WAGP acs["self_employment_income"] = person.SEMP - acs["total_income"] = person.PINCP + acs["social_security"] = person.SSP + acs["taxable_private_pension_income"] = person.RETP + person[["rent", "real_estate_taxes"]] = ( + household.set_index("household_id") + .loc[person["household_id"]][["RNTP", "TAXAMT"]] + .values + ) + acs["is_household_head"] = person.SPORDER == 1 + factor = person.SPORDER == 1 + person.rent *= factor * 12 + person.real_estate_taxes *= factor + acs["rent"] = person.rent + acs["real_estate_taxes"] = person.real_estate_taxes + acs["tenure_type"] = ( + household.TEN.astype(int) + .map( + { + 1: "OWNED_WITH_MORTGAGE", + 2: "OWNED_OUTRIGHT", + 3: "RENTED", + } + ) + .fillna("NONE") + .astype("S") + ) @staticmethod def add_spm_variables(acs: h5py.File, spm_unit: DataFrame) -> None: @@ -98,4 +110,5 @@ class ACS_2022(ACS): label = "ACS 2022" time_period = 2022 file_path = STORAGE_FOLDER / "acs_2022.h5" + census_acs = CensusACS_2022 url = "release://PolicyEngine/policyengine-us-data/release/acs_2022.h5" diff --git a/policyengine_us_data/datasets/acs/raw_acs.py 
b/policyengine_us_data/datasets/acs/census_acs.py similarity index 66% rename from policyengine_us_data/datasets/acs/raw_acs.py rename to policyengine_us_data/datasets/acs/census_acs.py index 5fbb0a7..842af62 100644 --- a/policyengine_us_data/datasets/acs/raw_acs.py +++ b/policyengine_us_data/datasets/acs/census_acs.py @@ -54,51 +54,25 @@ ] -class RawACS(Dataset): - name = "raw_acs" - label = "Raw ACS" +class CensusACS(Dataset): data_format = Dataset.TABLES - years = [] # This will be populated as datasets are generated - file_path = STORAGE_FOLDER / "raw_acs_{year}.h5" - @staticmethod - def file(year: int): - return STORAGE_FOLDER / f"raw_acs_{year}.h5" - - def generate(self, year: int) -> None: - year = int(year) - if year in self.years: - self.remove(year) - - spm_url = f"https://www2.census.gov/programs-surveys/supplemental-poverty-measure/datasets/spm/spm_{year}_pu.dta" - person_url = f"https://www2.census.gov/programs-surveys/acs/data/pums/{year}/1-Year/csv_pus.zip" - household_url = f"https://www2.census.gov/programs-surveys/acs/data/pums/{year}/1-Year/csv_hus.zip" - - try: - with pd.HDFStore(self.file(year)) as storage: - logging.info(f"Downloading household file") - household = self.process_household_data( - household_url, "psam_hus", HOUSEHOLD_COLUMNS - ) - storage["household"] = household - - logging.info(f"Downloading person file") - person = self.process_person_data( - person_url, "psam_pus", PERSON_COLUMNS - ) - storage["person"] = person - - logging.info(f"Downloading SPM unit file") - spm_person = pd.read_stata(spm_url).fillna(0) - spm_person.columns = spm_person.columns.str.upper() - self.create_spm_unit_table(storage, spm_person) - - self.years.append(year) - logging.info(f"Successfully generated Raw ACS data for {year}") - except Exception as e: - self.remove(year) - logging.error(f"Error generating Raw ACS data for {year}: {e}") - raise e + def generate(self) -> None: + spm_url = f"https://www2.census.gov/programs-surveys/supplemental-poverty-measure/datasets/spm/spm_{self.time_period}_pu.dta" + person_url = f"https://www2.census.gov/programs-surveys/acs/data/pums/{self.time_period}/1-Year/csv_pus.zip" + household_url = f"https://www2.census.gov/programs-surveys/acs/data/pums/{self.time_period}/1-Year/csv_hus.zip" + + with pd.HDFStore(self.file_path, mode="w") as storage: + household = self.process_household_data( + household_url, "psam_hus", HOUSEHOLD_COLUMNS + ) + person = self.process_person_data( + person_url, "psam_pus", PERSON_COLUMNS + ) + person = person[person.SERIALNO.isin(household.SERIALNO)] + household = household[household.SERIALNO.isin(person.SERIALNO)] + storage["household"] = household + storage["person"] = person @staticmethod def process_household_data( @@ -113,19 +87,16 @@ def process_household_data( f.write(chunk) f.seek(0) zf = ZipFile(f) - logging.info(f"Loading the first half of the household dataset") a = pd.read_csv( zf.open(prefix + "a.csv"), usecols=columns, dtype={"SERIALNO": str}, ) - logging.info(f"Loading the second half of the household dataset") b = pd.read_csv( zf.open(prefix + "b.csv"), usecols=columns, dtype={"SERIALNO": str}, ) - logging.info(f"Concatenating household datasets") res = pd.concat([a, b]).fillna(0) res.columns = res.columns.str.upper() @@ -147,19 +118,16 @@ def process_person_data( f.write(chunk) f.seek(0) zf = ZipFile(f) - logging.info(f"Loading the first half of the person dataset") a = pd.read_csv( zf.open(prefix + "a.csv"), usecols=columns, dtype={"SERIALNO": str}, ) - logging.info(f"Loading the second half of the 
person dataset") b = pd.read_csv( zf.open(prefix + "b.csv"), usecols=columns, dtype={"SERIALNO": str}, ) - logging.info(f"Concatenating person datasets") res = pd.concat([a, b]).fillna(0) res.columns = res.columns.str.upper() @@ -208,6 +176,8 @@ def create_spm_unit_table( ) original_person_table = storage["person"] + original_person_table.to_csv("person.csv") + person.to_csv("spm_person.csv") # Ensure SERIALNO is treated as string JOIN_COLUMNS = ["SERIALNO", "SPORDER"] @@ -227,24 +197,12 @@ def create_spm_unit_table( on=JOIN_COLUMNS, ) - storage["person"] = combined_person_table + storage["person_matched"] = combined_person_table storage["spm_unit"] = spm_table - def load(self, year: int) -> dict: - if not self.file(year).exists(): - raise FileNotFoundError( - f"Raw ACS data for {year} not found. Please generate it first." - ) - with pd.HDFStore(self.file(year), mode="r") as store: - return { - "person": store["person"], - "household": store["household"], - "spm_unit": store["spm_unit"], - } - - def remove(self, year: int) -> None: - if self.file(year).exists(): - self.file(year).unlink() - if year in self.years: - self.years.remove(year) +class CensusACS_2022(CensusACS): + label = "Census ACS (2022)" + name = "census_acs_2022.h5" + file_path = STORAGE_FOLDER / "census_acs_2022.h5" + time_period = 2022 diff --git a/policyengine_us_data/datasets/cps/cps.py b/policyengine_us_data/datasets/cps/cps.py index 8a4494f..7a0f791 100644 --- a/policyengine_us_data/datasets/cps/cps.py +++ b/policyengine_us_data/datasets/cps/cps.py @@ -11,6 +11,7 @@ from policyengine_us_data.utils.uprating import ( create_policyengine_uprating_factors_table, ) +from policyengine_us_data.utils import QRF class CPS(Dataset): @@ -46,7 +47,7 @@ def generate(self): return raw_data = self.raw_cps(require=True).load() - cps = h5py.File(self.file_path, mode="w") + cps = {} ENTITIES = ("person", "tax_unit", "family", "spm_unit", "household") person, tax_unit, family, spm_unit, household = [ @@ -59,20 +60,62 @@ def generate(self): add_previous_year_income(self, cps) add_spm_variables(cps, spm_unit) add_household_variables(cps, household) - add_rent(cps, person, household) + add_rent(self, cps, person, household) raw_data.close() - cps.close() + self.save_dataset(cps) -def add_rent(cps: h5py.File, person: DataFrame, household: DataFrame): - is_renting = household.H_TENURE == 2 - AVERAGE_RENT = 1_300 * 12 - # Project down to the first person in the household - person_is_renting = ( - household.set_index("H_SEQ").loc[person.PH_SEQ].H_TENURE.values == 2 - ) - cps["pre_subsidy_rent"] = np.where(person_is_renting, AVERAGE_RENT, 0) +def add_rent(self, cps: h5py.File, person: DataFrame, household: DataFrame): + cps["tenure_type"] = household.H_TENURE.map( + { + 0: "NONE", + 1: "OWNED_WITH_MORTGAGE", + 2: "RENTED", + 3: "NONE", + } + ).astype("S") + self.save_dataset(cps) + + from policyengine_us_data.datasets.acs.acs import ACS_2022 + from policyengine_us import Microsimulation + + acs = Microsimulation(dataset=ACS_2022) + cps_sim = Microsimulation(dataset=self) + + PREDICTORS = [ + "is_household_head", + "age", + "is_male", + "tenure_type", + "employment_income", + "self_employment_income", + "social_security", + "pension_income", + ] + IMPUTATIONS = ["rent", "real_estate_taxes"] + train_df = acs.calculate_dataframe(PREDICTORS + IMPUTATIONS) + train_df.tenure_type = train_df.tenure_type.map( + { + "OWNED_OUTRIGHT": "OWNED_WITH_MORTGAGE", + }, + na_action="ignore", + ).fillna(train_df.tenure_type) + train_df = 
train_df[train_df.is_household_head].sample(100_000) + inference_df = cps_sim.calculate_dataframe(PREDICTORS) + mask = inference_df.is_household_head.values + inference_df = inference_df[mask] + + qrf = QRF() + print("Training imputation model for rent and real estate taxes.") + qrf.fit(train_df[PREDICTORS], train_df[IMPUTATIONS]) + print("Imputing rent and real estate taxes.") + imputed_values = qrf.predict(inference_df[PREDICTORS]) + print("Imputation complete.") + cps["rent"] = np.zeros_like(cps["age"]) + cps["rent"][mask] = imputed_values["rent"] + cps["real_estate_taxes"] = np.zeros_like(cps["age"]) + cps["real_estate_taxes"][mask] = imputed_values["real_estate_taxes"] def add_id_variables( @@ -105,7 +148,7 @@ def add_id_variables( cps["spm_unit_id"] = spm_unit.SPM_ID cps["person_household_id"] = person.PH_SEQ cps["person_family_id"] = person.PH_SEQ * 10 + person.PF_SEQ - + cps["is_household_head"] = person.P_SEQ == 1 cps["household_weight"] = household.HSUP_WGT / 1e2 # Marital units @@ -283,14 +326,14 @@ def add_personal_income_variables( # Allocate retirement distributions by taxability. for source_with_taxable_fraction in ["401k", "403b", "sep"]: cps[f"taxable_{source_with_taxable_fraction}_distributions"] = ( - cps[f"{source_with_taxable_fraction}_distributions"][...] + cps[f"{source_with_taxable_fraction}_distributions"] * p[ f"taxable_{source_with_taxable_fraction}_distribution_fraction" ] ) cps[f"tax_exempt_{source_with_taxable_fraction}_distributions"] = cps[ f"{source_with_taxable_fraction}_distributions" - ][...] * ( + ] * ( 1 - p[ f"taxable_{source_with_taxable_fraction}_distribution_fraction" @@ -430,14 +473,14 @@ def add_spm_variables(cps: h5py.File, spm_unit: DataFrame) -> None: cps[openfisca_variable] = spm_unit[asec_variable] cps["reduced_price_school_meals_reported"] = ( - cps["free_school_meals_reported"][...] * 0 + cps["free_school_meals_reported"] * 0 ) def add_household_variables(cps: h5py.File, household: DataFrame) -> None: cps["state_fips"] = household.GESTFIPS cps["county_fips"] = household.GTCO - state_county_fips = cps["state_fips"][...] * 1e3 + cps["county_fips"][...] + state_county_fips = cps["state_fips"] * 1e3 + cps["county_fips"] # Assign is_nyc here instead of as a variable formula so that it shows up # as toggleable in the webapp. # List county FIPS codes for each NYC county/borough. diff --git a/policyengine_us_data/datasets/cps/extended_cps.py b/policyengine_us_data/datasets/cps/extended_cps.py index 970fd6b..e889163 100644 --- a/policyengine_us_data/datasets/cps/extended_cps.py +++ b/policyengine_us_data/datasets/cps/extended_cps.py @@ -5,6 +5,7 @@ from ..puf import * import pandas as pd import os +from policyengine_us_data.utils import QRF # These are sorted by magnitude. # First 15 contain 90%. 
@@ -80,7 +81,6 @@ class ExtendedCPS(Dataset): def generate(self): from policyengine_us import Microsimulation - from survey_enhance import Imputation cps_sim = Microsimulation(dataset=self.cps) puf_sim = Microsimulation(dataset=self.puf) @@ -100,16 +100,12 @@ def generate(self): X = cps_sim.calculate_dataframe(INPUTS) y = pd.DataFrame(columns=IMPUTED_VARIABLES, index=X.index) - model = Imputation() + model = QRF() model.train( X_train, y_train, - verbose=True, - sample_weight=puf_sim.calculate( - "household_weight", map_to="person" - ).values, ) - y = model.predict(X, verbose=True) + y = model.predict(X) data = cps_sim.dataset.load_dataset() new_data = {} diff --git a/policyengine_us_data/utils/__init__.py b/policyengine_us_data/utils/__init__.py index 1ccbd39..d25c6c2 100644 --- a/policyengine_us_data/utils/__init__.py +++ b/policyengine_us_data/utils/__init__.py @@ -2,3 +2,4 @@ from .soi import * from .uprating import * from .loss import * +from .qrf import * diff --git a/policyengine_us_data/utils/qrf.py b/policyengine_us_data/utils/qrf.py new file mode 100644 index 0000000..30519b9 --- /dev/null +++ b/policyengine_us_data/utils/qrf.py @@ -0,0 +1,64 @@ +from quantile_forest import RandomForestQuantileRegressor +import pandas as pd +import numpy as np +import pickle + + +class QRF: + categorical_columns: list = None + encoded_columns: list = None + output_columns: list = None + + def __init__(self, seed=0, file_path=None): + self.seed = seed + + if file_path is not None: + with open(file_path, "rb") as f: + data = pickle.load(f) + self.seed = data["seed"] + self.categorical_columns = data["categorical_columns"] + self.encoded_columns = data["encoded_columns"] + self.output_columns = data["output_columns"] + self.qrf = data["qrf"] + + def fit(self, X, y, **qrf_kwargs): + self.categorical_columns = X.select_dtypes(include=["object"]).columns + X = pd.get_dummies( + X, columns=self.categorical_columns, drop_first=True + ) + self.encoded_columns = X.columns + self.output_columns = y.columns + self.qrf = RandomForestQuantileRegressor( + random_state=self.seed, **qrf_kwargs + ) + self.qrf.fit(X, y) + + def predict(self, X, count_samples=10, mean_quantile=0.5): + X = pd.get_dummies( + X, columns=self.categorical_columns, drop_first=True + ) + X = X[self.encoded_columns] + pred = self.qrf.predict( + X, quantiles=list(np.linspace(0, 1, count_samples)) + ) + random_generator = np.random.default_rng(self.seed) + a = mean_quantile / (1 - mean_quantile) + input_quantiles = ( + random_generator.beta(a, 1, size=len(X)) * count_samples + ) + input_quantiles = input_quantiles.astype(int) + predictions = pred[np.arange(len(X)), :, input_quantiles] + return pd.DataFrame(predictions, columns=self.output_columns) + + def save(self, path): + with open(path, "wb") as f: + pickle.dump( + { + "seed": self.seed, + "categorical_columns": self.categorical_columns, + "encoded_columns": self.encoded_columns, + "output_columns": self.output_columns, + "qrf": self.qrf, + }, + f, + ) From c35a21c77950783f63b9ed8f85de96dff335b211 Mon Sep 17 00:00:00 2001 From: Nikhil Woodruff Date: Sun, 22 Sep 2024 22:29:20 +0100 Subject: [PATCH 30/38] Add calibration --- policyengine_us_data/datasets/__init__.py | 2 +- policyengine_us_data/datasets/cps/cps.py | 19 ++++++++++++------- policyengine_us_data/utils/loss.py | 7 +++++-- 3 files changed, 18 insertions(+), 10 deletions(-) diff --git a/policyengine_us_data/datasets/__init__.py b/policyengine_us_data/datasets/__init__.py index 55cca06..ffde8e4 100644 --- 
a/policyengine_us_data/datasets/__init__.py +++ b/policyengine_us_data/datasets/__init__.py @@ -5,7 +5,7 @@ CPS_2022, CPS_2023, CPS_2024, - Pooled_3_Year_CPS_2024, + Pooled_3_Year_CPS_2023, CensusCPS_2018, CensusCPS_2019, CensusCPS_2020, diff --git a/policyengine_us_data/datasets/cps/cps.py b/policyengine_us_data/datasets/cps/cps.py index 53dea31..36a9d07 100644 --- a/policyengine_us_data/datasets/cps/cps.py +++ b/policyengine_us_data/datasets/cps/cps.py @@ -104,6 +104,11 @@ def add_rent(self, cps: h5py.File, person: DataFrame, household: DataFrame): print("Imputation complete.") cps["rent"] = np.zeros_like(cps["age"]) cps["rent"][mask] = imputed_values["rent"] + # Assume zero housing assistance, so imputed rent doubles as pre-subsidy rent. + cps["pre_subsidy_rent"] = cps["rent"] + cps["housing_assistance"] = np.zeros_like( + cps["spm_unit_capped_housing_subsidy_reported"] + ) cps["real_estate_taxes"] = np.zeros_like(cps["age"]) cps["real_estate_taxes"][mask] = imputed_values["real_estate_taxes"] @@ -665,17 +670,17 @@ def generate(self): self.save_dataset(new_data) -class Pooled_3_Year_CPS_2024(PooledCPS): - label = "CPS 2024 (3-year pooled)" - name = "pooled_3_year_cps_2024" - file_path = STORAGE_FOLDER / "pooled_3_year_cps_2024.h5" +class Pooled_3_Year_CPS_2023(PooledCPS): + label = "CPS 2023 (3-year pooled)" + name = "pooled_3_year_cps_2023" + file_path = STORAGE_FOLDER / "pooled_3_year_cps_2023.h5" input_datasets = [ CPS_2021, CPS_2022, CPS_2023, ] - time_period = 2024 - url = "release://PolicyEngine/policyengine-us-data/release/pooled_3_year_cps_2024.h5" + time_period = 2023 + url = "release://PolicyEngine/policyengine-us-data/release/pooled_3_year_cps_2023.h5" if __name__ == "__main__": @@ -683,4 +688,4 @@ class Pooled_3_Year_CPS_2023(PooledCPS): CPS_2022().generate() CPS_2023().generate() CPS_2024().generate() - Pooled_3_Year_CPS_2024().generate() + Pooled_3_Year_CPS_2023().generate() diff --git a/policyengine_us_data/utils/loss.py b/policyengine_us_data/utils/loss.py index f068011..6f9f30c 100644 --- a/policyengine_us_data/utils/loss.py +++ b/policyengine_us_data/utils/loss.py @@ -184,7 +184,7 @@ def build_loss_matrix(dataset: type, time_period): # Medical expenses, sum of spm thresholds # Child support expenses - CPS_DERIVED_TOTALS_2024 = { + HARD_CODED_TOTALS = { "health_insurance_premiums_without_medicare_part_b": 385e9, "other_medical_expenses": 278e9, "medicare_part_b_premiums": 112e9, @@ -198,9 +198,12 @@ # Alimony could be targeted via SOI "alimony_income": 13e9, "alimony_expense": 13e9, + # Rough estimate, not CPS derived + "real_estate_taxes": 400e9, # Rough estimate between 350bn and 600bn total property tax collections + "rent": 735e9, # ACS total uprated by CPI } - for variable_name, target in CPS_DERIVED_TOTALS_2024.items(): + for variable_name, target in HARD_CODED_TOTALS.items(): label = f"census/{variable_name}" loss_matrix[label] = sim.calculate( variable_name, map_to="household" From 5a3f94d56758b604d8cd1f507fae738b4e2bb846 Mon Sep 17 00:00:00 2001 From: Nikhil Woodruff Date: Sun, 22 Sep 2024 22:35:46 +0100 Subject: [PATCH 31/38] Shift to branch of US --- policyengine_us_data/datasets/cps/enhanced_cps.py | 6 +++--- policyengine_us_data/datasets/cps/extended_cps.py | 2 +- policyengine_us_data/datasets/puf/puf.py | 13 ++++++------- pyproject.toml | 4 ++-- 4 files changed, 12 insertions(+), 13 deletions(-) diff --git a/policyengine_us_data/datasets/cps/enhanced_cps.py b/policyengine_us_data/datasets/cps/enhanced_cps.py index 195b46e..39918c5 100644 ---
a/policyengine_us_data/datasets/cps/enhanced_cps.py +++ b/policyengine_us_data/datasets/cps/enhanced_cps.py @@ -99,13 +99,13 @@ def train_previous_year_income_model(): df = sim.calculate_dataframe(VARIABLES + OUTPUTS, 2019, map_to="person") df_train = df[df.previous_year_income_available] - from survey_enhance import Imputation + from policyengine_us_data.utils import QRF - income_last_year = Imputation() + income_last_year = QRF() X = df_train[VARIABLES[1:]] y = df_train[OUTPUTS] - income_last_year.train(X, y) + income_last_year.fit(X, y) return income_last_year diff --git a/policyengine_us_data/datasets/cps/extended_cps.py b/policyengine_us_data/datasets/cps/extended_cps.py index 2497ec6..aac6529 100644 --- a/policyengine_us_data/datasets/cps/extended_cps.py +++ b/policyengine_us_data/datasets/cps/extended_cps.py @@ -101,7 +101,7 @@ def generate(self): y = pd.DataFrame(columns=IMPUTED_VARIABLES, index=X.index) model = QRF() - model.train( + model.fit( X_train, y_train, ) diff --git a/policyengine_us_data/datasets/puf/puf.py b/policyengine_us_data/datasets/puf/puf.py index 9633edb..7d1a048 100644 --- a/policyengine_us_data/datasets/puf/puf.py +++ b/policyengine_us_data/datasets/puf/puf.py @@ -22,14 +22,13 @@ def impute_pension_contributions_to_puf(puf_df): ["employment_income", "household_weight", "pre_tax_contributions"] ) - from survey_enhance import Imputation + from policyengine_us_data.utils import QRF - pension_contributions = Imputation() + pension_contributions = QRF() - pension_contributions.train( + pension_contributions.fit( X=cps_df[["employment_income"]], Y=cps_df[["pre_tax_contributions"]], - sample_weight=cps_df["household_weight"], ) return pension_contributions.predict( X=puf_df[["employment_income"]], @@ -39,7 +38,7 @@ def impute_pension_contributions_to_puf(puf_df): def impute_missing_demographics( puf: pd.DataFrame, demographics: pd.DataFrame ) -> pd.DataFrame: - from survey_enhance import Imputation + from policyengine_us_data.utils import QRF puf_with_demographics = ( puf[puf.RECID.isin(demographics.RECID)] @@ -63,9 +62,9 @@ def impute_missing_demographics( "XTOT", ] - demographics_from_puf = Imputation() + demographics_from_puf = QRF() - demographics_from_puf.train( + demographics_from_puf.fit( X=puf_with_demographics[NON_DEMOGRAPHIC_VARIABLES], Y=puf_with_demographics[DEMOGRAPHIC_VARIABLES], ) diff --git a/pyproject.toml b/pyproject.toml index d0fe62e..4fa24f2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,8 +22,8 @@ dependencies = [ dev = [ "black", "pytest", - "policyengine_us==1.71.1", - "survey_enhance", + "policyengine_us @ git+https://github.com/policyengine/policyengine-us@us-rent-data", + "quantile-forest", "torch", "tables", "tabulate", From a23329b91487f855b483d012748870812f19a464 Mon Sep 17 00:00:00 2001 From: Nikhil Woodruff Date: Sun, 22 Sep 2024 22:39:18 +0100 Subject: [PATCH 32/38] Make optional install --- policyengine_us_data/utils/qrf.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/policyengine_us_data/utils/qrf.py b/policyengine_us_data/utils/qrf.py index 30519b9..8754c6a 100644 --- a/policyengine_us_data/utils/qrf.py +++ b/policyengine_us_data/utils/qrf.py @@ -1,4 +1,7 @@ -from quantile_forest import RandomForestQuantileRegressor +try: + from quantile_forest import RandomForestQuantileRegressor +except ImportError: + pass import pandas as pd import numpy as np import pickle From dcda8bde284c626f24790d8bc84cf1b57da7172e Mon Sep 17 00:00:00 2001 From: Nikhil Woodruff Date: Sun, 22 Sep 2024 22:44:00 +0100 Subject: 
[PATCH 33/38] Generate ACS before CPS --- Makefile | 1 + policyengine_us_data/datasets/acs/acs.py | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/Makefile b/Makefile index cc003af..41d5f73 100644 --- a/Makefile +++ b/Makefile @@ -32,6 +32,7 @@ documentation: jb clean docs && jb build docs data: + python policyengine_us_data/datasets/cps/acs.py python policyengine_us_data/datasets/cps/cps.py python policyengine_us_data/datasets/cps/extended_cps.py python policyengine_us_data/datasets/cps/enhanced_cps.py diff --git a/policyengine_us_data/datasets/acs/acs.py b/policyengine_us_data/datasets/acs/acs.py index 5acd111..349d9b6 100644 --- a/policyengine_us_data/datasets/acs/acs.py +++ b/policyengine_us_data/datasets/acs/acs.py @@ -112,3 +112,7 @@ class ACS_2022(ACS): file_path = STORAGE_FOLDER / "acs_2022.h5" census_acs = CensusACS_2022 url = "release://PolicyEngine/policyengine-us-data/release/acs_2022.h5" + + +if __name__ == "__main__": + ACS_2022().generate() From 502d8c9ea6e4021496c6c6be7ad8fdbad9658998 Mon Sep 17 00:00:00 2001 From: Nikhil Woodruff Date: Sun, 22 Sep 2024 22:47:15 +0100 Subject: [PATCH 34/38] What a silly error --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 41d5f73..e2e1c15 100644 --- a/Makefile +++ b/Makefile @@ -32,7 +32,7 @@ documentation: jb clean docs && jb build docs data: - python policyengine_us_data/datasets/cps/acs.py + python policyengine_us_data/datasets/acs/acs.py python policyengine_us_data/datasets/cps/cps.py python policyengine_us_data/datasets/cps/extended_cps.py python policyengine_us_data/datasets/cps/enhanced_cps.py From c8e271057762d508b3a4cea7f2ada73eb9aed09b Mon Sep 17 00:00:00 2001 From: Nikhil Woodruff Date: Mon, 23 Sep 2024 10:26:34 +0100 Subject: [PATCH 35/38] Minor improvements --- policyengine_us_data/datasets/cps/cps.py | 5 ++--- policyengine_us_data/datasets/cps/extended_cps.py | 1 - policyengine_us_data/storage/uprating_factors.csv | 3 ++- policyengine_us_data/storage/uprating_growth_factors.csv | 3 ++- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/policyengine_us_data/datasets/cps/cps.py b/policyengine_us_data/datasets/cps/cps.py index 36a9d07..74097f9 100644 --- a/policyengine_us_data/datasets/cps/cps.py +++ b/policyengine_us_data/datasets/cps/cps.py @@ -29,10 +29,9 @@ def generate(self): if self.raw_cps is None: # Extrapolate from CPS 2023 - cps_2022 = CPS_2023(require=True) - arrays = cps_2022.load_dataset() + cps_2023 = CPS_2023(require=True) + arrays = cps_2023.load_dataset() arrays = uprate_cps_data(arrays, 2023, self.time_period) - self.save_dataset(arrays) return diff --git a/policyengine_us_data/datasets/cps/extended_cps.py b/policyengine_us_data/datasets/cps/extended_cps.py index aac6529..3892c0f 100644 --- a/policyengine_us_data/datasets/cps/extended_cps.py +++ b/policyengine_us_data/datasets/cps/extended_cps.py @@ -29,7 +29,6 @@ "qualified_dividend_income", "charitable_cash_donations", "self_employed_pension_contribution_ald", - "real_estate_taxes", "unrecaptured_section_1250_gain", "taxable_unemployment_compensation", "taxable_interest_income", diff --git a/policyengine_us_data/storage/uprating_factors.csv b/policyengine_us_data/storage/uprating_factors.csv index b5374d9..3269bdc 100644 --- a/policyengine_us_data/storage/uprating_factors.csv +++ b/policyengine_us_data/storage/uprating_factors.csv @@ -50,8 +50,9 @@ pre_tax_contributions,1.0,1.166,1.148,1.215,1.28,1.318,1.35,1.389,1.428,1.467,1. 
prior_year_minimum_tax_credit,1.0,1.166,1.148,1.215,1.28,1.318,1.35,1.389,1.428,1.467,1.513,1.561,1.611,1.663,1.718 qualified_dividend_income,1.0,1.2,1.269,1.283,1.325,1.376,1.414,1.445,1.483,1.533,1.624,1.714,1.801,1.885,1.966 qualified_tuition_expenses,1.0,1.166,1.148,1.215,1.28,1.318,1.35,1.389,1.428,1.467,1.513,1.561,1.611,1.663,1.718 -real_estate_taxes,1.0,1.166,1.148,1.215,1.28,1.318,1.35,1.389,1.428,1.467,1.513,1.561,1.611,1.663,1.718 +real_estate_taxes,1.0,1.011,1.083,1.142,1.164,1.161,1.18,1.198,1.219,1.238,1.26,1.282,1.305,1.329,1.353 recapture_of_investment_credit,1.0,1.166,1.148,1.215,1.28,1.318,1.35,1.389,1.428,1.467,1.513,1.561,1.611,1.663,1.718 +rent,1.0,1.011,1.083,1.142,1.164,1.161,1.18,1.198,1.219,1.238,1.26,1.282,1.305,1.329,1.353 rental_income,1.0,0.976,0.961,1.017,1.071,1.104,1.13,1.163,1.196,1.228,1.266,1.307,1.348,1.392,1.438 roth_401k_contributions,1.0,1.166,1.148,1.215,1.28,1.318,1.35,1.389,1.428,1.467,1.513,1.561,1.611,1.663,1.718 roth_ira_contributions,1.0,1.166,1.148,1.215,1.28,1.318,1.35,1.389,1.428,1.467,1.513,1.561,1.611,1.663,1.718 diff --git a/policyengine_us_data/storage/uprating_growth_factors.csv b/policyengine_us_data/storage/uprating_growth_factors.csv index 2829eaa..d020204 100644 --- a/policyengine_us_data/storage/uprating_growth_factors.csv +++ b/policyengine_us_data/storage/uprating_growth_factors.csv @@ -50,8 +50,9 @@ pre_tax_contributions,0,0.166,-0.015,0.058,0.053,0.03,0.024,0.029,0.028,0.027,0. prior_year_minimum_tax_credit,0,0.166,-0.015,0.058,0.053,0.03,0.024,0.029,0.028,0.027,0.031,0.032,0.032,0.032,0.033 qualified_dividend_income,0,0.2,0.057,0.011,0.033,0.038,0.028,0.022,0.026,0.034,0.059,0.055,0.051,0.047,0.043 qualified_tuition_expenses,0,0.166,-0.015,0.058,0.053,0.03,0.024,0.029,0.028,0.027,0.031,0.032,0.032,0.032,0.033 -real_estate_taxes,0,0.166,-0.015,0.058,0.053,0.03,0.024,0.029,0.028,0.027,0.031,0.032,0.032,0.032,0.033 +real_estate_taxes,0,0.011,0.071,0.054,0.019,-0.003,0.016,0.015,0.018,0.016,0.018,0.017,0.018,0.018,0.018 recapture_of_investment_credit,0,0.166,-0.015,0.058,0.053,0.03,0.024,0.029,0.028,0.027,0.031,0.032,0.032,0.032,0.033 +rent,0,0.011,0.071,0.054,0.019,-0.003,0.016,0.015,0.018,0.016,0.018,0.017,0.018,0.018,0.018 rental_income,0,-0.024,-0.015,0.058,0.053,0.031,0.024,0.029,0.028,0.027,0.031,0.032,0.031,0.033,0.033 roth_401k_contributions,0,0.166,-0.015,0.058,0.053,0.03,0.024,0.029,0.028,0.027,0.031,0.032,0.032,0.032,0.033 roth_ira_contributions,0,0.166,-0.015,0.058,0.053,0.03,0.024,0.029,0.028,0.027,0.031,0.032,0.032,0.032,0.033 From 7024666cfbd9642e228c8aceed21250b4f0fdc60 Mon Sep 17 00:00:00 2001 From: Nikhil Woodruff Date: Mon, 23 Sep 2024 10:30:12 +0100 Subject: [PATCH 36/38] Fix bugs --- policyengine_us_data/datasets/cps/extended_cps.py | 9 +++++++++ policyengine_us_data/datasets/puf/puf.py | 8 ++++---- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/policyengine_us_data/datasets/cps/extended_cps.py b/policyengine_us_data/datasets/cps/extended_cps.py index 3892c0f..6f36f0d 100644 --- a/policyengine_us_data/datasets/cps/extended_cps.py +++ b/policyengine_us_data/datasets/cps/extended_cps.py @@ -6,6 +6,7 @@ import pandas as pd import os from policyengine_us_data.utils import QRF +import time # These are sorted by magnitude. # First 15 contain 90%. 
@@ -100,11 +101,19 @@ def generate(self): y = pd.DataFrame(columns=IMPUTED_VARIABLES, index=X.index) model = QRF() + start = time.time() model.fit( X_train, y_train, ) + print( + f"Training imputation models from the PUF took {time.time() - start:.2f} seconds" + ) + start = time.time() y = model.predict(X) + print( + f"Predicting imputed values took {time.time() - start:.2f} seconds" + ) data = cps_sim.dataset.load_dataset() new_data = {} diff --git a/policyengine_us_data/datasets/puf/puf.py b/policyengine_us_data/datasets/puf/puf.py index 7d1a048..e4700a5 100644 --- a/policyengine_us_data/datasets/puf/puf.py +++ b/policyengine_us_data/datasets/puf/puf.py @@ -27,8 +27,8 @@ def impute_pension_contributions_to_puf(puf_df): pension_contributions = QRF() pension_contributions.fit( - X=cps_df[["employment_income"]], - Y=cps_df[["pre_tax_contributions"]], + cps_df[["employment_income"]], + cps_df[["pre_tax_contributions"]], ) return pension_contributions.predict( X=puf_df[["employment_income"]], @@ -65,8 +65,8 @@ def impute_missing_demographics( demographics_from_puf = QRF() demographics_from_puf.fit( - X=puf_with_demographics[NON_DEMOGRAPHIC_VARIABLES], - Y=puf_with_demographics[DEMOGRAPHIC_VARIABLES], + puf_with_demographics[NON_DEMOGRAPHIC_VARIABLES], + puf_with_demographics[DEMOGRAPHIC_VARIABLES], ) puf_without_demographics = puf[ From 54449a205cbc37b75d06172af6ff05487dc4b41e Mon Sep 17 00:00:00 2001 From: Nikhil Woodruff Date: Mon, 23 Sep 2024 10:32:19 +0100 Subject: [PATCH 37/38] Adjust QRF to enable single-output predictions --- policyengine_us_data/utils/qrf.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/policyengine_us_data/utils/qrf.py b/policyengine_us_data/utils/qrf.py index 8754c6a..5cd2d65 100644 --- a/policyengine_us_data/utils/qrf.py +++ b/policyengine_us_data/utils/qrf.py @@ -50,7 +50,10 @@ def predict(self, X, count_samples=10, mean_quantile=0.5): random_generator.beta(a, 1, size=len(X)) * count_samples ) input_quantiles = input_quantiles.astype(int) - predictions = pred[np.arange(len(X)), :, input_quantiles] + if len(pred.shape) == 2: + predictions = pred[:, input_quantiles] + else: + predictions = pred[:, :, input_quantiles] return pd.DataFrame(predictions, columns=self.output_columns) def save(self, path): From b67a64f53be9084398b652d1a011c6e80a45401d Mon Sep 17 00:00:00 2001 From: Nikhil Woodruff Date: Mon, 23 Sep 2024 10:41:00 +0100 Subject: [PATCH 38/38] Fix bug in QRF --- policyengine_us_data/utils/qrf.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/policyengine_us_data/utils/qrf.py b/policyengine_us_data/utils/qrf.py index 5cd2d65..13a298f 100644 --- a/policyengine_us_data/utils/qrf.py +++ b/policyengine_us_data/utils/qrf.py @@ -51,9 +51,9 @@ def predict(self, X, count_samples=10, mean_quantile=0.5): ) input_quantiles = input_quantiles.astype(int) if len(pred.shape) == 2: - predictions = pred[:, input_quantiles] + predictions = pred[np.arange(len(pred)), input_quantiles] else: - predictions = pred[:, :, input_quantiles] + predictions = pred[np.arange(len(pred)), :, input_quantiles] return pd.DataFrame(predictions, columns=self.output_columns) def save(self, path):
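With patches 37 and 38 applied, QRF.predict draws one beta-distributed quantile per row (the mean_quantile argument controls where the draws concentrate) and indexes the prediction array correctly for both single-output models, shape (n, quantiles), and multi-output models, shape (n, outputs, quantiles). A short usage sketch of the wrapper, assuming the quantile-forest dependency is installed; the toy data below is fabricated purely for illustration:

    import numpy as np
    import pandas as pd
    from policyengine_us_data.utils import QRF

    rng = np.random.default_rng(0)
    n = 1_000
    X = pd.DataFrame(
        {
            "age": rng.integers(18, 80, n),
            # Object dtype marks this column as categorical for the
            # one-hot encoding performed inside QRF.fit.
            "tenure_type": rng.choice(
                ["RENTED", "OWNED_WITH_MORTGAGE"], n
            ).astype(object),
        }
    )
    # Fabricated targets loosely tied to the predictors.
    y = pd.DataFrame(
        {
            "rent": np.where(X.tenure_type == "RENTED", 12_000 + 50 * X.age, 0.0),
            "real_estate_taxes": np.where(
                X.tenure_type == "RENTED", 0.0, 3_000 + 10 * X.age
            ),
        }
    )

    model = QRF(seed=0)
    model.fit(X, y)
    # mean_quantile > 0.5 would skew draws toward the upper tail of each
    # conditional distribution; 0.5 keeps them centered.
    imputed = model.predict(X, count_samples=10, mean_quantile=0.5)
    assert list(imputed.columns) == ["rent", "real_estate_taxes"]
    assert len(imputed) == n

Sampling a random quantile per row, rather than always taking the median, preserves the spread of the imputed distribution, which matters when the imputed rent and real estate taxes feed the calibration targets added in patch 30.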