diff --git a/selfdrive/car/tests/test_models.py b/selfdrive/car/tests/test_models.py index cf7dd3146f..7966524458 100644 --- a/selfdrive/car/tests/test_models.py +++ b/selfdrive/car/tests/test_models.py @@ -21,8 +21,7 @@ from openpilot.common.basedir import BASEDIR from openpilot.selfdrive.pandad import can_capnp_to_list from openpilot.selfdrive.test.helpers import read_segment_list from openpilot.system.hardware.hw import DEFAULT_DOWNLOAD_CACHE_ROOT -from openpilot.tools.lib.logreader import LogReader, LogsUnavailable, openpilotci_source_zst, openpilotci_source, internal_source, \ - internal_source_zst, comma_api_source +from openpilot.tools.lib.logreader import LogReader, LogsUnavailable, openpilotci_source, internal_source, comma_api_source from openpilot.tools.lib.route import SegmentName SafetyModel = car.CarParams.SafetyModel @@ -124,8 +123,7 @@ class TestCarModelBase(unittest.TestCase): segment_range = f"{cls.test_route.route}/{seg}" try: - sources = ([internal_source, internal_source_zst] if len(INTERNAL_SEG_LIST) else - [openpilotci_source_zst, openpilotci_source, comma_api_source]) + sources = [internal_source] if len(INTERNAL_SEG_LIST) else [openpilotci_source, comma_api_source] lr = LogReader(segment_range, sources=sources, sort_by_time=True) return cls.get_testing_data_from_logreader(lr) except (LogsUnavailable, AssertionError): diff --git a/tools/lib/filereader.py b/tools/lib/filereader.py index 46fad7db7a..70a5d3fe8c 100644 --- a/tools/lib/filereader.py +++ b/tools/lib/filereader.py @@ -1,6 +1,7 @@ import os import posixpath import socket +from functools import cache from urllib.parse import urlparse from openpilot.tools.lib.url_file import URLFile @@ -30,6 +31,7 @@ def resolve_name(fn): return fn +@cache def file_exists(fn): fn = resolve_name(fn) if fn.startswith(("http://", "https://")): diff --git a/tools/lib/logreader.py b/tools/lib/logreader.py index 2045f0cbb5..6d235b7c6b 100755 --- a/tools/lib/logreader.py +++ b/tools/lib/logreader.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 import bz2 -from functools import cache, partial +from functools import partial import multiprocessing import capnp import enum @@ -13,6 +13,7 @@ import warnings import zstandard as zstd from collections.abc import Callable, Iterable, Iterator +from typing import cast from urllib.parse import parse_qs, urlparse from cereal import log as capnp_log @@ -100,9 +101,13 @@ class ReadMode(enum.StrEnum): AUTO_INTERACTIVE = "i" # default to rlogs, fallback to qlogs with a prompt from the user +class FileName(enum.Enum): + RLOG = ("rlog.zst", "rlog.bz2") + QLOG = ("qlog.zst", "qlog.bz2") + + LogPath = str | None -ValidFileCallable = Callable[[LogPath], bool] -Source = Callable[[SegmentRange, ReadMode], list[LogPath]] +Source = Callable[[SegmentRange, FileName], list[LogPath]] InternalUnavailableException = Exception("Internal source not available") @@ -111,129 +116,112 @@ class LogsUnavailable(Exception): pass -@cache -def default_valid_file(fn: LogPath) -> bool: - return fn is not None and file_exists(fn) - - -def auto_strategy(rlog_paths: list[LogPath], qlog_paths: list[LogPath], interactive: bool, valid_file: ValidFileCallable) -> list[LogPath]: - # auto select logs based on availability - missing_rlogs = [rlog is None or not valid_file(rlog) for rlog in rlog_paths].count(True) - if missing_rlogs != 0: - if interactive: - if input(f"{missing_rlogs}/{len(rlog_paths)} rlogs were not found, would you like to fallback to qlogs for those segments? (y/n) ").lower() != "y": - return rlog_paths - else: - cloudlog.warning(f"{missing_rlogs}/{len(rlog_paths)} rlogs were not found, falling back to qlogs for those segments...") - - return [rlog if valid_file(rlog) else (qlog if valid_file(qlog) else None) - for (rlog, qlog) in zip(rlog_paths, qlog_paths, strict=True)] - return rlog_paths - - -def apply_strategy(mode: ReadMode, rlog_paths: list[LogPath], qlog_paths: list[LogPath], valid_file: ValidFileCallable = default_valid_file) -> list[LogPath]: - if mode == ReadMode.RLOG: - return rlog_paths - elif mode == ReadMode.QLOG: - return qlog_paths - elif mode == ReadMode.AUTO: - return auto_strategy(rlog_paths, qlog_paths, False, valid_file) - elif mode == ReadMode.AUTO_INTERACTIVE: - return auto_strategy(rlog_paths, qlog_paths, True, valid_file) - raise ValueError(f"invalid mode: {mode}") - - -def comma_api_source(sr: SegmentRange, mode: ReadMode) -> list[LogPath]: +def comma_api_source(sr: SegmentRange, fns: FileName) -> list[LogPath]: route = Route(sr.route_name) - rlog_paths = [route.log_paths()[seg] for seg in sr.seg_idxs] - qlog_paths = [route.qlog_paths()[seg] for seg in sr.seg_idxs] - # comma api will have already checked if the file exists - def valid_file(fn): - return fn is not None - - return apply_strategy(mode, rlog_paths, qlog_paths, valid_file=valid_file) + if fns == FileName.RLOG: + return [route.log_paths()[seg] for seg in sr.seg_idxs] + else: + return [route.qlog_paths()[seg] for seg in sr.seg_idxs] -def internal_source(sr: SegmentRange, mode: ReadMode, file_ext: str = "bz2", - endpoint_url: str = DATA_ENDPOINT) -> list[LogPath]: +def internal_source(sr: SegmentRange, fns: FileName, endpoint_url: str = DATA_ENDPOINT) -> list[LogPath]: if not internal_source_available(endpoint_url): raise InternalUnavailableException def get_internal_url(sr: SegmentRange, seg, file): - return f"{endpoint_url.rstrip('/')}/{sr.dongle_id}/{sr.log_id}/{seg}/{file}.{file_ext}" - - # TODO: list instead of using static URLs to support routes with multiple file extensions - rlog_paths = [get_internal_url(sr, seg, "rlog") for seg in sr.seg_idxs] - qlog_paths = [get_internal_url(sr, seg, "qlog") for seg in sr.seg_idxs] - - return apply_strategy(mode, rlog_paths, qlog_paths) - + return f"{endpoint_url.rstrip('/')}/{sr.dongle_id}/{sr.log_id}/{seg}/{file}" -def internal_source_zst(sr: SegmentRange, mode: ReadMode, file_ext: str = "zst", - endpoint_url: str = DATA_ENDPOINT) -> list[LogPath]: - return internal_source(sr, mode, file_ext, endpoint_url) + return eval_source([[get_internal_url(sr, seg, fn) for fn in fns.value] for seg in sr.seg_idxs]) -def openpilotci_source(sr: SegmentRange, mode: ReadMode, file_ext: str = "bz2") -> list[LogPath]: - rlog_paths = [get_url(sr.route_name, seg, f"rlog.{file_ext}") for seg in sr.seg_idxs] - qlog_paths = [get_url(sr.route_name, seg, f"qlog.{file_ext}") for seg in sr.seg_idxs] +def openpilotci_source(sr: SegmentRange, fns: FileName) -> list[LogPath]: + return eval_source([[get_url(sr.route_name, seg, fn) for fn in fns.value] for seg in sr.seg_idxs]) - return apply_strategy(mode, rlog_paths, qlog_paths) +def comma_car_segments_source(sr: SegmentRange, fns: FileName) -> list[LogPath]: + return eval_source([get_comma_segments_url(sr.route_name, seg) for seg in sr.seg_idxs]) -def openpilotci_source_zst(sr: SegmentRange, mode: ReadMode) -> list[LogPath]: - return openpilotci_source(sr, mode, "zst") - -def comma_car_segments_source(sr: SegmentRange, mode: ReadMode = ReadMode.RLOG) -> list[LogPath]: - return [get_comma_segments_url(sr.route_name, seg) for seg in sr.seg_idxs] - - -def testing_closet_source(sr: SegmentRange, mode=ReadMode.RLOG) -> list[LogPath]: +def testing_closet_source(sr: SegmentRange, fns: FileName) -> list[LogPath]: if not internal_source_available('http://testing.comma.life'): raise InternalUnavailableException - return [f"http://testing.comma.life/download/{sr.route_name.replace('|', '/')}/{seg}/rlog" for seg in sr.seg_idxs] + return eval_source([f"http://testing.comma.life/download/{sr.route_name.replace('|', '/')}/{seg}/rlog" for seg in sr.seg_idxs]) -def direct_source(file_or_url: str) -> list[LogPath]: +def direct_source(file_or_url: str) -> list[str]: return [file_or_url] -def get_invalid_files(files): - for f in files: - if f is None or not file_exists(f): - yield f +def eval_source(files: list[list[str] | str]) -> list[LogPath]: + # Returns valid file URLs given a list of possible file URLs for each segment (e.g. rlog.bz2, rlog.zst) + valid_files: list[LogPath] = [] + + for urls in files: + if isinstance(urls, str): + urls = [urls] + for url in urls: + if file_exists(url): + valid_files.append(url) + break + else: + valid_files.append(None) -def check_source(source: Source, *args) -> list[LogPath]: - files = source(*args) - assert len(files) > 0, "No files on source" - assert next(get_invalid_files(files), False) is False, "Some files are invalid" - return files + return valid_files -def auto_source(sr: SegmentRange, sources: list[Source], mode: ReadMode = ReadMode.RLOG) -> list[LogPath]: +def auto_source(identifier: str, sources: list[Source], default_mode: ReadMode) -> list[str]: exceptions = {} - # for automatic fallback modes, auto_source needs to first check if rlogs exist for any source - if mode in [ReadMode.AUTO, ReadMode.AUTO_INTERACTIVE]: + sr = SegmentRange(identifier) + mode = default_mode if sr.selector is None else ReadMode(sr.selector) + + if mode == ReadMode.QLOG: + try_fns = [FileName.QLOG] + else: + try_fns = [FileName.RLOG] + + # If selector allows it, fallback to qlogs + if mode in (ReadMode.AUTO, ReadMode.AUTO_INTERACTIVE): + try_fns.append(FileName.QLOG) + + # Build a dict of valid files as we evaluate each source. May contain mix of rlogs, qlogs, and None. + # This function only returns when we've sourced all files, or throws an exception + valid_files: dict[int, LogPath] = {} + for fn in try_fns: for source in sources: try: - return check_source(source, sr, ReadMode.RLOG) - except Exception: - pass + files = source(sr, fn) - # Automatically determine viable source - for source in sources: - try: - return check_source(source, sr, mode) - except Exception as e: - exceptions[source.__name__] = e + # Check every source returns an expected number of files + assert len(files) == len(valid_files) or len(valid_files) == 0, f"Source {source.__name__} returned unexpected number of files" + + # Build a dict of valid files + for idx, f in enumerate(files): + if valid_files.get(idx) is None: + valid_files[idx] = f + + # We've found all files, return them + if all(f is not None for f in valid_files.values()): + return cast(list[str], list(valid_files.values())) + + except Exception as e: + exceptions[source.__name__] = e - raise LogsUnavailable("auto_source could not find any valid source, exceptions for sources:\n - " + - "\n - ".join([f"{k}: {repr(v)}" for k, v in exceptions.items()])) + if fn == try_fns[0]: + missing_logs = list(valid_files.values()).count(None) + if mode == ReadMode.AUTO: + cloudlog.warning(f"{missing_logs}/{len(valid_files)} rlogs were not found, falling back to qlogs for those segments...") + elif mode == ReadMode.AUTO_INTERACTIVE: + if input(f"{missing_logs}/{len(valid_files)} rlogs were not found, would you like to fallback to qlogs for those segments? (y/N) ").lower() != "y": + break + + missing_logs = list(valid_files.values()).count(None) + raise LogsUnavailable(f"{missing_logs}/{len(valid_files)} logs were not found, please ensure all logs " + + "are uploaded. You can fall back to qlogs with '/a' selector at the end of the route name.\n\n" + + "Exceptions for sources:\n - " + "\n - ".join([f"{k}: {repr(v)}" for k, v in exceptions.items()])) def parse_indirect(identifier: str) -> str: @@ -250,7 +238,7 @@ def parse_direct(identifier: str): class LogReader: - def _parse_identifier(self, identifier: str) -> list[LogPath]: + def _parse_identifier(self, identifier: str) -> list[str]: # useradmin, etc. identifier = parse_indirect(identifier) @@ -259,21 +247,14 @@ class LogReader: if direct_parsed is not None: return direct_source(identifier) - sr = SegmentRange(identifier) - mode = self.default_mode if sr.selector is None else ReadMode(sr.selector) - - identifiers = auto_source(sr, self.sources, mode) - - invalid_count = len(list(get_invalid_files(identifiers))) - assert invalid_count == 0, (f"{invalid_count}/{len(identifiers)} invalid log(s) found, please ensure all logs " + - "are uploaded or auto fallback to qlogs with '/a' selector at the end of the route name.") + identifiers = auto_source(identifier, self.sources, self.default_mode) return identifiers def __init__(self, identifier: str | list[str], default_mode: ReadMode = ReadMode.RLOG, sources: list[Source] = None, sort_by_time=False, only_union_types=False): if sources is None: - sources = [internal_source, internal_source_zst, openpilotci_source, openpilotci_source_zst, - comma_api_source, comma_car_segments_source, testing_closet_source] + sources = [internal_source, openpilotci_source, comma_api_source, + comma_car_segments_source, testing_closet_source] self.default_mode = default_mode self.sources = sources