Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor!: derived experiment result ingestion changes #494

Merged
merged 14 commits into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions chord_metadata_service/chord/ingest/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from chord_metadata_service.chord.workflows import metadata as wm

from .experiments import ingest_experiments_workflow, ingest_maf_derived_from_vcf_workflow
from .experiments import ingest_experiments_workflow
from .fhir import ingest_fhir_workflow
from .phenopackets import ingest_phenopacket_workflow
from .readsets import ingest_readset_workflow
Expand All @@ -16,5 +16,4 @@
wm.WORKFLOW_PHENOPACKETS_JSON: ingest_phenopacket_workflow,
wm.WORKFLOW_FHIR_JSON: ingest_fhir_workflow,
wm.WORKFLOW_READSET: ingest_readset_workflow,
wm.WORKFLOW_MAF_DERIVED_FROM_VCF_JSON: ingest_maf_derived_from_vcf_workflow,
}
9 changes: 1 addition & 8 deletions chord_metadata_service/chord/ingest/experiments.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"validate_experiment",
"ingest_experiment",
"ingest_experiments_workflow",
"ingest_maf_derived_from_vcf_workflow",
"ingest_derived_experiment_results",
]

from .exceptions import IngestError
Expand Down Expand Up @@ -199,10 +199,3 @@ def ingest_derived_experiment_results(json_data: list[dict], dataset_id: str) ->
exp_res_list.append(new_experiment_results)

return exp_res_list


# The dataset_id is required to fit the bento_ingest.schema.json in bento_lib,
# but it is unused. It can be set to any valid dataset_id or to one of the override
# values defined in view_ingest.py
def ingest_maf_derived_from_vcf_workflow(json_data, dataset_id: str) -> list[em.ExperimentResult]:
    """Workflow entry point for MAF-derived-from-VCF JSON; delegates to the generic derived-result ingester."""
    ingested_results = ingest_derived_experiment_results(json_data, dataset_id)
    return ingested_results
65 changes: 40 additions & 25 deletions chord_metadata_service/chord/ingest/views.py
Original file line number Diff line number Diff line change
@@ -1,54 +1,42 @@
from __future__ import annotations

import logging
import traceback
import uuid

from django.core.exceptions import ValidationError
from django.db import transaction
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import AllowAny
from rest_framework.request import Request as DrfRequest
from rest_framework.response import Response
from typing import Any, Callable

from bento_lib.responses import errors

from chord_metadata_service.logger import logger
from chord_metadata_service.chord.models import Dataset
from . import experiments
from . import WORKFLOW_INGEST_FUNCTION_MAP
from .exceptions import IngestError
from ..models import Dataset


FROM_DERIVED_DATA = "FROM_DERIVED_DATA"
DATASET_ID_OVERRIDES = {FROM_DERIVED_DATA} # These special values skip the checks on the table

logger = logging.getLogger(__name__)


@api_view(["POST"])
@permission_classes([AllowAny])
def ingest_into_dataset(request, dataset_id: str, workflow_id: str):
logger.info(f"Received a {workflow_id} ingest request for dataset {dataset_id}.")

# Check that the workflow exists
if workflow_id not in WORKFLOW_INGEST_FUNCTION_MAP:
return Response(errors.bad_request_error(f"Ingestion workflow ID {workflow_id} does not exist"), status=400)

if dataset_id not in DATASET_ID_OVERRIDES:
if not Dataset.objects.filter(identifier=dataset_id).exists():
return Response(errors.bad_request_error(f"Dataset with ID {dataset_id} does not exist"), status=400)
dataset_id = str(uuid.UUID(dataset_id)) # Normalize dataset ID to UUID's str format.

def call_ingest_function_and_handle(fn: Callable[[Any, str], Any], data, dataset_id: str) -> Response:
try:
with transaction.atomic():
# Wrap ingestion in a transaction, so if it fails we don't end up in a partial state in the database.
WORKFLOW_INGEST_FUNCTION_MAP[workflow_id](request.data, dataset_id)
fn(data, dataset_id)

except IngestError as e:
return Response(errors.bad_request_error(f"Encountered ingest error: {e}"), status=400)
err = f"Encountered ingest error: {e}\n{traceback.format_exc()}"
logger.error(err)
return Response(errors.bad_request_error(err), status=400)

except ValidationError as e:
validation_errors = tuple(e.error_list if hasattr(e, "error_list") else e.error_dict.items())
logger.error(f"Encountered validation errors during ingestion: {validation_errors}")
return Response(errors.bad_request_error(
"Encountered validation errors during ingestion",
*(e.error_list if hasattr(e, "error_list") else e.error_dict.items()),
*validation_errors,
))

except Exception as e:
Expand All @@ -57,3 +45,30 @@ def ingest_into_dataset(request, dataset_id: str, workflow_id: str):
return Response(errors.internal_server_error(f"Encountered an exception while processing an ingest attempt "
f"(error: {repr(e)}"), status=500)
return Response(status=204)


@api_view(["POST"])
@permission_classes([AllowAny])
def ingest_derived_experiment_results(request: DrfRequest, dataset_id: str):
    """POST endpoint: ingest derived experiment results (presumably MAF-type derived files — confirm with callers)
    from the request body into the given dataset, with shared error handling."""
    ingest_fn = experiments.ingest_derived_experiment_results
    return call_ingest_function_and_handle(ingest_fn, request.data, dataset_id)


@api_view(["POST"])
@permission_classes([AllowAny])
def ingest_into_dataset(request: DrfRequest, dataset_id: str, workflow_id: str):
    """POST endpoint: run the registered ingestion workflow ``workflow_id`` against ``dataset_id``
    using the request body as the ingest payload.

    Returns a 400 response when the workflow is unknown or the dataset does not exist;
    otherwise delegates to call_ingest_function_and_handle for execution and error handling.
    """
    logger.info(f"Received a {workflow_id} ingest request for dataset {dataset_id}.")

    # Guard: the workflow ID must be registered in the ingest function map.
    if workflow_id not in WORKFLOW_INGEST_FUNCTION_MAP:
        error_msg = f"Ingestion workflow ID {workflow_id} does not exist"
        logger.error(f"Error encountered while ingesting into dataset {dataset_id}: {error_msg}")
        return Response(errors.bad_request_error(error_msg), status=400)

    # Guard: the target dataset must exist before we attempt any ingestion.
    dataset_exists = Dataset.objects.filter(identifier=dataset_id).exists()
    if not dataset_exists:
        error_msg = f"Dataset with ID {dataset_id} does not exist"
        logger.error(
            f"Error encountered while ingesting into dataset {dataset_id} with workflow {workflow_id}: {error_msg}")
        return Response(errors.bad_request_error(error_msg), status=400)

    dataset_id = str(uuid.UUID(dataset_id))  # Normalize dataset ID to UUID's str format.

    ingest_fn = WORKFLOW_INGEST_FUNCTION_MAP[workflow_id]
    return call_ingest_function_and_handle(ingest_fn, request.data, dataset_id)
6 changes: 3 additions & 3 deletions chord_metadata_service/chord/tests/test_api_ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,13 @@ def test_phenopackets_ingest(self):
)
self.assertEqual(r.status_code, status.HTTP_400_BAD_REQUEST)

# Bad ingestion body JSON
# Bad ingestion body JSON - JSON parse error 400
r = self.client.post(
reverse("ingest-into-dataset", args=(self.dataset["identifier"], "phenopackets_json")),
content_type="application/json",
data="\{\}\}", # noqa: W605
data="{}}", # noqa: W605
)
self.assertEqual(r.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR)
self.assertEqual(r.status_code, status.HTTP_400_BAD_REQUEST)

# Invalid phenopacket JSON validation
invalid_phenopacket = load_local_json("example_invalid_phenopacket.json")
Expand Down
6 changes: 2 additions & 4 deletions chord_metadata_service/chord/tests/test_export_cbio.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
from chord_metadata_service.chord.models import Project, Dataset
from chord_metadata_service.experiments.models import ExperimentResult
from chord_metadata_service.chord.ingest import WORKFLOW_INGEST_FUNCTION_MAP
from chord_metadata_service.chord.ingest.experiments import ingest_derived_experiment_results
from chord_metadata_service.chord.workflows.metadata import (
WORKFLOW_EXPERIMENTS_JSON,
WORKFLOW_MAF_DERIVED_FROM_VCF_JSON,
WORKFLOW_PHENOPACKETS_JSON,
)
from chord_metadata_service.patients.models import Individual
Expand Down Expand Up @@ -51,9 +51,7 @@ def setUp(self) -> None:
EXAMPLE_INGEST_EXPERIMENT, self.d.identifier
)
# append derived MAF files to experiment results
WORKFLOW_INGEST_FUNCTION_MAP[WORKFLOW_MAF_DERIVED_FROM_VCF_JSON](
EXAMPLE_INGEST_EXPERIMENT_RESULT, self.d.identifier
)
ingest_derived_experiment_results(EXAMPLE_INGEST_EXPERIMENT_RESULT, self.d.identifier)
self.exp_res = ExperimentResult.objects.all()

@staticmethod
Expand Down
4 changes: 2 additions & 2 deletions chord_metadata_service/chord/tests/test_ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from chord_metadata_service.chord.ingest.experiments import (
validate_experiment,
ingest_experiment,
ingest_derived_experiment_results,
)
from chord_metadata_service.chord.ingest.schema import schema_validation
from chord_metadata_service.chord.ingest.phenopackets import (
Expand All @@ -15,7 +16,6 @@
from chord_metadata_service.chord.tests.helpers import ModelFieldsTestMixin, ProjectTestCase
from chord_metadata_service.chord.workflows.metadata import (
WORKFLOW_EXPERIMENTS_JSON,
WORKFLOW_MAF_DERIVED_FROM_VCF_JSON,
WORKFLOW_PHENOPACKETS_JSON,
)
from chord_metadata_service.phenopackets.models import Biosample, PhenotypicFeature, Phenopacket
Expand Down Expand Up @@ -286,7 +286,7 @@ def test_ingesting_experiment_results_json(self):
EXAMPLE_INGEST_EXPERIMENT, self.dataset.identifier
)
# ingest list of experiment results
experiment_results = WORKFLOW_INGEST_FUNCTION_MAP[WORKFLOW_MAF_DERIVED_FROM_VCF_JSON](
experiment_results = ingest_derived_experiment_results(
EXAMPLE_INGEST_EXPERIMENT_RESULT, self.dataset.identifier
)
self.assertEqual(len(experiment_results), len(EXAMPLE_INGEST_EXPERIMENT_RESULT))
Expand Down
2 changes: 2 additions & 0 deletions chord_metadata_service/chord/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

path('private/export', views_export.export, name="export"),

path('ingest-derived-experiment-results/<str:dataset_id>', views_ingest.ingest_derived_experiment_results,
davidlougheed marked this conversation as resolved.
Show resolved Hide resolved
name="ingest-derived-experiment-results"),
path('ingest/<str:dataset_id>/<str:workflow_id>', views_ingest.ingest_into_dataset, name="ingest-into-dataset"),

path('data-types', views_data_types.data_type_list, name="data-type-list"),
Expand Down
19 changes: 0 additions & 19 deletions chord_metadata_service/chord/workflows/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
"WORKFLOW_FHIR_JSON",
"WORKFLOW_READSET",
"WORKFLOW_DOCUMENT",
"WORKFLOW_MAF_DERIVED_FROM_VCF_JSON",
"WORKFLOW_VCF2MAF",
"WORKFLOW_CBIOPORTAL",

Expand All @@ -22,7 +21,6 @@
WORKFLOW_FHIR_JSON = "fhir_json"
WORKFLOW_READSET = "readset"
WORKFLOW_DOCUMENT = "document"
WORKFLOW_MAF_DERIVED_FROM_VCF_JSON = "maf_derived_from_vcf_json"
WORKFLOW_VCF2MAF = "vcf2maf"
WORKFLOW_CBIOPORTAL = "cbioportal"

Expand Down Expand Up @@ -124,23 +122,6 @@ def json_file_input(id_: str, required: bool = True):
],
))

workflow_set.add_workflow(WORKFLOW_MAF_DERIVED_FROM_VCF_JSON, wm.WorkflowDefinition(
type="ingestion",
name="MAF files derived from VCF files as a JSON",
description="This ingestion workflow will add to the current experiment results MAF files that were generated from "
"VCF files found in the Dataset.",
data_type=DATA_TYPE_EXPERIMENT, # for permissions
tags=[DATA_TYPE_EXPERIMENT, WORKFLOW_TAG_CBIOPORTAL],
file="maf_derived_from_vcf_json.wdl",
inputs=[
# injected
ACCESS_TOKEN_INPUT,
# user
PROJECT_DATASET_INPUT,
json_file_input("json_document"),
],
))

# Analysis workflows ---------------------------------------------------------------------------------------------------

workflow_set.add_workflow(WORKFLOW_VCF2MAF, wm.WorkflowDefinition(
Expand Down

This file was deleted.

2 changes: 1 addition & 1 deletion chord_metadata_service/chord/workflows/wdls/vcf2maf.wdl
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ task katsu_update_experiment_results_with_maf {
)

_, dataset_id = "~{project_dataset}".split(":")
metadata_url = f"~{katsu_url}/ingest/{dataset_id}/maf_derived_from_vcf_json"
metadata_url = f"~{katsu_url}/ingest-derived-experiment-results/{dataset_id}"
response = requests.post(
metadata_url,
headers=headers,
Expand Down
Loading