Source code for pypath.utils.taxonomy

#!/usr/bin/env python
# -*- coding: utf-8 -*-

#
#  This file is part of the `pypath` python module
#
#  Copyright 2014-2023
#  EMBL, EMBL-EBI, Uniklinik RWTH Aachen, Heidelberg University
#
#  Authors: see the file `README.rst`
#  Contact: Dénes Türei (turei.denes@gmail.com)
#
#  Distributed under the GPLv3 License.
#  See accompanying file LICENSE.txt or copy at
#      https://www.gnu.org/licenses/gpl-3.0.html
#
#  Website: https://pypath.omnipathdb.org/
#

from __future__ import annotations

from future.utils import iteritems

import time
import datetime
import itertools

import timeloop

import pypath.share.common as common
import pypath.share.session as session
import pypath.share.settings as settings
import pypath_common._constants as _const
import pypath.inputs.uniprot as uniprot_input
import pypath.inputs.ensembl as ensembl_input

# Logger of this module; `_log` is a shortcut to its logging method.
_logger = session.Logger(name = 'taxonomy')
_log = _logger._log

# Cache of the taxonomy lookup tables, keyed by table name. Filled by
# `init_db`, read through `get_db` and emptied by the `_cleanup` job.
db = {}
# Period of the `_cleanup` job, in seconds (it is passed to `timedelta`
# as `seconds`).
_cleanup_period = settings.get('mapper_cleanup_interval')
# Tables not accessed for more than this many seconds are removed from `db`.
_lifetime = 300
# Time of the last access (`time.time()`) for each key in `db`.
_last_used = {}
NOT_ORGANISM_SPECIFIC = _const.NOT_ORGANISM_SPECIFIC

# XXX: Shouldn't we keep the functions and the variables of this module
#      in separate groups (each kind together)?

# NCBI Taxonomy ID -> common English name. These names take priority over
# any other source in `ensure_common_name`.
taxids = {
    9606: 'human',
    10090: 'mouse',
    10116: 'rat',
    9031: 'chicken',
    9913: 'cow',
    9986: 'rabbit',
    9940: 'sheep',
    10141: 'guinea pig',
    10036: 'hamster',
    7227: 'fruit fly',
    9615: 'dog',
    9823: 'pig',
    8355: 'frog',
    9091: 'quail',
    9796: 'horse',
    9925: 'goat',
    89462: 'water buffalo',
    # NOTE(review): 9598 looks like the NCBI ID of chimpanzee -- confirm
    # that the generic name is intended here
    9598: 'monkey',
    9103: 'turkey',
    9685: 'cat',
    7604: 'starfish',
    7609: 'spiny starfish',
    1213717: 'torpedo',
    9669: 'ferret',
    8839: 'duck',
    9593: 'gorilla',
    7460: 'honeybee',
    8407: 'european common frog',
    9544: 'rhesus macaque',
}

# NCBI Taxonomy ID -> lowercase common name, for the organisms provided
# by `ensembl_input.ensembl_organisms`
taxids2 = {
    organism.taxon_id: organism.common_name.lower()
    for organism in ensembl_input.ensembl_organisms()
}

# the same tables in the reverse direction: common name -> NCBI Taxonomy ID
taxa = common.swap_dict_simple(taxids)
taxa2 = common.swap_dict_simple(taxids2)


# Alternative common names -> the common name to look up instead;
# resolved recursively by `taxid_from_common_name`.
taxa_synonyms = {
    'bovine': 'cow',
    'western gorilla': 'gorilla',
}


# NCBI Taxonomy ID -> latin (scientific) name. Judging by its name, this
# started as the organism list of phospho.ELM (TODO confirm); the reverse
# of this table is the first one consulted by `taxid_from_latin_name`.
phosphoelm_taxids = {
    9606: 'Homo sapiens',
    10090: 'Mus musculus',
    9913: 'Bos taurus',
    9986: 'Oryctolagus cuniculus',
    9615: 'Canis familiaris',
    10029: 'Cricetulus griseus',
    9267: 'Didelphis virginiana',
    9031: 'Gallus gallus',
    10036: 'Mesocricetus auratus',
    9940: 'Ovis aries',
    10116: 'Rattus norvegicus',
    9823: 'Sus scrofa',
    8355: 'Xenopus laevis',
    10141: 'Cavia porcellus',
    9796: 'Equus caballus',
    7227: 'Drosophila melanogaster',
    487: 'Neisseria meningitidis',
    562: 'Escherichia coli',
    5207: 'Cryptococcus neoformans',
    470: 'Acinetobacter baumannii',
    1280: 'Staphylococcus aureus',
    4932: 'Saccharomyces cerevisiae',
    34: 'Myxococcus xanthus',
    1392: 'Bacillus anthracis',
    210: 'Helicobacter pylori',
    6239: 'Caenorhabditis elegans',
}


# add the scientific names of the Ensembl organisms; for taxon IDs already
# in the table the Ensembl name replaces the existing one
phosphoelm_taxids.update(
    (organism.taxon_id, organism.scientific_name)
    for organism in ensembl_input.ensembl_organisms()
)


# NCBI Taxonomy ID -> organism code; reversed into `dbptm_to_ncbi_tax_id`
# and used by `taxid_from_dbptm_taxon_name`. The codes look like UniProt
# organism mnemonics (presumably this is what dbPTM uses -- confirm).
dbptm_taxids = {
    9606: 'HUMAN',
    10090: 'MOUSE',
    7227: 'DROME',
    10116: 'RAT',
    559292: 'YEAST',
    284812: 'SCHPO',
    4081: 'SOLLC',
    3702: 'ARATH',
    9940: 'SHEEP',
    9913: 'BOVIN',
    9925: 'CAPHI',
    44689: 'DICDI',
    4577: 'MAIZE',
    9823: 'PIG',
    9615: 'CANLF',
    6239: 'CAEEL',
    # Xenopus laevis is 8355, in agreement with `taxids`,
    # `phosphoelm_taxids` and `nonstandard_taxids`; it used to be
    # mistyped here as 8455
    8355: 'XENLA',
    83333: 'ECOLI',
    1891767: 'SV40',
}


# NCBI Taxonomy ID -> three letter organism code (presumably the prefixes
# used in miRBase identifiers -- confirm against the miRBase inputs).
mirbase_taxids = {
    9606: 'hsa',
    10090: 'mmu',
    10116: 'rno',
    7227: 'dme',
}


# NCBI Taxonomy ID -> Ensembl organism name, for the organisms provided
# by `ensembl_input.ensembl_organisms`
ensembl_taxids = {
    organism.taxon_id: organism.ensembl_name
    for organism in ensembl_input.ensembl_organisms()
}


# Organism names not covered by the other tables -> NCBI Taxonomy ID;
# looked up by `taxid_from_nonstandard` with an exact, case sensitive match.
nonstandard_taxids = {
    'drosophila': 7227,
    'c.elegans': 6239,
    'xenopus': 8355,
    'Synechocystis_sp.': 1142,
}


[docs] def shorten_latin_name(name: str, dot: bool = True) -> str: """ For a complete latin name, returns its shortened version. In short latin names the genus name is marked only by its initial. """ if name: name = name.split() return f'{name[0][0].upper()}{"." if dot else ""} {"".join(name[1:])}'
def short_latin_names(long_names: dict[str, int]) -> dict[str, int]:
    """
    Shortens the keys of a dict of latin names.

    Each name is shortened by `shorten_latin_name` twice: with and without
    a dot after the initial of the genus. Both variants become keys of the
    result, carrying the value of the original name.
    """

    shortened = {}

    for long_name, value in long_names.items():

        for with_dot in (True, False):

            short_name = shorten_latin_name(long_name, dot = with_dot)
            shortened[short_name] = value

    return shortened
def ensure_common_name(taxon_id: str | int, lower: bool = False) -> str | None:
    """
    Common English name of an organism.

    Args:
        taxon_id:
            Organism name or NCBI Taxonomy ID.
        lower:
            Return lowercase name. Default is capitalized.

    Returns:
        The common name; None if no name could be found for the organism.
    """

    common_name = (
        # priority for these common names
        taxids.get(taxon_id, None) or
        _ensure_name(taxon_id, 'common')
    )

    if not common_name:

        # `_ensure_name` gives None for unknown organisms; calling
        # `lower` or `capitalize` on it would raise AttributeError
        return None

    return common_name.lower() if lower else common_name.capitalize()
def ensure_latin_name(taxon_id):
    """
    Latin name of an organism.

    Delegates to `_ensure_name` with the name type `latin` and returns
    its result unchanged.
    """

    latin_name = _ensure_name(taxon_id, 'latin')

    return latin_name
def ensure_ensembl_name(taxon_id):
    """
    Ensembl name of an organism.

    Delegates to `_ensure_name` with the name type `ensembl` and returns
    its result unchanged.
    """

    ensembl_name = _ensure_name(taxon_id, 'ensembl')

    return ensembl_name
def _ensure_name(taxon_id, name_type):
    """
    Name of an organism of the requested type.

    The organism is first translated by `ensure_ncbi_tax_id`, the result
    is then looked up in the `ncbi_to_<name_type>` table from `get_db`.
    If the table has no such key, a message is logged and None is returned.
    """

    tax_id = ensure_ncbi_tax_id(taxon_id)
    names = get_db('ncbi_to_%s' % name_type)

    if tax_id not in names:

        message = 'Could not find %s taxon name for `%s`.' % (
            name_type,
            str(taxon_id),
        )
        _log(message)

        return None

    return names[tax_id]
def taxid_from_common_name(taxon_name):
    """
    NCBI Taxonomy ID from the common (English) name of an organism.

    The lookup is attempted in this order: `taxa_synonyms` (the synonym is
    resolved recursively), `taxa` and `taxa2`, all by the lowercase name;
    finally the `common` table from `get_db`, by the name as it is and
    then by the capitalized name.

    Args:
        taxon_name:
            Common name of an organism; surrounding whitespace is ignored.

    Returns:
        The NCBI Taxonomy ID; None if `taxon_name` is not a string, it is
        empty, it is "none" or "unknown" (in any case), or it is not found
        in any of the tables.
    """

    # None, numbers and other non-string values can not be names, and
    # they have no `strip`, `lower` and `capitalize` methods
    if not common.is_str(taxon_name):

        return None

    taxon_name = taxon_name.strip()
    taxon_name_l = taxon_name.lower()

    # the placeholders are matched case insensitively
    if not taxon_name_l or taxon_name_l in {'none', 'unknown'}:

        return None

    if taxon_name_l in taxa_synonyms:

        return taxid_from_common_name(taxa_synonyms[taxon_name_l])

    for table in (taxa, taxa2):

        if taxon_name_l in table:

            return table[taxon_name_l]

    # the large table is loaded only if the small ones did not help
    common_to_ncbi = get_db('common')

    for candidate in (taxon_name, taxon_name.capitalize()):

        if candidate in common_to_ncbi:

            return common_to_ncbi[candidate]

    return None
def taxid_from_latin_name(taxon_name):
    """
    NCBI Taxonomy ID from the latin name of an organism.

    The module level tables of full and short latin names are tried first;
    the `latin` table from `get_db` is requested only if those have no
    such name. Returns None if the name is in none of the tables.
    """

    builtin_tables = (
        latin_name_to_ncbi_tax_id,
        short_latin_name_to_ncbi_tax_id,
    )

    for table in builtin_tables:

        if taxon_name in table:

            return table[taxon_name]

    full_table = get_db('latin')

    return full_table[taxon_name] if taxon_name in full_table else None
def taxid_from_dbptm_taxon_name(taxon_name):
    """
    NCBI Taxonomy ID from an organism code in `dbptm_to_ncbi_tax_id`.

    Returns None if the table has no such code.
    """

    if taxon_name not in dbptm_to_ncbi_tax_id:

        return None

    return dbptm_to_ncbi_tax_id[taxon_name]
def taxid_from_nonstandard(taxon_name):
    """
    NCBI Taxonomy ID from a name listed in `nonstandard_taxids`.

    Returns None if the table has no such name.
    """

    if taxon_name not in nonstandard_taxids:

        return None

    return nonstandard_taxids[taxon_name]
def taxid_from_ensembl_name(taxon_name):
    """
    NCBI Taxonomy ID from an organism name in `ensembl_name_to_ncbi_tax_id`.

    Returns None if the table has no such name.
    """

    if taxon_name not in ensembl_name_to_ncbi_tax_id:

        return None

    return ensembl_name_to_ncbi_tax_id[taxon_name]
def ensure_ncbi_tax_id(taxon_id):
    """
    For taxon names of various formats returns NCBI Taxonomy ID if possible.

    Integers are returned as they are, strings of digits are converted to
    integer. If the name contains an opening parenthesis, first the part
    before it, then the part within the parentheses is translated. Any
    other value is passed to the `taxid_from_*` functions of this module,
    until one of them gives a result. A message is logged if the
    translation fails.
    """

    if isinstance(taxon_id, int):

        return taxon_id

    if hasattr(taxon_id, 'strip'):

        taxon_id = taxon_id.strip()

    if common.is_str(taxon_id) and '(' in taxon_id:

        outside, _, remainder = taxon_id.partition('(')
        inside = remainder.partition(')')[0]

        # the name within the parentheses is tried only if
        # the one before them is unknown
        ncbi_tax_id = ensure_ncbi_tax_id(outside) or ensure_ncbi_tax_id(inside)

    elif hasattr(taxon_id, 'isdigit') and taxon_id.isdigit():

        ncbi_tax_id = int(taxon_id)

    else:

        resolvers = (
            taxid_from_dbptm_taxon_name,
            taxid_from_nonstandard,
            taxid_from_common_name,
            taxid_from_latin_name,
            taxid_from_ensembl_name,
        )
        ncbi_tax_id = None

        for resolve in resolvers:

            ncbi_tax_id = resolve(taxon_id)

            if ncbi_tax_id:

                break

    if not ncbi_tax_id:

        _log('Could not map to NCBI Taxonomy ID: `%s`.' % str(taxon_id))

    return ncbi_tax_id
def uniprot_taxid(uniprot):
    """
    For a UniProt ID returns its NCBI Taxonomy ID.

    The lookup uses the `swissprot` table from `get_db`; None is returned
    if the ID is not in the table.
    """

    uniprot_to_taxid = get_db('swissprot')

    if uniprot not in uniprot_to_taxid:

        return None

    return uniprot_to_taxid[uniprot]
# reverse tables: organism name or code -> NCBI Taxonomy ID
dbptm_to_ncbi_tax_id = common.swap_dict_simple(dbptm_taxids)
latin_name_to_ncbi_tax_id = common.swap_dict_simple(phosphoelm_taxids)
short_latin_name_to_ncbi_tax_id = short_latin_names(latin_name_to_ncbi_tax_id)
ensembl_name_to_ncbi_tax_id = common.swap_dict_simple(ensembl_taxids)


# periodic job removing the unused tables from the cache
_cleanup_timeloop = timeloop.Timeloop()
_cleanup_timeloop.logger.setLevel(9999)

_cleanup_interval = datetime.timedelta(seconds = _cleanup_period)


@_cleanup_timeloop.job(interval = _cleanup_interval)
def _cleanup():
    """
    Removes from `db` the tables not used for more than `_lifetime` seconds.
    """

    # iterating a snapshot of the keys, as `_remove` deletes from `db`
    for key in list(db):

        idle = time.time() - _last_used[key]

        if idle > _lifetime:

            _remove(key)


_cleanup_timeloop.start(block = False)


def _remove(key):
    """
    Deletes a table from `db` together with its time of last use.
    """

    if key in db:

        _logger._log('Removing taxonomy data `%s`.' % key)
        del db[key]

    _last_used.pop(key, None)
def get_db(key):
    """
    Provides a taxonomy table from the module level cache `db`.

    A table missing from the cache is first requested from `init_db`.
    Each successful access updates the time of last use of the table.
    If the table is not available even after `init_db`, an empty dict
    is returned.
    """

    if key not in db:

        init_db(key)

    if key not in db:

        return {}

    _last_used[key] = time.time()

    return db[key]
def init_db(key):
    """
    Builds a taxonomy lookup table and stores it in the module level cache.

    Args:
        key:
            Name of the table. `latin`, `common`, `swissprot` and `ensembl`
            translate names or identifiers to NCBI Taxonomy IDs; with the
            `ncbi_to_` prefix (e.g. `ncbi_to_latin`) the table is reversed,
            and each NCBI Taxonomy ID maps to one name: the shortest one,
            the alphabetically first among equally short ones.

    The table is stored in `db` under `key`, unless it is empty. Unknown
    keys are logged and ignored.
    """

    swap = key.startswith('ncbi_to_')
    _key = key.rsplit('_', maxsplit = 1)[-1] if swap else key

    # the NCBI data is requested only in the branches that use it

    if _key == 'latin':

        this_db = {
            taxon.latin: taxon.ncbi_id
            for taxon in uniprot_input.uniprot_ncbi_taxids_2().values()
        }

        if not swap:

            this_db.update(short_latin_names(this_db))

    elif _key == 'common':

        this_db = {
            name.capitalize(): ncbi_id
            for name, ncbi_id in itertools.chain(taxa.items(), taxa2.items())
        }

        this_db.update(
            (taxon.english, taxon.ncbi_id)
            for taxon in uniprot_input.uniprot_ncbi_taxids_2().values()
            if taxon.english
        )

    elif _key == 'swissprot':

        uniprot_data = uniprot_input.uniprot_taxonomy()
        latin_to_ncbi = get_db('latin')

        this_db = {
            swissprot: latin_to_ncbi[name]
            for swissprot, names in uniprot_data.items()
            for name in names
            if name in latin_to_ncbi
        }

    elif _key == 'ensembl':

        # a copy: the lowercase keys added below should not end up in
        # the module level table
        this_db = dict(ensembl_name_to_ncbi_tax_id)

    else:

        # without this, `None.update` or swapping None would raise below
        _log('Unknown taxonomy table: `%s`.' % key)

        return

    if swap:

        this_db = common.swap_dict(this_db, force_sets = True)
        # the length alone leaves ties to the arbitrary order of the set
        this_db = {
            ncbi_id: min(names, key = lambda name: (len(name), name))
            for ncbi_id, names in this_db.items()
        }

    else:

        # lowercase variants of all keys; keys without `lower` (e.g. None
        # from `short_latin_names` for an empty name) are skipped
        this_db.update({
            name.lower(): ncbi_id
            for name, ncbi_id in this_db.items()
            if hasattr(name, 'lower')
        })

    if this_db:

        # time stamp first: the `_cleanup` job runs in the background and
        # reads the time stamp of every key it finds in `db`
        _last_used[key] = time.time()
        db[key] = this_db