@ -1,4 +1,5 @@
#!/usr/bin/env python3
#!/usr/bin/env python3
import numpy as np
import os
import os
import unittest
import unittest
import requests
import requests
@ -9,6 +10,7 @@ import cereal.messaging as messaging
from typing import Any
from typing import Any
from cereal . visionipc import VisionIpcClient , VisionStreamType
from cereal . visionipc import VisionIpcClient , VisionStreamType
from openpilot . selfdrive . manager . process_config import managed_processes
from openpilot . selfdrive . manager . process_config import managed_processes
from openpilot . system . hardware import TICI
LLK_DECIMATION = 10
LLK_DECIMATION = 10
CACHE_PATH = " /data/mbgl-cache-navd.db "
CACHE_PATH = " /data/mbgl-cache-navd.db "
@ -16,6 +18,11 @@ CACHE_PATH = "/data/mbgl-cache-navd.db"
LOCATION1 = ( 32.7174 , - 117.16277 )
LOCATION1 = ( 32.7174 , - 117.16277 )
LOCATION2 = ( 32.7558 , - 117.2037 )
LOCATION2 = ( 32.7558 , - 117.2037 )
DEFAULT_ITERATIONS = 30 * LLK_DECIMATION
LOCATION1_REPEATED = [ LOCATION1 ] * DEFAULT_ITERATIONS
LOCATION2_REPEATED = [ LOCATION2 ] * DEFAULT_ITERATIONS
def gen_llk ( location = LOCATION1 ) :
def gen_llk ( location = LOCATION1 ) :
msg = messaging . new_message ( ' liveLocationKalman ' )
msg = messaging . new_message ( ' liveLocationKalman ' )
msg . liveLocationKalman . positionGeodetic = { ' value ' : [ * location , 0 ] , ' std ' : [ 0. , 0. , 0. ] , ' valid ' : True }
msg . liveLocationKalman . positionGeodetic = { ' value ' : [ * location , 0 ] , ' std ' : [ 0. , 0. , 0. ] , ' valid ' : True }
@ -110,15 +117,14 @@ class TestMapRenderer(unittest.TestCase):
assert self . vipc . connect ( False )
assert self . vipc . connect ( False )
self . vipc . recv ( )
self . vipc . recv ( )
def _run_test ( self , expect_valid , locations = LOCATION1_REPEATED ) :
def _run_test ( self , expect_valid , location = LOCATION1 ) :
starting_frame_id = None
starting_frame_id = None
self . location = location
render_times = [ ]
# run test
# run test
prev_frame_id = - 1
prev_frame_id = - 1
for i in range ( 30 * LLK_DECIMATION ) :
for i , location in enumerate ( locations ) :
frame_expected = ( i + 1 ) % LLK_DECIMATION == 0
frame_expected = ( i + 1 ) % LLK_DECIMATION == 0
if self . sm . logMonoTime [ ' mapRenderState ' ] == 0 :
if self . sm . logMonoTime [ ' mapRenderState ' ] == 0 :
@ -131,7 +137,7 @@ class TestMapRenderer(unittest.TestCase):
if starting_frame_id is None :
if starting_frame_id is None :
starting_frame_id = prev_frame_id
starting_frame_id = prev_frame_id
llk = gen_llk ( self . location )
llk = gen_llk ( location )
self . pm . send ( " liveLocationKalman " , llk )
self . pm . send ( " liveLocationKalman " , llk )
self . pm . wait_for_readers_to_update ( " liveLocationKalman " , 10 )
self . pm . wait_for_readers_to_update ( " liveLocationKalman " , 10 )
self . sm . update ( 1000 if frame_expected else 0 )
self . sm . update ( 1000 if frame_expected else 0 )
@ -157,6 +163,7 @@ class TestMapRenderer(unittest.TestCase):
assert self . sm [ ' mapRenderState ' ] . renderTime == 0.
assert self . sm [ ' mapRenderState ' ] . renderTime == 0.
else :
else :
assert 0. < self . sm [ ' mapRenderState ' ] . renderTime < 0.1
assert 0. < self . sm [ ' mapRenderState ' ] . renderTime < 0.1
render_times . append ( self . sm [ ' mapRenderState ' ] . renderTime )
# check vision ipc output
# check vision ipc output
assert self . vipc . recv ( ) is not None
assert self . vipc . recv ( ) is not None
@ -164,6 +171,8 @@ class TestMapRenderer(unittest.TestCase):
assert self . vipc . timestamp_sof == llk . logMonoTime
assert self . vipc . timestamp_sof == llk . logMonoTime
assert self . vipc . frame_id == self . sm [ ' mapRenderState ' ] . frameId
assert self . vipc . frame_id == self . sm [ ' mapRenderState ' ] . frameId
return render_times
def test_with_internet ( self ) :
def test_with_internet ( self ) :
self . _setup_test ( )
self . _setup_test ( )
self . _run_test ( True )
self . _run_test ( True )
@ -180,13 +189,42 @@ class TestMapRenderer(unittest.TestCase):
self . server . disable_internet ( )
self . server . disable_internet ( )
# change locations to force mapsd to refetch
# change locations to force mapsd to refetch
self . _run_test ( False , LOCATION2 )
self . _run_test ( False , LOCATION2_REPEATED )
self . server . enable_internet ( )
self . server . enable_internet ( )
self . _run_test ( True , LOCATION2 )
self . _run_test ( True , LOCATION2_REPEATED )
self . _run_test ( True , LOCATION2_REPEATED )
def test_render_time_distribution ( self ) :
if not TICI :
raise unittest . SkipTest
self . _setup_test ( )
# from location1 -> location2 and back
locations = np . array ( [ * np . linspace ( LOCATION1 , LOCATION2 , 2000 ) , * np . linspace ( LOCATION2 , LOCATION1 , 2000 ) ] ) . tolist ( )
render_times = self . _run_test ( True , locations )
_min = np . min ( render_times )
_max = np . max ( render_times )
_mean = np . mean ( render_times )
_median = np . median ( render_times )
_stddev = np . std ( render_times )
print ( f " Stats: min: { _min } , max: { _max } , mean: { _mean } , median: { _median } , stddev: { _stddev } , count: { len ( render_times ) } " )
def assert_stat ( stat , nominal , tol = 0.3 ) :
tol = ( nominal / ( 1 + tol ) ) , ( nominal * ( 1 + tol ) )
self . assertTrue ( tol [ 0 ] < stat < tol [ 1 ] , f " { stat } not in tolerance { tol } " )
assert_stat ( _mean , 0.0035 )
assert_stat ( _median , 0.0034 )
assert_stat ( _stddev , 0.00093 , tol = 0.5 )
self . assertLess ( _max , 0.2 )
self . assertGreater ( _min , 0.0010 )
self . location = LOCATION1
self . _run_test ( True , LOCATION2 )
if __name__ == " __main__ " :
if __name__ == " __main__ " :
unittest . main ( )
unittest . main ( )