File sourcing: Not all files are logs (#36025)

* Not all files are logs

* more refactor

* linting ok

* fix tests

* import exception

* whoops forgot to git add

* fix

---------

Co-authored-by: Shane Smiskol <shane@smiskol.com>
pull/36029/head
Harald Schäfer 3 days ago committed by GitHub
parent 5ec9aee216
commit 18b7ddef8f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 57
      tools/lib/file_sources.py
  2. 61
      tools/lib/logreader.py
  3. 5
      tools/lib/tests/test_logreader.py

@ -0,0 +1,57 @@
from collections.abc import Callable
from openpilot.tools.lib.comma_car_segments import get_url as get_comma_segments_url
from openpilot.tools.lib.openpilotci import get_url
from openpilot.tools.lib.filereader import DATA_ENDPOINT, file_exists, internal_source_available
from openpilot.tools.lib.route import Route, SegmentRange, FileName
# When passed a tuple of file names, each source will return the first that exists (rlog.zst, rlog.bz2)
FileNames = tuple[str, ...]
Source = Callable[[SegmentRange, list[int], FileNames], dict[int, str]]
InternalUnavailableException = Exception("Internal source not available")
def comma_api_source(sr: SegmentRange, seg_idxs: list[int], fns: FileNames) -> dict[int, str]:
route = Route(sr.route_name)
# comma api will have already checked if the file exists
if fns == FileName.RLOG:
return {seg: route.log_paths()[seg] for seg in seg_idxs if route.log_paths()[seg] is not None}
else:
return {seg: route.qlog_paths()[seg] for seg in seg_idxs if route.qlog_paths()[seg] is not None}
def internal_source(sr: SegmentRange, seg_idxs: list[int], fns: FileNames, endpoint_url: str = DATA_ENDPOINT) -> dict[int, str]:
if not internal_source_available(endpoint_url):
raise InternalUnavailableException
def get_internal_url(sr: SegmentRange, seg, file):
return f"{endpoint_url.rstrip('/')}/{sr.dongle_id}/{sr.log_id}/{seg}/{file}"
return eval_source({seg: [get_internal_url(sr, seg, fn) for fn in fns] for seg in seg_idxs})
def openpilotci_source(sr: SegmentRange, seg_idxs: list[int], fns: FileNames) -> dict[int, str]:
return eval_source({seg: [get_url(sr.route_name, seg, fn) for fn in fns] for seg in seg_idxs})
def comma_car_segments_source(sr: SegmentRange, seg_idxs: list[int], fns: FileNames) -> dict[int, str]:
return eval_source({seg: get_comma_segments_url(sr.route_name, seg) for seg in seg_idxs})
def eval_source(files: dict[int, list[str] | str]) -> dict[int, str]:
# Returns valid file URLs given a list of possible file URLs for each segment (e.g. rlog.bz2, rlog.zst)
valid_files: dict[int, str] = {}
for seg_idx, urls in files.items():
if isinstance(urls, str):
urls = [urls]
# Add first valid file URL
for url in urls:
if file_exists(url):
valid_files[seg_idx] = url
break
return valid_files

@ -12,16 +12,15 @@ import urllib.parse
import warnings import warnings
import zstandard as zstd import zstandard as zstd
from collections.abc import Callable, Iterable, Iterator from collections.abc import Iterable, Iterator
from typing import cast from typing import cast
from urllib.parse import parse_qs, urlparse from urllib.parse import parse_qs, urlparse
from cereal import log as capnp_log from cereal import log as capnp_log
from openpilot.common.swaglog import cloudlog from openpilot.common.swaglog import cloudlog
from openpilot.tools.lib.comma_car_segments import get_url as get_comma_segments_url from openpilot.tools.lib.filereader import FileReader
from openpilot.tools.lib.openpilotci import get_url from openpilot.tools.lib.file_sources import comma_api_source, internal_source, openpilotci_source, comma_car_segments_source, Source
from openpilot.tools.lib.filereader import DATA_ENDPOINT, FileReader, file_exists, internal_source_available from openpilot.tools.lib.route import SegmentRange, FileName
from openpilot.tools.lib.route import Route, SegmentRange, FileName
from openpilot.tools.lib.log_time_series import msgs_to_time_series from openpilot.tools.lib.log_time_series import msgs_to_time_series
LogMessage = type[capnp._DynamicStructReader] LogMessage = type[capnp._DynamicStructReader]
@ -140,65 +139,15 @@ class ReadMode(enum.StrEnum):
AUTO_INTERACTIVE = "i" # default to rlogs, fallback to qlogs with a prompt from the user AUTO_INTERACTIVE = "i" # default to rlogs, fallback to qlogs with a prompt from the user
LogFileName = tuple[str, ...]
Source = Callable[[SegmentRange, list[int], LogFileName], dict[int, str]]
InternalUnavailableException = Exception("Internal source not available")
class LogsUnavailable(Exception): class LogsUnavailable(Exception):
pass pass
def comma_api_source(sr: SegmentRange, seg_idxs: list[int], fns: LogFileName) -> dict[int, str]:
route = Route(sr.route_name)
# comma api will have already checked if the file exists
if fns == FileName.RLOG:
return {seg: route.log_paths()[seg] for seg in seg_idxs if route.log_paths()[seg] is not None}
else:
return {seg: route.qlog_paths()[seg] for seg in seg_idxs if route.qlog_paths()[seg] is not None}
def internal_source(sr: SegmentRange, seg_idxs: list[int], fns: LogFileName, endpoint_url: str = DATA_ENDPOINT) -> dict[int, str]:
if not internal_source_available(endpoint_url):
raise InternalUnavailableException
def get_internal_url(sr: SegmentRange, seg, file):
return f"{endpoint_url.rstrip('/')}/{sr.dongle_id}/{sr.log_id}/{seg}/{file}"
return eval_source({seg: [get_internal_url(sr, seg, fn) for fn in fns] for seg in seg_idxs})
def openpilotci_source(sr: SegmentRange, seg_idxs: list[int], fns: LogFileName) -> dict[int, str]:
return eval_source({seg: [get_url(sr.route_name, seg, fn) for fn in fns] for seg in seg_idxs})
def comma_car_segments_source(sr: SegmentRange, seg_idxs: list[int], fns: LogFileName) -> dict[int, str]:
return eval_source({seg: get_comma_segments_url(sr.route_name, seg) for seg in seg_idxs})
def direct_source(file_or_url: str) -> list[str]: def direct_source(file_or_url: str) -> list[str]:
return [file_or_url] return [file_or_url]
def eval_source(files: dict[int, list[str] | str]) -> dict[int, str]: # TODO this should apply to camera files as well
# Returns valid file URLs given a list of possible file URLs for each segment (e.g. rlog.bz2, rlog.zst)
valid_files: dict[int, str] = {}
for seg_idx, urls in files.items():
if isinstance(urls, str):
urls = [urls]
# Add first valid file URL
for url in urls:
if file_exists(url):
valid_files[seg_idx] = url
break
return valid_files
def auto_source(identifier: str, sources: list[Source], default_mode: ReadMode) -> list[str]: def auto_source(identifier: str, sources: list[Source], default_mode: ReadMode) -> list[str]:
exceptions = {} exceptions = {}

@ -10,7 +10,8 @@ import requests
from parameterized import parameterized from parameterized import parameterized
from cereal import log as capnp_log from cereal import log as capnp_log
from openpilot.tools.lib.logreader import LogsUnavailable, LogIterable, LogReader, comma_api_source, parse_indirect, ReadMode, InternalUnavailableException from openpilot.tools.lib.logreader import LogsUnavailable, LogIterable, LogReader, parse_indirect, ReadMode
from openpilot.tools.lib.log_sources import comma_api_source, InternalUnavailableException
from openpilot.tools.lib.route import SegmentRange from openpilot.tools.lib.route import SegmentRange
from openpilot.tools.lib.url_file import URLFileException from openpilot.tools.lib.url_file import URLFileException
@ -90,7 +91,7 @@ class TestLogReader:
@pytest.mark.parametrize("cache_enabled", [True, False]) @pytest.mark.parametrize("cache_enabled", [True, False])
def test_direct_parsing(self, mocker, cache_enabled): def test_direct_parsing(self, mocker, cache_enabled):
file_exists_mock = mocker.patch("openpilot.tools.lib.logreader.file_exists") file_exists_mock = mocker.patch("openpilot.tools.lib.filereader.file_exists")
os.environ["FILEREADER_CACHE"] = "1" if cache_enabled else "0" os.environ["FILEREADER_CACHE"] = "1" if cache_enabled else "0"
qlog = tempfile.NamedTemporaryFile(mode='wb', delete=False) qlog = tempfile.NamedTemporaryFile(mode='wb', delete=False)

Loading…
Cancel
Save