@ -33,46 +33,53 @@ class TestMapRenderer(unittest.TestCase):
def tearDown ( self ) :
managed_processes [ ' mapsd ' ] . stop ( )
def _run_test ( self , valid ) :
def _run_test ( self , expect_ valid) :
# start + sync up
managed_processes [ ' mapsd ' ] . start ( )
for _ in range ( 100 ) :
self . pm . send ( " liveLocationKalman " , gen_llk ( ) )
self . sm . update ( 100 )
if self . sm . updated [ ' mapRenderState ' ] :
break
assert self . sm . updated [ ' mapRenderState ' ] , " renderer didn ' t start "
assert VisionIpcClient . available_streams ( " navd " , False ) == { VisionStreamType . VISION_STREAM_MAP , }
assert self . pm . wait_for_readers_to_update ( " liveLocationKalman " , 10 )
# connect to vipc
assert VisionIpcClient . available_streams ( " navd " , False ) == { VisionStreamType . VISION_STREAM_MAP , }
assert self . vipc . connect ( False )
self . vipc . recv ( )
# run test
for i in range ( 20 * LLK_DECIMATION ) :
prev_frame_id = - 1
for i in range ( 30 * LLK_DECIMATION ) :
frame_expected = ( i + 1 ) % LLK_DECIMATION == 0
if self . sm . logMonoTime [ ' mapRenderState ' ] == 0 :
prev_valid = False
prev_frame_id = - 1
else :
prev_frame_id = self . sm [ ' mapRenderState ' ] . frameId
prev_valid = self . sm . valid [ ' mapRenderState ' ]
llk = gen_llk ( )
self . pm . send ( " liveLocationKalman " , llk )
self . sm . update ( 200 if frame_expected else 10 )
self . pm . wait_for_readers_to_update ( " liveLocationKalman " , 10 )
self . sm . update ( 1000 if frame_expected else 0 )
assert self . sm . updated [ ' mapRenderState ' ] == frame_expected , " renderer running at wrong frequency "
if not frame_expected :
continue
# give a few frames to go valid
if expect_valid and not self . sm . valid [ ' mapRenderState ' ] and not prev_valid and self . sm [ ' mapRenderState ' ] . frameId < 5 :
continue
# check output
assert self . sm . valid [ ' mapRenderState ' ] == valid
assert self . sm . valid [ ' mapRenderState ' ] == expect_ valid
assert self . sm [ ' mapRenderState ' ] . frameId == ( prev_frame_id + 1 )
assert self . sm [ ' mapRenderState ' ] . locationMonoTime == llk . logMonoTime
if not valid :
if not expect_ valid:
assert self . sm [ ' mapRenderState ' ] . renderTime == 0.
else :
assert 0. < self . sm [ ' mapRenderState ' ] . renderTime < 0.1
# check vision ipc output
assert self . vipc . recv ( ) is not None
assert self . vipc . valid == valid
assert self . vipc . valid == expect_ valid
assert self . vipc . timestamp_sof == llk . logMonoTime
assert self . vipc . frame_id == self . sm [ ' mapRenderState ' ] . frameId