#!/usr/bin/env python3
import os
import json
import random
import unittest
import time
from cffi import FFI
from cereal import log
import cereal . messaging as messaging
from common . params import Params
from selfdrive . manager . process_config import managed_processes
SENSOR_DECIMATION = 1
VISION_DECIMATION = 1
LIBLOCATIOND_PATH = os . path . abspath ( os . path . join ( os . path . dirname ( __file__ ) , ' ../liblocationd.so ' ) )
class TestLocationdLib ( unittest . TestCase ) :
def setUp ( self ) :
header = ''' typedef ...* Localizer_t;
Localizer_t localizer_init ( ) ;
void localizer_get_message_bytes ( Localizer_t localizer , uint64_t logMonoTime , bool inputsOK , bool sensorsOK , bool gpsOK , char * buff , size_t buff_size ) ;
void localizer_handle_msg_bytes ( Localizer_t localizer , const char * data , size_t size ) ; '''
self . ffi = FFI ( )
self . ffi . cdef ( header )
self . lib = self . ffi . dlopen ( LIBLOCATIOND_PATH )
self . localizer = self . lib . localizer_init ( )
self . buff_size = 2048
self . msg_buff = self . ffi . new ( f ' char[ { self . buff_size } ] ' )
def localizer_handle_msg ( self , msg_builder ) :
bytstr = msg_builder . to_bytes ( )
self . lib . localizer_handle_msg_bytes ( self . localizer , self . ffi . from_buffer ( bytstr ) , len ( bytstr ) )
def localizer_get_msg ( self , t = 0 , inputsOK = True , sensorsOK = True , gpsOK = True ) :
self . lib . localizer_get_message_bytes ( self . localizer , t , inputsOK , sensorsOK , gpsOK , self . ffi . addressof ( self . msg_buff , 0 ) , self . buff_size )
return log . Event . from_bytes ( self . ffi . buffer ( self . msg_buff ) , nesting_limit = self . buff_size / / 8 )
def test_liblocalizer ( self ) :
msg = messaging . new_message ( ' liveCalibration ' )
msg . liveCalibration . validBlocks = random . randint ( 1 , 10 )
msg . liveCalibration . rpyCalib = [ random . random ( ) for _ in range ( 3 ) ]
self . localizer_handle_msg ( msg )
liveloc = self . localizer_get_msg ( )
self . assertTrue ( liveloc is not None )
def test_device_fell ( self ) :
msg = messaging . new_message ( ' sensorEvents ' , 1 )
msg . sensorEvents [ 0 ] . sensor = 1
msg . sensorEvents [ 0 ] . type = 1
msg . sensorEvents [ 0 ] . init ( ' acceleration ' )
msg . sensorEvents [ 0 ] . acceleration . v = [ 10.0 , 0.0 , 0.0 ] # zero with gravity
self . localizer_handle_msg ( msg )
ret = self . localizer_get_msg ( )
self . assertTrue ( ret . liveLocationKalman . deviceStable )
msg = messaging . new_message ( ' sensorEvents ' , 1 )
msg . sensorEvents [ 0 ] . sensor = 1
msg . sensorEvents [ 0 ] . type = 1
msg . sensorEvents [ 0 ] . init ( ' acceleration ' )
msg . sensorEvents [ 0 ] . acceleration . v = [ 50.1 , 0.0 , 0.0 ] # more than 40 m/s**2
self . localizer_handle_msg ( msg )
ret = self . localizer_get_msg ( )
self . assertFalse ( ret . liveLocationKalman . deviceStable )
def test_posenet_spike ( self ) :
for _ in range ( SENSOR_DECIMATION ) :
msg = messaging . new_message ( ' carState ' )
msg . carState . vEgo = 6.0 # more than 5 m/s
self . localizer_handle_msg ( msg )
ret = self . localizer_get_msg ( )
self . assertTrue ( ret . liveLocationKalman . posenetOK )
for _ in range ( 20 * VISION_DECIMATION ) : # size of hist_old
msg = messaging . new_message ( ' cameraOdometry ' )
msg . cameraOdometry . rot = [ 0.0 , 0.0 , 0.0 ]
msg . cameraOdometry . rotStd = [ 0.0 , 0.0 , 0.0 ]
msg . cameraOdometry . trans = [ 0.0 , 0.0 , 0.0 ]
msg . cameraOdometry . transStd = [ 2.0 , 0.0 , 0.0 ]
self . localizer_handle_msg ( msg )
for _ in range ( 20 * VISION_DECIMATION ) : # size of hist_new
msg = messaging . new_message ( ' cameraOdometry ' )
msg . cameraOdometry . rot = [ 0.0 , 0.0 , 0.0 ]
msg . cameraOdometry . rotStd = [ 0.0 , 0.0 , 0.0 ]
msg . cameraOdometry . trans = [ 0.0 , 0.0 , 0.0 ]
msg . cameraOdometry . transStd = [ 8.1 , 0.0 , 0.0 ] # more than 4 times larger
self . localizer_handle_msg ( msg )
ret = self . localizer_get_msg ( )
self . assertFalse ( ret . liveLocationKalman . posenetOK )
class TestLocationdProc ( unittest . TestCase ) :
MAX_WAITS = 1000
def setUp ( self ) :
random . seed ( 123489234 )
self . pm = messaging . PubMaster ( { ' gpsLocationExternal ' , ' cameraOdometry ' } )
managed_processes [ ' locationd ' ] . prepare ( )
managed_processes [ ' locationd ' ] . start ( )
time . sleep ( 1 )
def tearDown ( self ) :
managed_processes [ ' locationd ' ] . stop ( )
def send_msg ( self , msg ) :
self . pm . send ( msg . which ( ) , msg )
waits_left = self . MAX_WAITS
while waits_left and not self . pm . all_readers_updated ( msg . which ( ) ) :
time . sleep ( 0 )
waits_left - = 1
time . sleep ( 0.0001 )
def test_params_gps ( self ) :
# first reset params
Params ( ) . put ( ' LastGPSPosition ' , json . dumps ( { " latitude " : 0.0 , " longitude " : 0.0 , " altitude " : 0.0 } ) )
lat = 30 + ( random . random ( ) * 10.0 )
lon = - 70 + ( random . random ( ) * 10.0 )
alt = 5 + ( random . random ( ) * 10.0 )
for _ in range ( 1000 ) : # because of kalman filter, send often
msg = messaging . new_message ( ' gpsLocationExternal ' )
msg . logMonoTime = 0
msg . gpsLocationExternal . flags = 1
msg . gpsLocationExternal . verticalAccuracy = 0.0
msg . gpsLocationExternal . vNED = [ 0.0 , 0.0 , 0.0 ]
msg . gpsLocationExternal . latitude = lat
msg . gpsLocationExternal . longitude = lon
msg . gpsLocationExternal . altitude = alt
self . send_msg ( msg )
for _ in range ( 250 ) : # params is only written so often
msg = messaging . new_message ( ' cameraOdometry ' )
msg . logMonoTime = 0
msg . cameraOdometry . rot = [ 0.0 , 0.0 , 0.0 ]
msg . cameraOdometry . rotStd = [ 0.0 , 0.0 , 0.0 ]
msg . cameraOdometry . trans = [ 0.0 , 0.0 , 0.0 ]
msg . cameraOdometry . transStd = [ 0.0 , 0.0 , 0.0 ]
self . send_msg ( msg )
time . sleep ( 1 ) # wait for async params write
lastGPS = json . loads ( Params ( ) . get ( ' LastGPSPosition ' ) )
self . assertAlmostEqual ( lastGPS [ ' latitude ' ] , lat , places = 3 )
self . assertAlmostEqual ( lastGPS [ ' longitude ' ] , lon , places = 3 )
self . assertAlmostEqual ( lastGPS [ ' altitude ' ] , alt , places = 3 )
if __name__ == " __main__ " :
unittest . main ( )