#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# This file is part of the `pypath` python module
#
# Copyright 2014-2023
# EMBL, EMBL-EBI, Uniklinik RWTH Aachen, Heidelberg University
#
# Authors: see the file `README.rst`
# Contact: Dénes Türei (turei.denes@gmail.com)
#
# Distributed under the GPLv3 License.
# See accompanying file LICENSE.txt or copy at
# https://www.gnu.org/licenses/gpl-3.0.html
#
# Website: https://pypath.omnipathdb.org/
#
from future.utils import iteritems
# Py 2/3
try:
    # Python 2: make `input` behave like Python 3's (returns a raw string)
    input = raw_input
except NameError:
    # Python 3: builtin `input` already does the right thing
    pass

try:
    # Python 2: prefer the faster C implementation of pickle
    import cPickle as pickle
except ImportError:
    # fix: was a bare `except:`, which also swallowed
    # KeyboardInterrupt/SystemExit; only ImportError is expected here
    import pickle
import sys
import os
import itertools
import gzip
import bs4
from lxml import etree
import copy
import struct
import pypath.share.curl as curl
import pypath.resources.urls as urls
import pypath.share.progress as progress
import pypath.share.common as common
import pypath.share.cache as cache_mod
import pypath.inputs.acsn as acsn_input
from pypath.resources import data_formats
[docs]
def reactome_sbml():
    """
    Downloads Reactome human reactions in SBML format.

    Returns a dict of file objects as provided by `curl.Curl` in
    `large` mode.
    """
    return curl.Curl(
        urls.urls['reactome']['sbml'],
        silent = False,
        large = True,
        slow = True,
    ).result
[docs]
def reactome_biopax(organism = 9606, cache = True):
    """
    Downloads the Reactome BioPAX level 3 data for one organism and
    extracts it into the cache directory.

    :param int organism: NCBI Taxonomy ID; only 9606 (human) is known.
    :param bool cache: reuse the already extracted file if it exists.

    Returns an open (text mode) file object of the extracted OWL file.
    """
    organisms = {9606: 'Homo_sapiens'}
    fname = '%s.owl' % organisms[organism]
    unzipped = os.path.join(
        cache_mod.get_cachedir(),
        'reactome_biopax_%s.owl' % organisms[organism]
    )

    if not cache or not os.path.exists(unzipped):
        c = curl.Curl(
            urls.urls['reactome']['biopax_l3'],
            silent = False,
            large = True,
            files_needed = [fname],
        )
        extracted = c.result[fname]
        # copy the archive member into the cache in 4k chunks
        with open(unzipped, 'w') as dst:
            while True:
                piece = extracted.read(4096)
                if not piece:
                    break
                dst.write(piece)
        extracted.close()

    return open(unzipped, 'r')
[docs]
def pid_biopax():
    """
    Downloads the NCI-PID BioPAX level 3 data.
    Returns the result object(s) of the download.
    """
    return curl.Curl(
        urls.urls['nci-pid']['biopax_l3'],
        silent = False,
        large = True,
    ).result
[docs]
def panther_biopax():
    """
    Downloads the PANTHER BioPAX level 3 data.
    Returns the result object(s) of the download.
    """
    return curl.Curl(
        urls.urls['panther']['biopax_l3'],
        silent = False,
        large = True,
    ).result
[docs]
def acsn_biopax():
    """
    Downloads the ACSN BioPAX level 3 data.
    Returns the result object(s) of the download.
    """
    return curl.Curl(
        urls.urls['acsn']['biopax_l3'],
        silent = False,
        large = True,
    ).result
[docs]
def reactome_bs():
    """
    Reactome pathways in SBML format. Yields tuples of pathway IDs
    (string) and SBML representations of the pathways as
    `bs4.BeautifulSoup` objects.
    """
    for fname, fileobj in reactome_sbml().items():
        # the last 5 characters of the file name are the extension
        # (presumably `.sbml`), the rest is the pathway ID
        yield fname[:-5], bs4.BeautifulSoup(fileobj.read(), 'html.parser')
# Process Reactome BioPAX level 3
[docs]
def get_soup(elem):
    """
    Serializes an lxml element and re-parses it into a
    `bs4.BeautifulSoup` object.
    """
    markup = etree.tostring(elem)
    return bs4.BeautifulSoup(markup, 'html.parser')
def _bp_collect_resources(elem, tag, restype = None):
rdfpref = '{http://www.w3.org/1999/02/22-rdf-syntax-ns#}'
rdfres = '%sresource' % rdfpref
return [
x.get(rdfres).replace('#', '') for x in elem.iterfind(tag)
if rdfres in x.attrib and (restype is None or x.get(rdfres).replace(
'#', '').startswith(restype))
]
[docs]
def reactions_biopax(biopax_file,
                     organism = 9606,
                     protein_name_type = 'UniProt',
                     clean = True):
    """
    Processes a BioPAX level 3 file and extracts binary interactions:
    reactions, complex assemblies, controls and catalyses with protein
    and complex participants resolved to `protein_name_type` IDs.

    :param file biopax_file:
        Open file object with the BioPAX (OWL/XML) content.
    :param int organism:
        NCBI Taxonomy ID; kept for backwards compatibility, the organism
        is in fact determined by the file passed in `biopax_file`.
    :param str protein_name_type:
        Identifier type to read from the UnificationXrefs (e.g.
        `UniProt`, `HGNC`); matched case insensitively.
    :param bool clean:
        Periodically clear already processed XML elements to limit
        memory usage.

    Returns a dict of controls/catalyses keyed by their BioPAX IDs;
    the result is also cached as a pickle named after the input file.
    """
    import tarfile  # fix: was referenced below without being imported

    # the processed result is cached per input file
    cachefile = os.path.join(
        cache_mod.get_cachedir(), '%s.processed.pickle' %
        os.path.split(biopax_file.name)[1]
    )
    if os.path.exists(cachefile):
        sys.stdout.write('\t:: Loading already processed data\n')
        sys.stdout.flush()
        return pickle.load(open(cachefile, 'rb'))

    # string constants: namespaced tag and attribute names of BioPAX L3
    bppref = '{http://www.biopax.org/release/biopax-level3.owl#}'
    rdfpref = '{http://www.w3.org/1999/02/22-rdf-syntax-ns#}'
    rdfid = '%sID' % rdfpref
    rdfab = '%sabout' % rdfpref
    rdfres = '%sresource' % rdfpref
    bpprot = '%sProtein' % bppref
    bpcplx = '%sComplex' % bppref
    bpprre = '%sProteinReference' % bppref
    bpreac = '%sBiochemicalReaction' % bppref
    bpcata = '%sCatalysis' % bppref
    bpctrl = '%sControl' % bppref
    bpcoma = '%sComplexAssembly' % bppref
    bppstp = '%sPathwayStep' % bppref
    bpuxrf = '%sUnificationXref' % bppref
    bpstoi = '%sStoichiometry' % bppref
    bppubr = '%sPublicationXref' % bppref
    bppath = '%sPathway' % bppref
    bpfrfe = '%sFragmentFeature' % bppref
    bpseqi = '%sSequenceInterval' % bppref
    bpseqs = '%sSequenceSite' % bppref
    bpmodf = '%sModificationFeature' % bppref
    bpmodv = '%sSequenceModificationVocabulary' % bppref
    bpmphe = '%smemberPhysicalEntity' % bppref
    bperef = '%sentityReference' % bppref
    bpxref = '%sxref' % bppref
    bprelr = '%sRelationshipXref' % bppref
    bpcsto = '%scomponentStoichiometry' % bppref
    bpstoc = '%sstoichiometricCoefficient' % bppref
    bpphye = '%sphysicalEntity' % bppref
    bpcted = '%scontrolled' % bppref
    bpcter = '%scontroller' % bppref
    bpctyp = '%scontrolType' % bppref
    bpleft = '%sleft' % bppref
    bprgth = '%sright' % bppref
    bpsprc = '%sstepProcess' % bppref
    bpfeat = '%sfeature' % bppref
    bpfelo = '%sfeatureLocation' % bppref
    bpibeg = '%ssequenceIntervalBegin' % bppref
    bpiend = '%ssequenceIntervalEnd' % bppref
    bpseqp = '%ssequencePosition' % bppref
    bpmoty = '%smodificationType' % bppref
    bppcom = '%spathwayComponent' % bppref
    bpterm = '%sterm' % bppref
    bpdb = '%sdb' % bppref
    bpid = '%sid' % bppref
    modvoc = data_formats.reactome_modifications
    # intermediate results
    proteins = {}
    proteinfamilies = {}
    uniprots = {}
    proteinreferences = {}
    complexes = {}
    complexvariations = {}
    stoichiometries = {}
    reactions = {}
    complexassemblies = {}
    catalyses = {}
    controls = {}
    pathways = {}
    pathwaysteps = {}
    publications = {}
    fragmentfeatures = {}
    sequenceintervals = {}
    sequencesites = {}
    modificationfeatures = {}
    modificationvocabulary = {}
    protein_name_type = protein_name_type.lower()
    # processing the XML
    # (fix: a spurious, unused `reactome_biopax()` call was removed here;
    # it downloaded the Reactome data even when processing other sources)
    bp_filesize = 0
    if hasattr(biopax_file, 'name') and os.path.exists(biopax_file.name):
        bp_filesize = os.path.getsize(biopax_file.name)
        if biopax_file.mode == 'r':
            # reopen in binary mode for lxml's iterparse
            biopax_file.close()
            biopax_file = open(biopax_file.name, 'rb')
    elif isinstance(biopax_file, tarfile.ExFileObject):
        bp_filesize = biopax_file.size
    elif isinstance(biopax_file, gzip.GzipFile):
        # the uncompressed size is stored in the last 4 bytes (little
        # endian) of a gzip file
        f = open(biopax_file.name, 'rb')
        f.seek(-4, 2)
        bp_filesize = struct.unpack('<I', f.read())[0]
        f.close()
    prg = progress.Progress(bp_filesize, 'Processing BioPAX XML', 1)
    fpos = biopax_file.tell()
    bp = etree.iterparse(biopax_file, events = ('end', ))
    used_elements = []
    try:
        for ev, elem in bp:
            # progress by bytes consumed from the stream
            new_fpos = biopax_file.tell()
            prg.step(new_fpos - fpos)
            fpos = new_fpos
            _id = elem.get(rdfid) if rdfid in elem.attrib else elem.get(rdfab)
            # Protein
            if elem.tag == bpprot:
                entref = elem.find(bperef)
                if entref is not None:
                    proteins[_id] = {
                        'protein': entref.get(rdfres).replace('#', ''),
                        # both sequence and modification features are
                        # listed under the same `feature` tag in BioPAX
                        'seqfeatures': _bp_collect_resources(elem, bpfeat),
                        'modfeatures': _bp_collect_resources(elem, bpfeat)
                    }
                else:
                    # without entity reference this is a protein
                    # family/group defined by member physical entities
                    proteinfamilies[_id] = _bp_collect_resources(elem, bpmphe)
            # ProteinReference
            elif elem.tag == bpprre:
                proteinreferences[_id] = _bp_collect_resources(elem, bpxref)
            # UnificationXref / RelationshipXref
            elif elem.tag == bpuxrf or elem.tag == bprelr:
                db = elem.find(bpdb)
                if db is not None:
                    if db.text.lower().startswith(protein_name_type):
                        i = elem.find(bpid)
                        if i is not None:
                            uniprots[_id] = i.text
            # Complex
            elif elem.tag == bpcplx:
                if elem.find(bpcsto) is not None:
                    complexes[_id] = _bp_collect_resources(elem, bpcsto)
                else:
                    # a group of alternative complexes
                    complexvariations[_id] = _bp_collect_resources(
                        elem, bpmphe)
            # Stoichiometry
            elif elem.tag == bpstoi:
                stoichiometries[_id] = (
                    elem.find(bpphye).get(rdfres).replace('#', ''),
                    int(float(elem.find(bpstoc).text))
                )
            # BiochemicalReaction
            elif elem.tag == bpreac:
                reactions[_id] = {
                    'refs': _bp_collect_resources(elem, bpxref),
                    'left': _bp_collect_resources(elem, bpleft),
                    'right': _bp_collect_resources(elem, bprgth)
                }
            # ComplexAssembly
            elif elem.tag == bpcoma:
                complexassemblies[_id] = {
                    'refs': _bp_collect_resources(elem, bpxref),
                    'left': _bp_collect_resources(elem, bpleft),
                    'right': _bp_collect_resources(elem, bprgth)
                }
            # Catalysis
            elif elem.tag == bpcata:
                cter = elem.find(bpcter)
                cted = elem.find(bpcted)
                if cter is not None and cted is not None:
                    typ = elem.find(bpctyp)
                    catalyses[_id] = {
                        'controller': cter.get(rdfres).replace('#', ''),
                        'controlled': cted.get(rdfres).replace('#', ''),
                        'type': '' if typ is None else typ.text
                    }
            # Control
            elif elem.tag == bpctrl:
                cter = elem.find(bpcter)
                cted = elem.find(bpcted)
                if cter is not None and cted is not None:
                    typ = elem.find(bpctyp)
                    controls[_id] = {
                        'refs': _bp_collect_resources(elem, bpxref),
                        'type': typ.text if typ is not None else '',
                        'controller': cter.get(rdfres).replace('#', ''),
                        'controlled': cted.get(rdfres).replace('#', '')
                    }
            # PathwayStep
            elif elem.tag == bppstp:
                # fix: collect the `stepProcess` children; the original
                # collected the element's own tag (`PathwayStep`) which
                # never appears as a child (no output impact: this dict
                # is currently not used downstream)
                pathwaysteps[_id] = _bp_collect_resources(elem, bpsprc)
            # PublicationXref
            elif elem.tag == bppubr:
                pmid = elem.find(bpid)
                if pmid is not None:
                    publications[_id] = pmid.text
            # FragmentFeature
            elif elem.tag == bpfrfe:
                fragmentfeatures[_id] = (
                    elem.find(bpfelo).get(rdfres).replace('#', ''))
            # SequenceInterval
            elif elem.tag == bpseqi:
                beg = elem.find(bpibeg)
                end = elem.find(bpiend)
                sequenceintervals[_id] = (
                    beg.get(rdfres).replace('#', '')
                    if beg is not None else None,
                    end.get(rdfres).replace('#', '')
                    if end is not None else None,
                )
            # SequenceSite
            elif elem.tag == bpseqs:
                seqp = elem.find(bpseqp)
                if seqp is not None:
                    sequencesites[_id] = int(seqp.text)
            # ModificationFeature
            elif elem.tag == bpmodf:
                felo = elem.find(bpfelo)
                moty = elem.find(bpmoty)
                if felo is not None and moty is not None:
                    modificationfeatures[_id] = (
                        felo.get(rdfres).replace('#', ''),
                        moty.get(rdfres).replace('#', ''),
                    )
            # SequenceModificationVocabulary
            elif elem.tag == bpmodv:
                term = elem.find(bpterm)
                if term is not None:
                    modificationvocabulary[_id] = term.text
            # Pathway
            elif elem.tag == bppath:
                try:
                    # both sub-pathways and reactions are listed under
                    # `pathwayComponent`
                    pathways[_id] = {
                        'reactions': _bp_collect_resources(elem, bppcom),
                        'pathways': _bp_collect_resources(elem, bppcom)
                    }
                except TypeError:
                    sys.stdout.write('Wrong type at element:\n')
                    sys.stdout.write(etree.tostring(elem))
                    sys.stdout.flush()
            if clean:
                # clear batches of processed elements to bound memory
                used_elements.append(elem)
                if len(used_elements) > 800:
                    for e in used_elements[:400]:
                        e.clear()
                    used_elements = used_elements[400:]
    except etree.XMLSyntaxError as e:
        sys.stdout.write('\n\tWARNING: XML processing error: %s\n' % str(e))
        sys.stdout.flush()
    prg.terminate()
    del bp
    biopax_file.close()
    # # # # # # # # # # # # # # # # # #
    # from intermediate to final results
    prg = progress.Progress(len(proteins), 'Processing proteins', 11)
    proteins_uniprots = {}
    # map protein entities to protein identifiers via their references
    for pref, protein in proteins.items():
        prg.step()
        if protein['protein'] in proteinreferences:
            for prref in proteinreferences[protein['protein']]:
                if prref in uniprots:
                    proteins_uniprots[pref] = uniprots[prref]
    prg.terminate()
    prg = progress.Progress(len(proteins), 'Processing PTMs', 11)
    proteins_modifications = {}
    for pref, protein in proteins.items():
        prg.step()
        for modf in protein['modfeatures']:
            if modf not in modificationfeatures:
                continue
            site, mtype = modificationfeatures[modf]
            if site in sequencesites and mtype in modificationvocabulary:
                term = modificationvocabulary[mtype]
                if term in modvoc:
                    if pref not in proteins_modifications:
                        proteins_modifications[pref] = set()
                    # (position, residue, modification type)
                    proteins_modifications[pref].add((
                        sequencesites[site],
                        modvoc[term][1],
                        modvoc[term][0],
                    ))
    prg.terminate()
    # build a uniform dict to handle all protein based entities
    # including complexes and variations/families
    entity_uniprot = {}
    prg = progress.Progress(len(proteins_uniprots), 'Processing proteins', 11)
    for pref, protein in proteins_uniprots.items():
        prg.step()
        entity_uniprot[pref] = [{
            'members': [protein],
            # fix: `proteins_modifications` is keyed by the protein
            # reference (`pref`); the original looked it up with the
            # protein identifier, silently dropping all PTMs here
            # (cf. the protein families loop below, which uses `pref`)
            'ptms': (
                {protein: proteins_modifications[pref]}
                if pref in proteins_modifications else {}
            ),
        }]
    prg.terminate()
    prg = progress.Progress(
        len(proteinfamilies), 'Processing protein families', 11)
    for pfref, prefs in proteinfamilies.items():
        prg.step()
        entity_uniprot[pfref] = []
        for pref in prefs:
            if pref in proteins_uniprots:
                entity_uniprot[pfref].append({
                    'members': [proteins_uniprots[pref]],
                    'ptms': {} if pref not in proteins_modifications else {
                        proteins_uniprots[pref]: proteins_modifications[pref]
                    }
                })
    prg.terminate()
    # release intermediate dicts to limit peak memory
    del proteins
    del proteinfamilies
    del proteinreferences
    prg = progress.Progress(len(complexes), 'Processing complexes', 11)
    for cref, cplex in complexes.items():
        prg.step()
        if cref not in entity_uniprot:
            process_complex(0, cref, entity_uniprot, complexes,
                            complexvariations, cplex, stoichiometries)
    prg.terminate()
    del complexes
    del stoichiometries
    del proteins_uniprots
    # # #
    prg = progress.Progress(
        len(reactions) + len(complexassemblies), 'Processing reactions', 11)
    reactions_uniprots = \
        process_reactions(reactions, entity_uniprot, publications)
    complexassemblies_uniprots = \
        process_reactions(complexassemblies, entity_uniprot, publications)
    del reactions
    del complexassemblies
    # # #
    prg = progress.Progress(
        len(controls) + len(catalyses), 'Processing controls and catalyses',
        11)
    controls_uniprots = _process_controls(
        dict(itertools.chain.from_iterable(
            d.items() for d in (controls, catalyses)
        )),
        entity_uniprot,
        dict(itertools.chain.from_iterable(
            d.items() for d in (
                reactions_uniprots,
                complexassemblies_uniprots,
            )
        )),
        publications,
    )
    # complex assemblies without a controller are recorded as bindings
    for caref, ca in complexassemblies_uniprots.items():
        controls_uniprots[caref] = {
            'type': 'BINDING',
            # NOTE(review): `ca['refs']` have already been translated by
            # `process_reactions`, so this second lookup likely always
            # yields an empty list -- kept as in the original pending
            # confirmation
            'refs':
            [publications[r] for r in ca['refs'] if r in publications],
            'controller': None,
            'controlled': ca
        }
    del entity_uniprot
    pickle.dump(controls_uniprots, open(cachefile, 'wb'))
    return controls_uniprots
[docs]
def process_reactions(reactions, entity_uniprot, publications):
    """
    Translates the participants and references of reactions.

    :param dict reactions: reaction ID -> {'refs', 'left', 'right'}
        with BioPAX IDs.
    :param dict entity_uniprot: entity ID -> resolved protein entities.
    :param dict publications: publication xref ID -> PubMed ID.

    Returns a dict of the same shape with IDs replaced by the resolved
    values; unknown IDs are silently dropped.
    """
    result = {}
    # idiom fix: `.items()` instead of the deprecated py2 compatibility
    # shim `future.utils.iteritems`
    for rref, rea in reactions.items():
        result[rref] = {
            'refs':
                [publications[r] for r in rea['refs'] if r in publications],
            'left':
                [entity_uniprot[l] for l in rea['left']
                 if l in entity_uniprot],
            'right':
                [entity_uniprot[r] for r in rea['right']
                 if r in entity_uniprot],
        }
    return result
def _process_controls(controls, entity_uniprot, reactions_uniprots,
publications):
result = {}
for cref, ctrl in iteritems(controls):
result[cref] = {
'type': ctrl['type'],
'refs':
[publications[r] for r in ctrl['refs'] if r in publications]
if 'refs' in ctrl else [],
'controller': entity_uniprot[ctrl['controller']]
if ctrl['controller'] in entity_uniprot else None,
'controlled': reactions_uniprots[ctrl['controlled']]
if ctrl['controlled'] in reactions_uniprots else None
}
return result
[docs]
def process_complex(depth, cref, entity_uniprot, complexes, complexvariations,
                    cplex, stoichiometries):
    """
    Recursively resolves the members of a BioPAX complex into lists of
    protein identifiers, expanding nested complexes and complex
    variations combinatorially. The result is appended to
    `entity_uniprot[cref]` in place; a verbose trace is appended to
    ``reactome.log`` in the working directory.

    :param int depth: recursion depth, used only for log indentation.
    :param str cref: BioPAX ID of the complex being processed.
    :param dict entity_uniprot: entity ID -> list of variants
        (each ``{'members': [...], 'ptms': {...}}``); updated in place.
    :param dict complexes: complex ID -> list of stoichiometry IDs.
    :param dict complexvariations: complex group ID -> member entity IDs.
    :param list cplex: stoichiometry IDs of this complex' members.
    :param dict stoichiometries: stoichiometry ID -> (entity ID, coeff).
    """
    # fix: the log file handle was opened but never closed
    with open('reactome.log', 'a') as log:
        tabs = '\t' * (depth + 1)
        log.write('%sStarting processing %s, depth = %u\n' %
                  (tabs[1:], cref, depth))
        this_cplex = [{'members': [], 'ptms': {}}]
        log.write('%sComplex %s have %u member entities\n' %
                  (tabs, cref, len(cplex)))
        for stoi in cplex:
            if stoi not in stoichiometries:
                continue
            ref, num = stoichiometries[stoi]
            log.write('%sNew member entity: %s, stoichiometric coeff: %u\n' %
                      (tabs, ref, num))
            if ref.startswith('Complex') \
                    and ref not in entity_uniprot:
                if ref in complexes:
                    # nested, not yet processed complex: recurse first
                    log.write(
                        '%s%s is a complex with %u subentities, and '
                        'hasn\'t been processed yet\n'
                        % (tabs, ref, len(complexes[ref])))
                    process_complex(depth + 1, ref, entity_uniprot, complexes,
                                    complexvariations, complexes[ref],
                                    stoichiometries)
                if ref in complexvariations:
                    # a group of alternative complexes: process each
                    # variation and collect all of their variants
                    log.write(
                        '%s%s is a complex group with %u variations, and '
                        'hasn\'t been processed yet\n'
                        % (tabs, ref, len(complexvariations[ref])))
                    entity_uniprot[ref] = []
                    for mref in complexvariations[ref]:
                        if mref not in entity_uniprot and mref in complexes:
                            log.write(
                                '%s%s is a complex with %u subentities, and '
                                'hasn\'t been processed yet\n'
                                % (tabs, mref, len(complexes[mref])))
                            process_complex(depth + 1, mref, entity_uniprot,
                                            complexes, complexvariations,
                                            complexes[mref], stoichiometries)
                        if mref in entity_uniprot:
                            log.write(
                                '%s%s is now processed, adding it as an '
                                'instance of %s\n'
                                % (tabs, mref, ref))
                            entity_uniprot[ref].extend(entity_uniprot[mref])
            if ref in entity_uniprot:
                log.write(
                    '%s%s is an already processed entity, with %u variants '
                    'and %u members\n'
                    % (tabs, ref, len(entity_uniprot[ref]),
                       len(entity_uniprot[ref][0]['members'])
                       if len(entity_uniprot[ref]) > 0 else 0))
                log.write(
                    '%sNumber of variants after processing %s: %u x %u = %u\n'
                    % (tabs, ref, len(this_cplex), len(entity_uniprot[ref]),
                       len(this_cplex) * len(entity_uniprot[ref])))
                # combinatorial expansion: every existing variant gets
                # combined with every variant of the new member
                this_cplex_new = []
                for var in this_cplex:
                    for new_member in entity_uniprot[ref]:
                        var_new = copy.deepcopy(var)
                        # member repeated by its stoichiometric coefficient
                        var_new['members'].extend(new_member['members'] * num)
                        for u, ptm in new_member['ptms'].items():
                            if u not in var_new['ptms']:
                                var_new['ptms'][u] = set()
                            var_new['ptms'][u] = (
                                var_new['ptms'][u] | new_member['ptms'][u])
                        this_cplex_new.append(var_new)
                this_cplex = this_cplex_new
                log.write('%sNumber of variants after processing %s: %u\n' %
                          (tabs, ref, len(this_cplex)))
                log.write('%sNumber of members in %s: %u\n' %
                          (tabs, cref, len(this_cplex[0]['members'])
                           if len(this_cplex) > 0 else 0))
            else:
                log.write('%sPermanently missing: %s\n' % (tabs, ref))
        log.write(
            '%sFinished processing %s, found %u variants with %u members\n'
            % (tabs[1:], cref, len(this_cplex),
               len(this_cplex[0]['members'])
               if len(this_cplex) > 0 else 0))
        if cref not in entity_uniprot:
            entity_uniprot[cref] = []
        entity_uniprot[cref].extend(this_cplex)
[docs]
def reactome_interactions(cacheFile = None, ask = True, **kwargs):
    """
    Downloads and processes Reactome BioPAX.
    Extracts binary interactions.
    The applied criteria are very stringent, yields very few interactions.
    Requires large free memory, approx. 2G.

    :param str cacheFile: path of the processed interactions pickle;
        defaults to `reactome.interactions.pickle` in the cache dir.
    :param bool ask: ask for confirmation before the memory heavy
        processing (not asked if the cache already exists).

    Returns the list of interactions.
    """
    cacheFile = os.path.join(
        cache_mod.get_cachedir(),
        'reactome.interactions.pickle'
    ) if cacheFile is None else cacheFile
    if os.path.exists(cacheFile):
        # fix: the cached data was loaded into a local but never
        # returned, and execution fell through to an undefined
        # `answer` variable (NameError)
        with open(cacheFile, 'rb') as fp:
            return pickle.load(fp)
    if ask:
        while True:
            sys.stdout.write(
                '\n\tProcessing Reactome requires huge memory.\n'
                '\tPlease hit `y` if you have at least 2G free memory,\n'
                '\tor `n` to omit Reactome.\n'
                '\tAfter processing once, it will be saved in \n'
                '\t%s, so next time can be loaded quickly.\n\n'
                '\tProcess Reactome now? [y/n]\n' % cacheFile)
            sys.stdout.flush()
            answer = input().lower()
            if answer in {'y', 'n'}:
                break
    else:
        answer = 'y'
    if answer == 'y':
        return get_interactions('reactome', **kwargs)
    else:
        return []
[docs]
def acsn_interactions_2(**kwargs):
    """
    Binary interactions from the ACSN BioPAX data; keyword arguments
    are passed on to `get_interactions`.
    """
    return get_interactions('acsn', **kwargs)
[docs]
def pid_interactions(**kwargs):
    """
    Binary interactions from the NCI-PID BioPAX data; keyword arguments
    are passed on to `get_interactions`.
    """
    return get_interactions('pid', **kwargs)
[docs]
def panther_interactions(**kwargs):
    """
    Binary interactions from the PANTHER BioPAX data; keyword arguments
    are passed on to `get_interactions`.
    """
    return get_interactions('panther', **kwargs)
[docs]
def get_interactions(source, mandatory_refs = True):
    """
    Downloads and processes one BioPAX source (`acsn`, `reactome`,
    `pid` or `panther`) and returns the extracted binary interactions.
    """
    controls = get_controls(source)
    interactions, _ptms, _regulations = process_controls(
        controls, mandatory_refs)
    return interactions
[docs]
def get_controls(source, protein_name_type = None):
    """
    Downloads one BioPAX source and extracts its controls.

    :param str source: one of `acsn`, `reactome`, `pid`, `panther`;
        used to look up the `<source>_biopax` downloader in this module.
    :param str protein_name_type: identifier type to extract; defaults
        to the source's conventional type (HGNC for ACSN, UniProt
        otherwise).

    Returns a dict of controls as produced by `reactions_biopax`.
    """
    name_types = {
        'acsn': 'HGNC',
        'reactome': 'UniProt',
        'pid': 'UniProt',
        'panther': 'UniProt',
    }
    if protein_name_type is None and source in name_types:
        protein_name_type = name_types[source]
    biopax = globals()['%s_biopax' % source]
    bpfile = biopax()
    if type(bpfile) is list:
        result = {}
        for bpf in bpfile:
            # fix: `dict(new.items() + result.items())` is a TypeError
            # on Python 3; merge so that earlier results keep precedence
            # on key collision, as in the original expression
            processed = reactions_biopax(
                bpf, protein_name_type = protein_name_type)
            processed.update(result)
            result = processed
    else:
        result = reactions_biopax(
            bpfile, protein_name_type = protein_name_type)
    return result
[docs]
def process_controls(controls, mandatory_refs = True):
    """
    Extracts binary interactions from already processed BioPAX controls
    (the output of `reactions_biopax`).

    Returns a tuple of (interactions, ptms, regulations); currently only
    the interactions are collected, the other two lists stay empty.
    Each interaction is a tuple of
    (controller, controlled, control type, references, direction);
    the binding records produced in the no-controller branch carry no
    direction element.
    """
    interactions = set([])
    # NOTE(review): `ptms` and `regulations` are returned but never
    # populated anywhere below
    ptms = []
    regulations = []
    prg = progress.Progress(len(controls), 'Processing interactions', 11)
    for c in controls.values():
        prg.step()
        if len(c['refs']) > 0 or not mandatory_refs:
            if c['controller'] is not None and len(c['controller']) > 0:
                for ctr in c['controller']:
                    # only unambiguous controllers: a single member protein
                    if len(common.unique_list(ctr['members'])) == 1:
                        # isoform suffixes (after `-`) are stripped
                        this_ctr = ctr['members'][0].split('-')[0]
                        ctd = c['controlled']
                        if ctd is not None:
                            # ctd['left'] is not None and ctd['right'] is not
                            # None:
                            # iterate all combinations of participant variants
                            for leftInst in itertools.product(*ctd['left']):
                                for rightInst in itertools.product(
                                        *ctd['right']):
                                    lr = common.unique_list(
                                        common.flat_list([
                                            l['members'] for l in leftInst
                                        ] + [r['members']
                                             for r in rightInst]))
                                    if len(lr) == 1:
                                        # single protein on both sides:
                                        # the controller acts on it
                                        this_ctd = lr[0].split('-')[0]
                                        interactions.add((
                                            this_ctr, this_ctd, c['type'],
                                            ';'.join(c['refs'] if len(c[
                                                'refs']) > 0 else ctd['refs']),
                                            'directed'))
                                    else:
                                        # several proteins: try to find the
                                        # one whose PTM state differs
                                        # between the two sides
                                        modDiff = {}
                                        ptmsLeft = set(
                                            [(ptms[0], ptm)
                                             for l in leftInst
                                             for ptms in l['ptms'].items()
                                             for ptm in ptms[1]])
                                        ptmsRight = set(
                                            [(ptms[0], ptm)
                                             for r in rightInst
                                             for ptms in r['ptms'].items()
                                             for ptm in ptms[1]])
                                        # symmetric difference: PTMs changed
                                        # by the reaction
                                        ptmsDiff = ptmsLeft ^ ptmsRight
                                        diffUniProts = common.unique_list(
                                            [ptm[0] for ptm in ptmsDiff])
                                        if len(diffUniProts) == 1:
                                            this_ctd = diffUniProts[0].split(
                                                '-')[0]
                                            interactions.add(
                                                (this_ctr, this_ctd, c['type'],
                                                 ';'.join(c['refs'] if len(c[
                                                     'refs']) > 0 else ctd[
                                                         'refs']), 'directed'))
                                        else:
                                            # otherwise compare the member
                                            # sets of the two sides
                                            lefts = [
                                                set(l['members'])
                                                for l in leftInst
                                            ]
                                            rights = [
                                                set(r['members'])
                                                for r in rightInst
                                            ]
                                            onlyLefts = [
                                                l for l in lefts
                                                if l not in rights
                                            ]
                                            onlyRights = [
                                                r for r in rights
                                                if r not in lefts
                                            ]
                                            diffs = []
                                            for l in onlyLefts:
                                                for r in onlyRights:
                                                    diff = l ^ r
                                                    if len(diff) == 1:
                                                        diffs.append(
                                                            list(diff))
                                            diffs = common.unique_list(
                                                common.flat_list(diffs))
                                            if len(diffs) == 1:
                                                this_ctd = diffs[0].split(
                                                    '-')[0]
                                                interactions.add(
                                                    (this_ctr, this_ctd,
                                                     c['type'],
                                                     ';'.join(c['refs'] if len(
                                                         c['refs']) > 0 else
                                                         ctd['refs']),
                                                     'undirected'))
            # if the controller is unknown
            # and the reaction has only 2 proteins
            # these most probably bind each other
            # to form a complex
            else:
                ctd = c['controlled']
                if ctd is not None:
                    for leftInst in itertools.product(*ctd['left']):
                        for rightInst in itertools.product(*ctd['right']):
                            lr = common.unique_list(
                                common.flat_list([
                                    l['members'] for l in leftInst
                                ] + [r['members'] for r in rightInst]))
                            if len(lr) == 2:
                                # NOTE(review): unlike the 5-tuples above,
                                # this record has no direction element --
                                # presumably intentional for bindings
                                interactions.add(
                                    (lr[0].split('-')[0], lr[1].split('-')[0],
                                     c['type'], ';'.join(ctd['refs'])))
    prg.terminate()
    return list(interactions), ptms, regulations
# Process Reactome SBML
def _reactome_id(obj, attr):
    """Numeric ID extracted from attribute `attr` of a bs4 tag."""
    raw = obj.attrs[attr]
    return _reactome_extract_id(raw)
def _reactome_extract_id(value):
return int(value.split('_')[1])
def _reactome_res(obj):
    """Resource ID from the `rdf:resource` attribute of a bs4 tag."""
    raw = obj.attrs['rdf:resource']
    return _reactome_extract_res(raw)
def _reactome_extract_res(value):
return value.split(':')[-1]
def _reactome_reactions():
    """
    Parses the Reactome SBML files with BeautifulSoup and returns a
    tuple of (compartments, species, reactions) dicts keyed by the
    numeric parts of the SBML identifiers.
    """
    species = {}
    compartments = {}
    reactions = {}
    soups = reactome_bs()
    for pw, soup in soups:
        m = soup.find('model')
        # compartment ID -> name
        for cp in m.find('listofcompartments').find_all('compartment'):
            compartments[_reactome_id(cp, 'id')] = cp.attrs['name']
        # species: name, compartment and member entity resource IDs
        for sp in m.find('listofspecies').find_all('species'):
            cp = _reactome_id(sp, 'compartment')
            si = _reactome_id(sp, 'id')
            nm = sp.attrs['name']
            ids = []
            for i in sp.find('bqbiol:haspart').find_all('rdf:li'):
                ids.append(_reactome_res(i))
            ids = sorted(common.unique_list(ids))
            species[si] = {'name': nm, 'comp': cp, 'ids': ids}
        # reactions: references, reactant and product species, notes
        for rea in m.find('listofreactions').find_all('reaction'):
            ri = _reactome_id(rea, 'id')
            refs = []
            for r in rea.find('bqbiol:isdescribedby').find_all('rdf:li'):
                refs.append(_reactome_res(r))
            refs = sorted(common.unique_list(refs))
            reas = []
            for r in rea.find('listofreactants').find_all('speciesreference'):
                reas.append(_reactome_id(r, 'species'))
            reas = sorted(common.unique_list(reas))
            prds = []
            for p in rea.find('listofproducts').find_all('speciesreference'):
                prds.append(_reactome_id(p, 'species'))
            prds = sorted(common.unique_list(prds))
            note = rea.find('notes').text
            reactions[ri] = {
                'refs': refs,
                'reas': reas,
                'prds': prds,
                'note': note
            }
    return compartments, species, reactions
def _reactome_reactions_et():
    """
    Parses the Reactome SBML files with lxml `iterparse` (memory
    efficient alternative of `_reactome_reactions`); returns a tuple
    of (compartments, species, reactions) dicts keyed by the numeric
    parts of the SBML identifiers.
    """
    sbmlPfx = '{http://www.sbml.org/sbml/level2/version4}'
    compStr = '%scompartment' % sbmlPfx
    reacStr = '%sreaction' % sbmlPfx
    specStr = '%sspecies' % sbmlPfx
    species = {}
    compartments = {}
    reactions = {}
    sbml = reactome_sbml()
    for pw_sbml in sbml.values():
        ctx = etree.iterparse(pw_sbml, events = ('end', ))
        for ev, elem in ctx:
            # dispatch by element type
            if elem.tag == compStr:
                k, v = _reactome_compartment(elem)
                compartments[k] = v
            elif elem.tag == reacStr:
                k, v = _reactome_reaction(elem)
                reactions[k] = v
            elif elem.tag == specStr:
                k, v = _reactome_species(elem)
                species[k] = v
            # free the already processed subtree to bound memory usage
            elem.clear()
            while elem.getprevious() is not None:
                del elem.getparent()[0]
    return compartments, species, reactions
def _reactome_compartment(elem):
    """(numeric ID, name) tuple from an SBML compartment element."""
    return (
        _reactome_extract_id(elem.get('id')),
        elem.get('name'),
    )
def _reactome_species(elem):
    """
    Processes one SBML species element (lxml); returns a tuple of the
    numeric species ID and a dict with its name, compartment ID and
    sorted member entity resource IDs.
    """
    bqBiolPfx = '{http://biomodels.net/biology-qualifiers/}'
    hasPartStr = '%shasPart' % bqBiolPfx
    ids = sorted(
        common.unique_list(_reactome_collect_resources(elem, hasPartStr)))
    return (
        _reactome_extract_id(elem.get('id')),
        {
            'name': elem.get('name'),
            'comp': _reactome_extract_id(elem.get('compartment')),
            'ids': ids,
        },
    )
def _reactome_reaction(elem):
    """
    Processes one SBML reaction element (lxml); returns a tuple of the
    numeric reaction ID and a dict with its references, reactant and
    product species IDs and the notes text.
    """
    # fix: removed unused locals (rdfPfx, resStr, specStr, spRefStr)
    bqBiolPfx = '{http://biomodels.net/biology-qualifiers/}'
    sbmlPfx = '{http://www.sbml.org/sbml/level2/version4}'
    isDescStr = '%sisDescribedBy' % bqBiolPfx
    lofReaStr = '%slistOfReactants' % sbmlPfx
    lofPrdStr = '%slistOfProducts' % sbmlPfx
    ri = _reactome_extract_id(elem.get('id'))
    refs = _reactome_collect_resources(elem, isDescStr)
    reas = _reactome_collect_species(elem, lofReaStr)
    prds = _reactome_collect_species(elem, lofPrdStr)
    # NOTE(review): SBML `notes` elements live in the SBML namespace, so
    # this un-prefixed lookup likely returns None (the original author
    # left a "prefix?" comment here too) -- kept as is pending
    # confirmation against real data
    note = elem.find('note').text  # prefix?
    return ri, {'refs': refs, 'reas': reas, 'prds': prds, 'note': note}
def _reactome_collect_resources(elem, tag):
    """
    Collects the `rdf:li` resource IDs below the first descendant of
    `elem` matching `tag`.
    """
    rdfPfx = '{http://www.w3.org/1999/02/22-rdf-syntax-ns#}'
    container = elem.find('.//%s' % tag)
    return [
        _reactome_extract_res(li.get('%sresource' % rdfPfx))
        for li in container.iterfind('.//%sli' % rdfPfx)
    ]
def _reactome_collect_species(elem, tag):
    """
    Collects the numeric species IDs from the speciesReference elements
    below the first descendant of `elem` matching `tag`.
    """
    sbmlPfx = '{http://www.sbml.org/sbml/level2/version4}'
    spRefStr = '%sspeciesReference' % sbmlPfx
    specStr = 'species'
    res = []
    for sp in elem.find('.//%s' % tag).iterfind('.//%s' % spRefStr):
        # fix: was `res.apped(...)` -- AttributeError on the first call
        res.append(_reactome_extract_id(sp.get(specStr)))
    return res
[docs]
def get_acsn_effects():
    """
    Processes ACSN data, returns list of effects.

    Each effect is a ``[source, target, sign]`` list where sign is
    `-` (negative), `+` (positive) or `*` (directed, unknown sign).
    """
    negatives = {'NEGATIVE_INFLUENCE', 'UNKNOWN_NEGATIVE_INFLUENCE'}
    positives = {'TRIGGER', 'POSITIVE_INFLUENCE', 'UNKNOWN_POSITIVE_INFLUENCE'}
    directed = {
        'UNKNOWN_TRANSITION', 'INTERACTION_TYPE', 'KNOWN_TRANSITION_OMITTED',
        'INHIBITION', 'UNKNOWN_POSITIVE_INFLUENCE', 'PROTEIN_INTERACTION',
        'UNKNOWN_CATALYSIS', 'POSITIVE_INFLUENCE', 'STATE_TRANSITION',
        'TRANSLATION', 'UNKNOWN_NEGATIVE_INFLUENCE', 'NEGATIVE_INFLUENCE',
        'MODULATION', 'TRANSCRIPTION', 'COMPLEX_EXPANSION', 'TRIGGER',
        'CATALYSIS', 'PHYSICAL_STIMULATION', 'UNKNOWN_INHIBITION', 'TRANSPORT',
    }

    effects = []

    for row in acsn_input.acsn_interactions():
        if len(row) != 4:
            continue
        eff = set(row[2].split(';'))
        # negative takes precedence over positive, positive over unknown
        if eff & negatives:
            sign = '-'
        elif eff & positives:
            sign = '+'
        elif eff & directed:
            sign = '*'
        else:
            continue
        effects.append([row[0], row[1], sign])

    return effects
[docs]
def get_reactions(types = None, sources = None):
    """
    Yields reaction derived interactions, optionally filtered by
    interaction type and resource. Each item is a list of
    [source, target, types, directedness, resource, references].
    """
    types = set(types) if type(types) is list else types
    sources = set(sources) if type(sources) is list else sources
    cachefile = os.path.join(
        cache_mod.get_cachedir(),
        'reaction_interactions_by_source.pickle'
    )
    if os.path.exists(cachefile):
        interactions = pickle.load(open(cachefile, 'rb'))
    else:
        # heavy fallback: build the interactions with pyreact
        # NOTE(review): the freshly built result is not written to the
        # cache file here -- presumably done elsewhere; confirm
        import pypath.utils.pyreact as pyreact
        rea = pyreact.PyReact()
        rea.load_all()
        rea.expand_by_source()
        interactions = rea.interactions_by_source
    for i in interactions:
        if sources is not None and i[4] not in sources:
            continue
        if types is not None and not len(i[2] & types):
            continue
        yield [
            i[0],
            i[1],
            ';'.join(list(i[2] if types is None else i[2] & types)),
            str(int(i[3])),
            i[4],
            ';'.join(list(i[5])),
        ]