process replay: logreader as bytes (#24610)

* willem's changes

* classmethod for bytes

* submodules

* submodules

* Update tools/lib/logreader.py

Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>

* add back files

* little cleanup

Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>
old-commit-hash: 86f73a507e
taco
Lukas Petersson 3 years ago committed by GitHub
parent eeccd04c11
commit a16a20005b
  1. 20
      selfdrive/test/process_replay/test_processes.py
  2. 21
      tools/lib/logreader.py

@ -12,6 +12,7 @@ from selfdrive.test.openpilotci import get_url, upload_file
from selfdrive.test.process_replay.compare_logs import compare_logs, save_log
from selfdrive.test.process_replay.process_replay import CONFIGS, PROC_REPLAY_DIR, FAKEDATA, check_enabled, replay_process
from selfdrive.version import get_commit
from tools.lib.filereader import FileReader
from tools.lib.logreader import LogReader
original_segments = [
@ -57,7 +58,8 @@ REF_COMMIT_FN = os.path.join(PROC_REPLAY_DIR, "ref_commit")
def run_test_process(data):
segment, cfg, args, cur_log_fn, ref_log_path, lr = data
segment, cfg, args, cur_log_fn, ref_log_path, lr_dat = data
lr = LogReader.from_bytes(lr_dat)
res = None
if not args.upload_only:
res, log_msgs = test_process(cfg, lr, ref_log_path, args.ignore_fields, args.ignore_msgs)
@ -72,10 +74,10 @@ def run_test_process(data):
return (segment, cfg.proc_name, res)
def get_logreader(segment):
def get_log_data(segment):
r, n = segment.rsplit("--", 1)
lr = LogReader(get_url(r, n))
return (segment, lr)
with FileReader(get_url(r, n)) as f:
return (segment, f.read())
def test_process(cfg, lr, ref_log_path, ignore_fields=None, ignore_msgs=None):
@ -186,10 +188,10 @@ if __name__ == "__main__":
with concurrent.futures.ProcessPoolExecutor(max_workers=args.jobs) as pool:
if not args.upload_only:
download_segments = [seg for car, seg in segments if car in tested_cars]
lreaders: Dict[str, LogReader] = {}
p1 = pool.map(get_logreader, download_segments)
log_data: Dict[str, LogReader] = {}
p1 = pool.map(get_log_data, download_segments)
for segment, lr in tqdm(p1, desc="Getting Logs", total=len(download_segments)):
lreaders[segment] = lr
log_data[segment] = lr
pool_args: Any = []
for car_brand, segment in segments:
@ -207,8 +209,8 @@ if __name__ == "__main__":
ref_log_fn = os.path.join(FAKEDATA, f"{segment}_{cfg.proc_name}_{ref_commit}.bz2")
ref_log_path = ref_log_fn if os.path.exists(ref_log_fn) else BASE_URL + os.path.basename(ref_log_fn)
lr = None if args.upload_only else lreaders[segment]
pool_args.append((segment, cfg, args, cur_log_fn, ref_log_path, lr))
dat = None if args.upload_only else log_data[segment]
pool_args.append((segment, cfg, args, cur_log_fn, ref_log_path, dat))
results: Any = defaultdict(dict)
p2 = pool.map(run_test_process, pool_args)

@ -74,22 +74,24 @@ class MultiLogIterator:
class LogReader:
def __init__(self, fn, canonicalize=True, only_union_types=False, sort_by_time=False):
def __init__(self, fn, canonicalize=True, only_union_types=False, sort_by_time=False, dat=None):
self.data_version = None
self._only_union_types = only_union_types
ext = None
if not dat:
_, ext = os.path.splitext(urllib.parse.urlparse(fn).path)
if ext not in ('', '.bz2'):
# old rlogs weren't bz2 compressed
raise Exception(f"unknown extension {ext}")
with FileReader(fn) as f:
dat = f.read()
if ext == "":
# old rlogs weren't bz2 compressed
ents = capnp_log.Event.read_multiple_bytes(dat)
elif ext == ".bz2":
if ext == ".bz2" or dat.startswith(b'BZh9'):
dat = bz2.decompress(dat)
ents = capnp_log.Event.read_multiple_bytes(dat)
else:
raise Exception(f"unknown extension {ext}")
_ents = []
try:
@ -101,6 +103,10 @@ class LogReader:
self._ents = list(sorted(_ents, key=lambda x: x.logMonoTime) if sort_by_time else _ents)
self._ts = [x.logMonoTime for x in self._ents]
@classmethod
def from_bytes(cls, dat):
return cls("", dat=dat)
def __iter__(self):
for ent in self._ents:
if self._only_union_types:
@ -112,7 +118,6 @@ class LogReader:
else:
yield ent
def logreader_from_route_or_segment(r, sort_by_time=False):
sn = SegmentName(r, allow_route_name=True)
route = Route(sn.route_name.canonical_name)

Loading…
Cancel
Save