@ -2,6 +2,7 @@
import time
import time
import unittest
import unittest
import numpy as np
import numpy as np
import random
import cereal . messaging as messaging
import cereal . messaging as messaging
from cereal . visionipc . visionipc_pyx import VisionIpcServer , VisionStreamType # pylint: disable=no-name-in-module, import-error
from cereal . visionipc . visionipc_pyx import VisionIpcServer , VisionStreamType # pylint: disable=no-name-in-module, import-error
@ -26,7 +27,7 @@ class TestModeld(unittest.TestCase):
self . vipc_server . start_listener ( )
self . vipc_server . start_listener ( )
self . sm = messaging . SubMaster ( [ ' modelV2 ' , ' cameraOdometry ' ] )
self . sm = messaging . SubMaster ( [ ' modelV2 ' , ' cameraOdometry ' ] )
self . pm = messaging . PubMaster ( [ ' roadCameraState ' , ' wideRoadCameraState ' , ' driverCameraState ' , ' liveCalibration' , ' lateralPlan ' ] )
self . pm = messaging . PubMaster ( [ ' roadCameraState ' , ' wideRoadCameraState ' , ' liveCalibration ' , ' lateralPlan ' ] )
managed_processes [ ' modeld ' ] . start ( )
managed_processes [ ' modeld ' ] . start ( )
time . sleep ( 0.2 )
time . sleep ( 0.2 )
@ -36,22 +37,32 @@ class TestModeld(unittest.TestCase):
managed_processes [ ' modeld ' ] . stop ( )
managed_processes [ ' modeld ' ] . stop ( )
del self . vipc_server
del self . vipc_server
def test_modeld ( self ) :
def _send_frames ( self , frame_id , cams = None ) :
for n in range ( 1 , 500 ) :
if cams is None :
for cam in ( ' roadCameraState ' , ' wideRoadCameraState ' ) :
cams = ( ' roadCameraState ' , ' wideRoadCameraState ' )
msg = messaging . new_message ( cam )
cs = getattr ( msg , cam )
cs = None
cs . frameId = n
for cam in cams :
cs . timestampSof = int ( ( n * DT_MDL ) * 1e9 )
msg = messaging . new_message ( cam )
cs . timestampEof = int ( cs . timestampSof + ( DT_MDL * 1e9 ) )
cs = getattr ( msg , cam )
cs . frameId = frame_id
cs . timestampSof = int ( ( frame_id * DT_MDL ) * 1e9 )
cs . timestampEof = int ( cs . timestampSof + ( DT_MDL * 1e9 ) )
self . pm . send ( msg . which ( ) , msg )
self . pm . send ( msg . which ( ) , msg )
self . vipc_server . send ( VIPC_STREAM [ msg . which ( ) ] , IMG_BYTES , cs . frameId ,
self . vipc_server . send ( VIPC_STREAM [ msg . which ( ) ] , IMG_BYTES , cs . frameId ,
cs . timestampSof , cs . timestampEof )
cs . timestampSof , cs . timestampEof )
return cs
self . sm . update ( 5000 )
def _wait ( self ) :
if self . sm [ ' modelV2 ' ] . frameId != self . sm [ ' cameraOdometry ' ] . frameId :
self . sm . update ( 5000 )
self . sm . update ( 1000 )
if self . sm [ ' modelV2 ' ] . frameId != self . sm [ ' cameraOdometry ' ] . frameId :
self . sm . update ( 1000 )
def test_modeld ( self ) :
for n in range ( 1 , 500 ) :
cs = self . _send_frames ( n )
self . _wait ( )
mdl = self . sm [ ' modelV2 ' ]
mdl = self . sm [ ' modelV2 ' ]
self . assertEqual ( mdl . frameId , n )
self . assertEqual ( mdl . frameId , n )
@ -64,8 +75,32 @@ class TestModeld(unittest.TestCase):
self . assertEqual ( odo . frameId , n )
self . assertEqual ( odo . frameId , n )
self . assertEqual ( odo . timestampEof , cs . timestampEof )
self . assertEqual ( odo . timestampEof , cs . timestampEof )
def test_skipped_frames ( self ) :
def test_dropped_frames ( self ) :
pass
"""
modeld should only run on consecutive road frames
"""
frame_id = - 1
road_frames = list ( )
for n in range ( 1 , 50 ) :
if ( random . random ( ) < 0.1 ) and n > 3 :
cams = random . choice ( [ ( ) , ( ' wideRoadCameraState ' , ) ] )
self . _send_frames ( n , cams )
else :
self . _send_frames ( n )
road_frames . append ( n )
self . _wait ( )
if len ( road_frames ) < 3 or road_frames [ - 1 ] - road_frames [ - 2 ] == 1 :
frame_id = road_frames [ - 1 ]
mdl = self . sm [ ' modelV2 ' ]
odo = self . sm [ ' cameraOdometry ' ]
self . assertEqual ( mdl . frameId , frame_id )
self . assertEqual ( mdl . frameIdExtra , frame_id )
self . assertEqual ( odo . frameId , frame_id )
if n != frame_id :
self . assertFalse ( self . sm . updated [ ' modelV2 ' ] )
self . assertFalse ( self . sm . updated [ ' cameraOdometry ' ] )
if __name__ == " __main__ " :
if __name__ == " __main__ " :