Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Curate input arrays to skip already ingested sample data [VS-246] #7862

Merged
merged 21 commits into from
Jun 3, 2022
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .dockstore.yml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ workflows:
branches:
- master
- ah_var_store
- rsa_skip_samples
- rc-vs-418-allow-withdrawn
- name: GvsPrepareRangesCallset
subclass: WDL
Expand Down
165 changes: 67 additions & 98 deletions scripts/variantstore/wdl/GvsImportGenomes.wdl
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ version 1.0
import "GvsUtils.wdl" as GvsUtils

workflow GvsImportGenomes {

input {
Boolean go = true
String dataset_name
Expand Down Expand Up @@ -32,7 +31,7 @@ workflow GvsImportGenomes {
}
}

call GetSampleIds {
call GetUningestedSampleIds {
input:
dataset_name = dataset_name,
project_id = project_id,
Expand All @@ -41,28 +40,30 @@ workflow GvsImportGenomes {
service_account_json_path = service_account_json_path
}

call CheckForDuplicateData {
call CurateInputLists {
input:
dataset_name = dataset_name,
project_id = project_id,
sample_names = external_sample_names,
input_vcf_index_list = write_lines(input_vcf_indexes),
input_vcf_list = write_lines(input_vcfs),
input_sample_name_list = write_lines(external_sample_names),
input_sample_map = GetUningestedSampleIds.sample_map,
service_account_json_path = service_account_json_path
}

call CreateFOFNs {
input:
batch_size = 1,
input_vcf_index_list = write_lines(input_vcf_indexes),
input_vcf_list = write_lines(input_vcfs),
sample_name_list = write_lines(external_sample_names),
input_vcf_index_list = CurateInputLists.index_list,
input_vcf_list = CurateInputLists.vcf_list,
sample_name_list = CurateInputLists.sample_name_list
}

scatter (i in range(length(CreateFOFNs.vcf_batch_vcf_fofns))) {
call LoadData {
input:
dataset_name = dataset_name,
project_id = project_id,
duplicate_check_passed = CheckForDuplicateData.done,
drop_state = "FORTY",
drop_state_includes_greater_than = false,
input_vcf_indexes = read_lines(CreateFOFNs.vcf_batch_vcf_index_fofns[i]),
Expand All @@ -72,7 +73,7 @@ workflow GvsImportGenomes {
load_data_preemptible_override = load_data_preemptible_override,
load_data_maxretries_override = load_data_maxretries_override,
sample_names = read_lines(CreateFOFNs.vcf_sample_name_fofns[i]),
sample_map = GetSampleIds.sample_map,
sample_map = GetUningestedSampleIds.sample_map,
service_account_json_path = service_account_json_path,
}
}
Expand All @@ -91,78 +92,6 @@ workflow GvsImportGenomes {
}
}

task CheckForDuplicateData {
input {
String dataset_name
String project_id

Array[String] sample_names

String? service_account_json_path
}

String has_service_account_file = if (defined(service_account_json_path)) then 'true' else 'false'
Int num_samples = length(sample_names)

meta {
volatile: true
}

command <<<
set -e

if [ ~{has_service_account_file} = 'true' ]; then
gsutil cp ~{service_account_json_path} local.service_account.json
gcloud auth activate-service-account --key-file=local.service_account.json
gcloud config set project ~{project_id}
fi

echo "project_id = ~{project_id}" > ~/.bigqueryrc

INFO_SCHEMA_TABLE="~{dataset_name}.INFORMATION_SCHEMA.PARTITIONS"
TEMP_TABLE="~{dataset_name}.sample_dupe_check"
SAMPLE_INFO_TABLE="~{dataset_name}.sample_info"

# create a temp table with the sample_names
bq --project_id=~{project_id} mk ${TEMP_TABLE} "sample_name:STRING"
NAMES_FILE=~{write_lines(sample_names)}
bq load --project_id=~{project_id} ${TEMP_TABLE} $NAMES_FILE "sample_name:STRING"

# check the INFORMATION_SCHEMA.PARTITIONS table to see if any of input sample names/ids have data loaded into their partitions
# this returns the list of sample names that do already have data loaded
echo "WITH items as (SELECT s.sample_id, s.sample_name, s.is_loaded, s.withdrawn FROM \`${TEMP_TABLE}\` t left outer join \`${SAMPLE_INFO_TABLE}\` s on (s.sample_name = t.sample_name)) " >> query.sql
echo "SELECT i.sample_name FROM \`${INFO_SCHEMA_TABLE}\` p JOIN items i ON (p.partition_id = CAST(i.sample_id AS STRING)) WHERE p.total_logical_bytes > 0 AND (table_name like 'ref_ranges_%' OR table_name like 'vet_%')" >> query.sql
echo "UNION DISTINCT " >> query.sql
echo "SELECT i.sample_name FROM items i WHERE i.is_loaded = True " >> query.sql
echo "UNION DISTINCT " >> query.sql
echo "SELECT i.sample_name FROM items i WHERE i.sample_id IN (SELECT sample_id FROM \`~{dataset_name}.sample_load_status\`) " >> query.sql


cat query.sql | bq --location=US --project_id=~{project_id} query --format=csv -n ~{num_samples} --use_legacy_sql=false | sed -e '/sample_name/d' > duplicates

# remove the temp table
bq --project_id=~{project_id} rm -f -t ${TEMP_TABLE}

# true if there is data in results
if [ -s duplicates ]; then
echo "ERROR: Trying to load samples that have already been loaded"
cat duplicates
exit 1
fi
>>>
runtime {
docker: "gcr.io/google.com/cloudsdktool/cloud-sdk:305.0.0"
memory: "1 GB"
disks: "local-disk 10 HDD"
preemptible: 5
cpu: 1
}
output {
Boolean done = true
File? duplicates = "duplicates"
}
}

task CreateFOFNs {
input {
Int batch_size
Expand Down Expand Up @@ -199,7 +128,6 @@ task LoadData {
String dataset_name
String project_id

Boolean duplicate_check_passed
Array[File] input_vcf_indexes
Array[File] input_vcfs
File interval_list
Expand Down Expand Up @@ -297,8 +225,6 @@ task LoadData {
}
}



task SetIsLoadedColumn {
meta {
volatile: true
Expand Down Expand Up @@ -342,7 +268,7 @@ task SetIsLoadedColumn {
}
}

task GetSampleIds {
task GetUningestedSampleIds {
meta {
volatile: true
}
Expand Down Expand Up @@ -371,20 +297,19 @@ task GetSampleIds {
echo "project_id = ~{project_id}" > ~/.bigqueryrc

# create temp table with the sample_names and load external sample names into temp table -- make sure it doesn't exist already
set +e
TEMP_TABLE="~{dataset_name}.sample_names_to_load"
bq show --project_id ~{project_id} ${TEMP_TABLE} > /dev/null
BQ_SHOW_RC=$?
set -e

# if there is already a table of sample names or something else is wrong, bail
if [ $BQ_SHOW_RC -eq 0 ]; then
echo "There is already a list of sample names. This may need manual cleanup. Exiting"
exit 1
fi
set +e
TEMP_TABLE="~{dataset_name}.sample_names_to_load"
bq show --project_id ~{project_id} ${TEMP_TABLE} > /dev/null
BQ_SHOW_RC=$?
set -e

# if there is already a table of sample names or something else is wrong, bail
if [ $BQ_SHOW_RC -eq 0 ]; then
echo "There is already a list of sample names. This may need manual cleanup. Exiting"
exit 1
fi

echo "Creating the external sample name list table ${TEMP_TABLE}"
TEMP_TABLE="~{dataset_name}.sample_names_to_load"
bq --project_id=~{project_id} mk ${TEMP_TABLE} "sample_name:STRING"
NAMES_FILE=~{write_lines(external_sample_names)}
bq load --project_id=~{project_id} ${TEMP_TABLE} $NAMES_FILE "sample_name:STRING"
Expand All @@ -406,8 +331,9 @@ task GetSampleIds {
python3 -c "from math import ceil; print(ceil($max_sample_id/~{samples_per_table}))" > max_sample_id
python3 -c "from math import ceil; print(ceil($min_sample_id/~{samples_per_table}))" > min_sample_id

# get sample map of samples that haven't been loaded yet
bq --project_id=~{project_id} query --format=csv --use_legacy_sql=false -n ~{num_samples} \
"SELECT sample_id, samples.sample_name FROM \`~{dataset_name}.~{table_name}\` AS samples JOIN \`${TEMP_TABLE}\` AS temp ON samples.sample_name=temp.sample_name" > sample_map
"SELECT sample_id, samples.sample_name FROM \`~{dataset_name}.~{table_name}\` AS samples JOIN \`${TEMP_TABLE}\` AS temp ON samples.sample_name=temp.sample_name WHERE samples.sample_id NOT IN (SELECT sample_id FROM \`~{dataset_name}.sample_load_status\` WHERE status='FINISHED')" > sample_map

cut -d, -f1 sample_map > gvs_ids
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit:
Might be clearer as:

Suggested change
cut -d, -f1 sample_map > gvs_ids
cut -d ',' -f1 sample_map > gvs_ids


Expand All @@ -428,3 +354,46 @@ task GetSampleIds {
File gvs_ids = "gvs_ids"
}
}

task CurateInputLists {
input {
String dataset_name
String project_id
File input_vcf_index_list
File input_vcf_list
File input_sample_map
File input_sample_name_list
rsasch marked this conversation as resolved.
Show resolved Hide resolved

String? service_account_json_path
}

String has_service_account_file = if (defined(service_account_json_path)) then 'true' else 'false'
command <<<
set -ex
if [ ~{has_service_account_file} = 'true' ]; then
gsutil cp ~{service_account_json_path} local.service_account.json
gcloud auth activate-service-account --key-file=local.service_account.json
fi

gsutil cp ~{input_vcf_index_list} input_vcf_index_list_file
rsasch marked this conversation as resolved.
Show resolved Hide resolved
gsutil cp ~{input_vcf_list} input_vcf_list_file
gsutil cp ~{input_sample_map} input_sample_map_file
gsutil cp ~{input_sample_name_list} input_sample_name_list_file

python3 /app/curate_input_array_files.py
>>>
runtime {
docker: "us.gcr.io/broad-dsde-methods/variantstore:rsa_skip_samples_20220520"
memory: "3 GB"
disks: "local-disk 100 HDD"
bootDiskSizeGb: 15
preemptible: 0
rsasch marked this conversation as resolved.
Show resolved Hide resolved
cpu: 1
}

output {
File index_list = "output_vcf_index_list_file"
File vcf_list = "output_vcf_list_file"
File sample_name_list = "output_sample_name_list_file"
}
}
1 change: 1 addition & 0 deletions scripts/variantstore/wdl/extract/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@ COPY alt_allele_positions.sql /app
COPY alt_allele_temp_function.sql /app
COPY utils.py /app
COPY add_max_as_vqslod.py /app
COPY curate_input_array_files.py /app

WORKDIR /app
48 changes: 48 additions & 0 deletions scripts/variantstore/wdl/extract/curate_input_array_files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import numpy as np
from contextlib import contextmanager

SAMPLE_MAP_FILE_SUFFIX = "sample_map_file"
SAMPLE_NAME_FILE_SUFFIX = "sample_name_list_file"
VCF_FILE_SUFFIX = "vcf_list_file"
VCF_INDEX_FILE_SUFFIX = "vcf_index_list_file"

@contextmanager
rsasch marked this conversation as resolved.
Show resolved Hide resolved
def handle_file_error(file_name):
try:
yield
except:
print(f"ERROR: required file named '{file_name}' does not exist.")
rsasch marked this conversation as resolved.
Show resolved Hide resolved


def curate_input_arrays():
sample_map_array = vcf_array = vcf_indexes_array = sample_names_array = []
with handle_file_error(f"input_{SAMPLE_MAP_FILE_SUFFIX}"):
sample_map_array = np.loadtxt(f"input_{SAMPLE_MAP_FILE_SUFFIX}", dtype=str, delimiter=",")
with handle_file_error(f"input_{VCF_FILE_SUFFIX}"):
vcf_array = np.loadtxt(f"input_{VCF_FILE_SUFFIX}", dtype=str)
with handle_file_error(f"input_{VCF_INDEX_FILE_SUFFIX}"):
vcf_indexes_array = np.loadtxt(f"input_{VCF_INDEX_FILE_SUFFIX}", dtype=str)
with handle_file_error(f"input_{SAMPLE_NAME_FILE_SUFFIX}"):
sample_names_array = np.loadtxt(f"input_{SAMPLE_NAME_FILE_SUFFIX}", dtype=str)
rows_to_delete = []

# use input_sample_names_array to figure out which index "rows" to delete
for i in range(len(sample_names_array)):
if sample_names_array[i] not in sample_map_array:
rows_to_delete.append(i)

# re-create input arrays using array of "rows" to delete
vcf_array = [vcf_array[i] for i in range(len(vcf_array)) if i not in rows_to_delete]
vcf_indexes_array = [vcf_indexes_array[i] for i in range(len(vcf_indexes_array)) if
i not in rows_to_delete]
sample_names_array = [sample_names_array[i] for i in range(len(sample_names_array)) if
i not in rows_to_delete]

print(f"Creating 'output_{SAMPLE_NAME_FILE_SUFFIX}', 'output_{VCF_FILE_SUFFIX}' and 'output_{VCF_INDEX_FILE_SUFFIX}'.")
np.savetxt(f"output_{SAMPLE_NAME_FILE_SUFFIX}", sample_names_array, fmt='%s')
np.savetxt(f"output_{VCF_FILE_SUFFIX}", vcf_array, fmt='%s')
np.savetxt(f"output_{VCF_INDEX_FILE_SUFFIX}", vcf_indexes_array, fmt='%s')


if __name__ == '__main__':
curate_input_arrays()
1 change: 1 addition & 0 deletions scripts/variantstore/wdl/extract/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
google-cloud-bigquery
ijson
numpy