You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							60 lines
						
					
					
						
							1.8 KiB
						
					
					
				
			
		
		
	
	
							60 lines
						
					
					
						
							1.8 KiB
						
					
					
				#!/usr/bin/env python3
 | 
						|
import os
 | 
						|
import bz2
 | 
						|
import urllib.parse
 | 
						|
import subprocess
 | 
						|
import tqdm
 | 
						|
import glob
 | 
						|
from tempfile import TemporaryDirectory
 | 
						|
import capnp
 | 
						|
 | 
						|
from tools.lib.logreader import FileReader, LogReader
 | 
						|
from cereal import log as capnp_log
 | 
						|
 | 
						|
 | 
						|
class RobustLogReader(LogReader):
  """LogReader that salvages as many events as it can from a damaged log file.

  Two failure modes are handled while loading:
    * a ``.bz2`` file that ``bz2.decompress`` rejects is passed through the
      external ``bzip2recover`` tool and the recovered pieces are decompressed
      and concatenated;
    * a capnp stream that fails to parse is shortened one byte at a time from
      the end until parsing succeeds.
  """

  def __init__(self, fn, canonicalize=True, only_union_types=False):  # pylint: disable=super-init-not-called
    """Load and parse the log at ``fn``.

    Args:
      fn: Path or URL of the log. Only the extension of its path component is
        inspected: no extension means raw data, ``.bz2`` means bzip2-compressed.
      canonicalize: Accepted for signature compatibility; not used by this
        constructor.
      only_union_types: Stored unchanged as ``self._only_union_types``.

    Raises:
      Exception: if the extension is neither empty nor ``.bz2``.
      subprocess.CalledProcessError: if the ``bzip2recover`` fallback fails.
    """
    _, ext = os.path.splitext(urllib.parse.urlparse(fn).path)
    with FileReader(fn) as f:
      dat = f.read()

    if ext == ".bz2":
      try:
        dat = bz2.decompress(dat)
      except (ValueError, OSError):
        # bz2 raises ValueError for a stream that ends early and OSError for
        # an invalid/corrupted stream; both should trigger recovery.
        print("Failed to decompress, falling back to bzip2recover")
        dat = self._bzip2recover(dat)
    elif ext != "":
      raise Exception(f"unknown extension {ext}")

    self._ents = self._read_truncated(dat)
    self._ts = [x.logMonoTime for x in self._ents]
    self.data_version = None
    self._only_union_types = only_union_types

  @staticmethod
  def _bzip2recover(dat):
    """Run ``bzip2recover`` on ``dat`` and return the concatenated decompressed parts."""
    with TemporaryDirectory() as directory:
      # Run bzip2recover on the log
      with open(os.path.join(directory, 'out.bz2'), 'wb') as f:
        f.write(dat)
      subprocess.check_call(["bzip2recover", "out.bz2"], cwd=directory)

      # Decompress the recovered pieces in filename order and join them once
      parts = []
      for n in sorted(glob.glob(f"{directory}/rec*.bz2")):
        print(f"Decompressing {n}")
        with open(n, 'rb') as f:
          parts.append(bz2.decompress(f.read()))
    return b"".join(parts)

  @staticmethod
  def _read_truncated(dat):
    """Parse ``dat`` as capnp events, dropping trailing bytes until parsing succeeds."""
    progress = None
    try:
      while True:
        try:
          return list(capnp_log.Event.read_multiple_bytes(dat))
        except capnp.lib.capnp.KjException:
          if progress is None:
            progress = tqdm.tqdm(total=len(dat))

          # Cut off bytes at the end until capnp is able to read
          dat = dat[:-1]
          progress.update(1)
    finally:
      # Release the progress bar (if one was created) on success or error
      if progress is not None:
        progress.close()
 | 
						|
 |