
Forcing DAG direction #1294

Closed
edgBR opened this issue Jul 9, 2020 · 7 comments
edgBR commented Jul 9, 2020

Prework

Dear community, thanks to Will I was able to complete my drake workflow. By splitting the model fitting with the fable package, I decreased the memory consumption on my server from 220 GB to 70 GB (a pretty big success here), with the only downside being a 50% increase in running time (from 60 to 90 minutes).

Prework is available here: #1293

Description

Now I am trying to fetch the accuracy metrics of all my models to pick the best one, but this step is being executed before my models run (maybe because the accuracy CSV files are already there?).

Reproducible example

The plan is as follows:

plan_branched <- drake_plan(
  life_counter_data = getLifeCounterData(environment = "PROD", 
                                         key_directory = config_parameters$LOCAL_CONFIG$DirectoryKeyCloud_RStudio, 
                                         max_forecasting_horizon = argument_parser$horizon),
  unit_metadata = getMetadata(environment = "PROD", 
                              key_directory = config_parameters$LOCAL_CONFIG$DirectoryKeyCloud_RStudio, 
                              operex_schema = config_parameters$SF_CONFIG$schema_name, db_src = c(1,2,3)),
  unit_with_recent_data = getLastData(life_counter_data),
  processed_data = featureEngineering(raw_data = life_counter_data, 
                                      metadata = unit_metadata, 
                                      recent_units = unit_with_recent_data,
                                      max_forecasting_horizon = argument_parser$horizon),
  ts_models = target(
    trainModels(
      input_data = processed_data, 
      max_forecast_horizon = argument_parser$horizon,
      max_multisession_cores = argument_parser$sessions,
      model_type = type
    ),
    transform = map(type = !!model_types)
  ),
  accuracy = target(
    accuracy_explorer(
      mode = "train", 
      models = ts_models, 
      max_forecast_horizon = argument_parser$horizon,
      directory_out = "/data1/"
    ),
    transform = map(ts_models, .id = type)
  ),
  saving = target(
    saveModels(
      models = ts_models, 
      directory_out = "/data1/", 
      max_forecasting_horizon = argument_parser$horizon, 
      max_multisession_cores = argument_parser$sessions
    ),
    transform = map(ts_models, .id = type)
  ),
  final_accuracy = target(bestModel(models_metrics_uri = "/data1/", 
                             metric_selected = "MAPE", 
                             final_metrics_uri = "/data1/", 
                             metrics_store = "local", 
                             max_forecast_horizon = argument_parser$horizon)
)
)

My final accuracy function is as follows:

#' Select the best time series model for individual units
#' @author 
#' @param models_metrics_uri directory containing the per-model accuracy CSV files
#' @param metric_selected accuracy metric used to rank the models (e.g. "MAPE")
#' @param final_metrics_uri directory where the final accuracy CSV is written
#' @param metrics_store one of "local", "s3", "cloudwatch", or "mlflow"
#' @param max_forecast_horizon maximum forecasting horizon
#' @return A tibble with the best model per unit according to the selected metric.


bestModel <- function(models_metrics_uri, metric_selected, final_metrics_uri, metrics_store, max_forecast_horizon) {
  if(metrics_store == "local") {
    
    model_metrics_joined <- list.files(path = models_metrics_uri, pattern = "*.csv", full.names = TRUE) %>% 
      map_df(~fread(.)) %>% 
      filter_all(all_vars(!is.infinite(.))) %>% 
      drop_na() %>% 
      group_by(a,b,c)
    
    best_model_tbl <- model_metrics_joined %>% summarise(metric = min({metric_selected}))
    print(paste0("Saving best model accuracy metrics for horizon ", max_forecast_horizon))
    accuracy_log = write_csv(x = best_model_tbl,
                             path = paste0(final_metrics_uri, "final_accuracy_metrics_horizon_", max_forecast_horizon, ".csv"))
    
  } else if(metrics_store =="s3") {
    
    #TODO max_priority
    
  } else if(metrics_store =="cloudwatch") {
    
    #TODO holding up until we decide model monitoring platform
    
  } else if(metrics_store == "mlflow") {
    
    #TODO holding up until we decide model monitoring platform, but prioritized over cloudwatch as it is avaialble in the RStudio instance
    
  }
}

But my dag looks as follows:

[image: DAG of the plan, with final_accuracy not downstream of the model targets]

Desired result

I would like to load the accuracy metrics only after I have saved my models and computed the accuracy.

Session info

sessionInfo() 
R version 4.0.0 (2020-04-24) 
Platform: x86_64-pc-linux-gnu (64-bit) 
Running under: Ubuntu 16.04.6 LTS  
Matrix products: default
BLAS:   /usr/lib/atlas-base/atlas/libblas.so.3.0
LAPACK: /usr/lib/atlas-base/atlas/liblapack.so.3.0

locale:
 [1] LC_CTYPE=en_US.UTF-8       LC_NUMERIC=C               LC_TIME=en_US.UTF-8        LC_COLLATE=en_US.UTF-8     LC_MONETARY=en_US.UTF-8   
 [6] LC_MESSAGES=en_US.UTF-8    LC_PAPER=en_US.UTF-8       LC_NAME=C                  LC_ADDRESS=C               LC_TELEPHONE=C            
[11] LC_MEASUREMENT=en_US.UTF-8 LC_IDENTIFICATION=C
     
attached base packages: 
[1] stats     graphics  grDevices utils     datasets  methods   base      
 
other attached packages: 
 [1] tidyselect_1.1.0      feather_0.3.5         xts_0.12-0            zoo_1.8-8             DescTools_0.99.35     data.table_1.12.8     qs_0.22.1            
 [8] optparse_1.6.6        furrr_0.1.0           future_1.17.0         imputeTS_3.0          timetk_2.0.0          feasts_0.1.3          forecast_8.12        
[15] fable.prophet_0.1.0   Rcpp_1.0.4.6          fable_0.2.1.9000      fabletools_0.2.0.9000 tsibble_0.9.1         forcats_0.5.0         stringr_1.4.0        
[22] dplyr_1.0.0           purrr_0.3.4           readr_1.3.1           tidyr_1.1.0           tibble_3.0.1          ggplot2_3.3.2         tidyverse_1.3.0      
[29] aws.s3_0.3.21         drake_7.12.2          tictoc_1.0            ConfigParser_1.0.0    R6_2.4.1              ini_0.3.1             DBI_1.1.0            
[36] odbc_1.2.2            lubridate_1.7.9

loaded via a namespace (and not attached):  

[1] colorspace_1.4-1      ellipsis_0.3.0        class_7.3-17          base64enc_0.1-3       fs_1.4.1              rstudioapi_0.11       listenv_0.8.0          
[8] farver_2.0.3          getopt_1.20.3         bit64_0.9-7           mvtnorm_1.1-0         prodlim_2019.11.13    fansi_0.4.1           xml2_1.3.2            
[15] codetools_0.2-16      splines_4.0.0         jsonlite_1.7.0        packrat_0.5.0         broom_0.5.6           anytime_0.3.7         dbplyr_1.4.3          
[22] compiler_4.0.0        httr_1.4.1            backports_1.1.8       assertthat_0.2.1      Matrix_1.2-18         cli_2.0.2             htmltools_0.5.0       
[29] visNetwork_2.0.9      prettyunits_1.1.1     tools_4.0.0           igraph_1.2.5          gtable_0.3.0          glue_1.4.1            cellranger_1.1.0      
[36] fracdiff_1.5-1        vctrs_0.3.1           urca_1.3-0            nlme_3.1-147          lmtest_0.9-37         timeDate_3043.102     gower_0.2.1           
[43] globals_0.12.5        rvest_0.3.5           lifecycle_0.2.0       MASS_7.3-51.6         scales_1.1.0          ipred_0.9-9           aws.ec2metadata_0.2.0 
[50] hms_0.5.3             parallel_4.0.0        expm_0.999-4          yaml_2.2.1            quantmod_0.4.17       curl_4.3              aws.signature_0.5.2   
[57] rpart_4.1-15          stringi_1.4.6         tseries_0.10-47       TTR_0.23-6            filelock_1.0.2        boot_1.3-25           lava_1.6.7            
[64] storr_1.2.1           rlang_0.4.6           pkgconfig_2.0.3       distributional_0.1.0  lattice_0.20-41       htmlwidgets_1.5.1     stinepack_1.4         
[71] recipes_0.1.12        bit_1.1-15.2          magrittr_1.5          generics_0.0.2        base64url_1.4         txtq_0.2.0            pillar_1.4.4          
[78] haven_2.2.0           withr_2.2.0           survival_3.1-12       nnet_7.3-14           modelr_0.1.7          crayon_1.3.4          RApiSerialize_0.1.0   
[85] progress_1.2.2        grid_4.0.0            readxl_1.3.1          blob_1.2.1            reprex_0.3.0          digest_0.6.25         stringfish_0.12.1    
[92] munsell_0.5.0         sessioninfo_1.1.1     quadprog_1.5-8
wlandau commented Jul 9, 2020

drake discovers dependency relationships using static code analysis. The command for final_accuracy must literally mention the symbols of any targets it depends on. The following plan should work.

library(drake)
model_types <- c("model1", "model2")
plan <- drake_plan(
  life_counter_data = getLifeCounterData(environment = "PROD", 
                                         key_directory = config_parameters$LOCAL_CONFIG$DirectoryKeyCloud_RStudio, 
                                         max_forecasting_horizon = argument_parser$horizon),
  unit_metadata = getMetadata(environment = "PROD", 
                              key_directory = config_parameters$LOCAL_CONFIG$DirectoryKeyCloud_RStudio, 
                              operex_schema = config_parameters$SF_CONFIG$schema_name, db_src = c(1,2,3)),
  unit_with_recent_data = getLastData(life_counter_data),
  processed_data = featureEngineering(raw_data = life_counter_data, 
                                      metadata = unit_metadata, 
                                      recent_units = unit_with_recent_data,
                                      max_forecasting_horizon = argument_parser$horizon),
  ts_models = target(
    trainModels(
      input_data = processed_data, 
      max_forecast_horizon = argument_parser$horizon,
      max_multisession_cores = argument_parser$sessions,
      model_type = type
    ),
    transform = map(type = !!model_types)
  ),
  accuracy = target(
    accuracy_explorer(
      mode = "train", 
      models = ts_models, 
      max_forecast_horizon = argument_parser$horizon,
      directory_out = "/data1/"
    ),
    transform = map(ts_models, .id = type)
  ),
  saving = target(
    saveModels(
      models = ts_models, 
      directory_out = "/data1/", 
      max_forecasting_horizon = argument_parser$horizon, 
      max_multisession_cores = argument_parser$sessions
    ),
    transform = map(ts_models, .id = type)
  ),
  aggregated_accuracy = target(
    # Could be dplyr::bind_rows(accuracy)
    # if the accuracy_* targets are data frames:
    list(accuracy),
    transform = combine(accuracy)
  ),
  final_accuracy = {
    # Mention the symbol "aggregated_accuracy" so final_accuracy runs last:
    aggregated_accuracy
    bestModel(models_metrics_uri = "/data1/", 
                                    metric_selected = "MAPE", 
                                    final_metrics_uri = "/data1/", 
                                    metrics_store = "local", 
                                    max_forecast_horizon = argument_parser$horizon)
  }
)

[screenshot: corrected DAG]

wlandau commented Jul 9, 2020

Also, please have a look at https://books.ropensci.org/drake/plans.html#how-to-choose-good-targets. Targets are R objects that drake automatically saves to and retrieves from storage, and it tracks changes to these values to keep targets up to date. If you are saving all your data to custom files, e.g. directory_out = "/data1/", then drake does not know how to watch your results for changes, and it will not be able to automatically rerun targets at the right times. So in your case, I recommend either returning the fitted models themselves from the targets or using dynamic files. Dynamic files may be easier if you are willing to return the individual file paths of the models and accuracy metrics you save.
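A dynamic-file target can be sketched roughly like this (a minimal sketch: fit_one_model() and the output path are hypothetical placeholders, not part of the plan above). With format = "file", the target's command returns the path(s) it wrote, and drake hashes those files to detect changes:

```r
library(drake)

plan <- drake_plan(
  model_file = target(
    {
      fit <- fit_one_model(processed_data)   # hypothetical fitting helper
      path <- "/data1/model.qs"              # illustrative output path
      qs::qsave(fit, path)                   # write the file yourself
      path                                   # return the path so drake can hash the file
    },
    format = "file"
  )
)
```

Because the returned path is tracked, any target whose command mentions model_file will run after the file exists and rerun when it changes.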

wlandau closed this as completed Jul 9, 2020
wlandau commented Jul 9, 2020

There's also file_in() and file_out(), which tell drake to watch for changes in files known ahead of time, but dynamic files are probably easier to think about.
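As a sketch of that static alternative (file names and the writeMetrics()/pickBest() helpers are illustrative, not from the plan above): file_out() declares a file a command creates, file_in() declares one a command reads, and drake orders the two targets accordingly:

```r
library(drake)

plan <- drake_plan(
  # writeMetrics() is a hypothetical helper that writes the CSV it is given.
  metrics = writeMetrics(ts_models, file_out("/data1/metrics.csv")),
  # pickBest() is a hypothetical helper; file_in() makes it run after metrics.
  best    = pickBest(file_in("/data1/metrics.csv"), metric = "MAPE")
)
```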

edgBR commented Jul 9, 2020

Hi @wlandau

The reason I saved my models to files is that my workflow was crashing when I stored them as targets, but I do not know whether that is normal.

@wlandau-lilly

If you do decide to save models, I recommend format = "qs" because it is lighter in storage than drake's default save method.
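Applied to the plan above, that would look something like this (sketch; only the added format argument is the point here):

```r
ts_models = target(
  trainModels(
    input_data = processed_data,
    max_forecast_horizon = argument_parser$horizon,
    max_multisession_cores = argument_parser$sessions,
    model_type = type
  ),
  transform = map(type = !!model_types),
  format = "qs"  # store the returned value with qs instead of drake's default RDS
)
```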

Do you need to store the entire model object? I am not familiar with fable, but in a lot of cases you can save data frames of strategic summaries instead of entire model objects. Some fitted models are super large, and some models have pointers that are only valid in the current R session and cannot be reloaded in a new session. (For example, Keras models cannot be saved and loaded with saveRDS() and readRDS(), so they require keras::save_model_hdf5() or format = "keras" in drake.)

The Bayesian analysis example here and here shows one way to deal with these problems. Markov chain Monte Carlo generates a large number of posterior samples, so it is infeasible to save every single fitted model. Instead, the custom functions in the workflow generate a data frame of posterior summaries rather than saving the entire model.


edgBR commented Jul 9, 2020

Hi @wlandau-lilly

I am using qs for saving the binaries:

saveModels <- function(models, directory_out, max_forecasting_horizon, max_multisession_cores) {
  print("Saving the all-mighty mable")
  qsave(x = models,
        file = paste0(directory_out, attributes(models)$model, "_horizon_", max_forecasting_horizon, ".qs"),
        preset = "custom",
        shuffle_control = 15,
        algorithm = "zstd",
        nthreads = max_multisession_cores)
  # saveRDS(object = models, file = paste0(directory_out, "ts_models_horizon_", max_forecasting_horizon, ".rds"))
  print(paste0("End workflow for ", attributes(models)$model, " models with maximum forecasting horizon ", max_forecasting_horizon))
}

The problem is that fable needs the binary containing the model to make the forecast. Should I use format = "qs" directly in the drake plan, together with file_out?

BR
/E

wlandau commented Jul 10, 2020

So the physical model files need to be there? Nothing you can do about it?

In that case, maybe combine the model-fitting step and the forecasting step into a single target. Data in the cache will be lighter that way. Merging two targets into one is sometimes a good strategy if you find yourself running too many targets or saving too much data. See https://books.ropensci.org/drake/plans.html#how-to-choose-good-targets for a discussion of the tradeoffs.
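A merged target could be sketched like this (fitModel() and forecastModel() are hypothetical helpers standing in for the real fitting and forecasting code; only the lightweight forecast result lands in the cache, not the fitted mable):

```r
library(drake)

# Fit and forecast in one function so the large fitted object
# stays in memory for the forecast and is never cached.
fit_and_forecast <- function(input_data, horizon, model_type) {
  fit <- fitModel(input_data, model_type)  # hypothetical: fit the mable
  forecastModel(fit, horizon)              # hypothetical: return a small forecast table
}

plan <- drake_plan(
  forecasts = target(
    fit_and_forecast(processed_data, argument_parser$horizon, type),
    transform = map(type = !!model_types)
  )
)
```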

The example at https://github.com/wlandau/drake-examples/blob/13e6edf9d6c4b60c0c57d0fc303cfba63702e9f2/stan is a similar situation. In Bayesian analysis, posterior samples eat up a lot of data, and we don't want to save everything for every single model. So we combine model-fitting and summarization into a single step and return a one-line data frame for each model. See https://github.com/wlandau/drake-examples/blob/13e6edf9d6c4b60c0c57d0fc303cfba63702e9f2/stan/R/functions.R#L62-L85 and https://github.com/wlandau/drake-examples/blob/13e6edf9d6c4b60c0c57d0fc303cfba63702e9f2/stan/R/plan.R#L16-L20.
