From a16a20005b44a95cbe07bad9abb1334f98b46a7f Mon Sep 17 00:00:00 2001
From: Lukas Petersson
Date: Thu, 2 Jun 2022 07:02:42 +0200
Subject: [PATCH] process replay: logreader as bytes (#24610)

* willem's changes

* classmethod for bytes

* submodules

* submodules

* Update tools/lib/logreader.py

Co-authored-by: Adeeb Shihadeh

* add back files

* little cleanup

Co-authored-by: Adeeb Shihadeh
old-commit-hash: 86f73a507e2dba56bc8c3a1e753818147fb0eb0f
---
 .../test/process_replay/test_processes.py | 20 +++++++------
 tools/lib/logreader.py                    | 29 +++++++++++--------
 2 files changed, 28 insertions(+), 21 deletions(-)

diff --git a/selfdrive/test/process_replay/test_processes.py b/selfdrive/test/process_replay/test_processes.py
index 694317d52f..1d5969b1ec 100755
--- a/selfdrive/test/process_replay/test_processes.py
+++ b/selfdrive/test/process_replay/test_processes.py
@@ -12,6 +12,7 @@ from selfdrive.test.openpilotci import get_url, upload_file
 from selfdrive.test.process_replay.compare_logs import compare_logs, save_log
 from selfdrive.test.process_replay.process_replay import CONFIGS, PROC_REPLAY_DIR, FAKEDATA, check_enabled, replay_process
 from selfdrive.version import get_commit
+from tools.lib.filereader import FileReader
 from tools.lib.logreader import LogReader
 
 original_segments = [
@@ -57,7 +58,8 @@ REF_COMMIT_FN = os.path.join(PROC_REPLAY_DIR, "ref_commit")
 
 
 def run_test_process(data):
-  segment, cfg, args, cur_log_fn, ref_log_path, lr = data
+  segment, cfg, args, cur_log_fn, ref_log_path, lr_dat = data
+  lr = LogReader.from_bytes(lr_dat)
   res = None
   if not args.upload_only:
     res, log_msgs = test_process(cfg, lr, ref_log_path, args.ignore_fields, args.ignore_msgs)
@@ -72,10 +74,10 @@
   return (segment, cfg.proc_name, res)
 
 
-def get_logreader(segment):
+def get_log_data(segment):
   r, n = segment.rsplit("--", 1)
-  lr = LogReader(get_url(r, n))
-  return (segment, lr)
+  with FileReader(get_url(r, n)) as f:
+    return (segment, f.read())
 
 
 def test_process(cfg, lr, ref_log_path, ignore_fields=None, ignore_msgs=None):
@@ -186,10 +188,10 @@ if __name__ == "__main__":
   with concurrent.futures.ProcessPoolExecutor(max_workers=args.jobs) as pool:
     if not args.upload_only:
       download_segments = [seg for car, seg in segments if car in tested_cars]
-      lreaders: Dict[str, LogReader] = {}
-      p1 = pool.map(get_logreader, download_segments)
+      log_data: Dict[str, bytes] = {}
+      p1 = pool.map(get_log_data, download_segments)
       for segment, lr in tqdm(p1, desc="Getting Logs", total=len(download_segments)):
-        lreaders[segment] = lr
+        log_data[segment] = lr
 
     pool_args: Any = []
     for car_brand, segment in segments:
@@ -207,8 +209,8 @@
           ref_log_fn = os.path.join(FAKEDATA, f"{segment}_{cfg.proc_name}_{ref_commit}.bz2")
          ref_log_path = ref_log_fn if os.path.exists(ref_log_fn) else BASE_URL + os.path.basename(ref_log_fn)
 
-        lr = None if args.upload_only else lreaders[segment]
-        pool_args.append((segment, cfg, args, cur_log_fn, ref_log_path, lr))
+        dat = None if args.upload_only else log_data[segment]
+        pool_args.append((segment, cfg, args, cur_log_fn, ref_log_path, dat))
 
     results: Any = defaultdict(dict)
     p2 = pool.map(run_test_process, pool_args)
diff --git a/tools/lib/logreader.py b/tools/lib/logreader.py
index b4d3de67fb..46c648ef12 100755
--- a/tools/lib/logreader.py
+++ b/tools/lib/logreader.py
@@ -74,22 +74,24 @@ class MultiLogIterator:
 
 
 class LogReader:
-  def __init__(self, fn, canonicalize=True, only_union_types=False, sort_by_time=False):
+  def __init__(self, fn, canonicalize=True, only_union_types=False, sort_by_time=False, dat=None):
     self.data_version = None
     self._only_union_types = only_union_types
 
-    _, ext = os.path.splitext(urllib.parse.urlparse(fn).path)
-    with FileReader(fn) as f:
-      dat = f.read()
+    ext = None
+    if not dat:
+      _, ext = os.path.splitext(urllib.parse.urlparse(fn).path)
+      if ext not in ('', '.bz2'):
+        # old rlogs weren't bz2 compressed
+        raise Exception(f"unknown extension {ext}")
 
-    if ext == "":
-      # old rlogs weren't bz2 compressed
-      ents = capnp_log.Event.read_multiple_bytes(dat)
-    elif ext == ".bz2":
+      with FileReader(fn) as f:
+        dat = f.read()
+
+    if ext == ".bz2" or dat.startswith(b'BZh9'):
       dat = bz2.decompress(dat)
-      ents = capnp_log.Event.read_multiple_bytes(dat)
-    else:
-      raise Exception(f"unknown extension {ext}")
+
+    ents = capnp_log.Event.read_multiple_bytes(dat)
 
     _ents = []
     try:
@@ -101,6 +103,10 @@ class LogReader:
     self._ents = list(sorted(_ents, key=lambda x: x.logMonoTime) if sort_by_time else _ents)
     self._ts = [x.logMonoTime for x in self._ents]
 
+  @classmethod
+  def from_bytes(cls, dat):
+    return cls("", dat=dat)
+
   def __iter__(self):
     for ent in self._ents:
       if self._only_union_types:
@@ -112,7 +118,6 @@
       else:
         yield ent
 
-
 def logreader_from_route_or_segment(r, sort_by_time=False):
   sn = SegmentName(r, allow_route_name=True)
   route = Route(sn.route_name.canonical_name)
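
Usage sketch (illustrative only, not part of the diff): the change moves raw rlog bytes, rather than LogReader objects, between the main process and the pool workers, and the new LogReader.from_bytes classmethod parses those bytes on the worker side. A minimal sketch of that flow, assuming an openpilot checkout with tools/ importable; the rlog path is a placeholder.

#!/usr/bin/env python3
# Minimal sketch (not part of the commit). Assumes an openpilot checkout with
# tools/ on PYTHONPATH; the rlog path below is a placeholder.
from tools.lib.filereader import FileReader
from tools.lib.logreader import LogReader

RLOG_PATH = "/tmp/example_segment/rlog.bz2"  # placeholder path

# Producer side (mirrors get_log_data): read the possibly bz2-compressed log
# as plain bytes. Bytes pickle cheaply, so they can be handed to worker
# processes via ProcessPoolExecutor.map.
with FileReader(RLOG_PATH) as f:
  dat = f.read()

# Consumer side (mirrors run_test_process): rebuild a LogReader from the raw
# bytes. With no filename to inspect, LogReader detects compression from the
# bz2 magic bytes (b'BZh9') before decompressing.
lr = LogReader.from_bytes(dat)
for msg in lr:
  print(msg.which())
  break

Because from_bytes passes an empty filename, the extension check is skipped entirely and the dat.startswith(b'BZh9') branch above decides whether to decompress.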