pull.py
# -*- coding: utf-8 -*-
import numpy as np
import pandas as pd
def pull_usafacts_data(base_url: str, metric: str, pop_df: pd.DataFrame) -> pd.DataFrame:
"""Pulls the latest USA Facts data, and conforms it into a dataset

    The output dataset has:
    - Each row corresponds to a (County, Date) pair, denoted (FIPS, timestamp).
    - Each row additionally has a column `new_counts` corresponding to the new
      counts for that day (either `confirmed` cases or `deaths`), and a column
      `cumulative_counts` corresponding to the cumulative metric from January
      22nd (as of April 27th) until the latest date.

    Note that the raw dataset gives the `cumulative_counts` metric, from which
    we compute `new_counts` by taking first differences. Hence, `new_counts`
    may be negative. This is wholly dependent on the quality of the raw
    dataset.

    We filter the data such that we only keep rows with valid FIPS, or "FIPS"
    codes defined under the exceptions of the README. The current exceptions
    include:
    - 6000: Grand Princess Cruise Ship
    - 2270: Wade Hampton Census Area in AK, but no cases/deaths were assigned
    - 0: statewise unallocated
    - 1: New York City Unallocated/Probable (only exists for NYC)

    Note: there is no information for Puerto Rico (PR).

    Parameters
    ----------
    base_url: str
        Base URL for pulling the USA Facts data; must contain a "{metric}"
        placeholder.
    metric: str
        One of "confirmed" or "deaths"; substituted into `base_url`.
    pop_df: pd.DataFrame
        Population by county, read from the static file "fips_population.csv".

    Returns
    -------
    pd.DataFrame
        Dataframe as described above.
"""
# Constants
DROP_COLUMNS = [
"FIPS",
"County Name",
"State",
"stateFIPS"
]
# MIN_FIPS = 1000
# MAX_FIPS = 57000
    # Read data (wide format: one row per county, one column per date)
    df = pd.read_csv(base_url.format(metric=metric)).rename({"countyFIPS": "FIPS"}, axis=1)
# Check missing FIPS
null_mask = pd.isnull(df["FIPS"])
    assert null_mask.sum() == 0, "raw data contains rows with missing countyFIPS"
UNEXPECTED_COLUMNS = [x for x in df.columns if "Unnamed" in x]
DROP_COLUMNS.extend(UNEXPECTED_COLUMNS)
# Assign Grand Princess Cruise Ship a special FIPS 90000
# df.loc[df["FIPS"] == 6000, "FIPS"] = 90000
# df.loc[df["FIPS"] == 6000, "stateFIPS"] = 90
# Ignore Grand Princess Cruise Ship and Wade Hampton Census Area in AK
df = df[
(df["FIPS"] != 6000)
& (df["FIPS"] != 2270)
]
    # Merge in population; the merged column name is lowercase ("population"),
    # consistent across confirmed and deaths
    # Population for unassigned cases/deaths is NaN
df = pd.merge(df, pop_df, on="FIPS", how="left")
# Change FIPS from 0 to XX000 for statewise unallocated cases/deaths
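    # (e.g. unallocated rows in New York, stateFIPS 36, become pseudo-FIPS 36000)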
unassigned_index = (df['FIPS'] == 0)
df.loc[unassigned_index, "FIPS"] = df["stateFIPS"].loc[unassigned_index].values * 1000
    # Conform FIPS to 5-digit, zero-padded strings (e.g. 1001 -> "01001")
df["fips"] = df["FIPS"].apply(lambda x: f"{int(x):05d}")
# Drop unnecessary columns (state is pre-encoded in fips)
try:
df.drop(DROP_COLUMNS, axis=1, inplace=True)
    except KeyError as e:
        raise ValueError(
            "Tried to drop non-existent columns. The dataset "
            "schema may have changed. Please investigate and "
            "amend DROP_COLUMNS."
        ) from e
# Check that columns are either FIPS or dates
try:
columns = list(df.columns)
columns.remove("fips")
columns.remove("population")
# Detects whether there is a non-date string column -- not perfect
_ = [int(x.replace("/", "")) for x in columns]
    except ValueError as e:
        raise ValueError(
            "Detected unexpected column(s) "
            "after dropping DROP_COLUMNS. The dataset "
            "schema may have changed. Please investigate and "
            "amend DROP_COLUMNS."
        ) from e
    # Reshape dataframe from wide (one column per date) to long format
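    # For example (values illustrative), a wide row
    #   fips="01001", population=55869, 1/22/20=0, 1/23/20=3, ...
    # becomes the long rows
    #   ("01001", 55869, "1/22/20", 0), ("01001", 55869, "1/23/20", 3), ...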
df = df.melt(
id_vars=["fips", "population"],
var_name="timestamp",
value_name="cumulative_counts",
)
# timestamp: str -> datetime
df["timestamp"] = pd.to_datetime(df["timestamp"])
# Add a dummy first row here on day before first day
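    # After differencing, this makes the first real day's new_counts equal its
    # cumulative_counts (the dummy rows are dropped again before returning)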
min_ts = min(df["timestamp"])
df_dummy = df.loc[df["timestamp"] == min_ts].copy()
df_dummy.loc[:, "timestamp"] = min_ts - pd.Timedelta(days=1)
df_dummy.loc[:, "cumulative_counts"] = 0
df = pd.concat([df_dummy, df])
# Obtain new_counts
df.sort_values(["fips", "timestamp"], inplace=True)
df["new_counts"] = df["cumulative_counts"].diff() # 1st discrete difference
# Handle edge cases where we diffed across fips
mask = df["fips"] != df["fips"].shift(1)
df.loc[mask, "new_counts"] = np.nan
df.reset_index(inplace=True, drop=True)
# Final sanity checks
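    # 1) every FIPS has one row per unique timestamp
    # 2) the timestamps form a contiguous range of calendar days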
days_by_fips = df.groupby("fips").count()["cumulative_counts"].unique()
unique_days = df["timestamp"].unique()
# each FIPS has same number of rows
if (len(days_by_fips) > 1) or (days_by_fips[0] != len(unique_days)):
raise ValueError("Differing number of days by fips")
min_timestamp = min(unique_days)
max_timestamp = max(unique_days)
n_days = (max_timestamp - min_timestamp) / np.timedelta64(1, "D") + 1
if n_days != len(unique_days):
        raise ValueError(
            f"Not every day between {min_timestamp} and "
            f"{max_timestamp} is represented."
        )
return df.loc[
df["timestamp"] >= min_ts,
[ # Reorder
"fips",
"timestamp",
"population",
"new_counts",
"cumulative_counts",
],
]
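

# Minimal usage sketch (hypothetical values): the real pipeline supplies
# `base_url` and the population table through its own params and static files,
# so the URL template and file path below are placeholders, not the actual ones.
if __name__ == "__main__":
    # Hypothetical URL template; it must contain a "{metric}" placeholder and
    # point to a wide CSV with a "countyFIPS" column plus one column per date.
    example_base_url = "https://example.com/covid_{metric}_usafacts.csv"
    # Population table with "FIPS" and "population" columns, as described in
    # the docstring (e.g. the static file "fips_population.csv").
    example_pop_df = pd.read_csv("fips_population.csv")
    confirmed_df = pull_usafacts_data(example_base_url, "confirmed", example_pop_df)
    print(confirmed_df.head())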