From 3e60f0d4723acd97216c64a48769e81980baf7b1 Mon Sep 17 00:00:00 2001
From: Miguel Covarrubias
Date: Wed, 6 Jul 2022 17:29:52 -0400
Subject: [PATCH] Rightsize import batches [VS-486] (#7925)

---
 .dockstore.yml                                |  1 +
 scripts/variantstore/wdl/GvsImportGenomes.wdl | 47 +++++++++++++++----
 .../wdl/GvsJointVariantCalling.wdl            |  3 +-
 .../wdl/GvsQuickstartIntegration.wdl          |  3 --
 scripts/variantstore/wdl/GvsUnified.wdl       |  6 ++-
 5 files changed, 44 insertions(+), 16 deletions(-)

diff --git a/.dockstore.yml b/.dockstore.yml
index 664cb2c493e..d6b3e3909b0 100644
--- a/.dockstore.yml
+++ b/.dockstore.yml
@@ -198,6 +198,7 @@ workflows:
         - master
         - ah_var_store
         - rc-vs-483-beta-user-wdl
+        - vs_486_rightsize_import_batches
   - name: GvsCalculatePrecisionAndSensitivity
     subclass: WDL
     primaryDescriptorPath: /scripts/variantstore/wdl/GvsCalculatePrecisionAndSensitivity.wdl
diff --git a/scripts/variantstore/wdl/GvsImportGenomes.wdl b/scripts/variantstore/wdl/GvsImportGenomes.wdl
index ae8f074d09e..6e766d3f101 100644
--- a/scripts/variantstore/wdl/GvsImportGenomes.wdl
+++ b/scripts/variantstore/wdl/GvsImportGenomes.wdl
@@ -16,19 +16,46 @@ workflow GvsImportGenomes {
     Boolean skip_loading_vqsr_fields = false

     File interval_list = "gs://gcp-public-data--broad-references/hg38/v0/wgs_calling_regions.hg38.noCentromeres.noTelomeres.interval_list"
-    # If increasing this, also consider increasing `load_data_preemptible_override` and `load_data_maxretries_override`.
-    Int load_data_batch_size = 5
+    Int? load_data_batch_size
     Int? load_data_preemptible_override
     Int? load_data_maxretries_override
     File? load_data_gatk_override = "gs://broad-dsp-spec-ops/scratch/bigquery-jointcalling/jars/gg_VS-443_VETIngestValidation_20220531/gatk-package-4.2.0.0-531-gf8f4ede-SNAPSHOT-local.jar"
     String? service_account_json_path
   }

+  Int num_samples = length(external_sample_names)
+  Int max_auto_batch_size = 20000
+
+  if ((num_samples > max_auto_batch_size) && !(defined(load_data_batch_size))) {
+    call Utils.TerminateWorkflow as DieDueToTooManySamplesWithoutExplicitLoadDataBatchSize {
+      input:
+        message = "Importing " + num_samples + " samples but `load_data_batch_size` not explicitly specified; limit for auto batch-sizing is " + max_auto_batch_size + " samples."
+    }
+  }
+
+  # At least 1; per the limits above, not more than 20.
+  Int effective_load_data_batch_size = if (defined(load_data_batch_size)) then select_first([load_data_batch_size])
+                                       else if num_samples < 1000 then 1
+                                       else num_samples / 1000
+
+  # Both preemptible and maxretries should be scaled up alongside import batch size, since the likelihood of preemptions
+  # and retryable random BQ import errors increases with import batch size / job run time.
+
+  # At least 3; per the limits above, not more than 5.
+  Int effective_load_data_maxretries = if (defined(load_data_maxretries_override)) then select_first([load_data_maxretries_override])
+                                       else if (effective_load_data_batch_size < 12) then 3
+                                       else effective_load_data_batch_size / 4
+
   # return an error if the lengths are not equal
   Int input_length = length(input_vcfs)
   Int input_indexes_length = length(input_vcf_indexes)
   if ((input_length != length(external_sample_names)) || (input_indexes_length != length(external_sample_names))) {
-    call Utils.TerminateWorkflow {
+    call Utils.TerminateWorkflow as DieDueToMismatchedVcfAndIndexLengths {
       input:
         message = "The lengths of workflow inputs `external_sample_names` (" + length(external_sample_names) +
                   "), `input_vcfs` (" + input_length + ") and `input_vcf_indexes` (" + input_indexes_length + ") should be the same.\n\n" +
@@ -58,7 +85,7 @@ workflow GvsImportGenomes {

   call CreateFOFNs {
     input:
-      batch_size = load_data_batch_size,
+      batch_size = effective_load_data_batch_size,
       input_vcf_index_list = CurateInputLists.input_vcf_indexes,
       input_vcf_list = CurateInputLists.input_vcfs,
       sample_name_list = CurateInputLists.sample_name_list,
@@ -76,8 +103,8 @@ workflow GvsImportGenomes {
       input_vcfs = read_lines(CreateFOFNs.vcf_batch_vcf_fofns[i]),
       interval_list = interval_list,
       gatk_override = load_data_gatk_override,
-      load_data_preemptible_override = load_data_preemptible_override,
-      load_data_maxretries_override = load_data_maxretries_override,
+      load_data_preemptible = effective_load_data_preemptible,
+      load_data_maxretries = effective_load_data_maxretries,
       sample_names = read_lines(CreateFOFNs.vcf_sample_name_fofns[i]),
       sample_map = GetUningestedSampleIds.sample_map,
       service_account_json_path = service_account_json_path,
@@ -149,8 +176,8 @@ task LoadData {
     Boolean skip_loading_vqsr_fields = false

     File? gatk_override
-    Int? load_data_preemptible_override
-    Int? load_data_maxretries_override
+    Int load_data_preemptible
+    Int load_data_maxretries
     String? service_account_json_path
   }

@@ -224,10 +251,10 @@ task LoadData {
   >>>
   runtime {
     docker: "us.gcr.io/broad-gatk/gatk:4.1.7.0"
-    maxRetries: select_first([load_data_maxretries_override, 3])
+    maxRetries: load_data_maxretries
     memory: "3.75 GB"
     disks: "local-disk 50 HDD"
-    preemptible: select_first([load_data_preemptible_override, 5])
+    preemptible: load_data_preemptible
     cpu: 1
   }
   output {
diff --git a/scripts/variantstore/wdl/GvsJointVariantCalling.wdl b/scripts/variantstore/wdl/GvsJointVariantCalling.wdl
index 346f06cb02c..24cec5a7ec0 100644
--- a/scripts/variantstore/wdl/GvsJointVariantCalling.wdl
+++ b/scripts/variantstore/wdl/GvsJointVariantCalling.wdl
@@ -22,6 +22,7 @@ workflow GvsJointVariantCalling {
     Int extract_maxretries_override = ""
     Int extract_preemptible_override = ""
     Int extract_scatter_count = ""
+    Int load_data_batch_size = ""
     Int load_data_preemptible_override = ""
     Int load_data_maxretries_override = ""
     Array[String] query_labels = []
@@ -64,7 +65,7 @@ workflow GvsJointVariantCalling {
      indel_recalibration_annotation_values = indel_recalibration_annotation_values,
      interval_list = interval_list,
      interval_weights_bed = interval_weights_bed,
-     load_data_batch_size = 5,
+     load_data_batch_size = load_data_batch_size,
      load_data_maxretries_override = load_data_maxretries_override,
      load_data_preemptible_override = load_data_preemptible_override,
      query_labels = query_labels,
diff --git a/scripts/variantstore/wdl/GvsQuickstartIntegration.wdl b/scripts/variantstore/wdl/GvsQuickstartIntegration.wdl
index 707e51ee1db..23832777f7f 100644
--- a/scripts/variantstore/wdl/GvsQuickstartIntegration.wdl
+++ b/scripts/variantstore/wdl/GvsQuickstartIntegration.wdl
@@ -49,8 +49,6 @@ workflow GvsQuickstartIntegration {
     ]

     Int? extract_scatter_count
-
-    Int load_data_batch_size = 1
   }

   String project_id = "gvs-internal"
@@ -75,7 +73,6 @@ workflow GvsQuickstartIntegration {
       # Force filtering off as it is not deterministic and the initial version of this integration test does not
       # allow for inexact matching of actual and expected results.
       extract_do_not_filter_override = true,
-      load_data_batch_size = load_data_batch_size,
   }

   call AssertIdenticalOutputs {
diff --git a/scripts/variantstore/wdl/GvsUnified.wdl b/scripts/variantstore/wdl/GvsUnified.wdl
index 1da148705f5..3a45b710d2a 100644
--- a/scripts/variantstore/wdl/GvsUnified.wdl
+++ b/scripts/variantstore/wdl/GvsUnified.wdl
@@ -26,9 +26,11 @@ workflow GvsUnified {
     Array[File] input_vcf_indexes

     File interval_list = "gs://gcp-public-data--broad-references/hg38/v0/wgs_calling_regions.hg38.noCentromeres.noTelomeres.interval_list"
+
     # The larger the `load_data_batch_size` the greater the probability of preemptions and non-retryable
-    # BigQuery errors. So if increasing the batch size, then preemptible and maxretries should also be increased.
-    Int load_data_batch_size = 5
+    # BigQuery errors, so if specifying this, adjust preemptible and maxretries accordingly. Or just take the defaults;
+    # those should work fine in most cases.
+    Int? load_data_batch_size
     Int? load_data_preemptible_override
     Int? load_data_maxretries_override
     # End GvsImportGenomes
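
A quick way to sanity-check the auto-sizing arithmetic introduced in GvsImportGenomes above is to evaluate the same expressions in a tiny standalone workflow. The sketch below is illustrative only and is not part of the patch; the workflow name, the default `num_samples`, and the outputs are invented for the example, and it assumes WDL 1.0 semantics (integer division on `Int`, plus the standard-library `defined()` and `select_first()`).

version 1.0

# Hypothetical sketch of the batch-sizing math above; not part of the patch.
workflow BatchSizeSketch {
  input {
    Int num_samples = 2500     # hypothetical cohort size
    Int? load_data_batch_size  # optional explicit override, as in the patch
  }

  # An explicit override wins; otherwise one batch-size unit per 1000 samples
  # (integer division) with a floor of 1. The 20000-sample guard in the patch
  # caps the auto-sized value at 20.
  Int effective_load_data_batch_size = if (defined(load_data_batch_size)) then select_first([load_data_batch_size])
                                       else if num_samples < 1000 then 1
                                       else num_samples / 1000

  # Preemptible attempts (and, identically, max retries) scale with batch size:
  # a floor of 3 below a batch size of 12, otherwise batch_size / 4, which is 5
  # at the cap of 20.
  Int effective_load_data_preemptible = if (effective_load_data_batch_size < 12) then 3
                                        else effective_load_data_batch_size / 4

  output {
    Int batch_size = effective_load_data_batch_size    # 2500 / 1000 = 2
    Int preemptible = effective_load_data_preemptible  # 2 < 12, so 3
  }
}

With the defaults above, 2500 samples yield a batch size of 2 and a preemptible count of 3 (e.g. `miniwdl run sketch.wdl num_samples=2500`); passing `load_data_batch_size` explicitly reproduces the old fixed-batch behavior for callers that still want it.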