File sourcing: simplify return type (#36028)

* rm str | none pattern

* clean up

* more clean up

* stash

* Revert "stash"

This reverts commit 3e2472160c.

* fix da prints

* fix cmt
Shane Smiskol committed 5ec9aee216 (parent 927548621b)
1 changed file: tools/lib/logreader.py (43 changes)

@@ -140,9 +140,8 @@ class ReadMode(enum.StrEnum):
   AUTO_INTERACTIVE = "i"  # default to rlogs, fallback to qlogs with a prompt from the user

-LogPath = str | None
 LogFileName = tuple[str, ...]
-Source = Callable[[SegmentRange, list[int], LogFileName], dict[int, LogPath]]
+Source = Callable[[SegmentRange, list[int], LogFileName], dict[int, str]]

 InternalUnavailableException = Exception("Internal source not available")
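
Not part of the diff, just for orientation: under the new Source signature a source returns only the segments it actually found, instead of padding missing ones with None. The local_dir_source below is a hypothetical example conforming to that contract (names and paths are made up):

import os

# Hypothetical callable matching the new Source signature:
#   Callable[[SegmentRange, list[int], LogFileName], dict[int, str]]
def local_dir_source(sr, seg_idxs, fns, base_dir="/tmp/logs"):
  # Return only the segments whose file exists locally; segments that
  # can't be found are omitted rather than mapped to None.
  found: dict[int, str] = {}
  for seg in seg_idxs:
    for fn in fns:
      path = os.path.join(base_dir, f"{sr.route_name}--{seg}--{fn}")
      if os.path.exists(path):
        found[seg] = path
        break
  return found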
@@ -151,17 +150,17 @@ class LogsUnavailable(Exception):
   pass


-def comma_api_source(sr: SegmentRange, seg_idxs: list[int], fns: LogFileName) -> dict[int, LogPath]:
+def comma_api_source(sr: SegmentRange, seg_idxs: list[int], fns: LogFileName) -> dict[int, str]:
   route = Route(sr.route_name)

   # comma api will have already checked if the file exists
   if fns == FileName.RLOG:
-    return {seg: route.log_paths()[seg] for seg in seg_idxs}
+    return {seg: route.log_paths()[seg] for seg in seg_idxs if route.log_paths()[seg] is not None}
   else:
-    return {seg: route.qlog_paths()[seg] for seg in seg_idxs}
+    return {seg: route.qlog_paths()[seg] for seg in seg_idxs if route.qlog_paths()[seg] is not None}


-def internal_source(sr: SegmentRange, seg_idxs: list[int], fns: LogFileName, endpoint_url: str = DATA_ENDPOINT) -> dict[int, LogPath]:
+def internal_source(sr: SegmentRange, seg_idxs: list[int], fns: LogFileName, endpoint_url: str = DATA_ENDPOINT) -> dict[int, str]:
   if not internal_source_available(endpoint_url):
     raise InternalUnavailableException
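
Route.log_paths() and Route.qlog_paths() still return None for segments that were never uploaded; the added filter drops those entries so callers only see real URLs. A tiny illustration of the filtering with made-up data (not the real Route class):

# Stand-in for Route.log_paths(): segment index -> URL or None
log_paths = ["https://host/0/rlog.zst", None, "https://host/2/rlog.zst"]
seg_idxs = [0, 1, 2]

# Before: {0: url, 1: None, 2: url}; now segment 1 is dropped entirely
found = {seg: log_paths[seg] for seg in seg_idxs if log_paths[seg] is not None}
assert found == {0: "https://host/0/rlog.zst", 2: "https://host/2/rlog.zst"}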
@@ -171,11 +170,11 @@ def internal_source(sr: SegmentRange, seg_idxs: list[int], fns: LogFileName, end
   return eval_source({seg: [get_internal_url(sr, seg, fn) for fn in fns] for seg in seg_idxs})


-def openpilotci_source(sr: SegmentRange, seg_idxs: list[int], fns: LogFileName) -> dict[int, LogPath]:
+def openpilotci_source(sr: SegmentRange, seg_idxs: list[int], fns: LogFileName) -> dict[int, str]:
   return eval_source({seg: [get_url(sr.route_name, seg, fn) for fn in fns] for seg in seg_idxs})


-def comma_car_segments_source(sr: SegmentRange, seg_idxs: list[int], fns: LogFileName) -> dict[int, LogPath]:
+def comma_car_segments_source(sr: SegmentRange, seg_idxs: list[int], fns: LogFileName) -> dict[int, str]:
   return eval_source({seg: get_comma_segments_url(sr.route_name, seg) for seg in seg_idxs})
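
These two sources only build candidate URLs and defer existence checks to eval_source; only the annotated return type changes here. For reference, the dict they hand to eval_source maps each segment index to one URL or a list of candidate URLs (values below are illustrative):

# Illustrative shape of the argument passed to eval_source:
# one URL, or a list of candidate URLs (e.g. both compression suffixes), per segment
candidates = {
  0: ["https://host/route/0/rlog.bz2", "https://host/route/0/rlog.zst"],
  1: ["https://host/route/1/rlog.bz2", "https://host/route/1/rlog.zst"],
}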
@@ -183,21 +182,19 @@ def direct_source(file_or_url: str) -> list[str]:
   return [file_or_url]


-def eval_source(files: dict[int, list[str] | str]) -> dict[int, LogPath]:
+def eval_source(files: dict[int, list[str] | str]) -> dict[int, str]:
   # Returns valid file URLs given a list of possible file URLs for each segment (e.g. rlog.bz2, rlog.zst)
-  valid_files: dict[int, LogPath] = {}
+  valid_files: dict[int, str] = {}

   for seg_idx, urls in files.items():
     if isinstance(urls, str):
       urls = [urls]

-    # Add first valid file URL or None
+    # Add first valid file URL
     for url in urls:
       if file_exists(url):
         valid_files[seg_idx] = url
         break
-    else:
-      valid_files[seg_idx] = None

   return valid_files
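
With the for/else gone, a segment whose candidates are all unreachable is simply left out of the result rather than mapped to None. A self-contained sketch of that behaviour with file_exists stubbed out (the real helper actually checks the file or URL):

def file_exists(url: str) -> bool:
  # Stub for illustration; pretend only .zst candidates are reachable
  return url.endswith(".zst")

def eval_source_sketch(files: dict[int, list[str] | str]) -> dict[int, str]:
  valid_files: dict[int, str] = {}
  for seg_idx, urls in files.items():
    if isinstance(urls, str):
      urls = [urls]
    # Keep the first reachable candidate; otherwise omit the segment entirely
    for url in urls:
      if file_exists(url):
        valid_files[seg_idx] = url
        break
  return valid_files

print(eval_source_sketch({0: ["a/rlog.bz2", "a/rlog.zst"], 1: ["b/rlog.bz2"]}))
# {0: 'a/rlog.zst'} -- segment 1 is omitted, not mapped to None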
@@ -220,37 +217,35 @@ def auto_source(identifier: str, sources: list[Source], default_mode: ReadMode)
   # Build a dict of valid files as we evaluate each source. May contain mix of rlogs, qlogs, and None.
   # This function only returns when we've sourced all files, or throws an exception
-  valid_files: dict[int, LogPath] = {}
+  valid_files: dict[int, str] = {}

   for fn in try_fns:
     for source in sources:
       try:
         files = source(sr, needed_seg_idxs, fn)

         # Build a dict of valid files
-        for idx, f in files.items():
-          if valid_files.get(idx) is None:
-            valid_files[idx] = f
+        valid_files |= files

         # Don't check for segment files that have already been found
-        needed_seg_idxs = [idx for idx in needed_seg_idxs if valid_files.get(idx) is None]
+        needed_seg_idxs = [idx for idx in needed_seg_idxs if idx not in valid_files]

         # We've found all files, return them
-        if all(f is not None for f in valid_files.values()):
+        if len(needed_seg_idxs) == 0:
           return cast(list[str], list(valid_files.values()))
       except Exception as e:
         exceptions[source.__name__] = e

     if fn == try_fns[0]:
-      missing_logs = list(valid_files.values()).count(None)
+      missing_logs = len(needed_seg_idxs)
       if mode == ReadMode.AUTO:
-        cloudlog.warning(f"{missing_logs}/{len(valid_files)} rlogs were not found, falling back to qlogs for those segments...")
+        cloudlog.warning(f"{missing_logs}/{len(sr.seg_idxs)} rlogs were not found, falling back to qlogs for those segments...")
       elif mode == ReadMode.AUTO_INTERACTIVE:
-        if input(f"{missing_logs}/{len(valid_files)} rlogs were not found, would you like to fallback to qlogs for those segments? (y/N) ").lower() != "y":
+        if input(f"{missing_logs}/{len(sr.seg_idxs)} rlogs were not found, would you like to fallback to qlogs for those segments? (y/N) ").lower() != "y":
           break

-  missing_logs = list(valid_files.values()).count(None)
-  raise LogsUnavailable(f"{missing_logs}/{len(valid_files)} logs were not found, please ensure all logs " +
+  missing_logs = len(needed_seg_idxs)
+  raise LogsUnavailable(f"{missing_logs}/{len(sr.seg_idxs)} logs were not found, please ensure all logs " +
                         "are uploaded. You can fall back to qlogs with '/a' selector at the end of the route name.\n\n" +
                         "Exceptions for sources:\n - " + "\n - ".join([f"{k}: {repr(v)}" for k, v in exceptions.items()]))
