From 870d19f33de194679e060b49f7a970c3cfaa3836 Mon Sep 17 00:00:00 2001 From: Shane Smiskol Date: Tue, 19 Aug 2025 19:59:50 -0700 Subject: [PATCH] Reapply "File sourcing: Not all files are logs (#36025)" This reverts commit 3570022b9a7e90c98a0188d5b7f7f2e14710af61. Fix test --- tools/lib/file_sources.py | 57 +++++++++++++++++++++++++++++ tools/lib/logreader.py | 61 +++---------------------------- tools/lib/tests/test_logreader.py | 5 ++- 3 files changed, 65 insertions(+), 58 deletions(-) create mode 100755 tools/lib/file_sources.py diff --git a/tools/lib/file_sources.py b/tools/lib/file_sources.py new file mode 100755 index 0000000000..cb7bf15114 --- /dev/null +++ b/tools/lib/file_sources.py @@ -0,0 +1,57 @@ +from collections.abc import Callable + +from openpilot.tools.lib.comma_car_segments import get_url as get_comma_segments_url +from openpilot.tools.lib.openpilotci import get_url +from openpilot.tools.lib.filereader import DATA_ENDPOINT, file_exists, internal_source_available +from openpilot.tools.lib.route import Route, SegmentRange, FileName + +# When passed a tuple of file names, each source will return the first that exists (rlog.zst, rlog.bz2) +FileNames = tuple[str, ...] +Source = Callable[[SegmentRange, list[int], FileNames], dict[int, str]] + +InternalUnavailableException = Exception("Internal source not available") + + +def comma_api_source(sr: SegmentRange, seg_idxs: list[int], fns: FileNames) -> dict[int, str]: + route = Route(sr.route_name) + + # comma api will have already checked if the file exists + if fns == FileName.RLOG: + return {seg: route.log_paths()[seg] for seg in seg_idxs if route.log_paths()[seg] is not None} + else: + return {seg: route.qlog_paths()[seg] for seg in seg_idxs if route.qlog_paths()[seg] is not None} + + +def internal_source(sr: SegmentRange, seg_idxs: list[int], fns: FileNames, endpoint_url: str = DATA_ENDPOINT) -> dict[int, str]: + if not internal_source_available(endpoint_url): + raise InternalUnavailableException + + def get_internal_url(sr: SegmentRange, seg, file): + return f"{endpoint_url.rstrip('/')}/{sr.dongle_id}/{sr.log_id}/{seg}/{file}" + + return eval_source({seg: [get_internal_url(sr, seg, fn) for fn in fns] for seg in seg_idxs}) + + +def openpilotci_source(sr: SegmentRange, seg_idxs: list[int], fns: FileNames) -> dict[int, str]: + return eval_source({seg: [get_url(sr.route_name, seg, fn) for fn in fns] for seg in seg_idxs}) + + +def comma_car_segments_source(sr: SegmentRange, seg_idxs: list[int], fns: FileNames) -> dict[int, str]: + return eval_source({seg: get_comma_segments_url(sr.route_name, seg) for seg in seg_idxs}) + + +def eval_source(files: dict[int, list[str] | str]) -> dict[int, str]: + # Returns valid file URLs given a list of possible file URLs for each segment (e.g. rlog.bz2, rlog.zst) + valid_files: dict[int, str] = {} + + for seg_idx, urls in files.items(): + if isinstance(urls, str): + urls = [urls] + + # Add first valid file URL + for url in urls: + if file_exists(url): + valid_files[seg_idx] = url + break + + return valid_files diff --git a/tools/lib/logreader.py b/tools/lib/logreader.py index 50e3b9dc7a..8d84cdbd5d 100755 --- a/tools/lib/logreader.py +++ b/tools/lib/logreader.py @@ -12,16 +12,15 @@ import urllib.parse import warnings import zstandard as zstd -from collections.abc import Callable, Iterable, Iterator +from collections.abc import Iterable, Iterator from typing import cast from urllib.parse import parse_qs, urlparse from cereal import log as capnp_log from openpilot.common.swaglog import cloudlog -from openpilot.tools.lib.comma_car_segments import get_url as get_comma_segments_url -from openpilot.tools.lib.openpilotci import get_url -from openpilot.tools.lib.filereader import DATA_ENDPOINT, FileReader, file_exists, internal_source_available -from openpilot.tools.lib.route import Route, SegmentRange, FileName +from openpilot.tools.lib.filereader import FileReader +from openpilot.tools.lib.file_sources import comma_api_source, internal_source, openpilotci_source, comma_car_segments_source, Source +from openpilot.tools.lib.route import SegmentRange, FileName from openpilot.tools.lib.log_time_series import msgs_to_time_series LogMessage = type[capnp._DynamicStructReader] @@ -140,65 +139,15 @@ class ReadMode(enum.StrEnum): AUTO_INTERACTIVE = "i" # default to rlogs, fallback to qlogs with a prompt from the user -LogFileName = tuple[str, ...] -Source = Callable[[SegmentRange, list[int], LogFileName], dict[int, str]] - -InternalUnavailableException = Exception("Internal source not available") - - class LogsUnavailable(Exception): pass -def comma_api_source(sr: SegmentRange, seg_idxs: list[int], fns: LogFileName) -> dict[int, str]: - route = Route(sr.route_name) - - # comma api will have already checked if the file exists - if fns == FileName.RLOG: - return {seg: route.log_paths()[seg] for seg in seg_idxs if route.log_paths()[seg] is not None} - else: - return {seg: route.qlog_paths()[seg] for seg in seg_idxs if route.qlog_paths()[seg] is not None} - - -def internal_source(sr: SegmentRange, seg_idxs: list[int], fns: LogFileName, endpoint_url: str = DATA_ENDPOINT) -> dict[int, str]: - if not internal_source_available(endpoint_url): - raise InternalUnavailableException - - def get_internal_url(sr: SegmentRange, seg, file): - return f"{endpoint_url.rstrip('/')}/{sr.dongle_id}/{sr.log_id}/{seg}/{file}" - - return eval_source({seg: [get_internal_url(sr, seg, fn) for fn in fns] for seg in seg_idxs}) - - -def openpilotci_source(sr: SegmentRange, seg_idxs: list[int], fns: LogFileName) -> dict[int, str]: - return eval_source({seg: [get_url(sr.route_name, seg, fn) for fn in fns] for seg in seg_idxs}) - - -def comma_car_segments_source(sr: SegmentRange, seg_idxs: list[int], fns: LogFileName) -> dict[int, str]: - return eval_source({seg: get_comma_segments_url(sr.route_name, seg) for seg in seg_idxs}) - - def direct_source(file_or_url: str) -> list[str]: return [file_or_url] -def eval_source(files: dict[int, list[str] | str]) -> dict[int, str]: - # Returns valid file URLs given a list of possible file URLs for each segment (e.g. rlog.bz2, rlog.zst) - valid_files: dict[int, str] = {} - - for seg_idx, urls in files.items(): - if isinstance(urls, str): - urls = [urls] - - # Add first valid file URL - for url in urls: - if file_exists(url): - valid_files[seg_idx] = url - break - - return valid_files - - +# TODO this should apply to camera files as well def auto_source(identifier: str, sources: list[Source], default_mode: ReadMode) -> list[str]: exceptions = {} diff --git a/tools/lib/tests/test_logreader.py b/tools/lib/tests/test_logreader.py index a9f7b2352d..8d0870171f 100644 --- a/tools/lib/tests/test_logreader.py +++ b/tools/lib/tests/test_logreader.py @@ -10,7 +10,8 @@ import requests from parameterized import parameterized from cereal import log as capnp_log -from openpilot.tools.lib.logreader import LogsUnavailable, LogIterable, LogReader, comma_api_source, parse_indirect, ReadMode, InternalUnavailableException +from openpilot.tools.lib.logreader import LogsUnavailable, LogIterable, LogReader, parse_indirect, ReadMode +from openpilot.tools.lib.file_sources import comma_api_source, InternalUnavailableException from openpilot.tools.lib.route import SegmentRange from openpilot.tools.lib.url_file import URLFileException @@ -90,7 +91,7 @@ class TestLogReader: @pytest.mark.parametrize("cache_enabled", [True, False]) def test_direct_parsing(self, mocker, cache_enabled): - file_exists_mock = mocker.patch("openpilot.tools.lib.logreader.file_exists") + file_exists_mock = mocker.patch("openpilot.tools.lib.filereader.file_exists") os.environ["FILEREADER_CACHE"] = "1" if cache_enabled else "0" qlog = tempfile.NamedTemporaryFile(mode='wb', delete=False)