#!/usr/bin/env python# -*- coding: utf-8 -*-## This file is part of the `pypath` python module## Copyright 2014-2023# EMBL, EMBL-EBI, Uniklinik RWTH Aachen, Heidelberg University## Authors: see the file `README.rst`# Contact: Dénes Türei (turei.denes@gmail.com)## Distributed under the GPLv3 License.# See accompanying file LICENSE.txt or copy at# https://www.gnu.org/licenses/gpl-3.0.html## Website: https://pypath.omnipathdb.org/#importosimportimportlibasimpimportreimportjsonimportcopyimportitertoolsfromfuture.utilsimportiteritemsimportpandasaspdimportpypath.share.progressasprogressimportpypath.resources.urlsasurlsimportpypath.resources.data_formatsasdata_formatsimportpypath.share.settingsassettingsimportpypath.core.entityasentityimportpypath.share.sessionassessionimportpypath.resources.networkasnetresimportpypath.share.commonascommonstrip_json=re.compile(r'[\[\]{}\"]')simple_types={bool,int,float,type(None)}
def __init__(
        self,
        network=None,
        only_sources=None,
        extra_node_attrs=None,
        extra_edge_attrs=None,
        outfile=None,
        default_vertex_attr_processor=None,
        default_edge_attr_processor=None,
        pa=None,
    ):
    """
    Sets up the exporter for one network object.

    Args
    -----
    :param network:
        The network to be exported. If it is falsy, the value of `pa`
        is used instead.
    :param list,set only_sources:
        Stored as `only_sources`; a list is converted to a set.
    :param dict extra_node_attrs:
        Extra node attributes for the exported table; defaults to an
        empty dict.
    :param dict extra_edge_attrs:
        Extra edge attributes for the exported table; defaults to an
        empty dict.
    :param str outfile:
        Stored as `outfile`.
    :param callable default_vertex_attr_processor:
        Overrides the instance's `default_vertex_attr_processor` if
        provided.
    :param callable default_edge_attr_processor:
        Overrides the instance's `default_edge_attr_processor` if
        provided.
    :param pa:
        Alternative name for the `network` argument; used only if
        `network` is falsy.
    """

    session.Logger.__init__(self, name='export')
    self._log('Export object created for network.')

    # `network` takes precedence; both names end up pointing to the
    # same object
    self.network = network or pa
    self.pa = self.network
    self.outfile = outfile
    self.extra_node_attrs = extra_node_attrs or {}
    self.extra_edge_attrs = extra_edge_attrs or {}
    self._set_graph()

    if isinstance(only_sources, list):
        only_sources = set(only_sources)

    self.only_sources = only_sources

    # fall back to the processors already available on the instance
    self.default_vertex_attr_processor = (
        default_vertex_attr_processor or
        self.default_vertex_attr_processor
    )
    self.default_edge_attr_processor = (
        default_edge_attr_processor or
        self.default_edge_attr_processor
    )
def make_df(
        self,
        unique_pairs=True,
        extra_node_attrs=None,
        extra_edge_attrs=None,
    ):
    """
    Creates a data frame from the network.

    By default UniProt IDs, Gene Symbols, source databases, literature
    references, directionality and sign information and interaction type
    are included. The work is delegated to `_make_df_igraph` if
    `self.graph` is truthy, otherwise to `_make_df_network`; all
    arguments are forwarded unchanged.

    Args
    -----
    :param bool unique_pairs:
        If `True` each line corresponds to a unique pair of molecules,
        all directionality and sign information are covered in other
        columns. If `False`, order of `A` and `B` IDs corresponds to
        the direction while sign is covered in further columns.
    :param dict extra_node_attrs:
        Additional node attributes to be included in the exported table.
        Keys are column names used in the header while values are names
        of vertex attributes. Values also might be methods which then
        will be called on each vertex. These should return strings or
        their result will be converted to string. In the header `_A`
        and `_B` suffixes will be appended to the column names so the
        values can be assigned to A and B side interaction partners.
    :param dict extra_edge_attrs:
        Additional edge attributes to be included in the exported table.
        Keys are column names used in the header while values are names
        of edge attributes or callables accepting an edge as single
        argument.
    """

    layout = 'unique pairs' if unique_pairs else 'by direction'
    self._log('Creating data frame of type `%s`.' % layout)

    builder = (
        self._make_df_igraph
        if self.graph else
        self._make_df_network
    )

    builder(
        unique_pairs=unique_pairs,
        extra_node_attrs=extra_node_attrs,
        extra_edge_attrs=extra_edge_attrs,
    )
def_make_df_network(self,unique_pairs=True,extra_node_attrs=None,extra_edge_attrs=None,):""" See docs at method ``make_df``. """self._log('Creating data frame from `core.network.Network` object.')ifunique_pairs:msg=('Data frame with unique pairs from `core.network.Network` ''is not implemented yet, only possible to create it from ''`legacy.main.PyPath` object.')self._log(msg)raiseNotImplementedError(msg)self.extra_node_attrs=extra_node_attrsorself.extra_node_attrsself.extra_edge_attrs=extra_edge_attrsorself.extra_edge_attrsheader=self.get_header(unique_pairs=unique_pairs)dtypes=(self.default_dtypes_uniquepairsifunique_pairselseself.default_dtypes_bydirs)dtypes=dict(iforiindtypes.items()ifi[0]inheader)result=[]foriainself.network:result.extend(self.process_interaction(ia))self.df=pd.DataFrame(result,columns=header)self.df=self.df.astype(dtypes)defprocess_interaction(self,ia):result=[]consensus=ia.consensus()for_dirin('a_b','b_a'):nodes=getattr(ia,_dir)directed=bool(ia.direction[nodes])directed_rev=bool(ia.direction[tuple(reversed(nodes))])if((notdirectedand(_dir=='b_a'ordirected_rev))or(ia.is_loop()and_dir=='b_a')):continuepositive=getattr(ia,'positive_%s'%_dir)()negative=getattr(ia,'negative_%s'%_dir)()resources=';'.join(sorted(set('%s%s'%(res,('_%s'%via)ifviaelse'',)forres,viainitertools.chain(ia.get_resource_names_via(direction='undirected',via=None,),ia.get_resource_names_via(direction=nodes,via=None,),))))references=';'.join(sorted(set('%s:%s'%(ev.resource.viaorev.resource.name,ref.pmid)forevinitertools.chain(ia.get_evidences(direction='undirected'),ia.get_evidences(direction=nodes),)forrefinev.referencesifnotev.resource.via)))this_row=[nodes[0].identifier,nodes[1].identifier,nodes[0].label,nodes[1].label,int(directed),int(positive),int(negative),self.match_consensus(consensus,nodes,),self.match_consensus(consensus,nodes,'positive',),self.match_consensus(consensus,nodes,'negative',),resources,references,]this_row=self.add_extra_fields(ia,this_row,nodes)result.ap
pend(this_row)returnresult@staticmethoddefmatch_consensus(consensus,nodes,effect=None):param=list(nodes)+['directed']ifeffect:param.append(effect)returnint(any(co[:len(param)]==paramforcoinconsensus))def_make_df_igraph(self,unique_pairs=True,extra_node_attrs=None,extra_edge_attrs=None,):""" See docs at method ``make_df``. """self._log('Creating data frame from `legacy.main.PyPath` object.')result=[]self.pa.genesymbol_labels()self.extra_node_attrs=extra_node_attrsorself.extra_node_attrsself.extra_edge_attrs=extra_edge_attrsorself.extra_edge_attrsdtypes=(self.default_dtypes_uniquepairsifunique_pairselseself.default_dtypes_bydirs)header=self.get_header(unique_pairs=unique_pairs)prg=progress.Progress(total=self.graph.ecount(),name='Creating table',interval=31)foreinself.graph.es:# adding default fieldslines=(self._process_edge_uniquepairs_igraph(e)ifunique_pairselseself._process_edge_bydirection_igraph(e))result.extend(lines)prg.step()prg.terminate()self.df=pd.DataFrame(result,columns=header)self.df=self.df.astype(dtypes)
def get_header(
        self,
        unique_pairs=True,
    ):
    """
    Creates a data frame header (list of field names) according to the
    data frame type and the extra fields.

    The result is a copy of the default header of the requested layout,
    followed by the keys of `extra_edge_attrs`, then the keys of
    `extra_node_attrs` once for each interaction partner, suffixed by
    `_A` and `_B` (unique pairs) or `_source` and `_target` (by
    direction).

    Args
    -----
    :param bool unique_pairs:
        Selects the layout, see above.

    :return:
        List of column names.
    """

    if unique_pairs:

        base, suffixes = self.default_header_uniquepairs, ('A', 'B')

    else:

        base, suffixes = self.default_header_bydirs, ('source', 'target')

    # work on a copy, the default header must not be modified
    header = copy.copy(base)
    header.extend(self.extra_edge_attrs.keys())

    for suffix in suffixes:

        header.extend(
            '%s_%s' % (name, suffix)
            for name in self.extra_node_attrs.keys()
        )

    return header
def _process_edge_uniquepairs_igraph(self, e):
    """
    Returns a table row representing a network edge with covering all
    annotations in a single row: directionality represented by fields
    like `Direction_A-B` and `Direction_B-A`, effect sign a similar way.

    Args
    -----
    :param igraph.Edge e:
        An edge from a pypath igraph object.

    :return:
        A list with one row, or an empty list if `only_sources` is set
        and none of the sources of the edge is among them.
    """

    if self.only_sources and not e['sources'] & self.only_sources:

        return []

    name_a, label_a = entity.Entity.igraph_vertex_name_label(
        self.graph.vs[e.source]
    )
    name_b, label_b = entity.Entity.igraph_vertex_name_label(
        self.graph.vs[e.target]
    )

    dirs = e['dirs']
    a_to_b = (name_a, name_b)
    b_to_a = (name_b, name_a)

    # uniprots, genesymbols
    row = [name_a, label_a, name_b, label_b]

    # sources, references
    row.append(';'.join(list(e['sources'])))
    row.append(
        ';'.join(
            sorted(
                set(r.pmid for r in e['references']),
                key=int,
            )
        )
    )

    # directions
    row.append(';'.join(dirs.get_dir('undirected', sources=True)))
    row.append(';'.join(dirs.get_dir(a_to_b, sources=True)))
    row.append(';'.join(dirs.get_dir(b_to_a, sources=True)))

    # signs
    sign_sources = itertools.chain(
        dirs.get_sign(a_to_b, sources=True),
        dirs.get_sign(b_to_a, sources=True),
    )
    row.extend(';'.join(srcs) for srcs in sign_sources)

    # category
    row.append(';'.join(e['type']))

    return [row]


def _process_edge_bydirection_igraph(self, e):
    """
    Returns one or more table rows representing a network edge a way
    that opposite direction connections contained in separate rows.
    Directionality and sign information covered in 3 columns:
    `is_directed`, `is_inhibition` and `is_stimulation`.
    This is the row format used in the webservice.

    Args
    -----
    :param igraph.Edge e:
        An edge from a pypath igraph object.

    :return:
        List of rows: one for each direction that is set (unless it is
        filtered out by `only_sources`), and one undirected row if the
        edge is not directed.
    """

    lines = []
    dirs = e['dirs']

    consensus_edges = set(map(tuple, dirs.consensus_edges()))
    consensus_dir = set(c[:3] for c in consensus_edges)

    for orientation in ('straight', 'reverse'):

        uniprots = getattr(dirs, orientation)

        if not dirs.dirs[uniprots]:

            continue

        is_stimulation = int(dirs.is_stimulation(uniprots))
        is_inhibition = int(dirs.is_inhibition(uniprots))
        pair = (uniprots[0], uniprots[1])

        this_edge = [
            entity.Entity.entity_name_str(uniprots[0]),
            entity.Entity.entity_name_str(uniprots[1]),
            self.pa.name_to_label(uniprots[0]),
            self.pa.name_to_label(uniprots[1]),
            1, # is_directed
            is_stimulation,
            is_inhibition,
            int(pair + ('directed',) in consensus_dir),
            int(pair + ('directed', 'positive') in consensus_edges),
            int(pair + ('directed', 'negative') in consensus_edges),
        ]

        # resources supporting this direction or the undirected one
        dsources = (
            dirs.get_dir(uniprots, sources=True) |
            dirs.get_dir('undirected', sources=True)
        )

        if self.only_sources and not dsources & self.only_sources:

            continue

        refs_of_dsources = itertools.chain.from_iterable(
            rs
            for s, rs in iteritems(e['refs_by_source'])
            if s in dsources
        )

        this_edge.append(';'.join(sorted(dsources)))
        this_edge.append(';'.join(r.pmid for r in refs_of_dsources))

        this_edge = self.add_extra_fields(e, this_edge, uniprots)

        lines.append(this_edge)

    if not dirs.is_directed():

        this_edge = [
            entity.Entity.entity_name_str(dirs.nodes[0]),
            entity.Entity.entity_name_str(dirs.nodes[1]),
            self.pa.name_to_label(dirs.nodes[0]),
            self.pa.name_to_label(dirs.nodes[1]),
            0,
            0,
            0,
            0,
            0,
            0,
            ';'.join(sorted(e['sources'])),
            ';'.join([r.pmid for r in e['references']]),
            # NOTE(review): the directed rows above have no such
            # field -- confirm the header expects it here
            self._dip_urls(e),
        ]

        # NOTE(review): the directed rows call `add_extra_fields`;
        # confirm that `add_extra_fields_igraph` exists on this class
        this_edge = (
            self.add_extra_fields_igraph(e, this_edge, 'undirected')
        )

        lines.append(this_edge)

    return lines
def add_extra_fields(self, e, line, dr=None):
    """
    Takes one table row and using the edge (or interaction) object and
    the direction provided adds the extra node and edge attribute
    fields as they are defined in `extra_node_attrs` and
    `extra_edge_attrs`. Returns the row with extra fields added.

    Callable attribute definitions are called by
    `generic_attr_processor`. Attribute names are looked up, in this
    order: as graph attribute (only if `self.graph` is truthy and it
    has such an attribute), as attribute of the object, as key in the
    `attrs` of the object; if none of these is available the value is
    `None`. The value is then passed to `default_edge_attr_processor`
    or `default_vertex_attr_processor`.

    Args
    -----
    :param igraph.Edge e:
        One edge.
    :param list line:
        A table row; it is extended in place.
    :param tuple,str dr:
        Direction key. A tuple of names (most often UniProt IDs)
        or `undirected`. If `self.graph` is falsy, the items of this
        value are used as nodes.

    :return:
        The `line` list, with the extra fields appended.
    """

    graph = self.graph

    def lookup(obj, key, graph_seq):
        # graph attributes have priority, if we have a graph
        if graph_seq is not None and key in graph_seq.attributes():

            return obj[key]

        if hasattr(obj, key):

            return getattr(obj, key)

        # objects without `attrs` give None instead of AttributeError
        attrs = getattr(obj, 'attrs', None)

        if attrs is not None and key in attrs:

            return attrs[key]

        return None

    # extra edge attributes on demand of the user
    edge_seq = graph.es if graph else None

    for attr in self.extra_edge_attrs.values():

        if callable(attr):

            value = self.generic_attr_processor(attr, e, dr)

        else:

            value = self.default_edge_attr_processor(
                lookup(e, attr, edge_seq)
            )

        line.append(value)

    # extra vertex attributes
    # NOTE(review): with a graph, the vertices are always in the order
    # of the edge's own source and target, regardless of `dr` --
    # confirm this is intended for rows of the reverse direction
    nodes = (
        (graph.vs[e.source], graph.vs[e.target])
        if graph else
        dr
    )
    vertex_seq = graph.vs if graph else None

    for vertex in nodes:

        for attr in self.extra_node_attrs.values():

            if callable(attr):

                value = self.generic_attr_processor(attr, vertex, dr)

            else:

                value = self.default_vertex_attr_processor(
                    lookup(vertex, attr, vertex_seq)
                )

            line.append(value)

    return line
@staticmethod
def generic_attr_processor(proc, obj, dr=None):
    """
    Wraps the attribute processor to handle unknown number of arguments.

    The processor is first called with two arguments, the object and
    the direction. If that raises `TypeError`, it is called again with
    the object only; if this also raises `TypeError`, the error of the
    first call is raised.

    Args
    -----
    :param callable proc:
        The attribute processor.
    :param obj:
        The object to be processed. If it has an `is_directed` method
        which returns a falsy value, and `dr` is truthy, the direction
        passed to the processor is `undirected`.
    :param tuple,str dr:
        Direction key.

    :return:
        The value returned by the processor.
    """

    if dr and hasattr(obj, 'is_directed') and not obj.is_directed():

        dr = 'undirected'

    try:

        return proc(obj, dr)

    except TypeError as two_arg_error:

        # maybe the processor accepts only the object
        try:

            return proc(obj)

        except TypeError:

            raise two_arg_error
def write_tab(self, outfile=None, **kwargs):
    """
    Writes the data frame into a tab separated file.

    The data frame is created by `make_df` first, unless the instance
    already has a `df` attribute.

    Args
    -----
    :param str outfile:
        Passed to `write()`.
    :param **kwargs:
        Forwarded to `make_df()`.
    """

    needs_build = not hasattr(self, 'df')

    if needs_build:

        self.make_df(**kwargs)

    self.write(outfile=outfile)
def write(self, outfile=None):
    """
    Writes `self.df` into a tab separated file, without the index.

    Args
    -----
    :param str outfile:
        Path of the output file. If falsy, `self.outfile` is used; if
        that is falsy too, `network-<session>.tab` in the `outdir` of
        `self.pa`.
    """

    target = outfile or self.outfile

    if not target:

        target = os.path.join(
            self.pa.outdir,
            'network-%s.tab' % self.pa.session,
        )

    self.df.to_csv(target, sep='\t', index=False)


def webservice_interactions_df(self):
    """
    Creates the by direction data frame with the extra columns of the
    webservice interactions table: one column for each dataset, the
    DoRothEA fields, the interaction type, the curation effort and the
    serialized extra attributes and evidences. The result is created by
    `make_df`.
    """

    datasets = (
        'omnipath',
        'kinaseextra',
        'ligrecextra',
        'pathwayextra',
        'mirnatarget',
        'dorothea',
        'collectri',
        'tf_target',
        'lncrna_mrna',
        'tf_mirna',
        'small_molecule',
    )

    def get_dataset_callback(dataset: str) -> callable:
        # a separate closure for each dataset name

        def has_dataset(e, d) -> bool:

            return e.has_dataset(dataset, direction=d)

        return has_dataset

    def get_dorothea_callback(field):

        def dorothea_attr(e, d):

            return e._get_attr('DoRothEA', field, d)

        return dorothea_attr

    def dorothea_level(e, d):

        return ';'.join(e.dorothea_levels(d))

    def interaction_type(e, d):

        return list(e.get_interaction_types(direction=d))[0]

    def curation_effort(e, d):
        # for directions the undirected curation effort is added

        return (
            e.count_curation_effort(direction=d) +
            (
                e.count_curation_effort(direction='undirected')
                if isinstance(d, tuple) else
                0
            )
        )

    def extra_attrs(e, d):

        return e.serialize_attrs(d)

    def evidences(e, d):

        return e.serialize_evidences(d)

    # the order of the keys is the order of the columns
    edge_attrs = {}

    for dataset in datasets:

        edge_attrs[dataset] = get_dataset_callback(dataset)

    for field in ('curated', 'chipseq', 'tfbs', 'coexp'):

        edge_attrs['dorothea_%s' % field] = get_dorothea_callback(field)

    edge_attrs['dorothea_level'] = dorothea_level
    edge_attrs['type'] = interaction_type
    edge_attrs['curation_effort'] = curation_effort
    edge_attrs['extra_attrs'] = extra_attrs
    edge_attrs['evidences'] = evidences

    self.make_df(
        unique_pairs=False,
        extra_node_attrs={
            'ncbi_tax_id': 'taxon',
            'entity_type': 'entity_type',
        },
        extra_edge_attrs=edge_attrs,
    )


def webservice_interactions_df_legacy(self):
    """
    Creates the by direction data frame with the extra columns of the
    webservice interactions table, for the legacy (igraph based)
    network object. The dataset columns are derived from the resource
    names in `data_formats` and the `network_extra_directions` setting.
    The result is created by `make_df`.
    """

    def resource_names(definitions):

        return set(f.name for f in definitions.values())

    sources_omnipath = resource_names(data_formats.omnipath)
    sources_extra_directions = settings.get('network_extra_directions')
    sources_kinase_extra = resource_names(data_formats.ptm_misc)
    sources_ligrec_extra = resource_names(data_formats.ligand_receptor)
    sources_pathway_extra = resource_names(data_formats.pathway_noref)
    sources_mirna = resource_names(data_formats.mirna_target)

    def in_dataset(resources, itype):
        # the direction is supported by any of `resources` and the
        # edge has the interaction type `itype`

        def check(e, d):

            return (
                bool(e['dirs'].sources[d] & resources) and
                itype in e['type']
            )

        return check

    def omnipath(e, d):

        by_dir = e['dirs'].sources

        return (
            (
                bool(by_dir[d] & sources_omnipath) or
                (
                    bool(by_dir['undirected'] & sources_omnipath) and
                    bool(by_dir[d] & sources_extra_directions)
                )
            ) and
            'PPI' in e['type']
        )

    def dorothea(e, d):

        return (
            'TF' in e['sources_by_type'] and
            bool(e['sources_by_type']['TF'] & e['dirs'].sources[d])
        )

    def dorothea_level(e, d):

        if 'dorothea_level' in e.attributes() and dorothea(e, d):

            return ';'.join(sorted(e['dorothea_level']))

        return ''

    def curation_effort(e, d):

        return (
            e.count_curation_effort(direction=d) +
            (
                e.count_curation_effort(direction='undirected')
                if isinstance(d, tuple) else
                0
            )
        )

    self.make_df(
        unique_pairs=False,
        extra_node_attrs={
            'ncbi_tax_id': 'ncbi_tax_id',
        },
        extra_edge_attrs={
            'omnipath': omnipath,
            'kinaseextra': in_dataset(sources_kinase_extra, 'PPI'),
            'ligrecextra': in_dataset(sources_ligrec_extra, 'PPI'),
            'pathwayextra': in_dataset(sources_pathway_extra, 'TF'),
            'mirnatarget': in_dataset(sources_mirna, 'MTI'),
            'dorothea': dorothea,
            'dorothea_curated': 'dorothea_curated',
            'dorothea_chipseq': 'dorothea_chipseq',
            'dorothea_tfbs': 'dorothea_tfbs',
            'dorothea_coexp': 'dorothea_coexp',
            'dorothea_level': dorothea_level,
            # quite wrong (taking only the first one):
            'type': lambda e: e['type'][0],
            'curation_effort': curation_effort,
        },
    )
@classmethod
def sources_table(
        cls,
        pa,
        only_sources=None,
        unique_pairs=True,
        extra_edge_attrs=None,
        extra_node_attrs=None,
        outfile=None,
        default_vertex_attr_processor=None,
        default_edge_attr_processor=None,
    ):
    """
    Creates a data frame which contains a column for each source
    database with binary values showing presence-absence of
    interactions across resources.

    A new instance is created and its data frame built by `make_df`.
    The resource columns are inserted, in alphabetical order, directly
    before the column listing the resources (`Databases` for unique
    pairs, `sources` otherwise).

    Args
    -----
    :param pa:
        The network object; its `sources` attribute lists the
        resources, unless `only_sources` is provided.
    :param list,set only_sources:
        Resources to create columns for; also passed to the new
        instance.
    :param bool unique_pairs:
        Passed to `make_df`.
    :param dict extra_edge_attrs:
        Passed to the new instance.
    :param dict extra_node_attrs:
        Passed to the new instance.
    :param str outfile:
        Passed to the new instance.
    :param callable default_vertex_attr_processor:
        Passed to the new instance.
    :param callable default_edge_attr_processor:
        Passed to the new instance.

    :return:
        The new instance; the table is in its `df` attribute.
    """

    new = cls(
        pa=pa,
        only_sources=only_sources,
        extra_edge_attrs=extra_edge_attrs or {},
        extra_node_attrs=extra_node_attrs or {},
        outfile=outfile,
        default_vertex_attr_processor=default_vertex_attr_processor,
        default_edge_attr_processor=default_edge_attr_processor,
    )

    new.make_df(unique_pairs=unique_pairs)

    src_attr = 'Databases' if unique_pairs else 'sources'
    src_all = sorted(only_sources or pa.sources)

    # iterating the column directly: `iterrows` would build a Series
    # from each row only to access this one field
    row_sources = [
        set(cell.split(';'))
        for cell in new.df[src_attr]
    ]

    for src in src_all:

        # always directly before the resources column, hence the
        # new columns follow each other in the order of `src_all`
        new.df.insert(
            loc=new.df.columns.get_loc(src_attr),
            column=src,
            value=[int(src in this_row_src) for this_row_src in row_sources],
        )

    return new
def_dip_urls(self,e):attrs=e.attrsifhasattr(e,'attrs')elsee.attributesresult=[]if'dip_id'inattrs:dip_ids=sorted(common.to_set(attrs['dip_id']))fordip_idindip_ids:try:result.append(urls.urls['dip']['ik']%(int(dip_id.split('-')[1][:-1])))except:self._log('Could not find DIP ID: %s'%dip_id)return';'.join(result)