
#!/usr/bin/env python
# -*- coding: utf-8 -*-

#
#  This file is part of the `pypath` python module
#
#  Copyright 2014-2023
#  EMBL, EMBL-EBI, Uniklinik RWTH Aachen, Heidelberg University
#
#  Authors: see the file `README.rst`
#  Contact: Dénes Türei (turei.denes@gmail.com)
#
#  Distributed under the GPLv3 License.
#  See accompanying file LICENSE.txt or copy at
#      https://www.gnu.org/licenses/gpl-3.0.html
#
#  Website: https://pypath.omnipathdb.org/
#

from future.utils import iteritems
from past.builtins import xrange, range

import os
import re
import gzip
import struct
import collections

try:
    import urllib2
except ImportError:
    import urllib.request as urllib2

import pypath.inputs.uniprot_db as uniprot_db
import pypath.share.progress as progress
import pypath.share.curl as curl
import pypath.share.common as common
import pypath.share.cache as cache
import pypath.share.session as session
import pypath.resources.urls as urls
import pypath.utils.taxonomy as taxonomy

_logger = session.Logger(name = 'pfam_input')
_log = _logger._log


def pfam_uniprot(uniprots = None, organism = 9606):
    """
    Mappings between Pfam and UniProt.

    Args
        uniprots (set): The UniProt IDs to query.
        organism (int): NCBI Taxonomy ID of an organism.

    Returns
        A pair of dicts of sets, the first mapping from UniProt ACs to
        Pfam ACs, the second the other way around.
    """

    uniprots = uniprots or uniprot_db.all_swissprots(organism = organism)
    uniprots = common.to_list(uniprots)
    u_pfam = collections.defaultdict(set)
    pfam_u = collections.defaultdict(set)

    if uniprots is not None:

        prg = progress.Progress(
            len(uniprots) / 200,  # one step per batch of 200 accessions
            'Downloading data from UniProt',
            1,
        )
        data_all = []

        for i in xrange(0, len(uniprots), 200):

            to = i + 200
            thisPart = uniprots[i:to]
            thisPart = ' OR '.join(['accession:%s' % u for u in thisPart])
            get = {
                'query': thisPart,
                'format': 'tab',
                'columns': 'id,database(Pfam)',
            }

            # up to 3 attempts per batch
            for j in xrange(3):

                c = curl.Curl(urls.urls['uniprot_basic']['url'], get = get)
                data = c.result

                if data:

                    break

            data = data.split('\n')
            # drop the header line and the trailing empty line
            del data[0]
            del data[-1]
            data_all += data
            prg.step()

        prg.terminate()

    else:

        organism = taxonomy.ensure_ncbi_tax_id(organism)

        if not organism:

            return None, None

        organismQuery = 'organism:%u AND reviewed:yes' % organism
        get = {
            'query': organismQuery,
            'format': 'tab',
            'columns': 'id,database(Pfam)',
        }

        # up to 3 attempts
        for j in xrange(3):

            c = curl.Curl(
                urls.urls['uniprot_basic']['url'],
                get = get,
                silent = False,
                outf = 'uniprot-pfam-%u.tab' % organism,
            )
            data_all = c.result

            if data_all:

                break

        data_all = data_all.split('\n')
        del data_all[0]

    for l in data_all:

        l = l.split('\t')
        pfams = re.sub(';$', '', l[1]).strip()
        pfams = common.to_set(pfams.split(';') if pfams else set())
        u_pfam[l[0]].update(pfams)

        for pfam in pfams:

            pfam_u[pfam].add(l[0])

    return dict(u_pfam), dict(pfam_u)
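

# The sketch below is not part of the original module; it illustrates one
# way `pfam_uniprot` can be used. The accessions P00533 (EGFR) and PF00069
# (protein kinase domain) are only examples, and a working network
# connection is assumed.
def _example_pfam_uniprot():

    u_pfam, pfam_u = pfam_uniprot(organism = 9606)

    # Pfam ACs annotated on one protein:
    egfr_pfams = u_pfam.get('P00533', set())
    # UniProt ACs of all proteins with a protein kinase domain:
    kinase_uniprots = pfam_u.get('PF00069', set())

    return egfr_pfams, kinase_uniprots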


def pfam_regions(
        uniprots = None,
        pfams = None,
        organism = 9606,
        keepfile = True,
        value = 'both',
    ):
    """
    Args
        uniprots (set): UniProt IDs to include in the result. If neither
            this nor ``pfams`` is provided, all SwissProts for the given
            organism will be queried.
        pfams (set): Pfam IDs to include in the result.
        organism (int): NCBI Taxonomy ID (or any other name) of the
            organism.
        keepfile (bool): Keep the downloaded file in the cache directory.
        value (str): The return value: either "uniprot", "pfam" or "both".
            This is the direction of the mapping: "uniprot" returns a dict
            with UniProt IDs as keys, "pfam" the other way around, a dict
            with Pfam IDs as keys, while "both" returns both dicts as a
            tuple.
    """

    url = urls.urls['pfam_up']['url']
    outf = common.suffix(url, '/')
    urlmd5 = common.md5(url)
    cachefile = os.path.join(
        cache.get_cachedir(),
        '%s-%s' % (urlmd5, outf),
    )
    u_pfam = {}
    pfam_u = {}
    uniprots = common.to_set(uniprots)
    pfams = common.to_set(pfams)

    if not uniprots and not pfams:

        organism = taxonomy.ensure_ncbi_tax_id(organism)
        uniprots = uniprot_db.all_swissprots(organism = organism)

    if not os.path.exists(cachefile):

        _log('Downloading `%s` to `%s`.' % (url, cachefile))
        urllib2.urlretrieve(url, cachefile)
        _log('Finished downloading `%s` to `%s`.' % (url, cachefile))

    # the last 4 bytes of a gzip file (ISIZE) store the uncompressed size
    # modulo 2^32; here it is used only as the progress bar total
    with open(cachefile, 'rb') as f:

        f.seek(-4, 2)
        gzsize = struct.unpack('<I', f.read())[0]

    prg = progress.Progress(gzsize, 'Processing Pfam domains', 11)

    # text mode, so the fields compare equal to the str IDs above
    with gzip.open(cachefile, 'rt') as f:

        for l in f:

            prg.step(len(l))
            l = l.strip().split()

            if l[0] in uniprots or l[4] in pfams:

                region = {
                    'isoform': int(l[1]),
                    'start': int(l[5]),
                    'end': int(l[6]),
                }

                if value in {'uniprot', 'both'}:

                    u_pfam.setdefault(l[0], {}).setdefault(l[4], []).append(
                        region
                    )

                if value in {'pfam', 'both'}:

                    pfam_u.setdefault(l[4], {}).setdefault(l[0], []).append(
                        region
                    )

    prg.terminate()

    if not keepfile:

        os.remove(cachefile)

    if value == 'uniprot':

        return u_pfam

    elif value == 'pfam':

        return pfam_u

    else:

        return u_pfam, pfam_u
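

# Illustrative only (not part of the original module): querying the domain
# regions of a single protein. The accession P00533 is an arbitrary example;
# note that the full Pfam-UniProt mapping file is downloaded on first call.
def _example_pfam_regions():

    u_pfam = pfam_regions(uniprots = {'P00533'}, value = 'uniprot')

    # each Pfam AC maps to a list of regions with isoform number and
    # start/end coordinates:
    for pfam, regions in u_pfam.get('P00533', {}).items():

        for region in regions:

            print(pfam, region['isoform'], region['start'], region['end'])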


def pfam_names():
    """
    Mappings between Pfam accessions and human readable names.

    Returns
        A pair of dictionaries, the first maps from names to accessions,
        the second from accessions to names.
    """

    c = curl.Curl(urls.urls['pfam_pdb']['url'], silent = False)
    data = c.result
    dname_pfam = collections.defaultdict(set)
    pfam_dname = collections.defaultdict(set)
    data = data.strip().split('\n')
    # drop the header line
    del data[0]

    for l in data:

        l = l.split('\t')

        if len(l) > 5:

            pfam = common.prefix(l[4], '.')  # strip the version number
            name = l[5]
            pfam_dname[pfam].add(name)
            dname_pfam[name].add(pfam)

    return dict(dname_pfam), dict(pfam_dname)
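

# A usage sketch (not part of the original module); PF00069 and the name
# `Pkinase` are examples of the accession/name pairs found in the Pfam-PDB
# mapping file this function parses.
def _example_pfam_names():

    dname_pfam, pfam_dname = pfam_names()

    return (
        pfam_dname.get('PF00069', set()),  # e.g. {'Pkinase'}
        dname_pfam.get('Pkinase', set()),  # e.g. {'PF00069'}
    )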


def pfam_pdb():
    """
    Mappings between Pfam and PDB.

    Returns
        A pair of dicts of dicts, the first mapping from PDB IDs to Pfam
        ACs, the second the other way around. The values of the inner
        dicts are domains, each defined by the PDB chain ID and its start
        and end positions.
    """

    PfamDomain = collections.namedtuple(
        'PfamDomain',
        (
            'chain',
            'start',
            'end',
        ),
    )

    c = curl.Curl(urls.urls['pfam_pdb']['url'], silent = False)
    data = c.result

    pdb_pfam = collections.defaultdict(dict)
    pfam_pdb = collections.defaultdict(dict)
    # the first two lines are headers
    data = data.strip().split('\n')[2:]

    for l in data:

        l = l.split('\t')

        if len(l) > 4:

            pfam = common.prefix(l[4], '.')  # strip the version number
            pdb = l[0].lower()
            chain = l[1]
            start = int(common.non_digit.sub('', l[2]))
            end = int(common.non_digit.sub('', l[3]))
            domain = PfamDomain(chain, start, end)
            pdb_pfam[pdb][pfam] = domain
            pfam_pdb[pfam][pdb] = domain

    return dict(pdb_pfam), dict(pfam_pdb)
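

# Illustrative sketch (not part of the original module): looking up the Pfam
# domains resolved in one PDB structure. The PDB ID `1m17` (an EGFR kinase
# domain structure) is only an example.
def _example_pfam_pdb():

    pdb_pfam, pfam_pdb = pfam_pdb()

    for pfam, domain in pdb_pfam.get('1m17', {}).items():

        print(pfam, domain.chain, domain.start, domain.end)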


def _pfam_uniprot(uniprots, infile = None):

    result = {}
    url = urls.urls['pfam_up']['url']
    c = curl.Curl(url, large = True, silent = False)

    prg = progress.Progress(len(uniprots), 'Looking up domains', 1)

    for l in c.result:

        l = l.split('\t')

        if l[0] in uniprots:

            prg.step()
            result.setdefault(l[0], {}).setdefault(l[4], []).append(
                [l[1], l[5], l[6]]
            )

    prg.terminate()

    return result
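

# Sketch (not in the original module) of how the private helper above can be
# called; the accession is an arbitrary example. Each domain is returned as a
# raw `[isoform, start, end]` list of strings.
def _example__pfam_uniprot():

    result = _pfam_uniprot({'P00533'})

    return result.get('P00533', {})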