diff --git a/tools/lib/filereader.py b/tools/lib/filereader.py index 826bce5fdf..5c9b375bb2 100644 --- a/tools/lib/filereader.py +++ b/tools/lib/filereader.py @@ -1,9 +1,4 @@ -import os - -if "COMMA_PARALLEL_DOWNLOADS" in os.environ: - from tools.lib.url_file_parallel import URLFileParallel as URLFile -else: - from tools.lib.url_file import URLFile # type: ignore +from tools.lib.url_file import URLFile def FileReader(fn, debug=False): diff --git a/tools/lib/url_file_parallel.py b/tools/lib/url_file_parallel.py deleted file mode 100644 index f6dc37a3b4..0000000000 --- a/tools/lib/url_file_parallel.py +++ /dev/null @@ -1,81 +0,0 @@ -# pylint: skip-file - -import os -import pycurl -from tools.lib.url_file import URLFile -from io import BytesIO -from tenacity import retry, wait_random_exponential, stop_after_attempt - -from multiprocessing import Pool - - -class URLFileParallel(URLFile): - def __init__(self, url, debug=False): - self._length = None - self._url = url - self._pos = 0 - self._local_file = None - - def get_curl(self): - curl = pycurl.Curl() - curl.setopt(pycurl.NOSIGNAL, 1) - curl.setopt(pycurl.TIMEOUT_MS, 500000) - curl.setopt(pycurl.FOLLOWLOCATION, True) - return curl - - @retry(wait=wait_random_exponential(multiplier=1, max=5), stop=stop_after_attempt(3), reraise=True) - def get_length(self): - if self._length is not None: - return self._length - - c = self.get_curl() - c.setopt(pycurl.URL, self._url) - c.setopt(c.NOBODY, 1) - c.perform() - - length = int(c.getinfo(c.CONTENT_LENGTH_DOWNLOAD)) - self._length = length - return length - - @retry(wait=wait_random_exponential(multiplier=1, max=5), stop=stop_after_attempt(3), reraise=True) - def download_chunk(self, start, end=None): - if end is None: - trange = f'bytes={start}-' - else: - trange = f'bytes={start}-{end}' - - dats = BytesIO() - c = self.get_curl() - c.setopt(pycurl.URL, self._url) - c.setopt(pycurl.WRITEDATA, dats) - c.setopt(pycurl.HTTPHEADER, ["Range: " + trange, "Connection: keep-alive"]) - c.perform() - - response_code = c.getinfo(pycurl.RESPONSE_CODE) - if response_code != 206 and response_code != 200: - raise Exception("Error {} ({}): {}".format(response_code, self._url, repr(dats.getvalue())[:500])) - - return dats.getvalue() - - def read(self, ll=None): - start = self._pos - end = None if ll is None else self._pos + ll - 1 - max_threads = int(os.environ.get("COMMA_PARALLEL_DOWNLOADS", "0")) - - end = self.get_length() if end is None else end - threads = min((end - start) // (512 * 1024), max_threads) # At least 512k per thread - - if threads > 1: - chunk_size = (end - start) // threads - chunks = [ - (start + chunk_size * i, - start + chunk_size * (i + 1) - 1 if i != threads - 1 else end) - for i in range(threads)] - - with Pool(threads) as pool: - ret = b"".join(pool.starmap(self.download_chunk, chunks)) - else: - ret = self.download_chunk(start, end) - - self._pos += len(ret) - return ret