make normal logreader more robust (#24577)

old-commit-hash: 194e5fdf1c (branch: taco)
Authored by Willem Melching, 3 years ago; committed via GitHub.
Parent commit: 1d9d8d874b — this commit: f957c16a7b
Changed files:
  1. selfdrive/debug/filter_log_message.py (8 lines changed)
  2. tools/lib/logreader.py (18 lines changed)
  3. tools/lib/robust_logreader.py (60 lines changed, file deleted)
  4. tools/plotjuggler/juggle.py (4 lines changed)

@ -1,9 +1,10 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import os
import argparse import argparse
import json import json
import cereal.messaging as messaging import cereal.messaging as messaging
from tools.lib.robust_logreader import RobustLogReader as LogReader from tools.lib.logreader import LogReader
from tools.lib.route import Route from tools.lib.route import Route
LEVELS = { LEVELS = {
@ -46,15 +47,18 @@ def print_androidlog(t, msg):
if __name__ == "__main__": if __name__ == "__main__":
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument('--level', default='DEBUG') parser.add_argument('--level', default='DEBUG')
parser.add_argument('--addr', default='127.0.0.1') parser.add_argument('--addr', default='127.0.0.1')
parser.add_argument("route", type=str, nargs='*', help="route name + segment number for offline usage") parser.add_argument("route", type=str, nargs='*', help="route name + segment number for offline usage")
args = parser.parse_args() args = parser.parse_args()
print(args)
logs = None logs = None
if len(args.route): if len(args.route):
if os.path.exists(args.route[0]):
logs = [args.route[0]]
else:
r = Route(args.route[0]) r = Route(args.route[0])
logs = [q_log if r_log is None else r_log for (q_log, r_log) in zip(r.qlog_paths(), r.log_paths())] logs = [q_log if r_log is None else r_log for (q_log, r_log) in zip(r.qlog_paths(), r.log_paths())]

@ -4,6 +4,8 @@ import sys
import bz2 import bz2
import urllib.parse import urllib.parse
import capnp import capnp
import warnings
from cereal import log as capnp_log from cereal import log as capnp_log
from tools.lib.filereader import FileReader from tools.lib.filereader import FileReader
@ -70,9 +72,12 @@ class MultiLogIterator:
def reset(self): def reset(self):
self.__init__(self._log_paths, sort_by_time=self.sort_by_time) self.__init__(self._log_paths, sort_by_time=self.sort_by_time)
class LogReader: class LogReader:
def __init__(self, fn, canonicalize=True, only_union_types=False, sort_by_time=False): def __init__(self, fn, canonicalize=True, only_union_types=False, sort_by_time=False):
data_version = None self.data_version = None
self._only_union_types = only_union_types
_, ext = os.path.splitext(urllib.parse.urlparse(fn).path) _, ext = os.path.splitext(urllib.parse.urlparse(fn).path)
with FileReader(fn) as f: with FileReader(fn) as f:
dat = f.read() dat = f.read()
@ -86,10 +91,15 @@ class LogReader:
else: else:
raise Exception(f"unknown extension {ext}") raise Exception(f"unknown extension {ext}")
self._ents = list(sorted(ents, key=lambda x: x.logMonoTime) if sort_by_time else ents) _ents = []
try:
for e in ents:
_ents.append(e)
except capnp.KjException:
warnings.warn("Corrupted events detected", RuntimeWarning)
self._ents = list(sorted(_ents, key=lambda x: x.logMonoTime) if sort_by_time else _ents)
self._ts = [x.logMonoTime for x in self._ents] self._ts = [x.logMonoTime for x in self._ents]
self.data_version = data_version
self._only_union_types = only_union_types
def __iter__(self): def __iter__(self):
for ent in self._ents: for ent in self._ents:

@ -1,60 +0,0 @@
#!/usr/bin/env python3
import os
import bz2
import urllib.parse
import subprocess
import tqdm
import glob
from tempfile import TemporaryDirectory
import capnp
from tools.lib.logreader import FileReader, LogReader
from cereal import log as capnp_log
class RobustLogReader(LogReader):
  """LogReader that attempts to recover events from corrupted log files.

  Two recovery mechanisms on top of the normal read path:
    * if bz2 decompression of the raw file fails, fall back to the external
      `bzip2recover` tool and reassemble the recovered parts;
    * if capnp cannot parse the (possibly truncated) event stream, trim
      trailing bytes one at a time until a parse succeeds.
  """

  def __init__(self, fn, canonicalize=True, only_union_types=False, sort_by_time=False):  # pylint: disable=super-init-not-called
    # Deliberately does not call LogReader.__init__: the whole read path is
    # re-implemented here with recovery fallbacks, and the same attributes
    # (_ents, _ts, data_version, _only_union_types) are set at the end.
    # NOTE(review): `canonicalize` is accepted for interface compatibility
    # but never used in this body — confirm against LogReader.
    data_version = None
    _, ext = os.path.splitext(urllib.parse.urlparse(fn).path)
    with FileReader(fn) as f:
      dat = f.read()

    if ext == "":
      # No extension: treat as an already-uncompressed capnp event stream.
      pass
    elif ext == ".bz2":
      try:
        dat = bz2.decompress(dat)
      except ValueError:
        print("Failed to decompress, falling back to bzip2recover")
        with TemporaryDirectory() as directory:
          # Run bzip2recovery on log: write the corrupt archive to a temp
          # dir so the external tool can split out intact blocks.
          with open(os.path.join(directory, 'out.bz2'), 'wb') as f:
            f.write(dat)

          subprocess.check_call(["bzip2recover", "out.bz2"], cwd=directory)

          # Decompress and concatenate parts. sorted() keeps the recovered
          # recNNN.bz2 chunks in their original order.
          dat = b""
          for n in sorted(glob.glob(f"{directory}/rec*.bz2")):
            print(f"Decompressing {n}")
            with open(n, 'rb') as f:
              dat += bz2.decompress(f.read())
    else:
      raise Exception(f"unknown extension {ext}")

    progress = None
    while True:
      try:
        ents = capnp_log.Event.read_multiple_bytes(dat)
        self._ents = list(sorted(ents, key=lambda x: x.logMonoTime) if sort_by_time else ents)
        break
      except capnp.lib.capnp.KjException:
        # Parse failed — assume trailing corruption/truncation. The tqdm bar
        # is only created once the first failure occurs, so intact logs pay
        # no overhead.
        if progress is None:
          progress = tqdm.tqdm(total=len(dat))

        # Cut off bytes at the end until capnp is able to read. This is
        # O(bytes trimmed) full re-parses — slow, but only on corrupt logs.
        dat = dat[:-1]
        progress.update(1)

    self._ts = [x.logMonoTime for x in self._ents]
    self.data_version = data_version
    self._only_union_types = only_union_types

@ -12,7 +12,7 @@ import argparse
from common.basedir import BASEDIR from common.basedir import BASEDIR
from selfdrive.test.process_replay.compare_logs import save_log from selfdrive.test.process_replay.compare_logs import save_log
from tools.lib.robust_logreader import RobustLogReader from tools.lib.logreader import LogReader
from tools.lib.route import Route, SegmentName from tools.lib.route import Route, SegmentName
from urllib.parse import urlparse, parse_qs from urllib.parse import urlparse, parse_qs
@ -50,7 +50,7 @@ def load_segment(segment_name):
return [] return []
try: try:
return list(RobustLogReader(segment_name)) return list(LogReader(segment_name))
except ValueError as e: except ValueError as e:
print(f"Error parsing {segment_name}: {e}") print(f"Error parsing {segment_name}: {e}")
return [] return []

Loading…
Cancel
Save