@ -4,10 +4,12 @@ import json
import random
import unittest
import time
import capnp
from cffi import FFI
from cereal import log
import cereal . messaging as messaging
from cereal . services import service_list
from common . params import Params
from selfdrive . manager . process_config import managed_processes
@ -103,13 +105,15 @@ void localizer_handle_msg_bytes(Localizer_t localizer, const char *data, size_t
ret = self . localizer_get_msg ( )
self . assertFalse ( ret . liveLocationKalman . posenetOK )
class TestLocationdProc ( unittest . TestCase ) :
MAX_WAITS = 1000
LLD_MSGS = { ' gpsLocationExternal ' , ' cameraOdometry ' , ' carState ' , ' sensorEvents ' , ' liveCalibration ' }
def setUp ( self ) :
random . seed ( 123489234 )
self . pm = messaging . PubMaster ( { ' gpsLocationExternal ' , ' cameraOdometry ' } )
self . pm = messaging . PubMaster ( self . LLD_MSGS )
managed_processes [ ' locationd ' ] . prepare ( )
managed_processes [ ' locationd ' ] . start ( )
@ -127,43 +131,53 @@ class TestLocationdProc(unittest.TestCase):
waits_left - = 1
time . sleep ( 0.0001 )
def test_params_gps ( self ) :
# first reset params
Params ( ) . put ( ' LastGPSPosition ' , json . dumps ( { " latitude " : 0.0 , " longitude " : 0.0 , " altitude " : 0.0 } ) )
lat = 30 + ( random . random ( ) * 10.0 )
lon = - 70 + ( random . random ( ) * 10.0 )
alt = 5 + ( random . random ( ) * 10.0 )
def get_fake_msg ( self , name , t ) :
try :
msg = messaging . new_message ( name )
except capnp . lib . capnp . KjException :
msg = messaging . new_message ( name , 0 )
for _ in range ( 1000 ) : # because of kalman filter, send often
msg = messaging . new_message ( ' gpsLocationExternal ' )
msg . logMonoTime = 0
if name == " gpsLocationExternal " :
msg . gpsLocationExternal . flags = 1
msg . gpsLocationExternal . verticalAccuracy = 1.0
msg . gpsLocationExternal . speedAccuracy = 1.0
msg . gpsLocationExternal . bearingAccuracyDeg = 1.0
msg . gpsLocationExternal . vNED = [ 0.0 , 0.0 , 0.0 ]
msg . gpsLocationExternal . latitude = lat
msg . gpsLocationExternal . longitude = lon
msg . gpsLocationExternal . altitude = alt
self . send_msg ( msg )
for _ in range ( 250 ) : # params is only written so often
msg = messaging . new_message ( ' cameraOdometry ' )
msg . logMonoTime = 0
msg . gpsLocationExternal . latitude = self . lat
msg . gpsLocationExternal . longitude = self . lon
msg . gpsLocationExternal . altitude = self . alt
elif name == ' cameraOdometry ' :
msg . cameraOdometry . rot = [ 0.0 , 0.0 , 0.0 ]
msg . cameraOdometry . rotStd = [ 0.0 , 0.0 , 0.0 ]
msg . cameraOdometry . trans = [ 0.0 , 0.0 , 0.0 ]
msg . cameraOdometry . transStd = [ 0.0 , 0.0 , 0.0 ]
self . send_msg ( msg )
msg . logMonoTime = t
return msg
def test_params_gps ( self ) :
# first reset params
Params ( ) . delete ( ' LastGPSPosition ' )
self . lat = 30 + ( random . random ( ) * 10.0 )
self . lon = - 70 + ( random . random ( ) * 10.0 )
self . alt = 5 + ( random . random ( ) * 10.0 )
self . fake_duration = 90 # secs
# get fake messages at the correct frequency, listed in services.py
fake_msgs = [ ]
for sec in range ( self . fake_duration ) :
for name in self . LLD_MSGS :
for j in range ( int ( service_list [ name ] . frequency ) ) :
fake_msgs . append ( self . get_fake_msg ( name , int ( ( sec + j / service_list [ name ] . frequency ) * 1e9 ) ) )
for fake_msg in sorted ( fake_msgs , key = lambda x : x . logMonoTime ) :
self . send_msg ( fake_msg )
time . sleep ( 1 ) # wait for async params write
lastGPS = json . loads ( Params ( ) . get ( ' LastGPSPosition ' ) )
self . assertAlmostEqual ( lastGPS [ ' latitude ' ] , lat , places = 3 )
self . assertAlmostEqual ( lastGPS [ ' longitude ' ] , lon , places = 3 )
self . assertAlmostEqual ( lastGPS [ ' altitude ' ] , alt , places = 3 )
self . assertAlmostEqual ( lastGPS [ ' latitude ' ] , self . lat , places = 3 )
self . assertAlmostEqual ( lastGPS [ ' longitude ' ] , self . lon , places = 3 )
self . assertAlmostEqual ( lastGPS [ ' altitude ' ] , self . alt , places = 3 )
if __name__ == " __main__ " :