diff --git a/backfill_corrections/delphiBackfillCorrection/NAMESPACE b/backfill_corrections/delphiBackfillCorrection/NAMESPACE index f1700be96..312bf2c0b 100644 --- a/backfill_corrections/delphiBackfillCorrection/NAMESPACE +++ b/backfill_corrections/delphiBackfillCorrection/NAMESPACE @@ -34,6 +34,7 @@ importFrom(dplyr,group_split) importFrom(dplyr,if_else) importFrom(dplyr,mutate) importFrom(dplyr,pull) +importFrom(dplyr,rename) importFrom(dplyr,select) importFrom(dplyr,summarize) importFrom(dplyr,ungroup) @@ -43,6 +44,7 @@ importFrom(lubridate,day) importFrom(lubridate,days_in_month) importFrom(lubridate,make_date) importFrom(lubridate,month) +importFrom(lubridate,with_tz) importFrom(lubridate,year) importFrom(parallel,detectCores) importFrom(quantgen,quantile_lasso) diff --git a/backfill_corrections/delphiBackfillCorrection/R/io.R b/backfill_corrections/delphiBackfillCorrection/R/io.R index e16c90f0e..beb9c54cd 100644 --- a/backfill_corrections/delphiBackfillCorrection/R/io.R +++ b/backfill_corrections/delphiBackfillCorrection/R/io.R @@ -1,15 +1,84 @@ #' Read a parquet file into a dataframe #' -#' @template input_dir-template +#' @template input_file-template #' #' @importFrom arrow read_parquet +#' @importFrom dplyr %>% #' #' @export -read_data <- function(input_dir) { - df <- read_parquet(input_dir, as_data_frame = TRUE) +read_data <- function(input_file) { + df <- read_parquet(input_file, as_data_frame = TRUE) return (df) } +#' Add `issue_date` parsed from filename to daily input, if needed +#' +#' @param df dataframe containing input data from a single parquet file +#' @template input_file-template +#' @template indicator-template +#' @template signal-template +#' @template lag_col-template +#' @template issued_col-template +#' +#' @importFrom stringr str_interp +add_issue_date <- function(df, input_file, indicator, signal, + lag_col = "lag", issued_col = "issue_date") { + daily_pattern <- create_name_pattern(indicator, signal, "daily") + if (grepl(daily_pattern, input_file)) { + # Daily file + if (!(issued_col %in% names(df)) || any(is.na(df[[issued_col]]))) { + # Capture issue date from filename + issue_date <- as.Date( + sub("^.*/.*_as_of_([0-9]{8})[.]parquet$", "\\1", input_file), + format = "%Y%m%d" + ) + df[[issued_col]] <- issue_date + } + } else { + # Rollup file + if (!(lag_col %in% colnames(df)) && !(issued_col %in% colnames(df))) { + stop("since rollup files contain data spanning multiple issues, ", + "the issue date field cannot be reconstructed from the ", + "filename. Thus, either issue date or lag must be ", + str_interp("provided in the input file ${input_file}")) + } + if (!(issued_col %in% colnames(df))) { + df[[issued_col]] <- as.Date(df$time_value + df[[lag_col]]) + } + } + + return(df) +} + +#' Add `lag` field based on issue date and time value, if needed +#' +#' @param df dataframe containing input data from a single parquet file +#' @template lag_col-template +#' @template issued_col-template +add_lag <- function(df, lag_col = "lag", issued_col = "issue_date") { + if ( (lag_col %in% colnames(df) && any(is.na(df[[lag_col]]))) || + !(lag_col %in% colnames(df)) ) { + df[[lag_col]] <- as.integer(df[[issued_col]] - df$time_value) + } + + return(df) +} + +#' Extract date from datetime column in timezone-aware way +#' +#' @param df dataframe containing input data from a single parquet file +#' @param cols vector of names of datetime fields to convert to date +#' +#' @importFrom lubridate with_tz +datetime_to_date <- function(df, cols = c("time_value", "issue_date")) { + for (col in cols) { + if (col %in% colnames(df)) { + df[[col]] <- as.Date(with_tz(df[[col]], "UTC")) + } + } + return(df) +} + #' Export the result to customized directory #' #' @param test_data test data containing prediction results diff --git a/backfill_corrections/delphiBackfillCorrection/R/main.R b/backfill_corrections/delphiBackfillCorrection/R/main.R index 8a67cb6e7..557d0ed28 100644 --- a/backfill_corrections/delphiBackfillCorrection/R/main.R +++ b/backfill_corrections/delphiBackfillCorrection/R/main.R @@ -227,7 +227,7 @@ run_backfill <- function(df, params, #' #' @template params-template #' -#' @importFrom dplyr bind_rows mutate +#' @importFrom dplyr bind_rows mutate rename #' @importFrom parallel detectCores #' @importFrom rlang .data #' @@ -287,9 +287,15 @@ main <- function(params) { msg_ts("Reading in and combining associated files") input_data <- lapply( files_list, - function(file) {read_data(file)} + function(file) { + read_data(file) %>% + datetime_to_date() %>% + add_issue_date(file, input_group$indicator, input_group$signal) %>% + add_lag() + } ) %>% - bind_rows() + bind_rows() %>% + rename(geo_value = .data$fips) if (nrow(input_data) == 0) { warning(str_interp( diff --git a/backfill_corrections/delphiBackfillCorrection/R/utils.R b/backfill_corrections/delphiBackfillCorrection/R/utils.R index 550b84f50..5c50a4ef7 100644 --- a/backfill_corrections/delphiBackfillCorrection/R/utils.R +++ b/backfill_corrections/delphiBackfillCorrection/R/utils.R @@ -148,12 +148,9 @@ validity_checks <- function(df, value_type, num_col, denom_col, signal_suffixes, stop("No 'time_value' column detected for the reference date!") } - # issue_date or lag should exist in the dataset - if ( !lag_col %in% colnames(df) ) { - if ( issued_col %in% colnames(df) ) { - df$lag = as.integer(df$issue_date - df$time_value) - } - else {stop("No issue_date or lag exists!")} + # issue_date and lag should exist in the dataset + if ( !(lag_col %in% colnames(df)) || !(issued_col %in% colnames(df))) { + stop("issue_date and lag fields must exist in the input data") } return(list(df = df, value_cols = value_cols)) diff --git a/backfill_corrections/delphiBackfillCorrection/man-roxygen/input_file-template.R b/backfill_corrections/delphiBackfillCorrection/man-roxygen/input_file-template.R new file mode 100644 index 000000000..2067a050d --- /dev/null +++ b/backfill_corrections/delphiBackfillCorrection/man-roxygen/input_file-template.R @@ -0,0 +1,2 @@ +#' @param input_file path to the an individual file containing input data +#' in `.parquet` format. diff --git a/backfill_corrections/delphiBackfillCorrection/man/add_issue_date.Rd b/backfill_corrections/delphiBackfillCorrection/man/add_issue_date.Rd new file mode 100644 index 000000000..7f53fc0a3 --- /dev/null +++ b/backfill_corrections/delphiBackfillCorrection/man/add_issue_date.Rd @@ -0,0 +1,38 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/io.R +\name{add_issue_date} +\alias{add_issue_date} +\title{Add `issue_date` parsed from filename to daily input, if needed} +\usage{ +add_issue_date( + df, + input_file, + indicator, + signal, + lag_col = "lag", + issued_col = "issue_date" +) +} +\arguments{ +\item{df}{dataframe containing input data from a single parquet file} + +\item{input_file}{path to the an individual file containing input data +in `.parquet` format.} + +\item{indicator}{string specifying the name of the indicator as used in +`parquet` input data filenames. One indicator can be associated +with multiple signals.} + +\item{signal}{string specifying the name of the signal as used in +`parquet` input data filenames. One indicator can be associated +with multiple signals.} + +\item{lag_col}{string specifying name of lag field within +the input dataframe.} + +\item{issued_col}{string specifying name of issue date (version) field within +the input dataframe.} +} +\description{ +Add `issue_date` parsed from filename to daily input, if needed +} diff --git a/backfill_corrections/delphiBackfillCorrection/man/add_lag.Rd b/backfill_corrections/delphiBackfillCorrection/man/add_lag.Rd new file mode 100644 index 000000000..b4779c1a7 --- /dev/null +++ b/backfill_corrections/delphiBackfillCorrection/man/add_lag.Rd @@ -0,0 +1,20 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/io.R +\name{add_lag} +\alias{add_lag} +\title{Add `lag` field based on issue date and time value, if needed} +\usage{ +add_lag(df, lag_col = "lag", issued_col = "issue_date") +} +\arguments{ +\item{df}{dataframe containing input data from a single parquet file} + +\item{lag_col}{string specifying name of lag field within +the input dataframe.} + +\item{issued_col}{string specifying name of issue date (version) field within +the input dataframe.} +} +\description{ +Add `lag` field based on issue date and time value, if needed +} diff --git a/backfill_corrections/delphiBackfillCorrection/man/datetime_to_date.Rd b/backfill_corrections/delphiBackfillCorrection/man/datetime_to_date.Rd new file mode 100644 index 000000000..8d5e2c409 --- /dev/null +++ b/backfill_corrections/delphiBackfillCorrection/man/datetime_to_date.Rd @@ -0,0 +1,16 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/io.R +\name{datetime_to_date} +\alias{datetime_to_date} +\title{Extract date from datetime column in timezone-aware way} +\usage{ +datetime_to_date(df, cols = c("time_value", "issue_date")) +} +\arguments{ +\item{df}{dataframe containing input data from a single parquet file} + +\item{cols}{vector of names of datetime fields to convert to date} +} +\description{ +Extract date from datetime column in timezone-aware way +} diff --git a/backfill_corrections/delphiBackfillCorrection/man/read_data.Rd b/backfill_corrections/delphiBackfillCorrection/man/read_data.Rd index 1b5f24726..40335fd35 100644 --- a/backfill_corrections/delphiBackfillCorrection/man/read_data.Rd +++ b/backfill_corrections/delphiBackfillCorrection/man/read_data.Rd @@ -4,10 +4,11 @@ \alias{read_data} \title{Read a parquet file into a dataframe} \usage{ -read_data(input_dir) +read_data(input_file) } \arguments{ -\item{input_dir}{path to the directory containing input data} +\item{input_file}{path to the an individual file containing input data +in `.parquet` format.} } \description{ Read a parquet file into a dataframe