Source code for TCT.TCT_pathfinder

# TCT Pathfinder...
import requests

from collections import Counter

from . import node_normalizer
from . import translator_query
from .TCT import sele_predicates_API

def _as_id_list(ids):
    """Return *ids* as a flat list: lists/tuples are copied, anything else is wrapped."""
    if isinstance(ids, (list, tuple)):
        return list(ids)
    return [ids]


def format_query_json_for_pathfinder_with_constraints(subject_ids,
                                                      object_ids=None,
                                                      subject_categories=None,
                                                      object_categories=None,
                                                      predicates=None,
                                                      constraints=None,
                                                      ):
    """
    Build the request body for a Pathfinder query between two pinned nodes.

    The query graph has two nodes, ``n0`` (subject) and ``n1`` (object), joined
    by a single path ``p0`` with the fixed predicate ``biolink:related_to`` and
    one path constraint carrying the allowed intermediate categories.

    Args:
        subject_ids: A single identifier, or a list/tuple of identifiers, for
            node ``n0``. A list/tuple is used as the ``ids`` array directly
            instead of being nested inside another list.
        object_ids: Same as ``subject_ids`` but for node ``n1``. A scalar value
            (including the default ``None``) is wrapped as-is, so callers are
            expected to supply a real identifier.
        subject_categories: Accepted for interface compatibility; not used when
            building the query.
        object_categories: Accepted for interface compatibility; not used when
            building the query.
        predicates: Accepted for interface compatibility; not used when
            building the query (the path predicate is always
            ``biolink:related_to``).
        constraints: Categories allowed for intermediate nodes. ``None`` or an
            empty collection results in ``"intermediate_categories": None``.

    Returns:
        dict: The query with ``message``, ``submitter`` and ``query_options``
        top-level keys.
    """
    if constraints is None or len(constraints) == 0:
        constraints_intermediate_category = None
    else:
        constraints_intermediate_category = constraints

    query_graph = {
        "nodes": {
            "n0": {"ids": _as_id_list(subject_ids)},
            "n1": {"ids": _as_id_list(object_ids)},
        },
        "paths": {
            "p0": {
                "subject": "n0",
                "object": "n1",
                "predicates": ["biolink:related_to"],
                "constraints": [
                    {"intermediate_categories": constraints_intermediate_category}
                ],
            }
        },
    }

    return {
        "message": {"query_graph": query_graph},
        "submitter": "TCT",
        # "stream_progress": True,
        "query_options": {
            "kp_timeout": "30",
            "prune_threshold": "50",
            "max_pathfinder_paths": "500",
            "max_path_length": "4",
        },
    }

def build_query_graph(start_node_id, end_node_id, start_node_categories=None, end_node_categories=None, constraints_path=None):
    """
    Assemble the query-graph section for a two-node pathfinder query.

    The start node is stored under key ``"sn"`` and the end node under
    ``"on"``; a single path ``"p0"`` runs from ``"sn"`` to ``"on"``.

    start_node_categories and end_node_categories are lists of categories;
    constraints_path is stored unchanged as the path's ``"constraints"``.
    """
    def pinned_node(node_id, categories):
        # Every node shares the same skeleton; only the id and categories vary.
        return {
            "categories": categories,
            "constraints": [],
            "ids": [node_id],
            "is_set": False,
            "option_group_id": None,
            "set_id": None,
            "set_interpretation": "BATCH",
        }

    object_node = pinned_node(end_node_id, end_node_categories)
    subject_node = pinned_node(start_node_id, start_node_categories)
    path = {
        "constraints": constraints_path,
        "object": "on",
        "predicates": None,
        "subject": "sn",
    }
    return {
        "nodes": {"on": object_node, "sn": subject_node},
        "paths": {"p0": path},
    }
def generate_score_results(results, method='infores'):
    """
    Generates a score dict, and a list of "analyses".

    Each auxiliary graph in ``results['auxiliary_graphs']`` (a mapping of graph
    id to a list of edge ids) gets a raw score, which is then divided by the
    largest raw score so that the best graph scores 1.0.

    method can be 'infores' or 'edges':
        - 'infores': raw score is the number of distinct ``resource_id`` values
          found in the ``sources`` of the graph's edges, looked up in
          ``results['knowledge_graph']['edges']``.
        - anything else ('edges'): raw score is the number of edges in the graph.

    Returns:
        tuple: ``(graph_scores, graph_scores_formatted)`` where ``graph_scores``
        maps graph id to its normalized score and ``graph_scores_formatted`` is
        a list of analysis dicts (one per graph) binding path ``p0`` to the
        graph id. If every raw score is 0, all normalized scores are 0.0.
    """
    auxiliary_graphs = results['auxiliary_graphs']

    raw_scores = {}
    for graph_id, edge_ids in auxiliary_graphs.items():
        if method == 'infores':
            sources = {
                resource['resource_id']
                for edge_id in edge_ids
                for resource in results['knowledge_graph']['edges'][edge_id]['sources']
            }
            raw_scores[graph_id] = len(sources)
        else:
            raw_scores[graph_id] = len(edge_ids)

    max_score = max(raw_scores.values(), default=0)

    graph_scores = {}
    graph_scores_formatted = []
    for graph_id, raw_score in raw_scores.items():
        # Guard the normalization: with no sources/edges anywhere max_score is 0
        # and a plain division would raise ZeroDivisionError.
        score = raw_score / max_score if max_score else 0.0
        graph_scores[graph_id] = score
        graph_scores_formatted.append({
            'attributes': None,
            'path_bindings': {
                'p0': [{'id': graph_id}]},
            'resource_id': 'infores:tct',
            'score': score,
            'scoring_method': None,
            'support_graphs': None
        })
    return graph_scores, graph_scores_formatted
def _neighbor_of(edge, anchor_id):
    """Return ``(other_node_id, side)`` for an edge touching *anchor_id*, else ``None``.

    *side* is ``'object'`` or ``'subject'``: the end of the edge on which the
    other node sits.
    """
    if edge['subject'] == anchor_id:
        return edge['object'], 'object'
    if edge['object'] == anchor_id:
        return edge['subject'], 'subject'
    return None


def _collect_node_attributes(node_info, node_id, edge, side):
    """Merge the ``{side}_category`` / ``{side}_name`` edge attributes into ``node_info[node_id]``."""
    node_dict = node_info.setdefault(node_id, {})
    # Edges without an attribute list simply contribute nothing here.
    for attribute in edge.get('attributes') or []:
        type_id = attribute['attribute_type_id']
        if type_id == f'{side}_category':
            node_dict.setdefault('categories', set()).add(attribute['value'])
        if type_id == f'{side}_name' and 'name' not in node_dict:
            node_dict['name'] = attribute['value']


def parse_results_for_pathfinder(start_node_id:str, end_node_id:str, result1:dict, result2:dict,
        start_node_categories=None, end_node_categories=None, get_node_info=True, scoring_method='infores'):
    """
    Converts the results of two TRAPI queries into the same general json format as the other pathfinder APIs.

    ``result1`` and ``result2`` map edge ids to edge dicts with ``'subject'``,
    ``'object'`` and (optionally) ``'attributes'`` entries. An intermediate node
    is any node that shares an edge with the start node in ``result1`` and an
    edge with the end node in ``result2``. For each such node one auxiliary
    graph ``aux_<rank>_<node id>`` is emitted, listing its start-side edge ids
    followed by its end-side edge ids; rank follows the product of the two edge
    counts, largest first.

    scoring_method is how the node scores are generated, and could be 'infores' or 'edges'.

    If get_node_info is true, intermediate nodes that lack a name or categories
    after reading the edge attributes are looked up with the node normalizer
    and, when found, replaced by the normalizer's label and types.

    Returns:
        dict: with keys ``'query_graph'``, ``'knowledge_graph'`` (``'nodes'``
        and ``'edges'``), ``'results'`` (a single entry whose ``'analyses'``
        hold the per-graph scores) and ``'auxiliary_graphs'``.
    """
    # TODO: get some node info? node attributes
    node_info = {}

    # Edges touching the start node, grouped by the node on their other end.
    intermediate_node_edges = {}
    for edge_id, edge in result1.items():
        neighbor = _neighbor_of(edge, start_node_id)
        if neighbor is None:
            continue
        intermediate_node_id, side = neighbor
        intermediate_node_edges.setdefault(intermediate_node_id, []).append((edge_id, edge))
        _collect_node_attributes(node_info, intermediate_node_id, edge, side)

    # Keep only the nodes that are also linked to the end node.
    connecting_intermediate_nodes = {}
    for edge_id, edge in result2.items():
        neighbor = _neighbor_of(edge, end_node_id)
        if neighbor is None:
            continue
        intermediate_node_id, side = neighbor
        if intermediate_node_id not in intermediate_node_edges:
            continue
        if intermediate_node_id in connecting_intermediate_nodes:
            connecting_intermediate_nodes[intermediate_node_id]['e2'].append((edge_id, edge))
        else:
            connecting_intermediate_nodes[intermediate_node_id] = {
                'e1': intermediate_node_edges[intermediate_node_id],
                'e2': [(edge_id, edge)],
            }
        # Only connecting nodes are emitted, so only they need their info updated.
        _collect_node_attributes(node_info, intermediate_node_id, edge, side)

    # Categories were gathered in sets to de-duplicate; emit them as lists.
    for node_dict in node_info.values():
        if 'categories' in node_dict:
            node_dict['categories'] = list(node_dict['categories'])

    # sort connecting_intermediate_nodes by total number of connections
    connection_counts = Counter({
        node_id: len(branches['e1']) * len(branches['e2'])
        for node_id, branches in connecting_intermediate_nodes.items()
    })
    all_edges = {}
    all_auxiliary_graphs = {}
    for rank, (node_id, _count) in enumerate(connection_counts.most_common(), start=1):
        branches = connecting_intermediate_nodes[node_id]
        e1s = branches['e1']
        e2s = branches['e2']
        all_edges.update(e1s)
        all_edges.update(e2s)
        all_auxiliary_graphs[f'aux_{rank}_{node_id}'] = (
            [edge_key for edge_key, _ in e1s] + [edge_key for edge_key, _ in e2s]
        )

    # generate output json
    output = {
        'query_graph': build_query_graph(start_node_id, end_node_id, start_node_categories, end_node_categories),
        # TODO: don't drop the nodes
        'knowledge_graph': {
            'nodes': {node_id: node_info[node_id] for node_id in connection_counts.keys()},
            'edges': all_edges,
        },
        'results': [{'analyses': []}],
        'auxiliary_graphs': all_auxiliary_graphs
    }
    graph_scores, graph_scores_formatted = generate_score_results(output, method=scoring_method)
    output['results'][0]['analyses'] = graph_scores_formatted

    if get_node_info:
        from .node_normalizer import get_normalized_nodes
        nodes_to_add = [
            node_id
            for node_id, node_dict in output['knowledge_graph']['nodes'].items()
            if 'name' not in node_dict or 'categories' not in node_dict
        ]
        if nodes_to_add:
            normalized_nodes = get_normalized_nodes(nodes_to_add, mode='post')
            for node_id in nodes_to_add:
                nn = normalized_nodes.get(node_id)
                if nn is not None:
                    output['knowledge_graph']['nodes'][node_id] = {'name': nn.label, 'categories': nn.types}
    return output
def pathfinder(input_node1_id:str, input_node2_id:str, intermediate_categories:list, APInames, metaKG, API_predicates, scoring_method='infores'):
    """
    Returns a Pathfinder output for the given pair of nodes.

    Two one-hop queries are issued: node1 -> intermediate and
    intermediate -> node2, with the intermediate restricted to
    ``intermediate_categories``. The two edge sets are then joined on their
    shared intermediate nodes by ``parse_results_for_pathfinder``.

    scoring_method could be 'infores' or 'edges'.

    Returns:
        tuple: ``(result1, result2, output)`` - the raw results of the first
        and second hop, followed by the combined pathfinder output.

    Raises:
        KeyError: if either input id is absent from the node normalizer result.
    """
    # get categories for input nodes
    normalized_node_dict = node_normalizer.get_normalized_nodes([input_node1_id, input_node2_id])
    input_node1_category = normalized_node_dict[input_node1_id].types
    input_node2_category = normalized_node_dict[input_node2_id].types
    input_node1_list = [input_node1_id]
    input_node2_list = [input_node2_id]

    # Select predicates and APIs based on the intermediate categories
    # (the third returned value, the API URLs, is not needed here).
    sele_predicates1, sele_APIs1, _ = sele_predicates_API(input_node1_category,
                                                          intermediate_categories,
                                                          metaKG, APInames)
    sele_predicates2, sele_APIs2, _ = sele_predicates_API(intermediate_categories,
                                                          input_node2_category,
                                                          metaKG, APInames)

    # First hop: node1 is pinned, the intermediate node is left open.
    query_json1 = translator_query.format_query_json(input_node1_list,         # a list of identifiers for input node1
                                                     [],                       # id list for the intermediate node (empty: unpinned)
                                                     input_node1_category,     # a list of categories of input node1
                                                     intermediate_categories,  # a list of categories of the intermediate node
                                                     sele_predicates1)         # a list of predicates

    # Second hop: the intermediate node is left open, node2 is pinned.
    query_json2 = translator_query.format_query_json([],                       # id list for the intermediate node (empty: unpinned)
                                                     input_node2_list,         # a list of identifiers for input node2
                                                     intermediate_categories,  # a list of categories of the intermediate node
                                                     input_node2_category,     # a list of categories of input node2
                                                     sele_predicates2)         # a list of predicates

    # max_workers is kept at >= 1: a worker count of 0 is presumably invalid
    # for the underlying pool when no API was selected - TODO confirm.
    result1 = translator_query.parallel_api_query(query_json=query_json1,
                                                  select_APIs=sele_APIs1,
                                                  APInames=APInames,
                                                  API_predicates=API_predicates,
                                                  max_workers=max(1, len(sele_APIs1)))
    result2 = translator_query.parallel_api_query(query_json=query_json2,
                                                  select_APIs=sele_APIs2,
                                                  APInames=APInames,
                                                  API_predicates=API_predicates,
                                                  max_workers=max(1, len(sele_APIs2)))

    output = parse_results_for_pathfinder(input_node1_id, input_node2_id, result1, result2,
                                          start_node_categories=input_node1_category,
                                          end_node_categories=input_node2_category,
                                          scoring_method=scoring_method,
                                          get_node_info=True)
    return result1, result2, output
# Service endpoints that the query_* helpers below POST to.
_ARAGORN_ENDPOINT = 'https://shepherd.renci.org/aragorn/query'
_ARAX_ENDPOINT = 'https://arax.ci.transltr.io/api/arax/v1.4/query'


# Fill a fixed query template with the ids and categories of the two end nodes.
def format_pathfinder_query(node1_id, node1_category, node2_id, node2_category):
    """
    Build an unconstrained Pathfinder query between two pinned nodes.

    Each id and each category is wrapped in a single-element list. The
    subject node is keyed ``"SN"``, the object node ``"ON"``, and path
    ``"p0"`` links them.

    Returns:
        dict: the query, with ``"message"`` and ``"submitter"`` keys.
    """
    query_json = {
        "message": {
            "query_graph": {
                "nodes": {
                    "SN": {
                        "ids": [node1_id],
                        "categories": [node1_category]
                    },
                    "ON": {
                        "ids": [node2_id],
                        "categories": [node2_category]
                    }
                },
                "paths": {
                    "p0": {
                        "subject": "SN",
                        "object": "ON"
                    }
                }
            }
        },
        "submitter": "TCT",
    }
    return query_json


def query_aragorn_pathfinder(node1_id, node1_category, node2_id, node2_category):
    """POST an unconstrained Pathfinder query to the ARAGORN endpoint and return the HTTP response."""
    query_current = format_pathfinder_query(node1_id, node1_category, node2_id, node2_category)
    response = requests.post(_ARAGORN_ENDPOINT, json=query_current)
    return response


def query_aragorn_pathfinder_with_constraints(node1_id, node1_category, node2_id, node2_category, constraints):
    """
    POST a Pathfinder query with intermediate-category constraints to the ARAGORN endpoint.

    ``constraints`` is forwarded by keyword so that it reaches the builder's
    ``constraints`` parameter rather than an earlier positional one.

    Returns the HTTP response.
    """
    query_current = format_query_json_for_pathfinder_with_constraints(
        node1_id,
        node2_id,
        subject_categories=node1_category,
        object_categories=node2_category,
        constraints=constraints,
    )
    response = requests.post(_ARAGORN_ENDPOINT, json=query_current)
    return response


def query_arax_pathfinder(node1_id, node1_category, node2_id, node2_category):
    """POST an unconstrained Pathfinder query to the ARAX endpoint and return the HTTP response."""
    query_current = format_pathfinder_query(node1_id, node1_category, node2_id, node2_category)
    response = requests.post(_ARAX_ENDPOINT, json=query_current)
    return response


def query_arax_pathfinder_with_constraints(node1_id, node1_category, node2_id, node2_category, constraints):
    """
    POST a Pathfinder query with intermediate-category constraints to the ARAX endpoint.

    ``constraints`` is forwarded by keyword so that it reaches the builder's
    ``constraints`` parameter rather than an earlier positional one.

    Returns the HTTP response.
    """
    query_current = format_query_json_for_pathfinder_with_constraints(
        node1_id,
        node2_id,
        subject_categories=node1_category,
        object_categories=node2_category,
        constraints=constraints,
    )
    response = requests.post(_ARAX_ENDPOINT, json=query_current)
    return response