#!/usr/bin/env python# -*- coding: utf-8 -*-## This file is part of the `pypath` python module## Copyright 2014-2023# EMBL, EMBL-EBI, Uniklinik RWTH Aachen, Heidelberg University## Authors: see the file `README.rst`# Contact: Dénes Türei (turei.denes@gmail.com)## Distributed under the GPLv3 License.# See accompanying file LICENSE.txt or copy at# https://www.gnu.org/licenses/gpl-3.0.html## Website: https://pypath.omnipathdb.org/#from__future__importannotationsfromfuture.utilsimportiteritemsfromtypingimportMappingimportimportlibasimpimportreimportosimportsysimportcollectionsimportitertoolsimportfunctoolsimportcopyascopy_modimportpickleimportrandomimporttracebackfromtyping_extensionsimportLiteralimportnumpyasnpimportpandasaspdimportpypath.share.sessionassession_modimportpypath.share.progressasprogressimportpypath.core.interactionasinteraction_modimportpypath.core.evidenceasevidenceimportpypath.core.entityasentity_modimportpypath.core.commonascore_commonimportpypath.share.commonascommonimportpypath_common._constantsas_constimportpypath.share.settingsassettingsimportpypath.share.cacheascache_modimportpypath.utils.mappingasmappingimportpypath.inputs.pubmedaspubmed_inputimportpypath.share.curlascurlimportpypath.internals.refsasrefs_modimportpypath.utils.reflistsasreflistsimportpypath.resources.networkasnetwork_resourcesimportpypath.internals.input_formatsasinput_formatsimportpypath.internals.resourceasresource_formatsimportpypath.inputsasinputs# Py 2/3try:input=raw_inputexceptNameError:passNetworkEntityCollection=collections.namedtuple('NetworkEntityCollection',['total','by_resource','by_category','shared','unique','shared_res_cat','unique_res_cat','shared_cat','unique_cat','resource_cat','cat_resource','method','label',],)NetworkEntityCollection.__new__.__defaults__=(None,)*8
[docs]def__init__(self,collection,label=None):self.collection=collection.copy()# we need a copy where we don't add the totals# so these don't bother the shared and unique methodsself._collection=collection.copy()self.label=labelself.main()
class Network(session_mod.Logger):
    """
    Represents a molecular interaction network. Provides various methods to
    query the network and its components. Optionally converts the network
    to a ``pandas.DataFrame`` of interactions.

    :arg list,dict resources:
        One or more lists or dictionaries containing
        ``pypath.internals.resource.NetworkResource`` objects.
    :arg bool make_df:
        Create a ``pandas.DataFrame`` already when creating the instance.
        If no network data loaded no data frame will be created.
    :arg int ncbi_tax_id:
        Restrict the network only to this organism. If ``None`` identifiers
        from any organism will be allowed.
    :arg bool allow_loops:
        Allow interactions with the their two endpoints being the same
        entity.
    """

    # Building blocks for the dynamically generated `*partners*` query
    # methods: (interaction type prefixes, mode/effect infixes, direction
    # suffixes) combined into method names and their filter arguments.
    _partners_methods = (
        {
            '': {},
            'transcriptionally_': {
                'interaction_type': {
                    'transcriptional',
                    'mirna_transcriptional',
                },
            },
            'post_transcriptionally_': {
                'interaction_type': {
                    'post_transcriptional',
                    'lncrna_post_transcriptional',
                },
            },
            'post_translationally_': {
                'interaction_type': 'post_translational',
            },
        },
        {
            'regulat': {
                'direction': True,
            },
            'activat': {
                'effect': 'positive',
            },
            'suppress': {
                'effect': 'negative',
            },
        },
        {
            'es': {
                'mode': 'IN',
            },
            'ed_by': {
                'mode': 'OUT',
            },
        },
    )
[docs]defreload(self,recursive:bool=False):""" Reloads the object from the module level. """modname=self.__class__.__module__mod=__import__(modname,fromlist=[modname.split('.')[0]])imp.reload(mod)new=getattr(mod,self.__class__.__name__)setattr(self,'__class__',new)ifrecursive:imp.reload(entity_mod)imp.reload(interaction_mod)forentityinself.nodes.values():entity.__class__=entity_mod.Entityforinteractioninself.interactions.values():interaction.__class__=interaction_mod.Interactioninteraction.a.__class__=entity_mod.Entityinteraction.b.__class__=entity_mod.Entity
[docs]defreset(self):""" Removes network data i.e. creates empty interaction and node dictionaries. """self.raw_data={}self.interactions={}self.nodes={}self.nodes_by_label={}self.interactions_by_nodes=collections.defaultdict(set)
[docs]defload(self,resources=None,make_df=False,exclude=None,reread=False,redownload=False,keep_raw=False,top_call=True,cache_files=None,only_directions=False,pickle_file=None,allow_loops=None,first_n=None,):""" Loads data from a network resource or a collection of resources. :arg str,dict,list,resource.NetworkResource resources: An object defining one or more network resources. If *str* it will be looked up among the collections in the ``pypath.resources.network`` module (e.g. ``'pathway'`` will load all resources in the `pathway` collection). If *dict* or *list* it will be processed recursively i.e. the ``load`` method will be called for each element. If it is a ``pypath.internals.resource.NetworkResource`` object it will be processed and added to the network. :arg bool make_df: Whether to create a ``pandas.DataFrame`` after loading all resources. :arg NoneType,set exclude: A *set* of resource names to be ignored. It is useful if you want to load a collection with the exception of a few resources. """ifpickle_file:self.load_from_pickle(pickle_file=pickle_file)returnkwargs={'reread':reread,'redownload':redownload,'keep_raw':keep_raw,'top_call':False,'only_directions':only_directions,'allow_loops':allow_loops,'first_n':first_n,}exclude=common.to_set(exclude)resources=((resources,)ifnotisinstance(resources,(list,Mapping,tuple,set))elseresources.values()ifisinstance(resources,Mapping)elseresources)forresourceinresources:if(isinstance(resource,str)andhasattr(network_resources,resource)):self.load(resources=getattr(network_resources,resource),**kwargs)elifisinstance(resource,(list,dict,tuple,set)):self.load(resources=resource,**kwargs)elif(isinstance(resource,(input_formats.NetworkInput,resource_formats.NetworkResource,))andresource.namenotinexclude):self.load_resource(resource,**kwargs)elifresourceisnotNone:self._log('Could not recognize network input ''definition: `%s`.'%str(resource))ifmake_dfandtop_call:self.make_df()
# synonyms (old method names of PyPath)load_resources=loadinit_network=load
[docs]defload_resource(self,resource,clean=True,reread=None,redownload=None,keep_raw=False,only_directions=False,allow_loops=None,first_n=None,**kwargs):""" Loads the data from a single resource and attaches it to the network :arg pypath.input_formats.NetworkInput resource: :py:class:`pypath.input_formats.NetworkInput` instance containing the detailed definition of the input format to the downloaded file. :arg bool clean: Legacy parameter, has no effect at the moment. Optional, ``True`` by default. Whether to clean the graph after importing the data or not. See :py:meth:`pypath.main.PyPath.clean_graph` for more information. :arg dict cache_files: Legacy parameter, has no effect at the moment. Optional, ``{}`` by default. Contains the resource name(s) [str] (keys) and the corresponding cached file name [str]. If provided (and file exists) bypasses the download of the data for that resource and uses the cache file instead. :arg bool reread: Optional, ``False`` by default. Specifies whether to reread the data files from the cache or omit them (similar to *redownload*). :arg bool redownload: Optional, ``False`` by default. Specifies whether to re-download the data and ignore the cache. :arg bool only_directions: If ``True``, no new interactions will be created but direction and effect sign evidences will be added to existing interactions. :arg int first_n: Load only the first n interactions. 
"""total_attempts=settings.get('network_load_resource_attempts')forattemptinrange(total_attempts):try:self._log(f'Loading network data from resource `{resource.name}`'f' (dataset: {resource.dataset}); 'f'attempt {attempt+1} of {total_attempts}.')self._read_resource(resource,reread=reread,redownload=redownload,keep_raw=keep_raw,first_n=first_n,)self._log('Successfully read interactions 'f'from resource `{resource.name}`.')breakexceptExceptionase:exc=sys.exc_info()self._log('Failed to read interactions 'f'from resource `{resource.name}`:')self._log_traceback(console=True)ifattempt==total_attempts-1:self._log(f'Not loading `{resource.name}`: giving up after 'f'{total_attempts} attempts.')returnallow_loops=self._allow_loops(allow_loops=allow_loops,resource=resource,)self._log('Loops allowed for resource `%s`: %s'%(resource.name,allow_loops,))self._add_edge_list(only_directions=only_directions,allow_loops=allow_loops,)self.organisms_check()self.remove_zero_degree()self._log('Completed: loading network data from ''resource `%s`.'%resource.name)
def_read_resource(self,resource,reread=False,redownload=False,keep_raw=False,cache_files=None,first_n=None,):""" Reads interaction data file containing node and edge attributes that can be read from simple text based files and adds it to the networkdata. This function works not only with files, but with lists as well. Any other function can be written to download and preprocess data, and then give it to this function to finally attach to the network. :arg pypath.input_formats.NetworkInput resource: :py:class:`pypath.input_formats.NetworkInput` instance containing the detailed definition of the input format of the file. Instead of the file name (on the :py:attr:`pypath.input_formats.NetworkInput.input` attribute) you can give a custom function name, which will be executed, and the returned data will be used instead. :arg bool keep_raw: Optional, ``False`` by default. Whether to keep the raw data read by this function, in order for debugging purposes, or further use. :arg dict cache_files: Optional, ``{}`` by default. Contains the resource name(s) [str] (keys) and the corresponding cached file name [str]. If provided (and file exists) bypasses the download of the data for that resource and uses the cache file instead. :arg bool reread: Optional, ``False`` by default. Specifies whether to reread the data files from the cache or omit them (similar to *redownload*). :arg bool redownload: Optional, ``False`` by default. Specifies whether to re-download the data and ignore the cache. :arg int first_n: Load only the first n interactions. 
"""self._log('Reading network data from `%s`.'%resource.name)SMOL_TYPES=settings.get('small_molecule_entity_types')# workaround in order to make it work with both NetworkInput# and NetworkResource type param_resource=(resourceifisinstance(resource,resource_formats.NetworkResource)elseresource_formats.NetworkResource(name=resource.name,interaction_type=resource.interaction_type,networkinput=resource,data_model=resource.data_modelor'unknown',resource_attrs=resource.resource_attrs,))networkinput=_resource.networkinput_resources_secondary=()expand_complexes=(networkinput.expand_complexesifisinstance(networkinput.expand_complexes,bool)elsesettings.get('network_expand_complexes'))reread=(rereadifisinstance(reread,bool)elsenotsettings.get('network_pickle_cache'))self._log('Expanding complexes for `%s`: %s'%(networkinput.name,str(expand_complexes),))edge_list=[]edge_list_mapped=[]self.edge_list_mapped=[]infile=None_name=networkinput.name.lower()edges_cache=os.path.join(self.cache_dir,'%s_%s_%s.edges.pickle'%(_name,_resource.data_model,_resource.interaction_type,))interaction_cache=os.path.join(self.cache_dir,'%s_%s_%s.interactions.pickle'%(_name,_resource.data_model,_resource.interaction_type,))ifnotrereadandnotredownload:infile,edge_list_mapped=self._lookup_cache(_name,cache_files,interaction_cache,edges_cache,)ifnotlen(edge_list_mapped):ifinfileisNone:ifnotisinstance(resource,(input_formats.NetworkInput,resource_formats.NetworkResource,)):self._log('_read_network_data: No proper input file ''definition. `param` should be either ''a `pypath.internals.input_formats.NetworkInput` or a ''`pypath.internals.resource.NetworkResource` instance.',-5,)returnNoneifnetworkinput.huge:sys.stdout.write('\n\tProcessing %s requires huge memory.\n''\tPlease hit `y` if you have at ''least 2G free memory,\n''\tor `n` to omit %s.\n''\tAfter processing once, it will be saved in \n''\t%s, so next time can be loaded quickly.\n\n''\tProcess %s now? 
[y/n]\n'%(networkinput.name,networkinput.name,edges_cache,networkinput.name))sys.stdout.flush()whileTrue:answer=input().lower()ifanswer=='n':returnNoneelifanswer=='y':breakelse:sys.stdout.write('\n\tPlease answer `y` or `n`:\n\t')sys.stdout.flush()# if no method available it gonna be Noneinput_func=inputs.get_method(networkinput.input)# reading from remote or local file, or executing import# function:if(isinstance(networkinput.input,str)and(networkinput.input.startswith('http')ornetworkinput.input.startswith('ftp'))):curl_use_cache=notredownloadc=curl.Curl(networkinput.input,silent=False,large=True,cache=curl_use_cache)infile=c.fileobj.read()iftype(infile)isbytes:try:infile=infile.decode('utf-8')exceptUnicodeDecodeErrorase:try:infile=infile.decode('iso-8859-1')exceptUnicodeDecodeError:raiseeinfile=[xforxininfile.replace('\r','').split('\n')iflen(x)>0]self._log("Retrieving data from `%s` ..."%networkinput.input)elifinput_funcisnotNone:self._log('Retrieving data by method `%s` of the ''pypath.inputs module...'%input_func.__name__)_store_cache=curl.CACHEifisinstance(redownload,bool):curl.CACHE=notredownloadtry:infile=input_func(**networkinput.input_args)exceptExceptionase:self._log(f'Error in method `{input_func.__name__}` of the ''pypath.inputs module. 
')raiseefinally:curl.CACHE=_store_cacheelifos.path.isfile(networkinput.input):infile=curl.Curl(networkinput.input,large=True,silent=False,).resultself._log('%s opened...'%networkinput.input)ifinfileisNone:self._log('`%s`: Could not find file or input function ''or failed preprocessing.'%networkinput.input,-5,)returnNoneis_directed=networkinput.is_directedsign=networkinput.signref_col=(networkinput.refs[0]ifisinstance(networkinput.refs,tuple)elsenetworkinput.refsifisinstance(networkinput.refs,int)elseNone)ref_sep=(networkinput.refs[1]ifisinstance(networkinput.refs,tuple)else';')# column index of the signsig_col=Noneifnotisinstance(sign,tuple)elsesign[0]# column index and value(s) for the directiondir_col=Nonedir_val=Nonedir_sep=Noneifisinstance(is_directed,tuple):dir_col=is_directed[0]dir_val=is_directed[1]dir_sep=is_directed[2]iflen(is_directed)>2elseNoneelifisinstance(sign,tuple):dir_col=sign[0]dir_val=sign[1:3]dir_val=(dir_valiftype(dir_val[0])in_const.SIMPLE_TYPESelsecommon.flat_list(dir_val))dir_sep=sign[3]iflen(sign)>3elseNonedir_val=common.to_set(dir_val)must_have_references=(settings.get('keep_noref')ornetworkinput.must_have_references)self._log('Resource `%s` %s have literature references ''for all interactions. Interactions without references ''will be %s. 
You can alter this condition globally by ''`pypath.settings.keep_noref` or for individual resources ''by the `must_have_references` attribute of their ''`NetworkInput` object.'%(networkinput.name,'must'ifmust_have_referenceselse'does not need to','dropped'ifmust_have_referenceselse'included',),1,)self._log('`%s` must have references: %s'%(networkinput.name,str(must_have_references)))# iterating lines from input fileinput_filtered=0ref_filtered=0taxon_filtered=0read_error=Falselnum=0# we need to define it here to avoid errors if the# loop below runs zero cyclesprg=progress.Progress(iterable=infile,name='Reading network data - %s'%networkinput.name,)try:forlnum,lineinenumerate(prg):iflen(line)<=1or(lnum==1andnetworkinput.header):# empty lines# or header rowcontinueifnotisinstance(line,(list,tuple)):ifhasattr(line,'decode'):line=line.decode('utf-8')line=line.strip('\n\r').split(networkinput.separator)else:line=[x.replace('\n','').replace('\r','')ifhasattr(x,'replace')elsexforxinline]# 1) filtersifself._filters(line,networkinput.positive_filters,networkinput.negative_filters):input_filtered+=1continue# 2) direction# reading names and attributes:ifis_directedandnotisinstance(is_directed,tuple):this_edge_dir=Trueelse:this_edge_dir=self._process_direction(line,dir_col,dir_val,dir_sep,)# 3) referencesrefs=[]ifref_colisnotNone:ifline[ref_col]isNone:refs=()elifisinstance(line[ref_col],(list,set,tuple)):refs=line[ref_col]elifisinstance(line[ref_col],int):refs=(line[ref_col],)else:refs=line[ref_col].split(ref_sep)refs=common.del_empty(list(set(refs)))refs=pubmed_input.only_pmids([str(r).strip()forrinrefs])iflen(refs)==0andmust_have_references:ref_filtered+=1continue# 4) entity typesentity_type_a=self._process_field(networkinput.entity_type_a,line,)entity_type_b=self._process_field(networkinput.entity_type_b,line,)# 5) ID typesid_type_a=self._process_field(networkinput.id_type_a,line)id_type_b=self._process_field(networkinput.id_type_b,line)# 6) organisms# to give an easy way 
for input definition:ifisinstance(networkinput.ncbi_tax_id,int):taxon_a=(_const.NOT_ORGANISM_SPECIFICifentity_type_ainSMOL_TYPESelsenetworkinput.ncbi_tax_id)taxon_b=(_const.NOT_ORGANISM_SPECIFICifentity_type_binSMOL_TYPESelsenetworkinput.ncbi_tax_id)# to enable more sophisticated inputs:elifisinstance(networkinput.ncbi_tax_id,dict):taxx=self._process_taxon(networkinput.ncbi_tax_id,line,)ifisinstance(taxx,tuple):taxon_a,taxon_b=taxxelse:taxon_a=taxon_b=taxxtaxd_a=(networkinput.ncbi_tax_id['A']if'A'innetworkinput.ncbi_tax_idelse_const.NOT_ORGANISM_SPECIFICifentity_type_ainSMOL_TYPESelsenetworkinput.ncbi_tax_id)taxd_b=(networkinput.ncbi_tax_id['B']if'B'innetworkinput.ncbi_tax_idelse_const.NOT_ORGANISM_SPECIFICifentity_type_binSMOL_TYPESelsenetworkinput.ncbi_tax_id)only_default=networkinput.only_default_organismifnot(self._match_taxon(taxd_a,taxon_a,only_default)andself._match_taxon(taxd_b,taxon_b,only_default)):taxon_filtered+=1continue# assuming by default the default organismelse:taxon_a=taxon_b=self.ncbi_tax_idiftaxon_aisNoneortaxon_bisNone:taxon_filtered+=1continue# 7) effect (sign)positive=Falsenegative=Falseifisinstance(sign,tuple):positive,negative=(self._process_sign(line[sign[0]],sign))# 8) resources (source databases)resource=(line[networkinput.resource]ifisinstance(networkinput.resource,int)elseline[networkinput.resource[0]].split(networkinput.resource[1])if(isinstance(networkinput.resource,tuple)andhasattr(line[networkinput.resource[0]],'split'))else[]ifisinstance(networkinput.resource,tuple)elsenetworkinput.resource)resource=common.to_set(resource)_resources_secondary=tuple(resource_formats.NetworkResource(name=sec_res,interaction_type=_resource.interaction_type,data_model=_resource.data_model,via=_resource.name,dataset=_resource.dataset,)forsec_resinresourceifsec_res!=_resource.name)resource.add(networkinput.name)# 9) interacting partnersid_a=self._process_partner(networkinput.id_col_a,line)id_b=self._process_partner(networkinput.id_col_b,line)# 10) 
further attributes# getting additional edge and node attributesattrs_edge=self._process_attrs(line,networkinput.extra_edge_attrs,lnum,)attrs_node_a=self._process_attrs(line,networkinput.extra_node_attrs_a,lnum,)attrs_node_b=self._process_attrs(line,networkinput.extra_node_attrs_b,lnum,)# 11) creating the Evidence objectevidences=evidence.Evidences(evidences=(evidence.Evidence(resource=_res,references=Noneif_res.viaelserefs,attrs=attrs_edge,)for_resin_resources_secondary+(_resource,)))# 12) node attributes that# depend on the interaction directionifnetworkinput.mark_source:attrs_node_a[networkinput.mark_source]=this_edge_dirifnetworkinput.mark_target:attrs_node_b[networkinput.mark_target]=this_edge_dir# 13) all interaction data goes into a dictnew_edge={'id_a':id_a,'id_b':id_b,'id_type_a':id_type_a,'id_type_b':id_type_b,'entity_type_a':entity_type_a,'entity_type_b':entity_type_b,'source':resource,'is_directed':this_edge_dir,'references':refs,'positive':positive,'negative':negative,'taxon_a':taxon_a,'taxon_b':taxon_b,'interaction_type':networkinput.interaction_type,'evidences':evidences,'attrs_node_a':attrs_node_a,'attrs_node_b':attrs_node_b,'attrs_edge':attrs_edge,}ifread_error:self._log('Errors occured, certain lines skipped.''Trying to read the remaining.\n',5,)edge_list.append(new_edge)iffirst_nandlen(edge_list)>=first_n:breakexceptExceptionase:self._log('Error at loading resource `%s`.'%networkinput.name)raiseeifhasattr(infile,'close'):infile.close()# 14) ID translation of edgesedge_list_mapped=self._map_list(edge_list,expand_complexes=expand_complexes,)self._log('%u lines have been read from %s, ''%u links after mapping; ''%u lines filtered by filters; ''%u lines filtered because lack of references; ''%u lines filtered by taxon filters.'%(lnum-1,networkinput.input,len(edge_list_mapped),input_filtered,ref_filtered,taxon_filtered,))ifrereadorredownload:pickle.dump(edge_list_mapped,open(edges_cache,'wb'),-1)self._log('ID translated edge list saved to 
%s'%edges_cache)else:self._log('Previously ID translated edge list ''has been loaded from `%s`.'%edges_cache)ifkeep_raw:self.raw_data[networkinput.name]=edge_list_mappedself.edge_list_mapped=edge_list_mappeddef_lookup_cache(self,name,cache_files,int_cache,edges_cache):""" Checks up the cache folder for the files of a given resource. First checks if *name* is on the *cache_files* dictionary. If so, loads either the interactions or edges otherwise. If not, checks *edges_cache* or *int_cache* otherwise. :arg str name: Name of the resource (lower-case). :arg dict cache_files: Contains the resource name(s) [str] (keys) and the corresponding cached file name [str] (values). :arg str int_cache: Path to the interactions cache file of the resource. :arg str edges_cache: Path to the edges cache file of the resource. :return: * (*file*) -- The loaded pickle file from the cache if the file is contains the interactions. ``None`` otherwise. * (*list*) -- List of mapped edges if the file contains the information from the edges. ``[]`` otherwise. """cache_files=cache_filesor{}infile=Noneedge_list_mapped=[]cache_file=cache_files[name]ifnameincache_fileselseNoneifcache_fileisnotNoneandos.path.exists(cache_file):cache_type=cache_file.split('.')[-2]ifcache_type=='interactions':infile=self.read_from_cache(int_cache)elifcache_type=='edges':edge_list_mapped=self.read_from_cache(edges_cache)elifos.path.exists(edges_cache):edge_list_mapped=self.read_from_cache(edges_cache)elifos.path.exists(int_cache):infile=self.read_from_cache(int_cache)returninfile,edge_list_mapped@classmethoddef_filters(cls,line,positive_filters=None,negative_filters=None,):""" Applies negative and positive filters on a line (record from an interaction database). If returns ``True`` the interaction will be discarded, if ``False`` the interaction will be further processed and if all other criteria fit then will be added to the network after identifier translation. 
Return (bool): True if the line should be filtered (removed), False if all filters passed, the record can be further processed. """return(cls._process_filters(line,negative_filters,False)orcls._process_filters(line,positive_filters,True))@classmethoddef_process_filters(cls,line,filters=None,negate=False):""" Args negate (bool): Whether to negate the filter matches. Sorry for the confusion, but it should be True for positive filters and False for negatives. Return (bool): True if the line should be filtered (removed), False if all filters passed, the record can be further processed. """_negate=(lambdax:notx)ifnegateelse(lambdax:x)filters=filtersor()forfiltrinfilters:if_negate(cls._process_filter(line,filtr)):returnTruereturnFalse@classmethoddef_process_filter(cls,line,filtr):""" Return (bool): True if the filter matches. """ifcallable(filtr):iffiltr(line):returnTrueelse:iflen(filtr)>2:sep=filtr[2]thisVal=set(line[filtr[0]].split(sep))else:thisVal=common.to_set(line[filtr[0]])filtrVal=common.to_set(filtr[1])returnbool(thisVal&filtrVal)def_process_sign(self,sign_data,sign_def):""" Processes the sign of an interaction, used when processing an input file. :arg str sign_data: Data regarding the sign to be processed. :arg tuple sign_def: Contains information about how to process *sign_data*. This is defined in :py:mod:`pypath.data_formats`. First element determines the position on the direction information of each line on the data file [int], second element is either [str] or [list] and defines the terms for which an interaction is defined as stimulation, third element is similar but for the inhibition and third (optional) element determines the separator for *sign_data* if contains more than one element. :return: * (*bool*) -- Determines whether the processed interaction is considered stimulation (positive) or not. * (*bool*) -- Determines whether the processed interaction is considered inhibition (negative) or not. 
"""positive=Falsenegative=Falsesign_sep=sign_def[3]iflen(sign_def)>3elseNonesign_data=sign_data.split(sign_sep)ifsign_sepelsesign_datasign_data=common.to_set(sign_data)pos=common.to_set(sign_def[1])neg=common.to_set(sign_def[2])ifbool(sign_data&pos):positive=Trueifbool(sign_data&neg):negative=Truereturnpositive,negativedef_process_direction(self,line,dir_col,dir_val,dir_sep):""" Processes the direction information of an interaction according to a data file from a source. :arg list line: The stripped and separated line from the resource data file containing the information of an interaction. :arg int dir_col: The column/position number where the information about the direction is to be found (on *line*). :arg list dir_val: Contains the terms [str] for which that interaction is to be considered directed. :arg str dir_sep: Separator for the field in *line* containing the direction information (if any). :return: (*bool*) -- Determines whether the given interaction is directed or not. """ifisinstance(dir_col,bool):returndic_colif(dir_valisNoneandisinstance(dir_col,int)andisinstance(line[dir_col],bool)):returnline[dir_col]ifdir_colisNoneordir_valisNone:returnFalseelse:value=line[dir_col].split(dir_sep)ifdir_sepelseline[dir_col]value=common.to_set(value)returnbool(value&dir_val)def_process_field(self,fmt,line):""" Extract a value from a line describing an interaction. Args fmt (str, tuple, callable): The value, or a definition how to process it. line (list): The raw interaction record. Return (str): The extracted value. 
"""ifcommon.is_str(fmt)orisinstance(fmt,list):returnfmtelifcallable(fmt):returnfmt(line)ifisinstance(fmt,int):idx,dct=fmt,{}elifisinstance(fmt,tuple):idx,dct=fmtval=line[idx]val=dct.get(val,val)returnval@staticmethoddef_process_partner(fmt,line):ifisinstance(fmt,int):partner=line[fmt]elifisinstance(fmt,tuple):idx,proc=fmtobj=lineifidxisNoneelseline[idx]partner=proc(obj)returnpartner.strip()ifhasattr(partner,'strip')elsepartnerdef_map_list(self,lst,single_list=False,expand_complexes=True,):""" Maps the names from a list of edges or items (molecules). :arg list lst: List of items or edge dictionaries whose names have to be mapped. :arg bool single_list: Optional, ``False`` by default. Determines whether the provided elements are items or edges. This is, either calls :py:meth:`pypath.main.PyPath.map_edge` or :py:meth:`pypath.main.PyPath.map_item` to map the item names. :arg bool expand_complexes: Expand complexes, i.e. create links between each member of the complex and the interacting partner. :return: (*list*) -- Copy of *lst* with their elements' names mapped. """list_mapped=[]ifsingle_list:foriteminlst:list_mapped+=self._map_item(item,expand_complexes=expand_complexes,)else:foredgeinlst:list_mapped+=self._map_edge(edge,expand_complexes=expand_complexes,)returnlist_mappeddef_map_item(self,item,expand_complexes=True):""" Translates the name in *item* representing a molecule. Default name types are defined in :py:attr:`pypath.main.PyPath.default_name_type` If the mapping is unsuccessful, the item will be added to :py:attr:`pypath.main.PyPath.unmapped` list. :arg dict item: Item whose name is to be mapped to a default name type. :arg bool expand_complexes: Expand complexes, i.e. create links between each member of the complex and the interacting partner. :return: (*list*) -- The default mapped name(s) [str] of *item*. 
"""# TODO: includedefault_id=mapping.map_name(item['name'],item['id_type'],self.default_name_types[item['type']],expand_complexes=expand_complexes,)iflen(default_id)==0:self.unmapped.append(item['name'])returndefault_iddef_map_edge(self,edge,expand_complexes=True):""" Translates the identifiers in *edge* representing an edge. Default name types are defined in :py:attr:`pypath.main.PyPath.default_name_type` If the mapping is unsuccessful, the item will be added to :py:attr:`pypath.main.PyPath.unmapped` list. :arg dict edge: Item whose name is to be mapped to a default name type. :arg bool expand_complexes: Expand complexes, i.e. create links between each member of the complex and the interacting partner. :return: (*list*) -- Contains the edge(s) [dict] with default mapped names. """edge_stack=[]defnt=self.default_name_typesdef_name_type_a=defnt.get(edge['entity_type_a'],edge['id_type_a'])def_name_type_b=defnt.get(edge['entity_type_b'],edge['id_type_b'])default_id_a=mapping.map_name(edge['id_a'],edge['id_type_a'],def_name_type_a,ncbi_tax_id=edge['taxon_a'],expand_complexes=expand_complexes,)default_id_b=mapping.map_name(edge['id_b'],edge['id_type_b'],def_name_type_b,ncbi_tax_id=edge['taxon_b'],expand_complexes=expand_complexes,)# this is needed because the possibility ambigous mapping# and expansion of complexes# one name can be mapped to multiple ones# this multiplies the nodes and edges# in case of proteins this does not happen too oftenforid_a,id_binitertools.product(default_id_a,default_id_b):this_edge=copy_mod.copy(edge)this_edge['default_name_a']=id_athis_edge['default_name_type_a']=def_name_type_athis_edge['default_name_b']=id_bthis_edge['default_name_type_b']=def_name_type_bedge_stack.append(this_edge)returnedge_stackdef_process_attrs(self,line,spec,lnum):""" Extracts the extra (custom, resource specific) attributes from a line of the input based on the given specification (defined in the network input definition). 
"""attrs={}forcolinspec.keys():# extra_edge_attrs and extra_node_attrs are dicts# of additional parameters assigned to edges and nodes# respectively;# key is the name of the parameter, value is the col number,# or a tuple of col number and the separator,# if the column contains additional subfields e.g. (5, ";")try:ifspec[col].__class__istuple:ifhasattr(spec[col][1],'__call__'):field_value=spec[col][1](line[spec[col][0]])else:field_value=line[spec[col][0]].split(spec[col][1])else:field_value=line[spec[col]]except:self._log('Wrong column index (%s) in extra attributes? ''Line #%u'%(str(col),lnum),-5,)field_name=colattrs[field_name]=field_valuereturnattrsdef_process_taxon(self,tax_dict,fields):# TODO""" """ifisinstance(tax_dict,int):returntax_dictelif'A'intax_dictand'B'intax_dict:return(self._process_taxon(tax_dict['A'],fields),self._process_taxon(tax_dict['B'],fields),)else:if'dict'notintax_dict:returnint(fields[tax_dict['col']])eliffields[tax_dict['col']]intax_dict['dict']:returntax_dict['dict'][fields[tax_dict['col']]]else:returnNonedef_match_taxon(self,tax_dict,taxon,only_default_organism=False):has_dict=isinstance(tax_dict,dict)has_include=has_dictand'include'intax_dicthas_exclude=has_dictand'exclude'intax_dictreturn((taxon==_const.NOT_ORGANISM_SPECIFIC)or(has_includeandtaxonintax_dict['include'])or(has_excludeandtaxonnotintax_dict['exclude'])or(nothas_includeandnothas_excludeand(notonly_default_organismortaxon==self.ncbi_tax_id)))def_add_edge_list(self,edge_list=False,regulator=False,only_directions=False,allow_loops=None,):""" Adds edges to the network from *edge_list* obtained from file or other input method. If none is passed, checks for such data in :py:attr:`pypath.network.Network.edge_list_mapped`. :arg str edge_list: Optional, ``False`` by default. The source name of the list of edges to be added. This must have been loaded previously (e.g.: with :py:meth:`pypath.main.PyPath.read_data_file`). 
If none is passed, loads the data directly from :py:attr:`pypath.main.PyPath.raw_data`. :arg bool regulator: Optional, ``False`` by default. If set to ``True``, non previously existing nodes, will not be added (and hence, the edges involved). """self._log('Adding preprocessed edge list to existing network.')allow_loops=self._allow_loops(allow_loops=allow_loops)ifnotedge_list:if(hasattr(self,'edge_list_mapped')andself.edge_list_mappedisnotNone):edge_list=self.edge_list_mappedelse:self._log('_add_edge_list(): No data, nothing to do.')returnTrueifisinstance(edge_list,str):ifedge_listinself.raw_data:edge_list=self.raw_data[edge_list]else:self._log('`%s` looks like a source name, but no data ''available under this name.'%edge_list)returnFalseself._filtered_loops=0prg=progress.Progress(iterable=edge_list,name='Processing interactions',)foreinprg:self._add_update_edge(e,allow_loops=allow_loops,only_directions=only_directions,)self._log('New network resource added, current number ''of nodes: %u, edges: %u.'%(self.vcount,self.ecount))ifnotallow_loops:self._log('Loop edges discarded: %u'%self._filtered_loops)delattr(self,'_filtered_loops')self.raw_data=Nonedef_add_update_edge(self,edge,allow_loops=None,only_directions=False,):""" Adds a new interaction (edge) or updates the attributes of the edge if it already exists. :arg dict edge: A dictionary describing an edge (interaction) with the following items: :item str id_a: Name of the source node of the edge to be added/updated. :item str id_b: Name of the source node of the edge to be added/updated. :item set source: Or [list], contains the names [str] of the resources supporting that edge. :item pypath.evidence.Evidence evidence: A ``pypath.evidence.Evidence`` object. :item bool is_directed: Whether if the edge is directed or not. :item set refs: Or [list], contains the instances of the references :py:class:`pypath.refs.Reference` for that edge. :item bool stim: Whether the edge is stimulatory or not. 
:item bool inh: Whether the edge is inhibitory or note :item int taxon_a: NCBI Taxonomic identifier of the source molecule. :item int taxon_b: NCBI Taxonomic identifier of the target molecule. :item str typ: The type of interaction (e.g.: ``'trascriptional'``) :item dict extra_attrs: Optional, ``{}`` by default. Contains any extra attributes for the edge to be updated. :arg bool only_directions: Optional, ``False`` by default. If set to ``True`` and the edge is not in the network, it won't be created. If it already exists the attributes of the new edge will be added to the existing one. """(id_a,id_b,id_type_a,id_type_b,entity_type_a,entity_type_b,source,evidences,is_directed,refs,positive,negative,taxon_a,taxon_b,interaction_type,extra_attrs,extra_attrs_a,extra_attrs_b,)=(edge['default_name_a'],edge['default_name_b'],edge['default_name_type_a'],edge['default_name_type_b'],edge['entity_type_a'],edge['entity_type_b'],edge['source'],edge['evidences'],edge['is_directed'],edge['references'],edge['positive'],edge['negative'],edge['taxon_a'],edge['taxon_b'],edge['interaction_type'],edge['attrs_edge'],edge['attrs_node_a'],edge['attrs_node_b'],)allow_loops=allow_loopsorself.allow_loopsrefs={refs_mod.Reference(pmid)forpmidinrefs}entity_a=entity_mod.Entity(identifier=id_a,id_type=id_type_a,entity_type=entity_type_a,taxon=taxon_a,attrs=extra_attrs_a,)entity_b=entity_mod.Entity(identifier=id_b,id_type=id_type_b,entity_type=entity_type_b,taxon=taxon_b,attrs=extra_attrs_b,)interaction=interaction_mod.Interaction(a=entity_a,b=entity_b,attrs=extra_attrs,)ifnotallow_loopsandinteraction.is_loop():self._filtered_loops+=1returnifis_directed:interaction.add_evidence(evidence=evidences,direction=(entity_a,entity_b),)else:interaction.add_evidence(evidence=evidences,direction='undirected',)# setting 
signs:ifpositive:interaction.add_evidence(evidence=evidences,direction=(entity_a,entity_b),effect=1,)ifnegative:interaction.add_evidence(evidence=evidences,direction=(entity_a,entity_b),effect=-1,)ifis_directedandnotpositiveandnotnegative:interaction.add_evidence(evidence=evidences,direction=(entity_a,entity_b),effect=0,)self.add_interaction(interaction,attrs=extra_attrs,only_directions=only_directions,)
    def organisms_check(
            self,
            organisms = None,
            remove_mismatches = True,
            remove_nonspecific = False,
        ):
        """
        Scans the network for one or more organisms and removes the nodes
        and interactions which belong to any other organism.

        :arg int,set,NoneType organisms:
            One or more NCBI Taxonomy IDs. If ``None`` the value in
            :py:attr:`ncbi_tax_id` will be used. If that's too is ``None``
            then only the entities with discrepancy between their stated
            organism and their identifier.
        :arg bool remove_mismatches:
            Remove the entities where their ``identifier`` can not be found
            in the reference list from the database for their ``taxon``.
        :arg bool remove_nonspecific:
            Remove the entities with taxonomy ID zero, which is used to
            represent the non taxon specific entities such as metabolites
            or drug compounds.
        """

        self._log(
            'Checking organisms. %u nodes and %u interactions before.' % (
                self.vcount,
                self.ecount,
            )
        )

        organisms = common.to_set(organisms or self.ncbi_tax_id)
        to_remove = set()

        for node in self.nodes.values():

            # wrong organism: stated taxon differs from the requested ones
            if (
                organisms and
                node.taxon != _const.NOT_ORGANISM_SPECIFIC and
                node.taxon not in organisms
            ):
                to_remove.add(node)

            # identifier not in the reference list for the stated taxon
            # (entity types without reference lists are exempt), or
            # non organism specific entity while those are not wanted
            if (
                (
                    remove_mismatches and
                    not node.entity_type in {
                        'complex',
                        'lncrna',
                        'drug',
                        'small_molecule',
                    } and
                    not reflists.check(
                        name = node.identifier,
                        id_type = node.id_type,
                        ncbi_tax_id = node.taxon,
                    )
                ) or (
                    remove_nonspecific and
                    not node.taxon
                )
            ):
                to_remove.add(node)

        for node in to_remove:

            self.remove_node(node)

        self._log(
            'Finished checking organisms. '
            '%u nodes have been removed, '
            '%u nodes and %u interactions remained.' % (
                len(to_remove),
                self.vcount,
                self.ecount,
            )
        )
[docs]defget_organisms(self):""" Returns the set of all NCBI Taxonomy IDs occurring in the network. """return{n.taxonforninself.nodes.values()}
    def make_df(
            self,
            records = None,
            by_source = None,
            with_references = None,
            columns = None,
            dtype = None,
        ):
        """
        Creates a ``pandas.DataFrame`` from the interactions.

        The result is stored in :py:attr:`df`; the raw records and the
        dtype mapping used are kept in :py:attr:`records` and
        :py:attr:`dtype`. Arguments left as ``None`` fall back to the
        corresponding ``df_*`` attributes of this object.
        """

        self._log('Creating interactions data frame.')

        by_source = by_source if by_source is not None else self.df_by_source
        with_references = (
            with_references
                if with_references is not None else
            self.df_with_references
        )
        columns = columns or self.df_columns
        dtype = dtype or self.df_dtype

        if not dtype:

            # categoricals save memory; `sources` and `dmodel` are only
            # categorical when one row per source is emitted
            dtype = {
                'id_a': 'category',
                'id_b': 'category',
                'type_a': 'category',
                'type_b': 'category',
                'effect': 'int8',
                'type': 'category',
                'dmodel': 'category' if by_source else 'object',
                'sources': 'category' if by_source else 'object',
                'references': 'object' if with_references else 'category',
            }

        if not records:

            records = self.generate_df_records(
                by_source = by_source,
                with_references = with_references,
            )

        if not isinstance(records, (list, tuple, np.ndarray)):

            records = list(records)

        # NOTE(review): `records[0]` raises IndexError on an empty
        # network when no columns are provided -- confirm acceptable
        if not columns and hasattr(records[0], '_fields'):

            columns = records[0]._fields

        self.records = records
        self.dtype = dtype

        self.df = pd.DataFrame(
            records,
            columns = columns,
        )

        ### why?
        if dtype:

            self.df = self.df.astype(dtype)

        self._log(
            'Interaction data frame ready. '
            'Memory usage: %s ' % common.df_memory_usage(self.df)
        )
[docs]@classmethoddeffrom_igraph(cls,pa,**kwargs):""" Creates an instance from an ``igraph.Graph`` based ``pypath.main.PyPath`` object. :arg pypath.main.PyPath pa: A ``pypath.main.PyPath`` object with network data loaded. """obj=cls(**kwargs)foriainpa.graph.es['attrs']:obj.add_interaction(ia)returnobj
    def add_interaction(
            self,
            interaction,
            attrs = None,
            only_directions = False,
        ):
        """
        Adds a ready ``pypath.interaction.Interaction`` object to the
        network. If an interaction between the two endpoints already
        exists, the interactions will be merged: this stands for the
        directions, signs, evidences and other attributes.

        :arg interaction.Interaction interaction:
            A ``pypath.interaction.Interaction`` object.
        :arg NoneType,dict attrs:
            Optional, a dictionary of extra (usually resource specific)
            attributes.
        :arg bool only_directions:
            If the interaction between the two endpoints does not exist it
            won't be added to the network. Otherwise all attributes
            (direction, effect sign, evidences, etc) will be merged to the
            existing interaction. Apart from the endpoints also the
            ``interaction_type`` of the existing interaction has to match
            the interaction added here.
        """

        attrs = attrs or {}

        key = (interaction.a, interaction.b)

        if key not in self.interactions:

            if only_directions:

                # no pre-existing interaction to update: do nothing
                return

            else:

                self.interactions[key] = interaction

        else:

            if only_directions:

                if (
                    self.interactions[key].get_interaction_types() &
                    interaction.get_interaction_types()
                ):

                    # keep only the interaction types already present
                    # in the existing interaction
                    for itype_to_remove in (
                        interaction.get_interaction_types() -
                        self.interactions[key].get_interaction_types()
                    ):

                        interaction.unset_interaction_type(itype_to_remove)

                else:

                    # no shared interaction type: nothing to merge
                    return

            self.interactions[key] += interaction

        self.interactions[key].update_attrs(**attrs)

        # `add = False` here: in `only_directions` mode we only update
        # already existing nodes
        self.add_node(interaction.a, add = not only_directions)
        self.add_node(interaction.b, add = not only_directions)

        self.interactions_by_nodes[interaction.a].add(key)
        self.interactions_by_nodes[interaction.b].add(key)
[docs]defadd_node(self,entity,attrs=None,add=True):""" Adds a molecular entity to the py:attr:``nodes`` and py:attr:``nodes_by_label`` dictionaries. :arg entity.Entity entity: An object representing a molecular entity. :arg NoneType,dict attrs: Optional extra attributes to be assigned to the entity. :arg bool add: Whether to add a new molecular entity to the network if it does not exist yet. If ``False`` will only update attributes for existing entities otherwise will do nothing. """ifattrs:entity.update_attrs(**attrs)ifentity.identifierinself.nodes:self.nodes[entity.identifier]+=entityelifadd:self.nodes[entity.identifier]=entityself.nodes_by_label[entity.labelorentity.identifier]=entity
[docs]defremove_node(self,entity):""" Removes a node with all its interactions. If the removal of the interactions leaves any of the partner nodes without interactions it will be removed too. :arg str,Entity entity: A molecular entity identifier, label or ``Entity`` object. """entity=self.entity(entity)ifnotentity:return_=self.nodes.pop(entity.identifier,None)_=self.nodes_by_label.pop(entity.label,None)ifentityinself.interactions_by_nodes:partners=set()fori_keyinself.interactions_by_nodes[entity].copy():self.remove_interaction(*i_key)_=self.interactions_by_nodes.pop(entity,None)
    def remove_interaction(self, entity_a, entity_b):
        """
        Removes the interaction between two nodes if exists.

        :arg str,Entity entity_a,entity_b:
            A pair of molecular entity identifiers, labels or ``Entity``
            objects.
        """

        entity_a = self.entity(entity_a)
        entity_b = self.entity(entity_b)

        # the interaction may be stored under either key order
        key_ab = (entity_a, entity_b)
        key_ba = (entity_b, entity_a)

        _ = self.interactions.pop(key_ab, None)
        _ = self.interactions.pop(key_ba, None)

        keys = {key_ab, key_ba}
        # NOTE(review): assumes `interactions_by_nodes` behaves like a
        # defaultdict(set); with a plain dict a missing entity raises
        # KeyError here -- confirm
        self.interactions_by_nodes[entity_a] -= keys
        self.interactions_by_nodes[entity_b] -= keys

        # drop endpoints left without any interaction
        if (
            entity_a in self.interactions_by_nodes and
            not self.interactions_by_nodes[entity_a]
        ):

            self.remove_node(entity_a)

        if (
            entity_b in self.interactions_by_nodes and
            not self.interactions_by_nodes[entity_b]
        ):

            self.remove_node(entity_b)
[docs]defremove_zero_degree(self):""" Removes all nodes with no interaction. """self._log('Removing zero degree nodes. ''%u nodes and %u interactions before.'%(self.vcount,self.ecount,))to_remove=set()fornode,interactionsiniteritems(self.interactions_by_nodes):ifnotinteractions:to_remove.add(node)fornodeinto_remove:self.remove_node(node)self._log('Finished removing zero degree nodes. ''%u nodes have been removed, ''%u nodes and %u interactions remained.'%(len(to_remove),self.vcount,self.ecount,))
[docs]defremove_loops(self):""" Removes the loop interactions from the network i.e. the ones with their two endpoints being the same entity. """self._log('Removing loop edges. Number of edges before: %u.'%len(self))foriainlist(self):ifia.is_loop():self.remove_interaction(ia.a,ia.b)self._log('Removed loop edges. Number of edges after: %u.'%len(self))
@propertydefresources(self):""" Returns a set of all resources. """returnset.union(*(ia.get_resources()foriainself))@propertydefresource_names(self):""" Returns a set of all resource names. """returnset.union(*(ia.get_resource_names()foriainself))
[docs]defentities_by_resource(self):""" Returns a dict of sets with resources as keys and sets of entity IDs as values. """returndict((resource,set(itertools.chain(*self.df[[resourceinresourcesforresourcesinself.df.sources]][['id_a','id_b']].values)))forresourceinself.resources)
[docs]defentity_by_id(self,identifier):""" Returns a ``pypath.entity.Entity`` object representing a molecular entity by looking it up by its identifier. If the molecule does not present in the current network ``None`` will be returned. :arg str identifier: The identifier of a molecular entity. Unless it's been set otherwise for genes/proteins it is the UniProt ID. E.g. ``'P00533'``. """ifidentifierinself.nodes:returnself.nodes[identifier]
[docs]defentity_by_label(self,label):""" Returns a ``pypath.entity.Entity`` object representing a molecular entity by looking it up by its label. If the molecule does not present in the current network ``None`` will be returned. :arg str label: The label of a molecular entity. Unless it's been set otherwise for genes/proteins it is the Gene Symbol. E.g. ``'EGFR'``. """iflabelinself.nodes_by_label:returnself.nodes_by_label[label]
[docs]definteraction(self,a,b):""" Retrieves the interaction `a --> b` if it exists in the network, otherwise `b --> a`. If no interaction exist between `a` and `b` returns `None`. """entity_a=self.entity(a)entity_b=self.entity(b)key_ab=(entity_a,entity_b)key_ba=(entity_b,entity_a)ifkey_abinself.interactions:returnself.interactions[key_ab]elifkey_bainself.interactions:returnself.interactions[key_ba]
    def random_interaction(self, **kwargs):
        """
        Picks a random interaction from the network.

        Returns
            An Interaction object, or None if the network is empty.
        """

        key = None

        keys = (
            self.get_interactions(**kwargs)
                if kwargs else
            self.interactions.keys()
        )

        # advance a random number of steps in the key iterator;
        # `key` ends up holding the element where we stopped
        for _, key in zip(range(random.randint(0, len(self)) + 1), keys):

            pass

        if key:

            # NOTE(review): assumes interaction keys are stored with the
            # entities sorted by identifier; otherwise this lookup could
            # raise KeyError -- confirm
            key = tuple(sorted(key, key = lambda e: e.identifier))

        return self.interactions[key] if key else None
    def interaction_by_id(self, id_a, id_b):
        """
        Returns a ``pypath.interaction.Interaction`` object by looking it
        up based on a pair of identifiers. If the interaction does not
        exist in the network ``None`` will be returned.

        :arg str id_a:
            The identifier of one of the partners in the interaction.
            Unless it's been set otherwise for genes/proteins it is the
            UniProt ID. E.g. ``'P00533'``.
        :arg str id_b:
            The other partner, similarly to ``id_a``. The order of the
            partners does not matter here.
        """

        # thin wrapper: the lookup logic lives in `_get_interaction`
        return self._get_interaction(id_a, id_b)
    def interaction_by_label(self, label_a, label_b):
        """
        Returns a ``pypath.interaction.Interaction`` object by looking it
        up based on a pair of labels. If the interaction does not exist in
        the network ``None`` will be returned.

        :arg str label_a:
            The label of one of the partners in the interaction. Unless
            it's been set otherwise for genes/proteins it is the Gene
            Symbol. E.g. ``'EGFR'``.
        :arg str label_b:
            The other partner, similarly to ``label_a``. The order of the
            partners does not matter here.
        """

        # same as `interaction_by_id` but matching on labels
        return self._get_interaction(label_a, label_b, name_type = 'label')
    def to_igraph(self):
        """
        Converts the network to the legacy ``igraph.Graph`` based
        ``PyPath`` object.

        :raises NotImplementedError:
            This conversion is not implemented yet; use ``from_igraph``
            for the opposite direction.
        """

        raise NotImplementedError
    def save_to_pickle(self, pickle_file):
        """
        Saves the network to a pickle file.

        Only the ``interactions``, ``nodes`` and ``nodes_by_label``
        dictionaries are serialized; the by-node interaction index is
        rebuilt at load time.

        :arg str pickle_file:
            Path to the pickle file.
        """

        self._log('Saving to pickle `%s`.' % pickle_file)

        with open(pickle_file, 'wb') as fp:

            pickle.dump(
                obj = (
                    self.interactions,
                    self.nodes,
                    self.nodes_by_label,
                ),
                file = fp,
            )

        # NOTE(review): rebuilding the index after *saving* looks
        # unnecessary (the network itself is unchanged) -- confirm
        # whether this was meant only for `load_from_pickle`
        self._update_interactions_by_nodes()

        self._log('Saved to pickle `%s`.' % pickle_file)
    def load_from_pickle(self, pickle_file):
        """
        Loads the network from a pickle file.

        :arg str pickle_file:
            Path to the pickle file.
        """

        self._log('Loading from pickle `%s`.' % pickle_file)

        with open(pickle_file, 'rb') as fp:

            (
                self.interactions,
                self.nodes,
                self.nodes_by_label,
            ) = pickle.load(fp)

        # the by-node interaction index is not pickled, rebuild it
        self._update_interactions_by_nodes()

        self._log('Loaded from pickle `%s`.' % pickle_file)
[docs]@classmethoddeffrom_pickle(cls,pickle_file:str,**kwargs):""" Initializes a new ``Network`` object by loading it from a pickle file. Returns a ``Network`` object. Args pickle_file: Path to a pickle file. kwargs: Passed to ``Network.__init__``. """new=cls(pickle_file=pickle_file,**kwargs)returnnew
    def extra_directions(
            self,
            resources = 'extra_directions',
            use_laudanna = False,
            use_string = False,
            dataset = 'directionextra',
        ):
        """
        Adds additional direction & effect information from resources
        having no literature curated references, but giving sufficient
        evidence about the directionality for interactions already
        supported by literature evidences from other sources.
        """

        # a string is looked up as an attribute of `network_resources`
        resources = (
            getattr(network_resources, resources)
                if isinstance(resources, str) else
            list(resources)
        )

        if use_laudanna:

            resources.append(
                network_resources.pathway_bad['laudanna_effects']
            )
            resources.append(
                network_resources.pathway_bad['laudanna_directions']
            )

        if use_string:

            # placeholder: STRING based directions not implemented
            pass

        resources = resource_formats.NetworkDataset(
            name = dataset,
            resources = resources,
        )

        # `only_directions = True`: only updates existing interactions,
        # never creates new ones
        self.load(resources = resources, only_directions = True)
    @staticmethod
    def omnipath_resources(
            omnipath = None,
            kinase_substrate_extra = False,
            ligand_receptor_extra = False,
            pathway_extra = False,
            old_omnipath_resources = False,
            exclude = None,
        ) -> list[resource_formats.NetworkResource]:
        """
        Compiles the list of resources for the OmniPath core network,
        optionally extended by the pathway, ligand-receptor and
        enzyme-substrate extra datasets with relaxed literature reference
        requirements.
        """

        def reference_constraints(resources, data_model, relax = True):
            # collects the resources of one data model, setting (or
            # lifting, when `relax`) their literature reference requirement
            result = []

            resources = (
                resources.values()
                    if isinstance(resources, dict) else
                resources
            )
            # deep copy: we mutate `must_have_references` below
            resources = copy_mod.deepcopy(resources)

            for res in resources:

                if res.data_model == data_model:

                    res.networkinput.must_have_references = not relax
                    result.append(res)

            return result

        omnipath = omnipath or copy_mod.deepcopy(network_resources.omnipath)
        exclude = common.to_set(exclude)

        if old_omnipath_resources:

            # reproduce the resource set of the early OmniPath releases
            interaction_resources = (
                copy_mod.deepcopy(network_resources.interaction)
            )
            omnipath = copy_mod.deepcopy(omnipath)
            omnipath['biogrid'] = interaction_resources['biogrid']
            omnipath['alz'] = interaction_resources['alz']
            omnipath['netpath'] = interaction_resources['netpath']
            exclude.update({'IntAct', 'HPRD'})

        else:

            omnipath['huri'] = copy_mod.deepcopy(
                network_resources.interaction_misc['huri']
            )

        omnipath = list(omnipath.without(exclude))

        for dataset, data_model, enabled in (
            ('pathwayextra', 'activity_flow', pathway_extra),
            ('ligrecextra', 'ligand_receptor', ligand_receptor_extra),
            ('kinaseextra', 'enzyme_substrate', kinase_substrate_extra),
        ):

            if enabled:

                extra = list(
                    resource_formats.NetworkDataset(
                        name = dataset,
                        resources = reference_constraints(
                            omnipath,
                            data_model,
                        ),
                    )
                )
                omnipath.extend(extra)

        return omnipath


    def load_omnipath(
            self,
            omnipath = None,
            kinase_substrate_extra = False,
            ligand_receptor_extra = False,
            pathway_extra = False,
            extra_directions = True,
            remove_htp = False,
            htp_threshold = 1,
            keep_directed = True,
            remove_undirected = True,
            min_refs_undirected = None,
            min_resources_undirected = 2,
            old_omnipath_resources = False,
            exclude = None,
            pickle_file = None,
            allow_loops = None,
        ):
        """
        Loads the OmniPath network into this object, optionally with the
        extra datasets, direction information and high-throughput /
        undirected interaction filtering. If `pickle_file` is provided the
        ready network is loaded from there instead.
        """

        self._log('Loading the `OmniPath` network.')

        if pickle_file:

            self.load(pickle_file = pickle_file)
            return

        omnipath = self.omnipath_resources(
            omnipath = omnipath,
            kinase_substrate_extra = kinase_substrate_extra,
            ligand_receptor_extra = ligand_receptor_extra,
            pathway_extra = pathway_extra,
            old_omnipath_resources = old_omnipath_resources,
            exclude = exclude,
        )

        self.load(omnipath, exclude = exclude, allow_loops = allow_loops)

        for dataset, label, enabled in (
            ('pathwayextra', 'activity flow', pathway_extra),
            ('ligrecextra', 'ligand-receptor', ligand_receptor_extra),
            ('kinaseextra', 'enzyme-PTM', kinase_substrate_extra),
        ):

            if enabled:

                self._log(f'Loading extra {label} interactions.')

                self.load(
                    getattr(network_resources, dataset).rename(dataset),
                    exclude = exclude,
                )

        if extra_directions:

            self.extra_directions()

        if remove_htp:

            self.remove_htp(
                threshold = htp_threshold,
                keep_directed = keep_directed,
            )

        if remove_undirected:

            self.remove_undirected(
                min_refs = min_refs_undirected,
                min_resources = min_resources_undirected,
            )

        self._log('Finished loading the `OmniPath` network.')


    def remove_htp(self, threshold = 50, keep_directed = False):
        """
        Removes the interactions supported only by high-throughput
        references (ones cited at more interactions than `threshold`),
        optionally keeping the directed ones.
        """

        self._log(
            'Removing high-throughput interactions above threshold %u'
            ' interactions per reference. Directed interactions %s.' % (
                threshold,
                'will be kept' if keep_directed else 'also will be removed'
            )
        )

        to_remove = self.htp_interactions(
            threshold = threshold,
            ignore_directed = keep_directed,
        )

        ecount_before = self.ecount
        vcount_before = self.vcount

        for key in to_remove:

            self.remove_interaction(*key)

        self._log(
            'Interactions with only high-throughput references '
            'have been removed. %u interactions removed. '
            'Number of edges decreased from %u to %u, '
            'number of nodes from %u to %u.' % (
                len(to_remove),
                ecount_before,
                self.ecount,
                vcount_before,
                self.vcount,
            )
        )
[docs]defhtp_references(self,threshold=50):""" Collects the high-throughput references i.e. the ones cited at a higher number of interactions than ``threshold``. """interactions_per_reference=self.numof_interactions_per_reference()htp_refs={refforref,cntiniteritems(interactions_per_reference)ifcnt>threshold}self._log('High-throughput references collected: %u'%len(htp_refs))returnhtp_refs
[docs]defhtp_interactions(self,threshold=50,ignore_directed=False):""" Collects the interactions only from high-throughput studies. :returns: Set of interaction keys (tuples of entities). """htp_refs=self.htp_references(threshold=threshold)htp_int=set()forkey,iainiteritems(self.interactions):if((notignore_directedornotia.is_directed())andnotia.get_references()-htp_refs):htp_int.add(key)self._log('High-throughput interactions collected: %u'%len(htp_int))returnhtp_int
defremove_undirected(self,min_refs=None,min_resources=None):self._log('Removing undirected interactions%s%s%s.'%((' with less than %u references'%min_refs)ifmin_refselse'',' and'ifmin_refsandmin_resourceselse'',(' with less than %u resources '%min_resources),))ecount_before=self.ecountvcount_before=self.vcountto_remove=set()forkey,iainiteritems(self.interactions):if(notia.is_directed()and(notmin_refsoria.count_references()<min_refs)and(notmin_resourcesoria.count_resource_names()<min_resources)):to_remove.add(key)forkeyinto_remove:self.remove_interaction(*key)self._log('Undirected interactions %s have been removed. ''%u interactions removed. Number of edges ''decreased from %u to %u, number of vertices ''from %u to %u.'%(''ifmin_refsisNoneelse'with less than %u references'%min_refs,len(to_remove),ecount_before,self.ecount,vcount_before,self.vcount,))
[docs]defnumof_interactions_per_reference(self):""" Counts the number of interactions for each literature reference. Returns a ``collections.Counter`` object (similar to ``dict``). """returncollections.Counter(itertools.chain(*(ia.get_references()foriainself)))
[docs]definteractions_by_reference(self):""" Creates a ``dict`` with literature references as keys and interactions described by each reference as values. """interactions_by_reference=collections.defaultdict(set)fori_key,iainiteritems(self.interactions):forrefinia.get_references():interactions_by_reference[ref].add(i_key)returndict(interactions_by_reference)
## Methods for loading specific datasets or initializing the object# with loading datasets#@classmethoddefomnipath(cls,omnipath=None,kinase_substrate_extra=False,ligand_receptor_extra=False,pathway_extra=False,extra_directions=True,remove_htp=False,htp_threshold=1,keep_directed=True,min_refs_undirected=2,old_omnipath_resources=False,exclude=None,ncbi_tax_id=9606,**kwargs):make_df=kwargs.pop('make_df',None)new=cls(ncbi_tax_id=ncbi_tax_id,**kwargs)new.load_omnipath(omnipath=omnipath,kinase_substrate_extra=kinase_substrate_extra,ligand_receptor_extra=ligand_receptor_extra,pathway_extra=pathway_extra,extra_directions=extra_directions,remove_htp=remove_htp,htp_threshold=htp_threshold,keep_directed=keep_directed,min_refs_undirected=min_refs_undirected,old_omnipath_resources=old_omnipath_resources,exclude=exclude,)ifmake_df:cls.make_df()returnnew@staticmethoddefdorothea_resources(levels=None,expand_levels=None):expand_levels=(expand_levelsifisinstance(expand_levels,bool)elsesettings.get('dorothea_expand_levels'))dorothea=copy_mod.deepcopy(network_resources.transcription_dorothea)iflevels:dorothea['dorothea'].networkinput.input_args['levels']=levelsdorothea=(network_resources.dorothea_expand_levels(dorothea,levels=levels)ifexpand_levelselsedorothea)dorothea=dorothea.rename('dorothea')returndorotheadefload_dorothea(self,levels=None,expand_levels=None,**kwargs):dorothea=self.dorothea_resources(levels=levels,expand_levels=expand_levels,)self.load(dorothea,**kwargs)
[docs]@classmethoddefdorothea(cls,levels=None,ncbi_tax_id=9606,**kwargs):""" Initializes a new ``Network`` object with loading the transcriptional regulation network from DoRothEA. :arg NontType,set levels: The confidence levels to include. """make_df=kwargs.pop('make_df',False)new=cls(ncbi_tax_id=ncbi_tax_id,**kwargs)new.load_dorothea(levels=levels,make_df=make_df)returnnew
[docs]@classmethoddefcollectri(cls,ncbi_tax_id=9606,**kwargs):""" Initializes a new ``Network`` object with loading the transcriptional regulation network from CollecTRI. """make_df=kwargs.pop('make_df',False)new=cls(ncbi_tax_id=ncbi_tax_id,**kwargs)new.load_collectri(make_df=make_df)returnnew
    @classmethod
    def transcription(
            cls,
            dorothea = True,
            original_resources = True,
            dorothea_levels = None,
            exclude = None,
            reread = False,
            redownload = False,
            make_df = False,
            ncbi_tax_id = 9606,
            allow_loops = None,
            **kwargs
        ):
        """
        Initializes a new ``Network`` object with loading a transcriptional
        regulation network from all databases by default.

        Args
            kwargs:
                Passed to ``Network.__init__``.
        """

        # capture the named arguments for `load_transcription`;
        # must happen before any local variable is created
        load_args = locals()

        # `kwargs` goes to the constructor, not to `load_transcription`
        kwargs = load_args.pop('kwargs')
        ncbi_tax_id = load_args.pop('ncbi_tax_id')
        kwargs['ncbi_tax_id'] = ncbi_tax_id
        cls = load_args.pop('cls')

        new = cls(**kwargs)
        new.load_transcription(**load_args)

        return new
[docs]@classmethoddefmirna_target(cls,resources=None,make_df=None,reread=False,redownload=False,exclude=None,ncbi_tax_id=9606,**kwargs):""" Initializes a new ``Network`` object with loading a miRNA-mRNA regulation network from all databases by default. Args kwargs: Passed to ``Network.__init__``. """new=cls(ncbi_tax_id=ncbi_tax_id,**kwargs)new.load_mirna_target(exclude=exclude,make_df=make_df,reread=reread,redownload=redownload,)returnnew
## Methods for querying partners by node#
    def partners(
            self,
            entity,
            mode = 'ALL',
            direction: bool | tuple | None = None,
            effect: bool | str | None = None,
            resources: str | set[str] | None = None,
            interaction_type: str | set[str] | None = None,
            data_model: str | set[str] | None = None,
            via: bool | str | set[str] | None = None,
            references: bool | str | set[str] | None = None,
            return_interactions: bool = False,
        ):
        """
        :arg str,Entity,list,set,tuple,EntityList entity:
            An identifier or label of a molecular entity or an
            :py:class:`Entity` object. Alternatively an iterator with the
            elements of any of the types valid for a single entity
            argument, e.g. a list of gene symbols.
        :arg str mode:
            Mode of counting the interactions: `IN`, `OUT` or `ALL` ,
            whether to consider incoming, outgoing or all edges,
            respectively, respective to the `node defined in `entity``.

        :returns:
            :py:class:`EntityList` object containing the partners having
            interactions to the queried node(s) matching all the criteria.
            If ``entity`` doesn't present in the network the returned
            ``EntityList`` will be empty just like if no interaction
            matches the criteria.
        """

        # multiple entities: recurse into each and collect the union;
        # `locals()` must be captured before creating new local names
        if (
            not common.is_str(entity) and
            not hasattr(entity, 'identifier') and
            hasattr(entity, '__iter__')
        ):

            kwargs = locals()
            _ = kwargs.pop('self')
            _ = kwargs.pop('entity')
            _ = kwargs.pop('return_interactions')

            return entity_mod.EntityList(
                set(
                    itertools.chain(
                        *(
                            self.partners(_entity, **kwargs)
                            for _entity in entity
                        )
                    )
                )
            )

        entity = self.entity(entity)

        # we need to swap it to make it work relative to the queried entity
        _mode = (
            'IN'
                if mode == 'OUT' else
            'OUT'
                if mode == 'IN' else
            'ALL'
        )

        return (
            entity_mod.EntityList(
                {
                    partner
                    for ia in self.interactions_by_nodes[entity]
                    for partner in self.interactions[ia].get_degrees(
                        mode = _mode,
                        direction = direction,
                        effect = effect,
                        resources = resources,
                        interaction_type = interaction_type,
                        data_model = data_model,
                        via = via,
                        references = references,
                    )
                    # exclude the queried entity itself except for loops
                    if partner != entity or self.interactions[ia].is_loop()
                }
                if entity in self.interactions_by_nodes else
                ()
            )
        )
[docs]defcount_partners(self,entity,**kwargs):""" Returns the count of the interacting partners for one or more entities according to the specified criteria. Please refer to the docs of the ``partners`` method. """returnlen(self.partners(entity=entity,**kwargs))
@classmethod
def _generate_partners_methods(cls):
    """
    Generates the family of convenience ``partners``/``count_partners``
    methods (one per combination in ``cls._partners_methods``) and
    attaches them to the class.

    Each generated method is a thin wrapper around ``partners`` or
    ``count_partners`` with a fixed set of filtering arguments.
    """

    def _create_partners_method(method_args):
        # one wrapper bound to a fixed set of filter arguments;
        # `count` selects between the counting and collecting variants
        count = method_args.pop('count')
        method = 'count_partners' if count else 'partners'

        # NOTE: the original code decorated this closure with
        # `functools.wraps(method_args)`, i.e. `wraps` applied to a dict —
        # a misuse with no effect, as `__name__` and `__doc__` are set
        # explicitly below; the bogus decorator has been removed.
        def _partners_method(*args, **kwargs):

            self = args[0]
            kwargs.update(method_args)

            return getattr(self, method)(*args[1:], **kwargs)

        _partners_method.__doc__ = getattr(cls, method).__doc__

        return _partners_method

    # each product element is a set of (name-fragment, args) pairs;
    # zip(*param) separates the name fragments from the argument dicts
    for name_parts, arg_parts in (
        zip(*param)
        for param in itertools.product(
            *(iteritems(variety) for variety in cls._partners_methods)
        )
    ):

        for count in (False, True):

            method_args = dict(
                itertools.chain(
                    *(iteritems(part) for part in arg_parts)
                )
            )
            method_name = ''.join(name_parts)
            method_name = (
                'count_%s' % method_name
                    if count else
                method_name
            )
            method_args['count'] = count
            method = _create_partners_method(method_args)
            method.__name__ = method_name

            setattr(
                cls,
                method_name,
                method,
            )


#
# Methods for selecting paths and motives in the network
#
def find_paths(
        self,
        start: (
            str |
            entity_mod.Entity |
            entity_mod.EntityList |
            collections.abc.Iterable[str | entity_mod.Entity]
        ),
        end: (
            str |
            entity_mod.Entity |
            entity_mod.EntityList |
            collections.abc.Iterable[str | entity_mod.Entity] |
            None
        ) = None,
        loops: bool = False,
        mode: Literal['OUT', 'IN', 'ALL'] = 'OUT',
        maxlen: int = 2,
        minlen: int = 1,
        direction: bool | tuple | None = None,
        effect: bool | str | None = None,
        resources: str | set[str] | None = None,
        interaction_type: str | set[str] | None = None,
        data_model: str | set[str] | None = None,
        via: bool | str | set[str] | None = None,
        references: bool | str | set[str] | None = None,
        silent: bool = False,
    ):
    """
    Find paths or motifs in a network.

    Finds all paths up to length ``maxlen`` between groups of nodes.
    In addition is able to search for motifs or select the nodes of a
    subnetwork around certain nodes.

    Args
        start:
            Starting node(s) of the paths.
        end:
            Target node(s) of the paths. If ``None`` any target node
            will be accepted and all paths from the starting nodes with
            length ``maxlen`` will be returned.
        loops:
            Search for loops, i.e. the start and end nodes of each
            path should be the same.
        mode:
            Direction of the paths. ``'OUT'`` means from ``start`` to
            ``end``, ``'IN'`` the opposite direction while ``'ALL'``
            both directions.
        maxlen:
            Maximum length of paths in steps, i.e. if maxlen = 3, then
            the longest path may consist of 3 edges and 4 nodes.
        minlen:
            Minimum length of the path.
        silent:
            Indicate progress by showing a progress bar.

    Details
        The arguments: ``direction``, ``effect``, ``resources``,
        ``interaction_type``, ``data_model``, ``via`` and
        ``references`` will be passed to the ``partners`` method of
        this object and from there to the relevant methods of the
        ``Interaction`` and ``Evidence`` objects. By these arguments it
        is possible to filter the interactions in the paths according
        to custom criteria. If any of these arguments is a ``tuple`` or
        ``list``, its first value will be used to match the first
        interaction in the path, the second for the second one and so
        on. If the list or tuple is shorter then ``maxlen``, its last
        element will be used for all interactions. If it's longer than
        ``maxlen``, the remaining elements will be discarded. This way
        the method is able to search for custom motives. For example,
        let's say you want to find the motives where the estrogen
        receptor transcription factor *ESR1* transcriptionally
        regulates a gene encoding a protein which then has some effect
        post-translationally on *ESR1*:

    Examples
        n.find_paths(
            'ESR1',
            loops = True,
            minlen = 2,
            interaction_type = ('transcriptional', 'post_translational'),
        )

        # Or if you are interested only in the -/+ feedback loops i.e.
        # *ESR1 --(-)--> X --(+)--> ESR1*:

        n.find_paths(
            'ESR1',
            loops = True,
            minlen = 2,
            interaction_type = ('transcriptional', 'post_translational'),
            effect = ('negative', 'positive'),
        )
    """

    def list_of_entities(entities):
        # normalize a single identifier or Entity to a one element
        # tuple, then map everything to Entity objects
        entities = (
            (entities,)
            if isinstance(entities, (str, entity_mod.Entity)) else
            entities
        )

        entities = [self.entity(en) for en in entities]

        return entities


    def interaction_arg(value):
        # expand a scalar or short sequence of per-step filter values
        # to a tuple of exactly `maxlen` elements (last value repeated)
        value = (
            tuple(value)
            if isinstance(value, (tuple, list)) else
            (value,)
        )

        value = value + (value[-1],) * (maxlen - len(value))
        value = value[:maxlen]

        return value


    def find_all_paths_aux(start, end, path, maxlen = None):
        # recursive depth-first enumeration of paths;
        # a path is accepted once it is long enough and reaches
        # the end node, any node (end is None), or closes a loop
        path = path + [start]

        if (
            len(path) >= minlen + 1 and
            (
                start == end or
                (
                    end is None and
                    not loops and
                    len(path) == maxlen + 1
                ) or
                (
                    loops and
                    path[0] == path[-1]
                )
            )
        ):

            return [path]

        paths = []

        if len(path) <= maxlen:

            next_steps = set(
                self.partners(
                    entity = start,
                    # filters specific to this step of the path
                    **interaction_args[len(path) - 1]
                )
            )

            next_steps = next_steps if loops else next_steps - set(path)

            for node in next_steps:

                paths.extend(find_all_paths_aux(node, end, path, maxlen))

        return paths


    minlen = max(1, minlen)
    start = list_of_entities(start)
    end = list_of_entities(end) if end else (None,)

    interaction_args = {
        'mode': interaction_arg(mode),
        'direction': interaction_arg(direction),
        'effect': interaction_arg(effect),
        'resources': interaction_arg(resources),
        'interaction_type': interaction_arg(interaction_type),
        'data_model': interaction_arg(data_model),
        'via': interaction_arg(via),
        'references': interaction_arg(references),
    }
    # transpose: one dict of filter arguments per path step
    interaction_args = tuple(
        dict(
            (key, interaction_args[key][i])
            for key in interaction_args.keys()
        )
        for i in range(maxlen)
    )

    all_paths = []

    if not silent:

        prg = progress.Progress(
            len(start) * len(end),
            'Looking up all paths up to length %u' % maxlen,
            1,
        )

    for s in start:

        for e in end:

            if not silent:

                prg.step()

            all_paths.extend(find_all_paths_aux(s, e, [], maxlen))

    if not silent:

        prg.terminate()

    return all_paths
#
# Methods for collecting interaction attributes across the network
#

def _collect(self, what, by = None, add_total = False, **kwargs):
    """
    Collects the values of an attribute over all interactions in the
    network.

    Args
        what:
            Name of the attribute to collect; combined with ``by`` it
            selects a getter method of
            :py:class:`pypath.interaction.Interaction`.
        by:
            Optional grouping variable; if set, the result is a dict of
            sets keyed by group, otherwise a flat set.
        add_total:
            If grouping, add a ``'total'`` key with the union of all
            groups.
        kwargs:
            Passed to methods of
            :py:class:`pypath.interaction.Interaction`.
    """

    result = set() if not by else collections.defaultdict(set)

    method = self._get_by_method_name(what, by)

    if not hasattr(interaction_mod.Interaction, method):

        self._log('Collecting attributes: no such method: `%s`.' % method)

    else:

        for ia in self:

            ia_attrs = getattr(ia, method)(**kwargs)

            if by:

                for grp, val in iteritems(ia_attrs):

                    result[grp].update(val)

            else:

                result.update(ia_attrs)

    if by and add_total:

        # guard: `set.union()` with no arguments raises TypeError,
        # so fall back to an empty set for an empty collection
        result['total'] = (
            set.union(*result.values())
                if result else
            set()
        )

    return dict(result) if by else result


@classmethod
def _generate_collect_methods(cls):
    """
    Generates a ``collect_<attribute>`` method for each getter of
    :py:class:`pypath.interaction.Interaction` and attaches them to
    the class. Each returns a ``NetworkEntityCollection`` grouped by
    interaction type, data model and resource.
    """

    def _create_collect_method(what):

        # NOTE: the original code decorated this closure with
        # `functools.wraps(what)`, i.e. `wraps` applied to a plain
        # string — a no-op misuse; removed, as `_add_method` below
        # sets the metadata explicitly.
        def _collect_method(self, **kwargs):

            kwargs['what'] = what

            self._log('Collecting `%s`.' % what)

            collection = self._collect(
                by = 'interaction_type_and_data_model_and_resource',
                **kwargs
            )

            return (
                NetworkEntityCollection(
                    collection = collection,
                    label = what,
                )
            )

        return _collect_method


    for _get in interaction_mod.Interaction._get_methods:

        method = _create_collect_method(_get)
        method_name = 'collect_%s' % _get
        doc = (
            'Builds a comprehensive collection of `%s` entities '
            'across the network, counts unique and shared objects '
            'by resource, data model and interaction types.' % _get
        )
        signature = interaction_mod.Interaction._get_method_signature

        if 'degree' in _get:

            signature = [('mode',)] + signature

        cls._add_method(
            method_name,
            method,
            signature = signature,
            doc = doc,
        )


def update_summaries(self, collect_args = None):
    """
    Compiles the ``summaries`` attribute: one record per resource,
    per data model and per interaction type, with entity and
    interaction counts (absolute and percentage, total, shared and
    unique within the data model and the interaction type).

    Args
        collect_args:
            Arguments for the ``collect_*`` methods;
            defaults to ``{'via': False}``.
    """

    def get_labels(lab, key, segments):
        # builds (machine_key, human_label) pairs for each
        # segment x {absolute, percent} combination
        return tuple(
            (
                '%s%s%s%s' % (
                    key,
                    '_' if seg else '',
                    seg.replace(' ', '_'),
                    '_pct' if pct else '_n',
                ),
                '%s%s%s%s' % (
                    lab,
                    ' ' if seg else '',
                    seg,
                    pct,
                ),
            )
            for seg in segments
            for pct in ('', r' [%]')
        )


    def add_resource_segments(rec, res, key, lab, segments, coll):
        # counts for one resource within its data model

        get = coll[key].__getattribute__

        values = tuple(itertools.chain(*zip(*(
            (
                get('%s_collection' % n_pct).get(res, 0),
                get('%s_shared_within_data_model' % n_pct).get(res, 0),
                get('%s_unique_within_data_model' % n_pct).get(res, 0),
                get('%s_shared_within_interaction_type' % n_pct).get(res, 0),
                get('%s_unique_within_interaction_type' % n_pct).get(res, 0),
            )
            for n_pct in ('n', 'pct')
        ))))

        labels = get_labels(lab, key, segments)

        rec.extend(list(zip(labels, values)))

        return rec


    def add_dmodel_segments(rec, itype, dmodel, key, lab, segments, coll):
        # summary counts for one data model within an interaction type

        it_dm_key = (itype, dmodel)
        total_key = it_dm_key + ('Total',)

        get = coll[key].__getattribute__

        values = tuple(itertools.chain(*zip(*(
            (
                get('%s_by_data_model' % n_pct).get(it_dm_key, 0),
                get('%s_shared_within_data_model' % n_pct).get(total_key, 0),
                get('%s_unique_within_data_model' % n_pct).get(total_key, 0),
                get('%s_shared_by_data_model' % n_pct).get(it_dm_key, 0),
                get('%s_unique_by_data_model' % n_pct).get(it_dm_key, 0),
            )
            for n_pct in ('n', 'pct')
        ))))

        labels = get_labels(lab, key, segments)

        rec.extend(list(zip(labels, values)))

        return rec


    def add_itype_segments(rec, itype, key, lab, segments, coll):
        # summary counts for one interaction type

        get = coll[key].__getattribute__
        total_key = (itype, 'all', 'Total')

        values = tuple(itertools.chain(*zip(*(
            (
                get('%s_by_interaction_type' % n_pct).get(itype, 0),
                get('%s_shared_within_interaction_type' % n_pct).get(total_key, 0),
                get('%s_unique_within_interaction_type' % n_pct).get(total_key, 0),
                get('%s_shared_by_data_model' % n_pct).get(total_key, 0),
                get('%s_unique_by_data_model' % n_pct).get(total_key, 0),
            )
            for n_pct in ('n', 'pct')
        ))))

        labels = get_labels(lab, key, segments)

        rec.extend(list(zip(labels, values)))

        return rec


    collect_args = collect_args or {'via': False}

    required = collections.OrderedDict(
        entities = 'Entities',
        proteins = 'Proteins',
        mirnas = 'miRNAs',
        interactions_0 = 'Edges',
        references = 'References',
        curation_effort = 'Curation effort',
        interactions_non_directed_0 = 'Undirected interactions',
        interactions_directed = 'Directed interactions',
        interactions_positive = 'Stimulatory interactions',
        interactions_negative = 'Inhibitory interactions',
        interactions_mutual = 'Mutual interactions',
    )

    segments = (
        '',
        'shared within database category',
        'unique within database category',
        'shared within interaction type',
        'unique within interaction type',
    )

    self.summaries = []

    coll = {}

    self._log('Updating summaries.')

    for method in required.keys():

        coll[method] = getattr(self, 'collect_%s' % method)(**collect_args)

    for itype in self.get_interaction_types():

        for dmodel in self.get_data_models(interaction_type = itype):

            for res in sorted(
                self.get_resource_names(
                    interaction_type = itype,
                    data_model = dmodel,
                    **collect_args
                ),
                key = lambda r: r.lower(),
            ):

                # compiling a record for each resource
                # within the data model
                rec = [(('resource', 'Resource'), res)]

                _res = (itype, dmodel, res)

                for key, lab in iteritems(required):

                    rec = add_resource_segments(
                        rec, _res, key, lab, segments, coll,
                    )

                self.summaries.append(rec)

            # compiling a summary record for the data model
            rec = [(
                ('resource', 'Resource'),
                '%s total' % dmodel.replace('_', ' ').capitalize(),
            )]

            for key, lab in iteritems(required):

                rec = add_dmodel_segments(
                    rec, itype, dmodel, key, lab, segments, coll,
                )

            self.summaries.append(rec)

        # compiling a summary record for the interaction type
        rec = [(
            ('resource', 'Resource'),
            '%s total' % itype.replace('_', ' ').capitalize(),
        )]

        for key, lab in iteritems(required):

            rec = add_itype_segments(rec, itype, key, lab, segments, coll)

        self.summaries.append(rec)

    # maybe we could compile a summary record for the entire network

    self.summaries = [
        collections.OrderedDict(rec)
        for rec in self.summaries
    ]

    self._log('Finished updating summaries.')
def summaries_tab(
        self,
        outfile = None,
        return_table = False,
        label_type = 1,
    ):
    """
    Creates a table from resource vs. entity counts and optionally
    writes it to ``outfile`` and returns it.

    Args
        outfile:
            Optional path; if provided the table is written there as
            tab separated values.
        return_table:
            Return the table as a list of lists of strings.
        label_type:
            Which element of the two-element key tuples to use as
            column header (0: machine key, 1: human readable label).
    """

    tab = []

    # FIX: the header must be a list, not a generator expression —
    # a generator is single-use, so writing `outfile` would exhaust it
    # and `return_table` would then yield an empty header row
    tab.append([
        key[label_type]
        for key in self.summaries[0].keys()
    ])

    for rec in self.summaries:

        tab.append([str(val) for val in rec.values()])

    if outfile:

        with open(outfile, 'w') as fp:

            fp.write('\n'.join('\t'.join(row) for row in tab))

    if return_table:

        return tab
def homology_translate(self, taxon, exclude = None):
    """
    Translates the network to another organism by orthology.

    Args
        taxon:
            NCBI Taxonomy ID of the target organism.
        exclude:
            Entities to exclude from the translation.

    Returns
        A new ``Network`` object with the translated interactions.
    """

    self._log(
        'Translating network by homology from organism `%u` to `%u`.' % (
            self.ncbi_tax_id,
            taxon,
        )
    )

    new = Network(ncbi_tax_id = taxon)

    n_ia_translated = 0
    entities_translated = set()

    for ia in self:

        ia_translated = False

        for new_ia in ia.homology_translate(
            taxon = taxon,
            exclude = exclude,
        ):

            new.add_interaction(new_ia)
            ia_translated = True
            entities_translated.update(ia.get_entities())

        n_ia_translated += ia_translated

    # guard against ZeroDivisionError on an empty network
    n_ia = len(self) or 1
    n_nodes = len(self.nodes) or 1

    self._log(
        'Orthology translation ready. '
        '%u out of %u interactions (%.02f%%), '
        '%u out of %u entities (%.02f%%) '
        'have been translated.' % (
            n_ia_translated,
            len(self),
            n_ia_translated / n_ia * 100,
            len(entities_translated),
            len(self.nodes),
            len(entities_translated) / n_nodes * 100,
        )
    )

    return new


@staticmethod
def _get_by_method_name(get, by):
    """
    Builds the name of a getter method: ``get_<what>`` for flat
    collections, ``<what>_by_<by>`` for grouped ones.
    """

    return (
        ''.join(
            (
                'get_' if not by else '',
                get,
                '_by_' if by else '',
                by or '',
            )
        )
    )


@staticmethod
def _iter_get_by_methods():
    """
    Iterates all (attribute, grouping) combinations for which getter
    methods exist on ``Interaction``.
    """

    return (
        itertools.product(
            interaction_mod.Interaction._get_methods | {'entities'},
            interaction_mod.Interaction._by_methods + (None,),
        )
    )


@classmethod
def _generate_get_methods(cls):
    """
    Generates ``get_*`` and ``*_by_*`` collector methods and attaches
    them to the class.
    """

    def _create_get_method(what, by):

        wrap_args = (what, by)

        # NOTE: the original code decorated this closure with
        # `functools.wraps(wrap_args)`, i.e. `wraps` applied to a
        # tuple — a no-op misuse; removed.
        def _get_by_method(*args, **kwargs):

            what, by = wrap_args

            self = args[0]
            kwargs['what'] = what
            kwargs['by'] = by

            return self._collect(**kwargs)

        return _get_by_method


    for _get, _by in cls._iter_get_by_methods():

        method_name = cls._get_by_method_name(_get, _by)

        setattr(
            cls,
            method_name,
            _create_get_method(what = _get, by = _by),
        )


@classmethod
def _generate_count_methods(cls):
    """
    Generates ``count_*`` methods (counting variants of the ``get_*``
    collectors) and attaches them to the class.
    """

    def _create_count_method(what, by):

        method_name = cls._get_by_method_name(what, by)

        # NOTE: the original code decorated this closure with
        # `functools.wraps(method_name)`, i.e. `wraps` applied to a
        # string — a no-op misuse; removed.
        def _count_method(*args, **kwargs):

            self = args[0]

            collection = getattr(self, method_name)(**kwargs)

            return (
                len(collection)
                    if isinstance(collection, set) else
                common.dict_counts(collection)
            )

        return _count_method


    for _get, _by in cls._iter_get_by_methods():

        method_name = (
            'count_%s' % (
                cls._get_by_method_name(_get, _by).replace('get_', '')
            )
        )

        setattr(
            cls,
            method_name,
            _create_count_method(what = _get, by = _by),
        )


@classmethod
def _add_method(cls, method_name, method, signature = None, doc = None):
    """
    Attaches a method to the class with the given signature and
    docstring (delegates to ``common.add_method``).
    """

    common.add_method(
        cls,
        method_name,
        method,
        signature = signature,
        doc = doc,
    )


def _allow_loops(self, allow_loops = None, resource = None):
    """
    Integrates settings for the `allow_loops` parameter from the
    method, instance and module level settings.
    """

    default = settings.get('network_allow_loops')

    return (
        # from the arguments of the actual `load` call
        allow_loops
            if isinstance(allow_loops, bool) else
        # from the current instance
        self.allow_loops
            if isinstance(self.allow_loops, bool) else
        # resource specific settings
        resource.networkinput.allow_loops
            if (
                hasattr(resource, 'networkinput') and
                isinstance(resource.networkinput.allow_loops, bool)
            ) else
        # interaction type specific settings from the module level
        resource.networkinput.interaction_type in default
            if (
                isinstance(default, _const.LIST_LIKE) and
                hasattr(resource, 'networkinput')
            ) else
        # general settings from the module level
        bool(default)
    )


def count_loops(self):
    """
    Counts the loop edges (interactions of an entity with itself)
    in the network.
    """

    return sum(ia.is_loop() for ia in self)
def direction_consistency(self):
    """
    Collects statistics about the consistency of interaction
    directions between resources.

    * total_directed: number of directed edges
    * shared_directed: number of directed edges in overlap with
      other resources
    * consistent_edges: number of edges consistent with other
      resources
    * inconsistent_edges: number of edges inconsistent with other
      resources
    * total_consistency: sum of consistencies (for all edges and
      all resources)
    * total_inconsistency: sum of inconsistencies (for all edges
      and all resources)
    """

    def dd_matrix(dd):
        # dict-of-dicts of counts -> square DataFrame
        # (rows and columns are resources)
        names = list(dd.keys())

        return pd.DataFrame(
            [
                [key] + list(val.values())
                for key, val in dd.items()
            ],
            columns = ['resource'] + names,
        )


    DirectionConsistency = collections.namedtuple(
        'DirectionConsistency',
        [
            'total_directed',
            'shared_directed',
            'consistent_edges',
            'inconsistent_edges',
            'total_consistency',
            'total_inconsistency',
            'total_signed',
            'shared_signed',
            'consistent_signed_edges',
            'inconsistent_signed_edges',
            'total_sign_consistency',
            'total_sign_inconsistency',
        ],
    )

    summary = {}

    resources = sorted(self.get_resource_names(via = False))

    consistencies = collections.OrderedDict(
        (
            resource1,
            collections.OrderedDict(
                (resource2, 0)
                for resource2 in resources
            ),
        )
        for resource1 in resources
    )
    inconsistencies = copy_mod.deepcopy(consistencies)
    sign_consistencies = copy_mod.deepcopy(consistencies)
    sign_inconsistencies = copy_mod.deepcopy(consistencies)

    for resource in resources:

        total_directed = 0
        shared_directed = 0
        consistent_edges = 0
        inconsistent_edges = 0
        total_consistency = 0
        total_inconsistency = 0
        total_signed = 0
        shared_signed = 0
        consistent_signed_edges = 0
        inconsistent_signed_edges = 0
        total_sign_consistency = 0
        total_sign_inconsistency = 0

        for ia in self:

            if not ia.is_directed():

                continue

            # resources claiming each direction and each sign
            res_a_b = ia.direction[ia.a_b].get_resource_names(via = False)
            res_b_a = ia.direction[ia.b_a].get_resource_names(via = False)
            res_a_b_pos = ia.positive[ia.a_b].get_resource_names(via = False)
            res_a_b_neg = ia.negative[ia.a_b].get_resource_names(via = False)
            res_b_a_pos = ia.positive[ia.b_a].get_resource_names(via = False)
            res_b_a_neg = ia.negative[ia.b_a].get_resource_names(via = False)

            if resource in res_a_b or resource in res_b_a:

                total_directed += 1

            else:

                continue

            if resource in res_a_b_pos or resource in res_a_b_neg:

                total_signed += 1

            if resource in res_b_a_pos or resource in res_b_a_neg:

                total_signed += 1

            if len(res_a_b | res_b_a) > 1:

                shared_directed += 1

            if len(res_a_b_pos | res_a_b_neg) > 1:

                shared_signed += 1

            if len(res_b_a_pos | res_b_a_neg) > 1:

                shared_signed += 1

            if (
                (resource in res_a_b and len(res_a_b) > 1) or
                (resource in res_b_a and len(res_b_a) > 1)
            ):

                consistent_edges += 1

            if (
                (resource in res_a_b_pos and len(res_a_b_pos) > 1) or
                (resource in res_a_b_neg and len(res_a_b_neg) > 1)
            ):

                consistent_signed_edges += 1

            if (
                (resource in res_b_a_pos and len(res_b_a_pos) > 1) or
                (resource in res_b_a_neg and len(res_b_a_neg) > 1)
            ):

                consistent_signed_edges += 1

            if (
                (
                    resource in res_a_b and
                    resource not in res_b_a and
                    res_b_a
                ) or
                (
                    resource in res_b_a and
                    resource not in res_a_b and
                    res_a_b
                )
            ):

                inconsistent_edges += 1

            if (
                (
                    resource in res_a_b_pos and
                    resource not in res_a_b_neg and
                    res_a_b_neg
                ) or
                (
                    resource in res_a_b_neg and
                    resource not in res_a_b_pos and
                    res_a_b_pos
                )
            ):

                inconsistent_signed_edges += 1

            if (
                (
                    resource in res_b_a_pos and
                    resource not in res_b_a_neg and
                    res_b_a_neg
                ) or
                (
                    resource in res_b_a_neg and
                    resource not in res_b_a_pos and
                    res_b_a_pos
                )
            ):

                inconsistent_signed_edges += 1

            if resource in res_a_b:

                total_consistency += len(res_a_b) - 1

            else:

                total_inconsistency += len(res_a_b)

            if resource in res_a_b_pos:

                total_sign_consistency += len(res_a_b_pos) - 1

            if resource in res_a_b_neg:

                total_sign_consistency += len(res_a_b_neg) - 1

            if resource in res_b_a_pos:

                total_sign_consistency += len(res_b_a_pos) - 1

            if resource in res_b_a_neg:

                total_sign_consistency += len(res_b_a_neg) - 1

            if resource not in res_a_b_pos:

                total_sign_inconsistency += len(res_a_b_pos)

            if resource not in res_a_b_neg:

                total_sign_inconsistency += len(res_a_b_neg)

            if resource not in res_b_a_pos:

                total_sign_inconsistency += len(res_b_a_pos)

            if resource not in res_b_a_neg:

                total_sign_inconsistency += len(res_b_a_neg)

            if resource in res_b_a:

                total_consistency += len(res_b_a) - 1

            else:

                total_inconsistency += len(res_b_a)

            for dir_resources in (res_a_b, res_b_a):

                for res_other in dir_resources:

                    if resource in dir_resources:

                        consistencies[resource][res_other] += 1

                    else:

                        inconsistencies[resource][res_other] += 1

            # FIX: the original iterated `res_a_b_neg` twice and never
            # visited `res_b_a_neg`, so B->A negative signs were
            # dropped from the pairwise sign (in)consistency matrices
            for sign_resources in (
                res_a_b_pos,
                res_a_b_neg,
                res_b_a_pos,
                res_b_a_neg,
            ):

                for res_other in sign_resources:

                    if resource in sign_resources:

                        sign_consistencies[resource][res_other] += 1

                    else:

                        sign_inconsistencies[resource][res_other] += 1

        summary[resource] = DirectionConsistency(
            total_directed = total_directed,
            shared_directed = shared_directed,
            consistent_edges = consistent_edges,
            inconsistent_edges = inconsistent_edges,
            total_consistency = total_consistency,
            total_inconsistency = total_inconsistency,
            total_signed = total_signed,
            shared_signed = shared_signed,
            consistent_signed_edges = consistent_signed_edges,
            inconsistent_signed_edges = inconsistent_signed_edges,
            total_sign_consistency = total_sign_consistency,
            total_sign_inconsistency = total_sign_inconsistency,
        )

    consistencies = dd_matrix(consistencies)
    inconsistencies = dd_matrix(inconsistencies)
    sign_consistencies = dd_matrix(sign_consistencies)
    sign_inconsistencies = dd_matrix(sign_inconsistencies)

    summary = pd.DataFrame(
        [
            [resource] + list(values)
            for resource, values in summary.items()
        ],
        columns = ['resource'] + list(DirectionConsistency._fields),
    )

    return {
        'summary': summary,
        'consistencies': consistencies,
        'inconsistencies': inconsistencies,
        'sign_consistencies': sign_consistencies,
        'sign_inconsistencies': sign_inconsistencies,
    }