#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# This file is part of the `pypath` python module
#
# Copyright 2014-2023
# EMBL, EMBL-EBI, Uniklinik RWTH Aachen, Heidelberg University
#
# Authors: see the file `README.rst`
# Contact: Dénes Türei (turei.denes@gmail.com)
#
# Distributed under the GPLv3 License.
# See accompanying file LICENSE.txt or copy at
# https://www.gnu.org/licenses/gpl-3.0.html
#
# Website: https://pypath.omnipathdb.org/
#
from __future__ import annotations
from future.utils import iteritems
from past.builtins import xrange, range, reduce
import os
import sys
import copy
import importlib as imp
import collections
import itertools
import traceback
import dill as pickle
import numpy as np
import pandas as pd
import pypath.inputs.cellphonedb as cellphonedb
import pypath.inputs.lrdb as lrdb
import pypath.inputs.uniprot_db as uniprot_db
import pypath.share.common as common
import pypath_common._constants as _const
import pypath.share.settings as settings
import pypath.utils.mapping as mapping
import pypath.utils.reflists as reflists
import pypath.utils.uniprot as utils_uniprot
import pypath.internals.resource as resource
import pypath.utils.go as go
import pypath.core.intercell_annot as intercell_annot
import pypath.core.common as core_common
import pypath.share.session as session_mod
import pypath.internals.annot_formats as annot_formats
import pypath.core.complex as complex
import pypath.internals.intera as intera
import pypath.core.entity as entity
#TODO this should be part of json files
# Labels of the annotation resources providing protein annotations;
# these are loaded by default when building the annotation database.
protein_sources_default = {
    'Dgidb',
    'Membranome',
    'Exocarta',
    'Vesiclepedia',
    'Matrisome',
    'Surfaceome',
    'CellSurfaceProteinAtlas',
    'CellSurfaceProteinAtlasCellType',
    'HumanPlasmaMembraneReceptome',
    'Matrixdb',
    'Locate',
    'GOIntercell',
    'CellPhoneDB',
    'Ramilowski2015',
    'Ramilowski2015Location',
    'Kirouac2010',
    'GuideToPharmacology',
    'Adhesome',
    'Integrins',
    'Opm',
    'Topdb',
    'Hgnc',
    'Zhong2015',
    'HumanProteinAtlas',
    'HumanProteinAtlasSubcellular',
    'HumanProteinAtlasSecretome',
    'Comppi',
    'SignorPathways',
    'SignalinkPathways',
    'SignalinkFunctions',
    'KeggPathways',
    'KeggPathwaysPC',
    'NetpathPathways',
    'Cpad',
    'Disgenet',
    'Kinasedotcom',
    'Phosphatome',
    'Tfcensus',
    'Intogen',
    'CancerGeneCensus',
    'Cancersea',
    'Msigdb',
    'Lrdb',
    'Baccin2019',
    'Almen2009',
    'Phobius',
    'Icellnet',
    'Cellcellinteractions',
    'Italk',
    'Embrace',
    'UniprotLocations',
    'UniprotFamilies',
    'UniprotTopologies',
    'UniprotTissues',
    'UniprotKeywords',
    'Tcdb',
    'Mcam',
    'Gpcrdb',
    'Celltalkdb',
    'Cellchatdb',
    'Connectomedb',
    'Talklr',
    'Humancellmap',
    'Cellcall',
    #'Biogps',
    'Cellinker',
    'Scconnect',
    'Cancerdrugsdb',
    'Progeny',
    'Celltypist',
    'Cytosig',
    'Wang',
    'Panglaodb',
    'Lambert2018',
    'InterPro',
}
#TODO this should be part of json files
# Labels of the annotation resources providing protein complex annotations.
complex_sources_default = {
    'CellPhoneDBComplex',
    'CorumFuncat',
    'CorumGO',
    'IcellnetComplex',
    'CellchatdbComplex',
    'CellinkerComplex',
    'ScconnectComplex',
}
#TODO this should be part of json files
# For certain resources: the attributes (fields) to be used by default
# when processing their annotation records.
default_fields = {
    'Matrisome': ('mainclass', 'subclass'),
    'Locate': ('location',),
    'Vesiclepedia': ('vesicle',),
    'Exocarta': ('vesicle',),
    'Ramilowski_location': ('location',),
    'HPA': ('tissue', 'level'),
    'CellPhoneDB': (
        'receptor',
        'adhesion',
        'cytoplasm',
        'peripheral',
        'secretion',
        'secreted',
        'transporter',
        'transmembrane',
        'extracellular',
    ),
    'CellPhoneDB_Complex': (
        'receptor',
        'adhesion',
        'cytoplasm',
        'peripheral',
        'secretion',
        'secreted',
        'transporter',
        'transmembrane',
        'extracellular',
    ),
    'Cpad': (
        'cancer',
        'effect_on_cancer',
    ),
    'Disgenet': (
        'disease',
    ),
}
[docs]
class CustomAnnotation(session_mod.Logger):
[docs]
    def __init__(
            self,
            class_definitions = None,
            excludes = None,
            excludes_extra = None,
            build = True,
            pickle_file = None,
            annotdb_pickle_file = None,
            composite_resource_name = None,
        ):
        """
        :param tuple class_definitions:
            A series of annotation class definitions, each represented by
            an instance of ``pypath.internals.annot_formats.AnnotDef``.
            These definitions carry the attributes and instructions to
            populate the classes.
        :param dict excludes:
            A dict with parent category names (strings) or category keys
            (tuples) as keys and sets of identifiers as values.
            The identifiers in this dict will be excluded from all the
            respective categories while building the database. E.g. if
            the UniProt ID `P00533` (EGFR) is in the set under the key of
            `adhesion` it will be excluded from the category `adhesion` and
            all its direct children.
        :param dict excludes_extra:
            Same kind of dict as `excludes` but it will be added to the
            built-in default. The built in and the provided extra sets
            will be merged. If you want to overwrite or modify the built-in
            sets provide your custom dict as `excludes`.
        :param bool build:
            Execute the build upon instantiation or set up an empty object
            the build can be executed on later.
        :param str pickle_file:
            Load the database from this pickle dump instead of building it.
        :param str annotdb_pickle_file:
            Pickle dump for the underlying annotation database
            (passed to ``get_db``).
        :param str composite_resource_name:
            Name of the composite database; by default taken from the
            ``annot_composite_database_name`` setting.
        """
        # avoid re-initializing the logger in case of multiple inheritance
        if not hasattr(self, '_log_name'):
            session_mod.Logger.__init__(self, name = 'annot')
        self.pickle_file = pickle_file
        self.annotdb_pickle_file = annotdb_pickle_file
        self._class_definitions_provided = class_definitions
        self._excludes_original = excludes or {}
        self._excludes_extra_original = excludes_extra or {}
        self.network = None
        self.classes = {}
        self.consensus_scores = {}
        self.composite_numof_resources = {}
        self.composite_resource_name = (
            composite_resource_name or
            settings.get('annot_composite_database_name')
        )
        if build:
            self.load()
[docs]
    def reload(self):
        """
        Reloads the object from the module level.
        """
        imp.reload(core_common)
        modname = self.__class__.__module__
        mod = __import__(modname, fromlist = [modname.split('.')[0]])
        imp.reload(mod)
        new = getattr(mod, self.__class__.__name__)
        # rebind this instance to the freshly loaded class
        setattr(self, '__class__', new)
        # also rebind the key and group objects held in `classes`
        # to the reloaded `annot_formats` classes
        imp.reload(annot_formats)
        new_annotkey = annot_formats.AnnotDefKey
        new_annotgroup = annot_formats.AnnotationGroup
        for key, cls in iteritems(self.classes):
            key.__class__ = new_annotkey
            cls.__class__ = new_annotgroup
    def load(self):
        """
        Loads the database: from a pickle dump if one is available,
        otherwise by building it from the class definitions.
        """
        if self.pickle_file and os.path.exists(self.pickle_file):
            self.load_from_pickle(pickle_file = self.pickle_file)
        else:
            self.pre_build()
            self.build()
        self.post_load()

    def pre_build(self):
        """
        Tasks to be done before building the database.
        """
        self.update_excludes()

    def build(self):
        """
        Builds the database: processes the class definitions and
        populates the annotation classes.
        """
        self.ensure_annotdb()
        self._class_definitions = {}
        self.add_class_definitions(self._class_definitions_provided or {})
        self.classes = {}
        self.populate_classes()

    def post_load(self):
        """
        Hook executed after loading; does nothing here, subclasses
        may override it.
        """
        pass
def update_excludes(self):
self._excludes = collections.defaultdict(set)
for label, group in itertools.chain(
iteritems(self._excludes_original),
iteritems(self._excludes_extra_original),
):
self._excludes[label].update(group)
self._excludes = dict(self._excludes)
    def ensure_annotdb(self):
        """
        Makes sure the annotation database is available as the
        ``annotdb`` attribute (``get_db`` is the module level loader).
        """
        self.annotdb = get_db(pickle_file = self.annotdb_pickle_file)
def add_class_definitions(self, class_definitions):
class_definitions = copy.deepcopy(class_definitions)
if not isinstance(class_definitions, dict):
class_definitions = dict(
(
classdef.key,
classdef
) for classdef in class_definitions
)
self._class_definitions.update(class_definitions)
self.update_parents()
[docs]
    def update_parents(self):
        """
        Creates a dict :py:attr:``children`` with parent class names as keys
        and sets of children class keys as values. Also a dict
        :py:attr:``parents`` with children class keys as keys and parent
        class keys as values.
        """
        children = collections.defaultdict(set)
        parents = collections.defaultdict(set)
        collect_parents = collections.defaultdict(set)
        # collecting the potential parents: only classes belonging to
        # the composite database can act as parents
        for key, classdef in iteritems(self._class_definitions):
            if classdef.source == 'composite':
                collect_parents[classdef.name].add(key)
        # assigning children to parents
        for key, classdef in iteritems(self._class_definitions):
            # key is (name, parent, resource)
            parent = key[1]
            if parent in collect_parents:
                for parent_key in collect_parents[parent]:
                    children[parent_key].add(key)
                    # register the parent under several alternative keys
                    # (full key, name, name+parent, name+resource,
                    # resource) so partial-key lookups also succeed
                    parents[key].add(parent_key)
                    parents[key[0]].add(parent_key)
                    parents[(key[0], key[1])].add(parent_key)
                    parents[(key[0], key[2])].add(parent_key)
                    parents[key[2]].add(parent_key)
        self.children = dict(children)
        self.parents = dict(parents)
[docs]
    def populate_classes(self, update = False):
        """
        Creates a classification of proteins according to the custom
        annotation definitions.

        :param bool update:
            Process also the definitions whose classes are already present.
        """
        if self.pickle_file:
            # with a pickle dump available the classes are simply restored
            self.load_from_pickle(pickle_file = self.pickle_file)
            return
        for classdef in self._class_definitions.values():
            if classdef.key not in self.classes or update:
                self.create_class(classdef)
        self.populate_scores()
[docs]
    def populate_scores(self):
        """
        Creates the consensus score dictionaries based on the number of
        resources annotating an entity for each composite category.
        """
        for classdef in self._class_definitions.values():
            if classdef.source != 'composite':
                continue
            # collect the component (per resource) sets without executing
            # the set operation, so each component can be counted
            components = self._execute_operation(
                classdef.resource,
                execute = False,
                only_generic = True,
            )
            name = classdef.name
            n_resources = len(components)
            # for each entity: the number of components (resources)
            # containing it
            n_resources_by_entity = dict(
                collections.Counter(
                    itertools.chain(*components)
                )
            )
            self.composite_numof_resources[name] = n_resources
            self.consensus_scores[name] = n_resources_by_entity
    def load_from_pickle(self, pickle_file):
        """
        Restores the state of this object from a pickle dump created
        by ``save_to_pickle``.

        :param str pickle_file:
            Path to the pickle dump.
        """
        self._log('Loading from pickle `%s`.' % pickle_file)
        with open(pickle_file, 'rb') as fp:
            # the tuple layout must match the one in ``save_to_pickle``
            (
                self.classes,
                self.consensus_scores,
                self.composite_numof_resources,
                self.parents,
                self.children,
                self.composite_resource_name,
                self._class_definitions,
                self._excludes,
            ) = pickle.load(fp)
        self._update_complex_attribute_classes()
        self._log('Loaded from pickle `%s`.' % pickle_file)
    def save_to_pickle(self, pickle_file):
        """
        Dumps the state of this object into a pickle file which later
        can be restored by ``load_from_pickle``.

        :param str pickle_file:
            Path for the pickle dump.
        """
        self._log('Saving to pickle `%s`.' % pickle_file)
        self._update_complex_attribute_classes()
        with open(pickle_file, 'wb') as fp:
            # the tuple layout must match the one in ``load_from_pickle``
            pickle.dump(
                obj = (
                    self.classes,
                    self.consensus_scores,
                    self.composite_numof_resources,
                    self.parents,
                    self.children,
                    self.composite_resource_name,
                    self._class_definitions,
                    self._excludes,
                ),
                file = fp,
                protocol = pickle.HIGHEST_PROTOCOL,
            )
        self._log('Saved to pickle `%s`.' % pickle_file)
    def _update_complex_attribute_classes(self):
        # presumably rebinds the classes of complex objects contained in
        # the annotation groups to the ones in this module, which matters
        # around pickling/unpickling — see
        # complex.ComplexAggregator._update_complex_attribute_classes_static
        # (TODO confirm, implementation not visible here)
        complex.ComplexAggregator._update_complex_attribute_classes_static(
            self.classes.keys(),
            mod = sys.modules[__name__],
        )
[docs]
def create_class(self, classdef, override = False):
"""
Creates a category of entities by processing a custom definition.
"""
if classdef.enabled or override:
self.classes[classdef.key] = self.process_annot(classdef)
[docs]
    def process_annot(self, classdef):
        """
        Processes an annotation definition and returns a set of identifiers.

        The definition's ``resource`` attribute can be a ready set of
        identifiers, the label of an annotation resource, a shorthand
        string (`[#name]~parent[~resource]`), a callable producing the
        members, or an ``annot_formats.AnnotOp`` set operation.
        """
        members = set()
        if not classdef.enabled:
            return members
        self._log(
            'Processing custom annotation definition '
            '`%s` (parent: `%s`, resource: `%s`).' % classdef.key
        )
        if isinstance(classdef.resource, set):
            # already a plain set of identifiers
            members = classdef.resource
        elif isinstance(classdef.resource, str):
            if classdef.resource in self.annotdb.annots:
                if not classdef.args:
                    members = (
                        self.annotdb.annots[classdef.resource].to_set()
                    )
                else:
                    # select only the entities matching the
                    # field criteria in `args`
                    members = (
                        self.annotdb.annots[classdef.resource].select(
                            **classdef.args
                        )
                    )
                # Automatically include direct complex annotations
                cplex_resource = '%s_complex' % classdef.resource
                if cplex_resource in self.annotdb.annots:
                    classdef_args = classdef._asdict()
                    classdef_args['resource'] = cplex_resource
                    cplex_classdef = annot_formats.AnnotDef(**classdef_args)
                    members.update(
                        self.process_annot(cplex_classdef)
                    )
            elif (
                classdef.resource.startswith('~') or
                classdef.resource.startswith('#')
            ):
                # shorthand notation: union of the classes under a parent
                members = self._execute_operation(
                    annot_formats.AnnotOp(annots = classdef.resource)
                )
            else:
                self._log('Resource not found: %s' % classdef.resource)
        elif callable(classdef.resource):
            # a custom function provides the members
            members = classdef.resource(**(classdef.args or {}))
        elif isinstance(classdef.resource, annot_formats.AnnotOp):
            members = self._execute_operation(classdef.resource)
        # remove the members of the classes in `avoid`
        for avoid in classdef.avoid:
            op = annot_formats.AnnotOp(
                annots = (
                    members,
                    self.select(avoid)
                ),
                op = set.difference,
            )
            members = self._execute_operation(op)
        # constrain the members to the classes in `limit`
        for limit in classdef.limit:
            op = annot_formats.AnnotOp(
                annots = (
                    members,
                    self.select(limit)
                ),
                op = set.intersection,
            )
            members = self._execute_operation(op)
        # explicit exclusions: from the definition itself and from the
        # exclusion sets of this database (by parent name or by class key)
        if classdef.exclude:
            members = members - classdef.exclude
        if classdef.parent in self._excludes:
            members = members - self._excludes[classdef.parent]
        if classdef.key in self._excludes:
            members = members - self._excludes[classdef.key]
        transmitter, receiver = self._get_transmitter_receiver(classdef)
        self._log(
            'Finished processing custom annotation definition '
            '`%s` (parent: `%s`, resource: `%s`). Resulted a set of %u '
            'entities.' % (classdef.key + (len(members),))
        )
        return annot_formats.AnnotationGroup(
            members = members,
            name = classdef.name,
            parent = classdef.parent,
            aspect = classdef.aspect,
            resource = classdef.resource_name, # the actual database name
            scope = classdef.scope,
            source = classdef.source, # resource_specific / composite
            transmitter = transmitter,
            receiver = receiver,
        )
    def _execute_operation(self, annotop, execute = True, **kwargs):
        """
        Executes a set operation on annotation sets.

        :param annotop:
            An ``annot_formats.AnnotOp`` or a shorthand string
            (`[#name]~parent[~resource]`).
        :param bool execute:
            Apply the set operation; if False, return the tuple of the
            component sets instead.
        """
        if self._is_short_notation(annotop):
            # a shorthand string means the union of the classes
            # under the given parent
            annots = self._collect_by_parent(annotop, **kwargs)
            op = set.union
        elif self._is_short_notation(annotop.annots):
            annots = self._collect_by_parent(annotop.annots, **kwargs)
            op = annotop.op
        else:
            annots = tuple(
                self.select(_annot, execute = execute, **kwargs)
                for _annot in annotop.annots
                if (
                    not hasattr(_annot, 'enabled') or
                    _annot.enabled
                )
            )
            # flatten: `select` might return tuples of sets
            annots = tuple(itertools.chain(*(
                (a,) if isinstance(a, set) else a
                for a in annots
            )))
            op = annotop.op
        if execute:
            annots = op(*(
                a if isinstance(a, set) else set(a)
                for a in annots
            ))
        return annots
    def _collect_by_parent(self, parent, only_generic = False):
        """
        Processes the shorthand (single string) notation
        `[#name]~parent[~resource]`.
        Returns tuple of sets.

        :param bool only_generic:
            Collect only the classes with `generic` scope.
        """
        # NOTE(review): the `name` extracted from the shorthand is
        # currently unused here — confirm this is intended
        name, parent, resource = self._process_short_notation(parent)
        return tuple(
            self.select(classdef.key)
            for classdef in self._class_definitions.values()
            if (
                classdef.parent == parent and
                (
                    not resource or
                    classdef.resource_name == resource
                ) and
                classdef.enabled and
                # skip the class representing the parent itself
                # (composite class with the same name as the parent)
                not (
                    classdef.name == classdef.parent and (
                        classdef.source == 'composite' or
                        classdef.resource_name == resource
                    )
                ) and
                (
                    not only_generic or
                    classdef.scope == 'generic'
                )
            )
        )
@staticmethod
def _process_short_notation(shortdef):
"""
Extracts name, parent and resource froms the shorthand (single string)
notation `[#name]~parent[~resource]`.
"""
parent = shortdef
name = None
resource = None
if parent.startswith('#'):
name, parent = parent.split('~', maxsplit = 1)
name = name.strip('#')
parent = parent.strip('~')
parent_resource = parent.split('~')
if len(parent_resource) == 2:
parent, resource = parent_resource
return name, parent, resource
@staticmethod
def _is_short_notation(obj):
return (
isinstance(obj, str) and (
obj.startswith('~') or
obj.startswith('#')
)
)
    def _get_transmitter_receiver(self, classdef):
        """
        Returns the ``transmitter`` and ``receiver`` attributes for a
        class definition; any of them left undefined (None) is inherited
        from the matching composite parent class, if one exists.
        """
        transmitter = classdef.transmitter
        receiver = classdef.receiver
        if transmitter is None or receiver is None:
            name, parent, resource = classdef.key
            for key, parentdef in iteritems(self._class_definitions):
                if (
                    parentdef.name == parent and
                    (
                        parentdef.source == 'composite' or
                        parentdef.resource == self.composite_resource_name
                    )
                ):
                    # fill in only the missing values from the parent
                    transmitter = (
                        transmitter
                        if transmitter is not None else
                        parentdef.transmitter
                    )
                    receiver = (
                        receiver
                        if receiver is not None else
                        parentdef.receiver
                    )
                    break
        return transmitter, receiver
    def _select(
            self,
            name,
            parent = None,
            resource = None,
            entity_type = None,
            execute = True,
            **kwargs
        ):
        """
        Retrieves a class by its name and loads it if hasn't been loaded yet
        but the name present in the class definitions.
        """
        selected = None
        if self._is_short_notation(name):
            annots = self._collect_by_parent(name, **kwargs)
            annots = tuple(
                a if isinstance(a, set) else set(a)
                for a in annots
            )
            # NOTE(review): `set.union(*annots)` raises TypeError if
            # `annots` is empty — confirm shorthand selections always
            # match at least one class
            selected = set.union(*annots) if execute else annots
        else:
            # normalize the arguments into a full class key
            if isinstance(name, tuple):
                name, parent, resource = name
            if not parent or not resource:
                if not parent:
                    parent = self.get_parent(name = name, resource = resource)
                    parent = parent.name if parent else None
                if not resource:
                    resource = self.get_resource(name = name, parent = parent)
            key = annot_formats.AnnotDefKey(name, parent, resource)
            # lazily create the class if only its definition exists
            if key not in self.classes and key in self._class_definitions:
                self.create_class(self._class_definitions[key])
            if key in self.classes:
                selected = self.classes[key]
        if selected is not None:
            return self._filter_entity_type(
                selected,
                entity_type = entity_type,
            )
        # falls through (returns None) after logging the failed lookup
        self._log(
            'No such annotation class: `name=%s, '
            'parent=%s, resource=%s`' % key
        )
[docs]
    def select(
            self,
            definition,
            parent = None,
            resource = None,
            entity_type = None,
            **kwargs
        ):
        """
        Retrieves a class by its name or definition. The definition can be
        a class name (string) or a set of entities, or an AnnotDef object
        defining the contents based on original resources or an AnnotOp
        which defines the contents as an operation over other definitions.
        """
        selected = (
            # an AnnotOp: execute the set operation
            self._execute_operation(definition)
            if isinstance(definition, annot_formats.AnnotOp) else
            # an AnnotDef: process the definition
            self.process_annot(definition)
            if isinstance(definition, annot_formats.AnnotDef) else
            # already a set of entities: pass through
            definition
            if isinstance(definition, annot_formats._set_type) else
            # a key tuple/list: positional arguments of `_select`
            self._select(*definition)
            if isinstance(definition, (tuple, list)) else
            # a dict: keyword arguments of `_select`
            self._select(**definition)
            if isinstance(definition, dict) else
            # a plain name or shorthand string
            self._select(
                definition,
                parent = parent,
                resource = resource,
                **kwargs
            )
        )
        return self._filter_entity_type(selected, entity_type = entity_type)

    # synonym for old name
    get_class = select
[docs]
def labels(
self,
name,
parent = None,
resource = None,
entity_type = None,
):
"""
Same as ``select`` but returns a list of labels (more human readable).
"""
return mapping.label(
self.select(
name = name,
parent = parent,
resource = resource,
entity_type = entity_type,
)
)
[docs]
def show(
self,
name,
parent = None,
resource = None,
**kwargs
):
"""
Same as ``select`` but prints a table to the console with basic
information from the UniProt datasheets.
"""
utils_uniprot.info(
*self.select(
definition = name,
parent = parent,
resource = resource,
entity_type = 'protein',
),
**kwargs
)
[docs]
    def quality_check_table(
            self,
            path = None,
            fmt = 'tsv',
            only_swissprot = True,
            top = None,
            **kwargs
        ):
        """
        Exports a table in tsv format for quality check and browsing purposes.
        Each protein represented in one row of this table with basic data
        from UniProt and the list of annotation categories from this
        database.

        :param str path:
            Path for the exported file.
        :param str fmt:
            Format: either `tsv` or `latex`.
        :param bool only_swissprot:
            Restrict the table to SwissProt identifiers.
        :param int top:
            Keep only the first `top` proteins (ordered by gene symbol).
        """
        # NOTE(review): `features` is read but not removed from `kwargs`,
        # hence it is also forwarded to the table writers below — confirm
        # this is intended
        features = kwargs['features'] if 'features' in kwargs else ()
        proteins = list(self.get_proteins())
        if only_swissprot:
            proteins = reflists.select(proteins, 'swissprot')
        genesymbols = [mapping.label(uniprot) for uniprot in proteins]
        # order the proteins by their gene symbols, keep the top ones
        proteins = [
            uniprot
            for uniprot, genesymbol in
            sorted(
                (
                    (uniprot, genesymbol)
                    for uniprot, genesymbol in
                    zip(proteins, genesymbols)
                ),
                key = lambda it: it[1],
            )
        ][:top]
        tbl = utils_uniprot.collect(proteins, *features)
        # categories belonging to the composite database
        tbl['intercell_composite'] = [
            ', '.join(
                cls
                for cls in self.classes_by_entity(uniprot, labels = True)
                if cls.endswith(self.composite_resource_name)
            )
            for uniprot in proteins
        ]
        # categories from the original resources
        tbl['intercell_all'] = [
            ', '.join(
                cls
                for cls in self.classes_by_entity(uniprot, labels = True)
                if not cls.endswith(self.composite_resource_name)
            )
            for uniprot in proteins
        ]
        if fmt == 'tsv':
            result = common.tsv_table(tbl = tbl, path = path, **kwargs)
        elif 'tex' in fmt:
            if 'colformat' not in kwargs:
                kwargs['colformat'] = r'rllrrK{25mm}LK{20mm}K{20mm}K{25mm}L'
            result = common.latex_table(tbl = tbl, path = path, **kwargs)
        else:
            result = tbl
        return result
def _key(self, name, parent = None, resource = None):
return name if isinstance(name, tuple) else (name, parent, resource)
def get_class_scope(self, name, parent = None, resource = None):
key = self._key(name, parent, resource)
return self.classes[key].scope
def get_resource(self, name, parent = None, resource = None):
key = self._key(name, parent, resource)
return self.classes[key].resource
def get_aspect(self, name, parent = None, resource = None):
key = self._key(name, parent, resource)
return self.classes[key].aspect
def get_source(self, name, parent = None, resource = None):
key = self._key(name, parent, resource)
return self.classes[key].source
[docs]
def get_parents(self, name, parent = None, resource = None):
"""
As names should be unique for resources, a combination of a name and
resource determines the parent category. This method looks up the
parent for a pair of name and resource.
"""
parent = parent or name
keys = (
(name, parent, resource),
(name, name, resource),
(name, resource),
(name, parent),
(parent, resource),
)
for key in keys:
if key in self.parents:
return self.parents[key]
def get_parent(self, name, parent = None, resource = None):
parents = self.get_parents(
name = name,
parent = parent,
resource = resource,
)
return (
sorted(parents, key = lambda par: par[0])[0]
if parents else
None
)
[docs]
def get_resources(self, name, parent = None):
"""
Returns a set with the names of all resources defining a category
with the given name and parent.
"""
parent = parent or name
return {
key[2]
for key in self._class_definitions.keys()
if key[0] == name and key[1] == parent
}
def consensus_score(self, name, entity):
if name in self.consensus_scores:
if entity in self.consensus_scores[name]:
return self.consensus_scores[name][entity]
return 0
    def consensus_score_normalized(self, name, entity):
        """
        Consensus score divided by the total number of resources in the
        composite category `name`.
        """
        score = self.consensus_score(name, entity)
        if not np.isnan(score):
            # NOTE(review): raises KeyError if `name` is not present in
            # `composite_numof_resources` — confirm callers only pass
            # composite category names
            n_resources = self.composite_numof_resources[name]
            score = score / float(n_resources)
        return score
[docs]
    def get_resource(self, name, parent = None):
        """
        For a category name and its parent returns a single resource name.
        If a category belonging to the composite database matches the name
        and the parent the name of the composite database will be returned,
        otherwise the resource name first in alphabetic order.

        Note: this definition shadows the earlier ``get_resource`` with
        the `(name, parent, resource)` signature.
        """
        resources = self.get_resources(name = name, parent = parent)
        return (
            self.composite_resource_name
            if self.composite_resource_name in resources else
            sorted(resources)[0]
            if resources else
            None
        )
def get_class_label(self, name, parent = None, resource = None):
cls = self.select(name, parent = parent, resource = resource)
return cls.label
def __len__(self):
return len(self.classes)
def __contains__(self, other):
return (
other in self.classes or
any(other in v for v in self.classes.values)
)
    @staticmethod
    def sets(*args):
        """Delegates to ``annot_formats.AnnotationGroup.sets``."""
        return annot_formats.AnnotationGroup.sets(*args)

    @staticmethod
    def union(*args):
        """Union of classes; see ``AnnotationGroup.union``."""
        return annot_formats.AnnotationGroup.union(*args)

    @staticmethod
    def intersection(*args):
        """Intersection of classes; see ``AnnotationGroup.intersection``."""
        return annot_formats.AnnotationGroup.intersection(*args)

    @staticmethod
    def difference(*args):
        """Difference of classes; see ``AnnotationGroup.difference``."""
        return annot_formats.AnnotationGroup.difference(*args)

    @staticmethod
    def symmetric_difference(*args):
        """See ``AnnotationGroup.symmetric_difference``."""
        return annot_formats.AnnotationGroup.symmetric_difference(*args)

    @staticmethod
    def isdisjoint(*args):
        """Disjointness test; see ``AnnotationGroup.isdisjoint``."""
        return annot_formats.AnnotationGroup.isdisjoint(*args)

    @staticmethod
    def _filter_entity_type(group, entity_type):
        """
        Restricts `group` to one entity type, using the group's own
        ``filter_entity_type`` method if available, otherwise the generic
        one from ``entity.Entity``.
        """
        if hasattr(group, 'filter_entity_type'):
            group = group.filter_entity_type(entity_type = entity_type)
        else:
            group = entity.Entity.filter_entity_type(
                group,
                entity_type = entity_type,
            )
        return group
[docs]
    def make_df(self, all_annotations = False, full_name = False):
        """
        Creates a ``pandas.DataFrame`` where each record assigns a
        molecular entity to an annotation category. The data frame will
        be assigned to the ``df`` attribute.

        :param bool all_annotations:
            Add a column with all annotations of each entity as provided
            by ``annotdb.all_annotations_str``.
        :param bool full_name:
            Add a column with the full protein name of each entity.
        """
        self._log('Creating data frame from custom annotation.')
        header = [
            'category',
            'parent',
            'database',
            'scope',
            'aspect',
            'source',
            'uniprot',
            'genesymbol',
            'entity_type',
            'consensus_score',
        ]
        dtypes = {
            'category': 'category',
            'parent': 'category',
            'database': 'category',
            'scope': 'category',
            'aspect': 'category',
            'source': 'category',
            'uniprot': 'category',
            'genesymbol': 'category',
            'entity_type': 'category',
            'consensus_score': 'uint16',
        }
        if full_name:
            # inserted before `consensus_score`
            header.insert(-1, 'full_name')
            dtypes['full_name'] = 'category'
        self.df = pd.DataFrame(
            [
                # annotation category, entity id
                [
                    annotgroup.name,
                    annotgroup.parent,
                    annotgroup.resource,
                    annotgroup.scope,
                    annotgroup.aspect,
                    annotgroup.source,
                    uniprot.__str__(),
                    # gene symbol: mapped for plain UniProt IDs,
                    # derived from the members for complexes
                    (
                        mapping.map_name0(uniprot, 'uniprot', 'genesymbol')
                        if isinstance(uniprot, str) else
                        'COMPLEX:%s' % uniprot.genesymbol_str
                        if hasattr(uniprot, 'genesymbol_str') else
                        uniprot.__str__()
                    ),
                ] +
                # full name
                (
                    [
                        '; '.join(
                            mapping.map_name(
                                uniprot,
                                'uniprot',
                                'protein-name',
                            )
                        ),
                    ]
                    if full_name else []
                ) +
                # entity type and consensus score
                [
                    (
                        'complex'
                        if hasattr(uniprot, 'genesymbol_str') else
                        'mirna'
                        if uniprot.startswith('MIMAT') else
                        'protein'
                    ),
                    self.consensus_score(
                        annotgroup.name,
                        uniprot,
                    ),
                ] +
                # all annotations
                (
                    [self.annotdb.all_annotations_str(uniprot)]
                    if all_annotations else
                    []
                )
                for key, annotgroup in iteritems(self.classes)
                for uniprot in annotgroup
            ],
            columns = header + (
                ['all_annotations'] if all_annotations else []
            ),
        ).astype(dtypes)
        self._log(
            'Custom annotation data frame has been created. '
            'Memory usage: %s.' % common.df_memory_usage(self.df)
        )
[docs]
def get_df(self):
"""
Returns the data frame of custom annotations. If it does not exist yet
builds the data frame.
"""
if not hasattr(self, 'df'):
self.make_df()
return self.df
[docs]
    def counts(self, entity_type = 'protein', labels = True, **kwargs):
        """
        Returns a dict with number of elements in each class.

        :param str entity_type:
            Count only the entities of this type.
        :param bool labels:
            Use keys or labels as keys in the returned dict.

        All other arguments passed to ``iter_classes``.
        """
        return dict(
            (
                cls.label if labels else cls.key,
                cls.count_entity_type(entity_type = entity_type)
            )
            for cls in self.iter_classes(**kwargs)
            # empty classes are omitted
            if len(cls) > 0
        )

    # synonym
    counts_by_class = counts
    def counts_df(self, groupby = None, **kwargs):
        """
        Returns a data frame with the number of unique UniProt IDs in
        each group of the (optionally filtered) annotation data frame.

        :param list groupby:
            Columns to group by; by default category, parent and database.

        ``kwargs`` are passed to ``filtered``.
        """
        df = self.filtered(**kwargs)
        groupby = groupby or ['category', 'parent', 'database']
        df = df.groupby(groupby)
        # unique UniProt count per group
        counts = df.uniprot.nunique().reset_index()
        counts.rename(columns = {'uniprot': 'n_uniprot'}, inplace = True)
        # keep one representative row per group for the remaining columns
        df = df.agg('head', n = 1).reset_index()
        df.drop(
            ['uniprot', 'entity_type', 'genesymbol', 'index'],
            axis = 1,
            inplace = True,
        )
        df = df.merge(counts, on = groupby)
        return df
def iter_classes(self, **kwargs):
return self.filter_classes(
classes = self.classes.values(),
**kwargs
)
[docs]
@staticmethod
def filter_classes(classes, **kwargs):
"""
Returns a list of annotation classes filtered by their attributes.
``kwargs`` contains attributes and values.
"""
classes = classes
return (
cls
for cls in classes
if all(
common.eq(val, getattr(cls, attr))
for attr, val in iteritems(kwargs)
)
)
[docs]
def filter(self, entity_type = None, **kwargs):
"""
Filters the annotated entities by annotation class attributes and
``entity_type``. ``kwargs`` passed to ``filter_classes``.
"""
return set(
itertools.chain(*(
cls.filter_entity_type(entity_type = entity_type)
for cls in self.iter_classes(**kwargs)
))
)
    def filter_entity_type(self, cls, entity_type = None):
        """
        Restricts one annotation class to a single entity type
        (e.g. `protein`).
        """
        return cls.filter_entity_type(entity_type = entity_type)
[docs]
def network_df(
self,
annot_df = None,
network = None,
combined_df = None,
network_args = None,
annot_args = None,
annot_args_source = None,
annot_args_target = None,
entities = None,
entities_source = None,
entities_target = None,
only_directed = False,
only_undirected = False,
only_signed = None,
only_effect = None,
only_proteins = False,
swap_undirected = True,
undirected_orientation = None,
entities_or = False,
):
"""
Combines the annotation data frame and a network data frame.
Creates a ``pandas.DataFrame`` where each record is an interaction
between a pair of molecular enitities labeled by their annotations.
network : pypath.network.Network,pandas.DataFrame
A ``pypath.network.Network`` object or a data frame with network
data.
combined_df : pandas.DataFrame
Optional, a network data frame already combined with annotations
for filtering only.
resources : set,None
Use only these network resources.
entities : set,None
Limit the network only to these molecular entities.
entities_source : set,None
Limit the source side of network connections only to these
molecular entities.
entities_target : set,None
Limit the target side of network connections only to these
molecular entities.
annot_args : dict,None
Parameters for filtering annotation classes; note, the defaults
might include some filtering, provide an empty dict if you want
no filtering at all; however this might result in huge data
frame and consequently memory issues. Passed to the ``filtered``
method.
annot_args_source : dict,None
Same as ``annot_args`` but only for the source side of the
network connections. These override ``annot_args`` but all the
criteria not defined here will be applied from ``annot_args``.
annot_args_target : dict,None
Same as ``annot_args`` but only for the target side of the
network connections. These override ``annot_args`` but all the
criteria not defined here will be applied from ``annot_args``.
only_directed : bool
Use only the directed interactions.
only_undirected : bool
Use only the undirected interactions. Specifically for retrieving
and counting the interactions without direction information.
only_effect : int,None
Use only the interactions with this effect. Either -1 or 1.
only_proteins : bool
Use only the interactions where each of the partners is a protein
(i.e. not complex, miRNA, small molecule or other kind of entity).
swap_undirected : bool
Convert undirected interactions to a pair of mutual interactions.
undirected_orientation : str,None
Ignore the direction at all interactions and make sure all of
them have a uniform orientation. If `id`, all interactions will
be oriented by the identifiers of the partenrs; if `category`,
the interactions will be oriented by the categories of the
partners.
"""
if hasattr(self, 'interclass_network'):
combined_df = self.interclass_network
param_str = ', '.join([
'network_args=[%s]' % common.dict_str(network_args),
'annot_args=[%s]' % common.dict_str(annot_args),
'annot_args_source=[%s]' % common.dict_str(annot_args_source),
'annot_args_target=[%s]' % common.dict_str(annot_args_target),
'entities=%s' % common.none_or_len(entities),
'entities_source=%s' % common.none_or_len(entities_source),
'entities_target=%s' % common.none_or_len(entities_target),
'only_directed=%s' % only_directed,
'only_undirected=%s' % only_undirected,
'only_signed=%s' % only_signed,
'only_effect=%s' % only_effect,
'only_proteins=%s' % only_proteins,
'swap_undirected=%s' % swap_undirected,
'entities_or=%s' % entities_or,
])
if combined_df is not None:
self._log(
'Using previously created network-annotation data frame. '
'Parameters %s' % param_str
)
network_df = None
else:
self._log(
'Combining custom annotation with network data frame. '
'Parameters %s' % param_str
)
network_df = (
self._network_df(network)
if network is not None else
self.network
)
if network_df is None and combined_df is None:
self._log('No network provided, no default network set.')
return
_network_args = {
'only_proteins': only_proteins,
'only_effect': only_effect,
'only_signed': only_signed,
'only_directed': only_directed,
'only_undirected': only_undirected,
'entities': entities,
'source_entities': entities_source,
'target_entities': entities_target,
'swap_undirected': swap_undirected,
'entities_or': entities_or,
}
_network_args.update(network_args or {})
if not entities_or:
entities_source = entities_source or entities or set()
entities_target = entities_target or entities or set()
_annot_args_source = (annot_args or {}).copy()
_annot_args_source.update(annot_args_source)
_annot_args_source['entities'] = entities_source
_annot_args_target = (annot_args or {}).copy()
_annot_args_target.update(annot_args_target)
_annot_args_target['entities'] = entities_target
if only_proteins:
_annot_args_source['entity_type'] = 'protein'
_annot_args_target['entity_type'] = 'protein'
if combined_df is None:
network_df = core_common.filter_network_df(
df = network_df,
**_network_args
)
annot_df_source = self.filtered(
annot_df = annot_df,
**_annot_args_source
)
annot_df_target = self.filtered(
annot_df = annot_df,
**_annot_args_target
)
annot_network_df = pd.merge(
network_df,
annot_df_source,
suffixes = ['', '_a'],
how = 'inner',
left_on = 'id_a',
right_on = 'uniprot',
)
annot_network_df.id_a = annot_network_df.id_a.astype('category')
annot_network_df = pd.merge(
annot_network_df,
annot_df_target,
suffixes = ['_a', '_b'],
how = 'inner',
left_on = 'id_b',
right_on = 'uniprot',
)
annot_network_df.id_b = annot_network_df.id_b.astype('category')
# these columns are duplicates
annot_network_df.drop(
labels = ['type_a', 'type_b', 'uniprot_a', 'uniprot_b'],
inplace = True,
axis = 'columns',
)
else:
combined_df = core_common.filter_network_df(
df = combined_df,
**_network_args
)
combined_df = self.filtered(
annot_df = combined_df,
postfix = '_a',
**_annot_args_source
)
combined_df = self.filtered(
annot_df = combined_df,
postfix = '_b',
**_annot_args_target
)
annot_network_df = combined_df
if undirected_orientation:
# which columns we consider for the orientation
by = undirected_orientation
by = by if by in {'id', 'category'} else 'category'
by_col_a = getattr(annot_network_df, '%s_a' % by)
by_col_b = getattr(annot_network_df, '%s_b' % by)
# indices of the records with the wrong orientation
idx_wrong_orient = [a > b for a, b in zip(by_col_a, by_col_b)]
# split the data frame
wrong_orient = annot_network_df.iloc[idx_wrong_orient].copy()
good_orient = annot_network_df.iloc[
np.logical_not(idx_wrong_orient)
].copy()
column_order = list(annot_network_df.columns)
# swap the orientation
column_map = dict(
(
col,
common.swap_suffix(col)
)
for col in column_order
)
wrong_orient = wrong_orient.rename(columns = column_map)
# make sure the column order is correct
wrong_orient = wrong_orient[column_order]
# concatenate the slices
orientation_swapped = pd.concat([good_orient, wrong_orient])
orientation_swapped = orientation_swapped.drop_duplicates(
subset = [
'id_a',
'id_b',
'type',
'category_a',
'category_b',
'parent_a',
'parent_b',
'source_a',
'source_b',
'scope_a',
'scope_b',
'entity_type_a',
'entity_type_b',
]
)
# removing direction and effect columns
# as they are not valid any more
orientation_swapped.drop(
['directed', 'effect'],
axis = 1,
inplace = True,
)
annot_network_df = orientation_swapped
self._log(
'Combined custom annotation data frame with network data frame. '
'Memory usage: %s.' % common.df_memory_usage(annot_network_df)
)
return annot_network_df
# this became a synonym
filter_interclass_network = network_df
def set_interclass_network_df(self, **kwargs):
    """
    Builds the inter-class network data frame and caches it on the
    instance (``interclass_network``) so subsequent queries are faster.
    ``kwargs`` are passed to ``get_interclass_network_df``.
    """
    # drop any stale cache before rebuilding
    self.unset_interclass_network_df()
    self.interclass_network = self.get_interclass_network_df(**kwargs)


def get_interclass_network_df(self, **kwargs):
    """
    Returns the cached inter-class network data frame if one has been
    set; otherwise builds a new one by passing ``kwargs`` to
    ``network_df``. Note: if a cached network exists, the ``network``
    and other ``kwargs`` are not considered.
    """
    if hasattr(self, 'interclass_network'):
        return self.interclass_network
    return self.network_df(**kwargs)


def unset_interclass_network_df(self):
    """Discards the cached inter-class network data frame, if any."""
    try:
        del self.interclass_network
    except AttributeError:
        pass
#
# Below only thin wrappers to make the interface more intuitive
# without knowing the argument names
#
#
# Building a network of connections between classes
#

def inter_class_network(
        self,
        annot_args_source = None,
        annot_args_target = None,
        network = None,
        **kwargs
    ):
    """Builds an inter-class network data frame (wrapper for ``network_df``)."""
    return self.network_df(
        network = network,
        annot_args_source = annot_args_source,
        annot_args_target = annot_args_target,
        **kwargs
    )


def inter_class_network_undirected(
        self,
        annot_args_source = None,
        annot_args_target = None,
        network = None,
        **kwargs
    ):
    """Inter-class network restricted to undirected interactions."""
    return self.network_df(
        network = network,
        annot_args_source = annot_args_source,
        annot_args_target = annot_args_target,
        **{**kwargs, 'only_undirected': True}
    )


def inter_class_network_directed(
        self,
        annot_args_source = None,
        annot_args_target = None,
        network = None,
        **kwargs
    ):
    """Inter-class network restricted to directed interactions."""
    return self.network_df(
        network = network,
        annot_args_source = annot_args_source,
        annot_args_target = annot_args_target,
        **{**kwargs, 'only_directed': True}
    )


def inter_class_network_signed(
        self,
        annot_args_source = None,
        annot_args_target = None,
        network = None,
        **kwargs
    ):
    """Inter-class network restricted to signed interactions."""
    return self.network_df(
        network = network,
        annot_args_source = annot_args_source,
        annot_args_target = annot_args_target,
        **{**kwargs, 'only_signed': True}
    )


def inter_class_network_stimulatory(
        self,
        annot_args_source = None,
        annot_args_target = None,
        network = None,
        **kwargs
    ):
    """Inter-class network restricted to directed, stimulatory interactions."""
    return self.network_df(
        network = network,
        annot_args_source = annot_args_source,
        annot_args_target = annot_args_target,
        **{**kwargs, 'only_directed': True, 'only_effect': 1}
    )


def inter_class_network_inhibitory(
        self,
        annot_args_source = None,
        annot_args_target = None,
        network = None,
        **kwargs
    ):
    """Inter-class network restricted to directed, inhibitory interactions."""
    return self.network_df(
        network = network,
        annot_args_source = annot_args_source,
        annot_args_target = annot_args_target,
        **{**kwargs, 'only_directed': True, 'only_effect': -1}
    )
#
# Counting connections between classes (total)
#

def count_inter_class_connections(
        self,
        annot_args_source = None,
        annot_args_target = None,
        **kwargs
    ):
    """Number of unique (id_a, id_b) connections between the two classes."""
    edges = self.inter_class_network(
        annot_args_source = annot_args_source,
        annot_args_target = annot_args_target,
        **kwargs
    )
    return edges.groupby(['id_a', 'id_b'], as_index = False).ngroups

# synonym
count_inter_class_connections_all = count_inter_class_connections


def count_inter_class_connections_undirected(
        self,
        annot_args_source = None,
        annot_args_target = None,
        **kwargs
    ):
    """Unique undirected connections between the two classes."""
    edges = self.inter_class_network_undirected(
        annot_args_source = annot_args_source,
        annot_args_target = annot_args_target,
        **kwargs
    )
    return edges.groupby(['id_a', 'id_b'], as_index = False).ngroups


def count_inter_class_connections_directed(
        self,
        annot_args_source = None,
        annot_args_target = None,
        **kwargs
    ):
    """Unique directed connections between the two classes."""
    edges = self.inter_class_network_directed(
        annot_args_source = annot_args_source,
        annot_args_target = annot_args_target,
        **kwargs
    )
    return edges.groupby(['id_a', 'id_b'], as_index = False).ngroups


def count_inter_class_connections_signed(
        self,
        annot_args_source = None,
        annot_args_target = None,
        **kwargs
    ):
    """Unique signed connections between the two classes."""
    edges = self.inter_class_network_signed(
        annot_args_source = annot_args_source,
        annot_args_target = annot_args_target,
        **kwargs
    )
    return edges.groupby(['id_a', 'id_b'], as_index = False).ngroups


def count_inter_class_connections_stimulatory(
        self,
        annot_args_source = None,
        annot_args_target = None,
        **kwargs
    ):
    """Unique stimulatory connections between the two classes."""
    edges = self.inter_class_network_stimulatory(
        annot_args_source = annot_args_source,
        annot_args_target = annot_args_target,
        **kwargs
    )
    return edges.groupby(['id_a', 'id_b'], as_index = False).ngroups


def count_inter_class_connections_inhibitory(
        self,
        annot_args_source = None,
        annot_args_target = None,
        **kwargs
    ):
    """Unique inhibitory connections between the two classes."""
    edges = self.inter_class_network_inhibitory(
        annot_args_source = annot_args_source,
        annot_args_target = annot_args_target,
        **kwargs
    )
    return edges.groupby(['id_a', 'id_b'], as_index = False).ngroups
#
# Class to class connection counts
#

def class_to_class_connections(self, **kwargs):
    """
    Number of distinct (id_a, id_b) connections for each pair of
    annotation categories.
    ``kwargs`` passed to ``filter_interclass_network`` (``network_df``).
    """
    network = self.network_df(**kwargs)
    self._log('Counting connections between classes.')
    # one group per unique connection within a category pair ...
    per_connection = network.groupby(
        ['category_a', 'category_b', 'id_a', 'id_b']
    ).size()
    # ... then count the unique connections per category pair
    return per_connection.groupby(level = ['category_a', 'category_b']).size()
def class_to_class_connections_undirected(self, **kwargs):
    """
    Undirected class-to-class connection counts; heterotypic category
    pairs are mirrored and their counts summed over both orientations.
    """
    kwargs['only_undirected'] = True
    c2c = self.class_to_class_connections(**kwargs)
    mirrored = {
        (cls_b, cls_a): count
        for (cls_a, cls_b), count in zip(c2c.index, c2c)
        if cls_a != cls_b
    }
    return common.sum_dicts(c2c, mirrored)


def class_to_class_connections_directed(self, **kwargs):
    """Class-to-class connection counts over directed interactions only."""
    kwargs['only_directed'] = True
    return self.class_to_class_connections(**kwargs)


def class_to_class_connections_signed(self, **kwargs):
    """Class-to-class connection counts over signed interactions only."""
    kwargs['only_signed'] = True
    return self.class_to_class_connections(**kwargs)


def class_to_class_connections_stimulatory(self, **kwargs):
    """Class-to-class connection counts over stimulatory interactions only."""
    kwargs['only_effect'] = 1
    return self.class_to_class_connections(**kwargs)


def class_to_class_connections_inhibitory(self, **kwargs):
    """Class-to-class connection counts over inhibitory interactions only."""
    kwargs['only_effect'] = -1
    return self.class_to_class_connections(**kwargs)
#
# Inter-class degrees
#

def degree_inter_class_network(
        self,
        annot_args_source = None,
        annot_args_target = None,
        degrees_of = 'target',
        **kwargs
    ):
    """
    Degrees of the nodes in the inter-class network; zero-degree nodes
    are dropped.

    degrees_of : str
        Either *source* or *target*. Count the degrees for the source
        (id_a) or the target (id_b) class.
    """
    if degrees_of == 'source':
        groupby, unique = 'id_a', 'id_b'
    else:
        groupby, unique = 'id_b', 'id_a'
    edges = self.inter_class_network(
        annot_args_source = annot_args_source,
        annot_args_target = annot_args_target,
        **kwargs
    )
    degrees = edges.groupby(groupby)[unique].nunique()
    return degrees[degrees != 0]
def degree_inter_class_network_undirected(
        self,
        annot_args_source = None,
        annot_args_target = None,
        **kwargs
    ):
    """Inter-class degrees over undirected interactions only."""
    return self.degree_inter_class_network(
        annot_args_source = annot_args_source,
        annot_args_target = annot_args_target,
        **{**kwargs, 'only_undirected': True}
    )


def degree_inter_class_network_directed(
        self,
        annot_args_source = None,
        annot_args_target = None,
        **kwargs
    ):
    """Inter-class degrees over directed interactions only."""
    return self.degree_inter_class_network(
        annot_args_source = annot_args_source,
        annot_args_target = annot_args_target,
        **{**kwargs, 'only_directed': True}
    )


def degree_inter_class_network_stimulatory(
        self,
        annot_args_source = None,
        annot_args_target = None,
        **kwargs
    ):
    """Inter-class degrees over directed, stimulatory interactions."""
    return self.degree_inter_class_network(
        annot_args_source = annot_args_source,
        annot_args_target = annot_args_target,
        **{**kwargs, 'only_directed': True, 'only_effect': 1}
    )


def degree_inter_class_network_inhibitory(
        self,
        annot_args_source = None,
        annot_args_target = None,
        **kwargs
    ):
    """Inter-class degrees over directed, inhibitory interactions."""
    return self.degree_inter_class_network(
        annot_args_source = annot_args_source,
        annot_args_target = annot_args_target,
        **{**kwargs, 'only_directed': True, 'only_effect': -1}
    )
def degree_inter_class_network_2(
        self,
        degrees_of = 'target',
        sum_by_class = True,
        **kwargs
    ):
    """
    Degrees in the inter-class network, optionally summed per class.
    ``kwargs`` are passed to ``network_df``; zero degrees are dropped.
    """
    network = self.network_df(**kwargs)
    if degrees_of == 'source':
        groupby, unique, groupby_cat = 'id_a', 'id_b', 'category_a'
    else:
        groupby, unique, groupby_cat = 'id_b', 'id_a', 'category_b'
    keys = [groupby, groupby_cat] if sum_by_class else groupby
    degrees = network.groupby(keys)[unique].nunique()
    if sum_by_class:
        degrees = degrees.groupby(groupby_cat).sum()
    return degrees[degrees != 0]
def degree_inter_class_network_undirected_2(self, **kwargs):
    """Undirected degrees: source- and target-side degrees summed."""
    kwargs['only_undirected'] = True
    deg_source = self.degree_inter_class_network_2(
        **{**kwargs, 'degrees_of': 'source'}
    )
    deg_target = self.degree_inter_class_network_2(
        **{**kwargs, 'degrees_of': 'target'}
    )
    return common.sum_dicts(deg_source, deg_target)


def degree_inter_class_network_directed_2(self, **kwargs):
    """Per-class degrees over directed interactions only."""
    kwargs['only_directed'] = True
    return self.degree_inter_class_network_2(**kwargs)


def degree_inter_class_network_stimulatory_2(self, **kwargs):
    """Per-class degrees over stimulatory interactions only."""
    kwargs['only_effect'] = 1
    return self.degree_inter_class_network_2(**kwargs)


def degree_inter_class_network_inhibitory_2(self, **kwargs):
    """Per-class degrees over inhibitory interactions only."""
    kwargs['only_effect'] = -1
    return self.degree_inter_class_network_2(**kwargs)
#
# End of wrappers
#

def register_network(self, network):
    """
    Sets ``network`` as the default network dataset for the instance;
    all subsequent methods will use it. Also discards any cached
    inter-class network data frame so future queries reflect the
    network registered here.
    """
    self.unset_interclass_network_df()
    self.network = self._network_df(network)
@staticmethod
def _network_df(network):
    """
    Extracts the data frame from a network object, building it on
    demand via ``make_df``; objects without a ``df`` attribute are
    returned unchanged (assumed to be data frames already).
    """
    if not hasattr(network, 'df') and hasattr(network, 'make_df'):
        network.make_df()
    return getattr(network, 'df', network)


def filtered(
        self,
        annot_df = None,
        entities = None,
        **kwargs
    ):
    """
    Filters an annotation data frame (by default the instance's own)
    by ``entities`` and the field conditions in ``kwargs``;
    see ``filter_df`` for details.
    """
    if annot_df is None:
        annot_df = self.get_df()
    return self.filter_df(
        annot_df = annot_df,
        entities = entities,
        **kwargs
    )
@classmethod
def filter_df(
        cls,
        annot_df,
        entities = None,
        postfix = None,
        **kwargs
    ):
    """
    Filters an annotation data frame by field conditions (``kwargs``)
    and an optional set of ``entities``; ``postfix`` (e.g. ``_a`` or
    ``_b``) is appended to the column names before matching.
    Returns the filtered data frame (unchanged if no condition applies).
    """
    query = cls._process_query_args(
        df = annot_df,
        entities = entities,
        args = kwargs,
        postfix = postfix,
    )
    # Bug fix: this used to be `cls._args_add_postfix(args, postfix)`,
    # referencing the local `args` before assignment (NameError).
    # The query expressions reference `@args[...]`, which pandas
    # resolves against this local variable: the postfixed copy of the
    # keyword arguments must be bound to the name `args` here.
    args = cls._args_add_postfix(kwargs, postfix)
    query = ' and '.join(query)
    return annot_df.query(query) if query else annot_df
@staticmethod
def _process_query_args(df, args, entities = None, postfix = None):
    """
    Assembles a list of pandas ``query`` expressions from the field
    conditions in ``args`` and an optional ``entities`` set.
    The expressions reference the caller's local variables ``args``
    and ``entities`` via the ``@`` syntax, hence the caller must hold
    the (postfixed) arguments under exactly these names when running
    the query.
    """
    query = []
    for col, val in args.items():
        if postfix:
            col = '%s%s' % (col, postfix)
        if val is None or col not in df.columns:
            continue
        # scalar values compared by equality, collections by membership
        op = '==' if isinstance(val, _const.SIMPLE_TYPES) else 'in'
        query.append('%s %s @args["%s"]' % (col, op, col))
    if entities:
        entity_cols = {'id', 'genesymbol', 'uniprot'}
        if postfix:
            entity_cols = {'%s%s' % (c, postfix) for c in entity_cols}
        entity_cols &= set(df.columns)
        query.append(
            '(%s)' % ' or '.join(
                '%s in @entities' % col
                for col in entity_cols
            )
        )
    return query


@staticmethod
def _args_add_postfix(args, postfix):
    """
    Returns ``args`` with ``postfix`` appended to each key;
    returns ``args`` unchanged if ``postfix`` is falsy.
    """
    if not postfix:
        return args
    return dict(
        ('%s%s' % (key, postfix), val)
        for key, val in args.items()
    )
def export(self, fname, **kwargs):
    """Writes the annotation data frame to ``fname``; kwargs to ``to_csv``."""
    self.make_df()
    self.df.to_csv(fname, **kwargs)
def classes_by_entity(self, element, labels = False):
    """
    Returns the keys (or labels) of the classes containing at least
    one of the provided entities.
    :param str,set element:
        One or more entities to look up in the classes.
    :param bool labels:
        Return class labels instead of keys.
    """
    elements = common.to_set(element)
    result = set()
    for key, cls in iteritems(self.classes):
        if elements & cls:
            result.add(cls.label if labels else key)
    return result
def entities_by_resource(self, entity_types = None, **kwargs):
    """Dict of resource name to the set of entities it annotates."""
    by_resource = collections.defaultdict(set)
    for cls in self.classes.values():
        by_resource[cls.resource].update(
            cls.filter_entity_type(entity_type = entity_types)
        )
    return dict(by_resource)

# TODO: this kind of methods should be implemented by metaprogramming
def proteins_by_resource(self):
    """Proteins annotated by each resource."""
    return self.entities_by_resource(entity_types = 'protein')


def complexes_by_resource(self):
    """Complexes annotated by each resource."""
    return self.entities_by_resource(entity_types = 'complex')


def mirnas_by_resource(self):
    """miRNAs annotated by each resource."""
    return self.entities_by_resource(entity_types = 'mirna')


def counts_by_resource(self, entity_types = None):
    """Dict of resource name to the number of entities it annotates."""
    return {
        resource_name: len(entities)
        for resource_name, entities in
        self.entities_by_resource(entity_types = entity_types).items()
    }
def get_entities(self, entity_types = None):
    """All entities of the requested type(s) across all classes."""
    members = (
        set.union(*(set(cls) for cls in self.classes.values()))
        if self.classes else
        ()
    )
    return entity.Entity.filter_entity_type(
        members,
        entity_type = entity_types,
    )

# TODO: this kind of methods should be implemented by metaprogramming
def get_proteins(self):
    """All proteins across all classes."""
    return self.get_entities(entity_types = 'protein')


def get_complexes(self):
    """All complexes across all classes."""
    return self.get_entities(entity_types = 'complex')


def get_mirnas(self):
    """All miRNAs across all classes."""
    return self.get_entities(entity_types = 'mirna')
def numof_entities(self, entity_types = None):
    """Number of unique entities of the given type(s) across all classes."""
    return len(self.get_entities(entity_types = entity_types))

# TODO: this kind of methods should be implemented by metaprogramming
def numof_proteins(self):
    """Number of unique proteins across all classes."""
    return self.numof_entities(entity_types = 'protein')


def numof_complexes(self):
    """Number of unique complexes across all classes."""
    return self.numof_entities(entity_types = 'complex')


def numof_mirnas(self):
    """Number of unique miRNAs across all classes."""
    return self.numof_entities(entity_types = 'mirna')


def numof_classes(self):
    """Number of annotation classes."""
    return len(self.classes)


def numof_records(self, entity_types = None):
    """Total number of records: entities counted once per class."""
    return sum(
        cls.count_entity_type(entity_type = entity_types)
        for cls in self.classes.values()
    )

# TODO: this kind of methods should be implemented by metaprogramming
def numof_protein_records(self):
    """Number of protein records across all classes."""
    return self.numof_records(entity_types = 'protein')


def numof_complex_records(self):
    """Number of complex records across all classes."""
    return self.numof_records(entity_types = 'complex')


def numof_mirna_records(self):
    """Number of miRNA records across all classes."""
    return self.numof_records(entity_types = 'mirna')
def resources_in_category(self, key):
    """
    Returns the sorted names of the resources contributing to the
    definition of a category; ``None`` if the key is unknown. A bare
    (non-tuple) key is expanded to the composite-resource tuple form.
    """
    if not isinstance(key, tuple):
        key = (key, key, self.composite_resource_name)
    if key in self.children:
        return sorted({child.resource for child in self.children[key]})


def all_resources(self):
    """Sorted names of all resources across the classes."""
    return sorted({cls.resource for cls in self.classes.values()})


def __getitem__(self, item):
    """A class looked up by its key, or the classes containing an entity."""
    if isinstance(item, tuple) and item in self.classes:
        return self.classes[item]
    return self.classes_by_entity(item)
def browse(self, start: int = 0, **kwargs):
    """
    Prints ascii tables with basic UniProt information about the
    proteins of the annotation classes, one table per class,
    proceeding interactively from class to class (hit return for the
    next one). ``kwargs`` select the classes via ``iter_classes`` and
    are also passed to ``pypath.utils.uniprot.info``.
    """
    groups = {
        cls.label: cls.filter_entity_type(entity_type = 'protein')
        for cls in self.iter_classes(**kwargs)
    }
    utils_uniprot.browse(groups = groups, start = start, **kwargs)
[docs]
class AnnotationBase(resource.AbstractResource):
# pandas dtypes applied to the data frame built by `make_df`; the
# highly repetitive string columns are stored as categoricals to
# keep the memory footprint low
_dtypes = {
'uniprot': 'category',
'genesymbol': 'category',
'entity_type': 'category',
'source': 'category',
'label': 'category',
'value': 'object',
'record_id': 'int32',
}
[docs]
def __init__(
self,
name,
ncbi_tax_id = 9606,
input_method = None,
input_args = None,
entity_type = 'protein',
swissprot_only = True,
proteins = (),
complexes = (),
reference_set = (),
infer_complexes = None,
dump = None,
primary_field = None,
check_ids = True,
**kwargs
):
"""
Represents annotations for a set of proteins.
Loads the data from the original resource and provides methods
to query the annotations.
:arg str name:
A custom name for the annotation resource.
:arg int ncbi_tax_id:
NCBI Taxonomy identifier.
:arg callable,str input_method:
Either a callable or the name of a method in any submodules of
the ``pypath.inputs`` module. Should return a dict with
UniProt IDs as keys or an object suitable for ``process_method``.
:arg dict input_args:
Arguments for the ``input_method``; any further ``kwargs``
are merged into it.
:arg str entity_type:
Type of the annotated entities, e.g. ``protein``.
:arg bool swissprot_only:
Restrict the reference proteome to SwissProt IDs.
:arg tuple proteins:
Optional custom reference protein set.
:arg tuple complexes:
Optional custom reference complex set.
:arg tuple reference_set:
Optional custom reference set (all potentially annotated entities).
:arg bool,NoneType infer_complexes:
Whether to infer complex annotations from member proteins; if not
a bool, falls back to the ``annot_infer_complexes`` setting.
Effective only for protein annotations.
:arg NoneType dump:
Optional previously dumped data to load instead of the input method.
:arg str primary_field:
Field used by ``__getitem__`` when selecting by value.
:arg bool check_ids:
Whether to translate protein IDs to SwissProt in
``_ensure_swissprot``.
"""
session_mod.Logger.__init__(self, name = 'annot')
input_args = input_args or {}
input_args.update(kwargs)
resource.AbstractResource.__init__(
self,
name = name,
ncbi_tax_id = ncbi_tax_id,
input_method = input_method,
input_args = input_args,
dump = dump,
data_attr_name = 'annot',
)
self.entity_type = entity_type
self.primary_field = primary_field
# fall back to the module level setting unless explicitly a bool
infer_complexes = (
infer_complexes
if isinstance(infer_complexes, bool) else
settings.get('annot_infer_complexes')
)
# complex inference only applies to protein annotations
self.infer_complexes = (
infer_complexes and
self.entity_type == 'protein'
)
self.proteins = proteins
self.complexes = complexes
self.reference_set = reference_set
self.swissprot_only = swissprot_only
self.check_ids = check_ids
# the actual data loading happens here, after all attributes are set
self.load()
def reload(self):
    """
    Reloads the object's class definition from the module level
    (useful during development).
    """
    modname = self.__class__.__module__
    mod = __import__(modname, fromlist = [modname.split('.')[0]])
    imp.reload(mod)
    setattr(self, '__class__', getattr(mod, self.__class__.__name__))
def load(self):
    """
    Loads the annotation data by calling the input method, then
    post-processes it: translates protein IDs to SwissProt, sets the
    primary field and, if py:attr:``infer_complexes`` is True, infers
    annotations for complexes in the complex database.
    """
    self._log('Loading annotations from `%s`.' % self.name)
    self.set_reference_set()
    resource.AbstractResource.load(self)
    self._ensure_swissprot()
    self._update_primary_field()
    if self.infer_complexes:
        self.add_complexes_by_inference()
    self._log(
        'Loaded annotations from `%s`: %u molecules, %u annotations.' % (
            self.name,
            self.numof_entities(),
            self.numof_records(),
        )
    )


def _update_primary_field(self):
    """
    Ensures ``primary_field`` is set, falling back to the first field
    name. NOTE(review): per the original operator precedence this
    evaluates as ``(primary_field or names[0]) if names else None``,
    i.e. an explicitly set primary field is reset to None when the
    resource has no field names — confirm this is intended.
    """
    names = self.get_names()
    self.primary_field = (self.primary_field or names[0]) if names else None
def _ensure_swissprot(self):
    """
    Translates all protein keys in ``annot`` to SwissProt IDs (one key
    may map to several SwissProts, whose annotation sets are merged);
    non-protein keys are kept as they are. Skipped for
    non-organism-specific resources or when ID checking is disabled.
    """
    if (
        self.ncbi_tax_id == _const.NOT_ORGANISM_SPECIFIC or
        not self.check_ids
    ):
        return
    corrected = collections.defaultdict(set)
    for key, annots in iteritems(self.annot):
        if not entity.Entity._is_protein(key):
            corrected[key].update(annots)
            continue
        swissprots = mapping.map_name(
            key,
            'uniprot',
            'uniprot',
            ncbi_tax_id = self.ncbi_tax_id,
        )
        for swissprot in swissprots:
            corrected[swissprot].update(annots)
    self.annot = dict(corrected)
[docs]
def add_complexes_by_inference(self, complexes = None):
"""
Creates complex annotations by in silico inference and adds them
to this annotation set.
"""
complex_annotation = self.complex_inference(complexes = complexes)
self.annot.update(complex_annotation)
[docs]
def complex_inference(self, complexes = None):
"""
Annotates all complexes in `complexes`, by default in the default
complex database (existing in the `complex` module or generated
on demand according to the module's current settings).
Returns
-------
Dict with complexes as keys and sets of annotations as values.
Complexes with no valid information in this annotation resource
won't be in the dict.
Parameters
----------
complexes : iterable
Iterable yielding complexes.
"""
self._log('Inferring complex annotations from `%s`.' % self.name)
if not complexes:
import pypath.core.complex as complex
complexdb = complex.get_db()
complexes = complexdb.complexes.values()
complex_annotation = collections.defaultdict(set)
for cplex in complexes:
this_cplex_annot = self.annotate_complex(cplex)
if this_cplex_annot is not None:
complex_annotation[cplex].update(this_cplex_annot)
return complex_annotation
[docs]
def annotate_complex(self, cplex):
"""
Infers annotations for a single complex from the annotations of its
components. Returns ``None`` if the complex can not be annotated,
an empty set if membership itself is the annotation, otherwise a
set of combined annotation records. The combination strategy
depends on ``self._eq_fields`` (None / empty / callable / a
collection of field names) and an optional ``_merge`` method.
"""
if (
not all(comp in self for comp in cplex.components.keys()) or
self._eq_fields is None
):
# this means no annotation for this complex
return None
elif not self._eq_fields:
# here empty set means the complex belongs
# to the class of entities covered by this
# annotation
return set()
elif callable(self._eq_fields):
# here a custom method combines the annotations
# we look at all possible combinations of the annotations
# of the components, but most likely each component have
# only one annotation in this case
return set(
self._eq_fields(*annots)
for annots in itertools.product(
*(
self.annot[comp]
for comp in cplex.components.keys()
)
)
)
elif hasattr(self, '_merge'):
# a resource-specific merge method combines the annotation
# sets of all components
return self._merge(*(
self.annot[comp]
for comp in cplex.components.keys()
))
else:
# group the components by the values of the characteristic
# (`_eq_fields`) attributes of their annotation records
groups = collections.defaultdict(set)
empty_args = {}
cls = None
components = set(cplex.components.keys())
for comp in cplex.components.keys():
for comp_annot in self.annot[comp]:
if cls is None:
# the record class and the non-characteristic fields
# (left empty in the inferred records) are taken from
# the first record encountered
cls = comp_annot.__class__
empty_args = dict(
(f, None)
for f in comp_annot._fields
if f not in self._eq_fields
)
groups[
tuple(
getattr(comp_annot, f)
for f in self._eq_fields
)
].add(comp)
return set(
# the characteristic attributes of the group
# and the remaining left empty
cls(
**dict(zip(self._eq_fields, key)),
**empty_args
)
# checking all groups
for key, group in iteritems(groups)
# and accepting the ones covering all members of the complex
if group == components
) or None
def _update_complex_attribute_classes(self):
    """
    Registers the annotation record classes used as complex attributes
    in this module with the complex database aggregator.
    """
    complex.ComplexAggregator._update_complex_attribute_classes_static(
        self.annot.keys(),
        mod = sys.modules[__name__],
    )


def load_proteins(self):
    """
    Retrieves the set of all UniProt IDs of the organism as the base
    set of the entire proteome.
    """
    self.uniprots = set(
        uniprot_db.all_uniprots(organism = self.ncbi_tax_id)
    )
[docs]
@staticmethod
def get_reference_set(
proteins = (),
complexes = (),
use_complexes = False,
ncbi_tax_id = 9606,
swissprot_only = True,
):
"""
Retrieves the reference set i.e. the set of all entities which
potentially have annotation in this resource. Typically this is the
proteome of the organism from UniProt optionally with all the protein
complexes from the complex database.
"""
proteins = (
proteins or
sorted(
uniprot_db.all_uniprots(
organism = ncbi_tax_id,
swissprot = swissprot_only,
)
)
)
if use_complexes:
import pypath.core.complex as complex
complexes = (
complexes or
sorted(complex.all_complexes())
)
reference_set = sorted(
itertools.chain(
proteins,
complexes,
)
)
return proteins, complexes, reference_set
def _get_reference_set(self):
    """Builds the reference set from the instance's own parameters."""
    return self.get_reference_set(
        proteins = self.proteins,
        complexes = self.complexes,
        use_complexes = self.has_complexes(),
        ncbi_tax_id = self.ncbi_tax_id,
        swissprot_only = self.swissprot_only,
    )
def set_reference_set(self):
    """
    Assigns the reference set to the :py:attr``reference_set`` attribute.
    The reference set is the set of all entities which potentially have
    annotation in this resource: typically the UniProt proteome of the
    organism, optionally with all complexes from the complex database.
    Does nothing if a reference set is already present.
    """
    if not self.reference_set:
        if self.ncbi_tax_id == _const.NOT_ORGANISM_SPECIFIC:
            # Bug fix: this used to be `(set(),) * 3`, which binds the
            # SAME set object to all three names — mutating e.g.
            # `self.proteins` would have silently changed
            # `self.complexes` and `self.reference_set` too.
            proteins, complexes, reference_set = set(), set(), set()
        else:
            proteins, complexes, reference_set = self._get_reference_set()
        self.proteins = proteins
        self.complexes = complexes
        self.reference_set = reference_set
def has_complexes(self):
    """Whether this resource covers complexes (directly or by inference)."""
    return self.entity_type == 'complex' or self.infer_complexes


def _process_method(self, *args, **kwargs):
    """
    Default processing: converts the raw data (a set of identifiers)
    into a dict of empty annotation sets, the format expected by the
    rest of the class. Derived classes might override.
    """
    self.annot = {identifier: set() for identifier in self.data}
[docs]
def select(self, method = None, entity_type = None, **kwargs):
"""
Retrieves a subset by filtering based on ``kwargs``.
Each argument should be a name and a value or set of values.
Elements having the provided values in the annotation will be
returned.
Returns a set of UniProt IDs.
:arg callable method:
Optional predicate called on each annotation record; records
where it returns False are skipped.
:arg str entity_type:
Restrict the result to this entity type.
"""
result = set()
names = set(self.get_names())
# reject unknown field names early with an informative error
if not all(k in names for k in kwargs.keys()):
raise ValueError('Unknown field names: %s' % (
', '.join(sorted(set(kwargs.keys()) - names))
)
)
for uniprot, annot in iteritems(self.annot):
for a in annot:
# we either call a method on all records
# or check against conditions provided in **kwargs;
# an entity is included if ANY of its records matches
# ALL conditions
if (
not callable(method) or
method(a)
) and all(
(
# simple agreement
(
getattr(a, name) == value
)
# custom method returns bool
or
(
callable(value)
and
value(getattr(a, name))
)
# multiple value in annotation slot
# and value is a set: checking if they have
# any in common
or
(
isinstance(getattr(a, name), _const.LIST_LIKE)
and
isinstance(value, set)
and
set(getattr(a, name)) & value
)
# search value is a set, checking if contains
# the record's value
or
(
isinstance(value, set)
and
getattr(a, name) in value
)
# record's value contains multiple elements
# (set, list or tuple), checking if it contains
# the search value
or
(
isinstance(getattr(a, name), _const.LIST_LIKE)
and
value in getattr(a, name)
)
)
for name, value in iteritems(kwargs)
):
result.add(uniprot)
# one matching record is enough for this entity
break
result = entity.Entity.filter_entity_type(result, entity_type)
return result
# synonym under the old name, kept for backwards compatibility
get_subset = select
def labels(self, method = None, **kwargs):
    """
    Same as ``select`` but returns a list of labels (more human readable).
    """
    return mapping.label(self.select(method = method, **kwargs))


def show(self, method = None, table_param = None, **kwargs):
    """
    Same as ``select`` but prints a table to the console with basic
    information from the UniProt datasheets.
    """
    utils_uniprot.info(
        *self.select(method = method, **kwargs),
        **(table_param or {})
    )
def get_subset_bool_array(self, reference_set = None, **kwargs):
    """
    Boolean vector over the reference set marking the members of a
    subset; the subset is selected by passing ``kwargs`` to the
    ``select`` method, so any kind of matching and filtering is
    possible.
    """
    reference_set = reference_set or self.reference_set
    subset = self.get_subset(**kwargs)
    return np.array([member in subset for member in reference_set])


def to_bool_array(self, reference_set):
    """Presence/absence boolean vector over ``reference_set``."""
    annotated = self.to_set()
    return np.array([member in annotated for member in reference_set])


def to_set(self):
    """The entities annotated in this resource, as a set."""
    return set(self.annot.keys())
@staticmethod
def _entity_types(entity_types):
    """Normalizes a single entity type string to a one-element set."""
    return {entity_types} if isinstance(entity_types, str) else entity_types


def all_entities(self, entity_types = None):
    """All entities annotated in this resource (sorted)."""
    entity_types = self._entity_types(entity_types)
    return sorted(
        k
        for k in self.annot.keys()
        if self._match_entity_type(k, entity_types)
    )
def all_proteins(self):
    """All UniProt IDs annotated in this resource (sorted)."""
    return sorted(k for k in self.annot.keys() if self.is_protein(k))


def all_complexes(self):
    """All protein complexes annotated in this resource (sorted)."""
    return sorted(k for k in self.annot.keys() if self.is_complex(k))


def all_mirnas(self):
    """All miRNAs annotated in this resource (sorted)."""
    return sorted(k for k in self.annot.keys() if self.is_mirna(k))
@staticmethod
def is_protein(key):
    """Whether the key is a protein."""
    return entity.Entity._is_protein(key)


@staticmethod
def is_mirna(key):
    """Whether the key is a miRNA."""
    return entity.Entity._is_mirna(key)


@staticmethod
def is_complex(key):
    """Whether the key is a protein complex."""
    return entity.Entity._is_complex(key)


@classmethod
def get_entity_type(cls, key):
    """The entity type of the key."""
    return entity.Entity._get_entity_type(key)


@classmethod
def _match_entity_type(cls, key, entity_types):
    """True if no type filter is set or the key's type is included in it."""
    return not entity_types or cls.get_entity_type(key) in entity_types
def numof_records(self, entity_types = None):
    """
    The total number of annotation records; entities without explicit
    records still count as one.
    """
    entity_types = self._entity_types(entity_types)
    return sum(
        max(len(annots), 1)
        for key, annots in self.annot.items()
        if self._match_entity_type(key, entity_types)
    )


def numof_protein_records(self):
    """Number of protein annotation records."""
    return self.numof_records(entity_types = {'protein'})


def numof_mirna_records(self):
    """Number of miRNA annotation records."""
    return self.numof_records(entity_types = {'mirna'})


def numof_complex_records(self):
    """Number of complex annotation records."""
    return self.numof_records(entity_types = {'complex'})
def numof_entities(self):
    """The number of annotated entities in the resource."""
    return len(self.annot)


def _numof_entities(self, entity_types = None):
    """Number of annotated entities of the given type(s)."""
    entity_types = self._entity_types(entity_types)
    return sum(
        1
        for k in self.annot.keys()
        if self._match_entity_type(k, entity_types)
    )


def numof_proteins(self):
    """Number of annotated proteins."""
    return self._numof_entities(entity_types = {'protein'})


def numof_mirnas(self):
    """Number of annotated miRNAs."""
    return self._numof_entities(entity_types = {'mirna'})


def numof_complexes(self):
    """Number of annotated complexes."""
    return self._numof_entities(entity_types = {'complex'})
def __repr__(self):
    """Short human readable summary of the resource."""
    return '<%s annotations: %u records about %u entities>' % (
        self.name,
        self.numof_records(),
        self.numof_entities(),
    )
[docs]
def to_array(self, reference_set = None, use_fields = None):
"""
Returns an entity vs feature array. In case of more complex
annotations this might be huge.
Each feature is a boolean column over the reference set; columns are
generated for the resource as a whole and for each combination of
field values.
NOTE(review): relies on a module level ``default_fields`` mapping
which is not visible in this part of the file.
"""
use_fields = (
use_fields or (
default_fields[self.name]
if self.name in default_fields else
None
)
)
self._log(
'Creating boolean array from `%s` annotation data.' % self.name
)
reference_set = reference_set or self.reference_set
all_fields = self.get_names()
fields = use_fields or all_fields
# positional indices of the used fields within the records
ifields = tuple(
i for i, field in enumerate(all_fields) if field in fields
)
# the first column: simple presence/absence in this resource
result = [
(
(self.name,),
self.to_bool_array(reference_set = reference_set)
)
]
# add columns for growing prefixes of the field list
for i in xrange(len(fields)):
this_ifields = ifields[:i + 1]
this_fields = fields[:i + 1]
# all observed value combinations for this field prefix
value_combinations = set(
tuple(annot[j] for j in this_ifields)
for annots in self.annot.values()
for annot in annots
)
# skip combinations containing None or numeric values
value_combinations = sorted(
values
for values in value_combinations
if not any(
isinstance(v, (type(None), float, int))
for v in values
)
)
for values in value_combinations:
# booleans are rendered as `field` / `not-field` labels
labels = tuple(
'not-%s' % this_fields[ival]
if isinstance(val, bool) and not val else
this_fields[ival]
if isinstance(val, bool) and val else
val
for ival, val in enumerate(values)
)
this_values = dict(zip(this_fields, values))
this_array = self.get_subset_bool_array(
reference_set = reference_set,
**this_values
)
result.append(
(
(self.name,) + labels,
this_array,
)
)
self._log(
'Boolean array has been created from '
'`%s` annotation data.' % self.name
)
return (
tuple(r[0] for r in result),
np.vstack([r[1] for r in result]).T
)
@property
def has_fields(self):
    """True if at least one entity carries a non-empty annotation set."""
    return any(self.annot.values())
[docs]
def make_df(self, rebuild = False):
"""
Compiles a ``pandas.DataFrame`` from the annotation data.
The data frame will be assigned to :py:attr``df``.
:arg bool rebuild:
Rebuild the data frame even if one already exists.
"""
self._log('Creating dataframe from `%s` annotations.' % self.name)
if hasattr(self, 'df') and not rebuild:
self._log('Data frame already exists, rebuild not requested.')
return
# values never written into the data frame
discard = {'n/a', None}
columns = [
'uniprot',
'genesymbol',
'entity_type',
'source',
'label',
'value',
'record_id',
]
has_fields = self.has_fields
records = []
# record counter shared by all rows of one annotation record
irec = 0
for element, annots in iteritems(self.annot):
if not element:
continue
entity_type = self.get_entity_type(element)
# gene symbol string: complexes are labelled `COMPLEX:...`,
# other entities via the mapping module
genesymbol_str = (
'COMPLEX:%s' % element.genesymbol_str
if hasattr(element, 'genesymbol_str') else
'COMPLEX:%s' % (
complex.get_db().complexes[element].genesymbol_str
)
if element.startswith('COMPLEX:') else
(
mapping.label(
element,
entity_type = entity_type,
ncbi_tax_id = self.ncbi_tax_id,
) or
''
)
)
if not has_fields:
# membership-only resources: a single `in <resource>` row
records.append([
element.__str__(),
genesymbol_str,
entity_type,
self.name,
'in %s' % self.name,
'yes',
irec,
])
irec += 1
for annot in annots:
# one row per field of each annotation record
for label, value in zip(annot._fields, annot):
if value in discard:
continue
if isinstance(value, (set, list, tuple)):
value = ';'.join(map(str, value))
records.append([
element.__str__(),
genesymbol_str,
entity_type,
self.name,
label,
str(value),
irec,
])
irec += 1
self.df = pd.DataFrame(
records,
columns = columns,
).astype(self._dtypes)
def coverage(self, other):
    """
    The proportion of this resource's entities occurring in an
    arbitrary set of entities.
    """
    other = other if isinstance(other, set) else set(other)
    return len(self & other) / len(self)


def proportion(self, other):
    """
    The proportion of ``other`` covered by this resource
    (0.0 for an empty ``other``).
    """
    other = other if isinstance(other, set) else set(other)
    return len(self & other) / len(other) if other else .0
def subset_intersection(self, universe, **kwargs):
    """
    The proportion of a subset of this resource occurring in the set
    ``universe``; the subset is selected by passing ``kwargs`` to the
    ``select`` method.
    """
    subset = self.get_subset(**kwargs)
    return len(subset & universe) / len(subset)
[docs]
def get_values(self, name, exclude_none = True):
    """
    Returns the set of all possible values of a field. E.g. if the
    records of this annotation have a field ``cell_type`` then calling
    this method can tell you that across all records the values of
    this field might be ``{'macrophage', 'epithelial_cell', ...}``.
    """

    values = set()

    for aset in self.annot.values():

        for a in aset:

            value = getattr(a, name)

            # list-like (e.g. tuple) values contribute
            # each of their elements individually
            values.update(
                value
                if isinstance(value, _const.LIST_LIKE) else
                (value,)
            )

    if exclude_none:
        values.discard(None)

    return values
[docs]
def get_names(self):
    """
    Returns the tuple of field names in the records. The annotation
    consists of uniform records, hence the fields of the first record
    found describe all of them. Empty tuple if no records exist.
    """

    for aset in self.annot.values():

        for a in aset:

            return a._fields

    return ()
def __and__(self, other):
    # intersection against the set of all annotated entities
    entities = self.to_set()
    return other & entities
def __or__(self, other):
    # union with the set of all annotated entities
    entities = self.to_set()
    return other | entities
def __sub__(self, other):
    # annotated entities not present in `other`
    entities = self.to_set()
    return entities - other
def __len__(self):
    # number of annotated entities
    return self.numof_entities()
def __getitem__(self, item):
    """
    Either the records of one entity, the union of records of an
    iterable of entities, or a subset selected by a value of the
    primary field; implicitly ``None`` if nothing matches.
    """

    if not isinstance(item, _const.SIMPLE_TYPES):

        # an iterable of entities: union of their record sets
        return set.union(
            *(
                self.annot[it]
                for it in item
                if it in self
            )
        )

    if item in self:

        return self.annot[item]

    if self.primary_field:

        # fall back to selecting by the primary field
        return self.select(**{self.primary_field: item})
def __contains__(self, item):
    # membership means the entity has at least one record key
    return item in self.annot
[docs]
def numof_references(self):
    """
    Some annotations contain references. The field name for references
    is always ``pmid`` (PubMed ID). This method counts the unique
    references across all records.
    """

    refs = self.all_refs()

    return len(set(refs))
[docs]
def curation_effort(self):
    """
    Counts the reference-record pairs (references are repeated once
    for each record they occur in).
    """

    refs = self.all_refs()

    return len(refs)
[docs]
def all_refs(self):
    """
    Some annotations contain references. The field name for references
    is always ``pmid`` (PubMed ID). This method collects the references
    across all records. Returns *list*.
    """

    if 'pmid' not in self.get_names():
        # this resource carries no references at all
        return []

    return [
        a.pmid
        for aa in self.annot.values()
        for a in aa
        if a.pmid
    ]
@property
def summary(self):
    """
    Key numbers about this annotation resource as a dict: entity and
    record counts broken down by entity type (protein, complex, miRNA),
    reference counts and the list of record fields.
    """

    return {
        'n_total': self.numof_entities(),
        'n_records_total': self.numof_records(),
        'n_proteins': self.numof_proteins(),
        # percentage of all proteins covered by this resource
        'pct_proteins': self.proportion(self.proteins) * 100,
        'n_complexes': self.numof_complexes(),
        # percentage of all known complexes covered by this resource
        'pct_complexes': self.proportion(
            complex.get_db().complexes.keys()
        ) * 100,
        'n_mirnas': self.numof_mirnas(),
        # percentage of all miRBase miRNAs covered by this resource
        'pct_mirnas': (
            self.proportion(reflists.get_reflist('mirbase')) * 100
        ),
        'n_protein_records': self.numof_protein_records(),
        'n_complex_records': self.numof_complex_records(),
        'n_mirna_records': self.numof_mirna_records(),
        'references': self.numof_references(),
        'curation_effort': self.curation_effort(),
        # average records per entity: computed over proteins when any,
        # otherwise over all entities; zero for an empty resource
        'records_per_entity': (
            self.numof_protein_records() / self.numof_proteins()
            if self.numof_proteins() else
            self.numof_records() / self.numof_entities()
            if self.numof_entities() else
            0
        ),
        # NOTE(review): derived from the protein count — confirm this
        # flag should not be based on `numof_complexes()` instead
        'complex_annotations_inferred': bool(self.numof_proteins()),
        'fields': ', '.join(self.get_names()),
        'name': self.name,
    }
[docs]
def browse(
        self,
        field: str | list[str] | dict[str, str] | None = None,
        start: int = 0,
        **kwargs
    ):
    """
    Print gene information as a table.

    Presents information about annotation categories as ascii tables
    printed in the terminal. If one category provided, prints one table.
    If multiple categories provided, prints a table for each of them one
    by one proceeding to the next one once you hit return. If no categories
    provided goes through all levels of the primary category.

    Args
        field:
            The field to browse categories by.
            * If None the primary field will be selected.
              If this annotation resource doesn't have fields, all proteins
              will be presented as one single category.
            * If a string it will be considered a field name and it will
              browse through all levels of this field.
            * If a ``list``, set or tuple, it will be considered either a
              ``list`` of field names or a list of values from the primary
              field. In the former case all combinations of the values of
              the fields will be presented, in the latter case the browsing
              will be limited to the levels of the primary field contained
              in ``field``.
            * If a ``dict``, keys are supposed to be field names and values
              as list of levels. If any of the values are None, all levels
              from that field will be used.
        start:
            Start browsing from this category. E.g. if there are 500
            categories and start is 250 it will skip everything before the
            250th.
        kwargs:
            Passed to ``pypath.utils.uniprot.info``.
    """

    if not field and not self.primary_field:

        # no fields at all: show all proteins as one single table
        uniprots = entity.Entity.only_proteins(self.to_set())
        utils_uniprot.info(uniprots, **kwargs)
        return

    field = field or self.primary_field

    # normalize `field` into a dict of field names to value lists
    if isinstance(field, str):

        # all values of the field
        field = {field: self.get_values(field)}

    elif isinstance(field, _const.LIST_LIKE):

        if set(field) & set(self.get_names()):

            # a set of fields provided
            field = dict(
                (
                    fi,
                    self.get_values(fi)
                )
                for fi in field
            )

        else:

            # a set of values provided
            field = {self.primary_field: field}

    elif isinstance(field, dict):

        # None values stand for all levels of that field
        field = dict(
            (
                fi,
                vals or self.get_values(fi)
            )
            for fi, vals in iteritems(field)
        )

    else:

        sys.stdout.write(
            'Could not recognize field definition, '
            'please refer to the docs.\n'
        )
        sys.stdout.flush()
        return

    # otherwise we assume `field` is a dict of fields and values
    field_keys = list(field.keys())
    field_values = [field[k] for k in field_keys]

    # one group per combination of the field values
    values = sorted(itertools.product(*field_values))
    # NOTE(review): `total` is currently unused — confirm it was
    # intended e.g. for a progress display
    total = len(values)

    groups = {}

    for vals in values:

        args = dict(zip(field_keys, vals))
        proteins = entity.Entity.only_proteins(self.select(**args))

        if not proteins:
            continue

        # single field: the value itself is the label;
        # multiple fields: "key: value" pairs joined
        label = (
            vals[0]
            if len(vals) == 1 else
            ', '.join(
                '%s: %s' % (
                    key,
                    str(val)
                )
                for key, val in iteritems(args)
            )
        )

        groups[label] = proteins

    utils_uniprot.browse(groups = groups, start = start, **kwargs)
[docs]
class Membranome(AnnotationBase):
    """Membrane protein annotations from the Membranome database."""

    _eq_fields = ('membrane', 'side')

    def __init__(self, **kwargs):
        """Populate the Membranome annotations."""

        super().__init__(
            name = 'Membranome',
            input_method = 'membranome.membranome_annotations',
            **kwargs
        )

    def _process_method(self):

        # raw records: (identifier, membrane, side)
        record = collections.namedtuple(
            'MembranomeAnnotation',
            ['membrane', 'side'],
        )

        annot = collections.defaultdict(set)

        for rec in self.data:
            annot[rec[0]].add(record(rec[1], rec[2]))

        self.annot = dict(annot)
[docs]
class Exocarta(AnnotationBase):
    """
    Vesicle proteomics annotations from ExoCarta (also serves as the
    base of ``Vesiclepedia``, sharing the same input method).
    """

    _eq_fields = ('tissue', 'vesicle')

    def __init__(self, ncbi_tax_id = 9606, **kwargs):
        """
        Args
            ncbi_tax_id: NCBI Taxonomy ID of the organism.
            kwargs: Passed to ``AnnotationBase``; `database` may be
                'exocarta' (default) or 'vesiclepedia'.
        """

        if 'organism' not in kwargs:
            kwargs['organism'] = ncbi_tax_id

        if 'database' not in kwargs:
            kwargs['database'] = 'exocarta'

        AnnotationBase.__init__(
            self,
            name = kwargs['database'].capitalize(),
            ncbi_tax_id = ncbi_tax_id,
            input_method = 'exocarta._get_exocarta_vesiclepedia',
            **kwargs,
        )

    def _process_method(self):

        record = collections.namedtuple(
            '%sAnnotation' % self.name,
            ['pmid', 'tissue', 'vesicle'],
        )

        _annot = collections.defaultdict(set)
        missing_name = False

        for a in self.data:

            if not a[1]:
                # corrupted record without a gene symbol
                missing_name = True
                continue

            uniprots = mapping.map_name(a[1], 'genesymbol', 'uniprot')

            for u in uniprots:

                # Vesiclepedia provides a list of vesicle types,
                # ExoCarta covers exosomes only
                for vesicle in (
                    a[3][3]
                    if self.name == 'Vesiclepedia' else
                    ('Exosomes',)
                ):

                    _annot[u].add(record(a[3][0], a[3][2], vesicle))

        self.annot = dict(_annot)

        if missing_name:
            # fixed typo in the message: "it\' corrupted" -> "it\'s corrupted"
            self._log(
                'One or more names were missing while processing '
                'annotations from %s. Best if you check your cache '
                'file and re-download the data if it\'s corrupted.' % (
                    self.name
                )
            )
[docs]
class Vesiclepedia(Exocarta):
    """Vesicle proteomics annotations from Vesiclepedia."""

    _eq_fields = ('tissue', 'vesicle')

    def __init__(self, ncbi_tax_id = 9606, **kwargs):
        """Same as ``Exocarta`` but using the Vesiclepedia database."""

        super().__init__(
            ncbi_tax_id = ncbi_tax_id,
            database = 'vesiclepedia',
            **kwargs
        )
[docs]
class Embrace(AnnotationBase):
    """Ligand and receptor annotations from EMBRACE."""

    _eq_fields = ('mainclass',)

    def __init__(self, ncbi_tax_id = 9606, **kwargs):
        """Populate the EMBRACE annotations."""

        super().__init__(
            name = 'EMBRACE',
            ncbi_tax_id = ncbi_tax_id,
            input_method = 'embrace.embrace_annotations',
            **kwargs
        )

    def _process_method(self):
        # the input method already provides the final format
        self.annot = self.data
        del self.data
[docs]
class Baccin2019(AnnotationBase):
    """Ligand-receptor annotations from Baccin et al. 2019."""

    _eq_fields = ('mainclass', 'subclass', 'location')

    def __init__(self, ncbi_tax_id = 9606, **kwargs):
        """Populate the Baccin 2019 annotations."""

        super().__init__(
            name = 'Baccin2019',
            ncbi_tax_id = ncbi_tax_id,
            input_method = 'baccin2019.baccin2019_annotations',
            **kwargs
        )

    def _process_method(self):
        # the input method already provides the final format
        self.annot = self.data
        del self.data
[docs]
class Almen2009(AnnotationBase):
    """Cell surface protein classes from Almen et al. 2009."""

    _eq_fields = ('mainclass',)

    def __init__(self, ncbi_tax_id = 9606, **kwargs):
        """Populate the Almen 2009 annotations."""

        super().__init__(
            name = 'Almen2009',
            ncbi_tax_id = ncbi_tax_id,
            input_method = 'almen2009.almen2009_annotations',
            **kwargs
        )

    def _process_method(self):
        # the input method already provides the final format
        self.annot = self.data
        del self.data
[docs]
class Italk(AnnotationBase):
    """Ligand and receptor annotations from iTALK."""

    _eq_fields = ('mainclass', 'subclass')

    def __init__(self, ncbi_tax_id = 9606, **kwargs):
        """Populate the iTALK annotations."""

        super().__init__(
            name = 'iTALK',
            ncbi_tax_id = ncbi_tax_id,
            input_method = 'italk.italk_annotations',
            **kwargs
        )

    def _process_method(self):
        # the input method already provides the final format
        self.annot = self.data
        del self.data
[docs]
class Cellcellinteractions(AnnotationBase):
    """Annotations from the CellCellInteractions database."""

    _eq_fields = ('mainclass',)

    def __init__(self, ncbi_tax_id = 9606, **kwargs):
        """Populate the CellCellInteractions annotations."""

        super().__init__(
            name = 'CellCellInteractions',
            ncbi_tax_id = ncbi_tax_id,
            input_method = (
                'cellcellinteractions.'
                'cellcellinteractions_annotations'
            ),
            **kwargs
        )

    def _process_method(self):
        # the input method already provides the final format
        self.annot = self.data
        del self.data
[docs]
class Matrisome(AnnotationBase):
    """Extracellular matrix protein annotations from Matrisome."""

    _eq_fields = ('mainclass', 'subclass')

    def __init__(self, ncbi_tax_id = 9606, **kwargs):
        """Populate the Matrisome annotations."""

        kwargs.setdefault('organism', ncbi_tax_id)

        super().__init__(
            name = 'Matrisome',
            ncbi_tax_id = ncbi_tax_id,
            input_method = 'matrisome.matrisome_annotations',
            **kwargs,
        )

    def _process_method(self):
        # the input method already provides the final format
        self.annot = self.data
        del self.data
[docs]
class Surfaceome(AnnotationBase):
    """Cell surface protein annotations from the in silico surfaceome."""

    _eq_fields = ('mainclass',)

    def __init__(self, **kwargs):
        """Populate the Surfaceome annotations."""

        AnnotationBase.__init__(
            self,
            name = 'Surfaceome',
            input_method = 'surfaceome.surfaceome_annotations',
            **kwargs
        )

    def _process_method(self):

        _annot = collections.defaultdict(set)

        record = collections.namedtuple(
            'SurfaceomeAnnotation',
            ['score', 'mainclass', 'subclasses']
        )
        # Fix: namedtuple defaults must be set on `__new__.__defaults__`;
        # assigning to `record.__defaults__` only creates an inert class
        # attribute (cf. the correct idiom in `LigandReceptor`).
        record.__new__.__defaults__ = (None, None)

        for uniprot, a in iteritems(self.data):

            _annot[uniprot].add(
                record(
                    a[0],
                    a[1],
                    tuple(sorted(a[2])) if a[2] else None,
                )
            )

        self.annot = dict(_annot)
[docs]
class Adhesome(AnnotationBase):
    """Adhesion related protein annotations from Adhesome."""

    _eq_fields = ('mainclass',)

    def __init__(self, **kwargs):
        """Populate the Adhesome annotations."""

        super().__init__(
            name = 'Adhesome',
            input_method = 'adhesome.adhesome_annotations',
            **kwargs
        )

    def _process_method(self):
        # the input method already provides the final format
        self.annot = self.data
        del self.data
[docs]
class Cancersea(AnnotationBase):
    """Cancer cell state annotations from CancerSEA."""

    _eq_fields = ('state',)

    def __init__(self, **kwargs):
        """Populate the CancerSEA annotations."""

        super().__init__(
            name = 'CancerSEA',
            input_method = 'cancersea.cancersea_annotations',
            **kwargs
        )

    def _process_method(self):
        # the input method already provides the final format
        self.annot = self.data
        del self.data
[docs]
class Hgnc(AnnotationBase):
    """Gene group (family) annotations from HGNC."""

    _eq_fields = ('mainclass',)

    def __init__(self, **kwargs):
        """Populate the HGNC gene group annotations."""

        super().__init__(
            name = 'HGNC',
            input_method = 'hgnc.hgnc_genegroups',
            **kwargs
        )

    def _process_method(self):
        # the input method already provides the final format
        self.annot = self.data
        del self.data
[docs]
class Zhong2015(AnnotationBase):
    """Adhesion protein classes from Zhong et al. 2015."""

    _eq_fields = ('type',)

    def __init__(self, **kwargs):
        """Populate the Zhong 2015 annotations."""

        super().__init__(
            name = 'Zhong2015',
            input_method = 'zhong2015.zhong2015_annotations',
            **kwargs
        )

    def _process_method(self):
        # the input method already provides the final format
        self.annot = self.data
        del self.data
[docs]
class Opm(AnnotationBase):
    """Membrane protein annotations from OPM."""

    _eq_fields = ('membrane',)

    def __init__(self, ncbi_tax_id = 9606, **kwargs):
        """Populate the OPM annotations."""

        kwargs.setdefault('organism', ncbi_tax_id)

        # NOTE(review): `ncbi_tax_id` reaches the base class only as
        # `kwargs['organism']` — confirm this is intentional
        super().__init__(
            name = 'OPM',
            input_method = 'opm.opm_annotations',
            **kwargs
        )

    def _process_method(self):
        # the input method already provides the final format
        self.annot = self.data
        del self.data
[docs]
class Phobius(AnnotationBase):
    """Topology predictions from Phobius."""

    _eq_fields = (
        'tm_helices',
        'signal_peptide',
        'cytoplasmic',
        'non_cytoplasmic',
    )

    def __init__(self, **kwargs):
        """Populate the Phobius annotations."""

        super().__init__(
            name = 'Phobius',
            input_method = 'phobius.phobius_annotations',
            **kwargs
        )

    def _process_method(self):
        # the input method already provides the final format
        self.annot = self.data
        del self.data
[docs]
class Topdb(AnnotationBase):
    """Membrane topology annotations from TopDB."""

    _eq_fields = ('membrane',)

    def __init__(self, ncbi_tax_id = 9606, **kwargs):
        """Populate the TopDB annotations."""

        super().__init__(
            name = 'TopDB',
            input_method = 'topdb.topdb_annotations',
            input_args = {'ncbi_tax_id': ncbi_tax_id},
            ncbi_tax_id = ncbi_tax_id,
            **kwargs
        )

    def _process_method(self):
        # the input method already provides the final format
        self.annot = self.data
        del self.data
[docs]
class Cpad(AnnotationBase):
    """Cancer pathway effect annotations from CPAD."""

    _eq_fields = (
        'effect_on_pathway',
        'pathway',
        'effect_on_cancer',
        'cancer',
    )

    def __init__(self, ncbi_tax_id = 9606, **kwargs):
        """Populate the CPAD annotations."""

        super().__init__(
            name = 'CPAD',
            input_method = 'cpad.cpad_annotations',
            ncbi_tax_id = ncbi_tax_id,
            **kwargs
        )

    def _process_method(self):
        # the input method already provides the final format
        self.annot = self.data
        del self.data
[docs]
class Disgenet(AnnotationBase):
    """Disease-gene association annotations from DisGeNet."""

    _eq_fields = (
        'disease',
        'type',
    )

    def __init__(self, ncbi_tax_id = 9606, **kwargs):
        """Populate the DisGeNet annotations."""

        super().__init__(
            name = 'DisGeNet',
            input_method = 'disgenet.disgenet_annotations',
            ncbi_tax_id = ncbi_tax_id,
            **kwargs
        )

    def _process_method(self):
        # the input method already provides the final format
        self.annot = self.data
        del self.data
[docs]
class Msigdb(AnnotationBase):
    """Gene set memberships from MSigDB."""

    _eq_fields = (
        'collection',
        'geneset',
    )

    def __init__(self, ncbi_tax_id = 9606, **kwargs):
        """Populate the MSigDB gene set annotations."""

        super().__init__(
            name = 'MSigDB',
            input_method = 'msigdb.msigdb_annotations',
            ncbi_tax_id = ncbi_tax_id,
            # by default do not exclude any collection
            input_args = {'exclude': ()},
            **kwargs
        )

    def _process_method(self):
        # the input method already provides the final format
        self.annot = self.data
        del self.data
[docs]
class Integrins(AnnotationBase):
    """The set of integrin proteins (membership only, no fields)."""

    _eq_fields = ()

    def __init__(self, **kwargs):
        """Populate the integrin annotations."""

        super().__init__(
            name = 'Integrins',
            input_method = 'integrins.get_integrins',
            **kwargs
        )
[docs]
class Lrdb(AnnotationBase):
    """Ligand and receptor annotations from LRdb."""

    _eq_fields = ('role',)

    def __init__(self, **kwargs):
        """Populate the LRdb annotations."""

        super().__init__(
            name = 'LRdb',
            input_method = 'lrdb.lrdb_annotations',
            **kwargs
        )

    def _process_method(self):
        # the input method already provides the final format
        self.annot = self.data
        del self.data
[docs]
class HumanProteinAtlas(AnnotationBase):
    """Tissue expression annotations from the Human Protein Atlas."""

    _eq_fields = ('organ', 'tissue', 'status', 'level', 'pathology')

    def __init__(self, **kwargs):
        """Populate the HPA tissue annotations."""

        super().__init__(
            name = 'HPA_tissue',
            input_method = 'proteinatlas.proteinatlas_annotations',
            **kwargs
        )

    def _process_method(self):
        # the input method already provides the final format
        self.annot = self.data
        del self.data
[docs]
class HumanProteinAtlasSubcellular(AnnotationBase):
    """Subcellular localization annotations from the Human Protein Atlas."""

    _eq_fields = ('location',)

    def __init__(self, **kwargs):
        """Populate the HPA subcellular annotations."""

        super().__init__(
            name = 'HPA_subcellular',
            input_method = (
                'proteinatlas.proteinatlas_subcellular_annotations'
            ),
            **kwargs
        )

    def _process_method(self):
        # the input method already provides the final format
        self.annot = self.data
        del self.data
[docs]
class HumanProteinAtlasSecretome(AnnotationBase):
    """Secretome annotations from the Human Protein Atlas."""

    _eq_fields = ('mainclass',)

    def __init__(self, **kwargs):
        """Populate the HPA secretome annotations."""

        super().__init__(
            name = 'HPA_secretome',
            input_method = 'proteinatlas.proteinatlas_secretome_annotations',
            **kwargs
        )

    def _process_method(self):
        # the input method already provides the final format
        self.annot = self.data
        del self.data
[docs]
class CancerGeneCensus(AnnotationBase):
    """Cancer driver gene annotations from the COSMIC Cancer Gene Census."""

    _eq_fields = None

    def __init__(self, **kwargs):
        """Populate the Cancer Gene Census annotations."""

        super().__init__(
            name = 'CancerGeneCensus',
            input_method = 'cosmic.cancer_gene_census_annotations',
            **kwargs
        )

    def _process_method(self):
        # the input method already provides the final format
        self.annot = self.data
        del self.data
[docs]
class Intogen(AnnotationBase):
    """Cancer driver annotations from IntOGen."""

    _eq_fields = ('type', 'role')

    def __init__(self, **kwargs):
        """Populate the IntOGen annotations."""

        super().__init__(
            name = 'IntOGen',
            input_method = 'intogen.intogen_annotations',
            **kwargs
        )

    def _process_method(self):
        # the input method already provides the final format
        self.annot = self.data
        del self.data
[docs]
class Comppi(AnnotationBase):
    """Subcellular localization annotations from ComPPI."""

    _eq_fields = ('location',)

    def __init__(self, **kwargs):
        """Populate the ComPPI localization annotations."""

        super().__init__(
            name = 'ComPPI',
            input_method = 'comppi.comppi_locations',
            **kwargs
        )

    def _process_method(self):
        # the input method already provides the final format
        self.annot = self.data
        del self.data
[docs]
class Ramilowski2015Location(AnnotationBase):
    """Subcellular localization annotations from Ramilowski et al. 2015."""

    _eq_fields = ('location',)

    def __init__(self, **kwargs):
        """Populate the Ramilowski 2015 localization annotations."""

        super().__init__(
            name = 'Ramilowski_location',
            input_method = 'ramilowski2015.ramilowski_locations',
            **kwargs
        )

    def _process_method(self):
        # the input method already provides the final format
        self.annot = self.data
        del self.data
[docs]
class CellSurfaceProteinAtlas(AnnotationBase):
    """
    Cell surface proteins from the Cell Surface Protein Atlas.
    The name of this resource abbreviated as `CSPA`.
    """

    _eq_fields = ('high_confidence', 'tm', 'gpi', 'uniprot_cell_surface')

    def __init__(
            self,
            ncbi_tax_id = 9606,
            **kwargs
        ):
        """Populate the CSPA annotations."""

        kwargs.setdefault('organism', ncbi_tax_id)

        super().__init__(
            name = 'CSPA',
            ncbi_tax_id = ncbi_tax_id,
            input_method = 'cspa.cspa_annotations',
            **kwargs
        )

    def _process_method(self):
        # the input method already provides the final format
        self.annot = self.data
        del self.data
[docs]
class CellSurfaceProteinAtlasCellType(AnnotationBase):
    """
    Cell type specific cell surface proteins from the Cell Surface
    Protein Atlas. The name of this resource abbreviated as `CSPA`.
    """

    _eq_fields = ('cell_type',)

    def __init__(
            self,
            ncbi_tax_id = 9606,
            **kwargs
        ):
        """Populate the CSPA cell type annotations."""

        kwargs.setdefault('organism', ncbi_tax_id)

        super().__init__(
            name = 'CSPA_celltype',
            ncbi_tax_id = ncbi_tax_id,
            input_method = 'cspa.cspa_cell_type_annotations',
            **kwargs
        )

    def _process_method(self):
        # the input method already provides the final format
        self.annot = self.data
        del self.data
[docs]
class HumanPlasmaMembraneReceptome(AnnotationBase):
    """
    Receptor annotations from the Human Plasma Membrane Receptome.
    The name of this resource abbreviated as `HPMR`.
    """

    _eq_fields = ('role',)

    def __init__(self, **kwargs):
        """Populate the HPMR annotations."""

        super().__init__(
            name = 'HPMR',
            input_method = 'hpmr.hpmr_annotations',
            **kwargs
        )

    def _process_method(self):
        # the input method already provides the final format
        self.annot = self.data
        del self.data
[docs]
class Kinasedotcom(AnnotationBase):
    """Kinases from `kinase.com`."""

    _eq_fields = ('group', 'family')

    def __init__(self, **kwargs):
        """Populate the kinase.com annotations."""

        super().__init__(
            name = 'kinase.com',
            input_method = 'kinasedotcom.kinasedotcom_annotations',
            **kwargs
        )

    def _process_method(self):
        # already the appropriate format, no processing needed
        self.annot = self.data
        del self.data
[docs]
class Tfcensus(AnnotationBase):
    """Transcription factors from TF census (Vaquerizas et al 2009)."""

    _eq_fields = ()

    def __init__(self, **kwargs):
        """Populate the TF census annotations."""

        super().__init__(
            name = 'TFcensus',
            input_method = 'tfcensus.tfcensus_annotations',
            **kwargs
        )

    def _process_method(self):
        # already the appropriate format, no processing needed
        self.annot = self.data
        del self.data
[docs]
class Dgidb(AnnotationBase):
    """Druggable proteins from DGIdb (Drug Gene Interaction Database)."""

    _eq_fields = ('category',)

    def __init__(self, **kwargs):
        """Populate the DGIdb annotations."""

        super().__init__(
            name = 'DGIdb',
            input_method = 'dgidb.dgidb_annotations',
            **kwargs
        )

    def _process_method(self):
        # already the appropriate format, no processing needed
        self.annot = self.data
        del self.data
[docs]
class Phosphatome(AnnotationBase):
    """
    The list of phosphatases from Chen et al, Science Signaling (2017)
    Table S1.
    """

    _eq_fields = ()

    def __init__(self, **kwargs):
        """Populate the Phosphatome annotations."""

        super().__init__(
            name = 'Phosphatome',
            input_method = 'phosphatome.phosphatome_annotations',
            **kwargs
        )

    def _process_method(self):
        # the input method already provides the final format
        self.annot = self.data
        del self.data
[docs]
class Matrixdb(AnnotationBase):
    """Protein annotations from MatrixDB."""

    _eq_fields = ('mainclass',)

    def __init__(self, ncbi_tax_id = 9606, **kwargs):
        """Populate the MatrixDB annotations."""

        super().__init__(
            name = 'MatrixDB',
            ncbi_tax_id = ncbi_tax_id,
            input_method = 'matrixdb.matrixdb_annotations',
            **kwargs
        )

    def _process_method(self):
        # already the appropriate format, no processing needed
        self.annot = self.data
        del self.data
[docs]
class SignorPathways(AnnotationBase):
    """Pathway annotations from Signor."""

    _eq_fields = ('pathway',)

    def __init__(self, ncbi_tax_id = 9606, **kwargs):
        """Populate the SIGNOR pathway annotations."""

        super().__init__(
            name = 'SIGNOR',
            ncbi_tax_id = ncbi_tax_id,
            input_method = 'signor.signor_pathway_annotations',
            **kwargs
        )

    def _process_method(self):
        # already the appropriate format, no processing needed
        self.annot = self.data
        del self.data
[docs]
class SignalinkPathways(AnnotationBase):
    """Pathway annotations from SignaLink."""

    _eq_fields = ('pathway',)

    def __init__(self, ncbi_tax_id = 9606, **kwargs):
        """Populate the SignaLink pathway annotations."""

        super().__init__(
            name = 'SignaLink_pathway',
            ncbi_tax_id = ncbi_tax_id,
            input_method = 'signalink.signalink_pathway_annotations',
            **kwargs
        )

    def _process_method(self):
        # already the appropriate format, no processing needed
        self.annot = self.data
        del self.data
[docs]
class SignalinkFunctions(AnnotationBase):
    """Functional annotations from SignaLink."""

    _eq_fields = ('function',)

    def __init__(self, ncbi_tax_id = 9606, **kwargs):
        """Populate the SignaLink function annotations."""

        super().__init__(
            name = 'SignaLink_function',
            ncbi_tax_id = ncbi_tax_id,
            input_method = 'signalink.signalink_function_annotations',
            **kwargs
        )

    def _process_method(self):
        # already the appropriate format, no processing needed
        self.annot = self.data
        del self.data
[docs]
class KeggPathways(AnnotationBase):
    """Pathway annotations from KEGG."""

    _eq_fields = ('pathway',)

    def __init__(self, ncbi_tax_id = 9606, **kwargs):
        """Populate the KEGG pathway annotations."""

        super().__init__(
            name = 'KEGG',
            ncbi_tax_id = ncbi_tax_id,
            input_method = 'kegg.kegg_pathway_annotations',
            **kwargs
        )

    def _process_method(self):
        # already the appropriate format, no processing needed
        self.annot = self.data
        del self.data
[docs]
class KeggPathwaysPC(AnnotationBase):
    """Pathway annotations from KEGG via PathwayCommons."""

    _eq_fields = ('pathway',)

    def __init__(self, ncbi_tax_id = 9606, **kwargs):
        """Populate the KEGG-PC pathway annotations."""

        super().__init__(
            name = 'KEGG-PC',
            ncbi_tax_id = ncbi_tax_id,
            input_method = 'kegg.kegg_pathway_annotations_pathwaycommons',
            **kwargs
        )

    def _process_method(self):
        # already the appropriate format, no processing needed
        self.annot = self.data
        del self.data
[docs]
class NetpathPathways(AnnotationBase):
    """Pathway annotations from NetPath."""

    _eq_fields = ('pathway',)

    def __init__(self, ncbi_tax_id = 9606, **kwargs):
        """Populate the NetPath pathway annotations."""

        super().__init__(
            name = 'NetPath',
            ncbi_tax_id = ncbi_tax_id,
            input_method = 'netpath.netpath_pathway_annotations',
            **kwargs
        )

    def _process_method(self):
        # already the appropriate format, no processing needed
        self.annot = self.data
        del self.data
[docs]
class Locate(AnnotationBase):
    """Subcellular localization annotations from LOCATE."""

    _eq_fields = ('location',)

    def __init__(
            self,
            ncbi_tax_id = 9606,
            literature = True,
            external = True,
            predictions = False,
            **kwargs
        ):
        """
        Args
            literature, external, predictions:
                Which evidence categories of LOCATE to include.
        """

        super().__init__(
            name = 'LOCATE',
            input_method = 'locate.locate_localizations',
            ncbi_tax_id = ncbi_tax_id,
            input_args = {
                'organism': ncbi_tax_id or 9606,
                'literature': literature,
                'external': external,
                'predictions': predictions,
            },
            **kwargs
        )

    def _process_method(self):
        # already the appropriate format, no processing needed
        self.annot = self.data
        del self.data
[docs]
class GOCustomIntercell(go.GOCustomAnnotation):
    """
    Same as :class:``pypath.go.GOCustomAnnotation``
    initialized with the categories defined in
    ``pypath.intercell_annot.intercell_categories``.
    """

    def __init__(
            self,
            categories = None,
            go_annot = None,
            ncbi_tax_id = 9606,
            **kwargs
        ):
        """Create the custom intercell GO annotation."""

        # NOTE(review): `kwargs` is accepted but not forwarded —
        # confirm this is intentional
        super().__init__(
            categories = categories or intercell_annot.go_combined_classes,
            go_annot = go_annot,
            ncbi_tax_id = ncbi_tax_id,
        )
[docs]
class GOIntercell(AnnotationBase):
    """
    Annotation of proteins based on their roles in intercellular
    communication from Gene Ontology.
    """

    _eq_fields = ('mainclass',)

    def __init__(
            self,
            categories = None,
            go_annot = None,
            ncbi_tax_id = 9606,
            **kwargs
        ):
        """See ``GOCustomIntercell`` for the meaning of the arguments."""

        self.categories = categories
        self.go_annot = go_annot

        super().__init__(
            name = 'GO_Intercell',
            ncbi_tax_id = ncbi_tax_id,
            **kwargs
        )

    def load(self):
        """Build the annotations from the custom GO categories."""

        record = collections.namedtuple(
            'GOIntercellAnnotation',
            ('mainclass',),
        )

        go_custom = GOCustomIntercell(
            categories = self.categories,
            go_annot = self.go_annot,
            ncbi_tax_id = self.ncbi_tax_id,
        )

        by_category = go_custom.get_annotations()

        annot = collections.defaultdict(set)

        for mainclass, uniprots in iteritems(by_category):

            for uniprot in uniprots:

                annot[uniprot].add(record(mainclass = mainclass))

        self.annot = dict(annot)

    def _process_method(self, *args, **kwargs):
        # everything is done already in `load`
        pass
[docs]
class CellPhoneDB(AnnotationBase):
    """Protein annotations from CellPhoneDB (human only)."""

    _eq_fields = ('receptor', 'peripheral', 'secreted', 'transmembrane')

    def __init__(self, **kwargs):
        """CellPhoneDB is human only, hence `ncbi_tax_id` is fixed."""

        _ = kwargs.pop('ncbi_tax_id', None)

        super().__init__(
            name = 'CellPhoneDB',
            input_method = 'cellphonedb.cellphonedb_protein_annotations',
            ncbi_tax_id = 9606,
            **kwargs
        )

    def _process_method(self, *args, **kwargs):
        # wrap the single record of each protein into a set
        self.annot = dict(
            (uniprot, {annot})
            for uniprot, annot in
            iteritems(self.data)
        )
[docs]
class Icellnet(AnnotationBase):
    """Ligand and receptor annotations from ICELLNET (human only)."""

    _eq_fields = ('role',)

    def __init__(self, **kwargs):
        """
        Args
            kwargs: Passed to ``AnnotationBase``.
        """

        _ = kwargs.pop('ncbi_tax_id', None)

        AnnotationBase.__init__(
            self,
            name = 'ICELLNET',
            input_method = 'icellnet.icellnet_annotations',
            ncbi_tax_id = 9606,
            complexes = False,
            # fix: kwargs were accepted but silently dropped
            **kwargs
        )

    def _process_method(self, *args, **kwargs):
        # already the appropriate format, no processing needed
        self.annot = self.data
        delattr(self, 'data')
[docs]
class Cellcall(AnnotationBase):
    """Ligand and receptor annotations from CellCall (human only)."""

    _eq_fields = ('role',)

    def __init__(self, **kwargs):
        """
        Args
            kwargs: Passed to ``AnnotationBase``.
        """

        _ = kwargs.pop('ncbi_tax_id', None)

        AnnotationBase.__init__(
            self,
            name = 'CellCall',
            input_method = 'cellcall.cellcall_annotations',
            ncbi_tax_id = 9606,
            complexes = False,
            # fix: kwargs were accepted but silently dropped
            **kwargs
        )

    def _process_method(self, *args, **kwargs):
        # already the appropriate format, no processing needed
        self.annot = self.data
        delattr(self, 'data')
[docs]
class Cellinker(AnnotationBase):
    """Protein annotations from Cellinker."""

    _eq_fields = ('role', 'location', 'type')

    def __init__(self, ncbi_tax_id = 9606, **kwargs):
        """
        Args
            ncbi_tax_id: NCBI Taxonomy ID of the organism.
            kwargs: Passed to ``AnnotationBase``.
        """

        kwargs['organism'] = ncbi_tax_id

        AnnotationBase.__init__(
            self,
            name = 'Cellinker',
            input_method = 'cellinker.cellinker_annotations',
            ncbi_tax_id = ncbi_tax_id,
            complexes = False,
            # fix: `kwargs['organism']` was assigned but kwargs were
            # never forwarded, so the organism never reached the input
            **kwargs
        )

    def _process_method(self, *args, **kwargs):
        # already the appropriate format, no processing needed
        self.annot = self.data
        delattr(self, 'data')
[docs]
class Scconnect(AnnotationBase):
    """Ligand and receptor annotations from scConnect (proteins only)."""

    _eq_fields = ('role',)

    def __init__(self, ncbi_tax_id = 9606, **kwargs):
        """
        Args
            ncbi_tax_id: NCBI Taxonomy ID of the organism.
            kwargs: Passed to ``AnnotationBase``.
        """

        kwargs['organism'] = ncbi_tax_id

        AnnotationBase.__init__(
            self,
            name = 'scConnect',
            input_method = 'scconnect.scconnect_annotations',
            ncbi_tax_id = ncbi_tax_id,
            complexes = False,
            # fix: `kwargs['organism']` was assigned but kwargs were
            # never forwarded, so the organism never reached the input
            **kwargs
        )

    def _process_method(self, *args, **kwargs):
        # the input contains both proteins and complexes;
        # this class keeps only the proteins
        self.annot = dict(
            (k, v)
            for k, v in iteritems(self.data)
            if not entity.Entity._is_complex(k)
        )
        delattr(self, 'data')
[docs]
class Biogps(AnnotationBase):
    """Gene expression annotations from BioGPS."""

    _eq_fields = ('dataset', 'sample', 'probe')

    def __init__(self, ncbi_tax_id = 9606, **kwargs):
        """
        Args
            ncbi_tax_id: NCBI Taxonomy ID of the organism.
            kwargs: Passed to ``AnnotationBase``.
        """

        ncbi_tax_id = ncbi_tax_id or 9606

        AnnotationBase.__init__(
            self,
            name = 'BioGPS',
            input_method = 'biogps.biogps_annotations',
            input_args = {
                'organism': ncbi_tax_id,
            },
            ncbi_tax_id = ncbi_tax_id,
            complexes = (),
            infer_complexes = False,
            # fix: kwargs were accepted but silently dropped
            **kwargs
        )

    def _process_method(self, *args, **kwargs):
        # already the appropriate format, no processing needed
        self.annot = self.data
        delattr(self, 'data')
[docs]
class Cellchatdb(AnnotationBase):
    """Ligand and receptor annotations from CellChatDB (human only)."""

    _eq_fields = ('role', 'category')

    def __init__(self, **kwargs):
        """
        Args
            kwargs: Passed to ``AnnotationBase``.
        """

        _ = kwargs.pop('ncbi_tax_id', None)

        AnnotationBase.__init__(
            self,
            name = 'CellChatDB',
            input_method = 'cellchatdb.cellchatdb_annotations',
            ncbi_tax_id = 9606,
            complexes = False,
            # fix: kwargs were accepted but silently dropped
            **kwargs
        )

    def _process_method(self, *args, **kwargs):
        # already the appropriate format, no processing needed
        self.annot = self.data
        delattr(self, 'data')
[docs]
class Celltalkdb(AnnotationBase):
    """Ligand and receptor annotations from CellTalkDB (human only)."""

    _eq_fields = ('role',)

    def __init__(self, **kwargs):
        """
        Args
            kwargs: Passed to ``AnnotationBase``.
        """

        _ = kwargs.pop('ncbi_tax_id', None)

        AnnotationBase.__init__(
            self,
            name = 'CellTalkDB',
            input_method = 'celltalkdb.celltalkdb_annotations',
            ncbi_tax_id = 9606,
            complexes = False,
            # fix: kwargs were accepted but silently dropped
            **kwargs
        )

    def _process_method(self, *args, **kwargs):
        # already the appropriate format, no processing needed
        self.annot = self.data
        delattr(self, 'data')
[docs]
class Connectomedb(AnnotationBase):
    """Ligand and receptor annotations from connectomeDB2020 (human only)."""

    _eq_fields = ('role', 'location')

    def __init__(self, **kwargs):
        """
        Args
            kwargs: Passed to ``AnnotationBase``.
        """

        _ = kwargs.pop('ncbi_tax_id', None)

        AnnotationBase.__init__(
            self,
            name = 'connectomeDB2020',
            input_method = 'connectomedb.connectomedb_annotations',
            ncbi_tax_id = 9606,
            complexes = False,
            # fix: kwargs were accepted but silently dropped
            **kwargs
        )

    def _process_method(self, *args, **kwargs):
        # already the appropriate format, no processing needed
        self.annot = self.data
        delattr(self, 'data')
[docs]
class Talklr(AnnotationBase):
    """Ligand and receptor annotations from talklr (human only)."""

    _eq_fields = ('role',)

    def __init__(self, **kwargs):
        """
        Args
            kwargs: Passed to ``AnnotationBase``.
        """

        _ = kwargs.pop('ncbi_tax_id', None)

        AnnotationBase.__init__(
            self,
            name = 'talklr',
            input_method = 'talklr.talklr_annotations',
            ncbi_tax_id = 9606,
            complexes = False,
            # fix: kwargs were accepted but silently dropped
            **kwargs
        )

    def _process_method(self, *args, **kwargs):
        # already the appropriate format, no processing needed
        self.annot = self.data
        delattr(self, 'data')
[docs]
class IcellnetComplex(AnnotationBase):
    """Complex annotations from ICELLNET (human only)."""

    _eq_fields = ('role',)

    def __init__(self, **kwargs):
        """
        Args
            kwargs: Passed to ``AnnotationBase``.
        """

        _ = kwargs.pop('ncbi_tax_id', None)

        AnnotationBase.__init__(
            self,
            name = 'ICELLNET_complex',
            input_method = 'icellnet.icellnet_annotations',
            ncbi_tax_id = 9606,
            entity_type = 'complex',
            # fix: kwargs were accepted but silently dropped
            **kwargs
        )

    def _process_method(self, *args, **kwargs):
        # already the appropriate format, no processing needed
        self.annot = self.data
        delattr(self, 'data')
[docs]
class CellchatdbComplex(AnnotationBase):
    """Complex annotations from CellChatDB (human only)."""

    _eq_fields = ('role', 'category')

    def __init__(self, **kwargs):
        """
        Args
            kwargs: Passed to ``AnnotationBase``.
        """

        _ = kwargs.pop('ncbi_tax_id', None)

        AnnotationBase.__init__(
            self,
            name = 'CellChatDB_complex',
            input_method = 'cellchatdb.cellchatdb_annotations',
            ncbi_tax_id = 9606,
            entity_type = 'complex',
            # fix: kwargs were accepted but silently dropped
            **kwargs
        )

    def _process_method(self, *args, **kwargs):
        # already the appropriate format, no processing needed
        self.annot = self.data
        delattr(self, 'data')
[docs]
class CellPhoneDBComplex(CellPhoneDB):
    """Complex annotations from CellPhoneDB (human only)."""

    def __init__(self, **kwargs):
        """Populate the CellPhoneDB complex annotations."""

        _ = kwargs.pop('ncbi_tax_id', None)

        # deliberately skips `CellPhoneDB.__init__`, only the
        # `_process_method` is shared with the parent class
        AnnotationBase.__init__(
            self,
            name = 'CellPhoneDB_complex',
            input_method = 'cellphonedb.cellphonedb_complex_annotations',
            ncbi_tax_id = 9606,
            entity_type = 'complex',
            **kwargs
        )
[docs]
class CellinkerComplex(AnnotationBase):
    """Complex annotations from Cellinker."""

    _eq_fields = ('role', 'location', 'type')

    def __init__(self, ncbi_tax_id = 9606, **kwargs):
        """
        Args
            ncbi_tax_id: NCBI Taxonomy ID of the organism.
            kwargs: Passed to ``AnnotationBase``.
        """

        kwargs['organism'] = ncbi_tax_id

        AnnotationBase.__init__(
            self,
            name = 'Cellinker_complex',
            input_method = 'cellinker.cellinker_complex_annotations',
            ncbi_tax_id = ncbi_tax_id,
            entity_type = 'complex',
            # fix: `kwargs['organism']` was assigned but kwargs were
            # never forwarded, so the organism never reached the input
            **kwargs
        )

    def _process_method(self, *args, **kwargs):
        # already the appropriate format, no processing needed
        self.annot = self.data
        delattr(self, 'data')
[docs]
class ScconnectComplex(AnnotationBase):
    """Complex annotations from scConnect."""

    _eq_fields = ('role',)

    def __init__(self, ncbi_tax_id = 9606, **kwargs):
        """
        Args
            ncbi_tax_id: NCBI Taxonomy ID of the organism.
            kwargs: Passed to ``AnnotationBase``.
        """

        kwargs['organism'] = ncbi_tax_id

        AnnotationBase.__init__(
            self,
            name = 'scConnect_complex',
            input_method = 'scconnect.scconnect_annotations',
            ncbi_tax_id = ncbi_tax_id,
            entity_type = 'complex',
            # fix: `kwargs['organism']` was assigned but kwargs were
            # never forwarded, so the organism never reached the input
            **kwargs
        )

    def _process_method(self, *args, **kwargs):
        # the input contains both proteins and complexes;
        # this class keeps only the complexes
        self.annot = dict(
            (k, v)
            for k, v in iteritems(self.data)
            if entity.Entity._is_complex(k)
        )
        delattr(self, 'data')
[docs]
class HpmrComplex(AnnotationBase):
    """Protein complexes from the Human Plasma Membrane Receptome."""

    def __init__(self, **kwargs):
        """Populate the HPMR complexes."""

        _ = kwargs.pop('ncbi_tax_id', None)

        super().__init__(
            name = 'HPMR_complex',
            input_method = 'hpmr.hpmr_complexes',
            ncbi_tax_id = 9606,
            entity_type = 'complex',
            **kwargs
        )

    def _process_method(self, *args, **kwargs):
        # only complex membership, no further annotation fields
        self.annot = {cplex.__str__(): set() for cplex in self.data}
        del self.data
[docs]
class Corum(AnnotationBase):
    """
    Base class for complex annotations from CORUM; one annotation
    attribute (e.g. FunCat category or GO term) per subclass.
    """

    def __init__(self, name, annot_attr, **kwargs):
        """
        Args
            name: Name of this annotation resource.
            annot_attr: The complex attribute providing the annotation
                values (e.g. 'funcat' or 'go').
            kwargs: Passed to ``AnnotationBase``.
        """

        self._annot_attr = annot_attr

        AnnotationBase.__init__(
            self,
            name = name,
            input_method = 'corum.corum_complexes',
            entity_type = 'complex',
            **kwargs
        )

    def _process_method(self, *args, **kwargs):

        # fix: removed the unused `CorumAnnotation` alias from the
        # `record = CorumAnnotation = (...)` double assignment
        record = collections.namedtuple(
            'CorumAnnotation%s' % self._annot_attr.capitalize(),
            (self._annot_attr,),
        )

        self.annot = dict(
            (
                cplex.__str__(),
                set(
                    record(annot_val)
                    for annot_val in cplex.attrs[self._annot_attr]
                    # the literal string 'None' stands for missing values
                    if annot_val != 'None'
                )
            )
            for cplex in self.data.values()
        )

        del self.data
[docs]
class CorumFuncat(Corum):
    """Complex annotations from CORUM: FunCat functional categories."""

    def __init__(self, **kwargs):
        """Populate the CORUM FunCat annotations."""

        super().__init__(
            name = 'CORUM_Funcat',
            annot_attr = 'funcat',
            **kwargs
        )
[docs]
class CorumGO(Corum):
    """
    Gene Ontology annotations of CORUM complexes.
    """


    def __init__(self, **kwargs):

        Corum.__init__(
            self,
            name = 'CORUM_GO',
            annot_attr = 'go',
            **kwargs
        )
class LigandReceptor(AnnotationBase):
    """
    Base class for ligand-receptor resources: turns interaction records
    into per-protein annotations with `mainclass` being either `ligand`
    or `receptor`.
    """

    # records compare equal by their main class (ligand/receptor)
    _eq_fields = ('mainclass',)


    def __init__(
            self,
            name,
            ligand_col = None,
            receptor_col = None,
            ligand_id_type = None,
            receptor_id_type = None,
            record_processor_method = None,
            record_extra_fields = None,
            record_defaults = None,
            extra_fields_methods = None,
            **kwargs
        ):
        """
        :arg str name:
            Name of the resource; also used as prefix of the record class.
        :arg int ligand_col:
            Index of the ligand identifier in the input records.
        :arg int receptor_col:
            Index of the receptor identifier in the input records.
        :arg str ligand_id_type:
            Identifier type of the ligand column (for UniProt mapping).
        :arg str receptor_id_type:
            Identifier type of the receptor column (for UniProt mapping).
        :arg callable record_processor_method:
            Custom processor of one input record; defaults to
            `_default_record_processor`.
        :arg tuple record_extra_fields:
            Extra field names for the record class beyond `mainclass`.
        :arg tuple record_defaults:
            Default values for the extra fields.
        :arg dict extra_fields_methods:
            Field name -> callable extracting that field from a record.
        """

        self.name = name
        self.ligand_col = ligand_col
        self.receptor_col = receptor_col
        self.ligand_id_type = ligand_id_type
        self.receptor_id_type = receptor_id_type
        self._record_extra_fields = record_extra_fields or ()
        self._record_defaults = record_defaults or ()
        self._extra_fields_methods = extra_fields_methods or {}

        # must happen before AnnotationBase.__init__ triggers loading
        self._set_record_template()

        self.record_processor_method = (
            record_processor_method or
            self._default_record_processor
        )

        if 'ncbi_tax_id' not in kwargs:

            kwargs['ncbi_tax_id'] = 9606

        AnnotationBase.__init__(
            self,
            name = self.name,
            **kwargs
        )


    def _set_record_template(self):
        """
        Creates the record (namedtuple) class with `mainclass` plus the
        resource specific extra fields; defaults apply to the rightmost
        fields only.
        """

        self.record = collections.namedtuple(
            '%sAnnotation' % self.name,
            ('mainclass',) + self._record_extra_fields,
        )
        self.record.__new__.__defaults__ = () + self._record_defaults


    def _default_record_processor(self, record, typ, annot):
        """
        Processes one input record: maps the ligand or receptor
        identifier (depending on `typ`) to UniProt and adds one
        annotation record per UniProt to `annot`.

        :arg record:
            One raw interaction record (indexable).
        :arg str typ:
            Either `ligand` or `receptor`.
        :arg annot:
            `defaultdict(set)` collecting uniprot -> records.
        """

        i_id = self.ligand_col if typ == 'ligand' else self.receptor_col
        id_type = (
            self.ligand_id_type if typ == 'ligand' else self.receptor_id_type
        )
        original_id = record[i_id]

        # one original identifier may map to multiple UniProts
        uniprots = mapping.map_name(original_id, id_type, 'uniprot')

        for uniprot in uniprots:

            annot[uniprot].add(
                self.record(
                    mainclass = typ,
                    **self._get_extra_fields(record)
                )
            )


    def _get_extra_fields(self, record):
        """
        Extracts the resource specific extra field values from one raw
        record using the configured extractor methods.
        """

        return dict(
            (
                name,
                method(record),
            )
            for name, method in iteritems(self._extra_fields_methods)
        )


    def _process_method(self, *args, **kwargs):
        """
        Processes each raw record twice: once for its ligand and once
        for its receptor side.
        """

        annot = collections.defaultdict(set)

        for record in self.data:

            self.record_processor_method(
                record,
                typ = 'ligand',
                annot = annot,
            )
            self.record_processor_method(
                record,
                typ = 'receptor',
                annot = annot,
            )

        self.annot = dict(annot)
class Ramilowski2015(LigandReceptor):
    """
    Ligand and receptor annotations from Ramilowski et al. 2015.
    """


    def __init__(self, load_sources = False, **kwargs):
        """
        :arg bool load_sources:
            Populate the `sources` field of the records from the
            4th column of the input; otherwise it remains `None`.
        """

        def _sources(rec):
            # the 4th column is a semicolon separated list of sources
            return tuple(rec[3].split(';')) if load_sources else None

        LigandReceptor.__init__(
            self,
            name = 'Ramilowski2015',
            input_method = 'ramilowski2015.ramilowski_interactions',
            record_extra_fields = ('sources',),
            extra_fields_methods = {'sources': _sources},
            ligand_col = 0,
            receptor_col = 1,
            ligand_id_type = 'genesymbol',
            receptor_id_type = 'genesymbol',
            **kwargs
        )
class Kirouac2010(LigandReceptor):
    """
    Ligand and receptor annotations from Kirouac et al. 2010.
    """


    def __init__(self, load_sources = False, **kwargs):
        # note: `load_sources` is accepted for interface compatibility
        # with the sibling classes but is not used by this resource

        LigandReceptor.__init__(
            self,
            name = 'Kirouac2010',
            input_method = 'kirouac2010.kirouac2010_interactions',
            ligand_col = 0,
            receptor_col = 1,
            ligand_id_type = 'genesymbol',
            receptor_id_type = 'genesymbol',
            **kwargs
        )
class GuideToPharmacology(LigandReceptor):
    """
    Ligand and receptor annotations from Guide to Pharmacology.
    """


    def __init__(self, load_sources = False, **kwargs):
        # note: `load_sources` is accepted for interface compatibility
        # with the sibling classes but is not used by this resource

        LigandReceptor.__init__(
            self,
            name = 'Guide2Pharma',
            input_method = 'guide2pharma.guide2pharma_interactions',
            ligand_col = 0,
            receptor_col = 2,
            ligand_id_type = 'genesymbol',
            receptor_id_type = 'uniprot',
            **kwargs
        )


    def _default_record_processor(self, record, typ, annot):
        """
        Processes only records whose ligand is a gene symbol and whose
        target is a UniProt ID; everything else is skipped.
        """

        if (
            record.ligand_id_type == 'genesymbol' and
            record.target_id_type == 'uniprot'
        ):

            LigandReceptor._default_record_processor(self, record, typ, annot)
class UniprotLocations(AnnotationBase):
    """
    Subcellular localizations from UniProt.
    """

    _eq_fields = ('location',)


    def __init__(self, ncbi_tax_id = 9606, **kwargs):
        """
        :arg int ncbi_tax_id:
            NCBI Taxonomy identifier of the organism.
        """

        kwargs.setdefault('organism', ncbi_tax_id)

        AnnotationBase.__init__(
            self,
            name = 'UniProt_location',
            ncbi_tax_id = ncbi_tax_id,
            input_method = 'uniprot.uniprot_locations',
            **kwargs
        )


    def _process_method(self):
        """
        The input is already in the final format: move it from
        `data` to `annot`.
        """

        self.annot = self.__dict__.pop('data')
class UniprotFamilies(AnnotationBase):
    """
    Protein families from UniProt.
    """

    _eq_fields = ('family', 'subfamily')


    def __init__(self, ncbi_tax_id = 9606, **kwargs):
        """
        :arg int ncbi_tax_id:
            NCBI Taxonomy identifier of the organism.
        """

        kwargs.setdefault('organism', ncbi_tax_id)

        AnnotationBase.__init__(
            self,
            name = 'UniProt_family',
            ncbi_tax_id = ncbi_tax_id,
            input_method = 'uniprot.uniprot_families',
            **kwargs
        )


    def _process_method(self):
        """
        The input is already in the final format: move it from
        `data` to `annot`.
        """

        self.annot = self.__dict__.pop('data')
class UniprotTissues(AnnotationBase):
    """
    Tissue expression levels from UniProt.
    """

    _eq_fields = ('tissue', 'level')


    def __init__(self, ncbi_tax_id = 9606, **kwargs):
        """
        :arg int ncbi_tax_id:
            NCBI Taxonomy identifier of the organism.
        """

        kwargs.setdefault('organism', ncbi_tax_id)

        AnnotationBase.__init__(
            self,
            name = 'UniProt_tissue',
            ncbi_tax_id = ncbi_tax_id,
            input_method = 'uniprot.uniprot_tissues',
            **kwargs
        )


    def _process_method(self):
        """
        The input is already in the final format: move it from
        `data` to `annot`.
        """

        self.annot = self.__dict__.pop('data')
class UniprotKeywords(AnnotationBase):

    # one record per UniProt keyword
    _eq_fields = ('keyword',)


    def __init__(self, ncbi_tax_id = 9606, **kwargs):
        """
        Keyword annotations from UniProt.
        (The previous docstring, "Tissue expression levels", was a
        copy-paste from `UniprotTissues`.)

        :arg int ncbi_tax_id:
            NCBI Taxonomy identifier of the organism.
        """

        if 'organism' not in kwargs:

            kwargs['organism'] = ncbi_tax_id

        AnnotationBase.__init__(
            self,
            name = 'UniProt_keyword',
            ncbi_tax_id = ncbi_tax_id,
            input_method = 'uniprot.uniprot_keywords',
            **kwargs
        )


    def _process_method(self):

        # already the appropriate format, no processing needed
        self.annot = self.data
        delattr(self, 'data')
class UniprotTopologies(AnnotationBase):
    """
    Topological domains and transmembrane segments from UniProt.
    """

    _eq_fields = ('topology', 'start', 'end')


    def __init__(self, ncbi_tax_id = 9606, **kwargs):
        """
        :arg int ncbi_tax_id:
            NCBI Taxonomy identifier of the organism.
        """

        kwargs.setdefault('organism', ncbi_tax_id)

        AnnotationBase.__init__(
            self,
            name = 'UniProt_topology',
            ncbi_tax_id = ncbi_tax_id,
            input_method = 'uniprot.uniprot_topology',
            **kwargs
        )


    def _process_method(self):
        """
        The input is already in the final format: move it from
        `data` to `annot`.
        """

        self.annot = self.__dict__.pop('data')
class Humancellmap(AnnotationBase):

    # records carry a localization and the method it was derived by
    _eq_fields = ('localization', 'method')


    def __init__(self, ncbi_tax_id = 9606, **kwargs):
        """
        Subcellular localization annotations from the Human Cell Map.
        (The previous docstring, "Protein families from UniProt", was a
        copy-paste from `UniprotFamilies`.)

        :arg int ncbi_tax_id:
            NCBI Taxonomy identifier of the organism.
        """

        AnnotationBase.__init__(
            self,
            name = 'HumanCellMap',
            ncbi_tax_id = ncbi_tax_id,
            input_method = 'humancellmap.humancellmap_annotations',
            **kwargs
        )


    def _process_method(self):

        # already the appropriate format, no processing needed
        self.annot = self.data
        delattr(self, 'data')
class Tcdb(AnnotationBase):

    # records carry the transporter family and the TC identifier
    _eq_fields = ('family', 'tcid')


    def __init__(self, ncbi_tax_id = 9606, **kwargs):
        """
        Transporter classification from TCDB (Transporter Classification
        Database). (The previous docstring, "Topological domains and
        transmembrane segments from UniProt", was a copy-paste from
        `UniprotTopologies`.)

        :arg int ncbi_tax_id:
            NCBI Taxonomy identifier of the organism.
        """

        if 'organism' not in kwargs:

            kwargs['organism'] = ncbi_tax_id

        AnnotationBase.__init__(
            self,
            name = 'TCDB',
            ncbi_tax_id = ncbi_tax_id,
            input_method = 'tcdb.tcdb_annotations',
            **kwargs
        )


    def _process_method(self):

        # already the appropriate format, no processing needed
        self.annot = self.data
        delattr(self, 'data')
class Mcam(AnnotationBase):
    """
    Cell adhesion molecules (CAMs) from 10.4137/cin.s341.
    """

    # membership only, records carry no fields
    _eq_fields = ()


    def __init__(self, **kwargs):
        """
        List of cell adhesion molecules (CAMs) from 10.4137/cin.s341.
        """

        AnnotationBase.__init__(
            self,
            name = 'MCAM',
            input_method = 'mcam.mcam_cell_adhesion_molecules',
            **kwargs
        )
class Gpcrdb(AnnotationBase):
    """
    GPCR classification from GPCRdb - https://gpcrdb.org/.
    """

    _eq_fields = ('gpcr_class', 'family', 'subfamily')


    def __init__(self, ncbi_tax_id = 9606, **kwargs):
        """
        :arg int ncbi_tax_id:
            NCBI Taxonomy identifier of the organism.
        """

        kwargs.setdefault('organism', ncbi_tax_id)

        AnnotationBase.__init__(
            self,
            name = 'GPCRdb',
            ncbi_tax_id = ncbi_tax_id,
            input_method = 'gpcrdb.gpcrdb_annotations',
            **kwargs
        )


    def _process_method(self):
        """
        The input is already in the final format: move it from
        `data` to `annot`.
        """

        self.annot = self.__dict__.pop('data')
class Progeny(AnnotationBase):
    """
    Pathway responsive genes: signatures based on transcriptomics data
    from PROGENy (https://github.com/saezlab/progeny).
    """

    _eq_fields = ('pathway',)


    def __init__(self, ncbi_tax_id = 9606, **kwargs):
        """
        :arg int ncbi_tax_id:
            NCBI Taxonomy identifier of the organism.
        """

        kwargs.setdefault('organism', ncbi_tax_id)

        AnnotationBase.__init__(
            self,
            name = 'PROGENy',
            ncbi_tax_id = ncbi_tax_id,
            input_method = 'progeny.progeny_annotations',
            infer_complexes = False,
            **kwargs
        )


    def _process_method(self):
        """
        The input is already in the final format: move it from
        `data` to `annot`.
        """

        self.annot = self.__dict__.pop('data')
class Celltypist(AnnotationBase):
    """
    Cell type markers from the CellTypist database.
    """

    _eq_fields = ('cell_type', 'cell_subtype')


    def __init__(self, ncbi_tax_id = 9606, **kwargs):
        """
        :arg int ncbi_tax_id:
            NCBI Taxonomy identifier of the organism.
        """

        AnnotationBase.__init__(
            self,
            name = 'CellTypist',
            ncbi_tax_id = ncbi_tax_id,
            input_method = 'celltypist.celltypist_annotations',
            infer_complexes = False,
            **kwargs
        )


    def _process_method(self):
        """
        The input is already in the final format: move it from
        `data` to `annot`.
        """

        self.annot = self.__dict__.pop('data')
class Cytosig(AnnotationBase):
    """
    Cytokine perturbation signatures from the CytoSig database.
    """

    _eq_fields = ('cytokine',)


    def __init__(self, ncbi_tax_id = 9606, **kwargs):
        """
        :arg int ncbi_tax_id:
            NCBI Taxonomy identifier of the organism.
        """

        AnnotationBase.__init__(
            self,
            name = 'CytoSig',
            ncbi_tax_id = ncbi_tax_id,
            input_method = 'cytosig.cytosig_annotations',
            infer_complexes = False,
            **kwargs
        )


    def _process_method(self):
        """
        The input is already in the final format: move it from
        `data` to `annot`.
        """

        self.annot = self.__dict__.pop('data')
class Panglaodb(AnnotationBase):
    """
    Cell type markers from PanglaoDB.
    """

    _eq_fields = ('cell_type', 'organ')


    def __init__(self, ncbi_tax_id = 9606, **kwargs):
        """
        :arg int ncbi_tax_id:
            NCBI Taxonomy identifier of the organism.
        """

        AnnotationBase.__init__(
            self,
            name = 'PanglaoDB',
            ncbi_tax_id = ncbi_tax_id,
            input_method = 'panglaodb.panglaodb_annotations',
            infer_complexes = False,
            check_ids = False,
            **kwargs
        )


    def _process_method(self):
        """
        The input is already in the final format: move it from
        `data` to `annot`.
        """

        self.annot = self.__dict__.pop('data')
class Lambert2018(AnnotationBase):

    # records carry the gene symbol and a transcription factor flag
    _eq_fields = ('genesymbol', 'is_tf')


    def __init__(self, ncbi_tax_id = 9606, **kwargs):
        """
        Transcription factor annotations from Lambert et al. 2018
        ("The Human Transcription Factors"). (The previous docstring,
        "Cell type markers from PanglaoDB", was a copy-paste from
        `Panglaodb`.)

        :arg int ncbi_tax_id:
            NCBI Taxonomy identifier of the organism.
        """

        AnnotationBase.__init__(
            self,
            name = 'Lambert2018',
            ncbi_tax_id = ncbi_tax_id,
            input_method = 'lambert2018.lambert2018_annotations',
            infer_complexes = True,
            check_ids = False,
            **kwargs
        )


    def _process_method(self):

        # already the appropriate format, no processing needed
        self.annot = self.data
        delattr(self, 'data')
class Wang(AnnotationBase):

    # records carry a function and a location field
    _eq_fields = ('function', 'location')


    def __init__(self, ncbi_tax_id = 9606, **kwargs):
        """
        Protein function and location annotations from Wang et al.
        (The previous docstring, "Cytokine perturbation signatures from
        the CytoSig database", was a copy-paste from `Cytosig`;
        NOTE(review): confirm the exact publication behind the
        `wang.wang_annotations` input method.)

        :arg int ncbi_tax_id:
            NCBI Taxonomy identifier of the organism.
        """

        AnnotationBase.__init__(
            self,
            name = 'Wang',
            ncbi_tax_id = ncbi_tax_id,
            input_method = 'wang.wang_annotations',
            **kwargs
        )


    def _process_method(self):

        # already the appropriate format, no processing needed
        self.annot = self.data
        delattr(self, 'data')
class Cancerdrugsdb(AnnotationBase):
    """
    Approved cancer drugs from the Cancer Drugs Database
    (https://www.anticancerfund.org/en/cancerdrugs-db).
    """


    def __init__(self, **kwargs):
        """
        This resource is not organism specific, hence any `ncbi_tax_id`
        argument is discarded.
        """

        kwargs.pop('ncbi_tax_id', None)

        AnnotationBase.__init__(
            self,
            name = 'CancerDrugsDB',
            ncbi_tax_id = _const.NOT_ORGANISM_SPECIFIC,
            input_method = 'cancerdrugsdb.cancerdrugsdb_annotations',
            entity_type = 'small_molecule',
            **kwargs
        )


    def _process_method(self):
        """
        The input is already in the final format: move it from
        `data` to `annot`.
        """

        self.annot = self.__dict__.pop('data')
class InterPro(AnnotationBase):
    """
    Protein signatures from the InterPro database.
    """

    _eq_fields = ('interpro_id', 'start', 'end')


    def __init__(self, ncbi_tax_id = 9606, **kwargs):
        """
        :arg int ncbi_tax_id:
            NCBI Taxonomy identifier of the organism.
        """

        AnnotationBase.__init__(
            self,
            name = 'InterPro',
            ncbi_tax_id = ncbi_tax_id,
            input_method = 'interpro.interpro_annotations',
            **kwargs
        )


    def _process_method(self):
        """
        The input is already in the final format: move it from
        `data` to `annot`.
        """

        self.annot = self.__dict__.pop('data')
class AnnotationTable(session_mod.Logger):
    """
    Manages a custom set of annotation resources: loads the data,
    accepts queries, and converts the annotations to arrays and
    data frames.
    """


    def __init__(
            self,
            proteins = (),
            complexes = (),
            protein_sources = None,
            complex_sources = None,
            use_fields = None,
            ncbi_tax_id = 9606,
            swissprot_only = True,
            use_complexes = True,
            keep_annotators = True,
            create_dataframe = False,
            load = True,
            pickle_file = None,
        ):
        """
        Manages a custom set of annotation resources. Loads data and
        accepts queries, provides methods for converting the data to
        data frame.

        :arg set proteins:
            A reference set of proteins (UniProt IDs).
        :arg set complexes:
            A reference set of complexes.
        :arg set protein_sources:
            Class names providing the protein annotations. If not provided
            the module's ``protein_sources_default`` attribute will be used.
        :arg set complex_sources:
            Class names providing the complex annotations. If not provided
            the module's ``complex_sources_default`` attribute will be used.
        :arg dict use_fields:
            A dict with resource names as keys and tuple of field labels as
            values. If provided for any resource only these fields will be
            used for constructing the data frame. If `None`, the module's
            ``default_fields`` settings will be used.
        :arg bool use_complexes:
            Whether to include complexes in the annotations.
        :arg bool create_dataframe:
            Whether to create a boolean data frame of annotations, apart
            from having the annotator objects.
        :arg bool load:
            Load the data upon initialization. If `False`, you will have a
            chance to call the ``load`` method later.
        """

        session_mod.Logger.__init__(self, name = 'annot')

        # this module itself; used to resolve resource class names
        self._module = sys.modules[self.__module__]
        self.pickle_file = pickle_file
        self.complexes = complexes
        self.protein_sources = (
            protein_sources
                if protein_sources is not None else
            protein_sources_default
        )
        self.complex_sources = (
            complex_sources
                if complex_sources is not None else
            complex_sources_default
        )
        self.use_fields = use_fields or default_fields
        self.ncbi_tax_id = ncbi_tax_id
        self.keep_annotators = keep_annotators
        self.create_dataframe = create_dataframe
        self.proteins = proteins
        self.swissprot_only = swissprot_only
        self.use_complexes = use_complexes
        self.set_reference_set()
        self.annots = {}

        if load:

            self.load()


    def reload(self):
        """
        Reloads the object from the module level.
        """

        modname = self.__class__.__module__
        mod = __import__(modname, fromlist = [modname.split('.')[0]])
        imp.reload(mod)
        new = getattr(mod, self.__class__.__name__)
        setattr(self, '__class__', new)


    def load(self):
        """
        Loads the database: either from a pickle dump, if available,
        or by instantiating each resource class.
        """

        if self.pickle_file:

            self.load_from_pickle(pickle_file = self.pickle_file)
            return

        self.set_reference_set()
        self.load_protein_resources()
        self.load_complex_resources()

        if self.create_dataframe:

            self.make_dataframe()


    def load_from_pickle(self, pickle_file):
        """
        Restores the database from a pickle dump. The record
        (namedtuple) classes are re-created in their original modules
        because namedtuple classes cannot be restored by pickle itself.
        """

        self._log('Loading from pickle `%s`.' % pickle_file)

        with open(pickle_file, 'rb') as fp:

            self.proteins, self.complexes, self.reference_set, annots = (
                pickle.load(fp)
            )

        self.annots = {}

        for name, (cls_name, data, record_cls) in iteritems(annots):

            self._log(
                'Loading from pickle: annotation class `%s`.' % cls_name
            )

            if record_cls is not None:

                modname = record_cls['module']

                if modname not in sys.modules:

                    mod = __import__(
                        modname,
                        fromlist = [modname.split('.')[0]],
                    )

                # re-create the record class in its original module
                setattr(
                    sys.modules[modname],
                    record_cls['name'],
                    collections.namedtuple(
                        record_cls['name'],
                        record_cls['fields'],
                    ),
                )
                record_cls_new = getattr(
                    sys.modules[modname],
                    record_cls['name'],
                )

                # turn the plain tuples back into record instances
                data = dict(
                    (
                        key,
                        set(
                            record_cls_new(*this_annot)
                            for this_annot in these_annots
                        )
                    )
                    for key, these_annots in iteritems(data)
                )

                self._log(
                    'Reconstituted annotation data for `%s`: '
                    'dict of length %u.' % (
                        name,
                        len(data),
                    )
                )

            cls = globals()[cls_name]

            try:
                self.annots[name] = cls(dump = data)
                self._log(
                    'Instance of annotation class `%s` (resource %s) '
                    'successfully loaded from pickle.' % (
                        cls_name,
                        name,
                    )
                )

            # we never want to fail due to any issue with
            # one resource:
            except Exception:

                self._log(
                    'ERROR: Failed to create instance of `%s` '
                    'with data loaded from the pickle.' % cls_name
                )
                self._log_traceback()

        self._log('Loaded from pickle `%s`.' % pickle_file)


    def save_to_pickle(self, pickle_file):
        """
        Dumps the database into a pickle. Record classes are stored as
        (name, module, fields) triplets so they can be re-created upon
        loading.
        """

        def get_record_class(annot):
            # the class of the first record encountered, or None
            # if the resource has no records at all
            for val in annot.values():
                for elem in val:
                    return elem.__class__

        self._log('Saving to pickle `%s`.' % pickle_file)

        for annot in self.annots.values():

            annot._update_complex_attribute_classes()

        with open(pickle_file, 'wb') as fp:

            classes = dict(
                (
                    name,
                    get_record_class(annot.annot)
                )
                for name, annot in iteritems(self.annots)
            )

            annots = dict(
                (
                    name,
                    (
                        annot.__class__.__name__,
                        # records are converted to plain tuples
                        # so that pickle can handle them
                        dict(
                            (
                                key,
                                set(
                                    tuple(this_annot)
                                    for this_annot in these_annots
                                )
                            )
                            for key, these_annots in iteritems(annot.annot)
                        ),
                        {
                            'name': classes[name].__name__,
                            'module': classes[name].__module__,
                            'fields': classes[name]._fields,
                        }
                        if classes[name] else None
                    )
                )
                for name, annot in iteritems(self.annots)
            )

            pickle.dump(
                obj = (
                    self.proteins,
                    self.complexes,
                    self.reference_set,
                    annots,
                ),
                file = fp,
                protocol = pickle.HIGHEST_PROTOCOL,
            )

        self._log('Saved to pickle `%s`.' % pickle_file)


    def set_reference_set(self):
        """
        Compiles the reference sets of proteins and complexes, the
        combined reference set, and the entity -> row index mapping.
        """

        self.proteins, self.complexes, self.reference_set = (
            AnnotationBase.get_reference_set(
                proteins = self.proteins,
                complexes = self.complexes,
                use_complexes = self.use_complexes,
                ncbi_tax_id = self.ncbi_tax_id,
                swissprot_only = self.swissprot_only,
            )
        )

        # entity -> row index
        self.rows = dict(
            reversed(i)
            for i in enumerate(self.reference_set)
        )


    def load_protein_resources(self):

        self._load_resources(self.protein_sources, self.proteins)


    def load_complex_resources(self):

        self._load_resources(self.complex_sources, self.complexes)


    def _load_resources(self, definitions, reference_set):
        """
        Instantiates the resource classes in `definitions`, retrying
        each one a configurable number of times; failures are logged
        and never propagated.
        """

        for cls in definitions:

            cls = cls if callable(cls) else getattr(self._module, cls)

            total_attempts = settings.get('annot_load_resource_attempts')

            for attempt in range(total_attempts):

                try:
                    self._log(
                        f'Loading annotation resource `{cls.__name__}`; '
                        f'attempt {attempt + 1} of {total_attempts}.'
                    )
                    annot = cls(
                        ncbi_tax_id = self.ncbi_tax_id,
                        reference_set = reference_set,
                    )
                    self.annots[annot.name] = annot
                    self._log(
                        f'Successfully loaded resource `{cls.__name__}` '
                        f'({annot.name}).'
                    )
                    break

                # previously the exception info was bound to unused
                # locals (`e`, `exc`); the traceback logger needs neither
                except Exception:

                    self._log(
                        'Failed to load annotations from resource `%s`:' % (
                            cls.__name__
                                if hasattr(cls, '__name__') else
                            str(cls)
                        )
                    )
                    self._log_traceback()


    def make_dataframe(self, reference_set = None):

        if self.create_dataframe:

            self.df = self.to_dataframe(reference_set = reference_set)


    def ensure_array(self, reference_set = None, rebuild = False):

        if not hasattr(self, 'data') or rebuild:

            self.make_array(reference_set = reference_set)


    def to_array(self, reference_set = None):
        """
        Builds a boolean array of the annotations: one row per entity
        in the reference set, one column per (resource, annotation)
        category. Returns a tuple of column names and the array.
        """

        reference_set = reference_set or self.reference_set

        names = []
        arrays = []

        for resource in self.annots.values():

            # skipping HPA for now because too large number of
            # annotations, it would take very long:
            if resource.name == 'HPA':

                continue

            # field restriction for this resource, if configured
            # (previously this expression was evaluated twice,
            # the first result being a dead local)
            use_fields = (
                self.use_fields[resource.name]
                    if resource.name in self.use_fields else
                None
            )

            this_names, this_array = resource.to_array(
                reference_set = reference_set,
                use_fields = use_fields,
            )

            names.extend(this_names)
            arrays.append(this_array)

        names = np.array(list(itertools.chain(names)))
        data = np.hstack(arrays)

        return names, data


    def make_array(self, reference_set = None):

        self.names, self.data = self.to_array(reference_set = reference_set)
        self.set_cols()


    def set_cols(self):

        # column name -> column index
        self.cols = dict((name, i) for i, name in enumerate(self.names))


    def keep(self, keep):
        """
        Drops all columns except the ones in `keep`.
        """

        ikeep = np.array([
            i for i, name in enumerate(self.names) if name in keep
        ])

        self.names = self.names[ikeep]
        self.data = self.data[:, ikeep]
        self.set_cols()


    def make_sets(self):
        """
        For each annotation category compiles the set of entities
        having that annotation.
        """

        self.ensure_array()

        self.sets = dict(
            (
                name,
                set(self.reference_set[self.data[:, i]])
            )
            for i, name in enumerate(self.names)
        )


    def annotate_network(self, pa):
        """
        Assigns annotation categories to the nodes and edges of an
        igraph based network object; requires `make_sets` to have been
        called. Returns (nodeannot, edgeannot) lists.
        """

        nodes = pa.graph.vs['name']
        edges = [
            (
                nodes[e.source],
                nodes[e.target]
            )
            for e in pa.graph.es
        ]

        nodeannot = []
        edgeannot = []

        for i, uniprot in enumerate(nodes):

            for name, uniprots in iteritems(self.sets):

                if uniprot in uniprots:

                    nodeannot.append((name, i))

        for i, (uniprot1, uniprot2) in enumerate(edges):

            for name1, uniprots1 in iteritems(self.sets):

                for name2, uniprots2 in iteritems(self.sets):

                    if uniprot1 in uniprots1 and uniprot2 in uniprots2:

                        edgeannot.append((name1, name2, i))

        return nodeannot, edgeannot


    def network_stats(self, pa):
        """
        Counts the annotation categories across the nodes and edges
        of a network.
        """

        nodeannot, edgeannot = self.annotate_network(pa)

        nodestats = collections.Counter('__'.join(n[0]) for n in nodeannot)

        edgestats = collections.Counter(
            # sorted so that (A, B) and (B, A) edges count together
            tuple(sorted(('__'.join(e[0]), '__'.join(e[1]))))
            for e in edgeannot
        )

        return nodestats, edgestats


    def export_network_stats(self, pa):
        """
        Writes the node and edge annotation statistics into tab
        separated files in the current directory.
        """

        nodestats, edgestats = self.network_stats(pa)

        with open('annot_edgestats2.tsv', 'w') as fp:

            _ = fp.write('\t'.join(('name1', 'name2', 'count')))
            _ = fp.write('\n')

            _ = fp.write('\n'.join(
                '%s\t%s\t%u' % (name1, name2, cnt)
                for (name1, name2), cnt in iteritems(edgestats)
            ))

        with open('annot_nodestats2.tsv', 'w') as fp:

            _ = fp.write('\t'.join(('name', 'count')))
            _ = fp.write('\n')

            _ = fp.write('\n'.join(
                '%s\t%u' % (name, cnt)
                for name, cnt in iteritems(nodestats)
            ))


    def to_dataframe(self, reference_set = None):
        """
        Returns the annotations as a boolean data frame: one row per
        entity in the reference set, one column per annotation category.
        """

        self._log('Creating data frame from AnnotationTable.')

        self.ensure_array(
            reference_set = reference_set,
            rebuild = reference_set is not None,
        )

        colnames = ['__'.join(name) for name in self.names]

        df = pd.DataFrame(
            data = self.data,
            index = self.reference_set,
            columns = colnames,
        )

        self._log(
            'Created annotation data frame, memory usage: %s.' % (
                # bug fix: previously `self.df` was read here, but when
                # called via `make_dataframe` that attribute is assigned
                # only after this method returns -> AttributeError
                common.df_memory_usage(df)
            )
        )

        return df


    def make_narrow_df(self):
        """
        Builds a long format data frame of all annotation records by
        concatenating the data frames of the individual resources.
        """

        self._log('Creating narrow data frame from AnnotationTable.')

        for annot in self.annots.values():

            annot.make_df()

        self.narrow_df = pd.concat(
            annot.df for annot in self.annots.values()
        ).astype(AnnotationBase._dtypes)

        self._log(
            'Created annotation data frame, memory usage: %s.' % (
                common.df_memory_usage(self.narrow_df)
            )
        )


    def search(self, protein):
        """
        Returns a dictionary with all annotations of a protein. Keys are the
        resource names.
        """

        return dict(
            (
                resource,
                annot.annot[protein]
            )
            for resource, annot in iteritems(self.annots)
            if protein in annot.annot
        )


    def all_annotations(self, entity):
        """
        Returns all annotation records for one protein in a single list.
        """

        return [
            aa
            for a in self.annots.values()
            if entity in a.annot
            for aa in a.annot[entity]
        ]


    def all_annotations_str(self, protein):
        """
        Returns all annotation records for one protein serialized.
        """

        return '; '.join(
            str(a) for a in
            # bug fix: the parameter of `all_annotations` is `entity`;
            # calling it with `protein = ...` raised TypeError
            self.all_annotations(entity = protein)
        )


    def update_summaries(self):
        """
        Collects the summary of each resource into the `summaries` dict.
        """

        self.summaries = dict(
            (
                name,
                a.summary
            )
            for name, a in iteritems(self.annots)
        )


    def summaries_tab(self, outfile = None, return_table = False):
        """
        Compiles a table of summary statistics of the resources;
        requires `update_summaries` to have been called.

        :arg str outfile:
            Write the table into this tab separated file.
        :arg bool return_table:
            Return the table as a list of lists.
        """

        columns = (
            ('name', 'Resource'),
            ('n_total', 'Entities'),
            ('n_records_total', 'Records'),
            ('records_per_entity', 'Records per entity'),
            ('n_proteins', 'Proteins'),
            ('pct_proteins', 'Proteins [%]'),
            ('n_protein_records', 'Protein records'),
            ('n_complexes', 'Complexes'),
            ('pct_complexes', 'Complexes [%]'),
            ('n_complex_records', 'Complex records'),
            ('complex_annotations_inferred', 'Inferred complex annotations'),
            ('n_mirnas', 'miRNA'),
            ('pct_mirnas', 'miRNA [%]'),
            ('n_mirna_records', 'miRNA records'),
            ('references', 'References'),
            ('curation_effort', 'Curation effort'),
            ('fields', 'Fields'),
        )

        tab = []
        tab.append([f[1] for f in columns])

        tab.extend([
            [
                str(self.summaries[src][f[0]])
                for f in columns
            ]
            for src in sorted(self.summaries.keys())
        ])

        if outfile:

            with open(outfile, 'w') as fp:

                fp.write('\n'.join('\t'.join(row) for row in tab))

        if return_table:

            return tab


    def get_entities(self, entity_type = None):
        """
        All entities in the database, optionally restricted to certain
        entity types (e.g. `protein`, `complex`, `mirna`).
        """

        entity_type = common.to_set(entity_type)

        # the `set()` seed keeps this working also when
        # no resources are loaded
        entities = set.union(set(), *(
            set(an.annot.keys())
            for an in self.annots.values()
        ))

        return entity.Entity.filter_entity_type(
            entities,
            entity_type = entity_type,
        )


    def get_proteins(self):

        return self.get_entities(entity_type = 'protein')


    def get_complexes(self):

        return self.get_entities(entity_type = 'complex')


    def get_mirnas(self):

        return self.get_entities(entity_type = 'mirna')


    def numof_entities(self, entity_type = None):

        return len(self.get_entities(entity_type = entity_type))


    def numof_proteins(self):

        return len(self.get_proteins())


    def numof_complexes(self):

        return len(self.get_complexes())


    def numof_mirnas(self):

        return len(self.get_mirnas())


    def numof_records(self, entity_type = None):

        return sum(
            an.numof_records(entity_types = entity_type)
            for an in self.annots.values()
        )


    def numof_resources(self):

        return len(self.annots)


    def __repr__(self):

        return (
            '<Annotation database: %u records about %u '
            'entities from %u resources>' % (
                self.numof_records(),
                self.numof_entities(),
                self.numof_resources(),
            )
        )


    def __getitem__(self, item):

        if isinstance(item, _const.SIMPLE_TYPES):

            if item in self.annots:

                return self.annots[item]

            elif item in self:

                return self.search(item)

        else:

            return dict(
                (it, self[it])
                for it in item
            )


    def __contains__(self, item):

        return (
            item in self.annots or
            any(item in a for a in self.annots.values())
        )
def init_db(
        keep_annotators = True,
        create_dataframe = False,
        use_complexes = True,
        **kwargs
    ):
    """
    Initializes or reloads the annotation database.
    The database will be assigned to the ``db`` attribute of this module.
    """

    new_db = AnnotationTable(
        keep_annotators = keep_annotators,
        create_dataframe = create_dataframe,
        use_complexes = use_complexes,
        **kwargs
    )

    globals()['db'] = new_db
def get_db(
        keep_annotators = True,
        create_dataframe = False,
        use_complexes = True,
        **kwargs
    ):
    """
    Retrieves the current database instance and initializes it if does
    not exist yet.
    """

    mod_globals = globals()

    if 'db' not in mod_globals:

        init_db(
            keep_annotators = keep_annotators,
            create_dataframe = create_dataframe,
            use_complexes = use_complexes,
            **kwargs
        )

    return mod_globals['db']
class HPO(AnnotationBase):
    """
    HPO Gene Annotations from the HPO database.
    """

    # membership only, records carry no fields
    _eq_fields = ()


    def __init__(self, **kwargs):
        """
        This resource is not organism specific, hence any `ncbi_tax_id`
        argument is discarded.
        """

        kwargs.pop('ncbi_tax_id', None)

        AnnotationBase.__init__(
            self,
            name = 'HPO',
            ncbi_tax_id = _const.NOT_ORGANISM_SPECIFIC,
            input_method = 'hpo.hpo_annotations',
            **kwargs
        )


    def _process_method(self):
        """
        The input is already in the final format: move it from
        `data` to `annot`.
        """

        self.annot = self.__dict__.pop('data')