Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions backfill_corrections/delphiBackfillCorrection/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ importFrom(dplyr,group_split)
importFrom(dplyr,if_else)
importFrom(dplyr,mutate)
importFrom(dplyr,pull)
importFrom(dplyr,rename)
importFrom(dplyr,select)
importFrom(dplyr,summarize)
importFrom(dplyr,ungroup)
Expand All @@ -43,6 +44,7 @@ importFrom(lubridate,day)
importFrom(lubridate,days_in_month)
importFrom(lubridate,make_date)
importFrom(lubridate,month)
importFrom(lubridate,with_tz)
importFrom(lubridate,year)
importFrom(parallel,detectCores)
importFrom(quantgen,quantile_lasso)
Expand Down
75 changes: 72 additions & 3 deletions backfill_corrections/delphiBackfillCorrection/R/io.R
Original file line number Diff line number Diff line change
@@ -1,15 +1,84 @@
#' Read a parquet file into a dataframe
#'
#' @template input_dir-template
#' @template input_file-template
#'
#' @importFrom arrow read_parquet
#' @importFrom dplyr %>%
#'
#' @export
read_data <- function(input_dir) {
df <- read_parquet(input_dir, as_data_frame = TRUE)
read_data <- function(input_file) {
  # Ask arrow for a data frame directly; the result is returned as-is.
  read_parquet(input_file, as_data_frame = TRUE)
}

#' Add `issue_date` parsed from filename to daily input, if needed
#'
#' For a daily file (one whose path matches the daily name pattern built from
#' `indicator` and `signal`), the issue date column is filled from the
#' `_as_of_YYYYMMDD.parquet` suffix of the filename whenever the column is
#' missing or contains any `NA`. For any other (rollup) file, a missing issue
#' date column is derived as `time_value + lag`.
#'
#' @param df dataframe containing input data from a single parquet file
#' @template input_file-template
#' @template indicator-template
#' @template signal-template
#' @template lag_col-template
#' @template issued_col-template
#'
#' @return `df` with the `issued_col` column present. An error is raised if
#'   a daily filename carries no parseable date, or if a rollup file has
#'   neither `lag_col` nor `issued_col`.
#'
#' @importFrom stringr str_interp
add_issue_date <- function(df, input_file, indicator, signal,
                           lag_col = "lag", issued_col = "issue_date") {
  daily_pattern <- create_name_pattern(indicator, signal, "daily")
  if (grepl(daily_pattern, input_file)) {
    # Daily file
    if (!(issued_col %in% names(df)) || any(is.na(df[[issued_col]]))) {
      # Capture issue date from the filename component only, so paths with no
      # directory part (or with a different separator) still parse.
      issue_date <- as.Date(
        sub("^.*_as_of_([0-9]{8})[.]parquet$", "\\1", basename(input_file)),
        format = "%Y%m%d"
      )
      if (is.na(issue_date)) {
        # `sub()` returns its input unchanged when nothing matches, and
        # `as.Date()` then yields NA; fail loudly rather than store NA dates.
        stop("could not parse an issue date from the filename of ", input_file)
      }
      # Size the new column explicitly so zero-row input is handled as well.
      df[[issued_col]] <- rep(issue_date, nrow(df))
    }
  } else {
    # Rollup file
    if (!(lag_col %in% colnames(df)) && !(issued_col %in% colnames(df))) {
      stop("since rollup files contain data spanning multiple issues, ",
           "the issue date field cannot be reconstructed from the ",
           "filename. Thus, either issue date or lag must be ",
           str_interp("provided in the input file ${input_file}"))
    }
    if (!(issued_col %in% colnames(df))) {
      df[[issued_col]] <- as.Date(df$time_value + df[[lag_col]])
    }
  }

  return(df)
}

#' Add `lag` field based on issue date and time value, if needed
#'
#' The lag column is left alone when it already exists and has no missing
#' values. Otherwise the whole column is (re)computed as the integer
#' difference between the issue date column and `time_value`.
#'
#' @param df dataframe containing input data from a single parquet file
#' @template lag_col-template
#' @template issued_col-template
add_lag <- function(df, lag_col = "lag", issued_col = "issue_date") {
  has_complete_lag <- lag_col %in% colnames(df) && !any(is.na(df[[lag_col]]))
  if (has_complete_lag) {
    return(df)
  }

  # Column is absent or has gaps: rebuild it for every row.
  df[[lag_col]] <- as.integer(df[[issued_col]] - df$time_value)
  df
}

#' Extract date from datetime column in timezone-aware way
#'
#' Each listed column that exists in `df` is shifted to UTC and then reduced
#' to a date; listed columns that are not in `df` are ignored.
#'
#' @param df dataframe containing input data from a single parquet file
#' @param cols vector of names of datetime fields to convert to date
#'
#' @importFrom lubridate with_tz
datetime_to_date <- function(df, cols = c("time_value", "issue_date")) {
  present <- cols[cols %in% colnames(df)]
  for (nm in present) {
    utc_times <- with_tz(df[[nm]], "UTC")
    df[[nm]] <- as.Date(utc_times)
  }
  df
}

#' Export the result to customized directory
#'
#' @param test_data test data containing prediction results
Expand Down
12 changes: 9 additions & 3 deletions backfill_corrections/delphiBackfillCorrection/R/main.R
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ run_backfill <- function(df, params,
#'
#' @template params-template
#'
#' @importFrom dplyr bind_rows mutate
#' @importFrom dplyr bind_rows mutate rename
#' @importFrom parallel detectCores
#' @importFrom rlang .data
#'
Expand Down Expand Up @@ -287,9 +287,15 @@ main <- function(params) {
msg_ts("Reading in and combining associated files")
input_data <- lapply(
files_list,
function(file) {read_data(file)}
function(file) {
read_data(file) %>%
datetime_to_date() %>%
add_issue_date(file, input_group$indicator, input_group$signal) %>%
add_lag()
}
) %>%
bind_rows()
bind_rows() %>%
rename(geo_value = .data$fips)

if (nrow(input_data) == 0) {
warning(str_interp(
Expand Down
9 changes: 3 additions & 6 deletions backfill_corrections/delphiBackfillCorrection/R/utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,9 @@ validity_checks <- function(df, value_type, num_col, denom_col, signal_suffixes,
stop("No 'time_value' column detected for the reference date!")
}

# issue_date or lag should exist in the dataset
if ( !lag_col %in% colnames(df) ) {
if ( issued_col %in% colnames(df) ) {
df$lag = as.integer(df$issue_date - df$time_value)
}
else {stop("No issue_date or lag exists!")}
# issue_date and lag should exist in the dataset
if ( !(lag_col %in% colnames(df)) || !(issued_col %in% colnames(df))) {
stop("issue_date and lag fields must exist in the input data")
}

return(list(df = df, value_cols = value_cols))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#' @param input_file path to an individual file containing input data
#' in `.parquet` format.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions backfill_corrections/delphiBackfillCorrection/man/add_lag.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.