Skip to content

Commit

Permalink
Processing versioning tasks (#182)
Browse files Browse the repository at this point in the history
* Pushing up pulled in code changes from the annotation-task refactor, pairing and other work done in the last week, before losing power and wifi.

* Was able to process versioning tasks for annotation units. Skipped all other tasks for now, will be working on that next. Updated test fixture's annotation-configuration to match current Rosalution annotation configuration. Paired with Angelina on some of this stuff.

* testing process tasks for datasets without dependencies and datasets with dependencies

* Tests pass for CPAM0002, need to rework CPAM0046

* Paired with Rabab to refactor how we manage skipping a dependency for unit tests when processing annotation unit tasks; agreed upon a base set of datasets to use in configuration; and updated the neccesary code patching

* Missed a file.

* got it working; heck ya

* tests passing, linting & formatting passing

* wip for genomic units;linting; and formatting

* finished cleaning up genomic unit unit tests and added parameterized test methods to have more then one test case peer unit test

* wip

* wip to get annotation by analysis name'

* backend wip for getting annotations by analysis

* rabab & angelina pair for getting version result

* formatted backend files

* Able to retrieve version for 'rest' versioning type. Hardcoded 'rosalution' type version for rosalution's manifest. Paired with Angelina on Wednesday to create a couple of helper functions for testing. Thursday - Rabab worked on combining & testing all 3 versioning types in one test.

* Fixed some of the linting errors

* Retrieve and show annotations (#180)

* Updated backend to include annotation retrieval for dependencies and ui

* Updated unit tests and integration to pass

* finished added the version calcs and fixed version retrieval, checking if transcripts exists is still broken, the case where transcript_id and no transcripts are listed in the variant is the case that needs to be fixed

* fixed creatining multiple genomic units when uploading twice, investigating why transcripts are showing as not existing when they do

* wrapped up cleaning the tests; removed extra logging; linted and formatted; paired with Rabab

---------

Co-authored-by: SeriousHorncat <ange.unoantonison@gmail.com>
  • Loading branch information
fatimarabab and SeriousHorncat committed Sep 23, 2024
1 parent 40c806f commit 10783fe
Show file tree
Hide file tree
Showing 31 changed files with 3,630 additions and 1,612 deletions.
119 changes: 73 additions & 46 deletions backend/src/core/annotation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
import logging
import queue

from .annotation_task import AnnotationTaskFactory
from ..repository.analysis_collection import AnalysisCollection
from ..repository.genomic_unit_collection import GenomicUnitCollection

from .annotation_task import AnnotationTaskFactory, VersionAnnotationTask
from ..models.analysis import Analysis
from ..repository.annotation_config_collection import AnnotationConfigCollection
from ..core.annotation_unit import AnnotationUnit
Expand Down Expand Up @@ -83,72 +86,96 @@ def queue_annotation_tasks(self, analysis: Analysis, annotation_task_queue: Anno
annotation_task_queue.put(annotation_unit_queued)

@staticmethod
def process_tasks(annotation_queue, genomic_unit_collection): # pylint: disable=too-many-locals
def process_tasks(
annotation_queue: AnnotationQueue, analysis_name: str, genomic_unit_collection: GenomicUnitCollection,
analysis_collection: AnalysisCollection
): # pylint: disable=too-many-branches,too-many-locals
"""Processes items that have been added to the queue"""
logger.info("%s Processing annotation tasks queue ...", annotation_log_label())

with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
annotation_task_futures = {}
while not annotation_queue.empty():
annotation_unit = annotation_queue.get()
latest = False
if genomic_unit_collection.annotation_exist(annotation_unit.genomic_unit, annotation_unit.dataset
) and annotation_unit.is_version_latest():
logger.info('%s Annotation Exists...', format_annotation_logging(annotation_unit))
latest = True
continue
ready = True

if annotation_unit.has_dependencies():
missing_dependencies = annotation_unit.get_missing_dependencies()
for missing in missing_dependencies:
annotation_value = genomic_unit_collection.find_genomic_unit_annotation_value(
annotation_unit.genomic_unit, missing
)
ready = annotation_unit.ready_for_annotation(annotation_value, missing)

if not ready and not latest:
if annotation_unit.should_continue_annotation():
logger.info(
'%s Delaying Annotation, Missing %s Dependencies...',
format_annotation_logging(annotation_unit), annotation_unit.get_missing_dependencies()
)
annotation_queue.put(annotation_unit)
else:
logger.info(
'%s Canceling Annotation, Missing %s Dependencies...',
format_annotation_logging(annotation_unit), annotation_unit.get_missing_dependencies()
)
if not annotation_unit.version_exists():
version_task = AnnotationTaskFactory.create_version_task(annotation_unit)
logger.info('%s Creating Task To Version...', format_annotation_logging(annotation_unit))
annotation_task_futures[executor.submit(version_task.annotate)] = version_task
else:
if genomic_unit_collection.annotation_exist(annotation_unit):
logger.info('%s Annotation Exists...', format_annotation_logging(annotation_unit))
continue

if annotation_unit.has_dependencies():
missing_dependencies = annotation_unit.get_missing_dependencies()
for missing_dataset_name in missing_dependencies:
analysis_manifest_dataset = analysis_collection.get_manifest_dataset_config(
analysis_name, missing_dataset_name
)
if analysis_manifest_dataset is None:
continue

dependency_annotation_unit = AnnotationUnit(
annotation_unit.genomic_unit, analysis_manifest_dataset
)
dependency_annotation_unit.set_latest_version(analysis_manifest_dataset['version'])
annotation_value = genomic_unit_collection.find_genomic_unit_annotation_value(
dependency_annotation_unit
)
if annotation_value:
annotation_unit.set_annotation_for_dependency(missing_dataset_name, annotation_value)

continue
if not annotation_unit.conditions_met_to_gather_annotation():
if annotation_unit.should_continue_annotation():
logger.info(
'%s Delaying Annotation, Missing %s Dependencies %s/10 times...',
format_annotation_logging(annotation_unit), annotation_unit.get_missing_dependencies(),
annotation_unit.get_delay_count() + 1
)
annotation_queue.put(annotation_unit)
else:
logger.info(
'%s Canceling Annotation, Missing %s Dependencies...',
format_annotation_logging(annotation_unit), annotation_unit.get_missing_dependencies()
)
continue

task = AnnotationTaskFactory.create(annotation_unit)
logger.info('%s Creating Task To Annotate...', format_annotation_logging(annotation_unit))
annotation_task = AnnotationTaskFactory.create_annotation_task(annotation_unit)
logger.info('%s Creating Task To Annotate...', format_annotation_logging(annotation_unit))

annotation_task_futures[executor.submit(task.annotate)] = (annotation_unit.genomic_unit, task)
annotation_task_futures[executor.submit(annotation_task.annotate)] = annotation_task

for future in concurrent.futures.as_completed(annotation_task_futures):
annotation_unit.genomic_unit, annotation_task = annotation_task_futures[future]
logger.info('%s Query completed...', format_annotation_logging(annotation_unit))
task = annotation_task_futures[future]

try:
result_temp = future.result()

for annotation in annotation_task.extract(result_temp):
task_process_result = future.result()
if isinstance(task, VersionAnnotationTask):
annotation_unit = task.annotation_unit
version = task.extract_version(task_process_result)
annotation_unit.set_latest_version(version)
logger.info(
'%s Saving %s...',
format_annotation_logging(
annotation_unit, annotation_task.annotation_unit.dataset['data_set']
), annotation['value']
)
genomic_unit_collection.annotate_genomic_unit(
annotation_task.annotation_unit.genomic_unit, annotation
'%s Version Calculated %s...', format_annotation_logging(annotation_unit), version
)
annotation_queue.put(annotation_unit)
else:
for annotation in task.extract(task_process_result):
logger.info(
'%s Saving %s...',
format_annotation_logging(annotation_unit, task.annotation_unit.get_dataset_name()),
annotation['value']
)

genomic_unit_collection.annotate_genomic_unit(
task.annotation_unit.genomic_unit, annotation
)
analysis_collection.add_dataset_to_manifest(analysis_name, annotation_unit)

except FileNotFoundError as error:
logger.info(
'%s exception happened %s with %s and %s', annotation_log_label(), error,
annotation_unit.genomic_unit, annotation_task
annotation_unit.genomic_unit, task
)

del annotation_task_futures[future]
Expand Down
137 changes: 88 additions & 49 deletions backend/src/core/annotation_task.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
"""Tasks for annotating a genomic unit with datasets"""
from abc import abstractmethod
from datetime import date
import json
from random import randint
import time

# pylint: disable=too-few-public-methods
# Disabling too few public metods due to utilizing Pydantic/FastAPI BaseSettings class
import logging
import jq
import requests

from ..core.annotation_unit import AnnotationUnit

# create logger
logger = logging.getLogger(__name__)


Expand All @@ -31,14 +29,31 @@ class AnnotationTaskInterface:
def __init__(self, annotation_unit: AnnotationUnit):
self.annotation_unit = annotation_unit

def aggregate_string_replacements(self, base_string):
def aggregate_string_replacements(self, base_string) -> str:
"""
Replaces the content 'base_string' where strings within the pattern
{item} are replaced, 'item' can be the genomic unit's type such as
{gene} or {hgvs_variant} or a dataset dependency, such as {Entrez Gene Id}.
The follow are examples of the genomic_unit's dict's attributes like
genomic_unit['gene'] or genomic_unit['Entrez Gene Id']
example base string:
https://grch37.rest.ensembl.org/vep/human/hgvs/{hgvs_variant}?content-type=application/json;CADD=1;refseq=1;
return value:
https://grch37.rest.ensembl.org/vep/human/hgvs/NM_001017980.3:c.164G>T?content-type=application/json;CADD=1;refseq=1;
example base string:
.[].transcript_consequences[] | select( .transcript_id | contains(\"{transcript}\") ) | { CADD: .cadd_phred }
return value:
.[].transcript_consequences[] | select( .transcript_id | contains(\"NM_001017980\") ) | { CADD: .cadd_phred }
genomic unit within the annotation unit in this task to be
{
'hgvs_variant': "hgvs_variant",
'transcript': 'NM_001017980',
}
"""
genomic_unit_string = f"{{{self.annotation_unit.get_genomic_unit_type()}}}"
replace_string = base_string.replace(genomic_unit_string, self.annotation_unit.get_genomic_unit())
Expand All @@ -56,7 +71,12 @@ def aggregate_string_replacements(self, base_string):
def annotate(self):
"""Interface for implementation of of retrieving the annotation for a genomic unit and its set of datasets"""

def extract(self, json_result):
def __json_extract__(self, jq_query, json_to_parse):
"""Private ethod to execute jq to extract JSON"""
jq_results = iter(jq.compile(jq_query).input(json_to_parse).all())
return jq_results

def extract(self, incomming_json):
""" Interface extraction method for annotation tasks """
annotations = []

Expand All @@ -65,19 +85,18 @@ def extract(self, json_result):
if 'attribute' in self.annotation_unit.dataset: # pylint: disable=too-many-nested-blocks
annotation_unit_json = {
"data_set": self.annotation_unit.dataset['data_set'],
"data_source": self.annotation_unit.dataset['data_source'],
"version": "",
"value": "",
"data_source": self.annotation_unit.dataset['data_source'], "value": "",
"version": self.annotation_unit.version
}

replaced_attributes = self.aggregate_string_replacements(self.annotation_unit.dataset['attribute'])
jq_results = empty_gen()
try:
jq_results = iter(jq.compile(replaced_attributes).input(json_result).all())
replaced_attributes = self.aggregate_string_replacements(self.annotation_unit.dataset['attribute'])
jq_results = self.__json_extract__(replaced_attributes, incomming_json)
except ValueError as value_error:
logger.info((
'Failed to annotate "%s" from "%s" on %s with error "%s"', annotation_unit_json['data_set'],
annotation_unit_json['data_source'], json.dumps(json_result), value_error
annotation_unit_json['data_source'], json.dumps(incomming_json), value_error
))
jq_result = next(jq_results, None)
while jq_result is not None:
Expand All @@ -102,6 +121,29 @@ def extract(self, json_result):

return annotations

def extract_version(self, incoming_version_json):
""" Interface extraction method for Version Annotation tasks"""

version = ""

versioning_type = self.annotation_unit.get_dataset_version_type()
if versioning_type == "rosalution":
version = incoming_version_json["rosalution"]
elif versioning_type == "date":
version = incoming_version_json["date"]
else:
jq_query = self.annotation_unit.dataset['version_attribute']

jq_result = empty_gen()
try:
jq_result = self.__json_extract__(jq_query, incoming_version_json)
except ValueError as value_error:
logger.info(('Failed to extract version', value_error))
jq_result = next(jq_result, None)
version = jq_result

return version


class ForgeAnnotationTask(AnnotationTaskInterface):
"""
Expand Down Expand Up @@ -169,24 +211,6 @@ def annotate(self):
json_result = result.json()
return json_result

def base_url(self):
"""
Creates the base url for the annotation according to the configuration. Searches for string {genomic_unit_type}
within the 'url' attribute and replaces it with the genomic_unit being annotated.
"""
string_to_replace = f"{{{self.annotation_unit.dataset['genomic_unit_type']}}}"
replace_string = self.annotation_unit.dataset['url'].replace(
string_to_replace, self.annotation_unit.get_genomic_unit()
)

if 'dependencies' in self.annotation_unit.dataset:
for depedency in self.annotation_unit.dataset['dependencies']:
depedency_replace_string = f"{{{depedency}}}"
replace_string = replace_string.replace(
depedency_replace_string, self.annotation_unit.genomic_unit[depedency]
)
return replace_string

def build_url(self):
"""
Builds the URL from the base_url and then appends the list of query parameters for the list of datasets.
Expand All @@ -197,43 +221,49 @@ def build_url(self):
class VersionAnnotationTask(AnnotationTaskInterface):
"""An annotation task that gets the version of the annotation"""

version_types = {}

def __init__(self, annotation_unit):
"""initializes the task with the annotation_unit.genomic_unit"""
AnnotationTaskInterface.__init__(self, annotation_unit)
self.version_types = {
"rest": self.get_annotation_version_from_rest, "rosalution": self.get_annotation_version_from_rosalution,
"date": self.get_annotation_version_from_date
}

def annotate(self):
"""placeholder for annotating a genomic unit with version"""
return "not-implemented"

def versioning_by_type(self, versioning_type):
"""Gets version by versioning type and returns the version data to the annotation unit"""

version_type = self.annotation_unit.dataset['versioning_type']
version = ""

if versioning_type == "rest":
version = self.get_annotation_version_from_rest()
elif versioning_type == "rosalution":
version = self.get_annotation_version_from_rosalution()
elif versioning_type == "date":
version = self.get_annotation_version_from_date()
if version_type not in self.version_types:
logger.error(('Failed versioning: "%s" is an Invalid Version Type', version_type))
return {}

version = self.version_types[version_type]()
return version

def get_annotation_version_from_rest(self):
"""Gets version for rest type and returns the version data"""
version_from_rest = ""
# getting version from rest
return version_from_rest
version = {"rest": "rosalution-temp-manifest-00"}

url_to_query = self.annotation_unit.dataset['version_url']
result = requests.get(url_to_query, verify=False, headers={"Accept": "application/json"}, timeout=30)
version = result.json()
return version

def get_annotation_version_from_rosalution(self):
"""Gets version for rosalution type and returns the version data"""
version_from_rosalution = ""
# getting version from rosalution
return version_from_rosalution

version = {"rosalution": "rosalution-manifest-00"}
return version

def get_annotation_version_from_date(self):
"""Gets version for date type and returns the version data"""
version_from_date = ""
# getting version from date
return version_from_date

version = {"date": str(date.today())}
return version


class AnnotationTaskFactory:
Expand All @@ -256,7 +286,7 @@ def register(cls, key: str, annotation_task_interface: AnnotationTaskInterface):
cls.tasks[key] = annotation_task_interface

@classmethod
def create(cls, annotation_unit: AnnotationUnit):
def create_annotation_task(cls, annotation_unit: AnnotationUnit):
"""
Creates an annotation task with a genomic_units and dataset json. Instantiates the class according to
a datasets 'annotation_source_type' from the datasets configurtion.
Expand All @@ -267,3 +297,12 @@ def create(cls, annotation_unit: AnnotationUnit):
new_task = cls.tasks[annotation_task_type](annotation_unit)
# new_task.set(annotation_unit.dataset)
return new_task

@classmethod
def create_version_task(cls, annotation_unit: AnnotationUnit):
"""
Creates an annotation task with a genomic_units and dataset json. Instantiates the class according to
a datasets 'annotation_source_type' from the datasets configurtion.
"""
new_task = cls.tasks["version"](annotation_unit)
return new_task
Loading

0 comments on commit 10783fe

Please sign in to comment.