#!/usr/bin/env python# -*- coding: utf-8 -*-## This file is part of the `pypath` python module## Copyright 2014-2023# EMBL, EMBL-EBI, Uniklinik RWTH Aachen, Heidelberg University## Authors: see the file `README.rst`# Contact: Dénes Türei (turei.denes@gmail.com)## Distributed under the GPLv3 License.# See accompanying file LICENSE.txt or copy at# https://www.gnu.org/licenses/gpl-3.0.html## Website: https://pypath.omnipathdb.org/#fromfuture.utilsimportiteritemsimportreimportitertoolsimportcollectionsimportbs4importwarningsimportpypath.share.curlascurlimportpypath.resources.urlsasurlsimportpypath.share.progressasprogressimportpypath.share.commonascommonimportpypath.utils.mappingasmappingimportpypath.internals.interaasinteraimportpypath.core.entityasentityKeggPathway=collections.namedtuple('KeggPathway',['pathway'],)
def kegg_interactions():
    """
    Downloads and processes KEGG Pathways.

    Collects the ``hsa`` pathway identifiers linked from the pathway list
    page, downloads the KGML of each pathway, translates the gene symbols
    of the KGML entries to UniProt IDs and expands every relation into
    pairwise records between the UniProt IDs of its two entries.

    Returns
        The interactions as ``KeggInteraction`` named tuples (fields:
        ``id_a``, ``id_b``, ``effect``, ``pathway``, ``mechanism``,
        ``is_direct``, ``transcriptional``), de-duplicated by
        ``common.unique_list``.
    """

    positive_terms = {'activation', 'expression'}
    negative_terms = {'inhibition', 'repression'}
    transc_terms = {'expression', 'repression'}
    mechanism_terms = {
        'phosphorylation',
        'binding/association',
        'dissociation',
        'ubiquitination',
        'dephosphorylation',
        'glycosylation',
        'state change',
        'methylation',
    }
    # Subtypes whose presence marks a relation as NOT direct.
    direct_terms = {'indirect effect'}

    KeggInteraction = collections.namedtuple(
        'KeggInteraction',
        [
            'id_a',
            'id_b',
            'effect',
            'pathway',
            'mechanism',
            'is_direct',
            'transcriptional',
        ],
    )

    rehsa = re.compile(r'.*(hsa[0-9]+).*')
    req_hdrs = [
        'Referer: http://www.genome.jp/kegg-bin/show_pathway'
        '?map=hsa04710&show_description=show'
    ]
    hsa_list = []
    interactions = []

    c = curl.Curl(urls.urls['kegg_pws']['list_url'], silent=True)
    htmllst = c.result
    lstsoup = bs4.BeautifulSoup(htmllst, 'html.parser')

    # Every link whose target contains an `hsa<digits>` identifier is
    # taken as a pathway; the link text is used as the pathway name.
    for a in lstsoup.find_all('a', href=True):

        m = rehsa.match(a['href'])

        if m:

            hsa_list.append((m.group(1), a.text))

    prg = progress.Progress(
        len(hsa_list),
        'Processing KEGG Pathways',
        1,
        percent=False,
    )

    for hsa, pw in hsa_list:

        prg.step()
        c = curl.Curl(
            urls.urls['kegg_pws']['kgml_url_2'] % hsa,
            silent=True,
            req_headers=req_hdrs,
        )
        kgml = c.result

        # Warnings raised while parsing the KGML are silenced.
        with warnings.catch_warnings():

            warnings.simplefilter('ignore')
            kgmlsoup = bs4.BeautifulSoup(kgml, 'html.parser')

        # KGML entry ID -> names listed in its `graphics` element.
        entries = {}

        for ent in kgmlsoup.find_all('entry'):

            gr = ent.find('graphics')

            if gr and 'name' in gr.attrs:

                entries[ent.attrs['id']] = [
                    n.strip()
                    for n in gr.attrs['name'].replace('...', '').split(',')
                ]

        # KGML entry ID -> unique UniProt IDs of all its gene symbols.
        uentries = {
            eid: common.unique_list(
                common.flat_list(
                    [
                        mapping.map_name(
                            gn,
                            'genesymbol',
                            'uniprot',
                            strict=True,
                        )
                        for gn in gns
                    ]
                )
            )
            for eid, gns in entries.items()
        }

        for rel in kgmlsoup.find_all('relation'):

            subtypes = {
                st.attrs['name']
                for st in rel.find_all('subtype')
            }

            if not (
                rel.attrs['entry1'] in uentries and
                rel.attrs['entry2'] in uentries and
                subtypes
            ):

                continue

            is_direct = not (direct_terms & subtypes)

            # Inhibition takes precedence if both kinds of terms occur.
            if negative_terms & subtypes:

                effect = 'inhibition'

            elif positive_terms & subtypes:

                effect = 'activation'

            else:

                effect = 'unknown'

            # Sorted so that the joined string does not depend on the
            # arbitrary iteration order of the set.
            mechanism = ';'.join(sorted(mechanism_terms & subtypes))
            transcriptional = bool(transc_terms & subtypes)

            for u1 in uentries[rel.attrs['entry1']]:

                for u2 in uentries[rel.attrs['entry2']]:

                    interactions.append(
                        KeggInteraction(
                            id_a=u1,
                            id_b=u2,
                            effect=effect,
                            pathway=pw,
                            mechanism=mechanism,
                            is_direct=is_direct,
                            transcriptional=transcriptional,
                        )
                    )

    prg.terminate()

    return common.unique_list(interactions)
def kegg_medicus(max_entity_variations=10):
    """
    Retrieves and preprocesses the KEGG MEDICUS database.

    Returns a set of raw interaction records (with the original
    identifiers and some further attributes). Nested complexes and
    protein families are flattened, which means each interacting partner
    is either a single protein or a protein complex. Then the combination
    of all variants of each interacting partner yields a separate record.
    E.g. if a family of 3 proteins interacts with a protein complex where
    one of the members can be 2 alternative proteins then this
    interaction yields 6 records.

    Args
        max_entity_variations (int): In KEGG MEDICUS many molecular
            entities are protein families or families of often large and
            nested protein complexes. This option limits the largest
            number of variants a single entity might yield, so one
            complex will not yield hundreds of combinatorial variants.
            Interactions where either partner exceeds the limit are
            skipped.

    Returns
        Set of ``KeggMedicusRawInteraction`` named tuples.
    """

    # Tokens of an entity expression: `,`, `+`, parentheses and words.
    reentity = re.compile(r'[,\+\(\)]|\w+')
    # `(n)` and `(n-2)` markers, removed before tokenizing.
    renminus2 = re.compile(r'\(n(?:-2)?\)')
    # Bracketed references such as `[N00001]` or `[nt06210]`.
    renetref = re.compile(r'\[(N|nt)\d{5}\]')

    KeggMedicusRawInteraction = collections.namedtuple(
        'KeggMedicusRawInteraction',
        [
            'id_a',
            'id_b',
            'name_a',
            'name_b',
            'effect',
            'itype',
            'pw_type',
            'type_a',
            'type_b',
            'network_id',
        ],
    )

    # Connector symbol -> (interaction type, effect).
    i_code = {
        '->': ('post_translational', 'stimulation'),
        '=>': ('transcriptional', 'stimulation'),
        '//': ('post_translational', 'missing'),
        '-|': ('post_translational', 'inhibition'),
        '=|': ('transcriptional', 'inhibition'),
        '--': ('post_translational', 'undirected'),
        '>>': ('post_translational', 'enzyme_enzyme'),
        '==': ('post_translational', 'missing'),
    }


    def process_entity(e):
        """
        Parses an entity expression (a string or a list of tokens) into a
        nested structure: parenthesized groups are processed recursively,
        and the result is a tuple if a `+` occurred at this level,
        otherwise a list.
        """

        if isinstance(e, str):

            e = renminus2.sub('', e)
            e = reentity.findall(e)

        sub = 0  # current depth of open parentheses
        stack = []
        cplex = False

        for it in e:

            if it == ',':

                continue

            elif it == ')':

                sub -= 1

                if not sub:

                    # outermost group closed: parse its tokens
                    stack.append(process_entity(this_stack))

                else:

                    this_stack.append(it)

            elif sub:

                # inside a group: collect tokens for the recursive call
                this_stack.append(it)

                if it == '(':

                    sub += 1

            elif it == '(':

                if not sub:

                    this_stack = []

                sub += 1

            elif it == '+':

                cplex = True

            else:

                stack.append(it)

        if cplex:

            stack = tuple(stack)

        return stack


    def flatten_entity(e):
        """
        Expands a parsed entity into a flat list of variants; tuples are
        expanded by the product of their members' alternatives.
        """

        flat = []

        if isinstance(e, str):

            flat.append(e)

        elif isinstance(e, tuple):

            flat.extend(
                itertools.product(
                    *(
                        (c,)
                            if isinstance(c, str) else
                        (flatten_entity(c),)
                            if isinstance(c, tuple) else
                        c
                        for c in e
                    )
                )
            )

        elif isinstance(e, list):

            flat.extend(
                itertools.chain(
                    *(flatten_entity(c) for c in e)
                )
            )

        # Another round if any variant still contains a list.
        if any(
            any(isinstance(c, list) for c in flate)
            for flate in flat
        ):

            flat = list(
                itertools.chain(
                    *(flatten_entity(flate) for flate in flat)
                )
            )

        flat = [flatten_nested_complex(flate) for flate in flat]

        return flat


    def flatten_nested_complex(cplex):
        """
        Merges the members of tuples nested in a tuple into one flat
        tuple; anything else is returned unchanged.
        """

        if is_nested_complex(cplex):

            cplex = tuple(
                member
                for members in cplex
                for member in (
                    members
                        if isinstance(members, tuple) else
                    (members,)
                )
            )

            if is_nested_complex(cplex):

                cplex = flatten_nested_complex(cplex)

        return cplex


    def is_nested_complex(cplex):
        """
        Tells if `cplex` is a tuple with at least one tuple member.
        """

        return (
            isinstance(cplex, tuple) and
            any(isinstance(member, tuple) for member in cplex)
        )


    def get_interactions(connections, enames, pw_type, network_id):
        """
        Yields the records of one network. In `connections` the entities
        are at even and the connector symbols at odd indices.
        """

        entities = dict(
            (i, flatten_entity(process_entity(connections[i])))
            for i in range(0, len(connections), 2)
        )

        for i in range(0, len(connections) - 1, 2):

            itype, effect = i_code[connections[i + 1]]

            if (
                len(entities[i]) > max_entity_variations or
                len(entities[i + 2]) > max_entity_variations
            ):

                continue

            for id_a, id_b in itertools.product(
                entities[i],
                entities[i + 2],
            ):

                name_a, type_a = get_name_type(id_a, enames)
                name_b, type_b = get_name_type(id_b, enames)

                yield KeggMedicusRawInteraction(
                    id_a=id_a,
                    id_b=id_b,
                    name_a=name_a,
                    name_b=name_b,
                    effect=effect,
                    itype=itype,
                    pw_type=pw_type,
                    type_a=type_a,
                    type_b=type_b,
                    network_id=network_id,
                )


    def get_name_type(_id, enames):
        """
        Name and type of an entity; for a tuple of IDs, a tuple of names
        and a tuple of types.
        """

        return (
            tuple(zip(*(_get_name_type(i, enames) for i in _id)))
                if isinstance(_id, tuple) else
            _get_name_type(_id, enames)
        )


    def _get_name_type(_id, enames):
        """
        Looks up the name and type of one ID in `enames`; unknown IDs are
        queried by `kegg_dbget` and the outcome is stored in `enames`.
        """

        if _id not in enames:

            dbget = kegg_dbget(_id)

            if not dbget:

                name, entity_type = (None, None)

            else:

                name = (
                    dbget['Name'][-1]
                        if isinstance(dbget['Name'], list) else
                    dbget['Name']
                )
                entity_type = dbget['Type'].lower()

            enames[_id] = (name, entity_type)

        return enames[_id]


    recollect = re.compile(r'^(GENE|PERTURBANT|VARIANT|METABOLITE)')
    recon = re.compile(r'(->|--|//|-\||=>|>>|=\||==)')
    # A number followed by a single space and another number.
    rewrongspace = re.compile(r'(\d+) (?=\d+)')
    # The `EXPANDED` keyword is indented in the rows; any indentation
    # width is accepted rather than an exact number of spaces.
    reexpanded = re.compile(r'\s+EXPANDED')

    result = set()

    url = urls.urls['kegg_pws']['medicus']
    c = curl.Curl(url, silent=False, large=True)

    # Entity ID -> (name, lowercase section label).
    enames = {}
    collecting = None

    # First pass: collect entity names from the GENE, PERTURBANT, VARIANT
    # and METABOLITE sections.
    for row in c.result:

        begin_coll = recollect.match(row)

        if begin_coll:

            collecting = begin_coll.group()
            # drop the section label from the first row of the section
            row = row.split(maxsplit=1)[-1]

        if collecting:

            # a row not starting with a space ends the section
            if not begin_coll and row[0] != ' ':

                collecting = None
                continue

            if collecting == 'GENE':

                row = row.split(';')[0]

            id_name = row.split(maxsplit=1)

            if len(id_name) == 2:

                _id, name = id_name

            else:

                # no name in the row: ask DBGET
                _id = id_name[0]
                dbget = kegg_dbget(_id)

                if not dbget:

                    # `kegg_dbget` may return None; without a name the
                    # ID is left out here and looked up again on demand
                    # by `_get_name_type`.
                    continue

                name = (
                    dbget['Name']
                        if 'Name' in dbget else
                    dbget['Composition']
                )

                if isinstance(name, list):

                    name = name[-1]

            enames[_id] = (name.strip(), collecting.lower())

    # Second pass: the interactions. The state is reset at each ENTRY so
    # that a record lacking an EXPANDED row can not inherit the
    # connections of the previous record.
    pw_type = None
    network_id = None
    connections = None

    c.fileobj.seek(0)

    for row in c.fileobj:

        if row.startswith('ENTRY'):

            pw_type = None
            connections = None
            network_id = row.split()[1]

        elif row.startswith('TYPE'):

            pw_type = row.strip().split()[-1].lower()

        elif reexpanded.match(row):

            connections = renetref.sub('', row)
            # make sure the connectors are separate tokens
            connections = recon.sub(r' \g<1> ', connections)
            # numbers separated only by a space become comma separated
            connections = rewrongspace.sub(r'\g<1>,', connections)
            # the first token is the `EXPANDED` keyword
            connections = connections.split()[1:]

        elif row.startswith('///'):

            # end of record
            if connections:

                result.update(
                    get_interactions(
                        connections,
                        enames,
                        pw_type,
                        network_id,
                    )
                )

    return result
def kegg_medicus_interactions(max_entity_variations=10, complexes=False):
    """
    Retrieves and preprocesses human protein-protein and transcriptional
    regulatory interactions from the KEGG MEDICUS database. Optionally
    it returns protein complexes instead of interactions.

    Args
        max_entity_variations (int): In KEGG MEDICUS many molecular
            entities are protein families or families of often large and
            nested protein complexes. This option limits the largest
            number of variants a single entity might yield, so one
            complex will not yield hundreds of combinatorial variants.
        complexes (bool): Return a set of protein complexes instead of a
            list of molecular interactions.

    Returns
        List of ``KeggMedicusInteraction`` named tuples, or a set of
        ``intera.Complex`` objects if ``complexes`` is True (an empty
        set if no complex was found).
    """

    KeggMedicusInteraction = collections.namedtuple(
        'KeggMedicusInteraction',
        [
            'id_a',
            'id_b',
            'entity_type_a',
            'entity_type_b',
            'interaction_type',
            'effect',
        ]
    )

    result = []
    # Tuple of member IDs -> set of complexes built from them.
    cplexes = {}


    def process_complex(ids, symbols, types):
        """
        Complexes from a tuple of member IDs, one for each combination
        of the members' UniProt IDs. If not all members are of type
        `gene`, the result is an empty set.
        """

        if ids not in cplexes:

            if all(t == 'gene' for t in types):

                uniprots = [
                    process_protein(id_, symbol)
                    for id_, symbol in zip(ids, symbols)
                ]

                cplexes[ids] = {
                    intera.Complex(
                        components=components,
                        sources='KEGG-MEDICUS',
                    )
                    for components in itertools.product(*uniprots)
                }

            else:

                cplexes[ids] = set()

        return cplexes[ids]


    def process_protein(id_, symbol):
        """
        Translates `id_` to UniProt IDs, first as an Entrez ID, then as
        a gene symbol.
        """

        # NOTE(review): `symbol` is not used, the gene symbol fallback
        # translates `id_` -- confirm whether `symbol` was intended.
        return (
            mapping.map_name(id_, 'entrez', 'uniprot') or
            mapping.map_name(id_, 'genesymbol', 'uniprot')
        )


    def process_partner(ids, symbols, types=None):
        """
        A string ID is processed as a protein, anything else as a
        complex.
        """

        return (
            process_protein(ids, symbols)
                if isinstance(ids, str) else
            process_complex(ids, symbols, types)
        )


    for rec in kegg_medicus(max_entity_variations=max_entity_variations):

        # Processing the partners also fills `cplexes`, hence it is done
        # even if only the complexes are requested.
        partners_a = process_partner(rec.id_a, rec.name_a, rec.type_a)
        partners_b = process_partner(rec.id_b, rec.name_b, rec.type_b)

        if complexes:

            continue

        for id_a, id_b in itertools.product(partners_a, partners_b):

            result.append(
                KeggMedicusInteraction(
                    id_a=id_a,
                    id_b=id_b,
                    entity_type_a=entity.Entity._get_entity_type(id_a),
                    entity_type_b=entity.Entity._get_entity_type(id_b),
                    interaction_type=rec.itype,
                    effect=rec.effect,
                )
            )

    # `set().union` works also if no complexes have been collected.
    return set().union(*cplexes.values()) if complexes else result
def kegg_medicus_complexes(max_entity_variations=10):
    """
    Extracts a `dict` of protein complexes from the KEGG MEDICUS database.

    Args
        max_entity_variations (int): In KEGG MEDICUS many molecular
            entities are protein families or families of often large and
            nested protein complexes. This option limits the largest
            number of variants a single entity might yield, so one
            complex will not yield hundreds of combinatorial variants.

    Returns
        Dict with the string representations of the complexes as keys
        and the complex objects as values.
    """

    by_name = {}

    for cplex in kegg_medicus_interactions(
        max_entity_variations=max_entity_variations,
        complexes=True,
    ):

        by_name[cplex.__str__()] = cplex

    return by_name
def kegg_dbget(entry):
    """
    Retrieves an entry (e.g. compounds, network modules) by the KEGG DBGET
    interface (kegg.jp/dbget-bin/www_bget).

    Args
        entry (str, int): The identifier of the entry. Integers and
            strings consisting only of digits are prefixed with `hsa:`.

    Returns
        Dict with the row labels of the entry table as keys. The values
        are strings, lists of strings (multi-line cells) or dicts (cells
        containing tables). The `Reference` rows are collected under the
        `References` key as a list of dicts with the keys `PMID` and,
        where present, `Authors`, `Title` and `Journal`. The `Type` key
        is extracted from the `Entry` row. None is returned if no table
        is found in the page.
    """

    rexa = re.compile(r'\xa0+')
    stripchars = '\r\n; '
    reffields = {'Authors', 'Title', 'Journal'}
    result = {}

    if isinstance(entry, int):

        entry = 'hsa:%u' % entry

    if entry.isdigit():

        entry = 'hsa:%s' % entry

    url = urls.urls['kegg_pws']['dbget'] % entry
    c = curl.Curl(url, silent=True, large=False)
    soup = bs4.BeautifulSoup(c.result, 'html.parser')
    tbl = soup.find_all('table', limit=4)

    if not tbl:

        return None

    # The last one of the first (up to) four tables is processed.
    tbl = tbl[-1]
    collecting_ref = False
    last_ref = {}

    for row in tbl.find_all('tr', recursive=False):

        th = row.find('th')
        td = row.find('td')

        if th is None or td is None:

            # without a label or a content cell the row can not be used
            continue

        key = th.text.strip()

        if collecting_ref:

            if key in reffields:

                last_ref[key] = td.text
                continue

            else:

                # the first row not belonging to the reference closes it
                result.setdefault('References', []).append(last_ref)
                last_ref = {}
                collecting_ref = False

        if key == 'Reference':

            collecting_ref = True
            # the last number in the cell; None if there is no number
            numbers = re.findall(r'\d+', td.text)
            last_ref['PMID'] = numbers[-1] if numbers else None
            continue

        subtbl = td.find_all('table')

        if subtbl:

            value = {}

            for st in subtbl:

                for subrow in st.find_all('tr'):

                    subtd = subrow.find_all('td')

                    if not subtd:

                        continue

                    if len(subtd) > 1 and subtd[1].text:

                        value[rexa.sub('', subtd[0].text)] = (
                            subtd[1].text.strip(stripchars)
                        )

                    else:

                        # key and value are in the same cell
                        subcontent = rexa.sub(' ', subtd[0].text).split()

                        if len(subcontent) > 1:

                            value[subcontent[0]] = (
                                subcontent[1].strip(stripchars)
                            )

        else:

            value = rexa.sub(' ', td.text).strip(stripchars)

            if '\n' in value:

                value = [
                    lval.strip(stripchars)
                    for lval in re.split(r'\s*[\n\r]+\s*', value)
                ]

        if key == 'Entry' and isinstance(value, dict) and value:

            # the first item of the dict: the ID and the entry type
            value, result['Type'] = next(iter(value.items()))

        result[key] = value

    # A reference still open at the end of the table is stored here,
    # otherwise the last reference would be lost.
    if collecting_ref:

        result.setdefault('References', []).append(last_ref)

    return result