Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix JHU prop signal #328

Merged
merged 6 commits into from
Oct 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,7 @@ venv.bak/
.retry
.indicators-ansible-vault-pass
indicators-ansible-vault-pass

# testing_utils
testing_utils/cache
testing_utils/*.csv
1 change: 1 addition & 0 deletions _delphi_utils_python/data_proc/geomap/geo_data_proc.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ def create_fips_population_table():
df_pr = df_pr.groupby("fips").sum().reset_index()
df_pr = df_pr[~df_pr["fips"].isin(census_pop["fips"])]
census_pop_pr = pd.concat([census_pop, df_pr])

census_pop_pr.to_csv(join(OUTPUT_DIR, FIPS_POPULATION_OUT_FILENAME), index=False)


Expand Down
7 changes: 4 additions & 3 deletions _delphi_utils_python/delphi_utils/geomap.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ def replace_geocode(
df = df.groupby([date_col, new_col]).sum().reset_index()
return df

def add_population_column(self, geocode_type, data=None, geocode_col=None):
def add_population_column(self, geocode_type, data=None, geocode_col=None, dropna=True):
"""
Appends a population column to a dataframe, based on the FIPS or ZIP code. If no
dataframe is provided, the full crosswalk from geocode to population is returned.
Expand Down Expand Up @@ -428,12 +428,13 @@ def add_population_column(self, geocode_type, data=None, geocode_col=None):
if not is_string_dtype(data[geocode_col]):
data[geocode_col] = data[geocode_col].astype(str).str.zfill(5)

merge_type = "left" if dropna else "inner"
data_with_pop = (
data.copy()
.merge(pop_df, left_on=geocode_col, right_on=geocode_type, how="inner")
.merge(pop_df, left_on=geocode_col, right_on=geocode_type, how=merge_type)
.rename(columns={"pop": "population"})
)
data_with_pop["population"] = data_with_pop["population"].astype(int)

return data_with_pop

@staticmethod
Expand Down
6 changes: 6 additions & 0 deletions jhu/delphi_jhu/geo.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ def geo_map(df: pd.DataFrame, geo_res: str):
if geo_res == "county":
df.rename(columns={'fips': 'geo_id'}, inplace=True)
elif geo_res == "state":
df = df.set_index("fips")
# Zero out the state FIPS population to avoid double counting.
state_fips_codes = {str(x).zfill(2) + "000" for x in range(1,73)}
subset_state_fips_codes = set(df.index.values) & state_fips_codes
df.loc[subset_state_fips_codes, "population"] = 0
df = df.reset_index()
df = gmpr.replace_geocode(df, "fips", "state_id", new_col="geo_id", date_col="timestamp")
else:
df = gmpr.replace_geocode(df, "fips", geo_res, new_col="geo_id", date_col="timestamp")
Expand Down
147 changes: 73 additions & 74 deletions jhu/delphi_jhu/pull.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,66 @@
# -*- coding: utf-8 -*-

import re
import pandas as pd
import numpy as np
from delphi_utils import GeoMapper

def detect_date_col(col_name: str):
"""determine if column name is a date"""
date_match = re.match(r'\d{1,2}\/\d{1,2}\/\d{1,2}', col_name)
if date_match:
return True
return False

def pull_jhu_data(base_url: str, metric: str, pop_df: pd.DataFrame) -> pd.DataFrame:
def download_data(base_url: str, metric: str) -> pd.DataFrame:
"""
Downloads the data from the JHU repo, extracts the UID and the date columns, and
enforces the date datatype on the the time column.
"""
# Read data
df = pd.read_csv(base_url.format(metric=metric))
# Keep the UID and the time series columns only
# The regex filters for columns with the date format MM-DD-YY or M-D-YY
df = df.filter(regex="\d{1,2}\/\d{1,2}\/\d{2}|UID").melt(
id_vars=["UID"], var_name="timestamp", value_name="cumulative_counts"
)
df["timestamp"] = pd.to_datetime(df["timestamp"])
return df


def create_diffs_column(df: pd.DataFrame) -> pd.DataFrame:
"""
Using the cumulative_counts column from the dataframe, partitions the dataframe
into separate time-series based on fips, and then computes pairwise differences
of the cumulative values to get the incidence values. Boundary cases are handled
by zero-filling the day prior.
"""
# Take time-diffs in each geo_code partition
df = df.set_index(["fips", "timestamp"])
df["new_counts"] = df.groupby(level=0)["cumulative_counts"].diff()
# Fill the NA value for the first date of each partition with the cumulative value that day
# (i.e. pretend the cumulative count the day before was 0)
na_value_mask = df["new_counts"].isna()
df.loc[na_value_mask, "new_counts"] = df.loc[na_value_mask, "cumulative_counts"]
df = df.reset_index()
return df


def sanity_check_data(df: pd.DataFrame) -> pd.DataFrame:
"""
Perform a final set of sanity checks on the data.
"""
days_by_fips = df.groupby("fips").count()["cumulative_counts"].unique()
unique_days = df["timestamp"].unique()

# each FIPS has same number of rows
if (len(days_by_fips) > 1) or (days_by_fips[0] != len(unique_days)):
raise ValueError("Differing number of days by fips")

min_timestamp = min(unique_days)
max_timestamp = max(unique_days)
n_days = (max_timestamp - min_timestamp) / np.timedelta64(1, "D") + 1
if n_days != len(unique_days):
raise ValueError(
f"Not every day between {min_timestamp} and "
"{max_timestamp} is represented."
)


def pull_jhu_data(base_url: str, metric: str, gmpr: GeoMapper) -> pd.DataFrame:
"""Pulls the latest Johns Hopkins CSSE data, and conforms it into a dataset

The output dataset has:
Expand All @@ -28,87 +76,38 @@ def pull_jhu_data(base_url: str, metric: str, pop_df: pd.DataFrame) -> pd.DataFr
may be negative. This is wholly dependent on the quality of the raw
dataset.

We filter the data such that we only keep rows with valid FIPS, or "FIPS"
codes defined under the exceptions of the README. The current exceptions
include:

- 70002: Dukes County and Nantucket County in Massachusetts, which are
reported together
- 70003: Kansas City, Missouri, which reports counts separately from the
four counties it intesects (Platte, Cass, Clay, Jackson Counties)
We filter the data such that we only keep rows with valid FIPS or "FIPS"
codes defined under the exceptions of the README.

Parameters
----------
base_url: str
Base URL for pulling the JHU CSSE data
Base URL for pulling the JHU CSSE data.
metric: str
One of 'confirmed' or 'deaths'.
pop_df: pd.DataFrame
Read from static file "fips_population.csv".
gmpr: GeoMapper
An instance of the geomapping utility.

Returns
-------
pd.DataFrame
Dataframe as described above.
"""
df = download_data(base_url, metric)

# Read data
df = pd.read_csv(base_url.format(metric=metric))

# FIPS are missing for some nonstandard FIPS
date_cols = [col_name for col_name in df.columns if detect_date_col(col_name)]
keep_cols = date_cols + ['UID']
df = df[keep_cols]

df = df.melt(
id_vars=["UID"],
var_name="timestamp",
value_name="cumulative_counts",
gmpr = GeoMapper()
df = gmpr.replace_geocode(
df, "jhu_uid", "fips", from_col="UID", date_col="timestamp"
)
df["timestamp"] = pd.to_datetime(df["timestamp"])

gmpr = GeoMapper()
df = gmpr.replace_geocode(df, "jhu_uid", "fips", from_col="UID", date_col="timestamp")
# Merge in population, set population as NAN for fake fips
df = pd.merge(df, pop_df, on="fips", how='left')

# Add a dummy first row here on day before first day
# code below could be cleaned with groupby.diff

min_ts = min(df["timestamp"])
df_dummy = df.loc[df["timestamp"] == min_ts].copy()
df_dummy.loc[:, "timestamp"] = min_ts - pd.Timedelta(days=1)
df_dummy.loc[:, "cumulative_counts"] = 0
df = pd.concat([df_dummy, df])
# Obtain new_counts
df.sort_values(["fips", "timestamp"], inplace=True)
df["new_counts"] = df["cumulative_counts"].diff() # 1st discrete difference
# Handle edge cases where we diffed across fips
mask = df["fips"] != df["fips"].shift(1)
df.loc[mask, "new_counts"] = np.nan
df.reset_index(inplace=True, drop=True)
df = gmpr.add_population_column("fips", df)

df = create_diffs_column(df)

# Final sanity checks
days_by_fips = df.groupby("fips").count()["cumulative_counts"].unique()
unique_days = df["timestamp"].unique()
# each FIPS has same number of rows
if (len(days_by_fips) > 1) or (days_by_fips[0] != len(unique_days)):
raise ValueError("Differing number of days by fips")
min_timestamp = min(unique_days)
max_timestamp = max(unique_days)
n_days = (max_timestamp - min_timestamp) / np.timedelta64(1, "D") + 1
if n_days != len(unique_days):
raise ValueError(
f"Not every day between {min_timestamp} and "
"{max_timestamp} is represented."
)
return df.loc[
df["timestamp"] >= min_ts,
[ # Reorder
"fips",
"timestamp",
"population",
"new_counts",
"cumulative_counts",
],
]
sanity_check_data(df)

# Reorder columns
df = df[["fips", "timestamp", "population", "new_counts", "cumulative_counts"]]
return df
6 changes: 2 additions & 4 deletions jhu/delphi_jhu/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ def run_module():
export_start_date = params["export_start_date"]
export_dir = params["export_dir"]
base_url = params["base_url"]
static_file_dir = params["static_file_dir"]
cache_dir = params["cache_dir"]

if len(params["bucket_name"]) > 0:
Expand All @@ -85,9 +84,8 @@ def run_module():
else:
arch_diff = None

pop_df = GeoMapper().add_population_column("fips")

dfs = {metric: pull_jhu_data(base_url, metric, pop_df) for metric in METRICS}
gmpr = GeoMapper()
dfs = {metric: pull_jhu_data(base_url, metric, gmpr) for metric in METRICS}
for metric, geo_res, sensor, smoother in product(
METRICS, GEO_RESOLUTIONS, SENSORS, SMOOTHERS):
print(metric, geo_res, sensor, smoother)
Expand Down
7 changes: 7 additions & 0 deletions jhu/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from os import listdir, remove
from os.path import join
import pandas as pd

from delphi_utils import read_params
from delphi_jhu.run import run_module
Expand All @@ -25,3 +26,9 @@ def run_as_module():
s3_client.create_bucket(Bucket=params["bucket_name"])

run_module()

@pytest.fixture
def jhu_confirmed_test_data():
df = pd.read_csv("test_data/jhu_confirmed.csv", dtype={"fips": str})
df["timestamp"] = pd.to_datetime(df["timestamp"])
return df
Loading