Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Support for Different S3 Services #41

Merged
merged 2 commits into from
Jan 28, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 95 additions & 28 deletions R/workflow_functions.R
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,16 @@ get_run_config <- function(configure_run_file = "configure_run.yml", lake_direct
file.copy(file.path(lake_directory,"configuration",config_set_name,configure_run_file), file.path(lake_directory, "restart", config$location$site_id, config$run_config$sim_name, configure_run_file))
}
}else if(config$run_config$use_s3){
restart_exists <- suppressMessages(aws.s3::object_exists(object = file.path(config$location$site_id, config$run_config$sim_name, configure_run_file), bucket = "restart"))
restart_exists <- suppressMessages(aws.s3::object_exists(object = file.path(config$location$site_id, config$run_config$sim_name, configure_run_file),
bucket = "restart",
region = Sys.getenv("AWS_DEFAULT_REGION"),
use_https = as.logical(Sys.getenv("USE_HTTPS"))))
if(restart_exists){
aws.s3::save_object(object = file.path(config$location$site_id, config$run_config$sim_name, configure_run_file), bucket = "restart", file = file.path(lake_directory, "restart", config$location$site_id, config$run_config$sim_name, configure_run_file))
aws.s3::save_object(object = file.path(config$location$site_id, config$run_config$sim_name, configure_run_file),
bucket = "restart",
file = file.path(lake_directory, "restart", config$location$site_id, config$run_config$sim_name, configure_run_file),
region = Sys.getenv("AWS_DEFAULT_REGION"),
use_https = as.logical(Sys.getenv("USE_HTTPS")))
}else{
file.copy(file.path(lake_directory,"configuration",config_set_name,configure_run_file), file.path(lake_directory, "restart", config$location$site_id, config$run_config$sim_name, configure_run_file))
}
Expand Down Expand Up @@ -84,13 +91,25 @@ put_targets <- function(site_id, cleaned_insitu_file = NA, cleaned_met_file = NA

if(use_s3){
if(!is.na(cleaned_insitu_file)){
aws.s3::put_object(file = cleaned_insitu_file, object = file.path(site_id, basename(cleaned_insitu_file)), bucket = "targets")
aws.s3::put_object(file = cleaned_insitu_file,
object = file.path(site_id, basename(cleaned_insitu_file)),
bucket = "targets",
region = Sys.getenv("AWS_DEFAULT_REGION"),
use_https = as.logical(Sys.getenv("USE_HTTPS")))
}
if(!is.na(cleaned_inflow_file)){
aws.s3::put_object(file = cleaned_inflow_file, object = file.path(site_id, basename(cleaned_inflow_file)), bucket = "targets")
aws.s3::put_object(file = cleaned_inflow_file,
object = file.path(site_id, basename(cleaned_inflow_file)),
bucket = "targets",
region = Sys.getenv("AWS_DEFAULT_REGION"),
use_https = as.logical(Sys.getenv("USE_HTTPS")))
}
if(!is.na(cleaned_met_file)){
aws.s3::put_object(file = cleaned_met_file, object = file.path(site_id, basename(cleaned_met_file)), bucket = "targets")
aws.s3::put_object(file = cleaned_met_file,
object = file.path(site_id, basename(cleaned_met_file)),
bucket = "targets",
region = Sys.getenv("AWS_DEFAULT_REGION"),
use_https = as.logical(Sys.getenv("USE_HTTPS")))
}
}
}
Expand All @@ -105,7 +124,9 @@ put_targets <- function(site_id, cleaned_insitu_file = NA, cleaned_met_file = NA
#'
get_targets <- function(lake_directory, config){
if(config$run_config$use_s3){
download_s3_objects(lake_directory, bucket = "targets", prefix = config$location$site_id)
download_s3_objects(lake_directory,
bucket = "targets",
prefix = config$location$site_id)
}
}

Expand Down Expand Up @@ -247,7 +268,9 @@ get_restart_file <- function(config, lake_directory){
if(config$run_config$use_s3){
aws.s3::save_object(object = file.path(config$location$site_id, restart_file),
bucket = "forecasts",
file = file.path(lake_directory, "forecasts", config$location$site_id, restart_file))
file = file.path(lake_directory, "forecasts", config$location$site_id, restart_file),
region = Sys.getenv("AWS_DEFAULT_REGION"),
use_https = as.logical(Sys.getenv("USE_HTTPS")))
}
config$run_config$restart_file <- file.path(lake_directory, "forecasts", config$location$site_id, restart_file)
}
Expand Down Expand Up @@ -286,7 +309,11 @@ update_run_config <- function(config, lake_directory, configure_run_file = "conf
}
yaml::write_yaml(config$run_config, file = file.path(lake_directory,"restart",config$location$site_id,config$run_config$sim_name,configure_run_file))
if(config$run_config$use_s3){
aws.s3::put_object(file = file.path(lake_directory,"restart",config$location$site_id,config$run_config$sim_name, configure_run_file), object = file.path(config$location$site_id,config$run_config$sim_name, configure_run_file), bucket = "restart")
aws.s3::put_object(file = file.path(lake_directory,"restart",config$location$site_id,config$run_config$sim_name, configure_run_file),
object = file.path(config$location$site_id,config$run_config$sim_name, configure_run_file),
bucket = "restart",
region = Sys.getenv("AWS_DEFAULT_REGION"),
use_https = as.logical(Sys.getenv("USE_HTTPS")))
}
invisible(config)
}
Expand All @@ -312,7 +339,11 @@ update_run_config_neon <- function(config, lake_directory, configure_run_file =
config$run_config$restart_file <- basename(saved_file)
yaml::write_yaml(config$run_config, file = file.path(lake_directory,"restart",config$location$site_id,config$run_config$sim_name,configure_run_file))
if(config$run_config$use_s3){
aws.s3::put_object(file = file.path(lake_directory,"restart",config$location$site_id,config$run_config$sim_name, configure_run_file), object = file.path(config$location$site_id,config$run_config$sim_name, configure_run_file), bucket = "restart")
aws.s3::put_object(file = file.path(lake_directory,"restart",config$location$site_id,config$run_config$sim_name, configure_run_file),
object = file.path(config$location$site_id,config$run_config$sim_name, configure_run_file),
bucket = "restart",
region = Sys.getenv("AWS_DEFAULT_REGION"),
use_https = as.logical(Sys.getenv("USE_HTTPS")))
}
invisible(config)
}
Expand All @@ -329,11 +360,19 @@ update_run_config_neon <- function(config, lake_directory, configure_run_file =
#'
put_forecast <- function(saved_file, eml_file_name, config){
if(config$run_config$use_s3){
success <- aws.s3::put_object(file = saved_file, object = file.path(config$location$site_id, basename(saved_file)), bucket = "forecasts")
success <- aws.s3::put_object(file = saved_file,
object = file.path(config$location$site_id, basename(saved_file)),
bucket = "forecasts",
region = Sys.getenv("AWS_DEFAULT_REGION"),
use_https = as.logical(Sys.getenv("USE_HTTPS")))
if(success){
unlink(saved_file)
}
success <- aws.s3::put_object(file = eml_file_name, object = file.path(config$location$site_id, basename(eml_file_name)), bucket = "forecasts")
success <- aws.s3::put_object(file = eml_file_name,
object = file.path(config$location$site_id, basename(eml_file_name)),
bucket = "forecasts",
region = Sys.getenv("AWS_DEFAULT_REGION"),
use_https = as.logical(Sys.getenv("USE_HTTPS")))
if(success){
unlink(eml_file_name)
}
Expand All @@ -351,13 +390,20 @@ put_forecast <- function(saved_file, eml_file_name, config){
#'
download_s3_objects <- function(lake_directory, bucket, prefix){

files <- aws.s3::get_bucket(bucket = bucket, prefix = prefix)
files <- aws.s3::get_bucket(bucket = bucket,
prefix = prefix,
region = Sys.getenv("AWS_DEFAULT_REGION"),
use_https = as.logical(Sys.getenv("USE_HTTPS")))
keys <- vapply(files, `[[`, "", "Key", USE.NAMES = FALSE)
empty <- grepl("/$", keys)
keys <- keys[!empty]
if(length(keys) > 0){
for(i in 1:length(keys)){
aws.s3::save_object(object = keys[i],bucket = bucket, file = file.path(lake_directory, bucket, keys[i]))
aws.s3::save_object(object = keys[i],
bucket = bucket,
file = file.path(lake_directory, bucket, keys[i]),
region = Sys.getenv("AWS_DEFAULT_REGION"),
use_https = as.logical(Sys.getenv("USE_HTTPS")))
}
}
}
Expand All @@ -371,13 +417,19 @@ download_s3_objects <- function(lake_directory, bucket, prefix){
#' @export
#'
delete_restart <- function(site_id, sim_name){
files <- aws.s3::get_bucket(bucket = "restart", prefix = file.path(site_id, sim_name))
files <- aws.s3::get_bucket(bucket = "restart",
prefix = file.path(site_id, sim_name),
region = Sys.getenv("AWS_DEFAULT_REGION"),
use_https = as.logical(Sys.getenv("USE_HTTPS")))
keys <- vapply(files, `[[`, "", "Key", USE.NAMES = FALSE)
empty <- grepl("/$", keys)
keys <- keys[!empty]
if(length(keys > 0)){
for(i in 1:length(keys)){
aws.s3::delete_object(object = keys[i], bucket = "restart")
aws.s3::delete_object(object = keys[i],
bucket = "restart",
region = Sys.getenv("AWS_DEFAULT_REGION"),
use_https = as.logical(Sys.getenv("USE_HTTPS")))
}
}
}
Expand Down Expand Up @@ -428,7 +480,10 @@ check_noaa_present <- function(lake_directory, configure_run_file = "configure_r
forecast_model = config$met$forecast_met_model)

if(config$run_config$forecast_horizon > 0 & !is.null(noaa_forecast_path)){
noaa_files <- aws.s3::get_bucket(bucket = "drivers", prefix = noaa_forecast_path)
noaa_files <- aws.s3::get_bucket(bucket = "drivers",
prefix = noaa_forecast_path,
region = Sys.getenv("AWS_DEFAULT_REGION"),
use_https = as.logical(Sys.getenv("USE_HTTPS")))
noaa_forecast_path <- file.path(lake_directory,"drivers", noaa_forecast_path)
keys <- vapply(noaa_files, `[[`, "", "Key", USE.NAMES = FALSE)
empty <- grepl("/$", keys)
Expand Down Expand Up @@ -464,45 +519,57 @@ delete_sim <- function(site_id, sim_name){

if(go){
message("deleting analysis files")
files <- aws.s3::get_bucket(bucket = "analysis", prefix = site_id)
files <- aws.s3::get_bucket(bucket = "analysis",
prefix = site_id,
region = Sys.getenv("AWS_DEFAULT_REGION"),
use_https = as.logical(Sys.getenv("USE_HTTPS")))
keys <- vapply(files, `[[`, "", "Key", USE.NAMES = FALSE)
empty <- grepl("/$", keys)
keys <- keys[!empty]
keys <- keys[stringr::str_detect(keys, sim_name)]
if(length(keys > 0)){
for(i in 1:length(keys)){
aws.s3::delete_object(object = keys[i], bucket = "analysis")
aws.s3::delete_object(object = keys[i],
bucket = "analysis",
region = Sys.getenv("AWS_DEFAULT_REGION"),
use_https = as.logical(Sys.getenv("USE_HTTPS")))
}
}

#forecasts
message("deleting forecast files")
files <- aws.s3::get_bucket(bucket = "forecasts", prefix = file.path(site_id))
files <- aws.s3::get_bucket(bucket = "forecasts",
prefix = file.path(site_id),
region = Sys.getenv("AWS_DEFAULT_REGION"),
use_https = as.logical(Sys.getenv("USE_HTTPS")))
keys <- vapply(files, `[[`, "", "Key", USE.NAMES = FALSE)
empty <- grepl("/$", keys)
keys <- keys[!empty]
keys <- keys[stringr::str_detect(keys, sim_name)]
if(length(keys > 0)){
for(i in 1:length(keys)){
aws.s3::delete_object(object = keys[i], bucket = "forecasts")
aws.s3::delete_object(object = keys[i],
bucket = "forecasts",
region = Sys.getenv("AWS_DEFAULT_REGION"),
use_https = as.logical(Sys.getenv("USE_HTTPS")))
}
}

message("deleting restart files")
files <- aws.s3::get_bucket(bucket = "restart", prefix = file.path(site_id, sim_name))
files <- aws.s3::get_bucket(bucket = "restart",
prefix = file.path(site_id, sim_name),
region = Sys.getenv("AWS_DEFAULT_REGION"),
use_https = as.logical(Sys.getenv("USE_HTTPS")))
keys <- vapply(files, `[[`, "", "Key", USE.NAMES = FALSE)
empty <- grepl("/$", keys)
keys <- keys[!empty]
if(length(keys > 0)){
for(i in 1:length(keys)){
aws.s3::delete_object(object = keys[i], bucket = "restart")
aws.s3::delete_object(object = keys[i],
bucket = "restart",
region = Sys.getenv("AWS_DEFAULT_REGION"),
use_https = as.logical(Sys.getenv("USE_HTTPS")))
}
}
}
}