Skip to content

Commit

Permalink
Merge pull request #172 from RobokopU24/loadingReactome
Browse files Browse the repository at this point in the history
Loading reactome
  • Loading branch information
EvanDietzMorris authored Aug 4, 2023
2 parents 73c6622 + 682cbeb commit 75d021c
Show file tree
Hide file tree
Showing 20 changed files with 1,444 additions and 143 deletions.
16 changes: 8 additions & 8 deletions Common/build_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from Common.data_sources import get_available_data_sources
from Common.load_manager import SourceDataManager
from Common.kgx_file_merger import KGXFileMerger
from Common.neo4j_tools import Neo4jTools
from Common.neo4j_tools import create_neo4j_dump
from Common.kgxmodel import GraphSpec, SubGraphSource, DataSource, NormalizationScheme
from Common.metadata import Metadata, GraphMetadata, SourceMetadata
from Common.supplementation import SequenceVariantSupplementation
Expand Down Expand Up @@ -111,13 +111,13 @@ def build_graph(self, graph_id: str):

if 'neo4j' in graph_spec.graph_output_format.lower():
self.logger.info(f'Starting Neo4j dump pipeline for {graph_id}...')
neo4j_tools = Neo4jTools(graph_id=graph_id,
graph_version=graph_version)
dump_success = neo4j_tools.create_neo4j_dump(graph_id=graph_id,
graph_version=graph_version,
graph_directory=graph_output_dir,
nodes_filename=NODES_FILENAME,
edges_filename=EDGES_FILENAME)
dump_success = create_neo4j_dump(graph_id=graph_id,
graph_version=graph_version,
graph_directory=graph_output_dir,
nodes_filename=NODES_FILENAME,
edges_filename=EDGES_FILENAME,
logger=self.logger)

if dump_success:
graph_output_url = self.get_graph_output_URL(graph_id, graph_version)
graph_metadata.set_dump_url(f'{graph_output_url}graph_{graph_version}.db.dump')
Expand Down
2 changes: 2 additions & 0 deletions Common/data_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
BIOLINK = 'Biolink'
CAM_KP = 'CAM-KP'
CHEBI_PROPERTIES = 'CHEBIProps'
REACTOME = 'Reactome'
CORD19 = 'Cord19'
CTD = 'CTD'
DRUG_CENTRAL = 'DrugCentral'
Expand Down Expand Up @@ -45,6 +46,7 @@
BIOLINK: ("parsers.biolink.src.loadBL", "BLLoader"),
CAM_KP: ("parsers.camkp.src.loadCAMKP", "CAMKPLoader"),
CHEBI_PROPERTIES: ("parsers.chebi.src.loadChebiProperties", "ChebiPropertiesLoader"),
REACTOME: ("parsers.Reactome.src.loadReactome", "ReactomeLoader"),
CORD19: ("parsers.cord19.src.loadCord19", "Cord19Loader"),
CTD: ("parsers.CTD.src.loadCTD", "CTDLoader"),
DRUG_CENTRAL: ("parsers.drugcentral.src.loaddrugcentral", "DrugCentralLoader"),
Expand Down
2 changes: 1 addition & 1 deletion Common/kgx_file_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class KGXFileWriter:
logger = LoggingUtil.init_logging("Data_services.Common.KGXFileWriter",
line_format='medium',
level=logging.DEBUG,
log_file_path=os.environ['DATA_SERVICES_LOGS'])
log_file_path=os.environ.get('DATA_SERVICES_LOGS'))
"""
constructor
:param nodes_output_file_path: the file path for the nodes file
Expand Down
4 changes: 2 additions & 2 deletions Common/loader_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def __init__(self, test_mode: bool = False, source_data_dir: str = None):
if not os.path.exists(self.data_path):
os.mkdir(self.data_path)
else:
self.data_path = os.environ["DATA_SERVICES_STORAGE"]
self.data_path = os.environ.get("DATA_SERVICES_STORAGE")

# the final output lists of nodes and edges
self.final_node_list: list = []
Expand All @@ -47,7 +47,7 @@ def __init__(self, test_mode: bool = False, source_data_dir: str = None):
self.logger = LoggingUtil.init_logging(f"Data_services.parsers.{self.get_name()}",
level=logging.INFO,
line_format='medium',
log_file_path=os.environ['DATA_SERVICES_LOGS'])
log_file_path=os.environ.get('DATA_SERVICES_LOGS'))

def get_latest_source_version(self):
"""Determine and return the latest source version ie. a unique identifier associated with the latest version."""
Expand Down
196 changes: 105 additions & 91 deletions Common/neo4j_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,117 +10,74 @@
class Neo4jTools:

def __init__(self,
graph_id: str,
graph_version: str,
neo4j_host: str = '0.0.0.0',
http_port: int = 7474,
https_port: int = 7473,
bolt_port: int = 7687):
self.graph_id = graph_id
self.graph_version = graph_version
bolt_port: int = 7687,
password: str = None):
self.host = neo4j_host
self.http_port = http_port
self.https_port = https_port
self.bolt_port = bolt_port
self.password = password if password else os.environ['DATA_SERVICES_NEO4J_PASSWORD']
self.graph_db_uri = f'bolt://{neo4j_host}:{bolt_port}'
self.graph_db_auth = ("neo4j", os.environ['DATA_SERVICES_NEO4J_PASSWORD'])
self.graph_db_auth = ("neo4j", self.password)
self.neo4j_driver = neo4j.GraphDatabase.driver(self.graph_db_uri, auth=self.graph_db_auth)
self.logger = LoggingUtil.init_logging("Data_services.Common.neo4j_tools",
line_format='medium',
log_file_path=os.environ['DATA_SERVICES_LOGS'])

def create_neo4j_dump(self,
graph_id: str,
graph_version: str,
graph_directory: str,
nodes_filename: str = 'nodes.jsonl',
edges_filename: str = 'edges.jsonl'):

graph_nodes_file_path = os.path.join(graph_directory, nodes_filename)
graph_edges_file_path = os.path.join(graph_directory, edges_filename)
nodes_csv_filename = 'nodes.temp_csv'
edges_csv_filename = 'edges.temp_csv'
csv_nodes_file_path = os.path.join(graph_directory, nodes_csv_filename)
csv_edges_file_path = os.path.join(graph_directory, edges_csv_filename)
if os.path.exists(csv_nodes_file_path) and os.path.exists(csv_edges_file_path):
self.logger.info(f'CSV files were already created for {graph_id}({graph_version})')
else:
self.logger.info(f'Creating CSV files for {graph_id}({graph_version})...')
kgx_file_converter.convert_jsonl_to_neo4j_csv(nodes_input_file=graph_nodes_file_path,
edges_input_file=graph_edges_file_path,
nodes_output_file=csv_nodes_file_path,
edges_output_file=csv_edges_file_path)
self.logger.info(f'CSV files created for {graph_id}({graph_version})...')

graph_dump_file_path = os.path.join(graph_directory, f'graph_{graph_version}.db.dump')
if os.path.exists(graph_dump_file_path):
self.logger.info(f'Neo4j dump already exists for {graph_id}({graph_version})')
return True

neo4j_access = Neo4jTools(graph_id=graph_id, graph_version=graph_version)
try:
password_exit_code = neo4j_access.set_initial_password()
if password_exit_code != 0:
return False

import_exit_code = neo4j_access.import_csv_files(graph_directory=graph_directory,
csv_nodes_filename=nodes_csv_filename,
csv_edges_filename=edges_csv_filename)
if import_exit_code != 0:
return False

start_exit_code = neo4j_access.start_neo4j()
if start_exit_code != 0:
return False

waiting_exit_code = neo4j_access.wait_for_neo4j_initialization()
if waiting_exit_code != 0:
return False

indexes_exit_code = neo4j_access.add_db_indexes()
if indexes_exit_code != 0:
return False

stop_exit_code = neo4j_access.stop_neo4j()
if stop_exit_code != 0:
return False

dump_exit_code = neo4j_access.create_backup_dump(graph_dump_file_path)
if dump_exit_code != 0:
return False

finally:
neo4j_access.close()

self.logger.info(f'Success! Neo4j dump created with indexes for {graph_id}({graph_version})')
return True

def import_csv_files(self,
graph_directory: str,
csv_nodes_filename: str = None,
csv_edges_filename: str = None):

password_exit_code = self.set_initial_password()
if password_exit_code != 0:
return password_exit_code

self.logger.info(f'Importing csv files to neo4j...')
neo4j_import_cmd = ["neo4j-admin", "import", f"--nodes={csv_nodes_filename}",
f"--relationships={csv_edges_filename}",
'--delimiter=TAB',
'--array-delimiter=U+001F',
'--force=true']
'--force']
import_results: subprocess.CompletedProcess = subprocess.run(neo4j_import_cmd,
cwd=graph_directory,
stderr=subprocess.PIPE)
capture_output=True)
self.logger.info(import_results.stdout)
import_results_return_code = import_results.returncode
if import_results_return_code != 0:
error_message = f'Neo4j import subprocess error (ExitCode {import_results_return_code}): ' \
f'{import_results.stderr.decode("UTF-8")}'
self.logger.error(error_message)
return import_results_return_code

def load_backup_dump(self,
dump_file_path: str = None):
password_exit_code = self.set_initial_password()
if password_exit_code != 0:
return password_exit_code

self.logger.info(f'Loading a neo4j backup dump {dump_file_path}...')
neo4j_load_cmd = ['neo4j-admin', 'load', f'--from={dump_file_path}', '--force']
load_results: subprocess.CompletedProcess = subprocess.run(neo4j_load_cmd,
capture_output=True)
self.logger.info(load_results.stdout)
load_results_return_code = load_results.returncode
if load_results_return_code != 0:
error_message = f'Neo4j load subprocess error (ExitCode {load_results_return_code}): ' \
f'{load_results.stderr.decode("UTF-8")}'
self.logger.error(error_message)
return load_results_return_code

def create_backup_dump(self,
dump_file_path: str = None):
self.logger.info(f'Creating a backup dump of the neo4j...')
neo4j_dump_cmd = ['neo4j-admin', 'dump', f'--to={dump_file_path}']
dump_results: subprocess.CompletedProcess = subprocess.run(neo4j_dump_cmd,
stderr=subprocess.PIPE)
capture_output=True)
self.logger.info(dump_results.stdout)
dump_results_return_code = dump_results.returncode
if dump_results_return_code != 0:
error_message = f'Neo4j dump subprocess error (ExitCode {dump_results_return_code}): ' \
Expand All @@ -139,7 +96,8 @@ def stop_neo4j(self):
def __issue_neo4j_command(self, command: str):
neo4j_cmd = ['neo4j', f'{command}']
neo4j_results: subprocess.CompletedProcess = subprocess.run(neo4j_cmd,
stderr=subprocess.PIPE)
capture_output=True)
self.logger.info(neo4j_results.stdout)
neo4j_results_return_code = neo4j_results.returncode
if neo4j_results_return_code != 0:
error_message = f'Neo4j {command} subprocess error (ExitCode {neo4j_results_return_code}): ' \
Expand All @@ -148,9 +106,11 @@ def __issue_neo4j_command(self, command: str):
return neo4j_results_return_code

def set_initial_password(self):
neo4j_cmd = ['neo4j-admin', 'set-initial-password', os.environ['DATA_SERVICES_NEO4J_PASSWORD']]
self.logger.info('Setting initial password for Neo4j...')
neo4j_cmd = ['neo4j-admin', 'set-initial-password', self.password]
neo4j_results: subprocess.CompletedProcess = subprocess.run(neo4j_cmd,
stderr=subprocess.PIPE)
capture_output=True)
self.logger.info(neo4j_results.stdout)
neo4j_results_return_code = neo4j_results.returncode
if neo4j_results_return_code != 0:
error_message = f'Neo4j {neo4j_cmd} subprocess error (ExitCode {neo4j_results_return_code}): ' \
Expand Down Expand Up @@ -178,17 +138,6 @@ def add_db_indexes(self):
try:
with self.neo4j_driver.session() as session:

# edge id index
cypher_result = list(session.run("CALL db.relationshipTypes()"))
rel_types = [result['relationshipType'] for result in cypher_result]
self.logger.info(f'Adding edge indexes for rel types: {rel_types}')
for i, rel_type in enumerate(rel_types):
index_name = f'edge_id_{i}'
edge_id_index_cypher = f'CREATE INDEX {index_name} FOR ()-[r:`{rel_type}`]-() ON (r.id)'
session.run(edge_id_index_cypher).consume()
indexes_added += 1
index_names.append(index_name)

# node name index
node_name_index_cypher = f'CREATE INDEX node_name_index FOR (n:`{NAMED_THING}`) on (n.name)'
self.logger.info(f'Adding node name index on {NAMED_THING}.name')
Expand Down Expand Up @@ -236,7 +185,7 @@ def add_db_indexes(self):
self.logger.info(f"Adding indexes successful. {indexes_added} indexes created.")
return 0

def wait_for_neo4j_initialization(self, counter: int = 1):
def wait_for_neo4j_initialization(self, counter: int = 1, max_retries: int = 10):
try:
with self.neo4j_driver.session() as session:
session.run("return 1")
Expand All @@ -245,7 +194,7 @@ def wait_for_neo4j_initialization(self, counter: int = 1):
except neo4j.exceptions.AuthError as e:
raise e
except Exception as e:
if counter > 8:
if counter > max_retries:
self.logger.error(f'Waited too long for Neo4j initialization... giving up..')
return 1
self.logger.info(f'Waiting for Neo4j container to finish initialization... {repr(e)}')
Expand All @@ -256,3 +205,68 @@ def close(self):
self.neo4j_driver.close()


def create_neo4j_dump(graph_id: str,
                      graph_version: str,
                      graph_directory: str,
                      nodes_filename: str = 'nodes.jsonl',
                      edges_filename: str = 'edges.jsonl',
                      logger=None):
    """Build a neo4j backup dump (with indexes) from KGX jsonl node/edge files.

    Converts the jsonl files to neo4j-importable CSVs (skipped if they already
    exist), then runs the full pipeline: csv import, server start, wait for
    initialization, index creation, server stop, and finally `neo4j-admin dump`.

    :param graph_id: identifier of the graph (used for log messages and paths)
    :param graph_version: version string embedded in the dump filename
    :param graph_directory: directory holding the jsonl files; outputs go here too
    :param nodes_filename: name of the nodes jsonl file inside graph_directory
    :param edges_filename: name of the edges jsonl file inside graph_directory
    :param logger: optional logger; when None, progress messages are suppressed
    :return: True on success (or when a dump already exists), False when any
             pipeline step exits non-zero
    """
    def log_info(message):
        # the logger is optional - silently drop messages when none was given
        if logger:
            logger.info(message)

    nodes_jsonl_path = os.path.join(graph_directory, nodes_filename)
    edges_jsonl_path = os.path.join(graph_directory, edges_filename)
    temp_nodes_csv = 'nodes.temp_csv'
    temp_edges_csv = 'edges.temp_csv'
    nodes_csv_path = os.path.join(graph_directory, temp_nodes_csv)
    edges_csv_path = os.path.join(graph_directory, temp_edges_csv)
    if os.path.exists(nodes_csv_path) and os.path.exists(edges_csv_path):
        log_info(f'CSV files were already created for {graph_id}({graph_version})')
    else:
        log_info(f'Creating CSV files for {graph_id}({graph_version})...')
        kgx_file_converter.convert_jsonl_to_neo4j_csv(nodes_input_file=nodes_jsonl_path,
                                                      edges_input_file=edges_jsonl_path,
                                                      nodes_output_file=nodes_csv_path,
                                                      edges_output_file=edges_csv_path)
        log_info(f'CSV files created for {graph_id}({graph_version})...')

    dump_file_path = os.path.join(graph_directory, f'graph_{graph_version}.db.dump')
    if os.path.exists(dump_file_path):
        log_info(f'Neo4j dump already exists for {graph_id}({graph_version})')
        return True

    neo4j_access = Neo4jTools()
    try:
        # run each pipeline step in order; every step returns a process-style
        # exit code, so bail out on the first non-zero result
        pipeline_steps = [
            lambda: neo4j_access.import_csv_files(graph_directory=graph_directory,
                                                  csv_nodes_filename=temp_nodes_csv,
                                                  csv_edges_filename=temp_edges_csv),
            neo4j_access.start_neo4j,
            neo4j_access.wait_for_neo4j_initialization,
            neo4j_access.add_db_indexes,
            neo4j_access.stop_neo4j,
            lambda: neo4j_access.create_backup_dump(dump_file_path),
        ]
        for run_step in pipeline_steps:
            if run_step() != 0:
                return False
    finally:
        # always release the bolt driver connection, even on failure
        neo4j_access.close()

    log_info(f'Success! Neo4j dump created with indexes for {graph_id}({graph_version})')
    return True
4 changes: 2 additions & 2 deletions Common/normalization.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def __init__(self,
self.logger = LoggingUtil.init_logging("Data_services.Common.NodeNormalizer",
level=log_level,
line_format='medium',
log_file_path=os.environ['DATA_SERVICES_LOGS'])
log_file_path=os.environ.get('DATA_SERVICES_LOGS'))
# storage for regular nodes that failed to normalize
self.failed_to_normalize_ids = set()
# storage for variant nodes that failed to normalize
Expand Down Expand Up @@ -339,7 +339,7 @@ def __init__(self,
:param log_level - overrides default log level
"""
# create a logger
self.logger = LoggingUtil.init_logging("Data_services.Common.EdgeNormalizer", level=log_level, line_format='medium', log_file_path=os.environ['DATA_SERVICES_LOGS'])
self.logger = LoggingUtil.init_logging("Data_services.Common.EdgeNormalizer", level=log_level, line_format='medium', log_file_path=os.environ.get('DATA_SERVICES_LOGS'))
# normalization map for future look up of all normalized predicates
self.edge_normalization_lookup = {}
self.cached_edge_norms = {}
Expand Down
5 changes: 5 additions & 0 deletions Common/prefixes.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
# Then, no matter what the prefix changed to, it would always be tied to the URL and could always be looked up within
# data services using a fixed key independent of biolink changes.

REACTOME = 'REACT'
CHEBI='CHEBI'
CTD='CTD'
CHEMBL='CHEMBL'
CHEMBL_MECHANISM='CHEMBL.MECHANISM'
CLINVAR='CLINVAR'
DBSNP='DBSNP'
DGIDB='DGIdb'
DRUGBANK='DRUGBANK'
Expand All @@ -34,6 +36,8 @@
HGNC_FAMILY='HGNC.FAMILY'
HP='HP'
HMDB='HMDB'
KEGG_COMPOUND='KEGG.COMPOUND'
KEGG_GLYCAN='KEGG.GLYCAN'
MEDDRA='MEDDRA'
MESH='MESH'
MONDO='MONDO'
Expand All @@ -42,6 +46,7 @@
NCIT='NCIT'
OMIM='OMIM'
ORPHANET='ORPHANET'
PUBCHEM_COMPOUND='PUBCHEM.COMPOUND'
PUBMED='PMID'
UBERON='UBERON'
UNIPROTKB='UniProtKB'
Expand Down
Loading

0 comments on commit 75d021c

Please sign in to comment.