Revert "File sourcing: Not all files are logs (#36025)"

This reverts commit 18b7ddef8f.
pull/36034/head
Maxime Desroches 2 days ago
parent dd5f5fdb98
commit 3570022b9a
  1. 57
      tools/lib/file_sources.py
  2. 61
      tools/lib/logreader.py
  3. 5
      tools/lib/tests/test_logreader.py

@ -1,57 +0,0 @@
from collections.abc import Callable
from openpilot.tools.lib.comma_car_segments import get_url as get_comma_segments_url
from openpilot.tools.lib.openpilotci import get_url
from openpilot.tools.lib.filereader import DATA_ENDPOINT, file_exists, internal_source_available
from openpilot.tools.lib.route import Route, SegmentRange, FileName
# When passed a tuple of file names, each source will return the first that exists (rlog.zst, rlog.bz2)
FileNames = tuple[str, ...]
Source = Callable[[SegmentRange, list[int], FileNames], dict[int, str]]
InternalUnavailableException = Exception("Internal source not available")
def comma_api_source(sr: SegmentRange, seg_idxs: list[int], fns: FileNames) -> dict[int, str]:
route = Route(sr.route_name)
# comma api will have already checked if the file exists
if fns == FileName.RLOG:
return {seg: route.log_paths()[seg] for seg in seg_idxs if route.log_paths()[seg] is not None}
else:
return {seg: route.qlog_paths()[seg] for seg in seg_idxs if route.qlog_paths()[seg] is not None}
def internal_source(sr: SegmentRange, seg_idxs: list[int], fns: FileNames, endpoint_url: str = DATA_ENDPOINT) -> dict[int, str]:
if not internal_source_available(endpoint_url):
raise InternalUnavailableException
def get_internal_url(sr: SegmentRange, seg, file):
return f"{endpoint_url.rstrip('/')}/{sr.dongle_id}/{sr.log_id}/{seg}/{file}"
return eval_source({seg: [get_internal_url(sr, seg, fn) for fn in fns] for seg in seg_idxs})
def openpilotci_source(sr: SegmentRange, seg_idxs: list[int], fns: FileNames) -> dict[int, str]:
return eval_source({seg: [get_url(sr.route_name, seg, fn) for fn in fns] for seg in seg_idxs})
def comma_car_segments_source(sr: SegmentRange, seg_idxs: list[int], fns: FileNames) -> dict[int, str]:
return eval_source({seg: get_comma_segments_url(sr.route_name, seg) for seg in seg_idxs})
def eval_source(files: dict[int, list[str] | str]) -> dict[int, str]:
# Returns valid file URLs given a list of possible file URLs for each segment (e.g. rlog.bz2, rlog.zst)
valid_files: dict[int, str] = {}
for seg_idx, urls in files.items():
if isinstance(urls, str):
urls = [urls]
# Add first valid file URL
for url in urls:
if file_exists(url):
valid_files[seg_idx] = url
break
return valid_files

@ -12,15 +12,16 @@ import urllib.parse
import warnings import warnings
import zstandard as zstd import zstandard as zstd
from collections.abc import Iterable, Iterator from collections.abc import Callable, Iterable, Iterator
from typing import cast from typing import cast
from urllib.parse import parse_qs, urlparse from urllib.parse import parse_qs, urlparse
from cereal import log as capnp_log from cereal import log as capnp_log
from openpilot.common.swaglog import cloudlog from openpilot.common.swaglog import cloudlog
from openpilot.tools.lib.filereader import FileReader from openpilot.tools.lib.comma_car_segments import get_url as get_comma_segments_url
from openpilot.tools.lib.file_sources import comma_api_source, internal_source, openpilotci_source, comma_car_segments_source, Source from openpilot.tools.lib.openpilotci import get_url
from openpilot.tools.lib.route import SegmentRange, FileName from openpilot.tools.lib.filereader import DATA_ENDPOINT, FileReader, file_exists, internal_source_available
from openpilot.tools.lib.route import Route, SegmentRange, FileName
from openpilot.tools.lib.log_time_series import msgs_to_time_series from openpilot.tools.lib.log_time_series import msgs_to_time_series
LogMessage = type[capnp._DynamicStructReader] LogMessage = type[capnp._DynamicStructReader]
@ -139,15 +140,65 @@ class ReadMode(enum.StrEnum):
AUTO_INTERACTIVE = "i" # default to rlogs, fallback to qlogs with a prompt from the user AUTO_INTERACTIVE = "i" # default to rlogs, fallback to qlogs with a prompt from the user
LogFileName = tuple[str, ...]
Source = Callable[[SegmentRange, list[int], LogFileName], dict[int, str]]
InternalUnavailableException = Exception("Internal source not available")
class LogsUnavailable(Exception): class LogsUnavailable(Exception):
pass pass
def comma_api_source(sr: SegmentRange, seg_idxs: list[int], fns: LogFileName) -> dict[int, str]:
route = Route(sr.route_name)
# comma api will have already checked if the file exists
if fns == FileName.RLOG:
return {seg: route.log_paths()[seg] for seg in seg_idxs if route.log_paths()[seg] is not None}
else:
return {seg: route.qlog_paths()[seg] for seg in seg_idxs if route.qlog_paths()[seg] is not None}
def internal_source(sr: SegmentRange, seg_idxs: list[int], fns: LogFileName, endpoint_url: str = DATA_ENDPOINT) -> dict[int, str]:
if not internal_source_available(endpoint_url):
raise InternalUnavailableException
def get_internal_url(sr: SegmentRange, seg, file):
return f"{endpoint_url.rstrip('/')}/{sr.dongle_id}/{sr.log_id}/{seg}/{file}"
return eval_source({seg: [get_internal_url(sr, seg, fn) for fn in fns] for seg in seg_idxs})
def openpilotci_source(sr: SegmentRange, seg_idxs: list[int], fns: LogFileName) -> dict[int, str]:
return eval_source({seg: [get_url(sr.route_name, seg, fn) for fn in fns] for seg in seg_idxs})
def comma_car_segments_source(sr: SegmentRange, seg_idxs: list[int], fns: LogFileName) -> dict[int, str]:
return eval_source({seg: get_comma_segments_url(sr.route_name, seg) for seg in seg_idxs})
def direct_source(file_or_url: str) -> list[str]: def direct_source(file_or_url: str) -> list[str]:
return [file_or_url] return [file_or_url]
# TODO this should apply to camera files as well def eval_source(files: dict[int, list[str] | str]) -> dict[int, str]:
# Returns valid file URLs given a list of possible file URLs for each segment (e.g. rlog.bz2, rlog.zst)
valid_files: dict[int, str] = {}
for seg_idx, urls in files.items():
if isinstance(urls, str):
urls = [urls]
# Add first valid file URL
for url in urls:
if file_exists(url):
valid_files[seg_idx] = url
break
return valid_files
def auto_source(identifier: str, sources: list[Source], default_mode: ReadMode) -> list[str]: def auto_source(identifier: str, sources: list[Source], default_mode: ReadMode) -> list[str]:
exceptions = {} exceptions = {}

@ -10,8 +10,7 @@ import requests
from parameterized import parameterized from parameterized import parameterized
from cereal import log as capnp_log from cereal import log as capnp_log
from openpilot.tools.lib.logreader import LogsUnavailable, LogIterable, LogReader, parse_indirect, ReadMode from openpilot.tools.lib.logreader import LogsUnavailable, LogIterable, LogReader, comma_api_source, parse_indirect, ReadMode, InternalUnavailableException
from openpilot.tools.lib.log_sources import comma_api_source, InternalUnavailableException
from openpilot.tools.lib.route import SegmentRange from openpilot.tools.lib.route import SegmentRange
from openpilot.tools.lib.url_file import URLFileException from openpilot.tools.lib.url_file import URLFileException
@ -91,7 +90,7 @@ class TestLogReader:
@pytest.mark.parametrize("cache_enabled", [True, False]) @pytest.mark.parametrize("cache_enabled", [True, False])
def test_direct_parsing(self, mocker, cache_enabled): def test_direct_parsing(self, mocker, cache_enabled):
file_exists_mock = mocker.patch("openpilot.tools.lib.filereader.file_exists") file_exists_mock = mocker.patch("openpilot.tools.lib.logreader.file_exists")
os.environ["FILEREADER_CACHE"] = "1" if cache_enabled else "0" os.environ["FILEREADER_CACHE"] = "1" if cache_enabled else "0"
qlog = tempfile.NamedTemporaryFile(mode='wb', delete=False) qlog = tempfile.NamedTemporaryFile(mode='wb', delete=False)

Loading…
Cancel
Save