# Source code for threeML.io.download_from_http

import os
import re
from builtins import object
from pathlib import Path

import requests
from requests import Session
from requests.adapters import HTTPAdapter
from urllib3.util import Retry

from threeML.config.config import threeML_config
from threeML.io.file_utils import (
    file_existing_and_readable,
    path_exists_and_is_directory,
    sanitize_filename,
)
from threeML.io.logging import setup_logger
from threeML.utils.progress_bar import tqdm

log = setup_logger(__name__)


class RemoteDirectoryNotFound(IOError):
    """Raised when the requested remote directory does not exist."""
class HTTPError(IOError):
    """Raised when an HTTP request fails for a reason other than a missing directory."""
class ApacheDirectory(object):
    """Allows to interact with a directory listing like the one returned by an
    Apache server.

    The listing is fetched once, when the object is created; the entries found
    in it are then available through :attr:`files` and :attr:`directories`.
    """

    # A line in an Apache listing is like this:
    #   <img src="/icons/unknown.gif" alt="[   ]">
    #   <a href="glg_cspec_b0_bn100101988_v02.rsp">glg_cspec_b0_bn100101988_v02.rsp</a>
    #   16-Nov-2012 15:14   96K
    # Group 1 captures the "alt" attribute of the icon, group 2 the link text.
    _ENTRY_REGEXP = re.compile(r"<img src=.+ alt=(.+)>\s?<a href=.+>(.+)</a>.+")

    def __init__(self, url):
        """Fetch and parse the listing found at ``url``.

        :param url: URL of the remote directory
        :raises RemoteDirectoryNotFound: if the server answers with a 404 status
        :raises HTTPError: if the request fails for any other reason
        """
        self._request_result = requests.get(url)

        # Make sure the request was ok
        if not self._request_result.ok:
            # Rely on the numeric status code rather than on the reason phrase,
            # whose exact wording/capitalisation is chosen by the server
            if self._request_result.status_code == 404:
                raise RemoteDirectoryNotFound(
                    "Remote directory %s does not exist" % url
                )

            raise HTTPError(
                "HTTP request failed with reason: %s" % self._request_result.reason
            )

        self._text = self._request_result.text

        # Get the listing of files and directories
        self._entries = self._get_directory_entries()

        # Now split directories and files
        self._files = [name for name, kind in self._entries if kind == "FILE"]
        self._directories = [name for name, kind in self._entries if kind != "FILE"]

    def _get_directory_entries(self):
        """List files and directories listed in the listing.

        :return: a list of tuples (entry name, type (DIR or FILE))
        """
        entries = []

        # Apache puts files in a <pre></pre> tag, so lines are ended simply with \n
        for line in self._text.split("\n"):
            token = self._ENTRY_REGEXP.match(line)

            if token is None:
                # Not a line describing a file or a directory
                continue

            type_token, filename_token = token.groups()

            # A directory has an alt="[DIR]" attribute in the <img> tag, a file
            # has alt="[   ]" or other things (if a known type)
            entry_type = "DIR" if "DIR" in type_token.upper() else "FILE"

            entries.append((filename_token, entry_type))

        return entries

    @staticmethod
    def _content_length(headers):
        """Return the size announced in the Content-Length header.

        :param headers: mapping of the response headers
        :return: the size in bytes as an int, or None if the header is missing
            or is not an integer
        """
        try:
            return int(headers["Content-Length"])
        except (KeyError, TypeError, ValueError):
            return None

    @property
    def files(self):
        """Names of the entries of the listing classified as files."""
        return self._files

    @property
    def directories(self):
        """Names of the entries of the listing classified as directories."""
        return self._directories

    def download(
        self,
        remote_filename,
        destination_path: str,
        new_filename=None,
        progress=True,
        compress=False,
        timeout=10,
    ):
        """Download one file of this directory into a local directory.

        :param remote_filename: name of the file, as listed in :attr:`files`
        :param destination_path: existing local directory receiving the file
        :param new_filename: (default: None) local name for the file; if None the
            last "/"-separated component of ``remote_filename`` is used
        :param progress: (True or False) whether to display progress or not. A bar
            is shown only if the "progress_bars" switch of the configuration is
            enabled as well
        :param compress: if True the file is written through gzip and ".gz" is
            appended to the local file name
        :param timeout: timeout passed to the HTTP request
        :return: the path of the local file
        :raises AssertionError: if the file is not part of this directory or the
            destination is not an existing directory
        :raises HTTPError: if the server answers the file request with an error
            status
        """
        assert (
            remote_filename in self.files
        ), "File %s is not contained in this directory (%s)" % (
            remote_filename,
            self._request_result.url,
        )

        destination_dir: Path = sanitize_filename(destination_path, abspath=True)

        assert path_exists_and_is_directory(destination_dir), (
            f"Provided destination {destination_dir} does not exist or "
            "is not a directory"
        )

        # If no filename is specified, use the same name that the file has on the
        # remote server
        if new_filename is None:
            new_filename = remote_filename.split("/")[-1]

        # Get the fully qualified path for the remote and the local file
        remote_path: str = self._request_result.url + remote_filename
        local_path: Path = destination_dir / new_filename

        # use a Session context manager to allow retries using requests
        with Session() as session:
            retries = Retry(total=3, backoff_factor=0.1)
            session.mount("https://", HTTPAdapter(max_retries=retries))
            session.mount("http://", HTTPAdapter(max_retries=retries))

            # Ask the server for the file, but do not download it just yet
            # (stream=True will get the HTTP header but nothing else), so that:
            # * the file is not downloaded all in memory before being written to
            #   the disk
            # * we can report progress if requested
            this_request = session.get(remote_path, stream=True, timeout=timeout)

            try:
                # Do not store the body of an error response as if it were the file
                if not this_request.ok:
                    raise HTTPError(
                        "HTTP request failed with reason: %s" % this_request.reason
                    )

                # Figure out the size of the file (None if the server does not
                # announce it)
                file_size = self._content_length(this_request.headers)

                log.debug(f"downloading {remote_filename} of size {file_size}")

                # Now check if we really need to download this file
                if compress:
                    # Add a .gz at the end of the file path
                    log.debug(
                        f"file {remote_filename} will be downloaded and compressed"
                    )
                    local_path = Path(f"{local_path}.gz")

                if file_existing_and_readable(local_path):
                    local_size = os.path.getsize(local_path)

                    # A compressed file is smaller than the remote one, so its
                    # size cannot be compared: its mere existence is enough.
                    # NOTE(review): this also accepts a partial .gz left behind by
                    # an interrupted download.
                    if local_size == file_size or compress:
                        # No need to download it again
                        log.info(f"file {remote_filename} is already downloaded!")
                        return local_path

                # Chunk size shouldn't be too small otherwise we are causing a
                # bottleneck in the download speed
                chunk_size = 1024 * 10

                # If the user wants to compress the file, use gzip, otherwise the
                # normal opener
                if compress:
                    import gzip

                    opener = gzip.open
                else:
                    opener = open

                show_progress = (
                    progress and threeML_config["interface"]["progress_bars"]
                )

                bar = None
                if show_progress:
                    # The file is opened with "wb" and fetched from its first
                    # byte, so the bar always starts from zero
                    bar = tqdm(
                        initial=0,
                        unit_scale=True,
                        unit_divisor=1024,
                        unit="B",
                        total=file_size,
                        desc="Downloading %s" % new_filename,
                    )

                try:
                    with opener(local_path, "wb") as f:
                        for chunk in this_request.iter_content(chunk_size=chunk_size):
                            if chunk:  # filter out keep-alive new chunks
                                f.write(chunk)
                                if bar is not None:
                                    bar.update(len(chunk))
                finally:
                    if bar is not None:
                        bar.close()

                return local_path

            finally:
                # Release the connection, including on early return or on error
                this_request.close()

    def download_all_files(self, destination_path, progress=True, pattern=None):
        """Download all files in the current directory.

        :param destination_path: the path for the destination directory in the
            local file system
        :param progress: (True or False) whether to display progress or not
        :param pattern: (default: None) If not None, only files matching this
            pattern (a regular expression) will be downloaded
        :return: list of the downloaded files as absolute paths in the local file
            system
        """
        local_files = []

        for remote_file in self.files:
            # The pattern is matched against the base name of the entry only
            if (
                pattern is not None
                and re.match(pattern, os.path.basename(remote_file)) is None
            ):
                continue

            local_files.append(
                self.download(remote_file, destination_path, progress=progress)
            )

        return local_files