Add flag for cost_observability table writing to support sub-cohort use case [VS-521] #8093

Merged (16 commits) on Nov 21, 2022
2 changes: 2 additions & 0 deletions .dockstore.yml
@@ -118,6 +118,7 @@ workflows:
branches:
- master
- ah_var_store
- rsa_vs_521_subcohort_cost
- name: GvsImportGenomes
subclass: WDL
primaryDescriptorPath: /scripts/variantstore/wdl/GvsImportGenomes.wdl
@@ -166,6 +167,7 @@ workflows:
branches:
- master
- ah_var_store
- rsa_vs_521_subcohort_cost
- name: GvsWithdrawSamples
subclass: WDL
primaryDescriptorPath: /scripts/variantstore/wdl/GvsWithdrawSamples.wdl
@@ -264,7 +264,7 @@ task Add_AS_MAX_VQSLOD_ToVcf {
File input_vcf
String output_basename

String docker = "us.gcr.io/broad-dsde-methods/variantstore:2022-11-08-alpine"
String docker = "us.gcr.io/broad-dsde-methods/variantstore:2022-11-17-alpine"
Int cpu = 1
Int memory_mb = 3500
Int disk_size_gb = ceil(2*size(input_vcf, "GiB")) + 50
2 changes: 1 addition & 1 deletion scripts/variantstore/wdl/GvsCallsetCost.wdl
@@ -62,7 +62,7 @@ task WorkflowComputeCosts {
>>>

runtime {
docker: "us.gcr.io/broad-dsde-methods/variantstore:2022-11-08-alpine"
docker: "us.gcr.io/broad-dsde-methods/variantstore:2022-11-17-alpine"
}

output {
2 changes: 1 addition & 1 deletion scripts/variantstore/wdl/GvsCreateVAT.wdl
@@ -124,7 +124,7 @@ task MakeSubpopulationFilesAndReadSchemaFiles {
# ------------------------------------------------
# Runtime settings:
runtime {
docker: "us.gcr.io/broad-dsde-methods/variantstore:2022-11-08-alpine"
docker: "us.gcr.io/broad-dsde-methods/variantstore:2022-11-17-alpine"
memory: "1 GB"
preemptible: 3
cpu: "1"
4 changes: 2 additions & 2 deletions scripts/variantstore/wdl/GvsCreateVATAnnotations.wdl
@@ -169,7 +169,7 @@ task ExtractAnAcAfFromVCF {
# ------------------------------------------------
# Runtime settings:
runtime {
docker: "us.gcr.io/broad-dsde-methods/variantstore:2022-11-08-alpine"
docker: "us.gcr.io/broad-dsde-methods/variantstore:2022-11-17-alpine"
maxRetries: 3
memory: "16 GB"
preemptible: 3
@@ -291,7 +291,7 @@ task PrepAnnotationJson {
# ------------------------------------------------
# Runtime settings:
runtime {
docker: "us.gcr.io/broad-dsde-methods/variantstore:2022-11-08-alpine"
docker: "us.gcr.io/broad-dsde-methods/variantstore:2022-11-17-alpine"
memory: "8 GB"
preemptible: 5
cpu: "1"
2 changes: 1 addition & 1 deletion scripts/variantstore/wdl/GvsExtractAvroFilesForHail.wdl
@@ -288,7 +288,7 @@ task GenerateHailScripts {
File hail_create_vat_inputs_script = 'hail_create_vat_inputs.py'
}
runtime {
docker: "us.gcr.io/broad-dsde-methods/variantstore:2022-11-08-alpine"
docker: "us.gcr.io/broad-dsde-methods/variantstore:2022-11-17-alpine"
disks: "local-disk 500 HDD"
}
}
8 changes: 6 additions & 2 deletions scripts/variantstore/wdl/GvsExtractCallset.wdl
@@ -36,6 +36,7 @@ workflow GvsExtractCallset {
Int? split_intervals_mem_override
Float x_bed_weight_scaling = 4
Float y_bed_weight_scaling = 4
Boolean write_cost_to_db = true
}

File reference = "gs://gcp-public-data--broad-references/hg38/v0/Homo_sapiens_assembly38.fasta"
@@ -165,6 +166,7 @@ workflow GvsExtractCallset {
extract_maxretries_override = extract_maxretries_override,
emit_pls = emit_pls,
emit_ads = emit_ads,
write_cost_to_db = write_cost_to_db
}
}

@@ -241,6 +243,7 @@ task ExtractTask {
String fq_filter_set_site_table
String fq_filter_set_tranches_table
String? filter_set_name
Boolean write_cost_to_db

# Runtime Options:
File? gatk_override
@@ -257,6 +260,7 @@
}

String intervals_name = basename(intervals)
String cost_observability_line = if (write_cost_to_db == true) then "--cost-observability-tablename ~{cost_observability_tablename}" else ""

command <<<
set -e
@@ -289,11 +293,11 @@
~{true='--emit-ads' false='' emit_ads} \
${FILTERING_ARGS} \
--dataset-id ~{dataset_id} \
--cost-observability-tablename ~{cost_observability_tablename} \
--call-set-identifier ~{call_set_identifier} \
--wdl-step GvsCreateCallset \
--wdl-call ExtractTask \
--shard-identifier ~{intervals_name}
--shard-identifier ~{intervals_name} \
~{cost_observability_line}

# Drop trailing slash if one exists
OUTPUT_GCS_DIR=$(echo ~{output_gcs_dir} | sed 's/\/$//')
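
For orientation, here is a minimal Python sketch (not part of the PR) of the conditional-flag pattern the WDL uses above: the `--cost-observability-tablename` argument is appended to the extract command only when `write_cost_to_db` is true, instead of being passed unconditionally as before. All names below are illustrative.

```python
def build_extract_args(base_args, intervals_name, cost_observability_tablename,
                       write_cost_to_db):
    """Mirror of the WDL logic: only append the cost-observability flag on request."""
    args = list(base_args) + ["--shard-identifier", intervals_name]
    if write_cost_to_db:
        args += ["--cost-observability-tablename", cost_observability_tablename]
    return args

# A sub-cohort extract billed to a different project skips the flag entirely.
print(build_extract_args(["--dataset-id", "my_dataset"], "0001.interval_list",
                         "cost_observability", write_cost_to_db=False))
```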
11 changes: 8 additions & 3 deletions scripts/variantstore/wdl/GvsExtractCohortFromSampleNames.wdl
@@ -36,9 +36,12 @@ workflow GvsExtractCohortFromSampleNames {
Int? split_intervals_disk_size_override
Int? split_intervals_mem_override

File? gatk_override
File gatk_override = "gs://gvs-internal-scratch/rsa/gatk-package-4.2.0.0-624-ga2bb04e-SNAPSHOT-local.jar"

}

Boolean write_cost_to_db = if ((gvs_project != destination_project_id) || (gvs_project != query_project)) then false else true
@mcovarr (Collaborator) commented on Nov 16, 2022:


TOL can this be simplified (actual question, I don't know)?

Suggested change
Boolean write_cost_to_db = if ((gvs_project != destination_project_id) || (gvs_project != query_project)) then false else true
Boolean write_cost_to_db = (gvs_project == destination_project_id) && (gvs_project == query_project)
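
The two forms are indeed equivalent by De Morgan's law: `!(a != b || a != c)` is the same as `(a == b) && (a == c)`. A quick Python sketch (illustrative only, with placeholder project IDs) checks every combination:

```python
from itertools import product

for gvs, dest, query in product(["proj-a", "proj-b"], repeat=3):
    # Original WDL expression: if (gvs != dest || gvs != query) then false else true
    original = False if (gvs != dest or gvs != query) else True
    # Reviewer's suggested simplification
    suggested = (gvs == dest) and (gvs == query)
    assert original == suggested
print("the original expression and the suggested simplification always agree")
```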


# writing the array to a file has to be done in a task
# https://support.terra.bio/hc/en-us/community/posts/360071465631-write-lines-write-map-write-tsv-write-json-fail-when-run-in-a-workflow-rather-than-in-a-task
if (defined(cohort_sample_names_array)) {
@@ -62,7 +65,8 @@
dataset_name = gvs_dataset, # unused if fq_* args are given
destination_project = destination_project_id,
destination_dataset = destination_dataset_name,
fq_temp_table_dataset = fq_gvs_extraction_temp_tables_dataset
fq_temp_table_dataset = fq_gvs_extraction_temp_tables_dataset,
write_cost_to_db = write_cost_to_db
}

call GvsExtractCallset.GvsExtractCallset {
@@ -87,7 +91,8 @@
split_intervals_disk_size_override = split_intervals_disk_size_override,
split_intervals_mem_override = split_intervals_mem_override,

gatk_override = gatk_override
gatk_override = gatk_override,
write_cost_to_db = write_cost_to_db
}

output {
2 changes: 1 addition & 1 deletion scripts/variantstore/wdl/GvsPopulateAltAllele.wdl
@@ -243,7 +243,7 @@ task PopulateAltAlleleTable {
done
>>>
runtime {
docker: "us.gcr.io/broad-dsde-methods/variantstore:2022-11-08-alpine"
docker: "us.gcr.io/broad-dsde-methods/variantstore:2022-11-17-alpine"
memory: "3 GB"
disks: "local-disk 10 HDD"
cpu: 1
10 changes: 7 additions & 3 deletions scripts/variantstore/wdl/GvsPrepareRangesCallset.wdl
@@ -20,6 +20,7 @@ workflow GvsPrepareCallset {
Array[String]? query_labels
File? sample_names_to_extract
Boolean only_output_vet_tables = false
Boolean write_cost_to_db = true
}

String full_extract_prefix = if (control_samples) then "~{extract_table_prefix}_controls" else extract_table_prefix
@@ -40,7 +41,8 @@
fq_destination_dataset = fq_destination_dataset,
temp_table_ttl_in_hours = 72,
control_samples = control_samples,
only_output_vet_tables = only_output_vet_tables
only_output_vet_tables = only_output_vet_tables,
write_cost_to_db = write_cost_to_db
}

output {
@@ -64,6 +66,7 @@ task PrepareRangesCallsetTask {
Array[String]? query_labels
Int temp_table_ttl_in_hours = 24
Boolean only_output_vet_tables
Boolean write_cost_to_db
}
meta {
# All kinds of BQ reading happening in the referenced Python script.
@@ -102,15 +105,16 @@
~{sep=" " query_label_args} \
--fq_sample_mapping_table ~{fq_sample_mapping_table} \
--ttl ~{temp_table_ttl_in_hours} \
~{true="--only_output_vet_tables True" false='' only_output_vet_tables}
~{true="--only_output_vet_tables True" false='' only_output_vet_tables} \
~{true="--write_cost_to_db True" false="--write_cost_to_db ''" write_cost_to_db}

>>>
output {
String fq_cohort_extract_table_prefix = "~{fq_destination_dataset}.~{destination_cohort_table_prefix}" # implementation detail of create_ranges_cohort_extract_data_table.py
}

runtime {
docker: "us.gcr.io/broad-dsde-methods/variantstore:2022-11-08-alpine"
docker: "us.gcr.io/broad-dsde-methods/variantstore:2022-11-17-alpine"
memory: "3 GB"
disks: "local-disk 100 HDD"
bootDiskSizeGb: 15
2 changes: 1 addition & 1 deletion scripts/variantstore/wdl/GvsUtils.wdl
@@ -348,7 +348,7 @@ task ScaleXYBedValues {
}

runtime {
docker: "us.gcr.io/broad-dsde-methods/variantstore:2022-11-08-alpine"
docker: "us.gcr.io/broad-dsde-methods/variantstore:2022-11-17-alpine"
maxRetries: 3
memory: "7 GB"
preemptible: 3
28 changes: 28 additions & 0 deletions scripts/variantstore/wdl/SUB_COHORT_WORKFLOW.md
@@ -0,0 +1,28 @@
# How to Run the GvsExtractCohortFromSampleNames Workflow

The purpose of `GvsExtractCohortFromSampleNames.wdl` is to take advantage of an existing GVS (Genomic Variant Store) in BigQuery, complete with filter model, to generate a callset with a subset of specified samples' data. It calls existing WDLs for the "Prepare" and "Extract" steps and allows for the data required to create the subset callset to be stored and queried (and, therefore, paid for) by a different Google project than the "parent" GVS (most probably paid for by AoU).

Required inputs:

| Parameter | Description |
|------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| call_set_identifier | a unique name for this cohort (used for cost tracking) |
| cohort_table_prefix | a unique name for this cohort (can be the same as `call_set_identifier`); should contain only letters and underscores |
| destination_dataset_name | name of the BigQuery dataset in the destination Google project |
| destination_project_id | Google project ID where the `destination_dataset_name` lives |
| extraction_uuid | unique name used as a query label for the "Prepare" step; can be the same as `call_set_identifier` and/or `cohort_table_prefix` (requested as an input by Verily for cost tracking; if this isn't Verily-related, the value doesn't matter) |
| filter_set_name | the same input that was used for `GvsCreateFilterSet` (or `GvsJointVariantCalling`) |
| gvs_dataset | the same input value that was used for `dataset_name` in the GVS step WDLs (or `GvsJointVariantCalling`) |
| gvs_project | the same input value that was used for `project_id` in the GVS step WDLs (or `GvsJointVariantCalling`) |
| output_file_base_name | the base file name for the VCFs that will be created in the "Extract" step |
| query_project | Google project ID for the permissions/billing of the "Prepare" and "Extract" steps for this sub-cohort (can be the same as `destination_project_id`) |
| scatter_count | Number of shards to scatter the "Extract" step across |
| cohort_sample_names | GCS path to a file that contains a list of the samples (`sample_name` field in GVS) to include in the sub-cohort; if not set, `cohort_sample_names_array` must be set |
| cohort_sample_names_array | Array of sample identifiers (`sample_name` field in GVS) to include in the sub-cohort; if not set, `cohort_sample_names` must be set |

Optional inputs of interest:

| Parameter | Description |
|------------------------|--------------------------------------------|
| drop_state | This should correspond to the same value set in `GvsImportGenomes` (or `GvsJointVariantCalling`) |
| output_gcs_dir | GCS path to a directory to copy the interval list files, the extract VCFs and a sample manifest into |
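
As a sketch of how these inputs fit together, the snippet below writes a hypothetical inputs JSON for the workflow. It assumes the usual Cromwell/Terra convention of `WorkflowName.input_name` keys; every value is a placeholder to be replaced with your own projects, datasets, and samples.

```python
import json

# Placeholder values only; not real projects, datasets, or samples.
inputs = {
    "GvsExtractCohortFromSampleNames.call_set_identifier": "my_subcohort_2022_11",
    "GvsExtractCohortFromSampleNames.cohort_table_prefix": "my_subcohort_2022_11",
    "GvsExtractCohortFromSampleNames.destination_dataset_name": "my_subcohort_dataset",
    "GvsExtractCohortFromSampleNames.destination_project_id": "my-destination-project",
    "GvsExtractCohortFromSampleNames.extraction_uuid": "my_subcohort_2022_11",
    "GvsExtractCohortFromSampleNames.filter_set_name": "my_filter_set",
    "GvsExtractCohortFromSampleNames.gvs_dataset": "parent_gvs_dataset",
    "GvsExtractCohortFromSampleNames.gvs_project": "parent-gvs-project",
    "GvsExtractCohortFromSampleNames.output_file_base_name": "my_subcohort",
    "GvsExtractCohortFromSampleNames.query_project": "my-destination-project",
    "GvsExtractCohortFromSampleNames.scatter_count": 100,
    "GvsExtractCohortFromSampleNames.cohort_sample_names_array": ["sample_1", "sample_2"],
}

with open("GvsExtractCohortFromSampleNames.inputs.json", "w") as f:
    json.dump(inputs, f, indent=2)
```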
scripts/variantstore/wdl/extract/create_ranges_cohort_extract_data_table.py
@@ -228,7 +228,8 @@ def make_extract_table(call_set_identifier,
destination_table_prefix,
fq_sample_mapping_table,
temp_table_ttl_hours,
only_output_vet_tables):
only_output_vet_tables,
write_cost_to_db):
try:
fq_destination_table_ref_data = f"{fq_destination_dataset}.{destination_table_prefix}__REF_DATA"
fq_destination_table_vet_data = f"{fq_destination_dataset}.{destination_table_prefix}__VET_DATA"
@@ -304,7 +305,7 @@ def make_extract_table(call_set_identifier,

finally:
utils.write_job_stats(JOBS, client, f"{fq_destination_dataset}", call_set_identifier, 'GvsPrepareRanges',
'PrepareRangesCallsetTask', output_table_prefix)
'PrepareRangesCallsetTask', output_table_prefix, write_cost_to_db)


if __name__ == '__main__':
@@ -334,6 +335,8 @@ def make_extract_table(call_set_identifier,
parser.add_argument('--ttl', type=int, help='Temp table TTL in hours', required=False, default=72)
parser.add_argument('--only_output_vet_tables', type=bool,
help='Only create __VET_DATA table, skip __REF_DATA and __SAMPLES tables', required=False, default=False)
parser.add_argument('--write_cost_to_db', type=bool,
help='Populate cost_observability table with BigQuery query bytes scanned', required=False, default=True)

sample_args = parser.add_mutually_exclusive_group(required=True)
sample_args.add_argument('--sample_names_to_extract', type=str,
@@ -358,4 +361,5 @@ def make_extract_table(call_set_identifier,
args.destination_cohort_table_prefix,
args.fq_sample_mapping_table,
args.ttl,
args.only_output_vet_tables)
args.only_output_vet_tables,
args.write_cost_to_db)
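
One non-obvious detail: the WDL above passes `--write_cost_to_db ''` rather than `--write_cost_to_db False` to disable cost writing. That follows from `argparse`'s `type=bool`, which simply calls `bool()` on the argument string, so any non-empty string, including "False", parses as True. A small sketch of the behavior, reusing only the flag definition shown in the diff:

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--write_cost_to_db', type=bool, required=False, default=True)

# Any non-empty string is truthy, so "False" would still enable cost writing.
assert parser.parse_args(['--write_cost_to_db', 'False']).write_cost_to_db is True
# Only the empty string parses as False, which is why the WDL passes '' to disable it.
assert parser.parse_args(['--write_cost_to_db', '']).write_cost_to_db is False
# Omitting the flag keeps the default of True.
assert parser.parse_args([]).write_cost_to_db is True
print("argparse type=bool treats only the empty string as False")
```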
16 changes: 9 additions & 7 deletions scripts/variantstore/wdl/extract/utils.py
@@ -56,7 +56,7 @@ def utf8len(s):
return len(s.encode('utf-8'))


def write_job_stats(jobs, client, fq_dataset, call_set_identifier, step, call, shard_identifier):
def write_job_stats(jobs, client, fq_dataset, call_set_identifier, step, call, shard_identifier, write_cost_to_db = True):
total = 0

for query_return in jobs:
@@ -67,9 +67,9 @@ def write_job_stats(jobs, client, fq_dataset, call_set_identifier, step, call, s
print("\nTotal GiBs billed ", total/(1024 * 1024 * 1024), " GiBs\n")

# populate cost_observability data
sql = f"""INSERT INTO `{fq_dataset}.cost_observability`
(call_set_identifier, step, call, shard_identifier, event_key, call_start_timestamp, event_timestamp, event_bytes)
VALUES('{call_set_identifier}', '{step}', '{call}', '{shard_identifier}', 'BigQuery Query Scanned',
CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP(), {total})"""
query = client.query(sql)
query.result()
if write_cost_to_db:
sql = f"""INSERT INTO `{fq_dataset}.cost_observability`
(call_set_identifier, step, call, shard_identifier, event_key, call_start_timestamp, event_timestamp, event_bytes)
VALUES('{call_set_identifier}', '{step}', '{call}', '{shard_identifier}', 'BigQuery Query Scanned',
CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP(), {total})"""
query = client.query(sql)
query.result()
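
A minimal sketch of how the new gating in `write_job_stats` could be exercised with a mocked BigQuery client; the bare `utils` import path and the empty `jobs` list are assumptions for illustration (an empty list sidesteps the per-job byte accounting while still reaching the cost_observability insert):

```python
from unittest.mock import MagicMock

import utils  # scripts/variantstore/wdl/extract/utils.py, assumed to be on the path

client = MagicMock()

# With write_cost_to_db=False the INSERT into cost_observability is skipped ...
utils.write_job_stats([], client, "my-project.my_dataset", "my_callset",
                      "GvsPrepareRanges", "PrepareRangesCallsetTask", "shard_0",
                      write_cost_to_db=False)
assert not client.query.called

# ... while the default of True preserves the previous behavior.
utils.write_job_stats([], client, "my-project.my_dataset", "my_callset",
                      "GvsPrepareRanges", "PrepareRangesCallsetTask", "shard_0")
assert client.query.call_count == 1
```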

@@ -274,9 +274,9 @@ protected static VCFHeader generateVcfHeader(Set<String> sampleNames,
@Override
protected String[] customCommandLineValidation() {
final List<String> errors = new ArrayList<>();
if (projectID != null || datasetID != null || costObservabilityTableName != null || callSetIdentifier != null || wdlStep != null || wdlCall != null || shardIdentifier != null) {
if (projectID == null || datasetID == null || costObservabilityTableName == null || callSetIdentifier == null || wdlStep == null || wdlCall == null || shardIdentifier == null) {
errors.add("Parameters 'project-id', 'dataset-id', 'cost-observability-tablename', 'call-set-identifier', 'wdl-step', 'wdl-call', and 'shardIdentifier' must either ALL be set or ALL NOT BE set");
if (costObservabilityTableName != null) {
if (projectID == null || datasetID == null || callSetIdentifier == null || wdlStep == null || wdlCall == null || shardIdentifier == null) {
errors.add("Parameters 'project-id', 'dataset-id', 'call-set-identifier', 'wdl-step', 'wdl-call', and 'shardIdentifier' must be set if 'cost-observability-tablename' is set.");
}
}
if (!errors.isEmpty()) {
Expand Down