openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

93 lines
2.9 KiB

#!/usr/bin/env python3
import json
import random
import unittest
import time
import capnp
import cereal.messaging as messaging
from cereal.services import service_list
from common.params import Params
from selfdrive.manager.process_config import managed_processes
class TestLocationdProc(unittest.TestCase):
MAX_WAITS = 1000
Sensor events splitup (#25714) * PoC of reading sensors via interrupts instead of polling * add Gyro and draft for magn * add more functionality to gpio.cc * change LSM gyro to interrupt * resolve rebase conflict * update BMX accel interrupt impl * add interrupt collector thread to fetch in parallel * change get_event interface to return true on successful read * update BMX gyro interrupt impl * update gpio.h/.cc according to comments * address comments, rename Edgetype enum * Edgetype to EdgeType * update sensor interrupt interface * add error handling, and read fd on trigger * avoid sending empty messages * fix build * use gpiochip * less diff * gpiochip on both edges, but skip falling edge if rising edge is detected * init last_ts with 0 * update sensord testcases * update sensord testsweet * test for pipeline * readd with_process * add null check * move tests update to seperate PR * sensord: improve test coverage (#25683) * update sensord-interrupt testsweet * address review comments * inc stddev threshold * fix format string * add version 0 check again * relax strictness after c3 with bmx tests * relax strictness after tests Co-authored-by: Kurt Nistelberger <kurt.nistelberger@gmail.com> * address PR comments * fix typo * remove 4ms limit, and skip first 0.5sec of data * revert disable_interuppt change to destructor * fix and remove timing skip * make gpiochip generic * sensord port * change from sensorEvents to separated events * fix gyro usage * add splitted sensor tests * modify debug script sensor_data_to_hist.py * refactor get_event interface to remove sensorEvent message type * update locationd to non sensorEvent usage * tmp commit * fix replay * fix accelerometer type * fix sensor to hist debug script * update sensord tests to split events * remove rebase artifacts * port test_sensord.py * small clean up * change cereal to sensorEvents-splitup branch * upate sensorEvents in regen * fix route generation for splitted sensor events * regen cleanUp from sensorEvents change * . * remove light and temp from locationd * add generic init delay per sensor * . * update routes * move bmx gyro/accel to its own channel * adopt sensor tests to bmx channel * remove rebase artifacts * fix sensord test * handle bmx not present * add bmx sockets to regen * . * . * code cleanUp * . * address PR comments * address PR comments * address PR comments * lsm clean up * readd sensorEvents * rever regen.py * . * update replay refs * move channels * fix artifact * bump cereal * update refs * fix timing issue Co-authored-by: Bruce Wayne <batman@workstation-eu-intern2.eu.local> Co-authored-by: gast04 <kurt.nistelberger@gmail.com> Co-authored-by: Willem Melching <willem.melching@gmail.com> Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com> old-commit-hash: 29d3ed2ce63a65f793dc0ecb553180a0d0fba03e
3 years ago
LLD_MSGS = ['gpsLocationExternal', 'cameraOdometry', 'carState', 'liveCalibration',
'accelerometer', 'gyroscope', 'magnetometer']
def setUp(self):
random.seed(123489234)
self.pm = messaging.PubMaster(self.LLD_MSGS)
Params().put_bool("UbloxAvailable", True)
managed_processes['locationd'].prepare()
managed_processes['locationd'].start()
time.sleep(1)
def tearDown(self):
managed_processes['locationd'].stop()
def send_msg(self, msg):
self.pm.send(msg.which(), msg)
waits_left = self.MAX_WAITS
while waits_left and not self.pm.all_readers_updated(msg.which()):
time.sleep(0)
waits_left -= 1
time.sleep(0.0001)
def get_fake_msg(self, name, t):
try:
msg = messaging.new_message(name)
except capnp.lib.capnp.KjException:
msg = messaging.new_message(name, 0)
if name == "gpsLocationExternal":
msg.gpsLocationExternal.flags = 1
msg.gpsLocationExternal.verticalAccuracy = 1.0
msg.gpsLocationExternal.speedAccuracy = 1.0
msg.gpsLocationExternal.bearingAccuracyDeg = 1.0
msg.gpsLocationExternal.vNED = [0.0, 0.0, 0.0]
msg.gpsLocationExternal.latitude = self.lat
msg.gpsLocationExternal.longitude = self.lon
msg.gpsLocationExternal.altitude = self.alt
elif name == 'cameraOdometry':
msg.cameraOdometry.rot = [0.0, 0.0, 0.0]
msg.cameraOdometry.rotStd = [0.0, 0.0, 0.0]
msg.cameraOdometry.trans = [0.0, 0.0, 0.0]
msg.cameraOdometry.transStd = [0.0, 0.0, 0.0]
msg.logMonoTime = t
return msg
def test_params_gps(self):
# first reset params
Params().remove('LastGPSPosition')
self.lat = 30 + (random.random() * 10.0)
self.lon = -70 + (random.random() * 10.0)
self.alt = 5 + (random.random() * 10.0)
self.fake_duration = 90 # secs
# get fake messages at the correct frequency, listed in services.py
fake_msgs = []
for sec in range(self.fake_duration):
for name in self.LLD_MSGS:
for j in range(int(service_list[name].frequency)):
fake_msgs.append(self.get_fake_msg(name, int((sec + j / service_list[name].frequency) * 1e9)))
for fake_msg in sorted(fake_msgs, key=lambda x: x.logMonoTime):
self.send_msg(fake_msg)
time.sleep(1) # wait for async params write
lastGPS = json.loads(Params().get('LastGPSPosition'))
self.assertAlmostEqual(lastGPS['latitude'], self.lat, places=3)
self.assertAlmostEqual(lastGPS['longitude'], self.lon, places=3)
self.assertAlmostEqual(lastGPS['altitude'], self.alt, places=3)
if __name__ == "__main__":
unittest.main()