dragonpilot - 基於 openpilot 的開源駕駛輔助系統
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

140 lines
3.8 KiB

#!/usr/bin/env python3
import time
import unittest
import struct
import numpy as np
import cereal.messaging as messaging
import selfdrive.sensord.pigeond as pd
from system.hardware import TICI
from selfdrive.test.helpers import with_processes
def read_events(service, duration_sec):
service_sock = messaging.sub_sock(service, timeout=0.1)
start_time_sec = time.monotonic()
events = []
while time.monotonic() - start_time_sec < duration_sec:
events += messaging.drain_sock(service_sock)
time.sleep(0.1)
assert len(events) != 0, f"No '{service}'events collected!"
return events
def verify_ubloxgnss_data(socket: messaging.SubSocket):
start_time = 0
end_time = 0
events = messaging.drain_sock(socket)
assert len(events) != 0, "no ublxGnss measurements"
for event in events:
if event.ubloxGnss.which() != "measurementReport":
continue
if start_time == 0:
start_time = event.logMonoTime
if event.ubloxGnss.measurementReport.numMeas != 0:
end_time = event.logMonoTime
break
assert end_time != 0, "no ublox measurements received!"
ttfm = (end_time - start_time)/1e9
assert ttfm < 35, f"Time to first measurement > 35s, {ttfm}"
# check for satellite count in measurements
sat_count = []
end_id = events.index(event)# pylint:disable=undefined-loop-variable
for event in events[end_id:]:
if event.ubloxGnss.which() == "measurementReport":
sat_count.append(event.ubloxGnss.measurementReport.numMeas)
num_sat = int(sum(sat_count)/len(sat_count))
assert num_sat > 8, f"Not enough satellites {num_sat} (TestBox setup!)"
def verify_gps_location(socket: messaging.SubSocket):
buf_lon = [0]*10
buf_lat = [0]*10
buf_i = 0
events = messaging.drain_sock(socket)
assert len(events) != 0, "no gpsLocationExternal measurements"
start_time = events[0].logMonoTime
end_time = 0
for event in events:
buf_lon[buf_i % 10] = event.gpsLocationExternal.longitude
buf_lat[buf_i % 10] = event.gpsLocationExternal.latitude
buf_i += 1
if buf_i < 9:
continue
if any([lat == 0 or lon == 0 for lat,lon in zip(buf_lat, buf_lon)]):
continue
if np.std(buf_lon) < 1e-5 and np.std(buf_lat) < 1e-5:
end_time = event.logMonoTime
break
assert end_time != 0, "GPS location never converged!"
ttfl = (end_time - start_time)/1e9
assert ttfl < 40, f"Time to first location > 40s, {ttfl}"
hacc = events[-1].gpsLocationExternal.accuracy
vacc = events[-1].gpsLocationExternal.verticalAccuracy
assert hacc < 15, f"Horizontal accuracy too high, {hacc}"
assert vacc < 43, f"Vertical accuracy too high, {vacc}"
def verify_time_to_first_fix(pigeon):
# get time to first fix from nav status message
nav_status = b""
while True:
pigeon.send(b"\xb5\x62\x01\x03\x00\x00\x04\x0d")
nav_status = pigeon.receive()
if nav_status[:4] == b"\xb5\x62\x01\x03":
break
values = struct.unpack("<HHHIBBBBIIH", nav_status[:24])
ttff = values[8]/1000
# srms = values[9]/1000
assert ttff < 40, f"Time to first fix > 40s, {ttff}"
class TestGPS(unittest.TestCase):
@classmethod
def setUpClass(cls):
if not TICI:
raise unittest.SkipTest
def tearDown(self):
pd.set_power(False)
@with_processes(['ubloxd'])
def test_ublox_reset(self):
pigeon, pm = pd.create_pigeon()
pd.init_baudrate(pigeon)
assert pigeon.reset_device(), "Could not reset device!"
pd.initialize_pigeon(pigeon)
ugs = messaging.sub_sock("ubloxGnss", timeout=0.1)
gle = messaging.sub_sock("gpsLocationExternal", timeout=0.1)
# receive some messages (restart after cold start takes up to 30seconds)
pd.run_receiving(pigeon, pm, 40)
verify_ubloxgnss_data(ugs)
verify_gps_location(gle)
# skip for now, this might hang for a while
#verify_time_to_first_fix(pigeon)
if __name__ == "__main__":
unittest.main()