diff --git a/node_normalizer/normalizer.py b/node_normalizer/normalizer.py index 27f8e47..a0ec458 100644 --- a/node_normalizer/normalizer.py +++ b/node_normalizer/normalizer.py @@ -29,27 +29,24 @@ async def normalize_message(app: FastAPI, message: Message) -> Message: Given a TRAPI message, updates the message to include a normalized qgraph, kgraph, and results """ - try: - ret = Message() + ret = Message() - logger.debug(f"message.query_graph is None: {message.query_graph is None}") - if message.query_graph is not None: - merged_qgraph = await normalize_qgraph(app, message.query_graph) - ret.query_graph = merged_qgraph + logger.debug(f"message.query_graph is None: {message.query_graph is None}") + if message.query_graph is not None: + merged_qgraph = await normalize_qgraph(app, message.query_graph) + ret.query_graph = merged_qgraph - logger.debug(f"message.knowledge_graph is None: {message.knowledge_graph is None}") - if message.knowledge_graph is not None: - merged_kgraph, node_id_map, edge_id_map = await normalize_kgraph(app, message.knowledge_graph) - ret.knowledge_graph = merged_kgraph + logger.debug(f"message.knowledge_graph is None: {message.knowledge_graph is None}") + if message.knowledge_graph is not None: + merged_kgraph, node_id_map, edge_id_map = await normalize_kgraph(app, message.knowledge_graph) + ret.knowledge_graph = merged_kgraph - logger.debug(f"message.results is None: {message.results is None}") - if message.results is not None: - merged_results = await normalize_results(app, message.results, node_id_map, edge_id_map) - ret.results = merged_results + logger.debug(f"message.results is None: {message.results is None}") + if message.results is not None: + merged_results = await normalize_results(app, message.results, node_id_map, edge_id_map) + ret.results = merged_results - return ret - except Exception as e: - logger.error(f'normalize_message Exception: {e}') + return ret async def normalize_results(app, @@ -72,84 +69,77 @@ async def normalize_results(app, node_binding_seen = set() - try: - for node_code, node_bindings in result.node_bindings.items(): - merged_node_bindings = [] - for n_bind in node_bindings: - merged_binding = n_bind.dict() - # merged_binding['id'] = node_id_map[n_bind.id.__root__] - merged_binding['id'] = node_id_map[n_bind.id] - - # get the information content value - ic_attrib = await get_info_content_attribute(app, merged_binding['id']) - - # did we get a good attribute dict - if ic_attrib: - if 'attributes' in merged_binding: - merged_binding['attributes'].append(ic_attrib) - else: - merged_binding['attributes'] = [ic_attrib] - - node_binding_information = [ - "atts" if k == 'attributes' - else (k, tuple(v)) if isinstance(v, list) - else (k, v) - for k, v in merged_binding.items() - ] + for node_code, node_bindings in result.node_bindings.items(): + merged_node_bindings = [] + for n_bind in node_bindings: + merged_binding = n_bind.dict() + # merged_binding['id'] = node_id_map[n_bind.id.__root__] + merged_binding['id'] = node_id_map[n_bind.id] - # if there are attributes in the node binding - if 'attributes' in merged_binding: - # storage for the pydantic Attributes - attribs = [] - - # the items in list of attributes must be of type Attribute - # in order to reuse hash method - if merged_binding['attributes'] is not None: - for attrib in merged_binding['attributes']: - new_attrib = Attribute.parse_obj(attrib) - - # add the new Attribute to the list - attribs.append(new_attrib) - - # call to get the hash - atty_hash = _hash_attributes(attribs) - node_binding_information.append(atty_hash) - node_binding_hash = frozenset(node_binding_information) + # get the information content value + ic_attrib = await get_info_content_attribute(app, merged_binding['id']) - if node_binding_hash in node_binding_seen: - continue + # did we get a good attribute dict + if ic_attrib: + if 'attributes' in merged_binding: + merged_binding['attributes'].append(ic_attrib) else: - node_binding_seen.add(node_binding_hash) - merged_node_bindings.append(merged_binding) - - merged_result['node_bindings'][node_code] = merged_node_bindings + merged_binding['attributes'] = [ic_attrib] + + node_binding_information = [ + "atts" if k == 'attributes' + else (k, tuple(v)) if isinstance(v, list) + else (k, v) + for k, v in merged_binding.items() + ] + + # if there are attributes in the node binding + if 'attributes' in merged_binding: + # storage for the pydantic Attributes + attribs = [] + + # the items in list of attributes must be of type Attribute + # in order to reuse hash method + if merged_binding['attributes'] is not None: + for attrib in merged_binding['attributes']: + new_attrib = Attribute.parse_obj(attrib) + + # add the new Attribute to the list + attribs.append(new_attrib) + + # call to get the hash + atty_hash = _hash_attributes(attribs) + node_binding_information.append(atty_hash) + node_binding_hash = frozenset(node_binding_information) + + if node_binding_hash in node_binding_seen: + continue + else: + node_binding_seen.add(node_binding_hash) + merged_node_bindings.append(merged_binding) - except Exception as e: - logger.exception(e) + merged_result['node_bindings'][node_code] = merged_node_bindings edge_binding_seen = set() - try: - for edge_code, edge_bindings in result.edge_bindings.items(): - merged_edge_bindings = [] - for e_bind in edge_bindings: - merged_binding = e_bind.dict() - merged_binding['id'] = edge_id_map[e_bind.id] - - edge_binding_hash = frozenset([ - (k, freeze(v)) - for k, v in merged_binding.items() - ]) - - if edge_binding_hash in edge_binding_seen: - continue - else: - edge_binding_seen.add(edge_binding_hash) - merged_edge_bindings.append(merged_binding) + for edge_code, edge_bindings in result.edge_bindings.items(): + merged_edge_bindings = [] + for e_bind in edge_bindings: + merged_binding = e_bind.dict() + merged_binding['id'] = edge_id_map[e_bind.id] - merged_result['edge_bindings'][edge_code] = merged_edge_bindings - except Exception as e: - logger.exception(e) + edge_binding_hash = frozenset([ + (k, freeze(v)) + for k, v in merged_binding.items() + ]) + + if edge_binding_hash in edge_binding_seen: + continue + else: + edge_binding_seen.add(edge_binding_hash) + merged_edge_bindings.append(merged_binding) + + merged_result['edge_bindings'][edge_code] = merged_edge_bindings try: # This used to have some list comprehension based on types. But in TRAPI 1.1 the list/dicts get pretty deep. @@ -189,28 +179,25 @@ async def normalize_qgraph(app: FastAPI, qgraph: QueryGraph) -> QueryGraph: node_code_map: Dict[str, Union[str, List]] = {} for node_code, node in qgraph.nodes.items(): - try: - merged_nodes[node_code] = node.dict() + merged_nodes[node_code] = node.dict() - # as of TRAPI 1.1, node.id must be none or a list. - # node.id can be none, a string, or a list - if not node.ids: - # do nothing - continue - else: - if not isinstance(node.ids.__root__, list): - raise Exception("node.ids must be a list") - primary_ids = set() - for nr in node.ids.__root__: - equivalent_curies = await get_equivalent_curies(app, nr) - if equivalent_curies[nr]: - primary_ids.add(equivalent_curies[nr]['id']['identifier']) - else: - primary_ids.add(nr) - merged_nodes[node_code]['ids'] = list(primary_ids) - node_code_map[node_code] = list(primary_ids) - except Exception as e: - logger.error(f'normalize_qgraph Exception: {e}') + # as of TRAPI 1.1, node.id must be none or a list. + # node.id can be none, a string, or a list + if not node.ids: + # do nothing + continue + else: + if not isinstance(node.ids.__root__, list): + raise Exception("node.ids must be a list") + primary_ids = set() + for nr in node.ids.__root__: + equivalent_curies = await get_equivalent_curies(app, nr) + if equivalent_curies[nr]: + primary_ids.add(equivalent_curies[nr]['id']['identifier']) + else: + primary_ids.add(nr) + merged_nodes[node_code]['ids'] = list(primary_ids) + node_code_map[node_code] = list(primary_ids) return QueryGraph.parse_obj({ 'nodes': merged_nodes, @@ -241,151 +228,148 @@ async def normalize_kgraph( node_id_map: Dict[str, str] = {} edge_id_map: Dict[str, str] = {} - try: - # Map for each node id (curie) and its primary id - node_id_map: Dict[str, str] = {} + # Map for each node id (curie) and its primary id + node_id_map: Dict[str, str] = {} - # Map for each edge id and its primary id - edge_id_map: Dict[str, str] = {} + # Map for each edge id and its primary id + edge_id_map: Dict[str, str] = {} - # Map for each edge to its s,p,r,o signature - primary_edges: Dict[Tuple[str, str, Optional[str], str, Union[UUID, int]], str] = {} + # Map for each edge to its s,p,r,o signature + primary_edges: Dict[Tuple[str, str, Optional[str], str, Union[UUID, int]], str] = {} - # cache for primary node ids - primary_nodes_seen = set() + # cache for primary node ids + primary_nodes_seen = set() - # Count of times a node has been merged for attribute merging - node_merge_count: Dict[str, int] = {} + # Count of times a node has been merged for attribute merging + node_merge_count: Dict[str, int] = {} - # cache for nodes - nodes_seen = set() + # cache for nodes + nodes_seen = set() - # cache for subject, predicate, relation, object, attribute hash tuples - edges_seen: Set[Tuple[str, str, str, str, Union[UUID, int]]] = set() + # cache for subject, predicate, relation, object, attribute hash tuples + edges_seen: Set[Tuple[str, str, str, str, Union[UUID, int]]] = set() - for node_id, node in kgraph.nodes.items(): - if node_id in nodes_seen: - continue + for node_id, node in kgraph.nodes.items(): + if node_id in nodes_seen: + continue - nodes_seen.add(node_id) - node_id_map[node_id] = node_id # expected to overridden by primary id + nodes_seen.add(node_id) + node_id_map[node_id] = node_id # expected to overridden by primary id - merged_node = node.dict() + merged_node = node.dict() - equivalent_curies = await get_equivalent_curies(app, node_id) + equivalent_curies = await get_equivalent_curies(app, node_id) - if equivalent_curies[node_id]: - primary_id = equivalent_curies[node_id]['id']['identifier'] - node_id_map[node_id] = primary_id + if equivalent_curies[node_id]: + primary_id = equivalent_curies[node_id]['id']['identifier'] + node_id_map[node_id] = primary_id - if primary_id in primary_nodes_seen: - merged_node = _merge_node_attributes( - node_a=merged_kgraph['nodes'][primary_id], - node_b=node.dict(), - merged_count=node_merge_count[primary_id] - ) - merged_kgraph['nodes'][primary_id] = merged_node - node_merge_count[primary_id] += 1 - continue - else: - node_merge_count[primary_id] = 0 + if primary_id in primary_nodes_seen: + merged_node = _merge_node_attributes( + node_a=merged_kgraph['nodes'][primary_id], + node_b=node.dict(), + merged_count=node_merge_count[primary_id] + ) + merged_kgraph['nodes'][primary_id] = merged_node + node_merge_count[primary_id] += 1 + continue + else: + node_merge_count[primary_id] = 0 - primary_nodes_seen.add(primary_id) + primary_nodes_seen.add(primary_id) - if 'label' in equivalent_curies[node_id]['id']: - primary_label = equivalent_curies[node_id]['id']['label'] - elif 'name' in merged_node: - primary_label = merged_node['name'] + if 'label' in equivalent_curies[node_id]['id']: + primary_label = equivalent_curies[node_id]['id']['label'] + elif 'name' in merged_node: + primary_label = merged_node['name'] + else: + primary_label = '' + + merged_node['name'] = primary_label + + # Even if there's already a same_as attribute we add another + # since it is coming from a new source + if 'equivalent_identifiers' in equivalent_curies[node_id]: + same_as_attribute = { + 'attribute_type_id': 'biolink:same_as', + 'value': [ + node['identifier'] + for node in equivalent_curies[node_id]['equivalent_identifiers'] + ], + 'original_attribute_name': 'equivalent_identifiers', + "value_type_id": "EDAM:data_0006", + + # TODO, should we add the app version as the source + # or perhaps the babel/redis cache version + # This will make unit testing a little more tricky + # see https://stackoverflow.com/q/57624731 + + # 'source': f'{app.title} {app.version}', + } + if 'attributes' in merged_node and merged_node['attributes']: + merged_node['attributes'].append(same_as_attribute) else: - primary_label = '' - - merged_node['name'] = primary_label - - # Even if there's already a same_as attribute we add another - # since it is coming from a new source - if 'equivalent_identifiers' in equivalent_curies[node_id]: - same_as_attribute = { - 'attribute_type_id': 'biolink:same_as', - 'value': [ - node['identifier'] - for node in equivalent_curies[node_id]['equivalent_identifiers'] - ], - 'original_attribute_name': 'equivalent_identifiers', - "value_type_id": "EDAM:data_0006", - - # TODO, should we add the app version as the source - # or perhaps the babel/redis cache version - # This will make unit testing a little more tricky - # see https://stackoverflow.com/q/57624731 - - # 'source': f'{app.title} {app.version}', - } - if 'attributes' in merged_node and merged_node['attributes']: - merged_node['attributes'].append(same_as_attribute) - else: - merged_node['attributes'] = [same_as_attribute] + merged_node['attributes'] = [same_as_attribute] - if 'type' in equivalent_curies[node_id]: - if type(equivalent_curies[node_id]['type']) is list: - merged_node['categories'] = equivalent_curies[node_id]['type'] - else: - merged_node['categories'] = [equivalent_curies[node_id]['type']] + if 'type' in equivalent_curies[node_id]: + if type(equivalent_curies[node_id]['type']) is list: + merged_node['categories'] = equivalent_curies[node_id]['type'] + else: + merged_node['categories'] = [equivalent_curies[node_id]['type']] - # get the information content value - ic_attrib = await get_info_content_attribute(app, node_id) + # get the information content value + ic_attrib = await get_info_content_attribute(app, node_id) - # did we get a good attribute dict - if ic_attrib: - # add the attribute to the node - merged_node['attributes'].append(ic_attrib) + # did we get a good attribute dict + if ic_attrib: + # add the attribute to the node + merged_node['attributes'].append(ic_attrib) - merged_kgraph['nodes'][primary_id] = merged_node - else: - merged_kgraph['nodes'][node_id] = merged_node - - for edge_id, edge in kgraph.edges.items(): - # Accessing __root__ directly seems wrong, - # https://github.com/samuelcolvin/pydantic/issues/730 - # could also do str(edge.subject) - if edge.subject in node_id_map: - primary_subject = node_id_map[edge.subject] - else: - # should we throw a validation error here? - primary_subject = edge.subject + merged_kgraph['nodes'][primary_id] = merged_node + else: + merged_kgraph['nodes'][node_id] = merged_node + + for edge_id, edge in kgraph.edges.items(): + # Accessing __root__ directly seems wrong, + # https://github.com/samuelcolvin/pydantic/issues/730 + # could also do str(edge.subject) + if edge.subject in node_id_map: + primary_subject = node_id_map[edge.subject] + else: + # should we throw a validation error here? + primary_subject = edge.subject - if edge.object in node_id_map: - primary_object = node_id_map[edge.object] - else: - primary_object = edge.object + if edge.object in node_id_map: + primary_object = node_id_map[edge.object] + else: + primary_object = edge.object - hashed_attributes = _hash_attributes(edge.attributes) + hashed_attributes = _hash_attributes(edge.attributes) - if hashed_attributes is False: - # we couldn't hash the attribute so assume unique - hashed_attributes = uuid.uuid4() + if hashed_attributes is False: + # we couldn't hash the attribute so assume unique + hashed_attributes = uuid.uuid4() - triple = ( - primary_subject, - edge.predicate, - primary_object, - hashed_attributes - ) + triple = ( + primary_subject, + edge.predicate, + primary_object, + hashed_attributes + ) - if triple in edges_seen: - edge_id_map[edge_id] = primary_edges[triple] - continue - else: - primary_edges[triple] = edge_id - edge_id_map[edge_id] = edge_id + if triple in edges_seen: + edge_id_map[edge_id] = primary_edges[triple] + continue + else: + primary_edges[triple] = edge_id + edge_id_map[edge_id] = edge_id - edges_seen.add(triple) - merged_edge = edge.dict() + edges_seen.add(triple) + merged_edge = edge.dict() - merged_edge['subject'] = primary_subject - merged_edge['object'] = primary_object - merged_kgraph['edges'][edge_id] = merged_edge - except Exception as e: - logger.error(f'normalize_kgraph Exception: {e}') + merged_edge['subject'] = primary_subject + merged_edge['object'] = primary_object + merged_kgraph['edges'][edge_id] = merged_edge return KnowledgeGraph.parse_obj(merged_kgraph), node_id_map, edge_id_map @@ -486,71 +470,68 @@ async def get_normalized_nodes( # conflation_redis = 5 upper_curies = [c.upper() for c in curies] - try: - canonical_ids = await app.state.redis_connection0.mget(*upper_curies, encoding='utf-8') - canonical_nonan = [canonical_id for canonical_id in canonical_ids if canonical_id is not None] - info_contents = {} + canonical_ids = await app.state.redis_connection0.mget(*upper_curies, encoding='utf-8') + canonical_nonan = [canonical_id for canonical_id in canonical_ids if canonical_id is not None] + info_contents = {} - # did we get some canonical ids - if canonical_nonan: - # get the information content values - info_contents = await get_info_content(app, canonical_nonan) + # did we get some canonical ids + if canonical_nonan: + # get the information content values + info_contents = await get_info_content(app, canonical_nonan) - # Get the equivalent_ids and types - eqids, types = await get_eqids_and_types(app, canonical_nonan) + # Get the equivalent_ids and types + eqids, types = await get_eqids_and_types(app, canonical_nonan) - # are we looking for conflated values - if conflate: - # TODO: filter to just types that have Gene or Protein? I'm not sure it's worth it when we have pipelining - other_ids = await app.state.redis_connection5.mget(*canonical_nonan, encoding='utf8') + # are we looking for conflated values + if conflate: + # TODO: filter to just types that have Gene or Protein? I'm not sure it's worth it when we have pipelining + other_ids = await app.state.redis_connection5.mget(*canonical_nonan, encoding='utf8') - # if there are other ids, then we want to rebuild eqids and types. That's because even though we have them, - # they're not necessarily first. For instance if what came in and got canonicalized was a protein id - # and we want gene first, then we're relying on the order of the other_ids to put it back in the right place. - other_ids = [json.loads(oids) if oids is not None else [] for oids in other_ids] - dereference_others = dict(zip(canonical_nonan, other_ids)) + # if there are other ids, then we want to rebuild eqids and types. That's because even though we have them, + # they're not necessarily first. For instance if what came in and got canonicalized was a protein id + # and we want gene first, then we're relying on the order of the other_ids to put it back in the right place. + other_ids = [json.loads(oids) if oids is not None else [] for oids in other_ids] + dereference_others = dict(zip(canonical_nonan, other_ids)) - all_other_ids = sum(other_ids, []) - eqids2, types2 = await get_eqids_and_types(app, all_other_ids) + all_other_ids = sum(other_ids, []) + eqids2, types2 = await get_eqids_and_types(app, all_other_ids) - final_eqids = [] - final_types = [] + final_eqids = [] + final_types = [] - deref_others_eqs = dict(zip(all_other_ids, eqids2)) - deref_others_typ = dict(zip(all_other_ids, types2)) + deref_others_eqs = dict(zip(all_other_ids, eqids2)) + deref_others_typ = dict(zip(all_other_ids, types2)) - zipped = zip(canonical_nonan, eqids, types) + zipped = zip(canonical_nonan, eqids, types) - for canonical_id, e, t in zipped: - # here's where we replace the eqids, types - if len(dereference_others[canonical_id]) > 0: - e = [] - t = [] + for canonical_id, e, t in zipped: + # here's where we replace the eqids, types + if len(dereference_others[canonical_id]) > 0: + e = [] + t = [] - for other in dereference_others[canonical_id]: - e += deref_others_eqs[other] - t += deref_others_typ[other] + for other in dereference_others[canonical_id]: + e += deref_others_eqs[other] + t += deref_others_typ[other] - final_eqids.append(e) - final_types.append(uniquify_list(t)) + final_eqids.append(e) + final_types.append(uniquify_list(t)) - dereference_ids = dict(zip(canonical_nonan, final_eqids)) - dereference_types = dict(zip(canonical_nonan, final_types)) - else: - dereference_ids = dict(zip(canonical_nonan, eqids)) - dereference_types = dict(zip(canonical_nonan, types)) + dereference_ids = dict(zip(canonical_nonan, final_eqids)) + dereference_types = dict(zip(canonical_nonan, final_types)) else: - dereference_ids = dict() - dereference_types = dict() + dereference_ids = dict(zip(canonical_nonan, eqids)) + dereference_types = dict(zip(canonical_nonan, types)) + else: + dereference_ids = dict() + dereference_types = dict() - # output the final result - normal_nodes = { - input_curie: await create_node(canonical_id, dereference_ids, dereference_types, info_contents) - for input_curie, canonical_id in zip(curies, canonical_ids) - } + # output the final result + normal_nodes = { + input_curie: await create_node(canonical_id, dereference_ids, dereference_types, info_contents) + for input_curie, canonical_id in zip(curies, canonical_ids) + } - except Exception as e: - logger.error(f'Exception: {e}') return normal_nodes @@ -620,38 +601,35 @@ async def get_curie_prefixes( """ ret_val: dict = {} # storage for the returned data - try: - # was an arg passed in - if semantic_types: - for item in semantic_types: - # get the curies for this type - curies = await app.state.redis_connection3.get(item, encoding='utf-8') + # was an arg passed in + if semantic_types: + for item in semantic_types: + # get the curies for this type + curies = await app.state.redis_connection3.get(item, encoding='utf-8') - # did we get any data - if not curies: - curies = '{' + f'"{item}"' + ': "Not found"}' + # did we get any data + if not curies: + curies = '{' + f'"{item}"' + ': "Not found"}' - curies = json.loads(curies) + curies = json.loads(curies) - # set the return data - ret_val[item] = {'curie_prefix': curies} - else: - types = await app.state.redis_connection3.lrange('semantic_types', 0, -1, encoding='utf-8') + # set the return data + ret_val[item] = {'curie_prefix': curies} + else: + types = await app.state.redis_connection3.lrange('semantic_types', 0, -1, encoding='utf-8') - for item in types: - # get the curies for this type - curies = await app.state.redis_connection3.get(item, encoding='utf-8') + for item in types: + # get the curies for this type + curies = await app.state.redis_connection3.get(item, encoding='utf-8') - # did we get any data - if not curies: - curies = '{' + f'"{item}"' + ': "Not found"}' + # did we get any data + if not curies: + curies = '{' + f'"{item}"' + ': "Not found"}' - curies = json.loads(curies) + curies = json.loads(curies) - # set the return data - ret_val[item] = {'curie_prefix': curies} - except Exception as e: - logger.error(f'get_curie_prefixes Exception: {e}') + # set the return data + ret_val[item] = {'curie_prefix': curies} return ret_val @@ -663,34 +641,31 @@ def _merge_node_attributes(node_a: Dict, node_b, merged_count: int) -> Dict: :param merged_count: the number of nodes merged into node_a **upon entering this fx** """ - try: - if not ('attributes' in node_b and node_b['attributes']): - return node_a - - if merged_count == 0: - if 'attributes' in node_a and node_a['attributes']: - new_attribute_list = [] - for attribute in node_a['attributes']: - new_dict = {} - for k, v in attribute.items(): - new_dict[f"{k}.1"] = v - new_attribute_list.append(new_dict) - - node_a['attributes'] = new_attribute_list - - # Need to DRY this off - b_attr_id = merged_count + 2 - if 'attributes' in node_b and node_b['attributes']: + if not ('attributes' in node_b and node_b['attributes']): + return node_a + + if merged_count == 0: + if 'attributes' in node_a and node_a['attributes']: new_attribute_list = [] - for attribute in node_b['attributes']: + for attribute in node_a['attributes']: new_dict = {} for k, v in attribute.items(): - new_dict[f"{k}.{b_attr_id}"] = v + new_dict[f"{k}.1"] = v new_attribute_list.append(new_dict) - node_a['attributes'] = node_a['attributes'] + new_attribute_list - except Exception as e: - logger.error(f'_merge_node_attributes Exception {e}') + node_a['attributes'] = new_attribute_list + + # Need to DRY this off + b_attr_id = merged_count + 2 + if 'attributes' in node_b and node_b['attributes']: + new_attribute_list = [] + for attribute in node_b['attributes']: + new_dict = {} + for k, v in attribute.items(): + new_dict[f"{k}.{b_attr_id}"] = v + new_attribute_list.append(new_dict) + + node_a['attributes'] = node_a['attributes'] + new_attribute_list return node_a diff --git a/node_normalizer/server.py b/node_normalizer/server.py index ffcec48..1d21f61 100644 --- a/node_normalizer/server.py +++ b/node_normalizer/server.py @@ -117,35 +117,32 @@ async def async_query(async_query: reasoner_pydantic.AsyncQuery): async def async_query_task(async_query: reasoner_pydantic.AsyncQuery): - try: - async_query.message = await normalize_message(app, async_query.message) - session = requests.Session() - retries = Retry( - total=3, - backoff_factor=3, - status_forcelist=[429, 500, 502, 503, 504], - method_whitelist=[ - "HEAD", - "GET", - "PUT", - "DELETE", - "OPTIONS", - "TRACE", - "POST", - ], - ) - session.mount("http://", HTTPAdapter(max_retries=retries)) - session.mount("https://", HTTPAdapter(max_retries=retries)) - logger.info(f"sending callback to: {async_query.callback}") - - post_response = session.post( - url=async_query.callback, - headers={"Content-Type": "application/json", "Accept": "application/json"}, - data=async_query.json(), - ) - logger.info(f"async_query post status code: {post_response.status_code}") - except BaseException as e: - logger.exception(e) + async_query.message = await normalize_message(app, async_query.message) + session = requests.Session() + retries = Retry( + total=3, + backoff_factor=3, + status_forcelist=[429, 500, 502, 503, 504], + method_whitelist=[ + "HEAD", + "GET", + "PUT", + "DELETE", + "OPTIONS", + "TRACE", + "POST", + ], + ) + session.mount("http://", HTTPAdapter(max_retries=retries)) + session.mount("https://", HTTPAdapter(max_retries=retries)) + logger.info(f"sending callback to: {async_query.callback}") + + post_response = session.post( + url=async_query.callback, + headers={"Content-Type": "application/json", "Accept": "application/json"}, + data=async_query.json(), + ) + logger.info(f"async_query post status code: {post_response.status_code}") @app.get(