1 change: 0 additions & 1 deletion backfill_corrections/delphiBackfillCorrection/DESCRIPTION
@@ -25,7 +25,6 @@ Imports:
    tidyr,
    zoo,
    utils,
-   rlang,
    parallel
Suggests:
    knitr (>= 1.15),
5 changes: 0 additions & 5 deletions backfill_corrections/delphiBackfillCorrection/NAMESPACE
@@ -33,9 +33,7 @@ importFrom(dplyr,filter)
importFrom(dplyr,group_by)
importFrom(dplyr,group_split)
importFrom(dplyr,if_else)
-importFrom(dplyr,mutate)
importFrom(dplyr,pull)
-importFrom(dplyr,rename)
importFrom(dplyr,select)
importFrom(dplyr,starts_with)
importFrom(dplyr,summarize)
@@ -50,9 +48,6 @@ importFrom(lubridate,year)
importFrom(parallel,detectCores)
importFrom(quantgen,quantile_lasso)
importFrom(readr,write_csv)
-importFrom(rlang,":=")
-importFrom(rlang,.data)
-importFrom(rlang,.env)
importFrom(stats,coef)
importFrom(stats,nlm)
importFrom(stats,pbeta)
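Note: these NAMESPACE deletions mirror the roxygen `@importFrom` removals in the R files below. NAMESPACE appears to be generated from the roxygen headers rather than edited by hand (an assumption about this package's tooling, consistent with the paired changes in this diff), so it is kept in sync by regenerating:

# Regenerate NAMESPACE (and man/ pages) after editing roxygen headers.
# Assumed workflow; either call works if devtools/roxygen2 are installed.
devtools::document()
roxygen2::roxygenise()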
@@ -53,7 +53,7 @@ objective <- function(theta, x, prob, ...) {
#' @param model_save_dir directory containing trained models
#'
#' @importFrom stats nlm predict
-#' @importFrom dplyr %>% filter
+#' @importFrom dplyr filter
#' @importFrom quantgen quantile_lasso
#'
est_priors <- function(train_data, prior_test_data, geo, value_type, dw, taus,
@@ -63,8 +63,8 @@ est_priors <- function(train_data, prior_test_data, geo, value_type, dw, taus,
                       model_save_dir, start=c(0, log(10)),
                       base_pseudo_denom=1000, base_pseudo_num=10,
                       train_models = TRUE, make_predictions = TRUE) {
-  sub_train_data <- train_data %>% filter(train_data[[dw]] == 1)
-  sub_test_data <- prior_test_data %>% filter(prior_test_data[[dw]] == 1)
+  sub_train_data <- filter(train_data, train_data[[dw]] == 1)
+  sub_test_data <- filter(prior_test_data, prior_test_data[[dw]] == 1)
  if (nrow(sub_test_data) == 0) {
    pseudo_denom <- base_pseudo_denom
    pseudo_num <- base_pseudo_num
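Note: the rewritten calls drop both the magrittr pipe and the rlang `.data` pronoun by indexing the data frame directly, which works because `dw` holds a column name as a string. A minimal sketch of the pattern, using hypothetical data and a hypothetical column name:

library(dplyr)

dw <- "dw1"  # hypothetical name of a 0/1 day-of-week indicator column
train_data <- data.frame(value = c(5, 7, 9), dw1 = c(1, 0, 1))

# filter() accepts a plain logical vector, so no rlang pronoun is needed.
sub_train_data <- filter(train_data, train_data[[dw]] == 1)

# The same rows via base R subsetting (row names aside):
sub_train_base <- train_data[train_data[[dw]] == 1, ]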
19 changes: 8 additions & 11 deletions backfill_corrections/delphiBackfillCorrection/R/io.R
@@ -13,18 +13,15 @@ read_data <- function(input_file) {
#' Make sure data contains a `geo_value` field
#'
#' @template df-template
-#'
-#' @importFrom dplyr rename select
-#' @importFrom rlang .data
fips_to_geovalue <- function(df) {
  if ( !("geo_value" %in% colnames(df)) ) {
    if ( !("fips" %in% colnames(df)) ) {
      stop("Either `fips` or `geo_value` field must be available")
    }
-    df <- rename(df, geo_value = .data$fips)
+    df$geo_value <- df$fips
  }
  if ( "fips" %in% colnames(df) ) {
-    df <- select(df, -.data$fips)
+    df$fips <- NULL
  }
  return(df)
}
@@ -63,10 +60,10 @@ export_test_result <- function(test_data, coef_data, indicator, signal,
  dir.create(file.path(export_dir, signal_dir), showWarnings = FALSE)

  if (nrow(test_data) == 0) {
-    warning(str_interp("No test data available for ${signal_info}"))
+    warning("No test data available for ", signal_info)
  } else {
-    msg_ts(str_interp("Saving predictions to disk for ${signal_info} "))
-    pred_output_file <- str_interp("prediction_${base_name}")
+    msg_ts("Saving predictions to disk for ", signal_info)
+    pred_output_file <- paste0("prediction_", base_name)

    prediction_col <- colnames(test_data)[grepl("^predicted", colnames(test_data))]
    expected_col <- c("time_value", "issue_date", "lag", "geo_value",
@@ -75,10 +72,10 @@
  }

  if (nrow(coef_data) == 0) {
-    warning(str_interp("No coef data available for ${signal_info}"))
+    warning("No coef data available for ", signal_info)
  } else {
-    msg_ts(str_interp("Saving coefficients to disk for ${signal_info}"))
-    coef_output_file <- str_interp("coefs_${base_name}")
+    msg_ts("Saving coefficients to disk for ", signal_info)
+    coef_output_file <- paste0("coefs_", base_name)
    write_csv(coef_data, file.path(export_dir, signal_dir, coef_output_file))
  }
}
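Note: `warning()` concatenates its unnamed `...` arguments itself, so those calls are drop-in replacements for the `str_interp` versions. The multi-argument `msg_ts` calls imply the helper now pastes its arguments together; a minimal sketch of such a helper (an assumption, not necessarily the package's actual definition):

# Variadic logging helper: timestamp the line, then paste the pieces.
msg_ts <- function(...) {
  message(format(Sys.time(), "%Y-%m-%d %H:%M:%S"), ": ", paste0(...))
}

msg_ts("Saving predictions to disk for ", "example_signal_info")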
103 changes: 46 additions & 57 deletions backfill_corrections/delphiBackfillCorrection/R/main.R
@@ -9,16 +9,14 @@
#' @template indicator-template
#' @template signal-template
#'
-#' @importFrom dplyr %>% filter select group_by summarize across everything group_split ungroup
+#' @importFrom dplyr %>% filter group_by summarize across everything group_split ungroup
#' @importFrom tidyr drop_na
-#' @importFrom rlang .data .env
-#' @importFrom stringr str_interp
#'
#' @export
run_backfill <- function(df, params,
                         refd_col = "time_value", lag_col = "lag", issued_col = "issue_date",
                         signal_suffixes = c(""), indicator = "", signal = "") {
-  df <- filter(df, .data$lag < params$ref_lag + 30) # a rough filtration to save memory
+  df <- filter(df, lag < params$ref_lag + 30) # a rough filtration to save memory

  geo_levels <- params$geo_levels
  if ("state" %in% geo_levels) {
@@ -28,23 +26,24 @@ run_backfill <- function(df, params,
  }

  for (geo_level in geo_levels) {
-    msg_ts(str_interp("geo level ${geo_level}"))
+    msg_ts("geo level ", geo_level)
    # Get full list of interested locations
    if (geo_level == "state") {
      # Drop county field and make new "geo_value" field from "state_id".
      # Aggregate counties up to state level
      agg_cols <- c("geo_value", issued_col, refd_col, lag_col)
      # Sum all non-agg columns. Summarized columns keep original names
+      df$geo_value <- df$state_id
+      df$state_id <- NULL
      df <- df %>%
-        select(-.data$geo_value, geo_value = .data$state_id) %>%
        group_by(across(agg_cols)) %>%
        summarize(across(everything(), sum)) %>%
        ungroup()
    }
    if (geo_level == "county") {
      # Keep only 200 most populous (within the US) counties
      top_200_geos <- get_populous_counties()
-      df <- filter(df, .data$geo_value %in% top_200_geos)
+      df <- filter(df, geo_value %in% top_200_geos)
    }

    test_data_list <- list()
@@ -59,13 +58,13 @@
    }

    msg_ts("Splitting data into geo groups")
-    group_dfs <- group_split(df, .data$geo_value)
+    group_dfs <- group_split(df, geo_value)

    # Build model for each location
    for (subdf in group_dfs) {
      geo <- subdf$geo_value[1]

-      msg_ts(str_interp("Processing ${geo} geo group"))
+      msg_ts("Processing ", geo, " geo group")

      min_refd <- min(subdf[[refd_col]])
      max_refd <- max(subdf[[refd_col]])
@@ -78,7 +77,7 @@
      # process again. Main use case is for quidel which has overall and
      # age-based signals.
      if (signal_suffix != "") {
-        msg_ts(str_interp("signal suffix ${signal_suffix}"))
+        msg_ts("signal suffix ", signal_suffix)
        num_col <- paste(params$num_col, signal_suffix, sep = "_")
        denom_col <- paste(params$denom_col, signal_suffix, sep = "_")
      } else {
@@ -87,7 +86,7 @@
      }

      for (value_type in params$value_types) {
-        msg_ts(str_interp("value type ${value_type}"))
+        msg_ts("value type ", value_type)
        # Handle different signal types
        if (value_type == "count") { # For counts data only
          combined_df <- fill_missing_updates(subdf, num_col, refd_col, lag_col)
@@ -113,15 +112,17 @@
          )
        }
        combined_df <- add_params_for_dates(combined_df, refd_col, lag_col)
-        combined_df <- combined_df %>% filter(.data$lag < params$ref_lag)
+        combined_df <- filter(combined_df, lag < params$ref_lag)

-        geo_train_data <- combined_df %>%
-          filter(.data$issue_date < params$training_end_date) %>%
-          filter(.data$target_date <= params$training_end_date) %>%
-          filter(.data$target_date > params$training_start_date) %>%
+        geo_train_data <- filter(combined_df,
+          issue_date < params$training_end_date,
+          target_date <= params$training_end_date,
+          target_date > params$training_start_date,
+        ) %>%
          drop_na()
-        geo_test_data <- combined_df %>%
-          filter(.data$issue_date %in% params$test_dates) %>%
+        geo_test_data <- filter(combined_df,
+          issue_date %in% params$test_dates
+        ) %>%
          drop_na()

        if (nrow(geo_test_data) == 0) {
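Note: multiple conditions passed to a single `filter()` call are combined with `&`, so collapsing the chained `filter()` pipeline into one call does not change which rows are kept:

library(dplyr)

df <- data.frame(x = 1:5, y = 5:1)
chained  <- df %>% filter(x > 1) %>% filter(y > 1)
combined <- filter(df, x > 1, y > 1)  # same rows as `chained`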
@@ -135,9 +136,10 @@

        if (value_type == "fraction") {
          # Use beta prior approach to adjust fractions
-          geo_prior_test_data = combined_df %>%
-            filter(.data$issue_date > min(params$test_dates) - 7) %>%
-            filter(.data$issue_date <= max(params$test_dates))
+          geo_prior_test_data = filter(combined_df,
+            issue_date > min(params$test_dates) - 7,
+            issue_date <= max(params$test_dates)
+          )
          updated_data <- frac_adj(geo_train_data, geo_test_data, geo_prior_test_data,
                                   indicator = indicator, signal = signal,
                                   geo_level = geo_level, signal_suffix = signal_suffix,
@@ -154,16 +156,15 @@
          }
          max_raw = sqrt(max(geo_train_data$value_raw))
          for (test_lag in params$test_lags) {
-            msg_ts(str_interp("test lag ${test_lag}"))
+            msg_ts("test lag ", test_lag)
            filtered_data <- data_filteration(test_lag, geo_train_data,
                                              geo_test_data, params$lag_pad)
            train_data <- filtered_data[[1]]
            test_data <- filtered_data[[2]]

            if (nrow(train_data) == 0 || nrow(test_data) == 0) {
-              msg_ts(str_interp(
-                "Not enough data to either train or test for test_lag ${test_lag}, skipping"
-              ))
+              msg_ts("Not enough data to either train or test for test_lag ",
+                     test_lag, ", skipping")
              next
            }

@@ -238,9 +239,8 @@
#' @template lag_col-template
#' @template issued_col-template
#'
-#' @importFrom dplyr bind_rows mutate %>%
+#' @importFrom dplyr bind_rows %>%
#' @importFrom parallel detectCores
-#' @importFrom rlang .data :=
#' @importFrom stringr str_interp
#'
#' @export
@@ -253,7 +253,7 @@ main <- function(params,

  indicators_subset <- INDICATORS_AND_SIGNALS
  if (params$indicators != "all") {
-    indicators_subset <- filter(indicators_subset, .data$indicator == params$indicators)
+    indicators_subset <- filter(indicators_subset, indicator == params$indicators)
  }
  if (nrow(indicators_subset) == 0) {
    stop("no indicators to process")
@@ -288,62 +288,51 @@
  params$training_start_date <- result$training_start_date
  params$training_end_date <- result$training_end_date

-  msg_ts(paste0(
-    str_interp("training_start_date is ${params$training_start_date}, "),
-    str_interp("training_end_date is ${params$training_end_date}")
-  ))
+  msg_ts("training_start_date is ", params$training_start_date,
+         ", training_end_date is ", params$training_end_date)

  # Loop over every indicator + signal combination.
  for (group_i in seq_len(nrow(indicators_subset))) {
    input_group <- indicators_subset[group_i,]
-    msg_ts(str_interp(
-      "Processing indicator ${input_group$indicator} signal ${input_group$signal}"
-    ))
+    msg_ts("Processing indicator ", input_group$indicator, " signal ", input_group$signal)

    files_list <- get_files_list(
      input_group$indicator, input_group$signal, params, input_group$sub_dir
    )
    if (length(files_list) == 0) {
-      warning(str_interp(
-        "No files found for indicator ${input_group$indicator} signal ${input_group$signal}, skipping"
-      ))
+      warning("No files found for indicator ", input_group$indicator,
+              " signal ", input_group$signal, ", skipping")
      next
    }

    msg_ts("Reading in and combining associated files")
    input_data <- lapply(
      files_list,
      function(file) {
+        # refd_col and issued_col read in as strings
        read_data(file) %>%
-          fips_to_geovalue() %>%
-          mutate(
-            # Use `glue` syntax to construct a new field by variable,
-            # from https://stackoverflow.com/a/26003971/14401472
-            "{refd_col}" := as.Date(.data[[refd_col]], "%Y-%m-%d"),
-            "{issued_col}" := as.Date(.data[[issued_col]], "%Y-%m-%d")
-          )
+          fips_to_geovalue()
      }
    ) %>%
      bind_rows()

    if (nrow(input_data) == 0) {
-      warning(str_interp(
-        "No data available for indicator ${input_group$indicator} signal ${input_group$signal}, skipping"
-      ))
+      warning("No data available for indicator ", input_group$indicator,
+              " signal ", input_group$signal, ", skipping")
      next
    }

    # Check data type and required columns
    msg_ts("Validating input data")
-    for (value_type in params$value_types) {
-      msg_ts(str_interp("for ${value_type}"))
-      result <- validity_checks(
-        input_data, value_type,
-        params$num_col, params$denom_col, input_group$name_suffix,
-        refd_col = refd_col, lag_col = lag_col, issued_col = issued_col
-      )
-      input_data <- result[["df"]]
-    }
+    # Validate while date fields still stored as strings for speed.
+    input_data <- validity_checks(
+      input_data, params$value_types,
+      params$num_col, params$denom_col, input_group$name_suffix,
+      refd_col = refd_col, lag_col = lag_col, issued_col = issued_col
+    )
+
+    input_data[[refd_col]] <- as.Date(input_data[[refd_col]], "%Y-%m-%d")
+    input_data[[issued_col]] <- as.Date(input_data[[issued_col]], "%Y-%m-%d")

    # Check available training days
    training_days_check(input_data[[issued_col]], params$training_days)
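Note: date parsing moves out of the per-file `mutate()` (which needed rlang's `:=` and glue syntax to assign to a column named by a variable) and becomes two plain vectorized `as.Date()` assignments on the combined frame, after validation has run on the cheaper string representation. The same pattern in isolation, on a hypothetical frame:

refd_col <- "time_value"
issued_col <- "issue_date"
input_data <- data.frame(time_value = c("2022-01-01", "2022-01-02"),
                         issue_date = c("2022-01-03", "2022-01-04"))

# Base R assignment by column name needs no rlang `:=` or `.data`.
input_data[[refd_col]] <- as.Date(input_data[[refd_col]], "%Y-%m-%d")
input_data[[issued_col]] <- as.Date(input_data[[issued_col]], "%Y-%m-%d")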