Source code for pypath.omnipath.server.legacy

#!/usr/bin/env python
# -*- coding: utf-8 -*-

#
#  This file is part of the `pypath` python module
#
#  Copyright 2014-2023
#  EMBL, EMBL-EBI, Uniklinik RWTH Aachen, Heidelberg University
#
#  Authors: see the file `README.rst`
#  Contact: Dénes Türei (turei.denes@gmail.com)
#
#  Distributed under the GPLv3 License.
#  See accompanying file LICENSE.txt or copy at
#      https://www.gnu.org/licenses/gpl-3.0.html
#
#  Website: https://pypath.omnipathdb.org/
#

"""
Old server class working from a ``pypath.legacy.main.PyPath`` object.
"""

import json

import pypath.omnipath.server.run as server


class PypathServer(server.BaseServer):
    """
    Legacy query server working from a ``PyPath`` object.

    The handlers read ``req.args`` (query parameters, bytes keys) and
    ``req.postpath`` (URL path segments) of the request object passed in
    by the base server. NOTE(review): these look like ``twisted.web``
    request attributes, where the values in ``args`` are lists of bytes;
    the helpers below accept both list and scalar values -- confirm
    against ``server.BaseServer``.
    """

    def __init__(self, pypath):
        """
        :param pypath:
            Object providing ``graph`` (an igraph-like graph), ``sources``,
            ``nodDct`` and ``names2vids``, as used by the handlers below.
        """

        self.p = pypath
        self.g = pypath.graph
        self.isLeaf = True

        server.BaseServer.__init__(self)

    # -- small private helpers shared by the handlers ----------------------

    @staticmethod
    def _json_requested(req):
        """
        Tell whether the ``format`` query parameter asks for JSON.

        Accepts a scalar ``b'json'`` as well as a list of values containing
        ``b'json'``; the other parameters in this class are read as lists,
        so comparing the value only to a bytes literal never matched those.
        """

        if b'format' not in req.args:
            return False

        fmt = req.args[b'format']

        if isinstance(fmt, (list, tuple)):
            return b'json' in fmt

        return fmt == b'json'

    @staticmethod
    def _selected_fields(req, available):
        """
        Return the optional column names requested in ``fields``.

        :param available:
            Bytes names of the optional columns, in output order.
        :return:
            The requested names decoded to ``str``, ordered as in
            ``available``; empty list if no ``fields`` parameter is given.
        """

        if b'fields' not in req.args:
            return []

        requested = b','.join(req.args[b'fields']).split(b',')

        return [f.decode('utf-8') for f in available if f in requested]

    def _genesymbols_requested(self, req):
        """Tell whether the ``genesymbols`` query parameter is switched on."""

        return bool(
            b'genesymbols' in req.args and
            self._parse_arg(req.args[b'genesymbols'])
        )

    def _node_label(self, name):
        """Return the ``label`` attribute of the vertex called ``name``."""

        return self.g.vs[self.p.nodDct[name]]['label']

    @staticmethod
    def _ptm_types(req):
        """
        Return the PTM types requested in ``ptmtype`` as a set of ``str``.

        Both the bytes and the str spelling of the key are looked up and
        comma separated values are split the same way as for ``fields``.

        :return:
            ``None`` if the parameter is absent (i.e. no filtering).
        """

        for key in (b'ptmtype', 'ptmtype'):

            if key not in req.args:
                continue

            values = req.args[key]

            if isinstance(values, (bytes, str)):
                values = [values]

            types = set()

            for value in values:

                if isinstance(value, bytes):
                    value = value.decode('utf-8')

                types.update(value.split(','))

            return types

        return None

    @staticmethod
    def _cell(value):
        """Render one value as text; lists and sets are joined by ``;``."""

        if isinstance(value, (list, set)):
            return ';'.join(str(item) for item in value)

        return str(value)

    def _respond(self, res, hdr, req):
        """Serialize the rows ``res`` as JSON records or as a table."""

        if self._json_requested(req):
            return json.dumps([dict(zip(hdr, r)) for r in res])

        return self._table_output(res, hdr, req)

    # -- query handlers ----------------------------------------------------

    def network(self, req):
        """
        Summary of the network: vertex count, edge count, directedness and
        the list of resources.

        :return:
            A JSON object if JSON is requested, otherwise a two line tab
            separated table (header and values).
        """

        hdr = ['nodes', 'edges', 'is_directed', 'sources']
        val = [
            self.g.vcount(),
            self.g.ecount(),
            int(self.g.is_directed()),
            self.p.sources,
        ]

        if self._json_requested(req):
            return json.dumps(dict(zip(hdr, val)))

        return '%s\n%s' % (
            '\t'.join(hdr),
            '\t'.join(self._cell(v) for v in val),
        )

    def interactions(self, req):
        """
        Interactions of the queried proteins, one row per direction.

        An edge yields one row for each of its ``straight`` and ``reverse``
        directions that is set, plus one undirected row (all three flags
        zero) if the edge is not directed. The optional ``sources`` and
        ``references`` columns are added on request (``fields``), gene
        symbols if ``genesymbols`` is on. A ``dip_url`` column is present
        only if ``DIP`` is among the sources of the selected edges.

        :return:
            JSON records or a tab separated table, see ``_respond``.
        """

        elist = self._get_eids(req)
        hdr = [
            'source', 'target', 'is_directed', 'is_stimulation',
            'is_inhibition'
        ]
        hdr += self._selected_fields(req, [b'sources', b'references'])

        genesymbols = self._genesymbols_requested(req)

        if genesymbols:
            hdr.insert(2, 'source_genesymbol')
            hdr.insert(3, 'target_genesymbol')

        with_sources = 'sources' in hdr
        with_references = 'references' in hdr

        res = []
        all_sources = set()

        for eid in elist:

            e = self.g.es[eid]
            dirs = e['dirs']
            all_sources.update(e['sources'])

            for d in ('straight', 'reverse'):

                uniprots = getattr(dirs, d)

                if not dirs.dirs[uniprots]:
                    continue

                row = [uniprots[0], uniprots[1]]

                if genesymbols:
                    row.extend([
                        self._node_label(uniprots[0]),
                        self._node_label(uniprots[1]),
                    ])

                row.extend([
                    1,
                    int(dirs.is_stimulation(uniprots)),
                    int(dirs.is_inhibition(uniprots)),
                ])

                # resources of this direction plus the ones which
                # support the interaction without direction
                dsources = (
                    dirs.get_dir(uniprots, sources=True) |
                    dirs.get_dir('undirected', sources=True)
                )

                if with_sources:
                    row.append(list(dsources))

                if with_references:
                    row.append([
                        ref.pmid
                        for source, refs in e['refs_by_source'].items()
                        if source in dsources
                        for ref in refs
                    ])

                # always appended; removed below if no DIP data is involved
                row.append(self._dip_urls(e))
                res.append(row)

            if not dirs.is_directed():

                row = [dirs.nodes[0], dirs.nodes[1]]

                if genesymbols:
                    row.extend([
                        self._node_label(dirs.nodes[0]),
                        self._node_label(dirs.nodes[1]),
                    ])

                row.extend([0, 0, 0])

                if with_sources:
                    row.append(list(e['sources']))

                if with_references:
                    row.append([r.pmid for r in e['references']])

                row.append(self._dip_urls(e))
                res.append(row)

        if 'DIP' in all_sources:
            hdr.append('dip_url')
        else:
            # drop the trailing DIP URL cell of every row
            res = [row[:-1] for row in res]

        return self._respond(res, hdr, req)

    def _table_output(self, res, hdr, req):
        """
        Render rows as tab separated text.

        The header line is printed unless the ``header`` parameter is
        present and falsy; a missing parameter means the header is shown.
        """

        show_header = b'header' not in req.args or bool(req.args[b'header'])
        head = '%s\n' % '\t'.join(hdr) if show_header else ''
        body = '\n'.join(
            '\t'.join(self._cell(f) for f in row)
            for row in res
        )

        return '%s%s' % (head, body)

    def _get_eids(self, req):
        """
        Collect the edge ids selected by the request.

        Without a second path segment all edges are selected. Otherwise the
        segment is a comma separated list of names, and the edges between
        those vertices and their neighbours are returned.

        :return:
            Set of edge ids.
        """

        if len(req.postpath) <= 1:
            return set(range(self.g.ecount()))

        raw_names = req.postpath[1]

        if isinstance(raw_names, bytes):
            # path segments may arrive as bytes; str.split needs text
            raw_names = raw_names.decode('utf-8')

        names = raw_names.split(',')
        directed = self.g.is_directed()
        elist = set()

        for one in set(self.p.names2vids(names)):

            for two in self.g.neighbors(one):

                e = self.g.get_eid(one, two, directed=True, error=False)

                if e != -1:
                    elist.add(e)

                if directed:
                    e = self.g.get_eid(two, one, directed=True, error=False)

                    if e != -1:
                        elist.add(e)

        return elist

    def ptms(self, req):
        """
        Enzyme-substrate interactions on the selected edges.

        One row per item in the ``ptm`` edge attribute, optionally filtered
        by the ``ptmtype`` parameter. The optional columns
        ``is_stimulation``, ``is_inhibition``, ``sources`` and
        ``references`` are added on request (``fields``), gene symbols if
        ``genesymbols`` is on.

        :return:
            JSON records or a tab separated table, see ``_respond``.
        """

        elist = self._get_eids(req)
        hdr = [
            'enzyme', 'substrate', 'residue_type', 'residue_offset',
            'modification'
        ]

        genesymbols = self._genesymbols_requested(req)

        if genesymbols:
            hdr.insert(2, 'enzyme_genesymbol')
            hdr.insert(3, 'substrate_genesymbol')

        hdr += self._selected_fields(
            req,
            [b'is_stimulation', b'is_inhibition', b'sources', b'references'],
        )

        ptm_types = self._ptm_types(req)
        res = []

        if 'ptm' not in self.g.es.attributes():
            return self._respond(res, hdr, req)

        for eid in elist:

            e = self.g.es[eid]

            for ptm in e['ptm']:

                if ptm_types is not None and ptm.ptm.typ not in ptm_types:
                    continue

                enzyme = ptm.domain.protein
                substrate = ptm.ptm.protein
                row = [enzyme, substrate]

                if genesymbols:
                    row.extend([
                        self._node_label(enzyme),
                        self._node_label(substrate),
                    ])

                row.extend([
                    ptm.ptm.residue.name,
                    ptm.ptm.residue.number,
                    ptm.ptm.typ,
                ])

                if 'is_stimulation' in hdr:
                    row.append(
                        int(e['dirs'].is_stimulation((enzyme, substrate)))
                    )

                if 'is_inhibition' in hdr:
                    row.append(
                        int(e['dirs'].is_inhibition((enzyme, substrate)))
                    )

                if 'sources' in hdr:
                    row.append(list(ptm.ptm.sources))

                if 'references' in hdr:
                    row.append(list(ptm.refs))

                res.append(row)

        return self._respond(res, hdr, req)

    def resources(self, req):
        """
        Per resource statistics: number of proteins, interactions,
        directions, stimulations, inhibitions and signs.

        ``signs`` is the sum of the stimulation and inhibition counts.

        :return:
            JSON records or a tab separated table, see ``_respond``.
        """

        hdr = [
            'database', 'proteins', 'interactions', 'directions',
            'stimulations', 'inhibitions', 'signs'
        ]
        res = []

        for s in self.p.sources:

            n_proteins = sum(1 for v in self.g.vs if s in v['sources'])
            n_interactions = 0
            n_directions = 0
            n_stimulations = 0
            n_inhibitions = 0

            # one pass over the edges collects all edge based counts
            for e in self.g.es:

                if s in e['sources']:
                    n_interactions += 1

                dirs = e['dirs']

                for d in dirs.which_dirs():

                    n_directions += s in dirs.sources[d]
                    n_stimulations += s in dirs.positive_sources[d]
                    n_inhibitions += s in dirs.negative_sources[d]

            res.append([
                s,
                n_proteins,
                n_interactions,
                n_directions,
                n_stimulations,
                n_inhibitions,
                n_stimulations + n_inhibitions,
            ])

        return self._respond(res, hdr, req)

    def _dip_urls(self, e):
        """
        Build the DIP URLs of an edge from its ``dip_id`` attribute.

        Identifiers which can not be converted are printed and skipped.

        :return:
            The URLs joined by ``;``; empty string if there is none.
        """

        result = []

        if 'dip_id' in e.attributes():

            # local import: the module does not import ``urls`` at the top
            # NOTE(review): module path assumed to be
            # ``pypath.resources.urls`` -- confirm
            import pypath.resources.urls as urls

            for dip_id in e['dip_id']:

                try:
                    result.append(
                        urls.urls['dip']['ik'] %
                        int(dip_id.split('-')[1][:-1])
                    )

                except Exception:
                    # best effort: report the unusable identifier, go on
                    print(dip_id)

        return ';'.join(result)