#!/usr/bin/env python# -*- coding: utf-8 -*-## This file is part of the `pypath` python module## Copyright 2014-2023# EMBL, EMBL-EBI, Uniklinik RWTH Aachen, Heidelberg University## Authors: see the file `README.rst`# Contact: Dénes Türei (turei.denes@gmail.com)## Distributed under the GPLv3 License.# See accompanying file LICENSE.txt or copy at# https://www.gnu.org/licenses/gpl-3.0.html## Website: https://pypath.omnipathdb.org/#fromfuture.utilsimportiteritemsfrompast.builtinsimportxrange,rangeimportosimportreimportgzipimportstructimportcollectionstry:importurllib2except:importurllib.requestasurllib2importpypath.inputs.uniprot_dbasuniprot_dbimportpypath.share.progressasprogressimportpypath.share.curlascurlimportpypath.share.commonascommonimportpypath.share.cacheascacheimportpypath.share.sessionassessionimportpypath.resources.urlsasurlsimportpypath.utils.taxonomyastaxonomy_logger=session.Logger(name='pfam_input')_log=_logger._log
def _uniprot_pfam_rows(get, attempts=3, **curl_args):
    """
    Downloads one UniProt table and returns its data rows.

    Args
        get (dict): Query parameters passed to ``curl.Curl`` as ``get``.
        attempts (int): Maximum number of download attempts.
        curl_args: Further keyword arguments passed to ``curl.Curl``.

    Returns
        List of the lines of the response with the first (header) line
        removed; an empty list if every attempt gave an empty result.
    """

    url = urls.urls['uniprot_basic']['url']

    for _ in range(attempts):

        data = curl.Curl(url, get=get, **curl_args).result

        if data:

            return data.split('\n')[1:]

    # an empty result after the last attempt is reported and skipped instead
    # of failing later with an obscure error on `None.split`
    _log(
        'Failed to retrieve Pfam data from UniProt '
        'after %u attempts.' % attempts
    )

    return []


def pfam_uniprot(uniprots=None, organism=9606):
    """
    Mappings between Pfam and UniProt.

    Args
        uniprots (set): The UniProt IDs to query. If empty or None, the
            result of ``uniprot_db.all_swissprots`` for ``organism`` is
            used instead.
        organism (int): NCBI Taxonomy ID of an organism.

    Returns
        A pair of dicts of sets, the first mapping from UniProt ACs to
        Pfam ACs, the second the other way around. A pair of None values
        if the organism based query is used and the organism can not be
        translated to an NCBI Taxonomy ID.
    """

    batch_size = 200
    columns = 'id,database(Pfam)'

    uniprots = uniprots or uniprot_db.all_swissprots(organism=organism)
    uniprots = common.to_list(uniprots)

    u_pfam = collections.defaultdict(set)
    pfam_u = collections.defaultdict(set)

    if uniprots is not None:

        # one progress step is made per batch, hence the total is the
        # number of batches
        n_batches = (len(uniprots) + batch_size - 1) // batch_size
        prg = progress.Progress(
            n_batches,
            'Downloading data from UniProt',
            1,
        )
        data_all = []

        for i in range(0, len(uniprots), batch_size):

            query = ' OR '.join(
                'accession:%s' % u
                for u in uniprots[i:i + batch_size]
            )
            get = {
                'query': query,
                'format': 'tab',
                'columns': columns,
            }
            data_all.extend(_uniprot_pfam_rows(get))
            prg.step()

        prg.terminate()

    else:

        organism = taxonomy.ensure_ncbi_tax_id(organism)

        if not organism:

            return None, None

        get = {
            'query': 'organism:%u AND reviewed:yes' % organism,
            'format': 'tab',
            'columns': columns,
        }
        data_all = _uniprot_pfam_rows(
            get,
            silent=False,
            outf='uniprot-pfam-%u.tab' % organism,
        )

    for line in data_all:

        fields = line.split('\t')

        # blank lines (e.g. after the trailing newline) and truncated rows
        # carry no mapping
        if len(fields) < 2 or not fields[0]:

            continue

        uniprot = fields[0]
        pfams = re.sub(';$', '', fields[1].strip())
        pfams = common.to_set(pfams.split(';') if pfams else set())

        # the UniProt key is created even if it has no Pfam annotation
        u_pfam[uniprot].update(pfams)

        for pfam in pfams:

            pfam_u[pfam].add(uniprot)

    return dict(u_pfam), dict(pfam_u)
def pfam_regions(
        uniprots=None,
        pfams=None,
        organism=9606,
        keepfile=True,
        value='both',
    ):
    """
    Pfam domain regions of UniProt entries.

    Args
        uniprots (set): UniProt IDs to include in the result. If neither
            this or ``pfams`` provided, all SwissProts for the given
            organism will be queried.
        pfams (set): Pfam IDs to include in the result.
        organism (int): NCBI Taxonomy ID (or any other name) of the
            organism.
        keepfile (bool): Keep the downloaded file in the cache directory.
        value (str): The return value: either "uniprot", "pfam" or "both".
            This is the direction of the mapping "uniprot" returns a dict
            with UniProt IDs as keys, "pfam" the other way around, a dict
            with Pfam IDs as keys, while "both" returns both dicts as a
            tuple.

    Returns
        Dict(s) of dicts of lists: the innermost lists contain one dict
        for each region, with the integer values "isoform", "start" and
        "end". Any ``value`` other than "uniprot" or "pfam" results a
        tuple of the two dicts; a dict is filled only if ``value`` is
        "both" or names its direction.
    """

    url = urls.urls['pfam_up']['url']
    outf = common.suffix(url, '/')
    urlmd5 = common.md5(url)

    cachefile = os.path.join(
        cache.get_cachedir(),
        '%s-%s' % (urlmd5, outf),
    )

    u_pfam = {}
    pfam_u = {}
    by_uniprot = value in {'uniprot', 'both'}
    by_pfam = value in {'pfam', 'both'}

    uniprots = common.to_set(uniprots)
    pfams = common.to_set(pfams)

    if not uniprots and not pfams:

        organism = taxonomy.ensure_ncbi_tax_id(organism)
        uniprots = uniprot_db.all_swissprots(organism=organism)

    if not os.path.exists(cachefile):

        _log('Downloading `%s` to `%s`.' % (url, cachefile))
        urllib2.urlretrieve(url, cachefile)
        _log('Finished downloading `%s` to `%s`.' % (url, cachefile))

    with open(cachefile, 'rb') as f:

        # the last 4 bytes of a gzip file store the size of the
        # uncompressed data (little endian, modulo 2**32)
        f.seek(-4, 2)
        gzsize = struct.unpack('<I', f.read())[0]

    prg = progress.Progress(gzsize, 'Processing Pfam domains', 11)

    # read as bytes to keep the progress in sync with the byte size above
    with gzip.open(cachefile, 'rb') as f:

        for raw in f:

            prg.step(len(raw))
            # the lines are bytes: without decoding, the fields never match
            # the `str` identifiers in `uniprots` and `pfams`
            fields = raw.decode('utf-8').split()

            # blank or truncated lines carry no region
            if len(fields) < 7:

                continue

            uniprot = fields[0]
            pfam = fields[4]

            if uniprot not in uniprots and pfam not in pfams:

                continue

            region = {
                'isoform': int(fields[1]),
                'start': int(fields[5]),
                'end': int(fields[6]),
            }

            if by_uniprot:

                (
                    u_pfam.
                    setdefault(uniprot, {}).
                    setdefault(pfam, []).
                    append(region)
                )

            if by_pfam:

                # a separate dict, so the two mappings share no objects
                (
                    pfam_u.
                    setdefault(pfam, {}).
                    setdefault(uniprot, []).
                    append(dict(region))
                )

    prg.terminate()

    if not keepfile:

        os.remove(cachefile)

    if value == 'uniprot':

        return u_pfam

    elif value == 'pfam':

        return pfam_u

    else:

        return u_pfam, pfam_u
def pfam_names():
    """
    Lookup tables between Pfam accessions and domain names.

    The table is downloaded from the `pfam_pdb` URL; its first line is
    discarded and only rows with more than five tab separated fields are
    used. The accession is the fifth field passed through
    ``common.prefix(..., '.')``, the name is the sixth field.

    Returns
        A pair of dicts of sets: the first one has names as keys and
        accessions as values, the second one has accessions as keys and
        names as values.
    """

    response = curl.Curl(urls.urls['pfam_pdb']['url'], silent=False)
    rows = response.result.strip().split('\n')[1:]

    name_to_acc = collections.defaultdict(set)
    acc_to_name = collections.defaultdict(set)

    for row in rows:

        fields = row.split('\t')

        if len(fields) <= 5:

            continue

        accession = common.prefix(fields[4], '.')
        domain_name = fields[5]

        acc_to_name[accession].add(domain_name)
        name_to_acc[domain_name].add(accession)

    return dict(name_to_acc), dict(acc_to_name)
def pfam_pdb():
    """
    Mappings between Pfam and PDB.

    The table is downloaded from the `pfam_pdb` URL; its first two lines
    are discarded and only rows with more than four tab separated fields
    are used.

    Returns
        A pair of dicts of dicts, the first keyed by lowercase PDB IDs,
        then by Pfam ACs, the second the other way around. Each inner
        value is one ``PfamDomain`` named tuple with the fields `chain`,
        `start` and `end`; if a PDB-Pfam pair occurs in more than one
        row, the last row overwrites the earlier ones.
    """

    PfamDomain = collections.namedtuple(
        'PfamDomain',
        ('chain', 'start', 'end'),
    )

    def _position(raw):
        # drop whatever `common.non_digit` matches, then convert to int
        return int(common.non_digit.sub('', raw))

    response = curl.Curl(urls.urls['pfam_pdb']['url'], silent=False)
    rows = response.result.strip().split('\n')[2:]

    by_pdb = collections.defaultdict(dict)
    by_pfam = collections.defaultdict(dict)

    for row in rows:

        fields = row.split('\t')

        if len(fields) <= 4:

            continue

        pfam_ac = common.prefix(fields[4], '.')
        pdb_id = fields[0].lower()
        domain = PfamDomain(
            chain=fields[1],
            start=_position(fields[2]),
            end=_position(fields[3]),
        )

        by_pdb[pdb_id][pfam_ac] = domain
        by_pfam[pfam_ac][pdb_id] = domain

    return dict(by_pdb), dict(by_pfam)
def _pfam_uniprot(uniprots, infile=None):
    """
    Collects Pfam regions of UniProt entries from the `pfam_up` resource.

    Args
        uniprots: Container of UniProt IDs; used for membership tests and
            its length is the total of the progress bar.
        infile: Not used in this function.

    Returns
        Dict of dicts of lists: the outer keys are the first, the inner
        keys are the fifth field of the tab separated rows; each list
        element is a list of the 2nd, 6th and 7th fields, as they are
        in the row (no type conversion).
    """

    result = {}
    url = urls.urls['pfam_up']['url']
    c = curl.Curl(url, large=True, silent=False)

    prg = progress.Progress(len(uniprots), 'Looking up domains', 1)

    # presumably `c.result` yields the rows of the table one by one,
    # as text -- TODO confirm against `curl.Curl` with `large = True`
    for l in c.result:

        l = l.split('\t')

        if l[0] in uniprots:

            # NOTE(review): the indentation here has been reconstructed
            # from a copy with all whitespace removed; it is an assumption
            # that the statements below belong under this condition
            # (i.e. only the requested UniProt IDs are collected) --
            # confirm against the original file
            prg.step()

            if l[0] not in result:

                result[l[0]] = {}

            if l[4] not in result[l[0]]:

                result[l[0]][l[4]] = []

            # the row is not stripped before the split, hence the last
            # field is stored with any trailing whitespace it has
            result[l[0]][l[4]].append([l[1], l[5], l[6]])

    prg.terminate()

    return result