#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# This file is part of the `pypath` python module
#
# Copyright
# 2014-2020
# EMBL, EMBL-EBI, Uniklinik RWTH Aachen, Heidelberg University
#
# File author(s): Dénes Türei (turei.denes@gmail.com)
# Nicolàs Palacio
# Olga Ivanova
#
# Distributed under the GPLv3 License.
# See accompanying file LICENSE.txt or copy at
# http://www.gnu.org/licenses/gpl-3.0.html
#
# Website: http://pypath.omnipathdb.org/
#
#
# This module enables dynamic data integration: it downloads
# files from various resources in standard and non-standard
# text-based and XML formats, processes them, and can parse HTML.
#
#TODO make a cleaner separation, remove/refactor repeating code
from __future__ import print_function
from future.utils import iteritems
from past.builtins import xrange, range
import pypath.session_mod as session_mod
_logger = session_mod.Logger(name = 'dataio')
_log = _logger._log
_console = _logger._console
import urllib
try:
import urllib2
except ImportError:
# this works seamlessly in Py3:
import urllib.request
urllib2 = urllib.request
import httplib2
try:
import urlparse
except:
# this works seamlessly in Py3:
import urllib.parse
urlparse = urllib.parse
try:
import cPickle as pickle
except:
import pickle
try:
from cStringIO import StringIO
except:
try:
from StringIO import StringIO
from StringIO import StringIO as BytesIO
except:
from io import BytesIO
from io import StringIO
import sys
import os
import io
import re
import time
import itertools
import collections
from collections import Counter
import functools
import gzip
import xlrd
import bs4
import xml.etree.cElementTree as ET
from lxml import etree
import time
import copy
import struct
import json
import csv
import pycurl
import webbrowser
import requests
import codecs
import base64
try:
import bioservices
except ImportError:
_log('Module `bioservices` not available.', -1)
from xlrd import open_workbook
from xlrd.biffh import XLRDError
# from this module
import pypath.mapping as mapping
import pypath.reflists as reflists
import pypath.uniprot_input as uniprot_input
import pypath.curl as curl
import pypath.urls as urls
import pypath.progress as progress
import pypath.common as common
import pypath.intera as intera
# from pypath import reaction
import pypath.residues as residues
import pypath.settings as settings
import pypath.taxonomy as taxonomy
import pypath.homology as homology_mod
if 'long' not in __builtins__:
long = int
if 'unicode' not in __builtins__:
unicode = str
CURSOR_UP_ONE = '\x1b[1A'
ERASE_LINE = '\x1b[2K'
# UniProt ID with isoform e.g. O14754-1
reupi = re.compile(r'([\w]{6,10})(?:-([0-9]{1,2}))?')
#
# thanks to http://stackoverflow.com/a/3239248/854988
#
CellPhoneDBAnnotation = collections.namedtuple(
'CellPhoneDBAnnotation',
(
'receptor',
'receptor_class',
'peripheral',
'secreted',
'secreted_class',
'transmembrane',
'integrin',
)
)
def read_xls(xls_file, sheet = '', csv_file = None, return_table = True):
try:
book = open_workbook(xls_file, on_demand = True)
try:
sheet = book.sheet_by_name(sheet)
except XLRDError:
sheet = book.sheet_by_index(0)
table = [[str(c.value) for c in sheet.row(i)]
for i in xrange(sheet.nrows)]
if csv_file:
with open(csv_file, 'w') as fp:
fp.write('\n'.join(['\t'.join(r) for r in table]))
if return_table:
return table
except IOError:
_console('No such file: %s\n' % xls_file)
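# Illustrative usage sketch (not part of the original module; the file name is
# hypothetical): read_xls() returns the sheet as a list of rows of strings, so
# callers typically slice off header rows, as get_havugimana() does further below.
def _read_xls_example(path = 'supplementary_table_s1.xls'):
    table = read_xls(path)
    # drop a hypothetical 3-line header; read_xls returns None on IOError
    return table[3:] if table else []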
def read_table(cols,
fileObject = None,
data = None,
sep = '\t',
sep2 = None,
rem = [],
hdr = None,
encoding = 'ascii'):
"""
Generic function to read data tables.

cols : dict
Dictionary of columns to read. Keys identifying fields are returned
in the result, values are column numbers.
fileObject : file-like
Any file-like object: a file opened for reading, or a StringIO buffer.
data : str
Data as a single string; used if `fileObject` is None.
sep : str
Field separator of the file.
sep2 : str
Subfield separator, applied to every field, e.g. ','.
rem : list
Strings to remove: in each line these elements are replaced with ''.
hdr : int
Number of header lines. If None, no header is assumed.
encoding : str
Encoding used to decode byte strings.
"""
if data is None:
if hasattr(fileObject, 'readline'):
fileObject.seek(0)
if hdr:
for h in xrange(0, hdr):
_ = next(fileObject)
data = fileObject
else:
data = [l.strip() for l in data.split('\n') if len(l) > 0][hdr:]
res = []
for l in data:
if type(l) is bytes:
l = l.decode(encoding)
for r in rem:
l = l.replace(r, '')
l = [f.strip() for f in l.split(sep)]
if len(l) > max(cols.values()):
dic = {}
for name, col in iteritems(cols):
field = l[col].strip()
if sep2 is not None:
field = [
sf.strip() for sf in field.split(sep2) if len(sf) > 0
]
dic[name] = field
res.append(dic)
if fileObject is not None:
fileObject.close()
return res
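# A minimal usage sketch for read_table() (illustrative; the sample rows and
# the column assignment are made up):
def _read_table_example():
    sample = 'P00533\t9606\tEGFR\nP04637\t9606\tTP53'
    return read_table(
        cols = {'uniprot': 0, 'taxon': 1, 'genesymbol': 2},
        data = sample,
        sep = '\t',
    )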
def all_uniprots(organism = 9606, swissprot = None):
return uniprot_input.all_uniprots(organism, swissprot)
def get_pdb():
c = curl.Curl(urls.urls['uniprot_pdb']['url'], silent = False)
data = c.result
if data is None:
return None, None
data = data.split('\n')
u_pdb = {}
pdb_u = {}
pdb = None
pdb_re = re.compile(r'[0-9A-Z]{4}')
for l in data:
l = re.split('[ ]{2,}',
re.sub('[ ]+,[ ]+', ',', re.sub(r'[ ]*\(', '(', l)))
if len(l[0]) == 4 and pdb_re.match(l[0]):
pdb = l[0].lower()
res = None if l[2] == '-' else float(l[2].replace(' A', ''))
met = l[1]
if pdb is not None and len(l) > 1:
uniprots = l[1] if len(l) < 4 else l[3]
uniprots = [
u.split('(')[1].replace(')', '') for u in uniprots.split(',')
if '(' in u
]
pdb_u[pdb] = uniprots
for u in uniprots:
if u not in u_pdb:
u_pdb[u] = []
u_pdb[u].append((pdb, met, res))
return u_pdb, pdb_u
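# Sketch (not original API): look up the PDB structures of one protein in the
# first dict returned by get_pdb(); each entry is a (pdb_id, method, resolution)
# tuple. Triggers a download when called; the UniProt ID is only an example.
def _pdb_structures_of(uniprot = 'P00533'):
    u_pdb, pdb_u = get_pdb()
    return u_pdb.get(uniprot, []) if u_pdb else []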
def pdb_complexes(organism = None):
complexes = {}
uniprot_pdb, pdb_uniprot = get_pdb_chains()
del uniprot_pdb
for pdb_id, chains in iteritems(pdb_uniprot):
uniprots = tuple(chain['uniprot'] for chain in chains.values())
if len(uniprots) == 1:
continue
# if the organism set and any of the UniProt IDs does not
# belong to this organism we drop the complex
if organism and reflists.is_not(uniprots, 'uniprot', organism):
continue
cplex = intera.Complex(
components = uniprots,
sources = 'PDB',
ids = pdb_id,
)
if cplex.__str__() in complexes:
complexes[cplex.__str__()] += cplex
else:
complexes[cplex.__str__()] = cplex
return complexes
def get_pfam(uniprots = None, organism = 9606):
if uniprots is None:
uniprots = all_uniprots(organism = organism, swissprot = True)
u_pfam = {}
pfam_u = {}
if uniprots is not None:
prg = progress.Progress(
len(uniprots) / 30, 'Downloading data from UniProt', 1)
data_all = []
for i in xrange(0, len(uniprots), 30):
to = i + 30
thisPart = uniprots[i:to]
thisPart = ' OR '.join(['accession:%s' % u for u in thisPart])
get = {
'query': thisPart,
'format': 'tab',
'columns': 'id,database(Pfam)'
}
for j in xrange(3):
c = curl.Curl(urls.urls['uniprot_basic']['url'], get = get)
data = c.result
if data is not None:
break
if data is None:
return None, None
data = data.split('\n')
del data[0]
del data[-1]
data_all += data
prg.step()
prg.terminate()
else:
if type(organism) is not int:
try:
organism = int(organism)
except:
return None, None
organismQuery = 'organism:%u AND reviewed:yes' % organism
get = {
'query': organismQuery,
'format': 'tab',
'columns': 'id,database(Pfam)'
}
for j in xrange(3):
c = curl.Curl(
urls.urls['uniprot_basic']['url'],
get = get,
silent = False,
outf = 'uniprot-pfam-%u.tab' % organism)
data_all = c.result
if data_all is not None:
break
if data_all is None:
return None, None
data_all = data_all.split('\n')
del data_all[0]
for l in data_all:
l = l.split('\t')
pfams = [] if len(l) < 2 else re.sub(';$', '', l[1]).split(';')
if l[0] not in u_pfam:
u_pfam[l[0]] = []
u_pfam[l[0]] += pfams
for pfam in pfams:
if pfam not in pfam_u:
pfam_u[pfam] = []
pfam_u[pfam].append(l[0])
return u_pfam, pfam_u
def get_pfam_regions(uniprots = [], pfams = [], keepfile = False, dicts = 'both'):
url = urls.urls['pfam_up']['url']
outf = url.split('/')[-1]
urlmd5 = common.md5(url)
if not os.path.exists(settings.get('cachedir')):
os.makedirs(settings.get('cachedir'))
cachefile = os.path.join(settings.get('cachedir'), urlmd5 + '-' + outf)
u_pfam = {}
pfam_u = {}
uniprots = set(uniprots)
pfams = set(pfams)
if not os.path.exists(cachefile):
sys.stdout.write(
'\t:: Downloading data from %s' %
url.replace('http://', '').replace('ftp://', '').split('/')[0])
sys.stdout.flush()
if hasattr(urllib, 'urlretrieve'):
urllib.urlretrieve(url, cachefile)
else:
urllib.request.urlretrieve(url, cachefile)
sys.stdout.write('\n')
with open(cachefile, 'rb') as f:
f.seek(-4, 2)
gzsize = struct.unpack('<I', f.read())[0]
prg = progress.Progress(gzsize, 'Processing Pfam domains', 11)
with gzip.open(cachefile, 'r') as f:
for l in f:
prg.step(len(l))
# gzip yields bytes in Python 3: decode before comparing with str identifiers
l = l.decode('utf-8') if hasattr(l, 'decode') else l
l = l.strip().split()
if l[0] in uniprots or l[4] in pfams:
if dicts in ['uniprot', 'both']:
if l[0] not in u_pfam:
u_pfam[l[0]] = {}
if l[4] not in u_pfam[l[0]]:
u_pfam[l[0]][l[4]] = []
u_pfam[l[0]][l[4]].append({
'isoform': int(l[1]),
'start': int(l[5]),
'end': int(l[6])
})
if dicts in ['pfam', 'both']:
if l[4] not in pfam_u:
pfam_u[l[4]] = {}
if l[0] not in pfam_u[l[4]]:
pfam_u[l[4]][l[0]] = []
pfam_u[l[4]][l[0]].append({
'isoform': int(l[1]),
'start': int(l[5]),
'end': int(l[6])
})
prg.terminate()
if not keepfile:
os.remove(cachefile)
if dicts == 'uniprot':
return u_pfam
elif dicts == 'pfam':
return pfam_u
else:
return u_pfam, pfam_u
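# Sketch (not original API): flatten the nested dicts returned by
# get_pfam_regions(dicts = 'uniprot') into (uniprot, pfam, start, end, isoform)
# tuples.
def _pfam_region_records(u_pfam):
    for uniprot, pfams in iteritems(u_pfam):
        for pfam, regions in iteritems(pfams):
            for reg in regions:
                yield uniprot, pfam, reg['start'], reg['end'], reg['isoform']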
def get_pfam_names():
c = curl.Curl(urls.urls['pfam_pdb']['url'], silent = False)
data = c.result
if data is None:
return None, None
dname_pfam = {}
pfam_dname = {}
data = data.replace('\r', '').split('\n')
del data[0]
for l in data:
l = l.split('\t')
if len(l) > 5:
pfam = l[4].split('.')[0]
name = l[5]
if pfam not in pfam_dname:
pfam_dname[pfam] = []
if name not in dname_pfam:
dname_pfam[name] = []
pfam_dname[pfam].append(name)
dname_pfam[name].append(pfam)
for k, v in iteritems(pfam_dname):
pfam_dname[k] = list(set(v))
for k, v in iteritems(dname_pfam):
dname_pfam[k] = list(set(v))
return dname_pfam, pfam_dname
def get_pfam_pdb():
non_digit = re.compile(r'[^\d.-]+')
c = curl.Curl(urls.urls['pfam_pdb']['url'], silent = False)
data = c.result
if data is None:
return None, None
pdb_pfam = {}
pfam_pdb = {}
data = data.replace('\r', '').split('\n')
del data[0]
for l in data:
l = l.split('\t')
if len(l) > 4:
pfam = l[4].split('.')[0]
pdb = l[0].lower()
chain = l[1]
start = int(non_digit.sub('', l[2]))
end = int(non_digit.sub('', l[3]))
if pdb not in pdb_pfam:
pdb_pfam[pdb] = {}
if pfam not in pfam_pdb:
pfam_pdb[pfam] = {}
pdb_pfam[pdb][pfam] = [chain, start, end]
pfam_pdb[pfam][pdb] = [chain, start, end]
return pdb_pfam, pfam_pdb
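# Sketch (not original API): structures annotated with a given Pfam domain,
# based on get_pfam_pdb(); values are [chain, start, end] lists keyed by PDB ID.
# The accession PF00069 (protein kinase domain) is only an illustration.
def _structures_of_pfam(pfam = 'PF00069'):
    pdb_pfam, pfam_pdb = get_pfam_pdb()
    return pfam_pdb.get(pfam, {}) if pfam_pdb else {}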
def corum_complexes(organism = 9606):
annots = (
'mithocondr',
'nucleus',
'endoplasmic reticulum',
'cytoplasm',
'transcriptional control',
'vesicle docking',
'extracellular matrix component',
'cell-matrix adhesion',
'cytokines',
'cell death',
'integrin receptor signalling pathway',
'eukaryotic plasma membrane',
'nuclear membrane',
'cellular export and secretion',
'cell-substrate adherens junction',
'cytoskeleton',
'receptor binding',
'nucleolus',
'transmembrane signal transduction',
'transcription',
'modification by phosphorylation',
'cell-cell adhesion',
'intercellular junction',
'ion transport',
'cell adhesion',
'cell junction',
'endocytosis',
)
organism = taxonomy.ensure_ncbi_tax_id(organism)
complexes = {}
c = curl.Curl(
urls.urls['corum']['url'],
silent = False,
large = True,
files_needed = ['allComplexes.txt'],
)
tab = csv.DictReader(c.result['allComplexes.txt'], delimiter = '\t')
for rec in tab:
cplex_organism = rec['Organism']
if taxonomy.ensure_ncbi_tax_id(cplex_organism) != organism:
continue
uniprots = rec['subunits(UniProt IDs)'].split(';')
pubmeds = rec['PubMed ID'].split(';')
name = rec['ComplexName']
cplex = intera.Complex(
name = name,
components = uniprots,
sources = 'CORUM',
references = pubmeds,
ids = rec['ComplexID'],
attrs = {
'funcat': set(rec['FunCat description'].split(';')),
'go': set(rec['GO description'].split(';')),
},
)
if cplex.__str__() in complexes:
complexes[cplex.__str__()].references.update(set(pubmeds))
else:
complexes[cplex.__str__()] = cplex
return complexes
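# Sketch (not original API): the complex dictionaries returned by the functions
# in this module are keyed by str(intera.Complex), and entries under equal keys
# can be merged with `+=` (as done in pdb_complexes() above); this helper merges
# several such dictionaries.
def _merge_complex_resources(*resources):
    merged = {}
    for resource in resources:
        for key, cplex in iteritems(resource):
            if key in merged:
                merged[key] += cplex
            else:
                merged[key] = cplex
    return merged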
def complexportal_complexes(organism = 9606, return_details = False):
"""
Complex dataset from IntAct.
See more:
http://www.ebi.ac.uk/intact/complex/
http://nar.oxfordjournals.org/content/early/2014/10/13/nar.gku975.full.pdf
"""
spec = {9606: 'Homo_sapiens'}
zipurl = '%s/%s.zip' % (
urls.urls['complex_portal']['url'],
spec[organism],
)
c = curl.Curl(zipurl, large = True, silent = False)
files = c.result
errors = []
complexes = {}
details = []
name_key = 'complex recommended name'
for xmlname, xml in iteritems(c.result):
soup = bs4.BeautifulSoup(xml, 'html.parser')
interactors_xml = soup.find_all('interactor')
interactors = {}
interactions = {}
for i in interactors_xml:
if i.find('primaryref').attrs['db'] == 'uniprotkb':
interactors[i.attrs['id']] = i.find('primaryref').attrs['id']
interactions_xml = soup.find_all('interaction')
for i in interactions_xml:
description = ''
pubmeds = []
fullname = ''
names = {}
pdbs = []
uniprots = []
ids = collections.defaultdict(set)
for a in i.find_all('attribute'):
if a.attrs['name'] == 'curated-complex':
description = a.text
for sr in i.find_all('secondaryref'):
if sr.attrs['db'] == 'pubmed':
pubmeds.append(sr.attrs['id'])
if sr.attrs['db'] == 'wwpdb':
pdbs.append(sr.attrs['id'])
for pr in i.find_all('primaryref'):
if pr.attrs['db'] in {'wwpdb', 'rcsb pdb', 'pdbe'}:
pdbs.append(pr.attrs['id'])
for sr in i.find('xref').find_all('secondaryref'):
if (
'reftype' in sr.attrs and
sr.attrs['db'] in {'intact', 'reactome'} and
sr.attrs['reftype'] == 'identity'
):
ids[sr.attrs['db']].add(sr.attrs['id'])
pubmeds = list(set(pubmeds))
pdbs = list(set(pdbs))
fullname = (
None
if i.find('fullname') is None else
i.find('fullname').text
)
for a in i.find_all('alias'):
names[a.attrs['type']] = a.text
for intref in i.find_all('interactorref'):
int_id = intref.text
if int_id in interactors:
uniprot = interactors[int_id]
if uniprot.startswith('PRO'):
continue
uniprot = uniprot.split('-')[0]
uniprots.append(uniprot)
if uniprots:
if pdbs:
ids['PDB'].update(set(pdbs))
cplex = intera.Complex(
components = uniprots,
name = names[name_key] if name_key in names else None,
references = set(pubmeds),
sources = 'ComplexPortal',
ids = ids,
)
if cplex.__str__() in complexes:
complexes[cplex.__str__()] += cplex
else:
complexes[cplex.__str__()] = cplex
details.append({
'uniprots': uniprots,
'pdbs': pdbs,
'pubmeds': pubmeds,
'fullname': fullname,
'names': names,
'description': description
})
if return_details:
return complexes, details
else:
return complexes
def get_havugimana():
"""
Downloads data from
Supplement Table S3/1 from Havugimana 2012
Cell. 150(5): 1068–1081.
"""
url = urls.urls['havugimana']['url']
c = curl.Curl(url, silent = False, large = True)
fname = c.fileobj.name
del c
table = read_xls(fname)
return table[3:]
def havugimana_complexes():
"""
Retrieves complexes from
Supplement Table S3/1 from Havugimana 2012
Cell. 150(5): 1068–1081.
"""
complexes = {}
for rec in get_havugimana():
cplex = intera.Complex(
components = rec[2].split(','),
sources = 'Havugimana2012',
ids = rec[0],
)
complexes[cplex.__str__()] = cplex
return complexes
def compleat_complexes(predicted = True):
"""
Retrieves complexes from the Compleat database.
"""
url = urls.urls['compleat']['url']
c = curl.Curl(url, large = True, silent = False)
tab = list(csv.DictReader(
c.result,
delimiter = '\t',
fieldnames = (
'compleat_id',
'member_count',
'predicted',
'functions',
'functions2',
'nothing',
'sources',
'name',
'method',
'organisms',
'pubmeds',
'members',
)
))
complexes = {}
for rec in tab:
is_predicted = (
rec['predicted'] and
rec['predicted'].strip() == 'Predicted'
)
if is_predicted and not predicted:
continue
if not rec['members']:
continue
uniprots = []
for entrez in rec['members'].split():
uniprot = mapping.map_name0(entrez.strip(), 'entrez', 'uniprot')
if uniprot:
uniprots.append(uniprot)
if not uniprots:
continue
name = rec['name']
references = rec['pubmeds'].split(',') if rec['pubmeds'] else None
sources = set(rec['sources'].split(',')) if is_predicted else set()
sources.add('Compleat')
cplex = intera.Complex(
components = uniprots,
sources = sources,
references = references,
name = name,
ids = {'Compleat': rec['compleat_id']},
)
if cplex.__str__() in complexes:
complexes[cplex.__str__()] += cplex
else:
complexes[cplex.__str__()] = cplex
return complexes
def humap_complexes():
url = urls.urls['proteincomplexes']['url']
c = curl.Curl(url, large = True)
complexes = {}
for l in c.result:
l = l.strip().split()
for uniprots in itertools.product(*(
mapping.map_name(entrez, 'entrez', 'uniprot') for entrez in l
)):
cplex = intera.Complex(
components = uniprots,
sources = 'hu.MAP',
)
complexes[cplex.__str__()] = cplex
return complexes
def get_pdb_chains():
def to_int(i):
if i == 'None':
return None
return int(non_digit.sub('', i))
c = curl.Curl(urls.urls['pdb_chains']['url'], silent = False)
chains = c.result
if chains is None:
return None, None
chains = chains.replace('\r', '').split('\n')
del chains[0]
del chains[0]
pdb_u = {}
u_pdb = {}
non_digit = re.compile(r'[^\d.-]+')
for l in chains:
l = l.split('\t')
if len(l) > 8:
if l[0] not in pdb_u:
pdb_u[l[0]] = {}
pdb_u[l[0]][l[1]] = {
'uniprot': l[2],
'chain_beg': to_int(l[3]),
'chain_end': to_int(l[4]),
'pdb_beg': to_int(l[5]),
'pdb_end': to_int(l[6]),
'uniprot_beg': to_int(l[7]),
'uniprot_end': to_int(l[8])
}
if (
pdb_u[l[0]][l[1]]['pdb_end'] is not None and
pdb_u[l[0]][l[1]]['pdb_beg'] is not None and
pdb_u[l[0]][l[1]]['uniprot_beg'] is not None and
pdb_u[l[0]][l[1]]['uniprot_end'] is not None and
pdb_u[l[0]][l[1]]['pdb_end'] - pdb_u[l[0]][l[1]]['pdb_beg'] == \
pdb_u[l[0]][l[1]]['uniprot_end'] - pdb_u[l[0]][l[1]]['uniprot_beg']
):
pdb_u[l[0]][l[1]]['offset'] = (pdb_u[l[0]][l[1]]['uniprot_beg']
- pdb_u[l[0]][l[1]]['pdb_beg'])
else:
pdb_u[l[0]][l[1]]['offset'] = None
if l[2] not in u_pdb:
u_pdb[l[2]] = []
u_pdb[l[2]].append({
'pdb': l[0],
'chain': l[1],
'chain_beg': to_int(l[3]),
'chain_end': to_int(l[4]),
'pdb_beg': to_int(l[5]),
'pdb_end': to_int(l[6]),
'uniprot_beg': to_int(l[7]),
'uniprot_end': to_int(l[8]),
'offset': pdb_u[l[0]][l[1]]['offset']
})
return u_pdb, pdb_u
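# Sketch (not original API): translate a PDB residue number into UniProt
# numbering using the 'offset' field computed by get_pdb_chains(); returns None
# where no offset could be derived.
def _pdb_resnum_to_uniprot(pdb_id, chain, pdb_resnum, pdb_u = None):
    if pdb_u is None:
        _, pdb_u = get_pdb_chains()
    try:
        offset = pdb_u[pdb_id][chain]['offset']
    except (KeyError, TypeError):
        return None
    return None if offset is None else pdb_resnum + offset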
def get_3dcomplex():
"""
Downloads and preprocesses data from the 3DComplex database.
Returns a dict of dicts where top level keys are 3DComplex complex
identifiers (PDB based), second level keys are sorted pairs (tuples) of
UniProt IDs, and values are lists with the numbers of amino acids in contact.
"""
c = curl.Curl(urls.urls['3dcomplexes_contact']['url'], silent = False)
contact = c.result
c = curl.Curl(urls.urls['3dcomplexes_correspondancy']['url'], silent = False)
corresp = c.result
u_pdb, pdb_u = get_pdb_chains()
del u_pdb
if contact is None or corresp is None or pdb_u is None:
return None
contact = contact.split('\n')
corresp = corresp.split('\n')
del contact[0]
corr_dict = {}
for l in corresp:
l = l.replace('\r', '').split('\t')
if len(l) > 2:
pdb = l[0].split('.')[0]
if pdb not in corr_dict:
corr_dict[pdb] = {}
corr_dict[pdb][l[1]] = l[2]
compl_dict = {}
for l in contact:
l = l.replace('\r', '').split('\t')
if len(l) > 11 and int(l[11]) == 0 and int(l[10]) == 0:
compl = l[0]
pdb = compl.split('_')[0]
if pdb in corr_dict:
if l[1] in corr_dict[pdb] and l[2] in corr_dict[pdb]:
ch1 = corr_dict[pdb][l[1]]
ch2 = corr_dict[pdb][l[2]]
if pdb in pdb_u and ch1 in pdb_u[pdb]:
up1 = pdb_u[pdb][ch1]['uniprot']
if pdb in pdb_u and ch2 in pdb_u[pdb]:
up2 = pdb_u[pdb][ch2]['uniprot']
if compl not in compl_dict:
compl_dict[compl] = {}
uniprots = [up1, up2]
uniprots.sort()
uniprots = tuple(uniprots)
if uniprots not in compl_dict[compl]:
compl_dict[compl][uniprots] = []
compl_dict[compl][uniprots].append(float(l[3]))
return compl_dict
def _3dcomplex_complexes():
pass
def domino_interactions():
domino = get_domino()
inter = []
for l in domino:
if (
l[0] and
l[1] and
''.join(l[5]) and
''.join([
l[i]
for i in range(10, 12) + range(14, 22) + range(24, 26)
]) and
l[28] != '1'
):
inter.append(l)
return inter
def get_domino_ddi():
domi = get_domino_ptms()
return domi['ddi']
def get_domino_ptms():
"""
The table comes from dataio.get_domino(), having the following fields:
header = ['uniprot-A', 'uniprot-B', 'isoform-A', 'isoform-B', #3
'exp. method', 'references', 'taxon-A', 'taxon-B', #7
'role-A', 'role-B', 'binding-site-range-A', 'binding-site-range-B', #11
'domains-A', 'domains-B', 'ptm-residue-A', 'ptm-residue-B', #15
'ptm-type-mi-A', 'ptm-type-mi-B', 'ptm-type-A', 'ptm-type-B', #19
'ptm-res-name-A', 'ptm-res-name-B', 'mutations-A', 'mutations-B', #23
'mutation-effects-A', 'mutation-effects-B', 'domains-interpro-A', #26
'domains-interpro-B', 'negative'] #28
"""
domino = get_domino()
try:
miont = get_ontology('MI')
except:
miont = {}
dmi = []
ddi = []
prg = progress.Progress(len(domino), 'Processing DOMINO', 11)
for l in domino:
prg.step()
if (l[14].strip() != '' or l[15].strip() != '' or
(l[10] != '' and l[11] != '')) and len(l[0]) > 0 and len(l[1]) > 0:
uniprot1 = l[0]
uniprot2 = l[1]
# ptms
if '-' not in l[14] and '-' not in l[15]:
ptmre12 = [] if len(l[14]) == 0 else \
[int(x) for x in l[14].split(';')]
ptmre21 = [] if len(l[15]) == 0 else \
[int(x) for x in l[15].split(';')]
ptmty12 = [None for _ in ptmre12] if len(l[16]) == 0 else \
l[16].split(';')
ptmty12 = [
None if x not in miont else miont[x] for x in ptmty12
]
ptmrn12 = [None for _ in ptmre12] if len(l[20]) == 0 else \
l[20].split(';')
ptmrn12 = [
None if x is None or x == '' or
len(x) < min(ptmre12[i] - 1, 11) else x[10]
if ptmre12[i] > 10 else x[ptmre12[i] - 1]
for i, x in enumerate(ptmrn12)
]
ptmty21 = [None for _ in ptmre21] if len(l[17]) == 0 else \
l[17].split(';')
ptmty21 = [
None if x not in miont else miont[x] for x in ptmty21
]
ptmrn21 = [None for _ in ptmre21] if len(l[21]) == 0 else \
l[21].split(';')
ptmrn21 = [
None if x is None or x == '' or
len(x) < min(ptmre21[i] - 1, 11) else x[10]
if ptmre21[i] > 10 else x[ptmre21[i] - 1]
for i, x in enumerate(ptmrn21)
]
for i, resnum in enumerate(ptmre12):
res = intera.Residue(resnum, ptmrn12[i], uniprot2)
ptm = intera.Ptm(uniprot2,
typ = ptmty12[i],
residue = res,
source = 'DOMINO')
dom = intera.Domain(uniprot1)
dm = intera.DomainMotif(
domain = dom,
ptm = ptm,
sources = 'DOMINO',
refs = l[5].split(';'))
# binding sites
if l[10] != '' and l[11] != '':
try:
bssrt1 = [
int(x.split('-')[0]) for x in l[10].split(';')
if x != '' and x != '0'
]
bsend1 = [
int(x.split('-')[1]) for x in l[10].split(';')
if x != '' and x != '0'
]
bssrt2 = [
int(x.split('-')[0]) for x in l[11].split(';')
if x != '' and x != '0'
]
bsend2 = [
int(x.split('-')[1]) for x in l[11].split(';')
if x != '' and x != '0'
]
except:
sys.stdout.write('Error processing line:\n')
sys.stdout.write(str(l))
sys.stdout.write('\n')
sys.stdout.flush()
return None
bs1 = []
bs2 = []
if l[26] != '':
for i, n in enumerate(bssrt1):
bs1.append(
intera.Domain(
protein = uniprot1,
domain = l[26],
start = bssrt1[i],
end = bsend1[i],
domain_id_type = 'interpro',
isoform = l[2]))
else:
for i, n in enumerate(bssrt1):
mot = intera.Motif(
protein = uniprot1,
start = bssrt1[i],
end = bsend1[i],
isoform = l[2])
bs1.append(
intera.Ptm(protein = uniprot1,
motif = mot,
source = 'DOMINO',
isoform = l[2]))
if l[27] != '':
for i, n in enumerate(bssrt2):
bs2.append(
intera.Domain(
protein = uniprot2,
domain = l[27],
start = bssrt2[i],
end = bsend2[i],
domain_id_type = 'interpro',
isoform = l[3]))
else:
for i, n in enumerate(bssrt2):
mot = intera.Motif(
protein = uniprot2,
start = bssrt2[i],
end = bsend2[i],
isoform = l[3])
bs2.append(
intera.Ptm(
protein = uniprot2, motif = mot, source = 'DOMINO'))
for one in bs1:
for two in bs2:
if one.__class__.__name__ == 'Domain' and \
two.__class__.__name__ == 'Domain':
dd = intera.DomainDomain(
one, two, sources = 'DOMINO')
ddi.append(dd)
if one.__class__.__name__ == 'Domain' and \
two.__class__.__name__ == 'Ptm':
dm = intera.DomainMotif(
domain = one,
ptm = two,
sources = 'DOMINO',
refs = l[6].split(';'))
dmi.append(dm)
if two.__class__.__name__ == 'Domain' and \
one.__class__.__name__ == 'Ptm':
dm = intera.DomainMotif(
domain = two,
ptm = one,
sources = 'DOMINO',
refs = l[6].split(';'))
dmi.append(dm)
prg.terminate()
return {'ddi': ddi, 'dmi': dmi}
def get_3dc_ddi():
c = curl.Curl(urls.urls['3dcomplexes_contact']['url'], silent = False)
contact = c.result
c = curl.Curl(urls.urls['3dcomplexes_correspondancy']['url'], silent = False)
corresp = c.result
u_pdb, pdb_u = get_pdb_chains()
del u_pdb
if contact is None or corresp is None or pdb_u is None:
return None
contact = contact.split('\n')
corresp = corresp.split('\n')
del contact[0]
corr_dict = {}
ddi = []
uniprots = []
for l in corresp:
l = l.replace('\r', '').split('\t')
if len(l) > 2:
pdb = l[0].split('.')[0]
if pdb not in corr_dict:
corr_dict[pdb] = {}
corr_dict[pdb][l[1]] = l[2]
prg = progress.Progress(len(contact), 'Collecting UniProts', 9)
for l in contact:
prg.step()
l = l.replace('\r', '').split('\t')
if len(l) > 11 and int(l[11]) == 0 and int(l[10]) == 0:
pdb = l[0].split('_')[0]
if pdb in corr_dict:
if l[1] in corr_dict[pdb] and l[2] in corr_dict[pdb]:
ch1 = corr_dict[pdb][l[1]]
ch2 = corr_dict[pdb][l[2]]
if pdb in pdb_u and ch1 in pdb_u[pdb]:
up1 = pdb_u[pdb][ch1]['uniprot']
if pdb in pdb_u and ch2 in pdb_u[pdb]:
up2 = pdb_u[pdb][ch2]['uniprot']
uniprots += [up1, up2]
prg.terminate()
uniprots = list(set(uniprots))
u_pfam = get_pfam_regions(uniprots, dicts = 'uniprot')
prg = progress.Progress(len(contact), 'Processing contact information', 9)
for l in contact:
prg.step()
l = l.replace('\r', '').split('\t')
if len(l) > 11 and int(l[11]) == 0 and int(l[10]) == 0:
pdb = l[0].split('_')[0]
pfams1 = list(set([x.split('.')[0] for x in l[7].split(';')]))
pfams2 = list(set([x.split('.')[0] for x in l[9].split(';')]))
if pdb in corr_dict:
if l[1] in corr_dict[pdb] and l[2] in corr_dict[pdb]:
ch1 = corr_dict[pdb][l[1]]
ch2 = corr_dict[pdb][l[2]]
if pdb in pdb_u and ch1 in pdb_u[pdb]:
up1 = pdb_u[pdb][ch1]['uniprot']
if pdb in pdb_u and ch2 in pdb_u[pdb]:
up2 = pdb_u[pdb][ch2]['uniprot']
for pfam1 in pfams1:
for pfam2 in pfams2:
pfam1_details = [{
'start': None,
'end': None,
'isoform': 1
}]
pfam2_details = [{
'start': None,
'end': None,
'isoform': 1
}]
if up1 in u_pfam and pfam1 in u_pfam[up1]:
pfam1_details = u_pfam[up1][pfam1]
if up2 in u_pfam and pfam2 in u_pfam[up2]:
pfam2_details = u_pfam[up2][pfam2]
for pfam1_d in pfam1_details:
for pfam2_d in pfam2_details:
dom1 = intera.Domain(
protein = up1,
domain = pfam1,
start = pfam1_d['start'],
end = pfam1_d['end'],
isoform = pfam1_d['isoform'],
chains = {pdb: ch1})
dom2 = intera.Domain(
protein = up2,
domain = pfam2,
start = pfam2_d['start'],
end = pfam2_d['end'],
isoform = pfam2_d['isoform'],
chains = {pdb: ch2})
dd = intera.DomainDomain(
dom1,
dom2,
pdbs = pdb,
sources = '3DComplex',
contact_residues = float(l[3]))
ddi.append(dd)
prg.terminate()
return ddi
def pisa_bonds(lst, chains):
non_digit = re.compile(r'[^\d.-]+')
bonds = []
for bond in lst.find_all('bond'):
seqnum1 = int(non_digit.sub('', bond.find('seqnum-1').text))
seqnum2 = int(non_digit.sub('', bond.find('seqnum-2').text))
res1 = bond.find('res-1').text
res1 = res1 if res1 not in common.aaletters else common.aaletters[res1]
res2 = bond.find('res-2').text
res2 = res2 if res2 not in common.aaletters else common.aaletters[res2]
chain1 = bond.find('chain-1').text
chain2 = bond.find('chain-2').text
uniprot1 = None if chain1 not in chains else chains[chain1]
uniprot2 = None if chain2 not in chains else chains[chain2]
if uniprot1 is not None and uniprot2 is not None:
bonds.append({
'chain_1': chain1,
'uniprot_1': uniprot1,
'res_1': res1,
'seqnum_1': seqnum1,
'chain_2': chain2,
'uniprot_2': uniprot2,
'res_2': res2,
'seqnum_2': seqnum2
})
return bonds
def get_pisa(pdblist):
non_digit = re.compile(r'[^\d.-]+')
bond_types = {
'hbonds': 'h-bonds',
'sbridges': 'salt-bridges',
'covbonds': 'cov-bonds',
'ssbonds': 'ss-bonds'
}
interfaces = {}
cachefile = os.path.join(settings.get('cachedir'), 'pisa.pickle')
u_pdb, pdb_u = get_pdb_chains()
if os.path.exists(cachefile):
try:
interfaces = pickle.load(open(cachefile, 'rb'))
except:
pass
errors = []
p = 5
pdblist = list(set(pdblist) - set(interfaces.keys()))
prg = progress.Progress(
len(pdblist) / p, 'Downloading data from PDBe PISA', 1)
for i in xrange(0, len(pdblist), p):
to = i + p
thisPart = pdblist[i:to]
url = urls.urls['pisa_interfaces']['url'] + ','.join(thisPart)
c = curl.Curl(url, cache = False)
data = c.result
if data is None:
msg = 'Could not download: \n\t\t%s' % url
errors.append(msg)
continue
soup = bs4.BeautifulSoup(data, 'html.parser')
unmapped_residues = []
for pdb in soup.find_all('pdb_entry'):
pdb_id = pdb.find('pdb_code').text.lower()
interfaces[pdb_id] = {}
chains = {}
resconv = ResidueMapper()
if pdb_id in pdb_u:
for chain, chain_data in iteritems(pdb_u[pdb_id]):
chains[chain] = chain_data['uniprot']
for interface in pdb.find_all('interface'):
for b, t in iteritems(bond_types):
lst = interface.find(t)
if lst is not None:
bonds = pisa_bonds(lst, chains)
for bond in bonds:
uniprots = (bond['uniprot_1'],
bond['uniprot_2'])
if uniprots not in interfaces[pdb_id]:
css = non_digit.sub(
'', interface.find('css').text)
css = None if len(css) == 0 else float(css)
area = non_digit.sub(
'', interface.find('int_area').text)
area = None if len(area) == 0 else float(
area)
solv_en = non_digit.sub(
'', interface.find('int_solv_en').text)
solv_en = None if len(
solv_en) == 0 else float(solv_en)
stab_en = non_digit.sub(
'', interface.find('stab_en').text)
stab_en = None if len(
stab_en) == 0 else float(stab_en)
interfaces[pdb_id][uniprots] = \
intera.Interface(uniprots[0],
uniprots[1], source = 'PISA', pdb = pdb_id,
css = css,
solv_en = solv_en,
area = area,
stab_en = stab_en)
res1 = resconv.get_residue(pdb_id,
bond['seqnum_1'])
res2 = resconv.get_residue(pdb_id,
bond['seqnum_2'])
if res1 is not None and res2 is not None and \
res1['uniprot'] == uniprots[0] and \
res2['uniprot'] == uniprots[1]:
interfaces[pdb_id][uniprots].add_residues(
(res1['resnum'], bond['res_1'],
uniprots[0]),
(res2['resnum'], bond['res_2'],
uniprots[1]),
typ = b)
else:
unmapped_residues.append(
(pdb_id, bond['seqnum_1'],
bond['seqnum_2'], uniprots[0],
uniprots[1]))
pickle.dump(interfaces, open(cachefile, 'wb'), 2)
prg.step()
prg.terminate()
if len(errors) > 0:
sys.stdout.write('\t:: Failed to download %u files of total %u:\n\n' %
(len(errors), len(pdblist)))
for e in errors:
sys.stdout.write('\t' + e + '\n')
sys.stdout.flush()
return interfaces, unmapped_residues
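# Minimal usage sketch for get_pisa() (illustrative; the PDB identifiers are
# arbitrary examples and the call downloads data from PDBe PISA):
def _pisa_example():
    interfaces, unmapped = get_pisa(['2src', '1tup'])
    return interfaces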
def get_3did_ddi(residues = False, ddi_flat = None, organism = 9606):
if ddi_flat is None:
c = curl.Curl(urls.urls['3did_ddi']['url'], silent = False)
data = c.result
tmpfile = '3did_flat_tmp'
if data is None:
return None
with open(tmpfile, 'w') as f:
f.write(data)
lnum = data.count('\n')
del data
else:
tmpfile = ddi_flat
u_pfam, pfam_u = get_pfam(organism = organism)
u_pdb, pdb_u = get_pdb_chains()
if pfam_u is None or pdb_u is None:
return None
ddi = {}
interfaces = {}
pdblist = {}
ddi_collect = False
con_collect = False
non_digit = re.compile(r'[^\d.-]+')
with open(tmpfile, 'r') as f:
prg = progress.Progress(lnum, 'Reading data', 33)
for l in f:
prg.step()
if l.startswith('#=') and con_collect:
interfaces[(uniprot1, uniprot2, pdb)].append(this_interface)
con_collect = False
if l.startswith('#=ID'):
# new domain pair: attach previous to results:
if ddi_collect:
for u1 in uniprots1:
for u2 in uniprots2:
if u1 != u2 and len(pdblist) > 0:
if (u1, u2) not in ddi:
ddi[(u1, u2)] = {}
if (pfam1, pfam2) not in ddi[(u1, u2)]:
ddi[(u1, u2)][(pfam1, pfam2)] = {
'pdbs': pdblist
}
ddi_collect = False
pdblist = {}
l = l.split('\t')
pfam1 = l[3].split('(')[1].split('.')[0]
pfam2 = l[4].split('.')[0]
uniprots1 = [] if pfam1 not in pfam_u else pfam_u[pfam1]
uniprots2 = [] if pfam2 not in pfam_u else pfam_u[pfam2]
if len(set(uniprots1 + uniprots2)) > 1:
ddi_collect = True
elif l.startswith('#=3D'):
l = l.split('\t')
pdb = l[1]
chain1 = l[2].split(':')[0]
chain2 = l[3].split(':')[0]
if pdb in pdb_u and \
chain1 in pdb_u[pdb] and \
chain2 in pdb_u[pdb]:
uniprot1 = pdb_u[pdb][chain1]['uniprot']
uniprot2 = pdb_u[pdb][chain2]['uniprot']
if uniprot1 != uniprot2:
if pdb not in pdblist:
pdblist[pdb] = []
pdblist[pdb] = common.add_to_list(pdblist[pdb],
(uniprot1, uniprot2))
if residues:
#res1 = [int(i) for i in l[2].split(':')[1].split('-')]
#res2 = [int(i) for i in l[3].split(':')[1].split('-')]
if chain1 != chain2:
if pdb_u[pdb][chain1]['offset'] is not None and \
pdb_u[pdb][chain2]['offset'] is not None and \
pdb_u[pdb][chain1]['uniprot'] != \
pdb_u[pdb][chain2]['uniprot']:
con_collect = True
offset1 = pdb_u[pdb][chain1]['offset']
offset2 = pdb_u[pdb][chain2]['offset']
this_interface = common.Interface(
uniprot1, uniprot2, source = '3DID', pdb = pdb)
if (uniprot1, uniprot2, pdb) not in interfaces:
interfaces[(uniprot1, uniprot2, pdb)] = []
else:
con_collect = False
elif not residues or not con_collect:
continue
else:
l = l.split('\t')
if len(l) > 3:
rnum1 = int(non_digit.sub('', l[2])) + offset1
rnum2 = int(non_digit.sub('', l[3])) + offset2
this_interface.add_residues((rnum1, l[0], uniprot1),
(rnum2, l[1], uniprot2))
prg.terminate()
prg = progress.Progress(len(ddi), 'Processing interfaces', 99)
if residues:
for u, v1 in iteritems(ddi):
prg.step()
for d, v2 in iteritems(v1):
for p in v2['pdbs'].keys():
if (u[0], u[1], p) in interfaces:
ddi[u][d]['interfaces'] = interfaces[(u[0], u[1],
p)]
prg.terminate()
if ddi_flat is None:
os.remove(tmpfile)
if residues:
return ddi, interfaces
else:
return ddi
def get_3did(ddi_flat = None, res = True, organism = 9606, pickl = True):
resultfile = os.path.join(settings.get('cachedir'), '3did_ddi.pickle')
if pickl and os.path.exists(resultfile):
result = pickle.load(open(resultfile, 'rb'))
if len(result) == 1:
return result[0]
else:
return result[0], result[1]
if ddi_flat is None:
c = curl.Curl(urls.urls['3did_ddi']['url'], silent = False)
data = c.result
tmpfile = '3did_flat_tmp'
if data is None:
return None
with open(tmpfile, 'w') as f:
f.write(data)
lnum = data.count('\n')
del data
elif os.path.exists(ddi_flat):
tmpfile = ddi_flat
else:
return None
u_pdb, pdb_u = get_pdb_chains()
all_unip = set(uniprot_input.all_uniprots(organism = organism))
if all_unip is None or pdb_u is None:
return None
ddi = []
interfaces = []
pdb = pdb_prev = intf = None
skip = True
non_digit = re.compile(r'[^\d.-]+')
rmap = residues.ResidueMapper()
with open(tmpfile, 'r') as f:
prg = progress.Progress(
lnum, 'Processing 3DID domain-domain interactions', 33)
for l in f:
prg.step()
l = l.split('\t')
if l[0].startswith('#=ID'):
pfam1 = l[3].split('.')[0][2:]
pfam2 = l[4].split('.')[0]
elif l[0].startswith('#=3D'):
pdb_prev = pdb
skip = True
pdb = l[1]
chain1 = l[2][0]
chain2 = l[3][0]
uniprot1 = uniprot2 = None
if pdb != pdb_prev:
rmap.clean()
if pdb in pdb_u:
if chain1 in pdb_u[pdb]:
uniprot1 = pdb_u[pdb][chain1]['uniprot']
if chain2 in pdb_u[pdb]:
uniprot2 = pdb_u[pdb][chain2]['uniprot']
if uniprot1 is not None and uniprot2 is not None and \
uniprot1 in all_unip and uniprot2 in all_unip and \
uniprot1 != uniprot2:
skip = False
if intf is not None:
interfaces.append(intf)
intf = intera.Interface(uniprot1, uniprot2, '3DID', pdb)
u1start = u1end = u2start = u2end = {}
if l[2].count('-') == 1:
start1 = int(non_digit.sub('', l[2][2:].split('-')[0]))
end1 = int(non_digit.sub('', l[2][2:].split('-')[1]))
u1start = rmap.pdb2uniprot(pdb, start1, chains = chain1)
u1end = rmap.pdb2uniprot(pdb, end1, chains = chain1)
if l[3].count('-') == 1:
start2 = int(non_digit.sub('', l[3][2:].split('-')[0]))
end2 = int(non_digit.sub('', l[3][2:].split('-')[1]))
u2start = rmap.pdb2uniprot(pdb, start2, chains = chain2)
u2end = rmap.pdb2uniprot(pdb, end2, chains = chain2)
u1start = None if len (u1start) == 0 else \
u1start[chain1]['resnum']
u1end = None if len (u1end) == 0 else \
u1end[chain1]['resnum']
u2start = None if len (u2start) == 0 else \
u2start[chain2]['resnum']
u2end = None if len (u2end) == 0 else \
u2end[chain2]['resnum']
dom1 = intera.Domain(
uniprot1,
domain = pfam1,
start = u1start,
end = u1end,
isoform = 1)
dom2 = intera.Domain(
uniprot2,
domain = pfam2,
start = u2start,
end = u2end,
isoform = 1)
dd = intera.DomainDomain(dom1, dom2, [pdb], '3DID')
ddi.append(dd)
elif not skip and res and not l[0].startswith('//'):
conv1 = rmap.pdb2uniprot(
pdb, int(non_digit.sub('', l[2])), chains = chain1)
conv2 = rmap.pdb2uniprot(
pdb, int(non_digit.sub('', l[3])), chains = chain2)
if len(conv1) > 0 and len(conv2) > 0:
intf.add_residues(
(conv1[chain1]['resnum'], l[0], uniprot1),
(conv2[chain2]['resnum'], l[1], uniprot2))
interfaces.append(intf)
prg.terminate()
if ddi_flat is None:
os.remove(tmpfile)
if res:
pickle.dump([ddi, interfaces], open(resultfile, 'wb'))
return ddi, interfaces
else:
pickle.dump([ddi], open(resultfile, 'wb'))
return ddi
def get_3did_dmi(dmi_flat = None):
resultfile = os.path.join(settings.get('cachedir'), '3did_dmi.pickle')
if os.path.exists(resultfile):
return pickle.load(open(resultfile, 'rb'))
if dmi_flat is None:
c = curl.Curl(urls.urls['3did_dmi']['url'], silent = False)
data = c.result
tmpfile = '3did_dmi_flat_tmp'
if data is None:
return None
with open(tmpfile, 'w') as f:
f.write(data)
lnum = data.count('\n')
del data
elif os.path.exists(dmi_flat):
tmpfile = dmi_flat
else:
return None
u_pdb, pdb_u = get_pdb_chains()
if pdb_u is None:
return None
dmi = {}
non_digit = re.compile(r'[^\d.-]+')
rmap = residues.ResidueMapper()
with open(tmpfile, 'r') as f:
prg = progress.Progress(lnum,
'Processing 3DID domain-motif interactions', 1)
for l in f:
prg.step()
l = l.strip().split()
if l[0].startswith('#=ID'):
domain = l[3]
if l[0].startswith('#=PT'):
regex = l[1]
if l[0].startswith('#=3D'):
pdb = l[1]
chain1 = l[2].split(':')[0]
chain2 = l[3].split(':')[0]
if l[2].count('-') == 1 and l[3].count('-') == 1:
pdb_region1 = [
int(non_digit.sub('', x))
for x in l[2].split(':')[1].split('-')
]
pdb_region2 = [
int(non_digit.sub('', x))
for x in l[3].split(':')[1].split('-')
]
u1start = rmap.pdb2uniprot(
pdb, pdb_region1[0], chains = chain1)
u1end = rmap.pdb2uniprot(
pdb, pdb_region1[1], chains = chain1)
u2start = rmap.pdb2uniprot(
pdb, pdb_region2[0], chains = chain2)
u2end = rmap.pdb2uniprot(
pdb, pdb_region2[1], chains = chain2)
if len(u1start) != 0 and len(u2start) != 0 and \
len(u1end) != 0 and len(u2end) != 0:
uniprot_key = (u1start[chain1]['uniprot'],
u2start[chain2]['uniprot'])
residue_key = (
u1start[chain1]['resnum'], u1end[chain1]['resnum'],
u2start[chain2]['resnum'], u2end[chain2]['resnum'])
if uniprot_key not in dmi:
dmi[uniprot_key] = {}
if residue_key not in dmi[uniprot_key]:
dmi[uniprot_key][residue_key] = []
dmi[uniprot_key][residue_key].append({
'pdb': pdb,
'regex': regex,
'instance': l[4],
'domain': domain,
'contacts': int(non_digit.sub('', l[5])),
'topology': int(non_digit.sub('', l[6]))
})
prg.terminate()
if dmi_flat is None:
os.remove(tmpfile)
if len(rmap.download_errors) > 0:
sys.stdout.write('Failed to download PDB-UniProt mappings for:\n'
'%s\n' % ', '.join(rmap.download_errors))
pickle.dump(dmi, open(resultfile, 'wb'))
return dmi
def process_3did_dmi():
dmi = get_3did_dmi()
if dmi is None:
return None
dname_pfam, pfam_dname = get_pfam_names()
dname_re = re.compile(r'(.*)(_[A-Z]{3}_)(.*)')
dmi2 = {}
prg = progress.Progress(len(dmi), 'Processing data', 11)
for uniprots, dmis in iteritems(dmi):
prg.step()
if uniprots not in dmi2:
dmi2[uniprots] = []
for regions, dmi_list in iteritems(dmis):
new = True
for dm in dmi_list:
if new:
pfam = None
dname = None
mname = None
name_match = dname_re.match(dm['domain'])
if name_match:
dname = name_match.groups(0)[0]
mname = ''.join(name_match.groups(0)[1:])[1:]
if dname in dname_pfam:
pfam = dname_pfam[dname][0]
domain = pfam if pfam is not None else dname
domain_name = 'pfam' if pfam is not None else 'domain_name'
dom = intera.Domain(
uniprots[0],
domain = domain,
domain_id_type = domain_name,
start = regions[0],
end = regions[1])
mot = intera.Motif(
uniprots[1],
regions[2],
regions[3],
instance = dm['instance'],
regex = dm['regex'],
motif_name = mname)
ptm = intera.Ptm(uniprots[1], motif = mot, source = '3DID')
dommot = intera.DomainMotif(dom, ptm, sources = '3DID')
new = False
dommot.add_pdbs(dm['pdb'])
dmi2[uniprots].append(dommot)
prg.terminate()
return dmi2
def get_instruct():
"""
Instruct contains residue numbers referring to UniProt sequences, which
means no further offset calculations relative to PDB chains are needed.
Chains are not given, only a set of PDB structures supporting the
domain-domain / protein-protein interaction.
"""
non_digit = re.compile(r'[^\d.-]+')
c = curl.Curl(urls.urls['instruct_human']['url'], silent = False)
data = c.result
if data is None:
return None
data = data.replace('\r', '').split('\n')
del data[0]
instruct = []
for l in data:
l = l.split('\t')
if len(l) > 12:
domain1 = l[6]
domain2 = l[7]
pdb = l[12].split(';')
uniprot1 = l[0]
uniprot2 = l[1]
seq1 = [[non_digit.sub('', n) for n in s.split(',')]
for s in l[10].split(';')]
seq2 = [[non_digit.sub('', n) for n in s.split(',')]
for s in l[11].split(';')]
instruct.append({
uniprot1: {
'pfam': domain1,
'chain': None,
'seq': seq1
},
uniprot2: {
'pfam': domain2,
'chain': None,
'seq': seq2
},
'uniprots': [uniprot1, uniprot2],
'source': 'Instruct',
'pdb': pdb,
'references': l[13].split(';')
})
return instruct
def get_instruct_offsets():
"""
These offsets should be understood as from UniProt to PDB.
"""
non_digit = re.compile(r'[^\d.-]+')
c = curl.Curl(urls.urls['instruct_offsets']['url'], silent = False)
data = c.result
if data is None:
return None
data = data.replace('\r', '').split('\n')
del data[0]
offsets = {}
for l in data:
l = l.split('\t')
if len(l) > 2:
pdb = l[0].lower()
uniprot = l[1]
try:
offset = int(non_digit.sub('', l[2]))
offsets[(pdb, uniprot)] = offset
except:
sys.stdout.write('Error processing line:\n')
sys.stdout.write(l[2])
sys.stdout.write('\n')
sys.stdout.flush()
return offsets
def get_i3d():
"""
Interaction3D contains residue numbers in given chains of
given PDB structures, so we need to add an offset to get the residue
numbers valid for UniProt sequences. Offsets can be obtained from
Instruct, or from the Pfam PDB-chain-UniProt mapping table.
"""
dname_pfam, pfam_dname = get_pfam_names()
if dname_pfam is None:
sys.stdout.write('\n\t:: Could not get Pfam domain names\n\n')
non_digit = re.compile(r'[^\d.-]+')
c = curl.Curl(urls.urls['i3d_human']['url'], silent = False)
data = c.result
if data is None:
return None
data = data.replace('\r', '').split('\n')
del data[0]
i3d = []
prg = progress.Progress(
len(data), 'Processing domain-domain interactions', 11)
for l in data:
prg.step()
l = l.split('\t')
if len(l) > 20:
domain1 = None if l[13] not in dname_pfam else dname_pfam[l[13]]
domain2 = None if l[20] not in dname_pfam else dname_pfam[l[20]]
pdb = l[5]
uniprot1 = l[0]
uniprot2 = l[1]
chain1 = l[7]
seq1 = [[
int(non_digit.sub('', l[11])), int(non_digit.sub('', l[12]))
]]
chain2 = l[14]
seq2 = [[
int(non_digit.sub('', l[18])), int(non_digit.sub('', l[19]))
]]
i3d.append({
uniprot1: {
'pfam': domain1,
'chain': chain1,
'seq': seq1
},
uniprot2: {
'pfam': domain2,
'chain': chain2,
'seq': seq2
},
'uniprots': [uniprot1, uniprot2],
'source': 'I3D',
'pdb': [pdb],
'references': []
})
prg.terminate()
return i3d
def get_switches_elm():
"""
switches.elm is a resource of functional switches in molecular regulation,
at domain-motif level resolution, classified into categories according to
their mechanism.
"""
residue = re.compile(r'(^[A-Z])([0-9]+)')
# note: `data` is undefined at this point; the URL presumably comes from the urls module
url = urls.urls['switches.elm']['url']
c = curl.Curl(url, silent = False)
data = c.result
if data is None:
return None
buff = StringIO()
buff.write(data)
cols = {
'intramol': 3,
'bindingsite_a': 5,
'bs_a_start': 6,
'bs_a_end': 7,
'uniprot_a': 4,
'uniprot_b': 8,
'bindingsite_b': 9,
'bs_b_start': 10,
'bs_b_end': 11,
'affected': 12,
'type': 13,
'subtype': 14,
'mechanism': 15,
'reversible': 16,
'outcome': 17,
'outcomedir': 18,
'modification': 19,
'modsites': 20,
'modifiers': 21,
'effectors': 22,
'references': 26
}
table = read_table(cols = cols, fileObject = buff, sep2 = subf, hdr = 1)
mod_ont = get_ontology('MOD')
for l in table:
if l['modification'].startswith('MOD'):
if l['modification'] in mod_ont:
l['modification'] = mod_ont[l['modification']]
l['references'] = [
x.replace('PMID:', '').strip() for x in l['references']
]
l['modsites'] = [
(m.group(2), m.group(1))
for m in
[residue.match(s.strip()) for s in l['modsites'].split(';')]
]
l['intramol'] = True if l['intramol'].strip() == 'TRUE' else False
l['bs_a_start'] = [x.split(';') for x in l['bs_a_start'].strip()]
l['bs_b_start'] = [x.split(';') for x in l['bs_b_start'].strip()]
l['bs_a_end'] = [x.split(';') for x in l['bs_a_end'].strip()]
l['bs_b_end'] = [x.split(';') for x in l['bs_b_end'].strip()]
l['bindingsite_a'] = [x.split(';') for x in l['bindingsite_a'].strip()]
l['bindingsite_b'] = [x.split(';') for x in l['bindingsite_b'].strip()]
l['modifiers'] = [
x.split(':') for x in l['modifiers'].strip().split(';')
]
bs_a_ids = {}
bs_b_ids = {}
mod_ids = {}
for bs in l['bindingsite_a'].split(';'):
if ':' in bs:
bs = bs.split(':')
if bs[0].lower() not in bs_a_ids:
bs_a_ids[bs[0].lower()] = []
bs_a_ids[bs[0].lower()].append(bs[1])
for bs in l['bindingsite_b'].split(';'):
if ':' in bs:
bs = bs.split(':')
if bs[0].lower() not in bs_b_ids:
bs_b_ids[bs[0].lower()] = []
bs_b_ids[bs[0].lower()].append(bs[1])
for mod in l['modifiers'].split(';'):
if ':' in mod:
mod = mod.split(':')
if mod[0].lower() not in mod_ids:
mod_ids[mod[0].lower()] = []
mod_ids[mod[0].lower()].append(mod[1])
l['bindingsite_a'] = bs_a_ids
l['bindingsite_b'] = bs_b_ids
l['modifiers'] = mod_ids
return table
def get_csa(uniprots = None):
"""
Downloads and preprocesses catalytic sites data.
This data describes which residues are involved in the catalytic
activity of each protein.
"""
url = urls.urls['catalytic_sites']['url']
c = curl.Curl(url, silent = False)
data = c.result
if data is None:
return None
u_pdb, pdb_u = get_pdb_chains()
buff = StringIO()
buff.write(data)
cols = {
'pdb': 0,
'id': 1,
'resname': 2,
'chain': 3,
'resnum': 4,
'chem_fun': 5,
'evidence': 6,
}
table = read_table(cols = cols, fileObject = buff, sep = ',', hdr = 1)
css = {}
prg = progress.Progress(len(table), 'Processing catalytic sites', 11)
for l in table:
if l['pdb'] in pdb_u:
if l['chain'] in pdb_u[l['pdb']]:
uniprot = pdb_u[l['pdb']][l['chain']]['uniprot']
if uniprots is None or uniprot in uniprots:
offset = pdb_u[l['pdb']][l['chain']]['offset']
if offset is not None:
l['resnum'] = int(l['resnum']) + offset
else:
this_res = residue_pdb(l['pdb'], l['chain'],
l['resnum'])
if len(this_res) > 0:
l['resnum'] = int(this_res['UPCOUNT'])
else:
l['resnum'] = None
if l['resnum'] is not None:
if uniprot not in css:
css[uniprot] = {}
if l['pdb'] not in css[uniprot]:
css[uniprot][l['pdb']] = {}
if l['id'] not in css[uniprot][l['pdb']]:
css[uniprot][l['pdb']][l['id']] = []
css[uniprot][l['pdb']][l['id']].append(
intera.Residue(l['resname'], l['resnum'], uniprot))
prg.step()
prg.terminate()
return css
def get_ontology(ontology):
"""
Downloads an ontology using the bioservices module.
"""
ols = bioservices.WSDLService("OLS", urls.urls['ols']['url'])
ont = dict((x.key, x.value)
for x in ols.serv.getAllTermsFromOntology(ontology).item)
return ont
def get_listof_ontologies():
"""
Returns a list of available ontologies using the bioservices module.
"""
ols = bioservices.WSDLService("OLS", urls.urls['ols']['url'])
olist = dict((x.key, x.value) for x in ols.serv.getOntologyNames().item)
return olist
def residue_pdb(pdb, chain, residue):
url = urls.urls['pdbsws']['url']
# urlencode lives in urllib in Python 2 and in urllib.parse in Python 3
urlencode = (
urllib.urlencode if hasattr(urllib, 'urlencode') else urlparse.urlencode
)
params = urlencode({
'plain': 1,
'qtype': 'pdb',
'id': pdb,
'chain': chain,
'res': residue
})
data = urllib2.urlopen(url + "?%s" % params)
result = {}
for l in data:
# urlopen yields bytes in Python 3
l = l.decode('utf-8') if hasattr(l, 'decode') else l
if not l.startswith('//'):
l = [x.strip() for x in l.split(':')]
result[l[0]] = l[1]
return result
class ResidueMapper(object):
"""
This class stores and serves the PDB --> UniProt
residue level mapping. Attempts to download the
mapping, and stores it for further use. Converts
PDB residue numbers to the corresponding UniProt ones.
"""
def __init__(self):
self.clean()
def load_mapping(self, pdb):
non_digit = re.compile(r'[^\d.-]+')
pdb = pdb.lower()
url = urls.urls['pdb_align']['url'] + pdb
data = urllib2.urlopen(url)
mapper = {}
soup = bs4.BeautifulSoup(data.read(), 'html.parser')
for block in soup.find_all('block'):
seg = block.find_all('segment')
chain = seg[0]['intobjectid'].split('.')[1]
uniprot = seg[1]['intobjectid']
pdbstart = int(non_digit.sub('', seg[0]['start']))
pdbend = int(non_digit.sub('', seg[0]['end']))
uniprotstart = int(non_digit.sub('', seg[1]['start']))
uniprotend = int(non_digit.sub('', seg[1]['end']))
if chain not in mapper:
mapper[chain] = {}
mapper[chain][pdbend] = {
'uniprot': uniprot,
'pdbstart': pdbstart,
'uniprotstart': uniprotstart,
'uniprotend': uniprotend
}
self.mappers[pdb] = mapper
def get_residue(self, pdb, resnum, chain = None):
pdb = pdb.lower()
if pdb not in self.mappers:
self.load_mapping(pdb)
if pdb in self.mappers:
for chain, data in iteritems(self.mappers[pdb]):
pdbends = data.keys()
if resnum <= max(pdbends):
pdbend = min(
[x for x in [e - resnum for e in pdbends]
if x >= 0]) + resnum
seg = data[pdbend]
if seg['pdbstart'] <= resnum:
offset = seg['uniprotstart'] - seg['pdbstart']
residue = {
'resnum': resnum + offset,
'offset': offset,
'uniprot': seg['uniprot'],
'chain': chain
}
return residue
return None
def clean(self):
"""
Removes cached mappings, freeing up memory.
"""
self.mappers = {}
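# Illustrative use of ResidueMapper (runs a live query against the PDB
# alignment service when called; the PDB ID and residue number are arbitrary):
def _map_pdb_residue(pdb_id = '2src', resnum = 300):
    rmap = ResidueMapper()
    res = rmap.get_residue(pdb_id, resnum)
    rmap.clean()  # drop the cached mapping
    # `res` is a dict with 'resnum', 'offset', 'uniprot' and 'chain', or None
    return res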
def comppi_interaction_locations(organism = 9606):
"""
Downloads and preprocesses protein interaction and cellular compartment
association data from the ComPPI database.
This data provides scores for occurrence of protein-protein interactions
in various compartments.
"""
ComppiLocation = collections.namedtuple(
'ComppiLocation',
[
'location',
'score',
],
)
ComppiInteraction = collections.namedtuple(
'ComppiInteraction',
[
'id_a',
'id_b',
'loc_a',
'loc_b',
],
)
def process_locations(loc):
return tuple(
ComppiLocation(location = llloc[0], score = float(llloc[1]))
for llloc in
(lloc.split(':') for lloc in loc.split('|'))
)
url = urls.urls['comppi']['url']
post = {
'fDlSet': 'comp',
'fDlSpec': '0',
'fDlMLoc': 'all',
'fDlSubmit': 'Download'
}
c = curl.Curl(
url,
post = post,
large = True,
silent = False,
compr = 'gz',
)
_ = next(c.result)
for l in c.result:
l = l.strip('\r\n').split('\t')
organism_a = int(l[7])
organism_b = int(l[15])
if organism and (organism_a != organism or organism_b != organism):
continue
for uniprot1, uniprot2 in itertools.product(
mapping.map_name(l[0], 'uniprot', 'uniprot'),
mapping.map_name(l[8], 'uniprot', 'uniprot'),
):
yield ComppiInteraction(
id_a = uniprot1,
id_b = uniprot2,
loc_a = process_locations(l[2]),
loc_b = process_locations(l[10]),
)
def comppi_locations(organism = 9606, score_threshold = .7):
result = collections.defaultdict(set)
for iloc in comppi_interaction_locations(organism = organism):
for label in ('a', 'b'):
for loc in getattr(iloc, 'loc_%s' % label):
if loc.location == 'N/A' or loc.score < score_threshold:
continue
result[getattr(iloc, 'id_%s' % label)].add(loc)
return dict(result)
def ramilowski_locations(long_notes = False):
reloc = re.compile(
r'([^\(]+[^\s^\(])'
r'\s?\('
r'?(?:(.*[^\)])?)'
r'\)?'
)
resep = re.compile(r'[\.;,]')
renote = re.compile(r'Note=([- \w\(\),\s\+\./%\'":;]*)')
sources = (
(4, 'UniProt'),
(5, 'HPRD'),
(7, 'LocTree3'),
(10, 'Consensus'),
(11, 'Consensus6'),
)
RamilowskiLocation = collections.namedtuple(
'RamilowskiLocation',
[
'location',
'source',
'tmh',
'note',
'long_note',
],
)
url = urls.urls['rami']['loc']
c = curl.Curl(url, silent = False, large = True)
_ = next(c.result)
result = collections.defaultdict(set)
for l in c.result:
l = l.strip('\n\r').split('\t')
for idx, source in sources:
locs = l[idx]
long_note = None
mnote = renote.search(locs)
if mnote:
long_note = mnote.groups()[0]
locs = renote.sub('', locs)
for loc in resep.split(locs):
if ':' in loc and 'GO:' not in loc:
loc = loc.split(':')[-1]
loc = loc.strip().replace('- ', '-').lower()
if (
not loc or
len(loc.split()) > 3 or
re.search(r'\d', loc) or
loc == 'n/a' or
any(
w in loc for w in
('tumor',)
)
):
continue
m = reloc.match(loc)
if not m:
continue
location, note = m.groups()
tmh = l[9].strip()
uniprots = mapping.map_name(l[3], 'uniprot', 'uniprot')
for uniprot in uniprots:
result[uniprot].add(
RamilowskiLocation(
location = (
location.lower().replace('=', '').strip()
),
source = source,
tmh = int(tmh) if tmh.isdigit() else None,
note = note,
long_note = long_note if long_notes else None,
)
)
return result
def get_psite_phos(raw = True, organism = 'human', strict = True):
"""
Downloads and preprocesses phosphorylation site data from PhosphoSitePlus.
"""
url = urls.urls['psite_kin']['url']
c = curl.Curl(
url,
silent = False,
compr = 'gz',
encoding = 'iso-8859-1',
large = True,
)
orto = {}
data = c.result
cols = {
'kinase': 2,
'kinase_org': 3,
'substrate': 6,
'substrate_org': 8,
'residue': 9,
'motif': 11
}
data = read_table(cols = cols, fileObject = data, sep = '\t', hdr = 4)
result = []
non_digit = re.compile(r'[^\d.-]+')
motre = re.compile(r'(_*)([A-Za-z]+)(_*)')
for r in data:
if organism is None or \
((r['kinase_org'] == organism or not strict) and \
r['substrate_org'] == organism):
if r['kinase_org'] != organism:
korg = r['kinase_org']
# attempting to map by orthology:
if korg in taxonomy.taxa and organism in taxonomy.taxa:
ktaxid = taxonomy.taxa[korg]
taxid = taxonomy.taxa[organism]
if korg not in orto:
orto[korg] = homologene_dict(ktaxid, taxid, 'refseqp')
korg_refseq = mapping.map_name(r['kinase'],
'uniprot',
'refseqp',
ktaxid)
kin_uniprot = \
list(
itertools.chain(
*map(
lambda ors:
mapping.map_name(ors,
'refseqp',
'uniprot',
taxid),
itertools.chain(
*map(
lambda rs:
orto[korg][rs],
filter(
lambda rs:
rs in orto[korg],
korg_refseq
)
)
)
)
)
)
else:
kin_uniprot = [r['kinase']]
for kinase in kin_uniprot:
r['resaa'] = r['residue'][0]
r['resnum'] = int(non_digit.sub('', r['residue'][1:]))
mot = motre.match(r['motif'])
r['substrate'] = r['substrate'].split('_')[0] # excluding e.g. Q12809_VAR_014388
sisoform = 1 if '-' not in r['substrate'] else \
int(r['substrate'].split('-')[1])
r['substrate'] = r['substrate'].split('-')[0]
kisoform = 1 if '-' not in kinase else int(kinase.split('-')[1])
kinase = kinase.split('-')[0]
r['substrate'] = r['substrate'].split('-')[0]
if mot:
r['start'] = r['resnum'] - 7 + len(mot.groups()[0])
r['end'] = r['resnum'] + 7 - len(mot.groups()[2])
r['instance'] = r['motif'].replace('_', '').upper()
else:
r['start'] = None
r['end'] = None
r['instance'] = None
if raw:
r['kinase'] = kinase
result.append(r)
else:
res = intera.Residue(r['resnum'], r['resaa'],
r['substrate'],
isoform = sisoform)
mot = intera.Motif(
r['substrate'],
r['start'],
r['end'],
instance = r['instance'],
isoform = sisoform)
ptm = intera.Ptm(protein = r['substrate'],
residue = res,
motif = mot,
typ = 'phosphorylation',
source = 'PhosphoSite',
isoform = sisoform)
dom = intera.Domain(protein = kinase, isoform = kisoform)
dommot = intera.DomainMotif(
domain = dom, ptm = ptm, sources = ['PhosphoSite'])
result.append(dommot)
return result
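# Sketch (not original API): collect unique kinase-substrate UniProt pairs from
# the raw PhosphoSitePlus records returned by get_psite_phos(raw = True).
def _psite_kinase_substrate_pairs(organism = 'human'):
    pairs = set()
    for rec in get_psite_phos(raw = True, organism = organism):
        pairs.add((rec['kinase'], rec['substrate']))
    return pairs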
def ptm_orthology():
"""
Returns an orthology translation dict of phosphosites
based on phosphorylation sites table from PhosphoSitePlus.
In the result, each PTM is represented by a tuple of the following
6 elements: UniProt ID, isoform (int), residue one letter code,
residue number (int), NCBI Taxonomy ID (int), modification type.
"""
result = {}
nondigit = re.compile(r'[^\d]+')
unknown_taxa = set([])
for typ in common.psite_mod_types:
groups = {}
url = urls.urls['psite_%s' % typ[0]]['url']
c = curl.Curl(url, silent = False, large = True)
data = c.result
for _ in xrange(4):
__ = next(data)
for r in data:
r = r.decode('utf-8').split('\t')
if len(r) < 10:
continue
uniprot = r[2]
isoform = 1 if '-' not in uniprot else int(uniprot.split('-')[1])
uniprot = uniprot.split('-')[0]
aa = r[4][0]
num = int(nondigit.sub('', r[4]))
if r[6] not in taxonomy.taxa:
unknown_taxa.add(r[6])
continue
tax = taxonomy.taxa[r[6]]
group = int(r[5])
this_site = (uniprot, isoform, aa, num, tax, typ[1])
if group not in groups:
groups[group] = set([])
groups[group].add(this_site)
for group, sites in iteritems(groups):
for site1 in sites:
for site2 in sites:
if site1[4] == site2[4]:
continue
if site1 not in result:
result[site1] = {}
if site2[4] not in result[site1]:
result[site1][site2[4]] = set([])
result[site1][site2[4]].add(site2)
if len(unknown_taxa):
sys.stdout.write('\t:: Unknown taxa encountered:\n\t %s\n' %
', '.join(sorted(unknown_taxa)))
return result
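# Sketch (not original API): query the orthology dictionary returned by
# ptm_orthology() for one site in a target taxon. The site tuple follows the
# order documented above (UniProt ID, isoform, residue letter, residue number,
# NCBI Taxonomy ID, modification type).
def _orthologous_sites(site, target_taxon = 10090, orto = None):
    if orto is None:
        orto = ptm_orthology()
    return orto.get(site, {}).get(target_taxon, set())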
def get_psite_p(organism = 'human'):
"""
Downloads the phosphorylation site dataset from PhosphoSitePlus.
"""
result = []
url = urls.urls['psite_p']['url']
nondigit = re.compile(r'[^\d]+')
remot = re.compile(r'(_*)([A-Za-z]+)(_*)')
c = curl.Curl(url, silent = False, large = True)
data = c.result
for _ in xrange(4):
null = c.result.readline()
for r in data:
r = r.split('\t')
if len(r) > 9 and (organism is None or r[6] == organism):
uniprot = r[2]
isoform = 1 if '-' not in uniprot else int(uniprot.split('-')[1])
uniprot = uniprot.split('-')[0]
typ = r[3].lower()
if len(typ) == 0:
typ = r[4].split('-')[1] if '-' in r[4] else None
aa = r[4][0]
num = int(nondigit.sub('', r[4]))
motif = remot.match(r[9])
if motif:
start = num - 7 + len(motif.groups()[0])
end = num + 7 - len(motif.groups()[2])
instance = r[9].replace('_', '').upper()
else:
start = None
end = None
instance = None
res = intera.Residue(num, aa, uniprot, isoform = isoform)
mot = intera.Motif(
uniprot, start, end, instance = instance, isoform = isoform)
ptm = intera.Ptm(uniprot,
typ = typ,
motif = mot,
residue = res,
source = 'PhosphoSite',
isoform = isoform)
result.append(ptm)
return result
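# A hedged usage sketch (not part of the original module): get_psite_p()
# returns a list of `intera.Ptm` objects; here we only count them.
#
#     human_psites = get_psite_p(organism = 'human')
#     print('%u phosphosites from PhosphoSitePlus' % len(human_psites))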
def get_psite_reg():
"""
Downloads and preprocesses the regulatory sites dataset from
PhosphoSitePlus. This dataset describes which protein interactions
each PTM induces or disrupts.
"""
kwds_pos = {
'enzymatic activity, induced',
'activity, induced',
'protein stabilization',
'receptor inactivation, inhibited',
'receptor desensitization, inhibited',
'receptor internalization, inhibited',
'receptor recycling, induced'
}
kwds_neg = {
'enzymatic activity, inhibited',
'activity, inhibited',
'protein degradation',
'receptor inactivation, induced',
'receptor desensitization, induced',
'receptor internalization, induced',
'receptor recycling, inhibited'
}
url = urls.urls['psite_reg']['url']
c = curl.Curl(url, silent = False, compr = 'gz',
encoding = 'iso-8859-1', large = True)
data = c.result
cols = {
'uniprot': 3,
'organism': 6,
'mod': 7,
'on_function': 11,
'on_process': 12,
'on_interact': 13,
'pmids': 15,
'comments': 19
}
data = read_table(cols = cols, fileObject = data, sep = '\t', hdr = 4)
regsites = {}
for r in data:
interact = [[y.replace(')', '').strip() for y in x.split('(')]
for x in r['on_interact'].strip().split(';') if len(x) > 0]
induces = [x[0] for x in interact if x[1] == 'INDUCES']
disrupts = [x[0] for x in interact if x[1] == 'DISRUPTS']
mod = r['mod']
modt = r['mod'].split('-')
mod = list(modt[0])
aa = mod.pop(0)
modt = modt[1]
res = ''.join(mod)
isoform = (
int(r['uniprot'].split('-')[1])
if '-' in r['uniprot']
else 1
)
uniprot = r['uniprot'].split('-')[0]
if uniprot not in regsites:
regsites[uniprot] = []
function = set(map(lambda f: f.strip(),
r['on_function'].split(';')))
regsites[uniprot].append({
'aa': aa,
'res': res,
'modt': modt,
'organism': r['organism'],
'pmids': set(map(lambda f: f.strip(),
r['pmids'].split(';'))),
'induces': induces,
'disrupts': disrupts,
'isoform': isoform,
'function': function,
'process': set(map(lambda f: f.strip(),
r['on_process'].split(';'))),
'comments': r['comments'],
'positive': bool(kwds_pos & function),
'negative': bool(kwds_neg & function)
})
return regsites
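# A hedged usage sketch (not part of the original module): the result is a
# dict keyed by UniProt IDs, each value a list of site dicts; the UniProt
# ID below is only an example.
#
#     regsites = get_psite_reg()
#     for site in regsites.get('P00533', []):
#         print(site['aa'], site['res'], site['modt'], site['positive'])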
def regsites_one_organism(organism = 9606):
"""
Returns PhosphoSitePlus regulatory sites translated to
one organism by orthology. Residue numbers are translated
where necessary, while gene symbols are translated to
UniProt IDs of the given organism.
This works with human, mouse or rat.
:param int organism:
NCBI Taxonomy ID of the target organism. Possible values are
human, mouse or rat, as these species provide the vast majority
of the data and are close enough to each other that sites can be
safely translated between orthologous proteins by sequence
alignment.
"""
def genesymbols2uniprots(genesymbols, tax):
return (
set(
itertools.chain(
*map(
lambda gs:
mapping.map_name(
gs,
'genesymbol',
'uniprot',
ncbi_tax_id = tax,
),
genesymbols
)
)
)
)
def translate_uniprots(uniprots, homo):
return (
set(
itertools.chain(
*map(
lambda usrc:
homo[usrc] if usrc in homo else [],
uniprots
)
)
)
)
result = {}
organisms = set([9606, 10090, 10116])
mod_types = dict(common.psite_mod_types2)
regsites = get_psite_reg()
other_organisms = organisms - set([organism])
homology = (
dict(
map(
lambda other:
(
other,
homologene_uniprot_dict(
source = other,
target = organism
)
),
other_organisms
)
)
)
ptm_homology = ptm_orthology()
proteome = uniprot_input.all_uniprots(organism = organism, swissprot = 'YES')
for substrate, regs in iteritems(regsites):
subs = []
if substrate in proteome:
subs = [substrate]
else:
for other, homo in iteritems(homology):
if substrate in homo:
subs = homo[substrate]
for sub in subs:
if sub not in result:
result[sub] = {}
for reg in regs:
reg_organism = taxonomy.taxa[reg['organism']]
if reg_organism not in organisms:
continue
mod_type = mod_types[reg['modt']]
resnum = int(reg['res'])
psite_key = (substrate, reg['isoform'], reg['aa'], resnum, reg_organism, mod_type)
if reg_organism != organism:
regs_target = []
disrupts = []
induces = []
if psite_key in ptm_homology:
if organism in ptm_homology[psite_key]:
regs_target = ptm_homology[psite_key][organism]
if len(regs_target):
disrupts = genesymbols2uniprots(reg['disrupts'], reg_organism)
disrupts = translate_uniprots(disrupts, homology[reg_organism])
induces = genesymbols2uniprots(reg['induces'], reg_organism)
induces = translate_uniprots(induces, homology[reg_organism])
else:
regs_target = [psite_key]
disrupts = genesymbols2uniprots(reg['disrupts'], organism)
induces = genesymbols2uniprots(reg['induces'], organism)
for regt in regs_target:
modkey = (regt[2], regt[3], regt[5])
if modkey not in result[sub]:
result[sub][modkey] = {
'induces': set([]),
'disrupts': set([]),
'pmids': set([]),
'isoforms': set([]),
'process': set([]),
'function': set([]),
'positive': False,
'negative': False,
'comments': []
}
result[sub][modkey]['induces'].update(induces)
result[sub][modkey]['disrupts'].update(disrupts)
result[sub][modkey]['process'].update(reg['process'])
result[sub][modkey]['function'].update(reg['function'])
result[sub][modkey]['isoforms'].update([regt[1]])
result[sub][modkey]['pmids'].update(reg['pmids'])
result[sub][modkey]['positive'] = \
result[sub][modkey]['positive'] or reg['positive']
result[sub][modkey]['negative'] = \
result[sub][modkey]['negative'] or reg['negative']
if len(reg['comments']):
result[sub][modkey]['comments'].append(reg['comments'])
return result
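# A hedged usage sketch (not part of the original module): collecting the
# interactions induced by human regulatory sites. The keys of the inner
# dict are (residue letter, residue number, modification type) tuples.
#
#     regsites_human = regsites_one_organism(organism = 9606)
#     for substrate, sites in iteritems(regsites_human):
#         for (aa, resnum, mod_type), details in iteritems(sites):
#             induced_partners = details['induces']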
def regsites_tab(regsites, outfile = None):
"""
Exports PhosphoSite regulatory sites as a tabular file, all
IDs translated to UniProt.
"""
header = [
'uniprot_a', 'isoform_a', 'a_res_aa', 'a_res_num', 'a_mod_type',
'effect', 'uniprot_b', 'references'
]
result = []
for uniprot, regsite in iteritems(regsites):
isoform = '1'
uniprot = uniprot.split('-')
if len(uniprot) > 1:
isoform = uniprot[1]
uniprot = uniprot[0]
for r in regsite:
if r['organism'] == 'human':
for i in r['induces']:
other = mapping.map_name(i, 'genesymbol', 'uniprot')
for o in other:
if o != 'unmapped':
result.append([
uniprot, isoform, r['aa'], r['res'], r['modt'],
'+', o, ';'.join(r['pmids'])
])
for i in r['disrupts']:
other = mapping.map_name(i, 'genesymbol', 'uniprot')
for o in other:
if o != 'unmapped':
result.append([
uniprot, isoform, r['aa'], r['res'], r['modt'],
'-', o, ';'.join(r['pmids'])
])
if outfile is not None:
out = '\t'.join(header) + '\n'
for r in result:
out += '\t'.join(r) + '\n'
with open(outfile, 'w') as f:
f.write(out)
return result
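# A hedged usage sketch (not part of the original module): writing the
# tabular export to a file (the output path is only an example).
#
#     rows = regsites_tab(get_psite_reg(), outfile = 'psite_regsites.tsv')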
def get_ielm_huge(ppi,
id_type = 'UniProtKB_AC',
mydomains = 'HMMS',
maxwait = 180,
cache = True,
part_size = 500,
headers = None):
"""
Loads iELM predicted domain-motif interaction data for a set of
protein-protein interactions. This method breaks the list into
reasonably sized chunks, performs multiple requests to iELM,
and retries with a reduced request size in case of failure.
Provides feedback on the console.
:param str id_type:
The type of the IDs in the supplied interaction list.
Default is 'UniProtKB_AC'.
Please refer to the iELM documentation for the ID types it accepts.
:param str mydomains:
The type of the domain detection method.
Defaults to 'HMMS'.
Please refer to iELM for alternatives.
:param int maxwait:
The limit of the waiting time in seconds.
:param bool cache:
Whether to use the cache or download everything again.
:param int part_size:
The number of interactions to be queried in one request.
:param list headers:
Additional HTTP headers to send to iELM with each request.
"""
ranges = range(0, len(ppi), part_size)
result = []
done = False
while not done:
for r in ranges:
this_ppi = ppi[r:r + part_size]
sys.stdout.write('\t:: Part %u/%u: querying %u interactions.\n' %
(ranges.index(r) + 1, len(ranges), len(this_ppi)))
sys.stdout.flush()
this_res = get_ielm(
this_ppi,
id_type,
mydomains,
maxwait,
cache,
part = True,
headers = headers)
if this_res:
if type(this_res) is dict:
return this_res
result += this_res
if r == ranges[-1]:
done = True
else:
part_size = max(int(part_size * 0.8), 20)
ranges = range(r, len(ppi), part_size)
sys.stdout.write(
'\t:: One query failed. Setting part size to %u\n' %
part_size)
sys.stdout.flush()
break
return result
def get_ielm(ppi,
id_type = 'UniProtKB_AC',
mydomains = 'HMMS',
maxwait = 180,
cache = True,
part = False,
part_size = 500,
headers = None):
"""
Performs one query to iELM. Parameters are the same as for get_ielm_huge().
"""
url = urls.urls['proteomic_ielm']['url']
network = ''
from_pickle = []
ppi_pickle = []
ppi_query = []
result = []
pcache = os.path.join(settings.get('cachedir'), 'ielm.pickle')
if not part and os.path.exists(pcache):
from_pickle = pickle.load(open(pcache, 'rb'))
ppi_pickle = from_pickle['ppi']
ppi_query = list(set(ppi) - set(ppi_pickle))
result = from_pickle['ielm']
if len(ppi_query) == 0:
return result
else:
ppi_query = ppi
if len(ppi_query) > part_size and not part:
return get_ielm_huge(ppi_query, id_type, mydomains, maxwait,
cache, part_size, headers)
for pp in ppi_query:
network += '%s %s\r\n' % (pp[0], pp[1])
post = {'network': network, 'databases': id_type, 'mydomains': mydomains}
net_md5 = common.md5(network)
cachefile = os.path.join(settings.get('cachedir'), net_md5 + '.ielm')
if os.path.exists(cachefile) and cache:
with open(cachefile, 'r') as f:
data = f.read()
soup = bs4.BeautifulSoup(data, 'html.parser')
src = 'cache'
else:
c = curl.Curl(
url, post = post, silent = False, cache = False, req_headers = headers)
data = c.result
# check for a failed download before parsing, otherwise
# BeautifulSoup would fail on a None value
if data is None:
sys.stdout.write(ERASE_LINE + CURSOR_UP_ONE)
sys.stdout.write(
'\t:: Initial query failed. No data retrieved from iELM.\n')
sys.stdout.flush()
return None
soup = bs4.BeautifulSoup(data, 'html.parser')
sessid = soup.find('input', {'name': 'session_ID'})['value']
src = 'iELM'
wait = 0
while soup.title.text == 'iELM Wait Page' and wait < maxwait:
# and \
# len([i for i in soup.find_all('font', {'color': '#FF0000'}) if i.text == \
#'File no longer available']) == 0:
sys.stdout.write(ERASE_LINE + CURSOR_UP_ONE)
sys.stdout.write('\t:: Waiting for result. Wait time: %u sec. '
'Max waiting time: %u sec.\n' % (wait, maxwait))
sys.stdout.flush()
post = {
'session_ID': sessid,
'database': id_type,
'number': '',
'domains': mydomains
}
c = curl.Curl(
'http://i.elm.eu.org/wait_2/',
post = post,
cache = False,
req_headers = headers)
data = c.result
if data is not None:
soup = bs4.BeautifulSoup(data, 'html.parser')
time.sleep(3)
wait += 3
if len(soup.find_all('table')) == 0:
sys.stdout.write(ERASE_LINE + CURSOR_UP_ONE)
sys.stdout.write('\t:: No data retrieved from iELM. \n')
sys.stdout.flush()
soup.title.string = 'http://i.elm.eu.org/proteomic_results/%s' % sessid
# return {'soup': soup, 'post': urllib.urlencode(post), 'netw':
# network}
return None
if cache:
with open(cachefile, 'w') as f:
f.write(data)
sys.stdout.write(ERASE_LINE + CURSOR_UP_ONE)
sys.stdout.write('\t:: Data retrieved from %s in %u seconds.\n' %
(src, wait))
sys.stdout.flush()
tbl = soup.find('table', {'id': 'example1'})
this_result = []
if tbl:
url = urls.urls['elm_depr']['url']
depr_c = curl.Curl(url)
depr_list = depr_c.result
depr_list = depr_list.replace('"', '').split('\n')[5:]
depr = [tuple(x.split('\t')) for x in depr_list if len(x) > 0]
try:
depr = dict(depr + [tuple([x[0].lower(), x[1]]) for x in depr])
except:
print('\n\n\n', depr, '\n\n\n\n')
# redepr = re.compile(r'\b(' + '|'.join(depr.keys()) + r')\b') :(
rows = tbl.find_all('tr')
prg = progress.Progress(
len(rows), 'Processing data (%u rows)' % (len(rows) - 1), 3)
for tr in tbl.find_all('tr'):
thisRow = [td.text.strip() for td in tr.find_all('td')]
if len(thisRow) > 15 and not thisRow[0].startswith('Motif'):
# replacing deprecated ELM names:
if thisRow[2].lower() in depr:
thisRow[2] = depr[thisRow[2].lower()]
if thisRow[2].lower() in depr:
thisRow[2] = depr[thisRow[2].lower()]
# thisRow[2] = redepr.sub(lambda x: depr[x.group()],
# thisRow[2]) :(
this_result.append(thisRow)
prg.step()
prg.terminate()
if not part:
result = {
'ppi': list(set(ppi_pickle + ppi_query)),
'ielm': result + this_result
}
pickle.dump(result, open(pcache, 'wb'))
return this_result
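# A hedged usage sketch (not part of the original module): querying iELM
# for a small list of UniProt pairs (the IDs below are examples only).
#
#     ppi = [('P00533', 'P62993'), ('P01112', 'P04049')]
#     ielm_rows = get_ielm(ppi, id_type = 'UniProtKB_AC')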
def get_pepcyber(cache = None):
"""
Downloads phosphoprotein binding protein interactions
from the PEPCyber database.
"""
def get_cells(row):
cells = row.find_all('td')
if len(cells) == 10:
sp = cells[4].find('span')
if sp is not None and 'class' in sp.attrs \
and 'sequence' in sp.attrs['class']:
return cells
url = urls.urls['pepcyber']['url']
# this is huge, takes a few minutes!
c = curl.Curl(url, silent = False, timeout = 600, encoding = 'iso-8859-1')
data = c.result
soup = bs4.BeautifulSoup(data, 'html.parser')
rows = soup.find_all('tr')
result = []
uniprots = {}
if cache is None:
cache = os.path.join(settings.get('cachedir'), 'pepcyber-uniprots')
if os.path.exists(cache):
with open(cache, 'r') as f:
for l in f:
l = l.split('\t')
if l[0] == u'\xce':
continue
l.extend(['', ''])
uniprots[l[0].strip()] = [l[1].strip(), l[2].strip()]
prg = progress.Progress(len(rows), 'Retrieving and processing data', 7)
notfound = []
for row in rows:
prg.step()
cells = get_cells(row)
if cells is None:
continue
thisRow = [c.text.strip() for c in cells]
if len(thisRow) > 9 and thisRow[5].isdigit():
inum = int(row.find('a')['name'])
thisRow[9] = None if 'p' not in thisRow[4] else \
thisRow[4][thisRow[4].index('p') + 1]
if thisRow[2] not in uniprots or thisRow[3] not in uniprots:
up = pepcyber_uniprot(inum)
uniprots.update(up)
if thisRow[2] in uniprots and thisRow[3] in uniprots:
thisRow.extend(uniprots[thisRow[2]])
thisRow.extend(uniprots[thisRow[3]])
result.append(thisRow[1:])
else:
notfound.append([thisRow[2], thisRow[3], inum])
prg.terminate()
with open(cache, 'w') as f:
for g, u in iteritems(uniprots):
if g[0] == u'\xce':
continue
f.write('\t'.join([g] + u) + '\n')
return result
def pepcyber_uniprot(num):
result = {}
url = urls.urls['pepcyber_details']['url'] % num
c = curl.Curl(url, cache = False, encoding = 'iso-8859-1')
data = c.result
if data is None:
return result
soup = bs4.BeautifulSoup(data, 'html.parser')
gname = None
prev = ''
for td in soup.find_all('td'):
if prev.startswith('Gene name'):
gname = td.text.strip().split('(')[0]
if prev.startswith('RefSeq'):
refseq = td.text.strip()
if prev.startswith('SwissProt') and gname is not None:
swprot = td.text.strip()
if len(gname) > 0 and gname[0] != u'\xce':
result[gname] = [swprot, refseq]
gname = None
prev = td.text.strip()
return result
def pdzbase_interactions():
"""
Downloads data from PDZbase. Parses data from the HTML tables.
"""
PDZbaseInteraction = collections.namedtuple(
'PDZbaseInteraction',
[
'uniprot_pdz',
'isoform_pdz',
'uniprot_ligand',
'isoform_ligand',
'genesymbol_pdz',
'genesymbol_ligand',
'pdz_domain',
'organism',
'pubmed',
],
)
url = urls.urls['pdzbase']['url_rescued']
c = curl.Curl(url, silent = False)
data = c.result
soup = bs4.BeautifulSoup(data, 'html.parser')
rows = (
soup.find_all('table')[3].find('table').find('table').find_all('tr')
)
result = []
del rows[0]
for r in rows:
r = [c.text.strip() for c in r.find_all('td')]
uniprot_pdz, isoform_pdz = reupi.match(r[1]).groups()
uniprot_ligand, isoform_ligand = reupi.match(r[4]).groups()
result.append(
PDZbaseInteraction(
uniprot_pdz = uniprot_pdz,
isoform_pdz = int(isoform_pdz) if isoform_pdz else 1,
uniprot_ligand = uniprot_ligand,
isoform_ligand = int(isoform_ligand) if isoform_ligand else 1,
genesymbol_pdz = r[0],
genesymbol_ligand = r[3],
pdz_domain = int(r[2]),
organism = taxonomy.ensure_ncbi_tax_id(r[5]),
pubmed = int(r[6]),
)
)
return result
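# A hedged usage sketch (not part of the original module):
# pdzbase_interactions() returns a list of PDZbaseInteraction named tuples.
#
#     for ia in pdzbase_interactions():
#         print(ia.genesymbol_pdz, ia.genesymbol_ligand, ia.pubmed)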
def get_domino(none_values = False, outfile = None):
result = []
taxid = re.compile(r'taxid:(.*)\([a-zA-Z ]*\)')
miont = re.compile(r'MI:[0-9]{4}\((.*)\)')
binds = re.compile(r'([-0-9]*);.*')
domai = re.compile(r'.*;.*;.*\((.*)\)')
dipro = re.compile(r'.*;.*;.+:(IPR[0-9]*).*')
ptmrs = re.compile(r'([-0-9]*);.*')
ptmmi = re.compile(r'[0-9]*;(MI:[0-9]*)\(.*\);.*;.*')
ptmrn = re.compile(
r'.*sequence:[\s]*[0-9]+-[0-9]+[\s]*:[\s]*([A-Z]{10,}).*')
ptmty = re.compile(r'[0-9]*;MI:[0-9]*\((.*)\);.*;.*')
refrs = re.compile(r'(pubmed|doi):["]*([-0-9a-zA-Z\.\(\)/]*)["]*')
url = urls.urls['domino']['rescued']
c = curl.Curl(url, silent = False, large = True)
data = c.result
_ = next(data)
header = [
'uniprot-A', 'uniprot-B', 'isoform-A', 'isoform-B', 'exp. method',
'references', 'taxon-A', 'taxon-B', 'role-A', 'role-B',
'binding-site-range-A', 'binding-site-range-B', 'domains-A',
'domains-B', 'ptm-residue-A', 'ptm-residue-B', 'ptm-type-mi-A',
'ptm-type-mi-B', 'ptm-type-A', 'ptm-type-B', 'ptm-res-name-A',
'ptm-res-name-B', 'mutations-A', 'mutations-B', 'mutation-effects-A',
'mutation-effects-B', 'domains-interpro-A', 'domains-interpro-B',
'negative'
]
for r in data:
r = r.strip().split('\t')
if len(r) < 39:
continue
thisRow = [
None if ':' not in r[0] else r[0].split(':')[1].split('-')[0], None
if ':' not in r[1] else r[1].split(':')[1].split('-')[0], '1'
if '-' not in r[0] else r[0].split('-')[1], '1'
if '-' not in r[1] else r[1].split('-')[1], None if
miont.match(r[6]) is None else miont.match(r[6]).groups(1)[0], None
if refrs.match(r[8]) is None else refrs.match(r[8]).groups(1)[1],
None if taxid.match(r[9]) is None else
taxid.match(r[9]).groups(1)[0], None if taxid.match(r[10]) is None
else taxid.match(r[10]).groups(1)[0], None
if miont.match(r[11]) is None else miont.match(r[11]).groups(1)[0],
None if miont.match(r[16]) is None else
miont.match(r[16]).groups(1)[0], ';'.join([
'' if binds.match(x) is None else binds.match(x).groups(1)[0]
for x in r[32].split(',')
]), ';'.join([
'' if binds.match(x) is None else binds.match(x).groups(1)[0]
for x in r[33].split(',')
]), ';'.join([
'' if domai.match(x) is None else domai.match(x).groups(1)[0]
for x in r[32].split(',')
]), ';'.join([
'' if domai.match(x) is None else domai.match(x).groups(1)[0]
for x in r[33].split(',')
]), ';'.join([
'' if ptmrs.match(x) is None else ptmrs.match(x).groups(1)[0]
for x in r[34].split('|')
]), ';'.join([
'' if ptmrs.match(x) is None else ptmrs.match(x).groups(1)[0]
for x in r[35].split('|')
]), ';'.join([
'' if ptmmi.match(x) is None else ptmmi.match(x).groups(1)[0]
for x in r[34].split('|')
]), ';'.join([
'' if ptmmi.match(x) is None else ptmmi.match(x).groups(1)[0]
for x in r[35].split('|')
]), ';'.join([
'' if ptmty.match(x) is None else ptmty.match(x).groups(1)[0]
for x in r[34].split('|')
]), ';'.join([
'' if ptmty.match(x) is None else ptmty.match(x).groups(1)[0]
for x in r[35].split('|')
]), ';'.join([
'' if ptmrn.match(x) is None else ptmrn.match(x).groups(1)[0]
for x in r[34].split('|')
]), ';'.join([
'' if ptmrn.match(x) is None else ptmrn.match(x).groups(1)[0]
for x in r[35].split('|')
]), ';'.join([
'' if ptmrs.match(x) is None else ptmrs.match(x).groups(1)[0]
for x in r[36].split('|')
]), ';'.join([
'' if ptmrs.match(x) is None else ptmrs.match(x).groups(1)[0]
for x in r[37].split('|')
]), ';'.join([
'' if ptmty.match(x) is None else ptmty.match(x).groups(1)[0]
for x in r[36].split('|')
]), ';'.join([
'' if ptmty.match(x) is None else ptmty.match(x).groups(1)[0]
for x in r[37].split('|')
]), '' if dipro.match(r[32]) is None else
dipro.match(r[32]).groups(1)[0], '' if dipro.match(r[33]) is None
else dipro.match(r[33]).groups(1)[0], '0'
if r[38].strip() == '-' else '1'
]
if not none_values:
thisRow = ['' if x is None else x for x in thisRow]
result.append(thisRow)
if outfile:
with open(outfile, 'w') as outf:
outf.write('\t'.join(header) + '\n')
for r in result:
outf.write('\t'.join(['' if x is None else x
for x in r]) + '\n')
return result
def get_elm_domains():
result = {}
url = urls.urls['ielm_domains']['url']
c = curl.Curl(url, silent = False)
data = c.result
soup = bs4.BeautifulSoup(data, 'html.parser')
tbl = soup.find('table').find_all('td')
rows = [tbl[x:x + 4] for x in xrange(0, len(tbl), 4)]
for r in rows:
uniprot = r[1].text
motif = r[0].text
if uniprot not in result:
result[uniprot] = {}
if motif not in result[uniprot]:
result[uniprot][motif] = []
result[uniprot][motif].append((r[2].text, r[3].text))
return result
def get_phosphoelm(organism = 9606, ltp_only = True):
"""
Downloads kinase-substrate interactions from phosphoELM.
Returns list of dicts.
:param int organism: NCBI Taxonomy ID.
:param bool ltp_only: Include only low-throughput interactions.
"""
result = []
non_digit = re.compile(r'[^\d.-]+')
if organism is None:
_organism = None
elif organism in taxonomy.phosphoelm_taxids:
_organism = taxonomy.phosphoelm_taxids[organism]
else:
sys.stdout.write('\t:: Unknown organism: `%u`.\n' % organism)
return []
url = urls.urls['p_elm']['url']
c = curl.Curl(url, silent = False)
data = c.result
data = [
n for d, n in iteritems(data)
if d.startswith(urls.urls['p_elm']['psites'])
]
data = data[0] if len(data) > 0 else ''
data = [l.split('\t') for l in data.split('\n')]
kinases = get_phelm_kinases()
del data[0]
for l in data:
if len(l) == 9 and (l[7] == _organism or _organism is None) \
and (not ltp_only or l[6] == 'LTP'):
l[1] = 1 if '-' not in l[0] else int(l[0].split('-')[1])
l[0] = l[0].split('-')[0]
del l[-1]
if len(l[5]) > 0 and l[5] in kinases:
kinase = kinases[l[5]]
result.append({
'instance': None,
'isoform': l[1],
'resaa': l[3],
'resnum': int(non_digit.sub('', l[2])),
'start': None,
'end': None,
'substrate': l[0],
'kinase': kinase,
'references': l[4].split(';'),
'experiment': l[6],
'organism': l[7]
})
return result
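# A hedged usage sketch (not part of the original module): each element of
# the result is a dict describing one kinase-substrate phosphosite.
#
#     for site in get_phosphoelm(organism = 9606, ltp_only = True):
#         print(site['kinase'], site['substrate'], site['resaa'], site['resnum'])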
def phelm_interactions(organism = 'Homo sapiens'):
result = []
data = get_phosphoelm(ltp_only = True)
for l in data:
result.append([
l['kinase'], l['substrate'], ';'.join(l['references']),
l['organism']
])
return result
def phelm_psites():
result = []
data = get_phosphoelm()
kinases = get_phelm_kinases()
for l in data:
l.append('1' if '-' not in l[0] else l[0].split('-')[1])
l[0] = l[0].split('-')[0]
l.append('' if l[4] not in kinases else kinases[l[4]])
result.append(l)
return result
def get_phelm_kinases():
result = {}
url = urls.urls['p_elm_kin']['url']
c = curl.Curl(url, silent = False)
data = c.result
soup = bs4.BeautifulSoup(data, 'html.parser')
for row in soup.find_all('table')[1].find_all('tr'):
thisRow = [x.text for x in row.find_all('td')]
if len(thisRow) > 2 and len(thisRow[2].strip()) > 0:
result[thisRow[0]] = thisRow[2].strip()
return result
def get_elm_classes():
url = urls.urls['elm_class']['url']
c = curl.Curl(url, silent = False)
data = c.result
data = [
x.split('\t')[:-2] for x in data.replace('"', '').split('\n')[6:]
if len(x) > 0
]
return dict(zip([x[1] for x in data], data))
def get_elm_instances():
url = urls.urls['elm_inst']['url']
c = curl.Curl(url, silent = False)
data = c.result
data = data.replace('"', '').split('\t')
data = data[6:]
def elm_interactions():
"""
Downloads manually curated interactions from ELM.
This is the gold standard set of ELM.
"""
def number_or_none(value, typ = int):
return typ(value) if value != 'None' else None
retax = re.compile(r'"([0-9]+)"\([-:/,\.\[\]\(\)\w\s]+\)')
ELMInteraction = collections.namedtuple(
'ELMInteraction',
[
'motif_elm',
'domain_pfam',
'uniprot_motif',
'uniprot_domain',
'isoform_motif',
'isoform_domain',
'start_motif',
'end_motif',
'start_domain',
'end_domain',
'affinity_min',
'affinity_max',
'pubmeds',
'taxon_motif',
'taxon_domain',
],
)
result = []
url = urls.urls['elm_int']['url']
c = curl.Curl(url, silent = False)
data = c.result
data = data.split('\n')
del data[0]
for l in data:
if not l:
continue
l = tuple(x.strip() for x in l.split('\t'))
uniprot_motif, isoform_motif = reupi.match(l[2]).groups()
uniprot_domain, isoform_domain = reupi.match(l[3]).groups()
result.append(
ELMInteraction(
motif_elm = l[0],
domain_pfam = l[1],
uniprot_motif = uniprot_motif,
uniprot_domain = uniprot_domain,
isoform_motif = int(isoform_motif) if isoform_motif else 1,
isoform_domain = int(isoform_domain) if isoform_domain else 1,
start_motif = int(l[4]),
end_motif = int(l[5]),
start_domain = number_or_none(l[6]),
end_domain = number_or_none(l[7]),
affinity_min = number_or_none(l[8], float),
affinity_max = number_or_none(l[9], float),
pubmeds = tuple(map(int, l[10].split(','))) if l[10] else (),
taxon_motif = int(retax.match(l[11]).groups()[0]),
taxon_domain = int(retax.match(l[12]).groups()[0]),
)
)
return result
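# A hedged usage sketch (not part of the original module): filtering the
# ELM gold standard for interactions where both partners are human.
#
#     human_elm = [
#         ia for ia in elm_interactions()
#         if ia.taxon_motif == 9606 and ia.taxon_domain == 9606
#     ]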
def pfam_uniprot(uniprots, infile = None):
result = {}
url = urls.urls['pfam_up']['url']
infile = infile if infile is not None \
else os.path.join(settings.get('cachedir'), 'pfam-regions.tab.gz')
if not os.path.exists(infile):
sys.stdout.write('\t:: Downloading data...\n')
sys.stdout.flush()
if hasattr(urllib, 'urlretrieve'):
urllib.urlretrieve(url, infile)
else:
urllib.request.urlretrieve(url, infile)
sys.stdout.write('\t:: Processing domains...\n')
sys.stdout.flush()
gzfile = gzip.open(infile, mode = 'r')
prg = progress.Progress(len(uniprots), 'Looking up domains', 1)
for l in gzfile:
l = l.split('\t')
if l[0] in uniprots:
prg.step()
if l[0] not in result:
result[l[0]] = {}
if l[4] not in result[l[0]]:
result[l[0]][l[4]] = []
result[l[0]][l[4]].append([l[1], l[5], l[6]])
prg.terminate()
return result
def get_dbptm(organism = 9606):
if organism is None:
_organism = None
elif organism in taxonomy.dbptm_taxids:
_organism = taxonomy.dbptm_taxids[organism]
else:
sys.stdout.write('\t:: Unknown organism: `%u`.\n' % organism)
return []
url = urls.urls['dbptm']['old_table']
c = curl.Curl(url, silent = False, large = True)
data = []
hdr = next(c.result).strip().split('\t')
for l in c.result:
l = l.strip().split('\t')
data.append(dict(
(
key,
(
None
if val == '' else
val.split(';')
if key in {'references', 'kinase'} else
int(val)
if val.isdigit() else
val
)
)
for key, val in zip(hdr, l)
))
return data
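# A hedged usage sketch (not part of the original module): the rows are
# dicts keyed by the header of the dbPTM table; `kinase` and `references`
# are lists or None, numeric fields are converted to integers.
#
#     for row in get_dbptm(organism = 9606):
#         if row.get('kinase'):
#             pass  # e.g. collect enzyme-substrate pairs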
def get_dbptm_old(organism = 9606):
"""
Downloads enzyme-substrate interactions from dbPTM.
Returns list of dicts.
"""
if organism is None:
_organism = None
elif organism in taxonomy.dbptm_taxids:
_organism = taxonomy.dbptm_taxids[organism]
else:
sys.stdout.write('\t:: Unknown organism: `%u`.\n' % organism)
return []
result = []
byre = re.compile(r'.*by\s([A-Za-z0-9\s]+)\.*')
andre = re.compile(r',|and')
non_digit = re.compile(r'[^\d.-]+')
for url in urls.urls['dbptm']['urls']:
c = curl.Curl(url, silent = False)
extra = c.result
for k, data in iteritems(extra):
data = [x.split('\t') for x in data.split('\n')]
for l in data:
if len(l) > 8:
if _organism:
mnemonic = l[0].split('_')[1].strip()
if mnemonic != _organism:
continue
resnum = int(non_digit.sub('', l[2]))
ptm = ({
'substrate': l[1],
'typ': l[7].lower(),
'resaa': l[8][6],
'resnum': resnum,
'instance': l[8].strip(),
'references': l[4].split(';'),
'source': l[5].split()[0],
'kinase': None if byre.match(l[3]) is None else [
i.strip()
for i in andre.split(
byre.match(l[3]).groups(1)[0])
],
'start': resnum - 6,
'end': resnum + 6
})
if ptm['kinase'] is not None:
if 'autocatalysis' in ptm['kinase']:
ptm['kinase'].append(ptm['substrate'])
ptm['kinase'].remove('autocatalysis')
ptm['kinase'] = [
k.replace('host', '').strip()
for k in ptm['kinase']
]
ptm['kinase'] = [
k for k in ptm['kinase'] if len(k) > 0
]
if len(ptm['kinase']) == 0:
ptm['kinase'] = None
result.append(ptm)
return result
def dbptm_interactions():
result = []
data = get_dbptm()
for r in data:
if r['kinase'] is not None:
for src in r['kinase']:
result.append([
src, r['substrate'],
';'.join([i for i in r['references'] if i != '-'])
])
return result
def get_phosphonetworks():
result = []
reres = re.compile(r'([A-Z])([0-9]+)')
non_digit = re.compile(r'[^\d.-]+')
motre = re.compile(r'(-*)([A-Za-z]+)(-*)')
url = urls.urls['phosnw']['url']
c = curl.Curl(url, silent = False)
data = c.result
if data is None:
return None
data = data.split('\n')
for l in data:
if l.startswith('>'):
substrate = l[1:].strip()
elif len(l.split('\t')) >= 4:
l = [x.strip() for x in l.split('\t')]
res = reres.match(l[1])
resnum = int(non_digit.sub('', res.groups()[1]))
mot = motre.match(l[0])
if mot:
start = resnum - 7 + len(mot.groups()[0])
end = resnum + 7 - len(mot.groups()[2])
instance = l[0].replace('-', '').upper()
else:
start = None
end = None
instance = l[0]
result.append({
'instance': instance,
'kinase': l[2],
'resaa': res.groups()[0],
'resnum': resnum,
'score': float(non_digit.sub('', l[3])),
'substrate': substrate,
'start': start,
'end': end
})
return result
def pnetworks_interactions():
result = []
data = get_phosphonetworks()
for l in data:
result.append((l['kinase'], l['substrate']))
return [list(x) for x in list(set(result))]
def get_depod(organism = 9606):
result = []
reunip = re.compile(r'uniprotkb:([A-Z0-9]+)')
reptm = re.compile(r'([A-Z][a-z]{2})-([0-9]+)')
repmidsep = re.compile(r'[,|]\s?')
url = urls.urls['depod']['urls'][0]
c = curl.Curl(url, silent = False, encoding = 'ascii')
data = c.result
data = [x.split('\t') for x in data.split('\n')]
del data[0]
url_mitab = urls.urls['depod']['urls'][1]
c_mitab = curl.Curl(url_mitab, silent = False, encoding = 'iso-8859-1')
data_mitab = c_mitab.result
data_mitab = [x.split('\t') for x in data_mitab.split('\n')]
del data_mitab[0]
for i, l in enumerate(data):
if (
len(l) > 6 and
l[2] == 'protein substrate' and
taxonomy.ensure_ncbi_tax_id(
l[3].split('(')[0].strip()
) == organism and
l[4].strip() != 'N/A'
):
enzyme_uniprot = reunip.search(data_mitab[i][0]).groups()[0]
substrate_uniprot = reunip.search(data_mitab[i][1]).groups()[0]
for enzyme_up, substrate_up in itertools.product(
mapping.map_name(
enzyme_uniprot,
'uniprot',
'uniprot'
),
mapping.map_name(
substrate_uniprot,
'uniprot',
'uniprot'
),
):
for resaa, resnum in reptm.findall(l[4]):
resnum = int(resnum)
resaa = (
common.aminoa_3_to_1_letter[resaa]
if resaa in common.aminoa_3_to_1_letter else
resaa
)
result.append({
'instance': None,
'kinase': enzyme_up,
'resaa': resaa,
'resnum': resnum,
'references': repmidsep.split(l[6].strip()),
'substrate': substrate_up,
'start': None,
'end': None,
'typ': 'dephosphorylation',
})
return result
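# A hedged usage sketch (not part of the original module): DEPOD records
# describe dephosphorylation events; the enzyme (a phosphatase) is stored
# under the 'kinase' key.
#
#     for rec in get_depod(organism = 9606):
#         print(rec['kinase'], rec['substrate'], rec['resaa'], rec['resnum'])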
def get_mimp():
db_names = {
'PhosphoSitePlus': 'PhosphoSite',
'PhosphoELM': 'phosphoELM',
}
result = []
non_digit = re.compile(r'[^\d.-]+')
motre = re.compile(r'(-*)([A-Za-z]+)(-*)')
url = urls.urls['mimp']['url']
c = curl.Curl(url, silent = False)
data = c.result
kclass = get_kinase_class()
if data is None:
return None
data = [x.split('\t') for x in data.split('\n')]
del data[0]
for l in data:
if len(l) > 6 and len(l[2]) > 0:
kinases = l[2].split(';')
kinases_gnames = []
for k in kinases:
if k.endswith('GROUP'):
grp = k.split('_')[0]
if grp in kclass['groups']:
kinases_gnames += kclass['groups'][grp]
elif grp in kclass['families']:
kinases_gnames += kclass['families'][grp]
elif grp in kclass['subfamilies']:
kinases_gnames += kclass['subfamilies'][grp]
else:
kinases_gnames.append(k)
mot = motre.match(l[4])
for k in kinases_gnames:
resaa = l[4][7]
resnum = int(non_digit.sub('', l[3]))
if mot:
start = resnum - 7 + len(mot.groups()[0])
end = resnum + 7 - len(mot.groups()[2])
instance = l[4].replace('-', '').upper()
else:
start = None
end = None
instance = l[4]
databases = ';'.join(
tuple(
'%s_MIMP' % (db_names[db] if db in db_names else db)
for db in l[6].split(';')
) +
('MIMP',)
)
result.append({
'instance': instance,
'kinase': k.upper(),
'resaa': resaa,
'resnum': resnum,
'npmid': int(non_digit.sub('', l[5])),
'substrate_refseq': l[1],
'substrate': l[0],
'start': start,
'end': end,
'databases': databases,
})
return result
def mimp_interactions():
result = []
mimp = get_mimp()
for m in mimp:
result.append([m['kinase'], m['substrate'], m['databases']])
return result
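# A hedged usage sketch (not part of the original module): MIMP predictions
# as (kinase, substrate, databases) triplets.
#
#     for kinase, substrate, databases in mimp_interactions():
#         pass  # e.g. build a directed kinase-substrate network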
def phosphopoint_interactions():
interactions = []
url = urls.urls['phosphopoint']['url']
c = curl.Curl(url, silent = False, large = True)
_ = next(c.result)
return [
l.strip().split(';')
for l in c.result
]
def phosphopoint_directions():
return [
l[:2] for l in phosphopoint_interactions()
]
def get_kinase_class():
result = {'groups': {}, 'families': {}, 'subfamilies': {}, 'kinases': {}}
tabs = re.compile(r'[\t]{3,}')
reps = re.compile(r'ps[0-9]*$')
url = urls.urls['kinclass']['url']
c = curl.Curl(url, silent = False)
data = c.result
data = tabs.sub('', data)
data = [x.split('\t') for x in data.split('\n')]
data = data[9:]
for l in data:
if len(l) > 4:
kinase = reps.sub('', l[0])
group = l[2]
family = l[3]
subfamily = l[4]
if group not in result['groups']:
result['groups'][group] = []
result['groups'][group].append(kinase)
if family not in result['families']:
result['families'][family] = []
result['families'][family].append(kinase)
if subfamily not in result['subfamilies']:
result['subfamilies'][subfamily] = []
result['subfamilies'][subfamily].append(kinase)
result['kinases'][kinase] = {
'group': group,
'family': family,
'subfamily': subfamily
}
return result
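# A hedged usage sketch (not part of the original module): looking up the
# classification of one kinase (the gene symbol is only an example).
#
#     kinclass = get_kinase_class()
#     print(kinclass['kinases'].get('SRC', {}))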
def get_acsn():
greek = {
'_alpha_': 'A',
'_beta_': 'B',
'_gamma_': 'C',
'_delta_': 'D',
'_epsilon_': 'E'
}
regreek = re.compile(r'\b(' + '|'.join(greek.keys()) + r')\b')
result = []
url = urls.urls['acsn']['sif']
c = curl.Curl(url, silent = False)
data = c.result
data = [
x.split('\t')
for x in data.replace('\r', '').replace('*', '').strip().split('\n')
]
for l in data:
l[0] = regreek.sub('', l[0]).split('_')[0].split('~')[0]
l[2] = regreek.sub('', l[2]).split('_')[0].split('~')[0]
return data
def get_abs():
result = []
url = urls.urls['abs']['url']
c = curl.Curl(url, silent = False)
data = c.result
data = [[x.replace('*', '') for x in xx.split('\t')]
for xx in data.split('\n')]
for d in data:
if len(d) > 2:
result.append([d[2], d[0]])
return result
def get_pazar():
url = urls.urls['pazar']['url_rescued']
c = curl.Curl(url, silent = False)
data = c.result
return [
list(map(x.split('\t').__getitem__, (1, 4, 10)))
for x in ''.join(data.values()).split('\n') if len(x) > 0
]
def get_htri():
HTRIInteraction = collections.namedtuple(
'HTRIInteraction',
[
'entrez_tf',
'genesymbol_tf',
'entrez_target',
'genesymbol_target',
'pubmed',
]
)
c = curl.Curl(
urls.urls['htri']['url'],
init_url = urls.urls['htri']['init_url'],
silent = False,
follow = False,
large = True,
)
data = c.result
_ = next(c.result)
return [
HTRIInteraction(
entrez_tf = fields[1],
genesymbol_tf = fields[2],
entrez_target = fields[3],
genesymbol_target = fields[4],
pubmed = fields[6],
)
for fields in
(
row.split(';') for row in data if row.strip()
)
]
def get_oreganno_old(organism = 9606):
taxids = common.swap_dict(taxonomy.taxids)
if organism in taxids:
organism = taxids[organism]
nsep = re.compile(r'([-A-Za-z0-9]{3,})[\s/\(]*.*')
nrem = re.compile(r'[-/]')
result = []
url = urls.urls['oreganno_old']['url']
c = curl.Curl(url, silent = False)
data = c.result
data = [[xx.strip() for xx in x.split('\t')] for x in data.split('\n')
if len(x) > 0][1:]
for l in data:
if l[0] == organism and \
l[10] == 'TRANSCRIPTION FACTOR BINDING SITE' and \
l[3] == 'POSITIVE OUTCOME' and \
not l[11].startswith('UNKNOWN') and not l[14].startswith('UNKNOWN'):
result.append([
l[14]
if len(l[14]) < 3 else nrem.sub('',
nsep.findall(l[14])[0]), l[11]
if len(l[11]) < 3 else nrem.sub('', nsep.findall(l[11])[0]),
l[18]
])
return result
def get_oreganno(organism = 9606):
taxids = taxonomy.phosphoelm_taxids
if organism in taxids:
organism = taxids[organism]
nsep = re.compile(r'([-A-Za-z0-9]{3,})[\s/\(]*.*')
nrem = re.compile(r'[-/]')
result = []
url = urls.urls['oreganno']['url']
c = curl.Curl(url, silent = False, large = True)
data = c.result
_ = next(data)
for l in data:
if not l:
continue
l = [x.strip() for x in l.split('\t')]
if (l[1] == organism and
l[3] == 'TRANSCRIPTION FACTOR BINDING SITE' and
l[2] == 'POSITIVE OUTCOME' and
l[4] != 'N/A' and
l[7] != 'N/A'):
yield (
l[7]
if len(l[7]) < 3 else nrem.sub('',
nsep.findall(l[7])[0]), l[4]
if len(l[4]) < 3 else nrem.sub('', nsep.findall(l[4])[0]),
l[11] if l[11] != 'N/A' else ''
)
def get_cpdb_ltp():
return get_cpdb(
['HPRD', 'BioGRID', 'PhosphoPOINT', 'MINT', 'BIND', 'IntAct'])
def get_cpdb(exclude = None):
exclude = set(exclude) if type(exclude) is list else exclude
result = []
url = urls.urls['cpdb']['url']
c = curl.Curl(url, silent = False)
data = c.result
data = [
x.split('\t') for x in data.split('\n')
if not x.startswith('#') and len(x) > 0
]
for l in data:
participants = l[2].split(',')
if len(participants) == 2:
if not exclude or len(set(l[0].split(',')) - exclude) > 0:
result.append([participants[0], participants[1], l[0], l[1]])
return result
def get_pathwaycommons(sources = None, types = None, sources_separated = True):
result = {}
interactions = []
if type(types) is list:
types = set(types)
source_names = {
'wp': 'WikiPathways',
'kegg': 'KEGG',
'bind': 'BIND',
'intact': 'IntAct',
'intact_complex': 'IntAct',
'panther': 'PANTHER',
'pid': 'NCI-PID',
'reactome': 'Reactome',
'dip': 'DIP',
'hprd': 'HPRD',
'inoh': 'INOH',
'netpath': 'NetPath',
'biogrid': 'BioGRID',
'corum': 'CORUM',
'psp': 'PhosphoSite'
}
directed = set([
'state-change', 'controls-state-change-of', 'controls-transport-of',
'controls-phosphorylation-of'
])
sources = list(source_names.keys()) \
if sources is None else sources
prg = progress.Progress(
len(sources), 'Processing PathwayCommons', 1, percent = False)
url = urls.urls['pwcommons']['url']
for s in sources:
prg.step()
surl = url % s
c = curl.Curl(surl, silent = False, large = True)
for l in c.result:
l = l.decode('ascii').strip().split('\t')
if types is None or l[1] in types:
if sources_separated:
l.append(source_names[s])
interactions.append(l)
else:
pair = (l[0], l[2])
if pair not in result:
result[pair] = [set([]), set([]), 0]
result[pair][0].add(source_names[s])
result[pair][1].add(l[1])
if l[1] in directed:
result[pair][2] = 1
if not sources_separated:
for pair, details in iteritems(result):
interactions.append([
pair[0], pair[1], ';'.join(details[0]), ';'.join(details[1]),
str(details[2])
])
return interactions
def go_annotations_uniprot(organism = 9606, swissprot = 'yes'):
"""
Deprecated, should be removed soon.
"""
rev = '' if swissprot is None \
else ' AND reviewed:%s' % swissprot
query = 'organism:%u%s' % (int(organism), rev)
url = urls.urls['uniprot_basic']['url']
get = {'query': query, 'format': 'tab', 'columns': 'id,go-id'}
c = curl.Curl(url, get = get, silent = False)
data = c.result
return dict([(x[0], [go.strip() for go in x[1].split(';')])
for x in [x.split('\t') for x in data.split('\n')]
if len(x) > 1])
def go_annotations_goa(organism = 'human'):
"""
Downloads GO annotation from UniProt GOA.
"""
organism = (
taxonomy.taxids[organism]
if isinstance(organism, int) else
organism
)
annot = dict(
(asp, collections.defaultdict(set))
for asp in ('C', 'P', 'F')
)
url = urls.urls['goa']['ebi_url'] % (organism.upper(), organism)
c = curl.Curl(url, silent = False, large = True)
for line in c.result:
if not line or line[0] == '!':
continue
line = line.strip().split('\t')
annot[line[8]][line[1]].add(line[4])
return annot
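# A hedged usage sketch (not part of the original module): retrieving the
# cellular component annotations of one protein (the UniProt ID is only
# an example).
#
#     annot = go_annotations_goa(organism = 'human')
#     print(annot['C'].get('P00533', set()))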
# synonym for the default method
go_annotations = go_annotations_goa
def go_ancestors_goose(aspects = ('C','F','P')):
"""
Queries the ancestors of GO terms by AmiGO goose.
Returns dict of sets where keys are GO accessions and values are sets
of their ancestors.
:param tuple aspects:
GO aspects: `C`, `F` and `P` for cellular_component,
molecular_function and biological_process, respectively.
"""
aspects_part = ''
respaces = re.compile(r'[\s\n]+')
ontologies = {
'C': 'cellular_component',
'F': 'molecular_function',
'P': 'biological_process',
}
if set(aspects) != {'C', 'F', 'P'}:
aspects_part = 'WHERE (%s)' % (
' OR '.join(
'term.term_type = "%s"' % ontologies[asp]
for asp in aspects
)
)
sql_path = os.path.join(common.DATA, 'goose_ancestors.sql')
with open(sql_path, 'r') as fp:
query = fp.read()
query = query % aspects_part
query = respaces.sub(r' ', query).strip()
url = urls.urls['goose']['url'] % query
c = curl.Curl(url, silent = False, large = True)
ancestors = collections.defaultdict(set)
for l in c.result:
l = l.strip().split('\t')
ancestors[l[0]].add(l[1])
return ancestors
def go_ancestors_quickgo(aspects = ('C', 'F', 'P')):
"""
Queries the ancestors of GO terms by QuickGO REST API.
Returns dict of sets where keys are GO accessions and values are sets
of their ancestors.
:param tuple aspects:
GO aspects: `C`, `F` and `P` for cellular_component,
molecular_function and biological_process, respectively.
"""
desc = go_descendants_quickgo(aspects = aspects)
return go_descendants_to_ancestors(desc)
# synonym for the default method
go_ancestors = go_ancestors_quickgo
def go_descendants_to_ancestors(desc):
"""
Turns a dict of descendants to dict of ancestors by swapping the
relationships. This way descendants will be the keys and their ancestors
will be the values.
"""
ancestors = {}
for asp, dct in iteritems(desc):
ancestors[asp] = collections.defaultdict(set)
for anc_term, des in iteritems(dct):
for des_term, rel in des:
ancestors[asp][des_term].add((anc_term, rel))
return ancestors
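# A hedged usage sketch (not part of the original module): how the swap
# works on a minimal hand-made descendants dict.
#
#     desc = {'P': {'GO:0008150': {('GO:0009987', 'is_a')}}}
#     anc = go_descendants_to_ancestors(desc)
#     # anc['P']['GO:0009987'] == {('GO:0008150', 'is_a')}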
def go_descendants_goose(aspects = ('C','F','P')):
"""
Queries descendants of GO terms by AmiGO goose.
IMPORTANT:
This is no longer the preferred method to get descendants.
The preferred method to access GO term relationships is now
``pypath.dataio.go_descendants_quickgo()``.
The data in GO MySQL instances has not been updated since Dec 2016.
Unfortunately the providers ceased to support MySQL, the most flexible
and highest performance access to GO data. The replacement is Solr,
which is far from providing the same features as MySQL; for example,
it is unable to provide GO graph relationships. Another service is
QuickGO, which is up to date and offers convenient ways to query the ontology.
Returns dict of sets where keys are GO accessions and values are sets
of their descendants.
:param tuple aspects:
GO aspects: `C`, `F` and `P` for cellular_component,
molecular_function and biological_process, respectively.
"""
desc = collections.defaultdict(set)
anc = go_ancestors_goose(aspects = aspects)
for term, ancs in iteritems(anc):
for terma in ancs:
desc[terma].add(term)
return desc
def go_descendants_quickgo(
aspects = ('C', 'F', 'P'),
terms = None,
relations = None,
quickgo_download_size = 500,
):
"""
Queries descendants of GO terms by QuickGO REST API.
Returns dict of sets where keys are GO accessions and values are sets
of their descendants.
:param tuple aspects:
GO aspects: `C`, `F` and `P` for cellular_component,
molecular_function and biological_process, respectively.
:param dict terms:
Result from ``go_terms_solr``. If ``None`` the method will be called.
"""
def download_in_chunks(terms, chunk_size, target = None):
target = target or collections.defaultdict(set)
paginator = common.paginate(terms, chunk_size)
for p, terms_part in enumerate(paginator):
url = urls.urls['quickgo_rest']['desc'] % (
','.join(terms_part),
'?relations = %s' % relations_part,
)
c = curl.Curl(
url,
req_headers = req_headers,
silent = True,
large = True,
)
try:
result = json.load(c.fileobj)
except json.decoder.JSONDecodeError:
done = chunk_size * p
remaining = terms[done:]
new_chunk_size = chunk_size // 2
if new_chunk_size < 10:
_log(
'Failed to download QuickGO, tried to decrease the '
'number of terms in each query, went below 10 terms '
'per query but still getting erroneous JSON. '
'This might be due to very slow network connection. '
'You might increase the timeout of CURL. '
'But then it will take forever.'
)
return target
return download_in_chunks(
terms = remaining,
chunk_size = new_chunk_size,
target = target,
)
for res in result['results']:
if 'children' not in res:
continue
target[res['id']].update(
set(
(child['id'], child['relation'])
for child in res['children']
)
)
return target
desc = {}
terms = terms or go_terms_quickgo(aspects = aspects)
relations = relations or ('is_a', 'part_of', 'occurs_in', 'regulates',)
req_headers = ['Accept:application/json']
relations_part = ','.join(relations)
for asp in aspects:
desc[asp] = download_in_chunks(
terms = list(terms[asp].keys()),
chunk_size = quickgo_download_size,
)
return desc
# synonym for the default method
go_descendants = go_descendants_quickgo
def go_terms_solr(aspects = ('C', 'F', 'P')):
"""
Queries GO terms by AmiGO Solr.
Returns dict of dicts where upper level keys are one letter codes of the
aspects `C`, `F` and `P` for cellular_component, molecular_function and
biological_process, respectively. Lower level keys are GO accessions
and values are names of the terms.
:param tuple aspects:
GO aspects: `C`, `F` and `P` for cellular_component,
molecular_function and biological_process, respectively.
"""
reamp = re.compile(r'[\s\n\r]+([&\?])')
relin = re.compile(r'[\s\n\r]+')
ontologies = {
'C': 'cellular_component',
'F': 'molecular_function',
'P': 'biological_process',
}
ontol_short = dict(reversed(i) for i in ontologies.items())
terms = dict((a, {}) for a in aspects)
query = '''
?q = document_category:"ontology_class" AND
idspace:GO AND
is_obsolete:0
&rows = 9999999
&start = 0
&fl = annotation_class,annotation_class_label,source
'''
query = relin.sub(' ', reamp.sub(r'\1', query.strip()))
# downloading data
url = urls.urls['golr']['url'] % query
c = curl.Curl(url, silent = False, large = True)
# parsing XML by lxml.etree.iterparse
parser = etree.iterparse(c.fileobj, events = ('start', 'end'))
root = next(parser)
used_elements = []
for ev, elem in parser:
if ev == 'end' and elem.tag == 'doc':
asp = elem.find('.//str[@name="source"]').text
asp = ontol_short[asp]
if asp not in aspects:
continue
term = elem.find('.//str[@name="annotation_class"]').text
name = elem.find('.//str[@name="annotation_class_label"]').text
terms[asp][term] = name
used_elements.append(elem)
# removing used elements to keep memory low
if len(used_elements) > 1000:
for _ in xrange(500):
e = used_elements.pop(0)
e.clear()
# closing the XML
c.fileobj.close()
del c
return terms
def go_terms_quickgo(aspects = ('C','F','P')):
"""
Queries GO terms by the QuickGO REST API.
Return dict of dicts where upper level keys are one letter codes of the
aspects `C`, `F` and `P` for cellular_component, molecular_function and
biological_process, respectively. Lower level keys are GO accessions
and values are names of the terms.
:param tuple aspects:
GO aspects: `C`, `F` and `P` for cellular_component,
molecular_function and biological_process, respectively.
"""
ontologies = {
'C': 'cellular_component',
'F': 'molecular_function',
'P': 'biological_process',
}
ontol_short = dict(reversed(i) for i in ontologies.items())
result = dict((a, {}) for a in aspects)
url = urls.urls['quickgo_rest']['terms']
last_page = 9999999
this_page = 1
prg = progress.Progress(
name = 'Downloading data from QuickGO',
interval = 1,
)
while this_page <= last_page:
page_url = url % this_page
c = curl.Curl(page_url, silent = True)
this_result = json.loads(c.result)
last_page = this_result['pageInfo']['total']
for res in this_result['results']:
if 'aspect' not in res:
continue
asp = ontol_short[res['aspect']]
if res['isObsolete'] or asp not in aspects:
continue
result[asp][res['id']] = res['name']
if prg.total is None:
prg.set_total(last_page)
prg.step()
this_page += 1
return result
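# A hedged usage sketch (not part of the original module): counting the
# terms retrieved for each GO aspect.
#
#     terms = go_terms_quickgo(aspects = ('C', 'F', 'P'))
#     for asp, this_terms in iteritems(terms):
#         print(asp, len(this_terms))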
# synonym for the default method
go_terms = go_terms_quickgo
def go_terms_goose(aspects = ('C','F','P')):
"""
Queries GO terms by AmiGO goose.
Return dict of dicts where upper level keys are one letter codes of the
aspects `C`, `F` and `P` for cellular_component, molecular_function and
biological_process, respectively. Lower level keys are GO accessions
and values are names of the terms.
:param tuple aspects:
GO aspects: `C`, `F` and `P` for cellular_component,
molecular_function and biological_process, respectively.
"""
aspects_part = ''
respaces = re.compile(r'[\s\n]+')
ontologies = {
'C': 'cellular_component',
'F': 'molecular_function',
'P': 'biological_process',
}
ontol_short = dict(reversed(i) for i in ontologies.items())
if set(aspects) != {'C', 'F', 'P'}:
aspects_part = 'WHERE (%s)' % (
' OR '.join(
'term.term_type = "%s"' % ontologies[asp]
for asp in aspects
)
)
sql_path = os.path.join(common.DATA, 'goose_terms.sql')
with open(sql_path, 'r') as fp:
query = fp.read()
query = query % aspects_part
query = respaces.sub(r' ', query).strip()
url = urls.urls['goose']['url'] % query
c = curl.Curl(url, silent = False, large = True)
terms = {'P': {}, 'C': {}, 'F': {}}
for l in c.result:
l = l.strip().split('\t')
if l[1] not in ontol_short:
continue
aspect = ontol_short[l[1]]
terms[aspect][l[2]] = l[0]
return terms
def go_annotations_quickgo(
organism = 9606,
aspects = ('C','F','P'),
relations = ('is_a', 'part_of'),
):
"""
Queries GO annotations by QuickGO REST API.
IMPORTANT:
The preferred method to access GO annotations is now
``pypath.dataio.go_annotations_goa()``.
Contrary to its name, QuickGO is very slow; otherwise it should yield
up to date data, identical to the GOA file.
Returns terms in dict of dicts and annotations in dict of dicts of sets.
In both dicts the keys are aspects by their one letter codes.
In the term dicts keys are GO accessions and values are their names.
In the annotation dicts keys are UniProt IDs and values are sets
of GO accessions.
:param int organism:
NCBI Taxonomy ID of one organism. Default is human (9606).
:param tuple aspects:
GO aspects: `C`, `F` and `P` for cellular_component,
molecular_function and biological_process, respectively.
"""
annot = dict((a, collections.defaultdict(set)) for a in aspects)
ontologies = {
'C': 'cellular_component',
'F': 'molecular_function',
'P': 'biological_process',
}
ontol_short = dict(reversed(i) for i in ontologies.items())
url = urls.urls['quickgo_rest']['annot']
aspects_part = ','.join(ontologies[a] for a in aspects)
relations_part = ','.join(relations)
req_headers = ['Accept:text/tsv']
page = 1
while True:
this_url = url % (
aspects_part, # aspect
relations_part, # goUsageRelationships
organism, # taxonId
page,
)
c = curl.Curl(
url = this_url,
req_headers = req_headers,
silent = False,
large = True
)
_ = next(c.result) # the header row
rows_on_page = 0
for line in c.result:
rows_on_page += 1
line = line.strip().split('\t')
if line[3] not in relations:
continue
annot[line[5]][line[1]].add(line[4])
if not rows_on_page:
# an empty page means we are past the last one;
# without this check the `while True` loop would never terminate
break
page += 1
return annot
def go_annotations_solr(
organism = 9606,
aspects = ('C', 'F', 'P'),
references = False,
):
"""
Queries GO annotations by AmiGO Solr.
Earlier other methods were provided to access GO;
now this is the preferred method to get annotations.
Returns annotations in a dict of dicts of sets.
The keys of the outer dict are aspects by their one letter codes;
in the inner dicts keys are UniProt IDs and values are sets
of GO accessions.
:param int organism:
NCBI Taxonomy ID of one organism. Default is human (9606).
:param tuple aspects:
GO aspects: `C`, `F` and `P` for cellular_component,
molecular_function and biological_process, respectively.
:param bool references:
Retrieve the references (PubMed IDs) for the annotations.
Currently not implemented.
"""
reamp = re.compile(r'[\s\n\r]+([&\?])')
relin = re.compile(r'[\s\n\r]+')
annot = dict((a, collections.defaultdict(set)) for a in aspects)
ontologies = {
'C': 'cellular_component',
'F': 'molecular_function',
'P': 'biological_process',
}
ontol_short = dict(reversed(i) for i in ontologies.items())
# assembling the query
if len(aspects) < 3:
aspects_part = ' AND (%s)' % (
' OR '.join('aspect:%s' % a for a in aspects)
)
else:
aspects_part = ''
refs_part = ',reference' if references else ''
query = '''
?q = taxon:"NCBITaxon:%u" AND
type:protein AND
document_category:annotation AND
source:UniProtKB%s
&rows = 9999999
&start = 0
&fl = bioentity,annotation_class,aspect%s
''' % (
organism,
aspects_part,
refs_part
)
query = relin.sub(' ', reamp.sub(r'\1', query.strip()))
# downloading data
url = urls.urls['golr']['url'] % query
c = curl.Curl(url, silent = False, large = True)
# parsing XML by lxml.etree.iterparse
parser = etree.iterparse(c.fileobj, events = ('start', 'end'))
root = next(parser)
used_elements = []
for ev, elem in parser:
if ev == 'end' and elem.tag == 'doc':
id_ = elem.find('.//str[@name="bioentity"]').text
if not id_.startswith('UniProtKB:'):
continue
asp = elem.find('.//str[@name="aspect"]').text
if asp not in aspects:
continue
term = elem.find('.//str[@name="annotation_class"]').text
id_ = id_[10:] # removing the `UniProtKB:` prefix
# adding the term to the annotation dict
annot[asp][id_].add(term)
used_elements.append(elem)
# removing used elements to keep memory low
if len(used_elements) > 1000:
for _ in xrange(500):
e = used_elements.pop(0)
e.clear()
# closing the XML
c.fileobj.close()
del c
# only annotations are collected here; GO term names are not part of
# this query, hence no `terms` dict can be returned
return annot
def go_annotations_goose(organism = 9606, aspects = ('C','F','P'), uniprots = None):
"""
Queries GO annotations by AmiGO goose.
IMPORTANT:
This is not the preferred method any more to get terms and annotations.
Recently the preferred method to access GO annotations is
``pypath.dataio.go_annotations_solr()``.
The data in GO MySQL instances has not been updated since Dec 2016.
Unfortunately the providers ceased to support MySQL, the most flexible
and highest performance access to GO data. The replacement is Solr
which is far from providing the same features as MySQL.
Returns terms in dict of dicts and annotations in dict of dicts of sets.
In both dicts the keys are aspects by their one letter codes.
In the term dicts keys are GO accessions and values are their names.
In the annotation dicts keys are UniProt IDs and values are sets
of GO accessions.
:param int organism:
NCBI Taxonomy ID of one organism. Default is human (9606).
:param tuple aspects:
GO aspects: `C`, `F` and `P` for cellular_component,
molecular_function and biological_process, respectively.
:param list uniprots:
Optionally a list of UniProt IDs. If `None`, results for all proteins
returned.
"""
aspects_part = ''
uniprot_part = ''
respaces = re.compile(r'[\s\n]+')
ontologies = {
'C': 'cellular_component',
'F': 'molecular_function',
'P': 'biological_process',
}
ontol_short = dict(reversed(i) for i in ontologies.items())
if set(aspects) != {'C', 'F', 'P'}:
aspects_part = '(%s) AND' % (
' OR '.join(
'term.term_type="%s"' % ontologies[asp]
for asp in aspects
)
)
if uniprots is not None:
uniprot_part = 'dbxref.xref_key IN (%s) AND' % (
','.join('"%s"' % uniprot for uniprot in uniprots)
)
sql_path = os.path.join(common.DATA, 'goose_annotations.sql')
with open(sql_path, 'r') as fp:
query = fp.read()
query = query % (organism, aspects_part, uniprot_part)
query = respaces.sub(r' ', query).strip()
url = urls.urls['goose']['url'] % query
c = curl.Curl(url, silent = False, large = True)
terms = {'P': {}, 'C': {}, 'F': {}}
annot = {
'C': collections.defaultdict(set),
'F': collections.defaultdict(set),
'P': collections.defaultdict(set),
}
for l in c.result:
l = l.strip().split('\t')
aspect = ontol_short[l[1]]
terms[aspect][l[2]] = l[0]
annot[aspect][l[5]].add(l[2])
return terms, annot
def get_go_desc(go_ids, organism = 9606):
"""
Deprecated, should be removed soon.
"""
go_ids = (
','.join(sorted(go_ids))
if type(go_ids) in {list, tuple, set} else
go_ids
)
url = urls.urls['quickgo_desc']['url'] % (organism, go_ids)
c = curl.Curl(
url, silent = False, large = True, req_headers = {'Accept': 'text/tsv'}
)
_ = c.result.readline()
return set(l.split('\t')[1] for l in c.result)
def get_go_quick(
organism = 9606,
slim = False,
names_only = False,
aspects = ('C', 'F', 'P'),
):
"""
Deprecated, should be removed soon.
Loads GO terms and annotations from QuickGO.
Returns 2 dicts: `names` maps GO term IDs to term names,
`terms` maps, within each aspect, UniProt IDs to sets of GO term IDs.
"""
ontologies = {
'C': 'cellular_component',
'F': 'molecular_function',
'P': 'biological_process',
}
terms = {
'C': collections.defaultdict(set),
'F': collections.defaultdict(set),
'P': collections.defaultdict(set),
}
names = {}
aspects_param = ','.join(sorted(ontologies[a] for a in aspects))
url = urls.urls['quickgo']['url'] % (
organism,
aspects_param,
'&goUsage = slim' if slim else '',
)
c = curl.Curl(url, silent = False, large = True)
_ = next(c.result)
for l in c.result:
l = l.split('\t')
if not names_only:
terms[l[5]][l[1]].add(l[4])
return {'terms': terms, 'names': names}
def get_goslim(url = None):
rego = re.compile(r'GO:[0-9]{7}')
url = url if type(url) in [str, unicode] \
else urls.urls['goslim_gen']['url']
c = curl.Curl(url, silent = False)
data = c.result
result = []
for l in data.split('\n'):
if l.startswith('id:'):
result += rego.findall(l)
return result
def netpath_names():
repwnum = re.compile(r'_([0-9]+)$')
result = {}
url = urls.urls['netpath_names']['url']
c = curl.Curl(url, silent = False)
html = c.result
soup = bs4.BeautifulSoup(html, 'html.parser')
for a in soup.find_all('a'):
if a.attrs['href'].startswith('pathways'):
num = repwnum.findall(a.attrs['href'])[0]
name = a.text
result[num] = name.strip()
return result
def netpath_pathway_annotations():
NetpathPathway = collections.namedtuple(
'NetpathPathway', ['pathway'],
)
result = collections.defaultdict(set)
url_template = urls.urls['netpath_pw']['url']
url_main = urls.urls['netpath_pw']['mainpage']
c = curl.Curl(url_main, cache = False)
cookie = [
h.decode().split(':')[1].split(';')[0].strip()
for h in c.resp_headers
if h.startswith(b'Set-Cookie')
]
cookie_hdr = ['Cookie: %s' % '; '.join(cookie)]
pathway_ids = netpath_names()
for _id, pathway in iteritems(pathway_ids):
url = url_template % int(_id)
c = curl.Curl(
url,
req_headers = cookie_hdr,
silent = False,
encoding = 'iso-8859-1',
)
soup = bs4.BeautifulSoup(c.result, 'html.parser')
for tbl in soup.find_all('table'):
hdr = tbl.find('td', {'class': 'barhead'})
if not hdr or not hdr.text.strip().startswith('Molecules Invol'):
continue
for td in tbl.find_all('td'):
genesymbol = td.text.strip()
if not genesymbol:
continue
uniprots = mapping.map_name(
genesymbol,
'genesymbol',
'uniprot',
)
for uniprot in uniprots:
result[uniprot].add(
NetpathPathway(
pathway = pathway
)
)
return result
def netpath_interactions():
result = []
repwnum = re.compile(r'NetPath_([0-9]+)_')
mi = '{net:sf:psidev:mi}'
url = urls.urls['netpath_psimi']['url']
c = curl.Curl(url, silent = False)
data = c.result
data = dict([(k, v) for k, v in iteritems(data) if k.endswith('xml')])
pwnames = netpath_names()
for pwfile, rawxml in iteritems(data):
try:
pwnum = repwnum.findall(pwfile)[0]
except:
sys.stdout.write('Error at processing file:\n')
sys.stdout.write(pwfile)
sys.stdout.write('\n')
sys.stdout.flush()
# without a valid pathway number the record cannot be processed below
continue
pwname = pwnames[pwnum]
root = ET.fromstring(rawxml)
for e in root.findall(mi + 'entry'):
thisInt = ()
db = [
pr.find(mi + 'primaryRef').attrib['db']
for pr in e.find(mi + 'source').findall(mi + 'xref')
]
refs = []
mets = []
for ex in e.find(mi + 'experimentList').findall(
mi + 'experimentDescription'):
for pm in ex.find(mi + 'bibref').iter(mi + 'primaryRef'):
if pm.attrib['db'] == 'pubmed':
refs.append(pm.attrib['id'])
for me in ex.find(mi + 'interactionDetectionMethod').\
iter(mi + 'shortLabel'):
mets.append(me.text)
mols = {}
for mo in e.find(mi + 'interactorList').findall(mi + 'interactor'):
iid = mo.attrib['id']
name = mo.find(mi + 'names').find(mi + 'shortLabel').text
entrez = ''
if mo.find(mi + 'xref') is not None:
entrez = ';'.join([
ac.attrib['id']
for ac in mo.find(mi + 'xref')
.findall(mi + 'secondaryRef')
if ac.attrib['db'] == 'Entrez gene'
])
mols[iid] = (name, entrez)
theInt = e.find(mi + 'interactionList').find(mi + 'interaction')
for p in theInt.find(mi + 'participantList').findall(
mi + 'participant'):
pid = p.find(mi + 'interactorRef').text
roles = ''
if p.find(mi + 'experimentalRoleList') is not None:
roles = ';'.join([
rl.find(mi + 'names').find(mi + 'shortLabel').text
for rl in p.find(mi + 'experimentalRoleList')
.findall(mi + 'experimentalRole')
])
mols[pid] += (roles, )
intTyp = theInt.find(mi + 'interactionType').find(mi + 'names')\
.find(mi + 'shortLabel').text
molkeys = list(mols.keys())
for i in range(0, len(mols) - 1):
for j in range(i, len(mols)):
A = mols[molkeys[i]][0:2]
B = mols[molkeys[j]][0:2]
result.append(
list(A) + list(B) +
[';'.join(refs), ';'.join(mets), intTyp, pwname])
return result
def get_pubmeds(pmids):
pmids = [str(pmid) for pmid in pmids]
url = urls.urls['pubmed-eutils']['url']
cache = len(pmids) < 10
data = {}
prg = progress.Progress(
len(pmids) / 100 + 1,
'Retrieving data from NCBI e-utils',
1,
percent = False)
for offset in xrange(0, len(pmids), 100):
prg.step()
post = {
'id': ','.join(pmids[offset:offset + 100]),
'retmode': 'json',
'db': 'pubmed'
}
for i in xrange(3):
try:
c = curl.Curl(
url,
silent = False,
cache = cache,
post = post,
override_post = True,
)
res = c.result
data = dict([(k, v)
for k, v in iteritems(json.loads(res)['result'])]
+ [(k, v) for k, v in iteritems(data)])
break
except ValueError:
sys.stdout.write('\t:: Error in JSON, retry %u\n' % i)
sys.stdout.flush()
prg.terminate()
return data
def get_lincs_compounds():
sys.stdout.write(
'\n\tReturned dict has names, brand names or company specific\n'
'\tIDs of compounds as keys, and tuples of PubChem, ChEMBL, ChEBI, InChI,\n'
'\tInChI Key, SMILES and LINCS IDs as values.\n\n')
sys.stdout.flush()
c = curl.Curl(urls.urls['lincs-compounds']['url'], silent = False)
return dict(
[(key, pair[1])
for pair in [([
it for sl in [
filter(lambda z: len(z) > 0, y.split(';')) for y in x[1:4]
if len(y) > 0
] for it in sl
], (x[4], '' if len(x[7]) == 0 else 'CHEMBL%s' % x[7], ''
if len(x[8]) == 0 else 'CHEBI:%s' % x[8], x[9], x[10], x[11], x[3]
)) for x in [[b.strip() for b in a.split('\t')] for a in ''.join([
s.replace(',', '\t') if i % 2 == 0 else s.replace('\n', '')
for i, s in enumerate(c.result.split('"'))
]).split('\n')[1:] if len(a) > 0]] for key in pair[0]])
def ramilowski_interactions(putative = False):
"""
Downloads and processes ligand-receptor interactions from
Supplementary Table 2 of Ramilowski 2015.
"""
c = curl.Curl(urls.urls['rami']['url'], silent = False, large = True)
xlsname = c.fname
del(c)
raw = read_xls(xlsname, 'All.Pairs')[1:]
return [
[
r[1],
r[3],
r[13].replace(' ', ''),
';'.join(filter(len, itertools.chain(r[5:11], [r[15]])))
]
for r in raw
if r[15] != 'EXCLUDED not ligand' and (
putative or r[15] != 'putative'
)
]
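# A minimal usage sketch for `ramilowski_interactions` (not part of the
# original module; downloads the Ramilowski 2015 supplementary table):
#
#     pairs = ramilowski_interactions()
#     # each record is roughly [ligand symbol, receptor symbol, source,
#     # references]; the exact field semantics follow the column indices
#     # used above and are stated here only as an assumption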
def kirouac2010_interactions():
rename = re.compile(r'[A-Z]{2}[A-Z0-9][-A-Z0-9]*')
rerange = re.compile(r'([0-9])-([0-9])')
reslash = re.compile(r'.*?([A-Z0-9]{1,3}/[/A-Z0-9]+)')
def get_names(s):
names = set([])
prev = None
for n in s.split():
m = rename.findall(n)
if m:
prev = m
m = reslash.match(n)
if m:
for post in m.groups()[0].split('/'):
for pre in prev:
names.add('%s%s' % (pre, post))
else:
m = rerange.match(n)
if m:
intv = m.groups()
for post in range(int(intv[0]), int(intv[1]) + 1):
for pre in prev:
names.add('%s%u' % (pre, post))
else:
names.update(prev)
prev = None
return names
init_url = urls.urls['kirouac2010']['init_url']
req_headers = [
(
'User-Agent: Mozilla/5.0 (X11; Linux x86_64; rv:68.0) '
'Gecko/20100101 Firefox/68.0'
),
]
url = urls.urls['kirouac2010']['url']
c00 = curl.Curl(url, call = False, process = False)
if (
not os.path.exists(c00.cache_file_name) or
os.path.getsize(c00.cache_file_name) == 0
):
_log('Kirouac 2010 download: requesting website cookie.')
c0 = curl.Curl(
init_url,
silent = True,
large = False,
req_headers = req_headers,
follow = False,
cache = False,
)
cookies = []
if hasattr(c0, 'resp_headers'):
for hdr in c0.resp_headers:
if hdr.lower().startswith(b'set-cookie'):
cookie = hdr.split(b':')[1].split(b';')[0].strip()
if cookie not in cookies:
cookies.append(cookie.decode('ascii'))
cookies = '; '.join(cookies)
req_headers.append('Cookie: %s' % cookies)
_log('Response header: %s' % str(c0.resp_headers))
_log('Cookies: %s' % str(cookies))
_log('Request header: %s' % str(req_headers))
os.remove(c00.cache_file_name)
c = curl.Curl(
url,
silent = False,
large = True,
req_headers = req_headers,
)
xlsname = c.fname
del(c)
tbl = read_xls(xlsname, sheet = 'S12')
result = []
for r in tbl[2:]:
namesA = get_names(r[0])
namesB = get_names(r[1])
result.extend(list(itertools.product(namesA, namesB)))
return result
def get_hpmr_old():
"""
Deprecated, should be removed soon.
Downloads and processes the list of all human receptors from
human receptor census (HPMR -- Human Plasma Membrane Receptome).
Returns list of GeneSymbols.
"""
c = curl.Curl(urls.urls['hpmr']['url'], silent = False)
html = c.result
soup = bs4.BeautifulSoup(html, 'html.parser')
gnames = [
row[1].text
for row in (
tr.find_all('td')
for tr in soup.find(
'table', {'class': 'gridtable'}
).find_all('tr')
)
if len(row) > 1 and not row[1].text.lower().startswith('similar')
]
return common.uniq_list(gnames)
def hpmr_interactions_old():
"""
Deprecated, should be removed soon.
Downloads ligand-receptor and receptor-receptor interactions from the
Human Plasma Membrane Receptome database.
"""
cachefile = os.path.join(
settings.get('cachedir'),
'hpmr_interactions',
)
if os.path.exists(cachefile):
with open(cachefile, 'r') as fp:
result = [r.split('\t') for r in fp.read().split('\n')[1:]]
return result
rerecname = re.compile(r'Receptor ([A-z0-9]+) interacts with:')
reint = re.compile(r'(Receptor|Ligand) ([A-z0-9]+) -')
rerefid = re.compile(r'list_uids=([- \.:,0-9A-z]+)')
result = []
recpages = []
c = curl.Curl(urls.urls['hpmri']['browse'])
soup = bs4.BeautifulSoup(c.result, 'html.parser')
for rec in soup.find_all('a', {'title': 'Open Receptor Page'}):
recpages.append(rec.attrs['href'])
prg = progress.Progress(len(recpages), 'Downloading HPMR data', 1)
for url in recpages:
prg.step()
c = curl.Curl(url)
if c.result is None:
#print('No receptor page: %s' % url)
continue
soup = bs4.BeautifulSoup(c.result, 'html.parser')
ints = soup.find('div', {'id': 'GeneInts'})
if not ints:
#print('No interactions: %s' % url)
continue
recname = rerecname.search(
ints.find_previous_sibling('span').text
)
recname = recname.groups()[0] if recname else 'Unknown'
if recname == 'Unknown':
# print('Could not find receptor name: %s' % url)
continue
for td in ints.find_all('td'):
interactors = []
for span in td.find_all('span', {'class': 'IntRow'}):
ints = reint.search(span.text)
if ints:
interactors.append(ints.groups())
references = []
for ref in td.find_all(
'a', {'title': 'click to open reference in new window'}):
references.append(
rerefid.search(ref.attrs['href']).groups()[0]
)
result.extend([
[recname, i[0], i[1], ';'.join(references)]
for i in interactors
])
prg.terminate()
with open(cachefile, 'w') as fp:
fp.write('%s\n' % '\t'.join([
'receptor', 'partner', 'partner_type', 'references'
]))
fp.write('\n'.join('\t'.join(r) for r in result))
return result
def get_hpmr(use_cache = None):
"""
Downloads ligand-receptor and receptor-receptor interactions from the
Human Plasma Membrane Receptome database.
"""
def get_complex(interactors, typ, recname = None, references = None):
"""
typ : str
`Receptor` or `Ligand`.
"""
components = [i[1] for i in interactors if i[0] == typ]
if typ == 'Receptor' and recname:
components.append(recname)
if len(components) == 1:
return components[0]
elif len(components) > 1:
return components
cachefile = settings.get('hpmr_preprocessed')
use_cache = (
use_cache
if isinstance(use_cache, bool) else
settings.get('use_intermediate_cache')
)
if os.path.exists(cachefile) and use_cache:
_log('Reading HPMR data from cache file `%s`.' % cachefile)
return pickle.load(open(cachefile, 'rb'))
rerecname = re.compile(r'Receptor ([A-z0-9]+) interacts with:')
reint = re.compile(r'(Receptor|Ligand) ([A-z0-9]+) -')
rerefid = re.compile(r'list_uids=([- \.:,0-9A-z]+)')
refamid = re.compile(r'.*FamId=([0-9\.]+)')
a_family_title = 'Open Family Page'
a_receptor_title = 'Open Receptor Page'
a_titles = {a_family_title, a_receptor_title}
interactions = []
complex_interactions = []
families = {}
recpages = []
c = curl.Curl(urls.urls['hpmri']['browse'])
soup = bs4.BeautifulSoup(c.result, 'html.parser')
this_family = ('0', None)
this_subfamily = ('0', None)
this_subsubfamily = ('0', None)
for a in soup.find_all('a'):
a_title = a.attrs['title'] if 'title' in a.attrs else None
if a_title not in a_titles:
continue
if a_title == a_family_title:
family_id = refamid.match(a.attrs['href']).groups()[0]
if family_id.startswith(this_subfamily[0]):
this_subsubfamily = (family_id, a.text)
elif family_id.startswith(this_family[0]):
this_subfamily = (family_id, a.text)
this_subsubfamily = ('0', None)
else:
this_family = (family_id, a.text)
this_subfamily = ('0', None)
this_subsubfamily = ('0', None)
elif a_title == a_receptor_title:
recpages.append((
a.attrs['href'],
this_family[1],
this_subfamily[1],
this_subsubfamily[1],
))
prg = progress.Progress(len(recpages), 'Downloading HPMR data', 1)
i_complex = 0
for url, family, subfamily, subsubfamily in recpages:
prg.step()
c = curl.Curl(url)
if c.result is None:
#print('No receptor page: %s' % url)
continue
soup = bs4.BeautifulSoup(c.result, 'html.parser')
ints = soup.find('div', {'id': 'GeneInts'})
if not ints:
#print('No interactions: %s' % url)
continue
recname = rerecname.search(
ints.find_previous_sibling('span').text
)
recname = recname.groups()[0] if recname else 'Unknown'
if recname == 'Unknown':
# print('Could not find receptor name: %s' % url)
continue
recname_u = mapping.map_name0(recname, 'genesymbol', 'uniprot')
if not recname_u:
continue
families[recname_u] = (
family,
subfamily,
subsubfamily,
)
for td in ints.find_all('td'):
interactors = []
for span in td.find_all('span', {'class': 'IntRow'}):
ints = reint.search(span.text)
if ints:
interactors.append(ints.groups())
references = []
for ref in td.find_all(
'a', {'title': 'click to open reference in new window'}):
references.append(
rerefid.search(ref.attrs['href']).groups()[0].strip()
)
interactors_u = []
for role, genesymbol in interactors:
uniprot = (
mapping.map_name0(genesymbol, 'genesymbol', 'uniprot')
)
if uniprot:
interactors_u.append((role, uniprot))
interactions.extend([
[recname_u, i[0], i[1], ';'.join(references)]
for i in interactors_u
])
rec_complex = get_complex(
interactors_u,
'Receptor',
recname = recname_u,
references = references,
)
lig_complex = get_complex(
interactors_u,
'Ligand',
references = references,
)
if (
isinstance(rec_complex, list) or
isinstance(lig_complex, list)
):
complex_interactions.append((lig_complex, rec_complex))
prg.terminate()
result = {
'interactions': interactions,
'families': families,
'complex_interactions': complex_interactions,
}
pickle.dump(result, open(cachefile, 'wb'))
return result
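# A minimal usage sketch for `get_hpmr` (not part of the original module;
# requires network access or a previously written intermediate cache):
#
#     hpmr = get_hpmr()
#     hpmr['interactions']          # [receptor uniprot, role, partner uniprot, refs]
#     hpmr['families']              # uniprot -> (family, subfamily, subsubfamily)
#     hpmr['complex_interactions']  # (ligand complex, receptor complex) pairs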
def hpmr_complexes(use_cache = None):
hpmr_data = get_hpmr(use_cache = use_cache)
complexes = {}
i_complex = 0
for components in itertools.chain(*hpmr_data['complex_interactions']):
if isinstance(components, list):
cplex = intera.Complex(
components = components,
sources = 'HPMR',
ids = 'HPMR-COMPLEX-%u' % i_complex,
)
# increment the counter, otherwise all complexes would share the same ID
i_complex += 1
complexes[cplex.__str__()] = cplex
return complexes
def hpmr_interactions(use_cache = None):
hpmr_data = get_hpmr(use_cache = use_cache)
return hpmr_data['interactions']
def hpmr_annotations(use_cache = None):
annot = collections.defaultdict(set)
HPMRAnnotation = collections.namedtuple(
'HPMRAnnotation',
('role', 'mainclass', 'subclass', 'subsubclass'),
)
hpmr_data = get_hpmr(use_cache = use_cache)
for i in hpmr_data['interactions']:
# first partner is always a receptor
# (because ligand pages simply don't work on HPMR webpage)
args1 = ('Receptor',) + (
hpmr_data['families'][i[0]]
if i[0] in hpmr_data['families'] else
(None, None, None)
)
# the second is either a ligand or another receptor
args2 = (i[1],) + (
hpmr_data['families'][i[2]]
if i[2] in hpmr_data['families'] else
(None, None, None)
)
annot[i[0]].add(HPMRAnnotation(*args1))
annot[i[2]].add(HPMRAnnotation(*args2))
for uniprot, classes in iteritems(hpmr_data['families']):
args = ('Receptor',) + classes
annot[uniprot].add(HPMRAnnotation(*args))
return dict(annot)
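# A minimal usage sketch for `hpmr_annotations` (not part of the original
# module): the result maps UniProt IDs to sets of `HPMRAnnotation` tuples
# carrying the role and the receptor family classification.
#
#     a = hpmr_annotations()
#     # e.g. a['P01133'] -> {HPMRAnnotation(role = 'Ligand', ...)}
#     # the UniProt ID above is only an illustrative placeholder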
def adhesome_interactions():
AdhesomeInteraction = collections.namedtuple(
'AdhesomeInteraction',
['source', 'target', 'effect', 'type', 'pmid'],
)
url = urls.urls['adhesome']['interactions']
c = curl.Curl(url, large = True, silent = False)
data = csv.DictReader(c.result, delimiter = ',')
result = []
for rec in data:
result.append(
AdhesomeInteraction(
source = rec['Source'],
target = rec['Target'],
effect = rec['Effect'],
type = rec['Type'],
pmid = rec['PMID'],
)
)
return result
def get_cpad():
url = urls.urls['cpad']['url']
c = curl.Curl(url, silent = False, large = True, encoding = 'iso-8859-1')
reader = csv.DictReader(c.result, delimiter = '\t')
return reader
def cpad_annotations(include_unknown_type = False):
CpadAnnotation = collections.namedtuple(
'CpadAnnotation',
[
'regulator_type',
'effect_on_pathway',
'pathway',
'effect_on_cancer',
'effect_on_cancer_outcome',
'cancer',
'pathway_category',
]
)
cpad = get_cpad()
result = collections.defaultdict(set)
for rec in cpad:
if rec['Regulator'] == 'NULL':
continue
for regulator in rec['Regulator'].split(' and '):
uniprot = mapping.map_name0(regulator, 'genesymbol', 'uniprot')
if uniprot:
regulator_name = uniprot
regulator_type = 'protein'
else:
mirbase = mapping.map_name(
'hsa-%s' % regulator,
'mir-mat-name',
'mirbase',
)
if not mirbase:
mirbase = mapping.map_name(
'hsa-%s' % regulator,
'mir-name',
'mirbase',
)
if mirbase:
regulator_name = mirbase
regulator_type = 'mirna'
else:
if include_unknown_type:
regulator_name = regulator
regulator_type = 'unknown'
else:
continue
if isinstance(regulator_name, common.basestring):
regulator_name = (regulator_name,)
for regulator_name_0 in regulator_name:
record = CpadAnnotation(
regulator_type = regulator_type,
effect_on_pathway = rec['Regulator_Type'],
effect_on_cancer = rec['Regulation_Type'],
effect_on_cancer_outcome = rec['Outcome_Description'],
pathway = rec['Pathway'],
pathway_category = rec['Pathway_Category'],
cancer = rec['Cancer'],
)
result[regulator_name_0].add(record)
return result
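# A minimal usage sketch for `cpad_annotations` (not part of the original
# module): keys are regulator identifiers (UniProt IDs or miRBase ACs),
# values are sets of `CpadAnnotation` records.
#
#     cpad = cpad_annotations()
#     # e.g. next(iter(cpad.items()))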
def cpad_pathway_cancer():
"""
Collects only the pathway-cancer relationships. Returns sets of records
grouped in dicts by cancer and by pathway.
"""
CpadPathwayCancer = collections.namedtuple(
'CpadPathwayCancer',
[
'pathway',
'cancer',
'pathway_category',
'effect_on_cancer',
'effect_on_cancer_outcome',
]
)
cpad = get_cpad()
by_cancer = collections.defaultdict(set)
by_pathway = collections.defaultdict(set)
for rec in cpad:
record = CpadPathwayCancer(
pathway = rec['Pathway'],
cancer = rec['Cancer'],
pathway_category = rec['Pathway_Category'],
effect_on_cancer = rec['Regulation_Type'],
effect_on_cancer_outcome = rec['Outcome_Description'],
)
by_cancer[rec['Cancer']].add(record)
by_pathway[rec['Pathway']].add(record)
return by_cancer, by_pathway
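# A minimal usage sketch for `cpad_pathway_cancer` (not part of the original
# module):
#
#     by_cancer, by_pathway = cpad_pathway_cancer()
#     by_cancer['Breast cancer']   # set of CpadPathwayCancer records
#     # the cancer name above is only an illustrative placeholder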
def get_integrins():
"""
Returns a set of the UniProt IDs of the human integrins from
Table 1 of Takada et al 2007 (10.1186/gb-2007-8-5-215).
"""
url = urls.urls['integrins']['url']
req_headers = [
'Host: www.ncbi.nlm.nih.gov',
'User-Agent: Mozilla/5.0 (X11; Linux x86_64; rv:67.0) '\
'Gecko/20100101 Firefox/67.0',
'Accept: text/html,application/xhtml+xml,'
'application/xml;q=0.9,*/*;q=0.8',
'Accept-Language: en-US,en;q=0.5',
'Connection: keep-alive',
'Upgrade-Insecure-Requests: 1',
'Pragma: no-cache',
'Cache-Control: no-cache',
]
c = curl.Curl(
url, silent = False, req_headers = req_headers, large = True,
)
soup = bs4.BeautifulSoup(c.fileobj.read(), 'lxml')
integrins = []
rows = soup.find_all('tr')
for tr in rows[1:]:
cells = [td for td in tr.find_all('td')]
integrins.append(cells[-1].text.split('}')[-1])
return mapping.map_names(integrins, 'uniprot', 'uniprot')
def get_tfcensus(classes = ('a', 'b', 'other')):
"""
Downloads and processes the list of all known transcription factors from
TF census (Vaquerizas 2009). This resource is human only.
Returns set of UniProt IDs.
"""
ensg = []
hgnc = []
reensg = re.compile(r'ENSG[0-9]{11}')
url = urls.urls['vaquerizas2009']['url']
c = curl.Curl(url, silent = False, large = True)
f = c.result
for l in f:
if len(l) > 0 and l.split('\t')[0] in classes:
ensg += reensg.findall(l)
h = l.split('\t')[5].strip()
if len(h) > 0:
hgnc.append(h)
return (
set.union(*(
mapping.map_name(e, 'ensembl', 'uniprot')
for e in ensg
)) |
set.union(*(
mapping.map_name(h, 'genesymbol', 'uniprot')
for h in hgnc
))
)
def get_guide2pharma(
organism = 'human',
endogenous = True,
process_interactions = True,
process_complexes = True,
):
"""
Downloads and processes Guide to Pharmacology (IUPHAR/BPS) data.
Returns a tuple: a list of interaction records (named tuples) and a dict
of ligand and receptor complexes.
@organism : str
Name of the organism, e.g. `human`.
@endogenous : bool
Whether to include only interactions of endogenous ligands.
"""
get_taxid = taxonomy.taxid_from_common_name
if isinstance(organism, common.basestring):
try:
organism = taxonomy.taxid_from_common_name(organism)
except KeyError:
organism = None
positives = {
'agonist', 'activator', 'potentiation', 'partial agonist',
'inverse antagonist', 'full agonist', 'activation',
'irreversible agonist', 'positive',
}
negatives = {
'inhibitor', 'antagonist', 'inhibition', 'irreversible inhibition',
'inverse agonist', 'negative', 'weak inhibition',
'reversible inhibition',
}
GuideToPharmacologyInteraction = collections.namedtuple(
'GuideToPharmacologyInteraction',
[
'ligand',
'ligand_id_type',
'target',
'target_id_type',
'target_is_ligand',
'ligand_organism',
'target_organism',
'effect',
'ligand_location',
'target_type',
'ligand_endogenous',
'pubmed_ids',
]
)
def is_positive(term):
return term.lower().strip() in positives
def is_negative(term):
return term.lower().strip() in negatives
interactions = []
complexes = {}
url = urls.urls['gtp']['url']
c = curl.Curl(url, silent = False, large = True, encoding = 'utf-8')
data = csv.DictReader(c.result)
if organism is not None:
data = [
d for d in data
if (
get_taxid(d['target_species']) == organism and
organism in set(
get_taxid(t)
for t in d['ligand_species'].split('|')
)
)
]
if endogenous:
data = [d for d in data if d['endogenous'].strip() == 't']
for d in data:
if is_positive(d['type']) or is_positive(d['action']):
effect = 1
elif is_negative(d['type']) or is_negative(d['action']):
effect = -1
else:
effect = 0
for ligand_taxon in d['ligand_species'].split('|'):
ligand_taxid = get_taxid(ligand_taxon)
ligands = d['ligand_gene_symbol'] or d['ligand_pubchem_sid']
ligands = ligands.split('|')
targets = (
d['target_uniprot'] or
d['target_ligand_uniprot'] or
d['target_ligand_pubchem_sid']
)
targets = targets.split('|')
references = d['pubmed_id'].split('|') if d['pubmed_id'] else []
if process_interactions:
for ligand, target in itertools.product(ligands, targets):
interactions.append(
GuideToPharmacologyInteraction(
ligand = ligand,
ligand_id_type = (
'genesymbol'
if d['ligand_gene_symbol'] else
'pubchem_sid'
if d['ligand_pubchem_sid'] else
None
),
target = target,
target_id_type = (
'uniprot'
if (
d['target_uniprot'] or
d['target_ligand_uniprot']
) else
'pubchem_sid'
if d['target_ligand_pubchem_sid'] else
None
),
target_is_ligand = bool(d['target_ligand']),
ligand_organism = ligand_taxid,
target_organism = get_taxid(d['target_species']),
effect = effect,
ligand_location = (
d['ligand_context'].strip().lower() or None
),
target_type = (
d['receptor_site'].strip().lower() or None
),
ligand_endogenous = (
d['endogenous'].strip() == 't'
),
pubmed_ids = references,
)
)
if process_complexes:
if (
len(targets) > 1 and (
d['target_uniprot'] or
d['target_ligand_uniprot']
)
):
cplex = intera.Complex(
components = targets,
sources = 'Guide2Pharma',
references = references,
)
key = cplex.__str__()
if key in complexes:
complexes[key] += cplex
else:
complexes[key] = cplex
if (
len(ligands) > 1 and
d['ligand_gene_symbol']
):
ligand_uniprots = [
mapping.map_name0(ligand, 'genesymbol', 'uniprot')
for ligand in ligands
]
ligand_uniprots = [u for u in ligand_uniprots if u]
if len(ligand_uniprots) > 1:
cplex = intera.Complex(
components = ligand_uniprots,
sources = 'Guide2Pharma',
references = references,
)
key = cplex.__str__()
if key in complexes:
complexes[key] += cplex
else:
complexes[key] = cplex
return interactions, complexes
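# A minimal usage sketch for `get_guide2pharma` (not part of the original
# module; downloads the Guide to Pharmacology interaction table):
#
#     interactions, complexes = get_guide2pharma(organism = 'human')
#     ia = interactions[0]
#     ia.ligand, ia.target, ia.effect  # effect: 1 activation, -1 inhibition, 0 unknown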
def guide2pharma_interactions(**kwargs):
interactions, complexes = get_guide2pharma(
process_complexes = False,
**kwargs
)
return interactions
def guide2pharma_complexes(**kwargs):
interactions, complexes = get_guide2pharma(
process_interactions = False,
**kwargs
)
return complexes
def cellphonedb_ligands_receptors():
"""
Retrieves the set of ligands and receptors from CellPhoneDB.
Returns tuple of sets.
"""
receptors = set()
ligands = set()
proteins = cellphonedb_protein_annotations()
complexes = cellphonedb_complex_annotations()
for _id, annot in itertools.chain(
iteritems(proteins),
iteritems(complexes)
):
if annot.receptor:
receptors.add(_id)
if annot.secreted or (
not annot.receptor and (
annot.transmembrane or
annot.peripheral
)
):
ligands.add(_id)
return ligands, receptors
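# A minimal usage sketch for `cellphonedb_ligands_receptors` (not part of
# the original module):
#
#     ligands, receptors = cellphonedb_ligands_receptors()
#     # both are sets of UniProt IDs and `intera.Complex` objects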
def _cellphonedb_annotations(url, name_method):
def get_bool(rec, attr):
return attr in rec and rec[attr] == 'True'
def get_desc(rec, attr):
desc = '%s_desc' % attr
return (
'' if (
attr in rec and rec[attr] == 'False' or
attr not in rec and not rec[desc]
) else
rec[desc] if rec[desc] else
attr.capitalize()
)
record = CellPhoneDBAnnotation
annot = {}
c = curl.Curl(url, large = True, silent = False)
tab = list(csv.DictReader(c.result))
for rec in tab:
names = name_method(rec)
if isinstance(names, (common.basestring, intera.Complex)):
names = (names,)
for name in names:
annot[name] = record(
receptor = get_bool(rec, 'receptor'),
receptor_class = (
get_desc(rec, 'receptor').replace('_add', '').lower() or
None
),
peripheral = get_bool(rec, 'peripheral'),
secreted = get_bool(rec, 'secreted'),
secreted_class = (
get_desc(rec, 'secreted').replace(' | ', ',').lower() or
None
),
transmembrane = get_bool(rec, 'transmembrane'),
integrin = get_bool(rec, 'integrin'),
)
return annot
def cellphonedb_protein_annotations():
def name_method(rec):
uniprot = rec['uniprot']
uniprot = _cellphonedb_hla(uniprot)
uniprot = mapping.map_names(uniprot, 'uniprot', 'uniprot')
return uniprot
return _cellphonedb_annotations(
url = urls.urls['cellphonedb_git']['proteins'],
name_method = name_method,
)
def _cellphonedb_hla(uniprot):
uniprots = None
if uniprot.startswith('HLA') and '-' not in uniprot:
genesymbol = 'HLA-%s' % uniprot[3:]
uniprots = mapping.map_name(genesymbol, 'genesymbol', 'uniprot')
return uniprots or uniprot
def cellphonedb_complex_annotations():
def get_uniprots(rec):
return tuple(
uniprot
for uniprot in
(rec['uniprot_%u' % i] for i in xrange(1, 5))
if uniprot
)
def get_stoichiometry(rec):
if not rec['stoichiometry']:
return get_uniprots(rec)
return tuple(
mapping.map_name0(genesymbol, 'genesymbol', 'uniprot')
for genesymbol in rec['stoichiometry'].split(';')
)
def name_method(rec):
comp = get_stoichiometry(rec)
cplex = intera.Complex(
name = rec['complex_name'],
components = comp,
sources = 'CellPhoneDB',
ids = rec['complex_name'],
)
return cplex
return _cellphonedb_annotations(
url = urls.urls['cellphonedb_git']['complexes'],
name_method = name_method,
)
def _cellphonedb_get_entity(name, complexes):
if name in complexes:
return (complexes[name],)
if ':' in name:
name = name.split(':')[1]
if '_' in name:
name = mapping.map_name0(name, 'name-entry', 'name')
if not uniprot_input.is_uniprot(name):
uniprot = mapping.map_name0(name, 'genesymbol', 'uniprot')
name = uniprot or name
name = _cellphonedb_hla(name)
return (name,) if isinstance(name, common.basestring) else name
def cellphonedb_interactions():
def get_type(entity):
return (
'ligand'
if entity in ligands else
'receptor'
if entity in receptors else
'unknown'
)
CellphonedbInteraction = collections.namedtuple(
'CellphonedbInteraction',
[
'id_a',
'id_b',
'sources',
'references',
'interaction_type',
'type_a',
'type_b',
]
)
repmid = re.compile(r'PMID: ([0-9]+)')
recomma = re.compile(r'[,;]')
ligands, receptors = cellphonedb_ligands_receptors()
complexes = dict(
(
_id,
cplex
)
for cplex in cellphonedb_complexes().values()
for _id in cplex.ids['CellPhoneDB']
)
url = urls.urls['cellphonedb_git']['interactions']
c = curl.Curl(url, silent = False, large = True)
reader = csv.DictReader(c.result)
for rec in reader:
_partner_a = _cellphonedb_get_entity(
rec['partner_a'],
complexes = complexes,
)
_partner_b = _cellphonedb_get_entity(
rec['partner_b'],
complexes = complexes,
)
for partner_a, partner_b in itertools.product(_partner_a, _partner_b):
type_a = get_type(partner_a)
type_b = get_type(partner_b)
# swap the partners if the first one is the receptor and the second
# one is the ligand, so the ligand always comes first
rev = type_a == 'receptor' and type_b == 'ligand'
_type_a = type_b if rev else type_a
_type_b = type_a if rev else type_b
sources = (
'CellPhoneDB'
if rec['annotation_strategy'] == 'curated' else
'%s;CellPhoneDB' % (
';'.join(
'%s_CellPhoneDB' % (
res.replace(
'guidetopharmacology.org',
'Guide2Pharma'
)
)
for res in
recomma.split(rec['annotation_strategy'])
)
)
)
refs = ';'.join(repmid.findall(rec['source']))
yield(
CellphonedbInteraction(
id_a = partner_b if rev else partner_a,
id_b = partner_a if rev else partner_b,
sources = sources,
references = refs,
interaction_type = '%s-%s' % (_type_a, _type_b),
type_a = _type_a,
type_b = _type_b,
)
)
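# A minimal usage sketch for `cellphonedb_interactions` (not part of the
# original module): note that it is a generator yielding
# `CellphonedbInteraction` records.
#
#     for ia in cellphonedb_interactions():
#         print(ia.id_a, ia.id_b, ia.interaction_type)
#         break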
def cellphonedb_complexes():
annot = cellphonedb_complex_annotations()
complexes = {}
for cplex in annot.keys():
key = cplex.__str__()
if key in annot:
cplex.add_attr('CellPhoneDB', annot[key])
complexes[key] = cplex
return complexes
def open_pubmed(pmid):
"""
Opens PubMed record in web browser.
@pmid : str or int
PubMed ID
"""
pmid = str(pmid)
url = urls.urls['pubmed']['url'] % pmid
webbrowser.open(url)
def only_pmids(idList, strict = True):
"""
Returns unchanged the elements which comply with the PubMed ID format,
and attempts to translate DOIs and PMC IDs to PMIDs using NCBI E-utils.
Returns a list containing only PMIDs.
@idList : list, str
List of IDs or one single ID.
@strict : bool
If True, IDs which are not PMIDs and could not be converted to PMIDs
are dropped; if False, unconverted DOIs, PMC IDs and NIH manuscript
IDs are kept in the result as well.
"""
if type(idList) in common.simple_types:
idList = [idList]
pmids = {i for i in idList if isinstance(i, int) or i.isdigit()}
pmcids = [i for i in idList if i.startswith('PMC')]
dois = [i for i in idList if '/' in i]
manuscids = [i for i in idList if i.startswith('NIHMS')]
if not strict:
pmids = set(pmids) | set(dois) | set(pmcids) | set(manuscids)
if len(pmcids) > 0:
pmids = pmids | set(pmids_list(pmcids))
if len(dois) > 0:
pmids = pmids | set(pmids_list(dois))
return list(pmids)
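# A minimal usage sketch for `only_pmids` (not part of the original module;
# conversion of DOIs and PMC IDs requires a call to NCBI E-utils):
#
#     only_pmids(['12345678', 'PMC3000000', '10.1000/xyz123'])
#     # -> list of PMIDs; the IDs above are illustrative placeholders only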
def get_pmid(idList):
"""
For a list of DOIs or PMC IDs
fetches the corresponding PMIDs.
"""
if type(idList) in common.simple_types:
idList = [idList]
url = urls.urls['pubmed-eutils']['conv'] % ','.join(str(i) for i in idList)
c = curl.Curl(url, silent = True)
data = c.result
try:
js = json.loads(data)
except:
js = {}
return js
def pmids_dict(idList):
jsn = get_pmid(idList)
result = {'doi': {}, 'pmc': {}}
if 'records' in jsn:
for r in jsn['records']:
if 'pmid' in r:
if 'doi' in r:
result['doi'][r['pmid']] = r['doi']
if 'pmcid' in r:
result['pmc'][r['pmid']] = r['pmcid']
return result
def pmids_list(idList):
jsn = get_pmid(idList)
result = []
if 'records' in jsn:
for r in jsn['records']:
if 'pmid' in r:
result.append(r['pmid'])
return result
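# A minimal usage sketch for the ID conversion helpers above (not part of
# the original module; both rely on `get_pmid`, hence on the NCBI service):
#
#     pmids_list(['PMC3000000'])      # -> list of PMIDs
#     pmids_dict(['10.1000/xyz123'])  # -> {'doi': {...}, 'pmc': {...}}
#     # the IDs above are illustrative placeholders only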
def get_hprd(in_vivo = True):
"""
Downloads and preprocesses HPRD data.
"""
url = urls.urls['hprd_all']['url']
files = [urls.urls['hprd_all']['ptm_file']]
c = curl.Curl(url, silent = False, files_needed = files)
data = c.result
if len(data) == 0:
return []
data = [l.split('\t') for l in data[files[0]].split('\n')][:-1]
if in_vivo:
data = [i for i in data if 'in vivo' in i[9].split(';')]
return data
def hprd_interactions(in_vivo = True):
"""
Processes HPRD data and extracts interactions.
Returns list of interactions.
"""
return [i for i in get_hprd(in_vivo = in_vivo) if i[6] != '-']
def hprd_htp():
url = urls.urls['hprd_all']['url']
fname = urls.urls['hprd_all']['int_file']
c = curl.Curl(url, silent = False, large = True, files_needed = [fname])
return list(
map(
lambda l: l.split('\t'),
c.result[fname].read().decode('ascii').split('\n')
)
)
def get_hprd_ptms(in_vivo = True):
"""
Processes HPRD data and extracts PTMs.
Returns list of kinase-substrate interactions.
"""
ptms = []
non_digit = re.compile(r'[^\d]+')
data = get_hprd(in_vivo = in_vivo)
for ptm in