diff --git a/parsers/monarchkg/src/loadMonarchKG.py b/parsers/monarchkg/src/loadMonarchKG.py index bc6ac23f..2417cb2f 100644 --- a/parsers/monarchkg/src/loadMonarchKG.py +++ b/parsers/monarchkg/src/loadMonarchKG.py @@ -5,7 +5,7 @@ from Common.loader_interface import SourceDataLoader from Common.kgxmodel import kgxedge -from Common.biolink_constants import PUBLICATIONS +from Common.biolink_constants import * from Common.utils import GetData @@ -18,7 +18,7 @@ class MonarchKGLoader(SourceDataLoader): source_id: str = 'MonarchKG' provenance_id: str = 'infores:monarchinitiative' - parsing_version: str = '1.0' + parsing_version: str = '1.1' def __init__(self, test_mode: bool = False, source_data_dir: str = None): """ @@ -89,12 +89,13 @@ def parse_data(self) -> dict: with tar_files.extractfile(self.monarch_edge_file_archive_path) as edges_file: for line in edges_file: monarch_edge = orjson.loads(line) - subject_id = monarch_edge['subject'] - object_id = monarch_edge['object'] - predicate = monarch_edge['predicate'] + # normally we wouldn't use constants to read FROM a source, + # but in this case monarch kg is biolink compliant, so they should be the same + subject_id = monarch_edge[SUBJECT_ID] + object_id = monarch_edge[OBJECT_ID] + predicate = monarch_edge[PREDICATE] if not (subject_id and object_id and predicate): skipped_bad_record_counter += 1 - print(line) continue if predicate not in self.desired_predicates: @@ -103,26 +104,32 @@ def parse_data(self) -> dict: # get the knowledge sources, map them to something else if needed, # then check if edge should be ignored due to the knowledge source - primary_knowledge_source = self.knowledge_source_mapping.get(monarch_edge['primary_knowledge_source'], - monarch_edge['primary_knowledge_source']) - aggregator_knowledge_sources = [self.knowledge_source_mapping.get(ks, ks) for ks in monarch_edge['aggregator_knowledge_source']] + primary_knowledge_source = self.knowledge_source_mapping.get(monarch_edge[PRIMARY_KNOWLEDGE_SOURCE], + monarch_edge[PRIMARY_KNOWLEDGE_SOURCE]) + aggregator_knowledge_sources = [self.knowledge_source_mapping.get(ks, ks) for ks in monarch_edge[AGGREGATOR_KNOWLEDGE_SOURCES]] if primary_knowledge_source in self.knowledge_source_ignore_list or \ any([ks in self.knowledge_source_ignore_list for ks in aggregator_knowledge_sources]): skipped_ignore_knowledge_source += 1 continue - edge_properties = {} - if monarch_edge['publications']: - edge_properties[PUBLICATIONS] = monarch_edge['publications'] + edge_properties = { + KNOWLEDGE_LEVEL: monarch_edge[KNOWLEDGE_LEVEL] if KNOWLEDGE_LEVEL in monarch_edge else NOT_PROVIDED, + AGENT_TYPE: monarch_edge[AGENT_TYPE] if AGENT_TYPE in monarch_edge else NOT_PROVIDED + } + if monarch_edge[PUBLICATIONS]: + edge_properties[PUBLICATIONS] = monarch_edge[PUBLICATIONS] for edge_attribute in monarch_edge: if '_qualifier' in edge_attribute and monarch_edge[edge_attribute]: edge_properties[edge_attribute] = monarch_edge[edge_attribute] + elif edge_attribute == QUALIFIED_PREDICATE and monarch_edge[QUALIFIED_PREDICATE]: + edge_properties[QUALIFIED_PREDICATE] = monarch_edge[QUALIFIED_PREDICATE] output_edge = kgxedge( subject_id=subject_id, predicate=predicate, object_id=object_id, primary_knowledge_source=primary_knowledge_source, - aggregator_knowledge_sources=aggregator_knowledge_sources + aggregator_knowledge_sources=aggregator_knowledge_sources, + edgeprops=edge_properties ) self.output_file_writer.write_node(object_id) self.output_file_writer.write_node(subject_id)