|
|
|
@ -1,6 +1,6 @@ |
|
|
|
|
#!/usr/bin/env python3 |
|
|
|
|
import bz2 |
|
|
|
|
from functools import cache, partial |
|
|
|
|
from functools import partial |
|
|
|
|
import multiprocessing |
|
|
|
|
import capnp |
|
|
|
|
import enum |
|
|
|
@ -13,14 +13,15 @@ import warnings |
|
|
|
|
import zstandard as zstd |
|
|
|
|
|
|
|
|
|
from collections.abc import Callable, Iterable, Iterator |
|
|
|
|
from typing import cast |
|
|
|
|
from urllib.parse import parse_qs, urlparse |
|
|
|
|
|
|
|
|
|
from cereal import log as capnp_log |
|
|
|
|
from openpilot.common.swaglog import cloudlog |
|
|
|
|
from openpilot.tools.lib.comma_car_segments import get_url as get_comma_segments_url |
|
|
|
|
from openpilot.tools.lib.openpilotci import get_url |
|
|
|
|
from openpilot.tools.lib.filereader import FileReader, file_exists, internal_source_available |
|
|
|
|
from openpilot.tools.lib.route import Route, SegmentRange |
|
|
|
|
from openpilot.tools.lib.filereader import DATA_ENDPOINT, FileReader, file_exists, internal_source_available |
|
|
|
|
from openpilot.tools.lib.route import QCAMERA_FILENAMES, CAMERA_FILENAMES, DCAMERA_FILENAMES, ECAMERA_FILENAMES, Route, SegmentRange |
|
|
|
|
from openpilot.tools.lib.log_time_series import msgs_to_time_series |
|
|
|
|
|
|
|
|
|
LogMessage = type[capnp._DynamicStructReader] |
|
|
|
@ -96,14 +97,21 @@ class _LogFileReader: |
|
|
|
|
class ReadMode(enum.StrEnum): |
|
|
|
|
RLOG = "r" # only read rlogs |
|
|
|
|
QLOG = "q" # only read qlogs |
|
|
|
|
SANITIZED = "s" # read from the commaCarSegments database |
|
|
|
|
AUTO = "a" # default to rlogs, fallback to qlogs |
|
|
|
|
AUTO_INTERACTIVE = "i" # default to rlogs, fallback to qlogs with a prompt from the user |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class FileName(enum.Enum): |
|
|
|
|
RLOG = ("rlog.zst", "rlog.bz2") |
|
|
|
|
QLOG = ("qlog.zst", "qlog.bz2") |
|
|
|
|
QCAMERA = tuple(QCAMERA_FILENAMES) |
|
|
|
|
FCAMERA = tuple(CAMERA_FILENAMES) |
|
|
|
|
ECAMERA = tuple(ECAMERA_FILENAMES) |
|
|
|
|
DCAMERA = tuple(DCAMERA_FILENAMES) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
LogPath = str | None |
|
|
|
|
ValidFileCallable = Callable[[LogPath], bool] |
|
|
|
|
Source = Callable[[SegmentRange, ReadMode], list[LogPath]] |
|
|
|
|
Source = Callable[[SegmentRange, FileName], list[LogPath]] |
|
|
|
|
|
|
|
|
|
InternalUnavailableException = Exception("Internal source not available") |
|
|
|
|
|
|
|
|
@ -112,139 +120,129 @@ class LogsUnavailable(Exception): |
|
|
|
|
pass |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@cache |
|
|
|
|
def default_valid_file(fn: LogPath) -> bool: |
|
|
|
|
return fn is not None and file_exists(fn) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def auto_strategy(rlog_paths: list[LogPath], qlog_paths: list[LogPath], interactive: bool, valid_file: ValidFileCallable) -> list[LogPath]: |
|
|
|
|
# auto select logs based on availability |
|
|
|
|
missing_rlogs = [rlog is None or not valid_file(rlog) for rlog in rlog_paths].count(True) |
|
|
|
|
if missing_rlogs != 0: |
|
|
|
|
if interactive: |
|
|
|
|
if input(f"{missing_rlogs}/{len(rlog_paths)} rlogs were not found, would you like to fallback to qlogs for those segments? (y/n) ").lower() != "y": |
|
|
|
|
return rlog_paths |
|
|
|
|
else: |
|
|
|
|
cloudlog.warning(f"{missing_rlogs}/{len(rlog_paths)} rlogs were not found, falling back to qlogs for those segments...") |
|
|
|
|
|
|
|
|
|
return [rlog if valid_file(rlog) else (qlog if valid_file(qlog) else None) |
|
|
|
|
for (rlog, qlog) in zip(rlog_paths, qlog_paths, strict=True)] |
|
|
|
|
return rlog_paths |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def apply_strategy(mode: ReadMode, rlog_paths: list[LogPath], qlog_paths: list[LogPath], valid_file: ValidFileCallable = default_valid_file) -> list[LogPath]: |
|
|
|
|
if mode == ReadMode.RLOG: |
|
|
|
|
return rlog_paths |
|
|
|
|
elif mode == ReadMode.QLOG: |
|
|
|
|
return qlog_paths |
|
|
|
|
elif mode == ReadMode.AUTO: |
|
|
|
|
return auto_strategy(rlog_paths, qlog_paths, False, valid_file) |
|
|
|
|
elif mode == ReadMode.AUTO_INTERACTIVE: |
|
|
|
|
return auto_strategy(rlog_paths, qlog_paths, True, valid_file) |
|
|
|
|
raise ValueError(f"invalid mode: {mode}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def comma_api_source(sr: SegmentRange, mode: ReadMode) -> list[LogPath]: |
|
|
|
|
def comma_api_source(sr: SegmentRange, fns: FileName) -> list[LogPath]: |
|
|
|
|
route = Route(sr.route_name) |
|
|
|
|
|
|
|
|
|
rlog_paths = [route.log_paths()[seg] for seg in sr.seg_idxs] |
|
|
|
|
qlog_paths = [route.qlog_paths()[seg] for seg in sr.seg_idxs] |
|
|
|
|
|
|
|
|
|
# comma api will have already checked if the file exists |
|
|
|
|
def valid_file(fn): |
|
|
|
|
return fn is not None |
|
|
|
|
|
|
|
|
|
return apply_strategy(mode, rlog_paths, qlog_paths, valid_file=valid_file) |
|
|
|
|
if fns == FileName.RLOG: |
|
|
|
|
return [route.log_paths()[seg] for seg in sr.seg_idxs] |
|
|
|
|
else: |
|
|
|
|
return [route.qlog_paths()[seg] for seg in sr.seg_idxs] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def internal_source(sr: SegmentRange, mode: ReadMode, file_ext: str = "bz2") -> list[LogPath]: |
|
|
|
|
if not internal_source_available(): |
|
|
|
|
def internal_source(sr: SegmentRange, fns: FileName, endpoint_url: str = DATA_ENDPOINT) -> list[LogPath]: |
|
|
|
|
if not internal_source_available(endpoint_url): |
|
|
|
|
raise InternalUnavailableException |
|
|
|
|
|
|
|
|
|
def get_internal_url(sr: SegmentRange, seg, file): |
|
|
|
|
return f"cd:/{sr.dongle_id}/{sr.log_id}/{seg}/{file}.{file_ext}" |
|
|
|
|
return f"{endpoint_url.rstrip('/')}/{sr.dongle_id}/{sr.log_id}/{seg}/{file}" |
|
|
|
|
|
|
|
|
|
# TODO: list instead of using static URLs to support routes with multiple file extensions |
|
|
|
|
rlog_paths = [get_internal_url(sr, seg, "rlog") for seg in sr.seg_idxs] |
|
|
|
|
qlog_paths = [get_internal_url(sr, seg, "qlog") for seg in sr.seg_idxs] |
|
|
|
|
return eval_source([[get_internal_url(sr, seg, fn) for fn in fns.value] for seg in sr.seg_idxs]) |
|
|
|
|
|
|
|
|
|
return apply_strategy(mode, rlog_paths, qlog_paths) |
|
|
|
|
|
|
|
|
|
def openpilotci_source(sr: SegmentRange, fns: FileName) -> list[LogPath]: |
|
|
|
|
return eval_source([[get_url(sr.route_name, seg, fn) for fn in fns.value] for seg in sr.seg_idxs]) |
|
|
|
|
|
|
|
|
|
def internal_source_zst(sr: SegmentRange, mode: ReadMode, file_ext: str = "zst") -> list[LogPath]: |
|
|
|
|
return internal_source(sr, mode, file_ext) |
|
|
|
|
|
|
|
|
|
def comma_car_segments_source(sr: SegmentRange, fns: FileName) -> list[LogPath]: |
|
|
|
|
return eval_source([get_comma_segments_url(sr.route_name, seg) for seg in sr.seg_idxs]) |
|
|
|
|
|
|
|
|
|
def openpilotci_source(sr: SegmentRange, mode: ReadMode, file_ext: str = "bz2") -> list[LogPath]: |
|
|
|
|
rlog_paths = [get_url(sr.route_name, seg, f"rlog.{file_ext}") for seg in sr.seg_idxs] |
|
|
|
|
qlog_paths = [get_url(sr.route_name, seg, f"qlog.{file_ext}") for seg in sr.seg_idxs] |
|
|
|
|
|
|
|
|
|
return apply_strategy(mode, rlog_paths, qlog_paths) |
|
|
|
|
def direct_source(file_or_url: str) -> list[str]: |
|
|
|
|
return [file_or_url] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def openpilotci_source_zst(sr: SegmentRange, mode: ReadMode) -> list[LogPath]: |
|
|
|
|
return openpilotci_source(sr, mode, "zst") |
|
|
|
|
def eval_source(files: list[list[str] | str]) -> list[LogPath]: |
|
|
|
|
# Returns valid file URLs given a list of possible file URLs for each segment (e.g. rlog.bz2, rlog.zst) |
|
|
|
|
valid_files: list[LogPath] = [] |
|
|
|
|
|
|
|
|
|
for urls in files: |
|
|
|
|
if isinstance(urls, str): |
|
|
|
|
urls = [urls] |
|
|
|
|
|
|
|
|
|
def comma_car_segments_source(sr: SegmentRange, mode=ReadMode.RLOG) -> list[LogPath]: |
|
|
|
|
return [get_comma_segments_url(sr.route_name, seg) for seg in sr.seg_idxs] |
|
|
|
|
for url in urls: |
|
|
|
|
if file_exists(url): |
|
|
|
|
valid_files.append(url) |
|
|
|
|
break |
|
|
|
|
else: |
|
|
|
|
valid_files.append(None) |
|
|
|
|
|
|
|
|
|
return valid_files |
|
|
|
|
|
|
|
|
|
def testing_closet_source(sr: SegmentRange, mode=ReadMode.RLOG) -> list[LogPath]: |
|
|
|
|
if not internal_source_available('http://testing.comma.life'): |
|
|
|
|
raise InternalUnavailableException |
|
|
|
|
return [f"http://testing.comma.life/download/{sr.route_name.replace('|', '/')}/{seg}/rlog" for seg in sr.seg_idxs] |
|
|
|
|
|
|
|
|
|
def auto_source(identifier: str, sources: list[Source], default_mode: ReadMode) -> list[str]: |
|
|
|
|
exceptions = {} |
|
|
|
|
|
|
|
|
|
def direct_source(file_or_url: str) -> list[LogPath]: |
|
|
|
|
return [file_or_url] |
|
|
|
|
|
|
|
|
|
sr = SegmentRange(identifier) |
|
|
|
|
mode = default_mode if sr.selector is None else ReadMode(sr.selector) |
|
|
|
|
|
|
|
|
|
def get_invalid_files(files): |
|
|
|
|
for f in files: |
|
|
|
|
if f is None or not file_exists(f): |
|
|
|
|
yield f |
|
|
|
|
if mode == ReadMode.QLOG: |
|
|
|
|
try_fns = [FileName.QLOG] |
|
|
|
|
else: |
|
|
|
|
try_fns = [FileName.RLOG] |
|
|
|
|
|
|
|
|
|
# If selector allows it, fallback to qlogs |
|
|
|
|
if mode in (ReadMode.AUTO, ReadMode.AUTO_INTERACTIVE): |
|
|
|
|
try_fns.append(FileName.QLOG) |
|
|
|
|
|
|
|
|
|
def check_source(source: Source, *args) -> list[LogPath]: |
|
|
|
|
files = source(*args) |
|
|
|
|
assert len(files) > 0, "No files on source" |
|
|
|
|
assert next(get_invalid_files(files), False) is False, "Some files are invalid" |
|
|
|
|
return files |
|
|
|
|
# Build a dict of valid files as we evaluate each source. May contain mix of rlogs, qlogs, and None. |
|
|
|
|
# This function only returns when we've sourced all files, or throws an exception |
|
|
|
|
valid_files: dict[int, LogPath] = {} |
|
|
|
|
for fn in try_fns: |
|
|
|
|
for source in sources: |
|
|
|
|
try: |
|
|
|
|
files = source(sr, fn) |
|
|
|
|
|
|
|
|
|
# Check every source returns an expected number of files |
|
|
|
|
assert len(files) == len(valid_files) or len(valid_files) == 0, f"Source {source.__name__} returned unexpected number of files" |
|
|
|
|
|
|
|
|
|
def auto_source(sr: SegmentRange, mode=ReadMode.RLOG, sources: list[Source] = None) -> list[LogPath]: |
|
|
|
|
if mode == ReadMode.SANITIZED: |
|
|
|
|
return comma_car_segments_source(sr, mode) |
|
|
|
|
# Build a dict of valid files |
|
|
|
|
for idx, f in enumerate(files): |
|
|
|
|
if valid_files.get(idx) is None: |
|
|
|
|
valid_files[idx] = f |
|
|
|
|
|
|
|
|
|
if sources is None: |
|
|
|
|
sources = [internal_source, internal_source_zst, openpilotci_source, openpilotci_source_zst, |
|
|
|
|
comma_api_source, comma_car_segments_source, testing_closet_source] |
|
|
|
|
exceptions = {} |
|
|
|
|
# We've found all files, return them |
|
|
|
|
if all(f is not None for f in valid_files.values()): |
|
|
|
|
return cast(list[str], list(valid_files.values())) |
|
|
|
|
|
|
|
|
|
# for automatic fallback modes, auto_source needs to first check if rlogs exist for any source |
|
|
|
|
if mode in [ReadMode.AUTO, ReadMode.AUTO_INTERACTIVE]: |
|
|
|
|
for source in sources: |
|
|
|
|
try: |
|
|
|
|
return check_source(source, sr, ReadMode.RLOG) |
|
|
|
|
except Exception: |
|
|
|
|
pass |
|
|
|
|
except Exception as e: |
|
|
|
|
exceptions[source.__name__] = e |
|
|
|
|
|
|
|
|
|
# Automatically determine viable source |
|
|
|
|
for source in sources: |
|
|
|
|
try: |
|
|
|
|
return check_source(source, sr, mode) |
|
|
|
|
except Exception as e: |
|
|
|
|
exceptions[source.__name__] = e |
|
|
|
|
if fn == try_fns[0]: |
|
|
|
|
missing_logs = list(valid_files.values()).count(None) |
|
|
|
|
if mode == ReadMode.AUTO: |
|
|
|
|
cloudlog.warning(f"{missing_logs}/{len(valid_files)} rlogs were not found, falling back to qlogs for those segments...") |
|
|
|
|
elif mode == ReadMode.AUTO_INTERACTIVE: |
|
|
|
|
if input(f"{missing_logs}/{len(valid_files)} rlogs were not found, would you like to fallback to qlogs for those segments? (y/N) ").lower() != "y": |
|
|
|
|
break |
|
|
|
|
|
|
|
|
|
raise LogsUnavailable("auto_source could not find any valid source, exceptions for sources:\n - " + |
|
|
|
|
"\n - ".join([f"{k}: {repr(v)}" for k, v in exceptions.items()])) |
|
|
|
|
missing_logs = list(valid_files.values()).count(None) |
|
|
|
|
raise LogsUnavailable(f"{missing_logs}/{len(valid_files)} logs were not found, please ensure all logs " + |
|
|
|
|
"are uploaded. You can fall back to qlogs with '/a' selector at the end of the route name.\n\n" + |
|
|
|
|
"Exceptions for sources:\n - " + "\n - ".join([f"{k}: {repr(v)}" for k, v in exceptions.items()])) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def parse_indirect(identifier: str) -> str: |
|
|
|
|
if "useradmin.comma.ai" in identifier: |
|
|
|
|
query = parse_qs(urlparse(identifier).query) |
|
|
|
|
return query["onebox"][0] |
|
|
|
|
identifier = query["onebox"][0] |
|
|
|
|
elif "connect.comma.ai" in identifier: |
|
|
|
|
path = urlparse(identifier).path.strip("/").split("/") |
|
|
|
|
path = ['/'.join(path[:2]), *path[2:]] # recombine log id |
|
|
|
|
|
|
|
|
|
identifier = path[0] |
|
|
|
|
if len(path) > 2: |
|
|
|
|
# convert url with seconds to segments |
|
|
|
|
start, end = int(path[1]) // 60, int(path[2]) // 60 + 1 |
|
|
|
|
identifier = f"{identifier}/{start}:{end}" |
|
|
|
|
|
|
|
|
|
# add selector if it exists |
|
|
|
|
if len(path) > 3: |
|
|
|
|
identifier += f"/{path[3]}" |
|
|
|
|
else: |
|
|
|
|
# add selector if it exists |
|
|
|
|
identifier = "/".join(path) |
|
|
|
|
|
|
|
|
|
return identifier |
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -255,7 +253,7 @@ def parse_direct(identifier: str): |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class LogReader: |
|
|
|
|
def _parse_identifier(self, identifier: str) -> list[LogPath]: |
|
|
|
|
def _parse_identifier(self, identifier: str) -> list[str]: |
|
|
|
|
# useradmin, etc. |
|
|
|
|
identifier = parse_indirect(identifier) |
|
|
|
|
|
|
|
|
@ -264,20 +262,16 @@ class LogReader: |
|
|
|
|
if direct_parsed is not None: |
|
|
|
|
return direct_source(identifier) |
|
|
|
|
|
|
|
|
|
sr = SegmentRange(identifier) |
|
|
|
|
mode = self.default_mode if sr.selector is None else ReadMode(sr.selector) |
|
|
|
|
|
|
|
|
|
identifiers = self.source(sr, mode) |
|
|
|
|
|
|
|
|
|
invalid_count = len(list(get_invalid_files(identifiers))) |
|
|
|
|
assert invalid_count == 0, (f"{invalid_count}/{len(identifiers)} invalid log(s) found, please ensure all logs " + |
|
|
|
|
"are uploaded or auto fallback to qlogs with '/a' selector at the end of the route name.") |
|
|
|
|
identifiers = auto_source(identifier, self.sources, self.default_mode) |
|
|
|
|
return identifiers |
|
|
|
|
|
|
|
|
|
def __init__(self, identifier: str | list[str], default_mode: ReadMode = ReadMode.RLOG, |
|
|
|
|
source: Source = auto_source, sort_by_time=False, only_union_types=False): |
|
|
|
|
sources: list[Source] = None, sort_by_time=False, only_union_types=False): |
|
|
|
|
if sources is None: |
|
|
|
|
sources = [internal_source, openpilotci_source, comma_api_source, comma_car_segments_source] |
|
|
|
|
|
|
|
|
|
self.default_mode = default_mode |
|
|
|
|
self.source = source |
|
|
|
|
self.sources = sources |
|
|
|
|
self.identifier = identifier |
|
|
|
|
if isinstance(identifier, str): |
|
|
|
|
self.identifier = [identifier] |
|
|
|
|