Source code for pypath.evidence

#!/usr/bin/env python
# -*- coding: utf-8 -*-

#
#  This file is part of the `pypath` python module
#  Represents evidences (resources and literature references) supporting
#  relationships.
#
#  Copyright
#  2014-2020
#  EMBL, EMBL-EBI, Uniklinik RWTH Aachen, Heidelberg University
#
#  File author(s): Dénes Türei (turei.denes@gmail.com)
#                  Nicolàs Palacio
#                  Olga Ivanova
#
#  Distributed under the GPLv3 License.
#  See accompanying file LICENSE.txt or copy at
#      http://www.gnu.org/licenses/gpl-3.0.html
#
#  Website: http://pypath.omnipathdb.org/
#

"""
Provides classes for representing and processing evidences supporting
relationships. The evidences hold information about the databases and
literature references, they can be organized into collections. A number
of operations are available on evidences and their collections, for
example they can be combined or filtered.
"""

from future.utils import iteritems

import importlib as imp
import copy

import pypath.refs as refs
import pypath.common as common
import pypath.session_mod as session_mod
import pypath.entity as entity

# Module-level logger named 'evidence'; `_log` is its logging method, bound
# here so the code in this module can emit messages with a plain call.
_logger = session_mod.Logger(name = 'evidence')
_log = _logger._log


class Evidence(object):
    """
    Represents an evidence supporting a relationship such as molecular
    interaction, molecular complex, enzyme-PTM interaction, annotation, etc.

    The evidence consists of two main parts: the database and the literature
    references. If a relationship is supported by multiple databases, for
    each one an `Evidence` object should be created and these should be
    collected in one `Evidences` object.

    :arg pypath.resource.ResourceAttributes resource:
        An object derived from :py:class:`pypath.resource.ResourceAttributes`.
    :arg str,list,set,NoneType references:
        Optional, one or more literature references (preferably PubMed IDs).
    """

    __slots__ = [
        'resource',
        'references',
    ]


    def __init__(self, resource, references = None):

        self.resource = resource
        self.references = self._process_references(references)


    def reload(self):
        """
        Reloads the object from the module level.
        """

        modname = self.__class__.__module__
        mod = __import__(modname, fromlist = [modname.split('.')[0]])
        imp.reload(mod)
        new = getattr(mod, self.__class__.__name__)
        setattr(self, '__class__', new)


    @staticmethod
    def _process_references(references):
        """
        Converts ``references`` to a set of ``refs.Reference`` objects;
        items which are already ``refs.Reference`` instances are kept as
        they are.
        """

        references = common.to_set(references)

        return {
            ref if isinstance(ref, refs.Reference) else refs.Reference(ref)
            for ref in references
        }


    def __hash__(self):
        # an evidence is identified by its resource only, the references
        # do not contribute to the identity

        return hash(self.resource)


    def __eq__(self, other):
        """
        Two evidences are equal if their resources are equal. ``other``
        can also be anything that the resource itself compares equal to.
        """

        return (
            self.resource == other or
            (
                hasattr(other, 'resource') and
                self.resource == other.resource
            )
        )


    def __ne__(self, other):
        # Python 2 does not derive `!=` from `__eq__`, hence the explicit
        # definition; in Python 3 this equals to the default behaviour

        return not self.__eq__(other)


    def __iadd__(self, other):
        """
        This will ignore if the other evidence is from different resource:
        still better than attributing wrong references to a resource.
        """

        if self == other:

            # `__eq__` also accepts objects which are not evidences (e.g.
            # the resource itself); these carry no references to merge
            self.references.update(getattr(other, 'references', ()))

        else:

            _log(
                'Warning: attempt to merge evidences from different '
                'resources. Ignoring the second evidence.'
            )

        return self


    def __add__(self, other):
        """
        Returns a new evidence with the resource of this one and the
        union of the references of the two evidences. Note: the resource
        of ``other`` is not checked here.
        """

        return Evidence(
            resource = self.resource,
            references = self.references | other.references,
        )


    @property
    def key(self):
        """
        The key of the resource of this evidence.
        """

        return self.resource.key


    def merge(self, other):
        """
        Merges two evidences. Returns set of either one or two evidences
        depending on whether the two evidences are from the same resource.
        """

        if self == other:

            self += other

            return {self}

        return {self, other}


    def __repr__(self):

        return '<Evidence %s (%s%u references)>' % (
            self.resource.name,
            'via %s,' % self.resource.via if self.resource.via else '',
            len(self.references),
        )


    def __copy__(self):
        # the resource object is shared, only the set of references
        # is copied

        return Evidence(
            resource = self.resource,
            references = copy.copy(self.references),
        )


    def __contains__(self, other):
        """
        :arg str,tuple,Reference other:
            Either a reference or a database name, or a tuple of a database
            name and an interaction type or a tuple of a database,
            interaction type and a primary database (or None if the query
            should be limited only to primary databases).
        """

        return self._contains(self, other)


    def contains_database(self, database):
        """
        Tells if the name of the resource is ``database``.
        """

        return self.resource.name == database


    def contains_reference(self, reference):
        """
        Tells if ``reference`` is among the references of this evidence.
        """

        return reference in self.references


    def has_database_via(self, database, via):
        """
        Tells if both the name and the ``via`` attribute of the resource
        match the values provided.
        """

        return (
            self.resource.name == database and
            self.resource.via == via
        )


    def has_interaction_type(
            self,
            interaction_type,
            database = None,
            via = False,
        ):
        """
        If ``via`` is ``False`` then it will be ignored, otherwise if
        ``None`` only primary resources are considered.
        """

        return (
            self.resource.interaction_type == interaction_type and
            (
                not database or
                self.resource.name == database
            ) and
            (
                # `False` is the "do not care" value, `None` is a
                # meaningful query value here
                via == False or
                self.resource.via == via
            )
        )


    @staticmethod
    def _contains(obj, other):
        """
        Shared implementation of the ``in`` operator: ``obj`` is any
        object providing the ``contains_reference``, ``contains_database``,
        ``has_interaction_type`` and ``has_database_via`` methods.
        """

        if isinstance(other, refs.Reference):

            return obj.contains_reference(other)

        # this makes possible to accept a NetworkResource or a
        # NetworkResourceKey:
        if (
            hasattr(other, 'name') and
            hasattr(other, 'interaction_type') and
            hasattr(other, 'via')
        ):

            other = (other.name, other.interaction_type, other.via)

        other = other if isinstance(other, tuple) else (other,)

        return (
            obj.contains_database(other[0]) and
            (
                len(other) == 1 or
                obj.has_interaction_type(other[1], other[0])
            ) and
            (
                len(other) <= 2 or
                obj.has_database_via(other[0], other[2])
            )
        )


    def has_data_model(self, data_model):
        """
        Tells if the data model of the resource is ``data_model``.
        """

        return self.resource.data_model == data_model


    def match(
            self,
            resource = None,
            data_model = None,
            interaction_type = None,
            via = False,
            references = None,
        ):
        """
        Checks this evidence against a number of optional criteria; the
        result is true (in boolean sense) only if all the provided
        criteria are satisfied.

        :arg resource:
            A resource object, an `Evidence` (its resource will be used),
            a set of resource names or `None` to ignore.
        :arg data_model:
            A single value or a list like object of values; ignored if
            evaluates to false. Overridden by the ``data_model`` attribute
            of ``resource`` if that exists.
        :arg interaction_type:
            A single value or a list like object of values; if `None` it
            is taken from ``resource`` if that has such an attribute,
            otherwise ignored.
        :arg via:
            `False` to ignore; if `None` it is taken from ``resource`` if
            that has such an attribute.
        :arg references:
            One or more references; at least one of them should be among
            the references of this evidence.
        """

        def _match(attr, value):

            own = getattr(self.resource, attr)

            return (
                own in value
                    if isinstance(value, common.list_like) else
                own == value
            )


        if isinstance(resource, Evidence):

            resource = resource.resource

        if interaction_type is None and hasattr(resource, 'interaction_type'):

            interaction_type = resource.interaction_type

        if via is None and hasattr(resource, 'via'):

            via = resource.via

        if hasattr(resource, 'data_model'):

            data_model = resource.data_model

        references = common.to_set(references)

        return (
            (
                resource is None or
                (
                    self.resource.name in resource
                        if isinstance(resource, set) else
                    self.resource == resource
                )
            ) and
            (
                interaction_type is None or
                _match('interaction_type', interaction_type)
            ) and
            (
                via == False or
                _match('via', via)
            ) and
            (
                not references or
                self.references & references
            ) and
            (
                not data_model or
                _match('data_model', data_model)
            )
        )
class Evidences(object):
    """
    A collection of evidences. All evidences supporting a relationship such
    as molecular interaction, molecular complex, enzyme-PTM interaction,
    annotation, etc should be collected in one `Evidences` object. This way
    the set of evidences can be queried in a comprehensive way.

    :arg tuple,list,set,Evidences evidences:
        An iterable providing :py:class:`Evidence` instances. It is possible
        to create an empty evidence collection and populate it later or to
        show this way that certain relationship has no supporting evidences.
    """

    __slots__ = [
        'evidences',
    ]


    def __init__(self, evidences = ()):

        # evidences stored by the key of their resource
        self.evidences = {}
        self.__iadd__(evidences)


    def reload(self):
        """
        Reloads the object from the module level.
        """

        modname = self.__class__.__module__
        mod = __import__(modname, fromlist = [modname.split('.')[0]])
        imp.reload(mod)
        new = getattr(mod, self.__class__.__name__)
        setattr(self, '__class__', new)

        # the contained evidences also have to point to the new class
        new_ev_class = getattr(mod, 'Evidence')

        for ev in self:

            ev.__class__ = new_ev_class


    def __iadd__(self, other):
        """
        Adds evidences to this collection. ``other`` is either an iterable
        of evidences (e.g. another `Evidences`) or a single evidence; any
        other object is ignored. Evidences of resources already present
        are merged, the others are copied.
        """

        if not hasattr(other, '__iter__'):

            # a single evidence is not iterable: it is recognized by its
            # `resource` attribute and wrapped into a tuple
            other = (other,) if hasattr(other, 'resource') else ()

        for ev in other:

            if ev.key in self.evidences:

                self.evidences[ev.key] = self.evidences[ev.key] + ev

            else:

                self.evidences[ev.key] = ev.__copy__()

        return self


    def __add__(self, other):
        """
        Returns a new collection with the evidences from both operands;
        evidences sharing the same key are merged. If ``other`` is not an
        `Evidences` instance, a copy of this object is returned.
        """

        if not isinstance(other, self.__class__):

            return self.__copy__()

        def _combined(key):

            if key not in other.evidences:

                return self.evidences[key].__copy__()

            if key not in self.evidences:

                return other.evidences[key].__copy__()

            return self.evidences[key] + other.evidences[key]


        return Evidences(
            _combined(key)
            for key in set(self.evidences) | set(other.evidences)
        )


    def __radd__(self, other):

        return self.__add__(other)


    def __sub__(self, other):
        """
        Returns a new collection of the evidences not contained in
        ``other``.
        """

        return Evidences(
            ev for ev in self if ev not in other
        )


    def intersection(self, other):
        """
        Returns a new collection of the evidences whose key is present in
        both collections; the matching evidences are merged.
        """

        return Evidences(
            self.evidences[key] + other.evidences[key]
            for key in set(self.evidences) & set(other.evidences)
        )


    def __iter__(self):

        return iter(self.evidences.values())


    def __repr__(self):

        return '<Evidences: %s (%u references)>' % (
            (
                ', '.join(sorted(set(ev.resource.name for ev in self)))
                    if self else
                'None'
            ),
            (
                len(set.union(*(ev.references for ev in self)))
                    if self else
                0
            ),
        )


    def __copy__(self):

        return Evidences(ev.__copy__() for ev in self)


    def __bool__(self):

        return bool(self.evidences)


    # the Python 2 name of the same protocol method
    __nonzero__ = __bool__


    def __contains__(self, other):
        """
        :arg str,tuple,Reference other:
            Either a reference or a database name, or a tuple of a database
            name and an interaction type or a tuple of a database,
            interaction type and a primary database (or None if the query
            should be limited only to primary databases).
        """

        return Evidence._contains(self, other)


    def __and__(self, other):
        """
        Set of the resources (or resource names) present both here and in
        ``other``.
        """

        other = self._foreign_resources_set(other)
        this = self._resident_resources_set(other)

        return this & other


    def __or__(self, other):
        """
        Set of the resources (or resource names) present either here or in
        ``other``.
        """

        other = self._foreign_resources_set(other)
        this = self._resident_resources_set(other)

        return this | other


    @staticmethod
    def _foreign_resources_set(resources):
        """
        Converts ``resources`` to a set; items having a ``resource``
        attribute (i.e. evidences) are replaced by their resource.
        """

        # iterate the converted set, not the raw argument: a single
        # string would otherwise be processed character by character
        resources = common.to_set(resources)

        return {
            res.resource if hasattr(res, 'resource') else res
            for res in resources
        }


    def _resident_resources_set(self, other = None):
        """
        The resources in this collection: as names if ``other`` is an
        iterable of strings only, otherwise as resource objects.
        """

        by_name = (
            hasattr(other, '__iter__') and
            all(isinstance(res, common.basestring) for res in other)
        )

        return (
            {ev.resource.name for ev in self}
                if by_name else
            {ev.resource for ev in self}
        )


    def __eq__(self, other):
        # only the resources are compared, the references are not

        return {ev.resource for ev in self} == {ev.resource for ev in other}


    def __len__(self):

        return self.count_resources()


    def count_resources(self, **kwargs):
        """
        Number of the evidences matching the criteria in ``kwargs`` (see
        the ``filter`` method).
        """

        # `filter` returns a generator which has no length
        return sum(1 for _ in self.filter(**kwargs))


    def get_resources(self, **kwargs):
        """
        Set of the resources of the evidences matching the criteria in
        ``kwargs``; without arguments the resources of all evidences.
        """

        return {ev.resource for ev in self.filter(**kwargs)}


    def get_resources_via(self, **kwargs):

        return {
            (ev.resource, ev.resource.via)
            for ev in self.filter(**kwargs)
        }


    def get_resource_names_via(self, **kwargs):

        return {
            (ev.resource.name, ev.resource.via)
            for ev in self.filter(**kwargs)
        }


    def count_references(self, **kwargs):

        return len(self.get_references(**kwargs))


    def get_references(self, **kwargs):
        """
        Set of the references of the evidences matching the criteria in
        ``kwargs``.
        """

        return {
            ref
            for ev in self.filter(**kwargs)
            for ref in ev.references
        }


    def count_curation_effort(self, **kwargs):

        return len(self.get_curation_effort(**kwargs))


    def get_curation_effort(self, **kwargs):
        """
        Set of the unique resource-reference pairs from the evidences
        matching the criteria in ``kwargs``.
        """

        return {
            (ev.resource, ref)
            for ev in self.filter(**kwargs)
            for ref in ev.references
        }


    def contains_database(self, database):

        return any(ev.resource.name == database for ev in self)


    def contains_reference(self, reference):

        return any(reference in ev.references for ev in self)


    def has_database_via(self, database, via):

        return any(
            ev.has_database_via(database, via)
            for ev in self
        )


    def has_interaction_type(
            self,
            interaction_type,
            database = None,
            via = False,
        ):
        """
        If ``via`` is ``False`` then it will be ignored, otherwise if
        ``None`` only primary resources are considered.
        """

        return any(
            ev.has_interaction_type(interaction_type, database, via)
            for ev in self
        )


    def has_data_model(self, data_model):

        return any(ev.has_data_model(data_model) for ev in self)


    def get_resource_names(self):

        return {ev.resource.name for ev in self}


    def get_interaction_types(self):

        return {ev.resource.interaction_type for ev in self}


    def get_data_models(self):

        return {ev.resource.data_model for ev in self}


    def __isub__(self, other):
        """
        Removes evidences in place. If ``other`` is an `Evidences` object,
        the evidences present in it with the same key and comparing equal
        are removed; otherwise ``other`` is passed to ``remove`` as a
        resource.
        """

        if isinstance(other, self.__class__):

            self.evidences = {
                key: ev
                for key, ev in self.evidences.items()
                if (
                    key not in other.evidences or
                    other.evidences[key] != ev
                )
            }

        else:

            self.remove(other)

        return self


    def remove(self, resource = None, interaction_type = None, via = False):
        """
        Removes the evidences matching the criteria provided.
        """

        self.evidences = {
            key: ev
            for key, ev in self.evidences.items()
            if not ev.match(
                resource = resource,
                interaction_type = interaction_type,
                via = via,
            )
        }


    def filter(
            self,
            resource = None,
            data_model = None,
            interaction_type = None,
            via = False,
            references = None,
        ):
        """
        Returns a generator of the evidences matching all the criteria
        provided; the arguments are passed to the ``match`` method of
        each evidence.
        """

        return (
            ev
            for ev in self
            if ev.match(
                resource = resource,
                data_model = data_model,
                interaction_type = interaction_type,
                via = via,
                references = references,
            )
        )


    def match(
            self,
            resource = None,
            data_model = None,
            interaction_type = None,
            via = False,
            references = None,
        ):
        """
        Tells if at least one evidence matches all the criteria provided.
        """

        return any(
            True
            for _ in self.filter(
                resource = resource,
                data_model = data_model,
                interaction_type = interaction_type,
                via = via,
                references = references,
            )
        )