diff --git a/.dockstore.yml b/.dockstore.yml index 0d91f4e95bc..13acb09b04d 100644 --- a/.dockstore.yml +++ b/.dockstore.yml @@ -65,6 +65,13 @@ workflows: branches: - master - ah_var_store + - name: GvsCreateAltAllele + subclass: WDL + primaryDescriptorPath: /scripts/variantstore/wdl/GvsCreateAltAllele.wdl + filters: + branches: + - master + - ah_var_store - name: GvsExtractCallset subclass: WDL primaryDescriptorPath: /scripts/variantstore/wdl/GvsExtractCallset.wdl diff --git a/scripts/variantstore/TERRA_QUICKSTART.md b/scripts/variantstore/TERRA_QUICKSTART.md index f68efeca83c..8c66a1eb02d 100644 --- a/scripts/variantstore/TERRA_QUICKSTART.md +++ b/scripts/variantstore/TERRA_QUICKSTART.md @@ -57,15 +57,16 @@ These are the required parameters which must be supplied to the workflow: **NOTE**: if your workflow fails, you will need to manually remove a lockfile from the output directory. It is called LOCKFILE, and can be removed with `gsutil rm` ## 2. Create Alt Allele Table -**NOTE:** This is a bit of a kludge until we gain more confidence that the data loaded into the ALT_ALLELE table for feature training are optimal and we can automate this process +This step loads data into the ALT_ALLELE table from the `vet_*` tables. -You'll need to run this from the BigQuery Console for your dataset. +This is done by running the `GvsCreateAltAllele` workflow with the following parameters: -Load the SQL script you can find here in the [GATK GitHub Repository](https://github.com/broadinstitute/gatk/blob/ah_var_store/scripts/variantstore/bq/alt_allele_creation.example.sql) - -There are three places where you need to replace the string `spec-ops-aou.gvs_tieout_acmg_v1` with your project and dataset name in the form `PROJECT.DATASET` +| Parameter | Description | +| ----------------- | ----------- | +| data_project | The name of the google project containing the dataset | +| default_dataset | The name of the dataset | -Execute the script, it should take 30-60 seconds to run resulting in the creation of the `ALT_ALLELE` table in your dataset +**Note:** This workflow does not use the Terra Entity model to run, so be sure to select `Run workflow with inputs defined by file paths` ## 3. Create Filter Set diff --git a/scripts/variantstore/wdl/GvsCreateAltAllele.wdl b/scripts/variantstore/wdl/GvsCreateAltAllele.wdl new file mode 100644 index 00000000000..ab9da6880b9 --- /dev/null +++ b/scripts/variantstore/wdl/GvsCreateAltAllele.wdl @@ -0,0 +1,193 @@ +version 1.0 + +workflow GvsCreateAltAllele { + input { + String dataset_project + String query_project_id = dataset_project + String default_dataset + + String? 
service_account_json_path + } + + call GetVetTableNames { + input: + query_project_id = query_project_id, + dataset_project_id = dataset_project, + dataset_name = default_dataset, + service_account_json_path = service_account_json_path + } + + call CreateAltAlleleTable { + input: + query_project_id = query_project_id, + dataset_project_id = dataset_project, + dataset_name = default_dataset, + service_account_json_path = service_account_json_path + } + + scatter (idx in range(length(GetVetTableNames.vet_tables))) { + call PopulateAltAlleleTable { + input: + create_table_done = CreateAltAlleleTable.done, + vet_table_name = GetVetTableNames.vet_tables[idx], + query_project_id = query_project_id, + dataset_project_id = dataset_project, + dataset_name = default_dataset, + service_account_json_path = service_account_json_path + } + } + + output { + Array[String] vet_tables_loaded = PopulateAltAlleleTable.done + } +} + +task GetVetTableNames { + meta { + volatile: true + } + + input { + String query_project_id + String dataset_project_id + String dataset_name + + String? service_account_json_path + } + + String has_service_account_file = if (defined(service_account_json_path)) then 'true' else 'false' + + command <<< + set -e + + if [ ~{has_service_account_file} = 'true' ]; then + gsutil cp ~{service_account_json_path} local.service_account.json + gcloud auth activate-service-account --key-file=local.service_account.json + gcloud config set project ~{query_project_id} + fi + + echo "project_id = ~{query_project_id}" > ~/.bigqueryrc + bq query --location=US --project_id=~{query_project_id} --format=csv --use_legacy_sql=false \ + "SELECT table_name FROM ~{dataset_project_id}.~{dataset_name}.INFORMATION_SCHEMA.TABLES WHERE table_name LIKE 'vet_%' ORDER BY table_name" > vet_tables.csv + + # remove the header row from the CSV file + sed -i 1d vet_tables.csv + >>> + + output { + Array[String] vet_tables = read_lines("vet_tables.csv") + } + + runtime { + docker: "gcr.io/google.com/cloudsdktool/cloud-sdk:305.0.0" + memory: "3 GB" + disks: "local-disk 10 HDD" + preemptible: 3 + cpu: 1 + } +} + +task CreateAltAlleleTable { + input { + String query_project_id + String dataset_project_id + String dataset_name + + String? 
service_account_json_path + } + + String has_service_account_file = if (defined(service_account_json_path)) then 'true' else 'false' + + command <<< + set -e + + if [ ~{has_service_account_file} = 'true' ]; then + gsutil cp ~{service_account_json_path} local.service_account.json + gcloud auth activate-service-account --key-file=local.service_account.json + gcloud config set project ~{query_project_id} + fi + + echo "project_id = ~{query_project_id}" > ~/.bigqueryrc + bq query --location=US --project_id=~{query_project_id} --format=csv --use_legacy_sql=false \ + "CREATE OR REPLACE TABLE ~{dataset_project_id}.~{dataset_name}.alt_allele ( + location INT64, + sample_id INT64, + ref STRING, + allele STRING, + allele_pos INT64, + call_GT STRING, + call_GQ INT64, + as_raw_mq STRING, + raw_mq INT64, + as_raw_mqranksum STRING, + raw_mqranksum_x_10 INT64, + as_qualapprox STRING, + qualapprox STRING, + qual INT64, + as_raw_readposranksum STRING, + raw_readposranksum_x_10 INT64, + as_sb_table STRING, + sb_ref_plus INT64, + sb_ref_minus INT64, + sb_alt_plus INT64, + sb_alt_minus INT64, + call_AD STRING, + ref_ad INT64, + ad INT64 + ) PARTITION BY RANGE_BUCKET(location, GENERATE_ARRAY(0, 25000000000000, 1000000000000)) + CLUSTER BY location, sample_id;" + + >>> + + output { + String done = "done" + } + + runtime { + docker: "gcr.io/google.com/cloudsdktool/cloud-sdk:305.0.0" + memory: "3 GB" + disks: "local-disk 10 HDD" + cpu: 1 + } +} + +task PopulateAltAlleleTable { + input { + String create_table_done + String vet_table_name + String query_project_id + String dataset_project_id + String dataset_name + + String? service_account_json_path + } + + String has_service_account_file = if (defined(service_account_json_path)) then 'true' else 'false' + + command <<< + set -e + + if [ ~{has_service_account_file} = 'true' ]; then + gsutil cp ~{service_account_json_path} local.service_account.json + gcloud auth activate-service-account --key-file=local.service_account.json + SERVICE_ACCOUNT_STANZA="--sa_key_path local.service_account.json " + fi + + python3 /app/populate_alt_allele_table.py \ + --query_project ~{query_project_id} \ + --vet_table_name ~{vet_table_name} \ + --fq_dataset ~{dataset_project_id}.~{dataset_name} \ + $SERVICE_ACCOUNT_STANZA + >>> + + output { + String done = "~{vet_table_name}" + } + + runtime { + docker: "us.gcr.io/broad-dsde-methods/variantstore:ah_var_store_20210903" + memory: "3 GB" + disks: "local-disk 10 HDD" + cpu: 1 + } +} diff --git a/scripts/variantstore/wdl/extract/Dockerfile b/scripts/variantstore/wdl/extract/Dockerfile index bb4316bf27c..0453b11ed98 100644 --- a/scripts/variantstore/wdl/extract/Dockerfile +++ b/scripts/variantstore/wdl/extract/Dockerfile @@ -1,14 +1,17 @@ FROM gcr.io/google.com/cloudsdktool/cloud-sdk:349.0.0 # Copy the application's requirements.txt and run pip to install -ADD requirements.txt /app/requirements.txt +COPY requirements.txt /app/requirements.txt RUN pip install -r /app/requirements.txt RUN apt-get update && apt-get -y upgrade && apt-get -y install bcftools -# Add the application source code. -ADD create_cohort_extract_data_table.py /app -ADD create_variant_annotation_table.py /app -ADD extract_subpop.py /app +# Copy the application source code. 
+COPY create_cohort_extract_data_table.py /app +COPY create_variant_annotation_table.py /app +COPY extract_subpop.py /app +COPY populate_alt_allele_table.py /app +COPY alt_allele_positions.sql /app +COPY alt_allele_temp_function.sql /app WORKDIR /app ENTRYPOINT ["/bin/bash"] diff --git a/scripts/variantstore/wdl/extract/alt_allele_positions.sql b/scripts/variantstore/wdl/extract/alt_allele_positions.sql new file mode 100644 index 00000000000..9a622fc5364 --- /dev/null +++ b/scripts/variantstore/wdl/extract/alt_allele_positions.sql @@ -0,0 +1,72 @@ +select location, sample_id, + SPLIT(minimize(ref, SPLIT(alt,',')[OFFSET(0)]))[OFFSET(0)] as ref, + SPLIT(minimize(ref, SPLIT(alt,',')[OFFSET(0)]))[OFFSET(1)] as allele, + 1 as allele_pos, call_GT, call_GQ, + as_raw_mq, + cast(SPLIT(as_raw_mq,'|')[OFFSET(1)] as int64) raw_mq, + as_raw_mqranksum, + SAFE_cast(SAFE_cast(SPLIT(as_raw_mqranksum,',')[SAFE_OFFSET(0)] as float64) * 10.0 as int64) as raw_mqranksum_x_10, + as_qualapprox, + qualapprox, + cast(SPLIT(as_qualapprox,'|')[OFFSET(0)] as int64) as qual, + as_raw_readposranksum, + SAFE_cast(SAFE_cast(SPLIT(as_raw_readposranksum,',')[SAFE_OFFSET(0)] as float64) * 10.0 as int64) as raw_readposranksum_x_10, + as_sb_table, + cast(SPLIT(SPLIT(as_sb_table,'|')[OFFSET(0)],',')[OFFSET(0)] as int64) as sb_ref_plus, + cast(SPLIT(SPLIT(as_sb_table,'|')[OFFSET(0)],',')[OFFSET(1)] as int64) as sb_ref_minus, + cast(SPLIT(SPLIT(as_sb_table,'|')[OFFSET(1)],',')[OFFSET(0)] as int64) as sb_alt_plus, + cast(SPLIT(SPLIT(as_sb_table,'|')[OFFSET(1)],',')[OFFSET(1)] as int64) as sb_alt_minus, + call_AD, + cast(SPLIT(call_AD,',')[OFFSET(0)] as int64) as ref_ad, + cast(SPLIT(call_AD,',')[OFFSET(1)] as int64) as ad +from position1 + +union all + +select location, sample_id, + SPLIT(minimize(ref, SPLIT(alt,',')[OFFSET(0)]))[OFFSET(0)] as ref, + SPLIT(minimize(ref, SPLIT(alt,',')[OFFSET(0)]))[OFFSET(1)] as allele, + 1 as allele_pos, call_GT, call_GQ, + as_raw_mq, + cast(SPLIT(as_raw_mq,'|')[OFFSET(1)] as int64) raw_mq, + as_raw_mqranksum, + SAFE_cast(SAFE_cast(SPLIT(as_raw_mqranksum,',')[SAFE_OFFSET(0)] as float64) * 10.0 as int64) as raw_mqranksum_x_10, + as_qualapprox, + qualapprox, + cast(SPLIT(as_qualapprox,'|')[OFFSET(0)] as int64) as qual, + as_raw_readposranksum, + SAFE_cast(SAFE_cast(SPLIT(as_raw_readposranksum,',')[SAFE_OFFSET(0)] as float64) * 10.0 as int64) as raw_readposranksum_x_10, + as_sb_table, + cast(SPLIT(SPLIT(as_sb_table,'|')[OFFSET(0)],',')[OFFSET(0)] as int64) as sb_ref_plus, + cast(SPLIT(SPLIT(as_sb_table,'|')[OFFSET(0)],',')[OFFSET(1)] as int64) as sb_ref_minus, + cast(SPLIT(SPLIT(as_sb_table,'|')[OFFSET(1)],',')[OFFSET(0)] as int64) as sb_alt_plus, + cast(SPLIT(SPLIT(as_sb_table,'|')[OFFSET(1)],',')[OFFSET(1)] as int64) as sb_alt_minus, + call_AD, + cast(SPLIT(call_AD,',')[OFFSET(0)] as int64) as ref_ad, + cast(SPLIT(call_AD,',')[OFFSET(1)] as int64) as ad +from position2 + +union all + +select location, sample_id, + SPLIT(minimize(ref, SPLIT(alt,',')[OFFSET(1)]))[OFFSET(0)] as ref, + SPLIT(minimize(ref, SPLIT(alt,',')[OFFSET(1)]))[OFFSET(1)] as allele, + 2 as allele_pos, call_GT, call_GQ, + as_raw_mq, + cast(SPLIT(as_raw_mq,'|')[OFFSET(2)] as int64) raw_mq, + as_raw_mqranksum, + SAFE_cast(SAFE_cast(SPLIT(as_raw_mqranksum,',')[SAFE_OFFSET(1)] as float64) * 10.0 as int64) as raw_mqranksum_x_10, + as_qualapprox, + qualapprox, + cast(SPLIT(as_qualapprox,'|')[OFFSET(1)] as int64) as qual, + as_raw_readposranksum, + SAFE_cast(SAFE_cast(SPLIT(as_raw_readposranksum,',')[SAFE_OFFSET(1)] as float64) * 10.0 as 
int64) as raw_readposranksum_x_10, + as_sb_table, + cast(SPLIT(SPLIT(as_sb_table,'|')[OFFSET(0)],',')[OFFSET(0)] as int64) as sb_ref_plus, + cast(SPLIT(SPLIT(as_sb_table,'|')[OFFSET(0)],',')[OFFSET(1)] as int64) as sb_ref_minus, + cast(SPLIT(SPLIT(as_sb_table,'|')[OFFSET(2)],',')[OFFSET(0)] as int64) as sb_alt_plus, + cast(SPLIT(SPLIT(as_sb_table,'|')[OFFSET(2)],',')[OFFSET(1)] as int64) as sb_alt_minus, + call_AD, + cast(SPLIT(call_AD,',')[OFFSET(0)] as int64) as ref_ad, + cast(SPLIT(call_AD,',')[OFFSET(2)] as int64) as ad +from position2; diff --git a/scripts/variantstore/wdl/extract/alt_allele_temp_function.sql b/scripts/variantstore/wdl/extract/alt_allele_temp_function.sql new file mode 100644 index 00000000000..cb3d0d2dc7b --- /dev/null +++ b/scripts/variantstore/wdl/extract/alt_allele_temp_function.sql @@ -0,0 +1,14 @@ +CREATE TEMPORARY FUNCTION minimize(ref STRING, allele STRING) +RETURNS STRING + LANGUAGE js AS """ + let done = false + while (!done && ref.length !== 1) { + if (ref.slice(-1) === allele.slice(-1)) { + ref = ref.slice(0, -1) + allele = allele.slice(0,-1) + } else { + done = true + } + } + return ref+','+allele + """; diff --git a/scripts/variantstore/wdl/extract/populate_alt_allele_table.py b/scripts/variantstore/wdl/extract/populate_alt_allele_table.py new file mode 100644 index 00000000000..65019883b03 --- /dev/null +++ b/scripts/variantstore/wdl/extract/populate_alt_allele_table.py @@ -0,0 +1,80 @@ +import time +import os + +from google.cloud import bigquery +from google.cloud.bigquery.job import QueryJobConfig +from google.oauth2 import service_account +from pathlib import Path + +import argparse + +client = None + +def execute_with_retry(label, sql): + retry_delay = [30, 60, 90] # 3 retries with incremental backoff + start = time.time() + while len(retry_delay) > 0: + try: + query_label = label.replace(" ","-").strip().lower() + + existing_labels = client._default_query_job_config.labels + job_labels = existing_labels + job_labels["gvs_query_name"] = query_label + job_config = bigquery.QueryJobConfig(labels=job_labels) + query = client.query(sql, job_config=job_config) + + print(f"STARTING - {label}") + results = query.result() + print(f"COMPLETED ({time.time() - start} s, {3-len(retry_delay)} retries) - {label}") + return results + except Exception as err: + # if there are no retries left... 
raise + if (len(retry_delay) == 0): + raise err + else: + t = retry_delay.pop(0) + print(f"Error {err} running query {label}, sleeping for {t}") + time.sleep(t) + +def populate_alt_allele_table(query_project, vet_table_name, fq_dataset, sa_key_path): + global client + default_config = QueryJobConfig(priority="INTERACTIVE", use_query_cache=True) + + if sa_key_path: + credentials = service_account.Credentials.from_service_account_file( + sa_key_path, scopes=["https://www.googleapis.com/auth/cloud-platform"], + ) + + client = bigquery.Client(credentials=credentials, + project=query_project, + default_query_job_config=default_config) + else: + client = bigquery.Client(project=query_project, + default_query_job_config=default_config) + + os.chdir(os.path.dirname(__file__)) + alt_allele_temp_function = Path('alt_allele_temp_function.sql').read_text() + alt_allele_positions = Path('alt_allele_positions.sql').read_text() + fq_vet_table = f"{fq_dataset}.{vet_table_name}" + query_with = f"""INSERT INTO {fq_dataset}.alt_allele + WITH + position1 as (select * from {fq_vet_table} WHERE call_GT IN ('0/1', '1/0', '1/1', '0|1', '1|0', '1|1', '0/2', '0|2','2/0', '2|0')), + position2 as (select * from {fq_vet_table} WHERE call_GT IN ('1/2', '1|2', '2/1', '2|1'))""" + + sql = alt_allele_temp_function + query_with + alt_allele_positions + result = execute_with_retry(f"into alt allele from {vet_table_name}", sql) + return result + +if __name__ == '__main__': + parser = argparse.ArgumentParser(allow_abbrev=False, description='Populate an alt_allele table for the BigQuery Variant Store') + + parser.add_argument('--query_project',type=str, help='Google project where query should be executed', required=True) + parser.add_argument('--vet_table_name',type=str, help='vet table name to ingest', required=True) + parser.add_argument('--fq_dataset',type=str, help='project and dataset for data', required=True) + parser.add_argument('--sa_key_path',type=str, help='Path to json key file for SA', required=False) + + + # Execute the parse_args() method + args = parser.parse_args() + + populate_alt_allele_table(args.query_project, args.vet_table_name, args.fq_dataset, args.sa_key_path)
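
For reference, a minimal inputs JSON for `GvsCreateAltAllele` (the quickstart's step 2 says to select `Run workflow with inputs defined by file paths`) might look like the sketch below. This is an illustration, not part of the change itself: the keys follow the input names declared in the WDL (`dataset_project`, `default_dataset`); `query_project_id` defaults to `dataset_project` and `service_account_json_path` is optional, so both are omitted, and the project and dataset values are placeholders.

```json
{
  "GvsCreateAltAllele.dataset_project": "my-gcp-project",
  "GvsCreateAltAllele.default_dataset": "my_gvs_dataset"
}
```

With `service_account_json_path` omitted, each task takes the `has_service_account_file = 'false'` branch, so `bq` and `populate_alt_allele_table.py` run under the environment's default credentials.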