Skip to content

Commit

Permalink
Pipeline tile tests (#1864)
Browse files Browse the repository at this point in the history
* temp update

* updating with fips check

* adding check on pfs

* updating with pfs test

* Update test_tiles_smoketests.py

* Fix lint errors (#1848)

* Add column names test (#1848)

* Mark tests as smoketests (#1848)

* Move to other score-related tests (#1848)

* Recast Total threshold criteria exceeded to int (#1848)

In writing tests to verify the output of the tiles csv matches the final
score CSV, I noticed TC/Total threshold criteria exceeded was getting
cast from an int64 to a float64 in the process of PostScoreETL. I
tracked it down to the line where we merge the score dataframe with
constants.DATA_CENSUS_CSV_FILE_PATH --- there were > 100 tracts in the
national census CSV that don't exist in the score, so those ended up
with a Total threshold count of np.nan, which is a float, and thereby
cast those columns to float. For the moment I just cast it back.

* No need for low memory (#1848)

* Add additional tests of tiles.csv (#1848)

* Drop pre-2010 rows before computing score (#1848)

Note this is probably NOT the optimal place for this change; it might
make more sense for each source to filter its own tracts down to the
acceptable tract list. However, that would be a pretty invasive change,
where this is central and plenty of other things are happening in score
transform that could be moved to sources, so for today, here's where the
change will live.

* Fix typo (#1848)

* Switch from filter to inner join (#1848)

* Remove no-op lines from tiles (#1848)

* Apply feedback from review, linter (#1848)

* Check the values of everything in the frame (#1848)

* Refactor checker class (#1848)

* Add test for state names (#1848)

* cleanup from reviewing my own code (#1848)

* Fix lint error (#1858)

* Apply Emma's feedback from review (#1848)

* Remove refs to national_df (#1848)

* Account for new, fake nullable bools in tiles (#1848)

To handle a geojson limitation, Emma converted some nullable boolean
columns to float64 in the tiles export with the values {0.0, 1.0, nan},
giving us the same expressiveness. Sadly, this broke my assumption that
all columns between the score and tiles csvs would have the same dtypes,
so I need to account for these new, fake bools in my test.

* Use equals instead of my worse version (#1848)

* Missed a spot where we called _create_score_data (#1848)

* Update per safety (#1848)

Co-authored-by: matt bowen <matthew.r.bowen@omb.eop.gov>
  • Loading branch information
emma-nechamkin and mattbowen-usds authored Sep 1, 2022
1 parent ccd72e2 commit 9c0e199
Show file tree
Hide file tree
Showing 7 changed files with 536 additions and 448 deletions.
4 changes: 2 additions & 2 deletions data/data-pipeline/data_pipeline/etl/score/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@
# Geojson cannot support nulls in a boolean column when we create tiles;
# to preserve null character, we coerce to floats for all fields
# that use null to signify missing information in a boolean field.
field_names.ELIGIBLE_FUDS_BINARY_FIELD_NAME,
field_names.AML_BOOLEAN,
field_names.ELIGIBLE_FUDS_BINARY_FIELD_NAME,
field_names.AML_BOOLEAN,
field_names.HISTORIC_REDLINING_SCORE_EXCEEDED
]
28 changes: 25 additions & 3 deletions data/data-pipeline/data_pipeline/etl/score/etl_score.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def __init__(self):
self.persistent_poverty_df: pd.DataFrame
self.census_decennial_df: pd.DataFrame
self.census_2010_df: pd.DataFrame
# self.child_opportunity_index_df: pd.DataFrame
self.national_tract_df: pd.DataFrame
self.hrs_df: pd.DataFrame
self.dot_travel_disadvantage_df: pd.DataFrame
self.fsf_flood_df: pd.DataFrame
Expand Down Expand Up @@ -203,6 +203,15 @@ def extract(self) -> None:
low_memory=False,
)

national_tract_csv = constants.DATA_CENSUS_CSV_FILE_PATH
self.national_tract_df = pd.read_csv(
national_tract_csv,
names=[self.GEOID_TRACT_FIELD_NAME],
dtype={self.GEOID_TRACT_FIELD_NAME: "string"},
low_memory=False,
header=None,
)

def _join_tract_dfs(self, census_tract_dfs: list) -> pd.DataFrame:
logger.info("Joining Census Tract dataframes")

Expand Down Expand Up @@ -370,8 +379,21 @@ def _prepare_initial_df(self) -> pd.DataFrame:

census_tract_df = self._join_tract_dfs(census_tract_dfs)

# If GEOID10s are read as numbers instead of strings, the initial 0 is dropped,
# and then we get too many CBG rows (one for 012345 and one for 12345).
# Drop tracts that don't exist in the 2010 tracts
pre_join_len = census_tract_df[field_names.GEOID_TRACT_FIELD].nunique()

census_tract_df = census_tract_df.merge(
self.national_tract_df,
on="GEOID10_TRACT",
how="inner",
)
assert (
census_tract_df.shape[0] <= pre_join_len
), "Join against national tract list ADDED rows"
logger.info(
"Dropped %s tracts not in the 2010 tract data",
pre_join_len - census_tract_df[field_names.GEOID_TRACT_FIELD].nunique()
)

# Now sanity-check the merged df.
self._census_tract_df_sanity_check(
Expand Down
47 changes: 7 additions & 40 deletions data/data-pipeline/data_pipeline/etl/score/etl_score_post.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ def __init__(self, data_source: str = None):
self.input_counties_df: pd.DataFrame
self.input_states_df: pd.DataFrame
self.input_score_df: pd.DataFrame
self.input_national_tract_df: pd.DataFrame

self.output_score_county_state_merged_df: pd.DataFrame
self.output_score_tiles_df: pd.DataFrame
Expand Down Expand Up @@ -92,7 +91,9 @@ def _extract_states(self, state_path: Path) -> pd.DataFrame:
def _extract_score(self, score_path: Path) -> pd.DataFrame:
logger.info("Reading Score CSV")
df = pd.read_csv(
score_path, dtype={self.GEOID_TRACT_FIELD_NAME: "string"}
score_path,
dtype={self.GEOID_TRACT_FIELD_NAME: "string"},
low_memory=False,
)

# Convert total population to an int
Expand All @@ -102,18 +103,6 @@ def _extract_score(self, score_path: Path) -> pd.DataFrame:

return df

def _extract_national_tract(
self, national_tract_path: Path
) -> pd.DataFrame:
logger.info("Reading national tract file")
return pd.read_csv(
national_tract_path,
names=[self.GEOID_TRACT_FIELD_NAME],
dtype={self.GEOID_TRACT_FIELD_NAME: "string"},
low_memory=False,
header=None,
)

def extract(self) -> None:
logger.info("Starting Extraction")

Expand All @@ -136,9 +125,6 @@ def extract(self) -> None:
self.input_score_df = self._extract_score(
constants.DATA_SCORE_CSV_FULL_FILE_PATH
)
self.input_national_tract_df = self._extract_national_tract(
constants.DATA_CENSUS_CSV_FILE_PATH
)

def _transform_counties(
self, initial_counties_df: pd.DataFrame
Expand Down Expand Up @@ -185,7 +171,6 @@ def _transform_score(self, initial_score_df: pd.DataFrame) -> pd.DataFrame:

def _create_score_data(
self,
national_tract_df: pd.DataFrame,
counties_df: pd.DataFrame,
states_df: pd.DataFrame,
score_df: pd.DataFrame,
Expand Down Expand Up @@ -217,28 +202,11 @@ def _create_score_data(
right_on=self.STATE_CODE_COLUMN,
how="left",
)

# check if there are census tracts without score
logger.info("Removing tract rows without score")

# merge census tracts with score
merged_df = national_tract_df.merge(
score_county_state_merged,
on=self.GEOID_TRACT_FIELD_NAME,
how="left",
)

# recast population to integer
score_county_state_merged["Total population"] = (
merged_df["Total population"].fillna(0).astype(int)
)

de_duplicated_df = merged_df.dropna(
subset=[DISADVANTAGED_COMMUNITIES_FIELD]
)

assert score_county_merged[
self.GEOID_TRACT_FIELD_NAME
].is_unique, "Merging state/county data introduced duplicate rows"
# set the score to the new df
return de_duplicated_df
return score_county_state_merged

def _create_tile_data(
self,
Expand Down Expand Up @@ -427,7 +395,6 @@ def transform(self) -> None:
transformed_score = self._transform_score(self.input_score_df)

output_score_county_state_merged_df = self._create_score_data(
self.input_national_tract_df,
transformed_counties,
transformed_states,
transformed_score,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,12 @@ def test_transform_score(etl, score_data_initial, score_transformed_expected):
# pylint: disable=too-many-arguments
def test_create_score_data(
etl,
national_tract_df,
counties_transformed_expected,
states_transformed_expected,
score_transformed_expected,
score_data_expected,
):
score_data_actual = etl._create_score_data(
national_tract_df,
counties_transformed_expected,
states_transformed_expected,
score_transformed_expected,
Expand Down
1 change: 1 addition & 0 deletions data/data-pipeline/data_pipeline/tests/score/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ def final_score_df():
return pd.read_csv(
settings.APP_ROOT / "data" / "score" / "csv" / "full" / "usa.csv",
dtype={field_names.GEOID_TRACT_FIELD: str},
low_memory=False,
)
221 changes: 221 additions & 0 deletions data/data-pipeline/data_pipeline/tests/score/test_tiles_smoketests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
# flake8: noqa: W0613,W0611,F811
from dataclasses import dataclass
from typing import Optional
import pandas as pd
import numpy as np
import pytest
from data_pipeline.config import settings
from data_pipeline.etl.score import constants
from data_pipeline.score import field_names
from data_pipeline.etl.score.constants import (
TILES_SCORE_COLUMNS,
THRESHOLD_COUNT_TO_SHOW_FIELD_NAME,
USER_INTERFACE_EXPERIENCE_FIELD_NAME,
)
from .fixtures import final_score_df # pylint: disable=unused-import

pytestmark = pytest.mark.smoketest


@pytest.fixture(scope="session")
def tiles_df():
    """Load the generated tiles CSV once per test session.

    The scope is passed to the ``pytest.fixture`` decorator; as a default
    argument of the fixture function it had no effect and the file was
    re-read for every test.

    Returns:
        pd.DataFrame: contents of ``data/score/csv/tiles/usa.csv`` under
        ``settings.APP_ROOT``, with the ``GTF`` column read as ``str`` so the
        tests can slice it with ``.str``.
    """
    return pd.read_csv(
        settings.APP_ROOT / "data" / "score" / "csv" / "tiles" / "usa.csv",
        dtype={"GTF": str},
        low_memory=False,
    )


# Tiles CSV columns checked by test_percentiles: each one is expected to hold
# values between 0 and 1 with a median between 0.4 and 0.6.
PERCENTILE_FIELDS = [
"DF_PFS",
"AF_PFS",
"HDF_PFS",
"DSF_PFS",
"EBF_PFS",
"EALR_PFS",
"EBLR_PFS",
"EPLR_PFS",
"HBF_PFS",
"LLEF_PFS",
"LIF_PFS",
"LMI_PFS",
"MHVF_PFS",
"PM25F_PFS",
"P100_PFS",
"P200_I_PFS",
"P200_PFS",
"LPF_PFS",
"KP_PFS",
"NPL_PFS",
"RMP_PFS",
"TSDF_PFS",
"TF_PFS",
"UF_PFS",
"WF_PFS",
"UST_PFS",
]


def test_percentiles(tiles_df):
    """Check that every percentile column in the tiles CSV looks sane.

    For each column in ``PERCENTILE_FIELDS`` the values must lie in [0, 1]
    and the median must fall between 0.4 and 0.6. The test returns nothing:
    pytest warns about test functions that return a non-None value.

    Args:
        tiles_df: the tiles dataframe fixture.
    """
    for col in PERCENTILE_FIELDS:
        column = tiles_df[col]
        assert column.min() >= 0, f"Negative percentile exists for {col}"
        assert column.max() <= 1, f"Percentile over 100th exists for {col}"
        # Compute the median once; a NaN median fails the comparison.
        median = column.median()
        assert (
            0.4 <= median <= 0.6
        ), f"Percentile distribution for {col} is decidedly not uniform"


def test_count_of_fips_codes(tiles_df, final_score_df):
    """Check that tiles cover the same state FIPS prefixes as the score CSV.

    The first two characters of the tract ID are compared between the two
    files. Separately, the number of distinct prefixes that have at least
    one non-null ``PFS`` column must be exactly 56.

    Args:
        tiles_df: the tiles dataframe fixture.
        final_score_df: the final score dataframe fixture.
    """
    score_state_prefixes = final_score_df[field_names.GEOID_TRACT_FIELD].str[:2]
    expected_state_count = score_state_prefixes.nunique()
    tile_state_count = tiles_df["GTF"].str[:2].nunique()
    assert (
        tile_state_count == expected_state_count
    ), "Some states are missing from tiles"

    # Keep only rows where at least one percentile column is populated.
    percentile_columns = tiles_df.filter(like="PFS").columns.to_list()
    rows_with_percentiles = tiles_df.dropna(how="all", subset=percentile_columns)
    states_with_percentiles = rows_with_percentiles["GTF"].str[:2].nunique()
    assert (
        states_with_percentiles == 56
    ), "Some states do not have any percentile data"


def test_column_presence(tiles_df):
    """Check that the tiles CSV has exactly the expected set of columns.

    The expected set is every value of ``TILES_SCORE_COLUMNS`` plus the two
    UI-only fields. Extra and missing columns are reported separately.

    Args:
        tiles_df: the tiles dataframe fixture.
    """
    expected_column_names = set(TILES_SCORE_COLUMNS.values()) | {
        THRESHOLD_COUNT_TO_SHOW_FIELD_NAME,
        USER_INTERFACE_EXPERIENCE_FIELD_NAME,
    }
    actual_column_names = set(tiles_df.columns)
    extra_columns = actual_column_names - expected_column_names
    # Expected minus *actual*: subtracting the expected set from itself is
    # always empty, so the missing-column assertion could never fail.
    missing_columns = expected_column_names - actual_column_names
    assert not (
        extra_columns
    ), f"tiles/usa.csv has columns not specified in TILE_SCORE_COLUMNS: {extra_columns}"
    assert not (
        missing_columns
    ), f"tiles/usa.csv is missing columns from TILE_SCORE_COLUMNS: {missing_columns}"


def test_tract_equality(tiles_df, final_score_df):
assert tiles_df.shape[0] == final_score_df.shape[0]


@dataclass
class ColumnValueComparison:
    """Compare one final score column against the same-named tiles column.

    The comparison is truthy when the dtypes are compatible and the values
    match. Float columns are compared with a tolerance because tiles use
    rounded floats. A tiles float64 column holding only 0.0, 1.0, or NaN
    (a "fake bool") may stand in for an object-dtype score column.

    The fake-bool cast is applied to a local copy and only when the dtypes
    actually differ. Overwriting ``tiles_column`` itself made two float64
    columns that happen to hold only 0/1/NaN compare as unequal, and made
    the dtype check and error message describe the mutated column.
    """

    # Column taken from the (renamed) final score dataframe
    final_score_column: pd.Series
    # Column with the same name taken from the tiles dataframe
    tiles_column: pd.Series
    # Column name, used only when building the error message
    col_name: str

    @property
    def _is_tiles_column_fake_bool(self) -> bool:
        """Return True if the tiles column is float64 with only 0.0, 1.0, or NaN."""
        if self.tiles_column.dtype != np.dtype("float64"):
            return False
        # NaN is always allowed, so only the non-null values need checking.
        non_null_values = set(self.tiles_column.dropna().unique())
        return non_null_values.issubset({0.0, 1.0})

    @property
    def _is_dtype_ok(self) -> bool:
        """Return True if dtypes match, or object is paired with a fake bool."""
        score_dtype = self.final_score_column.dtype
        tiles_dtype = self.tiles_column.dtype
        if score_dtype == tiles_dtype:
            return True
        return bool(
            score_dtype == np.dtype("O")
            and tiles_dtype == np.dtype("float64")
            and self._is_tiles_column_fake_bool
        )

    def __post_init__(self):
        """Compute and store the value comparison in ``_is_value_ok``."""
        self._is_value_ok = False
        if not self._is_dtype_ok:
            return

        tiles_column = self.tiles_column
        if self.final_score_column.dtype != tiles_column.dtype:
            # With compatible but different dtypes, the only case is an
            # object score column against a fake-bool tiles column.
            # Cast to actual bool for useful comparison.
            tiles_column = tiles_column.apply(
                lambda val: bool(val) if not np.isnan(val) else np.nan
            )

        if tiles_column.dtype == np.dtype("float64"):
            # Tolerance is derived from the tiles rounding precision.
            self._is_value_ok = np.allclose(
                self.final_score_column,
                tiles_column,
                atol=float(f"1e-{constants.TILES_ROUND_NUM_DECIMALS}"),
                equal_nan=True,
            )
        else:
            self._is_value_ok = self.final_score_column.equals(tiles_column)

    def __bool__(self) -> bool:
        """Return True only if both the dtype and the values are acceptable."""
        return self._is_dtype_ok and bool(self._is_value_ok)

    @property
    def error_message(self) -> Optional[str]:
        """Describe the first failed check, or return None if the columns match."""
        if not self._is_dtype_ok:
            return (
                f"Column {self.col_name} dtype mismatch: "
                f"score_df: {self.final_score_column.dtype}, "
                f"tile_df: {self.tiles_column.dtype}"
            )
        if not self._is_value_ok:
            return f"Column {self.col_name} value mismatch"
        return None


def test_for_column_fidelitiy_from_score(tiles_df, final_score_df):
    """Check that the tiles CSV is a faithful, renamed subset of the score CSV.

    Verifies the following:
      * Every ``TILES_SCORE_COLUMNS`` value is present in the tiles dataframe
      * After renaming the score columns and dropping the UI-only tile
        fields, both frames have the same shape, tract IDs, and columns
      * The dtype and values of every renamed score column are "equal" to
        the matching tile column (tiles use rounded floats, so float
        columns are compared with a tolerance)

    Args:
        tiles_df: the tiles dataframe fixture.
        final_score_df: the final score dataframe fixture.
    """
    absent_tile_columns = set(TILES_SCORE_COLUMNS.values()) - set(
        tiles_df.columns
    )
    assert (
        not absent_tile_columns
    ), "Some TILES_SCORE_COLUMNS are missing from the tiles dataframe"

    # Keep only the tiles score columns in the final score data. Columns that
    # were renamed no longer carry their original name, so dropping the
    # original names (ignoring ones that are gone) leaves the renamed ones.
    original_only_names = final_score_df.columns.difference(
        TILES_SCORE_COLUMNS.values()
    )
    renamed_score_df = final_score_df.rename(columns=TILES_SCORE_COLUMNS)
    final_score_df = renamed_score_df.drop(
        original_only_names,
        axis=1,
        errors="ignore",
    )

    # Drop the UI-specific fields from the tiles dataframe
    ui_only_columns = [
        "SF",  # State field, added at geoscore
        "CF",  # County field, added at geoscore,
        constants.THRESHOLD_COUNT_TO_SHOW_FIELD_NAME,
        constants.USER_INTERFACE_EXPERIENCE_FIELD_NAME,
    ]
    tiles_df = tiles_df.drop(columns=ui_only_columns)

    # Are the dataframes the same shape truly
    assert tiles_df.shape == final_score_df.shape
    assert tiles_df["GTF"].equals(final_score_df["GTF"])
    assert sorted(tiles_df.columns) == sorted(final_score_df.columns)

    # Are all the dtypes and values the same?
    comparisons = [
        ColumnValueComparison(
            final_score_df[col_name], tiles_df[col_name], col_name
        )
        for col_name in final_score_df.columns
    ]
    errors = [comparison for comparison in comparisons if not comparison]
    error_message = "\n".join(error.error_message for error in errors)
    assert not errors, error_message


def test_for_state_names(tiles_df):
states = tiles_df["SF"].value_counts(dropna=False).index
assert np.nan not in states
assert states.all()
Loading

0 comments on commit 9c0e199

Please sign in to comment.