#!/usr/bin/env python3
import os
import bz2
import urllib . parse
import subprocess
import tqdm
import glob
from tempfile import TemporaryDirectory
import capnp
from tools . lib . logreader import FileReader , LogReader
from cereal import log as capnp_log
class RobustLogReader(LogReader):
  """LogReader variant that recovers as much data as possible from damaged logs.

  Handles two failure modes: a corrupted bz2 archive (recovered via the
  external ``bzip2recover`` tool) and a truncated capnp event stream
  (recovered by trimming trailing bytes until the stream parses).
  """

  def __init__(self, fn, canonicalize=True, only_union_types=False, sort_by_time=False):  # pylint: disable=super-init-not-called
    """Load log events from `fn`, tolerating corruption.

    Args:
      fn: path or URL of the log. No extension means a raw capnp stream;
        ".bz2" means a bz2-compressed stream. Anything else raises.
      canonicalize: accepted for LogReader signature compatibility; unused here.
      only_union_types: stored on the instance for LogReader's iteration logic.
      sort_by_time: if True, events are sorted by logMonoTime.

    Raises:
      Exception: if the file extension is neither empty nor ".bz2".
    """
    data_version = None
    _, ext = os.path.splitext(urllib.parse.urlparse(fn).path)
    with FileReader(fn) as f:
      dat = f.read()

    if ext == "":
      pass  # Uncompressed log; use the bytes as-is
    elif ext == ".bz2":
      try:
        dat = bz2.decompress(dat)
      except ValueError:
        # Archive is damaged; salvage the readable compressed sections instead.
        print("Failed to decompress, falling back to bzip2recover")
        with TemporaryDirectory() as directory:
          # Run bzip2recover on log
          with open(os.path.join(directory, 'out.bz2'), 'wb') as f:
            f.write(dat)

          subprocess.check_call(["bzip2recover", "out.bz2"], cwd=directory)

          # Decompress and concatenate parts (rec*.bz2 names sort in stream order)
          dat = b""
          for n in sorted(glob.glob(f"{directory}/rec*.bz2")):
            print(f"Decompressing {n}")
            with open(n, 'rb') as f:
              dat += bz2.decompress(f.read())
    else:
      raise Exception(f"unknown extension {ext}")

    progress = None
    try:
      while True:
        try:
          ents = capnp_log.Event.read_multiple_bytes(dat)
          self._ents = list(sorted(ents, key=lambda x: x.logMonoTime) if sort_by_time else ents)
          break
        except capnp.lib.capnp.KjException:
          if progress is None:
            progress = tqdm.tqdm(total=len(dat))

          # Cut off bytes at the end until capnp is able to read
          dat = dat[:-1]
          progress.update(1)
    finally:
      # Close the bar so tqdm restores the terminal even if parsing never succeeds.
      if progress is not None:
        progress.close()

    self._ts = [x.logMonoTime for x in self._ents]
    self.data_version = data_version
    self._only_union_types = only_union_types