From f2dabef3898df0e303996930df06f152ddb9d23c Mon Sep 17 00:00:00 2001 From: David Lougheed Date: Fri, 15 Mar 2024 13:23:16 -0400 Subject: [PATCH 01/12] fix(workflows): cBioPortal export workflow fixes + updates --- chord_metadata_service/chord/export/utils.py | 13 ++--- chord_metadata_service/chord/export/views.py | 2 +- .../workflows/wdls/cbioportal_export.wdl | 49 +++++++++---------- 3 files changed, 26 insertions(+), 38 deletions(-) diff --git a/chord_metadata_service/chord/export/utils.py b/chord_metadata_service/chord/export/utils.py index a14e5ff64..844100f82 100644 --- a/chord_metadata_service/chord/export/utils.py +++ b/chord_metadata_service/chord/export/utils.py @@ -86,7 +86,7 @@ def get_path(self, filename: str = ''): # if filename contains a subdirectory, ensure it is created dirpath = os.path.dirname(path) if not os.path.exists(dirpath): - os.makedirs(dirpath, 0o777) + os.makedirs(dirpath) return path @@ -102,13 +102,6 @@ def write_tar(self): tar_path = os.path.join(self.base_path, EXPORT_DIR, self.project_id + '.tar.gz') with tarfile.open(tar_path, 'w:gz') as tar: output_dir = self.get_path() - tar.add(output_dir, filter=reset_tar_info) + # tar.gz will contain one `export/` output directory: + tar.add(output_dir, arcname="export") return tar_path - - -def reset_tar_info(info: tarfile.TarInfo) -> tarfile.TarInfo: - info.gid = 0 - info.uid = 0 - info.uname = 'root' - info.gname = 'root' - return info diff --git a/chord_metadata_service/chord/export/views.py b/chord_metadata_service/chord/export/views.py index 93eb9408f..fe66c0eb2 100644 --- a/chord_metadata_service/chord/export/views.py +++ b/chord_metadata_service/chord/export/views.py @@ -48,7 +48,7 @@ def export(request: Request): if not BENTO_EXPORT_SCHEMA_VALIDATOR.is_valid(request.data): msg_list = [err.message for err in BENTO_EXPORT_SCHEMA_VALIDATOR.iter_errors(request.data)] return Response(errors.bad_request_error( - "Invalid ingest request body: " + "\n".join(msg_list)), + "Invalid export request body: " + "\n".join(msg_list)), status=400 # TODO: Validation errors ) diff --git a/chord_metadata_service/chord/workflows/wdls/cbioportal_export.wdl b/chord_metadata_service/chord/workflows/wdls/cbioportal_export.wdl index eae546bb2..b5dd4c3c5 100644 --- a/chord_metadata_service/chord/workflows/wdls/cbioportal_export.wdl +++ b/chord_metadata_service/chord/workflows/wdls/cbioportal_export.wdl @@ -3,7 +3,6 @@ version 1.0 workflow cbioportal { input { String project_dataset - String run_dir String katsu_url String drs_url String access_token @@ -16,7 +15,6 @@ workflow cbioportal { call katsu_dataset_export { input: dataset_id = dataset_id_from_project_dataset.dataset_id, - run_dir = run_dir, katsu_url = katsu_url, token = access_token, validate_ssl = validate_ssl @@ -25,15 +23,13 @@ workflow cbioportal { call get_maf { input: drs_url = drs_url, dataset_id = dataset_id_from_project_dataset.dataset_id, - run_dir = run_dir, token = access_token, - validate_ssl = validate_ssl + validate_ssl = validate_ssl, + export_data = katsu_dataset_export.export_data } output { - Array[File] cbio = katsu_dataset_export.data_txt - File stdout = katsu_dataset_export.txt_output - File stderr = katsu_dataset_export.err_output + File export_data = get_maf.export_data_with_maf } } @@ -45,7 +41,7 @@ task dataset_id_from_project_dataset { python3 -c 'print("~{project_dataset}".split(":")[1], end="")' >>> output { - String dataset_id = stdout() + String dataset_id = read_string(stdout()) } } @@ -53,7 +49,6 @@ task katsu_dataset_export { input { String 
dataset_id String katsu_url - String run_dir String token Boolean validate_ssl } @@ -62,23 +57,17 @@ task katsu_dataset_export { # command block (tested with womtool-v78). Using triple angle braces made # interpolation more straightforward. command <<< - # Export results at export_path and returns http code 200 in case of success - RESPONSE=$(curl -X POST ~{true="" false="-k" validate_ssl} -s -w "%{http_code}" \ + # Export results; Katsu returns a .tar.gz file + curl -X POST ~{true="" false="-k" validate_ssl} -s --fail-with-body \ -H "Content-Type: application/json" \ -H "Authorization: Bearer ~{token}" \ - -d '{"format": "cbioportal", "object_type": "dataset", "object_id": "~{dataset_id}", "output_path": "~{run_dir}"}' \ - "~{katsu_url}/private/export") - - if [ "${RESPONSE}" != "204" ] - then - echo "Error: Metadata service replied with HTTP code ${RESPONSE}" 1>&2 # to stderr - exit 1 - fi - echo ${RESPONSE} + -d '{"format": "cbioportal", "object_type": "dataset", "object_id": "~{dataset_id}"}' \ + -o export.tar.gz \ + "~{katsu_url}/private/export" >>> output { - Array[File] data_txt = glob("${run_dir}/export/${dataset_id}/*.txt") + File export_data = "export.tar.gz" File txt_output = stdout() File err_output = stderr() } @@ -87,21 +76,24 @@ task katsu_dataset_export { task get_maf { input { String drs_url - String run_dir String dataset_id String token Boolean validate_ssl + File export_data } command <<< + # Extract exported data + tar -xzvf ~{export_data} + + # Write data_mutations_extended python <>> output { - File txt_output_maf = stdout() - File err_output_maf = stderr() + File export_data_with_maf = "export_with_maf.tar.gz" } } From 23be88bdcca9f0f5c9705bb49fc0b738317776ee Mon Sep 17 00:00:00 2001 From: David Lougheed Date: Fri, 22 Mar 2024 09:10:37 -0400 Subject: [PATCH 02/12] chore: use experiment result URL field for DRS MAF URIs --- chord_metadata_service/chord/export/cbioportal.py | 7 +++---- chord_metadata_service/chord/tests/test_export_cbio.py | 2 +- chord_metadata_service/chord/workflows/wdls/vcf2maf.wdl | 2 +- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/chord_metadata_service/chord/export/cbioportal.py b/chord_metadata_service/chord/export/cbioportal.py index 47c013077..4a56c6c02 100644 --- a/chord_metadata_service/chord/export/cbioportal.py +++ b/chord_metadata_service/chord/export/cbioportal.py @@ -113,7 +113,7 @@ def study_export(get_path: Callable[[str], str], dataset_id: str): .annotate(biosample_id=F("experiment__biosample")) ) - maf_list(exp_res, file_maf_list) + write_maf_list(exp_res, file_maf_list) case_list_export(cbio_study_id, exp_res, file_case_list) with open(get_path(MUTATION_META_FILENAME), 'w', newline='\n') as file_mutation_meta: @@ -261,12 +261,11 @@ def sample_export(results, file_handle: TextIO): dict_writer.writerows(samples) -def maf_list(results, file_handle: TextIO): +def write_maf_list(results: list[ExperimentResult], file_handle: TextIO): """ List of maf files associated with this dataset. 
""" - maf_uri = [experiment.extra_properties["uri"] + "\n" for experiment in results] - file_handle.writelines(maf_uri) + file_handle.writelines(experiment.url + "\n" for experiment in results) def mutation_meta_export(study_id: str, file_handle: TextIO): diff --git a/chord_metadata_service/chord/tests/test_export_cbio.py b/chord_metadata_service/chord/tests/test_export_cbio.py index 26981e20d..342e914d8 100644 --- a/chord_metadata_service/chord/tests/test_export_cbio.py +++ b/chord_metadata_service/chord/tests/test_export_cbio.py @@ -206,7 +206,7 @@ def test_export_maf_list(self): maf_count = exp_res.count() self.assertTrue(maf_count > 0) with io.StringIO() as output: - exp.maf_list(exp_res, output) + exp.write_maf_list(exp_res, output) output.seek(0) i = 0 for line in output: diff --git a/chord_metadata_service/chord/workflows/wdls/vcf2maf.wdl b/chord_metadata_service/chord/workflows/wdls/vcf2maf.wdl index 04b627e2b..eaad9ec75 100644 --- a/chord_metadata_service/chord/workflows/wdls/vcf2maf.wdl +++ b/chord_metadata_service/chord/workflows/wdls/vcf2maf.wdl @@ -292,13 +292,13 @@ task katsu_update_experiment_results_with_maf { "description": "MAF file", "filename": row["maf"], "file_format": "MAF", + "url": row['uri'], # DRS record URL "data_output_type": "Derived data", "usage": "Downloaded", "creation_date": date.today().isoformat(), "created_by": "Bento", "genome_assembly_id": vcf_props.get("genome_assembly_id", "GRCh37"), # TODO: make fallback a parameter "extra_properties": { - "uri": row['uri'], "derived_from": vcf_props["identifier"] } }) From 09152f0dd994d77616cd9d520c68cf8e89e5c433 Mon Sep 17 00:00:00 2001 From: David Lougheed Date: Fri, 22 Mar 2024 10:00:30 -0400 Subject: [PATCH 03/12] fix: vcf2maf katsu ingest call (part 1) --- .../chord/ingest/experiments.py | 12 +++++----- .../chord/workflows/wdls/vcf2maf.wdl | 22 +++---------------- 2 files changed, 9 insertions(+), 25 deletions(-) diff --git a/chord_metadata_service/chord/ingest/experiments.py b/chord_metadata_service/chord/ingest/experiments.py index e51248048..9088021c0 100644 --- a/chord_metadata_service/chord/ingest/experiments.py +++ b/chord_metadata_service/chord/ingest/experiments.py @@ -154,7 +154,7 @@ def ingest_experiments_workflow(json_data, dataset_id: str) -> list[em.Experimen return [ingest_experiment(exp, dataset_id, validate=False) for exp in exps] -def ingest_derived_experiment_results(json_data: list[dict]) -> list[em.ExperimentResult]: +def ingest_derived_experiment_results(json_data: list[dict], dataset_id: str) -> list[em.ExperimentResult]: """ Reads a JSON file containing a list of experiment results and adds them to the database. The linkage to experiments is inferred from the `derived_from` category @@ -176,7 +176,7 @@ def ingest_derived_experiment_results(json_data: list[dict]) -> list[em.Experime exp_res_list: list[em.ExperimentResult] = [] # Create a map of experiment results identifier to experiment id. # Prefetch results due to the many-to-many relationship - exp = em.Experiment.objects.all().prefetch_related("experiment_results") + exp = em.Experiment.objects.filter(dataset_id=dataset_id).prefetch_related("experiment_results") exp_result2exp = dict() for row in exp.values("id", "experiment_results__identifier"): exp_result2exp[row["experiment_results__identifier"]] = row["id"] @@ -201,8 +201,8 @@ def ingest_derived_experiment_results(json_data: list[dict]) -> list[em.Experime return exp_res_list -# The table_id is required to fit the bento_ingest.schema.json in bento_lib, -# but it is unused. 
It can be set to any valid table_id or to one of the override +# The dataset_id is required to fit the bento_ingest.schema.json in bento_lib, +# but it is unused. It can be set to any valid dataset_id or to one of the override # values defined in view_ingest.py -def ingest_maf_derived_from_vcf_workflow(json_data, _dataset_id: str) -> list[em.ExperimentResult]: - return ingest_derived_experiment_results(json_data) +def ingest_maf_derived_from_vcf_workflow(json_data, dataset_id: str) -> list[em.ExperimentResult]: + return ingest_derived_experiment_results(json_data, dataset_id) diff --git a/chord_metadata_service/chord/workflows/wdls/vcf2maf.wdl b/chord_metadata_service/chord/workflows/wdls/vcf2maf.wdl index eaad9ec75..c1c00c205 100644 --- a/chord_metadata_service/chord/workflows/wdls/vcf2maf.wdl +++ b/chord_metadata_service/chord/workflows/wdls/vcf2maf.wdl @@ -235,9 +235,6 @@ task vcf_2_maf { ' done - - echo "end loop" >> /tmp/dump.tsv - >>> output { @@ -303,14 +300,7 @@ task katsu_update_experiment_results_with_maf { } }) - EXPERIMENT_RESULTS_JSON = path.join("~{run_dir}", "experiment_results_maf.json") - with open(EXPERIMENT_RESULTS_JSON, "w") as file_handle: - json.dump(maf_exp_res_list, file_handle) - # Ingest metadata about MAF files into Katsu - # The following passes an absolute path to the current working directory. - # As Katsu has the /wes/tmp/ volume mounted with the same path - # internally, direct access to the file is guaranteed. headers = ( {"Authorization": "Bearer ~{access_token}"} @@ -318,18 +308,12 @@ task katsu_update_experiment_results_with_maf { else {} ) - metadata_url = f"~{katsu_url}/private/ingest" - data = { - "dataset_id": "FROM_DERIVED_DATA", - "workflow_id": "maf_derived_from_vcf_json", - "workflow_params": { - "derived_from_data_type": "experiment_result" - } - } + _, dataset_id = "~{project_dataset}".split(":") + metadata_url = f"~{katsu_url}/ingest/{dataset_id}/maf_derived_from_vcf_json" response = requests.post( metadata_url, headers=headers, - json=data, + json=maf_exp_res_list, verify=~{true="True" false="False" validate_ssl}, ) response.raise_for_status() From 5e86fa9dee0363c6b2df964e6faea358e6512f24 Mon Sep 17 00:00:00 2001 From: David Lougheed Date: Fri, 22 Mar 2024 10:02:45 -0400 Subject: [PATCH 04/12] test: fix test data for derived experiment result --- .../tests/example_derived_experiment_result.json | 2 +- .../chord/tests/test_api_export.py | 12 ------------ 2 files changed, 1 insertion(+), 13 deletions(-) diff --git a/chord_metadata_service/chord/tests/example_derived_experiment_result.json b/chord_metadata_service/chord/tests/example_derived_experiment_result.json index d779cce40..4204acfae 100644 --- a/chord_metadata_service/chord/tests/example_derived_experiment_result.json +++ b/chord_metadata_service/chord/tests/example_derived_experiment_result.json @@ -4,6 +4,7 @@ "description": "test add to experiment results", "filename": "sample1_01.vcf.gz.maf", "file_format": "MAF", + "url": "drs://wherever.org/aaaa-bbbb-12345678", "data_output_type": "Derived data", "usage": "Downloaded", "creation_date": "01-09-2021", @@ -26,7 +27,6 @@ "genome_assembly_id": "GRCh37", "extra_properties": { "derived_from": "sample1_02", - "uri": "drs://wherever.org/aaaa-bbbb-123456789" } } ] \ No newline at end of file diff --git a/chord_metadata_service/chord/tests/test_api_export.py b/chord_metadata_service/chord/tests/test_api_export.py index 673d13448..ab8815933 100644 --- a/chord_metadata_service/chord/tests/test_api_export.py +++ 
b/chord_metadata_service/chord/tests/test_api_export.py @@ -9,7 +9,6 @@ from rest_framework import status from rest_framework.test import APITestCase -from ..workflows.metadata import workflow_set from chord_metadata_service.chord.models import Project, Dataset from chord_metadata_service.chord.ingest import WORKFLOW_INGEST_FUNCTION_MAP from chord_metadata_service.chord.workflows.metadata import WORKFLOW_PHENOPACKETS_JSON @@ -18,17 +17,6 @@ from .example_ingest import EXAMPLE_INGEST_PHENOPACKET -def generate_phenopackets_ingest(table_id): - return { - "table_id": table_id, - "workflow_id": WORKFLOW_PHENOPACKETS_JSON, - "workflow_metadata": workflow_set.get_workflow(WORKFLOW_PHENOPACKETS_JSON).model_dump(mode="json"), - "workflow_params": { - "json_document": "" # TODO - } - } - - class ExportTest(APITestCase): def setUp(self) -> None: # Creates a test database and populate with a phenopacket test file From 737b31c407984f9863e4a64f469cf444aa5cd234 Mon Sep 17 00:00:00 2001 From: David Lougheed Date: Fri, 22 Mar 2024 10:18:35 -0400 Subject: [PATCH 05/12] test: more cbioportal experiment result test fixes --- .../chord/tests/example_derived_experiment_result.json | 6 +++--- chord_metadata_service/chord/tests/test_ingest.py | 5 ++--- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/chord_metadata_service/chord/tests/example_derived_experiment_result.json b/chord_metadata_service/chord/tests/example_derived_experiment_result.json index 4204acfae..a2401d87b 100644 --- a/chord_metadata_service/chord/tests/example_derived_experiment_result.json +++ b/chord_metadata_service/chord/tests/example_derived_experiment_result.json @@ -11,8 +11,7 @@ "created_by": "Admin", "genome_assembly_id": "GRCh37", "extra_properties": { - "derived_from": "sample1_01", - "uri": "drs://wherever.org/aaaa-bbbb-12345678" + "derived_from": "sample1_01" } }, { @@ -20,13 +19,14 @@ "description": "test add to experiment results", "filename": "sample1_02.vcf.gz.maf", "file_format": "MAF", + "url": "drs://wherever.org/aaaa-bbbb-123456789", "data_output_type": "Derived data", "usage": "Downloaded", "creation_date": "01-09-2021", "created_by": "Admin", "genome_assembly_id": "GRCh37", "extra_properties": { - "derived_from": "sample1_02", + "derived_from": "sample1_02" } } ] \ No newline at end of file diff --git a/chord_metadata_service/chord/tests/test_ingest.py b/chord_metadata_service/chord/tests/test_ingest.py index 468505475..f58aa0add 100644 --- a/chord_metadata_service/chord/tests/test_ingest.py +++ b/chord_metadata_service/chord/tests/test_ingest.py @@ -1,6 +1,5 @@ -from django.test import TestCase -from chord_metadata_service.chord.ingest.views import DATASET_ID_OVERRIDES from dateutil.parser import isoparse +from django.test import TestCase from chord_metadata_service.chord.models import Project, Dataset from chord_metadata_service.chord.ingest import WORKFLOW_INGEST_FUNCTION_MAP @@ -242,7 +241,7 @@ def test_ingesting_experiment_results_json(self): ) # ingest list of experiment results experiment_results = WORKFLOW_INGEST_FUNCTION_MAP[WORKFLOW_MAF_DERIVED_FROM_VCF_JSON]( - EXAMPLE_INGEST_EXPERIMENT_RESULT, DATASET_ID_OVERRIDES + EXAMPLE_INGEST_EXPERIMENT_RESULT, self.d.identifier ) self.assertEqual(len(experiment_results), len(EXAMPLE_INGEST_EXPERIMENT_RESULT)) # check that it has been linked to the same experiment as the file it From a8f4fb049f251e8c9bad3a3020774fddf1fe7174 Mon Sep 17 00:00:00 2001 From: David Lougheed Date: Fri, 22 Mar 2024 11:46:39 -0400 Subject: [PATCH 06/12] refact: remove dummy derived 
experiment result ingest workflow replace with a custom endpoint for now. perhaps there's a better soln... --- .../chord/ingest/__init__.py | 3 +- .../chord/ingest/experiments.py | 9 +-- chord_metadata_service/chord/ingest/views.py | 65 ++++++++++++------- .../chord/tests/test_api_ingest.py | 6 +- .../chord/tests/test_export_cbio.py | 6 +- .../chord/tests/test_ingest.py | 4 +- chord_metadata_service/chord/urls.py | 2 + .../chord/workflows/metadata.py | 19 ------ .../wdls/maf_derived_from_vcf_json.wdl | 28 -------- .../chord/workflows/wdls/vcf2maf.wdl | 2 +- 10 files changed, 52 insertions(+), 92 deletions(-) delete mode 100644 chord_metadata_service/chord/workflows/wdls/maf_derived_from_vcf_json.wdl diff --git a/chord_metadata_service/chord/ingest/__init__.py b/chord_metadata_service/chord/ingest/__init__.py index 465b970a5..d9ea6e268 100644 --- a/chord_metadata_service/chord/ingest/__init__.py +++ b/chord_metadata_service/chord/ingest/__init__.py @@ -1,6 +1,6 @@ from chord_metadata_service.chord.workflows import metadata as wm -from .experiments import ingest_experiments_workflow, ingest_maf_derived_from_vcf_workflow +from .experiments import ingest_experiments_workflow from .fhir import ingest_fhir_workflow from .phenopackets import ingest_phenopacket_workflow from .readsets import ingest_readset_workflow @@ -16,5 +16,4 @@ wm.WORKFLOW_PHENOPACKETS_JSON: ingest_phenopacket_workflow, wm.WORKFLOW_FHIR_JSON: ingest_fhir_workflow, wm.WORKFLOW_READSET: ingest_readset_workflow, - wm.WORKFLOW_MAF_DERIVED_FROM_VCF_JSON: ingest_maf_derived_from_vcf_workflow, } diff --git a/chord_metadata_service/chord/ingest/experiments.py b/chord_metadata_service/chord/ingest/experiments.py index 9088021c0..4743ce57c 100644 --- a/chord_metadata_service/chord/ingest/experiments.py +++ b/chord_metadata_service/chord/ingest/experiments.py @@ -17,7 +17,7 @@ "validate_experiment", "ingest_experiment", "ingest_experiments_workflow", - "ingest_maf_derived_from_vcf_workflow", + "ingest_derived_experiment_results", ] from .exceptions import IngestError @@ -199,10 +199,3 @@ def ingest_derived_experiment_results(json_data: list[dict], dataset_id: str) -> exp_res_list.append(new_experiment_results) return exp_res_list - - -# The dataset_id is required to fit the bento_ingest.schema.json in bento_lib, -# but it is unused. It can be set to any valid dataset_id or to one of the override -# values defined in view_ingest.py -def ingest_maf_derived_from_vcf_workflow(json_data, dataset_id: str) -> list[em.ExperimentResult]: - return ingest_derived_experiment_results(json_data, dataset_id) diff --git a/chord_metadata_service/chord/ingest/views.py b/chord_metadata_service/chord/ingest/views.py index af06f0379..06939517e 100644 --- a/chord_metadata_service/chord/ingest/views.py +++ b/chord_metadata_service/chord/ingest/views.py @@ -1,6 +1,5 @@ from __future__ import annotations -import logging import traceback import uuid @@ -8,47 +7,36 @@ from django.db import transaction from rest_framework.decorators import api_view, permission_classes from rest_framework.permissions import AllowAny +from rest_framework.request import Request as DrfRequest from rest_framework.response import Response +from typing import Any, Callable from bento_lib.responses import errors +from chord_metadata_service.logger import logger +from chord_metadata_service.chord.models import Dataset +from . import experiments from . 
import WORKFLOW_INGEST_FUNCTION_MAP from .exceptions import IngestError -from ..models import Dataset -FROM_DERIVED_DATA = "FROM_DERIVED_DATA" -DATASET_ID_OVERRIDES = {FROM_DERIVED_DATA} # These special values skip the checks on the table - -logger = logging.getLogger(__name__) - - -@api_view(["POST"]) -@permission_classes([AllowAny]) -def ingest_into_dataset(request, dataset_id: str, workflow_id: str): - logger.info(f"Received a {workflow_id} ingest request for dataset {dataset_id}.") - - # Check that the workflow exists - if workflow_id not in WORKFLOW_INGEST_FUNCTION_MAP: - return Response(errors.bad_request_error(f"Ingestion workflow ID {workflow_id} does not exist"), status=400) - - if dataset_id not in DATASET_ID_OVERRIDES: - if not Dataset.objects.filter(identifier=dataset_id).exists(): - return Response(errors.bad_request_error(f"Dataset with ID {dataset_id} does not exist"), status=400) - dataset_id = str(uuid.UUID(dataset_id)) # Normalize dataset ID to UUID's str format. - +def call_ingest_function_and_handle(fn: Callable[[Any, str], Any], data, dataset_id: str) -> Response: try: with transaction.atomic(): # Wrap ingestion in a transaction, so if it fails we don't end up in a partial state in the database. - WORKFLOW_INGEST_FUNCTION_MAP[workflow_id](request.data, dataset_id) + fn(data, dataset_id) except IngestError as e: - return Response(errors.bad_request_error(f"Encountered ingest error: {e}"), status=400) + err = f"Encountered ingest error: {e}\n{traceback.format_exc()}" + logger.error(err) + return Response(errors.bad_request_error(err), status=400) except ValidationError as e: + validation_errors = tuple(e.error_list if hasattr(e, "error_list") else e.error_dict.items()) + logger.error(f"Encountered validation errors during ingestion: {validation_errors}") return Response(errors.bad_request_error( "Encountered validation errors during ingestion", - *(e.error_list if hasattr(e, "error_list") else e.error_dict.items()), + *validation_errors, )) except Exception as e: @@ -57,3 +45,30 @@ def ingest_into_dataset(request, dataset_id: str, workflow_id: str): return Response(errors.internal_server_error(f"Encountered an exception while processing an ingest attempt " f"(error: {repr(e)}"), status=500) return Response(status=204) + + +@api_view(["POST"]) +@permission_classes([AllowAny]) +def ingest_derived_experiment_results(request: DrfRequest, dataset_id: str): + return call_ingest_function_and_handle(experiments.ingest_derived_experiment_results, request.data, dataset_id) + + +@api_view(["POST"]) +@permission_classes([AllowAny]) +def ingest_into_dataset(request: DrfRequest, dataset_id: str, workflow_id: str): + logger.info(f"Received a {workflow_id} ingest request for dataset {dataset_id}.") + + # Check that the workflow exists + if workflow_id not in WORKFLOW_INGEST_FUNCTION_MAP: + err = f"Ingestion workflow ID {workflow_id} does not exist" + logger.error(f"Error encountered while ingesting into dataset {dataset_id}: {err}") + return Response(errors.bad_request_error(err), status=400) + + if not Dataset.objects.filter(identifier=dataset_id).exists(): + err = f"Dataset with ID {dataset_id} does not exist" + logger.error( + f"Error encountered while ingesting into dataset {dataset_id} with workflow {workflow_id}: {err}") + return Response(errors.bad_request_error(err), status=400) + dataset_id = str(uuid.UUID(dataset_id)) # Normalize dataset ID to UUID's str format. 
+ + return call_ingest_function_and_handle(WORKFLOW_INGEST_FUNCTION_MAP[workflow_id], request.data, dataset_id) diff --git a/chord_metadata_service/chord/tests/test_api_ingest.py b/chord_metadata_service/chord/tests/test_api_ingest.py index 0dc865de3..633be91cc 100644 --- a/chord_metadata_service/chord/tests/test_api_ingest.py +++ b/chord_metadata_service/chord/tests/test_api_ingest.py @@ -69,13 +69,13 @@ def test_phenopackets_ingest(self): ) self.assertEqual(r.status_code, status.HTTP_400_BAD_REQUEST) - # Bad ingestion body JSON + # Bad ingestion body JSON - JSON parse error 400 r = self.client.post( reverse("ingest-into-dataset", args=(self.dataset["identifier"], "phenopackets_json")), content_type="application/json", - data="\{\}\}", # noqa: W605 + data="{}}", # noqa: W605 ) - self.assertEqual(r.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR) + self.assertEqual(r.status_code, status.HTTP_400_BAD_REQUEST) # Invalid phenopacket JSON validation invalid_phenopacket = load_local_json("example_invalid_phenopacket.json") diff --git a/chord_metadata_service/chord/tests/test_export_cbio.py b/chord_metadata_service/chord/tests/test_export_cbio.py index 342e914d8..e4f01f0e3 100644 --- a/chord_metadata_service/chord/tests/test_export_cbio.py +++ b/chord_metadata_service/chord/tests/test_export_cbio.py @@ -19,9 +19,9 @@ from chord_metadata_service.chord.models import Project, Dataset from chord_metadata_service.experiments.models import ExperimentResult from chord_metadata_service.chord.ingest import WORKFLOW_INGEST_FUNCTION_MAP +from chord_metadata_service.chord.ingest.experiments import ingest_derived_experiment_results from chord_metadata_service.chord.workflows.metadata import ( WORKFLOW_EXPERIMENTS_JSON, - WORKFLOW_MAF_DERIVED_FROM_VCF_JSON, WORKFLOW_PHENOPACKETS_JSON, ) from chord_metadata_service.patients.models import Individual @@ -51,9 +51,7 @@ def setUp(self) -> None: EXAMPLE_INGEST_EXPERIMENT, self.d.identifier ) # append derived MAF files to experiment results - WORKFLOW_INGEST_FUNCTION_MAP[WORKFLOW_MAF_DERIVED_FROM_VCF_JSON]( - EXAMPLE_INGEST_EXPERIMENT_RESULT, self.d.identifier - ) + ingest_derived_experiment_results(EXAMPLE_INGEST_EXPERIMENT_RESULT, self.d.identifier) self.exp_res = ExperimentResult.objects.all() @staticmethod diff --git a/chord_metadata_service/chord/tests/test_ingest.py b/chord_metadata_service/chord/tests/test_ingest.py index f58aa0add..be2097799 100644 --- a/chord_metadata_service/chord/tests/test_ingest.py +++ b/chord_metadata_service/chord/tests/test_ingest.py @@ -7,6 +7,7 @@ from chord_metadata_service.chord.ingest.experiments import ( validate_experiment, ingest_experiment, + ingest_derived_experiment_results, ) from chord_metadata_service.chord.ingest.schema import schema_validation from chord_metadata_service.chord.ingest.phenopackets import ( @@ -16,7 +17,6 @@ ) from chord_metadata_service.chord.workflows.metadata import ( WORKFLOW_EXPERIMENTS_JSON, - WORKFLOW_MAF_DERIVED_FROM_VCF_JSON, WORKFLOW_PHENOPACKETS_JSON, ) from chord_metadata_service.phenopackets.models import Biosample, PhenotypicFeature, Phenopacket @@ -240,7 +240,7 @@ def test_ingesting_experiment_results_json(self): EXAMPLE_INGEST_EXPERIMENT, self.d.identifier ) # ingest list of experiment results - experiment_results = WORKFLOW_INGEST_FUNCTION_MAP[WORKFLOW_MAF_DERIVED_FROM_VCF_JSON]( + experiment_results = ingest_derived_experiment_results( EXAMPLE_INGEST_EXPERIMENT_RESULT, self.d.identifier ) self.assertEqual(len(experiment_results), len(EXAMPLE_INGEST_EXPERIMENT_RESULT)) diff --git 
a/chord_metadata_service/chord/urls.py b/chord_metadata_service/chord/urls.py
index 9a13dd5bc..d90521515 100644
--- a/chord_metadata_service/chord/urls.py
+++ b/chord_metadata_service/chord/urls.py
@@ -13,6 +13,8 @@
     path('private/export', views_export.export, name="export"),

+    path('ingest-derived-experiment-results/<str:dataset_id>', views_ingest.ingest_derived_experiment_results,
+         name="ingest-derived-experiment-results"),
     path('ingest/<str:dataset_id>/<str:workflow_id>', views_ingest.ingest_into_dataset, name="ingest-into-dataset"),

     path('data-types', views_data_types.data_type_list, name="data-type-list"),
diff --git a/chord_metadata_service/chord/workflows/metadata.py b/chord_metadata_service/chord/workflows/metadata.py
index 9b023dafc..bd82456cc 100644
--- a/chord_metadata_service/chord/workflows/metadata.py
+++ b/chord_metadata_service/chord/workflows/metadata.py
@@ -8,7 +8,6 @@
     "WORKFLOW_FHIR_JSON",
     "WORKFLOW_READSET",
     "WORKFLOW_DOCUMENT",
-    "WORKFLOW_MAF_DERIVED_FROM_VCF_JSON",

     "WORKFLOW_VCF2MAF",
     "WORKFLOW_CBIOPORTAL",
@@ -22,7 +21,6 @@
 WORKFLOW_FHIR_JSON = "fhir_json"
 WORKFLOW_READSET = "readset"
 WORKFLOW_DOCUMENT = "document"
-WORKFLOW_MAF_DERIVED_FROM_VCF_JSON = "maf_derived_from_vcf_json"

 WORKFLOW_VCF2MAF = "vcf2maf"
 WORKFLOW_CBIOPORTAL = "cbioportal"
@@ -124,23 +122,6 @@ def json_file_input(id_: str, required: bool = True):
     ],
 ))

-workflow_set.add_workflow(WORKFLOW_MAF_DERIVED_FROM_VCF_JSON, wm.WorkflowDefinition(
-    type="ingestion",
-    name="MAF files derived from VCF files as a JSON",
-    description="This ingestion workflow will add to the current experiment results MAF files that were generated from "
-                "VCF files found in the Dataset.",
-    data_type=DATA_TYPE_EXPERIMENT,  # for permissions
-    tags=[DATA_TYPE_EXPERIMENT, WORKFLOW_TAG_CBIOPORTAL],
-    file="maf_derived_from_vcf_json.wdl",
-    inputs=[
-        # injected
-        ACCESS_TOKEN_INPUT,
-        # user
-        PROJECT_DATASET_INPUT,
-        json_file_input("json_document"),
-    ],
-))
-
 # Analysis workflows ---------------------------------------------------------------------------------------------------

 workflow_set.add_workflow(WORKFLOW_VCF2MAF, wm.WorkflowDefinition(
diff --git a/chord_metadata_service/chord/workflows/wdls/maf_derived_from_vcf_json.wdl b/chord_metadata_service/chord/workflows/wdls/maf_derived_from_vcf_json.wdl
deleted file mode 100644
index 70b07dff6..000000000
--- a/chord_metadata_service/chord/workflows/wdls/maf_derived_from_vcf_json.wdl
+++ /dev/null
@@ -1,28 +0,0 @@
-version 1.0
-
-workflow maf_derived_from_vcf_json {
-    input {
-        File json_document
-    }
-
-    call copy_task {
-        input: json_document_in = json_document
-    }
-
-    output {
-        File json_document_out = copy_task.json_document
-    }
-}
-
-task copy_task {
-    input {
-        File json_document_in
-    }
-
-    command {
-        cp "~{json_document_in}" ingest.json
-    }
-    output {
-        File json_document = "ingest.json"
-    }
-}
diff --git a/chord_metadata_service/chord/workflows/wdls/vcf2maf.wdl b/chord_metadata_service/chord/workflows/wdls/vcf2maf.wdl
index c1c00c205..f25e4cbbe 100644
--- a/chord_metadata_service/chord/workflows/wdls/vcf2maf.wdl
+++ b/chord_metadata_service/chord/workflows/wdls/vcf2maf.wdl
@@ -309,7 +309,7 @@ task katsu_update_experiment_results_with_maf {
     )

     _, dataset_id = "~{project_dataset}".split(":")
-    metadata_url = f"~{katsu_url}/ingest/{dataset_id}/maf_derived_from_vcf_json"
+    metadata_url = f"~{katsu_url}/ingest-derived-experiment-results/{dataset_id}"
     response = requests.post(
         metadata_url,
         headers=headers,
From 8298b3ecbcb99945f725ad206dc58c910235b5c4 Mon Sep 17 00:00:00 2001
From: David Lougheed
Date: Wed, 3 Apr 2024 
11:23:10 -0400 Subject: [PATCH 07/12] test(chord): api test for derived experiment results --- .../chord/tests/test_api_ingest.py | 41 +++++++++++++++---- 1 file changed, 34 insertions(+), 7 deletions(-) diff --git a/chord_metadata_service/chord/tests/test_api_ingest.py b/chord_metadata_service/chord/tests/test_api_ingest.py index 633be91cc..719422c3e 100644 --- a/chord_metadata_service/chord/tests/test_api_ingest.py +++ b/chord_metadata_service/chord/tests/test_api_ingest.py @@ -4,9 +4,20 @@ from rest_framework import status from rest_framework.test import APITestCase +from chord_metadata_service.chord.ingest import WORKFLOW_INGEST_FUNCTION_MAP +from chord_metadata_service.chord.workflows.metadata import ( + workflow_set, + WORKFLOW_PHENOPACKETS_JSON, + WORKFLOW_EXPERIMENTS_JSON, +) from chord_metadata_service.restapi.tests.utils import load_local_json + from .constants import VALID_PROJECT_1, valid_dataset_1 -from ..workflows.metadata import workflow_set, WORKFLOW_PHENOPACKETS_JSON +from .example_ingest import ( + EXAMPLE_INGEST_PHENOPACKET, + EXAMPLE_INGEST_EXPERIMENT, + EXAMPLE_INGEST_EXPERIMENT_RESULT, +) def generate_phenopackets_ingest(dataset_id): @@ -45,7 +56,7 @@ def test_workflows(self): # TODO: Check file contents -class IngestTest(APITestCase): +class APITestCaseWithDataset(APITestCase): def setUp(self) -> None: r = self.client.post(reverse("project-list"), data=json.dumps(VALID_PROJECT_1), content_type="application/json") self.project = r.json() @@ -53,25 +64,28 @@ def setUp(self) -> None: r = self.client.post('/api/datasets', data=json.dumps(valid_dataset_1(self.project["identifier"])), content_type="application/json") self.dataset = r.json() + self.dataset_id = self.dataset["identifier"] + +class IngestTest(APITestCaseWithDataset): def test_phenopackets_ingest(self): # Invalid workflow ID r = self.client.post( - reverse("ingest-into-dataset", args=(self.dataset["identifier"], "phenopackets_json_invalid")), + reverse("ingest-into-dataset", args=(self.dataset_id, "phenopackets_json_invalid")), content_type="application/json", ) self.assertEqual(r.status_code, status.HTTP_400_BAD_REQUEST) # No ingestion body r = self.client.post( - reverse("ingest-into-dataset", args=(self.dataset["identifier"], "phenopackets_json")), + reverse("ingest-into-dataset", args=(self.dataset_id, WORKFLOW_PHENOPACKETS_JSON)), content_type="application/json", ) self.assertEqual(r.status_code, status.HTTP_400_BAD_REQUEST) # Bad ingestion body JSON - JSON parse error 400 r = self.client.post( - reverse("ingest-into-dataset", args=(self.dataset["identifier"], "phenopackets_json")), + reverse("ingest-into-dataset", args=(self.dataset_id, WORKFLOW_PHENOPACKETS_JSON)), content_type="application/json", data="{}}", # noqa: W605 ) @@ -80,7 +94,7 @@ def test_phenopackets_ingest(self): # Invalid phenopacket JSON validation invalid_phenopacket = load_local_json("example_invalid_phenopacket.json") r = self.client.post( - reverse("ingest-into-dataset", args=(self.dataset["identifier"], "phenopackets_json")), + reverse("ingest-into-dataset", args=(self.dataset_id, WORKFLOW_PHENOPACKETS_JSON)), content_type="application/json", data=json.dumps(invalid_phenopacket), ) @@ -89,8 +103,21 @@ def test_phenopackets_ingest(self): # Success valid_phenopacket = load_local_json("example_phenopacket_v2.json") r = self.client.post( - reverse("ingest-into-dataset", args=(self.dataset["identifier"], "phenopackets_json")), + reverse("ingest-into-dataset", args=(self.dataset_id, WORKFLOW_PHENOPACKETS_JSON)), 
content_type="application/json", data=json.dumps(valid_phenopacket), ) self.assertEqual(r.status_code, status.HTTP_204_NO_CONTENT) + + +class IngestDerivedExperimentResultsTest(APITestCaseWithDataset): + def test_ingest_derived_experiment_results(self): + # ingest list of experiments + WORKFLOW_INGEST_FUNCTION_MAP[WORKFLOW_PHENOPACKETS_JSON](EXAMPLE_INGEST_PHENOPACKET, self.dataset_id) + WORKFLOW_INGEST_FUNCTION_MAP[WORKFLOW_EXPERIMENTS_JSON](EXAMPLE_INGEST_EXPERIMENT, self.dataset_id) + # ingest list of experiment results + self.client.post( + reverse("ingest-derived-experiment-results", args=(self.dataset_id,)), + content_type="application/json", + data=json.dumps(EXAMPLE_INGEST_EXPERIMENT_RESULT), + ) From 45e8ae6f9c8e1406756266ceb1025ac879e61f0f Mon Sep 17 00:00:00 2001 From: David Lougheed Date: Wed, 3 Apr 2024 11:24:21 -0400 Subject: [PATCH 08/12] ci: try to fix codecov upload --- .github/workflows/test.yml | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index ae4108cff..c2ef0b9c7 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -44,8 +44,11 @@ jobs: - name: Test run: | export POSTGRES_USER="postgres" && export POSTGRES_PASSWORD="postgres" && export POSTGRES_PORT=5432 - poetry run coverage run ./manage.py test + poetry run coverage run --branch ./manage.py test + poetry run coverage xml - name: Upload Coverage to Codecov uses: codecov/codecov-action@v4 - env: - CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} + with: + token: ${{ secrets.CODECOV_TOKEN }} + file: ./coverage.xml + verbose: true From 5a4fd185bd56da0f8579f6f9b2669fdd470e25ea Mon Sep 17 00:00:00 2001 From: David Lougheed Date: Wed, 3 Apr 2024 11:24:53 -0400 Subject: [PATCH 09/12] test(tox): use branch coverage and generate html report --- tox.ini | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index efe185cfc..b4073d3ce 100644 --- a/tox.ini +++ b/tox.ini @@ -11,5 +11,6 @@ allowlist_externals = poetry commands = poetry install --sync - poetry run coverage run ./manage.py test + poetry run coverage run --branch ./manage.py test + poetry run coverage html poetry run flake8 ./chord_metadata_service From b83f925afd18d0eaf2d061fd2f9326fb3eef3264 Mon Sep 17 00:00:00 2001 From: David Lougheed Date: Wed, 3 Apr 2024 13:05:14 -0400 Subject: [PATCH 10/12] fix(workflows): bad validate_ssl type in vcf2maf WDL --- chord_metadata_service/chord/workflows/wdls/vcf2maf.wdl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chord_metadata_service/chord/workflows/wdls/vcf2maf.wdl b/chord_metadata_service/chord/workflows/wdls/vcf2maf.wdl index f25e4cbbe..b2b93fc2f 100644 --- a/chord_metadata_service/chord/workflows/wdls/vcf2maf.wdl +++ b/chord_metadata_service/chord/workflows/wdls/vcf2maf.wdl @@ -5,7 +5,7 @@ workflow vcf2maf { String drs_url String katsu_url String access_token - String validate_ssl + Boolean validate_ssl String project_dataset String vep_cache_dir String run_dir From 2b48cbb5288b68f176343e78e05b1c278cdbc8b4 Mon Sep 17 00:00:00 2001 From: David Lougheed Date: Wed, 3 Apr 2024 13:21:12 -0400 Subject: [PATCH 11/12] fix(workflows): various vcf2maf tweaks --- .../chord/workflows/wdls/vcf2maf.wdl | 42 +++++++++---------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/chord_metadata_service/chord/workflows/wdls/vcf2maf.wdl b/chord_metadata_service/chord/workflows/wdls/vcf2maf.wdl index b2b93fc2f..163fcde76 100644 --- 
a/chord_metadata_service/chord/workflows/wdls/vcf2maf.wdl
+++ b/chord_metadata_service/chord/workflows/wdls/vcf2maf.wdl
@@ -8,7 +8,6 @@ workflow vcf2maf {
         Boolean validate_ssl
         String project_dataset
         String vep_cache_dir
-        String run_dir

         # Defaults (see: https://github.com/openwdl/wdl/blob/main/versions/1.0/SPEC.md#declared-inputs-defaults-and-overrides)
         String vep_species = "Homo_sapiens"  # ensembl syntax
@@ -23,13 +22,13 @@

     call vcf_2_maf {
         input:
+            project_dataset = project_dataset,
             vcf_files = katsu_dataset_export_vcf.vcf_files,
             vep_species = vep_species,
             vep_cache_dir = vep_cache_dir,
             drs_url = drs_url,
             access_token = access_token,
             validate_ssl = validate_ssl,
-            run_dir = run_dir
     }

     call katsu_update_experiment_results_with_maf {
@@ -39,7 +38,6 @@
             katsu_url = katsu_url,
             access_token = access_token,
             validate_ssl = validate_ssl,
-            run_dir = run_dir
     }

     output {
@@ -97,7 +95,7 @@ task katsu_dataset_export_vcf {
             # TODO add a default global parameter for when genome_assembly_id
             # is not defined on experiment results records.
-            assembly_id = result.get("genome_assembly_id", "GRCh37")
+            assembly_id = result.get("genome_assembly_id", "GRCh38")

             # Query DRS with the filename to get the absolute file path in
             # DRS for processing.
@@ -137,13 +135,13 @@

 task vcf_2_maf {
     input {
+        String project_dataset
         File vcf_files
         String vep_species
         String vep_cache_dir
         String drs_url
         String access_token
         Boolean validate_ssl
-        String run_dir
     }

     # Enclosing command with curly braces {} causes issues with parsing in this
@@ -162,7 +160,7 @@
         # prepare file names
         export vcf_file_name=$(basename ${orig_vcf_filename})
         filtered_vcf=$(echo ${vcf_file_name} | sed 's/\(.*\.\)vcf\.gz/\1filtered\.vcf/')
-        export maf=~{run_dir}/${vcf_file_name}.maf
+        export maf=${vcf_file_name}.maf

        # filter out variants that are homozygous and identical to assembly ref. 
bcftools view -i 'GT[*]="alt"' ${g_vcf} > ${filtered_vcf} @@ -206,20 +204,23 @@ task vcf_2_maf { import os import sys - params = { - "path": os.environ["maf"], - "deduplicate": True - } + project_id, dataset_id = "~{project_dataset}".split(":") - drs_url = "~{drs_url}/ingest" try: - response = requests.post( - drs_url, - headers={"Authorization": "Bearer ~{access_token}"} if "~{access_token}" else {}, - json=params, - verify=~{true="True" false="False" validate_ssl}, - ) - response.raise_for_status() + with open(os.environ["maf"], "r") as fh: + response = requests.post( + "~{drs_url}/ingest", + headers={"Authorization": "Bearer ~{access_token}"} if "~{access_token}" else {}, + files={"file": fh}, + data={ + "deduplicate": True, + "project_id": project_id, + "dataset_id": dataset_id, + "data_type": "experiment", + }, + verify=~{true="True" false="False" validate_ssl}, + ) + response.raise_for_status() except requests.exceptions.RequestException as e: msg = e.response.json() if hasattr(e, "response") else "" @@ -230,7 +231,7 @@ task vcf_2_maf { with open("maf.list.tsv", "a") as maf_list_fh: maf_list_fh.write( - os.environ["vcf_file_name"] + "\t" + os.path.basename(params["path"]) + "\t" + uri + "\n" + os.environ["vcf_file_name"] + "\t" + os.path.basename(os.environ["maf"]) + "\t" + uri + "\n" ) ' @@ -253,7 +254,6 @@ task katsu_update_experiment_results_with_maf { String katsu_url String access_token Boolean validate_ssl - String run_dir File experiment_results_json File maf_list } @@ -294,7 +294,7 @@ task katsu_update_experiment_results_with_maf { "usage": "Downloaded", "creation_date": date.today().isoformat(), "created_by": "Bento", - "genome_assembly_id": vcf_props.get("genome_assembly_id", "GRCh37"), # TODO: make fallback a parameter + "genome_assembly_id": vcf_props.get("genome_assembly_id", "GRCh38"), # TODO: make fallback a parameter "extra_properties": { "derived_from": vcf_props["identifier"] } From ab64fa6e86faae50834e42cfd9bd1c84c886042e Mon Sep 17 00:00:00 2001 From: David Lougheed Date: Wed, 3 Apr 2024 13:51:53 -0400 Subject: [PATCH 12/12] fix(workflows): provide access token when making exp res get req [vcf2maf] --- chord_metadata_service/chord/workflows/wdls/vcf2maf.wdl | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/chord_metadata_service/chord/workflows/wdls/vcf2maf.wdl b/chord_metadata_service/chord/workflows/wdls/vcf2maf.wdl index 163fcde76..87438d2ec 100644 --- a/chord_metadata_service/chord/workflows/wdls/vcf2maf.wdl +++ b/chord_metadata_service/chord/workflows/wdls/vcf2maf.wdl @@ -17,6 +17,7 @@ workflow vcf2maf { input: project_dataset = project_dataset, drs_url = drs_url, katsu_url = katsu_url, + access_token = access_token, validate_ssl = validate_ssl } @@ -50,6 +51,7 @@ task katsu_dataset_export_vcf { String project_dataset String drs_url String katsu_url + String access_token Boolean validate_ssl } @@ -75,7 +77,11 @@ task katsu_dataset_export_vcf { # TODO: handle pagination, i.e. if the `next` property is set, loop # over the pages of results metadata_url = f"~{katsu_url}/api/experimentresults?datasets={dataset_id}&file_format=vcf&page_size=10000" - response = requests.get(metadata_url, verify=~{true="True" false="False" validate_ssl}) + response = requests.get( + metadata_url, + headers={"Authorization": "Bearer ~{access_token}"} if "~{access_token}" else {}, + verify=~{true="True" false="False" validate_ssl}, + ) r = response.json() if r["count"] == 0:
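
---

For reference, the endpoint introduced in PATCH 06/12 replaces the deleted maf_derived_from_vcf_json WDL: clients now POST a JSON list of derived experiment results straight to Katsu at ingest-derived-experiment-results/<str:dataset_id>. The sketch below mirrors the request the updated vcf2maf.wdl makes; KATSU_URL, DATASET_ID, and ACCESS_TOKEN are illustrative placeholders (not values from these patches), and the result dict follows the shape of example_derived_experiment_result.json rather than a guaranteed schema.

    import requests

    # Illustrative placeholders -- not values taken from this patch series.
    KATSU_URL = "http://katsu.local"
    DATASET_ID = "11111111-2222-3333-4444-555555555555"  # an existing dataset UUID
    ACCESS_TOKEN = ""  # optional bearer token; empty string sends no auth header

    # One derived experiment result; `derived_from` must match the identifier of
    # an experiment result already attached to an experiment in this dataset.
    derived_results = [
        {
            "description": "MAF derived from a VCF",
            "filename": "sample1_01.vcf.gz.maf",
            "file_format": "MAF",
            "url": "drs://wherever.org/aaaa-bbbb-12345678",
            "data_output_type": "Derived data",
            "usage": "Downloaded",
            "creation_date": "2024-04-03",
            "created_by": "Bento",
            "genome_assembly_id": "GRCh38",
            "extra_properties": {"derived_from": "sample1_01"},
        }
    ]

    response = requests.post(
        f"{KATSU_URL}/ingest-derived-experiment-results/{DATASET_ID}",
        headers={"Authorization": f"Bearer {ACCESS_TOKEN}"} if ACCESS_TOKEN else {},
        json=derived_results,
    )
    # The view returns 204 on success; 400 responses carry ingest/validation errors.
    response.raise_for_status()

On the Katsu side, call_ingest_function_and_handle wraps the ingestion in a database transaction, so a batch that fails validation leaves no partial state behind.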