
Unable to run drake parallel workflow with future and fable #1277

Closed
edgBR opened this issue Jun 16, 2020 · 11 comments

@edgBR

edgBR commented Jun 16, 2020

Description

Dear colleagues, I am trying to wrap my workflow for training 4273 × 9 fable models to forecast multiple time series. The fable package provides a nice interface to generate forecasts by group, and so far I have managed to run the workflow without any problem (despite, of course, the high memory consumption and the huge model binary file [+15 GB]).

I have wrapped my code according to the example guidelines as follows:

library(lubridate)
library(odbc)
library(DBI)
library(ConfigParser)
library(tictoc)
library(drake)

#Configuration & initial parameters

directory <- getwd()
source(paste(directory, "/src/lib/r/functionX.R", sep = ""))
source(paste(directory, "/src/lib/r/functionY.R", sep = ""))
source(paste(directory, "/src/lib/r/functionZ.R", sep = ""))
config <- read.ini(paste(directory, "/config/configfile.ini", sep = ""))

# Workflow functions
source(paste(directory, "/src/forecasting/fable-backend/getting_data.R", sep = ""))
source(paste(directory, "/src/forecasting/fable-backend/building_features.R", sep = ""))
source(paste(directory, "/src/forecasting/fable-backend/train.R", sep = ""))
source(paste(directory, "/src/forecasting/fable-backend/drake_plan.R", sep = ""))
#source(paste(directory, "/src/forecasting/fable-backend/inference.R", sep = ""))


make(plan = plan, verbose = 2, 
     log_progress = TRUE,
     recover = TRUE)



vis_drake_graph(plan)

Where my plan is as follows:

plan <- drake_plan(
  life_counter_data = getLifeCounterData(environment = "PROD", key_directory = config$parameters1$parameter),
  unit_metadata = getMetadata("PROD", key_directory = config$parameters1$parameter1, 
                              operex_schema = config$parameters1$parameter2, source = c(1,2,3)),
  processed_data = featureEngineering(raw_data = life_counter_data, metadata = unit_metadata),
  fitting_models = trainModels(input_data = processed_data),
  saving = saveModels(models = fitting_models, directory_out = config$parameters1$parameter3)
)

I am having the following issues:

  1. Whenever I register future::plan(multiprocess) before calling make() and add parallelism = "future", the workflow does not execute in parallel, and I do not see more PIDs being generated when I run htop or top.
  2. If instead I register the multiprocess plan in the training code trainModels(), the training runs in parallel but I get the following error:
> make(plan = plan, verbose = 2, 
+      log_progress = TRUE,
+      recover = TRUE)
targets [--------------------------------------------------------------------------------------------------------------------------------------]   0%
[1] "Fitting the beast"
Repacking large object
Error: target fitting_models failed.
diagnose(fitting_models)error$message:
  Detected an error (‘fatal error in wrapper code’) by the 'parallel' package while trying to retrieve the value of a MulticoreFuture (‘future_mapply-1’). This could be because the forked R process that evaluates the future was terminated before it was completed: ‘{; ...future.globals.maxSize.org <- getOption("future.globals.maxSize"); if (!identical(...future.globals.maxSize.org, ...future.globals.maxSize)) {; oopts <- options(future.globals.maxSize = ...future.globals.maxSize); on.exit(options(oopts), add = TRUE); }; ...; do.call(mapply, args = args); }; }’
diagnose(fitting_models)error$calls:
   1. └─global::trainModels(input_data = processed_data)
   2.   └─`%>%`(...) src/forecasting/fable-backend/train.R:14:2
   3.     ├─base::withVisible(eval(quote(`_fseq`(`_lhs`)), env, env))
   4.     └─base::eval(quote(`_fseq`(`_lhs`)), env, env)
   5.       └─base::eval(quote(`_fseq`(`
> 

The workflow works without any problems outside drake. I am just trying to figure out why I get this error when I register the future backend in the training code, and why the parallelism argument is not working as expected.

I would very much appreciate any help.

BR
/Edgar

Session info

sessionInfo()
R version 4.0.0 (2020-04-24)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: Ubuntu 16.04.6 LTS

Matrix products: default
BLAS: /usr/lib/atlas-base/atlas/libblas.so.3.0
LAPACK: /usr/lib/atlas-base/atlas/liblapack.so.3.0

locale:
[1] LC_CTYPE=en_US.UTF-8 LC_NUMERIC=C LC_TIME=en_US.UTF-8 LC_COLLATE=en_US.UTF-8 LC_MONETARY=en_US.UTF-8
[6] LC_MESSAGES=en_US.UTF-8 LC_PAPER=en_US.UTF-8 LC_NAME=C LC_ADDRESS=C LC_TELEPHONE=C
[11] LC_MEASUREMENT=en_US.UTF-8 LC_IDENTIFICATION=C

attached base packages:
[1] stats graphics grDevices utils datasets methods base

other attached packages:
[1] shiny_1.4.0.2 tidyselect_1.1.0 feasts_0.1.3 fable_0.2.0 fabletools_0.1.3 furrr_0.1.0 future_1.17.0
[8] imputeTS_3.0 timetk_1.0.0 tsibble_0.8.6 forcats_0.5.0 purrr_0.3.4 readr_1.3.1 tibble_3.0.1
[15] ggplot2_3.3.0 tidyverse_1.3.0 aws.s3_0.3.21 feather_0.3.5 tidyr_1.0.3 dplyr_0.8.5 xts_0.12-0
[22] zoo_1.8-8 DescTools_0.99.35 data.table_1.12.8 stringr_1.4.0 drake_7.12.0 tictoc_1.0 ConfigParser_1.0.0
[29] R6_2.4.1 ini_0.3.1 DBI_1.1.0 odbc_1.2.2 lubridate_1.7.8

loaded via a namespace (and not attached):
[1] readxl_1.3.1 backports_1.1.6 igraph_1.2.5 splines_4.0.0 storr_1.2.1 listenv_0.8.0
[7] usethis_1.6.1 digest_0.6.25 htmltools_0.4.0 fansi_0.4.1 memoise_1.1.0 magrittr_1.5
[13] base64url_1.4 remotes_2.1.1 aws.signature_0.5.2 recipes_0.1.12 globals_0.12.5 modelr_0.1.7
[19] gower_0.2.1 anytime_0.3.7 forecast_8.12 tseries_0.10-47 prettyunits_1.1.1 colorspace_1.4-1
[25] blob_1.2.1 rvest_0.3.5 xfun_0.13 haven_2.2.0 callr_3.4.3 crayon_1.3.4
[31] jsonlite_1.6.1 survival_3.1-12 glue_1.4.1 gtable_0.3.0 ipred_0.9-9 pkgbuild_1.0.8
[37] clipr_0.7.0 future.apply_1.5.0 quantmod_0.4.17 scales_1.1.0 stinepack_1.4 mvtnorm_1.1-0
[43] miniUI_0.1.1.1 Rcpp_1.0.4.6 xtable_1.8-4 progress_1.2.2 bit_1.1-15.2 txtq_0.2.0
[49] lava_1.6.7 prodlim_2019.11.13 htmlwidgets_1.5.1 httr_1.4.1 ellipsis_0.3.0 pkgconfig_2.0.3
[55] nnet_7.3-14 dbplyr_1.4.3 rlang_0.4.6 later_1.0.0 munsell_0.5.0 cellranger_1.1.0
[61] tools_4.0.0 visNetwork_2.0.9 cli_2.0.2 generics_0.0.2 devtools_2.3.0 broom_0.5.6
[67] evaluate_0.14 fastmap_1.0.1 yaml_2.2.1 knitr_1.28 processx_3.4.2 bit64_0.9-7
[73] fs_1.4.1 packrat_0.5.0 nlme_3.1-147 whisker_0.4 mime_0.9 xml2_1.3.2
[79] compiler_4.0.0 rstudioapi_0.11 filelock_1.0.2 curl_4.3 testthat_2.3.2 reprex_0.3.0
[85] stringi_1.4.6 ps_1.3.3 desc_1.2.0 lattice_0.20-41 Matrix_1.2-18 urca_1.3-0
[91] vctrs_0.3.0 pillar_1.4.4 aws.ec2metadata_0.2.0 lifecycle_0.2.0 lmtest_0.9-37 httpuv_1.5.2
[97] promises_1.1.0 sessioninfo_1.1.1 codetools_0.2-16 pkgload_1.0.2 boot_1.3-25 MASS_7.3-51.6
[103] assertthat_0.2.1 rprojroot_1.3-2 withr_2.2.0 fracdiff_1.5-1 expm_0.999-4 parallel_4.0.0
[109] hms_0.5.3 quadprog_1.5-8 grid_4.0.0 rpart_4.1-15 timeDate_3043.102 class_7.3-17
[115] rmarkdown_2.1 TTR_0.23-6 base64enc_0.1-3

@wlandau
Member

wlandau commented Jun 16, 2020

For (1), did you set the jobs argument of make() to a value greater than 1? (Asking because I did not see it in the code you posted.) For (2), I am betting this is an instance of #675, in which case the error will probably go away if you set lock_envir to FALSE.

@edgBR
Author

edgBR commented Jun 17, 2020

Hi wlandau,

Yes, I use the jobs argument as well. I will try with lock_envir = FALSE. Should I also sync the jobs number with the plan argument?

BR
/Edgar

@edgBR
Author

edgBR commented Jun 17, 2020

Dear @wlandau

Now I am doing as follows:

make(plan = plan, verbose = 2, 
     log_progress = TRUE,
     recover = TRUE,
     lock_envir = FALSE,
     parallelism = "future",
     jobs = 24)

But I am still registering the future backend in the training code, not before make(). Now it starts, but I get the following error:

Error: cannot allocate vector of size 57.1 Gb

Interesting, because I still have 150 GB of memory free.

BR
/Edgar

@wlandau
Member

wlandau commented Jun 17, 2020

I think we might be getting the two approaches mixed up. If you set jobs > 1, that assumes you already set future::plan(multiprocess) before make().

That's a strange memory error. Does it persist without future? Would you be willing to create a smaller example that reproduces it and post the code? I'm not sure what to do with that without running it myself.
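
For reference, the among-targets setup described above can be sketched as follows (assuming the `plan` object from the original script; the jobs count is illustrative):

```r
library(drake)
library(future)

# Register the future backend *before* make() so drake can
# dispatch whole targets to parallel workers.
future::plan(multiprocess)

# jobs > 1 tells drake how many targets it may run concurrently;
# the future plan above supplies the actual workers.
make(plan,
     parallelism = "future",
     jobs = 24,
     lock_envir = FALSE)
```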

@wlandau
Member

wlandau commented Jun 17, 2020

Do you have a traceback for the memory error?

@edgBR
Author

edgBR commented Jun 17, 2020

Dear @wlandau

I ran the workflow again and it works, but now I am re-running it following your recommendation:

options(future.globals.maxSize = 1500000000)
future::plan(multiprocess) #breaking infrastructure once again ;)
set.seed(666) # reproducibility


make(plan = plan, verbose = 2, 
     log_progress = TRUE,
     recover = TRUE,
     lock_envir = FALSE,
     parallelism = "future",
     jobs = 22)



vis_drake_graph(plan)

But the code stops running in parallel.

BR
/Edgar

@wlandau
Member

wlandau commented Jun 17, 2020

But the code stops running in parallel.

You mean the targets run without errors, but sequentially? If you are running in the RStudio IDE, multicore might just be disabled, in which case you could try future::plan(multisession). Maybe also set workers = 22, e.g. future::plan(multisession, workers = 22) just to confirm that future is not capping the number of workers.
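
A quick way to confirm how many workers the registered plan actually provides is future's `nbrOfWorkers()` (a sketch; the worker count is from the thread above):

```r
library(future)

# multisession works inside the RStudio IDE, where multicore is disabled.
future::plan(multisession, workers = 22)

# nbrOfWorkers() reports how many parallel workers the current plan
# allows; if future is capping the count, this will be lower than 22.
future::nbrOfWorkers()
```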

@edgBR
Author

edgBR commented Jun 17, 2020

Hi,

The training part of my drake plan is using the fable package to fit multiple models at a time to a tsibble.

If the different steps of the workflow run sequentially, that is not a problem. The problem is that one of the steps:

fitting_models = trainModels(input_data = processed_data),

runs in parallel only if I register the future backend inside the trainModels function.

BR
/Edgar

@wlandau
Member

wlandau commented Jun 17, 2020

With make(parallelism = "future", jobs = 22), drake tries to run different targets in different processes: for example, life_counter_data will run at the same time as unit_metadata. The new process that spins up for life_counter_data has its own global environment which does not know about the future::plan() you defined before calling make(). So yes, if you want parallelism within targets, that requires setting a future::plan() inside trainModels() or another such function. make(parallelism = "future") only applies to parallelism among targets. future::tweak() might allow you to do both up front, but I do not have direct experience with this.
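
A sketch of the within-target approach: the body of `trainModels()` below is hypothetical (`fitOneGroup()` and the `group` column are made-up placeholders); only the placement of `future::plan()` is the point.

```r
library(future)
library(furrr)  # furrr is attached in the session info above

trainModels <- function(input_data) {
  # Register the backend *inside* the target's function: the process
  # drake spins up for this target has its own global environment and
  # does not inherit a future::plan() set before make().
  future::plan(multisession, workers = 22)

  # Hypothetical per-group model fitting, parallelized over groups
  # via the plan registered above.
  furrr::future_map(split(input_data, input_data$group), fitOneGroup)
}
```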

@edgBR
Author

edgBR commented Jun 17, 2020

Dear @wlandau

Now it is crystal clear to me. I am completely new to drake (I literally started with the package 2 days ago), so I am not yet familiar with all of its technicalities.

I believe we can close this topic.

Thank you so much for the support

BR
/E

@wlandau
Member

wlandau commented Jun 17, 2020

Glad to hear it.

@wlandau wlandau closed this as completed Jun 17, 2020