#!/usr/bin/env python3
import time
import unittest
import struct
from common . params import Params
import cereal . messaging as messaging
import system . sensord . pigeond as pd
from system . hardware import TICI
from selfdrive . test . helpers import with_processes
def read_events(service, duration_sec):
    """Collect every message published on *service* for about *duration_sec* seconds.

    A subscriber socket is opened with a 0.1 s timeout and drained in a loop,
    pausing 0.1 s between drains, until the elapsed monotonic time reaches
    *duration_sec*.

    Returns the list of collected messages.
    Raises AssertionError when nothing was received in that window.
    """
    sock = messaging.sub_sock(service, timeout=0.1)
    collected = []

    began = time.monotonic()
    while time.monotonic() - began < duration_sec:
        collected.extend(messaging.drain_sock(sock))
        time.sleep(0.1)

    assert collected, f"No '{service}' events collected!"
    return collected
def create_backup(pigeon):
    """Stop GNSS and ask the receiver to store its almanac in flash.

    Two raw commands are sent through ``pigeon.send``; afterwards the function
    waits for the SOS ack/nack. A nack (falsy result) fails with
    AssertionError, while a TimeoutError during the wait is tolerated and the
    function simply returns.
    """
    gnss_stop_cmd = b"\xB5\x62\x06\x04\x04\x00\x00\x00\x08\x00\x16\x74"
    store_almanac_cmd = b"\xB5\x62\x09\x14\x04\x00\x00\x00\x00\x00\x21\xEC"

    # controlled GNSS stop
    pigeon.send(gnss_stop_cmd)

    # store almanac in flash
    pigeon.send(store_almanac_cmd)

    try:
        acked = pigeon.wait_for_ack(ack=pd.UBLOX_SOS_ACK, nack=pd.UBLOX_SOS_NACK)
    except TimeoutError:
        # Best effort: no answer within the wait is not treated as a failure.
        return

    assert acked, "Could not store almanac"
def verify_ubloxgnss_data(socket: messaging.SubSocket, max_time: int):
    """Check time-to-first-measurement and satellite count on drained ubloxGnss messages.

    Args:
        socket: subscriber socket that has been receiving ``ubloxGnss``.
        max_time: upper bound for the time to first measurement; the timestamp
            difference is divided by 1e9 before the comparison (the failure
            message labels the bound as seconds).

    Raises:
        AssertionError: when no messages were queued, when no measurement
            report with ``numMeas != 0`` exists, when the first such report
            arrives ``max_time`` or later after the first measurement report,
            or when the integer mean of ``numMeas`` from that report onwards
            is below 5.
    """
    events = messaging.drain_sock(socket)
    assert len(events) != 0, "no ublxGnss measurements"

    # Keep only measurement reports, as (timestamp, satellite count) pairs in
    # arrival order. The union member is checked before it is accessed.
    reports = [
        (event.logMonoTime, event.ubloxGnss.measurementReport.numMeas)
        for event in events
        if event.ubloxGnss.which() == "measurementReport"
    ]

    # Explicit "not found" marker instead of using timestamp 0 as a sentinel
    # and re-locating the hit through a leaked loop variable.
    first_hit = next(
        (idx for idx, (_, num_meas) in enumerate(reports) if num_meas != 0),
        None,
    )
    assert first_hit is not None, "no ublox measurements received!"

    start_time = reports[0][0]
    end_time = reports[first_hit][0]
    ttfm = (end_time - start_time) / 1e9
    assert ttfm < max_time, f"Time to first measurement > {max_time}s, {ttfm}"

    # check for satellite count in measurements
    # (never empty: the slice always contains the report at first_hit)
    sat_count = [num_meas for _, num_meas in reports[first_hit:]]
    num_sat = int(sum(sat_count) / len(sat_count))
    assert num_sat >= 5, f"Not enough satellites {num_sat} (TestBox setup!)"
def verify_gps_location(socket: messaging.SubSocket, max_time: int):
    """Check time-to-first-location and final accuracy on drained gpsLocationExternal messages.

    The first message whose ``flags`` value is odd counts as the first valid
    location. Its timestamp minus the first message's timestamp, divided by
    1e9, must be below *max_time*. The last message must report
    ``accuracy < 20`` and ``verticalAccuracy < 45``.

    Raises AssertionError when no messages were queued or any check fails.
    """
    fixes = messaging.drain_sock(socket)
    assert len(fixes) != 0, "no gpsLocationExternal measurements"

    first_stamp = fixes[0].logMonoTime
    # 0 doubles as the "never valid" marker, as in the check below.
    valid_stamp = next(
        (msg.logMonoTime for msg in fixes if msg.gpsLocationExternal.flags % 2),
        0,
    )
    assert valid_stamp != 0, "GPS location never converged!"

    ttfl = (valid_stamp - first_stamp) / 1e9
    assert ttfl < max_time, f"Time to first location > {max_time}s, {ttfl}"

    latest = fixes[-1].gpsLocationExternal
    hacc = latest.accuracy
    vacc = latest.verticalAccuracy
    assert hacc < 20, f"Horizontal accuracy too high, {hacc}"
    assert vacc < 45, f"Vertical accuracy too high, {vacc}"
def verify_time_to_first_fix(pigeon):
    """Poll the receiver for its nav status message and check the time to first fix.

    The poll request is re-sent until ``pigeon.receive()`` returns a reply that
    starts with the nav status prefix and is long enough to decode. There is no
    upper bound on the number of polls, so this blocks until such a reply
    arrives.

    Args:
        pigeon: object providing ``send(bytes)`` and ``receive() -> bytes``.

    Raises:
        AssertionError: when the decoded time to first fix is 40 or more after
            dividing by 1000 (presumably milliseconds to seconds, per the
            failure message).
    """
    poll_request = b"\xb5\x62\x01\x03\x00\x00\x04\x0d"
    reply_prefix = b"\xb5\x62\x01\x03"
    reply_layout = struct.Struct("<HHHIBBBBIIH")  # 24 bytes in total

    # get time to first fix from nav status message
    while True:
        pigeon.send(poll_request)
        nav_status = pigeon.receive()
        # A truncated reply would make the unpack below raise struct.error,
        # so treat it like any other non-matching reply and poll again.
        if nav_status[:4] == reply_prefix and len(nav_status) >= reply_layout.size:
            break

    values = reply_layout.unpack_from(nav_status)
    ttff = values[8] / 1000
    # srms = values[9]/1000
    assert ttff < 40, f"Time to first fix > 40s, {ttff}"
class TestGPS(unittest.TestCase):
    """Hardware tests for the u-blox receiver driven through pigeond.

    The whole class is skipped unless running on TICI with the
    ``UbloxAvailable`` param set. The ``a``/``b``/``c`` infixes in the test
    names matter: the reset test stores an almanac backup that the almanac
    test restores afterwards.
    """

    @classmethod
    def setUpClass(cls):
        """Skip every test when the hardware or the u-blox module is unavailable."""
        # Short-circuit: Params is only consulted when running on TICI.
        if not TICI or not Params().get_bool("UbloxAvailable"):
            raise unittest.SkipTest

    def tearDown(self):
        """Switch the receiver power off after each test."""
        pd.set_power(False)

    @with_processes(['ubloxd'])
    def test_a_ublox_reset(self):
        """Reset the device, receive for 60 s, back up the almanac, then verify."""
        pigeon, pm = pd.create_pigeon()
        pd.init_baudrate(pigeon)
        assert pigeon.reset_device(), "Could not reset device!"
        pd.initialize_pigeon(pigeon)

        gnss_sock = messaging.sub_sock("ubloxGnss", timeout=0.1)
        location_sock = messaging.sub_sock("gpsLocationExternal", timeout=0.1)

        # receive some messages (restart after cold start takes up to 30 seconds)
        pd.run_receiving(pigeon, pm, 60)

        # store almanac for next test
        create_backup(pigeon)

        verify_ubloxgnss_data(gnss_sock, 60)
        verify_gps_location(location_sock, 60)

        # skip for now, this might hang for a while
        #verify_time_to_first_fix(pigeon)

    @with_processes(['ubloxd'])
    def test_b_ublox_almanac(self):
        """Cold start, restore the almanac backup, then verify within shorter limits."""
        pigeon, pm = pd.create_pigeon()
        pd.init_baudrate(pigeon)

        # device cold start
        pigeon.send(b"\xb5\x62\x06\x04\x04\x00\xff\xff\x00\x00\x0c\x5d")
        time.sleep(1)  # wait for cold start
        pd.init_baudrate(pigeon)

        # clear configuration
        pigeon.send_with_ack(
            b"\xb5\x62\x06\x09\x0d\x00\x00\x00\x1f\x1f\x00\x00\x00\x00\x00\x00\x00\x00\x17\x71\x5b"
        )

        # restoring almanac backup
        pigeon.send(b"\xB5\x62\x09\x14\x00\x00\x1D\x60")
        restore_status = pigeon.wait_for_backup_restore_status()
        assert restore_status == 2, "Could not restore almanac backup"

        pd.initialize_pigeon(pigeon)

        gnss_sock = messaging.sub_sock("ubloxGnss", timeout=0.1)
        location_sock = messaging.sub_sock("gpsLocationExternal", timeout=0.1)

        pd.run_receiving(pigeon, pm, 15)
        verify_ubloxgnss_data(gnss_sock, 15)
        verify_gps_location(location_sock, 20)

    @with_processes(['ubloxd'])
    def test_c_ublox_startup(self):
        """Plain startup without reset: receive for 10 s and verify within 10 s limits."""
        pigeon, pm = pd.create_pigeon()
        pd.init_baudrate(pigeon)
        pd.initialize_pigeon(pigeon)

        gnss_sock = messaging.sub_sock("ubloxGnss", timeout=0.1)
        location_sock = messaging.sub_sock("gpsLocationExternal", timeout=0.1)

        pd.run_receiving(pigeon, pm, 10)
        verify_ubloxgnss_data(gnss_sock, 10)
        verify_gps_location(location_sock, 10)
# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()