rvdss interface and new fn layout so current/historical data can be easily fetched #1551

Merged
merged 11 commits on Nov 22, 2024
9 changes: 7 additions & 2 deletions src/acquisition/rvdss/constants.py
@@ -1,3 +1,5 @@
from datetime import datetime

# The dataset calls the same viruses, provinces, regions (province groups),
# and country by multiple names. Map each of those to a common abbreviation.
VIRUSES = {
@@ -82,7 +84,7 @@
# disease data in a dashboard with a static URL. Therefore, this collection
# of URLs does _NOT_ need to be updated. It is used for fetching historical
# data (for dates on or before June 8, 2024) only.
HISTORIC_SEASON_URL = (HISTORIC_SEASON_REPORTS_URL.format(year_range = year_range) for year_range in
HISTORIC_SEASON_URLS = (HISTORIC_SEASON_REPORTS_URL.format(year_range = year_range) for year_range in
(
"2013-2014",
"2014-2015",
@@ -101,7 +103,10 @@
DASHBOARD_UPDATE_DATE_FILE = "RVD_UpdateDate.csv"
DASHBOARD_DATA_FILE = "RVD_WeeklyData.csv"

RESP_COUNTS_OUTPUT_FILE = "respiratory_detections.csv"
RESP_DETECTIONS_OUTPUT_FILE = "respiratory_detections.csv"
POSITIVE_TESTS_OUTPUT_FILE = "positive_tests.csv"
COUNTS_OUTPUT_FILE = "number_of_detections.csv"

LAST_WEEK_OF_YEAR = 35

NOW = datetime.now()
121 changes: 121 additions & 0 deletions src/acquisition/rvdss/database.py
@@ -0,0 +1,121 @@
"""
===============
=== Purpose ===
===============

Stores data from RVDSS (the Respiratory Virus Detection Surveillance System), which contains respiratory virus lab test results.
See: rvdss.py


=======================
=== Data Dictionary ===
=======================

`rvdss` is the table where rvdss data is stored.
+----------+-------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+----------+-------------+------+-----+---------+----------------+
| id | int(11) | NO | PRI | NULL | auto_increment |
| location | varchar(8) | NO | MUL | NULL | |
| epiweek | int(11) | NO | MUL | NULL | |
| value | float | NO | | NULL | |
+----------+-------------+------+-----+---------+----------------+
id: unique identifier for each record
location: hhs1-10
epiweek: the epiweek during which the queries were executed
value: number of total test records per facility, within each epiweek

=================
=== Changelog ===
=================
2017-12-14:
* add "need update" check

2017-12-02:
* original version
"""

# standard library
import argparse

# third party
import mysql.connector

# first party
from delphi.epidata.acquisition.rvdss import rvdss
import delphi.operations.secrets as secrets
from delphi.utils.epidate import EpiDate
import delphi.utils.epiweek as flu
from delphi.utils.geo.locations import Locations

LOCATIONS = Locations.hhs_list
DATAPATH = "/home/automation/rvdss_data"


def update(locations, first=None, last=None, force_update=False, load_email=True):
# download and prepare data first
qd = rvdss.rvdssData(DATAPATH, load_email)
if not qd.need_update and not force_update:
print("Data not updated, nothing needs change.")
return

qd_data = qd.load_csv()
qd_measurements = qd.prepare_measurements(qd_data, start_weekday=4)
qd_ts = rvdss.measurement_to_ts(qd_measurements, 7, startweek=first, endweek=last)
# connect to the database
u, p = secrets.db.epi
cnx = mysql.connector.connect(user=u, password=p, database="epidata")
cur = cnx.cursor()

def get_num_rows():
cur.execute("SELECT count(1) `num` FROM `rvdss`")
for (num,) in cur:
pass
return num

# check from 4 weeks preceding the last week with data through this week
cur.execute("SELECT max(`epiweek`) `ew0`, yearweek(now(), 6) `ew1` FROM `rvdss`")
for (ew0, ew1) in cur:
ew0 = 200401 if ew0 is None else flu.add_epiweeks(ew0, -4)
ew0 = ew0 if first is None else first
ew1 = ew1 if last is None else last
print(f"Checking epiweeks between {int(ew0)} and {int(ew1)}...")

# keep track of how many rows were added
rows_before = get_num_rows()

# check rvdss for new and/or revised data
sql = """
INSERT INTO
`rvdss` (`location`, `epiweek`, `value`)
VALUES
(%s, %s, %s)
ON DUPLICATE KEY UPDATE
`value` = %s
"""

total_rows = 0

for location in locations:
if location not in qd_ts:
continue
ews = sorted(qd_ts[location].keys())
num_missing = 0
for ew in ews:
v = qd_ts[location][ew]
sql_data = (location, ew, v, v)
cur.execute(sql, sql_data)
total_rows += 1
if v == 0:
num_missing += 1
if num_missing > 0:
print(f" [{location}] missing {int(num_missing)}/{len(ews)} value(s)")

# keep track of how many rows were added
rows_after = get_num_rows()
print(f"Inserted {int(rows_after - rows_before)}/{int(total_rows)} row(s)")

# cleanup
cur.close()
cnx.commit()
cnx.close()
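database.py imports argparse but the hunk above ends without a command-line entry point; below is a minimal sketch of what a driver for update() could look like, assuming the signature shown above (the flag names are illustrative and not part of this PR):

def main():
    # args and usage (flag names are hypothetical)
    parser = argparse.ArgumentParser()
    parser.add_argument("--location", action="append", help="location(s) to update; defaults to all locations")
    parser.add_argument("--force_update", "-u", action="store_true", help="force update db values")
    parser.add_argument("--skip_email", "-s", action="store_true", help="do not use the email archive for update dates")
    args = parser.parse_args()

    locations = args.location if args.location else LOCATIONS
    update(locations, force_update=args.force_update, load_email=not args.skip_email)


if __name__ == "__main__":
    main()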
@@ -15,9 +15,9 @@
import math

from delphi.epidata.acquisition.rvdss.constants import (
DASHBOARD_BASE_URLS_2023_2024_SEASON, HISTORIC_SEASON_URL,
DASHBOARD_BASE_URLS_2023_2024_SEASON, HISTORIC_SEASON_URLS,
ALTERNATIVE_SEASON_BASE_URL, SEASON_BASE_URL, LAST_WEEK_OF_YEAR,
RESP_COUNTS_OUTPUT_FILE, POSITIVE_TESTS_OUTPUT_FILE
RESP_DETECTIONS_OUTPUT_FILE, POSITIVE_TESTS_OUTPUT_FILE
)
from delphi.epidata.acquisition.rvdss.utils import (
abbreviate_virus, abbreviate_geo, create_geo_types, check_date_format,
@@ -367,7 +367,7 @@ def create_percent_positive_detection_table(table,modified_date,start_year, flu=

return(table)

def get_season_reports(url):
def fetch_one_season_from_report(url):
# From the url, go to the main landing page for a season
# which contains all the links to each week in the season
page=requests.get(url)
@@ -382,13 +382,13 @@ def get_season_reports(url):
# create tables to hold all the data for the season
all_positive_tables=pd.DataFrame()
all_number_tables=pd.DataFrame()
all_respiratory_detection_table=pd.DataFrame()
all_respiratory_detection_tables=pd.DataFrame()

for week_num in range(len(urls)):
current_week = weeks[week_num]
current_week_end = end_dates[week_num]

# In the 2019=2020 season, the webpages for weeks 5 and 47 only have
# In the 2019-2020 season, the webpages for weeks 5 and 47 only have
# the abbreviations table and the headers for the respiratory detections
# table, so they are effectively empty, and skipped
if season[0] == '2019':
@@ -532,8 +532,8 @@ def get_season_reports(url):
# If not, add the weeks tables into the season table

# check for deduplication pandas
if not respiratory_detection_table.index.isin(all_respiratory_detection_table.index).any():
all_respiratory_detection_table= pd.concat([all_respiratory_detection_table,respiratory_detection_table])
if not respiratory_detection_table.index.isin(all_respiratory_detection_tables.index).any():
all_respiratory_detection_tables= pd.concat([all_respiratory_detection_tables,respiratory_detection_table])

if not combined_positive_tables.index.isin(all_positive_tables.index).any():
all_positive_tables=pd.concat([all_positive_tables,combined_positive_tables])
@@ -542,40 +542,28 @@
if not number_detections_table.index.isin(all_number_tables.index).any():
all_number_tables=pd.concat([all_number_tables,number_detections_table])

# write files to csvs
all_respiratory_detection_table.to_csv(path+"/" + RESP_COUNTS_OUTPUT_FILE, index=True)
all_positive_tables.to_csv(path+"/" + POSITIVE_TESTS_OUTPUT_FILE, index=True)

# Write the number of detections table to csv if it exists (i.e has rows)
if len(all_number_tables) != 0:
all_number_tables.to_csv(path+"/number_of_detections.csv", index=True)

def main():
# Scrape each season. Saves data to CSVs as a side effect.
[get_season_reports(url) for url in HISTORIC_SEASON_URL]
return {
"respiratory_detection": all_respiratory_detection_tables,
"positive": all_positive_tables,
"count": all_number_tables,
}

# Update the end of the 2023-2024 season with the dashboard data
def fetch_archived_dashboard_urls():
## TODO: paste in Christine's code for scraping this list https://health-infobase.canada.ca/respiratory-virus-detections/archive.html
pass
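This is not a substitute for the code referenced in the TODO above; as a rough placeholder sketch, scraping that archive listing could look like the following, assuming BeautifulSoup is available and the archived weekly dashboards are exposed as ordinary anchor tags (the link filter is a guess):

import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin

ARCHIVE_URL = "https://health-infobase.canada.ca/respiratory-virus-detections/archive.html"

def fetch_archived_dashboard_urls():
    page = requests.get(ARCHIVE_URL)
    page.raise_for_status()
    soup = BeautifulSoup(page.text, "html.parser")
    # Keep only links that point at archived detection pages; the substring filter is an assumption.
    links = [
        urljoin(ARCHIVE_URL, a["href"])
        for a in soup.find_all("a", href=True)
        if "respiratory-virus-detections" in a["href"]
    ]
    # De-duplicate while keeping a stable order.
    return sorted(set(links))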

# Load old csvs
old_detection_data = pd.read_csv('season_2023_2024/' + RESP_COUNTS_OUTPUT_FILE).set_index(['epiweek', 'time_value', 'issue', 'geo_type', 'geo_value'])
old_positive_data = pd.read_csv('season_2023_2024/' + POSITIVE_TESTS_OUTPUT_FILE).set_index(['epiweek', 'time_value', 'issue', 'geo_type', 'geo_value'])
def fetch_report_data():
# Scrape each season.
dict_list = [fetch_one_season_from_report(url) for url in HISTORIC_SEASON_URLS]

for base_url in DASHBOARD_BASE_URLS_2023_2024_SEASON:
# Get weekly dashboard data
weekly_data = get_weekly_data(base_url,2023).set_index(['epiweek', 'time_value', 'issue', 'geo_type', 'geo_value'])
positive_data = get_revised_data(base_url)
return dict_list

# Check if indices are already present in the old data
# If not, add the new data
if not weekly_data.index.isin(old_detection_data.index).any():
old_detection_data= pd.concat([old_detection_data,weekly_data],axis=0)

if not positive_data.index.isin(old_positive_data.index).any():
old_positive_data= pd.concat([old_positive_data,positive_data],axis=0)
def fetch_historical_dashboard_data():
# Update the end of the 2023-2024 season with the dashboard data
included_urls = fetch_archived_dashboard_urls()
dict_list = [{} for url in included_urls]

# Overwrite/update csvs
old_detection_data.to_csv('season_2023_2024/' + RESP_COUNTS_OUTPUT_FILE,index=True)
old_positive_data.to_csv('season_2023_2024/' + POSITIVE_TESTS_OUTPUT_FILE,index=True)
for i, base_url in enumerate(included_urls):
dict_list[i]["weekly"] = fetch_dashboard_data(base_url, 2023)
Contributor Author:
issue: Need to make sure key is one of the three standard keys ("respiratory_detection", "positive", "count")
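A small guard along these lines (key names taken from the table_types dicts in run.py below) would catch a non-standard key before it propagates; sketch only:

EXPECTED_TABLE_TYPES = {"respiratory_detection", "positive", "count"}

def check_table_type_keys(season_dict):
    # Raise early if a scraped season dict uses a key outside the standard set.
    unknown = set(season_dict.keys()) - EXPECTED_TABLE_TYPES
    if unknown:
        raise ValueError(f"unexpected table type key(s): {sorted(unknown)}")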


if __name__ == '__main__':
main()
return dict_list
106 changes: 106 additions & 0 deletions src/acquisition/rvdss/run.py
@@ -0,0 +1,106 @@
"""
Defines the command line interface for the rvdss indicator. Current data (covering the most recent epiweek) and historical data (covering all available time periods before the most recent epiweek) can be generated together or separately.

Defines top-level functions to fetch data and save to disk or DB.
"""

import argparse
import os

import pandas as pd

from delphi.epidata.acquisition.rvdss.utils import get_weekly_data, get_revised_data, get_dashboard_update_date
from delphi.epidata.acquisition.rvdss.constants import DASHBOARD_BASE_URL, RESP_DETECTIONS_OUTPUT_FILE, POSITIVE_TESTS_OUTPUT_FILE, COUNTS_OUTPUT_FILE
Contributor Author:
issue: check imports



def update_current_data():
## TODO: what is the base path for these files?
base_path = "."

data_dict = fetch_dashboard_data(DASHBOARD_BASE_URL, 2024)

table_types = {
"respiratory_detection": RESP_DETECTIONS_OUTPUT_FILE,
"positive": POSITIVE_TESTS_OUTPUT_FILE,
# "count": COUNTS_OUTPUT_FILE, # Dashboards don't contain this data.
}
for tt in table_types.keys():
data = data_dict[tt]

# Write the tables to separate csvs
path = base_path + "/" + table_types[tt]

# Since this function generates new data weekly, we need to combine it with the existing data, if it exists.
if not os.path.exists(path):
data.to_csv(path,index=True)
else:
old_data = pd.read_csv(path).set_index(['epiweek', 'time_value', 'issue', 'geo_type', 'geo_value'])

# If index already exists in the data on disk, don't add the new data -- we may have already run the weekly data fetch.
## TODO: The check on index maybe should be stricter? Although we do deduplication upstream, so this probably won't find true duplicates
if not data.index.isin(old_data.index).any():
old_data= pd.concat([old_data,data],axis=0)
old_data.to_csv(path,index=True)

# ## TODO
# update_database(data)
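update_database() is still a TODO; one hypothetical shape for it, reusing the MySQL connection pattern from database.py in this PR. The table name, column list, value column, and required imports below are placeholders, not a finalized schema:

import mysql.connector
import delphi.operations.secrets as secrets

def update_database(data):
    u, p = secrets.db.epi
    cnx = mysql.connector.connect(user=u, password=p, database="epidata")
    cur = cnx.cursor()
    # Upsert one row per (epiweek, time_value, issue, geo_type, geo_value) index entry;
    # "value" stands in for whatever measure columns the final table defines.
    sql = """
        INSERT INTO `rvdss` (`epiweek`, `time_value`, `issue`, `geo_type`, `geo_value`, `value`)
        VALUES (%s, %s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE `value` = VALUES(`value`)
    """
    for row in data.reset_index().itertuples(index=False):
        cur.execute(sql, (row.epiweek, row.time_value, row.issue, row.geo_type, row.geo_value, row.value))
    cur.close()
    cnx.commit()
    cnx.close()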


def update_historical_data():
## TODO: what is the base path for these files?
base_path = "."

report_dict_list = fetch_report_data()
dashboard_dict_list = fetch_historical_dashboard_data()

table_types = {
"respiratory_detection": RESP_DETECTIONS_OUTPUT_FILE,
"positive": POSITIVE_TESTS_OUTPUT_FILE,
"count": COUNTS_OUTPUT_FILE,
}
for tt in table_types.keys():
# Merge tables together from dashboards and reports for each table type.
dashboard_data = [elem.get(tt, None) for elem in dashboard_dict_list]
report_data = [elem.get(tt, None) for elem in report_dict_list]
# Drop missing table types (not every source provides every table) before concatenating.
data = pd.concat([df for df in report_data + dashboard_data if df is not None])

# Write the tables to separate csvs
data.to_csv(base_path +"/" + table_types[tt], index=True)

# ## TODO
# update_database(data)


def main():
# args and usage
parser = argparse.ArgumentParser()
# fmt: off
parser.add_argument(
"--current",
"-c",
action="store_true",
help="fetch current data, that is, data for the latest epiweek"
)
parser.add_argument(
"--historical",
"-h",
action="store_true",
help="fetch historical data, that is, data for all available time periods other than the latest epiweek"
)
# fmt: on
args = parser.parse_args()

current_flag, historical_flag = (
args.current,
args.historical,
)
if not current_flag and not historical_flag:
raise Exception("no data was requested")

# Decide what to update
if current_flag:
update_current_data()
if historical_flag:
update_historical_data()


if __name__ == "__main__":
main()
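For reference, the two flags defined above would be invoked roughly as follows; the module path is inferred from the import convention used elsewhere in this PR:

# Fetch only the most recent epiweek:
#   python -m delphi.epidata.acquisition.rvdss.run --current
#
# Backfill everything before the most recent epiweek:
#   python -m delphi.epidata.acquisition.rvdss.run --historical
#
# Both at once:
#   python -m delphi.epidata.acquisition.rvdss.run --current --historical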