4 changes: 3 additions & 1 deletion ansible/templates/claims_hosp-params-prod.json.j2
@@ -1,13 +1,15 @@
{
"common": {
"export_dir": "./receiving",
"export_dir": "./common/covidcast/receiving/claims_hosp",
"log_exceptions": false
},
"indicator": {
"input_dir": "./retrieve_files",
"start_date": "2020-02-01",
"end_date": null,
"drop_date": null,
"backfill_dir": "/common/backfill/claims_hosp",
"backfill_merge_day": 0,
"n_backfill_days": 70,
"n_waiting_days": 3,
"write_se": false,
116 changes: 116 additions & 0 deletions claims_hosp/delphi_claims_hosp/backfill.py
@@ -0,0 +1,116 @@
"""
Store backfill data.

Author: Jingjing Tang
Created: 2022-08-03

"""
import os
import glob
from datetime import datetime

# third party
import pandas as pd
from delphi_utils import GeoMapper


from .config import Config

gmpr = GeoMapper()

def store_backfill_file(claims_filepath, _end_date, backfill_dir):
"""
Store county level backfill data into backfill_dir.

Parameters
----------
claims_filepath: str
path to the aggregated claims data
_end_date: datetime
The most recent date when the raw data is received
backfill_dir: str
specified path to store backfill files.
"""
backfilldata = pd.read_csv(
claims_filepath,
usecols=Config.CLAIMS_DTYPES.keys(),
dtype=Config.CLAIMS_DTYPES,
parse_dates=[Config.CLAIMS_DATE_COL],
)
backfilldata.rename({"ServiceDate": "time_value",
"PatCountyFIPS": "fips",
"Denominator": "den",
"Covid_like": "num"},
axis=1, inplace=True)
backfilldata = gmpr.add_geocode(backfilldata, from_code="fips", new_code="state_id",
from_col="fips", new_col="state_id")
# Store one year's backfill data
_start_date = _end_date.replace(year=_end_date.year-1)
selected_columns = ['time_value', 'fips', 'state_id',
'den', 'num']
backfilldata = backfilldata.loc[(backfilldata["time_value"] >= _start_date)
& (~backfilldata["fips"].isnull()),
selected_columns]
path = backfill_dir + \
"/claims_hosp_as_of_%s.parquet"%datetime.strftime(_end_date, "%Y%m%d")
# Store intermediate file into the backfill folder
backfilldata.to_parquet(path, index=False)

def merge_backfill_file(backfill_dir, backfill_merge_day, today,
test_mode=False, check_nd=25):
"""
Merge ~4 weeks' backfill data into one file.

Usually this function merges 28 days of data into a new file to save
reading time when running the backfill pipelines. A softer threshold is
used to allow flexibility in data delivery.

Parameters
----------
today : datetime
The most recent date when the raw data is received
backfill_dir : str
specified path to store backfill files.
backfill_merge_day: int
The day of the week on which to merge the backfill files, e.g. 0
is Monday.
test_mode: bool
If True, do not delete the daily files after merging.
check_nd: int
The threshold for the number of unmerged files. Ideally we want
28, but a looser criterion is used for practical reasons.
"""
new_files = glob.glob(backfill_dir + "/claims_hosp_as_of_*")

def get_date(file_link):
# Keep the function here consistent with the backfill path in
# function `store_backfill_file`
fn = file_link.split("/")[-1].split(".parquet")[0].split("_")[-1]
return datetime.strptime(fn, "%Y%m%d")

date_list = list(map(get_date, new_files))
earliest_date = min(date_list)
latest_date = max(date_list)

# Check whether to merge
# Check the number of files that are not merged
if today.weekday() != backfill_merge_day or (today-earliest_date).days <= check_nd:
return

# Start to merge files
pdList = []
for fn in new_files:
df = pd.read_parquet(fn, engine='pyarrow')
issue_date = get_date(fn)
df["issue_date"] = issue_date
df["lag"] = [(issue_date - x).days for x in df["time_value"]]
pdList.append(df)
merged_file = pd.concat(pdList).sort_values(["time_value", "fips"])
path = backfill_dir + "/claims_hosp_from_%s_to_%s.parquet"%(
datetime.strftime(earliest_date, "%Y%m%d"),
datetime.strftime(latest_date, "%Y%m%d"))
merged_file.to_parquet(path, index=False)

# Delete daily files once we have the merged one.
if not test_mode:
for fn in new_files:
os.remove(fn)
return
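
For reviewers, a minimal usage sketch of the two new functions. The claims CSV path, drop date, and ./backfill directory are hypothetical, and the backfill directory must already exist since store_backfill_file does not create it:

from datetime import datetime
from delphi_claims_hosp.backfill import store_backfill_file, merge_backfill_file

backfill_dir = "./backfill"            # hypothetical; must already exist
drop_date = datetime(2020, 6, 11)      # hypothetical drop date

# Writes ./backfill/claims_hosp_as_of_20200611.parquet containing one year of
# county-level rows with columns time_value, fips, state_id, den, num.
store_backfill_file("./retrieve_files/claims.csv.gz", drop_date, backfill_dir)

# On the configured weekday, once more than check_nd days separate the earliest
# daily file from today, rolls the daily files into
# claims_hosp_from_<earliest>_to_<latest>.parquet and deletes them
# (they are kept when test_mode=True).
merge_backfill_file(backfill_dir, backfill_merge_day=0, today=datetime.today())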
2 changes: 0 additions & 2 deletions claims_hosp/delphi_claims_hosp/load_data.py
@@ -5,7 +5,6 @@
Created: 2020-09-27

"""

# third party
import pandas as pd

@@ -53,7 +52,6 @@ def load_claims_data(claims_filepath, dropdate, base_geo):

return claims_data


def load_data(input_filepath, dropdate, base_geo):
"""
Load in claims data, and combine them.
7 changes: 7 additions & 0 deletions claims_hosp/delphi_claims_hosp/run.py
@@ -20,6 +20,7 @@
from .modify_claims_drops import modify_and_write
from .get_latest_claims_name import get_latest_filename
from .update_indicator import ClaimsHospIndicatorUpdater
from .backfill import (store_backfill_file, merge_backfill_file)


def run_module(params):
@@ -89,6 +90,12 @@ def run_module(params):
if params["indicator"]["start_date"] is not None:
startdate = params["indicator"]['start_date']

# Store backfill data
backfill_dir = params["indicator"]["backfill_dir"]
backfill_merge_day = params["indicator"]["backfill_merge_day"]
merge_backfill_file(backfill_dir, backfill_merge_day, datetime.today())
store_backfill_file(claims_file, dropdate_dt, backfill_dir)

# print out information
logger.info("Loaded params",
startdate = startdate,
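
A note on the ordering above: merge_backfill_file runs before store_backfill_file, so the snapshot for the current drop date is written after any merge and is only folded in on a later merge day. The sketch below, with hypothetical dates, shows the file names these calls produce in backfill_dir, following the naming patterns in backfill.py:

from datetime import datetime, timedelta

# Hypothetical window of daily drop dates, with backfill_merge_day = 0 (Monday).
drop_dates = [datetime(2020, 5, 18) + timedelta(days=d) for d in range(28)]

# One file per run, written by store_backfill_file.
daily_files = ["claims_hosp_as_of_%s.parquet" % d.strftime("%Y%m%d") for d in drop_dates]

# Written by merge_backfill_file on a Monday once more than check_nd (25 by
# default) days separate the earliest daily file from today; the daily files
# are then removed.
merged_file = "claims_hosp_from_%s_to_%s.parquet" % (
    drop_dates[0].strftime("%Y%m%d"), drop_dates[-1].strftime("%Y%m%d"))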
2 changes: 2 additions & 0 deletions claims_hosp/params.json.template
@@ -9,6 +9,8 @@
"end_date": null,
"drop_date": null,
"n_backfill_days": 70,
"backfill_dir": "./backfill",
"backfill_merge_day": 0,
"n_waiting_days": 3,
"write_se": false,
"obfuscated_prefix": "foo_obfuscated",
1 change: 1 addition & 0 deletions claims_hosp/setup.py
@@ -4,6 +4,7 @@
required = [
"numpy",
"pandas",
"pyarrow",
"paramiko",
"pydocstyle",
"pytest",
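
The new pyarrow requirement backs the engine='pyarrow' parquet reads and writes used in backfill.py and the tests. A minimal round-trip sketch, with a hypothetical frame and path:

import pandas as pd

# Hypothetical frame mirroring the backfill columns.
df = pd.DataFrame({"time_value": pd.to_datetime(["2020-06-01"]),
                   "fips": ["42003"], "state_id": ["pa"],
                   "den": [100], "num": [7]})

df.to_parquet("example.parquet", index=False)                   # uses pyarrow when installed
df_back = pd.read_parquet("example.parquet", engine="pyarrow")  # explicit engine, as in backfill.py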
4 binary files not shown.
83 changes: 83 additions & 0 deletions claims_hosp/tests/test_backfill.py
@@ -0,0 +1,83 @@
import os
import glob
from datetime import datetime

# third party
import pandas as pd
import pytest

# first party
from delphi_claims_hosp.config import Config, GeoConstants
from delphi_claims_hosp.backfill import store_backfill_file, merge_backfill_file

CONFIG = Config()
CONSTANTS = GeoConstants()
PARAMS = {
"indicator": {
"input_file": "test_data/SYNEDI_AGG_INPATIENT_11062020_1451CDT.csv.gz",
"backfill_dir": "./backfill",
"drop_date": "2020-06-11",
}
}
DATA_FILEPATH = PARAMS["indicator"]["input_file"]
DROP_DATE = pd.to_datetime(PARAMS["indicator"]["drop_date"])
backfill_dir = PARAMS["indicator"]["backfill_dir"]

class TestBackfill:

def test_store_backfill_file(self):
dropdate = datetime(2020, 1, 1)
fn = "claims_hosp_as_of_20200101.parquet"
assert fn not in os.listdir(backfill_dir)

# Store backfill file
store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir)
assert fn in os.listdir(backfill_dir)
fn = "claims_hosp_as_of_20200101.parquet"
backfill_df = pd.read_parquet(backfill_dir + "/"+ fn, engine='pyarrow')

selected_columns = ['time_value', 'fips', 'state_id',
'num', 'den']
assert set(selected_columns) == set(backfill_df.columns)

os.remove(backfill_dir + "/" + fn)
assert fn not in os.listdir(backfill_dir)

def test_merge_backfill_file(self):

today = datetime.today()

new_files = glob.glob(backfill_dir + "/claims_hosp*.parquet")
fn = "claims_hosp_from_20200611_to_20200614.parquet"
assert fn not in os.listdir(backfill_dir)

# Check that the merged file is not generated when there are not enough daily files
today = datetime(2020, 6, 14)
merge_backfill_file(backfill_dir, today.weekday(), today,
test_mode=True, check_nd=8)
assert fn not in os.listdir(backfill_dir)

# Generate the merged file, but do not delete the daily files (test_mode=True)
merge_backfill_file(backfill_dir, today.weekday(), today,
test_mode=True, check_nd=2)
assert fn in os.listdir(backfill_dir)

# Read the daily files
pdList = []
for file in new_files:
df = pd.read_parquet(file, engine='pyarrow')
issue_date = datetime.strptime(file[-16:-8], "%Y%m%d")
df["issue_date"] = issue_date
df["lag"] = [(issue_date - x).days for x in df["time_value"]]
pdList.append(df)
expected = pd.concat(pdList).sort_values(["time_value", "fips"])

# Read the merged file
merged = pd.read_parquet(backfill_dir + "/" + fn, engine='pyarrow')

assert set(expected.columns) == set(merged.columns)
assert expected.shape[0] == merged.shape[0]
assert expected.shape[1] == merged.shape[1]

os.remove(backfill_dir + "/" + fn)
assert fn not in os.listdir(backfill_dir)