openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

141 lines
4.3 KiB

import enum
import numpy as np
import pathlib
import re
from urllib.parse import parse_qs, urlparse
from openpilot.selfdrive.test.openpilotci import get_url
from openpilot.tools.lib.helpers import RE
from openpilot.tools.lib.logreader import LogReader
from openpilot.tools.lib.route import Route, SegmentRange
class ReadMode(enum.StrEnum):
RLOG = "r" # only read rlogs
QLOG = "q" # only read qlogs
#AUTO = "a" # default to rlogs, fallback to qlogs, not supported yet
def create_slice_from_string(s: str):
m = re.fullmatch(RE.SLICE, s)
assert m is not None, f"Invalid slice: {s}"
start, end, step = m.groups()
start = int(start) if start is not None else None
end = int(end) if end is not None else None
step = int(step) if step is not None else None
if start is not None and ":" not in s and end is None and step is None:
return start
return slice(start, end, step)
def parse_slice(sr: SegmentRange):
route = Route(sr.route_name)
segs = np.arange(route.max_seg_number+1)
s = create_slice_from_string(sr._slice)
return segs[s] if isinstance(s, slice) else [segs[s]]
def comma_api_source(sr: SegmentRange, mode=ReadMode.RLOG, sort_by_time=False):
segs = parse_slice(sr)
route = Route(sr.route_name)
log_paths = route.log_paths() if mode == ReadMode.RLOG else route.qlog_paths()
invalid_segs = [seg for seg in segs if log_paths[seg] is None]
assert not len(invalid_segs), f"Some of the requested segments are not available: {invalid_segs}"
for seg in segs:
yield LogReader(log_paths[seg], sort_by_time=sort_by_time)
def internal_source(sr: SegmentRange, mode=ReadMode.RLOG, sort_by_time=False):
segs = parse_slice(sr)
for seg in segs:
yield LogReader(f"cd:/{sr.dongle_id}/{sr.timestamp}/{seg}/{'rlog' if mode == ReadMode.RLOG else 'qlog'}.bz2", sort_by_time=sort_by_time)
def openpilotci_source(sr: SegmentRange, mode=ReadMode.RLOG, sort_by_time=False):
segs = parse_slice(sr)
for seg in segs:
yield LogReader(get_url(sr.route_name, seg, 'rlog' if mode == ReadMode.RLOG else 'qlog'), sort_by_time=sort_by_time)
def direct_source(file_or_url, sort_by_time):
yield LogReader(file_or_url, sort_by_time=sort_by_time)
def auto_source(*args, **kwargs):
# Automatically determine viable source
try:
next(internal_source(*args, **kwargs))
return internal_source(*args, **kwargs)
except Exception:
pass
try:
next(openpilotci_source(*args, **kwargs))
return openpilotci_source(*args, **kwargs)
except Exception:
pass
return comma_api_source(*args, **kwargs)
def parse_useradmin(identifier):
if "useradmin.comma.ai" in identifier:
query = parse_qs(urlparse(identifier).query)
return query["onebox"][0]
return None
def parse_cabana(identifier):
if "cabana.comma.ai" in identifier:
query = parse_qs(urlparse(identifier).query)
return query["route"][0]
return None
def parse_cd(identifier):
if "cd:/" in identifier:
return identifier.replace("cd:/", "")
return None
def parse_direct(identifier):
if "https://" in identifier or "http://" in identifier or pathlib.Path(identifier).exists():
return identifier
return None
def parse_indirect(identifier):
parsed = parse_useradmin(identifier) or parse_cabana(identifier)
if parsed is not None:
return parsed, comma_api_source, True
parsed = parse_cd(identifier)
if parsed is not None:
return parsed, internal_source, True
return identifier, None, False
class SegmentRangeReader:
def _logreaders_from_identifier(self, identifier):
parsed, source, is_indirect = parse_indirect(identifier)
if not is_indirect:
direct_parsed = parse_direct(identifier)
if direct_parsed is not None:
return direct_source(identifier, sort_by_time=self.sort_by_time)
sr = SegmentRange(parsed)
mode = self.default_mode if sr.selector is None else ReadMode(sr.selector)
source = self.default_source if source is None else source
return source(sr, mode, sort_by_time=self.sort_by_time)
def __init__(self, identifier: str, default_mode=ReadMode.RLOG, default_source=auto_source, sort_by_time=False):
self.default_mode = default_mode
self.default_source = default_source
self.sort_by_time = sort_by_time
self.lrs = self._logreaders_from_identifier(identifier)
def __iter__(self):
for lr in self.lrs:
for m in lr:
yield m