Skip to content

Commit

Permalink
Refactor DOE Energy Burden and COI to use YAML (#1796)
Browse files Browse the repository at this point in the history
* added tribalId for Supplemental dataset (#1804)

* Setting zoom levels for tribal map (#1810)

* NRI dataset and initial score YAML configuration (#1534)

* update be staging gha

* NRI dataset and initial score YAML configuration

* checkpoint

* adding data checks for release branch

* passing tests

* adding INPUT_EXTRACTED_FILE_NAME to base class

* lint

* columns to keep and tests

* update be staging gha

* checkpoint

* update be staging gha

* NRI dataset and initial score YAML configuration

* checkpoint

* adding data checks for release branch

* passing tests

* adding INPUT_EXTRACTED_FILE_NAME to base class

* lint

* columns to keep and tests

* checkpoint

* PR Review

* removing source url

* tests

* stop execution of ETL if there's a YAML schema issue

* update be staging gha

* adding source url as class var again

* clean up

* force cache bust

* gha cache bust

* dynamically set score vars from YAML

* docstrings

* removing last updated year - optional reverse percentile

* passing tests

* sort order

* column ordering

* PR review

* class level vars

* Updating DatasetsConfig

* fix pylint errors

* moving metadata hint back to code

Co-authored-by: lucasmbrown-usds <lucas.m.brown@omb.eop.gov>

* Correct copy typo (#1809)

* Add basic test suite for COI (#1518)

* Update COI to use new yaml (#1518)

* Add tests for DOE energy burden (#1518)

* Add dataset config for energy burden (#1518)

* Refactor ETL to use datasets.yml (#1518)

* Add fake GEOIDs to COI tests (#1518)

* Refactor _setup_etl_instance_and_run_extract to base (#1518)

For the three classes we've done so far, a generic
_setup_etl_instance_and_run_extract will work fine. For the moment we
can reuse the same setup method until we decide future classes need more
flexibility, and they can always override it in a subclass.

* Add output-path tests (#1518)

* Update YAML to match constant (#1518)

* Don't blindly set float format (#1518)

* Add defaults for extract (#1518)

* Run YAML load on all subclasses (#1518)

* Update description fields (#1518)

* Update YAML per final format (#1518)

* Update fixture tract IDs (#1518)

* Update base class refactor (#1518)

Now that NRI is final I needed to make a small number of updates to my
refactored code.

* Remove old comment (#1518)

* Fix type signature and return (#1518)

* Update per code review (#1518)

Co-authored-by: Jorge Escobar <83969469+esfoobar-usds@users.noreply.github.com>
Co-authored-by: lucasmbrown-usds <lucas.m.brown@omb.eop.gov>
Co-authored-by: Vim <86254807+vim-usds@users.noreply.github.com>
  • Loading branch information
4 people authored and emma-nechamkin committed Aug 11, 2022
1 parent baa591a commit 97e1754
Show file tree
Hide file tree
Showing 28 changed files with 458 additions and 192 deletions.
102 changes: 55 additions & 47 deletions data/data-pipeline/data_pipeline/etl/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ class ExtractTransformLoad:
DATA_PATH: pathlib.Path = APP_ROOT / "data"
TMP_PATH: pathlib.Path = DATA_PATH / "tmp"
CONTENT_CONFIG: pathlib.Path = APP_ROOT / "content" / "config"
DATASET_CONFIG: pathlib.Path = APP_ROOT / "etl" / "score" / "config"
DATASET_CONFIG_PATH: pathlib.Path = APP_ROOT / "etl" / "score" / "config"
DATASET_CONFIG: Optional[dict] = None

# Parameters
GEOID_FIELD_NAME: str = "GEOID10"
Expand Down Expand Up @@ -98,48 +99,51 @@ class ExtractTransformLoad:
# It is used on the "load" base class method
output_df: pd.DataFrame = None

def __init_subclass__(cls) -> None:
cls.DATASET_CONFIG = cls.yaml_config_load()

@classmethod
def yaml_config_load(cls) -> dict:
def yaml_config_load(cls) -> Optional[dict]:
"""Generate config dictionary and set instance variables from YAML dataset."""

# check if the class instance has score YAML definitions
datasets_config = load_yaml_dict_from_file(
cls.DATASET_CONFIG / "datasets.yml",
DatasetsConfig,
)

# get the config for this dataset
try:
dataset_config = next(
item
for item in datasets_config.get("datasets")
if item["module_name"] == cls.NAME
if cls.NAME is not None:
# check if the class instance has score YAML definitions
datasets_config = load_yaml_dict_from_file(
cls.DATASET_CONFIG_PATH / "datasets.yml",
DatasetsConfig,
)
except StopIteration:
# Note: it'd be nice to log the name of the dataframe, but that's not accessible in this scope.
logger.error(
f"Exception encountered while extracting dataset config for dataset {cls.NAME}"
)
sys.exit()

# set some of the basic fields
cls.INPUT_GEOID_TRACT_FIELD_NAME = dataset_config[
"input_geoid_tract_field_name"
]

# get the columns to write on the CSV
# and set the constants
cls.COLUMNS_TO_KEEP = [
cls.GEOID_TRACT_FIELD_NAME, # always index with geoid tract id
]
for field in dataset_config["load_fields"]:
cls.COLUMNS_TO_KEEP.append(field["long_name"])

# set the constants for the class
setattr(cls, field["df_field_name"], field["long_name"])

# return the config dict
return dataset_config
# get the config for this dataset
try:
dataset_config = next(
item
for item in datasets_config.get("datasets")
if item["module_name"] == cls.NAME
)
except StopIteration:
# Note: it'd be nice to log the name of the dataframe, but that's not accessible in this scope.
logger.error(
f"Exception encountered while extracting dataset config for dataset {cls.NAME}"
)
sys.exit()

# set some of the basic fields
cls.INPUT_GEOID_TRACT_FIELD_NAME = dataset_config[
"input_geoid_tract_field_name"
]

# get the columns to write on the CSV
# and set the constants
cls.COLUMNS_TO_KEEP = [
cls.GEOID_TRACT_FIELD_NAME, # always index with geoid tract id
]
for field in dataset_config["load_fields"]:
cls.COLUMNS_TO_KEEP.append(field["long_name"])
setattr(cls, field["df_field_name"], field["long_name"])

# set the constants for the class
setattr(cls, field["df_field_name"], field["long_name"])
return dataset_config
return None

# This is a classmethod so it can be used by `get_data_frame` without
# needing to create an instance of the class. This is a use case in `etl_score`.
Expand Down Expand Up @@ -176,14 +180,18 @@ def extract(
to get the file from a source url, unzips it and stores it on an
extract_path."""

# this can be accessed via super().extract()
if source_url and extract_path:
unzip_file_from_url(
file_url=source_url,
download_path=self.get_tmp_path(),
unzipped_file_path=extract_path,
verify=verify,
)
if source_url is None:
source_url = self.SOURCE_URL

if extract_path is None:
extract_path = self.get_tmp_path()

unzip_file_from_url(
file_url=source_url,
download_path=self.get_tmp_path(),
unzipped_file_path=extract_path,
verify=verify,
)

def transform(self) -> None:
"""Transform the data extracted into a format that can be consumed by the
Expand Down
52 changes: 52 additions & 0 deletions data/data-pipeline/data_pipeline/etl/score/config/datasets.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,55 @@ datasets:
df_field_name: "CONTAINS_AGRIVALUE"
long_name: "Contains agricultural value"
field_type: bool
- long_name: "Child Opportunity Index 2.0 database"
short_name: "coi"
module_name: "child_opportunity_index"
input_geoid_tract_field_name: "geoid"
load_fields:
- short_name: "he_heat"
df_field_name: "EXTREME_HEAT_FIELD"
long_name: "Summer days above 90F"
field_type: float
include_in_downloadable_files: true
include_in_tiles: true
- short_name: "he_food"
long_name: "Percent low access to healthy food"
df_field_name: "HEALTHY_FOOD_FIELD"
field_type: float
include_in_downloadable_files: true
include_in_tiles: true
- short_name: "he_green"
long_name: "Percent impenetrable surface areas"
df_field_name: "IMPENETRABLE_SURFACES_FIELD"
field_type: float
include_in_downloadable_files: true
include_in_tiles: true
- short_name: "ed_reading"
df_field_name: "READING_FIELD"
long_name: "Third grade reading proficiency"
field_type: float
include_in_downloadable_files: true
include_in_tiles: true
- long_name: "Low-Income Energy Affordabililty Data"
short_name: "LEAD"
module_name: "doe_energy_burden"
input_geoid_tract_field_name: "FIP"
load_fields:
- short_name: "EBP_PFS"
df_field_name: "REVISED_ENERGY_BURDEN_FIELD_NAME"
long_name: "Energy burden"
field_type: float
include_in_downloadable_files: true
include_in_tiles: true
- long_name: "Example ETL"
short_name: "Example"
module_name: "example_dataset"
input_geoid_tract_field_name: "GEOID10_TRACT"
load_fields:
- short_name: "EXAMPLE_FIELD"
df_field_name: "Input Field 1"
long_name: "Example Field 1"
field_type: float
include_in_tiles: true
include_in_downloadable_files: true

Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
from pathlib import Path
import pandas as pd

from data_pipeline.etl.base import ExtractTransformLoad
from data_pipeline.score import field_names
from data_pipeline.utils import get_module_logger, unzip_file_from_url
from data_pipeline.etl.base import ExtractTransformLoad, ValidGeoLevel
from data_pipeline.utils import get_module_logger

logger = get_module_logger(__name__)

Expand All @@ -21,15 +20,27 @@ class ChildOpportunityIndex(ExtractTransformLoad):
Full technical documents: https://www.diversitydatakids.org/sites/default/files/2020-02/ddk_coi2.0_technical_documentation_20200212.pdf.
Github repo: https://github.com/diversitydatakids/COI/
"""

# Metadata for the baseclass
NAME = "child_opportunity_index"
GEO_LEVEL = ValidGeoLevel.CENSUS_TRACT

# Define these for easy code completion
EXTREME_HEAT_FIELD: str
HEALTHY_FOOD_FIELD: str
IMPENETRABLE_SURFACES_FIELD: str
READING_FIELD: str

def __init__(self):
self.COI_FILE_URL = (
self.SOURCE_URL = (
"https://data.diversitydatakids.org/datastore/zip/f16fff12-b1e5-4f60-85d3-"
"3a0ededa30a0?format=csv"
)

# TODO: Decide about nixing this
self.TRACT_INPUT_COLUMN_NAME = self.INPUT_GEOID_TRACT_FIELD_NAME

self.OUTPUT_PATH: Path = (
self.DATA_PATH / "dataset" / "child_opportunity_index"
)
Expand All @@ -40,31 +51,19 @@ def __init__(self):
self.IMPENETRABLE_SURFACES_INPUT_FIELD = "HE_GREEN"
self.READING_INPUT_FIELD = "ED_READING"

# Constants for output
self.COLUMNS_TO_KEEP = [
self.GEOID_TRACT_FIELD_NAME,
field_names.EXTREME_HEAT_FIELD,
field_names.HEALTHY_FOOD_FIELD,
field_names.IMPENETRABLE_SURFACES_FIELD,
field_names.READING_FIELD,
]

self.raw_df: pd.DataFrame
self.output_df: pd.DataFrame

def extract(self) -> None:
logger.info("Starting 51MB data download.")

unzip_file_from_url(
file_url=self.COI_FILE_URL,
download_path=self.get_tmp_path(),
unzipped_file_path=self.get_tmp_path() / "child_opportunity_index",
super().extract(
source_url=self.SOURCE_URL,
extract_path=self.get_tmp_path(),
)

self.raw_df = pd.read_csv(
filepath_or_buffer=self.get_tmp_path()
/ "child_opportunity_index"
/ "raw.csv",
def transform(self) -> None:
logger.info("Starting transforms.")
raw_df = pd.read_csv(
filepath_or_buffer=self.get_tmp_path() / "raw.csv",
# The following need to remain as strings for all of their digits, not get
# converted to numbers.
dtype={
Expand All @@ -73,16 +72,13 @@ def extract(self) -> None:
low_memory=False,
)

def transform(self) -> None:
logger.info("Starting transforms.")

output_df = self.raw_df.rename(
output_df = raw_df.rename(
columns={
self.TRACT_INPUT_COLUMN_NAME: self.GEOID_TRACT_FIELD_NAME,
self.EXTREME_HEAT_INPUT_FIELD: field_names.EXTREME_HEAT_FIELD,
self.HEALTHY_FOOD_INPUT_FIELD: field_names.HEALTHY_FOOD_FIELD,
self.IMPENETRABLE_SURFACES_INPUT_FIELD: field_names.IMPENETRABLE_SURFACES_FIELD,
self.READING_INPUT_FIELD: field_names.READING_FIELD,
self.EXTREME_HEAT_INPUT_FIELD: self.EXTREME_HEAT_FIELD,
self.HEALTHY_FOOD_INPUT_FIELD: self.HEALTHY_FOOD_FIELD,
self.IMPENETRABLE_SURFACES_INPUT_FIELD: self.IMPENETRABLE_SURFACES_FIELD,
self.READING_INPUT_FIELD: self.READING_FIELD,
}
)

Expand All @@ -95,8 +91,8 @@ def transform(self) -> None:

# Convert percents from 0-100 to 0-1 to standardize with our other fields.
percent_fields_to_convert = [
field_names.HEALTHY_FOOD_FIELD,
field_names.IMPENETRABLE_SURFACES_FIELD,
self.HEALTHY_FOOD_FIELD,
self.IMPENETRABLE_SURFACES_FIELD,
]

for percent_field_to_convert in percent_fields_to_convert:
Expand All @@ -105,11 +101,3 @@ def transform(self) -> None:
)

self.output_df = output_df

def load(self) -> None:
logger.info("Saving CSV")

self.OUTPUT_PATH.mkdir(parents=True, exist_ok=True)
self.output_df[self.COLUMNS_TO_KEEP].to_csv(
path_or_buf=self.OUTPUT_PATH / "usa.csv", index=False
)
Original file line number Diff line number Diff line change
Expand Up @@ -2,63 +2,48 @@
import pandas as pd

from data_pipeline.config import settings
from data_pipeline.etl.base import ExtractTransformLoad
from data_pipeline.utils import get_module_logger, unzip_file_from_url
from data_pipeline.etl.base import ExtractTransformLoad, ValidGeoLevel
from data_pipeline.utils import get_module_logger

logger = get_module_logger(__name__)


class DOEEnergyBurden(ExtractTransformLoad):
def __init__(self):
self.DOE_FILE_URL = (
settings.AWS_JUSTICE40_DATASOURCES_URL
+ "/DOE_LEAD_AMI_TRACT_2018_ALL.csv.zip"
)
NAME = "doe_energy_burden"
SOURCE_URL: str = (
settings.AWS_JUSTICE40_DATASOURCES_URL
+ "/DOE_LEAD_AMI_TRACT_2018_ALL.csv.zip"
)
GEO_LEVEL = ValidGeoLevel.CENSUS_TRACT

REVISED_ENERGY_BURDEN_FIELD_NAME: str

def __init__(self):
self.OUTPUT_PATH: Path = (
self.DATA_PATH / "dataset" / "doe_energy_burden"
)

self.TRACT_INPUT_COLUMN_NAME = "FIP"
self.INPUT_ENERGY_BURDEN_FIELD_NAME = "BURDEN"
self.REVISED_ENERGY_BURDEN_FIELD_NAME = "Energy burden"

# Constants for output
self.COLUMNS_TO_KEEP = [
self.GEOID_TRACT_FIELD_NAME,
self.REVISED_ENERGY_BURDEN_FIELD_NAME,
]

self.raw_df: pd.DataFrame
self.output_df: pd.DataFrame

def extract(self) -> None:
logger.info("Starting data download.")

unzip_file_from_url(
file_url=self.DOE_FILE_URL,
download_path=self.get_tmp_path(),
unzipped_file_path=self.get_tmp_path() / "doe_energy_burden",
)

self.raw_df = pd.read_csv(
def transform(self) -> None:
logger.info("Starting DOE Energy Burden transforms.")
raw_df: pd.DataFrame = pd.read_csv(
filepath_or_buffer=self.get_tmp_path()
/ "doe_energy_burden"
/ "DOE_LEAD_AMI_TRACT_2018_ALL.csv",
# The following need to remain as strings for all of their digits, not get converted to numbers.
dtype={
self.TRACT_INPUT_COLUMN_NAME: "string",
self.INPUT_GEOID_TRACT_FIELD_NAME: "string",
},
low_memory=False,
)

def transform(self) -> None:
logger.info("Starting transforms.")

output_df = self.raw_df.rename(
logger.info("Renaming columns and ensuring output format is correct")
output_df = raw_df.rename(
columns={
self.INPUT_ENERGY_BURDEN_FIELD_NAME: self.REVISED_ENERGY_BURDEN_FIELD_NAME,
self.TRACT_INPUT_COLUMN_NAME: self.GEOID_TRACT_FIELD_NAME,
self.INPUT_GEOID_TRACT_FIELD_NAME: self.GEOID_TRACT_FIELD_NAME,
}
)

Expand All @@ -75,7 +60,4 @@ def transform(self) -> None:
def load(self) -> None:
logger.info("Saving DOE Energy Burden CSV")

self.OUTPUT_PATH.mkdir(parents=True, exist_ok=True)
self.output_df[self.COLUMNS_TO_KEEP].to_csv(
path_or_buf=self.OUTPUT_PATH / "usa.csv", index=False
)
super().load()
Loading

0 comments on commit 97e1754

Please sign in to comment.