#!/usr/bin/env python
# -*- coding: utf-8 -*-

#
#  This file is part of the `pypath` python module
#
#  Copyright 2014-2023
#  EMBL, EMBL-EBI, Uniklinik RWTH Aachen, Heidelberg University
#
#  Authors: see the file `README.rst`
#  Contact: Dénes Türei (turei.denes@gmail.com)
#
#  Distributed under the GPLv3 License.
#  See accompanying file LICENSE.txt or copy at
#      https://www.gnu.org/licenses/gpl-3.0.html
#
#  Website: https://pypath.omnipathdb.org/
#

from __future__ import annotations

"""
Performs mapping between IDs of different consensus systems for
proteins and genes, miRNAs, and chemical compounds.
"""

from future.utils import iteritems
from past.builtins import xrange, range

import os
import sys
import math
import re
import importlib as imp
import collections
import functools
import datetime
import time
import urllib

# Make `urllib` point to `urllib.parse` where `urlencode` is not available
# directly on the top level package.
if not hasattr(urllib, 'urlencode'):

    import urllib.parse
    _urllib = urllib
    urllib = _urllib.parse

import json

# NOTE(review): bare `except` also hides errors other than ImportError.
try:
    import cPickle as pickle
except:
    import pickle

from typing import Iterable, List, Literal, Optional, Set, Union

import pandas as pd
import timeloop

# from pypath:
import pypath.share.progress as progress
import pypath.share.common as common
import pypath_common._constants as _const
import pypath.share.cache as cache_mod
import pypath.internals.maps as maps
import pypath.resources.urls as urls
import pypath.share.curl as curl
import pypath.inputs as inputs
import pypath.inputs.uniprot as uniprot_input
import pypath.inputs.uniprot_db as uniprot_db
import pypath.inputs.pro as pro_input
import pypath.inputs.biomart as biomart_input
import pypath.inputs.unichem as unichem_input
import pypath.inputs.ramp as ramp_input
import pypath.inputs.hmdb as hmdb_input
import pypath.internals.input_formats as input_formats
import pypath.utils.reflists as reflists
import pypath.utils.taxonomy as taxonomy
import pypath.share.settings as settings
import pypath.share.session as session_mod

# NOTE(review): this value is overwritten a few lines below.
_logger = session_mod.log()

__all__ = ['MapReader', 'MappingTable', 'Mapper']

# Module level logger, used before any of the classes is instantiated.
_logger = session_mod.Logger(name='mapping')
_log = _logger._log

# The UniChem ID types are retrieved at import time; if that fails, the
# module still loads, with an empty collection of UniChem ID types.
try:
    UNICHEM_NAME_TYPES = set(unichem_input.unichem_sources().values())
except Exception as e:
    exc = sys.exc_info()
    _log('Failed to retrieve UniChem ID types:')
    _logger._log_traceback()
    UNICHEM_NAME_TYPES = ()

# Names of attributes of the `maps` module; each is looked up as a dict
# keyed by `(id_type, target_id_type)` tuples.
RESOURCES_EXPLICIT = ('uniprot', 'basic', 'mirbase', 'ipi')

# Tuples of (ID types of the service, service label, input definition class).
RESOURCES_IMPLICIT = (
    (
        input_formats.AC_MAPPING,
        'uniprot',
        input_formats.UniprotListMapping,
    ),
    (
        input_formats.PRO_MAPPING,
        'pro',
        input_formats.ProMapping,
    ),
    (
        input_formats.BIOMART_MAPPING,
        'biomart',
        input_formats.BiomartMapping,
    ),
    (
        input_formats.ARRAY_MAPPING,
        'array',
        input_formats.ArrayMapping,
    ),
    (
        {n: n for n in UNICHEM_NAME_TYPES},
        'unichem',
        input_formats.UnichemMapping,
    ),
    (
        dict(
            **{it: it for it in ramp_input.ramp_id_types('compound')},
            **input_formats.RAMP_MAPPING,
        ),
        'ramp',
        input_formats.RampMapping,
    ),
    (
        dict(
            **{it: it for it in hmdb_input.ID_FIELDS},
            **input_formats.HMDB_MAPPING,
        ),
        'hmdb',
        input_formats.HmdbMapping,
    ),
)

UNIPROT_ID_TYPES = {
    'uniprot',
    'trembl',
    'swissprot',
    'uniprot-pri',
    'uniprot-sec',
}

"""
Classes for reading and serving ID mapping data from custom file,
function, UniProt, UniProt ID Mapping, Ensembl BioMart,
PRO (Protein Ontology), miRBase or pickle file.
"""

# Key identifying one mapping table in `Mapper.tables`.
MappingTableKey = collections.namedtuple(
    'MappingTableKey',
    [
        'id_type',
        'target_id_type',
        'ncbi_tax_id',
    ],
)
# NOTE(review): with three fields these two defaults apply to
# `target_id_type` ('protein') and `ncbi_tax_id` (9606) -- confirm that
# 'protein' as a default target ID type is intended.
MappingTableKey.__new__.__defaults__ = ('protein', 9606)
class MapReader(session_mod.Logger):
    """
    Reads ID translation data and creates ``MappingTable`` instances.

    When initializing ID conversion tables for the first time, data is
    downloaded from UniProt and read into dictionaries. It takes a couple
    of seconds. Data is saved to pickle dumps, this way later the tables
    load much faster.
    """
def __init__(
        self,
        param,
        ncbi_tax_id=None,
        entity_type=None,
        load_a_to_b=True,
        load_b_to_a=False,
        uniprots=None,
        lifetime=300,
        resource_id_types=None,
    ):
    """
    Stores the table definition and loads the requested tables at once.

    Args:
        param (MappingInput): A mapping table definition, any child of
            the `internals.input_formats.MappingInput` class.
        ncbi_tax_id (int): NCBI Taxonomy identifier of the organism.
            If not provided, the organism of `param`, then the
            `default_organism` setting is used.
        entity_type (str): An optional, custom label for the type of
            the entities, e.g. `protein`. It is only stored here.
        load_a_to_b (bool): Load the table for translation from
            `id_type` to `target_id_type`.
        load_b_to_a (bool): Load the table for translation from
            `target_id_type` to `id_type`.
        uniprots (set): UniProt IDs to query in case the source of the
            mapping table is the UniProt web service.
        lifetime (int): Time in seconds; a table not used for longer
            than this period is to be removed at the next cleanup.
        resource_id_types: Additional mappings between pypath and
            resource specific identifier type labels.
    """

    session_mod.Logger.__init__(self, name='mapping')

    organism = (
        ncbi_tax_id or
        param.ncbi_tax_id or
        settings.get('default_organism')
    )
    self.ncbi_tax_id = organism

    msg_template = (
        'Reader created for ID translation table, parameters: '
        '`ncbi_tax_id=%u, id_a=%s, id_b=%s, '
        'load_a_to_b=%u, load_b_to_a=%u, '
        'input_type=%s (%s)`.'
    )
    msg_values = (
        organism,
        param.id_type_a,
        param.id_type_b,
        load_a_to_b,
        load_b_to_a,
        param.type,
        param.__class__.__name__,
    )
    self._log(msg_template % msg_values)

    self.cachedir = cache_mod.get_cachedir()

    # the table definition
    self.param = param
    self.source_type = param.type
    self.id_type_a, self.id_type_b = param.id_type_a, param.id_type_b
    self.entity_type = entity_type
    self.uniprots = uniprots
    self._resource_id_types = resource_id_types
    self.lifetime = lifetime

    # which directions to load, and the placeholders for the data
    self.load_a_to_b, self.load_b_to_a = load_a_to_b, load_b_to_a
    self.a_to_b, self.b_to_a = None, None

    self.load()
def load(self):
    """
    Runs the whole loading procedure.

    Sets up the cache file paths, reads the tables from the cache if the
    `mapping_use_cache` setting allows, and falls back to the original
    source if the tables are still missing. The cache is written only
    after a successful load from the original source.
    """

    self.use_cache = settings.get('mapping_use_cache')
    self.setup_cache()

    if self.use_cache:

        self.read_cache()

    if self.tables_loaded():

        # everything came from the cache, nothing more to do
        return

    # the original source is our second option
    self.read()

    if self.tables_loaded():

        # only a successful load is worth caching
        self.write_cache()
@property
def mapping_table_a_to_b(self):
    """
    Returns a ``MappingTable`` instance created from the already
    loaded data, for the `a` to `b` direction.
    """

    return self._get_mapping_table('a', 'b')


@property
def mapping_table_b_to_a(self):
    """
    Returns a ``MappingTable`` instance created from the already
    loaded data, for the `b` to `a` direction.
    """

    return self._get_mapping_table('b', 'a')
def id_type_side(self, id_type):
    """
    Finds the side of an ID type in the current table definition.

    Args:
        id_type (str): An ID type label.

    Returns:
        The string "a" if `id_type` is the source side ID type of the
        mapping table definition, "b" if it is the target side one,
        `None` if it is neither. The "a" side is checked first.
    """

    if id_type == self.id_type_a:

        return 'a'

    if id_type == self.id_type_b:

        return 'b'

    return None
def tables_loaded(self):
    """
    Tells if every requested table is available.

    A direction counts as missing if it has been requested
    (`load_a_to_b`, `load_b_to_a`) but its table is empty or `None`.

    Returns:
        (bool): `True` if no requested table is missing.
    """

    a_to_b_missing = self.load_a_to_b and not self.a_to_b
    b_to_a_missing = self.load_b_to_a and not self.b_to_a

    return not (a_to_b_missing or b_to_a_missing)
def write_cache(self):
    """
    Exports the ID translation data of both directions into pickle
    files, first `a` to `b`, then `b` to `a`.
    """

    for direction in (('a', 'b'), ('b', 'a')):

        self._write_cache(*direction)
def read_cache(self):
    """
    Reads the ID translation data of both directions from previously
    saved pickle files, first `a` to `b`, then `b` to `a`.
    """

    for direction in (('a', 'b'), ('b', 'a')):

        self._read_cache(*direction)
def _read_cache(self, *args):
    """
    Loads one direction of the mapping from its pickle cache file.

    Nothing happens if the direction is not requested or the cache file
    does not exist. A truncated or corrupted cache file is reported in
    the log and ignored, so the caller can fall back to the original
    source.

    Args:
        *args: The two sides of the direction, either `('a', 'b')`
            or `('b', 'a')`.
    """

    if not self._to_be_loaded(*args):

        return

    cachefile = self._attr('cachefile', *args)

    if not os.path.exists(cachefile):

        return

    # NOTE: unpickling executes code from the file; the cache directory
    # must not be writable by untrusted parties.
    try:

        with open(cachefile, 'rb') as fp:

            from_cache = pickle.load(fp)

    except (pickle.UnpicklingError, EOFError) as e:

        self._log(
            'Failed to read mapping table from pickle file `%s`: %s' % (
                cachefile,
                e,
            )
        )
        return

    setattr(self, '%s_to_%s' % args, from_cache)

    # report the ID types in the direction which has been loaded
    self._log(
        'Loading `%s` to `%s` mapping table '
        'from pickle file `%s`.' % (
            getattr(self.param, 'id_type_%s' % args[0]),
            getattr(self.param, 'id_type_%s' % args[1]),
            cachefile,
        )
    )


def _to_be_loaded(self, *args):
    """
    Tells if the direction given by `args` is requested to be loaded,
    i.e. returns the value of `load_a_to_b` or `load_b_to_a`.
    """

    return self._attr('load', *args)


def _attr(self, attr, *args):
    """
    Returns the value of a direction specific attribute, for example
    `cachefile_a_to_b` for `attr='cachefile'` and `args=('a', 'b')`.
    """

    return getattr(self, self._attr_name(attr, *args))


@staticmethod
def _attr_name(attr, *args):
    """
    Creates the name of a direction specific attribute in the form
    `<attr>_<side>_to_<side>`.
    """

    return '%s_%s_to_%s' % ((attr,) + args)
def read(self):
    """
    Reads the ID translation data from the original source.

    The work is delegated to the `read_mapping_<source_type>` method of
    this object; if no such method exists, nothing happens.
    """

    try:

        reader = getattr(self, 'read_mapping_%s' % self.source_type)

    except AttributeError:

        return

    reader()
def setup_cache(self):
    """
    Sets up the cache file paths for both directions, first `a` to `b`,
    then `b` to `a`. The file names are md5 hashes of the parameters.
    """

    for direction in (('a', 'b'), ('b', 'a')):

        self._setup_cache(*direction)
def _setup_cache(self, *args):
    """
    Sets the `mapping_id_<x>_to_<y>` and `cachefile_<x>_to_<y>`
    attributes for one direction.

    Args:
        *args: The two sides of the direction, either `('a', 'b')`
            or `('b', 'a')`.
    """

    mapping_id_attr = self._attr_name('mapping_id', *args)
    cachefile_attr = self._attr_name('cachefile', *args)
    mapping_id = self._get_mapping_id(*args)

    setattr(self, mapping_id_attr, mapping_id)
    setattr(self, cachefile_attr, os.path.join(self.cachedir, mapping_id))


def _get_mapping_id(self, *args):
    """
    Returns an md5 checksum unambiguously identifying the mapping table
    by the identifiers, the direction of translation, the organism
    and other parameters like, for example, the source URL.
    """

    return common.md5(
        json.dumps(
            (
                getattr(self, 'id_type_%s' % args[0]),
                getattr(self, 'id_type_%s' % args[1]),
                self.ncbi_tax_id,
                sorted(self.param.__dict__.items())
            )
        )
    )


def _cache_files_exist(self):
    """
    Checks if both cache files are either not necessary or exist.
    """

    # the original called `self.cache_file_exists`, a method that
    # does not exist, hence always raised AttributeError
    return (
        self._cache_file_exists('a', 'b') and
        self._cache_file_exists('b', 'a')
    )


def _cache_file_exists(self, *args):
    """
    Checks if a cache file is either not necessary or exists.
    """

    return (
        not self._attr('load', *args) or
        os.path.isfile(self._attr('cachefile', *args))
    )


def _remove_cache_file(self, *args):
    """
    Deletes the cache file of one direction if it exists.
    """

    cachefile = self._attr('cachefile', *args)

    if os.path.exists(cachefile):

        self._log('Removing mapping table cache file `%s`.' % cachefile)
        os.remove(cachefile)
def read_mapping_file(self):
    """
    Reads a mapping table from a local file or a function.

    If `param.input` is an existing path, the file is read line by line;
    otherwise it is resolved to an input function by
    `inputs.get_method`, and the records returned by that function are
    processed. String records are split by `param.separator`, other
    records are indexed directly. Records with too few fields to contain
    both the `col_a` and `col_b` columns are skipped.

    Returns:
        An empty dict if `param.input` is neither a file nor a known
        input function (kept for backward compatibility), otherwise
        `None`; the results are stored in the `a_to_b` and `b_to_a`
        attributes.
    """

    param = self.param

    if os.path.exists(param.input):

        infile = open(param.input, encoding='utf-8', mode='r')

    else:

        method = inputs.get_method(param.input)

        if not method:

            self._log(
                'Mapping input is neither a file nor '
                'an input method: `%s`.' % param.input
            )
            return {}

        infile = method(**(getattr(param, 'input_args', None) or {}))

    # a record must have at least this many fields, so that both
    # columns can be indexed; the original compared against
    # `max(col_a, col_b)` and raised IndexError on shorter records
    min_fields = max(param.col_a, param.col_b) + 1

    a_to_b = collections.defaultdict(set)
    b_to_a = collections.defaultdict(set)

    try:

        for i, line in enumerate(infile):

            if param.header and i < param.header:

                continue

            if hasattr(line, 'decode'):

                line = line.decode('utf-8')

            if hasattr(line, 'rstrip'):

                line = line.rstrip().split(param.separator)

            if len(line) < min_fields:

                continue

            id_a = line[param.col_a]
            id_b = line[param.col_b]

            if self.load_a_to_b:

                a_to_b[id_a].add(id_b)

            if self.load_b_to_a:

                b_to_a[id_b].add(id_a)

    finally:

        # the input is closed also if processing a record fails
        if hasattr(infile, 'close'):

            infile.close()

    self.a_to_b = a_to_b if self.load_a_to_b else None
    self.b_to_a = b_to_a if self.load_b_to_a else None
def read_mapping_uniprot_list(self):
    """
    Builds a mapping table by downloading data from UniProt's
    upload lists service.

    The results are stored in the `a_to_b` and `b_to_a` attributes.
    If only the "b" side is a UniProtKB ID type, the sides of `param` and
    the `load_*` flags are swapped for the duration of the query and
    swapped back at the end.
    """

    a_to_b = collections.defaultdict(set)
    b_to_a = collections.defaultdict(set)
    swap = False

    if not self.uniprots:

        self.set_uniprot_space()

    # We need a list to query this service, and we have method only for
    # getting a proteome wide list of UniProt IDs. If the translated
    # ID type is not UniProt, then first we need to translate the
    # proteome wide reference list from UniProt to the target ID type.
    if not self._uniprotkb_id_type(self.param.id_type_a):

        if self._uniprotkb_id_type(self.param.id_type_b):

            # the "b" side is UniProt: query in the opposite direction
            # NOTE(review): if an exception is raised before the swap is
            # reverted below, `param` and the flags remain swapped.
            swap = True
            self.param.swap_sides()
            self.load_a_to_b, self.load_b_to_a = (
                self.load_b_to_a,
                self.load_a_to_b,
            )
            upload_ac_list = self.uniprots

        else:

            # neither side is UniProt: translate the UniProt IDs to the
            # "a" side ID type, and use those IDs in the query
            u_target = self._read_mapping_uniprot_list(
                uniprot_id_type_a = 'UniProtKB_AC-ID',
                uniprot_id_type_b = self.param.uniprot_id_type_a,
            )

            # the second tab separated field of each line is kept
            upload_ac_list = [l.split('\t')[1].strip() for l in u_target]

    else:

        upload_ac_list = self.uniprots

    uniprot_data = self._read_mapping_uniprot_list(
        upload_ac_list = upload_ac_list,
    )

    # is any of the ID types an Ensembl one?
    ens = (
        self.param.id_type_a.startswith('ens') or
        self.param.id_type_b.startswith('ens') or
        'ensembl' in self.param.id_type_a.lower() or
        'ensembl' in self.param.id_type_b.lower()
    )
    # matches Ensembl IDs followed by a dot and a number; only the ID
    # itself (the first group) is kept
    reens = re.compile(r'(ENS[A-Z]+\d+)\.\d+')

    for l in uniprot_data:

        if not l:

            continue

        if ens:

            l = reens.sub(r'\1', l)

        # NOTE(review): assumes at least two tab separated fields
        # in each non-empty line.
        l = l.strip().split('\t')

        if self.load_a_to_b:

            a_to_b[l[0]].add(l[1])

        if self.load_b_to_a:

            b_to_a[l[1]].add(l[0])

    if swap:

        # restoring the original sides and directions
        a_to_b, b_to_a = b_to_a, a_to_b
        self.load_a_to_b, self.load_b_to_a = (
            self.load_b_to_a,
            self.load_a_to_b,
        )
        self.param.swap_sides()

    self.a_to_b = a_to_b if self.load_a_to_b else None
    self.b_to_a = b_to_a if self.load_b_to_a else None
def set_uniprot_space(self, swissprot=None):
    """
    Sets up a search space of UniProt IDs in the `uniprots` attribute.

    Args:
        swissprot (bool): Passed to `uniprot_db.all_uniprots`:
            `True` loads only SwissProt IDs, `False` only TrEMBL IDs.
            If `None`, the `swissprot` attribute of the mapping
            parameters is used instead.
    """

    if swissprot is None:

        swissprot = self.param.swissprot

    self.uniprots = uniprot_db.all_uniprots(
        self.ncbi_tax_id,
        swissprot = swissprot,
    )
def _read_mapping_uniprot_list(
        self,
        uniprot_id_type_a = None,
        uniprot_id_type_b = None,
        upload_ac_list = None,
        chunk_size = None,
    ):
    """
    Reads a mapping table from UniProt "upload lists" service.

    The identifiers are submitted in chunks. For each chunk a job is
    submitted, polled until it completes, and its results are retrieved
    in TSV format. If the cache file of a chunk already exists, the
    submission and polling are skipped.

    Args:
        uniprot_id_type_a (str): Source ID type label as used in UniProt.
            By default taken from the mapping parameters.
        uniprot_id_type_b (str): Target ID type label as used in UniProt.
            By default taken from the mapping parameters.
        upload_ac_list (list): The identifiers to use in the query to
            the ID Mapping service. By default the list of all UniProt
            IDs for the organism is used.
        chunk_size (int): Number of IDs in one query. Too large queries
            might fail; the default is taken from the
            `uniprot_uploadlists_chunk_size` setting.

    Returns:
        (list): The lines of the results of all chunks, without the
            header lines.

    Raises:
        RuntimeError: If the job submission fails or the service reports
            a failed job.
    """

    chunk_size = (
        chunk_size or
        settings.get('uniprot_uploadlists_chunk_size')
    )

    uniprot_id_type_a = uniprot_id_type_a or self.param.uniprot_id_type_a
    uniprot_id_type_b = uniprot_id_type_b or self.param.uniprot_id_type_b

    if not upload_ac_list:

        self._log(
            'No identifiers provided, '
            'using all UniProt IDs of the organism.'
        )
        upload_ac_list = self.uniprots

    # sorting makes the chunks, hence the cache keys, reproducible
    upload_ac_list = sorted(upload_ac_list)

    self._log(
        'Querying the UniProt ID Mapping service for ID translation '
        'data. Querying a list of %u IDs.' % len(upload_ac_list)
    )

    run_url = urls.urls['uniprot_idmapping']['run']
    poll_result = {}
    result = []

    # these options are the same for all chunks
    accept_json = {'req_headers': ['Accept: application/json']}
    nocache = {'cache': False, 'large': False}
    large = {'silent': False, 'large': True}

    # loading data in chunks, the size is defined by `chunk_size`
    for chunk_no in range(math.ceil(len(upload_ac_list) / chunk_size)):

        this_chunk = upload_ac_list[
            chunk_no * chunk_size:(chunk_no + 1) * chunk_size
        ]

        self._log(
            'Request to UniProt ID Mapping, chunk #%u with %u IDs.' % (
                chunk_no,
                len(this_chunk),
            )
        )

        post = {
            'from': uniprot_id_type_a,
            'to': uniprot_id_type_b,
            # a slice of a sorted list is already sorted
            'ids': ' '.join(this_chunk),
        }
        run_args = {'url': run_url, 'post': post}

        # the results are cached under the key of the submission request
        cache_path = curl.Curl.cache_path(**run_args)

        if not os.path.exists(cache_path):

            run_c = curl.Curl(**run_args, **nocache, **accept_json)

            if run_c.status != 200:

                raise RuntimeError(
                    'Failed to submit job to UniProt ID Mapping. '
                    'See details in the log.'
                )

            jobid = json.loads(run_c.result)['jobId']

            self._log(
                f'Submitted job to UniProt ID Mapping, job ID: `{jobid}`.'
            )

            timeout = settings.get('uniprot_idmapping_timeout')
            interval = settings.get('uniprot_idmapping_poll_interval')
            max_polls = math.ceil(timeout / interval)
            poll_url = urls.urls['uniprot_idmapping']['poll'] % jobid
            poll_args = {'url': poll_url} | nocache | accept_json

            for poll_no in range(max_polls):

                self._log(
                    f'Polling job UniProt ID Mapping job `{jobid}`, '
                    f'poll {poll_no + 1} of {max_polls}.'
                )

                poll_c = curl.Curl(**poll_args)

                if poll_c.status == 200:

                    poll_result = json.loads(poll_c.result)

                    if (
                        'status' in poll_result or
                        'failedIds' in poll_result
                    ):

                        self._log(
                            f'UniProt ID Mapping job `{jobid}` '
                            'successfully completed.'
                        )
                        break

                    elif 'messages' in poll_result:

                        msg = (
                            'UniProt ID Mapping job failed: ' +
                            ' '.join(common.to_list(poll_result['messages']))
                        )

                        self._log(msg)

                        raise RuntimeError(msg)

                else:

                    self._log(f'Poll failed with HTTP {poll_c.status}.')

                # waiting also after a failed poll, to avoid flooding
                # the server with requests
                time.sleep(interval)

            else:

                # as before, we still attempt to retrieve the results
                self._log(
                    f'UniProt ID Mapping job `{jobid}` did not report '
                    f'completion within {max_polls} polls.'
                )

            self._log(
                'Getting UniProt ID Mapping results URL '
                f'for job `{jobid}`.'
            )

            det_url = urls.urls['uniprot_idmapping']['details'] % jobid
            det_c = curl.Curl(url = det_url, **nocache, **accept_json)

            # the streaming endpoint returns all results in one response
            result_url = (
                json.loads(det_c.result)['redirectURL'].
                replace('/idmapping/results/', '/idmapping/stream/').
                replace('/results/', '/results/stream/')
            ) + '?format=tsv'

            self._log(
                'Retrieving UniProt ID Mapping results '
                f'from `{result_url}`.'
            )

            with curl.cache_delete_on():

                res_c = curl.Curl(
                    url = result_url,
                    cache = cache_path,
                    **large
                )

        else:

            res_c = curl.Curl(**run_args, **large)

        # the first line is the header
        result.extend(list(res_c.fileobj)[1:])

    return result
def read_mapping_uniprot(self):
    """
    Downloads ID mappings directly from UniProt.
    See the names of possible identifiers here:
    http://www.uniprot.org/help/programmatic_access

    The results are stored in the `a_to_b` and `b_to_a` attributes.
    """

    query = uniprot_input.UniprotQuery(
        reviewed = True if self.param.swissprot else None,
        organism = self.ncbi_tax_id,
        fields = self.param._resource_id_type_a,
    )
    self._log(f'UniProt REST API call: `{query.url_plain}`.')

    # NOTE(review): this is a membership test on the param object itself;
    # confirm that the param class supports the `in` operator as intended.
    trembl = 'trembl' in self.param
    protein_name = self.param.field == 'protein names'
    # for protein names and TrEMBL the values are processed below,
    # instead of by the query object
    query.name_process = not protein_name and not trembl
    data = query.perform()

    if not query.name_process:

        def maybe_split(v):

            # values without any lowercase character are split into
            # individual fields, with empty ones removed
            if trembl and not any(ch.islower() for ch in v):

                v = common.del_empty(query._FIELDSEP.split(v))

            elif protein_name:

                v = self._process_protein_name(v)

            return v


        data = {k: maybe_split(v) for k, v in data.items()}

    data = {k: common.to_set(v) for k, v in data.items()}

    # `data` as returned by the query is used for the "b" to "a"
    # direction, its swapped version for "a" to "b"
    self.a_to_b = (
        common.swap_dict(data, force_sets = True)
            if self.load_a_to_b else
        None
    )
    self.b_to_a = data if self.load_b_to_a else None
def read_mapping_biomart(self):
    """
    Loads a mapping table using BioMart data.

    The organism of the mapping parameters is translated to its Ensembl
    name; if that is not possible, a message is logged and nothing is
    loaded. Records with an empty value on either side are ignored.
    The results are stored in the `a_to_b` and `b_to_a` attributes
    as plain dicts of sets.
    """

    organism = self.param.ncbi_tax_id
    ens_organism = taxonomy.ensure_ensembl_name(organism)

    if not ens_organism:

        self._log('Organism not available in Ensembl: `%u`.' % organism)

        return

    records = biomart_input.biomart_query(
        attrs = self.param.attrs,
        dataset = '%s_gene_ensembl' % ens_organism,
    )

    forward = collections.defaultdict(set)
    backward = collections.defaultdict(set)

    for record in records:

        id_a = getattr(record, self.param.biomart_id_type_a)
        id_b = getattr(record, self.param.biomart_id_type_b)

        if not (id_a and id_b):

            continue

        if self.load_a_to_b:

            forward[id_a].add(id_b)

        if self.load_b_to_a:

            backward[id_b].add(id_a)

    self.a_to_b = dict(forward) if self.load_a_to_b else None
    self.b_to_a = dict(backward) if self.load_b_to_a else None
def read_mapping_array(self):
    """
    Loads mapping table between microarray probe IDs and genes.

    The probe data is retrieved by `biomart_input.biomart_microarrays`
    and processed as a dict of Ensembl IDs and their probes. Depending
    on which side the array ID type is, the probe-to-Ensembl and the
    Ensembl-to-probe dicts are assigned to `a_to_b` or `b_to_a`; only
    the requested directions are created.
    """

    param = self.param
    ensembl_id_type = param.ensembl_id

    probe_mapping = biomart_input.biomart_microarrays(
        organism = param.ncbi_tax_id,
        vendor = param.array_id,
        gene = ensembl_id_type == 'ensg',
        transcript = ensembl_id_type == 'enst',
        peptide = ensembl_id_type == 'ensp',
    )

    # is the probe on the "a" side?
    probe_is_a = param.id_type_a == param.array_id

    if probe_is_a:

        probe_attr, gene_attr = 'a_to_b', 'b_to_a'
        load_probe_to_gene = self.load_a_to_b
        load_gene_to_probe = self.load_b_to_a

    else:

        probe_attr, gene_attr = 'b_to_a', 'a_to_b'
        load_probe_to_gene = self.load_b_to_a
        load_gene_to_probe = self.load_a_to_b

    if load_probe_to_gene:

        by_probe = collections.defaultdict(set)

        for ensembl_id, probes in iteritems(probe_mapping):

            for probe in probes:

                by_probe[probe.probe].add(ensembl_id)

        setattr(self, probe_attr, dict(by_probe))

    if load_gene_to_probe:

        by_gene = {
            ensembl_id: {probe.probe for probe in probes}
            for ensembl_id, probes in iteritems(probe_mapping)
        }

        setattr(self, gene_attr, by_gene)
def _read_mapping_smallmolecule(self):
    """
    Loads a small molecule ID translation table.

    The results are stored in the `a_to_b` and `b_to_a` attributes, and
    the organism of the reader is set to "not organism specific".
    """

    if self.param.input_method:

        method = inputs.get_method(self.param.input_method)

    else:

        # by default the `<source_type>_mapping` function is used, from
        # the module available in this module as `<source_type>_input`
        mod = globals()[f'{self.source_type}_input']
        method = getattr(mod, f'{self.source_type}_mapping')

    data = method(
        id_type_a = self.resource_id_type_a,
        id_type_b = self.resource_id_type_b,
    )

    if self.load_a_to_b:

        self.a_to_b = data

    if self.load_b_to_a:

        self.b_to_a = common.swap_dict(data, force_sets = True)

    # NOTE(review): this happens after loading; confirm if the cache file
    # names, set up earlier in `load`, are expected to use the original
    # organism.
    self.ncbi_tax_id = _const.NOT_ORGANISM_SPECIFIC
def read_mapping_ramp(self):
    """
    Loads an ID translation table from RaMP.

    Delegates to the generic small molecule table loader.
    """

    self._read_mapping_smallmolecule()
def read_mapping_unichem(self):
    """
    Loads an ID translation table from UniChem.

    Delegates to the generic small molecule table loader.
    """

    self._read_mapping_smallmolecule()
def read_mapping_hmdb(self):
    """
    Loads an ID translation table from the Human Metabolome Database.

    Delegates to the generic small molecule table loader.
    """

    self._read_mapping_smallmolecule()
class MappingTable(session_mod.Logger):
    """
    This is the class directly handling ID translation data.
    It does not care about loading it or what kind of IDs these are,
    it only accepts the translation dictionary.

    lifetime : int
        If this table has not been used for longer than this period it is
        to be removed at next cleanup. Time in seconds.
    """
def __init__(
        self,
        data,
        id_type,
        target_id_type,
        ncbi_tax_id,
        lifetime = 300,
    ):
    """
    Wraps a dictionary of identifier mapping.

    The dictionary is kept in the `data` attribute: its keys are the
    source identifiers, its values are sets of target identifiers.
    Most often the mapping is unambiguous, i.e. there is one target
    identifier for each source identifier.

    Args:
        data (dict): The identifier translation dictionary.
        id_type (str): The source ID type.
        target_id_type (str): The target ID type.
        ncbi_tax_id (int): NCBI Taxonomy identifier of the organism.
        lifetime (int): Time in seconds to keep the table loaded in
            the memory. If not used, the table will be unloaded after
            this time. Each usage resets the expiry time.
    """

    session_mod.Logger.__init__(self, name = 'mapping')

    self.data = data
    self.lifetime = lifetime
    self.ncbi_tax_id = ncbi_tax_id
    self.id_type, self.target_id_type = id_type, target_id_type

    # creating the table counts as its first usage
    self._used()
def get_key(self):
    """
    Creates the key of this mapping table.

    Returns:
        (MappingTableKey): A named tuple of the source ID type, the
            target ID type and the organism of the table.
    """

    defining_attrs = {
        'id_type': self.id_type,
        'target_id_type': self.target_id_type,
        'ncbi_tax_id': self.ncbi_tax_id,
    }

    return MappingTableKey(**defining_attrs)
class Mapper(session_mod.Logger):

    # class level defaults, read from the settings at import time
    default_name_types = settings.get('default_name_types')
    default_label_types = settings.get('default_label_types')

    # helper used only in the class body, to build the attribute below
    def _get_label_type_to_id_type(default_name_types):

        # for each entity type in the `default_label_types` setting, its
        # label type is mapped to the default name type of the same
        # entity type
        label_type_to_id_type = dict(
            (
                label_type,
                default_name_types[entity_type],
            )
            for entity_type, label_type in
            iteritems(settings.get('default_label_types'))
        )

        #TODO: some nicer solution
        label_type_to_id_type['mir-name'] = 'mir-pre'

        return label_type_to_id_type


    label_type_to_id_type = _get_label_type_to_id_type(default_name_types)
def __init__(
        self,
        ncbi_tax_id = None,
        cleanup_period = 10,
        lifetime = 300,
        translate_deleted_uniprot = None,
        keep_invalid_uniprot = None,
        trembl_swissprot_by_genesymbol = None,
    ):
    """
    Args:
        ncbi_tax_id (int): NCBI Taxonomy ID of the default organism.
            If not provided, the `default_organism` setting is used.
        cleanup_period (int): Periodically check and remove unused
            mapping data. Time in seconds. If `None`, tables are kept
            forever, no cleanup job is started.
        lifetime (int): If a table has not been used for longer than
            this period it is to be removed at next cleanup.
        translate_deleted_uniprot (bool): Do an extra attempt to
            translate deleted or obsolete UniProt IDs by retrieving
            their archived datasheet and use the gene symbol to find
            the corresponding valid UniProt ID?
        keep_invalid_uniprot (bool): If the target ID is UniProt, keep
            the results if they fit the format for UniProt IDs (we
            won't check if they are deleted or from a different taxon).
            The alternative is to keep only those which are in the list
            of all UniProt IDs for the given organism.
        trembl_swissprot_by_genesymbol (bool): Attempt to translate
            TrEMBL IDs to SwissProt by translating to gene symbols and
            then to SwissProt.
    """

    session_mod.Logger.__init__(self, name = 'mapping')

    # NOTE(review): `lifetime` is accepted but not used in this method.

    cleanup_period = settings.get(
        'mapper_cleanup_interval',
        cleanup_period
    )
    self._translate_deleted_uniprot = settings.get(
        'mapper_translate_deleted_uniprot',
        translate_deleted_uniprot,
    )
    self._keep_invalid_uniprot = settings.get(
        'mapper_keep_invalid_uniprot',
        keep_invalid_uniprot,
    )
    self._trembl_swissprot_by_genesymbol = settings.get(
        'mapper_trembl_swissprot_by_genesymbol',
        trembl_swissprot_by_genesymbol,
    )

    self._mapper_cleanup_timeloop = timeloop.Timeloop()
    # a level above all standard levels, to silence the timeloop logger
    self._mapper_cleanup_timeloop.logger.setLevel(9999)

    # making sure no job is left on the timeloop
    for job in self._mapper_cleanup_timeloop.jobs:

        if job.is_alive():

            job.stop()
            job.stopped.set()

    self._mapper_cleanup_timeloop.jobs = []

    # without a cleanup period there is nothing to schedule; passing
    # `None` to `timedelta` would raise TypeError
    if cleanup_period is not None:

        @self._mapper_cleanup_timeloop.job(
            interval = datetime.timedelta(
                seconds = cleanup_period
            )
        )
        def _cleanup():

            self.remove_expired()


        self._mapper_cleanup_timeloop.start(block = False)

    # regex for matching UniProt AC format
    self.reuniprot = re.compile(r'^(?:%s)$' % uniprot_input.reac.pattern)
    # regexes for miRBase accessions and names
    self.remipreac = re.compile(r'^MI\d{7}$')
    self.remimatac = re.compile(r'^MIMAT\d{7}$')
    # NOTE(review): the `[A-z]` range also matches the characters between
    # `Z` and `a` in ASCII, such as `_` and `^`; confirm if intended.
    self.remipreid = re.compile(
        r'^[a-z]{3}-'
        r'(?:mir|MIR|let|lsy|lin)-?'
        r'\d+-?[A-z\*]*(?:-((?!p)[\w\*\.-])+)?$'
    )
    self.remimatid = re.compile(
        r'^[a-z]{3}-'
        r'(?:miR|let|lsy|lin)-?'
        r'\d+[a-z\*]*(?:-((?!p)[\w\*])+)?(?:-(3|5)p)?$'
    )

    self.cachedir = cache_mod.get_cachedir()
    self.ncbi_tax_id = ncbi_tax_id or settings.get('default_organism')

    self.unmapped = []
    self.tables = {}
    self.uniprot_mapped = []
    self.trace = []

    # ID type labels as used here and in the UniProt static tables
    self.uniprot_static_names = {
        'uniprot_id': 'UniProtKB-ID',
        'embl': 'EMBL-CDS',
        'embl_id': 'EMBL',
        'entrez': 'GeneID',
        'gi': 'GI',
        'refseqp': 'RefSeq',
        'refseqn': 'RefSeq_NT',
        'ensembl': 'Ensembl',
        'ensg': 'ENSEMBL',
        'ensp': 'ENSEMBL_PRO_ID',
        'enst': 'ENSEMBL_TRS',
        'hgnc': 'HGNC',
    }
    # the same in the opposite direction
    self.names_uniprot_static = (
        common.swap_dict_simple(self.uniprot_static_names)
    )
def reload(self):
    """
    Reloads the module of this object's class, and changes the class of
    the object to the class with the same name in the reloaded module.
    """

    cls = self.__class__
    modname = cls.__module__
    top_level = modname.split('.')[0]

    mod = __import__(modname, fromlist = [top_level])
    imp.reload(mod)

    self.__class__ = getattr(mod, cls.__name__)
def get_table_key(
        self,
        id_type,
        target_id_type,
        ncbi_tax_id = None,
    ):
    """
    Creates a key unambiguously identifying a mapping table.

    Args:
        id_type (str): The source ID type.
        target_id_type (str): The target ID type.
        ncbi_tax_id (int): NCBI Taxonomy ID of the organism; if empty,
            the default organism of this object is used.

    Returns:
        (MappingTableKey): A named tuple of the three values above.
    """

    if not ncbi_tax_id:

        ncbi_tax_id = self.ncbi_tax_id

    return MappingTableKey(
        id_type = id_type,
        target_id_type = target_id_type,
        ncbi_tax_id = ncbi_tax_id,
    )
def which_table(
        self,
        id_type,
        target_id_type,
        load = True,
        ncbi_tax_id = None,
    ):
    """
    Returns the table which is suitable to convert an ID of
    id_type to target_id_type. If no such table has been loaded
    yet, it attempts to load it: first from the built-in table
    definitions, then from the services (UniProt, PRO, BioMart, etc).
    If all attempts failed, returns `None`.

    Args:
        id_type (str): The source ID type.
        target_id_type (str): The target ID type.
        load (bool): Attempt to load the table if it is not available
            yet; otherwise only the already loaded tables are
            considered.
        ncbi_tax_id (int): NCBI Taxonomy ID of the organism; by default
            the default organism of this object.

    Returns:
        The mapping table translating from `id_type` to
        `target_id_type`, or `None`.

    Raises:
        ValueError: If loading a table is necessary and any of the ID
            types is `complex`.
    """

    tbl = None
    ncbi_tax_id = ncbi_tax_id or self.ncbi_tax_id

    def check_loaded():

        # uses the current value of `ncbi_tax_id`, which might be
        # changed below for not organism specific services
        return self.which_table(
            id_type = id_type,
            target_id_type = target_id_type,
            load = False,
            ncbi_tax_id = ncbi_tax_id,
        )

    tbl_key = self.get_table_key(
        id_type = id_type,
        target_id_type = target_id_type,
        ncbi_tax_id = ncbi_tax_id,
    )

    tbl_key_noorganism = self.get_table_key(
        *tbl_key[:-1],
        ncbi_tax_id = _const.NOT_ORGANISM_SPECIFIC,
    )

    tbl_key_rev = self.get_table_key(
        id_type = target_id_type,
        target_id_type = id_type,
        ncbi_tax_id = ncbi_tax_id,
    )

    tbl_key_rev_noorganism = self.get_table_key(
        *tbl_key_rev[:-1],
        ncbi_tax_id = _const.NOT_ORGANISM_SPECIFIC,
    )

    if tbl_key in self.tables:

        tbl = self.tables[tbl_key]

    elif tbl_key_noorganism in self.tables:

        tbl = self.tables[tbl_key_noorganism]

    elif tbl_key_rev in self.tables:

        # only the opposite direction is loaded: we create the requested
        # direction from it, and return the newly created table, not the
        # one we started from
        self.create_reverse(tbl_key_rev)
        tbl = self.tables[self.reverse_key(tbl_key_rev)]

    elif tbl_key_rev_noorganism in self.tables:

        self.create_reverse(tbl_key_rev_noorganism)
        tbl = self.tables[self.reverse_key(tbl_key_rev_noorganism)]

    elif load:

        self._log(
            'Requested to load ID translation table from '
            '`%s` to `%s`, organism: %u.' % (
                id_type,
                target_id_type,
                ncbi_tax_id,
            )
        )

        if id_type == 'complex' or target_id_type == 'complex':

            raise ValueError('Can not translate protein complexes.')

        id_types = (id_type, target_id_type)
        id_types_rev = tuple(reversed(id_types))
        resource = None

        # first option: the built-in table definitions
        for resource_attr in RESOURCES_EXPLICIT:

            resources = getattr(maps, resource_attr)

            if id_types in resources:

                resource = resources[id_types]
                load_a_to_b = True
                load_b_to_a = False

            elif id_types_rev in resources:

                resource = resources[id_types_rev]
                load_a_to_b = False
                load_b_to_a = True

            if resource:

                self._log(
                    'Chosen built-in defined ID translation table: '
                    'resource=%s, id_type_a=%s, id_type_b=%s' % (
                        resource_attr,
                        resource.id_type_a,
                        resource.id_type_b,
                    )
                )

                self.load_mapping(
                    resource = resource,
                    load_a_to_b = load_a_to_b,
                    load_b_to_a = load_b_to_a,
                    ncbi_tax_id = ncbi_tax_id,
                )

                tbl = check_loaded()

                break

        # second option: the services
        if tbl is None:

            for (
                service_ids,
                service_id_type,
                input_cls
            ) in RESOURCES_IMPLICIT:

                if (
                    (
                        input_cls.possible(
                            id_type,
                            target_id_type,
                            ncbi_tax_id,
                        ) and
                        id_type != target_id_type
                    ) or (
                        service_id_type == 'pro' and (
                            (
                                id_type in service_ids or
                                target_id_type in service_ids
                            ) and (
                                id_type == service_id_type or
                                target_id_type == service_id_type
                            )
                        )
                    ) or (
                        service_id_type == 'biomart' and (
                            (
                                id_type in service_ids and
                                target_id_type in service_ids
                            )
                        )
                    ) or (
                        service_id_type == 'array' and (
                            (
                                id_type in service_ids and
                                target_id_type in {'ensg', 'enst', 'ensp'}
                            ) or (
                                target_id_type in service_ids and
                                id_type in {'ensg', 'enst', 'ensp'}
                            )
                        )
                    )
                ):

                    # the ID type of the service is always
                    # on the "a" side
                    if target_id_type == service_id_type:

                        _id_type, _target_id_type = (
                            target_id_type,
                            id_type,
                        )
                        load_a_to_b = False
                        load_b_to_a = True

                    else:

                        _id_type, _target_id_type = (
                            id_type,
                            target_id_type,
                        )
                        load_a_to_b = True
                        load_b_to_a = False

                    self._log(
                        'Chosen ID translation table from service: '
                        'service=%s, id_type_a=%s, id_type_b=%s' % (
                            service_id_type,
                            _id_type,
                            _target_id_type,
                        )
                    )

                    # small molecule tables are stored under
                    # not organism specific keys
                    if service_id_type in {'hmdb', 'ramp', 'unichem'}:

                        ncbi_tax_id = _const.NOT_ORGANISM_SPECIFIC
                        tbl_key = tbl_key_noorganism
                        tbl_key_rev = tbl_key_rev_noorganism

                    # for uniprot/idmapping or PRO or array
                    # we create here the mapping params
                    this_param = input_cls(
                        id_type_a = _id_type,
                        id_type_b = _target_id_type,
                        ncbi_tax_id = ncbi_tax_id,
                    )

                    reader = MapReader(
                        param = this_param,
                        ncbi_tax_id = ncbi_tax_id,
                        load_a_to_b = load_a_to_b,
                        load_b_to_a = load_b_to_a,
                        uniprots = None,
                        lifetime = 300,
                        resource_id_types = service_ids,
                    )

                    self.tables[tbl_key] = getattr(
                        reader,
                        'mapping_table_%s_to_%s' % (
                            reader.id_type_side(tbl_key.id_type),
                            reader.id_type_side(tbl_key.target_id_type),
                        )
                    )

                    tbl = check_loaded()

                    if tbl:

                        break

        if tbl is None and id_type == 'genesymbol5':

            self.load_genesymbol5(ncbi_tax_id = ncbi_tax_id)

            tbl = check_loaded()

        if tbl is None:

            if (
                settings.get('mapping_uniprot_static') and
                id_type in self.uniprot_static_names and
                target_id_type == 'uniprot'
            ):

                self.load_uniprot_static([id_type])

                tbl = check_loaded()

        if tbl is None:

            self._log(
                'Could not find suitable ID translation table '
                f'between id types `{id_type}` and `{target_id_type}` '
                f'for organism `{ncbi_tax_id}`.'
            )

    # each access resets the expiry time of the table
    if hasattr(tbl, '_used'):

        tbl._used()

    return tbl
@staticmethod
def reverse_mapping(mapping_table):
    """
    Creates an opposite direction `MappingTable` from an existing one.

    The translation dictionary is swapped by `common.swap_dict`, the
    source and target ID types are exchanged, while the organism and
    the lifetime are the same as for the original.

    Args:
        mapping_table (MappingTable): A `MappingTable` object.

    Returns:
        A new `MappingTable` object.
    """

    swapped = common.swap_dict(mapping_table.data)

    rev_args = {
        'data': swapped,
        'id_type': mapping_table.target_id_type,
        'target_id_type': mapping_table.id_type,
        'ncbi_tax_id': mapping_table.ncbi_tax_id,
        'lifetime': mapping_table.lifetime,
    }

    return MappingTable(**rev_args)
def reverse_key(self, key):
    """
    Creates the key of the opposite direction mapping table.

    Args:
        key (tuple): A mapping table key.

    Returns:
        A mapping table key created by `get_table_key`, with the source
        and target ID types of `key` exchanged, and its organism.
    """

    source, target = key.id_type, key.target_id_type

    return self.get_table_key(
        id_type = target,
        target_id_type = source,
        ncbi_tax_id = key.ncbi_tax_id,
    )
def create_reverse(self, key):
    """
    Adds the opposite direction of an already loaded table.

    The table under `key` is reversed, i.e. its ``id_type`` and
    ``target_id_type`` are swapped, and the new table is stored in
    `tables` under the reversed key.

    Args:
        key (tuple): The key of an existing mapping table.
    """

    original = self.tables[key]
    target_key = self.reverse_key(key)
    reversed_table = self.reverse_mapping(original)

    self.tables[target_key] = reversed_table
def map_name0(
        self,
        name,
        id_type = None,
        target_id_type = None,
        ncbi_tax_id = None,
        strict = False,
        expand_complexes = None,
        uniprot_cleanup = None,
    ):
    """
    Translates the name and returns only one of the resulting IDs.

    All arguments are passed to `map_name`, see more details there.
    In case of ambiguous ID translation, an arbitrary one of the
    resulting IDs is returned. Recommended to use only if the
    translation between the given ID types is mostly unambiguous and the
    loss of information can be ignored.

    Returns:
        One of the IDs returned by `map_name`, or `None` if the
        translation gave no result.
    """

    names = self.map_name(
        name = name,
        id_type = id_type,
        target_id_type = target_id_type,
        ncbi_tax_id = ncbi_tax_id,
        strict = strict,
        expand_complexes = expand_complexes,
        uniprot_cleanup = uniprot_cleanup,
    )

    if not names:

        return None

    return next(iter(names))
@common.ignore_unhashable
@functools.lru_cache(maxsize=int(1e5))
def map_name(
    self,
    name,
    id_type=None,
    target_id_type=None,
    ncbi_tax_id=None,
    strict=False,
    expand_complexes=True,
    uniprot_cleanup=True,
):
    """
    Translates one instance of one ID type to a different one.
    Returns set of the target ID type.

    This function should be used to convert individual IDs. It first
    attempts a direct lookup in the translation table between the two
    ID types, then applies a series of fallbacks as long as the result
    is empty: alternative miRNA name types, two step translation via
    UniProt for gene symbols, upper case, capitalized and lower case
    variants of the name, removal of the part after a dot for Ensembl
    IDs, removal of a prefix ending in colon, gene symbol synonyms and,
    unless ``strict`` is set, fuzzy gene symbol matching.

    Args
        name (str): The original name to be converted. Objects with a
            ``components`` attribute are handled as complexes.
        id_type (str): The type of the name; a list, set or tuple of
            types is also accepted, in this case the union of the
            translations is returned.
            Available by default:
            - genesymbol (gene name)
            - entrez (Entrez Gene ID [#])
            - refseqp (NCBI RefSeq Protein ID [NP_*|XP_*])
            - ensp (Ensembl protein ID [ENSP*])
            - enst (Ensembl transcript ID [ENST*])
            - ensg (Ensembl genomic DNA ID [ENSG*])
            - hgnc (HGNC ID [HGNC:#])
            - gi (GI number [#])
            - embl (DDBJ/EMBL/GeneBank CDS accession)
            - embl_id (DDBJ/EMBL/GeneBank accession)
            And many more, see the code of
            ``pypath.internals.input_formats``
        target_id_type (str): The name type to translate to, more or
            less the same values are available as for ``id_type``.
        ncbi_tax_id (int): NCBI Taxonomy ID of the organism; falls back
            to the ``ncbi_tax_id`` attribute of the instance.
        strict (bool): If False (default) and a Gene Symbol can not be
            translated, further attempts are made by appending the
            number "1" to it and, for UniProt targets, by matching only
            its first five characters. Setting it to True disables these
            error prone attempts. For RefSeq it is passed to
            ``_map_refseq``.
        expand_complexes (bool): When encountering complexes, return
            the set of the keys of their ``components``. Otherwise
            return a set with the complex object itself.
        uniprot_cleanup (bool): When the ``target_id_type`` is
            ``uniprot``, call the ``uniprot_cleanup`` method at the end.

    Returns
        Set of translated IDs, empty if the translation failed.
    """

    if not name:

        return set()

    ncbi_tax_id = ncbi_tax_id or self.ncbi_tax_id

    # we support translating from more name types at the same time
    if isinstance(id_type, (list, set, tuple)):

        # `set().union` (instead of `set.union`) returns an empty set
        # for an empty collection of ID types; the complex and cleanup
        # options are forwarded so they apply in this branch too
        return set().union(*(
            self.map_name(
                name=name,
                id_type=this_id_type,
                target_id_type=target_id_type,
                strict=strict,
                ncbi_tax_id=ncbi_tax_id,
                expand_complexes=expand_complexes,
                uniprot_cleanup=uniprot_cleanup,
            )
            for this_id_type in id_type
        ))

    # complexes
    if hasattr(name, 'components'):

        if expand_complexes:

            return set(name.components.keys())

        else:

            return {name}

    # translating from an ID type to the same ID type?
    elif id_type == target_id_type:

        if target_id_type != 'uniprot' or not uniprot_cleanup:

            # no need for translation
            return {name}

        else:

            # we still try to search the primary UniProt
            mapped_names = {name}

    # actual translation comes here
    elif id_type.startswith('refseq'):

        # RefSeq is special
        mapped_names = self._map_refseq(
            refseq=name,
            id_type=id_type,
            target_id_type=target_id_type,
            ncbi_tax_id=ncbi_tax_id,
            strict=strict,
        )

    elif id_type == 'ensp':

        mapped_names = self._map_ensp(
            ensp=name,
            target_id_type=target_id_type,
            ncbi_tax_id=ncbi_tax_id,
        )

    elif target_id_type == 'ensp':

        mapped_names = self._map_to_ensp(
            name=name,
            id_type=id_type,
            ncbi_tax_id=ncbi_tax_id,
        )

    elif (
        (
            id_type in input_formats.ARRAY_MAPPING and
            not target_id_type.startswith('ens')
        ) or (
            target_id_type in input_formats.ARRAY_MAPPING and
            not id_type.startswith('ens')
        )
    ):

        # microarray probe IDs we are able to directly translate
        # only to and from Ensembl gene, transcript and protein IDs
        # if the other ID is different (such as uniprot), we translate
        # in two steps, via Ensembl peptide ID:
        mapped_names = self.chain_map(
            name=name,
            id_type=id_type,
            by_id_type='ensp',
            target_id_type=target_id_type,
            ncbi_tax_id=ncbi_tax_id,
            strict=strict,
            expand_complexes=expand_complexes,
            uniprot_cleanup=uniprot_cleanup,
        )

    else:

        # all the other ID types
        mapped_names = self._map_name(
            name=name,
            id_type=id_type,
            target_id_type=target_id_type,
            ncbi_tax_id=ncbi_tax_id,
        )

    # as ID translation tables for PRO IDs are not organism specific
    # we need an extra step to limit the results to the target organism
    if id_type == 'pro' and target_id_type == 'uniprot':

        mapped_names = (
            mapped_names &
            reflists.get_reflist(
                id_type='uniprot',
                ncbi_tax_id=ncbi_tax_id,
            )
        )

    # by default the uniprot-genesymbol tables contain only SwissProt;
    # the TrEMBL table is consulted only if the direct lookup failed,
    # so a successful translation is never discarded
    if (
        not mapped_names and
        id_type == 'uniprot' and
        target_id_type == 'genesymbol'
    ):

        mapped_names = self._map_name(
            name=name,
            id_type='trembl',
            target_id_type='genesymbol',
            ncbi_tax_id=ncbi_tax_id,
        )

        if not mapped_names:

            # maybe it is a secondary AC: translate to primary first
            uniprots = self._map_name(
                name=name,
                id_type='uniprot-sec',
                target_id_type='uniprot-pri',
                ncbi_tax_id=ncbi_tax_id,
            )

            if uniprots:

                mapped_names = self.map_names(
                    names=uniprots,
                    id_type='uniprot',
                    target_id_type='genesymbol',
                    ncbi_tax_id=ncbi_tax_id,
                )

    # further attempts to set it right if
    # first attempt was not successful

    # for miRNAs if the translation from mature miRNA name failed
    # we still try if maybe it is a hairpin name
    # or the other way around
    if not mapped_names and id_type in {'mir-mat-name', 'mir-name'}:

        for id_type0, id_type1, target_id_type0, target_id_type1 in (
            ('mir-name', 'mir-mat-name', 'mir-pre', 'mirbase'),
            ('mir-mat-name', 'mir-name', 'mirbase', 'mir-pre'),
        ):

            if id_type == id_type0:

                mapped_names = self._map_name(
                    name=name,
                    id_type=id_type1,
                    target_id_type=target_id_type1,
                    ncbi_tax_id=ncbi_tax_id,
                )

                if mapped_names and target_id_type == target_id_type0:

                    mapped_names = self.map_names(
                        names=mapped_names,
                        id_type=target_id_type1,
                        target_id_type=target_id_type0,
                        ncbi_tax_id=ncbi_tax_id,
                    )

                if mapped_names:

                    break

    # for genesymbol, we automatically try 2 steps mapping via uniprot
    if (
        not mapped_names and
        (id_type == 'genesymbol' or target_id_type == 'genesymbol') and
        id_type not in UNIPROT_ID_TYPES and
        target_id_type not in UNIPROT_ID_TYPES
    ):

        mapped_names = self.chain_map(
            name=name,
            id_type=id_type,
            by_id_type='uniprot',
            target_id_type=target_id_type,
            ncbi_tax_id=ncbi_tax_id,
        )

    if not mapped_names:

        # maybe it should be all uppercase (e.g. human gene symbols)?
        mapped_names = self._map_name(
            name=name.upper(),
            id_type=id_type,
            target_id_type=target_id_type,
            ncbi_tax_id=ncbi_tax_id,
        )

    if (
        not mapped_names and
        id_type not in {'uniprot', 'trembl', 'uniprot-sec'}
    ):

        # maybe should be capitalized (e.g. rodent gene symbols)?
        mapped_names = self._map_name(
            name=name.capitalize(),
            id_type=id_type,
            target_id_type=target_id_type,
            ncbi_tax_id=ncbi_tax_id,
        )

    if (
        not mapped_names and
        id_type not in {'uniprot', 'trembl', 'uniprot-sec'}
    ):

        # maybe it should be all lowercase?
        mapped_names = self._map_name(
            name=name.lower(),
            id_type=id_type,
            target_id_type=target_id_type,
            ncbi_tax_id=ncbi_tax_id,
        )

    if (
        not mapped_names and
        id_type.startswith('ens') and
        '.' in name
    ):

        # trying to split the part after the dot:
        mapped_names = self._map_name(
            name=name.upper().split('.')[0],
            id_type=id_type,
            target_id_type=target_id_type,
            ncbi_tax_id=ncbi_tax_id,
        )

    if not mapped_names and ':' in name:

        # trying to remove the prefix which sometimes
        # shows the ID type, e.g. CHEBI:4956 should become 4956
        mapped_names = self._map_name(
            name=common.remove_prefix(name, ':'),
            id_type=id_type,
            target_id_type=target_id_type,
            ncbi_tax_id=ncbi_tax_id,
        )

    # if a gene symbol could not be translated by the default
    # conversion table, containing only the primary gene symbols
    # in next step we try the secondary (synonym) gene symbols
    if not mapped_names and id_type == 'genesymbol':

        mapped_names = self._map_name(
            name=name,
            id_type='genesymbol-syn',
            target_id_type=target_id_type,
            ncbi_tax_id=ncbi_tax_id,
        )

        # for gene symbols we might try one more thing,
        # sometimes the source gene symbol missing some isoform
        # information or number because it refers to the first
        # or all isoforms or subtypes; or the opposite: the
        # original resource contains a gene symbol with a number
        # appended which is not part of the official primary
        # gene symbol
        #
        # here we try to translate by adding a number `1` or
        # by matching only the first few letters;
        # obviously we can not exclude mistranslation here
        #
        # by setting `strict = True` this step is disabled
        if not strict and not mapped_names:

            mapped_names = self._map_name(
                name='%s1' % name,
                id_type='genesymbol',
                target_id_type=target_id_type,
                ncbi_tax_id=ncbi_tax_id,
            )

            if not mapped_names and target_id_type == 'uniprot':

                mapped_names = self._map_name(
                    name=name,
                    id_type='genesymbol5',
                    target_id_type=target_id_type,
                    ncbi_tax_id=ncbi_tax_id,
                )

    # for UniProt IDs we do a few more steps to
    # try to find out the primary SwissProt ID
    if uniprot_cleanup and target_id_type == 'uniprot':

        mapped_names = self.uniprot_cleanup(
            uniprots=mapped_names,
            ncbi_tax_id=ncbi_tax_id,
        )

    return mapped_names
def uniprot_cleanup(self, uniprots, ncbi_tax_id=None):
    """
    We use this function as a standard callback when the target ID
    type is UniProt. It translates secondary IDs to primary and then,
    depending on the settings stored on the instance, translates TrEMBL
    IDs to SwissProt, translates deleted IDs by gene symbols, removes
    the IDs not in the proteome of the organism, and finally keeps only
    the IDs with a valid UniProt AC format.

    Args
        uniprots (str,set): One or more UniProt IDs.
        ncbi_tax_id (int): The NCBI Taxonomy identifier of the organism;
            falls back to the ``ncbi_tax_id`` attribute of the instance.

    Returns
        Set of checked and potentially translated UniProt IDs.
        Elements which do not fit the criteria will be discarded.
    """

    ncbi_tax_id = ncbi_tax_id or self.ncbi_tax_id

    uniprots = common.to_set(uniprots)

    # step 1: translate secondary IDs to primary
    # the organism is passed explicitly, the same way as in the steps
    # below, otherwise the lookup would silently use the default
    # organism of the instance
    uniprots = self.primary_uniprot(uniprots, ncbi_tax_id=ncbi_tax_id)

    # step 2: translate TrEMBL to SwissProt by gene symbols
    if self._trembl_swissprot_by_genesymbol:

        uniprots = self.trembl_swissprot(
            uniprots,
            ncbi_tax_id=ncbi_tax_id,
        )

    # step 3: translate deleted IDs by gene symbols
    if self._translate_deleted_uniprot:

        uniprots = self.translate_deleted_uniprots_by_genesymbol(
            uniprots
        )

    # step 4: check if the IDs exist in the proteome of the organism
    if not self._keep_invalid_uniprot:

        uniprots = self.only_valid_uniprots(
            uniprots,
            ncbi_tax_id=ncbi_tax_id,
        )

    # step 5: ensure the format validity
    uniprots = self.only_uniprot_ac(uniprots)

    return uniprots
def map_names(
    self,
    names,
    id_type=None,
    target_id_type=None,
    ncbi_tax_id=None,
    strict=False,
    expand_complexes=True,
    uniprot_cleanup=True,
):
    """
    Same as ``map_name`` but translates multiple IDs at once. These two
    functions could be seamlessly implemented as one, still I created
    separate functions to always make it explicit if a set of translated
    IDs come from multiple original IDs.

    Args
        names (iterable): The original names to be converted.
        id_type (str): The type of the names.
            Available by default:
            - genesymbol (gene name)
            - entrez (Entrez Gene ID [#])
            - refseqp (NCBI RefSeq Protein ID [NP_*|XP_*])
            - ensp (Ensembl protein ID [ENSP*])
            - enst (Ensembl transcript ID [ENST*])
            - ensg (Ensembl genomic DNA ID [ENSG*])
            - hgnc (HGNC ID [HGNC:#])
            - gi (GI number [#])
            - embl (DDBJ/EMBL/GeneBank CDS accession)
            - embl_id (DDBJ/EMBL/GeneBank accession)
            And many more, see the code of
            ``pypath.internals.input_formats``
        target_id_type (str): The name type to translate to, more or
            less the same values are available as for ``id_type``.
        ncbi_tax_id (int): NCBI Taxonomy ID of the organism.
        strict (bool): Passed to ``map_name``.
        expand_complexes (bool): Passed to ``map_name``.
        uniprot_cleanup (bool): Passed to ``map_name``.

    Returns
        Set with the union of the translations of all names; empty set
        if ``names`` is empty or None.
    """

    if not names:

        return set()

    # `set().union` tolerates an iterator that turns out to be empty;
    # all options are forwarded, previously `expand_complexes` and
    # `uniprot_cleanup` were accepted but silently ignored
    return set().union(*(
        self.map_name(
            name=name,
            id_type=id_type,
            target_id_type=target_id_type,
            ncbi_tax_id=ncbi_tax_id,
            strict=strict,
            expand_complexes=expand_complexes,
            uniprot_cleanup=uniprot_cleanup,
        )
        for name in names
    ))
def chain_map(
    self,
    name,
    id_type,
    by_id_type,
    target_id_type,
    ncbi_tax_id=None,
    **kwargs
):
    """
    Translate IDs which can not be directly translated in two steps:
    from `id_type` to `by_id_type` and from there to `target_id_type`.

    Args
        name (str): The original name to be converted.
        id_type (str): The type of the name.
        by_id_type (str): The intermediate name type.
        target_id_type (str): The name type to translate to, more or
            less the same values are available as for ``id_type``.
        ncbi_tax_id (int): The NCBI Taxonomy identifier of the organism.
        kwargs: Passed to both `map_name` (first step) and `map_names`
            (second step).

    Returns
        Set of IDs of type `target_id_type`.
    """

    organism = ncbi_tax_id or self.ncbi_tax_id

    # first step: original -> intermediate
    intermediate = self.map_name(
        name=name,
        id_type=id_type,
        target_id_type=by_id_type,
        ncbi_tax_id=organism,
        **kwargs
    )

    # second step: intermediate -> target
    return self.map_names(
        names=intermediate,
        id_type=by_id_type,
        target_id_type=target_id_type,
        ncbi_tax_id=organism,
        **kwargs
    )
def _map_refseq(
    self,
    refseq,
    id_type,
    target_id_type,
    ncbi_tax_id=None,
    strict=False,
):
    """
    ID translation adapted to the specialities of RefSeq IDs.

    Tries the ID as it is; unless ``strict`` is set, also tries it
    with the part after the dot removed, and finally with the numbers
    0-48 after the dot, collecting the results of all of these.

    Returns
        Set of translated IDs.
    """

    ncbi_tax_id = ncbi_tax_id or self.ncbi_tax_id

    # try first as it is
    mapped_names = self._map_name(
        name=refseq,
        id_type=id_type,
        target_id_type=target_id_type,
        ncbi_tax_id=ncbi_tax_id,
    )

    # then with the number at the end removed
    # this is disabled if `strict = True`
    if not mapped_names and not strict:

        mapped_names = self._map_name(
            name=refseq.split('.')[0],
            id_type=id_type,
            target_id_type=target_id_type,
            ncbi_tax_id=ncbi_tax_id,
        )

    if not mapped_names and not strict:

        rstem = refseq.split('.')[0]

        # results are collected in a new set: updating the object
        # returned by `_map_name` in place might modify a set that is
        # owned by the mapping table
        mapped_names = set()

        # try some other numbers
        # this risky and is disabled if `strict = True`
        for n in range(49):

            mapped_names.update(
                self._map_name(
                    name='%s.%u' % (rstem, n),
                    id_type=id_type,
                    target_id_type=target_id_type,
                    ncbi_tax_id=ncbi_tax_id,
                )
            )

    return mapped_names


def _map_ensp(
    self,
    ensp,
    target_id_type,
    ncbi_tax_id=None,
):
    """
    Special ID translation from ENSP (Ensembl peptide IDs).

    Tries the source ID types ``ensp``, ``ensp_biomart`` and finally
    ``ensp_string``, the latter with the name prefixed by the taxonomy
    ID and a dot.

    Returns
        Set of translated IDs.
    """

    mapped_names = set()
    ncbi_tax_id = ncbi_tax_id or self.ncbi_tax_id

    # try first UniProt ID Mapping
    # then Ensembl BioMart
    for id_type in ('ensp', 'ensp_biomart'):

        if not mapped_names:

            mapped_names = self._map_name(
                name=ensp,
                id_type=id_type,
                target_id_type=target_id_type,
                ncbi_tax_id=ncbi_tax_id,
            )

    if not mapped_names:

        tax_ensp = '%u.%s' % (ncbi_tax_id, ensp)

        # this uses UniProt ID Mapping with STRING ID type
        mapped_names = self._map_name(
            name=tax_ensp,
            id_type='ensp_string',
            target_id_type=target_id_type,
            ncbi_tax_id=ncbi_tax_id,
        )

    return mapped_names


def _map_to_ensp(
    self,
    name,
    id_type,
    ncbi_tax_id=None,
):
    """
    Special ID translation to ENSP (Ensembl peptide IDs).

    Tries the target ID types ``ensp``, ``ensp_biomart`` and finally
    ``ensp_string``; from the results of the latter only the part after
    the last dot is kept.

    Returns
        Set of translated IDs.
    """

    mapped_names = set()
    ncbi_tax_id = ncbi_tax_id or self.ncbi_tax_id

    # try first UniProt ID Mapping
    # then Ensembl BioMart
    for target_id_type in ('ensp', 'ensp_biomart'):

        if not mapped_names:

            mapped_names = self._map_name(
                name=name,
                id_type=id_type,
                target_id_type=target_id_type,
                ncbi_tax_id=ncbi_tax_id,
            )

    if not mapped_names:

        # this uses UniProt ID Mapping with STRING type
        mapped_names = self._map_name(
            name=name,
            id_type=id_type,
            target_id_type='ensp_string',
            ncbi_tax_id=ncbi_tax_id,
        )

        # NOTE(review): the nesting was reconstructed; the prefix is
        # removed only from the results of the `ensp_string` lookup
        mapped_names = {n.split('.')[-1] for n in mapped_names}

    return mapped_names


def _map_name(
    self,
    name,
    id_type,
    target_id_type,
    ncbi_tax_id=None,
):
    """
    Once we have defined the name type and the target name type,
    this function looks it up in the most suitable dictionary.

    Returns
        The value of the table returned by ``which_table`` for
        ``name``, or an empty set if no table is available.
    """

    ncbi_tax_id = ncbi_tax_id or self.ncbi_tax_id

    tbl = self.which_table(
        id_type,
        target_id_type,
        ncbi_tax_id=ncbi_tax_id,
    )

    return tbl[name] if tbl else set()
def translation_dict(
    self,
    id_type: str,
    target_id_type: str,
    ncbi_tax_id: int | None = None,
) -> MappingTable | None:
    """
    Translation table as a dict.

    Returns whatever ``which_table`` provides for the two ID types and
    the organism; the organism falls back to the ``ncbi_tax_id``
    attribute of the instance.
    """

    organism = ncbi_tax_id or self.ncbi_tax_id

    return self.which_table(id_type, target_id_type, ncbi_tax_id=organism)
def translation_df(
    self,
    id_type: str,
    target_id_type: str,
    ncbi_tax_id: int | None = None,
) -> pd.DataFrame | None:
    """
    Translation table as a data frame.

    Returns
        A data frame of two columns named after the two ID types, with
        one row for each pair of source and target ID; None if the
        translation table is not available or empty.
    """

    tbl = self.translation_dict(id_type, target_id_type, ncbi_tax_id)

    if not tbl:

        return None

    pairs = []

    # one source ID might map to several target IDs
    for source_id, target_ids in tbl.data.items():

        for target_id in target_ids:

            pairs.append((source_id, target_id))

    return pd.DataFrame(pairs, columns=[id_type, target_id_type])
#
# ID specific translation methods
#
def label(
    self,
    name,
    entity_type=None,
    id_type=None,
    ncbi_tax_id=None,
):
    """
    For any kind of entity, either protein, miRNA or protein complex,
    returns the preferred human readable label. For proteins this means
    Gene Symbols, for miRNAs miRNA names, for complexes a series of
    Gene Symbols.

    Args
        name: An identifier (str), a list-like of identifiers, or an
            object with a ``genesymbol_str`` attribute.
        entity_type (str): Type of the entity; if not provided,
            ``small_molecule`` is assumed for the non organism specific
            taxonomy ID, otherwise ``protein``.
        id_type (str): The type of ``name``; by default it is chosen
            based on the prefix of the name or the entity type.
        ncbi_tax_id (int): NCBI Taxonomy ID of the organism.

    Returns
        The label as a string, falling back to the name itself if the
        translation fails; a list of labels for list-like input.
    """

    if isinstance(name, _const.LIST_LIKE):

        return [
            self.label(
                _name,
                entity_type=entity_type,
                id_type=id_type,
                ncbi_tax_id=ncbi_tax_id,
            )
            for _name in name
        ]

    elif hasattr(name, 'genesymbol_str'):

        return name.genesymbol_str

    elif isinstance(name, str):

        ncbi_tax_id = ncbi_tax_id or self.ncbi_tax_id

        entity_type = (
            entity_type or
            (
                'small_molecule'
                if ncbi_tax_id == _const.NOT_ORGANISM_SPECIFIC else
                'protein'
            )
        )

        if name.startswith('MIMAT'):

            # translate by this instance, as all other branches do,
            # and not by the module level `map_name0` function
            return self.map_name0(
                name,
                id_type or 'mirbase',
                'mir-mat-name',
                ncbi_tax_id=ncbi_tax_id,
            ) or name

        elif name.startswith('MI'):

            return self.map_name0(
                name,
                id_type or 'mir-pre',
                'mir-name',
                ncbi_tax_id=ncbi_tax_id,
            ) or name

        elif entity_type in self.default_label_types:

            id_type = id_type or self.default_name_types[entity_type]
            target_id_type = self.default_label_types[entity_type]

            return self.map_name0(
                name,
                id_type=id_type,
                target_id_type=target_id_type,
                ncbi_tax_id=ncbi_tax_id,
            ) or name

        else:

            return self.map_name0(
                name,
                id_type or 'uniprot',
                'genesymbol',
                ncbi_tax_id=ncbi_tax_id,
            ) or name

    else:

        return str(name)
def identifier(
    self,
    label: Union[str, Iterable[str]],
    ncbi_tax_id: Optional[int] = None,
    id_type: Optional[str] = None,
    entity_type: Optional[
        Literal[
            'drug',
            'lncrna',
            'mirna',
            'protein',
            'small_molecule',
        ]
    ] = None,
) -> Union[Set[str], List[Set[str]]]:
    """
    For a label returns the corresponding primary identifier. The type
    of default identifiers is determined by the settings module. Note,
    this kind of translation is not always unambiguous, one gene symbol
    might correspond to multiple UniProt IDs.

    Args
        label: A label (str), an iterable of labels, or an object with
            a ``components`` attribute (complex).
        ncbi_tax_id: NCBI Taxonomy ID of the organism.
        id_type: The ID type of the label; by default looked up in the
            ``default_label_types`` setting by entity type.
        entity_type: Type of the entity; if not provided,
            ``small_molecule`` is assumed for the non organism specific
            taxonomy ID, otherwise ``protein``.

    Returns
        Set of identifiers for a single label; list of such results for
        an iterable of labels; the string representation for objects
        with ``components``.
    """

    # complexes have to be recognised before the generic "not a string"
    # case, otherwise this branch can never be reached
    if hasattr(label, 'components'):

        return label.__str__()

    if not common.is_str(label):

        return [
            self.identifier(
                _label,
                entity_type=entity_type,
                id_type=id_type,
                ncbi_tax_id=ncbi_tax_id,
            )
            for _label in label
        ]

    ncbi_tax_id = ncbi_tax_id or self.ncbi_tax_id

    entity_type = (
        entity_type or
        (
            'small_molecule'
            if ncbi_tax_id == _const.NOT_ORGANISM_SPECIFIC else
            'protein'
        )
    )

    id_type = (
        id_type or
        settings.get('default_label_types')[entity_type]
    )
    target_id_type = settings.get('default_name_types')[entity_type]

    return self.map_name(
        label,
        id_type=id_type,
        target_id_type=target_id_type,
        ncbi_tax_id=ncbi_tax_id,
    )
def guess_type(self, name, entity_type=None):
    """
    From a string, tries to guess the ID type and optionally the
    entity type. Returns a tuple of strings: ID type and entity type.

    The name is matched against the regular expressions stored on the
    instance: UniProt first (if the entity type is not given or it is
    ``protein``), then the miRNA ones (if the entity type is not given
    or it is ``mirna``). If nothing matches, the ID type is None and
    the entity type is returned as it was provided.
    """

    protein_allowed = not entity_type or entity_type == 'protein'

    if protein_allowed and self.reuniprot.match(name):

        return 'uniprot', 'protein'

    if not entity_type or entity_type == 'mirna':

        # the order matters: the first matching pattern wins
        mirna_patterns = (
            ('remipreac', 'mir-pre'),
            ('remimatac', 'mirbase'),
            ('remimatid', 'mir-mat-name'),
            ('remipreid', 'mir-name'),
        )

        for attr, mirna_id_type in mirna_patterns:

            if getattr(self, attr).match(name):

                return mirna_id_type, 'mirna'

    return None, entity_type
def primary_uniprot(self, uniprots, ncbi_tax_id=None):
    """
    For an iterable of UniProt IDs returns a set with the secondary IDs
    changed to the corresponding primary IDs. Anything that can not be
    translated as a secondary UniProt ID is left intact.
    """

    organism = ncbi_tax_id or self.ncbi_tax_id
    result = set()

    for ac in uniprots:

        primary_acs = self.map_name(
            name=ac,
            id_type='uniprot-sec',
            target_id_type='uniprot-pri',
            ncbi_tax_id=organism,
        )

        if not primary_acs:

            # most probably this UniProt is already primary
            result.add(ac)
            continue

        result.update(primary_acs)

    return result
def trembl_swissprot(self, uniprots, ncbi_tax_id=None):
    """
    For an iterable of TrEMBL and SwissProt IDs, returns a set with
    only SwissProt, mapping from TrEMBL to gene symbols, and then back
    to SwissProt. If this kind of translation is not successful for any
    of the IDs it will be kept in the result, no matter if it's not a
    SwissProt ID.
    """

    organism = ncbi_tax_id or self.ncbi_tax_id
    result = set()

    for ac in uniprots:

        symbols = self.map_name(
            name=ac,
            id_type='trembl',
            target_id_type='genesymbol',
            ncbi_tax_id=organism,
        )

        via_symbols = self.map_names(
            names=symbols,
            id_type='genesymbol',
            target_id_type='swissprot',
            ncbi_tax_id=organism,
        )

        if via_symbols:

            result.update(via_symbols)

        else:

            # translation failed: keep the original ID
            result.add(ac)

    return result
def translate_deleted_uniprot_by_genesymbol(
    self,
    uniprot,
    ncbi_tax_id=None,
):
    """
    Attempts to translate a deleted UniProt ID by its gene symbol.
    Due to potentially ambiguous translation always returns set.

    Returns
        A set with the ID itself if it is a UniProt ID of the organism;
        an empty set if it belongs to another organism; otherwise the
        UniProt IDs translated from the gene symbol of the deleted
        record if that is available for the same organism, else a set
        with the ID itself.
    """

    organism = ncbi_tax_id or self.ncbi_tax_id

    if uniprot_db.is_uniprot(uniprot, organism=organism):

        return {uniprot}

    if self.other_organism_uniprot(uniprot, ncbi_tax_id=organism):

        return set()

    genesymbol, taxid = self.deleted_uniprot_genesymbol(uniprot)

    if not genesymbol or taxid != organism:

        return {uniprot}

    return self.map_name(
        genesymbol,
        'genesymbol',
        'uniprot',
        ncbi_tax_id=organism,
        uniprot_cleanup=False,
    )
def other_organism_uniprot(self, uniprot, ncbi_tax_id=None):
    """
    Tells if ``uniprot`` is an UniProt ID from some other organism than
    ``ncbi_tax_id``.

    Returns
        A falsy value if ``taxonomy.uniprot_taxid`` gives no taxonomy
        ID for the UniProt ID; otherwise True if that differs from the
        organism, else False.
    """

    organism = ncbi_tax_id or self.ncbi_tax_id
    taxid_of_uniprot = taxonomy.uniprot_taxid(uniprot)

    if not taxid_of_uniprot:

        # unknown taxon: the falsy value is returned as it is
        return taxid_of_uniprot

    return taxid_of_uniprot != organism
def valid_uniprot(self, uniprot, ncbi_tax_id=None):
    """
    If the UniProt ID ``uniprot`` exist in the proteome of the organism
    ``ncbi_tax_id`` returns the ID, otherwise returns None.
    """

    organism = ncbi_tax_id or self.ncbi_tax_id

    if not uniprot_db.is_uniprot(uniprot, organism=organism):

        return None

    return uniprot
def only_uniprot_ac(self, uniprots):
    """
    For one or more strings returns only those which match the format
    of UniProt accession numbers.

    The format is defined here:
    https://www.uniprot.org/help/accession_numbers

    If string provided, returns the result of ``_only_uniprot_ac`` for
    it. If iterable provided, returns the set of the truthy results
    of ``_only_uniprot_ac`` (potentially empty if none of the strings
    are valid).
    """

    if isinstance(uniprots, str):

        return self._only_uniprot_ac(uniprots)

    checked = set()

    for uniprot in uniprots:

        validated = self._only_uniprot_ac(uniprot)

        if validated:

            checked.add(validated)

    return checked
@staticmethod
def mapping_tables():
    """
    List of mapping tables available to load.

    The tables are collected from the attributes of the ``maps`` module
    named in ``RESOURCES_EXPLICIT`` and from the services listed in
    ``RESOURCES_IMPLICIT``.

    Returns
        (list): A list of tuples, each representing an ID translation
            table, with the ID types, the data source and the loader
            class.
    """

    MappingTableDefinition = collections.namedtuple(
        'MappingTableDefinition',
        (
            'id_type_a',
            'id_type_b',
            'resource',
            'input_class',
            'resource_id_type_a',
            'resource_id_type_b',
        ),
        # the last two fields are optional
        defaults=(None, None),
    )

    result = []

    # tables with both ID types explicitly defined
    for resource_attr in RESOURCES_EXPLICIT:

        resources = getattr(maps, resource_attr)

        for id_types_ab, inputdef in iteritems(resources):

            id_type_a, id_type_b = id_types_ab

            result.append(
                MappingTableDefinition(
                    id_type_a=id_type_a,
                    id_type_b=id_type_b,
                    resource=resource_attr,
                    input_class=inputdef.__class__.__name__,
                    resource_id_type_a=inputdef._resource_id_type_a,
                    resource_id_type_b=inputdef._resource_id_type_b,
                )
            )

    # services defined only by the ID types they support
    for service_ids, service_id_type, input_cls in RESOURCES_IMPLICIT:

        if isinstance(service_ids, dict):

            id_type_pairs = iteritems(service_ids)

        else:

            # no distinct labels in the resource: each ID type is
            # paired with itself
            id_type_pairs = zip(service_ids, service_ids)

        for id_type, resource_id_type in id_type_pairs:

            id_type_b = 'pro' if service_id_type == 'pro' else None

            result.append(
                MappingTableDefinition(
                    id_type_a=id_type,
                    id_type_b=id_type_b,
                    resource=service_id_type,
                    input_class=input_cls.__name__,
                    resource_id_type_a=resource_id_type,
                    resource_id_type_b=None,
                )
            )

    return result
@classmethod
def id_types(cls):
    """
    All identifier types that can be handled by any of the resources.

    Returns
        (set): A set of tuples with the identifier type labels used in
            pypath and in the original resource. If the latter is None,
            typically the ID type has no name in the original resource.
    """

    IdType = collections.namedtuple('IdType', ('pypath', 'original'))

    collected = set()

    for mapdef in cls.mapping_tables():

        for side in ('a', 'b'):

            pypath_label = getattr(mapdef, 'id_type_%s' % side)

            # sides without ID type are omitted
            if not pypath_label:

                continue

            collected.add(
                IdType(
                    pypath=pypath_label,
                    original=getattr(mapdef, 'resource_id_type_%s' % side),
                )
            )

    return collected
def has_mapping_table(
    self,
    id_type,
    target_id_type,
    ncbi_tax_id=None,
):
    """
    Tells if a mapping table is loaded. If it's loaded, it calls the
    ``_used`` method of the table so the table remains loaded.

    Returns
        (bool): True if the mapping table is loaded.
    """

    key = self.get_table_key(
        id_type=id_type,
        target_id_type=target_id_type,
        ncbi_tax_id=ncbi_tax_id or self.ncbi_tax_id,
    )

    loaded = key in self.tables

    if loaded:

        # reset the expiry timer
        self.tables[key]._used()

    return loaded
def load_mapping(self, resource, **kwargs):
    """
    Loads a single mapping table based on input definition in
    ``resource``. ``**kwargs`` passed to ``MapReader``.

    For ``file`` and ``pickle`` type resources nothing is loaded if the
    input is neither an existing path nor a method known to the
    ``inputs`` module. The tables provided by the reader, in either
    direction, are stored in ``self.tables`` under their own keys.
    """

    if resource.type in {'file', 'pickle'}:

        input_available = (
            os.path.exists(resource.input) or
            inputs.get_method(resource.input)
        )

        if not input_available:

            self._log(
                'Could not load mapping: no such '
                'file or function: `%s`.' % resource.input
            )

            return

    ncbi_tax_id = kwargs.get('ncbi_tax_id', resource.ncbi_tax_id)

    self._log(
        'Loading mapping table for organism `%s` '
        'with identifiers `%s` and `%s`, '
        'input type `%s`' % (
            ncbi_tax_id,
            resource.id_type_a,
            resource.id_type_b,
            resource.type,
        )
    )

    reader = MapReader(param=resource, **kwargs)

    # the reader may provide a table in either or both directions
    directions = (
        ('a', 'b', reader.mapping_table_a_to_b),
        ('b', 'a', reader.mapping_table_b_to_a),
    )

    for side_from, side_to, table in directions:

        if not table:

            continue

        self._log(
            'Sucessfully loaded mapping table for organism `%s` '
            'with identifiers `%s` to `%s`.' % (
                str(ncbi_tax_id),
                getattr(resource, f'id_type_{side_from}'),
                getattr(resource, f'id_type_{side_to}'),
            )
        )

        self.tables[table.get_key()] = table
def swissprots(self, uniprots, ncbi_tax_id=None):
    """
    Creates a dict translating a set of potentially secondary and
    non-reviewed UniProt IDs to primary SwissProt IDs (whenever is
    possible).

    Each ID is passed to ``map_name`` with ``uniprot`` both as source
    and target ID type; the keys of the dict are the original IDs, the
    values are the results of ``map_name``.
    """

    return {
        uniprot: self.map_name(
            name=uniprot,
            id_type='uniprot',
            target_id_type='uniprot',
            ncbi_tax_id=ncbi_tax_id,
        )
        for uniprot in uniprots
    }
def load_genesymbol5(self, ncbi_tax_id=None):
    """
    Creates a Gene Symbol to UniProt mapping table with the first
    5 characters of each Gene Symbol.

    The table is built from the ``genesymbol`` and ``genesymbol-syn``
    to ``uniprot`` tables; gene symbols shorter than 5 characters are
    ignored. The new table is stored in ``self.tables`` with the ID
    type ``genesymbol5``.

    Args
        ncbi_tax_id (int): NCBI Taxonomy ID of the organism; falls back
            to the ``ncbi_tax_id`` attribute of the instance.
    """

    ncbi_tax_id = ncbi_tax_id or self.ncbi_tax_id

    genesymbol_table = self.which_table(
        id_type='genesymbol',
        target_id_type='uniprot',
        ncbi_tax_id=ncbi_tax_id,
    )

    genesymbol_syn_table = self.which_table(
        id_type='genesymbol-syn',
        target_id_type='uniprot',
        ncbi_tax_id=ncbi_tax_id,
    )

    genesymbol5_data = collections.defaultdict(set)

    for table in (genesymbol_table, genesymbol_syn_table):

        # `which_table` may give no table (see `_map_name`), in this
        # case we build the new table only from what is available
        if not table:

            continue

        for genesymbol, uniprots in iteritems(table.data):

            if len(genesymbol) >= 5:

                genesymbol5 = genesymbol[:5]

                genesymbol5_data[genesymbol5].update(uniprots)

    mapping_table = MappingTable(
        data=genesymbol5_data,
        id_type='genesymbol5',
        target_id_type='uniprot',
        ncbi_tax_id=ncbi_tax_id,
    )

    self.tables[mapping_table.get_key()] = mapping_table
def load_uniprot_static(
    self,
    keys,
    ncbi_tax_id=None,
):
    """
    Loads mapping tables from the huge static mapping file from UniProt.
    Takes long to download and process, also requires more memory.
    This is the last thing we try if everything else failed.

    For each key a pickle file is looked up in the cache directory; the
    keys without cache file are populated from the UniProt ID mapping
    file, then the data is written to the cache. Finally a mapping
    table is created for each key and stored in ``self.tables``.

    Args
        keys: Keys of the mapping tables to load.
            NOTE(review): these are compared to ``MappingTableKey``
            objects, presumably they are of that type - confirm.
        ncbi_tax_id (int): NCBI Taxonomy ID, used in the table keys and
            passed to the tables.
    """

    cachedir = cache_mod.get_cachedir()
    data = dict((key, collections.defaultdict(set)) for key in keys)
    cache_files = {}
    to_load = set()
    id_type_b = 'uniprot'

    # attempting to load them from Pickle
    for key in keys:

        mapping_id = common.md5(
            json.dumps(
                (
                    key,
                    'uniprot_static',
                )
            )
        )

        cachefile = os.path.join(cachedir, mapping_id)
        cache_files[key] = cachefile

        if os.path.exists(cachefile):

            # NOTE: unpickling is safe only as long as the cache
            # directory contains trusted files
            with open(cachefile, 'rb') as fp:

                data[key] = pickle.load(fp)

        else:

            to_load.add(key)

    # loading the remaining from the big UniProt mapping file:
    if to_load:

        url = urls.urls['uniprot_idmap_ftp']['url']
        c = curl.Curl(url, silent=False, large=True)

        prg = progress.Progress(
            c.size,
            'Processing ID conversion list',
            99,
        )

        for line in c.result:

            prg.step(len(line))

            fields = common.decode(line, 'ascii').strip().split('\t')

            if len(fields) > 2 and fields[1] in self.names_uniprot_static:

                id_type_a = self.names_uniprot_static[fields[1]]

                key_a_to_b = MappingTableKey(
                    id_type=id_type_a,
                    target_id_type=id_type_b,
                    ncbi_tax_id=ncbi_tax_id,
                )
                key_b_to_a = MappingTableKey(
                    id_type=id_type_b,
                    target_id_type=id_type_a,
                    ncbi_tax_id=ncbi_tax_id,
                )

                # the part after the dash is removed
                this_uniprot = fields[0].split('-')[0]

                if key_a_to_b in to_load:

                    data[key_a_to_b][fields[2]].add(this_uniprot)

                if key_b_to_a in to_load:

                    data[key_b_to_a][this_uniprot].add(fields[2])

        prg.terminate()

        for key, this_data in iteritems(data):

            # the context manager makes sure the cache file is flushed
            # and closed, even if pickling fails
            with open(cache_files[key], 'wb') as fp:

                pickle.dump(this_data, fp)

    for key, this_data in iteritems(data):

        # NOTE(review): the key object is passed as `id_type` and the
        # target is always `uniprot`, as in the original code - confirm
        table = MappingTable(
            data=this_data,
            id_type=key,
            target_id_type=id_type_b,
            ncbi_tax_id=ncbi_tax_id,
            lifetime=600,
        )

        self.tables[key] = table
def remove_table(self, id_type, target_id_type, ncbi_tax_id):
    """
    Removes the table defined by the ID types and organism.

    Builds a ``MappingTableKey`` from the arguments and passes it to
    ``remove_key``.
    """

    self.remove_key(
        MappingTableKey(
            id_type=id_type,
            target_id_type=target_id_type,
            ncbi_tax_id=ncbi_tax_id,
        )
    )
def remove_key(self, key):
    """
    Removes the table with key ``key`` if exists.

    A log message is written if the key is non-empty and consists of
    three elements (ID type, target ID type and organism).
    """

    if key not in self.tables:

        return

    if key and len(key) == 3:

        self._log(
            'Removing mapping table `%s` '
            'to `%s` for organism `%u`.' % key
        )

    del self.tables[key]
def remove_expired(self):
    """
    Removes tables last used a longer time ago than their lifetime.

    Falsy tables are removed too. The keys are collected first and
    removed by ``remove_key`` afterwards.
    """

    expired_keys = {
        key
        for key, table in self.tables.items()
        if not table or table._expired()
    }

    for key in expired_keys:

        self.remove_key(key)
def init(**kwargs):
    """
    Create a new `Mapper` instance under the `mapper` attribute of this
    module. If the attribute already exists, the ``__del__`` method of
    the old object is called before it gets replaced.

    Args
        kwargs: Passed to `Mapper`.

    Returns
        None.
    """

    module_ns = globals()

    if 'mapper' in module_ns:

        module_ns['mapper'].__del__()

    module_ns['mapper'] = Mapper(**kwargs)
def get_mapper(**kwargs):
    """
    The module under its `mapper` attribute has an instance of the
    `Mapper` object, which manages the ID translations. This function
    creates the instance if does not exist and returns it.

    Args
        kwargs: Passed to `init` if a new instance has to be created,
            otherwise ignored.

    Returns
        A Mapper object.
    """

    module_ns = globals()

    if 'mapper' not in module_ns:

        init(**kwargs)

    return module_ns['mapper']
def map_name(
    name,
    id_type,
    target_id_type,
    ncbi_tax_id=None,
    strict=False,
    expand_complexes=True,
    uniprot_cleanup=True,
):
    """
    Translates one instance of one ID type to a different one.
    Returns set of the target ID type.

    This function should be used to convert individual IDs. It calls
    the ``map_name`` method of the module level `Mapper` instance
    (created if does not exist yet) with the same arguments.

    Args
        name (str): The original name to be converted.
        id_type (str): The type of the name.
            Available by default:
            - genesymbol (gene name)
            - entrez (Entrez Gene ID [#])
            - refseqp (NCBI RefSeq Protein ID [NP_*|XP_*])
            - ensp (Ensembl protein ID [ENSP*])
            - enst (Ensembl transcript ID [ENST*])
            - ensg (Ensembl genomic DNA ID [ENSG*])
            - hgnc (HGNC ID [HGNC:#])
            - gi (GI number [#])
            - embl (DDBJ/EMBL/GeneBank CDS accession)
            - embl_id (DDBJ/EMBL/GeneBank accession)
            And many more, see the code of
            ``pypath.internals.input_formats``
        target_id_type (str): The name type to translate to, more or
            less the same values are available as for ``id_type``.
        ncbi_tax_id (int): NCBI Taxonomy ID of the organism.
        strict (bool): Passed to ``Mapper.map_name``, where setting it
            to True disables the fuzzy translation attempts of Gene
            Symbols (appending "1" or matching only the first five
            characters).
        expand_complexes (bool): When encountering complexes, return
            the IDs of their components. The alternative behaviour is
            to return the complex objects.
        uniprot_cleanup (bool): When the `target_id_type` is UniProt ID,
            call the `Mapper.uniprot_cleanup` method at the end.
    """

    return get_mapper().map_name(
        name=name,
        id_type=id_type,
        target_id_type=target_id_type,
        ncbi_tax_id=ncbi_tax_id,
        strict=strict,
        expand_complexes=expand_complexes,
        uniprot_cleanup=uniprot_cleanup,
    )
def map_name0(
    name,
    id_type,
    target_id_type,
    ncbi_tax_id=None,
    strict=False,
    expand_complexes=True,
    uniprot_cleanup=True,
):
    """
    Translates the name and returns only one of the resulting IDs. It
    means in case of ambiguous ID translation, an arbitrary one of them
    will be picked and returned. Recommended to use only if the
    translation between the given ID types is mostly unambiguous and
    the loss of information can be ignored.

    Calls the ``map_name0`` method of the module level `Mapper`
    instance with the same arguments. See more details at `map_name`.
    """

    return get_mapper().map_name0(
        name=name,
        id_type=id_type,
        target_id_type=target_id_type,
        ncbi_tax_id=ncbi_tax_id,
        strict=strict,
        expand_complexes=expand_complexes,
        uniprot_cleanup=uniprot_cleanup,
    )
def map_names(
    names,
    id_type=None,
    target_id_type=None,
    ncbi_tax_id=None,
    strict=False,
    expand_complexes=True,
    uniprot_cleanup=True,
):
    """
    Same as ``map_name`` but translates multiple IDs at once. These two
    functions could be seamlessly implemented as one, still I created
    separate functions to always make it explicit if a set of translated
    IDs come from multiple original IDs.

    Calls the ``map_names`` method of the module level `Mapper`
    instance with the same arguments.

    Args
        names (iterable): The original names to be converted.
        id_type (str): The type of the names.
            Available by default:
            - genesymbol (gene name)
            - entrez (Entrez Gene ID [#])
            - refseqp (NCBI RefSeq Protein ID [NP_*|XP_*])
            - ensp (Ensembl protein ID [ENSP*])
            - enst (Ensembl transcript ID [ENST*])
            - ensg (Ensembl genomic DNA ID [ENSG*])
            - hgnc (HGNC ID [HGNC:#])
            - gi (GI number [#])
            - embl (DDBJ/EMBL/GeneBank CDS accession)
            - embl_id (DDBJ/EMBL/GeneBank accession)
            And many more, see the code of
            ``pypath.internals.input_formats``
        target_id_type (str): The name type to translate to, more or
            less the same values are available as for ``id_type``.
        ncbi_tax_id (int): NCBI Taxonomy ID of the organism.
        strict (bool): Passed to ``Mapper.map_names``.
        expand_complexes (bool): Passed to ``Mapper.map_names``.
        uniprot_cleanup (bool): Passed to ``Mapper.map_names``.
    """

    return get_mapper().map_names(
        names=names,
        id_type=id_type,
        target_id_type=target_id_type,
        ncbi_tax_id=ncbi_tax_id,
        strict=strict,
        expand_complexes=expand_complexes,
        uniprot_cleanup=uniprot_cleanup,
    )
def label(name, id_type=None, entity_type=None, ncbi_tax_id=9606):
    """
    For any kind of entity, either protein, miRNA or protein complex,
    returns the preferred human readable label. For proteins this means
    Gene Symbols, for miRNAs miRNA names, for complexes a series of
    Gene Symbols.

    Calls the ``label`` method of the module level `Mapper` instance.
    Note, here the organism defaults to 9606.
    """

    return get_mapper().label(
        name=name,
        id_type=id_type,
        entity_type=entity_type,
        ncbi_tax_id=ncbi_tax_id,
    )
def guess_type(name, entity_type=None):
    """
    From a string, tries to guess the ID type and optionally the
    entity type. Returns a tuple of strings: ID type and entity type.

    Calls the ``guess_type`` method of the module level `Mapper`
    instance.
    """

    return get_mapper().guess_type(name=name, entity_type=entity_type)
def id_from_label(label, label_id_type='genesymbol', ncbi_tax_id=None):
    """
    For a label (e.g. Gene Symbol) returns the corresponding IDs
    (e.g. UniProt IDs).

    Calls the ``id_from_label`` method of the module level `Mapper`
    instance with the same arguments.
    """

    return get_mapper().id_from_label(
        label=label,
        label_id_type=label_id_type,
        ncbi_tax_id=ncbi_tax_id,
    )
def id_from_label0(label, label_id_type='genesymbol', ncbi_tax_id=None):
    """
    For a label (e.g. Gene Symbol) returns a single ID (e.g. UniProt ID).

    Calls the ``id_from_label0`` method of the module level `Mapper`
    instance with the same arguments.
    """

    return get_mapper().id_from_label0(
        label=label,
        label_id_type=label_id_type,
        ncbi_tax_id=ncbi_tax_id,
    )
def translation_dict(
    id_type: str,
    target_id_type: str,
    ncbi_tax_id: int | None = None,
) -> MappingTable | None:
    """
    Identifier translation table as a dict of sets.

    Calls the ``translation_dict`` method of the module level `Mapper`
    instance with the same arguments.
    """

    return get_mapper().translation_dict(
        id_type=id_type,
        target_id_type=target_id_type,
        ncbi_tax_id=ncbi_tax_id,
    )
def translation_df(
    id_type: str,
    target_id_type: str,
    ncbi_tax_id: int | None = None,
) -> pd.DataFrame | None:
    """
    Identifier translation table as a `pandas.DataFrame`.

    Calls the ``translation_df`` method of the module level `Mapper`
    instance with the same arguments.

    Returns
        The data frame created by ``Mapper.translation_df``, or None
        if that method returns None.
    """

    mapper = get_mapper()

    return mapper.translation_df(
        id_type=id_type,
        target_id_type=target_id_type,
        ncbi_tax_id=ncbi_tax_id,
    )
def mapping_tables() -> list[MappingTableDefinition]:
    """
    A list of built-in mapping tables.

    If `id_type_b` is `None`, that means translation to all other ID
    types provided by the same resource is possible.

    Calls the ``mapping_tables`` method of the module level `Mapper`
    instance.
    """

    mapper = get_mapper()

    return mapper.mapping_tables()
def id_types() -> list[IdType]:
    """
    Identifier types with their labels.

    Calls the ``id_types`` method of the module level `Mapper`
    instance and returns the collection built by it.
    """

    mapper = get_mapper()

    return mapper.id_types()