#!/usr/bin/env python3
import time
import unittest
from parameterized import parameterized
from cereal import log , car
import cereal . messaging as messaging
from common . params import Params
from selfdrive . boardd . boardd_api_impl import can_list_to_can_capnp # pylint: disable=no-name-in-module,import-error
from selfdrive . car . fingerprints import _FINGERPRINTS
from selfdrive . car . hyundai . values import CAR as HYUNDAI
from selfdrive . car . toyota . values import CAR as TOYOTA
from selfdrive . car . mazda . values import CAR as MAZDA
from selfdrive . controls . lib . events import EVENT_NAME
from selfdrive . test . helpers import with_processes
# Short aliases for the capnp enums referenced throughout this module.
EventName = car.CarEvent.EventName  # alert/event identifiers used as expected results
Ecu = car.CarParams.Ecu  # ECU identifiers used in the firmware fixtures
# Firmware fixture for a Toyota Corolla.
# Each entry is (ecu, address, subaddress or None, firmware version bytes),
# matching the tuple unpacking done when the CarParams cache is built.
COROLLA_FW_VERSIONS = [
    (Ecu.engine, 0x7e0, None,
     b'\x0230ZC2000\x00\x00\x00\x00\x00\x00\x00\x0050212000\x00\x00\x00\x00\x00\x00\x00\x00'),
    (Ecu.esp, 0x7b0, None, b'F152602190\x00\x00\x00\x00\x00\x00'),
    (Ecu.eps, 0x7a1, None, b'8965B02181\x00\x00\x00\x00\x00\x00'),
    (Ecu.fwdRadar, 0x750, 0xf, b'8821F4702100\x00\x00\x00\x00'),
    (Ecu.fwdCamera, 0x750, 0x6d, b'8646F0201101\x00\x00\x00\x00'),
    (Ecu.dsu, 0x791, None, b'881510201100\x00\x00\x00\x00'),
]

# Same ECUs with the trailing DSU entry removed (used by the "DSU unplugged" cases).
COROLLA_FW_VERSIONS_NO_DSU = COROLLA_FW_VERSIONS[:-1]

# Same ECUs, but the DSU reports a placeholder version instead of the real one
# (used by the "fuzzy match" cases).
COROLLA_FW_VERSIONS_FUZZY = COROLLA_FW_VERSIONS_NO_DSU + [
    (Ecu.dsu, 0x791, None, b'xxxxxx'),
]
class TestStartup(unittest.TestCase):
    """Checks the startup alert reported on ``controlsState`` for various car setups."""

    @staticmethod
    def _cache_car_params(params, fw_versions):
        """Store a CarParams message built from ``fw_versions`` under "CarParamsCache".

        Args:
            params: ``Params`` instance to write to.
            fw_versions: iterable of ``(ecu, address, subaddress, version)``
                tuples; ``subaddress`` may be ``None`` and is then left unset.
        """
        # Build the capnp version of the FW array
        car_fw = []
        for ecu, addr, subaddress, version in fw_versions:
            fw = car.CarParams.CarFw.new_message()
            fw.ecu = ecu
            fw.address = addr
            fw.fwVersion = version
            if subaddress is not None:
                fw.subAddress = subaddress
            car_fw.append(fw)

        cp = car.CarParams.new_message()
        cp.carVin = "1" * 17  # dummy 17-character VIN
        cp.carFw = car_fw
        params.put("CarParamsCache", cp.to_bytes())

    @parameterized.expand([
        # TODO: test EventName.startup for release branches

        # officially supported car
        (EventName.startupMaster, HYUNDAI.SONATA, False, None),
        (EventName.startupMaster, HYUNDAI.SONATA, True, None),

        # officially supported car, FW query
        (EventName.startupMaster, TOYOTA.COROLLA, False, COROLLA_FW_VERSIONS),

        # DSU unplugged
        (EventName.startupMaster, TOYOTA.COROLLA, True, COROLLA_FW_VERSIONS_NO_DSU),
        (EventName.communityFeatureDisallowed, TOYOTA.COROLLA, False, COROLLA_FW_VERSIONS_NO_DSU),

        # community supported car
        (EventName.startupMaster, HYUNDAI.KIA_STINGER, True, None),
        (EventName.communityFeatureDisallowed, HYUNDAI.KIA_STINGER, False, None),

        # dashcamOnly car
        (EventName.startupNoControl, MAZDA.CX5, True, None),
        (EventName.startupNoControl, MAZDA.CX5, False, None),

        # unrecognized car
        (EventName.startupNoCar, None, True, None),
        (EventName.startupNoCar, None, False, None),

        # fuzzy match
        (EventName.startupFuzzyFingerprint, TOYOTA.COROLLA, True, COROLLA_FW_VERSIONS_FUZZY),
        (EventName.communityFeatureDisallowed, TOYOTA.COROLLA, False, COROLLA_FW_VERSIONS_FUZZY),
    ])
    @with_processes(['controlsd'])
    def test_startup_alert(self, expected_event, car_model, toggle_enabled, fw_versions):
        """Assert that the first ``controlsState`` alert matches ``expected_event``.

        Args:
            expected_event: ``EventName`` whose ``EVENT_NAME`` entry must equal
                the prefix (text before the first "/") of the reported alert type.
            car_model: car identifier used to look up a CAN fingerprint, or
                ``None`` for an unrecognized car.
            toggle_enabled: value written to the "CommunityFeaturesToggle" param.
            fw_versions: optional list of ``(ecu, address, subaddress, version)``
                tuples to cache as CarParams; ``None`` skips the cache.
        """
        # TODO: this should be done without any real sockets
        controls_sock = messaging.sub_sock("controlsState")
        pm = messaging.PubMaster(['can', 'pandaState'])

        # Start from a clean param store so earlier cases cannot leak state.
        params = Params()
        params.clear_all()
        params.put_bool("Passive", False)
        params.put_bool("OpenpilotEnabledToggle", True)
        params.put_bool("CommunityFeaturesToggle", toggle_enabled)

        if fw_versions is not None:
            self._cache_car_params(params, fw_versions)

        time.sleep(2)  # wait for controlsd to be ready

        panda_msg = messaging.new_message('pandaState')
        panda_msg.pandaState.pandaType = log.PandaState.PandaType.uno
        pm.send('pandaState', panda_msg)

        # fingerprint: unknown cars and FW-based cases use a generic set of
        # addresses (1..99, one byte each); otherwise use the model's first
        # recorded fingerprint, which maps address -> message length.
        if car_model is None or fw_versions is not None:
            finger = {addr: 1 for addr in range(1, 100)}
        else:
            finger = _FINGERPRINTS[car_model][0]

        # The frame list never changes, so build it once. Serialization stays
        # inside the loop in case it stamps each message.
        can_frames = [[addr, 0, b'\x00' * length, 0] for addr, length in finger.items()]

        for _ in range(500):
            pm.send('can', can_list_to_can_capnp(can_frames))

            time.sleep(0.01)
            received = messaging.drain_sock(controls_sock)
            if received:
                event_name = received[0].controlsState.alertType.split("/")[0]
                self.assertEqual(
                    EVENT_NAME[expected_event], event_name,
                    f"expected {EVENT_NAME[expected_event]} for '{car_model}', got {event_name}")
                break
        else:
            # No controlsState message arrived in any of the 500 polls.
            self.fail(f"failed to fingerprint {car_model}")
# Run the test suite when this file is executed directly.
if __name__ == "__main__":
    unittest.main()