#!/usr/bin/env python
# -*- coding: utf-8 -*-

#
#  This file is part of the `pypath` python module
#
#  Copyright 2014-2023
#  EMBL, EMBL-EBI, Uniklinik RWTH Aachen, Heidelberg University
#
#  Authors: see the file `README.rst`
#  Contact: Dénes Türei (turei.denes@gmail.com)
#
#  Distributed under the GPLv3 License.
#  See accompanying file LICENSE.txt or copy at
#      https://www.gnu.org/licenses/gpl-3.0.html
#
#  Website: https://pypath.omnipathdb.org/
#

from future.utils import iteritems

import sys
import importlib as imp
import traceback
import collections

try:
    import cPickle as pickle
except ImportError:
    # `cPickle` exists only on Python 2; fall back to the standard module.
    import pickle

import numpy as np
import pandas as pd

import pypath.inputs as inputs
import pypath.internals.intera as intera
import pypath.internals.resource as resource
import pypath.share.settings as settings
import pypath.share.session as session_mod
import pypath.share.common as common


# Default resources combined by ``ComplexAggregator`` when it is created
# without an explicit ``resources`` argument. Each name is looked up in this
# module's globals at load time.
# NOTE(review): the classes these names refer to are not visible in this
# part of the file -- confirm they are defined in this module.
complex_resources = (
    'Signor',
    'Corum',
    'CellPhoneDB',
    'Havugimana',
    'Compleat',
    'ComplexPortal',
    'Pdb',
    'GuideToPharmacology',
    'Humap',
    'Humap2',
    'Icellnet',
    'Kegg',
    'Cellchatdb',
    'Cellinker',
    'Spike',
)

class AbstractComplexResource(resource.AbstractResource):
    """
    A resource which provides information about molecular complexes.

    The complexes are stored in the ``complexes`` dict. ``update_index``
    derives three lookup tables from it: ``proteins`` (component -> set of
    complexes), ``resources`` (source name -> set of complexes) and ``ids``
    (``(database, identifier)`` -> complex).
    """

    def __init__(
            self,
            name,
            ncbi_tax_id=9606,
            input_method=None,
            input_args=None,
            dump=None,
            **kwargs
        ):
        """
        Creates the resource and loads its data immediately.

        name : str
            Custom name for the resource.
        ncbi_tax_id : int
            Forwarded to ``resource.AbstractResource.__init__``.
        input_method : callable
            Method providing the input data.
        input_args : dict
            Forwarded to ``resource.AbstractResource.__init__``.
        dump :
            Forwarded to ``resource.AbstractResource.__init__``.
        kwargs :
            Accepted for interface compatibility; not used here.
        """

        session_mod.Logger.__init__(self, name='complex')

        self.complexes = {}

        resource.AbstractResource.__init__(
            self,
            name=name,
            ncbi_tax_id=ncbi_tax_id,
            input_method=input_method,
            input_args=input_args,
            dump=dump,
            data_attr_name='complexes',
        )

        self.load()

    def load(self):
        """
        Loads the data by the parent class' ``load``, rebuilds the indices
        and writes a log message with the number of proteins and complexes.
        """

        resource.AbstractResource.load(self)
        self.update_index()

        self._log(
            'Loaded resource `%s`: %u proteins, %u complexes.' % (
                self.name,
                len(self.proteins),
                len(self.complexes),
            )
        )

    def _process_method(self):
        """
        Moves the loaded data from the ``data`` attribute to ``complexes``.
        """

        self.complexes = self.data
        delattr(self, 'data')

    def __iter__(self):
        """
        Iterates the complex objects (the values of ``complexes``).
        """

        yield from self.complexes.values()

    def update_index(self):
        """
        Rebuilds the ``proteins``, ``resources`` and ``ids`` lookup tables
        from the current contents of ``complexes``.
        """

        self.proteins = collections.defaultdict(set)
        self.resources = collections.defaultdict(set)
        self.ids = {}

        for cplex in self:

            for protein in cplex:

                self.proteins[protein].add(cplex)

            for db in cplex.sources:

                # Index by the source name. The key used to be the leftover
                # `protein` loop variable, which filed complexes under an
                # arbitrary component instead of their resource.
                self.resources[db].add(cplex)

            for db, ids in cplex.ids.items():

                for _id in ids:

                    self.ids[(db, _id)] = cplex

    def __contains__(self, other):
        """
        Tells if a complex or a protein is in the database.

        ``intera.Complex`` instances are converted to their string
        representation first. Strings of at most 10 characters are looked up
        among the proteins, longer ones among the complexes. Anything else
        gives ``False``.
        """

        # a Complex instance
        if isinstance(other, intera.Complex):

            other = other.__str__()

        # either a UniProt ID or
        # a complex string representation
        if isinstance(other, str):

            if len(other) <= 10:

                return other in self.proteins

            return other in self.complexes

        return False

    def __len__(self):
        """
        Number of complexes.
        """

        return len(self.complexes)

    def __repr__(self):

        return '<Complex database: %u complexes>' % len(self)

    @property
    def numof_references(self):
        """
        Number of distinct references over all complexes.
        """

        # `set().union(...)` also works with zero complexes, while
        # `set.union(*())` raises TypeError.
        return len(
            set().union(
                *(cplex.references for cplex in self.complexes.values())
            )
        )

    @property
    def curation_effort(self):
        """
        Number of distinct (complex key, reference) pairs.
        """

        return len({
            (key, ref)
            for key, cplex in self.complexes.items()
            for ref in cplex.references
        })

    @property
    def has_stoichiometry(self):
        """
        ``True`` if any component count in any complex is greater than one.
        """

        return any(
            cnt > 1
            for cplex in self.complexes.values()
            for cnt in cplex.components.values()
        )

    @property
    def all_sources(self):
        """
        Set of all sources named by any of the complexes.
        """

        # Empty-safe for the same reason as in `numof_references`.
        return set().union(
            *(cplex.sources for cplex in self.complexes.values())
        )

    @property
    def homomers(self):
        """
        Number of complexes with exactly one kind of component.
        """

        return sum(
            1
            for cplex in self.complexes.values()
            if len(cplex.components) == 1
        )

    @property
    def heteromers(self):
        """
        Number of complexes with more than one kind of component.
        """

        return sum(
            1
            for cplex in self.complexes.values()
            if len(cplex.components) > 1
        )

    def make_df(self):
        """
        Builds a ``pandas.DataFrame`` with one row per complex and stores it
        in the ``df`` attribute.
        """

        colnames = [
            'name',
            'components',
            'components_genesymbols',
            'stoichiometry',
            'sources',
            'references',
            'identifiers',
        ]

        self._log('Creating a data frame of complexes.')

        records = [
            [
                cplex.name if cplex.name else None,
                # drops the first 8 characters of the string representation
                # (presumably a `COMPLEX:` prefix -- confirm in `intera`)
                cplex.__str__()[8:],
                cplex.genesymbol_str,
                cplex.stoichiometry,
                ';'.join(cplex.sources),
                ';'.join(cplex.references),
                ';'.join(
                    '%s:%s' % (db, _id)
                    for db, ids in cplex.ids.items()
                    for _id in ids
                ),
            ]
            for cplex in self.complexes.values()
        ]

        self.df = pd.DataFrame(records, columns=colnames)

        self._log(
            'Created data frame of complexes. '
            'Memory usage: %s.' % common.df_memory_usage(self.df)
        )

    def _from_dump_callback(self):
        """
        If a ``_from_dump`` attribute is present, adopts it as ``complexes``
        and removes the ``_from_dump`` and ``dump`` attributes.
        """

        if hasattr(self, '_from_dump'):

            self.complexes = self._from_dump
            delattr(self, '_from_dump')
            delattr(self, 'dump')

    @property
    def summary(self):
        """
        Dict of summary statistics: number of complexes, references,
        curation effort, stoichiometry flag, name, sources, homomers and
        heteromers.
        """

        return {
            'n_complexes': self.__len__(),
            'n_references': self.numof_references,
            'curation_effort': self.curation_effort,
            'has_stoichiometry': self.has_stoichiometry,
            'name': self.name,
            'sources': self.all_sources,
            'homomers': self.homomers,
            'heteromers': self.heteromers,
        }

    @property
    def summary_str(self):
        """
        The contents of ``summary`` as a human readable, multi-line string.
        """

        s = self.summary
        bar = '=' * 70

        return (
            '\n%s\n'
            'Complex resource `%s`\n'
            '%s\n'
            '\tNumber of complexes: %u\n'
            '\tHomomers: %u\n'
            '\tHeteromers: %u\n'
            '\tNumber of literature references: %u\n'
            '\tCuration effort (reference-entity pairs): %u\n'
            '\tHas stoichiometry: %s\n'
            '\tSources: %s\n'
            '%s\n\n'
        ) % (
            bar,
            self.name,
            bar,
            s['n_complexes'],
            s['homomers'],
            s['heteromers'],
            s['n_references'],
            s['curation_effort'],
            str(s['has_stoichiometry']),
            ', '.join(s['sources']),
            bar,
        )

class ComplexAggregator(AbstractComplexResource):
    """
    Combines complexes from multiple resources into one database.
    """

    def __init__(
            self,
            resources=None,
            pickle_file=None,
        ):
        """
        Combines complexes from multiple resources.

        :arg list resources:
            List of resources. Each item is either the name of a complex
            resource class in this module, a callable that returns a
            resource, or an object that already has a ``complexes``
            attribute. Defaults to ``complex_resources``.
        :arg str pickle_file:
            If provided, the database is loaded from this pickle file
            instead of the resources.
        """

        self.pickle_file = pickle_file
        self.resources = resources or complex_resources

        AbstractComplexResource.__init__(
            self,
            name='OmniPath',
        )

    def reload(self):
        """
        Reloads the object from the module level.
        """

        modname = self.__class__.__module__
        mod = __import__(modname, fromlist=[modname.split('.')[0]])
        imp.reload(mod)
        new = getattr(mod, self.__class__.__name__)
        setattr(self, '__class__', new)

    def load(self):
        """
        Loads the database either from ``pickle_file`` or by loading and
        merging each item of ``resources``.

        Each resource is attempted up to the number of times given by the
        ``complex_load_resource_attempts`` setting. Failures are logged with
        their traceback and do not stop the loading of other resources.
        """

        if self.pickle_file:

            self._log(
                'Loading database from pickle `%s`.' % self.pickle_file
            )
            # NOTE(review): this path returns without calling
            # `update_index`, so the `proteins` and `ids` indices are not
            # built here -- confirm this is intended.
            self.load_from_pickle(self.pickle_file)
            return

        self.data = {}
        self.summaries = {}

        for res in self.resources:

            total_attempts = settings.get('complex_load_resource_attempts')

            for attempt in range(total_attempts):

                try:

                    self._log(
                        f'Loading resource `{str(res)}`; '
                        f'attempt {attempt+1}/{total_attempts}.'
                    )

                    # a name of a class defined in this module
                    if not callable(res) and res in globals():

                        res = globals()[res]

                    if callable(res):

                        processor = res()

                    elif hasattr(res, 'complexes'):

                        processor = res

                    else:

                        # Without this branch `processor` would be unbound,
                        # or worse, still refer to the previous resource,
                        # whose complexes would be merged a second time.
                        raise TypeError(
                            'Could not resolve complex resource `%s`: '
                            'it is not callable, not a name defined in '
                            'this module and has no `complexes` '
                            'attribute.' % str(res)
                        )

                    if hasattr(processor, 'summary'):

                        self.summaries[processor.name] = processor.summary

                    for key, cplex in processor.complexes.items():

                        if key in self.data:

                            self.data[key] += cplex

                        else:

                            self.data[key] = cplex

                    self._log(f'Successfully loaded resource `{str(res)}`.')

                    break

                except Exception:

                    # best effort: log the failure and try again
                    self._log('Failed to load resource `%s`:' % str(res))
                    self._log_traceback()

        resource.AbstractResource.load(self)
        self.update_index()
        self.update_summaries()

    def load_from_pickle(self, pickle_file):
        """
        Reads the ``complexes`` and ``summaries`` attributes from a pickle
        file created by ``save_to_pickle``.
        """

        self._log('Loading from pickle `%s`.' % pickle_file)

        # NOTE: unpickling can execute arbitrary code; only load pickle
        # files from trusted sources.
        with open(pickle_file, 'rb') as fp:

            self.complexes, self.summaries = pickle.load(fp)

        self._log('Loaded from pickle `%s`.' % pickle_file)

    def update_summaries(self):
        """
        Adds to the summary of each resource the number of complexes found
        only in that resource (``unique_complexes``) and the number it
        shares with other resources (``shared_complexes``).
        """

        for src, summary in self.summaries.items():

            summary['unique_complexes'] = sum(
                1
                for cplex in self.complexes.values()
                if len(cplex.sources) == 1 and src in cplex.sources
            )

            summary['shared_complexes'] = sum(
                1
                for cplex in self.complexes.values()
                if len(cplex.sources) > 1 and src in cplex.sources
            )

    def summaries_tab(self, outfile=None, return_table=False):
        """
        Compiles the per-resource summaries into a table with a header row
        followed by one row for each resource, sorted by resource name.

        :arg str outfile:
            If provided, the table is written to this path as tab
            separated text.
        :arg bool return_table:
            If ``True``, the table is returned as a list of lists of
            strings; otherwise nothing is returned.
        """

        columns = (
            ('name', 'Resource'),
            ('n_complexes', 'All complexes'),
            ('homomers', 'Homomers'),
            ('heteromers', 'Heteromers'),
            ('has_stoichiometry', 'Stoichiometry'),
            ('unique_complexes', 'Unique complexes'),
            ('shared_complexes', 'Shared complexes'),
            ('n_references', 'References'),
            ('curation_effort', 'Curation effort'),
        )

        tab = [[f[1] for f in columns]]

        tab.extend(
            [str(self.summaries[src][f[0]]) for f in columns]
            for src in sorted(self.summaries.keys())
        )

        if outfile:

            with open(outfile, 'w') as fp:

                fp.write('\n'.join('\t'.join(row) for row in tab))

        if return_table:

            return tab

    def _update_complex_attribute_classes(self):
        """
        Runs ``_update_complex_attribute_classes_static`` on ``complexes``.
        """

        self._update_complex_attribute_classes_static(self.complexes)

    @staticmethod
    def _update_complex_attribute_classes_static(cplexes, mod=None):
        """
        For each item of ``cplexes`` that has an ``attrs`` mapping, rebinds
        the class of each attribute value to the class with the same name
        in ``mod`` (by default this module), if such a class exists there.
        """

        mod = mod or sys.modules[__name__]

        # NOTE(review): iterating a dict yields its keys; the membership
        # test in `__contains__` suggests the keys of `complexes` are
        # strings, in which case nothing here has `attrs`. Confirm whether
        # the values were meant.
        for key in cplexes:

            if hasattr(key, 'attrs'):

                for attr, val in key.attrs.items():

                    cls = val.__class__.__name__

                    if hasattr(mod, cls):

                        val.__class__ = getattr(mod, cls)

    def save_to_pickle(self, pickle_file):
        """
        Writes the ``complexes`` and ``summaries`` attributes as a tuple
        into a pickle file.
        """

        self._log('Saving to pickle `%s`.' % pickle_file)

        self._update_complex_attribute_classes()

        with open(pickle_file, 'wb') as fp:

            pickle.dump(
                obj=(self.complexes, self.summaries),
                file=fp,
            )

        self._log('Saved to pickle `%s`.' % pickle_file)

def init_db(**kwargs):
    """
    Initializes or reloads the complex database.
    The database will be assigned to the ``db`` attribute of this module.

    All keyword arguments are passed to ``ComplexAggregator``.
    """

    globals()['db'] = ComplexAggregator(**kwargs)

def get_db(**kwargs):
    """
    Retrieves the current database instance and initializes it if does
    not exist yet.

    The keyword arguments are passed to ``init_db``; they are used only
    when the database has to be created.
    """

    if 'db' not in globals():

        init_db(**kwargs)

    return globals()['db']

def all_complexes():
    """
    Returns a set of all complexes in the database which serves as a
    reference set for many methods, just like
    ``inputs.uniprot_db.all_uniprots`` represents the proteome.
    """

    db = get_db()

    return set(db.complexes.values())