Combining tibbles that come from dynamic and transform maps #1327

Closed
edgBR opened this issue Oct 1, 2020 · 1 comment
edgBR commented Oct 1, 2020

Prework

Hi again drake colleagues. My implementation is now working: I can train, evaluate properly on the train and test data, and save my models and accuracy metrics to S3! Prework here:

Original workflow: #1293
Modified workflow to correct the DAG: #1294
First attempt to rewrite the plan using dynamic targets: #1311
Linking dynamic targets: #1314
Linking multiple dynamic targets to a target: #1321

Description

The small beast now looks like this:

[image: DAG of the current plan]

And my plan looks as follows:

new_plan_dynamic_branch_test <- drake_plan(
  unitMetadata = getMetadata(
    environment = "PROD",
    key_directory = config_parameters$LOCAL_CONFIG$DirectoryKeyCloud_RStudio,
    operex_schema = config_parameters$SF_CONFIG$schema_name, db_src = c(1, 2, 3)
  ),
  lastData = getLastData(
    environment = "PROD",
    key_directory = config_parameters$LOCAL_CONFIG$DirectoryKeyCloud_RStudio,
    schema = config_parameters$SF_CONFIG$schema_name
  ),
  finalMetadata = getApplicablePackages(
    unit_metadata = unitMetadata, 
    units_with_recent_data = lastData
  ),
  counterCombination = getCountersComb(
    df_in = finalMetadata, 
    environment = "PROD", 
    key_directory = config_parameters$LOCAL_CONFIG$DirectoryKeyCloud_RStudio, 
    schema = config_parameters$SF_CONFIG$schema_name
  ),
  counterCombinationIndex = counterCombination %>%  group_indices(),
  getFullData = getLifeFullData(
    environment = "PROD",
    key_directory = config_parameters$RSTUDIO_CLOUD_CONF$KeyDir,
    max_forecasting_horizon = argument_parser$horizon,
    list_of_machines = counterCombination$machine %>% unique()
  ),
  getIndividualData = target(
    getIndividualCounterData(
      full_data = getFullData,
      combination_df = counterCombination, 
      combination_df_index = counterCombinationIndex
    ),
    dynamic = cross( # Use `dynamic =` instead of `transform =`
      counterCombinationIndex # no tidy evaluation needed for dynamic branching
    )
  ),
  processingData = target(
    featureEngineering(
      raw_data = getIndividualData,
      max_forecasting_horizon = argument_parser$horizon
    ),
    dynamic = map(getIndividualData)
  ),
  training_data = target(
    timeSeriesSplitter(
      input_data = processingData,
      max_forecast_horizon = argument_parser$horizon,
      type = "train"
    ),
    dynamic = map(processingData)
  ),
  testing_data = target(
    timeSeriesSplitter(
      input_data = processingData,
      max_forecast_horizon = argument_parser$horizon,
      type = "test"
    ),
    dynamic = map(processingData)
  ),
  models_training = target( ### split train/validation set
    trainModels(
      input_data = training_data,
      max_forecast_horizon = argument_parser$horizon, 
      model_type = model_type,
      max_multisession_cores = argument_parser$sessions
    ),
    dynamic = map(training_data), # dynamic branching
    transform = map(model_type = !!model_types)
  ),
  accuracy = target(
    accuracy_explorer(
      mode = "test",
      models = models_training,
      max_forecast_horizon = argument_parser$horizon,
      bucket = argument_parser$outputbucket,
      bucket_folder = "/test_lifecounter3",
      testing_data = testing_data
    ), 
    dynamic = map(models_training, testing_data),
    transform = map(models_training)
  ),
  saveModels = target(
    saveModelsS3(
      model = models_training,
      bucket = argument_parser$outputbucket, 
      bucket_folder = "/test_3", 
      max_forecasting_horizon = argument_parser$horizon
    ),
    dynamic = map(models_training),
    transform = map(model_type)
  )
)

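For reference, the dynamic-branching pattern in getIndividualData above boils down to branching over a vector of group indices and extracting one group per sub-target. A minimal, self-contained sketch with toy data (getGroup() is a hypothetical helper, not part of the real plan):

library(drake)
library(dplyr)

# Hypothetical helper: keep only the rows belonging to one group index.
getGroup <- function(df, idx, group_index) {
  df[group_index == idx, , drop = FALSE]
}

plan <- drake_plan(
  combos = mtcars %>% group_by(cyl),
  comboIndex = combos %>% group_indices() %>% unique(),
  oneGroup = target(
    getGroup(combos, comboIndex, combos %>% group_indices()),
    dynamic = map(comboIndex) # one sub-target per group index
  )
)

make(plan)

Each oneGroup sub-target receives a single element of comboIndex, so the full data set never has to be split up front.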
Desired result

Now my objective is to evaluate the best model for a specific counter combination, as I call it, and then refit that model to the whole historical data. The function that I wrote to do this is as follows:

# Requires aws.s3 (get_bucket, s3read_using), dplyr, tidyr (drop_na),
# and data.table (fread).
bestModel <- function(input_data, bucket, bucket_folder, max_forecasting_horizon) {
  
  snsr <- unique(input_data$snsr)
  machine <- unique(input_data$machine)
  db_src <- unique(input_data$db_src)
  print(paste0("Refitting best model for ", machine, 
               " db_src ", db_src, " and counter ", 
               snsr, " and horizon ", max_forecasting_horizon))
  
  # NOTE: `package_b_number` is assumed to be defined elsewhere in the
  # original project; it is not created inside this function.
  bucket_objects <- get_bucket(
    bucket = bucket,
    prefix = paste0(bucket_folder, "/", package_b_number,
                    "/horizon_", max_forecasting_horizon,
                    "/snsr_", snsr, "/db_src_", db_src)
  )
  # `bucket_objects_df` was not defined in the original snippet; build it
  # here from the object keys so the filters below have a `value` column.
  accuracy_log_names <- c()
  for (i in seq_along(bucket_objects)) {
    accuracy_log_names <- c(accuracy_log_names, bucket_objects[[i]]$Key)
  }
  bucket_objects_df <- tibble::tibble(value = accuracy_log_names)
  
  model_metrics_paths <- bucket_objects_df %>% filter(grepl('.csv', value))
  model_binaries_paths <- bucket_objects_df %>% filter(!grepl('.csv', value))
  
  accuracy_log_values <- c()
  
  for (i in seq_along(model_metrics_paths$value)) {
    accuracy_log_values[[i]] <- s3read_using(data.table::fread,
                                             bucket = bucket,
                                             object = model_metrics_paths$value[i])
  }
  
  accuracy_log_values <- accuracy_log_values %>% bind_rows() %>% drop_na(MAPE)
  bestModel <- accuracy_log_values  %>% top_n(1, wt = desc(MAPE))  
  print(paste0("Best model for ", machine, " and snsr_key ", 
               snsr, " and ", db_src, " and db_src ", 
               db_src, " is ", bestModel$.model))
  return(bestModel$.model)
  
}

The problem is how I added this to my plan. I was doing it as follows:

new_plan_dynamic_branch_test <- drake_plan(
  unitMetadata = getMetadata(
    environment = "PROD",
    key_directory = config_parameters$LOCAL_CONFIG$DirectoryKeyCloud_RStudio,
    operex_schema = config_parameters$SF_CONFIG$schema_name, db_src = c(1, 2, 3)
  ),
  lastData = getLastData(
    environment = "PROD",
    key_directory = config_parameters$LOCAL_CONFIG$DirectoryKeyCloud_RStudio,
    schema = config_parameters$SF_CONFIG$schema_name
  ),
  finalMetadata = getApplicablePackages(
    unit_metadata = unitMetadata, 
    units_with_recent_data = lastData
  ),
  counterCombination = getCountersComb(
    df_in = finalMetadata, 
    environment = "PROD", 
    key_directory = config_parameters$LOCAL_CONFIG$DirectoryKeyCloud_RStudio, 
    schema = config_parameters$SF_CONFIG$schema_name
  ),
  counterCombinationIndex = counterCombination %>%  group_indices(),
  getFullData = getLifeFullData(
    environment = "PROD",
    key_directory = config_parameters$RSTUDIO_CLOUD_CONF$KeyDir,
    max_forecasting_horizon = argument_parser$horizon,
    list_of_machines = counterCombination$machine %>% unique()
  ),
  getIndividualData = target(
    getIndividualCounterData(
      full_data = getFullData,
      combination_df = counterCombination, 
      combination_df_index = counterCombinationIndex
    ),
    dynamic = cross( # Use `dynamic =` instead of `transform =`
      counterCombinationIndex # no tidy evaluation needed for dynamic branching
    )
  ),
  processingData = target(
    featureEngineering(
      raw_data = getIndividualData,
      max_forecasting_horizon = argument_parser$horizon
    ),
    dynamic = map(getIndividualData)
  ),
  training_data = target(
    timeSeriesSplitter(
      input_data = processingData,
      max_forecast_horizon = argument_parser$horizon,
      type = "train"
    ),
    dynamic = map(processingData)
  ),
  testing_data = target(
    timeSeriesSplitter(
      input_data = processingData,
      max_forecast_horizon = argument_parser$horizon,
      type = "test"
    ),
    dynamic = map(processingData)
  ),
  models_training = target( ### split train/validation set
    trainModels(
      input_data = training_data,
      max_forecast_horizon = argument_parser$horizon, 
      model_type = model_type,
      max_multisession_cores = argument_parser$sessions
    ),
    dynamic = map(training_data), # dynamic branching
    transform = map(model_type = !!model_types)
  ),
  accuracy = target(
    accuracy_explorer(
      mode = "test",
      models = models_training,
      max_forecast_horizon = argument_parser$horizon,
      bucket = argument_parser$outputbucket,
      bucket_folder = "/test_lifecounter3",
      testing_data = testing_data
    ), 
    dynamic = map(models_training, testing_data),
    transform = map(models_training)
  ),
  saveModels = target(
    saveModelsS3(
      model = models_training,
      bucket = argument_parser$outputbucket, 
      bucket_folder = "/test_3", 
      max_forecasting_horizon = argument_parser$horizon
    ),
    dynamic = map(models_training),
    transform = map(model_type)
  ),
  GetBestModel = target(
    bestModel(
      input_data = accuracy, # bestModel() takes `input_data`, not `input_df`
      bucket = argument_parser$outputbucket,
      bucket_folder = "/test_3",
      max_forecasting_horizon = argument_parser$horizon
    )
  )
)

Clearly this is not the right approach, as the target does not run after saveModels:

[image: DAG of the modified plan]

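One possible fix (a sketch, untested): drake detects dependencies from the symbols in each target's command, so mentioning saveModels in the GetBestModel command would create the missing edge. Here models_saved is a hypothetical extra argument added to bestModel() purely to carry that dependency:

  GetBestModel = target(
    bestModel(
      input_data = accuracy,
      models_saved = saveModels, # hypothetical argument; makes GetBestModel wait for saveModels
      bucket = argument_parser$outputbucket,
      bucket_folder = "/test_3",
      max_forecasting_horizon = argument_parser$horizon
    )
  )
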
My idea now is to return a data frame from accuracy. I have modified the function as follows:

accuracy_explorer <- function(mode, models, max_forecast_horizon, bucket, bucket_folder, testing_data=NULL) {
  if(mode == "train") {
   ..................
  } else if(mode=="test"){
   ...................
  }
  s3write_using(accuracy_metrics, 
                write_csv,
                object = paste0(bucket_folder, 
                                "/",
                                unique(accuracy_metrics$machine),
                                "/horizon_",
                                max_forecast_horizon,
                                "/snsr_",
                                unique(accuracy_metrics$snsr),
                                "/db_src_",
                                unique(accuracy_metrics$db_src),
                                "/",
                                unique(accuracy_metrics$.model), 
                                "_accuracy_metrics.csv"), 
                bucket = bucket)
  return(accuracy_metrics)
}

So the idea now is to join all of the sub-target tibbles into one, but honestly I am a bit lost as to how to do this. I looked into #685, but I don't know whether it applies to my case, since I have dynamic and static branching mixed together.

Any suggestions?

BR
/Edgar

@wlandau
Member

wlandau commented Oct 1, 2020

tibbles are great data structures for pipelines. For a single dynamic target, all the sub-targets are automatically combined.

library(drake)
library(tibble)
plan <- drake_plan(
  index = seq_len(3),
  data1 = target(tibble(x = index), dynamic = map(index)),
  data2 = data1
)

make(plan)
#> ▶ target index
#> ▶ dynamic data1
#> > subtarget data1_0b3474bd
#> > subtarget data1_b2a5c9b8
#> > subtarget data1_71f311ad
#> ■ finalize data1
#> ▶ target data2

# Sub-targets automatically combined in exploratory data analysis.
readd(data1)
#> # A tibble: 3 x 1
#>       x
#>   <int>
#> 1     1
#> 2     2
#> 3     3

# Sub-targets automatically combined in downstream targets.
readd(data2)
#> # A tibble: 3 x 1
#>       x
#>   <int>
#> 1     1
#> 2     2
#> 3     3

Created on 2020-10-01 by the reprex package (v0.3.0)

For dynamic branching within static branching, you can use combine() to bring everything together.

library(drake)
library(tibble)
library(dplyr)
#> 
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#> 
#>     filter, lag
#> The following objects are masked from 'package:base':
#> 
#>     intersect, setdiff, setequal, union
plan <- drake_plan(
  index_dynamic = seq_len(2),
  data_mapped = target(
    tibble(static = index_static, dynamic = index_dynamic),
    transform = map(index_static = c(1, 2)),
    dynamic = map(index_dynamic)
  ),
  data_combined = target(
    bind_rows(data_mapped),
    transform = combine(data_mapped)
  )
)

plot(plan)

make(plan)
#> ▶ target index_dynamic
#> ▶ dynamic data_mapped_1
#> > subtarget data_mapped_1_0b3474bd
#> > subtarget data_mapped_1_b2a5c9b8
#> ■ finalize data_mapped_1
#> ▶ dynamic data_mapped_2
#> > subtarget data_mapped_2_0b3474bd
#> > subtarget data_mapped_2_b2a5c9b8
#> ■ finalize data_mapped_2
#> ▶ target data_combined

readd(data_combined)
#> # A tibble: 4 x 2
#>   static dynamic
#>    <dbl>   <int>
#> 1      1       1
#> 2      1       2
#> 3      2       1
#> 4      2       2

Created on 2020-10-01 by the reprex package (v0.3.0)

Comments:

  • In future issues, would you simplify the examples you post? A good reproducible example is minimal, only containing the details relevant to the problem you are trying to solve: https://www.tidyverse.org/help/. Maintainers of open source packages generally do not have time to dive into all the intricacies of someone's particular use case.
  • That said, I did notice you upload a lot of data to Amazon S3. So for future projects, you might have a look at targets, the long-term successor of drake. drake is never going away, but it is also never going to have built-in cloud support. targets, however, can seamlessly upload and track target return values on Amazon S3 (see the sketch below): https://wlandau.github.io/targets-manual/cloud.html
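For illustration, here is a minimal sketch of S3-backed storage in targets. The exact API has changed across targets releases, so treat tar_resources_aws(), the bucket name, and the get_data()/fit_model() helpers as illustrative assumptions rather than the definitive setup:

# _targets.R -- illustrative sketch only; check the targets manual for
# the API of the version you use.
library(targets)
tar_option_set(
  format = "rds",
  repository = "aws", # store and track target values in an S3 bucket
  resources = tar_resources(
    aws = tar_resources_aws(bucket = "my-bucket", prefix = "pipeline")
  )
)
list(
  tar_target(raw_data, get_data()),       # value uploaded to S3 on completion
  tar_target(model, fit_model(raw_data))  # fetched from S3 only when needed
)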

@wlandau closed this as completed Oct 1, 2020