Rightsize import batches [VS-486] (#7925)
mcovarr authored Jul 6, 2022
1 parent 874d615 commit 3e60f0d
Showing 5 changed files with 44 additions and 16 deletions.
1 change: 1 addition & 0 deletions .dockstore.yml
@@ -198,6 +198,7 @@ workflows:
- master
- ah_var_store
- rc-vs-483-beta-user-wdl
- vs_486_rightsize_import_batches
- name: GvsCalculatePrecisionAndSensitivity
subclass: WDL
primaryDescriptorPath: /scripts/variantstore/wdl/GvsCalculatePrecisionAndSensitivity.wdl
47 changes: 37 additions & 10 deletions scripts/variantstore/wdl/GvsImportGenomes.wdl
@@ -16,19 +16,46 @@ workflow GvsImportGenomes {
Boolean skip_loading_vqsr_fields = false

File interval_list = "gs://gcp-public-data--broad-references/hg38/v0/wgs_calling_regions.hg38.noCentromeres.noTelomeres.interval_list"
# If increasing this, also consider increasing `load_data_preemptible_override` and `load_data_maxretries_override`.
Int load_data_batch_size = 5
Int? load_data_batch_size
Int? load_data_preemptible_override
Int? load_data_maxretries_override
File? load_data_gatk_override = "gs://broad-dsp-spec-ops/scratch/bigquery-jointcalling/jars/gg_VS-443_VETIngestValidation_20220531/gatk-package-4.2.0.0-531-gf8f4ede-SNAPSHOT-local.jar"
String? service_account_json_path
}

Int num_samples = length(external_sample_names)
Int max_auto_batch_size = 20000

if ((num_samples > max_auto_batch_size) && !(defined(load_data_batch_size))) {
call Utils.TerminateWorkflow as DieDueToTooManySamplesWithoutExplicitLoadDataBatchSize {
input:
message = "Importing " + num_samples + " samples but `load_data_batch_size` not explicitly specified; limit for auto batch-sizing is " + max_auto_batch_size + " samples."
}
}

# At least 1; per the limits above, not more than 20.
Int effective_load_data_batch_size = if (defined(load_data_batch_size)) then select_first([load_data_batch_size])
else if num_samples < 1000 then 1
else num_samples / 1000

# Both preemptible and maxretries should be scaled up alongside import batch size since the likelihood of preemptions
# and retryable random BQ import errors increases with import batch size / job run time.
# At least 3; per the limits above, not more than 5.
Int effective_load_data_preemptible = if (defined(load_data_preemptible_override)) then select_first([load_data_preemptible_override])
else if effective_load_data_batch_size < 12 then 3
else effective_load_data_batch_size / 4

# At least 3; per the limits above, not more than 5.
Int effective_load_data_maxretries = if (defined(load_data_maxretries_override)) then select_first([load_data_maxretries_override])
else if (effective_load_data_batch_size < 12) then 3
else effective_load_data_batch_size / 4

# return an error if the lengths are not equal
Int input_length = length(input_vcfs)
Int input_indexes_length = length(input_vcf_indexes)
if ((input_length != length(external_sample_names)) || (input_indexes_length != length(external_sample_names))) {
call Utils.TerminateWorkflow {
call Utils.TerminateWorkflow as DieDueToMismatchedVcfAndIndexLengths {
input:
message = "The lengths of workflow inputs `external_sample_names` (" + length(external_sample_names) +
"), `input_vcfs` (" + input_length + ") and `input_vcf_indexes` (" + input_indexes_length + ") should be the same.\n\n" +
@@ -58,7 +85,7 @@ workflow GvsImportGenomes {

call CreateFOFNs {
input:
batch_size = load_data_batch_size,
batch_size = effective_load_data_batch_size,
input_vcf_index_list = CurateInputLists.input_vcf_indexes,
input_vcf_list = CurateInputLists.input_vcfs,
sample_name_list = CurateInputLists.sample_name_list,
@@ -76,8 +103,8 @@
input_vcfs = read_lines(CreateFOFNs.vcf_batch_vcf_fofns[i]),
interval_list = interval_list,
gatk_override = load_data_gatk_override,
load_data_preemptible_override = load_data_preemptible_override,
load_data_maxretries_override = load_data_maxretries_override,
load_data_preemptible = effective_load_data_preemptible,
load_data_maxretries = effective_load_data_maxretries,
sample_names = read_lines(CreateFOFNs.vcf_sample_name_fofns[i]),
sample_map = GetUningestedSampleIds.sample_map,
service_account_json_path = service_account_json_path,
@@ -149,8 +176,8 @@ task LoadData {
Boolean skip_loading_vqsr_fields = false

File? gatk_override
Int? load_data_preemptible_override
Int? load_data_maxretries_override
Int load_data_preemptible
Int load_data_maxretries
String? service_account_json_path
}

@@ -224,10 +251,10 @@ task LoadData {
>>>
runtime {
docker: "us.gcr.io/broad-gatk/gatk:4.1.7.0"
maxRetries: select_first([load_data_maxretries_override, 3])
maxRetries: load_data_maxretries
memory: "3.75 GB"
disks: "local-disk 50 HDD"
preemptible: select_first([load_data_preemptible_override, 5])
preemptible: load_data_preemptible
cpu: 1
}
output {
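
To make the new auto-sizing concrete, here is a small standalone sketch reproducing the effective_* arithmetic from the diff above. It is not part of this commit; the workflow name and the 12000-sample cohort size are hypothetical.

version 1.0

# Reproduces the auto-sizing arithmetic added to GvsImportGenomes.
# For num_samples = 12000: batch size = 12000 / 1000 = 12 (WDL Int
# division floors); since 12 is not < 12, preemptible = maxretries =
# 12 / 4 = 3. At the 20000-sample auto-sizing cap the values reach
# their maxima of 20 and 5.
workflow AutoSizingExample {
    input {
        Int num_samples = 12000  # hypothetical cohort size
    }

    Int batch_size = if num_samples < 1000 then 1 else num_samples / 1000
    Int preemptible = if batch_size < 12 then 3 else batch_size / 4
    Int maxretries = if batch_size < 12 then 3 else batch_size / 4

    output {
        Int effective_batch_size = batch_size    # 12
        Int effective_preemptible = preemptible  # 3
        Int effective_maxretries = maxretries    # 3
    }
}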
3 changes: 2 additions & 1 deletion scripts/variantstore/wdl/GvsJointVariantCalling.wdl
@@ -22,6 +22,7 @@ workflow GvsJointVariantCalling {
Int extract_maxretries_override = ""
Int extract_preemptible_override = ""
Int extract_scatter_count = ""
Int load_data_batch_size = ""
Int load_data_preemptible_override = ""
Int load_data_maxretries_override = ""
Array[String] query_labels = []
@@ -64,7 +65,7 @@ workflow GvsJointVariantCalling {
indel_recalibration_annotation_values = indel_recalibration_annotation_values,
interval_list = interval_list,
interval_weights_bed = interval_weights_bed,
load_data_batch_size = 5,
load_data_batch_size = load_data_batch_size,
load_data_maxretries_override = load_data_maxretries_override,
load_data_preemptible_override = load_data_preemptible_override,
query_labels = query_labels,
3 changes: 0 additions & 3 deletions scripts/variantstore/wdl/GvsQuickstartIntegration.wdl
@@ -49,8 +49,6 @@ workflow GvsQuickstartIntegration {
]

Int? extract_scatter_count

Int load_data_batch_size = 1
}
String project_id = "gvs-internal"

@@ -75,7 +73,6 @@
# Force filtering off as it is not deterministic and the initial version of this integration test does not
# allow for inexact matching of actual and expected results.
extract_do_not_filter_override = true,
load_data_batch_size = load_data_batch_size,
}

call AssertIdenticalOutputs {
6 changes: 4 additions & 2 deletions scripts/variantstore/wdl/GvsUnified.wdl
@@ -26,9 +26,11 @@ workflow GvsUnified {
Array[File] input_vcf_indexes
File interval_list = "gs://gcp-public-data--broad-references/hg38/v0/wgs_calling_regions.hg38.noCentromeres.noTelomeres.interval_list"


# The larger the `load_data_batch_size` the greater the probability of preemptions and non-retryable
# BigQuery errors. So if increasing the batch size, then preemptible and maxretries should also be increased.
Int load_data_batch_size = 5
# BigQuery errors, so if specifying this, adjust preemptible and maxretries accordingly. Or just take the
# defaults; those should work fine in most cases.
Int? load_data_batch_size
Int? load_data_preemptible_override
Int? load_data_maxretries_override
# End GvsImportGenomes
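
One practical consequence of the optional inputs above, shown as a hypothetical standalone sketch (not from this commit): cohorts larger than the auto-sizing cap must now pin load_data_batch_size explicitly, mirroring the termination guard added in GvsImportGenomes.

version 1.0

# Mirrors the DieDueToTooManySamplesWithoutExplicitLoadDataBatchSize
# guard: above max_auto_batch_size samples, a caller-supplied batch
# size is required or the import workflow terminates.
workflow GuardSketch {
    input {
        Int num_samples = 25000    # hypothetical oversized cohort
        Int? load_data_batch_size  # deliberately left unset here
    }

    Int max_auto_batch_size = 20000
    Boolean must_specify = (num_samples > max_auto_batch_size) && !defined(load_data_batch_size)

    output {
        # true for this example: 25000 > 20000 with no explicit batch size
        Boolean caller_must_set_batch_size = must_specify
    }
}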
Expand Down
