#!/usr/bin/env python# -*- coding: utf-8 -*-## This file is part of the `pypath` python module## Copyright 2014-2023# EMBL, EMBL-EBI, Uniklinik RWTH Aachen, Heidelberg University## Authors: see the file `README.rst`# Contact: Dénes Türei (turei.denes@gmail.com)## Distributed under the GPLv3 License.# See accompanying file LICENSE.txt or copy at# https://www.gnu.org/licenses/gpl-3.0.html## Website: https://pypath.omnipathdb.org/#fromfuture.utilsimportiteritemsfrompast.builtinsimportxrange,rangeimportimportlibasimpimportsysimportosimportioimportshutilimportstructimportpypath.share.settingsassettingsimportpypath.share.sessionassession_modimportpypath.share.cacheascache_mod_logger=session_mod.log()importpycurltry:fromcStringIOimportStringIOBytesIO=StringIOexcept:try:fromStringIOimportStringIOfromStringIOimportStringIOasBytesIOexcept:fromioimportBytesIOfromioimportStringIOtry:importcPickleaspickleexcept:importpickleimporturllibtry:importurllib2exceptImportError:# this works seemless in Py3:importurllib.requesturllib2=urllib.requesttry:importurlparseexcept:# this works seemless in Py3:importurllib.parseurlparse=urllib.parseifnothasattr(urllib,'quote'):importurllib.parse_urllib=urlliburllib=_urllib.parsetry:importpysftpexcept:_logger.msg('Module `pysftp` not available. ''Only downloading of a small number of resources ''relies on this module. 
''Please install by PIP if it is necessary for you.','curl',-1,)importcodecsimportgzipimportzipfileimporttarfileimporthashlibimportrefromcontextlibimportclosingimportpypath.share.progressasprogressimportpypath.share.commonascommonimportpypath_common._constantsas_constimportpypath.share.settingsassettingstry:basestringexceptNameError:basestring=strif'long'notin__builtins__:long=intif'unicode'notin__builtins__:unicode=strCURSOR_UP_ONE='\x1b[1A'ERASE_LINE='\x1b[2K'# global contexts for modifying Curl() behviourCACHE=NoneCACHEDEL=FalseCACHEPRINT=FalseDRYRUN=FalsePRESERVE=FalseDEBUG=FalseLASTCURL=Noneshow_cache=False_re_url=re.compile(r'^(?:http|https|ftp)://')
class_global_context(object):""" This is a metaclass for context handlers working by setting a module level variable to certain value. """def__init__(self,name,on_off):""" :param str name: Name of the module level variable. :param on_off: Value of the module level variable in the context. """self.name=nameself.module=sys.modules[__name__]self.on_off=on_offdef__enter__(self):self._store_value=getattr(self.module,self.name)setattr(self.module,self.name,self.on_off)def__exit__(self,exception_type,exception_value,traceback):ifexception_typeisnotNone:sys.stdout.write('%s, %s, %s\n'%(str(exception_type),str(exception_value),str(traceback)))sys.stdout.flush()setattr(self.module,self.name,self._store_value)class_global_context_on(_global_context):""" This is a metaclass for context handlers working by setting a module level variable to `True`. """def__init__(self,name):""" :param str name: Name of the module level variable. """super(_global_context_on,self).__init__(name,True)class_global_context_off(_global_context):""" This is a metaclass for context handlers working by setting a module level variable to `False`. """def__init__(self,name):""" :param str name: Name of the module level variable. """super(_global_context_off,self).__init__(name,False)
class cache_on(_global_context_on):
    """
    This is a context handler to turn on pypath.curl.Curl() cache.
    As most of the methods use cache as their default behaviour,
    probably it won't change anything.

    Behind the scenes it sets the value of the `pypath.curl.CACHE`
    module level variable to `True` (by default it is `None`).

    Example: ::

        import pypath
        from pypath import curl, data_formats

        pa = pypath.PyPath()

        print('`curl.CACHE` is ', curl.CACHE)

        with curl.cache_on():
            print('`curl.CACHE` is ', curl.CACHE)
            pa.load_resources({'signor': data_formats.pathway['signor']})
    """


class cache_off(_global_context_off):
    """
    This is a context handler to turn off pypath.curl.Curl() cache.
    Data will be downloaded even if it exists in cache.

    Behind the scenes it sets the value of the `pypath.curl.CACHE`
    module level variable to `False` (by default it is `None`).

    Example: ::

        import pypath
        from pypath import curl, data_formats

        pa = pypath.PyPath()

        print('`curl.CACHE` is ', curl.CACHE)

        with curl.cache_off():
            print('`curl.CACHE` is ', curl.CACHE)
            pa.load_resources({'signor': data_formats.pathway['signor']})
    """
class cache_print_on(_global_context_on):
    """
    This is a context handler which makes pypath.curl.Curl() print
    verbose messages about its cache.

    Behind the scenes it sets the value of the `pypath.curl.CACHEPRINT`
    module level variable to `True` (by default it is `False`).

    Example: ::

        import pypath
        from pypath import curl, data_formats

        pa = pypath.PyPath()

        with curl.cache_print_on():
            pa.load_resources({'signor': data_formats.pathway['signor']})
    """


class cache_print_off(_global_context_off):
    """
    This is a context handler which stops pypath.curl.Curl() from
    printing verbose messages about its cache.

    Behind the scenes it sets the value of the `pypath.curl.CACHEPRINT`
    module level variable to `False`. As by default it is `False`, this
    context won't modify the default behaviour.

    Example: ::

        import pypath
        from pypath import curl, data_formats

        pa = pypath.PyPath()

        with curl.cache_print_off():
            pa.load_resources({'signor': data_formats.pathway['signor']})
    """
class cache_delete_on(_global_context_on):
    """
    This is a context handler which makes pypath.curl.Curl() delete the
    cache files instead of reading them. Then it downloads the data
    again, or does nothing if the `DRYRUN` context is turned on. Upon
    deleting cache files console messages will let you know which files
    have been deleted.

    Behind the scenes it sets the value of the `pypath.curl.CACHEDEL`
    module level variable to `True` (by default it is `False`).

    Example: ::

        import pypath
        from pypath import curl, data_formats

        pa = pypath.PyPath()

        with curl.cache_delete_on():
            pa.load_resources({'signor': data_formats.pathway['signor']})
    """


class cache_delete_off(_global_context_off):
    """
    This is a context handler which stops pypath.curl.Curl() from
    deleting the cache files. This is the default behaviour, so this
    context won't change anything by default.

    Behind the scenes it sets the value of the `pypath.curl.CACHEDEL`
    module level variable to `False`.

    Example: ::

        import pypath
        from pypath import curl, data_formats

        pa = pypath.PyPath()

        with curl.cache_delete_off():
            pa.load_resources({'signor': data_formats.pathway['signor']})
    """
class dryrun_on(_global_context_on):
    """
    This is a context handler which makes pypath.curl.Curl() do all
    setup steps, but not perform the download or cache read.

    Behind the scenes it sets the value of the `pypath.curl.DRYRUN`
    module level variable to `True` (by default it is `False`).

    Example: ::

        import pypath
        from pypath import curl, data_formats

        pa = pypath.PyPath()

        with curl.dryrun_on():
            pa.load_resources({'signor': data_formats.pathway['signor']})
    """


class dryrun_off(_global_context_off):
    """
    This is a context handler which makes pypath.curl.Curl() perform
    the download or cache read. This is the default behaviour, so
    applying this context restores the default.

    Behind the scenes it sets the value of the `pypath.curl.DRYRUN`
    module level variable to `False`.

    Example: ::

        import pypath
        from pypath import curl, data_formats

        pa = pypath.PyPath()

        with curl.dryrun_off():
            pa.load_resources({'signor': data_formats.pathway['signor']})
    """
class preserve_on(_global_context_on):
    """
    This is a context handler which makes pypath.curl.Curl() keep a
    reference to itself in the module level variable `LASTCURL`. This
    is useful if you have some issue with `Curl`, and you want to
    access the instance for debugging.

    Behind the scenes it sets the value of the `pypath.curl.PRESERVE`
    module level variable to `True` (by default it is `False`).

    Example: ::

        import pypath
        from pypath import curl, data_formats

        pa = pypath.PyPath()

        with curl.preserve_on():
            pa.load_resources({'signor': data_formats.pathway['signor']})
    """


class preserve_off(_global_context_off):
    """
    This is a context handler which prevents pypath.curl.Curl() from
    keeping a reference to itself in the module level variable
    `LASTCURL`. By default it does not do this, so this context only
    restores the default.

    Behind the scenes it sets the value of the `pypath.curl.PRESERVE`
    module level variable to `False`.

    Example: ::

        import pypath
        from pypath import curl, data_formats

        pa = pypath.PyPath()

        with curl.preserve_off():
            pa.load_resources({'signor': data_formats.pathway['signor']})
    """
class debug_on(_global_context_on):
    """
    This is a context handler which makes pypath.curl.Curl() print
    debug information. This is useful if you have some issue with
    `Curl`, and you want to see what's going on.

    Behind the scenes it sets the value of the `pypath.curl.DEBUG`
    module level variable to `True` (by default it is `False`).

    Example: ::

        import pypath
        from pypath import curl, data_formats

        pa = pypath.PyPath()

        with curl.debug_on():
            pa.load_resources({'signor': data_formats.pathway['signor']})
    """


class debug_off(_global_context_off):
    """
    This is a context handler which prevents pypath.curl.Curl() from
    printing debug information. By default it does not do this, so this
    context only restores the default.

    Behind the scenes it sets the value of the `pypath.curl.DEBUG`
    module level variable to `False`.

    Example: ::

        import pypath
        from pypath import curl, data_formats

        pa = pypath.PyPath()

        with curl.debug_off():
            pa.load_resources({'signor': data_formats.pathway['signor']})
    """
class FileOpener(session_mod.Logger):
    """
    This class opens a file, extracts it in case it is a gzip, tar.gz,
    tar.bz2 or zip archive, selects the requested files if you only
    need certain files from a multifile archive, reads the data from
    the file, or returns the file pointer, as you request. It examines
    the file type and size.
    """

    # characters not allowed in file names on common file systems
    # (slash on Unix, the rest on Windows); used by `replace_forbidden`
    FORBIDDEN_CHARS = re.compile(r'[/\\<>:"\?\*\|]')
def open(self):
    """
    Opens the file at `self.fname` if it exists and no file object
    is available yet. Plain text files with a known encoding are
    opened in the requested mode; everything else is opened binary.
    """
    if self.fileobj is not None or not os.path.exists(self.fname):
        return

    if self.encoding and self.type == 'plain':
        mode = self.default_mode
        self.fileobj = open(
            self.fname,
            mode,
            encoding = None if mode == 'rb' else self.encoding,
        )
    else:
        self.fileobj = open(self.fname, 'rb')
def extract(self):
    """
    Calls the extracting method for compressed files: dispatches to
    `open_<type>` based on the detected file type.
    """
    opener = getattr(self, 'open_%s' % self.type)
    opener()
def open_tgz(self):
    """
    Extracts files from a tar.gz archive. Fills `files_multipart` with
    either file objects (`large` mode) or the file contents, and
    `sizes` with the uncompressed sizes; the dict of files becomes
    `self.result`.
    """
    self._log('Opening tar.gz file `%s`.' % self.fileobj.name)

    self.files_multipart = {}
    self.sizes = {}
    self.tarfile = tarfile.open(fileobj = self.fileobj, mode = 'r:gz')
    self.members = self.tarfile.getmembers()

    for member in self.members:

        wanted = (
            self.files_needed is None or
            member.name in self.files_needed
        )

        # entries with size 0 are directories
        if not wanted or member.size == 0:
            continue

        extracted = self.tarfile.extractfile(member)
        self.sizes[member.name] = member.size

        if self.large:
            # hand over the open file object
            self.files_multipart[member.name] = extracted
        else:
            self._log(
                'Reading contents of file '
                'from archive: `%s`.' % member.name
            )
            self.files_multipart[member.name] = extracted.read()
            extracted.close()

    if not self.large:
        self.tarfile.close()
        self._log('File closed: `%s`.' % self.fileobj.name)

    self.result = self.files_multipart
def open_gz(self):
    # Reads a gzip file: in `large` mode the result is an iterator over
    # the (optionally decoded) lines, otherwise the whole decompressed
    # content is loaded into memory.
    self._log('Opening gzip file `%s`.' % self.fileobj.name)
    # the last 4 bytes of a gzip stream store the uncompressed size
    # (modulo 2**32)
    self.fileobj.seek(-4, 2)
    self.size = struct.unpack('I', self.fileobj.read(4))[0]
    self.fileobj.seek(0)
    self.gzfile = gzip.GzipFile(fileobj = self.fileobj)

    if self.large:
        # NOTE(review): this mutates a process-wide io setting, not
        # only this file's buffer size -- confirm it is intended
        io.DEFAULT_BUFFER_SIZE = 4096
        self._gzfile_mode_r = io.TextIOWrapper(
            self.gzfile,
            encoding = self.encoding,
        )
        self.result = self.iterfile(
            self.gzfile
            if self.default_mode == 'rb' else
            self._gzfile_mode_r
        )
        self._log(
            'Result is an iterator over the '
            'lines of `%s`.' % self.fileobj.name
        )
    else:
        self.result = self.gzfile.read()
        self.gzfile.close()
        self._log(
            'Data has been read from gzip file `%s`. '
            'The file has been closed' % self.fileobj.name
        )

def open_zip(self):
    # Reads a zip archive; selected members end up in `files_multipart`
    # either as file objects (`large` mode) or as their contents.
    self._log('Opening zip file `%s`.' % self.fileobj.name)
    self.files_multipart = {}
    self.sizes = {}
    self.fileobj.seek(0)
    self.zipfile = zipfile.ZipFile(self.fileobj, 'r')
    self.members = self.zipfile.namelist()

    for i, m in enumerate(self.members):

        self.sizes[m] = self.zipfile.filelist[i].file_size

        if self.files_needed is None or m in self.files_needed:

            this_file = self.zipfile.open(m)

            if self.large:
                if self.default_mode == 'rb':
                    # keeping it in binary mode
                    self.files_multipart[m] = this_file
                else:
                    # wrapping the file for decoding
                    self.files_multipart[m] = io.TextIOWrapper(
                        this_file,
                        encoding = self.encoding
                    )
            else:
                self.files_multipart[m] = this_file.read()
                this_file.close()

    if not self.large:
        self.zipfile.close()
        self._log(
            'Data has been read from zip file `%s`.'
            'File has been closed' % self.fileobj.name
        )

    self.result = self.files_multipart

def open_plain(self):
    # Reads a plain text file: iterator over lines in `large` mode,
    # otherwise the full contents.
    self._log('Opening plain text file `%s`.' % self.fileobj.name)
    self.size = os.path.getsize(self.fileobj.name)

    if self.large:
        self.result = self.iterfile(self.fileobj)
    else:
        self.result = self.fileobj.read()
        self.fileobj.close()
        self._log(
            'Contents of `%s` has been read '
            'and the file has been closed.' % self.fileobj.name
        )

def get_type(self):
    # Determines the archive type from the file name extension or from
    # the explicit `compr` argument; `multifile` marks archives which
    # may contain several files.
    self.multifile = False

    if self.fname[-3:].lower() == 'zip' or self.compr == 'zip':
        self.type = 'zip'
        self.multifile = True
    elif self.fname[-3:].lower() == 'tgz' or \
            self.fname[-6:].lower() == 'tar.gz' or \
            self.compr == 'tgz' or self.compr == 'tar.gz':
        self.type = 'tgz'
        self.multifile = True
    elif self.fname[-2:].lower() == 'gz' or self.compr == 'gz':
        self.type = 'gz'
    else:
        self.type = 'plain'

@staticmethod
def iterfile(fileobj):
    # Generator yielding the lines of an open file object.
    for line in fileobj:
        yield line
class Curl(FileOpener):
    """
    This class is a wrapper around pycurl. You can set a vast amount of
    parameters. In addition it has a caching functionality: using this,
    downloads of databases/resources are performed only once. It
    handles HTTP, FTP, cookies, headers, GET and POST params,
    multipart/form data, URL quoting, redirects, timeouts, retries,
    encodings, debugging. It returns either downloaded data, file
    pointer, files extracted from archives (gzip, tar.gz, zip). It is
    able to show a progress and status indicator on the console.
    """
def __init__(
        self,
        url,
        silent = True,
        get = None,
        post = None,
        req_headers = None,
        cache = True,
        debug = False,
        outf = None,
        compr = None,
        encoding = None,
        files_needed = None,
        connect_timeout = None,
        timeout = None,
        ignore_content_length = False,
        init_url = None,
        init_fun = 'get_jsessionid',
        init_use_cache = False,
        follow = True,
        large = False,
        default_mode = 'r',
        override_post = False,
        init_headers = False,
        return_headers = False,
        compressed = False,
        binary_data = None,
        write_cache = True,
        force_quote = False,
        sftp_user = None,
        sftp_passwd = None,
        sftp_passwd_file = '.secrets',
        sftp_port = 22,
        sftp_host = None,
        sftp_ask = None,
        setup = True,
        call = True,
        process = True,
        retries = None,
        cache_dir = None,
        bypass_url_encoding = False,
        empty_attempt_again = True,
        keep_failed = False,
        alpn = True,
        slow = False,
        http2 = True,
    ):
    """
    Sets up and (by default) performs the download or cache read.

    :param str url:
        The URL to retrieve; a local file path is also accepted.
    :param bool silent:
        Do not show progress on the console.
    :param get,post:
        GET/POST parameters (dict, or list of `key=value` strings
        for `get`).
    :param cache:
        `True` to use the cache, or a string with an explicit cache
        file path.
    :param str init_url:
        An URL to request first, e.g. to obtain a session cookie
        (processed by the method named in `init_fun`).
    :param bool setup,call,process:
        Allow to skip the curl setup, the actual download, and the
        processing of the downloaded file, respectively.

    NOTE(review): `init_headers`, `return_headers` and `compressed`
    are accepted but not stored or used anywhere in this method --
    confirm whether they are dead parameters.
    """
    if not hasattr(self, '_logger'):
        session_mod.Logger.__init__(self, name = 'curl')

    self.result = None
    self.download_failed = False
    self.status = 0
    self.get = get
    self.large = large
    self.default_mode = default_mode
    self.silent = silent
    # the module level DEBUG context overrides the argument
    self.debug = debug or DEBUG
    self.url = url
    self.local_file = os.path.exists(self.url)
    self.get = get
    self.force_quote = force_quote
    self.bypass_url_encoding = bypass_url_encoding
    self.empty_attempt_again = empty_attempt_again
    self.keep_failed = keep_failed
    self.alpn = alpn
    self.http2 = http2

    self._log(
        'Creating Curl object to retrieve '
        'data from `%s`' % self.url[:200]
        # we just don't flood the log
        # with super long URLs
    )

    if not self.local_file:
        # `process_url` and `domain` are defined elsewhere in this
        # class (not visible here)
        self.process_url()
        self.url_fix()
        self.set_get()
    else:
        self._log('The URL is a local file path.')
        self.filename = os.path.split(self.url)[-1]

    self.compr = compr
    self.get_type()
    self.progress = None

    self.encoding = encoding
    self.files_needed = files_needed

    self.follow_http_redirect = follow
    self.timeout = (
        settings.get('curl_extended_timeout')
        if slow else
        settings.get('curl_timeout')
    )
    self.connect_timeout = settings.get('curl_connect_timeout')
    self.ignore_content_length = ignore_content_length
    self.override_post = override_post
    self.retries = retries or settings.get('curl_retries') or 3
    self.req_headers = req_headers
    self._req_headers_list()
    self.post = post
    self.get = get
    self.binary_data = binary_data

    self.cache_dir = cache_dir
    self.cache = cache
    self.init_cache()

    if self.local_file:
        # local files are used in place, as if they were cache files
        self.cache_file_name = self.url
        self.use_cache = True

    self.write_cache = write_cache
    self.outfile = outf

    self.init_url = init_url
    self.init_fun = init_fun
    self.init_use_cache = init_use_cache

    self.sftp_host = sftp_host
    self.sftp_ask = sftp_ask
    self.sftp_port = sftp_port
    self.sftp_passwd = sftp_passwd
    self.sftp_user = sftp_user
    self.sftp_passwd_file = sftp_passwd_file

    if CACHEPRINT:
        self.show_cache()

    if CACHEDEL:
        self.delete_cache_file()
        self.init_cache()

    if not self.use_cache and not DRYRUN:

        self.title = None
        self.set_title()

        if self.sftp_host is not None:
            self.sftp_url()
            self.sftp_call()
        else:
            self.progress_setup()

            if setup:
                self.curl_setup()

            if call:
                self.curl_call()

    elif not self.silent:

        if self.local_file:
            self._log('Loading data from local file `%s`' % self.url)
        else:
            self._log(
                'Loading data from cache '
                'previously downloaded from `%s`' % self.domain
            )

    if process and not self.download_failed and not DRYRUN:
        self.process_file()

    if DRYRUN:
        self.print_debug_info('INFO', 'DRYRUN PERFORMED, RETURNING NONE')

    if PRESERVE:
        self.print_debug_info(
            'INFO',
            'PRESERVING Curl() INSTANCE '
            'IN pypath.curl.LASTCURL'
        )
        setattr(sys.modules[__name__], 'LASTCURL', self)
def is_quoted(self, string):
    """
    Tells if `string` is already URL encoded: fully unquotes it,
    then checks whether re-quoting yields the original string.

    From http://stackoverflow.com/questions/
    1637762/test-if-string-is-url-encoded-in-php
    """
    decoded = string

    # unquote repeatedly until the string is stable
    while True:
        once_more = urllib.unquote(decoded)
        if once_more == decoded:
            break
        decoded = once_more

    return (
        urllib.quote(decoded, '/%') == string or
        urllib.quote(decoded) == string
    )
def url_fix(self, charset = 'utf-8'):
    """
    Quotes the path and query string parts of `self.url` if they are
    not quoted yet (or unconditionally if `force_quote` is set).

    From http://stackoverflow.com/a/121017/854988
    """
    if self.bypass_url_encoding:
        return

    original = self.url

    if type(self.url) is bytes:
        self.url = self._bytes_to_unicode(self.url, encoding = charset)

    scheme, netloc, path, qs, anchor = urlparse.urlsplit(self.url)

    quote_path = self.force_quote or not self.is_quoted(path)
    quote_qs = self.force_quote or not self.is_quoted_plus(qs)

    if quote_path:
        path = urllib.quote(path, '/%')

    if quote_qs:
        qs = urllib.quote_plus(qs, '& = ')

    self.url = urlparse.urlunsplit((scheme, netloc, path, qs, anchor))

    if self.url != original:
        self._log('Quoted URL: `%s`.' % self.url[:200])
def set_title(self):
    """
    Sets the default title (shown by the progress indicator) unless
    one was provided already.
    """
    if self.title is None:
        self.title = 'Downloading `%s` from %s' % (
            self.filename,
            self.domain,
        )


def set_post(self):
    """
    Encodes the POST fields and passes them to the curl handle if
    `post` is a dict; otherwise clears `postfields`.
    """
    if type(self.post) is dict:
        self.postfields = urllib.urlencode(self.post)
        self.curl.setopt(self.curl.POSTFIELDS, self.postfields)
        self.curl.setopt(self.curl.POST, 1)
        self._log('POST parameters set: %s' % self.postfields[:100])
    else:
        self.postfields = None


def set_get(self):
    """
    Encodes the GET parameters (a dict, or a list of `key=value`
    strings) and appends them to the URL.
    """
    if self.get is None:
        return

    if isinstance(self.get, dict):
        quoted = [
            (urllib.quote_plus(key), urllib.quote_plus(val))
            for key, val in iteritems(self.get)
        ]
        self.qs = '&'.join('%s=%s' % pair for pair in quoted)
    elif isinstance(self.get, list):
        pairs = [item.split('=') for item in self.get]
        self.qs = '&'.join(
            '='.join((urllib.quote_plus(first), urllib.quote_plus(second)))
            for first, second in pairs
        )

    self.url = '%s%s%s' % (
        self.url,
        '&' if '?' in self.url else '?',
        self.qs,
    )
    self._log('GET parameters added to the URL: `%s`' % self.qs[:100])
def construct_binary_data(self):
    """
    The binary data content of a `form/multipart` type request can be
    constructed from a list of tuples (<field name>, <field value>),
    where field name and value are both type of bytes.

    Stores the original list in `binary_data_param`, builds the
    multipart body into `binary_data` and appends the matching
    `Content-Type` and `Content-Length` request headers.
    """
    # random multipart boundary
    bdr = b'---------------------------%s' % \
        common.random_string(28).encode('ascii')
    self.binary_data_param = self.binary_data
    # one part per (name, value) pair
    self.binary_data = b'\r\n'.join(
        map(
            lambda i: b'--%s\r\nContent-Disposition: form-data;'
            b' name="%s"\r\n\r\n%s' % (bdr, i[0], i[1]),
            self.binary_data_param
        )
    )
    # closing boundary
    self.binary_data = b'%s\r\n--%s--\r\n' % (self.binary_data, bdr)
    self.req_headers.append(
        'Content-Type: multipart/form-data; boundary=%s' %
        bdr.decode('ascii')
    )
    self.req_headers.append('Content-Length: %u' % len(self.binary_data))
def set_binary_data(self):
    """
    Set binary data to be transmitted attached to POST request.

    `binary_data` is either a bytes string, or a filename, or a list
    of key-value pairs of a multipart form.
    """
    if self.binary_data:

        # a list of form fields is first rendered to bytes
        # (after this, `binary_data` is bytes)
        if type(self.binary_data) is list:
            self.construct_binary_data()
        if type(self.binary_data) is bytes:
            self.binary_data_size = len(self.binary_data)
            self.binary_data_file = BytesIO()
            self.binary_data_file.write(self.binary_data)
            self.binary_data_file.seek(0)
        elif os.path.exists(self.binary_data):
            # a path: stream the file contents
            self.binary_data_size = os.path.getsize(self.binary_data)
            self.binary_data_file = open(self.binary_data, 'rb')

        self.curl.setopt(pycurl.POST, 1)
        self.curl.setopt(pycurl.POSTFIELDSIZE, self.binary_data_size)
        self.curl.setopt(pycurl.READFUNCTION, self.binary_data_file.read)
        self.curl.setopt(pycurl.CUSTOMREQUEST, 'POST')
        # keep POST method across redirects
        self.curl.setopt(pycurl.POSTREDIR, 3)
        self._log('Binary data added to query (not showing).')
def curl_init(self, url = False):
    """
    Creates and configures a new `pycurl.Curl` handle in `self.curl`.
    """
    self.curl = pycurl.Curl()
    self.set_url(url = url)
    self.curl.setopt(self.curl.SSL_VERIFYPEER, False)

    if DEBUG:
        self._log(
            'Following HTTP redirects: %s' % (
                str(self.follow_http_redirect)
            )
        )

    self.curl.setopt(self.curl.FOLLOWLOCATION, self.follow_http_redirect)
    self.curl.setopt(self.curl.CONNECTTIMEOUT, self.connect_timeout)
    self.curl.setopt(self.curl.TIMEOUT, self.timeout)
    self.curl.setopt(self.curl.TCP_KEEPALIVE, 1)
    self.curl.setopt(self.curl.TCP_KEEPIDLE, 2)
    self.curl.setopt(self.curl.SSL_ENABLE_ALPN, self.alpn)

    if not self.http2:
        self.curl.setopt(
            self.curl.HTTP_VERSION,
            pycurl.CURL_HTTP_VERSION_1_1,
        )

    if self.ignore_content_length:
        self.curl.setopt(self.curl.IGNORE_CONTENT_LENGTH, 136)


def set_url(self, url = False):
    """
    Sets the URL on the curl handle, encoding it to bytes if needed.
    """
    url = url or self.url

    if isinstance(url, basestring):
        url = url.encode('utf-8')

    self.curl.setopt(self.curl.URL, url)


def set_target(self):
    """
    Opens the target file and directs curl's output into it.
    """
    target_path = (
        # by default we write into the cache,
        # and later copy to `outfile` on demand;
        # see the `copy_file` method
        self.cache_file_name
            if self.write_cache else
        # otherwise we do not write to the cache
        # but only to the outfile if it is set
        self.outfile
            if self.outfile is not None else
        # if both are disabled, we discard the downloaded data
        os.devnull
    )

    self.target = open(target_path, 'wb')
    self.curl.setopt(self.curl.WRITEFUNCTION, self.target.write)


def _req_headers_list(self):
    """
    Makes sure `req_headers` is a list of `Name: value` strings.
    """
    self.req_headers = self.req_headers or []

    if isinstance(self.req_headers, dict):
        self.req_headers = [
            '%s: %s' % hdr
            for hdr in self.req_headers.items()
        ]


def set_req_headers(self):
    """
    Passes the request headers to the curl handle, after running the
    init request (cookie) if one was configured.
    """
    self.init_request()

    if self.override_post:
        self._log('Overriding HTTP method.')
        self.req_headers.append('X-HTTP-Method-Override: GET')

    self.curl.setopt(
        self.curl.HTTPHEADER,
        [h.encode('ascii') for h in self.req_headers]
    )


def set_resp_headers(self):
    """
    Collects the raw response header lines into `self.resp_headers`.
    """
    self.resp_headers = []
    self.curl.setopt(self.curl.HEADERFUNCTION, self.resp_headers.append)


def set_debug(self):
    """
    Enables verbose curl output when debug mode is on.
    """
    if self.debug:
        self.curl.setopt(pycurl.VERBOSE, 1)
        self.curl.setopt(pycurl.DEBUGFUNCTION, self.print_debug_info)


def set_compressed(self):
    """
    Requests compressed transfer encoding.

    NOTE(review): never invoked from `curl_setup`, and `__init__`
    does not store the `compressed` argument as an attribute, so
    calling this would raise AttributeError -- confirm upstream.
    """
    if self.compressed:
        self.curl.setopt(pycurl.ENCODING, 'gzip, deflate')


def curl_setup(self, url = False):
    """
    Runs all setup steps on a fresh curl handle.
    """
    self.curl_init(url = url)
    self.curl_progress_setup()
    self.set_target()
    self.set_debug()
    self.set_post()
    self.set_binary_data()
    self.set_req_headers()
    self.set_resp_headers()


def curl_call(self):
    """
    Performs the download with retries; sets `self.status` and
    `self.download_failed`, and removes empty or failed target files
    unless `keep_failed` is set.
    """
    self._log('Setting up and calling pycurl.')

    for attempt in xrange(self.retries):

        try:

            if self.debug:
                self.print_debug_info(
                    'INFO',
                    'pypath.curl.Curl().curl_call() :: attempt #%u' % attempt
                )

            if attempt > 0:
                # apparently we have to set it again
                # before each perform
                self.set_binary_data()

            self.curl.perform()
            self.target.flush()

            if (
                self.target.name != os.devnull and
                os.path.exists(self.target.name) and
                os.stat(self.target.name).st_size == 0 and
                self.empty_attempt_again
            ):
                self._log('Empty file retrieved, attempting downlad again')
                continue

            if self.url.startswith('http'):
                self.status = self.curl.getinfo(pycurl.HTTP_CODE)
                if self.status == 200:
                    self.terminate_progress()
                    break

            if self.url.startswith('ftp'):
                # fix: this was `self.status == 500`, a no-op
                # comparison; 500 is the default until a
                # `226 Transfer complete` header confirms success
                self.status = 500
                for h in self.resp_headers:
                    if h[:3] == b'226':
                        self.status = 200
                        self.terminate_progress()
                        break

            if self.status == 200:
                break

        except pycurl.error as e:
            self.status = 500
            if self.progress is not None:
                self.progress.terminate(status = 'failed')
                self.progress = None
            self.print_debug_info('ERROR', 'PycURL error: %s' % str(e.args))

    self.curl.close()
    self.target.close()

    if self.status != 200:
        self.download_failed = True
        self._log('Download error: HTTP %u' % self.status)

    if (
        self.target.name != os.devnull and
        os.path.exists(self.target.name) and
        os.stat(self.target.name).st_size == 0 and
        self.status != 302
    ):
        self.status = 500
        self.download_failed = True
        self._log('Download error: empty file retrieved.')

    if self.status >= 400 or self.download_failed:

        with open(self.target.name, 'rb') as fp:
            contents = fp.read(5000)

        try:
            contents = contents.decode('utf8')
        except UnicodeDecodeError:
            contents = str(contents)

        self._log('First 5000 bytes of response: %s' % contents)

        if not self.keep_failed:
            self._log('Download failed, removing the resulted file.')
            self.remove_target()


def remove_target(self):
    """
    Closes and deletes the target file, tolerating permission errors.
    """
    self._log('Removing file: `%s`' % self.target.name)
    self.target.close()

    if os.path.exists(self.target.name):
        try:
            os.remove(self.target.name)
        except PermissionError:
            self._log(
                'Could not remove `%s`, permission denied.' %
                self.target.name
            )


def progress_setup(self):
    """
    Creates the console progress indicator unless silenced or in
    debug mode.
    """
    if not self.silent and self.progress is None and not self.debug:
        self.progress = progress.Progress(
            name = self.title,
            interval = 1,
            status = 'initializing curl'
        )


def curl_progress_setup(self):
    """
    Registers the progress callback on the curl handle, using the
    newer XFERINFOFUNCTION when available.
    """
    if self.progress is not None:
        self.curl.setopt(pycurl.NOPROGRESS, 0)
        if hasattr(pycurl, 'XFERINFOFUNCTION'):
            self.curl.setopt(pycurl.XFERINFOFUNCTION, self.update_progress)
        elif hasattr(pycurl, 'PROGRESSFUNCTION'):
            self.curl.setopt(pycurl.PROGRESSFUNCTION, self.update_progress)


def _bytes_to_unicode(self, string, encoding = None):
    """
    Decodes bytes to str, trying UTF-8 then ISO-8859-1 if no encoding
    was given; returns an empty string on failure.
    """
    if type(string) is unicode:
        return string

    if encoding is not None:
        return string.decode(encoding)
    else:
        try:
            return string.decode('utf-8')
        except UnicodeDecodeError:
            try:
                return string.decode('iso-8859-1')
            except:
                self.print_debug_info('ERROR', 'String decoding error')
                return u''


def unicode2bytes(self, string, encoding = None):
    """
    Encodes str to bytes, trying ASCII then UTF-8 if no encoding was
    given; returns an empty bytes string on failure.
    """
    if type(string) is bytes:
        return string

    if encoding is not None:
        return string.encode(encoding)
    else:
        try:
            return string.encode('ascii')
        except UnicodeEncodeError:
            try:
                return string.encode('utf-8')
            except:
                self.print_debug_info('ERROR', 'String encoding error')
                return b''


def bytes_prefix(self, b):
    """
    Returns a (value, unit) pair with a metric prefix, e.g. 1500 ->
    (1.5, 'kB').
    """
    if b > 1000000000:
        return (b / 1000000000.0, u'GB')
    elif b > 1000000:
        return (b / 1000000.0, u'MB')
    elif b > 1000:
        return (b / 1000.0, u'kB')
    else:
        return (float(b), u'B')


def get_headers(self):
    """
    Parses the raw response header lines into the lowercase-keyed
    dict `self.resp_headers_dict`.
    """
    self.resp_headers_dict = {}

    if hasattr(self, 'resp_headers'):

        for header_line in self.resp_headers:

            header_line = self._bytes_to_unicode(header_line)

            if ':' not in header_line:
                continue

            name, value = header_line.split(':', 1)
            name = name.strip()
            value = value.strip()
            name = name.lower()
            self.resp_headers_dict[name] = value


def guess_encoding(self):
    """
    Attempts to determine the encoding from the `Content-Type`
    response header; falls back to UTF-8.
    """
    if self.encoding is None:

        if not self.use_cache:
            # fix: the original looked up 'content-type' in
            # `self.resp_headers`, which is a *list* of raw header
            # lines, so the charset was never detected; use the
            # parsed header dict instead, with a regex tolerating
            # optional whitespace around the `=`
            self.get_headers()
            if 'content-type' in self.resp_headers_dict:
                content_type = self.resp_headers_dict['content-type'].lower()
                match = re.search(r'charset\s*=\s*(\S+)', content_type)
                if match:
                    self.encoding = match.group(1)

    if self.encoding is None:
        self.encoding = 'utf-8'


def get_type(self):
    """
    Determines the archive type from the file name or the explicit
    `compr` argument; `multifile` marks multi-file archives.
    """
    self.multifile = False

    if self.filename[-3:].lower() == 'zip' or self.compr == 'zip':
        self.type = 'zip'
        self.multifile = True
    elif self.filename[-3:].lower() == 'tgz' or \
            self.filename[-6:].lower() == 'tar.gz' or \
            self.compr == 'tgz' or self.compr == 'tar.gz':
        self.type = 'tgz'
        self.multifile = True
    elif self.filename[-2:].lower() == 'gz' or self.compr == 'gz':
        self.type = 'gz'
    else:
        self.type = 'plain'


def get_jsessionid(self):
    """
    Extracts the JSESSIONID cookie from the response headers; used as
    the default `init_fun` for session-based services.
    """
    self.jsessionid = [u'']
    rejsess = re.compile(r'.*(JSESSIONID\s?=\s?\w*)')

    for hdr in self.resp_headers:
        jsess = rejsess.findall(hdr.decode('utf-8'))
        if len(jsess) > 0:
            self.jsessionid = [u'Cookie: %s' % jsess[0]]

    return self.jsessionid


def update_progress(self, download_total, downloaded, upload_total, uploaded):
    """
    Progress callback registered on the curl handle.
    """
    if self.progress is not None:
        self.total = self.bytes_prefix(download_total)
        self.done = self.bytes_prefix(downloaded)
        msg = u'%.02f%s/%.02f%s' % (
            self.done[0],
            self.done[1],
            self.total[0],
            self.total[1],
        )
        self.progress.set_total(float(download_total))
        self.progress.set_done(float(downloaded))
        self.progress.step(step = 0, msg = msg, status = 'downloading')


def terminate_progress(self):
    """
    Finishes and removes the progress indicator.
    """
    if self.progress is not None:
        self.progress.terminate(
            status = '%.02f%s downloaded' % (self.total[0], self.total[1])
        )
        self.progress = None


def init_request(self):
    """
    Performs the initial request (e.g. to obtain a session cookie) and
    extends the request headers with its result.
    """
    if self.init_url is not None:

        if self.progress is not None:
            self.progress.set_status('requesting cookie')

        self.init_curl = Curl(
            self.init_url,
            silent = True,
            debug = self.debug,
            cache = self.init_use_cache,
            req_headers = self.req_headers,
            follow = False,
        )
        headers = getattr(self.init_curl, self.init_fun)()
        self.req_headers.extend(headers)


# caching:

def init_cache(self):
    """
    Computes the cache file path and checks whether a usable cache
    file exists.
    """
    self.get_hash()
    self.cache_dir_exists()
    self.get_cache_file_name()
    self._log('Cache file path: `%s`' % self.cache_file_name)
    self.select_cache_file()


def get_hash(self):
    """
    Computes an MD5 hash of the URL, the POST fields and the binary
    data; this identifies the download in the cache.
    """
    # an explicit cache path was provided, no hash needed
    if isinstance(self.cache, str):
        return

    self.post_str = (
        ''
        if self.post is None else
        (
            '?' + '&'.join(sorted([
                i[0] + ' = ' + i[1]
                for i in iteritems(self.post)
            ]))
        )
    )

    if self.binary_data:
        bindata = str(self.binary_data)
    else:
        bindata = ''

    self.urlmd5 = hashlib.md5(
        self.unicode2bytes('%s%s%s' % (
            self.url,
            self.post_str,
            bindata
        ))
    ).hexdigest()


def cache_dir_exists(self):
    """
    Makes sure the cache directory exists and stores its path.
    """
    self.cache_dir = cache_mod.get_cachedir(self.cache_dir)


def get_cache_file_name(self):
    """
    Builds the full cache file path from the URL hash and the file
    name (or uses the explicit path given in `cache`).
    """
    self.cache_file_name = (
        self.cache
        if isinstance(self.cache, str) else
        os.path.join(
            os.getcwd(),
            self.cache_dir,
            self.replace_forbidden('%s-%s' % (self.urlmd5, self.filename))
        )
    )
@classmethod
def cache_path(cls, **kwargs) -> str:
    """
    Returns the cache path without performing download or creating
    a file.

    Args:
        kwargs: Arguments to `Curl`.

    Returns:
        The full path of the cache file the given download would use.
    """
    # idiom fix: the first parameter of a classmethod was named
    # `self`; renamed to `cls` (no caller-visible change)
    kwargs.update({'setup': False, 'call': False, 'process': False})

    return Curl(**kwargs).cache_file_name
@classmethod
def replace_forbidden(cls, name: str, repl: str = '_') -> str:
    """
    Replaces the characters that are forbidden in certain file systems.

    The slash is forbidden in Unix, while many other characters in
    Windows environments (the full set is in `FORBIDDEN_CHARS`).

    Args:
        name: The file name to sanitize.
        repl: The replacement character inserted in place of each
            forbidden character.

    Returns:
        The sanitized file name.
    """
    return cls.FORBIDDEN_CHARS.sub(repl, name)
def delete_cache_file(self):
    # Removes the cache file (used by the `cache_delete_on` context)
    # and reports what happened on the console.
    if os.path.exists(self.cache_file_name):
        self.print_debug_info('INFO', 'CACHE FILE = %s' % self.cache_file_name)
        self.print_debug_info('INFO', 'DELETING CACHE FILE')
        os.remove(self.cache_file_name)
        self.use_cache = False
    else:
        self.print_debug_info('INFO', 'CACHE FILE = %s' % self.cache_file_name)
        self.print_debug_info('INFO', 'CACHE FILE DOES NOT EXIST')

def select_cache_file(self):
    # Decides whether the existing cache file can be used; the module
    # level CACHE context overrides the instance level setting.
    self.use_cache = False

    if type(CACHE) is bool:
        self.cache = CACHE

    if (
        self.cache and
        os.path.exists(self.cache_file_name) and
        # if the cache file is empty
        # try to download again
        os.stat(self.cache_file_name).st_size > 0
    ):
        self._log('Cache file found, no need for download.')
        self.use_cache = True

def show_cache(self):
    # Prints the URL, the cache file path and the cache state (used by
    # the `cache_print_on` context).
    self.print_debug_info('INFO', 'URL = %s' % self.url)
    self.print_debug_info('INFO', 'CACHE FILE = %s' % self.cache_file_name)
    self.print_debug_info(
        'INFO',
        'Using cache: %s; cache file exists: %s' % (
            self.cache,
            os.path.exists(self.cache_file_name)
        )
    )

# open files:

def transcode(self):
    # Converts a freshly downloaded plain text cache file to UTF-8
    # in place, via a temporary file, when a non-UTF-8 encoding was
    # detected.
    if not self.use_cache and self.type == 'plain':

        self.guess_encoding()

        if self.encoding is not None and self.encoding != 'utf-8':

            self._log(
                'Converting encoding from `%s` '
                'to `utf-8`.' % self.encoding
            )

            tmp_file_name = os.path.join(
                os.getcwd(),
                self.cache_dir,
                'transcoding.tmp.txt'
            )
            os.rename(self.cache_file_name, tmp_file_name)

            if self.progress is not None:
                self.print_status(
                    'Converting %s encoded data to utf-8' % self.encoding
                )

            with open(tmp_file_name, 'rb') as tmp_file:
                with open(self.cache_file_name, 'wb') as cache_file:
                    for line in tmp_file:
                        cache_file.write(
                            line.decode(self.encoding or 'utf-8')
                            .encode('utf-8')
                        )

            os.remove(tmp_file_name)
            self.encoding = 'utf-8'

def copy_file(self):
    # Copies (or moves, if the cache is not to be kept) the cache file
    # to `outfile`; without an `outfile` the cache file itself is used.
    self.transcode()

    if self.outfile is not None and self.outfile != self.cache_file_name:
        if self.write_cache:
            self._log(
                'Copying file `%s` to `%s`.' % (
                    self.cache_file_name,
                    self.outfile,
                )
            )
            shutil.copy(self.cache_file_name, self.outfile)
        else:
            self._log(
                'Moving file `%s` to `%s`.' % (
                    self.cache_file_name,
                    self.outfile,
                )
            )
            os.rename(self.cache_file_name, self.outfile)
    else:
        self.outfile = self.cache_file_name

def process_file(self):
    # Post-download pipeline: detect encoding and type, place the
    # file, open it, extract archives, decode, and report.
    self.guess_encoding()
    self.get_type()
    self.copy_file()
    self.open_file()
    self.extract_file()
    self.decode_result()
    self.report_ready()

def open_file(self):
    # Delegates to FileOpener.__init__ to open (but not yet extract)
    # the resulting file.
    # NOTE(review): FileOpener.__init__ is not visible in this chunk;
    # presumably it accepts (fname, extract=...) -- confirm.
    if not self.silent:
        self.print_status('Opening file `%s`' % self.outfile)

    super(Curl, self).__init__(self.outfile, extract = False)
def close(self):
    """
    Closes all file objects: each closeable value of a dict result,
    then the main file object.
    """
    result = self.result

    if type(result) is dict:
        for fileobj in result.values():
            if hasattr(fileobj, 'close'):
                fileobj.close()

    self.fileobj.close()
def extract_file(self):
    # Runs the FileOpener extraction matching the detected file type.
    if not self.silent:
        self._log('Extracting data from file type `%s`' % self.type)

    self.extract()

def decode_result(self):
    # Decodes the downloaded bytes content (or each member of a dict
    # result) using the detected encoding; on failure the bytes are
    # kept as they are.
    if self.progress is not None:
        self._log(
            'Decoding `%s` encoded data' % (self.encoding or 'utf-8')
        )

    def _decode_result(content):
        try:
            if isinstance(content, str):
                return content
            else:
                return content.decode(self.encoding or 'utf-8')
        # NOTE(review): bare `except`; catching only UnicodeDecodeError
        # (and AttributeError for non-bytes) would be safer
        except:
            self.print_debug_info(
                'WARNING',
                'Failed '
                'decoding downloaded bytes content with encoding %s. '
                'Result might be of type bytes' % (self.encoding or 'utf-8')
            )
            return content

    if not self.large:
        if type(self.result) is dict:
            for name, content in iteritems(self.result):
                self.result[name] = _decode_result(content)
        else:
            self.result = _decode_result(self.result)

def get_result_type(self):
    # Builds a human readable description of the result's type for
    # the final log message.
    if type(self.result) is dict:
        if len(self.result):
            self.result_type = 'dict of %s' % (
                'byte arrays'
                if type(next(iter(self.result.values()))) is bytes else
                'unicode strings'
                if type(next(iter(self.result.values()))) is unicode else
                'file objects'
            )
        else:
            self.result_type = 'empty dict'
    else:
        self.result_type = '%s' % (
            'byte array'
            if type(self.result) is bytes else
            'unicode string'
            if type(self.result) is unicode else
            'file object'
        )

def report_ready(self):
    # Logs a summary once the download and processing are complete.
    self.get_result_type()

    if not self.silent:
        self._log(
            'File at `%s` successfully retrieved. '
            'Resulted file type `%s, %s`. '
            'Local file at `%s`.' % (
                self.url,
                'plain text'
                if self.type == 'plain' else
                '%s extracted data' % self.type,
                self.result_type,
                self.outfile,
            )
        )

def print_status(self, status):
    # Reports a status message, terminating the progress bar first so
    # the output does not get garbled.
    if self.progress is not None:
        self.terminate_progress()

    if self.debug:
        self.print_debug_info('INFO', status)
    elif not self.silent:
        self._log(status)

# sftp part:

def sftp_url(self):
    # Rewrites the URL by prepending the sftp host; the original URL
    # is the remote file path.
    if self.sftp_host is not None:
        self.sftp_filename = self.url
        self.url = '%s%s' % (self.sftp_host, self.sftp_filename)

def sftp_call(self):
    # Performs the sftp download and maps its success to an
    # HTTP-like status code.
    self.sftp_success = self.sftp_download()

    if self.sftp_success:
        self.status = 200
    else:
        self.status = 501

def ask_passwd(self, use_passwd_file = True):
    # Obtains sftp credentials, either from the password file or by
    # prompting the user interactively, optionally saving them.
    # NOTE(review): the interactive branch stores the credentials in
    # `self.user`/`self.passwd`, but `sftp_download` reads
    # `self.sftp_user`/`self.sftp_passwd`, so interactively entered
    # credentials appear to never be used -- confirm upstream.
    if use_passwd_file and os.path.exists(self.sftp_passwd_file):

        with open(self.sftp_passwd_file, 'r') as f:
            self.sftp_user = f.readline().strip()
            self.sftp_passwd = f.readline().strip()

        return None

    sys.stdout.write(self.sftp_ask)
    sys.stdout.flush()

    while True:
        self.user = input('\n\tUsername: ')
        self.passwd = input(
            '\tPassword (leave empty if no password needed): '
        )
        correct = input(
            'Are these details correct? '
            'User: `%s`, password: `%s` [Y/n]\n' % (self.user, self.passwd)
        )

        if correct.lower().strip() not in ['', 'y', 'yes']:
            continue

        save = input(
            'Do you wish to save your login details unencripted\n'
            'to the following file, so you don\'t '
            'need to enter them next time? File: %s\n'
            'Save login details [Y/n]' % self.sftp_passwd_file
        )
        break

    if save.lower().strip() in ['', 'y', 'yes']:
        with open(self.sftp_passwd_file, 'w') as f:
            f.write('%s\n%s' % (self.user, self.passwd))

def sftp_download(self):
    # Downloads the remote file over sftp into the cache file,
    # re-prompting for credentials on IOError if the user requests it.
    # NOTE(review): `self.host` below is not set anywhere in this
    # class; it presumably should be `self.sftp_host`, otherwise this
    # raises AttributeError whenever `sftp_ask` is None -- confirm.
    self.sftp_ask = (
        'Please enter your login details for %s\n' % self.host
        if self.sftp_ask is None else
        self.sftp_ask
    )
    self.sftp_passwd_file = (
        os.path.join('cache', '%s.login' % self.sftp_host)
        if self.sftp_passwd_file is None else
        self.sftp_passwd_file
    )

    if self.sftp_user is None:
        self.ask_passwd()

    while True:

        # empty string means no password
        self.sftp_passwd = self.sftp_passwd or None
        cnopts = pysftp.CnOpts()
        # NOTE(review): disabling host key checking is insecure;
        # flagged for awareness
        cnopts.hostkeys = None

        with pysftp.Connection(
            host = self.sftp_host,
            username = self.sftp_user,
            password = self.sftp_passwd,
            port = self.sftp_port,
            cnopts = cnopts
        ) as con:

            try:
                con.get(self.sftp_filename, self.cache_file_name)
                break
            except IOError:
                msg = 'Failed to get %s from %s\n' \
                    'Try again (1) || Enter new login details (2) ' \
                    '|| Cancel (3) ?\n' % (
                        self.sftp_filename,
                        self.sftp_host
                    )
                whattodo = input(msg)

                if '1' in whattodo:
                    continue

                if '2' in whattodo:
                    self.ask_passwd(use_passwd_file = False)
                    continue

                if '3' in whattodo:
                    return False

    return True