From cb6a47c6bf45bffd376a9e313aada71a6ec8f306 Mon Sep 17 00:00:00 2001 From: Greg Hogan Date: Thu, 8 Feb 2024 10:24:45 -0800 Subject: [PATCH] simplify URLFile (#31365) * simplify URLFile * more space old-commit-hash: ec9f3dcef367e6c29756ededa7e83783ef280cfa --- tools/lib/url_file.py | 48 ++++++++++++++++++++----------------------- 1 file changed, 22 insertions(+), 26 deletions(-) diff --git a/tools/lib/url_file.py b/tools/lib/url_file.py index be9c815c93..77ed063226 100644 --- a/tools/lib/url_file.py +++ b/tools/lib/url_file.py @@ -1,12 +1,10 @@ import logging import os +import socket import time -import threading from hashlib import sha256 -from urllib3 import PoolManager +from urllib3 import PoolManager, Retry from urllib3.util import Timeout -from tenacity import retry, wait_random_exponential, stop_after_attempt -from typing import Optional from openpilot.common.file_helpers import atomic_write_in_dir from openpilot.system.hardware.hw import Paths @@ -25,14 +23,23 @@ class URLFileException(Exception): pass +def new_pool_manager() -> PoolManager: + socket_options = [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),] + retries = Retry(total=5, backoff_factor=0.5, status_forcelist=[409, 429, 503, 504]) + return PoolManager(num_pools=10, maxsize=100, socket_options=socket_options, retries=retries) + + +def set_pool_manager(): + URLFile._pool_manager = new_pool_manager() +os.register_at_fork(after_in_child=set_pool_manager) + + class URLFile: - _pid: Optional[int] = None - _pool_manager: Optional[PoolManager] = None - _pool_manager_lock = threading.Lock() + _pool_manager = new_pool_manager() - def __init__(self, url, debug=False, cache=None): - self._pool_manager = None + def __init__(self, url, timeout=10, debug=False, cache=None): self._url = url + self._timeout = Timeout(connect=timeout, read=timeout) self._pos = 0 self._length = None self._local_file = None @@ -54,20 +61,11 @@ class URLFile: self._local_file.close() self._local_file = None - def _http_client(self) -> PoolManager: - if self._pool_manager is None: - pid = os.getpid() - with URLFile._pool_manager_lock: - if URLFile._pid != pid or URLFile._pool_manager is None: # unsafe to share after fork - URLFile._pid = pid - URLFile._pool_manager = PoolManager(num_pools=10, maxsize=10) - self._pool_manager = URLFile._pool_manager - return self._pool_manager - - @retry(wait=wait_random_exponential(multiplier=1, max=5), stop=stop_after_attempt(3), reraise=True) + def _request(self, method, url, headers=None): + return URLFile._pool_manager.request(method, url, timeout=self._timeout, headers=headers) + def get_length_online(self): - timeout = Timeout(connect=50.0, read=500.0) - response = self._http_client().request('HEAD', self._url, timeout=timeout, preload_content=False) + response = self._request('HEAD', self._url) if not (200 <= response.status <= 299): return -1 length = response.headers.get('content-length', 0) @@ -122,10 +120,9 @@ class URLFile: self._pos = file_end return response - @retry(wait=wait_random_exponential(multiplier=1, max=5), stop=stop_after_attempt(3), reraise=True) def read_aux(self, ll=None): download_range = False - headers = {'Connection': 'keep-alive'} + headers = {} if self._pos != 0 or ll is not None: if ll is None: end = self.get_length() - 1 @@ -139,8 +136,7 @@ class URLFile: if self._debug: t1 = time.time() - timeout = Timeout(connect=50.0, read=500.0) - response = self._http_client().request('GET', self._url, timeout=timeout, preload_content=False, headers=headers) + response = self._request('GET', self._url, headers=headers) ret = response.data if self._debug: