Automate quidel patch #2077

Open
wants to merge 13 commits into base: main
105 changes: 81 additions & 24 deletions quidel_covidtest/delphi_quidel_covidtest/backfill.py
@@ -1,8 +1,12 @@
# -*- coding: utf-8 -*-
"""Store backfill data."""
import calendar
import os
import glob
from datetime import datetime
import re
import shutil
from datetime import datetime, timedelta

import pandas as pd

@@ -11,7 +15,7 @@

gmpr = GeoMapper()

def store_backfill_file(df, _end_date, backfill_dir):
def store_backfill_file(df, _end_date, backfill_dir, logger):
"""
Store county level backfill data into backfill_dir.

@@ -59,6 +63,7 @@ def store_backfill_file(df, _end_date, backfill_dir):
'num_age_0_17', 'den_age_0_17']
backfilldata = backfilldata.loc[backfilldata["time_value"] >= _start_date,
selected_columns]
logger.info("Filtering source data", startdate=_start_date, enddate=_end_date)
backfilldata["lag"] = [(_end_date - x).days for x in backfilldata["time_value"]]
backfilldata["time_value"] = backfilldata.time_value.dt.strftime("%Y-%m-%d")
backfilldata["issue_date"] = datetime.strftime(_end_date, "%Y-%m-%d")
@@ -70,37 +75,88 @@ def store_backfill_file(df, _end_date, backfill_dir):
"state_id": "string"
})

path = backfill_dir + \
"/quidel_covidtest_as_of_%s.parquet"%datetime.strftime(_end_date, "%Y%m%d")
filename = "quidel_covidtest_as_of_%s.parquet"%datetime.strftime(_end_date, "%Y%m%d")
path = f"{backfill_dir}/{filename}"
# Store intermediate file into the backfill folder
backfilldata.to_parquet(path, index=False)
try:
backfilldata.to_parquet(path, index=False)
logger.info("Stored source data in parquet", filename=filename)
except Exception: # pylint: disable=W0703
logger.info("Failed to store source data in parquet")
return path
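
A hedged usage sketch of the new signature; `df` is assumed to be the cleaned test-level DataFrame produced upstream by pull_quidel_covidtest and is not constructed here:

    from datetime import datetime

    from delphi_utils import get_structured_logger
    from delphi_quidel_covidtest.backfill import store_backfill_file

    logger = get_structured_logger("backfill_demo")
    df = ...  # placeholder: cleaned DataFrame from pull_quidel_covidtest
    path = store_backfill_file(df, datetime(2024, 4, 20), "./backfill", logger)
    # Expected path: ./backfill/quidel_covidtest_as_of_20240420.parquet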

def merge_backfill_file(backfill_dir, backfill_merge_day, today,
test_mode=False, check_nd=25):

def merge_existing_backfill_files(backfill_dir, backfill_file, issue_date, logger):
"""
Merge ~4 weeks' backfill data into one file.
Merge an existing merged backfill file with the patch data included. This function is run specifically for patching.

Usually this function should merge 28 days' data into a new file so as to
save the reading time when running the backfill pipelines. We set a softer
threshold to allow flexibility in data delivery.
When the indicator fails for one reason or another, there is a gap in the backfill files.
The patch that fills in the missing dates happens later down the line, when the backfill files have already been merged.
This function takes the merged file covering the missing date, inserts the data for that date, and re-merges the file.

Parameters
----------
backfill_dir : str
    specified path to store backfill files.
backfill_file : str
    specific file to add to the merged backfill file.
issue_date : datetime
    The most recent date when the raw data is received.
logger : logging.Logger
    The structured logger.
"""
new_files = glob.glob(backfill_dir + "/quidel_covidtest_*")

def get_file_with_date(files) -> str:
    for filename in files:
        # match only merged files, which carry a 6-digit (YYYYMM) date tag
        pattern = re.findall(r"_(\d{6})\.parquet", filename)
        if pattern:
            file_month = datetime.strptime(pattern[0], "%Y%m").replace(day=1)
            end_date = (file_month + timedelta(days=32)).replace(day=1)
            if file_month <= issue_date < end_date:
                return filename
    return ""

file_name = get_file_with_date(new_files)

if len(file_name) == 0:
logger.info("Issue date has no matching merged files", issue_date=issue_date.strftime("%Y-%m-%d"))
return

logger.info("Adding missing date to merged file", issue_date=issue_date,
filename=backfill_file, merged_filename=file_name)

# Start to merge files
merge_file = f"{file_name.split('.')[0]}_after_merge.parquet"
try:
shutil.copyfile(file_name, merge_file)
existing_df = pd.read_parquet(merge_file, engine="pyarrow")
df = pd.read_parquet(backfill_file, engine="pyarrow")
merged_df = pd.concat([existing_df, df]).sort_values(["time_value", "fips"])
merged_df.to_parquet(merge_file, index=False)
os.remove(file_name)
os.rename(merge_file, file_name)
except Exception as e: # pylint: disable=W0703
os.remove(merge_file)
logger.error(e)
return
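
For a missed issue date in April 2024, the patch parquet gets folded into the monthly file whose YYYYMM tag brackets that date. A hedged usage sketch with illustrative paths:

    from datetime import datetime

    from delphi_utils import get_structured_logger
    from delphi_quidel_covidtest.backfill import merge_existing_backfill_files

    logger = get_structured_logger("backfill_patch_demo")
    # Illustrative: the daily patch file produced for the missed issue date.
    merge_existing_backfill_files(
        backfill_dir="./backfill",
        backfill_file="./backfill/quidel_covidtest_as_of_20240420.parquet",
        issue_date=datetime(2024, 4, 20),
        logger=logger,
    )
    # Rows are inserted into ./backfill/quidel_covidtest_202404.parquet.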

def merge_backfill_file(backfill_dir, today, logger, test_mode=False):
"""
Merge a month's backfill data into one file.

Parameters
----------
backfill_dir : str
    specified path to store backfill files.
today : datetime
    The most recent date when the raw data is received
logger : logging.Logger
    The structured logger.
test_mode : bool
"""
new_files = glob.glob(backfill_dir + "/quidel_covidtest_as_of_*")
previous_month = (today.replace(day=1) - timedelta(days=1)).strftime("%Y%m")
new_files = glob.glob(backfill_dir + f"/quidel_covidtest_as_of_{previous_month}*")
if len(new_files) == 0: # if no daily files are stored
logger.info("No new files to merge; skipping merging")
return

def get_date(file_link):
Expand All @@ -110,23 +166,24 @@ def get_date(file_link):
return datetime.strptime(fn, "%Y%m%d")

date_list = list(map(get_date, new_files))
earliest_date = min(date_list)
latest_date = max(date_list)

# Check whether to merge
# Check the number of files that are not merged
if today.weekday() != backfill_merge_day or (today-earliest_date).days <= check_nd:
num_of_days_in_month = calendar.monthrange(latest_date.year, latest_date.month)[1]
if len(date_list) < num_of_days_in_month:
logger.info("Not enough days, skipping merging", n_file_days=len(date_list))
return

# Start to merge files
logger.info("Merging files", start_date=date_list[0], end_date=date_list[-1])
pdList = []
for fn in new_files:
df = pd.read_parquet(fn, engine='pyarrow')
pdList.append(df)
merged_file = pd.concat(pdList).sort_values(["time_value", "fips"])
path = backfill_dir + "/quidel_covidtest_from_%s_to_%s.parquet"%(
datetime.strftime(earliest_date, "%Y%m%d"),
datetime.strftime(latest_date, "%Y%m%d"))
path = backfill_dir + f"/quidel_covidtest_{datetime.strftime(latest_date, '%Y%m')}.parquet"
merged_file.to_parquet(path, index=False)

# Delete daily files once we have the merged one.
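
A hedged sketch of the monthly merge trigger, with illustrative dates: run early in May, previous_month resolves to "202404", so the April daily files merge once at least 30 of them (the number of days in April) are present:

    from datetime import datetime

    from delphi_utils import get_structured_logger
    from delphi_quidel_covidtest.backfill import merge_backfill_file

    logger = get_structured_logger("backfill_merge_demo")
    # Merges ./backfill/quidel_covidtest_as_of_202404*.parquet into
    # ./backfill/quidel_covidtest_202404.parquet when a full month is present.
    merge_backfill_file("./backfill", datetime(2024, 5, 1), logger)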
4 changes: 4 additions & 0 deletions quidel_covidtest/delphi_quidel_covidtest/constants.py
@@ -1,4 +1,6 @@
"""Registry for constants."""
from datetime import datetime

# global constants
MIN_OBS = 50 # minimum number of observations in order to compute a proportion.
POOL_DAYS = 7 # number of days in the past (including today) to pool over
@@ -49,3 +51,5 @@
"age_65plus",
"age_0_17",
]

FULL_BKFILL_START_DATE = datetime(2020, 5, 26)
76 changes: 76 additions & 0 deletions quidel_covidtest/delphi_quidel_covidtest/patch.py
@@ -0,0 +1,76 @@
"""
This module is used for patching data in the delphi_quidel_covidtest package.

To use this module, you need to specify the range of issue dates in params.json, like so:

{
"common": {
...
},
"validation": {
...
},
"patch": {
"patch_dir": "/Users/minhkhuele/Desktop/delphi/covidcast-indicators/doctor_visits/AprilPatch",
"start_issue": "2024-04-20",
"end_issue": "2024-04-21"
}
}

It will generate data for that range of issue dates, and store them in batch issue format:
[name-of-patch]/issue_[issue-date]/quidel_covidtest/actual_data_file.csv
"""
from datetime import datetime, timedelta
from os import makedirs

from delphi_utils import get_structured_logger, read_params

from .run import run_module
from .constants import END_FROM_TODAY_MINUS

def patch():
"""
Run the quidel_covidtest indicator for a range of issue dates.

The range of issue dates is specified in params.json using the following keys:
- "patch": Only used for patching data
    - "start_issue": str, YYYY-MM-DD format, first issue date
    - "end_issue": str, YYYY-MM-DD format, last issue date
    - "patch_dir": str, directory to write all issues output
"""
params = read_params()
logger = get_structured_logger("delphi_quidel_covidtest.patch", filename=params["common"]["log_filename"])

start_issue = datetime.strptime(params["patch"]["start_issue"], "%Y-%m-%d")
end_issue = datetime.strptime(params["patch"]["end_issue"], "%Y-%m-%d")

logger.info(
"Starting patching",
patch_directory=params["patch"]["patch_dir"],
start_issue=start_issue.strftime("%Y-%m-%d"),
end_issue=end_issue.strftime("%Y-%m-%d"),
)
makedirs(params["patch"]["patch_dir"], exist_ok=True)
export_day_range = params["indicator"]["export_day_range"]

current_issue = start_issue

export_day_range -= END_FROM_TODAY_MINUS

while current_issue <= end_issue:
logger.info("Running issue", issue_date=current_issue.strftime("%Y-%m-%d"))

current_issue_yyyymmdd = current_issue.strftime("%Y%m%d")
current_issue_dir = f'{params["patch"]["patch_dir"]}/issue_{current_issue_yyyymmdd}/quidel_covidtest'
makedirs(current_issue_dir, exist_ok=True)
params["common"]["export_dir"] = current_issue_dir
calculated_start_date = current_issue - timedelta(export_day_range)
calculated_end_date = current_issue
params["indicator"]["pull_start_date"] = calculated_start_date.strftime("%Y-%m-%d")
params["indicator"]["pull_end_date"] = calculated_end_date.strftime("%Y-%m-%d")

run_module(params, logger)
current_issue += timedelta(days=1)

if __name__ == "__main__":
patch()
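
Each iteration pulls a window that ends on the issue date and starts export_day_range - END_FROM_TODAY_MINUS days earlier (the export window stops END_FROM_TODAY_MINUS days short of the issue date). A minimal sketch of that arithmetic, assuming illustrative values export_day_range = 40 and END_FROM_TODAY_MINUS = 5:

    from datetime import datetime, timedelta

    export_day_range = 40     # assumed value from params["indicator"]
    END_FROM_TODAY_MINUS = 5  # assumed value of the constant
    current_issue = datetime(2024, 4, 20)

    # Mirrors the loop body above: shrink the window, then derive pull dates.
    export_day_range -= END_FROM_TODAY_MINUS
    pull_start = current_issue - timedelta(days=export_day_range)
    print(pull_start.strftime("%Y-%m-%d"))  # 2024-03-16, i.e. 35 days back

With the __main__ guard in place, the module can presumably be invoked as python -m delphi_quidel_covidtest.patch once the "patch" block is present in params.json.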
53 changes: 25 additions & 28 deletions quidel_covidtest/delphi_quidel_covidtest/pull.py
@@ -9,15 +9,17 @@
import pandas as pd
import numpy as np

from .constants import AGE_GROUPS
from .constants import AGE_GROUPS, FULL_BKFILL_START_DATE



def get_from_s3(start_date, end_date, bucket, logger):
def get_from_s3(params, start_date, end_date, logger):
"""
Get raw data from aws s3 bucket.

Args:
params: dict
read from params.json
start_date: datetime.datetime
pull data from file tagged with date on/after the start date
end_date: datetime.datetime
@@ -30,6 +32,15 @@ def get_from_s3(start_date, end_date, bucket, logger):
df: pd.DataFrame
time_flag: datetime.datetime
"""
# connect aws s3 bucket
aws_access_key_id = params["aws_credentials"]["aws_access_key_id"]
aws_secret_access_key = params["aws_credentials"]["aws_secret_access_key"]
bucket_name = params["bucket_name"]

s3 = boto3.resource('s3', aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key)
bucket = s3.Bucket(bucket_name)

time_flag = None
selected_columns = ['SofiaSerNum', 'TestDate', 'Facility', 'City',
'State', 'Zip', 'PatientAge', 'Result1',
@@ -118,7 +129,7 @@ def fix_date(df, logger):
df["timestamp"].values[mask] = df["StorageDate"].values[mask]
return df

def preprocess_new_data(start_date, end_date, params, test_mode, logger):
def preprocess_new_data(start_date, end_date, params, logger):
"""
Pull and pre-process Quidel Covid Test data.

@@ -133,32 +144,15 @@ def preprocess_new_data(start_date, end_date, params, test_mode, logger):
pull data from file tagged with date on/before the end date
params: dict
read from params.json
test_mode: bool
pull raw data from s3 or not
logger: logging.Logger
The structured logger.
output:
df: pd.DataFrame
time_flag: datetime.date
the actual pull end date on which we successfully pull the data
"""
if test_mode:
test_data_dir = "./test_data/test_data.csv"
df, time_flag = pd.read_csv(
test_data_dir,
parse_dates=["StorageDate", "TestDate"]
), datetime(2020, 8, 17)
else:
# connect aws s3 bucket
aws_access_key_id = params["aws_credentials"]["aws_access_key_id"]
aws_secret_access_key = params["aws_credentials"]["aws_secret_access_key"]
bucket_name = params["bucket_name"]

s3 = boto3.resource('s3', aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key)
bucket = s3.Bucket(bucket_name)
# Get new data from s3
df, time_flag = get_from_s3(start_date, end_date, bucket, logger)
# Get new data from s3
df, time_flag = get_from_s3(params, start_date, end_date, logger)

# No new data can be pulled
if time_flag is None:
@@ -282,13 +276,16 @@ def pull_quidel_covidtest(params, logger):

"""
cache_dir = params["input_cache_dir"]

test_mode = params["test_mode"]
if params["pull_start_date"] == "":
params["pull_start_date"] = FULL_BKFILL_START_DATE

# pull new data only that has not been ingested
previous_df, pull_start_date = check_intermediate_file(
cache_dir,
datetime.strptime(params["pull_start_date"], '%Y-%m-%d'))
params["pull_start_date"])

if params["pull_start_date"] != FULL_BKFILL_START_DATE:
pull_start_date = datetime.strptime(params["pull_start_date"], '%Y-%m-%d')

if params["pull_end_date"] == "":
pull_end_date = datetime.today()
Expand All @@ -298,7 +295,7 @@ def pull_quidel_covidtest(params, logger):
# Pull data from the file at 5 digit zipcode level
# Use _end_date to check the most recent date that we received data
df, _end_date = preprocess_new_data(
pull_start_date, pull_end_date, params, test_mode, logger)
pull_start_date, pull_end_date, params, logger)

# Utilize previously stored data
if previous_df is not None:
@@ -343,7 +340,7 @@ def check_export_end_date(input_export_end_date, _end_date,
def check_export_start_date(export_start_date, export_end_date,
export_day_range):
"""
Ensure that the starte date, end date, and day range are mutually consistent.
Ensure that the start date, end date, and day range are mutually consistent.

Parameters:
export_start_date: str
@@ -359,7 +356,7 @@

"""
if export_start_date == "":
export_start_date = datetime(2020, 5, 26)
export_start_date = FULL_BKFILL_START_DATE
else:
export_start_date = datetime.strptime(export_start_date, '%Y-%m-%d')
# Only export data from -50 days to -5 days
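
A hedged sketch of the fallback this change introduces; the helper name resolve_start_date is hypothetical and used only to isolate the behavior:

    from datetime import datetime

    FULL_BKFILL_START_DATE = datetime(2020, 5, 26)

    # An empty start date means "start from the beginning of the full
    # backfill period"; otherwise parse the configured date.
    def resolve_start_date(export_start_date: str) -> datetime:
        if export_start_date == "":
            return FULL_BKFILL_START_DATE
        return datetime.strptime(export_start_date, "%Y-%m-%d")

    assert resolve_start_date("") == datetime(2020, 5, 26)
    assert resolve_start_date("2024-04-20") == datetime(2024, 4, 20)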