#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# This file is part of the `pypath` python module
#
# Copyright 2014-2023
# EMBL, EMBL-EBI, Uniklinik RWTH Aachen, Heidelberg University
#
# Authors: see the file `README.rst`
# Contact: Dénes Türei (turei.denes@gmail.com)
#
# Distributed under the GPLv3 License.
# See accompanying file LICENSE.txt or copy at
# https://www.gnu.org/licenses/gpl-3.0.html
#
# Website: https://pypath.omnipathdb.org/
#
from __future__ import annotations
from typing import Literal
import os
import sys
import shutil
import importlib as imp
import time
import pprint
import copy
import collections
import itertools
import pypath.resources.network as netres
from pypath.core import annot
from pypath.core import intercell
from pypath.core import complex
from pypath.core import enz_sub
from pypath.core import network
from pypath.share import session as session_mod
import pypath.share.settings as settings
import pypath.share.common as common
class DatabaseManager(session_mod.Logger):
"""
Builds and serves the databases in OmniPath such as various networks,
enzyme-substrate interactions, protein complexes, annotations and
inter-cellular communication roles. Saves the databases to and loads
them from pickle dumps on demand.
"""
def __init__(self, rebuild = False, **kwargs):
session_mod.Logger.__init__(self, name = 'omnipath.dbmanager')
self.timestamp = time.strftime(settings.get('timestamp_format'))
self.param = kwargs
self.rebuild = rebuild
self.datasets = self.get_param('datasets')
self.ensure_dirs()
self.network_dfs = {}
self._log('The OmniPath database manager has been initialized.')
def reload(self):
"""
Reloads the object from the module level.
"""
modname = self.__class__.__module__
mod = __import__(modname, fromlist = [modname.split('.')[0]])
imp.reload(mod)
new = getattr(mod, self.__class__.__name__)
setattr(self, '__class__', new)
self.foreach_dataset(method = self.reload_module)
def build(self):
"""
Builds all built-in datasets.
"""
self._log(
'Building databases. Rebuild forced: %s.' % str(self.rebuild)
)
self.foreach_dataset(method = self.ensure_dataset)
def ensure_dataset(
self,
dataset,
force_reload = False,
force_rebuild = False,
ncbi_tax_id = 9606,
):
"""
Makes sure a dataset is loaded. It loads only if it's not loaded
yet or :py:arg:`force_reload` is ``True``. It only builds if it's
not available as a pickle dump or :py:arg:`force_rebuild` is ``True``.
:arg str dataset:
The name of the dataset.
:arg int ncbi_tax_id:
NCBI Taxonomy ID of the organism. Considered only if the dataset
builds for one organism and saved to organism specific pickle
files.
"""
for dep_dataset in self.dataset_dependencies(dataset):
self.ensure_dataset(dep_dataset)
rebuild_dataset = self.get_param('rebuild_%s' % dataset)
_dataset = self._dataset_taxid(dataset, ncbi_tax_id = ncbi_tax_id)
if (
force_reload or
force_rebuild or
not hasattr(self, _dataset)
):
if (
force_rebuild or
self.rebuild or
rebuild_dataset or
not self.pickle_exists(dataset, ncbi_tax_id = ncbi_tax_id)
):
self.remove_db(dataset, ncbi_tax_id = ncbi_tax_id)
self.build_dataset(dataset, ncbi_tax_id = ncbi_tax_id)
elif (
not hasattr(self, _dataset) or
force_reload
):
self.load_dataset(dataset, ncbi_tax_id = ncbi_tax_id)
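# For example (illustrative; `dbm` is a `DatabaseManager` instance and the
# dataset name is only an example):
#
#     dbm.ensure_dataset('complex')                        # load pickle or build
#     dbm.ensure_dataset('complex', force_rebuild = True)  # rebuild and overwrite
#                                                          # the pickle dump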
def dataset_dependencies(self, dataset):
"""
Returns the dependencies of a dataset. E.g. to build `annotations`,
`complexes` must be loaded first, hence the former depends on the
latter.
"""
deps = self.get_param('dependencies')
return deps[dataset] if dataset in deps else ()
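# The `dependencies` parameter is expected to be a dict of iterables keyed by
# dataset name, e.g. (hypothetical values):
#
#     {'annotations': ('complex',), 'intercell': ('annotations',)}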
def ensure_dirs(self):
"""
Checks if the directories for tables, figures and pickles exist and
creates them if necessary.
"""
if self.get_param('timestamp_dirs'):
self.tables_dir = os.path.join(
self.get_param('tables_dir'),
self.timestamp
)
self.figures_dir = os.path.join(
self.get_param('figures_dir'),
self.timestamp,
)
settings.setup(
tables_dir = self.tables_dir,
figures_dir = self.figures_dir,
)
os.makedirs(self.get_param('pickle_dir'), exist_ok = True)
for _dir in ('pickle', 'tables', 'figures'):
path = self.get_param('%s_dir' % _dir)
self._log(
'%s directory: `%s` (exists: %s).' % (
_dir.capitalize(),
path,
'yes' if os.path.exists(path) else 'no',
)
)
def pickle_path(self, dataset, ncbi_tax_id = 9606):
"""
Returns the path of the pickle dump for a dataset according to
the current settings.
"""
pickle_fname = (
self.get_param('%s_pickle' % dataset) or
'%s.pickle' % dataset
)
if dataset == 'enz_sub':
pickle_fname = pickle_fname % ncbi_tax_id
return os.path.join(
self.get_param('pickle_dir'),
pickle_fname,
)
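# E.g. with `pickle_dir = '/tmp'` and no dataset specific pickle name
# configured, `pickle_path('complex')` returns '/tmp/complex.pickle'; for
# `enz_sub` the configured file name is expected to contain a `%u`
# placeholder that is filled with the NCBI Taxonomy ID (illustrative values,
# not defaults from this file).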
def pickle_exists(self, dataset, ncbi_tax_id = 9606):
"""
Tells if a pickle dump of a particular dataset exists.
"""
return os.path.exists(
self.pickle_path(dataset, ncbi_tax_id = ncbi_tax_id)
)
def table_path(self, dataset):
"""
Returns the full path for a table (to be exported or imported).
"""
return os.path.join(
self.get_param('tables_dir'),
self.get_param('%s_tsv' % dataset),
)
def build_dataset(self, dataset, ncbi_tax_id = 9606):
"""
Builds a dataset.
"""
self._log('Building dataset `%s`.' % dataset)
args = self.get_build_args(dataset)
self._log('Build param: [%s].' % common.dict_str(args))
mod = self.ensure_module(dataset)
if dataset == 'enz_sub':
args['ncbi_tax_id'] = ncbi_tax_id
if hasattr(mod, 'db'):
delattr(mod, 'db')
db = mod.get_db(**args)
pickle_path = self.pickle_path(dataset, ncbi_tax_id = ncbi_tax_id)
old_pickle_path = '%s.old' % pickle_path
if os.path.exists(pickle_path):
shutil.move(pickle_path, old_pickle_path)
self._log('Saving dataset `%s` to `%s`.' % (dataset, pickle_path))
try:
db.save_to_pickle(pickle_file = pickle_path)
if os.path.exists(old_pickle_path):
os.remove(old_pickle_path)
self._log(
'Saved dataset `%s` to `%s`.' % (
dataset,
pickle_path
)
)
except Exception as e:
exc = sys.exc_info()
self._log_traceback()
os.remove(pickle_path)
self._log(
'Failed to save dataset `%s` to `%s`. '
'The dataset is currently loaded. '
'Try restarting Python and re-building the dataset. '
'If the issue persists please report it.' % (
dataset,
pickle_path,
)
)
if os.path.exists(old_pickle_path):
self._log('Restoring the old version of `%s`.' % pickle_path)
shutil.move(old_pickle_path, pickle_path)
self._log('Successfully built dataset `%s`.' % dataset)
_dataset = self._dataset_taxid(dataset, ncbi_tax_id = ncbi_tax_id)
setattr(self, _dataset, db)
self._add_network_df(dataset, ncbi_tax_id = ncbi_tax_id)
def ensure_module(self, dataset, reset = True):
"""
Makes sure the module providing a particular dataset is available
and has no default database loaded yet (:py:attr:`db` attribute
of the module).
"""
mod_str = self.get_param('%s_mod' % dataset)
mod = sys.modules['pypath.core.%s' % mod_str]
if reset and hasattr(mod, 'db'):
delattr(mod, 'db')
return mod
def reload_module(self, dataset):
"""
Reloads the module of the database object of a particular dataset.
E.g. in case of network datasets the ``pypath.core.network`` module
will be reloaded.
"""
mod = self.ensure_module(dataset, reset = False)
imp.reload(mod)
if hasattr(mod, 'db'):
mod.db.reload()
def get_build_args(self, dataset):
"""
Retrieves the default database build parameters for a dataset.
"""
args = self.get_param('%s_args' % dataset) or {}
if hasattr(self, 'get_args_%s' % dataset):
args.update(getattr(self, 'get_args_%s' % dataset)())
return args
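# I.e. the `<dataset>_args` parameter provides the base arguments, and a
# `get_args_<dataset>` method of this class (e.g. `get_args_curated` below),
# if present, can extend or override them.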
def load_dataset(self, dataset, ncbi_tax_id = 9606):
"""
Loads a dataset, builds it if no pickle dump is available.
"""
pickle_path = self.pickle_path(dataset, ncbi_tax_id = ncbi_tax_id)
self._log('Loading dataset `%s` from `%s`.' % (dataset, pickle_path))
mod = self.ensure_module(dataset)
_dataset = self._dataset_taxid(dataset, ncbi_tax_id = ncbi_tax_id)
setattr(self, _dataset, mod.get_db(pickle_file = pickle_path))
self._log('Loaded dataset `%s` from `%s`.' % (dataset, pickle_path))
self._add_network_df(dataset, ncbi_tax_id = ncbi_tax_id)
def _dataset_taxid(self, dataset, ncbi_tax_id = 9606):
return '%s%s' % (
dataset,
'_%u' % ncbi_tax_id if dataset == 'enz_sub' else '',
)
# TODO
# the get_args_* methods below will be replaced by the
# pypath.omnipath.databases module
def get_args_curated(self):
"""
Returns the arguments for building the curated PPI network dataset.
"""
resources = copy.deepcopy(netres.pathway)
resources.update(copy.deepcopy(netres.enzyme_substrate))
return {'resources': resources}
def get_args_tf_target(self):
"""
Returns the arguments for building the TF-target network dataset.
"""
transcription = (
netres.dorothea_expand_levels(
resources = netres.transcription,
levels = self.get_param('tfregulons_levels'),
)
if self.get_param('dorothea_expand_levels') else
netres.transcription
)
return {'resources': transcription}
def get_args_tf_mirna(self):
"""
Returns the arguments for building the TF-miRNA network dataset.
"""
return {'resources': netres.tf_mirna}
def get_args_mirna_mrna(self):
"""
Returns the arguments for building the miRNA-mRNA network dataset.
"""
return {'resources': netres.mirnatarget}
def get_args_lncrna_mrna(self):
"""
Returns the arguments for building the lncRNA-mRNA network dataset.
"""
return {'resources': netres.lncrna_mrna}
def get_args_small_molecule(self):
"""
Returns the arguments for building the small molecule-protein
network dataset.
"""
return {'resources': netres.small_molecule}
def compile_tables(self):
"""
Compiles the `summaries` table for all datasets. These tables contain
various quantitative descriptions of the data contents.
"""
self.foreach_dataset(method = self.compile_table)
def compile_table(self, dataset):
"""
Compiles the `summaries` table for a dataset. These tables contain
various quantitative descriptions of the data contents.
"""
table_path = self.table_path(dataset)
db = self.get_db(dataset)
db.update_summaries()
db.summaries_tab(outfile = table_path)
def foreach_dataset(self, method):
"""
Applies a method for each dataset.
"""
for dataset in self.datasets:
_ = method(dataset)
def get_db(self, dataset, ncbi_tax_id = 9606):
"""
Returns a dataset object. Loads and builds the dataset if necessary.
:arg int ncbi_tax_id:
NCBI Taxonomy ID of the organism. Considered only if the dataset
builds for one organism and saved to organism specific pickle
files.
"""
self.ensure_dataset(dataset, ncbi_tax_id = ncbi_tax_id)
_dataset = self._dataset_taxid(dataset, ncbi_tax_id = ncbi_tax_id)
return getattr(self, _dataset)
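# For organism specific datasets (currently only `enz_sub`), the organism can
# be selected by taxon, e.g. (illustrative):
#
#     es_mouse = dbm.get_db('enz_sub', ncbi_tax_id = 10090)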
def remove_db(self, dataset, ncbi_tax_id = 9606):
"""
Removes a dataset. Deletes the reference to the object held by this
manager; however, if you keep references elsewhere in your code, the
object remains in memory.
"""
_dataset = self._dataset_taxid(dataset, ncbi_tax_id = ncbi_tax_id)
if hasattr(self, _dataset):
delattr(self, _dataset)
def remove_all(self):
"""
Removes all loaded datasets. Deletes the references to the objects held
by this manager; however, if you keep references elsewhere in your code,
the objects remain in memory.
"""
self.foreach_dataset(method = self.ensure_module)
self.foreach_dataset(method = self.remove_db)
def get_param(self, key):
"""
Retrieves a parameter from the :py:attr:`param` dict of the current
object or from the module settings.
"""
return self.param.get(key, settings.get(key))
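# E.g. a value passed at instantiation takes precedence over the module level
# settings (illustrative):
#
#     dbm = DatabaseManager(pickle_dir = '/tmp/pickles')
#     dbm.get_param('pickle_dir')   # -> '/tmp/pickles'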
def _create_network_df(self, dataset = 'omnipath', **kwargs):
graph = self.get_db(dataset)
return self._network_df(graph, **kwargs)
def network_df(self, dataset, by_source = False):
"""
Returns the data frame of a network dataset. By default each row
aggregates information from all resources describing an interaction;
with ``by_source = True`` each row corresponds to one resource.
"""
self.ensure_dataset(dataset)
by_source_str = 'by_source' if by_source else 'plain'
return self.network_dfs[dataset][by_source_str]
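# Example (illustrative): one aggregated row per interaction vs. one row per
# interaction-resource pair:
#
#     df_plain = dbm.network_df('omnipath')
#     df_by_res = dbm.network_df('omnipath', by_source = True)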
def network_df_by_source(self, dataset = 'omnipath'):
"""
Returns the data frame of a network dataset where each row contains
information from one resource.
"""
self.ensure_dataset(dataset)
return self.network_dfs[dataset]['by_source']
def _network_df(self, obj, **kwargs):
if not isinstance(obj, network.Network):
obj = network.Network.from_igraph(obj)
obj.make_df(**kwargs)
return obj.df
def _add_network_df(self, dataset, ncbi_tax_id = 9606):
_dataset = self._dataset_taxid(dataset, ncbi_tax_id = ncbi_tax_id)
obj = getattr(self, _dataset)
if (
(
hasattr(obj, 'graph') and
hasattr(obj.graph, 'es')
) or
isinstance(obj, network.Network)
):
network_df = self._network_df(obj, by_source = False)
network_df_by_source = self._network_df(obj, by_source = True)
self.network_dfs[dataset] = {}
self.network_dfs[dataset]['plain'] = network_df
self.network_dfs[dataset]['by_source'] = network_df_by_source
self._log('Created network data frames for `%s`.' % dataset)
def set_network(self, dataset, by_source = False, **kwargs):
"""
Sets a network dataset as the default: builds its data frame and
registers it in the `intercell` database.
"""
network_df = self.network_df(dataset, by_source = by_source, **kwargs)
self.ensure_dataset('intercell')
self.intercell.register_network(network_df)
def define_dataset(
self,
name: str,
module: Literal[
'annot',
'complex',
'enz_sub',
'intercell',
'network',
],
args: dict | None = None,
pickle: str | None = None,
**param,
):
"""
Add a new dataset definition.
Args
name:
Arbitrary name for the dataset.
module:
A database builder module: this determines the type of the
dataset.
args:
Arguments for the database provider method (typically
called ``get_db``) of the above module.
pickle:
A name for the pickle file; if not provided, it will be named
"<name>_<module>.pickle".
param:
Further parameters, saved directly into the :attr:`param` dict of
this object; however, the three arguments above override values
provided this way.
"""
settings.setup(datasets = settings.get('datasets') + [name])
param[f'{name}_pickle'] = pickle or f'{name}_{module}.pickle'
param[f'{name}_mod'] = module
param[f'{name}_args'] = args
self.param.update(param)
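# A hypothetical example of registering and building a custom dataset (the
# name, resources and pickle file name are made up for illustration):
#
#     dbm.define_dataset(
#         name = 'my_ppi',
#         module = 'network',
#         args = {'resources': copy.deepcopy(netres.pathway)},
#         pickle = 'my_ppi_network.pickle',
#     )
#     dbm.ensure_dataset('my_ppi')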