# Source code for pypath.inputs.kegg

#!/usr/bin/env python
# -*- coding: utf-8 -*-

#
#  This file is part of the `pypath` python module
#
#  Copyright 2014-2023
#  EMBL, EMBL-EBI, Uniklinik RWTH Aachen, Heidelberg University
#
#  Authors: see the file `README.rst`
#  Contact: Dénes Türei (turei.denes@gmail.com)
#
#  Distributed under the GPLv3 License.
#  See accompanying file LICENSE.txt or copy at
#      https://www.gnu.org/licenses/gpl-3.0.html
#
#  Website: https://pypath.omnipathdb.org/
#

from future.utils import iteritems

import re
import itertools
import collections
import bs4
import warnings

import pypath.share.curl as curl
import pypath.resources.urls as urls
import pypath.share.progress as progress
import pypath.share.common as common
import pypath.utils.mapping as mapping
import pypath.internals.intera as intera
import pypath.core.entity as entity


# Annotation record with a single field: the name of a KEGG pathway.
KeggPathway = collections.namedtuple('KeggPathway', ['pathway'])


def kegg_interactions():
    """
    Downloads and processes KEGG Pathways.

    The pathway list page is scanned for links containing ``hsa`` pathway
    identifiers, then the KGML of each pathway is downloaded. The gene
    symbols listed in the ``graphics`` names of the KGML entries are
    translated to UniProt IDs, and every ``relation`` with at least one
    ``subtype`` yields one record for each pair of UniProt IDs.

    Returns
        List of unique ``KeggInteraction`` named tuples with the fields
        ``id_a``, ``id_b``, ``effect`` ("inhibition", "activation" or
        "unknown"), ``pathway``, ``mechanism`` (semicolon separated,
        alphabetically sorted mechanism terms), ``is_direct`` and
        ``transcriptional``.
    """

    positive_terms = {'activation', 'expression'}
    negative_terms = {'inhibition', 'repression'}
    transc_terms = {'expression', 'repression'}
    mechanism_terms = {
        'phosphorylation',
        'binding/association',
        'dissociation',
        'ubiquitination',
        'dephosphorylation',
        'glycosylation',
        'state change',
        'methylation',
    }
    direct_terms = {'indirect effect'}

    KeggInteraction = collections.namedtuple(
        'KeggInteraction',
        [
            'id_a',
            'id_b',
            'effect',
            'pathway',
            'mechanism',
            'is_direct',
            'transcriptional',
        ],
    )

    rehsa = re.compile(r'.*(hsa[0-9]+).*')
    req_hdrs = [
        'Referer: http://www.genome.jp/kegg-bin/show_pathway'
        '?map=hsa04710&show_description=show'
    ]
    hsa_list = []
    interactions = []

    c = curl.Curl(urls.urls['kegg_pws']['list_url'], silent = True)
    htmllst = c.result
    lstsoup = bs4.BeautifulSoup(htmllst, 'html.parser')

    for a in lstsoup.find_all('a', href = True):

        m = rehsa.match(a['href'])

        if m:

            hsa_list.append((m.group(1), a.text))

    prg = progress.Progress(
        len(hsa_list),
        'Processing KEGG Pathways',
        1,
        percent = False
    )

    for hsa, pw in hsa_list:

        prg.step()

        c = curl.Curl(
            urls.urls['kegg_pws']['kgml_url_2'] % hsa,
            silent = True,
            req_headers = req_hdrs
        )
        kgml = c.result

        if not kgml:

            # one unavailable pathway should not abort the whole run
            warnings.warn(
                'KEGG: no KGML content retrieved for pathway `%s`, '
                'skipping.' % hsa
            )
            continue

        with warnings.catch_warnings():

            # silence the parser warnings raised for the KGML markup
            warnings.simplefilter('ignore')
            kgmlsoup = bs4.BeautifulSoup(kgml, 'html.parser')

        # entry ID -> list of names from the `graphics` tag
        entries = {}

        for ent in kgmlsoup.find_all('entry'):

            gr = ent.find('graphics')

            if gr and 'name' in gr.attrs:

                entries[ent.attrs['id']] = [
                    n.strip()
                    for n in gr.attrs['name'].replace('...', '').split(',')
                ]

        # entry ID -> unique UniProt IDs of all names of the entry
        uentries = {
            eid: common.unique_list(
                common.flat_list([
                    mapping.map_name(
                        gn,
                        'genesymbol',
                        'uniprot',
                        strict = True,
                    )
                    for gn in gns
                ])
            )
            for eid, gns in entries.items()
        }

        for rel in kgmlsoup.find_all('relation'):

            subtypes = {st.attrs['name'] for st in rel.find_all('subtype')}

            if not (
                rel.attrs['entry1'] in uentries and
                rel.attrs['entry2'] in uentries and
                subtypes
            ):

                continue

            is_direct = not (direct_terms & subtypes)

            if negative_terms & subtypes:

                effect = 'inhibition'

            elif positive_terms & subtypes:

                effect = 'activation'

            else:

                effect = 'unknown'

            # sorted, so the same set of terms always gives the same string
            mechanism = ';'.join(sorted(mechanism_terms & subtypes))
            transcriptional = bool(transc_terms & subtypes)

            for u1, u2 in itertools.product(
                uentries[rel.attrs['entry1']],
                uentries[rel.attrs['entry2']],
            ):

                interactions.append(
                    KeggInteraction(
                        id_a = u1,
                        id_b = u2,
                        effect = effect,
                        pathway = pw,
                        mechanism = mechanism,
                        is_direct = is_direct,
                        transcriptional = transcriptional,
                    )
                )

    prg.terminate()

    return common.unique_list(interactions)
def kegg_pathways():
    """
    Groups the records of ``kegg_interactions`` by pathway.

    Returns
        A pair of dicts, both keyed by pathway: the first one holds the
        set of identifiers of the interacting partners, the second one
        the set of ``(id_a, id_b)`` pairs of each pathway.
    """

    records = kegg_interactions()
    # the pathway is the 4th field of each record
    pathway_names = common.unique_list(rec[3] for rec in records)

    proteins_pws = {name: set() for name in pathway_names}
    interactions_pws = {name: set() for name in pathway_names}

    for rec in records:

        id_a, id_b, _effect, name = rec[:4]
        proteins_pws[name].update((id_a, id_b))
        interactions_pws[name].add((id_a, id_b))

    return proteins_pws, interactions_pws
def kegg_pathway_annotations():
    """
    Pathway annotations built from the output of ``kegg_pathways``.

    Returns
        Dict with the identifiers of the pathway members as keys and
        sets of ``KeggPathway`` records as values.
    """

    annotations = collections.defaultdict(set)
    members_by_pathway, _ = kegg_pathways()

    for pathway_name, members in members_by_pathway.items():

        annot = KeggPathway(pathway = pathway_name)

        for member in members:

            annotations[member].add(annot)

    return dict(annotations)
def kegg_pathway_annotations_pathwaycommons():
    """
    Pathway annotations from the tab separated file at the ``pw_annot``
    URL of the ``kegg_pws`` resource.

    In each row the pathway name is taken from the second field (the
    part before the first semicolon, after the first colon), and every
    field from the third one on is used as an identifier annotated with
    this pathway (presumably UniProt IDs, going by the variable names of
    the original code).

    Returns
        Dict with identifiers as keys and sets of ``KeggPathway``
        records as values.
    """

    c = curl.Curl(
        urls.urls['kegg_pws']['pw_annot'],
        large = True,
        silent = False,
    )
    annotations = collections.defaultdict(set)

    for line in c.result:

        fields = line.strip().split('\t')
        description = fields[1].partition(';')[0]
        pathway_name = description.split(':', maxsplit = 1)[1].strip()
        annot = KeggPathway(pathway = pathway_name)

        for member in fields[2:]:

            annotations[member].add(annot)

    return dict(annotations)
def kegg_medicus(max_entity_variations = 10):
    """
    Retrieves and preprocesses the KEGG MEDICUS database.

    Returns a set of raw interaction records (with the original identifiers
    and some further attributes). Nested complexes and protein families
    are flattened which means each interacting pair is either a single
    protein or a protein complex. Then the combination of all variants of
    each interacting partner yields a separate record. E.g. if a family
    of 3 proteins interacts with a protein complex where one of the members
    can be 2 alternative proteins then this interaction yields 6 records.

    max_entity_variations : int
        In KEGG MEDICUS many molecular entities are protein families or
        families of often large and nested protein complexes. By this
        option you can limit largest number of variants a single entity
        might yield, so you won't end up with one complex yielding
        hundreds of combinatorial variants.

    Returns
        Set of ``KeggMedicusRawInteraction`` named tuples. In case of
        complexes ``id_*``, ``name_*`` and ``type_*`` are tuples.
    """

    reentity = re.compile(r'[,\+\(\)]|\w+')
    renminus2 = re.compile(r'\(n(?:-2)?\)')
    renetref = re.compile(r'\[(N|nt)\d{5}\]')

    KeggMedicusRawInteraction = collections.namedtuple(
        'KeggMedicusRawInteraction',
        [
            'id_a',
            'id_b',
            'name_a',
            'name_b',
            'effect',
            'itype',
            'pw_type',
            'type_a',
            'type_b',
            'network_id',
        ],
    )

    # connection symbol -> (interaction type, effect)
    i_code = {
        '->': ('post_translational', 'stimulation'),
        '=>': ('transcriptional', 'stimulation'),
        '//': ('post_translational', 'missing'),
        '-|': ('post_translational', 'inhibition'),
        '=|': ('transcriptional', 'inhibition'),
        '--': ('post_translational', 'undirected'),
        '>>': ('post_translational', 'enzyme_enzyme'),
        '==': ('post_translational', 'missing'),
    }


    def process_entity(e):
        """
        Parses an entity expression into nested containers: a list stands
        for alternatives (comma separated), a tuple for a complex (its
        members are joined by "+"), parentheses open a nested entity.
        """

        if isinstance(e, str):

            e = renminus2.sub('', e)
            e = reentity.findall(e)

        sub = 0  # depth of the currently open parentheses
        stack = []
        cplex = False

        for it in e:

            if it == ',':

                continue

            elif it == ')':

                sub -= 1

                if not sub:

                    # outermost parenthesis closed: parse its content
                    stack.append(process_entity(this_stack))

                else:

                    this_stack.append(it)

            elif sub:

                # inside parentheses: only collect, the recursion parses it
                this_stack.append(it)

                if it == '(':

                    sub += 1

            elif it == '(':

                this_stack = []
                sub += 1

            elif it == '+':

                cplex = True

            else:

                stack.append(it)

        if cplex:

            stack = tuple(stack)

        return stack


    def flatten_entity(e):
        """
        Expands a parsed entity into the list of its variants, each of
        them either a single identifier or a flat tuple of identifiers.
        """

        flat = []

        if isinstance(e, str):

            flat.append(e)

        elif isinstance(e, tuple):

            flat.extend(
                itertools.product(*(
                    (c,)
                        if isinstance(c, str) else
                    (flatten_entity(c),)
                        if isinstance(c, tuple) else
                    c
                    for c in e
                ))
            )

        elif isinstance(e, list):

            flat.extend(itertools.chain(*(flatten_entity(c) for c in e)))

        if any(
            any(isinstance(c, list) for c in flate)
            for flate in flat
        ):

            # some variants still contain alternatives: expand them too
            flat = list(
                itertools.chain(*(
                    flatten_entity(flate) for flate in flat
                ))
            )

        flat = [flatten_nested_complex(flate) for flate in flat]

        return flat


    def flatten_nested_complex(cplex):
        """
        Merges the members of nested tuples into one flat tuple.
        """

        if is_nested_complex(cplex):

            cplex = tuple(
                member
                for members in cplex
                for member in (
                    members if isinstance(members, tuple) else (members,)
                )
            )

            if is_nested_complex(cplex):

                cplex = flatten_nested_complex(cplex)

        return cplex


    def is_nested_complex(cplex):
        """
        Tells if the argument is a tuple with at least one tuple member.
        """

        return (
            isinstance(cplex, tuple) and
            any(isinstance(member, tuple) for member in cplex)
        )


    def get_interactions(connections, enames, pw_type, network_id):
        """
        Yields the records of one network. In `connections` entities are
        at the even and connection symbols at the odd indices.
        """

        entities = dict(
            (
                i,
                flatten_entity(process_entity(connections[i]))
            )
            for i in range(0, len(connections), 2)
        )

        for i in range(0, len(connections) - 1, 2):

            itype, effect = i_code[connections[i + 1]]

            if (
                len(entities[i]) > max_entity_variations or
                len(entities[i + 2]) > max_entity_variations
            ):

                continue

            for id_a, id_b in itertools.product(entities[i], entities[i + 2]):

                name_a, type_a = get_name_type(id_a, enames)
                name_b, type_b = get_name_type(id_b, enames)

                yield KeggMedicusRawInteraction(
                    id_a = id_a,
                    id_b = id_b,
                    name_a = name_a,
                    name_b = name_b,
                    effect = effect,
                    itype = itype,
                    pw_type = pw_type,
                    type_a = type_a,
                    type_b = type_b,
                    network_id = network_id,
                )


    def get_name_type(_id, enames):
        """
        Name and type of an entity; for complexes (tuples) a tuple of the
        names and a tuple of the types of the members.
        """

        return (
            tuple(zip(*(_get_name_type(i, enames) for i in _id)))
                if isinstance(_id, tuple) else
            _get_name_type(_id, enames)
        )


    def _get_name_type(_id, enames):
        """
        Name and type of a single entity; if missing from `enames`, it is
        looked up by DBGET and stored (as `None`s if the lookup fails).
        """

        if _id not in enames:

            dbget = kegg_dbget(_id)

            if not dbget:

                name, entity_type = (None, None)

            else:

                name = (
                    dbget['Name'][-1]
                        if isinstance(dbget['Name'], list) else
                    dbget['Name']
                )
                entity_type = dbget['Type'].lower()

            enames[_id] = (name, entity_type)

        return enames[_id]


    recollect = re.compile(r'^(GENE|PERTURBANT|VARIANT|METABOLITE)')
    recon = re.compile(r'(->|--|//|-\||=>|>>|=\||==)')
    rewrongspace = re.compile(r'(\d+) (?=\d+)')
    result = set()

    url = urls.urls['kegg_pws']['medicus']
    c = curl.Curl(url, silent = False, large = True)

    # first pass: entity ID -> (name, type), from the entity sections
    enames = {}
    collecting = None

    for row in c.result:

        begin_coll = recollect.match(row)

        if begin_coll:

            collecting = begin_coll.group()
            row = row.split(maxsplit = 1)[-1]

        if not collecting:

            continue

        if not begin_coll and not row.startswith(' '):

            # a row which is not indented closes the section
            collecting = None
            continue

        if collecting == 'GENE':

            row = row.split(';')[0]

        id_name = row.split(maxsplit = 1)

        if not id_name:

            continue

        if len(id_name) == 2:

            _id, name = id_name

        else:

            _id = id_name[0]
            dbget = kegg_dbget(_id)

            if not dbget:

                # failed lookup; `_get_name_type` will try it again later
                continue

            name = (
                dbget['Name']
                    if 'Name' in dbget else
                dbget['Composition']
            )

            if isinstance(name, list):

                name = name[-1]

        enames[_id] = (name.strip(), collecting.lower())

    # second pass: the networks
    c.fileobj.seek(0)
    pw_type = None
    network_id = None
    connections = None

    for row in c.fileobj:

        if row.startswith('ENTRY'):

            pw_type = None
            # so an entry without EXPANDED won't reuse the previous one
            connections = None
            network_id = row.split()[1]

        elif row.startswith('TYPE'):

            pw_type = row.strip().split()[-1].lower()

        elif row[:1] == ' ' and row.lstrip().startswith('EXPANDED'):

            # an indented field, matched independently of the indent width
            connections = renetref.sub('', row)
            connections = recon.sub(r' \g<1> ', connections)
            connections = rewrongspace.sub(r'\g<1>,', connections)
            connections = connections.split()[1:]

        elif row.startswith('///'):

            # end of the entry
            if connections is not None:

                result.update(
                    get_interactions(
                        connections,
                        enames,
                        pw_type,
                        network_id,
                    )
                )

    return result
def kegg_medicus_interactions(max_entity_variations = 10, complexes = False):
    """
    Retrieves and preprocesses human protein-protein and transcriptional
    regulatory interactions from the KEGG MEDICUS database. Optionally
    it returns protein complexes instead of interactions.

    max_entity_variations : int
        In KEGG MEDICUS many molecular entities are protein families or
        families of often large and nested protein complexes. By this
        option you can limit largest number of variants a single entity
        might yield, so you won't end up with one complex yielding
        hundreds of combinatorial variants.
    complexes : bool
        Return a set of protein complexes instead of a list of
        molecular interactions.

    Returns
        List of ``KeggMedicusInteraction`` named tuples, or a set of
        ``intera.Complex`` objects if ``complexes`` is True (an empty set
        if no complex has been found).
    """

    KeggMedicusInteraction = collections.namedtuple(
        'KeggMedicusInteraction',
        [
            'id_a',
            'id_b',
            'entity_type_a',
            'entity_type_b',
            'interaction_type',
            'effect',
        ]
    )

    result = []
    # tuple of member IDs -> set of complexes
    cplexes = {}


    def process_complex(ids, symbols, types):
        """
        Set of complexes from the combinations of the UniProt IDs of the
        members; empty set if any of the members is not of type "gene".
        """

        if ids not in cplexes:

            if not all(t == 'gene' for t in types):

                cplexes[ids] = set()

            else:

                uniprots = [
                    process_protein(id_, symbol)
                    for id_, symbol in zip(ids, symbols)
                ]

                cplexes[ids] = {
                    intera.Complex(
                        components = components,
                        sources = 'KEGG-MEDICUS',
                    )
                    for components in itertools.product(*uniprots)
                }

        return cplexes[ids]


    def process_protein(id_, symbol):
        """
        Translates the ID to UniProt, first as an Entrez Gene ID, then as
        a gene symbol.
        """

        # NOTE(review): `symbol` is not used, the ID itself is passed to
        # the gene symbol lookup -- confirm whether `symbol` was intended
        return (
            mapping.map_name(id_, 'entrez', 'uniprot') or
            mapping.map_name(id_, 'genesymbol', 'uniprot')
        )


    def process_partner(ids, symbols, types = None):
        """
        Handles single entities (strings) and complexes (tuples).
        """

        return (
            process_protein(ids, symbols)
                if isinstance(ids, str) else
            process_complex(ids, symbols, types)
        )


    for rec in kegg_medicus(max_entity_variations = max_entity_variations):

        # the partners are processed also if only the complexes are
        # requested: this fills the `cplexes` dict
        for id_a, id_b in itertools.product(
            process_partner(rec.id_a, rec.name_a, rec.type_a),
            process_partner(rec.id_b, rec.name_b, rec.type_b),
        ):

            if not complexes:

                result.append(
                    KeggMedicusInteraction(
                        id_a = id_a,
                        id_b = id_b,
                        entity_type_a = entity.Entity._get_entity_type(id_a),
                        entity_type_b = entity.Entity._get_entity_type(id_b),
                        interaction_type = rec.itype,
                        effect = rec.effect,
                    )
                )

    # `set().union` works also without arguments, unlike `set.union`
    return set().union(*cplexes.values()) if complexes else result
def kegg_medicus_complexes(max_entity_variations = 10):
    """
    Extracts a `dict` of protein complexes from the KEGG MEDICUS database.

    max_entity_variations : int
        In KEGG MEDICUS many molecular entities are protein families or
        families of often large and nested protein complexes. By this
        option you can limit largest number of variants a single entity
        might yield, so you won't end up with one complex yielding
        hundreds of combinatiorial variants.

    Returns
        Dict with the string representations of the complexes as keys
        and the complex objects as values.
    """

    found = kegg_medicus_interactions(
        max_entity_variations = max_entity_variations,
        complexes = True,
    )

    return {str(cplex): cplex for cplex in found}
def kegg_dbget(entry):
    """
    Retrieves an entry (e.g. compounds, network modules) by the
    KEGG DBGET interface (kegg.jp/dbget-bin/www_bget).

    entry : str,int
        Identifier of the entry. Integers and strings consisting only of
        digits are prefixed with "hsa:".

    Returns
        Dict with the row headers of the entry table as keys; the values
        are strings, lists of strings (multi-line cells) or dicts (cells
        with nested tables). The first item of the "Entry" cell is split
        to the keys "Entry" and "Type". Literature references are
        collected into a list of dicts under the key "References".
        `None` if nothing could be downloaded or the page has no table.
    """

    rexa = re.compile(r'\xa0+')
    stripchars = '\r\n; '
    reffields = {'Authors', 'Title', 'Journal'}
    result = {}

    if isinstance(entry, int):

        entry = 'hsa:%u' % entry

    if entry.isdigit():

        entry = 'hsa:%s' % entry

    url = urls.urls['kegg_pws']['dbget'] % entry
    c = curl.Curl(url, silent = True, large = False)

    if not c.result:

        # same outcome as for a page without tables
        return None

    soup = bs4.BeautifulSoup(c.result, 'html.parser')
    tbl = soup.find_all('table', limit = 4)

    if not tbl:

        return None

    # the last one of the first (up to) four tables is processed
    tbl = tbl[-1]
    collecting_ref = False
    last_ref = {}

    def close_reference():
        # moves the reference being collected to the result

        result.setdefault('References', []).append(last_ref)

    for row in tbl.find_all('tr', recursive = False):

        th = row.find('th')
        td = row.find('td')

        if th is None or td is None:

            # not a key-value row
            continue

        key = th.text.strip()

        if collecting_ref:

            if key in reffields:

                last_ref[key] = td.text
                continue

            # any other key ends the current reference
            close_reference()
            last_ref = {}
            collecting_ref = False

        if key == 'Reference':

            collecting_ref = True
            numbers = re.findall(r'\d+', td.text)
            last_ref['PMID'] = numbers[-1] if numbers else None
            continue

        subtbl = td.find_all('table')

        if subtbl:

            value = {}

            for st in subtbl:

                for subrow in st.find_all('tr'):

                    subtd = subrow.find_all('td')

                    if not subtd:

                        continue

                    if len(subtd) > 1 and subtd[1].text:

                        value[rexa.sub('', subtd[0].text)] = (
                            subtd[1].text.strip(stripchars)
                        )

                    else:

                        # one cell only: its first word is the key
                        subcontent = rexa.sub(' ', subtd[0].text).split()

                        if len(subcontent) > 1:

                            value[subcontent[0]] = (
                                subcontent[1].strip(stripchars)
                            )

        else:

            value = rexa.sub(' ', td.text).strip(stripchars)

            if '\n' in value:

                value = [
                    lval.strip(stripchars)
                    for lval in re.split(r'\s*[\n\r]+\s*', value)
                ]

        if key == 'Entry':

            value, result['Type'] = next(iter(value.items()))

        result[key] = value

    if collecting_ref:

        # the table ended with a reference
        close_reference()

    return result