RobustLogReader that can recover corrupted bz2 files (#22835)

* LogReader with bzip2 recovery

* only rlogs

* add comment

* plotjuggler should also use robust logreader
old-commit-hash: 6d6f989b7f
Branch: commatwo_master
Author: Willem Melching (committed by GitHub)
Parent: 8108ca7fe8
Commit: c11eeb6c80
Changed files:
  1. selfdrive/debug/filter_log_message.py (4 lines changed)
  2. tools/lib/robust_logreader.py (60 lines added)
  3. tools/plotjuggler/juggle.py (4 lines changed)
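
RobustLogReader subclasses LogReader and keeps the same iteration interface, so it drops in wherever LogReader is used; both call-site diffs below are one-line import swaps. A minimal usage sketch, assuming a hypothetical file path (not taken from the commit):

  from tools.lib.robust_logreader import RobustLogReader

  # Hypothetical path to a possibly-corrupted log; any local path or URL
  # that FileReader accepts works the same way.
  for msg in RobustLogReader("rlog.bz2"):
    print(msg.logMonoTime, msg.which())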

selfdrive/debug/filter_log_message.py

@@ -3,7 +3,7 @@ import argparse
 import json

 import cereal.messaging as messaging
-from tools.lib.logreader import LogReader
+from tools.lib.robust_logreader import RobustLogReader as LogReader
 from tools.lib.route import Route

 LEVELS = {
@@ -56,7 +56,7 @@ if __name__ == "__main__":
   logs = None
   if len(args.route):
     r = Route(args.route[0])
-    logs = [q if r is None else r for (q, r) in zip(r.qlog_paths(), r.log_paths())]
+    logs = r.log_paths()

   if len(args.route) == 2 and logs:
     n = int(args.route[1])
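
For context, the replaced comprehension took each segment's rlog and only fell back to the qlog when the rlog path was None; with recovery in place, the rlog paths are used directly ("only rlogs"). A small illustration with hypothetical path lists:

  # Hypothetical per-segment paths for a three-segment route.
  qlogs = ["0/qlog.bz2", "1/qlog.bz2", "2/qlog.bz2"]
  rlogs = ["0/rlog.bz2", None, "2/rlog.bz2"]

  # Old behavior: take the rlog, falling back to the qlog where the rlog is missing.
  old = [q if r is None else r for (q, r) in zip(qlogs, rlogs)]
  # -> ['0/rlog.bz2', '1/qlog.bz2', '2/rlog.bz2']

  # New behavior: rlog paths as-is; RobustLogReader copes with damaged files.
  new = rlogs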

tools/lib/robust_logreader.py (new file)

@@ -0,0 +1,60 @@
+#!/usr/bin/env python3
+import os
+import bz2
+import urllib.parse
+import subprocess
+import tqdm
+import glob
+from tempfile import TemporaryDirectory
+
+import capnp
+
+from tools.lib.logreader import FileReader, LogReader
+from cereal import log as capnp_log
+
+
+class RobustLogReader(LogReader):
+  def __init__(self, fn, canonicalize=True, only_union_types=False):  # pylint: disable=super-init-not-called
+    data_version = None
+    _, ext = os.path.splitext(urllib.parse.urlparse(fn).path)
+    with FileReader(fn) as f:
+      dat = f.read()
+
+    if ext == "":
+      pass
+    elif ext == ".bz2":
+      try:
+        dat = bz2.decompress(dat)
+      except ValueError:
+        print("Failed to decompress, falling back to bzip2recover")
+        with TemporaryDirectory() as directory:
+          # Run bzip2recover on log
+          with open(os.path.join(directory, 'out.bz2'), 'wb') as f:
+            f.write(dat)
+          subprocess.check_call(["bzip2recover", "out.bz2"], cwd=directory)
+
+          # Decompress and concatenate parts
+          dat = b""
+          for n in sorted(glob.glob(f"{directory}/rec*.bz2")):
+            print(f"Decompressing {n}")
+            with open(n, 'rb') as f:
+              dat += bz2.decompress(f.read())
+    else:
+      raise Exception(f"unknown extension {ext}")
+
+    progress = None
+    while True:
+      try:
+        ents = capnp_log.Event.read_multiple_bytes(dat)
+        self._ents = list(ents)
+        break
+      except capnp.lib.capnp.KjException:
+        if progress is None:
+          progress = tqdm.tqdm(total=len(dat))
+
+        # Cut off bytes at the end until capnp is able to read
+        dat = dat[:-1]
+        progress.update(1)
+
+    self._ts = [x.logMonoTime for x in self._ents]
+    self.data_version = data_version
+    self._only_union_types = only_union_types
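
Why the bzip2 fallback works: bzip2 compresses in independent blocks (100-900 kB of input each), so every block before the point of truncation remains decodable, and bzip2recover just splits the damaged stream back into per-block files. The same property can be shown with the standard library alone. This sketch illustrates the principle; it is not the commit's approach, which shells out to bzip2recover (that tool also handles damage in the middle of a stream):

  import bz2
  import os

  # Simulate a log cut off mid-write: compress ~1 MB at level 1
  # (100 kB blocks), then drop the last 4 kB.
  payload = os.urandom(1_000_000)
  truncated = bz2.compress(payload, compresslevel=1)[:-4096]

  # Whole-stream decompression raises, as in RobustLogReader's try block...
  try:
    bz2.decompress(truncated)
  except ValueError as e:
    print(f"bz2.decompress failed: {e}")

  # ...but incremental decompression still yields every complete block.
  prefix = bz2.BZ2Decompressor().decompress(truncated)
  assert prefix == payload[:len(prefix)]
  print(f"recovered {len(prefix)} of {len(payload)} bytes")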

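The final while loop handles the other failure mode: a last event that was only partially flushed, which makes capnp's read_multiple_bytes fail on the whole buffer. Trimming one trailing byte at a time until the parse succeeds discards only the partial event. The same trim-until-parse pattern, sketched here on newline-delimited JSON so it runs without cereal (the data and format are illustrative assumptions):

  import json

  # Simulate a log whose last record was only partially written.
  records = b"".join(json.dumps({"t": i}).encode() + b"\n" for i in range(100))
  dat = records[:-7]  # cut mid-record

  # Trim trailing bytes until the remaining buffer parses cleanly.
  while dat:
    try:
      events = [json.loads(line) for line in dat.splitlines()]
      break
    except json.JSONDecodeError:
      dat = dat[:-1]

  print(f"recovered {len(events)} of 100 events")  # -> 99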
tools/plotjuggler/juggle.py

@@ -10,7 +10,7 @@ from common.basedir import BASEDIR
 from selfdrive.test.process_replay.compare_logs import save_log
 from tools.lib.api import CommaApi
 from tools.lib.auth_config import get_token
-from tools.lib.logreader import LogReader
+from tools.lib.robust_logreader import RobustLogReader
 from tools.lib.route import Route
 from urllib.parse import urlparse, parse_qs
@@ -22,7 +22,7 @@ def load_segment(segment_name):
     return []

   try:
-    return list(LogReader(segment_name))
+    return list(RobustLogReader(segment_name))
   except ValueError as e:
     print(f"Error parsing {segment_name}: {e}")
     return []
