From 9c291968f6c06123c15bb9f1df8b576030580296 Mon Sep 17 00:00:00 2001
From: Willem Melching
Date: Mon, 20 Jul 2020 17:10:08 +0200
Subject: [PATCH] Speedup URLFile (#1888)

* add parallel download support to URLFile

* make python 3.8 happy

* Fix chunk size

* Automatic number of threads

* No daemon threads in unlogger

* Cache length

* dont touch old filereader

* Remove debug info

* remove debug script

* Ignore type

old-commit-hash: c70700758d9e3227a157d2e9fdcb9d3204b433c2
---
 tools/lib/filereader.py           |  8 ++-
 {common => tools/lib}/url_file.py |  0
 tools/lib/url_file_parallel.py    | 81 +++++++++++++++++++++++++++++++
 tools/replay/unlogger.py          |  3 --
 4 files changed, 88 insertions(+), 4 deletions(-)
 rename {common => tools/lib}/url_file.py (100%)
 create mode 100644 tools/lib/url_file_parallel.py

diff --git a/tools/lib/filereader.py b/tools/lib/filereader.py
index c68462a08c..826bce5fdf 100644
--- a/tools/lib/filereader.py
+++ b/tools/lib/filereader.py
@@ -1,4 +1,10 @@
-from common.url_file import URLFile
+import os
+
+if "COMMA_PARALLEL_DOWNLOADS" in os.environ:
+  from tools.lib.url_file_parallel import URLFileParallel as URLFile
+else:
+  from tools.lib.url_file import URLFile  # type: ignore
+
 
 def FileReader(fn, debug=False):
   if fn.startswith("http://") or fn.startswith("https://"):
diff --git a/common/url_file.py b/tools/lib/url_file.py
similarity index 100%
rename from common/url_file.py
rename to tools/lib/url_file.py
diff --git a/tools/lib/url_file_parallel.py b/tools/lib/url_file_parallel.py
new file mode 100644
index 0000000000..f6dc37a3b4
--- /dev/null
+++ b/tools/lib/url_file_parallel.py
@@ -0,0 +1,81 @@
+# pylint: skip-file
+
+import os
+import pycurl
+from tools.lib.url_file import URLFile
+from io import BytesIO
+from tenacity import retry, wait_random_exponential, stop_after_attempt
+
+from multiprocessing import Pool
+
+
+class URLFileParallel(URLFile):
+  def __init__(self, url, debug=False):
+    self._length = None
+    self._url = url
+    self._pos = 0
+    self._local_file = None
+
+  def get_curl(self):
+    curl = pycurl.Curl()
+    curl.setopt(pycurl.NOSIGNAL, 1)
+    curl.setopt(pycurl.TIMEOUT_MS, 500000)
+    curl.setopt(pycurl.FOLLOWLOCATION, True)
+    return curl
+
+  @retry(wait=wait_random_exponential(multiplier=1, max=5), stop=stop_after_attempt(3), reraise=True)
+  def get_length(self):
+    if self._length is not None:
+      return self._length
+
+    c = self.get_curl()
+    c.setopt(pycurl.URL, self._url)
+    c.setopt(c.NOBODY, 1)
+    c.perform()
+
+    length = int(c.getinfo(c.CONTENT_LENGTH_DOWNLOAD))
+    self._length = length
+    return length
+
+  @retry(wait=wait_random_exponential(multiplier=1, max=5), stop=stop_after_attempt(3), reraise=True)
+  def download_chunk(self, start, end=None):
+    if end is None:
+      trange = f'bytes={start}-'
+    else:
+      trange = f'bytes={start}-{end}'
+
+    dats = BytesIO()
+    c = self.get_curl()
+    c.setopt(pycurl.URL, self._url)
+    c.setopt(pycurl.WRITEDATA, dats)
+    c.setopt(pycurl.HTTPHEADER, ["Range: " + trange, "Connection: keep-alive"])
+    c.perform()
+
+    response_code = c.getinfo(pycurl.RESPONSE_CODE)
+    if response_code != 206 and response_code != 200:
+      raise Exception("Error {} ({}): {}".format(response_code, self._url, repr(dats.getvalue())[:500]))
+
+    return dats.getvalue()
+
+  def read(self, ll=None):
+    start = self._pos
+    end = None if ll is None else self._pos + ll - 1
+    max_threads = int(os.environ.get("COMMA_PARALLEL_DOWNLOADS", "0"))
+
+    end = self.get_length() if end is None else end
+    threads = min((end - start) // (512 * 1024), max_threads)  # At least 512k per thread
+
+    if threads > 1:
+      chunk_size = (end - start) // threads
+      chunks = [
+        (start + chunk_size * i,
+         start + chunk_size * (i + 1) - 1 if i != threads - 1 else end)
+        for i in range(threads)]
+
+      with Pool(threads) as pool:
+        ret = b"".join(pool.starmap(self.download_chunk, chunks))
+    else:
+      ret = self.download_chunk(start, end)
+
+    self._pos += len(ret)
+    return ret
diff --git a/tools/replay/unlogger.py b/tools/replay/unlogger.py
index eebfe28203..2cef80b770 100755
--- a/tools/replay/unlogger.py
+++ b/tools/replay/unlogger.py
@@ -413,9 +413,6 @@ def main(argv):
     args=(command_address, forward_commands_address, data_address,
           args.realtime, _get_address_mapping(args),
          args.publish_time_length, args.bind_early, args.no_loop))
 
-  for p in subprocesses.values():
-    p.daemon = True
-
   subprocesses["data"].start()
   subprocesses["control"].start()
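
For context (not part of the patch): a minimal usage sketch of the new code path, assuming the standard openpilot layout. The URL below is a placeholder. `COMMA_PARALLEL_DOWNLOADS` is read by `URLFileParallel.read()` as the maximum number of workers, and the chunk count is capped so each worker fetches at least 512 KiB.

```python
# Hypothetical usage sketch: enable the parallel downloader by setting
# COMMA_PARALLEL_DOWNLOADS *before* tools.lib.filereader is imported,
# since the import branch in filereader.py is evaluated at import time.
import os
os.environ["COMMA_PARALLEL_DOWNLOADS"] = "4"  # at most 4 worker processes per read()

from tools.lib.filereader import FileReader

# Placeholder URL, not a real route segment.
f = FileReader("https://example.com/some_route_segment.bz2")
data = f.read()  # split into up to 4 parallel HTTP range requests
print(len(data))
```

Because the chunks are fetched with `multiprocessing.Pool`, each worker builds its own pycurl handle in `download_chunk()`, so the speedup comes from overlapping independent HTTP range requests rather than from sharing a single connection.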