#!/usr/bin/env python# -*- coding: utf-8 -*-## This file is part of the `pypath` python module## Copyright 2014-2023# EMBL, EMBL-EBI, Uniklinik RWTH Aachen, Heidelberg University## Authors: see the file `README.rst`# Contact: Dénes Türei (turei.denes@gmail.com)## Distributed under the GPLv3 License.# See accompanying file LICENSE.txt or copy at# https://www.gnu.org/licenses/gpl-3.0.html## Website: https://pypath.omnipathdb.org/#fromfuture.utilsimportiteritemsfrompast.builtinsimportxrange,rangeimportosimportsysimportwarningsimportjsonfromtypingimportAny,Callable,Dict,IO,List,Optional,Unionimportxlrdimportopenpyxlimportglomimportpypath.share.sessionassession_modimportpypath.share.commonascommonimportpypath_common._constantsas_const_logger=session_mod.Logger(name='inputs_common')_log=_logger._log_console=_logger._consoleif'unicode'notin__builtins__:unicode=str
def read_xls(
        xls_file,
        sheet=0,
        use_openpyxl=False,
        cell_range=None,
):
    """
    Read one sheet of an MS Excel (XLS or XLSX) file into a list of rows.

    The file is first attempted by ``xlrd``. If that fails with anything
    other than an ``IOError``, or if ``use_openpyxl`` is set, the file is
    read by ``openpyxl`` instead.

    Args
        xls_file: Path to the file (``str`` or ``os.PathLike``) or a
            file-like object opened in binary mode. Paths are opened and
            closed here; file-like objects passed in are left open.
        sheet: Sheet index (``int``) or sheet name. If the requested sheet
            can not be found, the first sheet is used.
        use_openpyxl: Skip ``xlrd`` and read directly by ``openpyxl``.
        cell_range: Subset of the worksheet to read, used to index the
            ``openpyxl`` worksheet. Ignored when the file is read by
            ``xlrd``.

    Returns
        A list of rows, each row a list of cell values. By ``xlrd`` all
        values are converted to ``str``. By ``openpyxl`` values are
        returned as stored, except that ``None`` from the worksheet values
        becomes an empty string.

    Raises
        FileNotFoundError: If the path does not exist, or ``xlrd`` raised
            an ``IOError``.
        ValueError: If ``openpyxl`` is not able to open the workbook.
    """

    table = []
    opened_here = False
    xlrd_book = None
    openpyxl_book = None

    if isinstance(xls_file, (str, os.PathLike)):

        if not os.path.exists(xls_file):
            raise FileNotFoundError(xls_file)

        xls_file = open(xls_file, 'rb')
        opened_here = True

    try:

        if not use_openpyxl:

            try:

                _log('Reading XLS(X) by xlrd.')

                if not hasattr(xls_file, 'read'):
                    # xlrd is fed from the file contents; anything that
                    # can not be read goes to the openpyxl fallback
                    raise TypeError(
                        'Expected a file-like object, got `%s`.' %
                        type(xls_file).__name__
                    )

                xlrd_book = xlrd.open_workbook(
                    file_contents=xls_file.read(),
                    on_demand=True,
                )

                # the sheet object is kept in its own variable: the `sheet`
                # argument must stay intact for the openpyxl fallback
                try:
                    if isinstance(sheet, int):
                        xlrd_sheet = xlrd_book.sheet_by_index(sheet)
                    else:
                        xlrd_sheet = xlrd_book.sheet_by_name(sheet)
                except xlrd.biffh.XLRDError:
                    xlrd_sheet = xlrd_book.sheet_by_index(0)

                table = [
                    [str(c.value) for c in xlrd_sheet.row(i)]
                    for i in range(xlrd_sheet.nrows)
                ]

            except IOError as e:

                raise FileNotFoundError(xls_file) from e

            except Exception:

                _log('Failed to read by xlrd, falling back to openpyxl.')
                _logger._log_traceback()
                use_openpyxl = True

        if use_openpyxl:

            try:

                _log('Reading XLS(X) by openpyxl.')
                openpyxl_book = openpyxl.load_workbook(
                    filename=xls_file,
                    read_only=True,
                    data_only=True,
                )

            except Exception as e:

                _log(f'Failed to read `{xls_file}` by openpyxl.')
                _logger._log_traceback()
                raise ValueError('Could not open xls: %s' % xls_file) from e

            try:
                if isinstance(sheet, int):
                    worksheet = openpyxl_book.worksheets[sheet]
                else:
                    worksheet = openpyxl_book[sheet]
            except (KeyError, IndexError, TypeError):
                # requested sheet not available: use the first one
                worksheet = openpyxl_book.worksheets[0]

            # this is to suppress the openpyxl unknown extension warnings
            # which we can not avoid as the xlsx files were produced not by us
            with warnings.catch_warnings():

                warnings.simplefilter('ignore')

                rows = (
                    worksheet[cell_range]
                    if cell_range else
                    worksheet.values
                )

                # `worksheet.values` yields raw values while a cell range
                # yields cell objects; `getattr` covers both, so numbers
                # and dates among the raw values do not raise
                table = [
                    [
                        '' if cell is None else
                        cell if isinstance(cell, str) else
                        getattr(cell, 'value', cell)
                        for cell in row
                    ]
                    for row in rows
                ]

    finally:

        # release everything also if reading failed half way
        if hasattr(xlrd_book, 'release_resources'):
            xlrd_book.release_resources()

        # read-only openpyxl workbooks have to be closed explicitly
        if hasattr(openpyxl_book, 'close'):
            openpyxl_book.close()

        if opened_here:
            xls_file.close()

    return table
def read_table(
        cols,
        fileObject=None,
        data=None,
        sep='\t',
        sep2=None,
        rem=None,
        hdr=None,
        encoding='ascii',
):
    """
    Generic function to read data tables.

    Lines with fewer fields than required by the highest column index in
    ``cols`` are skipped. If ``fileObject`` is provided, it is closed before
    returning, also if reading fails.

    Args
        cols : dict
            Dictionary of columns to read. Keys identifying fields are
            returned in the result. Values are column numbers.
        fileObject : file-like
            Any file like object: file opened for read, or StringIO buffer.
            Used only if ``data`` is None. Rewound to its beginning if it
            has a ``readline`` method.
        data : str
            The contents of the table as a string (or bytes, which will be
            decoded by ``encoding``). Empty lines are ignored.
        sep : str
            Field separator of the file.
        sep2 : dict | str
            Subfield separators: either a dict with column numbers as keys,
            e.g. {2: ',', 3: '|'}, or one string applied to all columns.
            Fields with a subfield separator are returned as tuples.
        rem : list
            Strings to remove. For each line these elements will be replaced
            with ''.
        hdr : int
            Number of header lines. If None, no headers assumed.
        encoding : str
            Encoding used for decoding lines provided as bytes.

    Returns
        List of dicts, one for each line, with the keys of ``cols`` as keys
        and the contents of the fields as values.

    Raises
        TypeError: If neither ``fileObject`` nor ``data`` is provided.
    """

    rem = rem or []

    try:

        if data is None:

            if fileObject is None:
                raise TypeError(
                    'Either `fileObject` or `data` must be provided.'
                )

            if hasattr(fileObject, 'readline'):
                fileObject.seek(0)

            lines = iter(fileObject)

            # skip the header; stop silently if the file is shorter
            for _ in range(hdr or 0):
                if next(lines, None) is None:
                    break

        else:

            if isinstance(data, bytes):
                data = data.decode(encoding)

            # lines are not stripped as a whole here: that would remove
            # leading and trailing separators (e.g. tabs) and shift columns
            lines = [line for line in data.split('\n') if line][hdr:]

        max_col = max(cols.values(), default=-1)
        result = []

        for line in lines:

            if isinstance(line, bytes):
                line = line.decode(encoding)

            for to_remove in rem:
                line = line.replace(to_remove, '')

            fields = [f.strip() for f in line.split(sep)]

            if len(fields) <= max_col:
                continue

            record = {}

            for name, col in cols.items():

                field = fields[col]

                if isinstance(sep2, dict):
                    sub_sep = sep2.get(col)
                elif isinstance(sep2, str):
                    sub_sep = sep2
                else:
                    sub_sep = None

                if sub_sep:
                    field = tuple(
                        sub.strip()
                        for sub in field.split(sub_sep)
                        if sub
                    )

                record[name] = field

            result.append(record)

        return result

    finally:

        close = getattr(fileObject, 'close', None)

        if callable(close):
            close()
def json_extract(
        data: Union[dict, list, str, IO],
        spec: dict,
) -> List[dict]:
    """
    Extracts fields of arbitrary depth from JSON data into a list of dicts.

    Args
        data: JSON as a string or a file-like object, or already parsed
            JSON data (a dict or a list of records). A single dict is
            handled as a list of one record.
        spec: Dict of glom field specifications.

    Returns
        A list with the result of applying ``spec`` on each record.

    Raises
        TypeError: If the data, after reading, is neither dict nor list.
    """

    data = json_read(data)

    if isinstance(data, dict):
        data = [data]

    if not isinstance(data, list):
        msg = 'Don\'t know how to process data of type `%s`.' % type(data)
        raise TypeError(msg)

    # the constants module is imported in this file under the alias
    # `_const`; the previously used `_cons` is not defined
    return [
        glom.glom(rec, spec, default=_const.GLOM_ERROR)
        for rec in data
    ]
def json_read(data: Union[str, IO, Any]) -> Union[list, dict, Any]:
    """
    Reads JSON from file or string, pass through for any other value.

    Args
        data: A file-like object (anything with a ``read`` method) with
            JSON contents, a JSON string, or any other object.

    Returns
        The parsed JSON data if ``data`` is a file-like object or a string,
        otherwise ``data`` itself, unchanged.
    """

    # `typing.IO` is meant for annotations: the objects returned by `open`
    # or `io.StringIO` are not instances of it, hence file-like objects
    # are recognized by their `read` method
    if callable(getattr(data, 'read', None)):
        return json.load(data)

    if isinstance(data, str):
        return json.loads(data)

    return data
def glom_fields(fields: Optional[GlomFields] = None) -> Dict[str, GlomSpec]:
    """
    Builds a glom spec dict from a list or a dict of fields.

    Items of a list are used both as keys and as specs; for a dict, its
    keys and values are used. Each spec is wrapped into ``glom.Coalesce``
    with ``None`` as default. Without fields an empty dict is returned.
    """

    spec = fields or {}

    if not isinstance(spec, dict):
        # a plain sequence: each field name is also its own spec
        spec = dict(zip(spec, spec))

    return {
        key: glom.Coalesce(path, default=None)
        for key, path in spec.items()
    }