From c7c26e0b20011d299596be4e1eed59c2afe2422d Mon Sep 17 00:00:00 2001
From: Max Wang
Date: Fri, 8 Sep 2023 15:29:56 -0400
Subject: [PATCH 01/10] Copy subgraph before modification

---
 strider/fetcher.py | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/strider/fetcher.py b/strider/fetcher.py
index bef79085..e9348d9f 100644
--- a/strider/fetcher.py
+++ b/strider/fetcher.py
@@ -164,6 +164,9 @@ async def generate_from_kp(
         )

         for batch_results in batch(onehop_results, 1_000_000):
             result_map = defaultdict(list)
+            # copy subqgraph between each batch
+            # before we fill it with result curies
+            populated_subqgraph = copy.deepcopy(subqgraph)
             for result in batch_results:
                 # add edge to results and kgraph
@@ -227,12 +230,12 @@ async def generate_from_kp(

                 # pin nodes
                 for qnode_id, bindings in result.node_bindings.items():
-                    if qnode_id not in subqgraph["nodes"]:
+                    if qnode_id not in populated_subqgraph["nodes"]:
                         continue
-                    subqgraph["nodes"][qnode_id]["ids"] = (
-                        subqgraph["nodes"][qnode_id].get("ids") or []
+                    populated_subqgraph["nodes"][qnode_id]["ids"] = (
+                        populated_subqgraph["nodes"][qnode_id].get("ids") or []
                     ) + [binding.id for binding in bindings]
-                qnode_ids = set(subqgraph["nodes"].keys()) & set(
+                qnode_ids = set(populated_subqgraph["nodes"].keys()) & set(
                     result.node_bindings.keys()
                 )
                 key_fcn = lambda res: tuple(
@@ -249,7 +252,7 @@ async def generate_from_kp(

             generators.append(
                 self.generate_from_result(
-                    copy.deepcopy(subqgraph),
+                    populated_subqgraph,
                     lambda result: result_map[key_fcn(result)],
                     call_stack,
                 )

From 8fb40660095cbef4f2d6c4327a6e087b33c22e13 Mon Sep 17 00:00:00 2001
From: Max Wang
Date: Fri, 8 Sep 2023 15:33:16 -0400
Subject: [PATCH 02/10] Deduplicate curies from all results in qgraph

---
 strider/fetcher.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/strider/fetcher.py b/strider/fetcher.py
index e9348d9f..979d6ed3 100644
--- a/strider/fetcher.py
+++ b/strider/fetcher.py
@@ -232,9 +232,11 @@ async def generate_from_kp(
                 for qnode_id, bindings in result.node_bindings.items():
                     if qnode_id not in populated_subqgraph["nodes"]:
                         continue
-                    populated_subqgraph["nodes"][qnode_id]["ids"] = (
+                    # add curies from result into the qgraph
+                    # need to call set() to remove any duplicates
+                    populated_subqgraph["nodes"][qnode_id]["ids"] = list(set((
                         populated_subqgraph["nodes"][qnode_id].get("ids") or []
-                    ) + [binding.id for binding in bindings]
+                    ) + [binding.id for binding in bindings]))
                 qnode_ids = set(populated_subqgraph["nodes"].keys()) & set(
                     result.node_bindings.keys()
                 )
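Note on patches 01 and 02: the pattern they apply can be shown with a standalone sketch. The function below is a hypothetical reduction of the batching loop in generate_from_kp (the real results are pydantic objects with attribute access, and the real loop does much more per result); it illustrates why both changes matter — without the per-batch deepcopy, curies pinned for one batch leak into the shared template qgraph used by the next batch, and without set(), repeated bindings accumulate duplicate ids.

    import copy

    def pin_batches(subqgraph, batches):
        """Sketch: pin result curies onto a fresh copy of the qgraph per batch."""
        pinned = []
        for batch_results in batches:
            # fresh copy per batch, so this batch's curies
            # don't contaminate the shared template
            populated_subqgraph = copy.deepcopy(subqgraph)
            for result in batch_results:
                for qnode_id, bindings in result["node_bindings"].items():
                    if qnode_id not in populated_subqgraph["nodes"]:
                        continue
                    # set() deduplicates curies repeated across results
                    populated_subqgraph["nodes"][qnode_id]["ids"] = list(
                        set(
                            (populated_subqgraph["nodes"][qnode_id].get("ids") or [])
                            + [b["id"] for b in bindings]
                        )
                    )
            pinned.append(populated_subqgraph)
        return pinned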
From 2ea923f1face0de96666fb30b6126e3619f9e24f Mon Sep 17 00:00:00 2001
From: Max Wang
Date: Fri, 8 Sep 2023 15:34:10 -0400
Subject: [PATCH 03/10] Make qgraph easier to copy and paste from logs

---
 strider/server.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/strider/server.py b/strider/server.py
index a92798cf..b248da5c 100644
--- a/strider/server.py
+++ b/strider/server.py
@@ -9,6 +9,7 @@
 """
 import copy
 import datetime
+import json
 import os
 import uuid
 import logging
@@ -454,7 +455,7 @@ async def lookup(

     fetcher = Fetcher(logger, parameters)

-    logger.info(f"Doing lookup for qgraph: {qgraph}")
+    logger.info(f"Doing lookup for qgraph: {json.dumps(qgraph)}")
     try:
         await fetcher.setup(qgraph, registry, information_content_threshold)
     except NoAnswersError:

From 330ca25ef49ec65e23a5fb7de44b7e2ba14d0494 Mon Sep 17 00:00:00 2001
From: Max Wang
Date: Fri, 8 Sep 2023 15:35:07 -0400
Subject: [PATCH 04/10] Add some timing metrics to the logs

---
 strider/server.py | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/strider/server.py b/strider/server.py
index b248da5c..17577222 100644
--- a/strider/server.py
+++ b/strider/server.py
@@ -435,6 +435,7 @@ async def lookup(
     qid: str = None,
 ) -> dict:
     """Perform lookup operation."""
+    lookup_start_time = datetime.datetime.now()
     qgraph = query_dict["message"]["query_graph"]
     log_level = query_dict.get("log_level") or "INFO"
@@ -478,9 +479,12 @@

     output_auxgraphs = AuxiliaryGraphs.parse_obj({})

+    message_merging_time = 0
+
     async with fetcher:
         async for result_kgraph, result, result_auxgraph in fetcher.lookup(None):
             # Update the kgraph
+            start_merging = datetime.datetime.now()
             output_kgraph.update(result_kgraph)

             # Update the aux graphs
@@ -497,6 +501,9 @@
             # add new result to hashmap
             output_results[sub_result_hash] = result

+            stop_merging = datetime.datetime.now()
+            message_merging_time += (stop_merging - start_merging).total_seconds()
+
     results = Results.parse_obj([])
     for result in output_results.values():
         # copy so result analyses don't get combined somehow
@@ -518,6 +525,11 @@
     collapse_sets(output_query, logger)

     output_query.logs = list(log_handler.contents())
+    lookup_end_time = datetime.datetime.now()
+    logger.info({
+        "total_lookup_time": (lookup_end_time - lookup_start_time).total_seconds(),
+        "total_merging": message_merging_time,
+    })
     return output_query.dict(exclude_none=True)
@@ -561,6 +573,7 @@ async def async_lookup(

 async def multi_lookup(multiqid, callback, queries: dict, query_keys: list):
     "Performs lookup for multiple queries and sends all results to callback url"
+    start_time = datetime.datetime.now()

     async def single_lookup(query_key):
         qid = f"{multiqid}.{str(uuid.uuid4())[:8]}"
@@ -624,6 +637,8 @@ async def single_lookup(query_key):
         LOGGER.error(
             f"[{multiqid}] Failed to send 'completed' response back to {callback} with error: {e}"
         )
+    end_time = datetime.datetime.now()
+    LOGGER.info(f"[{multiqid}] took {(end_time - start_time).total_seconds()} seconds")


 @APP.post("/plan", response_model=dict[str, list[str]], include_in_schema=False)

From d026b713db36f37aa34e9c3b0b06e19d679a85ed Mon Sep 17 00:00:00 2001
From: Max Wang
Date: Fri, 8 Sep 2023 15:37:00 -0400
Subject: [PATCH 05/10] Fix error in JSON decode error handler that causes
 query to crash

---
 strider/throttle.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/strider/throttle.py b/strider/throttle.py
index d7e3eae4..cb3703a5 100644
--- a/strider/throttle.py
+++ b/strider/throttle.py
@@ -351,8 +351,8 @@ async def process_batch(
                 self.logger.warning(
                     {
                         "message": f"Received bad JSON data from {self.id}",
-                        "request": e.request,
-                        "response": e.response.text,
+                        "request": json.dumps(merged_request_value),
+                        "response": response.text,
                         "error": str(e),
                     }
                 )
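Note on patch 05: the old handler crashed because json.JSONDecodeError carries no .request or .response attributes — those belong to HTTP-client exception types, and the two lines were presumably carried over from such a handler — so merely building the log payload raised an AttributeError inside the except block. A minimal standard-library reproduction:

    import json

    try:
        json.loads("not valid json")
    except json.JSONDecodeError as e:
        # JSONDecodeError has msg, doc, pos, lineno, colno -- these are safe
        print(e.msg, e.pos)
        # e.request  # AttributeError -- this is what killed the query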
From af6d472fe6dd747631fecc45b1ac86bfa4f09606 Mon Sep 17 00:00:00 2001
From: Max Wang
Date: Fri, 8 Sep 2023 15:39:58 -0400
Subject: [PATCH 06/10] Black

---
 strider/fetcher.py | 9 ++++++---
 strider/server.py  | 10 ++++++----
 2 files changed, 12 insertions(+), 7 deletions(-)

diff --git a/strider/fetcher.py b/strider/fetcher.py
index 979d6ed3..a1d8b0ac 100644
--- a/strider/fetcher.py
+++ b/strider/fetcher.py
@@ -234,9 +234,12 @@ async def generate_from_kp(
                         continue
                     # add curies from result into the qgraph
                     # need to call set() to remove any duplicates
-                    populated_subqgraph["nodes"][qnode_id]["ids"] = list(set((
-                        populated_subqgraph["nodes"][qnode_id].get("ids") or []
-                    ) + [binding.id for binding in bindings]))
+                    populated_subqgraph["nodes"][qnode_id]["ids"] = list(
+                        set(
+                            (populated_subqgraph["nodes"][qnode_id].get("ids") or [])
+                            + [binding.id for binding in bindings]
+                        )
+                    )
                 qnode_ids = set(populated_subqgraph["nodes"].keys()) & set(
                     result.node_bindings.keys()
                 )
diff --git a/strider/server.py b/strider/server.py
index 17577222..f9415dd4 100644
--- a/strider/server.py
+++ b/strider/server.py
@@ -526,10 +526,12 @@ async def lookup(
     output_query.logs = list(log_handler.contents())
     lookup_end_time = datetime.datetime.now()
-    logger.info({
-        "total_lookup_time": (lookup_end_time - lookup_start_time).total_seconds(),
-        "total_merging": message_merging_time,
-    })
+    logger.info(
+        {
+            "total_lookup_time": (lookup_end_time - lookup_start_time).total_seconds(),
+            "total_merging": message_merging_time,
+        }
+    )
     return output_query.dict(exclude_none=True)

From f67797236fe7824449d0a5e2f13a7f91379b8039 Mon Sep 17 00:00:00 2001
From: Max Wang
Date: Fri, 8 Sep 2023 15:40:10 -0400
Subject: [PATCH 07/10] Include json import

---
 strider/throttle.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/strider/throttle.py b/strider/throttle.py
index cb3703a5..9fd2feda 100644
--- a/strider/throttle.py
+++ b/strider/throttle.py
@@ -3,6 +3,7 @@
 from asyncio.queues import QueueEmpty
 from asyncio.tasks import Task
 import copy
+import json
 import datetime
 from functools import wraps
 import itertools
From e7bb5c619ede6f0f458ca24b6844469c73e8ef32 Mon Sep 17 00:00:00 2001
From: Max Wang
Date: Mon, 11 Sep 2023 12:44:46 -0400
Subject: [PATCH 08/10] Change datetime to time

---
 strider/server.py | 19 ++++++++++---------
 1 file changed, 10 insertions(+), 9 deletions(-)

diff --git a/strider/server.py b/strider/server.py
index f9415dd4..fdba1ba1 100644
--- a/strider/server.py
+++ b/strider/server.py
@@ -14,6 +14,7 @@
 import uuid
 import logging
 import warnings
+import time
 import traceback
 import asyncio

@@ -435,7 +436,7 @@ async def lookup(
     qid: str = None,
 ) -> dict:
     """Perform lookup operation."""
-    lookup_start_time = datetime.datetime.now()
+    lookup_start_time = time.time()
     qgraph = query_dict["message"]["query_graph"]
     log_level = query_dict.get("log_level") or "INFO"
@@ -484,7 +485,7 @@ async def lookup(
     async with fetcher:
         async for result_kgraph, result, result_auxgraph in fetcher.lookup(None):
             # Update the kgraph
-            start_merging = datetime.datetime.now()
+            start_merging = time.time()
             output_kgraph.update(result_kgraph)

             # Update the aux graphs
@@ -501,8 +502,8 @@ async def lookup(
             # add new result to hashmap
             output_results[sub_result_hash] = result

-            stop_merging = datetime.datetime.now()
-            message_merging_time += (stop_merging - start_merging).total_seconds()
+            stop_merging = time.time()
+            message_merging_time += (stop_merging - start_merging)

     results = Results.parse_obj([])
@@ -525,10 +526,10 @@ async def lookup(
     output_query.logs = list(log_handler.contents())
-    lookup_end_time = datetime.datetime.now()
+    lookup_end_time = time.time()
     logger.info(
         {
-            "total_lookup_time": (lookup_end_time - lookup_start_time).total_seconds(),
+            "total_lookup_time": (lookup_end_time - lookup_start_time),
             "total_merging": message_merging_time,
         }
     )
@@ -575,7 +576,7 @@ async def async_lookup(

 async def multi_lookup(multiqid, callback, queries: dict, query_keys: list):
     "Performs lookup for multiple queries and sends all results to callback url"
-    start_time = datetime.datetime.now()
+    start_time = time.time()

     async def single_lookup(query_key):
         qid = f"{multiqid}.{str(uuid.uuid4())[:8]}"
@@ -639,8 +640,8 @@ async def single_lookup(query_key):
         LOGGER.error(
             f"[{multiqid}] Failed to send 'completed' response back to {callback} with error: {e}"
         )
-    end_time = datetime.datetime.now()
-    LOGGER.info(f"[{multiqid}] took {(end_time - start_time).total_seconds()} seconds")
+    end_time = time.time()
+    LOGGER.info(f"[{multiqid}] took {(end_time - start_time)} seconds")


 @APP.post("/plan", response_model=dict[str, list[str]], include_in_schema=False)

From 8679b653cf00c77184f8d64cbb17b38dd589a635 Mon Sep 17 00:00:00 2001
From: Max Wang
Date: Mon, 11 Sep 2023 12:50:45 -0400
Subject: [PATCH 09/10] Add more to a comment

---
 strider/fetcher.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/strider/fetcher.py b/strider/fetcher.py
index a1d8b0ac..69a89c9c 100644
--- a/strider/fetcher.py
+++ b/strider/fetcher.py
@@ -166,6 +166,8 @@ async def generate_from_kp(
             result_map = defaultdict(list)
             # copy subqgraph between each batch
             # before we fill it with result curies
+            # this keeps the sub query graph from being modified and passing
+            # extra curies into subsequent batches
             populated_subqgraph = copy.deepcopy(subqgraph)
             for result in batch_results:
                 # add edge to results and kgraph

From 5fd19f569b38dd7a974422a25802a68feed29bfb Mon Sep 17 00:00:00 2001
From: Max Wang
Date: Mon, 11 Sep 2023 12:51:09 -0400
Subject: [PATCH 10/10] Run black

---
 strider/server.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/strider/server.py b/strider/server.py
index fdba1ba1..eaf00935 100644
--- a/strider/server.py
+++ b/strider/server.py
@@ -503,7 +503,7 @@ async def lookup(
             output_results[sub_result_hash] = result

             stop_merging = time.time()
-            message_merging_time += (stop_merging - start_merging)
+            message_merging_time += stop_merging - start_merging

     results = Results.parse_obj([])
     for result in output_results.values():
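Note on patches 04, 08, and 10: the series settles on time.time(), which returns seconds as a float, so intervals subtract and accumulate directly with no .total_seconds() conversion (time.monotonic() would additionally be immune to wall-clock adjustments, but the patches use time.time()). A runnable sketch of the resulting timing pattern, with a hypothetical merge() standing in for the kgraph/result merging done inside the fetcher loop:

    import time

    def merge(batch):
        # stand-in for merging a result kgraph into the output message
        return sum(batch)

    def timed_lookup(batches):
        """Sketch of the timing pattern added to lookup()."""
        lookup_start_time = time.time()
        message_merging_time = 0.0
        for batch in batches:
            start_merging = time.time()
            merge(batch)
            message_merging_time += time.time() - start_merging
        return {
            "total_lookup_time": time.time() - lookup_start_time,
            "total_merging": message_merging_time,
        }

    print(timed_lookup([[1, 2], [3, 4]]))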