Nhsn indicator (#2080)
* first implementation

* figuring out metric/sensor to use

* first take

* working on test

* added preliminary data source

* adding indicator to GitHub Actions

* lint

* replace with setup.py

* more lint

* fixed date range for test

* lint

* Update DETAILS.md

* fix output data

* analysis in progress

* lint and suggestions

* more analysis

* add hhs geo aggregate

* more analysis

* update DETAILS.md

* Update nhsn/params.json.template

Co-authored-by: minhkhul <118945681+minhkhul@users.noreply.github.com>

* Update nhsn/params.json.template

Co-authored-by: minhkhul <118945681+minhkhul@users.noreply.github.com>

* cleaning up analysis

* rename geo_id column name

* suggested / needed to deploy

* adding default locations for deployment

* fix geo aggregation for hhs

Co-authored-by: minhkhul <118945681+minhkhul@users.noreply.github.com>

* Update nhsn/params.json.template

Co-authored-by: minhkhul <118945681+minhkhul@users.noreply.github.com>

* lint

* needed to add hhs in to geo for tests

* fixed and added more plots

* cleaning up notebook and adding details

* new signal name

* needed to update the type dict also

---------

Co-authored-by: minhkhul <118945681+minhkhul@users.noreply.github.com>
aysim319 and minhkhul authored Dec 10, 2024
1 parent 9f1d4a0 commit 5ff844a
Showing 25 changed files with 12,899 additions and 1 deletion.
2 changes: 2 additions & 0 deletions .github/workflows/python-ci.yml
@@ -39,6 +39,8 @@ jobs:
dir: "delphi_quidel_covidtest"
- package: "sir_complainsalot"
dir: "delphi_sir_complainsalot"
- package: "nhsn"
dir: "delphi_nhsn"
defaults:
run:
working-directory: ${{ matrix.package }}
2 changes: 1 addition & 1 deletion Jenkinsfile
@@ -10,7 +10,7 @@
- TODO: #527 Get this list automatically from python-ci.yml at runtime.
*/

def indicator_list = ['backfill_corrections', 'changehc', 'claims_hosp', 'google_symptoms', 'hhs_hosp', 'nchs_mortality', 'quidel_covidtest', 'sir_complainsalot', 'doctor_visits', 'nwss_wastewater', 'nssp']
def indicator_list = ['backfill_corrections', 'changehc', 'claims_hosp', 'google_symptoms', 'hhs_hosp', 'nchs_mortality', 'quidel_covidtest', 'sir_complainsalot', 'doctor_visits', 'nwss_wastewater', 'nssp', 'nhsn']
def build_package_main = [:]
def build_package_prod = [:]
def deploy_staging = [:]
30 changes: 30 additions & 0 deletions ansible/templates/nhsn-params-prod.json.j2
@@ -0,0 +1,30 @@
{
"common": {
"export_dir": "/common/covidcast/receiving/nhsn",
"backup_dir": "./raw_data_backups",
"log_filename": "/var/log/indicators/nhsn.log",
"log_exceptions": false
},
"indicator": {
"wip_signal": true,
"static_file_dir": "./static",
"socrata_token": "{{ nhsn_token }}"
},
"validation": {
"common": {
"data_source": "nhsn",
"api_credentials": "{{ validation_api_key }}",
"span_length": 15,
"min_expected_lag": {"all": "7"},
"max_expected_lag": {"all": "13"},
"dry_run": true,
"suppressed_errors": []
},
"static": {
"minimum_sample_size": 0,
"missing_se_allowed": true,
"missing_sample_size_allowed": true
},
"dynamic": {}
}
}
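A minimal sketch of how an indicator run might read the fields above — the real pipeline uses `delphi_utils.read_params`; plain `json` and a shortened, hypothetical inline config (including the made-up `MY_APP_TOKEN` value) are used here for illustration:

```python
import json

# Hypothetical, abbreviated version of the params template above.
params_text = """
{
  "common": {"export_dir": "./receiving/nhsn", "backup_dir": "./raw_data_backups"},
  "indicator": {"wip_signal": true, "socrata_token": "MY_APP_TOKEN"}
}
"""

params = json.loads(params_text)

# The run module would pull out the fields it needs like this.
export_dir = params["common"]["export_dir"]
socrata_token = params["indicator"]["socrata_token"]
print(export_dir, socrata_token)
```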
4 changes: 4 additions & 0 deletions ansible/templates/sir_complainsalot-params-prod.json.j2
@@ -45,6 +45,10 @@
"nssp": {
"max_age":19,
"maintainers": []
},
"nhsn": {
"max_age":19,
"maintainers": []
}
}
}
3 changes: 3 additions & 0 deletions ansible/vars.yaml
@@ -59,6 +59,9 @@ nwss_wastewater_token: "{{ vault_cdc_socrata_token }}"
# nssp
nssp_token: "{{ vault_cdc_socrata_token }}"

# nhsn
nhsn_token: "{{ vault_cdc_socrata_token }}"

# SirCAL
sir_complainsalot_api_key: "{{ vault_sir_complainsalot_api_key }}"
sir_complainsalot_slack_token: "{{ vault_sir_complainsalot_slack_token }}"
30 changes: 30 additions & 0 deletions nhsn/DETAILS.md
@@ -0,0 +1,30 @@
# NHSN data

We import the NHSN Weekly Hospital Respiratory Data.

NHSN data is pulled from two Socrata datasets. Both come from the same underlying source, but they differ in release cadence, and the secondary dataset reports preliminary data for the previous reporting week.

Primary source: https://data.cdc.gov/Public-Health-Surveillance/Weekly-Hospital-Respiratory-Data-HRD-Metrics-by-Ju/ua7e-t2fy/about_data
Secondary (preliminary source): https://data.cdc.gov/Public-Health-Surveillance/Weekly-Hospital-Respiratory-Data-HRD-Metrics-by-Ju/mpgq-jmmr/about_data

## Geographical Levels
* `state`: reported using the two-letter postal code
* `nation`: just `us` for now
* `hhs`: aggregated up from state-level data using delphi_utils's `GeoMapper`

## Metrics
* `confirmed_admissions_covid`: total number of confirmed COVID-19 admissions
* `confirmed_admissions_flu`: total number of confirmed flu admissions
* `prelim_confirmed_admissions_covid`: total number of confirmed COVID-19 admissions from the preliminary source
* `prelim_confirmed_admissions_flu`: total number of confirmed flu admissions from the preliminary source

## Additional Notes
The HHS dataset and the NHSN dataset cover equivalent data on hospital admissions for COVID-19 and flu.
As a general trend, the HHS and NHSN data match well.
However, there are differences for some states: notably GA (until 2023), LA, NV, PR (late 2020 to early 2021), and TN, where HHS is substantially lower than NHSN.

Some states have a spike in NHSN or HHS where the other source has none, and the spikes don't occur at the same time_values across states.

More details regarding the analysis are available in [analysis.ipynb](notebook%2Fanalysis.ipynb)
(which may require installing additional packages to work).
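The state-to-HHS aggregation mentioned above can be sketched in plain pandas. The mapping below is a hypothetical fragment for illustration only; the real indicator uses delphi_utils's `GeoMapper` with the full state-to-HHS crosswalk:

```python
import pandas as pd

# Hypothetical fragment of the state -> HHS-region crosswalk.
STATE_TO_HHS = {"ma": "hhs1", "ct": "hhs1", "ny": "hhs2", "nj": "hhs2"}

def aggregate_state_to_hhs(df: pd.DataFrame) -> pd.DataFrame:
    """Sum state-level admissions up to HHS regions per week."""
    out = df.copy()
    out["geo_id"] = out["geo_id"].map(STATE_TO_HHS)
    return out.groupby(["timestamp", "geo_id"], as_index=False).sum()

df = pd.DataFrame({
    "timestamp": ["2024-11-02"] * 4,
    "geo_id": ["ma", "ct", "ny", "nj"],
    "confirmed_admissions_flu_ew": [10.0, 5.0, 20.0, 7.0],
})
print(aggregate_state_to_hhs(df))
```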
32 changes: 32 additions & 0 deletions nhsn/Makefile
@@ -0,0 +1,32 @@
.PHONY: venv install install-ci lint format test clean

dir = $(shell find ./delphi_* -name __init__.py | grep -o 'delphi_[_[:alnum:]]*' | head -1)
venv:
python3.8 -m venv env

install: venv
. env/bin/activate; \
pip install wheel ; \
pip install -e ../_delphi_utils_python ;\
pip install -e .

install-ci: venv
. env/bin/activate; \
pip install wheel ; \
pip install ../_delphi_utils_python ;\
pip install .

lint:
. env/bin/activate; pylint $(dir) --rcfile=../pyproject.toml
. env/bin/activate; pydocstyle $(dir)

format:
. env/bin/activate; darker $(dir)

test:
. env/bin/activate ;\
(cd tests && ../env/bin/pytest --cov=$(dir) --cov-report=term-missing)

clean:
rm -rf env
rm -f params.json
13 changes: 13 additions & 0 deletions nhsn/delphi_nhsn/__init__.py
@@ -0,0 +1,13 @@
# -*- coding: utf-8 -*-
"""Module to pull and clean indicators from the NHSN source.

This file defines the functions that are made public by the module. As the
module is intended to be executed through the main method, these are primarily
for testing.
"""

from __future__ import absolute_import

from . import run

__version__ = "0.1.0"
13 changes: 13 additions & 0 deletions nhsn/delphi_nhsn/__main__.py
@@ -0,0 +1,13 @@
# -*- coding: utf-8 -*-
"""Call the function run_module when executed.
This file indicates that calling the module (`python -m MODULE_NAME`) will
call the function `run_module` found within the run.py file. There should be
no need to change this template.
"""

from delphi_utils import read_params

from .run import run_module # pragma: no cover

run_module(read_params()) # pragma: no cover
31 changes: 31 additions & 0 deletions nhsn/delphi_nhsn/constants.py
@@ -0,0 +1,31 @@
"""Registry for signal names."""

GEOS = ["state", "nation", "hhs"]

# column name from socrata
TOTAL_ADMISSION_COVID_API = "totalconfc19newadm"
TOTAL_ADMISSION_FLU_API = "totalconfflunewadm"

SIGNALS_MAP = {
"confirmed_admissions_covid_ew": TOTAL_ADMISSION_COVID_API,
"confirmed_admissions_flu_ew": TOTAL_ADMISSION_FLU_API,
}

TYPE_DICT = {
"timestamp": "datetime64[ns]",
"geo_id": str,
"confirmed_admissions_covid_ew": float,
"confirmed_admissions_flu_ew": float,
}

# signal mapping for secondary, preliminary source
PRELIM_SIGNALS_MAP = {
"confirmed_admissions_covid_ew_prelim": TOTAL_ADMISSION_COVID_API,
"confirmed_admissions_flu_ew_prelim": TOTAL_ADMISSION_FLU_API,
}
PRELIM_TYPE_DICT = {
"timestamp": "datetime64[ns]",
"geo_id": str,
"confirmed_admissions_covid_ew_prelim": float,
"confirmed_admissions_flu_ew_prelim": float,
}
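A minimal sketch of how these constants get applied to raw Socrata records — the renaming and type coercion happen in `pull.py`; the single-row frame here is made-up illustration data, and only the COVID-19 signal is shown:

```python
import pandas as pd

# Mirrors constants.py above; the API column name comes from Socrata.
TOTAL_ADMISSION_COVID_API = "totalconfc19newadm"
SIGNALS_MAP = {"confirmed_admissions_covid_ew": TOTAL_ADMISSION_COVID_API}
TYPE_DICT = {
    "timestamp": "datetime64[ns]",
    "geo_id": str,
    "confirmed_admissions_covid_ew": float,
}

# Made-up single record shaped like the Socrata payload (values arrive as strings).
raw = pd.DataFrame({
    "weekendingdate": ["2024-11-02"],
    "jurisdiction": ["USA"],
    "totalconfc19newadm": ["123"],
})

df = raw.rename(columns={"weekendingdate": "timestamp", "jurisdiction": "geo_id"})
for signal, col in SIGNALS_MAP.items():
    df[signal] = df[col]          # copy API column under the signal name
df = df[list(TYPE_DICT)].astype(TYPE_DICT)
df["geo_id"] = df["geo_id"].str.lower().replace({"usa": "us"})
print(df)
```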
123 changes: 123 additions & 0 deletions nhsn/delphi_nhsn/pull.py
@@ -0,0 +1,123 @@
# -*- coding: utf-8 -*-
"""Functions for pulling NHSN data."""
import logging
from typing import Optional

import pandas as pd
from delphi_utils import create_backup_csv
from sodapy import Socrata

from .constants import PRELIM_SIGNALS_MAP, PRELIM_TYPE_DICT, SIGNALS_MAP, TYPE_DICT


def pull_data(socrata_token: str, dataset_id: str):
"""Pull data from Socrata API."""
client = Socrata("data.cdc.gov", socrata_token)
results = []
offset = 0
limit = 50000 # maximum limit allowed by SODA 2.0
while True:
page = client.get(dataset_id, limit=limit, offset=offset)
if not page:
break # exit the loop if no more results
results.extend(page)
offset += limit

df = pd.DataFrame.from_records(results)
return df


def pull_nhsn_data(socrata_token: str, backup_dir: str, custom_run: bool, logger: Optional[logging.Logger] = None):
"""Pull the latest NHSN data, and conform it into a dataset.

The output dataset has:
- Each row corresponds to a single observation
- Each row additionally has columns for the signals in SIGNALS_MAP

Parameters
----------
socrata_token: str
My App Token for pulling the NHSN data
backup_dir: str
Directory to which to save raw backup data
custom_run: bool
Flag indicating if the current run is a patch. If so, don't save any data to disk
logger: Optional[logging.Logger]
logger object
Returns
-------
pd.DataFrame
Dataframe as described above.
"""
# Pull data from Socrata API
df = pull_data(socrata_token, dataset_id="ua7e-t2fy")

keep_columns = list(TYPE_DICT.keys())

if not df.empty:
create_backup_csv(df, backup_dir, custom_run, logger=logger)

df = df.rename(columns={"weekendingdate": "timestamp", "jurisdiction": "geo_id"})

for signal, col_name in SIGNALS_MAP.items():
df[signal] = df[col_name]

df = df[keep_columns]
df["geo_id"] = df["geo_id"].str.lower()
df.loc[df["geo_id"] == "usa", "geo_id"] = "us"
df = df.astype(TYPE_DICT)
else:
df = pd.DataFrame(columns=keep_columns)

return df


def pull_preliminary_nhsn_data(
socrata_token: str, backup_dir: str, custom_run: bool, logger: Optional[logging.Logger] = None
):
"""Pull the latest preliminary NHSN data, and conform it into a dataset.

The output dataset has:
- Each row corresponds to a single observation
- Each row additionally has columns for the signals in PRELIM_SIGNALS_MAP

Parameters
----------
socrata_token: str
My App Token for pulling the NHSN data
backup_dir: str
Directory to which to save raw backup data
custom_run: bool
Flag indicating if the current run is a patch. If so, don't save any data to disk
logger: Optional[logging.Logger]
logger object
Returns
-------
pd.DataFrame
Dataframe as described above.
"""
# Pull data from Socrata API
df = pull_data(socrata_token, dataset_id="mpgq-jmmr")

keep_columns = list(PRELIM_TYPE_DICT.keys())

if not df.empty:
create_backup_csv(df, backup_dir, custom_run, sensor="prelim", logger=logger)

df = df.rename(columns={"weekendingdate": "timestamp", "jurisdiction": "geo_id"})

for signal, col_name in PRELIM_SIGNALS_MAP.items():
df[signal] = df[col_name]

df = df[keep_columns]
df = df.astype(PRELIM_TYPE_DICT)
df["geo_id"] = df["geo_id"].str.lower()
df.loc[df["geo_id"] == "usa", "geo_id"] = "us"
else:
df = pd.DataFrame(columns=keep_columns)

return df
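The pagination loop in `pull_data` can be exercised without network access by swapping in a stub client. `FakeSocrataClient` below is a made-up stand-in for `sodapy.Socrata` that mimics only the `get(dataset_id, limit=..., offset=...)` call the loop relies on:

```python
import pandas as pd

PAGE_LIMIT = 3  # the real pull uses 50_000, the SODA 2.0 maximum


class FakeSocrataClient:
    """Made-up stand-in for sodapy.Socrata serving 7 fake records."""

    def __init__(self, n_records: int = 7):
        self._records = [
            {"jurisdiction": "CA", "totalconfflunewadm": str(i)}
            for i in range(n_records)
        ]

    def get(self, dataset_id, limit, offset):
        # Socrata returns an empty page once offset passes the end.
        return self._records[offset:offset + limit]


def pull_all(client, dataset_id: str) -> pd.DataFrame:
    """Same pagination pattern as pull_data: page until an empty response."""
    results, offset = [], 0
    while True:
        page = client.get(dataset_id, limit=PAGE_LIMIT, offset=offset)
        if not page:
            break
        results.extend(page)
        offset += PAGE_LIMIT
    return pd.DataFrame.from_records(results)


df = pull_all(FakeSocrataClient(), "ua7e-t2fy")
print(len(df))  # 7 — three pages of 3, 3, and 1 records
```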
