make normal logreader more robust (#24577)
	
		
	
				
					
				
			
							parent
							
								
									3771ccc655
								
							
						
					
					
						commit
						194e5fdf1c
					
				
				 4 changed files with 24 additions and 70 deletions
			
			
		@ -1,60 +0,0 @@ | 
				
			|||||||
#!/usr/bin/env python3 | 
					 | 
				
			||||||
import os | 
					 | 
				
			||||||
import bz2 | 
					 | 
				
			||||||
import urllib.parse | 
					 | 
				
			||||||
import subprocess | 
					 | 
				
			||||||
import tqdm | 
					 | 
				
			||||||
import glob | 
					 | 
				
			||||||
from tempfile import TemporaryDirectory | 
					 | 
				
			||||||
import capnp | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
from tools.lib.logreader import FileReader, LogReader | 
					 | 
				
			||||||
from cereal import log as capnp_log | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
class RobustLogReader(LogReader): | 
					 | 
				
			||||||
  def __init__(self, fn, canonicalize=True, only_union_types=False, sort_by_time=False):  # pylint: disable=super-init-not-called | 
					 | 
				
			||||||
    data_version = None | 
					 | 
				
			||||||
    _, ext = os.path.splitext(urllib.parse.urlparse(fn).path) | 
					 | 
				
			||||||
    with FileReader(fn) as f: | 
					 | 
				
			||||||
      dat = f.read() | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    if ext == "": | 
					 | 
				
			||||||
      pass | 
					 | 
				
			||||||
    elif ext == ".bz2": | 
					 | 
				
			||||||
      try: | 
					 | 
				
			||||||
        dat = bz2.decompress(dat) | 
					 | 
				
			||||||
      except ValueError: | 
					 | 
				
			||||||
        print("Failed to decompress, falling back to bzip2recover") | 
					 | 
				
			||||||
        with TemporaryDirectory() as directory: | 
					 | 
				
			||||||
          # Run bzip2recovery on log | 
					 | 
				
			||||||
          with open(os.path.join(directory, 'out.bz2'), 'wb') as f: | 
					 | 
				
			||||||
            f.write(dat) | 
					 | 
				
			||||||
          subprocess.check_call(["bzip2recover", "out.bz2"], cwd=directory) | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
          # Decompress and concatenate parts | 
					 | 
				
			||||||
          dat = b"" | 
					 | 
				
			||||||
          for n in sorted(glob.glob(f"{directory}/rec*.bz2")): | 
					 | 
				
			||||||
            print(f"Decompressing {n}") | 
					 | 
				
			||||||
            with open(n, 'rb') as f: | 
					 | 
				
			||||||
              dat += bz2.decompress(f.read()) | 
					 | 
				
			||||||
    else: | 
					 | 
				
			||||||
      raise Exception(f"unknown extension {ext}") | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    progress = None | 
					 | 
				
			||||||
    while True: | 
					 | 
				
			||||||
      try: | 
					 | 
				
			||||||
        ents = capnp_log.Event.read_multiple_bytes(dat) | 
					 | 
				
			||||||
        self._ents = list(sorted(ents, key=lambda x: x.logMonoTime) if sort_by_time else ents) | 
					 | 
				
			||||||
        break | 
					 | 
				
			||||||
      except capnp.lib.capnp.KjException: | 
					 | 
				
			||||||
        if progress is None: | 
					 | 
				
			||||||
          progress = tqdm.tqdm(total=len(dat)) | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        # Cut off bytes at the end until capnp is able to read | 
					 | 
				
			||||||
        dat = dat[:-1] | 
					 | 
				
			||||||
        progress.update(1) | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    self._ts = [x.logMonoTime for x in self._ents] | 
					 | 
				
			||||||
    self.data_version = data_version | 
					 | 
				
			||||||
    self._only_union_types = only_union_types | 
					 | 
				
			||||||
					Loading…
					
					
				
		Reference in new issue