from collections import Counter
from .TCT import sele_predicates_API, parse_KG, rank_by_primary_infores
from .TCT_pathfinder import generate_score_results, build_query_graph
from .translator_query import format_query_json
[docs]
def parse_results_for_neighborhood_finder(start_node_id:str, results:dict,
        start_node_categories=None, end_node_categories=None,
        get_node_info=True,
        scoring_method='infores'):
    """
    Convert one-hop query results into the same general json format as the
    other pathfinder APIs.

    Parameters
    ----------
    start_node_id (str)
        CURIE id of the query node. Only edges whose 'subject' or 'object'
        equals this id are kept; every other edge is skipped.
    results (dict)
        Mapping of edge id -> edge dict. Every edge must have 'subject' and
        'object' keys. An optional 'attributes' list of dicts (each with
        'attribute_type_id' and 'value') is scanned for
        '<subject|object>_category' and '<subject|object>_name' entries that
        describe the neighbor node. Edges without an 'attributes' key get an
        empty list added in place, and the same edge objects are reused in
        the output.
    start_node_categories, end_node_categories
        Forwarded unchanged to build_query_graph.
    get_node_info (bool)
        If True, neighbors that are missing a name or categories are looked
        up with node_normalizer.get_normalized_nodes.
    scoring_method (str)
        How the node scores are generated; forwarded to
        generate_score_results as `method`. Could be 'infores' or 'edges'.

    Returns
    -------
    dict
        A dict with the keys 'query_graph', 'knowledge_graph' (with 'nodes'
        and 'edges'), 'results' (a single entry whose 'analyses' holds the
        formatted scores) and 'auxiliary_graphs' (one entry per neighbor,
        listing the ids of the edges that connect it to the start node).
    """
    # neighbor id -> {'name': ..., 'categories': ...} collected from edge attributes
    node_info = {}
    # neighbor id -> list of (edge_id, edge) pairs connecting it to the start node
    node_edges = {}
    for edge_id, edge in results.items():
        subject_id = edge['subject']
        object_id = edge['object']
        # `role` is the side of the edge on which the neighbor sits; it selects
        # which '<role>_category' / '<role>_name' attributes describe it.
        if subject_id == start_node_id:
            neighbor_id, role = object_id, 'object'
        elif object_id == start_node_id:
            neighbor_id, role = subject_id, 'subject'
        else:
            continue
        node_edges.setdefault(neighbor_id, []).append((edge_id, edge))
        node_dict = node_info.setdefault(neighbor_id, {})
        # Edges without 'attributes' get an empty list in place (as before).
        # The key may also be present but null; treat that as "no attributes"
        # instead of failing on iteration.
        attributes = edge.setdefault('attributes', []) or ()
        for attribute in attributes:
            type_id = attribute['attribute_type_id']
            if type_id == f'{role}_category':
                # dict keys act as an insertion-ordered set, so the final
                # category list is de-duplicated and has a stable order.
                node_dict.setdefault('categories', {})[attribute['value']] = None
            if type_id == f'{role}_name' and 'name' not in node_dict:
                node_dict['name'] = attribute['value']
    for node_dict in node_info.values():
        if 'categories' in node_dict:
            node_dict['categories'] = list(node_dict['categories'])

    all_edges = {}
    all_auxiliary_graphs = {}
    # Visit neighbors from most to least connected; the 1-based rank becomes
    # part of the auxiliary graph key.
    connection_counts = Counter({node_id: len(edges) for node_id, edges in node_edges.items()})
    for rank, (neighbor_id, _count) in enumerate(connection_counts.most_common(), start=1):
        edges = node_edges[neighbor_id]
        all_edges.update(edges)
        all_auxiliary_graphs[f'aux_{rank}_{neighbor_id}'] = [edge_key for edge_key, _edge in edges]

    # generate output json
    output = {
        'query_graph': build_query_graph(start_node_id, '', start_node_categories, end_node_categories),
        'knowledge_graph': {
            'nodes': {node_id: node_info[node_id] for node_id in node_edges},
            'edges': all_edges,
        },
        'results': [{'analyses': []}],
        'auxiliary_graphs': all_auxiliary_graphs,
    }
    _graph_scores, graph_scores_formatted = generate_score_results(output, method=scoring_method)
    output['results'][0]['analyses'] = graph_scores_formatted

    if get_node_info:
        from .node_normalizer import get_normalized_nodes
        nodes = output['knowledge_graph']['nodes']
        nodes_to_add = [node_id for node_id, info in nodes.items()
                        if 'name' not in info or 'categories' not in info]
        # Look the incomplete nodes up in chunks of at most `batch_limit` ids per call.
        batch_limit = 1000
        all_normalized_nodes = {}
        for idx in range(0, len(nodes_to_add), batch_limit):
            batch = nodes_to_add[idx:idx + batch_limit]
            all_normalized_nodes.update(get_normalized_nodes(batch, mode='post'))
        for node_id in nodes_to_add:
            nn = all_normalized_nodes.get(node_id)
            # Nodes the normalizer returned nothing for keep whatever was
            # gathered from the edge attributes.
            if nn is not None:
                nodes[node_id] = {'name': nn.label, 'categories': nn.types}
    return output
[docs]
def neighborhood_finder(input_node, node2_categories, APInames, metaKG, API_predicates,
        input_node_category = [],
        predicates_subset=None,
        attribute_constraints=None):
    """
    This function is used to find the neighborhood of a given input node with intermediate categories.

    Parameters
    ----------
    input_node (str)
        The input node - should be a CURIE id.
    node2_categories (list)
        A list of intermediate categories to be used in the neighborhood finding process.
    APInames (dict)
        A dictionary containing the names of the APIs to be used.
    metaKG (DataFrame)
        The metadata knowledge graph containing information about the APIs and their predicates.
    API_predicates (dict)
        A dictionary containing the predicates for each API.
    input_node_category (list)
        Optional. A list of categories for the input node. Only the categories that the
        node normalizer also reports for the node are kept; if the list is empty (or
        nothing is left after that filtering), the node's normalized types are used.
    predicates_subset
        Optional. Accepted for interface compatibility; not used by this function.
    attribute_constraints (list)
        Optional. List of outputs of translator_query.build_attribute_constraint;
        passed on to format_query_json.

    Returns
    --------------
    input_node_id (str)
        The curie id of the input node.
    result (dict)
        The result of the query for the input node.
    parsed_results (dict)
        The query result converted by parse_results_for_neighborhood_finder
        (keys: 'query_graph', 'knowledge_graph', 'results', 'auxiliary_graphs').
    result_ranked_by_primary_infores
        The results ranked by primary infores, as returned by rank_by_primary_infores.

    --------------
    Example (performs live API queries)::

        input_node_id, result, parsed_results, result_ranked_by_primary_infores1 = neighborhood_finder(
            'MONDO:0008170',  # Ovarian Cancer
            node2_categories=['biolink:SmallMolecule', 'biolink:Drug', 'biolink:ChemicalEntity'],
            APInames=APInames,
            metaKG=metaKG,
            API_predicates=API_predicates)
    --------------
    """
    from . import node_normalizer
    from . import translator_query
    input_node_id = input_node

    # Step 1: Resolve the input node to get its categories.
    input_node_info = node_normalizer.get_normalized_nodes(input_node_id)
    print(input_node_id)
    # NOTE: the `[]` default is only ever rebound below, never mutated, so it is
    # not shared state between calls.
    if input_node_category:
        # Keep only the requested categories that the normalizer confirms.
        input_node_category = list(set(input_node_category).intersection(set(input_node_info.types)))
    if not input_node_category:
        input_node_category = input_node_info.types

    # Step 2: Select predicates and APIs based on the intermediate categories.
    sele_predicates, sele_APIs, _api_urls = sele_predicates_API(input_node_category,
                                                                node2_categories,
                                                                metaKG, APInames)
    if len(sele_predicates) == 0:
        # Nothing matched in the meta KG: fall back to the generic predicate.
        sele_predicates = ["biolink:related_to"]

    # Step 3: Format the query JSON for the input node.
    # The caller's attribute constraints are forwarded (previously None was
    # hard-coded here, silently ignoring the parameter).
    query_json = format_query_json(subject_ids=[input_node_id],
                                   object_ids=None,
                                   subject_categories=None,
                                   object_categories=node2_categories,
                                   predicates=sele_predicates,
                                   attribute_constraints=attribute_constraints,
                                   )

    # Step 4: Query the APIs in parallel.
    # Never request fewer than one worker: len(sele_APIs) is 0 when no API was
    # selected. NOTE(review): presumably parallel_api_query hands max_workers to
    # an executor pool, which rejects 0 - confirm.
    result = translator_query.parallel_api_query(query_json=query_json,
                                                 select_APIs=sele_APIs,
                                                 APInames=APInames,
                                                 API_predicates=API_predicates,
                                                 max_workers=max(1, len(sele_APIs)))

    # Step 5: Parse the raw query result for ranking.
    result_parsed = parse_KG(result)

    # Step 6: Rank the results. This ranking method is based on the number of unique
    # primary infores. It can only be used to rank the results with one defined node.
    result_ranked_by_primary_infores1 = rank_by_primary_infores(result_parsed, input_node_id)

    # Step 7: Convert the raw result into the common pathfinder json format.
    parsed_results = parse_results_for_neighborhood_finder(input_node_id, result, input_node_category, node2_categories)
    return input_node_id, result, parsed_results, result_ranked_by_primary_infores1