@ -1,4 +1,5 @@
#!/usr/bin/env python3
import capnp
import contextlib
import io
import shutil
@ -11,6 +12,7 @@ import requests
from parameterized import parameterized
from unittest import mock
from cereal import log as capnp_log
from openpilot . tools . lib . logreader import LogIterable , LogReader , comma_api_source , parse_indirect , ReadMode , InternalUnavailableException
from openpilot . tools . lib . route import SegmentRange
from openpilot . tools . lib . url_file import URLFileException
@ -216,6 +218,43 @@ class TestLogReader(unittest.TestCase):
log_len = len ( list ( lr ) )
self . assertEqual ( qlog_len , log_len )
@pytest . mark . slow
def test_sort_by_time ( self ) :
msgs = list ( LogReader ( f " { TEST_ROUTE } /0/q " ) )
self . assertNotEqual ( msgs , sorted ( msgs , key = lambda m : m . logMonoTime ) )
msgs = list ( LogReader ( f " { TEST_ROUTE } /0/q " , sort_by_time = True ) )
self . assertEqual ( msgs , sorted ( msgs , key = lambda m : m . logMonoTime ) )
def test_only_union_types ( self ) :
with tempfile . NamedTemporaryFile ( ) as qlog :
# write valid Event messages
num_msgs = 100
with open ( qlog . name , " wb " ) as f :
f . write ( b " " . join ( capnp_log . Event . new_message ( ) . to_bytes ( ) for _ in range ( num_msgs ) ) )
msgs = list ( LogReader ( qlog . name ) )
self . assertEqual ( len ( msgs ) , num_msgs )
[ m . which ( ) for m in msgs ]
# append non-union Event message
event_msg = capnp_log . Event . new_message ( )
non_union_bytes = bytearray ( event_msg . to_bytes ( ) )
non_union_bytes [ event_msg . total_size . word_count * 8 ] = 0xff # set discriminant value out of range using Event word offset
with open ( qlog . name , " ab " ) as f :
f . write ( non_union_bytes )
# ensure new message is added, but is not a union type
msgs = list ( LogReader ( qlog . name ) )
self . assertEqual ( len ( msgs ) , num_msgs + 1 )
with self . assertRaises ( capnp . KjException ) :
[ m . which ( ) for m in msgs ]
# should not be added when only_union_types=True
msgs = list ( LogReader ( qlog . name , only_union_types = True ) )
self . assertEqual ( len ( msgs ) , num_msgs )
[ m . which ( ) for m in msgs ]
if __name__ == " __main__ " :
unittest . main ( )