Source code for pypath.utils.mapping

#!/usr/bin/env python
# -*- coding: utf-8 -*-

#
#  This file is part of the `pypath` python module
#
#  Copyright 2014-2023
#  EMBL, EMBL-EBI, Uniklinik RWTH Aachen, Heidelberg University
#
#  Authors: see the file `README.rst`
#  Contact: Dénes Türei (turei.denes@gmail.com)
#
#  Distributed under the GPLv3 License.
#  See accompanying file LICENSE.txt or copy at
#      https://www.gnu.org/licenses/gpl-3.0.html
#
#  Website: https://pypath.omnipathdb.org/
#

from __future__ import annotations

"""
Performs mapping between IDs of different consensus systems for
proteins and genes, miRNAs, and chemical compounds.

"""

from future.utils import iteritems
from past.builtins import xrange, range

import os
import sys
import math
import re
import importlib as imp
import collections
import functools
import datetime
import time

import urllib

if not hasattr(urllib, 'urlencode'):

    import urllib.parse
    _urllib = urllib
    urllib = _urllib.parse

import json
try:
    import cPickle as pickle
except:
    import pickle

from typing import Iterable, List, Literal, Optional, Set, Union

import pandas as pd
import timeloop

# from pypath:
import pypath.share.progress as progress
import pypath.share.common as common
import pypath_common._constants as _const
import pypath.share.cache as cache_mod
import pypath.internals.maps as maps
import pypath.resources.urls as urls
import pypath.share.curl as curl
import pypath.inputs as inputs
import pypath.inputs.uniprot as uniprot_input
import pypath.inputs.uniprot_db as uniprot_db
import pypath.inputs.pro as pro_input
import pypath.inputs.biomart as biomart_input
import pypath.inputs.unichem as unichem_input
import pypath.inputs.ramp as ramp_input
import pypath.inputs.hmdb as hmdb_input
import pypath.internals.input_formats as input_formats
import pypath.utils.reflists as reflists
import pypath.utils.taxonomy as taxonomy
import pypath.share.settings as settings
import pypath.share.session as session_mod
_logger = session_mod.log()


__all__ = ['MapReader', 'MappingTable', 'Mapper']

_logger = session_mod.Logger(name = 'mapping')
_log = _logger._log

# Collect the ID type labels offered by UniChem. Any failure here is logged
# and must not prevent importing the module, hence the broad guard.
try:
    UNICHEM_NAME_TYPES = set(unichem_input.unichem_sources().values())
except Exception:
    _log('Failed to retrieve UniChem ID types:')
    _logger._log_traceback()
    # empty fallback of the same type (set) as on the success path
    UNICHEM_NAME_TYPES = set()

# Names of mapping resources.
# NOTE(review): presumably the resources whose tables are defined explicitly
# elsewhere (as opposed to ``RESOURCES_IMPLICIT`` below) -- confirm at the
# place of use.
RESOURCES_EXPLICIT = ('uniprot', 'basic', 'mirbase', 'ipi')

# Each item is a tuple of three elements:
#   1) a mapping of ID type labels (for the small molecule resources built
#      here as an identity dict of the resource's own labels, optionally
#      merged with a mapping from ``input_formats``);
#   2) the name of the resource;
#   3) the class from ``input_formats`` defining the input of the resource.
RESOURCES_IMPLICIT = (
    (
        input_formats.AC_MAPPING,
        'uniprot',
        input_formats.UniprotListMapping,
    ),
    (
        input_formats.PRO_MAPPING,
        'pro',
        input_formats.ProMapping,
    ),
    (
        input_formats.BIOMART_MAPPING,
        'biomart',
        input_formats.BiomartMapping,
    ),
    (
        input_formats.ARRAY_MAPPING,
        'array',
        input_formats.ArrayMapping,
    ),
    (
        # identity mapping: UniChem labels are used as they are
        {n: n for n in UNICHEM_NAME_TYPES},
        'unichem',
        input_formats.UnichemMapping,
    ),
    (
        # RaMP's own compound ID type labels plus the labels defined
        # in `input_formats.RAMP_MAPPING`
        dict(
            **{
                it: it
                for it in ramp_input.ramp_id_types_2('compound')
            },
            **input_formats.RAMP_MAPPING,
        ),
        'ramp',
        input_formats.RampMapping,
    ),
    (
        # HMDB's own ID fields plus the labels defined
        # in `input_formats.HMDB_MAPPING`
        dict(
            **{
                it: it
                for it in hmdb_input.ID_FIELDS
            },
            **input_formats.HMDB_MAPPING,
        ),
        'hmdb',
        input_formats.HmdbMapping,
    ),
)

# ID type labels referring to UniProt identifiers.
UNIPROT_ID_TYPES = {
    'uniprot',
    'trembl',
    'swissprot',
    'uniprot-pri',
    'uniprot-sec',
}

"""
Classes for reading and serving ID mapping data from a custom file or
function, UniProt, UniProt ID Mapping, Ensembl BioMart,
PRO (Protein Ontology), miRBase or a pickle file.
"""

# Key identifying one mapping table: the source and target ID types and
# the organism.
MappingTableKey = collections.namedtuple(
    'MappingTableKey',
    [
        'id_type',
        'target_id_type',
        'ncbi_tax_id',
    ],
)
# Defaults are assigned to the rightmost fields, i.e. `target_id_type`
# defaults to 'protein' and `ncbi_tax_id` to 9606.
# NOTE(review): 'protein' is an unusual default for a target ID type, it
# looks like a leftover of a former entity type field -- confirm.
MappingTableKey.__new__.__defaults__ = ('protein', 9606)


[docs] class MapReader(session_mod.Logger): """ Reads ID translation data and creates ``MappingTable`` instances. When initializing ID conversion tables for the first time data is downloaded from UniProt and read into dictionaries. It takes a couple of seconds. Data is saved to pickle dumps, this way later the tables load much faster. """
def __init__(
        self,
        param,
        ncbi_tax_id = None,
        entity_type = None,
        load_a_to_b = True,
        load_b_to_a = False,
        uniprots = None,
        lifetime = 300,
        resource_id_types = None,
    ):
    """
    Sets up the reader and immediately loads the requested tables.

    Args
        param (MappingInput): A mapping table definition, any child of
            the `internals.input_formats.MappingInput` class.
        ncbi_tax_id (int): NCBI Taxonomy identifier of the organism.
            If not provided, the one from `param` is used, and if that
            is empty too, the `default_organism` setting.
        entity_type (str): An optional, custom label for the type of
            the entities, e.g. `protein`. It does not take part in the
            identification of the mapping tables, hence the same ID type
            label must not be used for different kinds of entities:
            either use distinct labels (e.g. `entrez_protein` and
            `entrez_mirna`), or load the IDs of all entities into one
            table under one label (e.g. `entrez`).
        load_a_to_b (bool): Load the table translating from `id_type`
            to `target_id_type`.
        load_b_to_a (bool): Load the table translating from
            `target_id_type` to `id_type`.
        uniprots (set): UniProt IDs to query in case the source of the
            mapping table is the UniProt web service.
        lifetime (int): Time in seconds; a table unused for longer than
            this is removed at the next cleanup. Passed to
            ``MappingTable``.
        resource_id_types: Additional mappings between pypath and
            resource specific identifier type labels.
    """

    session_mod.Logger.__init__(self, name = 'mapping')

    # the explicit argument has priority over the table definition,
    # the module wide default is the last resort
    self.ncbi_tax_id = (
        ncbi_tax_id or
        param.ncbi_tax_id or
        settings.get('default_organism')
    )

    log_values = (
        self.ncbi_tax_id,
        param.id_type_a,
        param.id_type_b,
        load_a_to_b,
        load_b_to_a,
        param.type,
        param.__class__.__name__,
    )
    self._log(
        'Reader created for ID translation table, parameters: '
        '`ncbi_tax_id=%u, id_a=%s, id_b=%s, '
        'load_a_to_b=%u, load_b_to_a=%u, '
        'input_type=%s (%s)`.' % log_values
    )

    self.cachedir = cache_mod.get_cachedir()

    # the table definition and what we copy from it
    self.param = param
    self.id_type_a = param.id_type_a
    self.id_type_b = param.id_type_b
    self.source_type = param.type

    # what to load and how long to keep it
    self.load_a_to_b = load_a_to_b
    self.load_b_to_a = load_b_to_a
    self.lifetime = lifetime
    self.entity_type = entity_type
    self.uniprots = uniprots
    self._resource_id_types = resource_id_types

    # the tables, empty until loaded
    self.a_to_b = None
    self.b_to_a = None

    self.load()
def reload(self):
    """
    Reloads the module where the class of this object is defined and
    rebinds the object to the freshly created class.
    """

    current_class = self.__class__
    module_name = current_class.__module__
    top_level_name = module_name.split('.')[0]

    # a non-empty `fromlist` makes `__import__` return the submodule
    # itself instead of the top level package
    module = __import__(module_name, fromlist = [top_level_name])
    imp.reload(module)

    self.__class__ = getattr(module, current_class.__name__)
def load(self):
    """
    Runs the whole loading procedure.

    Sets up the cache file paths, then, if the `mapping_use_cache`
    setting allows, attempts to load the tables from the cache. If the
    requested tables are still missing, reads them from the original
    source, and after a successful read saves them to the cache.
    """

    self.use_cache = settings.get('mapping_use_cache')
    self.setup_cache()

    if self.use_cache:

        self.read_cache()

    if self.tables_loaded():

        # nothing more to do, the cache provided everything
        return

    # read from the original source
    self.read()

    if self.tables_loaded():

        # write cache only at successful loading
        self.write_cache()
@property
def mapping_table_a_to_b(self):
    """
    A ``MappingTable`` built from the already loaded "a" to "b" data.
    """

    table = self._get_mapping_table('a', 'b')

    return table


@property
def mapping_table_b_to_a(self):
    """
    A ``MappingTable`` built from the already loaded "b" to "a" data.
    """

    table = self._get_mapping_table('b', 'a')

    return table
def id_type_side(self, id_type):
    """
    Finds the side of an ID type in the current table definition.

    Args
        id_type (str): An ID type label.

    Returns
        "a" if `id_type` is the source side ID type of this reader,
        "b" if it is the target side one, `None` if it is neither.
        The source side is checked first.
    """

    if id_type == self.id_type_a:

        return 'a'

    if id_type == self.id_type_b:

        return 'b'

    return None
def _get_mapping_table(self, *args):
    """
    Wraps the data of one direction into a ``MappingTable``.

    Args
        *args: The source and target sides, i.e. `('a', 'b')` or
            `('b', 'a')`.

    Returns
        A ``MappingTable`` instance, or `None` if the data of the
        requested direction is not a dict (e.g. it is not loaded).
    """

    source_side, target_side = args[0], args[1]

    data = getattr(self, '%s_to_%s' % args)
    table_args = {
        'id_type': getattr(self, 'id_type_%s' % source_side),
        'target_id_type': getattr(self, 'id_type_%s' % target_side),
    }

    if not isinstance(data, dict):

        return None

    return MappingTable(
        data = data,
        ncbi_tax_id = self.ncbi_tax_id,
        lifetime = self.lifetime,
        **table_args
    )
def tables_loaded(self):
    """
    Tells if all the requested tables are available.

    A direction counts as missing only if it has been requested and
    its data is empty or `None`.
    """

    a_to_b_missing = self.load_a_to_b and not self.a_to_b
    b_to_a_missing = self.load_b_to_a and not self.b_to_a

    return not (a_to_b_missing or b_to_a_missing)
def write_cache(self):
    """
    Saves the ID translation data of both directions to pickle files.
    """

    for sides in (('a', 'b'), ('b', 'a')):

        self._write_cache(*sides)
def _write_cache(self, *args):
    """
    Saves the data of one direction to its cache file.

    Nothing happens if the direction has not been requested or its
    data is empty. An already existing cache file is removed first.

    Args
        *args: The source and target sides, i.e. `('a', 'b')` or
            `('b', 'a')`.
    """

    data = getattr(self, '%s_to_%s' % args)

    if not (self._to_be_loaded(*args) and data):

        return

    cachefile = self._attr('cachefile', *args)
    self._remove_cache_file(*args)

    # the context manager makes sure the file is flushed and closed
    # even if pickling fails
    with open(cachefile, 'wb') as fp:

        pickle.dump(data, fp)
def read_cache(self):
    """
    Loads the ID translation data of both directions from previously
    saved pickle files.
    """

    for sides in (('a', 'b'), ('b', 'a')):

        self._read_cache(*sides)
def _read_cache(self, *args):
    """
    Loads the data of one direction from its cache file.

    Nothing happens if the direction has not been requested or the
    cache file does not exist.

    Args
        *args: The source and target sides, i.e. `('a', 'b')` or
            `('b', 'a')`.
    """

    if not self._to_be_loaded(*args):

        return

    cachefile = self._attr('cachefile', *args)

    if not os.path.exists(cachefile):

        return

    # NOTE: unpickling executes arbitrary code, the cache directory
    # must contain only trusted files.
    with open(cachefile, 'rb') as fp:

        from_cache = pickle.load(fp)

    setattr(self, '%s_to_%s' % args, from_cache)

    # report the ID types in the direction actually loaded
    self._log(
        'Loading `%s` to `%s` mapping table '
        'from pickle file `%s`.' % (
            getattr(self, 'id_type_%s' % args[0]),
            getattr(self, 'id_type_%s' % args[1]),
            cachefile,
        )
    )


def _to_be_loaded(self, *args):
    """
    Tells if the direction given by `args` has been requested.
    """

    return self._attr('load', *args)


def _attr(self, attr, *args):
    """
    Value of the direction specific attribute, e.g. `cachefile_a_to_b`
    for `attr = 'cachefile'` and `args = ('a', 'b')`.
    """

    return getattr(self, self._attr_name(attr, *args))


@staticmethod
def _attr_name(attr, *args):
    """
    Name of a direction specific attribute: `<attr>_<side>_to_<side>`.
    """

    return '%s_%s_to_%s' % ((attr,) + args)
def read(self):
    """
    Loads the ID translation data from the original source.

    Dispatches to the `read_mapping_<source type>` method; does
    nothing if no such method exists.
    """

    reader_name = 'read_mapping_%s' % self.source_type

    if not hasattr(self, reader_name):

        return

    reader = getattr(self, reader_name)
    reader()
def setup_cache(self):
    """
    Sets up the cache file paths for both directions; the file names
    are md5 hashes of the table parameters.
    """

    for sides in (('a', 'b'), ('b', 'a')):

        self._setup_cache(*sides)
def _setup_cache(self, *args):
    """
    Creates the `mapping_id_<x>_to_<y>` and `cachefile_<x>_to_<y>`
    attributes for one direction.

    Args
        *args: The source and target sides, i.e. `('a', 'b')` or
            `('b', 'a')`.
    """

    mapping_id_attr = self._attr_name('mapping_id', *args)
    cachefile_attr = self._attr_name('cachefile', *args)

    setattr(
        self,
        mapping_id_attr,
        self._get_mapping_id(*args),
    )

    # the cache file is named after the mapping ID
    setattr(
        self,
        cachefile_attr,
        os.path.join(self.cachedir, getattr(self, mapping_id_attr)),
    )


def _get_mapping_id(self, *args):
    """
    Returns an md5 checksum unambiguously identifying the mapping table
    by the identifiers, the direction of translation, the organism
    and other parameters like, for example, the source URL.
    """

    return common.md5(
        json.dumps(
            (
                getattr(self, 'id_type_%s' % args[0]),
                getattr(self, 'id_type_%s' % args[1]),
                self.ncbi_tax_id,
                # sorted to be independent of the attribute order
                sorted(self.param.__dict__.items())
            )
        )
    )


def _cache_files_exist(self):
    """
    Checks if both cache files are either not necessary or exist.
    """

    return (
        self._cache_file_exists('a', 'b') and
        self._cache_file_exists('b', 'a')
    )


def _cache_file_exists(self, *args):
    """
    Checks if a cache file is either not necessary or exists.
    """

    return (
        not self._attr('load', *args) or
        os.path.isfile(self._attr('cachefile', *args))
    )


def _remove_cache_file(self, *args):
    """
    Deletes the cache file of one direction if it exists.
    """

    cachefile = self._attr('cachefile', *args)

    if os.path.exists(cachefile):

        self._log('Removing mapping table cache file `%s`.' % cachefile)
        os.remove(cachefile)
def read_mapping_file(self):
    """
    Reads a mapping table from a local file or a function.

    If `param.input` is not an existing path, it is looked up as an
    input method by `inputs.get_method` and called with
    `param.input_args` (if any); the records it yields may be lines
    (bytes or str) or already split sequences of fields. The first
    `param.header` records are skipped, as well as the records too
    short to contain both `param.col_a` and `param.col_b`.

    Returns
        An empty dict if the input is neither a file nor a known
        method, otherwise `None`; the tables are stored in the
        `a_to_b` and `b_to_a` attributes.
    """

    if os.path.exists(self.param.input):

        infile = open(self.param.input, encoding = 'utf-8', mode = 'r')

    else:

        method = inputs.get_method(self.param.input)

        if not method:

            return {}

        input_args = getattr(self.param, 'input_args', {})
        infile = method(**input_args)

    a_to_b = collections.defaultdict(set)
    b_to_a = collections.defaultdict(set)

    # the highest column index accessed below
    last_col = max(self.param.col_a, self.param.col_b)

    try:

        for i, line in enumerate(infile):

            if self.param.header and i < self.param.header:

                continue

            if hasattr(line, 'decode'):

                line = line.decode('utf-8')

            if hasattr(line, 'rstrip'):

                line = line.rstrip().split(self.param.separator)

            # we need at least `last_col + 1` fields
            if len(line) <= last_col:

                continue

            id_a = line[self.param.col_a]
            id_b = line[self.param.col_b]

            if self.load_a_to_b:

                a_to_b[id_a].add(id_b)

            if self.load_b_to_a:

                b_to_a[id_b].add(id_a)

    finally:

        # release the file (or generator) also in case of error
        if hasattr(infile, 'close'):

            infile.close()

    self.a_to_b = a_to_b if self.load_a_to_b else None
    self.b_to_a = b_to_a if self.load_b_to_a else None
@staticmethod
def _uniprotkb_id_type(id_type: str) -> bool:
    """
    Delegates to `UniprotListMapping._uniprotkb_id_type` of the
    `input_formats` module and returns its result for `id_type`.
    """

    check = input_formats.UniprotListMapping._uniprotkb_id_type

    return check(id_type)
def read_mapping_uniprot_list(self):
    """
    Builds a mapping table by downloading data from UniProt's
    upload lists service.

    The service is queried by a list of identifiers of the source
    ("a") side ID type. Three cases are handled:

    * Side "a" is a UniProtKB ID type: the UniProt IDs in `uniprots`
      are submitted directly.
    * Only side "b" is a UniProtKB ID type: the sides are swapped
      temporarily, so the UniProt IDs can be submitted, and the
      resulting tables are swapped back at the end.
    * Neither side is a UniProtKB ID type: the UniProt IDs are first
      translated to the side "a" ID type by an additional query, and
      the outcome of that is submitted.

    The tables are stored in the `a_to_b` and `b_to_a` attributes.
    """

    a_to_b = collections.defaultdict(set)
    b_to_a = collections.defaultdict(set)
    # whether the sides have been swapped temporarily
    swap = False

    if not self.uniprots:

        self.set_uniprot_space()

    # We need a list to query this service, and we have method only for
    # getting a proteome wide list of UniProt IDs. If the translated
    # ID type is not UniProt, then first we need to translate the
    # proteome wide reference list from UniProt to the target ID type.
    if not self._uniprotkb_id_type(self.param.id_type_a):

        if self._uniprotkb_id_type(self.param.id_type_b):

            # query in the reverse direction: the definition and the
            # load flags are swapped here and restored after the
            # processing of the results
            swap = True
            self.param.swap_sides()
            self.load_a_to_b, self.load_b_to_a = (
                self.load_b_to_a,
                self.load_a_to_b,
            )
            upload_ac_list = self.uniprots

        else:

            u_target = self._read_mapping_uniprot_list(
                uniprot_id_type_a = 'UniProtKB_AC-ID',
                uniprot_id_type_b = self.param.uniprot_id_type_a,
            )
            # the second column of the result lines holds the
            # translated identifiers
            upload_ac_list = [l.split('\t')[1].strip() for l in u_target]

    else:

        upload_ac_list = self.uniprots

    uniprot_data = self._read_mapping_uniprot_list(
        upload_ac_list = upload_ac_list,
    )

    # any of the sides looks like an Ensembl ID type
    ens = (
        self.param.id_type_a.startswith('ens') or
        self.param.id_type_b.startswith('ens') or
        'ensembl' in self.param.id_type_a.lower() or
        'ensembl' in self.param.id_type_b.lower()
    )
    # matches an Ensembl ID followed by a numeric suffix after a dot;
    # used below for removing the suffix
    reens = re.compile(r'(ENS[A-Z]+\d+)\.\d+')

    for l in uniprot_data:

        if not l:

            continue

        if ens:

            l = reens.sub(r'\1', l)

        # first field: source ID, second field: target ID
        l = l.strip().split('\t')

        if self.load_a_to_b:

            a_to_b[l[0]].add(l[1])

        if self.load_b_to_a:

            b_to_a[l[1]].add(l[0])

    if swap:

        # restore the original orientation of everything
        a_to_b, b_to_a = b_to_a, a_to_b
        self.load_a_to_b, self.load_b_to_a = (
            self.load_b_to_a,
            self.load_a_to_b,
        )
        self.param.swap_sides()

    self.a_to_b = a_to_b if self.load_a_to_b else None
    self.b_to_a = b_to_a if self.load_b_to_a else None
def set_uniprot_space(self, swissprot = None):
    """
    Sets up a search space of UniProt IDs in the `uniprots` attribute.

    Args
        swissprot (bool): Use only SwissProt IDs, not TrEMBL. True
            loads only SwissProt IDs, False only TrEMBL IDs, None
            falls back to the `swissprot` attribute of the table
            definition.
    """

    if swissprot is None:

        swissprot = self.param.swissprot

    self.uniprots = uniprot_db.all_uniprots(
        self.ncbi_tax_id,
        swissprot = swissprot,
    )
def _read_mapping_uniprot_list(
        self,
        uniprot_id_type_a = None,
        uniprot_id_type_b = None,
        upload_ac_list = None,
        chunk_size = None,
    ):
    """
    Reads a mapping table from UniProt "upload lists" service.

    The identifiers are submitted in chunks. For each chunk a job is
    submitted, polled until completion, then the results are
    downloaded as TSV; if the results of the same request are
    already in the cache, those are used without contacting the
    service.

    Args
        uniprot_id_type_a (str): Source ID type label as used in UniProt.
            By default taken from the table definition.
        uniprot_id_type_b (str): Target ID type label as used in UniProt.
            By default taken from the table definition.
        upload_ac_list (list): The identifiers to use in the query to
            the ID Mapping service. By default the list of all UniProt
            IDs for the organism is used.
        chunk_size (int): Number of IDs in one query. Too large queries
            might fail. By default the value of the
            `uniprot_uploadlists_chunk_size` setting.

    Returns
        List of the result lines of all chunks, without the header
        lines.

    Raises
        RuntimeError: If submitting a job fails or the service reports
            the failure of a job.
    """

    chunk_size = (
        chunk_size or
        settings.get('uniprot_uploadlists_chunk_size')
    )
    uniprot_id_type_a = uniprot_id_type_a or self.param.uniprot_id_type_a
    uniprot_id_type_b = uniprot_id_type_b or self.param.uniprot_id_type_b

    if not upload_ac_list:

        self._log(
            'No identifiers provided, '
            'using all UniProt IDs of the organism.'
        )
        upload_ac_list = self.uniprots

    # sorted, so identical queries result identical requests,
    # hence identical cache keys
    upload_ac_list = sorted(upload_ac_list)

    self._log(
        'Querying the UniProt ID Mapping service for ID translation '
        'data. Querying a list of %u IDs.' % len(upload_ac_list)
    )

    run_url = urls.urls['uniprot_idmapping']['run']
    poll_result = {}
    result = []

    # options shared by all requests
    accept_json = {'req_headers': ['Accept: application/json']}
    nocache = {'cache': False, 'large': False}
    large = {'silent': False, 'large': True}

    # loading data in chunks of `chunk_size` identifiers
    for i in range(math.ceil(len(upload_ac_list) / chunk_size)):

        # slices of a sorted list are sorted too
        this_chunk = upload_ac_list[i * chunk_size:(i + 1) * chunk_size]

        self._log(
            'Request to UniProt ID Mapping, chunk #%u with %u IDs.' % (
                i,
                len(this_chunk),
            )
        )

        post = {
            'from': uniprot_id_type_a,
            'to': uniprot_id_type_b,
            'ids': ' '.join(this_chunk),
        }
        run_args = {'url': run_url, 'post': post}
        # the results are cached under the key of the job submission
        cache_path = curl.Curl.cache_path(**run_args)

        if not os.path.exists(cache_path):

            run_c = curl.Curl(**run_args, **nocache, **accept_json)

            if run_c.status != 200:

                raise RuntimeError(
                    'Failed to submit job to UniProt ID Mapping. '
                    'See details in the log.'
                )

            jobid = json.loads(run_c.result)['jobId']

            self._log(
                f'Submitted job to UniProt ID Mapping, job ID: `{jobid}`.'
            )

            timeout = settings.get('uniprot_idmapping_timeout')
            interval = settings.get('uniprot_idmapping_poll_interval')
            max_polls = math.ceil(timeout / interval)
            poll_url = urls.urls['uniprot_idmapping']['poll'] % jobid
            poll_args = {'url': poll_url} | nocache | accept_json

            for poll_no in range(1, max_polls + 1):

                self._log(
                    f'Polling job UniProt ID Mapping job `{jobid}`, '
                    f'poll {poll_no} of {max_polls}.'
                )

                poll_c = curl.Curl(**poll_args)

                if poll_c.status == 200:

                    poll_result = json.loads(poll_c.result)

                    if 'status' in poll_result or 'failedIds' in poll_result:

                        self._log(
                            f'UniProt ID Mapping job `{jobid}` '
                            'successfully completed.'
                        )
                        break

                    elif 'messages' in poll_result:

                        msg = (
                            'UniProt ID Mapping job failed: ' +
                            ' '.join(common.to_list(poll_result['messages']))
                        )

                        self._log(msg)

                        raise RuntimeError(msg)

                else:

                    self._log(f'Poll failed with HTTP {poll_c.status}.')

                # wait before the next attempt, also after a failed
                # poll, to avoid flooding the server with requests
                time.sleep(interval)

            else:

                self._log(
                    f'UniProt ID Mapping job `{jobid}` has not been '
                    f'reported complete after {max_polls} polls; '
                    'attempting to retrieve the results anyway.'
                )

            self._log(
                'Getting UniProt ID Mapping results URL '
                f'for job `{jobid}`.'
            )
            det_url = urls.urls['uniprot_idmapping']['details'] % jobid
            det_c = curl.Curl(url = det_url, **nocache, **accept_json)
            # turn the URL of the paginated results into the URL of
            # the streaming endpoint and ask for TSV format
            result_url = (
                json.loads(det_c.result)['redirectURL'].
                replace('/idmapping/results/', '/idmapping/stream/').
                replace('/results/', '/results/stream/').
                __add__('?format=tsv')
            )

            self._log(
                'Retrieving UniProt ID Mapping results '
                f'from `{result_url}`.'
            )

            with curl.cache_delete_on():

                res_c = curl.Curl(
                    url = result_url,
                    cache = cache_path,
                    **large
                )

        else:

            # the results are in the cache already
            res_c = curl.Curl(**run_args, **large)

        # the first line is the header
        result.extend(list(res_c.fileobj)[1:])

    return result
def read_mapping_uniprot(self):
    """
    Downloads ID mappings directly from UniProt.

    See the names of possible identifiers here:
    http://www.uniprot.org/help/programmatic_access

    The query result is processed into a dict of sets; this dict is
    stored as the `b_to_a` table, and its reverse (by
    `common.swap_dict`) as the `a_to_b` table, depending on which
    directions have been requested.
    """

    query = uniprot_input.UniprotQuery(
        reviewed = True if self.param.swissprot else None,
        organism = self.ncbi_tax_id,
        fields = self.param._resource_id_type_a,
    )
    self._log(f'UniProt REST API call: `{query.url_plain}`.')

    # NOTE(review): `in` on the table definition object requires it to
    # support membership tests -- confirm against `input_formats`.
    trembl = 'trembl' in self.param
    protein_name = self.param.field == 'protein names'
    # process the values here only if the query does not do it
    query.name_process = not protein_name and not trembl
    data = query.perform()

    if not query.name_process:

        def maybe_split(v):
            # post-processes one value of the query result

            if trembl and not any(ch.islower() for ch in v):

                # values without lowercase letters are split by the
                # field separator of the query, dropping empty items
                v = common.del_empty(query._FIELDSEP.split(v))

            elif protein_name:

                v = self._process_protein_name(v)

            return v


        data = {k: maybe_split(v) for k, v in data.items()}

    # make sure every value is a set
    data = {k: common.to_set(v) for k, v in data.items()}

    self.a_to_b = (
        common.swap_dict(data, force_sets = True)
            if self.load_a_to_b else
        None
    )
    self.b_to_a = data if self.load_b_to_a else None
def read_mapping_pro(self):
    """
    Loads a mapping table from PRO (Protein Ontology).

    The PRO to other ID dict is stored as it is or reversed,
    depending on the `to_pro` attribute of the table definition and
    the direction.
    """

    pro_to_other = collections.defaultdict(set)

    for pro, other in pro_input.pro_mapping(
        target_id_type = self.param.id_type
    ):

        pro_to_other[pro].add(other)

    if not self.load_a_to_b:

        self.a_to_b = None

    elif self.param.to_pro:

        # the "a" side is the other ID type
        self.a_to_b = common.swap_dict(pro_to_other, force_sets = True)

    else:

        self.a_to_b = dict(pro_to_other)

    if not self.load_b_to_a:

        self.b_to_a = None

    elif self.param.to_pro:

        self.b_to_a = dict(pro_to_other)

    else:

        self.b_to_a = common.swap_dict(pro_to_other, force_sets = True)
def read_mapping_biomart(self):
    """
    Loads a mapping table using BioMart data.

    Records with an empty value on either side are ignored. If the
    organism of the table definition has no Ensembl name, only a log
    message is emitted.
    """

    ens_organism = taxonomy.ensure_ensembl_name(self.param.ncbi_tax_id)

    if not ens_organism:

        self._log(
            'Organism not available in Ensembl: `%u`.' % (
                self.param.ncbi_tax_id
            )
        )
        return

    biomart_data = biomart_input.biomart_query(
        attrs = self.param.attrs,
        dataset = '%s_gene_ensembl' % ens_organism,
    )

    a_to_b = collections.defaultdict(set)
    b_to_a = collections.defaultdict(set)

    for rec in biomart_data:

        id_a = getattr(rec, self.param.biomart_id_type_a)
        id_b = getattr(rec, self.param.biomart_id_type_b)

        if not (id_a and id_b):

            continue

        if self.load_a_to_b:

            a_to_b[id_a].add(id_b)

        if self.load_b_to_a:

            b_to_a[id_b].add(id_a)

    self.a_to_b = dict(a_to_b) if self.load_a_to_b else None
    self.b_to_a = dict(b_to_a) if self.load_b_to_a else None
def read_mapping_array(self):
    """
    Loads mapping table between microarray probe IDs and genes.

    The probes are on the "a" side if the "a" ID type of the table
    definition equals the array ID, otherwise on the "b" side.
    """

    probe_mapping = biomart_input.biomart_microarrays(
        organism = self.param.ncbi_tax_id,
        vendor = self.param.array_id,
        gene = self.param.ensembl_id == 'ensg',
        transcript = self.param.ensembl_id == 'enst',
        peptide = self.param.ensembl_id == 'ensp',
    )

    probes_on_side_a = self.param.id_type_a == self.param.array_id

    # names of the attributes for the two kinds of tables
    if probes_on_side_a:

        probe_to_gene_attr, gene_to_probe_attr = 'a_to_b', 'b_to_a'

    else:

        probe_to_gene_attr, gene_to_probe_attr = 'b_to_a', 'a_to_b'

    if getattr(self, 'load_%s' % probe_to_gene_attr):

        probe_to_gene = collections.defaultdict(set)

        for ensembl_id, probes in iteritems(probe_mapping):

            for probe in probes:

                probe_to_gene[probe.probe].add(ensembl_id)

        setattr(self, probe_to_gene_attr, dict(probe_to_gene))

    if getattr(self, 'load_%s' % gene_to_probe_attr):

        gene_to_probe = {
            ensembl_id: {p.probe for p in probe_ids}
            for ensembl_id, probe_ids in iteritems(probe_mapping)
        }

        setattr(self, gene_to_probe_attr, gene_to_probe)
def _read_mapping_smallmolecule(self):
    """
    Loads a small molecule ID translation table.

    The data is retrieved by the input method named in the table
    definition, or, if none is given, by the `<source type>_mapping`
    function of the `<source type>_input` module imported in this
    module. The organism of the reader is set to "not organism
    specific".
    """

    custom_method = self.param.input_method

    if custom_method:

        method = inputs.get_method(custom_method)

    else:

        input_module = globals()[f'{self.source_type}_input']
        method = getattr(input_module, f'{self.source_type}_mapping')

    data = method(
        id_type_a = self.resource_id_type_a,
        id_type_b = self.resource_id_type_b,
    )

    if self.load_a_to_b:

        self.a_to_b = data

    if self.load_b_to_a:

        self.b_to_a = common.swap_dict(data, force_sets = True)

    self.ncbi_tax_id = _const.NOT_ORGANISM_SPECIFIC
def read_mapping_ramp(self):
    """
    Loads an ID translation table from RaMP.

    Delegates to the generic small molecule loader.
    """

    self._read_mapping_smallmolecule()


def read_mapping_unichem(self):
    """
    Loads an ID translation table from UniChem.

    Delegates to the generic small molecule loader.
    """

    self._read_mapping_smallmolecule()


def read_mapping_hmdb(self):
    """
    Loads an ID translation table from the Human Metabolome Database.

    Delegates to the generic small molecule loader.
    """

    self._read_mapping_smallmolecule()
@staticmethod
def _process_protein_name(name):
    """
    Extracts the individual names from a protein name string.

    Collects the part before the first opening parenthesis, the
    contents of parentheses and the semicolon separated items within
    square brackets (only contents of at least 3 characters are
    considered). From the latter only the second colon separated
    field is kept if there is a colon, then the second "(" separated
    field if there is a parenthesis.

    Returns
        Set of the names with surrounding whitespace removed.
    """

    collected = {name.split('(')[0].strip()}

    collected.update(
        in_parentheses.strip()
        for in_parentheses in re.findall(r'\(([^\)]{3,})\)', name)
    )

    in_brackets = re.findall(r'\[([^\]]{3,})\]', name)

    for item in common.flat_list([x.split(';') for x in in_brackets]):

        if ':' in item:

            item = item.split(':')[1]

        if '(' in item:

            item = item.split('(')[1]

        collected.add(item.strip())

    return collected
def resource_id_type(self, side: Literal['a', 'b']) -> str | None:
    """
    Resource specific identifier type.

    Args
        side: Either "a" or "b", the side of the table definition.

    Returns
        The `resource_id_type_<side>` attribute of the table
        definition if it is not empty; otherwise the label assigned
        to the `id_type_<side>` of the definition in the
        `resource_id_types` mapping provided to this reader, `None`
        if no such mapping or label is available.
    """

    explicit = getattr(self.param, f'resource_id_type_{side}')

    if explicit:

        return explicit

    # the mapping is optional, it is `None` by default
    id_type_labels = self._resource_id_types or {}

    return id_type_labels.get(getattr(self.param, f'id_type_{side}'))
@property
def resource_id_type_a(self) -> str | None:
    """
    Resource specific identifier type of the "a" side.
    """

    id_type = self.resource_id_type('a')

    return id_type


@property
def resource_id_type_b(self) -> str | None:
    """
    Resource specific identifier type of the "b" side.
    """

    id_type = self.resource_id_type('b')

    return id_type
class MappingTable(session_mod.Logger):
    """
    The container directly serving the ID translation data.

    It is agnostic about the origin of the data and the kind of the
    identifiers: all it needs is the translation dictionary. It also
    keeps track of the time of its last use, so unused tables can be
    recognised and removed.
    """


    def __init__(
            self,
            data,
            id_type,
            target_id_type,
            ncbi_tax_id,
            lifetime = 300,
        ):
        """
        Wrapper around a dictionary of identifier mapping.

        The dictionary is kept in the `data` attribute: its keys are
        source identifiers, its values are sets of target identifiers.
        Most often the mapping is unambiguous, i.e. there is one target
        identifier for each source identifier.

        Args
            data (dict): The identifier translation dictionary.
            id_type (str): The source ID type.
            target_id_type (str): The target ID type.
            ncbi_tax_id (int): NCBI Taxonomy identifier of the organism.
            lifetime (int): Time in seconds; the table counts as expired
                if it has not been used for longer than this. Each usage
                resets the expiry time.
        """

        session_mod.Logger.__init__(self, name = 'mapping')

        self.data = data
        self.id_type = id_type
        self.target_id_type = target_id_type
        self.ncbi_tax_id = ncbi_tax_id
        self.lifetime = lifetime

        # creating the table counts as the first usage
        self._used()


    def reload(self):
        """
        Reloads the module where the class of this object is defined
        and rebinds the object to the freshly created class.
        """

        current_class = self.__class__
        module_name = current_class.__module__
        module = __import__(
            module_name,
            fromlist = [module_name.split('.')[0]],
        )
        imp.reload(module)
        self.__class__ = getattr(module, current_class.__name__)


    def __getitem__(self, key):
        """
        The set of target IDs for a source ID, an empty set if the
        source ID is unknown. Counts as a usage.
        """

        self._used()

        return self.data[key] if key in self.data else set()


    def __contains__(self, key):
        """
        Tells if a source ID is in the table. Counts as a usage.
        """

        self._used()

        return key in self.data


    def __len__(self):
        """
        Number of source IDs in the table.
        """

        return len(self.data)


    def _used(self):
        """
        Records the current time as the time of the last usage.
        """

        self._last_used = time.time()


    def _expired(self):
        """
        Tells if the table has been idle for longer than its lifetime.
        """

        idle_for = time.time() - self._last_used

        return idle_for > self.lifetime


    def get_key(self):
        """
        Creates a mapping table key, a tuple with all the defining
        properties of the mapping table.
        """

        return MappingTableKey(
            self.id_type,
            self.target_id_type,
            self.ncbi_tax_id,
        )


    @property
    def key(self):
        """
        The mapping table key: source and target ID types and organism.
        """

        return MappingTableKey(
            self.id_type,
            self.target_id_type,
            self.ncbi_tax_id,
        )


    def __repr__(self):

        fields = self.key + (len(self),)

        return '<MappingTable from=%s, to=%s, taxon=%u (%u IDs)>' % fields


    @property
    def items(self):
        """
        The `items` method of the translation dictionary.
        """

        return self.data.items


    @property
    def keys(self):
        """
        The `keys` method of the translation dictionary.
        """

        return self.data.keys


    @property
    def values(self):
        """
        The `values` method of the translation dictionary.
        """

        return self.data.values
class Mapper(session_mod.Logger):

    # defaults come from the settings module:
    # `default_name_types` and `default_label_types` are both indexed
    # by entity type (see how they are combined below)
    default_name_types = settings.get('default_name_types')
    default_label_types = settings.get('default_label_types')


    def _get_label_type_to_id_type(default_name_types):
        """
        Builds a dictionary with label types as keys and the matching
        default ID types as values. Runs once, at class creation.
        """

        by_label_type = {
            label_type: default_name_types[entity_type]
            for entity_type, label_type in
            iteritems(settings.get('default_label_types'))
        }

        #TODO: some nicer solution
        by_label_type['mir-name'] = 'mir-pre'

        return by_label_type


    label_type_to_id_type = _get_label_type_to_id_type(default_name_types)
def __init__(
        self,
        ncbi_tax_id = None,
        cleanup_period = 10,
        lifetime = 300,
        translate_deleted_uniprot = None,
        keep_invalid_uniprot = None,
        trembl_swissprot_by_genesymbol = None,
    ):
    """
    Sets up the ID translation service: options, the periodic cleanup
    job, the regexes used for ID type guessing and the (initially empty)
    container of mapping tables.

    cleanup_period : int
        Periodically check and remove unused mapping data.
        Time in seconds. If `None` tables kept forever: no cleanup job
        is started.
    lifetime : int
        If a table has not been used for longer than this period it is
        to be removed at next cleanup. Stored in the `lifetime`
        attribute.
    translate_deleted_uniprot : bool
        Do an extra attempt to translate deleted or obsolete UniProt IDs
        by retrieving their archived datasheet and use the gene symbol
        to find the corresponding valid UniProt ID?
    keep_invalid_uniprot : bool
        If the target ID is UniProt, keep the results if they fit the
        format for UniProt IDs (we won't check if they are deleted or
        from a different taxon). The alternative is to keep only those
        which are in the list of all UniProt IDs for the given organism.
    trembl_swissprot_by_genesymbol : bool
        Attempt to translate TrEMBL IDs to SwissProt by translating to
        gene symbols and then to SwissProt.
    """

    session_mod.Logger.__init__(self, name = 'mapping')

    cleanup_period = settings.get(
        'mapper_cleanup_interval',
        cleanup_period
    )
    # this argument used to be silently dropped; keeping it on the
    # instance makes it available for the methods creating tables
    self.lifetime = lifetime
    self._translate_deleted_uniprot = settings.get(
        'mapper_translate_deleted_uniprot',
        translate_deleted_uniprot,
    )
    self._keep_invalid_uniprot = settings.get(
        'mapper_keep_invalid_uniprot',
        keep_invalid_uniprot,
    )
    self._trembl_swissprot_by_genesymbol = settings.get(
        'mapper_trembl_swissprot_by_genesymbol',
        trembl_swissprot_by_genesymbol,
    )

    self._mapper_cleanup_timeloop = timeloop.Timeloop()
    # practically silences the logger of the timeloop
    self._mapper_cleanup_timeloop.logger.setLevel(9999)

    # make sure we start without any running job
    for job in self._mapper_cleanup_timeloop.jobs:

        if job.is_alive():

            job.stop()
            job.stopped.set()

    self._mapper_cleanup_timeloop.jobs = []

    # `None` means tables are kept forever: `timedelta` would raise
    # `TypeError` on it, so we simply do not schedule the cleanup
    if cleanup_period is not None:

        @self._mapper_cleanup_timeloop.job(
            interval = datetime.timedelta(
                seconds = cleanup_period
            )
        )
        def _cleanup():

            self.remove_expired()

        self._mapper_cleanup_timeloop.start(block = False)

    # regex for matching UniProt AC format
    self.reuniprot = re.compile(r'^(?:%s)$' % uniprot_input.reac.pattern)
    # the names of the four patterns below suggest miRBase precursor
    # and mature accessions and names; `guess_type` relies on them
    self.remipreac = re.compile(r'^MI\d{7}$')
    self.remimatac = re.compile(r'^MIMAT\d{7}$')
    self.remipreid = re.compile(
        r'^[a-z]{3}-'
        r'(?:mir|MIR|let|lsy|lin)-?'
        r'\d+-?[A-z\*]*(?:-((?!p)[\w\*\.-])+)?$'
    )
    self.remimatid = re.compile(
        r'^[a-z]{3}-'
        r'(?:miR|let|lsy|lin)-?'
        r'\d+[a-z\*]*(?:-((?!p)[\w\*])+)?(?:-(3|5)p)?$'
    )

    self.cachedir = cache_mod.get_cachedir()
    self.ncbi_tax_id = ncbi_tax_id or settings.get('default_organism')

    self.unmapped = []
    # mapping tables by their keys
    self.tables = {}
    self.uniprot_mapped = []
    self.trace = []
    # ID type names used here vs. in the static UniProt ID mapping
    self.uniprot_static_names = {
        'uniprot_id': 'UniProtKB-ID',
        'embl': 'EMBL-CDS',
        'embl_id': 'EMBL',
        'entrez': 'GeneID',
        'gi': 'GI',
        'refseqp': 'RefSeq',
        'refseqn': 'RefSeq_NT',
        'ensembl': 'Ensembl',
        'ensg': 'ENSEMBL',
        'ensp': 'ENSEMBL_PRO_ID',
        'enst': 'ENSEMBL_TRS',
        'hgnc': 'HGNC',
    }
    self.names_uniprot_static = (
        common.swap_dict_simple(self.uniprot_static_names)
    )
def reload(self):
    """
    Reload the class from the module level.

    The module defining the class of this instance is imported and
    reloaded, then the instance is rebound to the fresh class object
    of the same name.
    """

    cls = self.__class__
    module_name = cls.__module__
    module = __import__(
        module_name,
        fromlist = [module_name.split('.')[0]],
    )
    imp.reload(module)
    self.__class__ = getattr(module, cls.__name__)
def get_table_key(
        self,
        id_type,
        target_id_type,
        ncbi_tax_id = None,
    ):
    """
    Returns a tuple unambiguously identifying a mapping table.

    If no organism is provided, the default organism of this instance
    is used in the key.
    """

    organism = ncbi_tax_id or self.ncbi_tax_id

    return MappingTableKey(
        id_type = id_type,
        target_id_type = target_id_type,
        ncbi_tax_id = organism,
    )
def which_table(
        self,
        id_type,
        target_id_type,
        load = True,
        ncbi_tax_id = None,
    ):
    """
    Returns the table which is suitable to convert an ID of
    id_type to target_id_type. If no such table have been loaded
    yet, it attempts to load it from the built-in resources.
    If all attempts failed returns `None`.

    Args
        id_type (str): The source ID type.
        target_id_type (str): The target ID type.
        load (bool): Attempt to load the table if it is not yet
            available. If `False`, only the already loaded tables are
            considered (a loaded opposite direction table still gets
            reversed).
        ncbi_tax_id (int): NCBI Taxonomy identifier of the organism;
            by default the organism of this instance.

    Returns
        A `MappingTable` or `None`.

    Raises
        ValueError: If loading is requested and any of the ID types
            is `complex`.
    """

    tbl = None
    ncbi_tax_id = ncbi_tax_id or self.ncbi_tax_id

    def check_loaded():
        # reads `ncbi_tax_id` at call time, hence it follows the
        # change to non organism specific below

        return self.which_table(
            id_type = id_type,
            target_id_type = target_id_type,
            load = False,
            ncbi_tax_id = ncbi_tax_id,
        )

    tbl_key = self.get_table_key(
        id_type = id_type,
        target_id_type = target_id_type,
        ncbi_tax_id = ncbi_tax_id,
    )

    tbl_key_noorganism = self.get_table_key(
        *tbl_key[:-1],
        ncbi_tax_id = _const.NOT_ORGANISM_SPECIFIC,
    )

    tbl_key_rev = self.get_table_key(
        id_type = target_id_type,
        target_id_type = id_type,
        ncbi_tax_id = ncbi_tax_id,
    )

    tbl_key_rev_noorganism = self.get_table_key(
        *tbl_key_rev[:-1],
        ncbi_tax_id = _const.NOT_ORGANISM_SPECIFIC,
    )

    if tbl_key in self.tables:

        tbl = self.tables[tbl_key]

    elif tbl_key_noorganism in self.tables:

        tbl = self.tables[tbl_key_noorganism]

    elif tbl_key_rev in self.tables:

        # only the opposite direction is loaded: `create_reverse`
        # stores the swapped table under the key of the requested
        # direction, and that is the one we have to return
        # (the original code returned the opposite direction table)
        self.create_reverse(tbl_key_rev)
        tbl = self.tables[tbl_key]

    elif tbl_key_rev_noorganism in self.tables:

        self.create_reverse(tbl_key_rev_noorganism)
        tbl = self.tables[tbl_key_noorganism]

    elif load:

        self._log(
            'Requested to load ID translation table from '
            '`%s` to `%s`, organism: %u.' % (
                id_type,
                target_id_type,
                ncbi_tax_id,
            )
        )

        if id_type == 'complex' or target_id_type == 'complex':

            raise ValueError('Can not translate protein complexes.')

        id_types = (id_type, target_id_type)
        id_types_rev = tuple(reversed(id_types))
        resource = None

        # first attempt: the explicitly defined translation tables
        for resource_attr in RESOURCES_EXPLICIT:

            resources = getattr(maps, resource_attr)

            if id_types in resources:

                resource = resources[id_types]
                load_a_to_b = True
                load_b_to_a = False

            elif id_types_rev in resources:

                resource = resources[id_types_rev]
                load_a_to_b = False
                load_b_to_a = True

            if resource:

                self._log(
                    'Chosen built-in defined ID translation table: '
                    'resource=%s, id_type_a=%s, id_type_b=%s' % (
                        resource_attr,
                        resource.id_type_a,
                        resource.id_type_b,
                    )
                )

                self.load_mapping(
                    resource = resource,
                    load_a_to_b = load_a_to_b,
                    load_b_to_a = load_b_to_a,
                    ncbi_tax_id = ncbi_tax_id,
                )

                tbl = check_loaded()

                break

            if tbl is not None:

                break

        # second attempt: services covering many ID types
        if tbl is None:

            for (service_ids, service_id_type, input_cls) in (
                RESOURCES_IMPLICIT
            ):

                if (
                    (
                        input_cls.possible(
                            id_type,
                            target_id_type,
                            ncbi_tax_id,
                        ) and
                        id_type != target_id_type
                    ) or (
                        service_id_type == 'pro' and (
                            (
                                id_type in service_ids or
                                target_id_type in service_ids
                            ) and (
                                id_type == service_id_type or
                                target_id_type == service_id_type
                            )
                        )
                    ) or (
                        service_id_type == 'biomart' and (
                            (
                                id_type in service_ids and
                                target_id_type in service_ids
                            )
                        )
                    ) or (
                        service_id_type == 'array' and (
                            (
                                id_type in service_ids and
                                target_id_type in {'ensg', 'enst', 'ensp'}
                            ) or (
                                target_id_type in service_ids and
                                id_type in {'ensg', 'enst', 'ensp'}
                            )
                        )
                    )
                ):

                    if target_id_type == service_id_type:

                        _id_type, _target_id_type = (
                            target_id_type,
                            id_type,
                        )
                        load_a_to_b = False
                        load_b_to_a = True

                    else:

                        _id_type, _target_id_type = (
                            id_type,
                            target_id_type,
                        )
                        load_a_to_b = True
                        load_b_to_a = False

                    self._log(
                        'Chosen ID translation table from service: '
                        'service=%s, id_type_a=%s, id_type_b=%s' % (
                            service_id_type,
                            _id_type,
                            _target_id_type,
                        )
                    )

                    # tables from these services are stored under
                    # the non organism specific keys
                    if service_id_type in {'hmdb', 'ramp', 'unichem'}:

                        ncbi_tax_id = _const.NOT_ORGANISM_SPECIFIC
                        tbl_key = tbl_key_noorganism
                        tbl_key_rev = tbl_key_rev_noorganism

                    # for uniprot/idmapping or PRO or array
                    # we create here the mapping params
                    this_param = input_cls(
                        id_type_a = _id_type,
                        id_type_b = _target_id_type,
                        ncbi_tax_id = ncbi_tax_id,
                    )

                    reader = MapReader(
                        param = this_param,
                        ncbi_tax_id = ncbi_tax_id,
                        load_a_to_b = load_a_to_b,
                        load_b_to_a = load_b_to_a,
                        uniprots = None,
                        # instance level lifetime if available,
                        # otherwise the former hard coded value
                        lifetime = getattr(self, 'lifetime', 300),
                        resource_id_types = service_ids,
                    )

                    self.tables[tbl_key] = getattr(
                        reader,
                        'mapping_table_%s_to_%s' % (
                            reader.id_type_side(tbl_key.id_type),
                            reader.id_type_side(tbl_key.target_id_type),
                        )
                    )

                    tbl = check_loaded()

                    if tbl:

                        break

            if tbl is None and id_type == 'genesymbol5':

                self.load_genesymbol5(ncbi_tax_id = ncbi_tax_id)

                tbl = check_loaded()

        # third attempt: the static UniProt ID mapping
        if tbl is None:

            if (
                settings.get('mapping_uniprot_static') and
                id_type in self.uniprot_static_names and
                target_id_type == 'uniprot'
            ):

                self.load_uniprot_static([id_type])

                tbl = check_loaded()

        if tbl is None:

            self._log(
                'Could not find suitable ID translation table '
                f'between id types `{id_type}` and `{target_id_type}` '
                f'for organism `{ncbi_tax_id}`.'
            )

    # looking up a table counts as usage: resets its expiry time
    if hasattr(tbl, '_used'):

        tbl._used()

    return tbl
@staticmethod
def reverse_mapping(mapping_table):
    """
    Creates an opposite direction `MappingTable` by swapping the
    dictionary inside an existing `MappingTable` object. The organism
    and the lifetime are carried over from the original table.

    Args
        mapping_table (MappingTable): A `MappingTable` object.

    Returns
        A new `MappingTable` object.
    """

    swapped = common.swap_dict(mapping_table.data)

    return MappingTable(
        data = swapped,
        id_type = mapping_table.target_id_type,
        target_id_type = mapping_table.id_type,
        ncbi_tax_id = mapping_table.ncbi_tax_id,
        lifetime = mapping_table.lifetime,
    )
def reverse_key(self, key):
    """
    For a mapping table key returns a new key with the identifiers
    reversed.

    Args
        key (tuple): A mapping table key.

    Returns
        A tuple representing a mapping table key, identifiers swapped.
    """

    source, target = key.id_type, key.target_id_type

    return self.get_table_key(
        id_type = target,
        target_id_type = source,
        ncbi_tax_id = key.ncbi_tax_id,
    )
def create_reverse(self, key):
    """
    Creates a mapping table with ``id_type`` and ``target_id_type``
    (i.e. direction of the ID translation) swapped.

    The new table is stored in `tables` under the reversed key; the
    table under `key` must exist and is left in place.
    """

    forward = self.tables[key]
    backward_key = self.reverse_key(key)
    backward = self.reverse_mapping(forward)
    self.tables[backward_key] = backward
def map_name0(
        self,
        name,
        id_type = None,
        target_id_type = None,
        ncbi_tax_id = None,
        strict = False,
        expand_complexes = None,
        uniprot_cleanup = None,
    ):
    """
    Translates the name and returns only one of the resulted IDs. It
    means in case of ambiguous ID translation, a random one of them
    will be picked and returned. Recommended to use only if the
    translation between the given ID types is mostly unambiguous and
    the loss of information can be ignored. See more details at
    `map_name`.

    All arguments are passed to `map_name` as they are, including the
    `None` defaults of `expand_complexes` and `uniprot_cleanup`.

    Returns
        One translated ID, or `None` if the translation gave no result.
    """

    translated = self.map_name(
        name = name,
        id_type = id_type,
        target_id_type = target_id_type,
        ncbi_tax_id = ncbi_tax_id,
        strict = strict,
        expand_complexes = expand_complexes,
        uniprot_cleanup = uniprot_cleanup,
    )

    if not translated:

        return None

    return next(iter(translated))
@common.ignore_unhashable
@functools.lru_cache(maxsize = int(1e5))
def map_name(
        self,
        name,
        id_type = None,
        target_id_type = None,
        ncbi_tax_id = None,
        strict = False,
        expand_complexes = True,
        uniprot_cleanup = True,
    ):
    r"""
    Translates one instance of one ID type to a different one.
    Returns set of the target ID type.

    This function should be used to convert individual IDs.
    It takes care about everything and ideally you don't need to
    think on the details.

    How does it work: looks up dictionaries between the original
    and target ID type, if doesn't find, attempts to load from the
    predefined inputs.
    If the original name is genesymbol, first it looks up
    among the preferred gene names from UniProt, if not found,
    it takes an attempt with the alternative gene names. If the
    gene symbol still couldn't be found, and strict = False, the
    last attempt only the first 5 characters of the gene symbol
    matched. If the target name type is uniprot, then it converts
    all the ACs to primary. Then, for the Trembl IDs it looks up
    the preferred gene names, and find Swissprot IDs with the same
    preferred gene name.

    Args
        name (str): The original name to be converted.
        id_type (str): The type of the name. A list, set or tuple of
            ID types is also accepted, in this case the union of the
            results is returned. Available by default:
            - genesymbol (gene name)
            - entrez (Entrez Gene ID \[#\])
            - refseqp (NCBI RefSeq Protein ID \[NP\_\*|XP\_\*\])
            - ensp (Ensembl protein ID \[ENSP\*\])
            - enst (Ensembl transcript ID \[ENST\*\])
            - ensg (Ensembl genomic DNA ID \[ENSG\*\])
            - hgnc (HGNC ID \[HGNC:#\])
            - gi (GI number \[#\])
            - embl (DDBJ/EMBL/GeneBank CDS accession)
            - embl_id (DDBJ/EMBL/GeneBank accession)
            And many more, see the code of
            ``pypath.internals.input_formats``
        target_id_type (str): The name type to translate to, more or
            less the same values are available as for ``id_type``.
        ncbi_tax_id (int): NCBI Taxonomy ID of the organism.
        strict (bool): If `False` (default), in case a Gene Symbol can
            not be translated, we try to add number "1" to the end, or
            try to match only its first five characters; for RefSeq
            IDs we also try other version numbers. These permissive
            attempts are rarely needed, but make it possible to
            translate some non-standard gene names typically found in
            old, unmaintained resources. Set to `True` to disable them.
        expand_complexes (bool): When encountering complexes,
            translated the IDs of its components and return a set
            of IDs. The alternative behaviour is to return the
            `Complex` objects.
        uniprot_cleanup (bool): When the `target_id_type` is UniProt
            ID, call the `uniprot_cleanup` function at the end.

    Returns
        Set of translated IDs, empty if nothing could be found.
    """

    # NOTE: `lru_cache` on a method includes `self` in the cache key
    # and keeps the instance referenced as long as the entry is cached

    if not name:

        return set()

    ncbi_tax_id = ncbi_tax_id or self.ncbi_tax_id

    # we support translating from more name types
    # at the same time
    if isinstance(id_type, (list, set, tuple)):

        # all options are passed on to the individual translations;
        # `set().union` tolerates an empty collection of ID types
        return set().union(
            *(
                self.map_name(
                    name = name,
                    id_type = this_id_type,
                    target_id_type = target_id_type,
                    strict = strict,
                    ncbi_tax_id = ncbi_tax_id,
                    expand_complexes = expand_complexes,
                    uniprot_cleanup = uniprot_cleanup,
                )
                for this_id_type in id_type
            )
        )

    # complexes
    if hasattr(name, 'components'):

        if expand_complexes:

            return set(name.components.keys())

        else:

            return {name}

    # translating from an ID type to the same ID type?
    elif id_type == target_id_type:

        if target_id_type != 'uniprot' or not uniprot_cleanup:

            # no need for translation
            return {name}

        else:

            # we still try to search the primary UniProt
            mapped_names = {name}

    # actual translation comes here
    elif id_type.startswith('refseq'):

        # RefSeq is special
        mapped_names = self._map_refseq(
            refseq = name,
            id_type = id_type,
            target_id_type = target_id_type,
            ncbi_tax_id = ncbi_tax_id,
            strict = strict,
        )

    elif id_type == 'ensp':

        mapped_names = self._map_ensp(
            ensp = name,
            target_id_type = target_id_type,
            ncbi_tax_id = ncbi_tax_id,
        )

    elif target_id_type == 'ensp':

        mapped_names = self._map_to_ensp(
            name = name,
            id_type = id_type,
            ncbi_tax_id = ncbi_tax_id,
        )

    elif (
        (
            id_type in input_formats.ARRAY_MAPPING and
            not target_id_type.startswith('ens')
        ) or (
            target_id_type in input_formats.ARRAY_MAPPING and
            not id_type.startswith('ens')
        )
    ):

        # microarray probe IDs we are able to directly translate
        # only to and from Ensembl gene, transcript and protein IDs
        # if the other ID is different (such as uniprot), we translate
        # in two steps, via Ensembl peptide ID:
        mapped_names = self.chain_map(
            name = name,
            id_type = id_type,
            by_id_type = 'ensp',
            target_id_type = target_id_type,
            ncbi_tax_id = ncbi_tax_id,
            strict = strict,
            expand_complexes = expand_complexes,
            uniprot_cleanup = uniprot_cleanup,
        )

    else:

        # all the other ID types
        mapped_names = self._map_name(
            name = name,
            id_type = id_type,
            target_id_type = target_id_type,
            ncbi_tax_id = ncbi_tax_id,
        )

    # as ID translation tables for PRO IDs are not organism specific
    # we need an extra step to limit the results to the target organism
    if id_type == 'pro' and target_id_type == 'uniprot':

        mapped_names = (
            mapped_names &
            reflists.get_reflist(
                id_type = 'uniprot',
                ncbi_tax_id = ncbi_tax_id,
            )
        )

    # by default the uniprot-genesymbol tables contain only SwissProt;
    # only if the lookup above gave nothing we try TrEMBL and the
    # secondary ACs (formerly a successful result was overwritten here)
    if (
        not mapped_names and
        id_type == 'uniprot' and
        target_id_type == 'genesymbol'
    ):

        mapped_names = self._map_name(
            name = name,
            id_type = 'trembl',
            target_id_type = 'genesymbol',
            ncbi_tax_id = ncbi_tax_id,
        )

        if not mapped_names:

            uniprots = self._map_name(
                name = name,
                id_type = 'uniprot-sec',
                target_id_type = 'uniprot-pri',
                ncbi_tax_id = ncbi_tax_id,
            )

            if uniprots:

                mapped_names = self.map_names(
                    names = uniprots,
                    id_type = 'uniprot',
                    target_id_type = 'genesymbol',
                    ncbi_tax_id = ncbi_tax_id,
                )

    # further attempts to set it right if
    # first attempt was not successful

    # for miRNAs if the translation from mature miRNA name failed
    # we still try if maybe it is a hairpin name
    # or the other way around
    if not mapped_names and id_type in {'mir-mat-name', 'mir-name'}:

        for id_type0, id_type1, target_id_type0, target_id_type1 in (
            ('mir-name', 'mir-mat-name', 'mir-pre', 'mirbase'),
            ('mir-mat-name', 'mir-name', 'mirbase', 'mir-pre'),
        ):

            if id_type == id_type0:

                mapped_names = self._map_name(
                    name = name,
                    id_type = id_type1,
                    target_id_type = target_id_type1,
                    ncbi_tax_id = ncbi_tax_id,
                )

                if mapped_names and target_id_type == target_id_type0:

                    mapped_names = self.map_names(
                        names = mapped_names,
                        id_type = target_id_type1,
                        target_id_type = target_id_type0,
                        ncbi_tax_id = ncbi_tax_id,
                    )

            if mapped_names:

                break

    # for genesymbol, we automatically try 2 steps mapping via uniprot
    if (
        not mapped_names and (
            id_type == 'genesymbol' or
            target_id_type == 'genesymbol'
        ) and
        id_type not in UNIPROT_ID_TYPES and
        target_id_type not in UNIPROT_ID_TYPES
    ):

        mapped_names = self.chain_map(
            name = name,
            id_type = id_type,
            by_id_type = 'uniprot',
            target_id_type = target_id_type,
            ncbi_tax_id = ncbi_tax_id,
        )

    if not mapped_names:

        # maybe it should be all uppercase (e.g. human gene symbols)?
        mapped_names = self._map_name(
            name = name.upper(),
            id_type = id_type,
            target_id_type = target_id_type,
            ncbi_tax_id = ncbi_tax_id,
        )

    if (
        not mapped_names and
        id_type not in {'uniprot', 'trembl', 'uniprot-sec'}
    ):

        # maybe should be capitalized (e.g. rodent gene symbols)?
        mapped_names = self._map_name(
            name = name.capitalize(),
            id_type = id_type,
            target_id_type = target_id_type,
            ncbi_tax_id = ncbi_tax_id,
        )

    if (
        not mapped_names and
        id_type not in {'uniprot', 'trembl', 'uniprot-sec'}
    ):

        # maybe it should be all lowercase?
        mapped_names = self._map_name(
            name = name.lower(),
            id_type = id_type,
            target_id_type = target_id_type,
            ncbi_tax_id = ncbi_tax_id,
        )

    if (
        not mapped_names and
        id_type.startswith('ens') and
        '.' in name
    ):

        # trying to split the part after the dot:
        mapped_names = self._map_name(
            name = name.upper().split('.')[0],
            id_type = id_type,
            target_id_type = target_id_type,
            ncbi_tax_id = ncbi_tax_id,
        )

    if (
        not mapped_names and
        ':' in name
    ):

        # trying to remove the prefix which sometimes
        # shows the ID type, e.g. CHEBI:4956 should become 4956
        mapped_names = self._map_name(
            name = common.remove_prefix(name, ':'),
            id_type = id_type,
            target_id_type = target_id_type,
            ncbi_tax_id = ncbi_tax_id,
        )

    # if a gene symbol could not be translated by the default
    # conversion table, containing only the primary gene symbols
    # in next step we try the secondary (synonym) gene symbols
    if (
        not mapped_names and
        id_type == 'genesymbol'
    ):

        mapped_names = self._map_name(
            name = name,
            id_type = 'genesymbol-syn',
            target_id_type = target_id_type,
            ncbi_tax_id = ncbi_tax_id,
        )

        # for gene symbols we might try one more thing,
        # sometimes the source gene symbol missing some isoform
        # information or number because it refers to the first
        # or all isoforms or subtypes; or the opposite: the
        # original resource contains a gene symbol with a number
        # appended which is not part of the official primary
        # gene symbol
        #
        # here we try to translate by adding a number `1` or
        # by matching only the first few letters;
        # obviously we can not exclude mistranslation here
        #
        # by setting `strict = True` this step is disabled
        if not strict and not mapped_names:

            mapped_names = self._map_name(
                name = '%s1' % name,
                id_type = 'genesymbol',
                target_id_type = target_id_type,
                ncbi_tax_id = ncbi_tax_id,
            )

            if not mapped_names and target_id_type == 'uniprot':

                mapped_names = self._map_name(
                    name = name,
                    id_type = 'genesymbol5',
                    target_id_type = target_id_type,
                    ncbi_tax_id = ncbi_tax_id,
                )

    # for UniProt IDs we do a few more steps to
    # try to find out the primary SwissProt ID
    if uniprot_cleanup and target_id_type == 'uniprot':

        mapped_names = self.uniprot_cleanup(
            uniprots = mapped_names,
            ncbi_tax_id = ncbi_tax_id,
        )

    return mapped_names
def uniprot_cleanup(self, uniprots, ncbi_tax_id = None):
    """
    We use this function as a standard callback when the target ID
    type is UniProt. It checks if the format of the IDs are correct,
    if they are part of the organism proteome, attempts to translate
    secondary and deleted IDs to their primary, recent counterparts.

    Steps 2, 3 and 4 are controlled by the options provided at the
    instantiation of the mapper.

    Args
        uniprots (str,set): One or more UniProt IDs.
        ncbi_tax_id (int): The NCBI Taxonomy identifier of the organism;
            by default the organism of this instance.

    Returns
        Set of checked and potentially translated UniProt IDs. Elements
        which do not fit the criteria will be discarded.
    """

    ncbi_tax_id = ncbi_tax_id or self.ncbi_tax_id

    uniprots = common.to_set(uniprots)

    # step 1: translate secondary IDs to primary
    uniprots = self.primary_uniprot(uniprots)

    # step 2: translate TrEMBL to SwissProt by gene symbols
    if self._trembl_swissprot_by_genesymbol:

        uniprots = self.trembl_swissprot(
            uniprots,
            ncbi_tax_id = ncbi_tax_id,
        )

    # step 3: translate deleted IDs by gene symbols
    if self._translate_deleted_uniprot:

        # the organism has to be passed here too: without it the IDs
        # are evaluated against the default organism of the mapper,
        # and valid IDs of any other requested organism get discarded
        uniprots = self.translate_deleted_uniprots_by_genesymbol(
            uniprots,
            ncbi_tax_id = ncbi_tax_id,
        )

    # step 4: check if the IDs exist in the proteome of the organism
    if not self._keep_invalid_uniprot:

        uniprots = self.only_valid_uniprots(
            uniprots,
            ncbi_tax_id = ncbi_tax_id,
        )

    # step 5: ensure the format validity
    uniprots = self.only_uniprot_ac(uniprots)

    return uniprots
def map_names(
        self,
        names,
        id_type = None,
        target_id_type = None,
        ncbi_tax_id = None,
        strict = False,
        expand_complexes = True,
        uniprot_cleanup = True,
    ):
    r"""
    Same as ``map_name`` but translates multiple IDs at once. These two
    functions could be seamlessly implemented as one, still I created
    separate functions to always make it explicit if a set of translated
    IDs come from multiple original IDs.

    Args
        names (iterable): The original names to be converted.
        id_type (str): The type of the names.
            Available by default:
            - genesymbol (gene name)
            - entrez (Entrez Gene ID \[#\])
            - refseqp (NCBI RefSeq Protein ID \[NP\_\*|XP\_\*\])
            - ensp (Ensembl protein ID \[ENSP\*\])
            - enst (Ensembl transcript ID \[ENST\*\])
            - ensg (Ensembl genomic DNA ID \[ENSG\*\])
            - hgnc (HGNC ID \[HGNC:#\])
            - gi (GI number \[#\])
            - embl (DDBJ/EMBL/GeneBank CDS accession)
            - embl_id (DDBJ/EMBL/GeneBank accession)
            And many more, see the code of
            ``pypath.internals.input_formats``
        target_id_type (str): The name type to translate to, more or
            less the same values are available as for ``id_type``.
        ncbi_tax_id (int): NCBI Taxonomy ID of the organism.
        strict (bool): Passed to ``map_name``: if `True`, the
            permissive fallback attempts for gene symbols (appending
            "1", matching only the first five characters) are disabled.
        expand_complexes (bool): When encountering complexes,
            translated the IDs of its components and return a set
            of IDs. The alternative behaviour is to return the
            `Complex` objects.
        uniprot_cleanup (bool): When the `target_id_type` is UniProt
            ID, call the `uniprot_cleanup` function at the end.

    Returns
        Set with the union of the IDs translated from each of the
        names; empty set if `names` is empty.
    """

    if not names:

        return set()

    # `expand_complexes` and `uniprot_cleanup` used to be accepted but
    # never passed on; `set().union` also copes with iterables which
    # are truthy but yield nothing (e.g. an exhausted iterator)
    return set().union(
        *(
            self.map_name(
                name = name,
                id_type = id_type,
                target_id_type = target_id_type,
                ncbi_tax_id = ncbi_tax_id,
                strict = strict,
                expand_complexes = expand_complexes,
                uniprot_cleanup = uniprot_cleanup,
            )
            for name in names
        )
    )
def chain_map(
        self,
        name,
        id_type,
        by_id_type,
        target_id_type,
        ncbi_tax_id = None,
        **kwargs
    ):
    """
    Translate IDs which can not be directly translated in two steps:
    from `id_type` to `by_id_type` and from there to `target_id_type`.

    Args
        name (str): The original name to be converted.
        id_type (str): The type of the name.
        by_id_type (str): The intermediate name type.
        target_id_type (str): The name type to translate to, more or
            less the same values are available as for ``id_type``.
        ncbi_tax_id (int): The NCBI Taxonomy identifier of the organism.
        kwargs: Passed both to `map_name` (first step) and `map_names`
            (second step).

    Returns
        Set of IDs of type `target_id_type`.
    """

    organism = ncbi_tax_id or self.ncbi_tax_id

    # step 1: to the intermediate ID type
    intermediate = self.map_name(
        name = name,
        id_type = id_type,
        target_id_type = by_id_type,
        ncbi_tax_id = organism,
        **kwargs
    )

    # step 2: from the intermediate to the target ID type
    return self.map_names(
        names = intermediate,
        id_type = by_id_type,
        target_id_type = target_id_type,
        ncbi_tax_id = organism,
        **kwargs
    )
def _map_refseq( self, refseq, id_type, target_id_type, ncbi_tax_id = None, strict = False, ): """ ID translation adapted to the specialities of RefSeq IDs. """ mapped_names = set() ncbi_tax_id = ncbi_tax_id or self.ncbi_tax_id # try first as it is mapped_names = self._map_name( name = refseq, id_type = id_type, target_id_type = target_id_type, ncbi_tax_id = ncbi_tax_id, ) # then with the number at the end removed # this is disabled if `strict = True` if not mapped_names and not strict: mapped_names = self._map_name( name = refseq.split('.')[0], id_type = id_type, target_id_type = target_id_type, ncbi_tax_id = ncbi_tax_id, ) if not mapped_names and not strict: rstem = refseq.split('.')[0] # try some other numbers # this risky and is disabled if `strict = True` for n in xrange(49): mapped_names.update( self._map_name( name = '%s.%u' % (rstem, n), id_type = id_type, target_id_type = target_id_type, ncbi_tax_id = ncbi_tax_id, ) ) return mapped_names def _map_ensp( self, ensp, target_id_type, ncbi_tax_id = None, ): """ Special ID translation from ENSP (Ensembl peptide IDs). """ mapped_names = set() ncbi_tax_id = ncbi_tax_id or self.ncbi_tax_id # try first UniProt ID Mapping # then Ensembl BioMart for id_type in ('ensp', 'ensp_biomart'): if not mapped_names: mapped_names = self._map_name( name = ensp, id_type = id_type, target_id_type = target_id_type, ncbi_tax_id = ncbi_tax_id, ) if not mapped_names: tax_ensp = '%u.%s' % (ncbi_tax_id, ensp) # this uses UniProt ID Mapping with STRING ID type mapped_names = self._map_name( name = tax_ensp, id_type = 'ensp_string', target_id_type = target_id_type, ncbi_tax_id = ncbi_tax_id, ) return mapped_names def _map_to_ensp( self, name, id_type, ncbi_tax_id = None, ): """ Special ID translation to ENSP (Ensembl peptide IDs). 
""" mapped_names = set() ncbi_tax_id = ncbi_tax_id or self.ncbi_tax_id # try first UniProt ID Mapping # then Ensembl BioMart for target_id_type in ('ensp', 'ensp_biomart'): if not mapped_names: mapped_names = self._map_name( name = name, id_type = id_type, target_id_type = target_id_type, ncbi_tax_id = ncbi_tax_id, ) if not mapped_names: # this uses UniProt ID Mapping with STRING type mapped_names = self._map_name( name = name, id_type = id_type, target_id_type = 'ensp_string', ncbi_tax_id = ncbi_tax_id, ) mapped_names = {n.split('.')[-1] for n in mapped_names} return mapped_names def _map_name( self, name, id_type, target_id_type, ncbi_tax_id = None, ): """ Once we have defined the name type and the target name type, this function looks it up in the most suitable dictionary. """ ncbi_tax_id = ncbi_tax_id or self.ncbi_tax_id tbl = self.which_table( id_type, target_id_type, ncbi_tax_id = ncbi_tax_id, ) return tbl[name] if tbl else set()
def translation_dict(
        self,
        id_type: str,
        target_id_type: str,
        ncbi_tax_id: int | None = None,
    ) -> MappingTable | None:
    """
    Translation table as a dict.

    In fact returns what `which_table` gives: the mapping table
    between the two ID types, or `None` if it is not available.
    """

    organism = ncbi_tax_id or self.ncbi_tax_id

    return self.which_table(
        id_type,
        target_id_type,
        ncbi_tax_id = organism,
    )
def translation_df(
        self,
        id_type: str,
        target_id_type: str,
        ncbi_tax_id: int | None = None,
    ) -> pd.DataFrame | None:
    """
    Translation table as a data frame.

    One row for each pair of source and target ID, the columns are
    named after the two ID types. Returns `None` if the table is not
    available or empty.
    """

    table = self.translation_dict(id_type, target_id_type, ncbi_tax_id)

    if not table:

        return None

    pairs = [
        (source_id, target_id)
        for source_id, target_ids in table.data.items()
        for target_id in target_ids
    ]

    return pd.DataFrame(pairs, columns = [id_type, target_id_type])
# # ID specific translation methods #
def label(
        self,
        name,
        entity_type = None,
        id_type = None,
        ncbi_tax_id = None,
    ):
    """
    For any kind of entity, either protein, miRNA or protein complex,
    returns the preferred human readable label. For proteins this means
    Gene Symbols, for miRNAs miRNA names, for complexes a series of
    Gene Symbols.

    Args
        name: An identifier (str), an object with `genesymbol_str`
            attribute, or a list-like of these.
        entity_type (str): The entity type; if not provided,
            `small_molecule` is assumed with the non organism specific
            taxon, `protein` otherwise.
        id_type (str): The type of the identifier; by default derived
            from the form of the name or the entity type.
        ncbi_tax_id (int): NCBI Taxonomy identifier of the organism.

    Returns
        The label (str), or a list of labels for list-like input. If
        the translation fails, the identifier itself is returned.
    """

    if isinstance(name, _const.LIST_LIKE):

        return [
            self.label(
                _name,
                entity_type = entity_type,
                id_type = id_type,
                ncbi_tax_id = ncbi_tax_id,
            )
            for _name in name
        ]

    elif hasattr(name, 'genesymbol_str'):

        return name.genesymbol_str

    elif isinstance(name, str):

        ncbi_tax_id = ncbi_tax_id or self.ncbi_tax_id
        entity_type = (
            entity_type or
            (
                'small_molecule'
                    if ncbi_tax_id == _const.NOT_ORGANISM_SPECIFIC else
                'protein'
            )
        )

        if name.startswith('MIMAT'):

            # this call was missing `self.`: it resolved only by a
            # module level name, bypassing the tables and options of
            # this instance
            return self.map_name0(
                name,
                id_type or 'mirbase',
                'mir-mat-name',
                ncbi_tax_id = ncbi_tax_id,
            ) or name

        elif name.startswith('MI'):

            return self.map_name0(
                name,
                id_type or 'mir-pre',
                'mir-name',
                ncbi_tax_id = ncbi_tax_id,
            ) or name

        elif entity_type in self.default_label_types:

            id_type = id_type or self.default_name_types[entity_type]
            target_id_type = self.default_label_types[entity_type]

            return self.map_name0(
                name,
                id_type = id_type,
                target_id_type = target_id_type,
                ncbi_tax_id = ncbi_tax_id,
            ) or name

        else:

            return self.map_name0(
                name,
                id_type or 'uniprot',
                'genesymbol',
                ncbi_tax_id = ncbi_tax_id,
            ) or name

    else:

        return str(name)
def identifier(
        self,
        label: Union[str, Iterable[str]],
        ncbi_tax_id: Optional[int] = None,
        id_type: Optional[str] = None,
        entity_type: Optional[
                Literal[
                    'drug',
                    'lncrna',
                    'mirna',
                    'protein',
                    'small_molecule',
                ]
            ] = None,
    ) -> Union[Set[str], List[Set[str]]]:
    """
    For a label returns the corresponding primary identifier. The type
    of default identifiers is determined by the settings module. Note,
    this kind of translation is not always unambiguous, one gene symbol
    might correspond to multiple UniProt IDs.

    Args
        label: A label (str), an object with `components` attribute
            (a complex), or an iterable of these.
        ncbi_tax_id: NCBI Taxonomy identifier of the organism.
        id_type: The type of the label; by default the default label
            type of the entity type.
        entity_type: The entity type; if not provided,
            `small_molecule` is assumed with the non organism specific
            taxon, `protein` otherwise.

    Returns
        Set of identifiers for a single label; the string
        representation for objects with `components`; list of these
        for any other non-string input.
    """

    # objects with components have to be checked first: in the
    # original order of the conditions this branch was unreachable,
    # such objects fell into the iterable branch below
    if hasattr(label, 'components'):

        return label.__str__()

    if not common.is_str(label):

        return [
            self.identifier(
                _label,
                entity_type = entity_type,
                id_type = id_type,
                ncbi_tax_id = ncbi_tax_id,
            )
            for _label in label
        ]

    # from here `label` is a string; the former final `else` branch
    # was dead code referring to an undefined variable, hence removed
    ncbi_tax_id = ncbi_tax_id or self.ncbi_tax_id
    entity_type = (
        entity_type or
        (
            'small_molecule'
                if ncbi_tax_id == _const.NOT_ORGANISM_SPECIFIC else
            'protein'
        )
    )

    id_type = (
        id_type or
        settings.get('default_label_types')[entity_type]
    )
    target_id_type = settings.get('default_name_types')[entity_type]

    return self.map_name(
        label,
        id_type = id_type,
        target_id_type = target_id_type,
        ncbi_tax_id = ncbi_tax_id,
    )
def identifier0(
        self,
        label: Union[str, Iterable[str]],
        ncbi_tax_id: Optional[int] = None,
        id_type: Optional[str] = None,
        entity_type: Optional[
                Literal[
                    'drug',
                    'lncrna',
                    'mirna',
                    'protein',
                    'small_molecule',
                ]
            ] = None,
    ) -> Union[str, List[str]]:
    """
    Same as `identifier`, but only one identifier is kept for each
    label: for a single label (str) the first one of its identifiers,
    otherwise a list with the first identifier for each of the labels.
    """

    ids = self.identifier(
        label = label,
        ncbi_tax_id = ncbi_tax_id,
        id_type = id_type,
        entity_type = entity_type,
    )

    if isinstance(label, str):

        return common.first(ids)

    return [common.first(these_ids) for these_ids in ids]
def guess_type(self, name, entity_type = None):
    """
    From a string, tries to guess the ID type and optionally the entity
    type. Returns a tuple of strings: ID type and entity type.

    If `entity_type` is provided, only the patterns of that entity type
    are considered. If nothing matches, the ID type is `None` and the
    entity type is returned as it was provided.
    """

    any_entity = not entity_type

    if (
        (any_entity or entity_type == 'protein') and
        self.reuniprot.match(name)
    ):

        return 'uniprot', 'protein'

    if any_entity or entity_type == 'mirna':

        # the order matters: the first matching pattern wins
        mirna_patterns = (
            (self.remipreac, 'mir-pre'),
            (self.remimatac, 'mirbase'),
            (self.remimatid, 'mir-mat-name'),
            (self.remipreid, 'mir-name'),
        )

        for pattern, id_type in mirna_patterns:

            if pattern.match(name):

                return id_type, 'mirna'

    return None, entity_type
def id_from_label(
        self,
        label,
        label_id_type = 'genesymbol',
        ncbi_tax_id = None,
    ):
    """
    Translates a label to the identifiers of the ID type belonging to
    the label type (see `label_type_to_id_type`).

    Args
        label (str): A label, e.g. a gene symbol.
        label_id_type (str): The type of the label.
        ncbi_tax_id (int): NCBI Taxonomy identifier of the organism.

    Returns
        Set of identifiers. If the translation gave no result or the
        label type is unknown, a set with the label itself: this way
        the return value is always a non-empty set.
    """

    ids = set()

    if label_id_type in self.label_type_to_id_type:

        ids = self.map_name(
            label,
            label_id_type,
            self.label_type_to_id_type[label_id_type],
            ncbi_tax_id = ncbi_tax_id,
        )

    # same fallback for failed translation and for unknown label type;
    # without it `id_from_label0` fails on unknown label types
    return ids or {label}


def id_from_label0(
        self,
        label,
        label_id_type = 'genesymbol',
        ncbi_tax_id = None,
    ):
    """
    Same as `id_from_label`, but returns only one, arbitrarily chosen
    identifier instead of a set.
    """

    return next(
        iter(
            self.id_from_label(
                label = label,
                label_id_type = label_id_type,
                ncbi_tax_id = ncbi_tax_id,
            )
        )
    )
def primary_uniprot(self, uniprots, ncbi_tax_id = None):
    """
    For an iterable of UniProt IDs returns a set with the secondary IDs
    changed to the corresponding primary IDs. Anything what is not a
    secondary UniProt ID left intact.
    """

    organism = ncbi_tax_id or self.ncbi_tax_id
    primaries = set()

    for uniprot in uniprots:

        translated = self.map_name(
            name = uniprot,
            id_type = 'uniprot-sec',
            target_id_type = 'uniprot-pri',
            ncbi_tax_id = organism,
        )

        # no result: most probably this UniProt is already primary
        primaries.update(translated or {uniprot})

    return primaries
def trembl_swissprot(self, uniprots, ncbi_tax_id = None):
    """
    For an iterable of TrEMBL and SwissProt IDs, returns a set with
    only SwissProt, mapping from TrEMBL to gene symbols, and
    then back to SwissProt. If this kind of translation is not
    successful for any of the IDs it will be kept in the result,
    no matter if it's not a SwissProt ID.
    """

    organism = ncbi_tax_id or self.ncbi_tax_id
    result = set()

    for uniprot in uniprots:

        genesymbols = self.map_name(
            name = uniprot,
            id_type = 'trembl',
            target_id_type = 'genesymbol',
            ncbi_tax_id = organism,
        )

        swissprots = self.map_names(
            names = genesymbols,
            id_type = 'genesymbol',
            target_id_type = 'swissprot',
            ncbi_tax_id = organism,
        )

        if swissprots:

            result.update(swissprots)

        else:

            # could not translate: we keep the original
            result.add(uniprot)

    return result
def translate_deleted_uniprots_by_genesymbol(
        self,
        uniprots,
        ncbi_tax_id = None,
    ):
    """
    Applies `translate_deleted_uniprot_by_genesymbol` on one or more
    UniProt IDs.

    Args
        uniprots (str, iterable): One UniProt ID as a string, or an
            iterable of UniProt IDs.
        ncbi_tax_id (int): NCBI Taxonomy ID; for iterables it defaults
            to `self.ncbi_tax_id`.

    Returns
        (set): For a string, the result of the single ID translation;
            for an iterable, the union of the results for each ID,
            an empty set if the iterable is empty.
    """

    if isinstance(uniprots, str):

        return self.translate_deleted_uniprot_by_genesymbol(
            uniprots,
            ncbi_tax_id = ncbi_tax_id,
        )

    ncbi_tax_id = ncbi_tax_id or self.ncbi_tax_id

    if not uniprots:

        return set()

    # `set().union` works also with zero arguments, hence an exhausted
    # iterator (which is truthy) results an empty set, not a TypeError
    return set().union(*(
        self.translate_deleted_uniprot_by_genesymbol(
            uniprot,
            ncbi_tax_id = ncbi_tax_id,
        )
        for uniprot in uniprots
    ))
def translate_deleted_uniprot_by_genesymbol(
        self,
        uniprot,
        ncbi_tax_id = None,
    ):
    """
    Attempts to replace a deleted UniProt ID via its Gene Symbol.

    Due to potentially ambiguous translation always returns set.

    Args
        uniprot (str): A UniProt ID.
        ncbi_tax_id (int): NCBI Taxonomy ID; defaults to
            `self.ncbi_tax_id`.

    Returns
        (set): The ID itself if `uniprot_db.is_uniprot` accepts it for
            the organism; empty set if `other_organism_uniprot` tells
            it belongs to another organism; the result of the Gene
            Symbol to UniProt translation if a Gene Symbol for the same
            organism is available for the deleted ID; otherwise the ID
            itself.
    """

    organism = ncbi_tax_id or self.ncbi_tax_id

    if uniprot_db.is_uniprot(uniprot, organism = organism):

        return {uniprot}

    if self.other_organism_uniprot(uniprot, ncbi_tax_id = organism):

        return set()

    genesymbol, taxid = self.deleted_uniprot_genesymbol(uniprot)

    if not genesymbol or taxid != organism:

        # nothing to translate by: keep the ID as it is
        return {uniprot}

    return self.map_name(
        genesymbol,
        'genesymbol',
        'uniprot',
        ncbi_tax_id = organism,
        uniprot_cleanup = False,
    )
def other_organism_uniprot(self, uniprot, ncbi_tax_id = None):
    """
    Tells if ``uniprot`` is an UniProt ID from some other organism than
    ``ncbi_tax_id``.

    Args
        uniprot (str): A UniProt ID.
        ncbi_tax_id (int): NCBI Taxonomy ID; defaults to
            `self.ncbi_tax_id`.

    Returns
        If `taxonomy.uniprot_taxid` returns a false value for the ID,
        that value is returned as it is; otherwise a bool, True if the
        taxon of the ID is different from ``ncbi_tax_id``.
    """

    organism = ncbi_tax_id or self.ncbi_tax_id
    taxid_of_uniprot = taxonomy.uniprot_taxid(uniprot)

    if not taxid_of_uniprot:

        return taxid_of_uniprot

    return taxid_of_uniprot != organism
def deleted_uniprot_genesymbol(self, uniprot):
    """
    Forwards the call to `uniprot_input.deleted_uniprot_genesymbol` and
    returns its result unchanged.
    """

    result = uniprot_input.deleted_uniprot_genesymbol(uniprot)

    return result


def only_valid_uniprots(self, uniprots, ncbi_tax_id = None):
    """
    Keeps only the UniProt IDs accepted for the organism.

    Args
        uniprots (str, iterable): One UniProt ID as a string, or an
            iterable of UniProt IDs.
        ncbi_tax_id (int): NCBI Taxonomy ID; defaults to
            `self.ncbi_tax_id`.

    Returns
        For a string, the result of `valid_uniprot`; for an iterable,
        a set of the IDs accepted by `uniprot_db.is_uniprot`.
    """

    organism = ncbi_tax_id or self.ncbi_tax_id

    if isinstance(uniprots, str):

        return self.valid_uniprot(uniprots, ncbi_tax_id = organism)

    valid = set()

    for ac in uniprots:

        if uniprot_db.is_uniprot(ac, organism = organism):

            valid.add(ac)

    return valid
def valid_uniprot(self, uniprot, ncbi_tax_id = None):
    """
    If the UniProt ID ``uniprot`` exist in the proteome of the organism
    ``ncbi_tax_id`` returns the ID, otherwise returns None.

    The check is done by `uniprot_db.is_uniprot`; ``ncbi_tax_id``
    defaults to `self.ncbi_tax_id`.
    """

    organism = ncbi_tax_id or self.ncbi_tax_id
    in_proteome = uniprot_db.is_uniprot(uniprot, organism = organism)

    return uniprot if in_proteome else None
def only_uniprot_ac(self, uniprots):
    """
    For one or more strings returns only those which match the format
    of UniProt accession numbers. The format is defined here:
    https://www.uniprot.org/help/accession_numbers

    Args
        uniprots (str, iterable): One string or an iterable of strings.

    Returns
        If string provided, returns string or None. If iterable
        provided, returns set (potentially empty if none of the strings
        are valid).
    """

    if isinstance(uniprots, str):

        return self._only_uniprot_ac(uniprots)

    accepted = set()

    for candidate in uniprots:

        checked = self._only_uniprot_ac(candidate)

        if checked:

            accepted.add(checked)

    return accepted
def _only_uniprot_ac(self, uniprot):
    """
    Returns ``uniprot`` if `uniprot_input.valid_uniprot` accepts it,
    otherwise None.
    """

    if uniprot_input.valid_uniprot(uniprot):

        return uniprot

    return None


#
# Mapping table management methods
#
@staticmethod
def mapping_tables():
    """
    List of mapping tables available to load.

    The tables are collected from the attributes of the `maps` module
    named in `RESOURCES_EXPLICIT`, and from the service definitions in
    `RESOURCES_IMPLICIT`.

    Returns
        (list): A list of tuples, each representing an ID translation
            table, with the ID types, the data source and the loader
            class.
    """

    MappingTableDefinition = collections.namedtuple(
        'MappingTableDefinition',
        (
            'id_type_a',
            'id_type_b',
            'resource',
            'input_class',
            'resource_id_type_a',
            'resource_id_type_b',
        ),
    )
    # the last two fields are optional
    MappingTableDefinition.__new__.__defaults__ = (None, None)

    # tables defined explicitly, one by one
    definitions = [
        MappingTableDefinition(
            id_type_a = id_type_a,
            id_type_b = id_type_b,
            resource = resource_attr,
            input_class = inputdef.__class__.__name__,
            resource_id_type_a = inputdef._resource_id_type_a,
            resource_id_type_b = inputdef._resource_id_type_b,
        )
        for resource_attr in RESOURCES_EXPLICIT
        for (id_type_a, id_type_b), inputdef in iteritems(
            getattr(maps, resource_attr)
        )
    ]

    # services translating between any of their ID types
    for service_ids, service_id_type, input_cls in RESOURCES_IMPLICIT:

        if isinstance(service_ids, dict):

            id_type_pairs = service_ids.items()

        else:

            # no separate labels for the resource side:
            # the collection is zipped with itself
            id_type_pairs = zip(service_ids, service_ids)

        other_side = 'pro' if service_id_type == 'pro' else None

        definitions.extend(
            MappingTableDefinition(
                id_type_a = id_type,
                id_type_b = other_side,
                resource = service_id_type,
                input_class = input_cls.__name__,
                resource_id_type_a = resource_id_type,
                resource_id_type_b = None,
            )
            for id_type, resource_id_type in id_type_pairs
        )

    return definitions
@classmethod
def id_types(cls):
    """
    All identifier types that can be handled by any of the resources.

    Both sides (`a` and `b`) of each definition returned by
    `mapping_tables` are considered; sides without ID type are skipped.

    Returns
        (set): A set of tuples with the identifier type labels used in
            pypath and in the original resource. If the latter is None,
            typically the ID type has no name in the original resource.
    """

    IdType = collections.namedtuple(
        'IdType',
        (
            'pypath',
            'original',
        ),
    )

    labels = set()

    for mapdef in cls.mapping_tables():

        for side in ('a', 'b'):

            pypath_label = getattr(mapdef, 'id_type_%s' % side)

            if not pypath_label:

                continue

            labels.add(
                IdType(
                    pypath = pypath_label,
                    original = getattr(mapdef, 'resource_id_type_%s' % side),
                )
            )

    return labels
def has_mapping_table(
        self,
        id_type,
        target_id_type,
        ncbi_tax_id = None,
    ):
    """
    Tells if a mapping table is loaded. If it's loaded, calls its
    `_used` method, i.e. resets the expiry timer so the table remains
    loaded.

    Args
        id_type (str): The source ID type.
        target_id_type (str): The target ID type.
        ncbi_tax_id (int): NCBI Taxonomy ID; defaults to
            `self.ncbi_tax_id`.

    Returns
        (bool): True if the mapping table is loaded.
    """

    organism = ncbi_tax_id or self.ncbi_tax_id

    key = self.get_table_key(
        id_type = id_type,
        target_id_type = target_id_type,
        ncbi_tax_id = organism,
    )

    loaded = key in self.tables

    if loaded:

        self.tables[key]._used()

    return loaded
def load_mapping(
        self,
        resource,
        **kwargs
    ):
    """
    Loads a single mapping table based on input definition in
    ``resource``. The tables provided by the reader, in either
    direction, are stored in `self.tables` under their own keys.

    Args
        resource: Input definition; its `type`, `input`, `ncbi_tax_id`,
            `id_type_a` and `id_type_b` attributes are used here.
        **kwargs: Passed to ``MapReader``.

    Returns
        None. Also when the input is of type `file` or `pickle` and
        neither a file nor an input method exists by that name; in
        this case only a log message is written.
    """

    is_local_input = resource.type in {'file', 'pickle'}

    if is_local_input and not (
        os.path.exists(resource.input) or
        inputs.get_method(resource.input)
    ):

        self._log(
            'Could not load mapping: no such '
            'file or function: `%s`.' % resource.input
        )

        return

    ncbi_tax_id = kwargs.get('ncbi_tax_id', resource.ncbi_tax_id)

    self._log(
        'Loading mapping table for organism `%s` '
        'with identifiers `%s` and `%s`, '
        'input type `%s`' % (
            ncbi_tax_id,
            resource.id_type_a,
            resource.id_type_b,
            resource.type,
        )
    )

    reader = MapReader(param = resource, **kwargs)

    # the reader may provide a table for each direction
    directions = (
        ('a', 'b', reader.mapping_table_a_to_b),
        ('b', 'a', reader.mapping_table_b_to_a),
    )

    for side_from, side_to, table in directions:

        if not table:

            continue

        self._log(
            'Sucessfully loaded mapping table for organism `%s` '
            'with identifiers `%s` to `%s`.' % (
                str(ncbi_tax_id),
                getattr(resource, f'id_type_{side_from}'),
                getattr(resource, f'id_type_{side_to}'),
            )
        )
        self.tables[table.get_key()] = table
def swissprots(self, uniprots, ncbi_tax_id = None):
    """
    Creates a dict translating a set of potentially secondary and
    non-reviewed UniProt IDs to primary SwissProt IDs (whenever is
    possible).

    Each ID is passed to `map_name` with `uniprot` both as source and
    target ID type.

    Args
        uniprots: Iterable of UniProt IDs.
        ncbi_tax_id (int): NCBI Taxonomy ID, passed to `map_name`.

    Returns
        (dict): The original IDs as keys, the results of `map_name`
            as values.
    """

    return {
        ac: self.map_name(
            name = ac,
            id_type = 'uniprot',
            target_id_type = 'uniprot',
            ncbi_tax_id = ncbi_tax_id,
        )
        for ac in uniprots
    }
def load_genesymbol5(self, ncbi_tax_id = None):
    """
    Creates a Gene Symbol to UniProt mapping table with the first
    5 characters of each Gene Symbol.

    The `genesymbol` and `genesymbol-syn` to `uniprot` tables are the
    sources; Gene Symbols shorter than 5 characters are ignored. The
    new table is stored in `self.tables` with ID type `genesymbol5`.

    Args
        ncbi_tax_id (int): NCBI Taxonomy ID; defaults to
            `self.ncbi_tax_id`.
    """

    organism = ncbi_tax_id or self.ncbi_tax_id

    source_tables = tuple(
        self.which_table(
            id_type = source_id_type,
            target_id_type = 'uniprot',
            ncbi_tax_id = organism,
        )
        for source_id_type in ('genesymbol', 'genesymbol-syn')
    )

    by_prefix = collections.defaultdict(set)

    for table in source_tables:

        for genesymbol, uniprots in iteritems(table.data):

            if len(genesymbol) < 5:

                continue

            by_prefix[genesymbol[:5]].update(uniprots)

    prefix_table = MappingTable(
        data = by_prefix,
        id_type = 'genesymbol5',
        target_id_type = 'uniprot',
        ncbi_tax_id = organism,
    )

    self.tables[prefix_table.get_key()] = prefix_table
def load_uniprot_static(
        self,
        keys,
        ncbi_tax_id = None,
    ):
    """
    Loads mapping tables from the huge static mapping file from UniProt.
    Takes long to download and process, also requires more memory.
    This is the last thing we try if everything else failed.

    For each key first a pickle dump is looked up in the cache
    directory. If any of the keys is missing from the cache, the
    UniProt ID mapping file is processed, and the contents of the
    tables are dumped into the cache. Finally the tables are stored
    in `self.tables`.

    Args
        keys: Keys of the mapping tables to load; these are compared to
            `MappingTableKey` objects with `uniprot` on one side.
        ncbi_tax_id (int): NCBI Taxonomy ID, used in the table keys and
            passed to the `MappingTable` objects.
    """

    cachedir = cache_mod.get_cachedir()
    data = dict((key, collections.defaultdict(set)) for key in keys)
    cache_files = {}
    to_load = set()
    id_type_b = 'uniprot'

    # attempting to load them from Pickle
    for key in keys:

        mapping_id = common.md5(
            json.dumps(
                (
                    key,
                    'uniprot_static',
                )
            )
        )

        cachefile = os.path.join(cachedir, mapping_id)
        cache_files[key] = cachefile

        if os.path.exists(cachefile):

            # NOTE: unpickling executes arbitrary code, the cache
            # directory must contain only trusted files
            with open(cachefile, 'rb') as fp:

                data[key] = pickle.load(fp)

        else:

            to_load.add(key)

    # loading the remaining from the big UniProt mapping file:
    if to_load:

        url = urls.urls['uniprot_idmap_ftp']['url']
        c = curl.Curl(url, silent = False, large = True)

        prg = progress.Progress(
            c.size,
            'Processing ID conversion list',
            99,
        )

        for line in c.result:

            prg.step(len(line))

            line = common.decode(line, 'ascii').strip().split('\t')

            if len(line) > 2 and line[1] in self.names_uniprot_static:

                id_type_a = self.names_uniprot_static[line[1]]

                key_a_to_b = MappingTableKey(
                    id_type = id_type_a,
                    target_id_type = id_type_b,
                    ncbi_tax_id = ncbi_tax_id,
                )
                key_b_to_a = MappingTableKey(
                    id_type = id_type_b,
                    target_id_type = id_type_a,
                    ncbi_tax_id = ncbi_tax_id,
                )

                # the part before the dash is kept,
                # e.g. `P12345-2` becomes `P12345`
                this_uniprot = line[0].split('-')[0]

                if key_a_to_b in to_load:

                    data[key_a_to_b][line[2]].add(this_uniprot)

                if key_b_to_a in to_load:

                    data[key_b_to_a][this_uniprot].add(line[2])

        prg.terminate()

        for key, this_data in iteritems(data):

            # context manager, so the file is flushed and closed
            # even if pickling fails
            with open(cache_files[key], 'wb') as fp:

                pickle.dump(this_data, fp)

    for key, this_data in iteritems(data):

        # NOTE(review): the whole key is passed as `id_type` and the
        # target is always `uniprot`, also for tables translating from
        # UniProt -- confirm if `key.id_type` and `key.target_id_type`
        # are intended here
        table = MappingTable(
            data = this_data,
            id_type = key,
            target_id_type = id_type_b,
            ncbi_tax_id = ncbi_tax_id,
            lifetime = 600,
        )

        self.tables[key] = table
def remove_table(self, id_type, target_id_type, ncbi_tax_id):
    """
    Removes the table defined by the ID types and organism.

    A `MappingTableKey` is created from the arguments and passed to
    `remove_key`.
    """

    self.remove_key(
        MappingTableKey(
            id_type = id_type,
            target_id_type = target_id_type,
            ncbi_tax_id = ncbi_tax_id,
        )
    )
def remove_key(self, key):
    """
    Removes the table with key ``key`` if exists.

    A log message is written only for non-empty keys of 3 elements
    (source ID type, target ID type and organism).
    """

    if key not in self.tables:

        return

    loggable = key and len(key) == 3

    if loggable:

        self._log(
            'Removing mapping table `%s` '
            'to `%s` for organism `%u`.' % key
        )

    del self.tables[key]
def remove_expired(self):
    """
    Removes tables last used a longer time ago than their lifetime.

    Tables which evaluate to False are removed too. The keys are
    collected first, because `self.tables` can not be changed while
    iterating over it.
    """

    expired_keys = {
        key
        for key, table in self.tables.items()
        if not table or table._expired()
    }

    for key in expired_keys:

        self.remove_key(key)
def __del__(self):
    """
    Stops the jobs of the cleanup timeloop when the object is deleted.

    Each job which is still alive is stopped, and its `stopped` event
    is set.
    """

    # the finalizer runs also for partially initialized objects,
    # where this attribute might not exist yet
    cleanup_loop = getattr(self, '_mapper_cleanup_timeloop', None)

    if not hasattr(cleanup_loop, 'stop'):

        return

    for job in cleanup_loop.jobs:

        if job.is_alive():

            job.stop()
            job.stopped.set()
def init(**kwargs):
    """
    Create a new `Mapper` instance under the `mapper` attribute of this
    module. If an instance already exists, its `__del__` method is
    called before it is replaced.

    Args
        **kwargs: Passed to `Mapper`.

    Returns
        None.
    """

    module_ns = globals()

    if 'mapper' in module_ns:

        module_ns['mapper'].__del__()

    module_ns['mapper'] = Mapper(**kwargs)
def get_mapper(**kwargs):
    """
    The module under its `mapper` attribute has an instance of the
    `Mapper` object, which manages the ID translations. This function
    creates the instance if does not exist and returns it.

    Args
        **kwargs: Passed to `init`, only if the instance has to be
            created.

    Returns
        A Mapper object.
    """

    module_ns = globals()

    if 'mapper' not in module_ns:

        init(**kwargs)

    return module_ns['mapper']
def map_name(
        name,
        id_type,
        target_id_type,
        ncbi_tax_id = None,
        strict = False,
        expand_complexes = True,
        uniprot_cleanup = True,
    ):
    """
    Translates one instance of one ID type to a different one.
    Returns set of the target ID type.

    Module level shortcut: forwards all arguments to the `map_name`
    method of the `Mapper` instance provided by `get_mapper`.

    This function should be used to convert individual IDs.
    It takes care about everything and ideally you don't need to think
    on the details.

    How does it work: looks up dictionaries between the original
    and target ID type, if doesn't find, attempts to load from the
    predefined inputs.
    If the original name is genesymbol, first it looks up among the
    preferred gene names from UniProt, if not found, it takes an
    attempt with the alternative gene names. If the gene symbol still
    couldn't be found, and strict = False, as a last attempt only the
    first 5 characters of the gene symbol are matched. If the target
    name type is uniprot, then it converts all the ACs to primary.
    Then, for the TrEMBL IDs it looks up the preferred gene names, and
    finds SwissProt IDs with the same preferred gene name.

    Args
        name (str): The original name to be converted.
        id_type (str): The type of the name. Available by default:
            - genesymbol (gene name)
            - entrez (Entrez Gene ID [#])
            - refseqp (NCBI RefSeq Protein ID [NP_*|XP_*])
            - ensp (Ensembl protein ID [ENSP*])
            - enst (Ensembl transcript ID [ENST*])
            - ensg (Ensembl genomic DNA ID [ENSG*])
            - hgnc (HGNC ID [HGNC:#])
            - gi (GI number [#])
            - embl (DDBJ/EMBL/GeneBank CDS accession)
            - embl_id (DDBJ/EMBL/GeneBank accession)
            And many more, see the code of
            ``pypath.internals.input_formats``
        target_id_type (str): The name type to translate to, more or
            less the same values are available as for ``id_type``.
        ncbi_tax_id (int): NCBI Taxonomy ID of the organism.
        strict (bool): If False, for Gene Symbols which can not be
            translated, further attempts are made by adding number "1"
            to the end, or matching only the first five characters.
            This makes possible to translate some non-standard gene
            names typically found in old, unmaintained resources.
        expand_complexes (bool): When encountering complexes,
            translate the IDs of their components and return a set
            of IDs. The alternative behaviour is to return the
            `Complex` objects.
        uniprot_cleanup (bool): When the `target_id_type` is UniProt
            ID, call the `uniprot_cleanup` function at the end.
    """

    return get_mapper().map_name(
        name = name,
        id_type = id_type,
        target_id_type = target_id_type,
        ncbi_tax_id = ncbi_tax_id,
        strict = strict,
        expand_complexes = expand_complexes,
        uniprot_cleanup = uniprot_cleanup,
    )
def map_name0(
        name,
        id_type,
        target_id_type,
        ncbi_tax_id = None,
        strict = False,
        expand_complexes = True,
        uniprot_cleanup = True,
    ):
    """
    Translates the name and returns only one of the resulted IDs. It
    means in case of ambiguous ID translation, a random one of them
    will be picked and returned. Recommended to use only if the
    translation between the given ID types is mostly unambiguous and
    the loss of information can be ignored. See more details at
    `map_name`.

    Module level shortcut: forwards all arguments to the `map_name0`
    method of the `Mapper` instance provided by `get_mapper`.
    """

    return get_mapper().map_name0(
        name = name,
        id_type = id_type,
        target_id_type = target_id_type,
        ncbi_tax_id = ncbi_tax_id,
        strict = strict,
        expand_complexes = expand_complexes,
        uniprot_cleanup = uniprot_cleanup,
    )
def map_names(
        names,
        id_type = None,
        target_id_type = None,
        ncbi_tax_id = None,
        strict = False,
        expand_complexes = True,
        uniprot_cleanup = True,
    ):
    """
    Same as ``map_name`` but translates multiple IDs at once. These two
    functions could be seamlessly implemented as one, still I created
    separate functions to always make it explicit if a set of
    translated IDs come from multiple original IDs.

    Module level shortcut: forwards all arguments to the `map_names`
    method of the `Mapper` instance provided by `get_mapper`.

    Args
        names: The original names to be converted.
        id_type (str): The type of the names. Available by default:
            - genesymbol (gene name)
            - entrez (Entrez Gene ID [#])
            - refseqp (NCBI RefSeq Protein ID [NP_*|XP_*])
            - ensp (Ensembl protein ID [ENSP*])
            - enst (Ensembl transcript ID [ENST*])
            - ensg (Ensembl genomic DNA ID [ENSG*])
            - hgnc (HGNC ID [HGNC:#])
            - gi (GI number [#])
            - embl (DDBJ/EMBL/GeneBank CDS accession)
            - embl_id (DDBJ/EMBL/GeneBank accession)
            And many more, see the code of
            ``pypath.internals.input_formats``
        target_id_type (str): The name type to translate to, more or
            less the same values are available as for ``id_type``.
        ncbi_tax_id (int): NCBI Taxonomy ID of the organism.
        strict (bool): If False, for Gene Symbols which can not be
            translated, further attempts are made by adding number "1"
            to the end, or matching only the first five characters.
            This makes possible to translate some non-standard gene
            names typically found in old, unmaintained resources.
        expand_complexes (bool): When encountering complexes,
            translate the IDs of their components and return a set
            of IDs. The alternative behaviour is to return the
            `Complex` objects.
        uniprot_cleanup (bool): When the `target_id_type` is UniProt
            ID, call the `Mapper.uniprot_cleanup` function at the end.
    """

    return get_mapper().map_names(
        names = names,
        id_type = id_type,
        target_id_type = target_id_type,
        ncbi_tax_id = ncbi_tax_id,
        strict = strict,
        expand_complexes = expand_complexes,
        uniprot_cleanup = uniprot_cleanup,
    )
def label(name, id_type = None, entity_type = None, ncbi_tax_id = 9606):
    """
    For any kind of entity, either protein, miRNA or protein complex,
    returns the preferred human readable label. For proteins this means
    Gene Symbols, for miRNAs miRNA names, for complexes a series of
    Gene Symbols.

    Module level shortcut: forwards all arguments to the `label` method
    of the `Mapper` instance provided by `get_mapper`.
    """

    return get_mapper().label(
        name = name,
        id_type = id_type,
        entity_type = entity_type,
        ncbi_tax_id = ncbi_tax_id,
    )
def guess_type(name, entity_type = None):
    """
    From a string, tries to guess the ID type and optionally the entity
    type. Returns a tuple of strings: ID type and entity type.

    Module level shortcut: forwards the arguments to the `guess_type`
    method of the `Mapper` instance provided by `get_mapper`.
    """

    return get_mapper().guess_type(name = name, entity_type = entity_type)
def id_from_label(label, label_id_type = 'genesymbol', ncbi_tax_id = None):
    """
    For a label (e.g. Gene Symbol) returns the corresponding IDs (e.g.
    UniProt IDs).

    Module level shortcut: forwards the arguments to the
    `id_from_label` method of the `Mapper` instance provided by
    `get_mapper`.
    """

    return get_mapper().id_from_label(
        label = label,
        label_id_type = label_id_type,
        ncbi_tax_id = ncbi_tax_id,
    )
def id_from_label0(label, label_id_type = 'genesymbol', ncbi_tax_id = None):
    """
    For a label (e.g. Gene Symbol) returns a single ID (e.g. UniProt
    IDs).

    Module level shortcut: forwards the arguments to the
    `id_from_label0` method of the `Mapper` instance provided by
    `get_mapper`.
    """

    return get_mapper().id_from_label0(
        label = label,
        label_id_type = label_id_type,
        ncbi_tax_id = ncbi_tax_id,
    )
def translation_dict(
        id_type: str,
        target_id_type: str,
        ncbi_tax_id: int | None = None,
    ) -> MappingTable | None:
    """
    Identifier translation table as a dict of sets.

    Module level shortcut: forwards the arguments to the
    `translation_dict` method of the `Mapper` instance provided by
    `get_mapper`.

    Args
        id_type: The source ID type.
        target_id_type: The target ID type.
        ncbi_tax_id: NCBI Taxonomy ID of the organism.
    """

    return get_mapper().translation_dict(
        id_type = id_type,
        target_id_type = target_id_type,
        ncbi_tax_id = ncbi_tax_id,
    )
def translation_df(
        id_type: str,
        target_id_type: str,
        ncbi_tax_id: int | None = None,
    ) -> MappingTable | None:
    """
    Identifier translation table as a `pandas.DataFrame`.

    Module level shortcut: forwards the arguments to the
    `translation_df` method of the `Mapper` instance provided by
    `get_mapper`.

    Args
        id_type: The source ID type.
        target_id_type: The target ID type.
        ncbi_tax_id: NCBI Taxonomy ID of the organism.
    """

    # NOTE(review): the return annotation says `MappingTable`, while
    # the description says `pandas.DataFrame` -- confirm which is right
    return get_mapper().translation_df(
        id_type = id_type,
        target_id_type = target_id_type,
        ncbi_tax_id = ncbi_tax_id,
    )
def mapping_tables() -> list[MappingTableDefinition]:
    """
    A list of built-in mapping tables.

    If `id_type_b` is `None`, that means translation to all other ID
    types provided by the same resource is possible.

    Module level shortcut: calls the `mapping_tables` method of the
    `Mapper` instance provided by `get_mapper`.
    """

    mapper = get_mapper()

    return mapper.mapping_tables()
def id_types() -> list[IdType]:
    """
    Identifier types with their labels.

    Module level shortcut: calls the `id_types` method of the `Mapper`
    instance provided by `get_mapper`.
    """

    # NOTE(review): the annotation says list; confirm against the
    # return type of `Mapper.id_types`
    mapper = get_mapper()

    return mapper.id_types()