import random
import numpy as np
import json
from cereal import messaging
from openpilot . selfdrive . locationd . paramsd import retrieve_initial_vehicle_params , migrate_cached_vehicle_params_if_needed
from openpilot . selfdrive . locationd . models . car_kf import CarKalman
from openpilot . selfdrive . locationd . test . test_locationd_scenarios import TEST_ROUTE
from openpilot . selfdrive . test . process_replay . migration import migrate , migrate_carParams
from openpilot . common . params import Params
from openpilot . tools . lib . logreader import LogReader
def get_random_live_parameters ( CP ) :
msg = messaging . new_message ( " liveParameters " )
msg . liveParameters . steerRatio = ( random . random ( ) + 0.5 ) * CP . steerRatio
msg . liveParameters . stiffnessFactor = random . random ( )
msg . liveParameters . angleOffsetAverageDeg = random . random ( )
msg . liveParameters . debugFilterState . std = [ random . random ( ) for _ in range ( CarKalman . P_initial . shape [ 0 ] ) ]
return msg
class TestParamsd :
def test_read_saved_params ( self ) :
params = Params ( )
lr = migrate ( LogReader ( TEST_ROUTE ) , [ migrate_carParams ] )
CP = next ( m for m in lr if m . which ( ) == " carParams " ) . carParams
msg = get_random_live_parameters ( CP )
params . put ( " LiveParametersV2 " , msg . to_bytes ( ) )
params . put ( " CarParamsPrevRoute " , CP . as_builder ( ) . to_bytes ( ) )
migrate_cached_vehicle_params_if_needed ( params ) # this is not tested here but should not mess anything up or throw an error
sr , sf , offset , p_init = retrieve_initial_vehicle_params ( params , CP , replay = True , debug = True )
np . testing . assert_allclose ( sr , msg . liveParameters . steerRatio )
np . testing . assert_allclose ( sf , msg . liveParameters . stiffnessFactor )
np . testing . assert_allclose ( offset , msg . liveParameters . angleOffsetAverageDeg )
np . testing . assert_equal ( p_init . shape , CarKalman . P_initial . shape )
np . testing . assert_allclose ( np . diagonal ( p_init ) , msg . liveParameters . debugFilterState . std )
# TODO Remove this test after the support for old format is removed
def test_read_saved_params_old_format ( self ) :
params = Params ( )
lr = migrate ( LogReader ( TEST_ROUTE ) , [ migrate_carParams ] )
CP = next ( m for m in lr if m . which ( ) == " carParams " ) . carParams
msg = get_random_live_parameters ( CP )
params . put ( " LiveParameters " , json . dumps ( msg . liveParameters . to_dict ( ) ) )
params . put ( " CarParamsPrevRoute " , CP . as_builder ( ) . to_bytes ( ) )
params . remove ( " LiveParametersV2 " )
migrate_cached_vehicle_params_if_needed ( params )
sr , sf , offset , _ = retrieve_initial_vehicle_params ( params , CP , replay = True , debug = True )
np . testing . assert_allclose ( sr , msg . liveParameters . steerRatio )
np . testing . assert_allclose ( sf , msg . liveParameters . stiffnessFactor )
np . testing . assert_allclose ( offset , msg . liveParameters . angleOffsetAverageDeg )
assert params . get ( " LiveParametersV2 " ) is not None
def test_read_saved_params_corrupted_old_format ( self ) :
params = Params ( )
params . put ( " LiveParameters " , b ' \x00 \x00 \x02 \x00 \x01 \x00 :F \xde \xed \xae ; ' )
params . remove ( " LiveParametersV2 " )
migrate_cached_vehicle_params_if_needed ( params )
assert params . get ( " LiveParameters " ) is None
assert params . get ( " LiveParametersV2 " ) is None