Add percent_of_expected_deaths signal and dry-run mode to NCHS mortality data pipeline #233

Merged (18 commits) on Oct 26, 2020
14 changes: 10 additions & 4 deletions _delphi_utils_python/delphi_utils/archive.py
@@ -278,7 +278,11 @@ def update_cache(self):

self._cache_updated = True

def archive_exports(self, exported_files: Files) -> Tuple[Files, Files]:
def archive_exports(self,
exported_files: Files,
update_cache: bool = True,
update_s3: bool = True
) -> Tuple[Files, Files]:
"""
Handles actual archiving of files to the S3 bucket.

@@ -301,10 +305,12 @@ def archive_exports(self, exported_files: Files) -> Tuple[Files, Files]:
archive_key = join(self.indicator_prefix, basename(exported_file))

try:
# Update local cache
shutil.copyfile(exported_file, cached_file)
if update_cache:
# Update local cache
shutil.copyfile(exported_file, cached_file)

self.bucket.Object(archive_key).upload_file(exported_file)
if update_s3:
self.bucket.Object(archive_key).upload_file(exported_file)

archive_success.append(exported_file)
except FileNotFoundError:
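The new `update_cache` and `update_s3` flags let callers archive exported files without touching S3, which the weekly run below relies on. A minimal usage sketch, with placeholder paths, bucket name, and credentials standing in for the values read from params.json:

```python
from delphi_utils import S3ArchiveDiffer

# Placeholder configuration; the real values come from params.json.
arch_diff = S3ArchiveDiffer("./cache", "./receiving",
                            "example-bucket", "nchs_mortality",
                            {"aws_access_key_id": "...", "aws_secret_access_key": "..."})
arch_diff.update_cache()

exported = ["./receiving/example_export.csv"]  # hypothetical export file

# Default behaviour: copy into the local cache and upload to S3.
successes, fails = arch_diff.archive_exports(exported)

# Dry-run style call: refresh the local cache but skip the S3 upload.
successes, fails = arch_diff.archive_exports(exported, update_s3=False)
```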
13 changes: 13 additions & 0 deletions nchs_mortality/DETAILS.md
@@ -13,6 +13,9 @@ consistency how NCHS reports the data, please refer to [Exceptions](#Exceptions)
* `covid_deaths`: All Deaths with confirmed or presumed COVID-19,
coded to ICD–10 code U07.1
* `total_deaths`: Deaths from all causes.
* `percent_of_expected_deaths`: the number of deaths from all causes for this
                                week in 2020, as a percent of the average number
                                across the same week in 2017–2019 (see the sketch below).
* `pneumonia_deaths`: Counts of deaths involving Pneumonia, with or without
COVID-19, excluding Influenza deaths (J12.0-J18.9).
* `pneumonia_and_covid_deaths`: Counts of deaths involving COVID-19 and Pneumonia,
@@ -24,9 +27,12 @@ consistency how NCHS reports the data, please refer to [Exceptions](#Exceptions)
Influenza, or COVID-19, coded to ICD–10
codes U07.1 or J09–J18.9

Detailed descriptions are provided in the notes under Table 1 [here](https://www.cdc.gov/nchs/nvss/vsrr/COVID19/index.htm).
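A toy illustration of `percent_of_expected_deaths` as defined above; the numbers are made up, and the pipeline ingests this field directly from NCHS rather than computing it:

```python
# Hypothetical values for one state and one week.
deaths_this_week_2020 = 950       # all-cause deaths reported for the week in 2020
avg_deaths_2017_2019 = 1000       # average all-cause deaths for the same week, 2017-2019

percent_of_expected_deaths = 100 * deaths_this_week_2020 / avg_deaths_2017_2019
print(percent_of_expected_deaths) # 95.0
```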

## Metrics, Level 2 (`m2`)
* `num`: number of new deaths in a given week
* `prop`: `num` / population * 100,000
* _**No** `m2` for signal `percent_of_expected_deaths`._
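A minimal sketch of the two `m2` metrics for a count signal, using toy values; the 100,000 factor matches `INCIDENCE_BASE` in `run.py`:

```python
import pandas as pd

# One toy state-week row; real rows come from pull_nchs_mortality_data.
df = pd.DataFrame({"covid_deaths": [12.0], "population": [731545]})

df["num"] = df["covid_deaths"]                                # raw weekly count
df["prop"] = df["covid_deaths"] / df["population"] * 100000   # rate per 100,000 people
# percent_of_expected_deaths is already a ratio, so it is exported as-is with no prop variant.
```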

## Exceptions

@@ -47,3 +53,10 @@ refers to an epiweek). However, NCHS reports their weekly data from Saturday to
Saturday. We assume there is a one-day shift. For example, if they report death counts
for Alaska in a week starting from date D, we will report the timestamp of this report
as the corresponding epiweek of date (D + 1).
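An illustrative check of that one-day shift using the `epiweeks` package (already a dependency); the dates are examples, not taken from the data:

```python
from datetime import date, timedelta
from epiweeks import Week

nchs_week_start = date(2020, 10, 3)                  # a Saturday, as NCHS reports weeks
epiweek = Week.fromdate(nchs_week_start + timedelta(days=1))
print(epiweek.year, epiweek.week)                    # 2020 41: the epiweek starting Sunday, Oct 4
```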

### Data Versioning
Data versions are tracked on both a daily and a weekly level.
On a daily level, we check for updates to the NCHS mortality data every weekday, as it is
reported by the CDC, and stash these daily updates on S3 but not in our API.
On a weekly level (on Mondays), we additionally upload the changes to the data
made over the past week (due to backfill) to our API.
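A condensed sketch of that schedule; the helper names are stand-ins for the logic in `run.py` below, not real functions:

```python
from datetime import datetime

def export_daily_and_archive_to_s3():
    """Stand-in for the daily path: export to daily_receiving/ and archive diffs on S3."""

def copy_daily_output_to_api_receiving():
    """Stand-in for the Monday path: copy the day's output into receiving/ for the API."""

export_daily_and_archive_to_s3()          # every weekday
if datetime.today().weekday() == 0:       # Mondays only
    copy_daily_output_to_api_receiving()
```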
2 changes: 1 addition & 1 deletion nchs_mortality/README.md
@@ -10,7 +10,7 @@ the state-level data as-is. For detailed information see the files
steps below to create a MyAppToken.
- Click the `Sign up for an app token` button in the linked website
- Sign In or Sign Up with Socrata ID
- Click the `Create New App Token` buttom
- Click the `Create New App Token` button
- Fill in `Application Name` and `Description` (You can just use NCHS_Mortality
for both) and click `Save`
- Copy the `App Token`
1 change: 1 addition & 0 deletions nchs_mortality/daily_cache/.gitignore
@@ -0,0 +1 @@
*.csv
1 change: 1 addition & 0 deletions nchs_mortality/daily_receiving/.gitignore
@@ -0,0 +1 @@
*.csv
36 changes: 22 additions & 14 deletions nchs_mortality/delphi_nchs_mortality/pull.py
@@ -3,7 +3,7 @@
import pandas as pd
from sodapy import Socrata

def pull_nchs_mortality_data(token: str, map_df: pd.DataFrame) -> pd.DataFrame:
def pull_nchs_mortality_data(token: str, map_df: pd.DataFrame, test_mode: str):
"""Pulls the latest NCHS Mortality data, and conforms it into a dataset

The output dataset has:
@@ -23,24 +23,30 @@ def pull_nchs_mortality_data(token: str, map_df: pd.DataFrame) -> pd.DataFrame:
My App Token for pulling the NCHS mortality data
map_df: pd.DataFrame
Read from static file "state_pop.csv".
test_mode: str
When non-empty, the name of a CSV file under ./test_data to read instead of
pulling data from the Socrata API.

Returns
-------
pd.DataFrame
Dataframe as described above.
"""
# Constants
KEEP_COLUMNS = ['covid_deaths', 'total_deaths', 'pneumonia_deaths',
KEEP_COLUMNS = ['covid_deaths', 'total_deaths',
'percent_of_expected_deaths', 'pneumonia_deaths',
'pneumonia_and_covid_deaths', 'influenza_deaths',
'pneumonia_influenza_or_covid_19_deaths']
TYPE_DICT = {key: float for key in KEEP_COLUMNS}
TYPE_DICT["timestamp"] = 'datetime64[ns]'

# Pull data from Socrata API
client = Socrata("data.cdc.gov", token)
results = client.get("r8kw-7aab", limit=10**10)
df = pd.DataFrame.from_records(results).rename(
{"start_week": "timestamp"}, axis=1)
if test_mode == "":
# Pull data from Socrata API
client = Socrata("data.cdc.gov", token)
results = client.get("r8kw-7aab", limit=10**10)
df = pd.DataFrame.from_records(results).rename(
{"start_week": "timestamp"}, axis=1)
else:
df = pd.read_csv("./test_data/%s"%test_mode)

# Check missing start_week == end_week
try:
@@ -50,7 +56,13 @@ def pull_nchs_mortality_data(token: str, map_df: pd.DataFrame) -> pd.DataFrame:
"end_week is not always the same as start_week, check the raw file"
)

df = df.astype(TYPE_DICT)
try:
df = df.astype(TYPE_DICT)
except KeyError:
raise ValueError("Expected column(s) missed, The dataset "
"schema may have changed. Please investigate and "
"amend the code.")

df = df[df["state"] != "United States"]
df.loc[df["state"] == "New York City", "state"] = "New York"

@@ -79,10 +91,6 @@ def pull_nchs_mortality_data(token: str, map_df: pd.DataFrame) -> pd.DataFrame:

# Add population info
KEEP_COLUMNS.extend(["timestamp", "geo_id", "population"])
try:
df = df.merge(map_df, on="state")[KEEP_COLUMNS]
except KeyError:
raise ValueError("Expected column(s) missed, The dataset "
"schema may have changed. Please investigate and "
"amend the code.")
df = df.merge(map_df, on="state")[KEEP_COLUMNS]

return df
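A hedged usage sketch of the new signature: an empty `test_mode` pulls live data from the Socrata API, while a non-empty value is read as a CSV under `./test_data`. The token string is a placeholder, and `test_data.csv` mirrors the filename used in the test params template:

```python
import pandas as pd
from delphi_nchs_mortality.pull import pull_nchs_mortality_data

map_df = pd.read_csv("./static/state_pop.csv", dtype={"fips": int})

# Live pull from the Socrata API (requires a real App Token).
df_live = pull_nchs_mortality_data("MY_APP_TOKEN", map_df, test_mode="")

# Dry-run / test pull from a local file, as used by the test suite.
df_test = pull_nchs_mortality_data("", map_df, test_mode="test_data.csv")
```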
121 changes: 102 additions & 19 deletions nchs_mortality/delphi_nchs_mortality/run.py
@@ -5,20 +5,21 @@
when the module is run with `python -m MODULE_NAME`.
"""
from datetime import datetime, date, timedelta
from itertools import product
from os.path import join
from os import remove, listdir
from shutil import copy

import numpy as np
import pandas as pd
from delphi_utils import read_params
from delphi_utils import read_params, S3ArchiveDiffer

from .pull import pull_nchs_mortality_data
from .export import export_csv

# global constants
METRICS = [
'covid_deaths', 'total_deaths', 'pneumonia_deaths',
'pneumonia_and_covid_deaths', 'influenza_deaths',
'covid_deaths', 'total_deaths', 'percent_of_expected_deaths',
'pneumonia_deaths', 'pneumonia_and_covid_deaths', 'influenza_deaths',
'pneumonia_influenza_or_covid_19_deaths'
]
SENSORS = [
@@ -37,27 +38,109 @@ def run_module():
days=date.today().weekday() + 2)
export_start_date = export_start_date.strftime('%Y-%m-%d')
export_dir = params["export_dir"]
daily_export_dir = params["daily_export_dir"]
cache_dir = params["cache_dir"]
daily_cache_dir = params["daily_cache_dir"]
static_file_dir = params["static_file_dir"]
token = params["token"]
test_mode = params["mode"]

daily_arch_diff = S3ArchiveDiffer(
daily_cache_dir, daily_export_dir,
params["bucket_name"], "nchs_mortality",
params["aws_credentials"])
daily_arch_diff.update_cache()

map_df = pd.read_csv(
join(static_file_dir, "state_pop.csv"), dtype={"fips": int}
)

df = pull_nchs_mortality_data(token, map_df)
for metric, sensor in product(METRICS, SENSORS):
print(metric, sensor)
if sensor == "num":
df = pull_nchs_mortality_data(token, map_df, test_mode)
for metric in METRICS:
if metric == 'percent_of_expected_deaths':
print(metric)
df["val"] = df[metric]
df["se"] = np.nan
df["sample_size"] = np.nan
sensor_name = "_".join(["wip", metric])
export_csv(
df,
geo_name=geo_res,
export_dir=daily_export_dir,
start_date=datetime.strptime(export_start_date, "%Y-%m-%d"),
sensor=sensor_name,
)
else:
df["val"] = df[metric] / df["population"] * INCIDENCE_BASE
df["se"] = np.nan
df["sample_size"] = np.nan
sensor_name = "_".join(["wip", metric, sensor])
export_csv(
df,
geo_name=geo_res,
export_dir=export_dir,
start_date=datetime.strptime(export_start_date, "%Y-%m-%d"),
sensor=sensor_name,
)
for sensor in SENSORS:
print(metric, sensor)
if sensor == "num":
df["val"] = df[metric]
else:
df["val"] = df[metric] / df["population"] * INCIDENCE_BASE
df["se"] = np.nan
df["sample_size"] = np.nan
sensor_name = "_".join(["wip", metric, sensor])
export_csv(
df,
geo_name=geo_res,
export_dir=daily_export_dir,
start_date=datetime.strptime(export_start_date, "%Y-%m-%d"),
sensor=sensor_name,
)

# Weekly run of archive utility on Monday
# - Does not upload to S3, that is handled by daily run of archive utility
# - Exports issues into receiving for the API
if datetime.today().weekday() == 0:
# Copy todays raw output to receiving
for output_file in listdir(daily_export_dir):
copy(
join(daily_export_dir, output_file),
join(export_dir, output_file))

weekly_arch_diff = S3ArchiveDiffer(
cache_dir, export_dir,
params["bucket_name"], "nchs_mortality",
params["aws_credentials"])

# Don't update the cache from S3 (it holds daily files); only simulate an update_cache() call
weekly_arch_diff._cache_updated = True

# Diff exports, and make incremental versions
_, common_diffs, new_files = weekly_arch_diff.diff_exports()

# Archive changed and new files only
to_archive = [f for f, diff in common_diffs.items() if diff is not None]
to_archive += new_files
_, fails = weekly_arch_diff.archive_exports(to_archive, update_s3=False)

# Filter existing exports to exclude those that failed to archive
succ_common_diffs = {f: diff for f, diff in common_diffs.items() if f not in fails}
weekly_arch_diff.filter_exports(succ_common_diffs)

# Report failures: someone should probably look at them
for exported_file in fails:
print(f"Failed to archive (weekly) '{exported_file}'")

# Daily run of archiving utility
# - Uploads changed files to S3
# - Does not export any issues into receiving

# Diff exports, and make incremental versions
_, common_diffs, new_files = daily_arch_diff.diff_exports()

# Archive changed and new files only
to_archive = [f for f, diff in common_diffs.items() if diff is not None]
to_archive += new_files
_, fails = daily_arch_diff.archive_exports(to_archive)

# Daily output not needed anymore, remove them
for exported_file in new_files:
remove(exported_file)
for exported_file, diff_file in common_diffs.items():
remove(exported_file)
remove(diff_file)

# Report failures: someone should probably look at them
for exported_file in fails:
print(f"Failed to archive (daily) '{exported_file}'")
10 changes: 9 additions & 1 deletion nchs_mortality/params.json.template
@@ -3,5 +3,13 @@
"static_file_dir": "./static",
"export_dir": "./receiving",
"cache_dir": "./cache",
"token": ""
"daily_export_dir": "./daily_receiving",
"daily_cache_dir": "./daily_cache",
"token": "",
"mode":"",
"aws_credentials": {
"aws_access_key_id": "",
"aws_secret_access_key": ""
},
"bucket_name": ""
}
3 changes: 2 additions & 1 deletion nchs_mortality/setup.py
@@ -9,7 +9,8 @@
"pylint",
"delphi-utils",
"sodapy",
"epiweeks"
"epiweeks",
"freezegun",
]

setup(
Empty file.
32 changes: 28 additions & 4 deletions nchs_mortality/tests/conftest.py
@@ -1,18 +1,42 @@
# -*- coding: utf-8 -*-

from boto3 import Session
from freezegun import freeze_time
from moto import mock_s3
import pytest

from os import listdir, remove
from os.path import join

from delphi_utils import read_params
from delphi_nchs_mortality.run import run_module


@pytest.fixture(scope="session")
def run_as_module():
# Clean receiving directory
@pytest.fixture(scope="function")
def run_as_module(date):
# Clean directories
for fname in listdir("receiving"):
if ".csv" in fname:
remove(join("receiving", fname))

run_module()
for fname in listdir("cache"):
if ".csv" in fname:
remove(join("cache", fname))

for fname in listdir("daily_cache"):
if ".csv" in fname:
remove(join("daily_cache", fname))

for fname in listdir("daily_receiving"):
if ".csv" in fname:
remove(join("daily_receiving", fname))

with mock_s3():
with freeze_time(date):
# Create the fake bucket we will be using
params = read_params()
aws_credentials = params["aws_credentials"]
s3_client = Session(**aws_credentials).client("s3")
s3_client.create_bucket(Bucket=params["bucket_name"])

run_module()
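A sketch of how a test might consume this fixture; the `date` fixture definition and the chosen date are assumptions for illustration, not the repository's actual tests:

```python
import pytest
from os import listdir

@pytest.fixture(params=["2020-09-14"])   # a Monday, so the weekly (API) path also runs
def date(request):
    return request.param

def test_daily_csvs_exported(run_as_module):
    # After the frozen-time run against the mocked S3 bucket, daily CSV output should exist.
    assert any(fname.endswith(".csv") for fname in listdir("daily_receiving"))
```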
1 change: 1 addition & 0 deletions nchs_mortality/tests/daily_cache/.gitignore
@@ -0,0 +1 @@
*.csv
1 change: 1 addition & 0 deletions nchs_mortality/tests/daily_receiving/.gitignore
@@ -0,0 +1 @@
*.csv
10 changes: 9 additions & 1 deletion nchs_mortality/tests/params.json.template
@@ -3,5 +3,13 @@
"static_file_dir": "../static",
"export_dir": "./receiving",
"cache_dir": "./cache",
"token": ""
"daily_export_dir": "./daily_receiving",
"daily_cache_dir": "./daily_cache",
"token": "",
"mode":"test_data.csv",
"aws_credentials": {
"aws_access_key_id": "FAKE_TEST_ACCESS_KEY_ID",
"aws_secret_access_key": "FAKE_TEST_SECRET_ACCESS_KEY"
},
"bucket_name": "test-bucket"
}