#!/usr/bin/env python# -*- coding: utf-8 -*-## This file is part of the `pypath` python module## Copyright 2014-2023# EMBL, EMBL-EBI, Uniklinik RWTH Aachen, Heidelberg University## Authors: see the file `README.rst`# Contact: Dénes Türei (turei.denes@gmail.com)## Distributed under the GPLv3 License.# See accompanying file LICENSE.txt or copy at# https://www.gnu.org/licenses/gpl-3.0.html## Website: https://pypath.omnipathdb.org/#from__future__importannotationsfromfuture.utilsimportiteritemsfrompast.builtinsimportxrange,range,reduceimportosimportsysimportcopyimportimportlibasimpimportcollectionsimportitertoolsimporttracebackimportdillaspickleimportnumpyasnpimportpandasaspdimportpypath.inputs.cellphonedbascellphonedbimportpypath.inputs.lrdbaslrdbimportpypath.inputs.uniprot_dbasuniprot_dbimportpypath.share.commonascommonimportpypath_common._constantsas_constimportpypath.share.settingsassettingsimportpypath.utils.mappingasmappingimportpypath.utils.reflistsasreflistsimportpypath.utils.uniprotasutils_uniprotimportpypath.internals.resourceasresourceimportpypath.utils.goasgoimportpypath.core.intercell_annotasintercell_annotimportpypath.core.commonascore_commonimportpypath.share.sessionassession_modimportpypath.internals.annot_formatsasannot_formatsimportpypath.core.complexascompleximportpypath.internals.interaasinteraimportpypath.core.entityasentity#TODO this should be part of json 
# TODO: these defaults should be part of JSON files.

# Names of the annotation resources about individual proteins
# that are used by default.
protein_sources_default = {
    'Dgidb',
    'Membranome',
    'Exocarta',
    'Vesiclepedia',
    'Matrisome',
    'Surfaceome',
    'CellSurfaceProteinAtlas',
    'CellSurfaceProteinAtlasCellType',
    'HumanPlasmaMembraneReceptome',
    'Matrixdb',
    'Locate',
    'GOIntercell',
    'CellPhoneDB',
    'Ramilowski2015',
    'Ramilowski2015Location',
    'Kirouac2010',
    'GuideToPharmacology',
    'Adhesome',
    'Integrins',
    'Opm',
    'Topdb',
    'Hgnc',
    'Zhong2015',
    'HumanProteinAtlas',
    'HumanProteinAtlasSubcellular',
    'HumanProteinAtlasSecretome',
    'Comppi',
    'SignorPathways',
    'SignalinkPathways',
    'SignalinkFunctions',
    'KeggPathways',
    'KeggPathwaysPC',
    'NetpathPathways',
    'Cpad',
    'Disgenet',
    'Kinasedotcom',
    'Phosphatome',
    'Tfcensus',
    'Intogen',
    'CancerGeneCensus',
    'Cancersea',
    'Msigdb',
    'Lrdb',
    'Baccin2019',
    'Almen2009',
    'Phobius',
    'Icellnet',
    'Cellcellinteractions',
    'Italk',
    'Embrace',
    'UniprotLocations',
    'UniprotFamilies',
    'UniprotTopologies',
    'UniprotTissues',
    'UniprotKeywords',
    'Tcdb',
    'Mcam',
    'Gpcrdb',
    'Celltalkdb',
    'Cellchatdb',
    'Connectomedb',
    'Talklr',
    'Humancellmap',
    'Cellcall',
    #'Biogps',
    'Cellinker',
    'Scconnect',
    'Cancerdrugsdb',
    'Progeny',
    'Celltypist',
    'Cytosig',
    'Wang',
    'Panglaodb',
    'Lambert2018',
    'InterPro',
}

# TODO: this should be part of JSON files.
# Names of the annotation resources about protein complexes
# that are used by default.
complex_sources_default = {
    'CellPhoneDBComplex',
    'CorumFuncat',
    'CorumGO',
    'IcellnetComplex',
    'CellchatdbComplex',
    'CellinkerComplex',
    'ScconnectComplex',
}

# The protein and the complex variant of CellPhoneDB share the same fields.
_cellphonedb_fields = (
    'receptor',
    'adhesion',
    'cytoplasm',
    'peripheral',
    'secretion',
    'secreted',
    'transporter',
    'transmembrane',
    'extracellular',
)

# TODO: this should be part of JSON files.
# Field names by resource.
default_fields = {
    'Matrisome': ('mainclass', 'subclass'),
    'Locate': ('location',),
    'Vesiclepedia': ('vesicle',),
    'Exocarta': ('vesicle',),
    'Ramilowski_location': ('location',),
    'HPA': ('tissue', 'level'),
    'CellPhoneDB': _cellphonedb_fields,
    'CellPhoneDB_Complex': _cellphonedb_fields,
    'Cpad': ('cancer', 'effect_on_cancer',),
    'Disgenet': ('disease',),
}
def __init__(
        self,
        class_definitions=None,
        excludes=None,
        excludes_extra=None,
        build=True,
        pickle_file=None,
        annotdb_pickle_file=None,
        composite_resource_name=None,
):
    """
    Sets up the custom annotation object and optionally builds it.

    :param tuple class_definitions:
        A series of annotation class definitions, each represented by
        an instance of ``pypath.internals.annot_formats.AnnotDef``.
        These definitions carry the attributes and instructions to
        populate the classes.
    :param dict excludes:
        A dict with parent category names (strings) or category keys
        (tuples) as keys and sets of identifiers as values. The
        identifiers in this dict will be excluded from all the
        respective categories while building the database. E.g. if
        the UniProt ID `P00533` (EGFR) is in the set under the key of
        `adhesion` it will be excluded from the category `adhesion`
        and all its direct children.
    :param dict excludes_extra:
        Same kind of dict as `excludes` but it will be added to the
        built-in default. The built-in and the provided extra sets
        will be merged. If you want to overwrite or modify the
        built-in sets provide your custom dict as `excludes`.
    :param bool build:
        Execute the build upon instantiation or set up an empty
        object the build can be executed on later.
    :param str pickle_file:
        Stored in the ``pickle_file`` attribute.
    :param str annotdb_pickle_file:
        Stored in the ``annotdb_pickle_file`` attribute.
    :param str composite_resource_name:
        Name of the composite resource; if not provided, the value of
        the `annot_composite_database_name` setting is used.
    """

    # the logger is initialized only if the instance has no log name yet
    if not hasattr(self, '_log_name'):
        session_mod.Logger.__init__(self, name='annot')

    # parameters as provided by the caller
    self._class_definitions_provided = class_definitions
    self._excludes_original = excludes or {}
    self._excludes_extra_original = excludes_extra or {}
    self.pickle_file = pickle_file
    self.annotdb_pickle_file = annotdb_pickle_file

    # containers, empty until the build
    self.network = None
    self.classes = {}
    self.consensus_scores = {}
    self.composite_numof_resources = {}

    if not composite_resource_name:
        composite_resource_name = settings.get(
            'annot_composite_database_name'
        )

    self.composite_resource_name = composite_resource_name

    if build:
        self.load()
def reload(self):
    """
    Reloads the object from the module level.

    Reloads ``core_common``, the module of this object's class and
    ``annot_formats``, then points the ``__class__`` of this instance
    and of the keys and values of ``classes`` to the reloaded classes.
    """

    imp.reload(core_common)

    # reload the module of our own class and switch to the new class
    module_name = self.__class__.__module__
    top_level = module_name.split('.')[0]
    module = __import__(module_name, fromlist=[top_level])
    imp.reload(module)
    self.__class__ = getattr(module, self.__class__.__name__)

    # the already created keys and groups also need the reloaded classes
    imp.reload(annot_formats)
    key_class = annot_formats.AnnotDefKey
    group_class = annot_formats.AnnotationGroup

    for class_key, class_group in self.classes.items():
        class_key.__class__ = key_class
        class_group.__class__ = group_class
def update_parents(self):
    """
    Creates a dict :py:attr:``children`` with parent class keys as
    keys and sets of children class keys as values. Also a dict
    :py:attr:``parents`` where the children class keys, and several
    partial forms of them (name, resource, name-parent and
    name-resource pairs), point to sets of parent class keys.
    """

    definitions = self._class_definitions

    # collecting the potential parents: the composite classes by name
    composite_by_name = {}

    for key, classdef in definitions.items():
        if classdef.source == 'composite':
            composite_by_name.setdefault(classdef.name, set()).add(key)

    children = {}
    parents = {}

    # assigning children to parents
    for key in definitions:
        # the second element of the key is the name of the parent
        for parent_key in composite_by_name.get(key[1], ()):
            children.setdefault(parent_key, set()).add(key)

            lookups = (
                key,
                key[0],
                (key[0], key[1]),
                (key[0], key[2]),
                key[2],
            )

            for lookup in lookups:
                parents.setdefault(lookup, set()).add(parent_key)

    self.children = children
    self.parents = parents
def populate_classes(self, update=False):
    """
    Creates a classification of proteins according to the custom
    annotation definitions.

    If a pickle file is set, the contents are loaded from there and
    nothing else happens. Otherwise each class not yet present (or all
    classes if ``update`` is True) is created and finally the consensus
    scores are populated.

    :param bool update:
        Create again the classes which already exist.
    """

    if self.pickle_file:
        self.load_from_pickle(pickle_file=self.pickle_file)
        return

    for classdef in self._class_definitions.values():
        is_missing = classdef.key not in self.classes

        if is_missing or update:
            self.create_class(classdef)

    self.populate_scores()
def populate_scores(self):
    """
    Creates the consensus score dictionaries based on the number of
    resources annotating an entity for each composite category.

    For each composite class definition the component sets are
    collected (without executing the set operation, only generic
    scope); ``composite_numof_resources`` stores the number of the
    components, ``consensus_scores`` stores for each entity the number
    of components it is a member of.
    """

    composite_defs = (
        classdef
        for classdef in self._class_definitions.values()
        if classdef.source == 'composite'
    )

    for classdef in composite_defs:
        components = self._execute_operation(
            classdef.resource,
            execute=False,
            only_generic=True,
        )

        # number of components each entity occurs in
        by_entity = collections.Counter(
            itertools.chain.from_iterable(components)
        )

        self.composite_numof_resources[classdef.name] = len(components)
        self.consensus_scores[classdef.name] = dict(by_entity)
def load_from_pickle(self, pickle_file):
    """
    Restores the contents of the object from a pickle file.

    The file is expected to contain the 8 element tuple written by
    ``save_to_pickle``.

    :param str pickle_file:
        Path to the pickle file.
    """

    self._log('Loading from pickle `%s`.' % pickle_file)

    # NOTE(review): unpickling executes arbitrary code, load only
    # trusted files here
    with open(pickle_file, 'rb') as fp:
        contents = pickle.load(fp)

    (
        self.classes,
        self.consensus_scores,
        self.composite_numof_resources,
        self.parents,
        self.children,
        self.composite_resource_name,
        self._class_definitions,
        self._excludes,
    ) = contents

    self._update_complex_attribute_classes()

    self._log('Loaded from pickle `%s`.' % pickle_file)


def save_to_pickle(self, pickle_file):
    """
    Saves the contents of the object into a pickle file.

    :param str pickle_file:
        Path to the pickle file.
    """

    self._log('Saving to pickle `%s`.' % pickle_file)

    self._update_complex_attribute_classes()

    # the order must agree with the unpacking in `load_from_pickle`
    contents = (
        self.classes,
        self.consensus_scores,
        self.composite_numof_resources,
        self.parents,
        self.children,
        self.composite_resource_name,
        self._class_definitions,
        self._excludes,
    )

    with open(pickle_file, 'wb') as fp:
        pickle.dump(
            obj=contents,
            file=fp,
            protocol=pickle.HIGHEST_PROTOCOL,
        )

    self._log('Saved to pickle `%s`.' % pickle_file)


def _update_complex_attribute_classes(self):
    """
    Passes the keys of ``classes`` and this module to
    ``ComplexAggregator._update_complex_attribute_classes_static``.
    """

    this_module = sys.modules[__name__]

    complex.ComplexAggregator._update_complex_attribute_classes_static(
        self.classes.keys(),
        mod=this_module,
    )
def create_class(self, classdef, override=False):
    """
    Creates a category of entities by processing a custom definition.

    :param classdef:
        An annotation class definition; the result of processing it
        is stored in ``classes`` under the key of the definition.
    :param bool override:
        Process the definition even if it is not enabled.
    """

    if not (classdef.enabled or override):
        return

    self.classes[classdef.key] = self.process_annot(classdef)
def process_annot(self, classdef):
    """
    Processes an annotation definition and returns a set of identifiers.

    The ``resource`` of the definition can be a set (used as it is),
    a string (a resource name in the annotation database or a
    shorthand notation starting with `~` or `#`), a callable (called
    with ``args``) or an ``AnnotOp`` (executed). After that the
    ``avoid``, ``limit`` and ``exclude`` attributes and the excludes
    of the instance are applied.

    :param classdef:
        An annotation class definition.

    :return:
        An ``AnnotationGroup``, or an empty set if the definition is
        not enabled.
    """

    members = set()

    if not classdef.enabled:
        return members

    self._log(
        'Processing custom annotation definition '
        '`%s` (parent: `%s`, resource: `%s`).' % classdef.key
    )

    source = classdef.resource

    if isinstance(source, set):
        members = source

    elif isinstance(source, str):
        annots = self.annotdb.annots

        if source in annots:
            annot = annots[source]
            members = (
                annot.select(**classdef.args)
                if classdef.args else
                annot.to_set()
            )

            # Automatically include direct complex annotations
            cplex_resource = '%s_complex' % source

            if cplex_resource in annots:
                cplex_args = classdef._asdict()
                cplex_args['resource'] = cplex_resource
                cplex_classdef = annot_formats.AnnotDef(**cplex_args)
                members.update(self.process_annot(cplex_classdef))

        elif source.startswith(('~', '#')):
            # shorthand notation referring to other classes
            members = self._execute_operation(
                annot_formats.AnnotOp(annots=source)
            )

        else:
            self._log('Resource not found: %s' % source)

    elif callable(source):
        members = source(**(classdef.args or {}))

    elif isinstance(source, annot_formats.AnnotOp):
        members = self._execute_operation(source)

    # first removing the classes to avoid,
    # then limiting to the classes in `limit`
    restrictions = (
        (classdef.avoid, set.difference),
        (classdef.limit, set.intersection),
    )

    for other_defs, set_op in restrictions:
        for other_def in other_defs:
            members = self._execute_operation(
                annot_formats.AnnotOp(
                    annots=(members, self.select(other_def)),
                    op=set_op,
                )
            )

    if classdef.exclude:
        members = members - classdef.exclude

    # instance level excludes: by parent name and by class key
    for exclude_key in (classdef.parent, classdef.key):
        if exclude_key in self._excludes:
            members = members - self._excludes[exclude_key]

    transmitter, receiver = self._get_transmitter_receiver(classdef)

    self._log(
        'Finished processing custom annotation definition '
        '`%s` (parent: `%s`, resource: `%s`). Resulted a set of %u '
        'entities.' % (classdef.key + (len(members),))
    )

    return annot_formats.AnnotationGroup(
        members=members,
        name=classdef.name,
        parent=classdef.parent,
        aspect=classdef.aspect,
        resource=classdef.resource_name,  # the actual database name
        scope=classdef.scope,
        source=classdef.source,  # resource_specific / composite
        transmitter=transmitter,
        receiver=receiver,
    )
def _execute_operation(self, annotop, execute=True, **kwargs):
    """
    Executes a set operation on annotation sets.

    :param annotop:
        Either a shorthand (single string) notation, in this case the
        operation is union; or an ``AnnotOp`` whose ``annots`` is a
        shorthand notation or an iterable of definitions.
    :param bool execute:
        Execute the operation; if False, the operands are returned.
    :param kwargs:
        Passed to ``_collect_by_parent`` or ``select``.

    :return:
        The result of the operation, or the tuple of operands if
        ``execute`` is False. If there is nothing to operate on, the
        result is an empty set.
    """

    if self._is_short_notation(annotop):
        annots = self._collect_by_parent(annotop, **kwargs)
        op = set.union

    elif self._is_short_notation(annotop.annots):
        annots = self._collect_by_parent(annotop.annots, **kwargs)
        op = annotop.op

    else:
        annots = tuple(
            self.select(_annot, execute=execute, **kwargs)
            for _annot in annotop.annots
            if not hasattr(_annot, 'enabled') or _annot.enabled
        )
        # a non-set operand is expanded to its elements
        annots = tuple(
            itertools.chain(*(
                (a,) if isinstance(a, set) else a
                for a in annots
            ))
        )
        op = annotop.op

    if execute:
        if not annots:
            # the unbound set methods can not be called without operands
            return set()

        annots = op(*(
            a if isinstance(a, set) else set(a)
            for a in annots
        ))

    return annots


def _collect_by_parent(self, parent, only_generic=False):
    """
    Processes the shorthand (single string) notation
    `[#name]~parent[~resource]`.
    Returns tuple of sets.

    :param str parent:
        The shorthand notation.
    :param bool only_generic:
        Consider only the classes with `generic` scope.
    """

    name, parent, resource = self._process_short_notation(parent)

    return tuple(
        self.select(classdef.key)
        for classdef in self._class_definitions.values()
        if (
            classdef.parent == parent and
            (not resource or classdef.resource_name == resource) and
            classdef.enabled and
            # a parent class must not be collected as its own child
            not (
                classdef.name == classdef.parent and
                (
                    classdef.source == 'composite' or
                    classdef.resource_name == resource
                )
            ) and
            (not only_generic or classdef.scope == 'generic')
        )
    )


@staticmethod
def _process_short_notation(shortdef):
    """
    Extracts name, parent and resource from the shorthand (single
    string) notation `[#name]~parent[~resource]`.

    :return:
        Tuple of name, parent and resource; name and resource are
        None if not part of the notation.
    """

    parent = shortdef
    name = None
    resource = None

    if parent.startswith('#'):
        name, parent = parent.split('~', maxsplit=1)
        name = name.strip('#')

    parent = parent.strip('~')
    parent_resource = parent.split('~')

    if len(parent_resource) == 2:
        parent, resource = parent_resource

    return name, parent, resource


@staticmethod
def _is_short_notation(obj):
    """
    Tells if ``obj`` is a string starting with `~` or `#`.
    """

    return isinstance(obj, str) and obj.startswith(('~', '#'))


def _get_transmitter_receiver(self, classdef):
    """
    Returns the transmitter and receiver attributes of a class
    definition. If any of them is None, it is taken from the first
    definition found which is named as the parent of this class and
    is either composite or its resource is the composite resource.
    """

    transmitter = classdef.transmitter
    receiver = classdef.receiver

    if transmitter is None or receiver is None:
        name, parent, resource = classdef.key

        for parentdef in self._class_definitions.values():
            if (
                parentdef.name == parent and
                (
                    parentdef.source == 'composite' or
                    parentdef.resource == self.composite_resource_name
                )
            ):
                if transmitter is None:
                    transmitter = parentdef.transmitter

                if receiver is None:
                    receiver = parentdef.receiver

                break

    return transmitter, receiver


def _select(
        self,
        name,
        parent=None,
        resource=None,
        entity_type=None,
        execute=True,
        **kwargs
):
    """
    Retrieves a class by its name and loads it if hasn't been loaded
    yet but the name present in the class definitions.

    :param name:
        A class name, a shorthand notation or a tuple of name, parent
        and resource.
    :param str parent:
        Name of the parent class.
    :param str resource:
        Name of the resource.
    :param str entity_type:
        Passed to ``_filter_entity_type``.
    :param bool execute:
        Only for shorthand notation: return the union of the
        collected sets instead of the tuple of them.
    :param kwargs:
        Only for shorthand notation: passed to ``_collect_by_parent``.

    :return:
        The selected class; None if no such class exists.
    """

    selected = None

    if self._is_short_notation(name):
        annots = self._collect_by_parent(name, **kwargs)
        annots = tuple(
            a if isinstance(a, set) else set(a)
            for a in annots
        )
        # `set().union` works also if nothing has been collected,
        # while `set.union` requires at least one operand
        selected = set().union(*annots) if execute else annots

    else:
        if isinstance(name, tuple):
            name, parent, resource = name

        if not parent:
            parent = self.get_parent(name=name, resource=resource)
            parent = parent.name if parent else None

        if not resource:
            resource = self.get_resource(name=name, parent=parent)

        key = annot_formats.AnnotDefKey(name, parent, resource)

        if key not in self.classes and key in self._class_definitions:
            self.create_class(self._class_definitions[key])

        if key in self.classes:
            selected = self.classes[key]

    if selected is not None:
        return self._filter_entity_type(
            selected,
            entity_type=entity_type,
        )

    self._log(
        'No such annotation class: `name=%s, '
        'parent=%s, resource=%s`' % key
    )
def select(
        self,
        definition,
        parent=None,
        resource=None,
        entity_type=None,
        **kwargs
):
    """
    Retrieves a class by its name or definition. The definition can
    be a class name (string) or a set of entities, or an AnnotDef
    object defining the contents based on original resources or an
    AnnotOp which defines the contents as an operation over other
    definitions.

    :param definition:
        Any of the above; also a tuple or list (passed to ``_select``
        as positional arguments) or a dict (passed to ``_select`` as
        keyword arguments).
    :param str parent:
        Name of the parent class, used only with class names.
    :param str resource:
        Name of the resource, used only with class names.
    :param str entity_type:
        Passed to ``_filter_entity_type``.
    :param kwargs:
        Passed to ``_select``, used only with class names.
    """

    # the order of these checks is kept as it was originally written
    if isinstance(definition, annot_formats.AnnotOp):
        selected = self._execute_operation(definition)

    elif isinstance(definition, annot_formats.AnnotDef):
        selected = self.process_annot(definition)

    elif isinstance(definition, annot_formats._set_type):
        selected = definition

    elif isinstance(definition, (tuple, list)):
        selected = self._select(*definition)

    elif isinstance(definition, dict):
        selected = self._select(**definition)

    else:
        selected = self._select(
            definition,
            parent=parent,
            resource=resource,
            **kwargs
        )

    return self._filter_entity_type(selected, entity_type=entity_type)


# synonym for old name
get_class = select
def labels(
        self,
        name,
        parent=None,
        resource=None,
        entity_type=None,
):
    """
    Same as ``select`` but returns a list of labels (more human
    readable).

    :param name:
        A class name or definition, passed to ``select`` as its
        ``definition`` argument.
    :param str parent:
        Passed to ``select``.
    :param str resource:
        Passed to ``select``.
    :param str entity_type:
        Passed to ``select``.

    :return:
        The value returned by ``mapping.label`` for the selected
        entities.
    """

    # `select` has no `name` parameter, its first parameter is
    # called `definition`
    selected = self.select(
        definition=name,
        parent=parent,
        resource=resource,
        entity_type=entity_type,
    )

    return mapping.label(selected)
def show(self, name, parent=None, resource=None, **kwargs):
    """
    Same as ``select`` but prints a table to the console with basic
    information from the UniProt datasheets.

    :param name:
        A class name or definition, passed to ``select``.
    :param str parent:
        Passed to ``select``.
    :param str resource:
        Passed to ``select``.
    :param kwargs:
        Passed to ``pypath.utils.uniprot.info``.
    """

    # only the proteins are selected
    proteins = self.select(
        definition=name,
        parent=parent,
        resource=resource,
        entity_type='protein',
    )

    utils_uniprot.info(*proteins, **kwargs)
def quality_check_table(
        self,
        path=None,
        fmt='tsv',
        only_swissprot=True,
        top=None,
        **kwargs
):
    """
    Exports a table in tsv format for quality check and browsing
    purposes. Each protein represented in one row of this table with
    basic data from UniProt and the list of annotation categories
    from this database.

    :param str path:
        Path for the exported file.
    :param str fmt:
        Format: either `tsv` or `latex`; with any other value the
        table is returned as it is.
    :param bool only_swissprot:
        Use only the proteins selected by the `swissprot` reference
        list.
    :param int top:
        Use only the first ``top`` proteins, ordered by label.
    :param kwargs:
        `features` is passed to ``pypath.utils.uniprot.collect``;
        all the keyword arguments are passed to the table exporting
        function.
    """

    features = kwargs.get('features', ())
    composite_name = self.composite_resource_name

    proteins = list(self.get_proteins())

    if only_swissprot:
        proteins = reflists.select(proteins, 'swissprot')

    # ordering the proteins by their labels
    genesymbols = [mapping.label(uniprot) for uniprot in proteins]
    by_label = sorted(
        zip(proteins, genesymbols),
        key=lambda pair: pair[1],
    )
    proteins = [uniprot for uniprot, _ in by_label][:top]

    tbl = utils_uniprot.collect(proteins, *features)

    # labels of the classes from the composite database
    tbl['intercell_composite'] = [
        ', '.join(
            label
            for label in self.classes_by_entity(uniprot, labels=True)
            if label.endswith(composite_name)
        )
        for uniprot in proteins
    ]

    # labels of the classes from all other resources
    tbl['intercell_all'] = [
        ', '.join(
            label
            for label in self.classes_by_entity(uniprot, labels=True)
            if not label.endswith(composite_name)
        )
        for uniprot in proteins
    ]

    if fmt == 'tsv':
        return common.tsv_table(tbl=tbl, path=path, **kwargs)

    if 'tex' in fmt:
        kwargs.setdefault(
            'colformat',
            r'rllrrK{25mm}LK{20mm}K{20mm}K{25mm}L',
        )

        return common.latex_table(tbl=tbl, path=path, **kwargs)

    return tbl
def get_parents(self, name, parent=None, resource=None):
    """
    As names should be unique for resources, a combination of a name
    and resource determines the parent category. This method looks up
    the parents for a pair of name and resource.

    :param str name:
        Name of the class.
    :param str parent:
        Name of the parent; by default same as ``name``.
    :param str resource:
        Name of the resource.

    :return:
        The value from the ``parents`` dict for the first matching
        key; None if nothing matches.
    """

    parent = parent or name

    # candidate keys, the most specific first
    candidates = (
        (name, parent, resource),
        (name, name, resource),
        (name, resource),
        (name, parent),
        (parent, resource),
    )

    for candidate in candidates:
        if candidate in self.parents:
            return self.parents[candidate]

    return None
def get_resources(self, name, parent=None):
    """
    Returns a set with the names of all resources defining a category
    with the given name and parent.

    :param str name:
        Name of the class.
    :param str parent:
        Name of the parent; by default same as ``name``.
    """

    parent = parent or name
    resources = set()

    # keys of the definitions are tuples of name, parent and resource
    for key in self._class_definitions.keys():
        if key[0] == name and key[1] == parent:
            resources.add(key[2])

    return resources
def get_resource(self, name, parent=None):
    """
    For a category name and its parent returns a single resource name.
    If a category belonging to the composite database matches the
    name and the parent the name of the composite database will be
    returned, otherwise the resource name first in alphabetic order.

    :param str name:
        Name of the class.
    :param str parent:
        Name of the parent.

    :return:
        A resource name; None if no resource matches.
    """

    resources = self.get_resources(name=name, parent=parent)

    if self.composite_resource_name in resources:
        return self.composite_resource_name

    if resources:
        return min(resources)

    return None
def make_df(self, all_annotations=False, full_name=False):
    """
    Creates a ``pandas.DataFrame`` where each record assigns a
    molecular entity to an annotation category. The data frame will
    be assigned to the ``df`` attribute.

    :param bool all_annotations:
        Add a column `all_annotations` with the value returned by
        ``annotdb.all_annotations_str`` for each entity.
    :param bool full_name:
        Add a column `full_name` with the protein names, between the
        `genesymbol` and the `entity_type` columns.
    """

    self._log('Creating data frame from custom annotation.')

    header = [
        'category',
        'parent',
        'database',
        'scope',
        'aspect',
        'source',
        'uniprot',
        'genesymbol',
        'entity_type',
        'consensus_score',
    ]

    dtypes = {
        'category': 'category',
        'parent': 'category',
        'database': 'category',
        'scope': 'category',
        'aspect': 'category',
        'source': 'category',
        'uniprot': 'category',
        'genesymbol': 'category',
        'entity_type': 'category',
        'consensus_score': 'uint16',
    }

    if full_name:
        # in the records the full name comes right before the entity
        # type, the header has to follow the same order
        header.insert(header.index('entity_type'), 'full_name')
        dtypes['full_name'] = 'category'

    if all_annotations:
        header.append('all_annotations')

    self.df = pd.DataFrame(
        [
            # annotation category, entity id
            [
                annotgroup.name,
                annotgroup.parent,
                annotgroup.resource,
                annotgroup.scope,
                annotgroup.aspect,
                annotgroup.source,
                uniprot.__str__(),
                (
                    mapping.map_name0(uniprot, 'uniprot', 'genesymbol')
                    if isinstance(uniprot, str) else
                    'COMPLEX:%s' % uniprot.genesymbol_str
                    if hasattr(uniprot, 'genesymbol_str') else
                    uniprot.__str__()
                ),
            ] +
            # full name
            (
                [
                    '; '.join(
                        mapping.map_name(
                            uniprot,
                            'uniprot',
                            'protein-name',
                        )
                    ),
                ]
                if full_name else
                []
            ) +
            # entity type and consensus score
            [
                (
                    'complex'
                    if hasattr(uniprot, 'genesymbol_str') else
                    'mirna'
                    if uniprot.startswith('MIMAT') else
                    'protein'
                ),
                self.consensus_score(
                    annotgroup.name,
                    uniprot,
                ),
            ] +
            # all annotations
            (
                [self.annotdb.all_annotations_str(uniprot)]
                if all_annotations else
                []
            )
            for annotgroup in self.classes.values()
            for uniprot in annotgroup
        ],
        columns=header,
    ).astype(dtypes)

    self._log(
        'Custom annotation data frame has been created. '
        'Memory usage: %s.' % common.df_memory_usage(self.df)
    )
def get_df(self):
    """
    Returns the data frame of custom annotations. If it does not
    exist yet builds the data frame.
    """

    try:
        return self.df

    except AttributeError:
        self.make_df()

        return self.df
def counts(self, entity_type='protein', labels=True, **kwargs):
    """
    Returns a dict with number of elements in each class.

    :param str entity_type:
        Count only the entities of this type.
    :param bool labels:
        Use keys or labels as keys in the returned dict.

    All other arguments passed to ``iter_classes``. Empty classes are
    not included.
    """

    return {
        (cls.label if labels else cls.key):
        cls.count_entity_type(entity_type=entity_type)
        for cls in self.iter_classes(**kwargs)
        if len(cls) > 0
    }


# synonym
counts_by_class = counts


def counts_df(self, groupby=None, **kwargs):
    """
    Creates a data frame with the number of unique entities by group.

    :param list groupby:
        Column names to group by; by default `category`, `parent`
        and `database`.
    :param kwargs:
        Passed to ``filtered``.

    :return:
        Data frame with the first record of each group, without the
        entity specific columns, and the number of unique values of
        the `uniprot` column in the column `n_uniprot`.
    """

    groupby = groupby or ['category', 'parent', 'database']
    grouped = self.filtered(**kwargs).groupby(groupby)

    # number of unique identifiers in each group
    n_unique = grouped.uniprot.nunique().reset_index()
    n_unique = n_unique.rename(columns={'uniprot': 'n_uniprot'})

    # one record for each group
    first_records = grouped.agg('head', n=1).reset_index()
    first_records = first_records.drop(
        ['uniprot', 'entity_type', 'genesymbol', 'index'],
        axis=1,
    )

    return first_records.merge(n_unique, on=groupby)


def iter_classes(self, **kwargs):
    """
    Iterates the classes filtered by their attributes.
    ``kwargs`` passed to ``filter_classes``.
    """

    all_classes = self.classes.values()

    return self.filter_classes(classes=all_classes, **kwargs)
@staticmethod
def filter_classes(classes, **kwargs):
    """
    Returns a generator of annotation classes filtered by their
    attributes.

    :param classes:
        Iterable of annotation classes.
    :param kwargs:
        Attribute names and values; a class is kept only if
        ``common.eq`` holds between each value and the attribute of
        the class with the same name.
    """

    def matches(cls):
        # all the criteria must be fulfilled
        return all(
            common.eq(value, getattr(cls, attr))
            for attr, value in kwargs.items()
        )

    return (cls for cls in classes if matches(cls))
def filter(self, entity_type=None, **kwargs):
    """
    Filters the annotated entities by annotation class attributes and
    ``entity_type``. ``kwargs`` passed to ``filter_classes``.

    :return:
        Set of the entities from all the matching classes.
    """

    entities = set()

    for cls in self.iter_classes(**kwargs):
        entities.update(cls.filter_entity_type(entity_type=entity_type))

    return entities
def network_df(
        self,
        annot_df=None,
        network=None,
        combined_df=None,
        network_args=None,
        annot_args=None,
        annot_args_source=None,
        annot_args_target=None,
        entities=None,
        entities_source=None,
        entities_target=None,
        only_directed=False,
        only_undirected=False,
        only_signed=None,
        only_effect=None,
        only_proteins=False,
        swap_undirected=True,
        undirected_orientation=None,
        entities_or=False,
):
    """
    Combines the annotation data frame and a network data frame.
    Creates a ``pandas.DataFrame`` where each record is an interaction
    between a pair of molecular entities labeled by their annotations.

    annot_df : pandas.DataFrame
        Optional, passed to the ``filtered`` method as the annotation
        data frame.
    network : pypath.network.Network,pandas.DataFrame
        A ``pypath.network.Network`` object or a data frame with
        network data.
    combined_df : pandas.DataFrame
        Optional, a network data frame already combined with
        annotations for filtering only. If the instance has an
        ``interclass_network`` attribute, that is used instead.
    network_args : dict,None
        Parameters for filtering the network, these override the
        ones derived from the other arguments.
    annot_args : dict,None
        Parameters for filtering annotation classes; note, the
        defaults might include some filtering, provide an empty dict
        if you want no filtering at all; however this might result
        in huge data frame and consequently memory issues. Passed to
        the ``filtered`` method.
    annot_args_source : dict,None
        Same as ``annot_args`` but only for the source side of the
        network connections. These override ``annot_args`` but all
        the criteria not defined here will be applied from
        ``annot_args``.
    annot_args_target : dict,None
        Same as ``annot_args`` but only for the target side of the
        network connections. These override ``annot_args`` but all
        the criteria not defined here will be applied from
        ``annot_args``.
    entities : set,None
        Limit the network only to these molecular entities.
    entities_source : set,None
        Limit the source side of network connections only to these
        molecular entities.
    entities_target : set,None
        Limit the target side of network connections only to these
        molecular entities.
    only_directed : bool
        Use only the directed interactions.
    only_undirected : bool
        Use only the undirected interactions. Specifically for
        retrieving and counting the interactions without direction
        information.
    only_signed : bool,None
        Passed to the network filtering.
    only_effect : int,None
        Use only the interactions with this effect. Either -1 or 1.
    only_proteins : bool
        Use only the interactions where each of the partners is a
        protein (i.e. not complex, miRNA, small molecule or other
        kind of entity).
    swap_undirected : bool
        Convert undirected interactions to a pair of mutual
        interactions.
    undirected_orientation : str,None
        Ignore the direction at all interactions and make sure all
        of them have a uniform orientation. If `id`, all interactions
        will be oriented by the identifiers of the partners; if
        `category`, the interactions will be oriented by the
        categories of the partners.
    entities_or : bool
        Passed to the network filtering; if False, ``entities`` is
        the default for both ``entities_source`` and
        ``entities_target``.

    Returns the combined data frame, or None if no network is
    available.
    """

    if hasattr(self, 'interclass_network'):
        combined_df = self.interclass_network

    param_str = ', '.join([
        'network_args=[%s]' % common.dict_str(network_args),
        'annot_args=[%s]' % common.dict_str(annot_args),
        'annot_args_source=[%s]' % common.dict_str(annot_args_source),
        'annot_args_target=[%s]' % common.dict_str(annot_args_target),
        'entities=%s' % common.none_or_len(entities),
        'entities_source=%s' % common.none_or_len(entities_source),
        'entities_target=%s' % common.none_or_len(entities_target),
        'only_directed=%s' % only_directed,
        'only_undirected=%s' % only_undirected,
        'only_signed=%s' % only_signed,
        'only_effect=%s' % only_effect,
        'only_proteins=%s' % only_proteins,
        'swap_undirected=%s' % swap_undirected,
        'entities_or=%s' % entities_or,
    ])

    if combined_df is not None:
        self._log(
            'Using previously created network-annotation data frame. '
            'Parameters %s' % param_str
        )
        network_df = None

    else:
        self._log(
            'Combining custom annotation with network data frame. '
            'Parameters %s' % param_str
        )
        network_df = (
            self._network_df(network)
            if network is not None else
            self.network
        )

    if network_df is None and combined_df is None:
        self._log('No network provided, no default network set.')
        return

    _network_args = {
        'only_proteins': only_proteins,
        'only_effect': only_effect,
        'only_signed': only_signed,
        'only_directed': only_directed,
        'only_undirected': only_undirected,
        'entities': entities,
        'source_entities': entities_source,
        'target_entities': entities_target,
        'swap_undirected': swap_undirected,
        'entities_or': entities_or,
    }
    _network_args.update(network_args or {})

    if not entities_or:
        entities_source = entities_source or entities or set()
        entities_target = entities_target or entities or set()

    # the side specific arguments are optional, `dict.update` does
    # not accept None
    _annot_args_source = (annot_args or {}).copy()
    _annot_args_source.update(annot_args_source or {})
    _annot_args_source['entities'] = entities_source

    _annot_args_target = (annot_args or {}).copy()
    _annot_args_target.update(annot_args_target or {})
    _annot_args_target['entities'] = entities_target

    if only_proteins:
        _annot_args_source['entity_type'] = 'protein'
        _annot_args_target['entity_type'] = 'protein'

    if combined_df is None:
        network_df = core_common.filter_network_df(
            df=network_df,
            **_network_args
        )

        annot_df_source = self.filtered(
            annot_df=annot_df,
            **_annot_args_source
        )
        annot_df_target = self.filtered(
            annot_df=annot_df,
            **_annot_args_target
        )

        # annotations of the source side
        annot_network_df = pd.merge(
            network_df,
            annot_df_source,
            suffixes=['', '_a'],
            how='inner',
            left_on='id_a',
            right_on='uniprot',
        )
        annot_network_df.id_a = annot_network_df.id_a.astype('category')

        # annotations of the target side
        annot_network_df = pd.merge(
            annot_network_df,
            annot_df_target,
            suffixes=['_a', '_b'],
            how='inner',
            left_on='id_b',
            right_on='uniprot',
        )
        annot_network_df.id_b = annot_network_df.id_b.astype('category')

        # these columns are duplicates
        annot_network_df.drop(
            labels=['type_a', 'type_b', 'uniprot_a', 'uniprot_b'],
            inplace=True,
            axis='columns',
        )

    else:
        combined_df = core_common.filter_network_df(
            df=combined_df,
            **_network_args
        )

        combined_df = self.filtered(
            annot_df=combined_df,
            postfix='_a',
            **_annot_args_source
        )

        combined_df = self.filtered(
            annot_df=combined_df,
            postfix='_b',
            **_annot_args_target
        )

        annot_network_df = combined_df

    if undirected_orientation:
        # which columns we consider for the orientation
        by = undirected_orientation
        by = by if by in {'id', 'category'} else 'category'
        by_col_a = getattr(annot_network_df, '%s_a' % by)
        by_col_b = getattr(annot_network_df, '%s_b' % by)

        # indices of the records with the wrong orientation
        idx_wrong_orient = [a > b for a, b in zip(by_col_a, by_col_b)]

        # split the data frame
        wrong_orient = annot_network_df.iloc[idx_wrong_orient].copy()
        good_orient = annot_network_df.iloc[
            np.logical_not(idx_wrong_orient)
        ].copy()

        column_order = list(annot_network_df.columns)

        # swap the orientation
        column_map = dict(
            (col, common.swap_suffix(col))
            for col in column_order
        )
        wrong_orient = wrong_orient.rename(columns=column_map)

        # make sure the column order is correct
        wrong_orient = wrong_orient[column_order]

        # concatenate the slices
        orientation_swapped = pd.concat([good_orient, wrong_orient])

        orientation_swapped = orientation_swapped.drop_duplicates(
            subset=[
                'id_a',
                'id_b',
                'type',
                'category_a',
                'category_b',
                'parent_a',
                'parent_b',
                'source_a',
                'source_b',
                'scope_a',
                'scope_b',
                'entity_type_a',
                'entity_type_b',
            ]
        )

        # removing direction and effect columns
        # as they are not valid any more
        orientation_swapped.drop(
            ['directed', 'effect'],
            axis=1,
            inplace=True,
        )

        annot_network_df = orientation_swapped

    self._log(
        'Combined custom annotation data frame with network data frame. '
        'Memory usage: %s.' % common.df_memory_usage(annot_network_df)
    )

    return annot_network_df


# this became a synonym
filter_interclass_network = network_df
def set_interclass_network_df(self, **kwargs):
    """
    Creates a data frame of the whole inter-class network and keeps
    it assigned to the instance in order to make subsequent queries
    faster.

    :param kwargs:
        Passed to ``get_interclass_network_df``.
    """

    # removing the old one first, so a new one will be created
    self.unset_interclass_network_df()

    interclass_network = self.get_interclass_network_df(**kwargs)
    self.interclass_network = interclass_network
def get_interclass_network_df(self, **kwargs):
    """
    If an interclass network is already present, it is returned and
    the ``network`` and other ``kwargs`` provided are not considered.
    Otherwise these are passed to ``network_df``.
    """

    if hasattr(self, 'interclass_network'):
        return self.interclass_network

    return self.network_df(**kwargs)
def unset_interclass_network_df(self):
    """
    Removes the ``interclass_network`` attribute if it exists.
    """

    if hasattr(self, 'interclass_network'):
        del self.interclass_network


#
# Below only thin wrappers to make the interface more intuitive
# without knowing the argument names
#

#
# Building a network of connections between classes
#

def inter_class_network(
        self,
        annot_args_source=None,
        annot_args_target=None,
        network=None,
        **kwargs
):
    """
    Network between classes; all arguments passed to ``network_df``.
    """

    return self.network_df(
        network=network,
        annot_args_source=annot_args_source,
        annot_args_target=annot_args_target,
        **kwargs
    )


def inter_class_network_undirected(
        self,
        annot_args_source=None,
        annot_args_target=None,
        network=None,
        **kwargs
):
    """
    Same as ``inter_class_network`` with ``only_undirected`` forced
    to True.
    """

    kwargs['only_undirected'] = True

    return self.network_df(
        network=network,
        annot_args_source=annot_args_source,
        annot_args_target=annot_args_target,
        **kwargs
    )


def inter_class_network_directed(
        self,
        annot_args_source=None,
        annot_args_target=None,
        network=None,
        **kwargs
):
    """
    Same as ``inter_class_network`` with ``only_directed`` forced to
    True.
    """

    kwargs['only_directed'] = True

    return self.network_df(
        network=network,
        annot_args_source=annot_args_source,
        annot_args_target=annot_args_target,
        **kwargs
    )


def inter_class_network_signed(
        self,
        annot_args_source=None,
        annot_args_target=None,
        network=None,
        **kwargs
):
    """
    Same as ``inter_class_network`` with ``only_signed`` forced to
    True.
    """

    kwargs['only_signed'] = True

    return self.network_df(
        network=network,
        annot_args_source=annot_args_source,
        annot_args_target=annot_args_target,
        **kwargs
    )


def inter_class_network_stimulatory(
        self,
        annot_args_source=None,
        annot_args_target=None,
        network=None,
        **kwargs
):
    """
    Same as ``inter_class_network`` with ``only_directed`` forced to
    True and ``only_effect`` to 1.
    """

    kwargs['only_directed'] = True
    kwargs['only_effect'] = 1

    return self.network_df(
        network=network,
        annot_args_source=annot_args_source,
        annot_args_target=annot_args_target,
        **kwargs
    )


def inter_class_network_inhibitory(
        self,
        annot_args_source=None,
        annot_args_target=None,
        network=None,
        **kwargs
):
    """
    Same as ``inter_class_network`` with ``only_directed`` forced to
    True and ``only_effect`` to -1.
    """

    kwargs['only_directed'] = True
    kwargs['only_effect'] = -1

    return self.network_df(
        network=network,
        annot_args_source=annot_args_source,
        annot_args_target=annot_args_target,
        **kwargs
    )


#
# Counting connections between classes (total)
#

def count_inter_class_connections(
        self,
        annot_args_source=None,
        annot_args_target=None,
        **kwargs
):
    """
    Number of unique `id_a`-`id_b` pairs in the data frame from
    ``inter_class_network``.
    """

    network = self.inter_class_network(
        annot_args_source=annot_args_source,
        annot_args_target=annot_args_target,
        **kwargs
    )

    return network.groupby(['id_a', 'id_b'], as_index=False).ngroups


# synonym
count_inter_class_connections_all = count_inter_class_connections


def count_inter_class_connections_undirected(
        self,
        annot_args_source=None,
        annot_args_target=None,
        **kwargs
):
    """
    Number of unique `id_a`-`id_b` pairs in the data frame from
    ``inter_class_network_undirected``.
    """

    network = self.inter_class_network_undirected(
        annot_args_source=annot_args_source,
        annot_args_target=annot_args_target,
        **kwargs
    )

    return network.groupby(['id_a', 'id_b'], as_index=False).ngroups


def count_inter_class_connections_directed(
        self,
        annot_args_source=None,
        annot_args_target=None,
        **kwargs
):
    """
    Number of unique `id_a`-`id_b` pairs in the data frame from
    ``inter_class_network_directed``.
    """

    network = self.inter_class_network_directed(
        annot_args_source=annot_args_source,
        annot_args_target=annot_args_target,
        **kwargs
    )

    return network.groupby(['id_a', 'id_b'], as_index=False).ngroups


def count_inter_class_connections_signed(
        self,
        annot_args_source=None,
        annot_args_target=None,
        **kwargs
):
    """
    Number of unique `id_a`-`id_b` pairs in the data frame from
    ``inter_class_network_signed``.
    """

    network = self.inter_class_network_signed(
        annot_args_source=annot_args_source,
        annot_args_target=annot_args_target,
        **kwargs
    )

    return network.groupby(['id_a', 'id_b'], as_index=False).ngroups


def count_inter_class_connections_stimulatory(
        self,
        annot_args_source=None,
        annot_args_target=None,
        **kwargs
):
    """
    Number of unique `id_a`-`id_b` pairs in the data frame from
    ``inter_class_network_stimulatory``.
    """

    network = self.inter_class_network_stimulatory(
        annot_args_source=annot_args_source,
        annot_args_target=annot_args_target,
        **kwargs
    )

    return network.groupby(['id_a', 'id_b'], as_index=False).ngroups


def count_inter_class_connections_inhibitory(
        self,
        annot_args_source=None,
        annot_args_target=None,
        **kwargs
):
    """
    Number of unique `id_a`-`id_b` pairs in the data frame from
    ``inter_class_network_inhibitory``.
    """

    network = self.inter_class_network_inhibitory(
        annot_args_source=annot_args_source,
        annot_args_target=annot_args_target,
        **kwargs
    )

    return network.groupby(['id_a', 'id_b'], as_index=False).ngroups


#
# Class to class connection counts
#
def class_to_class_connections(self, **kwargs):
    """
    Counts the distinct connections between each pair of categories.

    ``kwargs`` are passed to ``network_df``.

    Returns a ``pandas.Series`` indexed by ``(category_a, category_b)``;
    each value is the number of distinct ``(id_a, id_b)`` pairs observed
    for that pair of categories.
    """

    network = self.network_df(**kwargs)

    self._log('Counting connections between classes.')

    # first collapse duplicate rows of the same edge within a category
    # pair, then count how many edges remain per category pair
    per_edge = network.groupby(
        ['category_a', 'category_b', 'id_a', 'id_b']
    ).size()

    return per_edge.groupby(level=['category_a', 'category_b']).size()
def degree_inter_class_network(
        self,
        annot_args_source=None,
        annot_args_target=None,
        degrees_of='target',
        **kwargs
    ):
    """
    Degrees of the entities on one side of the inter-class network.

    degrees_of : str
        Either *source* or *target*. Count the degrees for the source
        or the target class.

    Returns the per-entity number of unique partners, zero counts removed.
    """

    if degrees_of == 'source':
        group_col, partner_col = 'id_a', 'id_b'
    else:
        group_col, partner_col = 'id_b', 'id_a'

    network = self.inter_class_network(
        annot_args_source=annot_args_source,
        annot_args_target=annot_args_target,
        **kwargs
    )

    degrees = network.groupby(group_col)[partner_col].nunique()

    return degrees[degrees != 0]
def degree_inter_class_network_undirected(
        self,
        annot_args_source=None,
        annot_args_target=None,
        **kwargs
    ):
    """``degree_inter_class_network`` with ``only_undirected=True``."""
    kwargs['only_undirected'] = True
    return self.degree_inter_class_network(
        annot_args_source=annot_args_source,
        annot_args_target=annot_args_target,
        **kwargs
    )


def degree_inter_class_network_directed(
        self,
        annot_args_source=None,
        annot_args_target=None,
        **kwargs
    ):
    """``degree_inter_class_network`` with ``only_directed=True``."""
    kwargs['only_directed'] = True
    return self.degree_inter_class_network(
        annot_args_source=annot_args_source,
        annot_args_target=annot_args_target,
        **kwargs
    )


def degree_inter_class_network_stimulatory(
        self,
        annot_args_source=None,
        annot_args_target=None,
        **kwargs
    ):
    """``degree_inter_class_network``, directed with ``only_effect=1``."""
    kwargs['only_directed'] = True
    kwargs['only_effect'] = 1
    return self.degree_inter_class_network(
        annot_args_source=annot_args_source,
        annot_args_target=annot_args_target,
        **kwargs
    )


def degree_inter_class_network_inhibitory(
        self,
        annot_args_source=None,
        annot_args_target=None,
        **kwargs
    ):
    """``degree_inter_class_network``, directed with ``only_effect=-1``."""
    kwargs['only_directed'] = True
    kwargs['only_effect'] = -1
    return self.degree_inter_class_network(
        annot_args_source=annot_args_source,
        annot_args_target=annot_args_target,
        **kwargs
    )


def degree_inter_class_network_2(
        self,
        degrees_of='target',
        sum_by_class=True,
        **kwargs
    ):
    """
    Degrees computed on the data frame returned by ``network_df``.

    With ``sum_by_class`` the per-entity unique partner counts are summed
    within each category (``category_a`` for sources, ``category_b`` for
    targets); otherwise per-entity counts are returned. Zero counts are
    removed. ``kwargs`` are passed to ``network_df``.
    """

    network = self.network_df(**kwargs)

    if degrees_of == 'source':
        group_col, partner_col, class_col = 'id_a', 'id_b', 'category_a'
    else:
        group_col, partner_col, class_col = 'id_b', 'id_a', 'category_b'

    keys = [group_col, class_col] if sum_by_class else group_col

    degrees = network.groupby(keys)[partner_col].nunique()

    if sum_by_class:
        degrees = degrees.groupby(class_col).sum()

    return degrees[degrees != 0]


def degree_inter_class_network_undirected_2(self, **kwargs):
    """Sum of source side and target side degrees in the undirected network."""
    kwargs.update({'only_undirected': True, 'degrees_of': 'source'})
    deg_source = self.degree_inter_class_network_2(**kwargs)

    kwargs.update({'only_undirected': True, 'degrees_of': 'target'})
    deg_target = self.degree_inter_class_network_2(**kwargs)

    return common.sum_dicts(deg_source, deg_target)


def degree_inter_class_network_directed_2(self, **kwargs):
    """``degree_inter_class_network_2`` with ``only_directed=True``."""
    kwargs['only_directed'] = True
    return self.degree_inter_class_network_2(**kwargs)


def degree_inter_class_network_stimulatory_2(self, **kwargs):
    """``degree_inter_class_network_2`` with ``only_effect=1``."""
    kwargs['only_effect'] = 1
    return self.degree_inter_class_network_2(**kwargs)


def degree_inter_class_network_inhibitory_2(self, **kwargs):
    """``degree_inter_class_network_2`` with ``only_effect=-1``."""
    kwargs['only_effect'] = -1
    return self.degree_inter_class_network_2(**kwargs)


#
# End of wrappers
#
def register_network(self, network):
    """
    Makes ``network`` the default network dataset of this instance.

    The inter-class network data frame is discarded first (if present),
    so later queries are answered from the network registered here.
    What gets stored is the result of ``_network_df(network)``.
    """

    self.unset_interclass_network_df()
    network_df = self._network_df(network)
    self.network = network_df
@staticmethoddef_network_df(network):ifnothasattr(network,'df')andhasattr(network,'make_df'):network.make_df()return(network.dfifhasattr(network,'df')elsenetwork)deffiltered(self,annot_df=None,entities=None,**kwargs):annot_df=self.get_df()ifannot_dfisNoneelseannot_dfreturnself.filter_df(annot_df=annot_df,entities=entities,**kwargs)@classmethoddeffilter_df(cls,annot_df,entities=None,postfix=None,**kwargs):query=cls._process_query_args(df=annot_df,entities=entities,args=kwargs,postfix=postfix,)args=cls._args_add_postfix(args,postfix)query=' and '.join(query)returnannot_df.query(query)ifqueryelseannot_df@staticmethoddef_process_query_args(df,args,entities=None,postfix=None):query=[]forcol,valiniteritems(args):col='%s%s'%(col,postfix)ifpostfixelsecolifvalisnotNoneandcolindf.columns:op='=='ifisinstance(val,_const.SIMPLE_TYPES)else'in'q='%s%s%s'%(col,op,'@args["%s"]'%col)query.append(q)ifentities:entity_cols={'id','genesymbol','uniprot'}ifpostfix:entity_cols={'%s%s'%(col,postfix)forcolinentity_cols}entity_cols=entity_cols&set(df.columns)q='(%s)'%(' or '.join('%s in @entities'%colforcolinentity_cols))query.append(q)returnquery@staticmethoddef_args_add_postfix(args,postfix):ifpostfix:args=dict(('%s%s'%(key,postfix),val)forkey,valiniteritems(args))returnargsdefexport(self,fname,**kwargs):self.make_df()self.df.to_csv(fname,**kwargs)
def classes_by_entity(self, element, labels=False):
    """
    Returns a set of class keys with the classes containing at least
    one of the elements.

    :param str,set element:
        One or more element (entity) to search for in the classes.
    :param bool labels:
        Return labels instead of keys.
    """

    wanted = common.to_set(element)
    found = set()

    for key, cls in self.classes.items():

        # a class matches if it shares at least one member with `wanted`
        if wanted & cls:

            found.add(cls.label if labels else key)

    return found
def entities_by_resource(self, entity_types=None, **kwargs):
    """
    Dict of resource name -> set of entities, collected from all classes
    and restricted to ``entity_types``. ``kwargs`` are accepted but unused.
    """

    grouped = collections.defaultdict(set)

    for cls in self.classes.values():

        members = cls.filter_entity_type(entity_type=entity_types)
        grouped[cls.resource].update(members)

    return dict(grouped)


# TODO: this kind of methods should be implemented by metaprogramming

def proteins_by_resource(self):
    return self.entities_by_resource(entity_types='protein')


def complexes_by_resource(self):
    return self.entities_by_resource(entity_types='complex')


def mirnas_by_resource(self):
    return self.entities_by_resource(entity_types='mirna')


def counts_by_resource(self, entity_types=None):
    """Dict of resource name -> number of entities."""

    by_resource = self.entities_by_resource(entity_types=entity_types)

    return {
        resource: len(entities)
        for resource, entities in by_resource.items()
    }


def get_entities(self, entity_types=None):
    """Union of the members of all classes, filtered by entity type."""

    if self.classes:
        members = set.union(*(set(a) for a in self.classes.values()))
    else:
        members = ()

    return entity.Entity.filter_entity_type(
        members,
        entity_type=entity_types,
    )


# TODO: this kind of methods should be implemented by metaprogramming

def get_proteins(self):
    return self.get_entities(entity_types='protein')


def get_complexes(self):
    return self.get_entities(entity_types='complex')


def get_mirnas(self):
    return self.get_entities(entity_types='mirna')


def numof_entities(self, entity_types=None):
    """Number of entities across all classes."""
    return len(self.get_entities(entity_types=entity_types))


# TODO: this kind of methods should be implemented by metaprogramming

def numof_proteins(self):
    return self.numof_entities(entity_types='protein')


def numof_complexes(self):
    return self.numof_entities(entity_types='complex')


def numof_mirnas(self):
    return self.numof_entities(entity_types='mirna')


def numof_classes(self):
    """Number of classes."""
    return len(self.classes)


def numof_records(self, entity_types=None):
    """Sum of the per-class entity counts (``count_entity_type``)."""

    total = 0

    for cls in self.classes.values():

        total += cls.count_entity_type(entity_type=entity_types)

    return total


# TODO: this kind of methods should be implemented by metaprogramming

def numof_protein_records(self):
    return self.numof_records(entity_types='protein')


def numof_complex_records(self):
    return self.numof_records(entity_types='complex')


def numof_mirna_records(self):
    return self.numof_records(entity_types='mirna')
def resources_in_category(self, key):
    """
    Returns a sorted list of resources contributing to the definition
    of a category.

    A non-tuple ``key`` is expanded to
    ``(key, key, self.composite_resource_name)``. Returns None if the
    key is not in ``self.children``.
    """

    if not isinstance(key, tuple):

        key = (key, key, self.composite_resource_name)

    if key not in self.children:

        return None

    resources = {child.resource for child in self.children[key]}

    return sorted(resources)
def browse(self, start: int = 0, **kwargs):
    """
    Print gene information as a table.

    Presents information about annotation classes as ascii tables printed
    in the terminal. The classes are taken from ``iter_classes`` and the
    protein members of each class are shown under the label of the class.

    ``kwargs`` are passed both to ``iter_classes`` and to
    ``pypath.utils.uniprot.browse``; ``start`` is passed to the latter.
    """

    groups = {
        cls.label: cls.filter_entity_type(entity_type='protein')
        for cls in self.iter_classes(**kwargs)
    }

    utils_uniprot.browse(groups=groups, start=start, **kwargs)
def __init__(
        self,
        name,
        ncbi_tax_id=9606,
        input_method=None,
        input_args=None,
        entity_type='protein',
        swissprot_only=True,
        proteins=(),
        complexes=(),
        reference_set=(),
        infer_complexes=None,
        dump=None,
        primary_field=None,
        check_ids=True,
        **kwargs
    ):
    """
    Represents annotations for a set of proteins.
    Loads the data from the original resource and provides methods
    to query the annotations.

    :arg str name:
        A custom name for the annotation resource.
    :arg int ncbi_tax_id:
        NCBI Taxonomy identifier.
    :arg callable,str input_method:
        Either a callable or the name of a method in any submodules of
        the ``pypath.inputs`` module. Should return a dict with UniProt
        IDs as keys or an object suitable for ``process_method``.
    :arg dict input_args:
        Arguments for the ``input_method``. The dict is copied, the
        caller's object is never modified.
    :arg bool,NoneType infer_complexes:
        If not a bool, the ``annot_infer_complexes`` setting is used.
        Inference is enabled only if ``entity_type`` is ``protein``.
    :arg kwargs:
        Merged into ``input_args`` (overriding keys already there).

    The remaining arguments are stored as attributes of the same name.
    The data is loaded at the end by calling ``load``.
    """

    session_mod.Logger.__init__(self, name='annot')

    # work on a copy: updating the dict received from the caller would
    # leak `kwargs` into an object the caller may reuse
    input_args = dict(input_args or {})
    input_args.update(kwargs)

    resource.AbstractResource.__init__(
        self,
        name=name,
        ncbi_tax_id=ncbi_tax_id,
        input_method=input_method,
        input_args=input_args,
        dump=dump,
        data_attr_name='annot',
    )

    self.entity_type = entity_type
    self.primary_field = primary_field

    infer_complexes = (
        infer_complexes
            if isinstance(infer_complexes, bool) else
        settings.get('annot_infer_complexes')
    )
    self.infer_complexes = (
        infer_complexes and
        self.entity_type == 'protein'
    )

    self.proteins = proteins
    self.complexes = complexes
    self.reference_set = reference_set
    self.swissprot_only = swissprot_only
    self.check_ids = check_ids

    self.load()
def reload(self):
    """
    Reloads the object from the module level.

    The module defining the class of this instance is re-imported and
    the instance is switched to the new class object of the same name.
    """

    current = self.__class__
    modname = current.__module__
    mod = __import__(modname, fromlist=[modname.split('.')[0]])
    imp.reload(mod)
    self.__class__ = getattr(mod, current.__name__)
def load(self):
    """
    Loads the annotation data by calling the input method.
    Infers annotations for complexes in the complex database if
    py:attr:``infer_complexes`` is True.
    """

    self._log('Loading annotations from `%s`.' % self.name)

    self.set_reference_set()
    resource.AbstractResource.load(self)
    self._ensure_swissprot()
    self._update_primary_field()

    if self.infer_complexes:

        self.add_complexes_by_inference()

    summary = (
        self.name,
        self.numof_entities(),
        self.numof_records(),
    )

    self._log(
        'Loaded annotations from `%s`: %u molecules, %u annotations.' %
        summary
    )
def add_complexes_by_inference(self, complexes=None):
    """
    Creates complex annotations by in silico inference and adds them
    to this annotation set (entries of ``self.annot`` with the same key
    are overwritten).
    """

    inferred = self.complex_inference(complexes=complexes)
    self.annot.update(inferred)
def complex_inference(self, complexes=None):
    """
    Annotates all complexes in `complexes`, by default in the default
    complex database (existing in the `complex` module or generated
    on demand according to the module's current settings).

    Returns
    -------
    Dict with complexes as keys and sets of annotations as values.
    Complexes with no valid information in this annotation resource
    won't be in the dict.

    Parameters
    ----------
    complexes : iterable
        Iterable yielding complexes.
    """

    self._log('Inferring complex annotations from `%s`.' % self.name)

    if not complexes:

        import pypath.core.complex as complex

        complexes = complex.get_db().complexes.values()

    inferred = collections.defaultdict(set)

    for cplex in complexes:

        annots = self.annotate_complex(cplex)

        if annots is None:

            continue

        inferred[cplex].update(annots)

    return inferred
def annotate_complex(self, cplex):
    """
    Infers annotations for a single complex.

    Returns None if any component is missing from this resource or
    ``_eq_fields`` is None; an empty set if ``_eq_fields`` is empty;
    otherwise a set of records (or whatever ``_merge`` returns), or
    None when no record is shared by all components.
    """

    members = list(cplex.components.keys())

    if any(comp not in self for comp in members) or self._eq_fields is None:

        # this means no annotation for this complex
        return None

    eq_fields = self._eq_fields

    if not eq_fields:

        # here empty set means the complex belongs
        # to the class of enitities covered by this
        # annotation
        return set()

    if callable(eq_fields):

        # here a custom method combines the annotations
        # we look at all possible combinations of the annotations
        # of the components, but most likely each component have
        # only one annotation in this case
        combinations = itertools.product(
            *(self.annot[comp] for comp in members)
        )

        return {eq_fields(*annots) for annots in combinations}

    if hasattr(self, '_merge'):

        return self._merge(*(self.annot[comp] for comp in members))

    covered_by = collections.defaultdict(set)
    record_cls = None
    blanks = {}

    for comp in members:

        for rec in self.annot[comp]:

            if record_cls is None:

                # the first record seen defines the record type and
                # the fields to be left empty
                record_cls = rec.__class__
                blanks = {
                    f: None
                    for f in rec._fields
                    if f not in eq_fields
                }

            key = tuple(getattr(rec, f) for f in eq_fields)
            covered_by[key].add(comp)

    everyone = set(members)

    shared = {
        # the characteristic attributes of the group
        # and the remaining left empty
        record_cls(**dict(zip(eq_fields, key)), **blanks)
        for key, group in covered_by.items()
        # accepting the groups covering all members of the complex
        if group == everyone
    }

    return shared or None
def load_proteins(self):
    """
    Retrieves a set of all UniProt IDs to have a base set of the entire
    proteome; the set is stored in the ``uniprots`` attribute.
    """

    all_ids = uniprot_db.all_uniprots(organism=self.ncbi_tax_id)
    self.uniprots = set(all_ids)
@staticmethod
def get_reference_set(
        proteins=(),
        complexes=(),
        use_complexes=False,
        ncbi_tax_id=9606,
        swissprot_only=True,
    ):
    """
    Retrieves the reference set i.e. the set of all entities which
    potentially have annotation in this resource. Typically this is
    the proteome of the organism from UniProt optionally with all the
    protein complexes from the complex database.

    Returns a tuple of ``proteins``, ``complexes`` and the sorted list
    of the two combined.
    """

    if not proteins:

        proteins = sorted(
            uniprot_db.all_uniprots(
                organism=ncbi_tax_id,
                swissprot=swissprot_only,
            )
        )

    if use_complexes:

        import pypath.core.complex as complex

        if not complexes:

            complexes = sorted(complex.all_complexes())

    reference_set = sorted(itertools.chain(proteins, complexes))

    return proteins, complexes, reference_set
def set_reference_set(self):
    """
    Assigns the reference set to the :py:attr:``reference_set`` attribute.
    The reference set is the set of all entities which potentially have
    annotation in this resource. Typically this is the proteome of the
    organism from UniProt optionally with all the protein complexes from
    the complex database.

    Does nothing if ``reference_set`` is already non-empty. For resources
    which are not organism specific three empty sets are assigned,
    otherwise the result of ``_get_reference_set``. The ``proteins`` and
    ``complexes`` attributes are set together with ``reference_set``.
    """

    if self.reference_set:

        return

    if self.ncbi_tax_id == _const.NOT_ORGANISM_SPECIFIC:

        # three independent objects: `(set(),) * 3` would bind the very
        # same set to all three attributes, so adding to one of them
        # would silently change the other two
        proteins, complexes, reference_set = set(), set(), set()

    else:

        proteins, complexes, reference_set = self._get_reference_set()

    self.proteins = proteins
    self.complexes = complexes
    self.reference_set = reference_set
defhas_complexes(self):returnself.entity_type=='complex'orself.infer_complexesdef_process_method(self,*args,**kwargs):""" By default it converts a set to dict of empty sets in order to make it compatible with other methods. Derived classes might override. """self.annot=dict((u,set())foruinself.data)
def select(self, method=None, entity_type=None, **kwargs):
    """
    Retrieves a subset by filtering based on ``kwargs``.
    Each argument should be a name and a value or set of values.
    Elements having the provided values in the annotation will be
    returned. If ``method`` is callable, a record must also satisfy it.
    Returns a set of UniProt IDs, filtered by ``entity_type``.

    Raises ValueError if any of the ``kwargs`` is not a field name.
    """

    names = set(self.get_names())
    unknown = set(kwargs.keys()) - names

    if unknown:

        raise ValueError(
            'Unknown field names: %s' % (
                ', '.join(sorted(unknown))
            )
        )

    def _field_matches(found, wanted):

        return bool(
            # simple agreement
            found == wanted or
            # custom method returns bool
            (callable(wanted) and wanted(found)) or
            # multiple value in annotation slot
            # and value is a set: checking if they have
            # any in common
            (
                isinstance(found, _const.LIST_LIKE) and
                isinstance(wanted, set) and
                set(found) & wanted
            ) or
            # search value is a set, checking if contains
            # the record's value
            (isinstance(wanted, set) and found in wanted) or
            # record's value contains multiple elements
            # (set, list or tuple), checking if it contains
            # the search value
            (isinstance(found, _const.LIST_LIKE) and wanted in found)
        )

    result = set()

    for uniprot, records in self.annot.items():

        for rec in records:

            # we either call a method on all records
            # or check against conditions provided in **kwargs
            if callable(method) and not method(rec):

                continue

            if all(
                _field_matches(getattr(rec, name), value)
                for name, value in kwargs.items()
            ):

                # one matching record is enough for this entity
                result.add(uniprot)
                break

    return entity.Entity.filter_entity_type(result, entity_type)


# synonym for old name
get_subset = select
def labels(self, method=None, **kwargs):
    """
    Same as ``select`` but returns a list of labels (more human readable).
    """

    selected = self.select(method=method, **kwargs)

    return mapping.label(selected)
def show(self, method=None, table_param=None, **kwargs):
    """
    Same as ``select`` but prints a table to the console with basic
    information from the UniProt datasheets. ``table_param`` is a dict
    of keyword arguments for ``pypath.utils.uniprot.info``.
    """

    if not table_param:

        table_param = {}

    selected = self.select(method=method, **kwargs)

    utils_uniprot.info(*selected, **table_param)
def get_subset_bool_array(self, reference_set=None, **kwargs):
    """
    Returns a boolean vector with True and False values for each entity
    in the reference set. The values represent presence absence data
    in the simplest case, but by providing ``kwargs`` any kind of matching
    and filtering is possible. ``kwargs`` are passed to the ``select``
    method.
    """

    if not reference_set:

        reference_set = self.reference_set

    subset = self.get_subset(**kwargs)
    flags = [item in subset for item in reference_set]

    return np.array(flags)
def to_bool_array(self, reference_set):
    """
    Returns a presence/absence boolean array for a reference set.
    """

    present = self.to_set()
    flags = [item in present for item in reference_set]

    return np.array(flags)
def to_set(self):
    """
    Returns the entities present in this annotation resource as a set.
    """

    return set(self.annot)
def all_entities(self, entity_types=None):
    """
    All entities annotated in this resource, as a sorted list,
    restricted to ``entity_types``.
    """

    entity_types = self._entity_types(entity_types)

    matching = [
        key
        for key in self.annot
        if self._match_entity_type(key, entity_types)
    ]
    matching.sort()

    return matching
def all_proteins(self):
    """
    All UniProt IDs annotated in this resource, as a sorted list.
    """

    return sorted(filter(self.is_protein, self.annot))
def all_complexes(self):
    """
    All protein complexes annotated in this resource, as a sorted list.
    """

    return sorted(filter(self.is_complex, self.annot))
def all_mirnas(self):
    """
    All miRNAs annotated in this resource, as a sorted list.
    """

    return sorted(filter(self.is_mirna, self.annot))
def numof_records(self, entity_types=None):
    """
    The total number of annotation records. Entities with an empty
    record collection count as one.
    """

    entity_types = self._entity_types(entity_types)
    total = 0

    for key, records in self.annot.items():

        if self._match_entity_type(key, entity_types):

            total += max(len(records), 1)

    return total
def numof_entities(self):
    """
    The number of annotated entities in the resource.
    """

    n_entities = len(self.annot)

    return n_entities
def_numof_entities(self,entity_types=None):entity_types=self._entity_types(entity_types)returnlen([kforkinself.annot.keys()ifself._match_entity_type(k,entity_types)])defnumof_proteins(self):returnself._numof_entities(entity_types={'protein'})defnumof_mirnas(self):returnself._numof_entities(entity_types={'mirna'})defnumof_complexes(self):returnself._numof_entities(entity_types={'complex'})def__repr__(self):return('<%s annotations: %u records about %u entities>'%(self.name,self.numof_records(),self.numof_entities(),))
def to_array(self, reference_set=None, use_fields=None):
    """
    Returns an entity vs feature array. In case of more complex
    annotations this might be huge.

    Returns a tuple of column labels (tuples starting with the resource
    name) and a boolean array with one row for each entity of the
    reference set and one column for each label.
    """

    if not use_fields:

        use_fields = default_fields.get(self.name)

    self._log('Creating boolean array from `%s` annotation data.' % self.name)

    reference_set = reference_set or self.reference_set

    all_fields = self.get_names()
    fields = use_fields or all_fields
    ifields = tuple(
        idx
        for idx, field in enumerate(all_fields)
        if field in fields
    )

    def _label(field, val):

        # NOTE(review): bool is a subclass of int and value combinations
        # containing int are dropped below, so this bool branch looks
        # unreachable -- confirm whether booleans were meant to be kept
        if isinstance(val, bool):

            return field if val else 'not-%s' % field

        return val

    names = [(self.name,)]
    arrays = [self.to_bool_array(reference_set=reference_set)]

    for depth in range(1, len(fields) + 1):

        this_ifields = ifields[:depth]
        this_fields = fields[:depth]

        combinations = {
            tuple(annot[j] for j in this_ifields)
            for annots in self.annot.values()
            for annot in annots
        }
        combinations = sorted(
            values
            for values in combinations
            if not any(
                isinstance(v, (type(None), float, int))
                for v in values
            )
        )

        for values in combinations:

            labels = tuple(
                _label(field, val)
                for field, val in zip(this_fields, values)
            )

            this_values = dict(zip(this_fields, values))

            this_array = self.get_subset_bool_array(
                reference_set=reference_set,
                **this_values
            )

            names.append((self.name,) + labels)
            arrays.append(this_array)

    self._log(
        'Boolean array has been created from '
        '`%s` annotation data.' % self.name
    )

    return tuple(names), np.vstack(arrays).T
def make_df(self, rebuild=False):
    """
    Compiles a ``pandas.DataFrame`` from the annotation data. The data
    frame will be assigned to :py:attr``df``.

    Nothing is done if the data frame exists already, unless ``rebuild``
    is True. The frame is in long format: one row for each field of each
    record, fields with the value ``'n/a'`` or None are skipped.
    """

    self._log('Creating dataframe from `%s` annotations.' % self.name)

    if hasattr(self, 'df') and not rebuild:

        self._log('Data frame already exists, rebuild not requested.')
        return

    discard = {'n/a', None}

    columns = [
        'uniprot',
        'genesymbol',
        'entity_type',
        'source',
        'label',
        'value',
        'record_id',
    ]

    has_fields = self.has_fields
    records = []
    irec = 0

    for element, annots in self.annot.items():

        if not element:

            continue

        entity_type = self.get_entity_type(element)
        element_str = element.__str__()

        if hasattr(element, 'genesymbol_str'):

            genesymbol_str = 'COMPLEX:%s' % element.genesymbol_str

        elif element.startswith('COMPLEX:'):

            genesymbol_str = 'COMPLEX:%s' % (
                complex.get_db().complexes[element].genesymbol_str
            )

        else:

            genesymbol_str = (
                mapping.label(
                    element,
                    entity_type=entity_type,
                    ncbi_tax_id=self.ncbi_tax_id,
                ) or
                ''
            )

        if not has_fields:

            # resources without fields: a single membership row
            records.append([
                element_str,
                genesymbol_str,
                entity_type,
                self.name,
                'in %s' % self.name,
                'yes',
                irec,
            ])

            irec += 1

        for annot in annots:

            for label, value in zip(annot._fields, annot):

                if value in discard:

                    continue

                if isinstance(value, (set, list, tuple)):

                    value = ';'.join(map(str, value))

                records.append([
                    element_str,
                    genesymbol_str,
                    entity_type,
                    self.name,
                    label,
                    str(value),
                    irec,
                ])

            # NOTE(review): one record id for all fields of a record;
            # nesting level reconstructed from the column name, confirm
            irec += 1

    self.df = pd.DataFrame(
        records,
        columns=columns,
    ).astype(self._dtypes)
def coverage(self, other):
    """
    Calculates the coverage of the annotation i.e. the proportion of
    entities having at least one record in this annotation resource for
    an arbitrary set of entities.
    """

    if not isinstance(other, set):

        other = set(other)

    # NOTE(review): the overlap is divided by the size of this resource,
    # not by the size of `other` -- confirm this is the intended ratio
    overlap = self & other

    return len(overlap) / len(self)
def subset_intersection(self, universe, **kwargs):
    """
    Calculates the proportion of entities in a subset occuring in the
    set ``universe``. The subset is selected by passing ``kwargs`` to the
    ``select`` method.
    """

    subset = self.get_subset(**kwargs)
    shared = subset & universe

    return len(shared) / len(subset)
def get_values(self, name, exclude_none=True):
    """
    Returns the set of all possible values of a field. E.g. if the
    records of this annotation have a field ``cell_type`` then calling
    this method can tell you that across all records the values of
    this field might be ``{'macrophage', 'epithelial_cell', ...}``.

    With ``exclude_none`` None is removed from the result.
    """

    values = set()

    for records in self.annot.values():

        for rec in records:

            val = getattr(rec, name)

            # to support tuple values
            if isinstance(val, _const.LIST_LIKE):

                values.update(val)

            else:

                values.add(val)

    if exclude_none:

        values.discard(None)

    return values
def get_names(self):
    """
    Returns the list of field names in the records. The annotation
    consists of uniform records and each entity might be annotated
    with one or more records. Each record is a tuple of fields, for
    example ``('cell_type', 'expression_level', 'score')``.

    The names are read from the first record found; an empty tuple is
    returned if there are no records at all.
    """

    for records in self.annot.values():

        if records:

            return next(iter(records))._fields

    return ()
def numof_references(self):
    """
    Some annotations contain references. The field name for references
    is always ``pmid`` (PubMed ID). This method collects and counts the
    unique references across all records.
    """

    unique_refs = set(self.all_refs())

    return len(unique_refs)
def curation_effort(self):
    """
    Counts the reference-record pairs.
    """

    refs = self.all_refs()

    return len(refs)
def all_refs(self):
    """
    Some annotations contain references. The field name for references
    is always ``pmid`` (PubMed ID). This method collects the references
    across all records. Returns *list*.
    """

    if 'pmid' not in self.get_names():

        return []

    refs = []

    for records in self.annot.values():

        for rec in records:

            if rec.pmid:

                refs.append(rec.pmid)

    return refs
def browse(
        self,
        field: str | list[str] | dict[str, str] | None = None,
        start: int = 0,
        **kwargs
    ):
    """
    Print gene information as a table.

    Presents information about annotation categories as ascii tables
    printed in the terminal. If one category provided, prints one table.
    If multiple categories provided, prints a table for each of them one
    by one proceeding to the next one once you hit return. If no
    categories provided goes through all levels of the primary category.

    Args
        field: The field to browse categories by.
            * If None the primary field will be selected. If this
              annotation resource doesn't have fields, all proteins
              will be presented as one single category.
            * If a string it will be considered a field name and it
              will browse through all levels of this field.
            * If a ``list``, set or tuple, it will be considered either
              a ``list`` of field names or a list of values from the
              primary field. In the former case all combinations of
              the values of the fields will be presented, in the latter
              case the browsing will be limited to the levels of the
              primary field contained in ``field``.
            * If a ``dict``, keys are supposed to be field names and
              values as list of levels. If any of the values are None,
              all levels from that field will be used.
        start: Start browsing from this category. E.g. if there are
            500 categories and start is 250 it will skip everything
            before the 250th.
        kwargs: Passed to ``pypath.utils.uniprot.info``.
    """

    if not field and not self.primary_field:

        uniprots = entity.Entity.only_proteins(self.to_set())
        utils_uniprot.info(uniprots, **kwargs)
        return

    field = field or self.primary_field

    if isinstance(field, str):

        # all values of the field
        field = {field: self.get_values(field)}

    elif isinstance(field, _const.LIST_LIKE):

        if set(field) & set(self.get_names()):

            # a set of fields provided
            field = {fi: self.get_values(fi) for fi in field}

        else:

            # a set of values provided
            field = {self.primary_field: field}

    elif isinstance(field, dict):

        field = {
            fi: vals or self.get_values(fi)
            for fi, vals in field.items()
        }

    else:

        sys.stdout.write(
            'Could not recognize field definition, '
            'please refer to the docs.\n'
        )
        sys.stdout.flush()
        return

    # from here `field` is a dict of fields and values
    field_keys = list(field.keys())
    levels = [field[key] for key in field_keys]

    groups = {}

    for vals in sorted(itertools.product(*levels)):

        args = dict(zip(field_keys, vals))

        proteins = entity.Entity.only_proteins(self.select(**args))

        if not proteins:

            continue

        if len(vals) == 1:

            label = vals[0]

        else:

            label = ', '.join(
                '%s: %s' % (key, str(val))
                for key, val in args.items()
            )

        groups[label] = proteins

    utils_uniprot.browse(groups=groups, start=start, **kwargs)
def _process_method(self):
    """
    Converts the raw records in ``self.data`` to a dict of UniProt ID ->
    set of ``(pmid, tissue, vesicle)`` records. Gene symbols are
    translated to UniProt IDs; records without a name are skipped and
    reported in the log.
    """

    record = collections.namedtuple(
        '%sAnnotation' % self.name,
        ['pmid', 'tissue', 'vesicle'],
    )

    is_vesiclepedia = self.name == 'Vesiclepedia'
    _annot = collections.defaultdict(set)
    missing_name = False

    for a in self.data:

        if not a[1]:

            missing_name = True
            continue

        uniprots = mapping.map_name(a[1], 'genesymbol', 'uniprot')
        details = a[3]

        for u in uniprots:

            # only Vesiclepedia provides the vesicle types
            vesicles = details[3] if is_vesiclepedia else ('Exosomes',)

            for vesicle in vesicles:

                _annot[u].add(record(details[0], details[2], vesicle))

    self.annot = dict(_annot)

    if missing_name:

        self._log(
            'One or more names were missing while processing '
            'annotations from %s. Best if you check your cache '
            'file and re-download the data if it\' corrupted.' % (
                self.name
            )
        )
def __init__(self, ncbi_tax_id=9606, **kwargs):
    """
    The name of this resource abbreviated as `CSPA`.

    Unless given in ``kwargs``, ``organism`` defaults to ``ncbi_tax_id``.
    """

    kwargs.setdefault('organism', ncbi_tax_id)

    AnnotationBase.__init__(
        self,
        name='CSPA',
        ncbi_tax_id=ncbi_tax_id,
        input_method='cspa.cspa_annotations',
        **kwargs
    )
def __init__(self, ncbi_tax_id=9606, **kwargs):
    """
    Cell type level annotations of the resource abbreviated as `CSPA`
    (registered under the name ``CSPA_celltype``).

    Unless given in ``kwargs``, ``organism`` defaults to ``ncbi_tax_id``.
    """

    kwargs.setdefault('organism', ncbi_tax_id)

    AnnotationBase.__init__(
        self,
        name='CSPA_celltype',
        ncbi_tax_id=ncbi_tax_id,
        input_method='cspa.cspa_cell_type_annotations',
        **kwargs
    )
def __init__(self, **kwargs):
    """
    The name of this resource abbreviated as `HPMR`.
    ``kwargs`` are passed to ``AnnotationBase``.
    """

    AnnotationBase.__init__(
        self,
        name='HPMR',
        input_method='hpmr.hpmr_annotations',
        **kwargs
    )
def __init__(self, **kwargs):
    """
    Kinases from `kinase.com`.
    ``kwargs`` are passed to ``AnnotationBase``.
    """

    AnnotationBase.__init__(
        self,
        name='kinase.com',
        input_method='kinasedotcom.kinasedotcom_annotations',
        **kwargs
    )
def_process_method(self):# already the appropriate format, no processing neededself.annot=self.datadelattr(self,'data')
def __init__(self, **kwargs):
    """
    Transcription factors from TF census (Vaquerizas et al 2009).
    ``kwargs`` are passed to ``AnnotationBase``.
    """

    AnnotationBase.__init__(
        self,
        name='TFcensus',
        input_method='tfcensus.tfcensus_annotations',
        **kwargs
    )
def_process_method(self):# already the appropriate format, no processing neededself.annot=self.datadelattr(self,'data')
def __init__(self, **kwargs):
    """
    The list of phosphatases from Chen et al, Science Signaling (2017)
    Table S1. ``kwargs`` are passed to ``AnnotationBase``.
    """

    AnnotationBase.__init__(
        self,
        name='Phosphatome',
        input_method='phosphatome.phosphatome_annotations',
        **kwargs
    )
def __init__(self, ncbi_tax_id=9606, **kwargs):
    """
    Protein annotations from MatrixDB.
    ``kwargs`` are passed to ``AnnotationBase``.
    """

    AnnotationBase.__init__(
        self,
        name='MatrixDB',
        ncbi_tax_id=ncbi_tax_id,
        input_method='matrixdb.matrixdb_annotations',
        **kwargs
    )
def_process_method(self):# already the appropriate format, no processing neededself.annot=self.datadelattr(self,'data')
def __init__(self, ncbi_tax_id=9606, **kwargs):
    """
    Pathway annotations from KEGG via PathwayCommons.
    ``kwargs`` are passed to ``AnnotationBase``.
    """

    AnnotationBase.__init__(
        self,
        name='KEGG-PC',
        ncbi_tax_id=ncbi_tax_id,
        input_method='kegg.kegg_pathway_annotations_pathwaycommons',
        **kwargs
    )
def_process_method(self):# already the appropriate format, no processing neededself.annot=self.datadelattr(self,'data')
def __init__(
        self,
        categories=None,
        go_annot=None,
        ncbi_tax_id=9606,
        **kwargs
    ):
    """
    Same as :class:``pypath.go.GOCustomAnnotation`` initialized with
    the categories defined in
    ``pypath.core.intercell_annot.go_combined_classes``, unless other
    ``categories`` are provided.

    ``kwargs`` are accepted for signature compatibility but not used.
    """

    categories = categories or intercell_annot.go_combined_classes

    go.GOCustomAnnotation.__init__(
        self,
        categories=categories,
        go_annot=go_annot,
        ncbi_tax_id=ncbi_tax_id,
    )
def __init__(
        self,
        categories=None,
        go_annot=None,
        ncbi_tax_id=9606,
        **kwargs
    ):
    """
    Annotation of proteins based on their roles in intercellular
    communication from Gene Ontology.

    ``categories`` and ``go_annot`` are stored as attributes before the
    base class is initialized; ``kwargs`` are passed to
    ``AnnotationBase``.
    """

    self.categories, self.go_annot = categories, go_annot

    AnnotationBase.__init__(
        self,
        name='GO_Intercell',
        ncbi_tax_id=ncbi_tax_id,
        **kwargs
    )
def __init__(self, ncbi_tax_id=9606, **kwargs):
    """
    Protein families from UniProt.

    Unless given in ``kwargs``, ``organism`` defaults to ``ncbi_tax_id``.
    """

    kwargs.setdefault('organism', ncbi_tax_id)

    AnnotationBase.__init__(
        self,
        name='UniProt_family',
        ncbi_tax_id=ncbi_tax_id,
        input_method='uniprot.uniprot_families',
        **kwargs
    )
def_process_method(self):# already the appropriate format, no processing neededself.annot=self.datadelattr(self,'data')
def __init__(self, ncbi_tax_id=9606, **kwargs):
    """
    Topological domains and transmembrane segments from UniProt.

    Unless given in ``kwargs``, ``organism`` defaults to ``ncbi_tax_id``.
    """

    kwargs.setdefault('organism', ncbi_tax_id)

    AnnotationBase.__init__(
        self,
        name='UniProt_topology',
        ncbi_tax_id=ncbi_tax_id,
        input_method='uniprot.uniprot_topology',
        **kwargs
    )
def_process_method(self):# already the appropriate format, no processing neededself.annot=self.datadelattr(self,'data')
def __init__(self, ncbi_tax_id=9606, **kwargs):
    """
    Annotations from the HumanCellMap resource, loaded by the
    ``humancellmap.humancellmap_annotations`` input method.

    ``kwargs`` are passed to ``AnnotationBase``.
    """

    AnnotationBase.__init__(
        self,
        name='HumanCellMap',
        ncbi_tax_id=ncbi_tax_id,
        input_method='humancellmap.humancellmap_annotations',
        **kwargs
    )
def_process_method(self):# already the appropriate format, no processing neededself.annot=self.datadelattr(self,'data')
def __init__(self, ncbi_tax_id=9606, **kwargs):
    """
    Annotations from the TCDB resource, loaded by the
    ``tcdb.tcdb_annotations`` input method.

    Unless given in ``kwargs``, ``organism`` defaults to ``ncbi_tax_id``;
    ``kwargs`` are passed to ``AnnotationBase``.
    """

    if 'organism' not in kwargs:

        kwargs['organism'] = ncbi_tax_id

    AnnotationBase.__init__(
        self,
        name='TCDB',
        ncbi_tax_id=ncbi_tax_id,
        input_method='tcdb.tcdb_annotations',
        **kwargs
    )
def_process_method(self):# already the appropriate format, no processing neededself.annot=self.datadelattr(self,'data')
def __init__(self, **kwargs):
    """
    List of cell adhesion molecules (CAMs) from 10.4137/cin.s341.
    ``kwargs`` are passed to ``AnnotationBase``.
    """

    AnnotationBase.__init__(
        self,
        name='MCAM',
        input_method='mcam.mcam_cell_adhesion_molecules',
        **kwargs
    )
def __init__(self, ncbi_tax_id=9606, **kwargs):
    """
    Pathway responsive genes: signatures based on transcriptomics data
    from PROGENy (https://github.com/saezlab/progeny).

    Unless given in ``kwargs``, ``organism`` defaults to ``ncbi_tax_id``.
    Complex inference is disabled for this resource.
    """

    kwargs.setdefault('organism', ncbi_tax_id)

    AnnotationBase.__init__(
        self,
        name='PROGENy',
        ncbi_tax_id=ncbi_tax_id,
        input_method='progeny.progeny_annotations',
        infer_complexes=False,
        **kwargs
    )
def_process_method(self):# already the appropriate format, no processing neededself.annot=self.datadelattr(self,'data')
def __init__(self, ncbi_tax_id=9606, **kwargs):
    """
    Cell type markers from the CellTypist database.
    Complex inference is disabled for this resource; ``kwargs`` are
    passed to ``AnnotationBase``.
    """

    AnnotationBase.__init__(
        self,
        name='CellTypist',
        ncbi_tax_id=ncbi_tax_id,
        input_method='celltypist.celltypist_annotations',
        infer_complexes=False,
        **kwargs
    )
def _process_method(self):
    """
    Hands over ``self.data`` to ``self.annot`` without processing.
    """

    # the input is already in the appropriate format,
    # no processing needed
    self.annot = self.data
    del self.data
def __init__(self, ncbi_tax_id=9606, **kwargs):
    """
    Cytokine perturbation signatures from the CytoSig database.

    :arg int ncbi_tax_id:
        NCBI Taxonomy ID, passed to ``AnnotationBase.__init__``.
    :arg **kwargs:
        Passed to ``AnnotationBase.__init__``.
    """

    AnnotationBase.__init__(
        self,
        name='CytoSig',
        ncbi_tax_id=ncbi_tax_id,
        input_method='cytosig.cytosig_annotations',
        infer_complexes=False,
        **kwargs
    )
def _process_method(self):
    """
    Renames the ``data`` attribute to ``annot``; contents are untouched.
    """

    # the input is already in the appropriate format,
    # no processing needed
    self.annot = self.data
    del self.data
def __init__(self, ncbi_tax_id=9606, **kwargs):
    """
    Cell type markers from PanglaoDB

    :arg int ncbi_tax_id:
        NCBI Taxonomy ID, passed to ``AnnotationBase.__init__``.
    :arg **kwargs:
        Passed to ``AnnotationBase.__init__``.
    """

    AnnotationBase.__init__(
        self,
        name='PanglaoDB',
        ncbi_tax_id=ncbi_tax_id,
        input_method='panglaodb.panglaodb_annotations',
        infer_complexes=False,
        check_ids=False,
        **kwargs
    )
def _process_method(self):
    """
    Stores the content of ``self.data`` under ``self.annot`` as it is.
    """

    # the input is already in the appropriate format,
    # no processing needed
    self.annot = self.data
    del self.data
def __init__(self, ncbi_tax_id=9606, **kwargs):
    """
    Annotations from the Lambert2018 resource.

    :arg int ncbi_tax_id:
        NCBI Taxonomy ID, passed to ``AnnotationBase.__init__``.
    :arg **kwargs:
        Passed to ``AnnotationBase.__init__``.
    """

    AnnotationBase.__init__(
        self,
        name='Lambert2018',
        ncbi_tax_id=ncbi_tax_id,
        input_method='lambert2018.lambert2018_annotations',
        infer_complexes=True,
        check_ids=False,
        **kwargs
    )
def _process_method(self):
    """
    Makes ``self.data`` available as ``self.annot`` without changes.
    """

    # the input is already in the appropriate format,
    # no processing needed
    self.annot = self.data
    del self.data
def __init__(self, ncbi_tax_id=9606, **kwargs):
    """
    Annotations from the Wang resource.

    :arg int ncbi_tax_id:
        NCBI Taxonomy ID, passed to ``AnnotationBase.__init__``.
    :arg **kwargs:
        Passed to ``AnnotationBase.__init__``.
    """

    AnnotationBase.__init__(
        self,
        name='Wang',
        ncbi_tax_id=ncbi_tax_id,
        input_method='wang.wang_annotations',
        **kwargs
    )
def _process_method(self):
    """
    Transfers ``self.data`` to ``self.annot`` with no conversion.
    """

    # the input is already in the appropriate format,
    # no processing needed
    self.annot = self.data
    del self.data
def __init__(self, **kwargs):
    """
    Approved cancer drugs from the Cancer Drugs Database
    (https://www.anticancerfund.org/en/cancerdrugs-db).

    :arg **kwargs:
        Passed to ``AnnotationBase.__init__``; an ``ncbi_tax_id`` among
        them is discarded.
    """

    # the organism is fixed below, a value from the caller is dropped
    kwargs.pop('ncbi_tax_id', None)

    AnnotationBase.__init__(
        self,
        name='CancerDrugsDB',
        ncbi_tax_id=_const.NOT_ORGANISM_SPECIFIC,
        input_method='cancerdrugsdb.cancerdrugsdb_annotations',
        entity_type='small_molecule',
        **kwargs
    )
def _process_method(self):
    """
    Keeps the loaded ``data`` as ``annot``; nothing is converted.
    """

    # the input is already in the appropriate format,
    # no processing needed
    self.annot = self.data
    del self.data
def __init__(self, ncbi_tax_id=9606, **kwargs):
    """
    Protein signatures from the InterPro database.

    :arg int ncbi_tax_id:
        NCBI Taxonomy ID, passed to ``AnnotationBase.__init__``.
    :arg **kwargs:
        Passed to ``AnnotationBase.__init__``.
    """

    AnnotationBase.__init__(
        self,
        name='InterPro',
        ncbi_tax_id=ncbi_tax_id,
        input_method='interpro.interpro_annotations',
        **kwargs
    )
def __init__(
        self,
        proteins=(),
        complexes=(),
        protein_sources=None,
        complex_sources=None,
        use_fields=None,
        ncbi_tax_id=9606,
        swissprot_only=True,
        use_complexes=True,
        keep_annotators=True,
        create_dataframe=False,
        load=True,
        pickle_file=None,
    ):
    """
    Manages a custom set of annotation resources. Loads data and
    accepts queries, provides methods for converting the data to
    data frame.

    :arg set proteins:
        A reference set of proteins (UniProt IDs).
    :arg set complexes:
        A reference set of complexes.
    :arg set protein_sources:
        Class names providing the protein annotations. If not provided
        the module's ``protein_sources_default`` attribute will be
        used.
    :arg set complex_sources:
        Class names providing the complex annotations. If not provided
        the module's ``complex_sources_default`` attribute will be
        used.
    :arg dict use_fields:
        A dict with resource names as keys and tuple of field labels
        as values. If provided for any resource only these fields will
        be used for constructing the data frame. If `None`, the
        module's ``default_fields`` settings will be used.
    :arg int ncbi_tax_id:
        NCBI Taxonomy ID of the organism.
    :arg bool swissprot_only:
        Stored on the instance and used when the reference set is
        built.
    :arg bool use_complexes:
        Whether to include complexes in the annotations.
    :arg bool keep_annotators:
        Stored on the instance; not used within this constructor.
    :arg bool create_dataframe:
        Whether to create a boolean data frame of annotations, apart
        from having the annotator objects.
    :arg bool load:
        Load the data upon initialization. If `False`, you will have a
        chance to call the ``load`` method later.
    :arg str pickle_file:
        Path to a pickle file; stored on the instance for ``load``.
    """

    session_mod.Logger.__init__(self, name='annot')

    self._module = sys.modules[self.__module__]

    # resource definitions: fall back to the module level defaults
    if protein_sources is None:

        protein_sources = protein_sources_default

    if complex_sources is None:

        complex_sources = complex_sources_default

    self.protein_sources = protein_sources
    self.complex_sources = complex_sources
    self.use_fields = use_fields or default_fields

    # reference set parameters
    self.proteins = proteins
    self.complexes = complexes
    self.ncbi_tax_id = ncbi_tax_id
    self.swissprot_only = swissprot_only
    self.use_complexes = use_complexes

    # behaviour switches
    self.pickle_file = pickle_file
    self.keep_annotators = keep_annotators
    self.create_dataframe = create_dataframe

    self.set_reference_set()
    self.annots = {}

    if load:

        self.load()
def reload(self):
    """
    Reloads the object from the module level.

    The module defining the class of this object is re-imported and
    reloaded, then the freshly created class of the same name is
    assigned as the class of this instance.
    """

    cls_name = self.__class__.__name__
    modname = self.__class__.__module__
    top_package = modname.split('.')[0]

    module = __import__(modname, fromlist=[top_package])
    imp.reload(module)

    self.__class__ = getattr(module, cls_name)
def load(self):
    """
    Populates the table: from the pickle file if one has been set,
    otherwise by loading the protein and complex resources (and,
    if requested, building the data frame).
    """

    if self.pickle_file:

        self.load_from_pickle(pickle_file=self.pickle_file)
        return

    self.set_reference_set()
    self.load_protein_resources()
    self.load_complex_resources()

    if self.create_dataframe:

        self.make_dataframe()


def load_from_pickle(self, pickle_file):
    """
    Restores the reference sets and the annotation objects from a
    pickle file created by ``save_to_pickle``.

    :arg str pickle_file:
        Path to the pickle file.
    """

    self._log('Loading from pickle `%s`.' % pickle_file)

    # NOTE(review): unpickling can execute arbitrary code; only load
    # dumps that come from a trusted source.
    with open(pickle_file, 'rb') as fp:

        self.proteins, self.complexes, self.reference_set, annots = (
            pickle.load(fp)
        )

    self.annots = {}

    for name, (cls_name, data, record_cls) in annots.items():

        self._log(
            'Loading from pickle: annotation class `%s`.' % cls_name
        )

        if record_cls is not None:

            modname = record_cls['module']

            if modname not in sys.modules:

                __import__(modname, fromlist=[modname.split('.')[0]])

            # only the description of the record class is stored in
            # the dump: recreate the namedtuple and attach it to the
            # module it belongs to
            record_cls_new = collections.namedtuple(
                record_cls['name'],
                record_cls['fields'],
            )
            setattr(
                sys.modules[modname],
                record_cls['name'],
                record_cls_new,
            )

            data = {
                key: {
                    record_cls_new(*this_annot)
                    for this_annot in these_annots
                }
                for key, these_annots in data.items()
            }

            self._log(
                'Reconstituted annotation data for `%s`: '
                'dict of length %u.' % (name, len(data))
            )

        cls = globals()[cls_name]

        try:

            self.annots[name] = cls(dump=data)
            self._log(
                'Instance of annotation class `%s` (resource %s) '
                'successfully loaded from pickle.' % (cls_name, name)
            )

        # we never want to fail due to any issue with
        # one resource:
        except Exception:

            self._log(
                'ERROR: Failed to create instance of `%s` '
                'with data loaded from the pickle.' % cls_name
            )
            self._log_traceback()

    self._log('Loaded from pickle `%s`.' % pickle_file)


def save_to_pickle(self, pickle_file):
    """
    Dumps the reference sets and the annotation data into a pickle file.

    Records are stored as plain tuples together with a description of
    their class (name, module, fields); ``load_from_pickle`` recreates
    the classes from this description.

    :arg str pickle_file:
        Path of the file to write.
    """

    def get_record_class(annot):
        # class of the first record found, None if there is no record
        return next(
            (elem.__class__ for val in annot.values() for elem in val),
            None,
        )

    self._log('Saving to pickle `%s`.' % pickle_file)

    for annot in self.annots.values():

        annot._update_complex_attribute_classes()

    # the payload is assembled before the file is opened, so an error
    # here does not leave a truncated file behind
    annots = {}

    for name, annot in self.annots.items():

        record_cls = get_record_class(annot.annot)

        annots[name] = (
            annot.__class__.__name__,
            {
                key: {tuple(this_annot) for this_annot in these_annots}
                for key, these_annots in annot.annot.items()
            },
            {
                'name': record_cls.__name__,
                'module': record_cls.__module__,
                'fields': record_cls._fields,
            }
            if record_cls is not None else
            None,
        )

    with open(pickle_file, 'wb') as fp:

        pickle.dump(
            obj=(
                self.proteins,
                self.complexes,
                self.reference_set,
                annots,
            ),
            file=fp,
            protocol=pickle.HIGHEST_PROTOCOL,
        )

    self._log('Saved to pickle `%s`.' % pickle_file)


def set_reference_set(self):
    """
    Obtains the protein, complex and combined reference sets from
    ``AnnotationBase.get_reference_set`` and indexes the combined one:
    ``rows`` maps each entity to its position.
    """

    self.proteins, self.complexes, self.reference_set = (
        AnnotationBase.get_reference_set(
            proteins=self.proteins,
            complexes=self.complexes,
            use_complexes=self.use_complexes,
            ncbi_tax_id=self.ncbi_tax_id,
            swissprot_only=self.swissprot_only,
        )
    )

    self.rows = {ref: i for i, ref in enumerate(self.reference_set)}


def load_protein_resources(self):
    """
    Loads the protein annotation resources.
    """

    self._load_resources(self.protein_sources, self.proteins)


def load_complex_resources(self):
    """
    Loads the complex annotation resources.
    """

    self._load_resources(self.complex_sources, self.complexes)


def _load_resources(self, definitions, reference_set):
    """
    Instantiates annotation classes and stores them in ``annots``
    by their ``name`` attribute.

    Each resource is attempted up to ``annot_load_resource_attempts``
    (from the settings) times; failures are logged and never raised.

    :arg definitions:
        Iterable of annotation classes (callables) or names of classes
        defined in this module.
    :arg reference_set:
        Passed to the annotation classes as ``reference_set``.
    """

    total_attempts = settings.get('annot_load_resource_attempts')

    for cls in definitions:

        cls = cls if callable(cls) else getattr(self._module, cls)
        # callables without a name (e.g. partials) are still loggable
        cls_name = getattr(cls, '__name__', str(cls))

        for attempt in range(total_attempts):

            try:

                self._log(
                    f'Loading annotation resource `{cls_name}`; '
                    f'attempt {attempt + 1} of {total_attempts}.'
                )

                annot = cls(
                    ncbi_tax_id=self.ncbi_tax_id,
                    reference_set=reference_set,
                )

                self.annots[annot.name] = annot

                self._log(
                    f'Successfully loaded resource `{cls_name}` '
                    f'({annot.name}).'
                )

                break

            # one failing resource must not prevent loading the others
            except Exception:

                self._log(
                    'Failed to load annotations from resource `%s`:' %
                    cls_name
                )
                self._log_traceback()


def make_dataframe(self, reference_set=None):
    """
    Creates the ``df`` attribute if ``create_dataframe`` is enabled.
    """

    if self.create_dataframe:

        self.df = self.to_dataframe(reference_set=reference_set)


def ensure_array(self, reference_set=None, rebuild=False):
    """
    Builds the annotation array if it does not exist yet or
    if ``rebuild`` is requested.
    """

    if not hasattr(self, 'data') or rebuild:

        self.make_array(reference_set=reference_set)


def to_array(self, reference_set=None):
    """
    Collects the arrays of all resources and stacks them horizontally.

    :arg reference_set:
        Entities to use as rows; if `None` or empty, the reference set
        of this object is used.

    :return:
        Tuple of the column names (array) and the stacked data array.
    """

    # explicit test instead of ``or``: the truth value of a numpy
    # array is ambiguous
    if reference_set is None or len(reference_set) == 0:

        reference_set = self.reference_set

    names = []
    arrays = []

    for resource in self.annots.values():

        # skipping HPA for now because too large number of
        # annotations, it would take very long:
        if resource.name == 'HPA':

            continue

        this_names, this_array = resource.to_array(
            reference_set=reference_set,
            use_fields=self.use_fields.get(resource.name),
        )

        names.extend(this_names)
        arrays.append(this_array)

    return np.array(names), np.hstack(arrays)


def make_array(self, reference_set=None):
    """
    Creates the ``names`` and ``data`` attributes and the column index.
    """

    self.names, self.data = self.to_array(reference_set=reference_set)
    self.set_cols()


def set_cols(self):
    """
    Updates ``cols``, the mapping from column names to column indices.
    """

    self.cols = {name: i for i, name in enumerate(self.names)}


def keep(self, keep):
    """
    Retains only the columns whose name is in ``keep``.

    :arg keep:
        Container of column names to retain.
    """

    # integer dtype is set explicitly: an empty selection would
    # otherwise become a float array which can not be used as index
    ikeep = np.array(
        [i for i, name in enumerate(self.names) if name in keep],
        dtype=int,
    )

    self.names = self.names[ikeep]
    self.data = self.data[:, ikeep]
    self.set_cols()


def make_sets(self):
    """
    Creates ``sets``: for each column the set of the reference set
    members selected by that column.
    """

    self.ensure_array()

    self.sets = {
        name: set(self.reference_set[self.data[:, i]])
        for i, name in enumerate(self.names)
    }


def annotate_network(self, pa):
    """
    Assigns the annotation sets to the nodes and edges of a network.

    Requires the ``sets`` attribute (see ``make_sets``).

    :arg pa:
        Object with an igraph-like ``graph`` attribute (``vs['name']``
        and ``es`` with ``source`` and ``target``).

    :return:
        Tuple of two lists: ``(name, node_index)`` tuples and
        ``(name1, name2, edge_index)`` tuples.
    """

    nodes = pa.graph.vs['name']

    # names of the sets each node belongs to, computed once per node
    # instead of rescanning all sets for every edge
    memberships = [
        [name for name, members in self.sets.items() if uniprot in members]
        for uniprot in nodes
    ]

    nodeannot = [
        (name, i)
        for i, names in enumerate(memberships)
        for name in names
    ]

    edgeannot = [
        (name1, name2, i)
        for i, e in enumerate(pa.graph.es)
        for name1 in memberships[e.source]
        for name2 in memberships[e.target]
    ]

    return nodeannot, edgeannot


def network_stats(self, pa):
    """
    Counts the nodes by annotation and the edges by pairs of
    annotations; names are joined by double underscore.

    :return:
        Tuple of two ``collections.Counter`` objects (nodes, edges).
    """

    nodeannot, edgeannot = self.annotate_network(pa)

    nodestats = collections.Counter(
        '__'.join(n[0])
        for n in nodeannot
    )

    # the pairs are sorted, i.e. the edge counts are undirected
    edgestats = collections.Counter(
        tuple(sorted(('__'.join(e[0]), '__'.join(e[1]))))
        for e in edgeannot
    )

    return nodestats, edgestats


def export_network_stats(self, pa):
    """
    Writes the node and edge statistics into the files
    ``annot_nodestats2.tsv`` and ``annot_edgestats2.tsv`` in the
    current directory.
    """

    nodestats, edgestats = self.network_stats(pa)

    with open('annot_edgestats2.tsv', 'w') as fp:

        fp.write('\t'.join(('name1', 'name2', 'count')))
        fp.write('\n')
        fp.write(
            '\n'.join(
                '%s\t%s\t%u' % (name1, name2, cnt)
                for (name1, name2), cnt in edgestats.items()
            )
        )

    with open('annot_nodestats2.tsv', 'w') as fp:

        fp.write('\t'.join(('name', 'count')))
        fp.write('\n')
        fp.write(
            '\n'.join(
                '%s\t%u' % (name, cnt)
                for name, cnt in nodestats.items()
            )
        )


def to_dataframe(self, reference_set=None):
    """
    Creates a data frame from the annotation array.

    :arg reference_set:
        If not `None`, the array is rebuilt with this reference set.

    :return:
        A ``pandas.DataFrame`` with the column names joined by
        double underscore.
    """

    self._log('Creating data frame from AnnotationTable.')

    self.ensure_array(
        reference_set=reference_set,
        rebuild=reference_set is not None,
    )

    colnames = ['__'.join(name) for name in self.names]

    # NOTE(review): the index is always the reference set of the
    # object, also if a custom ``reference_set`` is provided -- confirm
    df = pd.DataFrame(
        data=self.data,
        index=self.reference_set,
        columns=colnames,
    )

    # the new frame is reported here; ``self.df`` might not exist yet
    self._log(
        'Created annotation data frame, memory usage: %s.' % (
            common.df_memory_usage(df)
        )
    )

    return df


def make_narrow_df(self):
    """
    Creates ``narrow_df`` by concatenating the data frames of all
    resources and casting to ``AnnotationBase._dtypes``.
    """

    self._log('Creating narrow data frame from AnnotationTable.')

    for annot in self.annots.values():

        annot.make_df()

    self.narrow_df = pd.concat(
        annot.df for annot in self.annots.values()
    ).astype(AnnotationBase._dtypes)

    self._log(
        'Created annotation data frame, memory usage: %s.' % (
            common.df_memory_usage(self.narrow_df)
        )
    )
def search(self, protein):
    """
    Returns a dictionary with all annotations of a protein. Keys are
    the resource names; resources without any record about the protein
    are omitted.
    """

    hits = {}

    for resource, annot in self.annots.items():

        records = annot.annot

        if protein in records:

            hits[resource] = records[protein]

    return hits
def all_annotations(self, entity):
    """
    Returns all annotation records for one protein in a single list.

    The records follow the order of the resources in ``annots``.
    """

    records = []

    for resource in self.annots.values():

        if entity in resource.annot:

            records.extend(resource.annot[entity])

    return records
def all_annotations_str(self, protein):
    """
    Returns all annotation records for one protein serialized.

    :arg protein:
        Identifier of the protein (or other entity) to look up.

    :return:
        The string representations of the records, joined by ``'; '``.
    """

    # passed positionally: the parameter of ``all_annotations`` is
    # called ``entity``, a ``protein=`` keyword raises TypeError
    return '; '.join(str(a) for a in self.all_annotations(protein))
def update_summaries(self):
    """
    Collects the ``summary`` of each resource into ``summaries``.
    """

    self.summaries = {
        name: a.summary
        for name, a in self.annots.items()
    }


def summaries_tab(self, outfile=None, return_table=False):
    """
    Compiles a table from the resource summaries (requires the
    ``summaries`` attribute, see ``update_summaries``).

    :arg str outfile:
        If provided, the table is written to this path, tab separated.
    :arg bool return_table:
        Return the table as a list of lists; the first row is the header.

    :return:
        The table if ``return_table`` is `True`, otherwise `None`.
    """

    # (key in the summary, column label)
    columns = (
        ('name', 'Resource'),
        ('n_total', 'Entities'),
        ('n_records_total', 'Records'),
        ('records_per_entity', 'Records per entity'),
        ('n_proteins', 'Proteins'),
        ('pct_proteins', 'Proteins [%]'),
        ('n_protein_records', 'Protein records'),
        ('n_complexes', 'Complexes'),
        ('pct_complexes', 'Complexes [%]'),
        ('n_complex_records', 'Complex records'),
        ('complex_annotations_inferred', 'Inferred complex annotations'),
        ('n_mirnas', 'miRNA'),
        ('pct_mirnas', 'miRNA [%]'),
        ('n_mirna_records', 'miRNA records'),
        ('references', 'References'),
        ('curation_effort', 'Curation effort'),
        ('fields', 'Fields'),
    )

    tab = [[label for _, label in columns]]
    tab.extend(
        [str(self.summaries[src][key]) for key, _ in columns]
        for src in sorted(self.summaries.keys())
    )

    if outfile:

        with open(outfile, 'w') as fp:

            fp.write('\n'.join('\t'.join(row) for row in tab))

    if return_table:

        return tab


def get_entities(self, entity_type=None):
    """
    Returns the entities annotated in any of the resources, filtered
    by ``entity.Entity.filter_entity_type``.

    :arg entity_type:
        Entity type(s) to keep, passed through ``common.to_set``.
    """

    entity_type = common.to_set(entity_type)

    # ``set().union`` instead of ``set.union``: the latter raises
    # TypeError if no resources are loaded
    entities = set().union(*(
        an.annot.keys()
        for an in self.annots.values()
    ))

    return entity.Entity.filter_entity_type(
        entities,
        entity_type=entity_type,
    )


def get_proteins(self):
    """
    Returns the annotated entities of type `protein`.
    """

    return self.get_entities(entity_type='protein')


def get_complexes(self):
    """
    Returns the annotated entities of type `complex`.
    """

    return self.get_entities(entity_type='complex')


def get_mirnas(self):
    """
    Returns the annotated entities of type `mirna`.
    """

    return self.get_entities(entity_type='mirna')


def numof_entities(self, entity_type=None):
    """
    Number of annotated entities, optionally of certain type(s).
    """

    return len(self.get_entities(entity_type=entity_type))


def numof_proteins(self):
    """
    Number of annotated proteins.
    """

    return len(self.get_proteins())


def numof_complexes(self):
    """
    Number of annotated complexes.
    """

    return len(self.get_complexes())


def numof_mirnas(self):
    """
    Number of annotated miRNAs.
    """

    return len(self.get_mirnas())


def numof_records(self, entity_type=None):
    """
    Total number of records across all resources, as reported by
    the ``numof_records`` method of each resource.
    """

    return sum(
        an.numof_records(entity_types=entity_type)
        for an in self.annots.values()
    )


def numof_resources(self):
    """
    Number of the loaded resources.
    """

    return len(self.annots)


def __repr__(self):

    return (
        '<Annotation database: %u records about %u '
        'entities from %u resources>' % (
            self.numof_records(),
            self.numof_entities(),
            self.numof_resources(),
        )
    )


def __getitem__(self, item):
    """
    A resource name returns the annotation object of that resource;
    any other single key contained in the table returns the result of
    ``search``; an unknown single key returns `None`. An iterable of
    keys returns a dict with the result for each key.
    """

    if isinstance(item, _const.SIMPLE_TYPES):

        if item in self.annots:

            return self.annots[item]

        if item in self:

            return self.search(item)

        return None

    return {it: self[it] for it in item}


def __contains__(self, item):
    """
    Tells if ``item`` is a resource name or is contained in
    any of the resources.
    """

    return (
        item in self.annots or
        any(item in a for a in self.annots.values())
    )
def init_db(
        keep_annotators=True,
        create_dataframe=False,
        use_complexes=True,
        **kwargs
    ):
    """
    Initializes or reloads the annotation database.
    The database will be assigned to the ``db`` attribute of this module.

    All arguments are passed to ``AnnotationTable``.
    """

    table = AnnotationTable(
        keep_annotators=keep_annotators,
        create_dataframe=create_dataframe,
        use_complexes=use_complexes,
        **kwargs
    )

    globals()['db'] = table
def get_db(
        keep_annotators=True,
        create_dataframe=False,
        use_complexes=True,
        **kwargs
    ):
    """
    Retrieves the current database instance and initializes it if does
    not exist yet.

    The arguments are used only if the database has to be created;
    in that case they are passed to ``init_db``.
    """

    module_ns = globals()

    if 'db' not in module_ns:

        init_db(
            keep_annotators=keep_annotators,
            create_dataframe=create_dataframe,
            use_complexes=use_complexes,
            **kwargs
        )

    return module_ns['db']
def __init__(self, **kwargs):
    """
    HPO Gene Annotations from the HPO database.

    :arg **kwargs:
        Passed to ``AnnotationBase.__init__``; an ``ncbi_tax_id`` among
        them is discarded.
    """

    # the organism is fixed below, a value from the caller is dropped
    kwargs.pop('ncbi_tax_id', None)

    AnnotationBase.__init__(
        self,
        name='HPO',
        ncbi_tax_id=_const.NOT_ORGANISM_SPECIFIC,
        input_method='hpo.hpo_annotations',
        **kwargs
    )
def _process_method(self):
    """
    Sets ``self.annot`` to ``self.data`` and deletes ``self.data``.
    """

    # the input is already in the appropriate format,
    # no processing needed
    self.annot = self.data
    del self.data