#!/usr/bin/env python3
import sys
import unittest
import hypothesis . strategies as st
import numpy as np
from hypothesis import given , settings , note
from cereal import log
from selfdrive . car . toyota . values import CAR as TOYOTA
import selfdrive . test . process_replay . process_replay as pr
def get_process_config ( process ) :
return [ cfg for cfg in pr . CONFIGS if cfg . proc_name == process ] [ 0 ]
def get_event_union_strategy ( r , name ) :
return st . fixed_dictionaries ( {
' valid ' : st . just ( True ) ,
' logMonoTime ' : st . integers ( min_value = 0 , max_value = 2 * * 64 - 1 ) ,
name : r [ name [ 0 ] . upper ( ) + name [ 1 : ] ] ,
} )
def get_strategy_for_events ( event_types , finite = False ) :
# TODO: generate automatically based on capnp definitions
def floats ( * * kwargs ) :
allow_nan = False if finite else None
allow_infinity = False if finite else None
return st . floats ( * * kwargs , allow_nan = allow_nan , allow_infinity = allow_infinity )
r = { }
r [ ' liveLocationKalman.Measurement ' ] = st . fixed_dictionaries ( {
' value ' : st . lists ( floats ( ) , min_size = 3 , max_size = 3 ) ,
' std ' : st . lists ( floats ( ) , min_size = 3 , max_size = 3 ) ,
' valid ' : st . just ( True ) ,
} )
r [ ' LiveLocationKalman ' ] = st . fixed_dictionaries ( {
' angularVelocityCalibrated ' : r [ ' liveLocationKalman.Measurement ' ] ,
' inputsOK ' : st . booleans ( ) ,
' posenetOK ' : st . booleans ( ) ,
} )
r [ ' CarState ' ] = st . fixed_dictionaries ( {
' vEgo ' : floats ( width = 32 ) ,
' vEgoRaw ' : floats ( width = 32 ) ,
' steeringPressed ' : st . booleans ( ) ,
' steeringAngleDeg ' : floats ( width = 32 ) ,
} )
r [ ' CameraOdometry ' ] = st . fixed_dictionaries ( {
' frameId ' : st . integers ( min_value = 0 , max_value = 2 * * 32 - 1 ) ,
' timestampEof ' : st . integers ( min_value = 0 , max_value = 2 * * 64 - 1 ) ,
' trans ' : st . lists ( floats ( width = 32 ) , min_size = 3 , max_size = 3 ) ,
' rot ' : st . lists ( floats ( width = 32 ) , min_size = 3 , max_size = 3 ) ,
' transStd ' : st . lists ( floats ( width = 32 ) , min_size = 3 , max_size = 3 ) ,
' rotStd ' : st . lists ( floats ( width = 32 ) , min_size = 3 , max_size = 3 ) ,
} )
r [ ' SensorEventData.SensorVec ' ] = st . fixed_dictionaries ( {
' v ' : st . lists ( floats ( width = 32 ) , min_size = 3 , max_size = 3 ) ,
' status ' : st . just ( 1 ) ,
} )
r [ ' SensorEventData_gyro ' ] = st . fixed_dictionaries ( {
' version ' : st . just ( 1 ) ,
' sensor ' : st . just ( 5 ) ,
' type ' : st . just ( 16 ) ,
' timestamp ' : st . integers ( min_value = 0 , max_value = 2 * * 63 - 1 ) ,
' source ' : st . just ( 8 ) , # BMX055
' gyroUncalibrated ' : r [ ' SensorEventData.SensorVec ' ] ,
} )
r [ ' SensorEventData_accel ' ] = st . fixed_dictionaries ( {
' version ' : st . just ( 1 ) ,
' sensor ' : st . just ( 1 ) ,
' type ' : st . just ( 1 ) ,
' timestamp ' : st . integers ( min_value = 0 , max_value = 2 * * 63 - 1 ) ,
' source ' : st . just ( 8 ) , # BMX055
' acceleration ' : r [ ' SensorEventData.SensorVec ' ] ,
} )
r [ ' SensorEvents ' ] = st . lists ( st . one_of ( r [ ' SensorEventData_gyro ' ] , r [ ' SensorEventData_accel ' ] ) , min_size = 1 )
r [ ' GpsLocationExternal ' ] = st . fixed_dictionaries ( {
' flags ' : st . just ( 1 ) ,
' latitude ' : floats ( ) ,
' longitude ' : floats ( ) ,
' altitude ' : floats ( ) ,
' speed ' : floats ( width = 32 ) ,
' bearingDeg ' : floats ( width = 32 ) ,
' accuracy ' : floats ( width = 32 ) ,
' timestamp ' : st . integers ( min_value = 0 , max_value = 2 * * 63 - 1 ) ,
' source ' : st . just ( 6 ) , # Ublox
' vNED ' : st . lists ( floats ( width = 32 ) , min_size = 3 , max_size = 3 ) ,
' verticalAccuracy ' : floats ( width = 32 ) ,
' bearingAccuracyDeg ' : floats ( width = 32 ) ,
' speedAccuracy ' : floats ( width = 32 ) ,
} )
r [ ' LiveCalibration ' ] = st . fixed_dictionaries ( {
' rpyCalib ' : st . lists ( floats ( width = 32 ) , min_size = 3 , max_size = 3 ) ,
} )
return st . lists ( st . one_of ( * [ get_event_union_strategy ( r , n ) for n in event_types ] ) )
def get_strategy_for_process ( process , finite = False ) :
return get_strategy_for_events ( get_process_config ( process ) . pubs , finite )
def convert_to_lr ( msgs ) :
return [ log . Event . new_message ( * * m ) . as_reader ( ) for m in msgs ]
def is_finite ( d , exclude = [ ] , prefix = " " ) : # pylint: disable=dangerous-default-value
ret = True
for k , v in d . items ( ) :
name = prefix + f " { k } "
if name in exclude :
continue
if isinstance ( v , dict ) :
if not is_finite ( v , exclude , name + " . " ) :
ret = False
else :
try :
if not np . isfinite ( v ) . all ( ) :
note ( ( name , v ) )
ret = False
except TypeError :
pass
return ret
def test_process ( dat , name ) :
cfg = get_process_config ( name )
lr = convert_to_lr ( dat )
pr . TIMEOUT = 0.1
return pr . replay_process ( cfg , lr , TOYOTA . COROLLA_TSS2 )
class TestFuzzy ( unittest . TestCase ) :
@given ( get_strategy_for_process ( ' paramsd ' ) )
@settings ( deadline = 1000 )
def test_paramsd ( self , dat ) :
for r in test_process ( dat , ' paramsd ' ) :
d = r . liveParameters . to_dict ( )
assert is_finite ( d )
@given ( get_strategy_for_process ( ' locationd ' , finite = True ) )
@settings ( deadline = 1000 )
def test_locationd ( self , dat ) :
exclude = [
' positionGeodetic.std ' ,
' velocityNED.std ' ,
' orientationNED.std ' ,
' calibratedOrientationECEF.std ' ,
]
for r in test_process ( dat , ' locationd ' ) :
d = r . liveLocationKalman . to_dict ( )
assert is_finite ( d , exclude )
if __name__ == " __main__ " :
procs = {
' locationd ' : TestFuzzy ( ) . test_locationd ,
' paramsd ' : TestFuzzy ( ) . test_paramsd ,
}
if len ( sys . argv ) != 2 :
print ( " Usage: ./test_fuzzy.py <process name> " )
sys . exit ( 0 )
proc = sys . argv [ 1 ]
if proc not in procs :
print ( f " { proc } not available " )
sys . exit ( 0 )
else :
procs [ proc ] ( )