Source code for pypath.inputs.reaction

#!/usr/bin/env python
# -*- coding: utf-8 -*-

#
#  This file is part of the `pypath` python module
#
#  Copyright 2014-2023
#  EMBL, EMBL-EBI, Uniklinik RWTH Aachen, Heidelberg University
#
#  Authors: see the file `README.rst`
#  Contact: Dénes Türei (turei.denes@gmail.com)
#
#  Distributed under the GPLv3 License.
#  See accompanying file LICENSE.txt or copy at
#      https://www.gnu.org/licenses/gpl-3.0.html
#
#  Website: https://pypath.omnipathdb.org/
#

from future.utils import iteritems

# Py 2/3
try:
    input = raw_input
except NameError:
    pass

try:
    import cPickle as pickle
except:
    import pickle

import sys
import os
import itertools
import gzip
import bs4
from lxml import etree
import copy
import struct

import pypath.share.curl as curl
import pypath.resources.urls as urls
import pypath.share.progress as progress
import pypath.share.common as common
import pypath.share.cache as cache_mod
import pypath.inputs.acsn as acsn_input
from pypath.resources import data_formats


[docs] def reactome_sbml(): """ Downloads Reactome human reactions in SBML format. Returns a dict of file objects. """ url = urls.urls['reactome']['sbml'] c = curl.Curl(url, silent = False, large = True, slow = True) return c.result
[docs] def reactome_biopax(organism = 9606, cache = True): """ Downloads Reactome human reactions in SBML format. Returns File object. """ organisms = {9606: 'Homo_sapiens'} unzipped = os.path.join( cache_mod.get_cachedir(), 'reactome_biopax_%s.owl' % organisms[organism] ) if not os.path.exists(unzipped) or not cache: fname = '%s.owl' % organisms[organism] url = urls.urls['reactome']['biopax_l3'] c = curl.Curl( url, silent = False, large = True, files_needed = [fname] ) fileobj = c.result[fname] with open(unzipped, 'w') as _unzipped: while True: chunk = fileobj.read(4096) if not chunk: break _unzipped.write(chunk) fileobj.close() _unzipped = open(unzipped, 'r') return _unzipped
[docs] def pid_biopax(): url = urls.urls['nci-pid']['biopax_l3'] c = curl.Curl(url, silent = False, large = True) return c.result
[docs] def panther_biopax(): url = urls.urls['panther']['biopax_l3'] c = curl.Curl(url, silent = False, large = True) return c.result
[docs] def acsn_biopax(): url = urls.urls['acsn']['biopax_l3'] c = curl.Curl(url, silent = False, large = True) return c.result
[docs] def reactome_bs(): """ Reactome pathways in SBML format. Yields tuples of pathway IDs (string) and SBML representationa of the pathwaya as a `bs4.BeautifulSoup` objects. """ sbml = reactome_sbml() for k, v in sbml.items(): yield k[:-5], bs4.BeautifulSoup(v.read(), 'html.parser')
# Process Reactome BioPAX level 3
[docs] def get_soup(elem): return bs4.BeautifulSoup(etree.tostring(elem), 'html.parser')
def _bp_collect_resources(elem, tag, restype = None): rdfpref = '{http://www.w3.org/1999/02/22-rdf-syntax-ns#}' rdfres = '%sresource' % rdfpref return [ x.get(rdfres).replace('#', '') for x in elem.iterfind(tag) if rdfres in x.attrib and (restype is None or x.get(rdfres).replace( '#', '').startswith(restype)) ]
[docs] def reactions_biopax(biopax_file, organism = 9606, protein_name_type = 'UniProt', clean = True): """ Processes a BioPAX file and extracts binary interactions. """ cachefile = os.path.join( cache_mod.get_cachedir(), '%s.processed.pickle' % os.path.split(biopax_file.name)[1] ) if os.path.exists(cachefile): sys.stdout.write('\t:: Loading already processed data\n') sys.stdout.flush() return pickle.load(open(cachefile, 'rb')) # string constants bppref = '{http://www.biopax.org/release/biopax-level3.owl#}' rdfpref = '{http://www.w3.org/1999/02/22-rdf-syntax-ns#}' rdfid = '%sID' % rdfpref rdfab = '%sabout' % rdfpref rdfres = '%sresource' % rdfpref bpprot = '%sProtein' % bppref bpcplx = '%sComplex' % bppref bpprre = '%sProteinReference' % bppref bpreac = '%sBiochemicalReaction' % bppref bpcata = '%sCatalysis' % bppref bpctrl = '%sControl' % bppref bpcoma = '%sComplexAssembly' % bppref bppstp = '%sPathwayStep' % bppref bpuxrf = '%sUnificationXref' % bppref bpstoi = '%sStoichiometry' % bppref bppubr = '%sPublicationXref' % bppref bppath = '%sPathway' % bppref bpfrfe = '%sFragmentFeature' % bppref bpseqi = '%sSequenceInterval' % bppref bpseqs = '%sSequenceSite' % bppref bpmodf = '%sModificationFeature' % bppref bpmodv = '%sSequenceModificationVocabulary' % bppref bpmphe = '%smemberPhysicalEntity' % bppref bperef = '%sentityReference' % bppref bpxref = '%sxref' % bppref bpdnam = '%sdisplayName' % bppref bprelr = '%sRelationshipXref' % bppref bpcsto = '%scomponentStoichiometry' % bppref bpstoc = '%sstoichiometricCoefficient' % bppref bpphye = '%sphysicalEntity' % bppref bpcted = '%scontrolled' % bppref bpcter = '%scontroller' % bppref bpctyp = '%scontrolType' % bppref bpleft = '%sleft' % bppref bprgth = '%sright' % bppref bpsprc = '%sstepProcess' % bppref bpfeat = '%sfeature' % bppref bpfelo = '%sfeatureLocation' % bppref bpibeg = '%ssequenceIntervalBegin' % bppref bpiend = '%ssequenceIntervalEnd' % bppref bpseqp = '%ssequencePosition' % bppref bpmoty = '%smodificationType' % bppref bppcom = '%spathwayComponent' % bppref bpterm = '%sterm' % bppref bpdb = '%sdb' % bppref bpid = '%sid' % bppref upStr = 'UniProt' modvoc = data_formats.reactome_modifications # intermediate results proteins = {} proteinfamilies = {} uniprots = {} proteinreferences = {} complexes = {} complexvariations = {} stoichiometries = {} reactions = {} complexassemblies = {} catalyses = {} controls = {} pathways = {} pathwaysteps = {} publications = {} fragmentfeatures = {} sequenceintervals = {} sequencesites = {} modificationfeatures = {} modificationvocabulary = {} protein_name_type = protein_name_type.lower() # processing the XML bpf = reactome_biopax(organism = organism) bp_filesize = 0 if hasattr(biopax_file, 'name') and os.path.exists(biopax_file.name): bp_filesize = os.path.getsize(biopax_file.name) if biopax_file.mode == 'r': biopax_file.close() biopax_file = open(biopax_file.name, 'rb') elif type(biopax_file) is tarfile.ExFileObject: bp_filesize = biopax_file.size elif type(biopax_file) is gzip.GzipFile: f = open(biopax_file.name, 'rb') f.seek(-4, 2) bp_filesize = struct.unpack('<I', f.read())[0] f.close() prg = progress.Progress(bp_filesize, 'Processing BioPAX XML', 1) fpos = biopax_file.tell() bp = etree.iterparse(biopax_file, events = ('end', )) used_elements = [] try: for ev, elem in bp: new_fpos = biopax_file.tell() prg.step(new_fpos - fpos) fpos = new_fpos _id = elem.get(rdfid) if rdfid in elem.attrib else elem.get(rdfab) # Protein if elem.tag == bpprot: entref = elem.find(bperef) if entref is not None: proteins[_id] = { 'protein': entref.get(rdfres).replace('#', ''), 'seqfeatures': _bp_collect_resources(elem, bpfeat), 'modfeatures': _bp_collect_resources(elem, bpfeat) } else: proteinfamilies[_id] = _bp_collect_resources(elem, bpmphe) # ProteinReference elif elem.tag == bpprre: proteinreferences[_id] = _bp_collect_resources(elem, bpxref) # UnificationXref elif elem.tag == bpuxrf or elem.tag == bprelr: db = elem.find(bpdb) if db is not None: if elem.find(bpdb).text.lower().startswith( protein_name_type): i = elem.find(bpid) if i is not None: uniprots[_id] = i.text # Complex elif elem.tag == bpcplx: if elem.find(bpcsto) is not None: complexes[_id] = _bp_collect_resources(elem, bpcsto) else: complexvariations[_id] = _bp_collect_resources(elem, bpmphe) # Stoichiometry elif elem.tag == bpstoi: stoichiometries[_id] = (elem.find(bpphye).get(rdfres).replace( '#', ''), int(float(elem.find(bpstoc).text))) # BiochemicalReaction elif elem.tag == bpreac: reactions[_id] = { 'refs': _bp_collect_resources(elem, bpxref), 'left': _bp_collect_resources(elem, bpleft), 'right': _bp_collect_resources(elem, bprgth) } # ComplexAssembly elif elem.tag == bpcoma: complexassemblies[_id] = { 'refs': _bp_collect_resources(elem, bpxref), 'left': _bp_collect_resources(elem, bpleft), 'right': _bp_collect_resources(elem, bprgth) } # Catalysis elif elem.tag == bpcata: cter = elem.find(bpcter) cted = elem.find(bpcted) if cter is not None and cted is not None: typ = elem.find(bpctyp) catalyses[_id] = { 'controller': cter.get(rdfres).replace('#', ''), 'controlled': cted.get(rdfres).replace('#', ''), 'type': '' if typ is None else typ.text } # Control elif elem.tag == bpctrl: cter = elem.find(bpcter) cted = elem.find(bpcted) if cter is not None and cted is not None: typ = elem.find(bpctyp) controls[_id] = { 'refs': _bp_collect_resources(elem, bpxref), 'type': typ.text if typ is not None else '', 'controller': cter.get(rdfres).replace('#', ''), 'controlled': cted.get(rdfres).replace('#', '') } # PathwayStep elif elem.tag == bppstp: pathwaysteps[_id] = _bp_collect_resources(elem, bppstp) # PublicationXref elif elem.tag == bppubr: pmid = elem.find(bpid) if pmid is not None: publications[_id] = pmid.text # FragmentFeature elif elem.tag == bpfrfe: fragmentfeatures[_id] = elem.find(bpfelo).get(rdfres).replace( '#', '') # SequenceInterval elif elem.tag == bpseqi: beg = elem.find(bpibeg) end = elem.find(bpiend) sequenceintervals[_id] = ( beg.get(rdfres).replace('#', '') if beg is not None else None, elem.find(bpiend).get(rdfres).replace('#', '') if end is not None else None) # SequenceSite elif elem.tag == bpseqs: seqp = elem.find(bpseqp) if seqp is not None: sequencesites[_id] = int(seqp.text) # ModificationFeature elif elem.tag == bpmodf: felo = elem.find(bpfelo) moty = elem.find(bpmoty) if felo is not None and moty is not None: modificationfeatures[_id] = ( elem.find(bpfelo).get(rdfres).replace('#', ''), elem.find(bpmoty).get(rdfres).replace('#', '')) # SequenceModificationVocabulary elif elem.tag == bpmodv: term = elem.find(bpterm) if term is not None: modificationvocabulary[_id] = term.text # Pathway elif elem.tag == bppath: try: pathways[_id] = { 'reactions': _bp_collect_resources(elem, bppcom), 'pathways': _bp_collect_resources(elem, bppcom) } except TypeError: sys.stdout.write('Wrong type at element:\n') sys.stdout.write(etree.tostring(elem)) sys.stdout.flush() if clean: used_elements.append(elem) if len(used_elements) > 800: for e in used_elements[:400]: e.clear() used_elements = used_elements[400:] except etree.XMLSyntaxError as e: sys.stdout.write('\n\tWARNING: XML processing error: %s\n' % str(e)) sys.stdout.flush() prg.terminate() del bp biopax_file.close() # # # # # # # # # # # # # # # # # # # from intermediate to final results prg = progress.Progress(len(proteins), 'Processing proteins', 11) proteins_uniprots = {} # return proteinreferences, uniprots for pref, protein in iteritems(proteins): prg.step() if protein['protein'] in proteinreferences: for prref in proteinreferences[protein['protein']]: if prref in uniprots: proteins_uniprots[pref] = uniprots[prref] prg.terminate() prg = progress.Progress(len(proteins), 'Processing PTMs', 11) proteins_modifications = {} for pref, protein in iteritems(proteins): prg.step() for modf in protein['modfeatures']: if modf in modificationfeatures: if modificationfeatures[modf][0] in sequencesites: if modificationfeatures[modf][1] in modificationvocabulary: if modificationvocabulary[modificationfeatures[modf][ 1]] in modvoc: if pref not in proteins_modifications: proteins_modifications[pref] = set([]) proteins_modifications[pref].add( (sequencesites[modificationfeatures[modf][0]], modvoc[modificationvocabulary[ modificationfeatures[modf][1]]][1], modvoc[modificationvocabulary[ modificationfeatures[modf][1]]][0])) prg.terminate() # build a uniform dict to handle all protein based entities # including complexes and variations/families entity_uniprot = {} prg = progress.Progress(len(proteins_uniprots), 'Processing proteins', 11) for pref, protein in iteritems(proteins_uniprots): prg.step() entity_uniprot[pref] = [{ 'members': [protein], 'ptms': {} if protein not in proteins_modifications else { protein: proteins_modifications[pref] } }] prg.terminate() prg = progress.Progress( len(proteinfamilies), 'Processing protein families', 11) for pfref, prefs in iteritems(proteinfamilies): prg.step() entity_uniprot[pfref] = [] for pref in prefs: if pref in proteins_uniprots: entity_uniprot[pfref].append({ 'members': [proteins_uniprots[pref]], 'ptms': {} if pref not in proteins_modifications else { proteins_uniprots[pref]: proteins_modifications[pref] } }) prg.terminate() # return entity_uniprot, complexes, proteins, proteinreferences, uniprots, # proteinfamilies, proteins_uniprots, reactions, controls, catalyses, # complexassemblies del proteins del proteinfamilies del proteinreferences prg = progress.Progress(len(complexes), 'Processing complexes', 11) for cref, cplex in iteritems(complexes): prg.step() if cref not in entity_uniprot: process_complex(0, cref, entity_uniprot, complexes, complexvariations, cplex, stoichiometries) prg.terminate() del complexes del stoichiometries del proteins_uniprots # return entity_uniprot, proteins, proteinreferences, uniprots, complexes, stoichiometries # # # prg = progress.Progress( len(reactions) + len(complexassemblies), 'Processing reactions', 11) reactions_uniprots = \ process_reactions(reactions, entity_uniprot, publications) complexassemblies_uniprots = \ process_reactions(complexassemblies, entity_uniprot, publications) del reactions del complexassemblies # # # prg = progress.Progress( len(controls) + len(catalyses), 'Processing controls and catalyses', 11) controls_uniprots = _process_controls( dict(itertools.chain.from_iterable( iteritems(d) for d in (controls, catalyses) )), entity_uniprot, dict(itertools.chain.from_iterable( iteritems(d) for d in ( reactions_uniprots, complexassemblies_uniprots, ) )), publications, ) for caref, ca in iteritems(complexassemblies_uniprots): controls_uniprots[caref] = { 'type': 'BINDING', 'refs': [publications[r] for r in ca['refs'] if r in publications], 'controller': None, 'controlled': ca } del entity_uniprot pickle.dump(controls_uniprots, open(cachefile, 'wb')) # return controls_uniprots, entity_uniprot, proteins, proteinreferences, # uniprots, complexes, stoichiometries return controls_uniprots
[docs] def process_reactions(reactions, entity_uniprot, publications): result = {} for rref, rea in iteritems(reactions): result[rref] = { 'refs': [publications[r] for r in rea['refs'] if r in publications], 'left': [entity_uniprot[l] for l in rea['left'] if l in entity_uniprot], 'right': [entity_uniprot[r] for r in rea['right'] if r in entity_uniprot] } return result
def _process_controls(controls, entity_uniprot, reactions_uniprots, publications): result = {} for cref, ctrl in iteritems(controls): result[cref] = { 'type': ctrl['type'], 'refs': [publications[r] for r in ctrl['refs'] if r in publications] if 'refs' in ctrl else [], 'controller': entity_uniprot[ctrl['controller']] if ctrl['controller'] in entity_uniprot else None, 'controlled': reactions_uniprots[ctrl['controlled']] if ctrl['controlled'] in reactions_uniprots else None } return result
[docs] def process_complex(depth, cref, entity_uniprot, complexes, complexvariations, cplex, stoichiometries): log = open('reactome.log', 'a') tabs = '\t' * (depth + 1) log.write('%sStarting processing %s, depth = %u\n' % (tabs[1:], cref, depth)) this_cplex = [{'members': [], 'ptms': {}}] log.write('%sComplex %s have %u member entities\n' % (tabs, cref, len(cplex))) for stoi in cplex: if stoi in stoichiometries: ref, num = stoichiometries[stoi] log.write('%sNew member entity: %s, stoichiometric coeff: %u\n' % (tabs, ref, num)) if ref.startswith('Complex') \ and ref not in entity_uniprot: if ref in complexes: log.write( '%s%s is a complex with %u subentities, and hasn\'t been processed yet\n' % (tabs, ref, len(complexes[ref]))) process_complex(depth + 1, ref, entity_uniprot, complexes, complexvariations, complexes[ref], stoichiometries) if ref in complexvariations: log.write( '%s%s is a complex group with %u variations, and hasn\'t been processed yet\n' % (tabs, ref, len(complexvariations[ref]))) entity_uniprot[ref] = [] for mref in complexvariations[ref]: if mref not in entity_uniprot and mref in complexes: log.write( '%s%s is a complex with %u subentities, and hasn\'t been processed yet\n' % (tabs, mref, len(complexes[mref]))) process_complex(depth + 1, mref, entity_uniprot, complexes, complexvariations, complexes[mref], stoichiometries) if mref in entity_uniprot: log.write( '%s%s is now processed, adding it as an instance of %s\n' % (tabs, mref, ref)) entity_uniprot[ref].extend(entity_uniprot[mref]) if ref in entity_uniprot: log.write( '%s%s is an already processed entity, with %u variants and %u members\n' % (tabs, ref, len(entity_uniprot[ref]), len(entity_uniprot[ref][0]['members']) if len(entity_uniprot[ref]) > 0 else 0)) log.write( '%sNumber of variants after processing %s: %u x %u = %u\n' % (tabs, ref, len(this_cplex), len(entity_uniprot[ref]), len(this_cplex) * len(entity_uniprot[ref]))) this_cplex_new = [] for var in this_cplex: i = 0 for new_member in entity_uniprot[ref]: var_new = copy.deepcopy(var) var_new['members'].extend(new_member['members'] * num) for u, ptm in iteritems(new_member['ptms']): if u not in var_new['ptms']: var_new['ptms'][u] = set([]) var_new['ptms'][u] = var_new['ptms'][ u] | new_member['ptms'][u] this_cplex_new.append(var_new) i += 1 this_cplex = this_cplex_new log.write('%sNumber of variants after processing %s: %u\n' % (tabs, ref, len(this_cplex))) log.write('%sNumber of members in %s: %u\n' % (tabs, cref, len(this_cplex[0]['members']) if len(this_cplex) > 0 else 0)) else: log.write('%sPermanently missing: %s\n' % (tabs, ref)) log.write('%sFinished processing %s, found %u variants with %u members\n' % (tabs[1:], cref, len(this_cplex), len(this_cplex[0]['members']) if len(this_cplex) > 0 else 0)) if cref not in entity_uniprot: entity_uniprot[cref] = [] entity_uniprot[cref].extend(this_cplex)
[docs] def reactome_interactions(cacheFile = None, ask = True, **kwargs): """ Downloads and processes Reactome BioPAX. Extracts binary interactions. The applied criteria are very stringent, yields very few interactions. Requires large free memory, approx. 2G. """ cacheFile = os.path.join( cache_mod.get_cachedir(), 'reactome.interactions.pickle' ) if cacheFile is None else cacheFile if os.path.exists(cacheFile): interactions = pickle.load(open(cacheFile, 'rb')) elif ask: while True: sys.stdout.write( '\n\tProcessing Reactome requires huge memory.\n' '\tPlease hit `y` if you have at least 2G free memory,\n' '\tor `n` to omit Reactome.\n' '\tAfter processing once, it will be saved in \n' '\t%s, so next time can be loaded quickly.\n\n' '\tProcess Reactome now? [y/n]\n' % cacheFile) sys.stdout.flush() answer = input().lower() if answer in {'y', 'n'}: break else: answer = 'y' if answer == 'y': return get_interactions('reactome', **kwargs) else: return []
[docs] def acsn_interactions_2(**kwargs): return get_interactions('acsn', **kwargs)
[docs] def pid_interactions(**kwargs): return get_interactions('pid', **kwargs)
[docs] def panther_interactions(**kwargs): return get_interactions('panther', **kwargs)
[docs] def get_interactions(source, mandatory_refs = True): ctrls = get_controls(source) return process_controls(ctrls, mandatory_refs)[0]
[docs] def get_controls(source, protein_name_type = None): name_types = { 'acsn': 'HGNC', 'reactome': 'UniProt', 'pid': 'UniProt', 'panther': 'UniProt' } if protein_name_type is None and source in name_types: protein_name_type = name_types[source] biopax = globals()['%s_biopax' % source] bpfile = biopax() if type(bpfile) is list: result = {} for bpf in bpfile: result = dict( reactions_biopax( bpf, protein_name_type = protein_name_type).items() + result.items()) else: result = reactions_biopax(bpfile, protein_name_type = protein_name_type) return result
[docs] def process_controls(controls, mandatory_refs = True): interactions = set([]) ptms = [] regulations = [] prg = progress.Progress(len(controls), 'Processing interactions', 11) for c in controls.values(): prg.step() if len(c['refs']) > 0 or not mandatory_refs: if c['controller'] is not None and len(c['controller']) > 0: for ctr in c['controller']: if len(common.unique_list(ctr['members'])) == 1: this_ctr = ctr['members'][0].split('-')[0] ctd = c['controlled'] if ctd is not None: # ctd['left'] is not None and ctd['right'] is not # None: for leftInst in itertools.product(*ctd['left']): for rightInst in itertools.product( *ctd['right']): lr = common.unique_list( common.flat_list([ l['members'] for l in leftInst ] + [r['members'] for r in rightInst])) if len(lr) == 1: this_ctd = lr[0].split('-')[0] interactions.add(( this_ctr, this_ctd, c['type'], ';'.join(c['refs'] if len(c[ 'refs']) > 0 else ctd['refs']), 'directed')) else: modDiff = {} ptmsLeft = set( [(ptms[0], ptm) for l in leftInst for ptms in l['ptms'].items() for ptm in ptms[1]]) ptmsRight = set( [(ptms[0], ptm) for r in rightInst for ptms in r['ptms'].items() for ptm in ptms[1]]) ptmsDiff = ptmsLeft ^ ptmsRight diffUniProts = common.unique_list( [ptm[0] for ptm in ptmsDiff]) if len(diffUniProts) == 1: this_ctd = diffUniProts[0].split( '-')[0] interactions.add( (this_ctr, this_ctd, c['type'], ';'.join(c['refs'] if len(c[ 'refs']) > 0 else ctd[ 'refs']), 'directed')) else: lefts = [ set(l['members']) for l in leftInst ] rights = [ set(r['members']) for r in rightInst ] onlyLefts = [ l for l in lefts if l not in rights ] onlyRights = [ r for r in rights if r not in lefts ] diffs = [] for l in onlyLefts: for r in onlyRights: diff = l ^ r if len(diff) == 1: diffs.append( list(diff)) diffs = common.unique_list( common.flat_list(diffs)) if len(diffs) == 1: this_ctd = diffs[0].split('-')[ 0] interactions.add( (this_ctr, this_ctd, c['type'], ';'.join(c['refs'] if len( c['refs']) > 0 else ctd['refs']), 'undirected')) # if the controller is unknown # and the reaction has only 2 proteins # these most probably bind each other # to form a complex else: ctd = c['controlled'] if ctd is not None: for leftInst in itertools.product(*ctd['left']): for rightInst in itertools.product(*ctd['right']): lr = common.unique_list( common.flat_list([ l['members'] for l in leftInst ] + [r['members'] for r in rightInst])) if len(lr) == 2: interactions.add( (lr[0].split('-')[0], lr[1].split('-')[0], c['type'], ';'.join(ctd['refs']))) prg.terminate() return list(interactions), ptms, regulations
# Process Reactome SBML def _reactome_id(obj, attr): return _reactome_extract_id(obj.attrs[attr]) def _reactome_extract_id(value): return int(value.split('_')[1]) def _reactome_res(obj): return _reactome_extract_res(obj.attrs['rdf:resource']) def _reactome_extract_res(value): return value.split(':')[-1] def _reactome_reactions(): species = {} compartments = {} reactions = {} soups = reactome_bs() for pw, soup in soups: m = soup.find('model') for cp in m.find('listofcompartments').find_all('compartment'): compartments[_reactome_id(cp, 'id')] = cp.attrs['name'] for sp in m.find('listofspecies').find_all('species'): cp = _reactome_id(sp, 'compartment') si = _reactome_id(sp, 'id') nm = sp.attrs['name'] ids = [] for i in sp.find('bqbiol:haspart').find_all('rdf:li'): ids.append(_reactome_res(i)) ids = sorted(common.unique_list(ids)) species[si] = {'name': nm, 'comp': cp, 'ids': ids} for rea in m.find('listofreactions').find_all('reaction'): ri = _reactome_id(rea, 'id') refs = [] for r in rea.find('bqbiol:isdescribedby').find_all('rdf:li'): refs.append(_reactome_res(r)) refs = sorted(common.unique_list(refs)) reas = [] for r in rea.find('listofreactants').find_all('speciesreference'): reas.append(_reactome_id(r, 'species')) reas = sorted(common.unique_list(reas)) prds = [] for p in rea.find('listofproducts').find_all('speciesreference'): prds.append(_reactome_id(p, 'species')) prds = sorted(common.unique_list(prds)) note = rea.find('notes').text reactions[ri] = { 'refs': refs, 'reas': reas, 'prds': prds, 'note': note } return compartments, species, reactions def _reactome_reactions_et(): sbmlPfx = '{http://www.sbml.org/sbml/level2/version4}' compStr = '%scompartment' % sbmlPfx reacStr = '%sreaction' % sbmlPfx specStr = '%sspecies' % sbmlPfx species = {} compartments = {} reactions = {} sbml = reactome_sbml() for pw_sbml in sbml.values(): ctx = etree.iterparse(pw_sbml, events = ('end', )) for ev, elem in ctx: if elem.tag == compStr: k, v = _reactome_compartment(elem) compartments[k] = v elif elem.tag == reacStr: k, v = _reactome_reaction(elem) reactions[k] = v elif elem.tag == specStr: k, v = _reactome_species(elem) species[k] = v elem.clear() while elem.getprevious() is not None: del elem.getparent()[0] return compartments, species, reactions def _reactome_compartment(elem): ci = _reactome_extract_id(elem.get('id')) nm = elem.get('name') return ci, nm def _reactome_species(elem): bqBiolPfx = '{http://biomodels.net/biology-qualifiers/}' rdfPfx = '{http://www.w3.org/1999/02/22-rdf-syntax-ns#}' hasPartStr = '%shasPart' % bqBiolPfx resStr = '%sresource' % rdfPfx si = _reactome_extract_id(elem.get('id')) cp = _reactome_extract_id(elem.get('compartment')) nm = elem.get('name') ids = sorted( common.unique_list(_reactome_collect_resources(elem, hasPartStr))) return si, {'name': nm, 'comp': cp, 'ids': ids} def _reactome_reaction(elem): bqBiolPfx = '{http://biomodels.net/biology-qualifiers/}' rdfPfx = '{http://www.w3.org/1999/02/22-rdf-syntax-ns#}' sbmlPfx = '{http://www.sbml.org/sbml/level2/version4}' specStr = 'species' spRefStr = '%sspeciesReference' % sbmlPfx isDescStr = '%sisDescribedBy' % bqBiolPfx resStr = '%sresource' % rdfPfx lofReaStr = '%slistOfReactants' % sbmlPfx lofPrdStr = '%slistOfProducts' % sbmlPfx ri = _reactome_extract_id(elem.get('id')) refs = _reactome_collect_resources(elem, isDescStr) reas = _reactome_collect_species(elem, lofReaStr) prds = _reactome_collect_species(elem, lofPrdStr) note = elem.find('note').text # prefix? return ri, {'refs': refs, 'reas': reas, 'prds': prds, 'note': note} def _reactome_collect_resources(elem, tag): rdfPfx = '{http://www.w3.org/1999/02/22-rdf-syntax-ns#}' resStr = '%sresource' % rdfPfx liStr = '%sli' % rdfPfx res = [] for i in elem.find('.//%s' % tag).iterfind('.//%s' % liStr): res.append(_reactome_extract_res(i.get(resStr))) return res def _reactome_collect_species(elem, tag): sbmlPfx = '{http://www.sbml.org/sbml/level2/version4}' spRefStr = '%sspeciesReference' % sbmlPfx specStr = 'species' res = [] for sp in elem.find('.//%s' % tag).iterfind('.//%s' % spRefStr): res.apped(_reactome_extract_id(sp.get(specStr))) return res
[docs] def get_acsn_effects(): """ Processes ACSN data, returns list of effects. """ negatives = set(['NEGATIVE_INFLUENCE', 'UNKNOWN_NEGATIVE_INFLUENCE']) positives = set( ['TRIGGER', 'POSITIVE_INFLUENCE', 'UNKNOWN_POSITIVE_INFLUENCE']) directed = set([ 'UNKNOWN_TRANSITION', 'INTERACTION_TYPE', 'KNOWN_TRANSITION_OMITTED', 'INHIBITION', 'UNKNOWN_POSITIVE_INFLUENCE', 'PROTEIN_INTERACTION', 'UNKNOWN_CATALYSIS', 'POSITIVE_INFLUENCE', 'STATE_TRANSITION', 'TRANSLATION', 'UNKNOWN_NEGATIVE_INFLUENCE', 'NEGATIVE_INFLUENCE', 'MODULATION', 'TRANSCRIPTION', 'COMPLEX_EXPANSION', 'TRIGGER', 'CATALYSIS', 'PHYSICAL_STIMULATION', 'UNKNOWN_INHIBITION', 'TRANSPORT' ]) data = acsn_input.acsn_interactions() effects = [] for l in data: if len(l) == 4: eff = set(l[2].split(';')) if len(eff & negatives) > 0: effects.append([l[0], l[1], '-']) elif len(eff & positives) > 0: effects.append([l[0], l[1], '+']) elif len(eff & directed) > 0: effects.append([l[0], l[1], '*']) return effects
[docs] def get_reactions(types = None, sources = None): if type(types) is list: types = set(types) if type(sources) is list: sources = set(sources) cachefile = os.path.join( cache_mod.get_cachedir(), 'reaction_interactions_by_source.pickle' ) if os.path.exists(cachefile): interactions = pickle.load(open(cachefile, 'rb')) else: import pypath.utils.pyreact as pyreact rea = pyreact.PyReact() rea.load_all() rea.expand_by_source() interactions = rea.interactions_by_source for i in interactions: if (sources is None or i[4] in sources) and \ (types is None or len(i[2] & types)): yield [ i[0], i[1], ';'.join(list(i[2] if types is None else i[2] & types)), str(int(i[3])), i[4], ';'.join(list(i[5])) ]