Adding First Street Foundation data #1823

10 changes: 10 additions & 0 deletions data/data-pipeline/data_pipeline/etl/constants.py
@@ -29,6 +29,16 @@
"module_dir": "mapping_for_ej",
"class_name": "MappingForEJETL",
},
{
"name": "fsf_flood_risk",
"module_dir": "fsf_flood_risk",
"class_name": "FloodRiskETL",
},
{
"name": "fsf_wildfire_risk",
"module_dir": "fsf_wildfire_risk",
"class_name": "WildfireRiskETL",
},
{
"name": "ejscreen",
"module_dir": "ejscreen",
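For context, entries like these are typically picked up by the pipeline's ETL runner, which resolves module_dir and class_name into an importable class. A minimal sketch of that pattern (illustrative only; the actual runner lives elsewhere in the repo):

import importlib

def resolve_etl_class(dataset: dict):
    # Illustrative: maps {"module_dir": "fsf_flood_risk", "class_name": "FloodRiskETL"}
    # to data_pipeline.etl.sources.fsf_flood_risk.etl.FloodRiskETL.
    module = importlib.import_module(
        f"data_pipeline.etl.sources.{dataset['module_dir']}.etl"
    )
    return getattr(module, dataset["class_name"])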
81 changes: 81 additions & 0 deletions data/data-pipeline/data_pipeline/etl/score/config/datasets.yml
@@ -129,3 +129,84 @@ datasets:
include_in_tiles: true
include_in_downloadable_files: true

- long_name: "First Street Foundation Flood Risk"
Contributor comment:

LOVE you're doing YAML like a baws.

short_name: "FSF Flood Risk"
module_name: fsf_flood_risk
input_geoid_tract_field_name: "GEOID"
load_fields:
- short_name: "flood_eligible_properties"
df_field_name: "COUNT_PROPERTIES"
long_name: "Count of properties eligible for flood risk calculation within tract (floor of 250)"
field_type: float
include_in_tiles: false
include_in_downloadable_files: true
create_percentile: false
- short_name: "flood_risk_properties_today"
df_field_name: "PROPERTIES_AT_RISK_FROM_FLOODING_TODAY"
long_name: "Count of properties at risk of flood today"
field_type: float
include_in_tiles: false
include_in_downloadable_files: true
create_percentile: false
- short_name: "flood_risk_properties_30yrs"
df_field_name: "PROPERTIES_AT_RISK_FROM_FLOODING_IN_30_YEARS"
long_name: "Count of properties at risk of flood in 30 years"
field_type: float
include_in_tiles: false
include_in_downloadable_files: true
create_percentile: false
- short_name: "flood_risk_share_today"
df_field_name: "SHARE_OF_PROPERTIES_AT_RISK_FROM_FLOODING_TODAY"
long_name: "Share of properties at risk of flood today"
field_type: float
include_in_tiles: false
include_in_downloadable_files: true
create_percentile: true
- short_name: "flood_risk_share_30yrs"
df_field_name: "SHARE_OF_PROPERTIES_AT_RISK_FROM_FLOODING_IN_30_YEARS"
long_name: "Share of properties at risk of flood in 30 years"
field_type: float
include_in_tiles: false
include_in_downloadable_files: true
create_percentile: true

- long_name: "First Street Foundation Wildfire Risk"
short_name: "FSF Wildfire Risk"
module_name: fsf_wildfire_risk
input_geoid_tract_field_name: "GEOID"
load_fields:
- short_name: "fire_eligible_properties"
df_field_name: "COUNT_PROPERTIES"
long_name: "Count of properties eligible for wildfire risk calculation within tract (floor of 250)"
field_type: float
include_in_tiles: false
include_in_downloadable_files: true
create_percentile: false
- short_name: "fire_risk_properties_today"
df_field_name: "PROPERTIES_AT_RISK_FROM_FIRE_TODAY"
long_name: "Count of properties at risk of wildfire today"
field_type: float
include_in_tiles: false
include_in_downloadable_files: true
create_percentile: false
- short_name: "fire_risk_properties_30yrs"
df_field_name: "PROPERTIES_AT_RISK_FROM_FIRE_IN_30_YEARS"
long_name: "Count of properties at risk of wildfire in 30 years"
field_type: float
include_in_tiles: false
include_in_downloadable_files: true
create_percentile: false
- short_name: "fire_risk_share_today"
df_field_name: "SHARE_OF_PROPERTIES_AT_RISK_FROM_FIRE_TODAY"
long_name: "Share of properties at risk of fire today"
field_type: float
include_in_tiles: false
include_in_downloadable_files: true
create_percentile: true
- short_name: "fire_risk_share_30yrs"
df_field_name: "SHARE_OF_PROPERTIES_AT_RISK_FROM_FIRE_IN_30_YEARS"
long_name: "Share of properties at risk of fire in 30 years"
field_type: float
include_in_tiles: false
include_in_downloadable_files: true
create_percentile: true
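The df_field_name values above line up with the annotated class attributes on the ETL classes below (COUNT_PROPERTIES, PROPERTIES_AT_RISK_FROM_FLOODING_TODAY, and so on). A hedged sketch of that binding, assuming the base class assigns each field's long_name to the attribute named by df_field_name and later uses it as the output column name (the real wiring lives in ExtractTransformLoad and may differ):

def bind_load_fields(etl_instance, dataset_config: dict) -> None:
    # Assumed binding: each load_fields entry sets an attribute named by
    # df_field_name whose value is the human-readable long_name.
    for field in dataset_config["load_fields"]:
        setattr(etl_instance, field["df_field_name"], field["long_name"])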
@@ -0,0 +1,3 @@
# FSF flood risk data

Flood risk is computed as being within a 1-in-100-year flood zone
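For intuition: a 1-in-100-year zone implies a 1% chance of flooding in any given year, so over the 30-year horizon used by this dataset the cumulative probability is roughly 26% (a standard independence assumption, not an FSF figure):

# Rough intuition only (independence assumption, not an FSF figure):
# a 1% annual flood chance compounded over 30 years.
p_30yr = 1 - (1 - 0.01) ** 30
print(round(p_30yr, 2))  # 0.26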
Empty file.
93 changes: 93 additions & 0 deletions data/data-pipeline/data_pipeline/etl/sources/fsf_flood_risk/etl.py
@@ -0,0 +1,93 @@
# pylint: disable=unsubscriptable-object
Contributor (author) comment:

Note (namely to @lucasmbrown-usds) -- these two datasets get munged basically identically. We could move the work into a separate util function for transform, but the yaml file means that we wouldn't really save any lines of code. LMK if you think this explicit work is the wrong call.
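If the shared helper were extracted, it might look roughly like the sketch below (hypothetical function and parameter names; not part of this PR):

import pandas as pd

def transform_fsf_properties(
    df: pd.DataFrame,
    geoid_col: str,
    count_col: str,
    clip_floor: int = 250,
) -> pd.DataFrame:
    # Hypothetical shared transform mirroring the flood and fire ETLs:
    # aggregate duplicated tracts, then left-clip the property count used
    # as the denominator for the share columns.
    df = df.groupby(geoid_col).sum().reset_index()
    df[count_col] = df[count_col].clip(lower=clip_floor)
    return df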

# pylint: disable=unsupported-assignment-operation

import pandas as pd
from data_pipeline.config import settings

from data_pipeline.etl.base import ExtractTransformLoad, ValidGeoLevel
from data_pipeline.utils import get_module_logger

logger = get_module_logger(__name__)


class FloodRiskETL(ExtractTransformLoad):
"""ETL class for the First Street Foundation flood risk dataset"""

NAME = "fsf_flood_risk"
SOURCE_URL = settings.AWS_JUSTICE40_DATASOURCES_URL + "/fsf_flood.zip"
GEO_LEVEL = ValidGeoLevel.CENSUS_TRACT

# Output score variables (values set in datasets.yml) for linting purposes
COUNT_PROPERTIES: str
PROPERTIES_AT_RISK_FROM_FLOODING_TODAY: str
PROPERTIES_AT_RISK_FROM_FLOODING_IN_30_YEARS: str
SHARE_OF_PROPERTIES_AT_RISK_FROM_FLOODING_TODAY: str
SHARE_OF_PROPERTIES_AT_RISK_FROM_FLOODING_IN_30_YEARS: str

def __init__(self):
# define the full path for the input CSV file
self.INPUT_CSV = (
self.get_tmp_path() / "fsf_flood" / "flood_tract_2010.csv"
)

# this is the main dataframe
self.df: pd.DataFrame

# Start dataset-specific vars here
self.COUNT_PROPERTIES_NATIVE_FIELD_NAME = "count_properties"
self.COUNT_PROPERTIES_AT_RISK_TODAY = "mid_depth_100_year00"
self.COUNT_PROPERTIES_AT_RISK_30_YEARS = "mid_depth_100_year30"
self.CLIP_PROPERTIES_COUNT = 250

def transform(self) -> None:
"""Reads the unzipped data file into memory and applies the following
transformations to prepare it for the load() method:

- Renames the Census Tract column to match the other datasets
- Calculates share of properties at risk, left-clipping number of properties at 250
"""
logger.info("Transforming National Risk Index Data")

# read in the unzipped csv data source then rename the
# Census Tract column for merging
df_fsf_flood_disagg: pd.DataFrame = pd.read_csv(
self.INPUT_CSV,
dtype={self.INPUT_GEOID_TRACT_FIELD_NAME: str},
low_memory=False,
)

df_fsf_flood_disagg[self.GEOID_TRACT_FIELD_NAME] = df_fsf_flood_disagg[
self.INPUT_GEOID_TRACT_FIELD_NAME
].str.zfill(11)

# Because some tracts are listed twice, we aggregate based on
# GEOID10_TRACT. Note: this hasn't been confirmed with the FSF team yet -- TODO.
df_fsf_flood = (
df_fsf_flood_disagg.groupby(self.GEOID_TRACT_FIELD_NAME)
.sum()
.reset_index()
)

df_fsf_flood[self.COUNT_PROPERTIES] = df_fsf_flood[
self.COUNT_PROPERTIES_NATIVE_FIELD_NAME
].clip(lower=self.CLIP_PROPERTIES_COUNT)
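# The clip above means, e.g., a tract with 120 eligible properties is
# treated as having 250, so very small tracts cannot produce outsized
# share-at-risk values.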

df_fsf_flood[self.SHARE_OF_PROPERTIES_AT_RISK_FROM_FLOODING_TODAY] = (
df_fsf_flood[self.COUNT_PROPERTIES_AT_RISK_TODAY]
/ df_fsf_flood[self.COUNT_PROPERTIES]
)
df_fsf_flood[
self.SHARE_OF_PROPERTIES_AT_RISK_FROM_FLOODING_IN_30_YEARS
] = (
df_fsf_flood[self.COUNT_PROPERTIES_AT_RISK_30_YEARS]
/ df_fsf_flood[self.COUNT_PROPERTIES]
)

# Rename the risk columns and assign the final df to the class's output_df for load()
self.output_df = df_fsf_flood.rename(
columns={
self.COUNT_PROPERTIES_AT_RISK_TODAY: self.PROPERTIES_AT_RISK_FROM_FLOODING_TODAY,
self.COUNT_PROPERTIES_AT_RISK_30_YEARS: self.PROPERTIES_AT_RISK_FROM_FLOODING_IN_30_YEARS,
}
)
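To make the clipping arithmetic concrete, a tiny self-contained example with synthetic numbers (not FSF data):

import pandas as pd

# A tract with 100 properties, 50 at risk, reports 50 / 250 = 0.2 rather
# than 0.5; a tract with 1,000 properties is unaffected by the clip.
df = pd.DataFrame({"count_properties": [100, 1000], "at_risk": [50, 400]})
share = df["at_risk"] / df["count_properties"].clip(lower=250)
print(share.tolist())  # [0.2, 0.4]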
@@ -0,0 +1,3 @@
# FSF wildfire risk data

Fire risk is computed as a burn risk probability >= 0.003
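The burnprob_year00_flag and burnprob_year30_flag columns consumed by the ETL below presumably encode this threshold upstream. If the flags had to be derived from raw probabilities, it would look roughly like this (hypothetical raw column name; the actual flagging is done by First Street Foundation):

import pandas as pd

# Hypothetical: flag properties whose burn probability meets the
# 0.003 threshold described above.
props = pd.DataFrame({"burnprob_year00": [0.001, 0.003, 0.01]})
props["burnprob_year00_flag"] = (props["burnprob_year00"] >= 0.003).astype(int)
print(props["burnprob_year00_flag"].tolist())  # [0, 1, 1]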
91 changes: 91 additions & 0 deletions data/data-pipeline/data_pipeline/etl/sources/fsf_wildfire_risk/etl.py
@@ -0,0 +1,91 @@
# pylint: disable=unsubscriptable-object
# pylint: disable=unsupported-assignment-operation

import pandas as pd
from data_pipeline.config import settings

from data_pipeline.etl.base import ExtractTransformLoad, ValidGeoLevel
from data_pipeline.utils import get_module_logger

logger = get_module_logger(__name__)


class WildfireRiskETL(ExtractTransformLoad):
"""ETL class for the First Street Foundation wildfire risk dataset"""

NAME = "fsf_wildfire_risk"
SOURCE_URL = settings.AWS_JUSTICE40_DATASOURCES_URL + "/fsf_fire.zip"
GEO_LEVEL = ValidGeoLevel.CENSUS_TRACT

# Output score variables (values set in datasets.yml) for linting purposes
COUNT_PROPERTIES: str
PROPERTIES_AT_RISK_FROM_FIRE_TODAY: str
PROPERTIES_AT_RISK_FROM_FIRE_IN_30_YEARS: str
SHARE_OF_PROPERTIES_AT_RISK_FROM_FIRE_TODAY: str
SHARE_OF_PROPERTIES_AT_RISK_FROM_FIRE_IN_30_YEARS: str

def __init__(self):
# define the full path for the input CSV file
self.INPUT_CSV = (
self.get_tmp_path() / "fsf_fire" / "fire_tract_2010.csv"
)

# this is the main dataframe
self.df: pd.DataFrame

# Start dataset-specific vars here
self.COUNT_PROPERTIES_NATIVE_FIELD_NAME = "count_properties"
self.COUNT_PROPERTIES_AT_RISK_TODAY = "burnprob_year00_flag"
self.COUNT_PROPERTIES_AT_RISK_30_YEARS = "burnprob_year30_flag"
self.CLIP_PROPERTIES_COUNT = 250

def transform(self) -> None:
"""Reads the unzipped data file into memory and applies the following
transformations to prepare it for the load() method:

- Renames the Census Tract column to match the other datasets
- Calculates share of properties at risk, left-clipping number of properties at 250
"""
logger.info("Transforming National Risk Index Data")

# read in the unzipped csv data source then rename the
# Census Tract column for merging
df_fsf_fire_disagg: pd.DataFrame = pd.read_csv(
self.INPUT_CSV,
dtype={self.INPUT_GEOID_TRACT_FIELD_NAME: str},
low_memory=False,
)

df_fsf_fire_disagg[self.GEOID_TRACT_FIELD_NAME] = df_fsf_fire_disagg[
self.INPUT_GEOID_TRACT_FIELD_NAME
].str.zfill(11)

# Because some tracts are listed twice, we aggregate based on
# GEOID10_TRACT. Note: this hasn't been confirmed with the FSF team yet -- TODO.
df_fsf_fire = (
df_fsf_fire_disagg.groupby(self.GEOID_TRACT_FIELD_NAME)
.sum()
.reset_index()
)

df_fsf_fire[self.COUNT_PROPERTIES] = df_fsf_fire[
self.COUNT_PROPERTIES_NATIVE_FIELD_NAME
].clip(lower=self.CLIP_PROPERTIES_COUNT)

df_fsf_fire[self.SHARE_OF_PROPERTIES_AT_RISK_FROM_FIRE_TODAY] = (
df_fsf_fire[self.COUNT_PROPERTIES_AT_RISK_TODAY]
/ df_fsf_fire[self.COUNT_PROPERTIES]
)
df_fsf_fire[self.SHARE_OF_PROPERTIES_AT_RISK_FROM_FIRE_IN_30_YEARS] = (
df_fsf_fire[self.COUNT_PROPERTIES_AT_RISK_30_YEARS]
/ df_fsf_fire[self.COUNT_PROPERTIES]
)

# Rename the risk columns and assign the final df to the class's output_df for load()
self.output_df = df_fsf_fire.rename(
columns={
self.COUNT_PROPERTIES_AT_RISK_TODAY: self.PROPERTIES_AT_RISK_FROM_FIRE_TODAY,
self.COUNT_PROPERTIES_AT_RISK_30_YEARS: self.PROPERTIES_AT_RISK_FROM_FIRE_IN_30_YEARS,
}
)
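For reference, either class would be driven through the usual ExtractTransformLoad lifecycle. A hedged usage sketch (extract() is assumed from the base class; only transform() and the load()-consumed output_df appear in this diff):

from data_pipeline.etl.sources.fsf_wildfire_risk.etl import WildfireRiskETL

etl = WildfireRiskETL()
etl.extract()    # assumed base-class step: download and unzip fsf_fire.zip
etl.transform()  # aggregate duplicates, clip counts, derive share columns
etl.load()       # write output_df according to the datasets.yml load_fields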