* rm laika

* Rm laika

* Needed this

* More rm

* More rm
old-commit-hash: 56dea7b0b4
testing-closet
Harald Schäfer 2 years ago committed by GitHub
parent 3d602a86dc
commit 7c4d29c3d1
  1. 1
      release/files_common
  2. 468
      selfdrive/locationd/laikad.py
  3. 319
      selfdrive/locationd/test/test_laikad.py
  4. 1
      selfdrive/manager/process_config.py
  5. 4
      selfdrive/test/process_replay/README.md
  6. 21
      selfdrive/test/process_replay/process_replay.py
  7. 2
      selfdrive/test/process_replay/test_fuzzy.py
  8. 2
      selfdrive/test/profiling/profiler.py
  9. 1
      selfdrive/test/test_onroad.py
  10. 2
      selfdrive/test/update_ci_routes.py
  11. 19
      system/sensord/rawgps/test_rawgps.py
  12. 10
      tools/gpstest/fuzzy_testing.py
  13. 185
      tools/gpstest/rpc_server.py
  14. 105
      tools/gpstest/test_laikad.py

@ -226,7 +226,6 @@ system/ubloxd/*.cc
selfdrive/locationd/__init__.py
selfdrive/locationd/SConscript
selfdrive/locationd/.gitignore
selfdrive/locationd/laikad.py
selfdrive/locationd/locationd.h
selfdrive/locationd/locationd.cc
selfdrive/locationd/paramsd.py

@ -1,468 +0,0 @@
#!/usr/bin/env python3
import math
import os
import time
import shutil
from collections import defaultdict
from concurrent.futures import Future, ProcessPoolExecutor
from enum import IntEnum
from typing import List, Optional, Dict, Any
import numpy as np
from cereal import log, messaging
from openpilot.common.params import Params, put_nonblocking
from laika import AstroDog
from laika.constants import SECS_IN_HR, SECS_IN_MIN
from laika.downloader import DownloadFailed
from laika.ephemeris import EphemerisType, GPSEphemeris, GLONASSEphemeris, ephemeris_structs, parse_qcom_ephem
from laika.gps_time import GPSTime
from laika.helpers import ConstellationId, get_sv_id
from laika.raw_gnss import GNSSMeasurement, correct_measurements, process_measurements, read_raw_ublox
from laika.raw_gnss import gps_time_from_qcom_report, get_measurements_from_qcom_reports
from laika.opt import calc_pos_fix, get_posfix_sympy_fun, calc_vel_fix, get_velfix_sympy_func
from openpilot.selfdrive.locationd.models.constants import GENERATED_DIR, ObservationKind
from openpilot.selfdrive.locationd.models.gnss_kf import GNSSKalman
from openpilot.selfdrive.locationd.models.gnss_kf import States as GStates
from openpilot.system.hardware.hw import Paths
from openpilot.system.swaglog import cloudlog
MAX_TIME_GAP = 10
EPHEMERIS_CACHE = 'LaikadEphemerisV3'
CACHE_VERSION = 0.2
POS_FIX_RESIDUAL_THRESHOLD = 100.0
class LogEphemerisType(IntEnum):
nav = 0
nasaUltraRapid = 1
glonassIacUltraRapid = 2
qcom = 3
class EphemerisSource(IntEnum):
gnssChip = 0
internet = 1
cache = 2
unknown = 3
def get_log_eph_type(ephem):
if ephem.eph_type == EphemerisType.NAV:
source_type = LogEphemerisType.nav
elif ephem.eph_type == EphemerisType.QCOM_POLY:
source_type = LogEphemerisType.qcom
else:
assert ephem.file_epoch is not None
file_src = ephem.file_source
if file_src == 'igu': # example nasa: '2214/igu22144_00.sp3.Z'
source_type = LogEphemerisType.nasaUltraRapid
elif file_src == 'Sta': # example nasa: '22166/ultra/Stark_1D_22061518.sp3'
source_type = LogEphemerisType.glonassIacUltraRapid
else:
raise Exception(f"Didn't expect file source {file_src}")
return source_type
def get_log_eph_source(ephem):
if ephem.file_name == 'qcom' or ephem.file_name == 'ublox':
source = EphemerisSource.gnssChip
elif ephem.file_name == EPHEMERIS_CACHE:
source = EphemerisSource.cache
else:
source = EphemerisSource.internet
return source
class Laikad:
def __init__(self, valid_const=(ConstellationId.GPS, ConstellationId.GLONASS), auto_fetch_navs=True, auto_update=False,
valid_ephem_types=(EphemerisType.NAV, EphemerisType.QCOM_POLY),
save_ephemeris=False, use_qcom=False):
"""
valid_const: GNSS constellation which can be used
auto_fetch_navs: If true fetch navs from internet when needed
auto_update: If true download AstroDog will download all files needed. This can be ephemeris or correction data like ionosphere.
valid_ephem_types: Valid ephemeris types to be used by AstroDog
save_ephemeris: If true saves and loads nav and orbit ephemeris to cache.
"""
self.astro_dog = AstroDog(valid_const=valid_const, auto_update=auto_update, valid_ephem_types=valid_ephem_types,
clear_old_ephemeris=True, cache_dir=Paths.download_cache_root())
self.gnss_kf = GNSSKalman(GENERATED_DIR, cython=True, erratic_clock=use_qcom)
self.auto_fetch_navs = auto_fetch_navs
self.orbit_fetch_executor: Optional[ProcessPoolExecutor] = None
self.orbit_fetch_future: Optional[Future] = None
self.last_report_time = GPSTime(0, 0)
self.last_fetch_navs_t = GPSTime(0, 0)
self.last_cached_t = GPSTime(0, 0)
self.save_ephemeris = save_ephemeris
self.load_cache()
self.posfix_functions = {constellation: get_posfix_sympy_fun(constellation) for constellation in (ConstellationId.GPS, ConstellationId.GLONASS)}
self.velfix_function = get_velfix_sympy_func()
self.last_fix_pos = None
self.last_fix_t = None
self.use_qcom = use_qcom
self.first_log_time = None
self.ttff = -1
self.measurement_lag = 0.630 if self.use_qcom else 0.095
# qcom specific stuff
self.qcom_reports_received = 4
self.qcom_reports = []
def load_cache(self):
if not self.save_ephemeris:
return
cache_bytes = Params().get(EPHEMERIS_CACHE)
if not cache_bytes:
return
nav_dict = {}
try:
with ephemeris_structs.EphemerisCache.from_bytes(cache_bytes) as ephem_cache:
glonass_navs = [GLONASSEphemeris(data_struct, file_name=EPHEMERIS_CACHE) for data_struct in ephem_cache.glonassEphemerides]
gps_navs = [GPSEphemeris(data_struct, file_name=EPHEMERIS_CACHE) for data_struct in ephem_cache.gpsEphemerides]
for e in sum([glonass_navs, gps_navs], []):
if e.prn not in nav_dict:
nav_dict[e.prn] = []
nav_dict[e.prn].append(e)
self.astro_dog.add_navs(nav_dict)
except Exception:
cloudlog.exception("Error parsing cache")
cloudlog.debug(
f"Loaded navs ({sum([len(nav_dict[prn]) for prn in nav_dict.keys()])}). Unique orbit and nav sats: {list(nav_dict.keys())} ")
def cache_ephemeris(self):
if self.save_ephemeris and (self.last_report_time - self.last_cached_t > SECS_IN_MIN):
nav_list: List = sum([v for k,v in self.astro_dog.navs.items()], [])
#TODO this only saves currently valid ephems, when we download future ephems we should save them too
valid_navs = [e for e in nav_list if e.valid(self.last_report_time)]
if len(valid_navs) > 0:
ephem_cache = ephemeris_structs.EphemerisCache(glonassEphemerides=[e.data for e in valid_navs if e.prn[0]=='R'],
gpsEphemerides=[e.data for e in valid_navs if e.prn[0]=='G'])
put_nonblocking(EPHEMERIS_CACHE, ephem_cache.to_bytes())
cloudlog.debug("Cache saved")
self.last_cached_t = self.last_report_time
def create_ephem_statuses(self):
ephemeris_statuses = []
eph_list: List = sum([v for k,v in self.astro_dog.navs.items()], []) + sum([v for k,v in self.astro_dog.qcom_polys.items()], [])
for eph in eph_list:
status = log.GnssMeasurements.EphemerisStatus.new_message()
status.constellationId = ConstellationId.from_rinex_char(eph.prn[0]).value
status.svId = get_sv_id(eph.prn)
status.type = get_log_eph_type(eph).value
status.source = get_log_eph_source(eph).value
status.tow = eph.epoch.tow
status.gpsWeek = eph.epoch.week
ephemeris_statuses.append(status)
return ephemeris_statuses
def get_lsq_fix(self, t, measurements):
if self.last_fix_t is None or abs(self.last_fix_t - t) > 0:
min_measurements = 5 if any(p.constellation_id == ConstellationId.GLONASS for p in measurements) else 4
position_solution, pr_residuals, pos_std = calc_pos_fix(measurements, self.posfix_functions, min_measurements=min_measurements)
if len(position_solution) < 3:
return None
position_estimate = position_solution[:3]
position_std = pos_std[:3]
velocity_solution, prr_residuals, vel_std = calc_vel_fix(measurements, position_estimate, self.velfix_function, min_measurements=min_measurements)
if len(velocity_solution) < 3:
return None
velocity_estimate = velocity_solution[:3]
velocity_std = vel_std[:3]
return position_estimate, position_std, velocity_estimate, velocity_std
def is_good_report(self, gnss_msg):
if gnss_msg.which() in ['drMeasurementReport', 'measurementReport'] and self.use_qcom:
# TODO: Understand and use remaining unknown constellations
try:
if gnss_msg.which() == 'drMeasurementReport':
constellation_id = ConstellationId.from_qcom_source(gnss_msg.drMeasurementReport.source)
else:
constellation_id = ConstellationId.from_qcom_source(gnss_msg.measurementReport.source)
good_constellation = constellation_id in [ConstellationId.GPS, ConstellationId.SBAS, ConstellationId.GLONASS]
report_time = gps_time_from_qcom_report(gnss_msg)
except NotImplementedError:
return False
# Garbage timestamps with week > 32767 are sometimes sent by module.
# This is an issue with gpsTime and GLONASS time.
good_week = report_time.week < np.iinfo(np.int16).max
return good_constellation and good_week
elif gnss_msg.which() == 'measurementReport' and not self.use_qcom:
return True
else:
return False
def read_report(self, gnss_msg):
if self.use_qcom:
# QCOM reports are per constellation, so we need to aggregate them
# Additionally, the pseudoranges are broken in the measurementReports
# and the doppler filteredSpeed is broken in the drMeasurementReports
report_time = gps_time_from_qcom_report(gnss_msg)
if report_time - self.last_report_time == 0:
self.qcom_reports.append(gnss_msg)
self.last_report_time = report_time
elif report_time - self.last_report_time > 0:
self.qcom_reports_received = max(1, len(self.qcom_reports))
self.qcom_reports = [gnss_msg]
self.last_report_time = report_time
else:
# Sometimes DR reports get sent one iteration late (1second), they need to be ignored
cloudlog.warning(f"Received report with time {report_time} before last report time {self.last_report_time}")
if len(self.qcom_reports) == self.qcom_reports_received:
new_meas = get_measurements_from_qcom_reports(self.qcom_reports)
else:
new_meas = []
else:
report = gnss_msg.measurementReport
self.last_report_time = GPSTime(report.gpsWeek, report.rcvTow)
new_meas = read_raw_ublox(report)
return self.last_report_time, new_meas
def is_ephemeris(self, gnss_msg):
if self.use_qcom:
return gnss_msg.which() == 'drSvPoly'
else:
return gnss_msg.which() in ('ephemeris', 'glonassEphemeris')
def read_ephemeris(self, gnss_msg):
if self.use_qcom:
try:
ephem = parse_qcom_ephem(gnss_msg.drSvPoly)
self.astro_dog.add_qcom_polys({ephem.prn: [ephem]})
except Exception:
cloudlog.exception("Error parsing qcom svPoly ephemeris from qcom module")
return
else:
if gnss_msg.which() == 'ephemeris':
data_struct = ephemeris_structs.Ephemeris.new_message(**gnss_msg.ephemeris.to_dict())
try:
ephem = GPSEphemeris(data_struct, file_name='ublox')
except Exception:
cloudlog.exception("Error parsing GPS ephemeris from ublox")
return
elif gnss_msg.which() == 'glonassEphemeris':
data_struct = ephemeris_structs.GlonassEphemeris.new_message(**gnss_msg.glonassEphemeris.to_dict())
try:
ephem = GLONASSEphemeris(data_struct, file_name='ublox')
except Exception:
cloudlog.exception("Error parsing GLONASS ephemeris from ublox")
return
else:
cloudlog.error(f"Unsupported ephemeris type: {gnss_msg.which()}")
return
self.astro_dog.add_navs({ephem.prn: [ephem]})
self.cache_ephemeris()
def process_report(self, new_meas, t):
# Filter measurements with unexpected pseudoranges for GPS and GLONASS satellites
new_meas = [m for m in new_meas if 1e7 < m.observables['C1C'] < 3e7]
processed_measurements = process_measurements(new_meas, self.astro_dog)
if self.last_fix_pos is not None:
est_pos = self.last_fix_pos
correct_delay = True
else:
est_pos = self.gnss_kf.x[GStates.ECEF_POS].tolist()
correct_delay = False
corrected_measurements = correct_measurements(processed_measurements, est_pos, self.astro_dog, correct_delay=correct_delay)
# If many measurements weren't corrected, position may be garbage, so reset
if len(processed_measurements) >= 8 and len(corrected_measurements) < 5:
cloudlog.error("Didn't correct enough measurements, resetting estimate position")
self.last_fix_pos = None
self.last_fix_t = None
return corrected_measurements
def calc_fix(self, t, measurements):
instant_fix = self.get_lsq_fix(t, measurements)
if instant_fix is None:
return None
else:
position_estimate, position_std, velocity_estimate, velocity_std = instant_fix
self.last_fix_t = t
self.last_fix_pos = position_estimate
self.lat_fix_pos_std = position_std
return position_estimate, position_std, velocity_estimate, velocity_std
def process_gnss_msg(self, gnss_msg, gnss_mono_time: int, block=False):
out_msg = messaging.new_message("gnssMeasurements")
t = gnss_mono_time * 1e-9
msg_dict: Dict[str, Any] = {"measTime": gnss_mono_time - int(1e9 * self.measurement_lag)}
if self.first_log_time is None:
self.first_log_time = 1e-9 * gnss_mono_time
if self.is_ephemeris(gnss_msg):
self.read_ephemeris(gnss_msg)
elif self.is_good_report(gnss_msg):
report_t, new_meas = self.read_report(gnss_msg)
if report_t.week > 0:
if self.auto_fetch_navs:
self.fetch_navs(report_t, block)
corrected_measurements = self.process_report(new_meas, t)
msg_dict['correctedMeasurements'] = [create_measurement_msg(m) for m in corrected_measurements]
fix = self.calc_fix(t, corrected_measurements)
measurement_msg = log.LiveLocationKalman.Measurement.new_message
if fix is not None:
position_estimate, position_std, velocity_estimate, velocity_std = fix
if self.ttff <= 0:
self.ttff = max(1e-3, t - self.first_log_time)
msg_dict["positionECEF"] = measurement_msg(value=position_estimate, std=position_std.tolist(), valid=bool(self.last_fix_t == t))
msg_dict["velocityECEF"] = measurement_msg(value=velocity_estimate, std=velocity_std.tolist(), valid=bool(self.last_fix_t == t))
self.update_localizer(self.last_fix_pos, t, corrected_measurements)
P_diag = self.gnss_kf.P.diagonal()
kf_valid = all(self.kf_valid(t))
msg_dict["kalmanPositionECEF"] = measurement_msg(value=self.gnss_kf.x[GStates.ECEF_POS].tolist(),
std=np.sqrt(P_diag[GStates.ECEF_POS]).tolist(),
valid=kf_valid)
msg_dict["kalmanVelocityECEF"] = measurement_msg(value=self.gnss_kf.x[GStates.ECEF_VELOCITY].tolist(),
std=np.sqrt(P_diag[GStates.ECEF_VELOCITY]).tolist(),
valid=kf_valid)
msg_dict['gpsWeek'] = self.last_report_time.week
msg_dict['gpsTimeOfWeek'] = self.last_report_time.tow
msg_dict['timeToFirstFix'] = self.ttff
msg_dict['ephemerisStatuses'] = self.create_ephem_statuses()
out_msg.gnssMeasurements = msg_dict
return out_msg
def update_localizer(self, est_pos, t: float, measurements: List[GNSSMeasurement]):
# Check time and outputs are valid
valid = self.kf_valid(t)
if not all(valid):
if not valid[0]: # Filter not initialized
pass
elif not valid[1]:
cloudlog.error("Time gap of over 10s detected, gnss kalman reset")
elif not valid[2]:
cloudlog.error("Gnss kalman filter state is nan")
if est_pos is not None and len(est_pos) > 0:
cloudlog.info(f"Reset kalman filter with {est_pos}")
self.init_gnss_localizer(est_pos)
else:
return
if len(measurements) > 0:
kf_add_observations(self.gnss_kf, t, measurements)
else:
# Ensure gnss filter is updated even with no new measurements
self.gnss_kf.predict(t)
def kf_valid(self, t: float) -> List[bool]:
filter_time = self.gnss_kf.filter.get_filter_time()
return [not math.isnan(filter_time),
abs(t - filter_time) < MAX_TIME_GAP,
all(np.isfinite(self.gnss_kf.x[GStates.ECEF_POS]))]
def init_gnss_localizer(self, est_pos):
x_initial, p_initial_diag = np.copy(GNSSKalman.x_initial), np.copy(np.diagonal(GNSSKalman.P_initial))
x_initial[GStates.ECEF_POS] = est_pos
p_initial_diag[GStates.ECEF_POS] = 1000 ** 2
self.gnss_kf.init_state(x_initial, covs_diag=p_initial_diag)
def fetch_navs(self, t: GPSTime, block):
# Download new navs if 1 hour of navs data left
if t + SECS_IN_HR not in self.astro_dog.navs_fetched_times and (abs(t - self.last_fetch_navs_t) > SECS_IN_MIN):
astro_dog_vars = self.astro_dog.valid_const, self.astro_dog.auto_update, self.astro_dog.valid_ephem_types, self.astro_dog.cache_dir
ret = None
if block: # Used for testing purposes
ret = get_orbit_data(t, *astro_dog_vars)
elif self.orbit_fetch_future is None:
self.orbit_fetch_executor = ProcessPoolExecutor(max_workers=1)
self.orbit_fetch_future = self.orbit_fetch_executor.submit(get_orbit_data, t, *astro_dog_vars)
elif self.orbit_fetch_future.done():
ret = self.orbit_fetch_future.result()
self.orbit_fetch_executor = self.orbit_fetch_future = None
if ret is not None:
if ret[0] is None:
self.last_fetch_navs_t = ret[2]
else:
self.astro_dog.navs, self.astro_dog.navs_fetched_times, self.last_fetch_navs_t = ret
self.cache_ephemeris()
def get_orbit_data(t: GPSTime, valid_const, auto_update, valid_ephem_types, cache_dir):
astro_dog = AstroDog(valid_const=valid_const, auto_update=auto_update, valid_ephem_types=valid_ephem_types, cache_dir=cache_dir)
cloudlog.info(f"Start to download/parse navs for time {t.as_datetime()}")
start_time = time.monotonic()
try:
astro_dog.get_navs(t)
cloudlog.info(f"Done parsing navs. Took {time.monotonic() - start_time:.1f}s")
cloudlog.debug(f"Downloaded navs ({sum([len(v) for v in astro_dog.navs])}): {list(astro_dog.navs.keys())}" +
f"With time range: {[f'{start.as_datetime()}, {end.as_datetime()}' for (start,end) in astro_dog.orbit_fetched_times._ranges]}")
return astro_dog.navs, astro_dog.navs_fetched_times, t
except (DownloadFailed, RuntimeError, ValueError, IOError) as e:
cloudlog.warning(f"No orbit data found or parsing failure: {e}")
return None, None, t
def create_measurement_msg(meas: GNSSMeasurement):
c = log.GnssMeasurements.CorrectedMeasurement.new_message()
c.constellationId = meas.constellation_id.value
c.svId = meas.sv_id
c.glonassFrequency = meas.glonass_freq if meas.constellation_id == ConstellationId.GLONASS else 0
c.pseudorange = float(meas.observables_final['C1C'])
c.pseudorangeStd = float(meas.observables_std['C1C'])
c.pseudorangeRate = float(meas.observables_final['D1C'])
c.pseudorangeRateStd = float(meas.observables_std['D1C'])
c.satPos = meas.sat_pos_final.tolist()
c.satVel = meas.sat_vel.tolist()
c.satVel = meas.sat_vel.tolist()
return c
def kf_add_observations(gnss_kf: GNSSKalman, t: float, measurements: List[GNSSMeasurement]):
ekf_data = defaultdict(list)
for m in measurements:
m_arr = m.as_array()
if m.constellation_id == ConstellationId.GPS:
ekf_data[ObservationKind.PSEUDORANGE_GPS].append(m_arr)
elif m.constellation_id == ConstellationId.GLONASS:
ekf_data[ObservationKind.PSEUDORANGE_GLONASS].append(m_arr)
ekf_data[ObservationKind.PSEUDORANGE_RATE_GPS] = ekf_data[ObservationKind.PSEUDORANGE_GPS]
ekf_data[ObservationKind.PSEUDORANGE_RATE_GLONASS] = ekf_data[ObservationKind.PSEUDORANGE_GLONASS]
for kind, data in ekf_data.items():
if len(data) > 0:
gnss_kf.predict_and_observe(t, kind, data)
def clear_tmp_cache():
if os.path.exists(Paths.download_cache_root()):
shutil.rmtree(Paths.download_cache_root())
os.mkdir(Paths.download_cache_root())
def main():
#clear_tmp_cache()
use_qcom = not Params().get_bool("UbloxAvailable")
if use_qcom:
raw_name = "qcomGnss"
else:
raw_name = "ubloxGnss"
raw_gnss_sock = messaging.sub_sock(raw_name, conflate=False)
pm = messaging.PubMaster(['gnssMeasurements'])
# disable until set as main gps source, to better analyze startup time
# TODO ensure low CPU usage before enabling
use_internet = False # "LAIKAD_NO_INTERNET" not in os.environ
replay = "REPLAY" in os.environ
laikad = Laikad(save_ephemeris=not replay, auto_fetch_navs=use_internet, use_qcom=use_qcom)
while True:
for in_msg in messaging.drain_sock(raw_gnss_sock, wait_for_one=True):
out_msg = laikad.process_gnss_msg(getattr(in_msg, raw_name), in_msg.logMonoTime, replay)
pm.send('gnssMeasurements', out_msg)
if __name__ == "__main__":
main()

@ -1,319 +0,0 @@
#!/usr/bin/env python3
import pytest
import time
import unittest
from cereal import log
from openpilot.common.params import Params
from datetime import datetime
from unittest import mock
from laika.constants import SECS_IN_DAY
from laika.downloader import DownloadFailed
from laika.ephemeris import EphemerisType
from laika.gps_time import GPSTime
from laika.helpers import ConstellationId
from laika.raw_gnss import GNSSMeasurement, read_raw_ublox, read_raw_qcom
from openpilot.selfdrive.locationd.laikad import EPHEMERIS_CACHE, Laikad
from openpilot.selfdrive.test.openpilotci import get_url
from openpilot.tools.lib.logreader import LogReader
from openpilot.selfdrive.test.process_replay.process_replay import get_process_config, replay_process
GPS_TIME_PREDICTION_ORBITS_RUSSIAN_SRC = GPSTime.from_datetime(datetime(2022, month=1, day=29, hour=12))
UBLOX_TEST_ROUTE = "4cf7a6ad03080c90|2021-09-29--13-46-36"
QCOM_TEST_ROUTE = "616dc83ca1f7f11e|2023-07-11--10-52-31"
def get_log_ublox():
logs = LogReader(get_url(UBLOX_TEST_ROUTE, 0))
ublox_cfg = get_process_config("ubloxd")
all_logs = replay_process(ublox_cfg, logs)
low_gnss = []
for m in all_logs:
if m.which() != "ubloxGnss" or m.ubloxGnss.which() != 'measurementReport':
continue
MAX_MEAS = 7
if m.ubloxGnss.measurementReport.numMeas > MAX_MEAS:
mb = log.Event.new_message(ubloxGnss=m.ubloxGnss.to_dict())
mb.logMonoTime = m.logMonoTime
mb.ubloxGnss.measurementReport.numMeas = MAX_MEAS
mb.ubloxGnss.measurementReport.measurements = list(m.ubloxGnss.measurementReport.measurements)[:MAX_MEAS]
mb.ubloxGnss.measurementReport.measurements[0].pseudorange += 1000
low_gnss.append(mb.as_reader())
else:
low_gnss.append(m)
return all_logs, low_gnss
def get_log_qcom():
logs = LogReader(get_url(QCOM_TEST_ROUTE, 0))
all_logs = [m for m in logs if m.which() == 'qcomGnss']
return all_logs
def verify_messages(lr, laikad, return_one_success=False):
good_msgs = []
for m in lr:
if m.which() == 'ubloxGnss':
gnss_msg = m.ubloxGnss
elif m.which() == 'qcomGnss':
gnss_msg = m.qcomGnss
else:
continue
msg = laikad.process_gnss_msg(gnss_msg, m.logMonoTime, block=True)
if msg is not None and len(msg.gnssMeasurements.correctedMeasurements) > 0:
good_msgs.append(msg)
if return_one_success:
return msg
return good_msgs
def get_first_gps_time(logs):
for m in logs:
if m.which() == 'ubloxGnss':
if m.ubloxGnss.which == 'measurementReport':
new_meas = read_raw_ublox(m.ubloxGnss.measurementReport)
if len(new_meas) > 0:
return new_meas[0].recv_time
elif m.which() == "qcomGnss":
if m.qcomGnss.which == 'measurementReport':
new_meas = read_raw_qcom(m.qcomGnss.measurementReport)
if len(new_meas) > 0:
return new_meas[0].recv_time
def get_measurement_mock(gpstime, sat_ephemeris):
meas = GNSSMeasurement(ConstellationId.GPS, 1, gpstime.week, gpstime.tow, {'C1C': 0., 'D1C': 0.}, {'C1C': 0., 'D1C': 0.})
# Fake measurement being processed
meas.observables_final = meas.observables
meas.sat_ephemeris = sat_ephemeris
return meas
@pytest.mark.slow
class TestLaikad(unittest.TestCase):
@classmethod
def setUpClass(cls):
logs, low_gnss = get_log_ublox()
cls.logs = logs
cls.low_gnss = low_gnss
cls.logs_qcom = get_log_qcom()
first_gps_time = get_first_gps_time(logs)
cls.first_gps_time = first_gps_time
def setUp(self):
Params().remove(EPHEMERIS_CACHE)
def test_fetch_navs_non_blocking(self):
gpstime = GPSTime.from_datetime(datetime(2021, month=3, day=1))
laikad = Laikad()
laikad.fetch_navs(gpstime, block=False)
laikad.orbit_fetch_future.result(30)
# Get results and save orbits to laikad:
laikad.fetch_navs(gpstime, block=False)
ephem = laikad.astro_dog.navs['G01'][0]
self.assertIsNotNone(ephem)
laikad.fetch_navs(gpstime+2*SECS_IN_DAY, block=False)
laikad.orbit_fetch_future.result(30)
# Get results and save orbits to laikad:
laikad.fetch_navs(gpstime + 2 * SECS_IN_DAY, block=False)
ephem2 = laikad.astro_dog.navs['G01'][0]
self.assertIsNotNone(ephem)
self.assertNotEqual(ephem, ephem2)
def test_fetch_navs_with_wrong_clocks(self):
laikad = Laikad()
def check_has_navs():
self.assertGreater(len(laikad.astro_dog.navs), 0)
ephem = laikad.astro_dog.navs['G01'][0]
self.assertIsNotNone(ephem)
real_current_time = GPSTime.from_datetime(datetime(2021, month=3, day=1))
wrong_future_clock_time = real_current_time + SECS_IN_DAY
laikad.fetch_navs(wrong_future_clock_time, block=True)
check_has_navs()
self.assertEqual(laikad.last_fetch_navs_t, wrong_future_clock_time)
# Test fetching orbits with earlier time
assert real_current_time < laikad.last_fetch_navs_t
laikad.astro_dog.orbits = {}
laikad.fetch_navs(real_current_time, block=True)
check_has_navs()
self.assertEqual(laikad.last_fetch_navs_t, real_current_time)
def test_laika_online(self):
laikad = Laikad(auto_update=True, valid_ephem_types=EphemerisType.ULTRA_RAPID_ORBIT)
correct_msgs = verify_messages(self.logs, laikad)
correct_msgs_expected = 560
self.assertEqual(correct_msgs_expected, len(correct_msgs))
self.assertEqual(correct_msgs_expected, len([m for m in correct_msgs if m.gnssMeasurements.positionECEF.valid]))
def test_kf_becomes_valid(self):
laikad = Laikad(auto_update=False)
m = self.logs[0]
self.assertFalse(all(laikad.kf_valid(m.logMonoTime * 1e-9)))
kf_valid = False
for m in self.logs:
if m.which() != 'ubloxGnss':
continue
laikad.process_gnss_msg(m.ubloxGnss, m.logMonoTime, block=True)
kf_valid = all(laikad.kf_valid(m.logMonoTime * 1e-9))
if kf_valid:
break
self.assertTrue(kf_valid)
def test_laika_online_nav_only(self):
for use_qcom, logs in zip([True, False], [self.logs_qcom, self.logs], strict=True):
laikad = Laikad(auto_update=True, valid_ephem_types=EphemerisType.NAV, use_qcom=use_qcom)
# Disable fetch_orbits to test NAV only
correct_msgs = verify_messages(logs, laikad)
correct_msgs_expected = 55 if use_qcom else 560
valid_fix_expected = 55 if use_qcom else 560
self.assertEqual(correct_msgs_expected, len(correct_msgs))
self.assertEqual(valid_fix_expected, len([m for m in correct_msgs if m.gnssMeasurements.positionECEF.valid]))
@mock.patch('laika.downloader.download_and_cache_file')
def test_laika_offline(self, downloader_mock):
downloader_mock.side_effect = DownloadFailed("Mock download failed")
laikad = Laikad(auto_update=False)
laikad.fetch_navs(GPS_TIME_PREDICTION_ORBITS_RUSSIAN_SRC, block=True)
@mock.patch('laika.downloader.download_and_cache_file')
def test_download_failed_russian_source(self, downloader_mock):
downloader_mock.side_effect = DownloadFailed
laikad = Laikad(auto_update=False)
correct_msgs = verify_messages(self.logs, laikad)
expected_msgs = 376
self.assertEqual(expected_msgs, len(correct_msgs))
self.assertEqual(expected_msgs, len([m for m in correct_msgs if m.gnssMeasurements.positionECEF.valid]))
def test_laika_get_orbits(self):
laikad = Laikad(auto_update=False)
# Pretend process has loaded the orbits on startup by using the time of the first gps message.
laikad.fetch_navs(self.first_gps_time, block=True)
self.dict_has_values(laikad.astro_dog.navs)
@unittest.skip("Use to debug live data")
def test_laika_get_navs_now(self):
laikad = Laikad(auto_update=False)
laikad.fetch_navs(GPSTime.from_datetime(datetime.utcnow()), block=True)
prn = "G01"
self.assertGreater(len(laikad.astro_dog.navs[prn]), 0)
prn = "R01"
self.assertGreater(len(laikad.astro_dog.navs[prn]), 0)
def test_get_navs_in_process(self):
for auto_fetch_navs in [True, False]:
for use_qcom, logs in zip([True, False], [self.logs_qcom, self.logs], strict=True):
laikad = Laikad(auto_update=False, use_qcom=use_qcom, auto_fetch_navs=auto_fetch_navs)
has_navs = False
has_fix = False
seen_chip_eph = False
seen_internet_eph = False
for m in logs:
if m.which() != 'ubloxGnss' and m.which() != 'qcomGnss':
continue
gnss_msg = m.qcomGnss if use_qcom else m.ubloxGnss
out_msg = laikad.process_gnss_msg(gnss_msg, m.logMonoTime, block=False)
if laikad.orbit_fetch_future is not None:
laikad.orbit_fetch_future.result()
vals = laikad.astro_dog.navs.values()
has_navs = len(vals) > 0 and max([len(v) for v in vals]) > 0
vals = laikad.astro_dog.qcom_polys.values()
has_polys = len(vals) > 0 and max([len(v) for v in vals]) > 0
has_fix = has_fix or out_msg.gnssMeasurements.positionECEF.valid
if len(out_msg.gnssMeasurements.ephemerisStatuses):
seen_chip_eph = seen_chip_eph or any(x.source == 'gnssChip' for x in out_msg.gnssMeasurements.ephemerisStatuses)
seen_internet_eph = seen_internet_eph or any(x.source == 'internet' for x in out_msg.gnssMeasurements.ephemerisStatuses)
self.assertTrue(has_navs or has_polys)
self.assertTrue(has_fix)
self.assertTrue(seen_chip_eph or auto_fetch_navs)
self.assertTrue(seen_internet_eph or not auto_fetch_navs)
self.assertEqual(len(laikad.astro_dog.navs_fetched_times._ranges), 0)
self.assertEqual(None, laikad.orbit_fetch_future)
def test_cache(self):
use_qcom = True
for use_qcom, logs in zip([True, False], [self.logs_qcom, self.logs], strict=True):
Params().remove(EPHEMERIS_CACHE)
laikad = Laikad(auto_update=True, save_ephemeris=True, use_qcom=use_qcom)
def wait_for_cache():
max_time = 2
while Params().get(EPHEMERIS_CACHE) is None:
time.sleep(0.1)
max_time -= 0.1
if max_time < 0:
self.fail("Cache has not been written after 2 seconds")
# Test cache with no ephemeris
laikad.last_report_time = GPSTime(1,0)
laikad.cache_ephemeris()
if Params().get(EPHEMERIS_CACHE) is not None:
self.fail("Cache should not have been written without valid ephem")
#laikad.astro_dog.get_navs(self.first_gps_time)
msg = verify_messages(logs, laikad, return_one_success=True)
laikad.cache_ephemeris()
wait_for_cache()
# Check both nav and orbits separate
laikad = Laikad(auto_update=False, valid_ephem_types=EphemerisType.NAV,
save_ephemeris=True, use_qcom=use_qcom, auto_fetch_navs=False)
# Verify navs are loaded from cache
self.dict_has_values(laikad.astro_dog.navs)
# Verify cache is working for only nav by running a segment
msg = verify_messages(logs, laikad, return_one_success=True)
self.assertTrue(len(msg.gnssMeasurements.ephemerisStatuses))
self.assertTrue(any(x.source=='cache' for x in msg.gnssMeasurements.ephemerisStatuses))
self.assertIsNotNone(msg)
#TODO test cache with only orbits
#with patch('selfdrive.locationd.laikad.get_orbit_data', return_value=None) as mock_method:
# # Verify no orbit downloads even if orbit fetch times is reset since the cache has recently been saved and we don't want to download high frequently
# laikad.astro_dog.orbit_fetched_times = TimeRangeHolder()
# laikad.fetch_navs(self.first_gps_time, block=False)
# mock_method.assert_not_called()
# # Verify cache is working for only orbits by running a segment
# laikad = Laikad(auto_update=False, valid_ephem_types=EphemerisType.ULTRA_RAPID_ORBIT, save_ephemeris=True)
# msg = verify_messages(self.logs, laikad, return_one_success=True)
# self.assertIsNotNone(msg)
# # Verify orbit data is not downloaded
# mock_method.assert_not_called()
#break
def test_low_gnss_meas(self):
cnt = 0
laikad = Laikad()
for m in self.low_gnss:
msg = laikad.process_gnss_msg(m.ubloxGnss, m.logMonoTime, block=True)
if msg is None:
continue
gm = msg.gnssMeasurements
if len(gm.correctedMeasurements) != 0 and gm.positionECEF.valid:
cnt += 1
self.assertEqual(cnt, 560)
def dict_has_values(self, dct):
self.assertGreater(len(dct), 0)
self.assertGreater(min([len(v) for v in dct.values()]), 0)
if __name__ == "__main__":
unittest.main()

@ -68,7 +68,6 @@ procs = [
PythonProcess("controlsd", "selfdrive.controls.controlsd", only_onroad),
PythonProcess("deleter", "system.loggerd.deleter", always_run),
PythonProcess("dmonitoringd", "selfdrive.monitoring.dmonitoringd", driverview, enabled=(not PC or WEBCAM)),
PythonProcess("laikad", "selfdrive.locationd.laikad", only_onroad),
PythonProcess("rawgpsd", "system.sensord.rawgps.rawgpsd", qcomgps, enabled=TICI),
PythonProcess("navd", "selfdrive.navd.navd", only_onroad),
PythonProcess("pandad", "selfdrive.boardd.pandad", always_run),

@ -17,7 +17,6 @@ Currently the following processes are tested:
* locationd
* paramsd
* ubloxd
* laikad
* torqued
### Usage
@ -71,7 +70,7 @@ lr = LogReader(...)
output_logs = replay_process_with_name('locationd', lr)
# or list of names
output_logs = replay_process_with_name(['ubloxd', 'locationd', 'laikad'], lr)
output_logs = replay_process_with_name(['ubloxd', 'locationd'], lr)
```
Supported processes:
@ -83,7 +82,6 @@ Supported processes:
* locationd
* paramsd
* ubloxd
* laikad
* torqued
* modeld
* dmonitoringmodeld

@ -448,16 +448,6 @@ def controlsd_config_callback(params, cfg, lr):
params.put("ReplayControlsState", controlsState.as_builder().to_bytes())
def laikad_config_pubsub_callback(params, cfg, lr):
ublox = params.get_bool("UbloxAvailable")
main_key = "ubloxGnss" if ublox else "qcomGnss"
sub_keys = ({"qcomGnss", } if ublox else {"ubloxGnss", })
cfg.pubs = set(cfg.pubs) - sub_keys
cfg.main_pub = main_key
cfg.main_pub_drained = True
def locationd_config_pubsub_callback(params, cfg, lr):
ublox = params.get_bool("UbloxAvailable")
sub_keys = ({"gpsLocation", } if ublox else {"gpsLocationExternal", })
@ -543,17 +533,6 @@ CONFIGS = [
subs=["ubloxGnss", "gpsLocationExternal"],
ignore=["logMonoTime"],
),
ProcessConfig(
proc_name="laikad",
pubs=["ubloxGnss", "qcomGnss"],
subs=["gnssMeasurements"],
ignore=["logMonoTime"],
config_callback=laikad_config_pubsub_callback,
tolerance=NUMPY_TOLERANCE,
processing_time=0.002,
timeout=60*10, # first messages are blocked on internet assistance
main_pub="ubloxGnss", # config_callback will switch this to qcom if needed
),
ProcessConfig(
proc_name="torqued",
pubs=["liveLocationKalman", "carState", "carControl"],

@ -13,7 +13,7 @@ import openpilot.selfdrive.test.process_replay.process_replay as pr
# These processes currently fail because of unrealistic data breaking assumptions
# that openpilot makes causing error with NaN, inf, int size, array indexing ...
# TODO: Make each one testable
NOT_TESTED = ['controlsd', 'plannerd', 'calibrationd', 'dmonitoringd', 'paramsd', 'laikad', 'dmonitoringmodeld', 'modeld']
NOT_TESTED = ['controlsd', 'plannerd', 'calibrationd', 'dmonitoringd', 'paramsd', 'dmonitoringmodeld', 'modeld']
TEST_CASES = [(cfg.proc_name, copy.deepcopy(cfg)) for cfg in pr.CONFIGS if cfg.proc_name not in NOT_TESTED]
class TestFuzzProcesses(unittest.TestCase):

@ -83,14 +83,12 @@ if __name__ == '__main__':
from openpilot.selfdrive.controls.radard import radard_thread
from openpilot.selfdrive.locationd.paramsd import main as paramsd_thread
from openpilot.selfdrive.controls.plannerd import main as plannerd_thread
from openpilot.selfdrive.locationd.laikad import main as laikad_thread
procs = {
'radard': radard_thread,
'controlsd': controlsd_thread,
'paramsd': paramsd_thread,
'plannerd': plannerd_thread,
'laikad': laikad_thread,
}
proc = sys.argv[1]

@ -56,7 +56,6 @@ PROCS = {
"selfdrive.navd.navd": 0.4,
"system.loggerd.uploader": 3.0,
"system.loggerd.deleter": 0.1,
"selfdrive.locationd.laikad": (1.0, 80.0), # TODO: better GPS setup in testing closet
}
PROCS.update({

@ -8,7 +8,6 @@ from azure.storage.blob import ContainerClient
from tqdm import tqdm
from openpilot.selfdrive.car.tests.routes import routes as test_car_models_routes
from openpilot.selfdrive.locationd.test.test_laikad import UBLOX_TEST_ROUTE, QCOM_TEST_ROUTE
from openpilot.selfdrive.test.process_replay.test_processes import source_segments as replay_segments
from openpilot.selfdrive.test.openpilotci import (DATA_CI_ACCOUNT, DATA_CI_ACCOUNT_URL, DATA_CI_CONTAINER,
get_azure_credential, get_container_sas)
@ -90,7 +89,6 @@ if __name__ == "__main__":
if not len(to_sync):
# sync routes from the car tests routes and process replay
to_sync.extend([UBLOX_TEST_ROUTE, QCOM_TEST_ROUTE])
to_sync.extend([rt.route for rt in test_car_models_routes])
to_sync.extend([s[1].rsplit('--', 1)[0] for s in replay_segments])

@ -5,13 +5,11 @@ import time
import datetime
import unittest
import subprocess
import numpy as np
import cereal.messaging as messaging
from openpilot.system.hardware import TICI
from openpilot.system.sensord.rawgps.rawgpsd import at_cmd, wait_for_modem
from openpilot.selfdrive.manager.process_config import managed_processes
from openpilot.common.transformations.coordinates import ecef_from_geodetic
GOOD_SIGNAL = bool(int(os.getenv("GOOD_SIGNAL", '0')))
@ -123,22 +121,5 @@ class TestRawgpsd(unittest.TestCase):
managed_processes['rawgpsd'].stop()
self.check_assistance(True)
@unittest.skipIf(not GOOD_SIGNAL, "No good GPS signal")
def test_fix(self):
managed_processes['rawgpsd'].start()
managed_processes['laikad'].start()
assert self._wait_for_output(60)
assert self.sm.updated['qcomGnss']
assert self.sm.updated['gpsLocation']
assert self.sm['gpsLocation'].flags == 1
module_fix = ecef_from_geodetic([self.sm['gpsLocation'].latitude,
self.sm['gpsLocation'].longitude,
self.sm['gpsLocation'].altitude])
assert self.sm['gnssMeasurements'].positionECEF.valid
total_diff = np.array(self.sm['gnssMeasurements'].positionECEF.value) - module_fix
self.assertLess(np.linalg.norm(total_diff), 100)
managed_processes['laikad'].stop()
managed_processes['rawgpsd'].stop()
if __name__ == "__main__":
unittest.main(failfast=True)

@ -33,8 +33,7 @@ def run_remote_checker(lat, lon, alt, duration, ip_addr):
return False, None, None
matched, log, info = con.root.exposed_run_checker(lat, lon, alt,
timeout=duration,
use_laikad=True)
timeout=duration)
con.close() # TODO: might wanna fetch more logs here
con = None
@ -43,7 +42,7 @@ def run_remote_checker(lat, lon, alt, duration, ip_addr):
stats = defaultdict(int) # type: ignore
keys = ['success', 'failed', 'ublox_fail', 'laikad_fail', 'proc_crash', 'checker_crash']
keys = ['success', 'failed', 'ublox_fail', 'proc_crash', 'checker_crash']
def print_report():
print("\nFuzzy testing report summary:")
@ -62,10 +61,7 @@ def update_stats(matched, log, info):
if log == "CHECKER CRASHED":
stats['checker_crash'] += 1
if log == "TIMEOUT":
if "LAIKAD" in info:
stats['laikad_fail'] += 1
else: # "UBLOX" in info
stats['ublox_fail'] += 1
stats['ublox_fail'] += 1
def main(ip_addr, continuous_mode, timeout, pos):

@ -1,185 +0,0 @@
import os
import time
import shutil
from datetime import datetime
from collections import defaultdict
from openpilot.system.hardware.hw import Paths
import rpyc
from rpyc.utils.server import ThreadedServer
#from openpilot.common.params import Params
import cereal.messaging as messaging
from openpilot.selfdrive.manager.process_config import managed_processes
from laika.lib.coordinates import ecef2geodetic
DELTA = 0.001
ALT_DELTA = 30
MATCH_NUM = 10
REPORT_STATS = 10
EPHEM_CACHE = "/data/params/d/LaikadEphemerisV3"
SERVER_LOG_FILE = "/tmp/fuzzy_server.log"
server_log = open(SERVER_LOG_FILE, "w+")
def slog(msg):
server_log.write(f"{datetime.now().strftime('%H:%M:%S.%f')} | {msg}\n")
server_log.flush()
def handle_laikad(msg):
if not hasattr(msg, 'correctedMeasurements'):
return None
num_corr = len(msg.correctedMeasurements)
pos_ecef = msg.positionECEF.value
pos_geo = []
if len(pos_ecef) > 0:
pos_geo = ecef2geodetic(pos_ecef)
pos_std = msg.positionECEF.std
pos_valid = msg.positionECEF.valid
slog(f"{num_corr} {pos_geo} {pos_ecef} {pos_std} {pos_valid}")
return pos_geo, (num_corr, pos_geo, list(pos_ecef), list(msg.positionECEF.std))
hw_msgs = 0
ephem_msgs: dict = defaultdict(int)
def handle_ublox(msg):
global hw_msgs
d = msg.to_dict()
if 'hwStatus2' in d:
hw_msgs += 1
if 'ephemeris' in d:
ephem_msgs[msg.ephemeris.svId] += 1
num_meas = None
if 'measurementReport' in d:
num_meas = msg.measurementReport.numMeas
return [hw_msgs, ephem_msgs, num_meas]
def start_procs(procs):
for p in procs:
managed_processes[p].start()
time.sleep(1)
def kill_procs(procs, no_retry=False):
for p in procs:
managed_processes[p].stop()
time.sleep(1)
if not no_retry:
for p in procs:
mp = managed_processes[p].proc
if mp is not None and mp.is_alive():
managed_processes[p].stop()
time.sleep(3)
def check_alive_procs(procs):
for p in procs:
mp = managed_processes[p].proc
if mp is None or not mp.is_alive():
return False, p
return True, None
class RemoteCheckerService(rpyc.Service):
def on_connect(self, conn):
pass
def on_disconnect(self, conn):
#kill_procs(self.procs, no_retry=False)
# this execution is delayed, it will kill the next run of laikad
# TODO: add polling to wait for everything is killed
pass
def run_checker(self, slat, slon, salt, sockets, procs, timeout):
global hw_msgs, ephem_msgs
hw_msgs = 0
ephem_msgs = defaultdict(int)
slog(f"Run test: {slat} {slon} {salt}")
# quectel_mod = Params().get_bool("UbloxAvailable")
match_cnt = 0
msg_cnt = 0
stats_laikad = []
stats_ublox = []
self.procs = procs
start_procs(procs)
sm = messaging.SubMaster(sockets)
start_time = time.monotonic()
while True:
sm.update()
if sm.updated['ubloxGnss']:
stats_ublox.append(handle_ublox(sm['ubloxGnss']))
if sm.updated['gnssMeasurements']:
pos_geo, stats = handle_laikad(sm['gnssMeasurements'])
if pos_geo is None or len(pos_geo) == 0:
continue
match = all(abs(g-s) < DELTA for g,s in zip(pos_geo[:2], [slat, slon], strict=True))
match &= abs(pos_geo[2] - salt) < ALT_DELTA
if match:
match_cnt += 1
if match_cnt >= MATCH_NUM:
return True, "MATCH", f"After: {round(time.monotonic() - start_time, 4)}"
# keep some stats for error reporting
stats_laikad.append(stats)
if (msg_cnt % 10) == 0:
a, p = check_alive_procs(procs)
if not a:
return False, "PROC CRASH", f"{p}"
msg_cnt += 1
if (time.monotonic() - start_time) > timeout:
h = f"LAIKAD: {stats_laikad[-REPORT_STATS:]}"
if len(h) == 0:
h = f"UBLOX: {stats_ublox[-REPORT_STATS:]}"
return False, "TIMEOUT", h
def exposed_run_checker(self, slat, slon, salt, timeout=180, use_laikad=True):
try:
procs = []
sockets = []
if use_laikad:
procs.append("laikad") # pigeond, ubloxd # might wanna keep them running
sockets += ['ubloxGnss', 'gnssMeasurements']
if os.path.exists(EPHEM_CACHE):
os.remove(EPHEM_CACHE)
shutil.rmtree(Paths.download_cache_root(), ignore_errors=True)
ret = self.run_checker(slat, slon, salt, sockets, procs, timeout)
kill_procs(procs)
return ret
except Exception as e:
# always make sure processes get killed
kill_procs(procs)
return False, "CHECKER CRASHED", f"{str(e)}"
def exposed_kill_procs(self):
kill_procs(self.procs, no_retry=True)
if __name__ == "__main__":
print(f"Sever Log written to: {SERVER_LOG_FILE}")
t = ThreadedServer(RemoteCheckerService, port=18861)
t.start()

@ -1,105 +0,0 @@
#!/usr/bin/env python3
import os
import time
import unittest
import cereal.messaging as messaging
import openpilot.system.sensord.pigeond as pd
from openpilot.common.params import Params
from openpilot.system.hardware import TICI
from openpilot.selfdrive.manager.process_config import managed_processes
from openpilot.selfdrive.test.helpers import with_processes
def wait_for_location(sm, timeout, con=10):
cons_meas = 0
start_time = time.monotonic()
while (time.monotonic() - start_time) < timeout:
sm.update()
if not sm.updated["gnssMeasurements"]:
continue
msg = sm["gnssMeasurements"]
cons_meas = (cons_meas + 1) if 'positionECEF' in msg.to_dict() else 0
if cons_meas >= con:
return True
return False
class TestLaikad(unittest.TestCase):
@classmethod
def setUpClass(self):
if not TICI:
raise unittest.SkipTest
ublox_available = Params().get_bool("UbloxAvailable")
if not ublox_available:
raise unittest.SkipTest
def setUp(self):
# ensure laikad cold start
Params().remove("LaikadEphemerisV3")
os.environ["LAIKAD_NO_INTERNET"] = "1"
managed_processes['laikad'].start()
def tearDown(self):
managed_processes['laikad'].stop()
@with_processes(['pigeond', 'ubloxd'])
def test_laikad_cold_start(self):
time.sleep(5)
start_time = time.monotonic()
sm = messaging.SubMaster(["gnssMeasurements"])
success = wait_for_location(sm, 60*2, con=10)
duration = time.monotonic() - start_time
assert success, "Waiting for location timed out (2min)!"
assert duration < 60, f"Received Location {duration}!"
@with_processes(['ubloxd'])
def test_laikad_ublox_reset_start(self):
time.sleep(2)
pigeon, pm = pd.create_pigeon()
pd.init_baudrate(pigeon)
assert pigeon.reset_device(), "Could not reset device!"
laikad_sock = messaging.sub_sock("gnssMeasurements", timeout=0.1)
ublox_gnss_sock = messaging.sub_sock("ubloxGnss", timeout=0.1)
pd.init_baudrate(pigeon)
pd.initialize_pigeon(pigeon)
pd.run_receiving(pigeon, pm, 180)
ublox_msgs = messaging.drain_sock(ublox_gnss_sock)
laikad_msgs = messaging.drain_sock(laikad_sock)
gps_ephem_cnt = 0
glonass_ephem_cnt = 0
for um in ublox_msgs:
if um.ubloxGnss.which() == 'ephemeris':
gps_ephem_cnt += 1
elif um.ubloxGnss.which() == 'glonassEphemeris':
glonass_ephem_cnt += 1
assert gps_ephem_cnt > 0, "NO gps ephemeris collected!"
assert glonass_ephem_cnt > 0, "NO glonass ephemeris collected!"
pos_meas = 0
duration = -1
for lm in laikad_msgs:
pos_meas = (pos_meas + 1) if 'positionECEF' in lm.gnssMeasurements.to_dict() else 0
if pos_meas > 5:
duration = (lm.logMonoTime - laikad_msgs[0].logMonoTime)*1e-9
break
assert pos_meas > 5, "NOT enough positions at end of read!"
assert duration < 120, "Laikad took too long to get a Position!"
if __name__ == "__main__":
unittest.main()
Loading…
Cancel
Save