# Source code for pypath.utils.reflists

#!/usr/bin/env python
# -*- coding: utf-8 -*-

#
#  This file is part of the `pypath` python module
#
#  Copyright 2014-2023
#  EMBL, EMBL-EBI, Uniklinik RWTH Aachen, Heidelberg University
#
#  Authors: see the file `README.rst`
#  Contact: Dénes Türei (turei.denes@gmail.com)
#
#  Distributed under the GPLv3 License.
#  See accompanying file LICENSE.txt or copy at
#      https://www.gnu.org/licenses/gpl-3.0.html
#
#  Website: https://pypath.omnipathdb.org/
#

from future.utils import iteritems

import os
import json
import datetime
import time

try:
    import cPickle as pickle

except ImportError:
    import pickle

import timeloop

import pypath.inputs.uniprot_db as uniprot_input
import pypath.inputs.mirbase as mirbase_input
import pypath.share.common as common
import pypath.share.session as session_mod
import pypath.share.settings as settings
import pypath.share.cache as cache_mod


# Maps each supported ID type to the source of its reference list. The value
# is first tried as a path to a file with one identifier per line; if no
# such file exists, it is used as the name of an input function looked up
# in `pypath.inputs.uniprot_db` and then in `pypath.inputs.mirbase`.
inputs = {
    'uniprot': 'all_uniprots',
    'swissprot': 'all_swissprots',
    'trembl': 'all_trembls',
    'mirbase': 'mirbase_mature_all',
    'mir-pre': 'mirbase_precursor_all',
}


# Module level scheduler; `ReferenceListManager.__init__` registers its
# periodic cleanup job on it and starts it in non-blocking mode.
_reflists_cleanup_timeloop = timeloop.Timeloop()
# Presumably silences the messages of timeloop's own logger: 9999 is far
# above any level defined by the standard `logging` module -- TODO confirm.
_reflists_cleanup_timeloop.logger.setLevel(9999)


class ReferenceListManager(session_mod.Logger):
    """
    Keeps reference lists in memory and serves membership queries on them.

    A reference list is the set of identifiers of one ID type for one
    organism. Lists are loaded on demand, cached as pickle files in the
    cache directory, and removed from memory by a periodic cleanup job once
    they have not been used for longer than ``lifetime`` seconds.
    """


    def __init__(self, cleanup_period = 10, lifetime = 300):
        """
        :param int cleanup_period:
            Seconds between two runs of the cleanup job.
        :param int lifetime:
            Seconds since the last use after which a list is removed from
            memory.
        """

        session_mod.Logger.__init__(self, name = 'reflists')

        # The state is initialized before the scheduler is started, so the
        # cleanup job can never run against a half-built instance.
        self.lifetime = lifetime
        self.lists = {}
        self.expiry = {}
        self.cachedir = cache_mod.get_cachedir()

        @_reflists_cleanup_timeloop.job(
            interval = datetime.timedelta(
                seconds = cleanup_period
            )
        )
        def _cleanup():

            self._remove_expired()

        _reflists_cleanup_timeloop.start(block = False)

        self._log('ReferenceListManager has been created.')


    def which_list(self, id_type, ncbi_tax_id = None):
        """
        Returns the reference list for one ID type and organism, loading
        it first if it is not in memory. Each call refreshes the last-used
        time of the list.

        :param str id_type:
            An ID type, one of the keys of ``inputs``.
        :param int ncbi_tax_id:
            NCBI Taxonomy ID of the organism; if not provided (or falsy),
            the ``default_organism`` from the settings is used.

        :return:
            The reference list, or ``None`` if it is not available.
        """

        ncbi_tax_id = ncbi_tax_id or settings.get('default_organism')
        key = (id_type, ncbi_tax_id)

        self.expiry[key] = time.time()

        if key not in self.lists:

            self.load(key)

        # A single lookup instead of check-then-index: the cleanup job
        # modifies `self.lists`, presumably from another thread.
        return self.lists.get(key)


    def load(self, key):
        """
        Loads one reference list into ``self.lists``, either from its
        pickle file in the cache directory, or, if that does not exist,
        from the original source, in which case the cache file is created.

        :param tuple key:
            A tuple of ID type and NCBI Taxonomy ID. The taxonomy ID is
            formatted with ``%u``, hence it has to be a number.
        """

        cachefile = 'reflist_%s_%u.pickle' % key
        cachefile = os.path.join(self.cachedir, cachefile)

        if os.path.exists(cachefile):

            # NOTE: unpickling executes arbitrary code from the file; this
            # is safe only as long as the cache directory is trusted.
            with open(cachefile, 'rb') as fp:

                self.lists[key] = pickle.load(fp)

            self._log(
                'Reference list for ID type `%s` for organism `%u` '
                'has been loaded from `%s`.' % (key + (cachefile,))
            )

        else:

            self.lists[key] = self._load(key)

            with open(cachefile, 'wb') as fp:

                pickle.dump(self.lists[key], fp)

            self._log(
                'Reference list for ID type `%s` for organism `%u` '
                'has been saved to `%s`.' % (key + (cachefile,))
            )


    def _load(self, key):
        """
        Builds a reference list from its original source. The value in
        ``inputs`` for the ID type is used as a path if such a file exists
        (one identifier per line), otherwise as the name of a function in
        the UniProt or the miRBase input module.

        :param tuple key:
            A tuple of ID type and NCBI Taxonomy ID.

        :return:
            A set of identifiers.

        :raises KeyError:
            If the ID type is not among the keys of ``inputs``.
        :raises AttributeError:
            If none of the input modules has the requested function.
        """

        data = set()
        input_method = inputs[key[0]]

        if os.path.exists(input_method):

            with open(input_method, 'r') as fp:

                data = {line.strip() for line in fp}

            self._log(
                'Reference list for ID type `%s` for organism `%u` has '
                'been loaded from `%s`.' % (key + (input_method,))
            )

        else:

            for input_module in (uniprot_input, mirbase_input):

                if hasattr(input_module, input_method):

                    input_func = getattr(input_module, input_method)
                    break

            else:

                # Without this the call below would fail with an
                # uninformative UnboundLocalError.
                raise AttributeError(
                    'No input method `%s` found for ID type `%s`.' % (
                        input_method,
                        key[0],
                    )
                )

            ncbi_tax_id = key[1]
            data = set(input_func(organism = ncbi_tax_id))

            self._log(
                'Reference list for ID type `%s` for organism `%u` has '
                'been loaded by method `%s`.' % (key + (str(input_method),))
            )

        return data


    def check(self, name, id_type, ncbi_tax_id = None):
        """
        Checks if the identifier ``name`` is in the reference list with
        the provided ``id_type`` and organism.
        """

        lst = self.which_list(id_type = id_type, ncbi_tax_id = ncbi_tax_id)

        return name in lst


    def select(self, names, id_type, ncbi_tax_id = None):
        """
        Selects the identifiers in ``names`` which are in the reference
        list with the provided ``id_type`` and organism.
        """

        names = set(names)

        lst = self.which_list(id_type = id_type, ncbi_tax_id = ncbi_tax_id)

        return names & lst


    def is_not(self, names, id_type, ncbi_tax_id = None):
        """
        Returns the identifiers from ``names`` which are not instances of
        the provided ``id_type`` and from the given organism.
        """

        names = set(names)

        lst = self.which_list(id_type = id_type, ncbi_tax_id = ncbi_tax_id)

        return names - lst


    def _remove_expired(self):
        """
        Removes from memory the lists which have not been used for longer
        than ``self.lifetime`` seconds. Called by the periodic cleanup job.
        """

        # Iterating over a copy, as entries are deleted within the loop.
        for key, last_used in list(self.expiry.items()):

            if time.time() - last_used > self.lifetime and key in self.lists:

                del self.lists[key]
                del self.expiry[key]


    def __del__(self):
        """
        Stops the module level cleanup scheduler.
        """

        if hasattr(_reflists_cleanup_timeloop, 'stop'):

            _reflists_cleanup_timeloop.stop()
def init():
    """
    Creates a new ``ReferenceListManager`` and binds it to the module
    level name ``manager``, replacing any instance stored there before.
    """

    global manager

    manager = ReferenceListManager()
def get_manager():
    """
    Returns the module level ``manager`` object; if it does not exist yet,
    it is created first by calling ``init``.
    """

    namespace = globals()

    if 'manager' in namespace:

        return namespace['manager']

    init()

    return namespace['manager']
def check(name, id_type, ncbi_tax_id = None):
    """
    Tells whether the identifier ``name`` is a member of the reference
    list of the given ``id_type`` and organism. Delegates to the module
    level manager.
    """

    return get_manager().check(
        name = name,
        id_type = id_type,
        ncbi_tax_id = ncbi_tax_id,
    )
def select(names, id_type, ncbi_tax_id = None):
    """
    Returns those identifiers from ``names`` which are members of the
    reference list of the given ``id_type`` and organism. Delegates to the
    module level manager.
    """

    return get_manager().select(
        names = names,
        id_type = id_type,
        ncbi_tax_id = ncbi_tax_id,
    )
def is_not(names, id_type, ncbi_tax_id = None):
    """
    Returns those identifiers from ``names`` which are not members of the
    reference list of the given ``id_type`` and organism. Delegates to the
    module level manager.
    """

    return get_manager().is_not(
        names = names,
        id_type = id_type,
        ncbi_tax_id = ncbi_tax_id,
    )
def get_reflist(id_type, ncbi_tax_id = None):
    """
    Returns what the module level manager's ``which_list`` gives for the
    provided ``id_type`` and organism.
    """

    return get_manager().which_list(
        id_type = id_type,
        ncbi_tax_id = ncbi_tax_id,
    )