Rm more laika references (#30451)
	
		
	
				
					
				
			Rm more laikad references
old-commit-hash: 95c6d5140a
			
			
				testing-closet
			
			
		
							parent
							
								
									3ef2662007
								
							
						
					
					
						commit
						9bf8ee6c86
					
				
				 18 changed files with 0 additions and 960 deletions
			
			
		| @ -1,66 +0,0 @@ | ||||
| #!/usr/bin/env python3 | ||||
| import cereal.messaging as messaging | ||||
| from laika import constants | ||||
| 
 | ||||
| if __name__ == "__main__": | ||||
|   sm = messaging.SubMaster(['ubloxGnss', 'qcomGnss']) | ||||
| 
 | ||||
|   meas = None | ||||
|   while 1: | ||||
|     sm.update() | ||||
|     if sm['ubloxGnss'].which() == "measurementReport": | ||||
|       meas = sm['ubloxGnss'].measurementReport.measurements | ||||
|     if not sm.updated['qcomGnss'] or meas is None: | ||||
|       continue | ||||
|     report = sm['qcomGnss'].measurementReport | ||||
|     if report.source not in [0, 1]: | ||||
|       continue | ||||
|     GLONASS = report.source == 1 | ||||
|     recv_time = report.milliseconds / 1000 | ||||
| 
 | ||||
|     car = [] | ||||
|     print("qcom has ", sorted([x.svId for x in report.sv])) | ||||
|     print("ublox has", sorted([x.svId for x in meas if x.gnssId == (6 if GLONASS else 0)])) | ||||
|     for i in report.sv: | ||||
|       # match to ublox | ||||
|       tm = None | ||||
|       for m in meas: | ||||
|         if i.svId == m.svId and m.gnssId == 0 and m.sigId == 0 and not GLONASS: | ||||
|           tm = m | ||||
|         if (i.svId-64) == m.svId and m.gnssId == 6 and m.sigId == 0 and GLONASS: | ||||
|           tm = m | ||||
|       if tm is None: | ||||
|         continue | ||||
| 
 | ||||
|       if not i.measurementStatus.measurementNotUsable and i.measurementStatus.satelliteTimeIsKnown: | ||||
|         sat_time = (i.unfilteredMeasurementIntegral + i.unfilteredMeasurementFraction + i.latency) / 1000 | ||||
|         ublox_psuedorange = tm.pseudorange | ||||
|         qcom_psuedorange = (recv_time - sat_time)*constants.SPEED_OF_LIGHT | ||||
|         if GLONASS: | ||||
|           glonass_freq = tm.glonassFrequencyIndex - 7 | ||||
|           ublox_speed = -(constants.SPEED_OF_LIGHT / (constants.GLONASS_L1 + glonass_freq*constants.GLONASS_L1_DELTA)) * (tm.doppler) | ||||
|         else: | ||||
|           ublox_speed = -(constants.SPEED_OF_LIGHT / constants.GPS_L1) * tm.doppler | ||||
|         qcom_speed = i.unfilteredSpeed | ||||
|         car.append((i.svId, tm.pseudorange, ublox_speed, qcom_psuedorange, qcom_speed, tm.cno)) | ||||
| 
 | ||||
|     if len(car) == 0: | ||||
|       print("nothing to compare") | ||||
|       continue | ||||
| 
 | ||||
|     pr_err, speed_err = 0., 0. | ||||
|     for c in car: | ||||
|       ublox_psuedorange, ublox_speed, qcom_psuedorange, qcom_speed = c[1:5] | ||||
|       pr_err += ublox_psuedorange - qcom_psuedorange | ||||
|       speed_err += ublox_speed - qcom_speed | ||||
|     pr_err /= len(car) | ||||
|     speed_err /= len(car) | ||||
|     print("avg psuedorange err %f avg speed err %f" % (pr_err, speed_err)) | ||||
|     for c in sorted(car, key=lambda x: abs(x[1] - x[3] - pr_err)): | ||||
|       svid, ublox_psuedorange, ublox_speed, qcom_psuedorange, qcom_speed, cno = c | ||||
|       print("svid: %3d  pseudorange: %10.2f m  speed: %8.2f m/s   meas: %12.2f  speed: %10.2f   meas_err: %10.3f speed_err: %8.3f cno: %d" % | ||||
|         (svid, ublox_psuedorange, ublox_speed, qcom_psuedorange, qcom_speed, | ||||
|         ublox_psuedorange - qcom_psuedorange - pr_err, ublox_speed - qcom_speed - speed_err, cno)) | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| @ -1,117 +0,0 @@ | ||||
| #!/usr/bin/env python3 | ||||
| import unittest | ||||
| import time | ||||
| import numpy as np | ||||
| 
 | ||||
| from laika import AstroDog | ||||
| from laika.helpers import ConstellationId | ||||
| from laika.raw_gnss import correct_measurements, process_measurements, read_raw_ublox | ||||
| from laika.opt import calc_pos_fix | ||||
| from openpilot.selfdrive.test.openpilotci import get_url | ||||
| from openpilot.system.hardware.hw import Paths | ||||
| from openpilot.tools.lib.logreader import LogReader | ||||
| from openpilot.selfdrive.test.helpers import with_processes | ||||
| import cereal.messaging as messaging | ||||
| 
 | ||||
| def get_gnss_measurements(log_reader): | ||||
|   gnss_measurements = [] | ||||
|   for msg in log_reader: | ||||
|     if msg.which() == "ubloxGnss": | ||||
|       ublox_msg = msg.ubloxGnss | ||||
|       if ublox_msg.which == 'measurementReport': | ||||
|         report = ublox_msg.measurementReport | ||||
|         if len(report.measurements) > 0: | ||||
|           gnss_measurements.append(read_raw_ublox(report)) | ||||
|   return gnss_measurements | ||||
| 
 | ||||
| def get_ublox_raw(log_reader): | ||||
|   ublox_raw = [] | ||||
|   for msg in log_reader: | ||||
|     if msg.which() == "ubloxRaw": | ||||
|       ublox_raw.append(msg) | ||||
|   return ublox_raw | ||||
| 
 | ||||
| class TestUbloxProcessing(unittest.TestCase): | ||||
|   NUM_TEST_PROCESS_MEAS = 10 | ||||
| 
 | ||||
|   @classmethod | ||||
|   def setUpClass(cls): | ||||
|     lr = LogReader(get_url("4cf7a6ad03080c90|2021-09-29--13-46-36", 0)) | ||||
|     cls.gnss_measurements = get_gnss_measurements(lr) | ||||
| 
 | ||||
|     # test gps ephemeris continuity check (drive has ephemeris issues with cutover data) | ||||
|     lr = LogReader(get_url("37b6542f3211019a|2023-01-15--23-45-10", 14)) | ||||
|     cls.ublox_raw = get_ublox_raw(lr) | ||||
| 
 | ||||
|   def test_read_ublox_raw(self): | ||||
|     count_gps = 0 | ||||
|     count_glonass = 0 | ||||
|     for measurements in self.gnss_measurements: | ||||
|       for m in measurements: | ||||
|         if m.constellation_id == ConstellationId.GPS: | ||||
|           count_gps += 1 | ||||
|         elif m.constellation_id == ConstellationId.GLONASS: | ||||
|           count_glonass += 1 | ||||
| 
 | ||||
|     self.assertEqual(count_gps, 5036) | ||||
|     self.assertEqual(count_glonass, 3651) | ||||
| 
 | ||||
|   def test_get_fix(self): | ||||
|     dog = AstroDog(cache_dir=Paths.download_cache_root()) | ||||
|     position_fix_found = 0 | ||||
|     count_processed_measurements = 0 | ||||
|     count_corrected_measurements = 0 | ||||
|     position_fix_found_after_correcting = 0 | ||||
| 
 | ||||
|     pos_ests = [] | ||||
|     for measurements in self.gnss_measurements[:self.NUM_TEST_PROCESS_MEAS]: | ||||
|       processed_meas = process_measurements(measurements, dog) | ||||
|       count_processed_measurements += len(processed_meas) | ||||
|       pos_fix = calc_pos_fix(processed_meas) | ||||
|       if len(pos_fix) > 0 and all(p != 0 for p in pos_fix[0]): | ||||
|         position_fix_found += 1 | ||||
| 
 | ||||
|         corrected_meas = correct_measurements(processed_meas, pos_fix[0][:3], dog) | ||||
|         count_corrected_measurements += len(corrected_meas) | ||||
| 
 | ||||
|         pos_fix = calc_pos_fix(corrected_meas) | ||||
|         if len(pos_fix) > 0 and all(p != 0 for p in pos_fix[0]): | ||||
|           pos_ests.append(pos_fix[0]) | ||||
|           position_fix_found_after_correcting += 1 | ||||
| 
 | ||||
|     mean_fix = np.mean(np.array(pos_ests)[:, :3], axis=0) | ||||
|     np.testing.assert_allclose(mean_fix, [-2452306.662377, -4778343.136806, 3428550.090557], rtol=0, atol=1) | ||||
| 
 | ||||
|     # Note that can happen that there are less corrected measurements compared to processed when they are invalid. | ||||
|     # However, not for the current segment | ||||
|     self.assertEqual(position_fix_found, self.NUM_TEST_PROCESS_MEAS) | ||||
|     self.assertEqual(position_fix_found_after_correcting, self.NUM_TEST_PROCESS_MEAS) | ||||
|     self.assertEqual(count_processed_measurements, 69) | ||||
|     self.assertEqual(count_corrected_measurements, 69) | ||||
| 
 | ||||
|   @with_processes(['ubloxd']) | ||||
|   def test_ublox_gps_cutover(self): | ||||
|     time.sleep(2) | ||||
|     ugs = messaging.sub_sock("ubloxGnss", timeout=0.1) | ||||
|     ur_pm = messaging.PubMaster(['ubloxRaw']) | ||||
| 
 | ||||
|     def replay_segment(): | ||||
|       rcv_msgs = [] | ||||
|       for msg in self.ublox_raw: | ||||
|         ur_pm.send(msg.which(), msg.as_builder()) | ||||
|         time.sleep(0.001) | ||||
|         rcv_msgs += messaging.drain_sock(ugs) | ||||
| 
 | ||||
|       time.sleep(0.1) | ||||
|       rcv_msgs += messaging.drain_sock(ugs) | ||||
|       return rcv_msgs | ||||
| 
 | ||||
|     # replay twice to enforce cutover data on rewind | ||||
|     rcv_msgs = replay_segment() | ||||
|     rcv_msgs += replay_segment() | ||||
| 
 | ||||
|     ephems_cnt = sum(m.ubloxGnss.which() == 'ephemeris' for m in rcv_msgs) | ||||
|     self.assertEqual(ephems_cnt, 15) | ||||
| 
 | ||||
| if __name__ == "__main__": | ||||
|   unittest.main() | ||||
| @ -1,4 +0,0 @@ | ||||
| LimeGPS/ | ||||
| LimeSuite/ | ||||
| hackrf/ | ||||
| gps-sdr-sim/ | ||||
| @ -1,33 +0,0 @@ | ||||
| # GPS test setup | ||||
| Testing the GPS receiver using GPS spoofing. At the moment only | ||||
| static location relpay is supported. | ||||
| 
 | ||||
| # Usage | ||||
| on C3 run `rpc_server.py`, on host PC run `fuzzy_testing.py` | ||||
| 
 | ||||
| `simulate_gps_signal.py` downloads the latest ephemeris file from | ||||
| https://cddis.nasa.gov/archive/gnss/data/daily/20xx/brdc/. | ||||
| 
 | ||||
| 
 | ||||
| # Hardware Setup | ||||
| * [LimeSDR USB](https://wiki.myriadrf.org/LimeSDR-USB) | ||||
| * Asus AX58BT antenna | ||||
| 
 | ||||
| # Software Setup | ||||
| * https://github.com/myriadrf/LimeSuite | ||||
| To communicate with LimeSDR the LimeSuite is needed it abstracts the direct | ||||
| communication. It also contains examples for a quick start. | ||||
| 
 | ||||
| The latest stable version (22.09) does not have the corresponding firmware | ||||
| download available at https://downloads.myriadrf.org/project/limesuite. Therefore | ||||
| version 20.10 was chosen. | ||||
| 
 | ||||
| * https://github.com/osqzss/LimeGPS | ||||
| Built on top of LimeSuite (libLimeSuite.so.20.10-1), generates the GPS signal. | ||||
| 
 | ||||
| ``` | ||||
| ./LimeGPS -e <ephemeris file> -l <location coordinates> | ||||
| 
 | ||||
| # Example | ||||
| ./LimeGPS -e /pathTo/brdc2660.22n -l 47.202028,15.740394,100 | ||||
| ``` | ||||
| @ -1,111 +0,0 @@ | ||||
| #!/usr/bin/env python3 | ||||
| import argparse | ||||
| import multiprocessing | ||||
| import rpyc | ||||
| from collections import defaultdict | ||||
| 
 | ||||
| from helper import download_rinex, exec_LimeGPS_bin | ||||
| from helper import get_random_coords, get_continuous_coords | ||||
| 
 | ||||
| #------------------------------------------------------------------------------ | ||||
| # this script is supposed to run on HOST PC | ||||
| # limeSDR is unreliable via c3 USB | ||||
| #------------------------------------------------------------------------------ | ||||
| 
 | ||||
| 
 | ||||
| def run_lime_gps(rinex_file: str, location: str, timeout: int): | ||||
|   # needs to run longer than the checker | ||||
|   timeout += 10 | ||||
|   print(f"LimeGPS {location} {timeout}") | ||||
|   p = multiprocessing.Process(target=exec_LimeGPS_bin, | ||||
|                               args=(rinex_file, location, timeout)) | ||||
|   p.start() | ||||
|   return p | ||||
| 
 | ||||
| con = None | ||||
| def run_remote_checker(lat, lon, alt, duration, ip_addr): | ||||
|   global con | ||||
|   try: | ||||
|     con = rpyc.connect(ip_addr, 18861) | ||||
|     con._config['sync_request_timeout'] = duration+20 | ||||
|   except ConnectionRefusedError: | ||||
|     print("could not run remote checker is 'rpc_server.py' running???") | ||||
|     return False, None, None | ||||
| 
 | ||||
|   matched, log, info = con.root.exposed_run_checker(lat, lon, alt, | ||||
|                         timeout=duration) | ||||
|   con.close() # TODO: might wanna fetch more logs here | ||||
|   con = None | ||||
| 
 | ||||
|   print(f"Remote Checker: {log} {info}") | ||||
|   return matched, log, info | ||||
| 
 | ||||
| 
 | ||||
| stats = defaultdict(int) # type: ignore | ||||
| keys = ['success', 'failed', 'ublox_fail', 'proc_crash', 'checker_crash'] | ||||
| 
 | ||||
| def print_report(): | ||||
|   print("\nFuzzy testing report summary:") | ||||
|   for k in keys: | ||||
|     print(f"  {k}: {stats[k]}") | ||||
| 
 | ||||
| 
 | ||||
| def update_stats(matched, log, info): | ||||
|   if matched: | ||||
|     stats['success'] += 1 | ||||
|     return | ||||
| 
 | ||||
|   stats['failed'] += 1 | ||||
|   if log == "PROC CRASH": | ||||
|     stats['proc_crash'] += 1 | ||||
|   if log == "CHECKER CRASHED": | ||||
|     stats['checker_crash'] += 1 | ||||
|   if log == "TIMEOUT": | ||||
|     stats['ublox_fail'] += 1 | ||||
| 
 | ||||
| 
 | ||||
| def main(ip_addr, continuous_mode, timeout, pos): | ||||
|   rinex_file = download_rinex() | ||||
| 
 | ||||
|   lat, lon, alt = pos | ||||
|   if lat == 0 and lon == 0 and alt == 0: | ||||
|     lat, lon, alt = get_random_coords(47.2020, 15.7403) | ||||
| 
 | ||||
|   try: | ||||
|     while True: | ||||
|       # spoof random location | ||||
|       spoof_proc = run_lime_gps(rinex_file, f"{lat},{lon},{alt}", timeout) | ||||
| 
 | ||||
|       # remote checker execs blocking | ||||
|       matched, log, info = run_remote_checker(lat, lon, alt, timeout, ip_addr) | ||||
|       update_stats(matched, log, info) | ||||
|       spoof_proc.terminate() | ||||
|       spoof_proc = None | ||||
| 
 | ||||
|       if continuous_mode: | ||||
|         lat, lon, alt = get_continuous_coords(lat, lon, alt) | ||||
|       else: | ||||
|         lat, lon, alt = get_random_coords(lat, lon) | ||||
|   except KeyboardInterrupt: | ||||
|     if spoof_proc is not None: | ||||
|       spoof_proc.terminate() | ||||
| 
 | ||||
|     if con is not None and not con.closed: | ||||
|       con.root.exposed_kill_procs() | ||||
|       con.close() | ||||
| 
 | ||||
|     print_report() | ||||
| 
 | ||||
| 
 | ||||
| if __name__ == "__main__": | ||||
|   parser = argparse.ArgumentParser(description="Fuzzy test GPS stack with random locations.") | ||||
|   parser.add_argument("ip_addr", type=str) | ||||
|   parser.add_argument("-c", "--contin", type=bool, nargs='?', default=False, help='Continous location change') | ||||
|   parser.add_argument("-t", "--timeout", type=int, nargs='?', default=180, help='Timeout to get location') | ||||
| 
 | ||||
|   # for replaying a location | ||||
|   parser.add_argument("lat", type=float, nargs='?', default=0) | ||||
|   parser.add_argument("lon", type=float, nargs='?', default=0) | ||||
|   parser.add_argument("alt", type=float, nargs='?', default=0) | ||||
|   args = parser.parse_args() | ||||
|   main(args.ip_addr, args.contin, args.timeout, (args.lat, args.lon, args.alt)) | ||||
| @ -1,53 +0,0 @@ | ||||
| import random | ||||
| import datetime as dt | ||||
| import subprocess as sp | ||||
| from typing import Tuple | ||||
| 
 | ||||
| from laika.downloader import download_nav | ||||
| from laika.gps_time import GPSTime | ||||
| from laika.helpers import ConstellationId | ||||
| 
 | ||||
| 
 | ||||
| def download_rinex(): | ||||
|   # TODO: check if there is a better way to get the full brdc file for LimeGPS | ||||
|   gps_time = GPSTime.from_datetime(dt.datetime.utcnow()) | ||||
|   utc_time = dt.datetime.utcnow() - dt.timedelta(1) | ||||
|   gps_time = GPSTime.from_datetime(dt.datetime(utc_time.year, utc_time.month, utc_time.day)) | ||||
|   return download_nav(gps_time, '/tmp/gpstest/', ConstellationId.GPS) | ||||
| 
 | ||||
| 
 | ||||
| def exec_LimeGPS_bin(rinex_file: str, location: str, duration: int): | ||||
|   # this functions should never return, cause return means, timeout is | ||||
|   # reached or it crashed | ||||
|   try: | ||||
|     cmd = ["LimeGPS/LimeGPS", "-e", rinex_file, "-l", location] | ||||
|     sp.check_output(cmd, timeout=duration) | ||||
|   except sp.TimeoutExpired: | ||||
|     print("LimeGPS timeout reached!") | ||||
|   except Exception as e: | ||||
|     print(f"LimeGPS crashed: {str(e)}") | ||||
| 
 | ||||
| 
 | ||||
| def get_random_coords(lat, lon) -> Tuple[float, float, int]: | ||||
|   # jump around the world | ||||
|   # max values, lat: -90 to 90, lon: -180 to 180 | ||||
| 
 | ||||
|   lat_add = random.random()*20 + 10 | ||||
|   lon_add = random.random()*20 + 20 | ||||
|   alt = random.randint(-10**3, 4*10**3) | ||||
| 
 | ||||
|   lat = ((lat + lat_add + 90) % 180) - 90 | ||||
|   lon = ((lon + lon_add + 180) % 360) - 180 | ||||
|   return round(lat, 5), round(lon, 5), alt | ||||
| 
 | ||||
| 
 | ||||
| def get_continuous_coords(lat, lon, alt) -> Tuple[float, float, int]: | ||||
|   # continuously move around the world | ||||
|   lat_add = random.random()*0.01 | ||||
|   lon_add = random.random()*0.01 | ||||
|   alt_add = random.randint(-100, 100) | ||||
| 
 | ||||
|   lat = ((lat + lat_add + 90) % 180) - 90 | ||||
|   lon = ((lon + lon_add + 180) % 360) - 180 | ||||
|   alt += alt_add | ||||
|   return round(lat, 5), round(lon, 5), alt | ||||
| @ -1,44 +0,0 @@ | ||||
| diff --git a/host/hackrf-tools/src/CMakeLists.txt b/host/hackrf-tools/src/CMakeLists.txt
 | ||||
| index 7115151c..a51388ba 100644
 | ||||
| --- a/host/hackrf-tools/src/CMakeLists.txt
 | ||||
| +++ b/host/hackrf-tools/src/CMakeLists.txt
 | ||||
| @@ -23,20 +23,20 @@
 | ||||
|  
 | ||||
|  set(INSTALL_DEFAULT_BINDIR "bin" CACHE STRING "Appended to CMAKE_INSTALL_PREFIX")
 | ||||
|  
 | ||||
| -find_package(FFTW REQUIRED)
 | ||||
| -include_directories(${FFTW_INCLUDES})
 | ||||
| -get_filename_component(FFTW_LIBRARY_DIRS ${FFTW_LIBRARIES} DIRECTORY)
 | ||||
| -link_directories(${FFTW_LIBRARY_DIRS})
 | ||||
| +#find_package(FFTW REQUIRED)
 | ||||
| +#include_directories(${FFTW_INCLUDES})
 | ||||
| +#get_filename_component(FFTW_LIBRARY_DIRS ${FFTW_LIBRARIES} DIRECTORY)
 | ||||
| +#link_directories(${FFTW_LIBRARY_DIRS})
 | ||||
|  
 | ||||
|  SET(TOOLS
 | ||||
|  	hackrf_transfer
 | ||||
| -	hackrf_spiflash
 | ||||
| -	hackrf_cpldjtag
 | ||||
| +	#hackrf_spiflash
 | ||||
| +	#hackrf_cpldjtag
 | ||||
|  	hackrf_info
 | ||||
| -	hackrf_debug
 | ||||
| -	hackrf_clock
 | ||||
| -	hackrf_sweep
 | ||||
| -	hackrf_operacake
 | ||||
| +	#hackrf_debug
 | ||||
| +	#hackrf_clock
 | ||||
| +	#hackrf_sweep
 | ||||
| +	#hackrf_operacake
 | ||||
|  )
 | ||||
|  
 | ||||
|  if(MSVC)
 | ||||
| @@ -45,7 +45,7 @@ if(MSVC)
 | ||||
|  	)
 | ||||
|  	LIST(APPEND TOOLS_LINK_LIBS ${FFTW_LIBRARIES})
 | ||||
|  else()
 | ||||
| -    LIST(APPEND TOOLS_LINK_LIBS m fftw3f)
 | ||||
| +    LIST(APPEND TOOLS_LINK_LIBS m)# fftw3f)
 | ||||
|  endif()
 | ||||
|  
 | ||||
|  if(NOT libhackrf_SOURCE_DIR)
 | ||||
| @ -1,13 +0,0 @@ | ||||
| diff --git a/gpssim.h b/gpssim.h
 | ||||
| index c30b227..2ae0802 100644
 | ||||
| --- a/gpssim.h
 | ||||
| +++ b/gpssim.h
 | ||||
| @@ -75,7 +75,7 @@
 | ||||
|  #define SC08 (8)
 | ||||
|  #define SC16 (16)
 | ||||
|  
 | ||||
| -#define EPHEM_ARRAY_SIZE (13) // for daily GPS broadcast ephemers file (brdc)
 | ||||
| +#define EPHEM_ARRAY_SIZE (20) // for daily GPS broadcast ephemers file (brdc)
 | ||||
|  
 | ||||
|  /*! \brief Structure representing GPS time */
 | ||||
|  typedef struct
 | ||||
| @ -1,11 +0,0 @@ | ||||
| diff --git a/makefile b/makefile
 | ||||
| index 51bfabf..d0ea1eb 100644
 | ||||
| --- a/makefile
 | ||||
| +++ b/makefile
 | ||||
| @@ -1,5 +1,4 @@
 | ||||
|  CC=gcc -O2 -Wall
 | ||||
|  
 | ||||
|  all: limegps.c gpssim.c
 | ||||
| -	$(CC) -o LimeGPS limegps.c gpssim.c -lm -lpthread -lLimeSuite
 | ||||
| -	
 | ||||
| +	$(CC) -o LimeGPS limegps.c gpssim.c -lm -lpthread -lLimeSuite -I../LimeSuite/src -L../LimeSuite/builddir/src -Wl,-rpath="$(PWD)/../LimeSuite/builddir/src"
 | ||||
| @ -1,13 +0,0 @@ | ||||
| diff --git a/src/lms7002m/LMS7002M_RxTxCalibrations.cpp b/src/lms7002m/LMS7002M_RxTxCalibrations.cpp
 | ||||
| index 41a37044..ac29c6b6 100644
 | ||||
| --- a/src/lms7002m/LMS7002M_RxTxCalibrations.cpp
 | ||||
| +++ b/src/lms7002m/LMS7002M_RxTxCalibrations.cpp
 | ||||
| @@ -254,7 +254,7 @@ int LMS7002M::CalibrateTx(float_type bandwidth_Hz, bool useExtLoopback)
 | ||||
|          mcuControl->RunProcedure(useExtLoopback ? MCU_FUNCTION_CALIBRATE_TX_EXTLOOPB : MCU_FUNCTION_CALIBRATE_TX);
 | ||||
|          status = mcuControl->WaitForMCU(1000);
 | ||||
|          if(status != MCU_BD::MCU_NO_ERROR)
 | ||||
| -            return ReportError(EINVAL, "Tx Calibration: MCU error %i (%s)", status, MCU_BD::MCUStatusMessage(status));
 | ||||
| +            return -1; //ReportError(EINVAL, "Tx Calibration: MCU error %i (%s)", status, MCU_BD::MCUStatusMessage(status));
 | ||||
|      }
 | ||||
|  
 | ||||
|      //sync registers to cache
 | ||||
| @ -1,13 +0,0 @@ | ||||
| diff --git a/src/FPGA_common/FPGA_common.cpp b/src/FPGA_common/FPGA_common.cpp
 | ||||
| index 4e81f33e..7381c475 100644
 | ||||
| --- a/src/FPGA_common/FPGA_common.cpp
 | ||||
| +++ b/src/FPGA_common/FPGA_common.cpp
 | ||||
| @@ -946,7 +946,7 @@ double FPGA::DetectRefClk(double fx3Clk)
 | ||||
|  
 | ||||
|      if (i == 0)
 | ||||
|          return -1;
 | ||||
| -    lime::info("Reference clock %1.2f MHz", clkTbl[i - 1] / 1e6);
 | ||||
| +    //lime::info("Reference clock %1.2f MHz", clkTbl[i - 1] / 1e6);
 | ||||
|      return clkTbl[i - 1];
 | ||||
|  }
 | ||||
|  
 | ||||
| @ -1,16 +0,0 @@ | ||||
| #!/bin/bash | ||||
| 
 | ||||
| # NOTE: can only run inside limeGPS test box! | ||||
| 
 | ||||
| # run limeGPS with random static location | ||||
| timeout 300 ./simulate_gps_signal.py 32.7518 -117.1962 & | ||||
| gps_PID=$(ps -aux | grep -m 1 "timeout 300" | awk '{print $2}') | ||||
| 
 | ||||
| echo "starting limeGPS..." | ||||
| sleep 10 | ||||
| 
 | ||||
| # run unit tests (skipped when module not present) | ||||
| python -m unittest test_gps.py | ||||
| python -m unittest test_gps_qcom.py | ||||
| 
 | ||||
| kill $gps_PID | ||||
| @ -1,25 +0,0 @@ | ||||
| #!/bin/bash | ||||
| set -e | ||||
| 
 | ||||
| DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )" | ||||
| cd $DIR | ||||
| 
 | ||||
| if [ ! -d LimeSuite ]; then | ||||
|   git clone https://github.com/myriadrf/LimeSuite.git | ||||
|   cd LimeSuite | ||||
|   # checkout latest version which has firmware updates available | ||||
|   git checkout v20.10.0 | ||||
|   git apply ../patches/limeSuite/* | ||||
|   mkdir builddir && cd builddir | ||||
|   cmake -DCMAKE_BUILD_TYPE=Release .. | ||||
|   make -j4 | ||||
|   cd ../.. | ||||
| fi | ||||
| 
 | ||||
| if [ ! -d LimeGPS ]; then | ||||
|   git clone https://github.com/osqzss/LimeGPS.git | ||||
|   cd LimeGPS | ||||
|   git apply ../patches/limeGPS/* | ||||
|   make | ||||
|   cd .. | ||||
| fi | ||||
| @ -1,21 +0,0 @@ | ||||
| #!/bin/bash | ||||
| set -e | ||||
| 
 | ||||
| DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )" | ||||
| cd $DIR | ||||
| 
 | ||||
| if [ ! -d gps-sdr-sim ]; then | ||||
|   git clone https://github.com/osqzss/gps-sdr-sim.git | ||||
|   cd gps-sdr-sim | ||||
|   make | ||||
|   cd .. | ||||
| fi | ||||
| 
 | ||||
| if [ ! -d hackrf ]; then | ||||
|   git clone https://github.com/greatscottgadgets/hackrf.git | ||||
|   cd hackrf/host | ||||
|   git apply ../../patches/hackrf.patch | ||||
|   cmake . | ||||
|   make | ||||
| fi | ||||
| 
 | ||||
| @ -1,151 +0,0 @@ | ||||
| #!/usr/bin/env python3 | ||||
| import os | ||||
| import random | ||||
| import argparse | ||||
| import datetime as dt | ||||
| import subprocess as sp | ||||
| from typing import Tuple | ||||
| 
 | ||||
| from laika.downloader import download_nav | ||||
| from laika.gps_time import GPSTime | ||||
| from laika.helpers import ConstellationId | ||||
| 
 | ||||
| cache_dir = '/tmp/gpstest/' | ||||
| 
 | ||||
| 
 | ||||
| def download_rinex(): | ||||
|   # TODO: check if there is a better way to get the full brdc file for LimeGPS | ||||
|   gps_time = GPSTime.from_datetime(dt.datetime.utcnow()) | ||||
|   utc_time = dt.datetime.utcnow()# - dt.timedelta(1) | ||||
|   gps_time = GPSTime.from_datetime(dt.datetime(utc_time.year, utc_time.month, utc_time.day)) | ||||
|   return download_nav(gps_time, cache_dir, ConstellationId.GPS) | ||||
| 
 | ||||
| def get_coords(lat, lon, s1, s2, o1=0, o2=0) -> Tuple[int, int]: | ||||
|   lat_add = random.random()*s1 + o1 | ||||
|   lon_add = random.random()*s2 + o2 | ||||
| 
 | ||||
|   lat = ((lat + lat_add + 90) % 180) - 90 | ||||
|   lon = ((lon + lon_add + 180) % 360) - 180 | ||||
|   return round(lat, 5), round(lon, 5) | ||||
| 
 | ||||
| def get_continuous_coords(lat, lon) -> Tuple[int, int]: | ||||
|   # continuously move around the world | ||||
|   return get_coords(lat, lon, 0.01, 0.01) | ||||
| 
 | ||||
| def get_random_coords(lat, lon) -> Tuple[int, int]: | ||||
|   # jump around the world | ||||
|   return get_coords(lat, lon, 20, 20, 10, 20) | ||||
| 
 | ||||
| def run_limeSDR_loop(lat, lon, alt, contin_sim, rinex_file, timeout): | ||||
|   while True: | ||||
|     try: | ||||
|       # TODO: add starttime setting and altitude | ||||
|       # -t 2023/01/15,00:00:00 -T 2023/01/15,00:00:00 | ||||
|       # this needs to match the date of the navigation file | ||||
|       print(f"starting LimeGPS, Location: {lat} {lon} {alt}") | ||||
|       cmd = ["LimeGPS/LimeGPS", "-e", rinex_file, "-l", f"{lat},{lon},{alt}"] | ||||
|       print(f"CMD: {cmd}") | ||||
|       sp.check_output(cmd, stderr=sp.PIPE, timeout=timeout) | ||||
|     except KeyboardInterrupt: | ||||
|       print("stopping LimeGPS") | ||||
|       return | ||||
|     except sp.TimeoutExpired: | ||||
|       print("LimeGPS timeout reached!") | ||||
|     except Exception as e: | ||||
|       out_stderr = e.stderr.decode('utf-8')# pylint:disable=no-member | ||||
|       if "Device is busy." in out_stderr: | ||||
|         print("GPS simulation is already running, Device is busy!") | ||||
|         return | ||||
| 
 | ||||
|       print(f"LimeGPS crashed: {str(e)}") | ||||
|       print(f"stderr:\n{e.stderr.decode('utf-8')}")# pylint:disable=no-member | ||||
|       return | ||||
| 
 | ||||
|     if contin_sim: | ||||
|       lat, lon = get_continuous_coords(lat, lon) | ||||
|     else: | ||||
|       lat, lon = get_random_coords(lat, lon) | ||||
| 
 | ||||
| def run_hackRF_loop(lat, lon, rinex_file, timeout): | ||||
| 
 | ||||
|   if timeout is not None: | ||||
|     print("no jump mode for hackrf!") | ||||
|     return | ||||
| 
 | ||||
|   try: | ||||
|     print(f"starting gps-sdr-sim, Location: {lat},{lon}") | ||||
|     # create 30second file and replay with hackrf endless | ||||
|     cmd = ["gps-sdr-sim/gps-sdr-sim", "-e", rinex_file, "-l", f"{lat},{lon},-200", "-d", "30"] | ||||
|     sp.check_output(cmd, stderr=sp.PIPE, timeout=timeout) | ||||
|     # created in current working directory | ||||
|   except Exception: | ||||
|     print("Failed to generate gpssim.bin") | ||||
| 
 | ||||
|   try: | ||||
|     print("starting hackrf_transfer") | ||||
|     # create 30second file and replay with hackrf endless | ||||
|     cmd = ["hackrf/host/hackrf-tools/src/hackrf_transfer", "-t", "gpssim.bin", | ||||
|            "-f", "1575420000", "-s", "2600000", "-a", "1", "-R"] | ||||
|     sp.check_output(cmd, stderr=sp.PIPE, timeout=timeout) | ||||
|   except KeyboardInterrupt: | ||||
|     print("stopping hackrf_transfer") | ||||
|     return | ||||
|   except Exception as e: | ||||
|     print(f"hackrf_transfer crashed:{str(e)}") | ||||
| 
 | ||||
| 
 | ||||
| def main(lat, lon, alt, jump_sim, contin_sim, hackrf_mode): | ||||
| 
 | ||||
|   if hackrf_mode: | ||||
|     if not os.path.exists('hackrf'): | ||||
|       print("hackrf not found run 'setup_hackrf.sh' first") | ||||
|       return | ||||
| 
 | ||||
|     if not os.path.exists('gps-sdr-sim'): | ||||
|       print("gps-sdr-sim not found run 'setup_hackrf.sh' first") | ||||
|       return | ||||
| 
 | ||||
|     output = sp.check_output(["hackrf/host/hackrf-tools/src/hackrf_info"]) | ||||
|     if output.strip() == b"" or b"No HackRF boards found." in output: | ||||
|       print("No HackRF boards found!") | ||||
|       return | ||||
| 
 | ||||
|   else: | ||||
|     if not os.path.exists('LimeGPS'): | ||||
|       print("LimeGPS not found run 'setup.sh' first") | ||||
|       return | ||||
| 
 | ||||
|     if not os.path.exists('LimeSuite'): | ||||
|       print("LimeSuite not found run 'setup.sh' first") | ||||
|       return | ||||
| 
 | ||||
|     output = sp.check_output(["LimeSuite/builddir/LimeUtil/LimeUtil", "--find"]) | ||||
|     if output.strip() == b"": | ||||
|       print("No LimeSDR device found!") | ||||
|       return | ||||
|     print(f"Device: {output.strip().decode('utf-8')}") | ||||
| 
 | ||||
|   if lat == 0 and lon == 0: | ||||
|     lat, lon = get_random_coords(47.2020, 15.7403) | ||||
| 
 | ||||
|   rinex_file = download_rinex() | ||||
| 
 | ||||
|   timeout = None | ||||
|   if jump_sim: | ||||
|     timeout = 30 | ||||
| 
 | ||||
|   if hackrf_mode: | ||||
|     run_hackRF_loop(lat, lon, rinex_file, timeout) | ||||
|   else: | ||||
|     run_limeSDR_loop(lat, lon, alt, contin_sim, rinex_file, timeout) | ||||
| 
 | ||||
| if __name__ == "__main__": | ||||
|   parser = argparse.ArgumentParser(description="Simulate static [or random jumping] GPS signal.") | ||||
|   parser.add_argument("lat", type=float, nargs='?', default=0) | ||||
|   parser.add_argument("lon", type=float, nargs='?', default=0) | ||||
|   parser.add_argument("alt", type=float, nargs='?', default=0) | ||||
|   parser.add_argument("--jump", action="store_true", help="signal that jumps around the world") | ||||
|   parser.add_argument("--contin", action="store_true", help="continuously/slowly moving around the world") | ||||
|   parser.add_argument("--hackrf", action="store_true", help="hackrf mode (DEFAULT: LimeSDR)") | ||||
|   args = parser.parse_args() | ||||
|   main(args.lat, args.lon, args.alt, args.jump, args.contin, args.hackrf) | ||||
| @ -1,189 +0,0 @@ | ||||
| #!/usr/bin/env python3 | ||||
| import pytest | ||||
| import time | ||||
| import unittest | ||||
| import struct | ||||
| 
 | ||||
| from openpilot.common.params import Params | ||||
| import cereal.messaging as messaging | ||||
| import openpilot.system.sensord.pigeond as pd | ||||
| from openpilot.selfdrive.test.helpers import with_processes | ||||
| 
 | ||||
| 
 | ||||
| def read_events(service, duration_sec): | ||||
|   service_sock = messaging.sub_sock(service, timeout=0.1) | ||||
|   start_time_sec = time.monotonic() | ||||
|   events = [] | ||||
|   while time.monotonic() - start_time_sec < duration_sec: | ||||
|     events += messaging.drain_sock(service_sock) | ||||
|     time.sleep(0.1) | ||||
| 
 | ||||
|   assert len(events) != 0, f"No '{service}'events collected!" | ||||
|   return events | ||||
| 
 | ||||
| 
 | ||||
| def create_backup(pigeon): | ||||
|   # controlled GNSS stop | ||||
|   pigeon.send(b"\xB5\x62\x06\x04\x04\x00\x00\x00\x08\x00\x16\x74") | ||||
| 
 | ||||
|   # store almanac in flash | ||||
|   pigeon.send(b"\xB5\x62\x09\x14\x04\x00\x00\x00\x00\x00\x21\xEC") | ||||
|   try: | ||||
|     if not pigeon.wait_for_ack(ack=pd.UBLOX_SOS_ACK, nack=pd.UBLOX_SOS_NACK): | ||||
|       raise RuntimeError("Could not store almanac") | ||||
|   except TimeoutError: | ||||
|     pass | ||||
| 
 | ||||
| 
 | ||||
| def verify_ubloxgnss_data(socket: messaging.SubSocket, max_time: int): | ||||
|   start_time = 0 | ||||
|   end_time = 0 | ||||
|   events = messaging.drain_sock(socket) | ||||
|   assert len(events) != 0, "no ublxGnss measurements" | ||||
| 
 | ||||
|   for event in events: | ||||
|     if event.ubloxGnss.which() != "measurementReport": | ||||
|       continue | ||||
| 
 | ||||
|     if start_time == 0: | ||||
|       start_time = event.logMonoTime | ||||
| 
 | ||||
|     if event.ubloxGnss.measurementReport.numMeas != 0: | ||||
|       end_time = event.logMonoTime | ||||
|       break | ||||
| 
 | ||||
|   assert end_time != 0, "no ublox measurements received!" | ||||
| 
 | ||||
|   ttfm = (end_time - start_time)/1e9 | ||||
|   assert ttfm < max_time, f"Time to first measurement > {max_time}s, {ttfm}" | ||||
| 
 | ||||
|   # check for satellite count in measurements | ||||
|   sat_count = [] | ||||
|   end_id = events.index(event)# pylint:disable=undefined-loop-variable | ||||
|   for event in events[end_id:]: | ||||
|     if event.ubloxGnss.which() == "measurementReport": | ||||
|       sat_count.append(event.ubloxGnss.measurementReport.numMeas) | ||||
| 
 | ||||
|   num_sat = int(sum(sat_count)/len(sat_count)) | ||||
|   assert num_sat >= 5, f"Not enough satellites {num_sat} (TestBox setup!)" | ||||
| 
 | ||||
| 
 | ||||
| def verify_gps_location(socket: messaging.SubSocket, max_time: int): | ||||
|   events = messaging.drain_sock(socket) | ||||
|   assert len(events) != 0, "no gpsLocationExternal measurements" | ||||
| 
 | ||||
|   start_time = events[0].logMonoTime | ||||
|   end_time = 0 | ||||
|   for event in events: | ||||
|     gps_valid = event.gpsLocationExternal.flags % 2 | ||||
| 
 | ||||
|     if gps_valid: | ||||
|       end_time = event.logMonoTime | ||||
|       break | ||||
| 
 | ||||
|   assert end_time != 0, "GPS location never converged!" | ||||
| 
 | ||||
|   ttfl = (end_time - start_time)/1e9 | ||||
|   assert ttfl < max_time, f"Time to first location > {max_time}s, {ttfl}" | ||||
| 
 | ||||
|   hacc = events[-1].gpsLocationExternal.accuracy | ||||
|   vacc = events[-1].gpsLocationExternal.verticalAccuracy | ||||
|   assert hacc < 20, f"Horizontal accuracy too high, {hacc}" | ||||
|   assert vacc < 45,  f"Vertical accuracy too high, {vacc}" | ||||
| 
 | ||||
| 
 | ||||
| def verify_time_to_first_fix(pigeon): | ||||
|   # get time to first fix from nav status message | ||||
|   nav_status = b"" | ||||
|   while True: | ||||
|     pigeon.send(b"\xb5\x62\x01\x03\x00\x00\x04\x0d") | ||||
|     nav_status = pigeon.receive() | ||||
|     if nav_status[:4] == b"\xb5\x62\x01\x03": | ||||
|       break | ||||
| 
 | ||||
|   values = struct.unpack("<HHHIBBBBIIH", nav_status[:24]) | ||||
|   ttff = values[8]/1000 | ||||
|   # srms = values[9]/1000 | ||||
|   assert ttff < 40, f"Time to first fix > 40s, {ttff}" | ||||
| 
 | ||||
| 
 | ||||
| @pytest.mark.tici | ||||
| class TestGPS(unittest.TestCase): | ||||
|   @classmethod | ||||
|   def setUpClass(cls): | ||||
|     ublox_available = Params().get_bool("UbloxAvailable") | ||||
|     if not ublox_available: | ||||
|       raise unittest.SkipTest | ||||
| 
 | ||||
| 
 | ||||
|   def tearDown(self): | ||||
|     pd.set_power(False) | ||||
| 
 | ||||
|   @with_processes(['ubloxd']) | ||||
|   def test_a_ublox_reset(self): | ||||
| 
 | ||||
|     pigeon, pm = pd.create_pigeon() | ||||
|     pd.init_baudrate(pigeon) | ||||
|     assert pigeon.reset_device(), "Could not reset device!" | ||||
| 
 | ||||
|     pd.initialize_pigeon(pigeon) | ||||
| 
 | ||||
|     ugs = messaging.sub_sock("ubloxGnss", timeout=0.1) | ||||
|     gle = messaging.sub_sock("gpsLocationExternal", timeout=0.1) | ||||
| 
 | ||||
|     # receive some messages (restart after cold start takes up to 30seconds) | ||||
|     pd.run_receiving(pigeon, pm, 60) | ||||
| 
 | ||||
|     # store almanac for next test | ||||
|     create_backup(pigeon) | ||||
| 
 | ||||
|     verify_ubloxgnss_data(ugs, 60) | ||||
|     verify_gps_location(gle, 60) | ||||
| 
 | ||||
|     # skip for now, this might hang for a while | ||||
|     #verify_time_to_first_fix(pigeon) | ||||
| 
 | ||||
| 
 | ||||
|   @with_processes(['ubloxd']) | ||||
|   def test_b_ublox_almanac(self): | ||||
|     pigeon, pm = pd.create_pigeon() | ||||
|     pd.init_baudrate(pigeon) | ||||
| 
 | ||||
|     # device cold start | ||||
|     pigeon.send(b"\xb5\x62\x06\x04\x04\x00\xff\xff\x00\x00\x0c\x5d") | ||||
|     time.sleep(1) # wait for cold start | ||||
|     pd.init_baudrate(pigeon) | ||||
| 
 | ||||
|     # clear configuration | ||||
|     pigeon.send_with_ack(b"\xb5\x62\x06\x09\x0d\x00\x00\x00\x1f\x1f\x00\x00\x00\x00\x00\x00\x00\x00\x17\x71\x5b") | ||||
| 
 | ||||
|     # restoring almanac backup | ||||
|     pigeon.send(b"\xB5\x62\x09\x14\x00\x00\x1D\x60") | ||||
|     status = pigeon.wait_for_backup_restore_status() | ||||
|     assert status == 2, "Could not restore almanac backup" | ||||
| 
 | ||||
|     pd.initialize_pigeon(pigeon) | ||||
| 
 | ||||
|     ugs = messaging.sub_sock("ubloxGnss", timeout=0.1) | ||||
|     gle = messaging.sub_sock("gpsLocationExternal", timeout=0.1) | ||||
| 
 | ||||
|     pd.run_receiving(pigeon, pm, 15) | ||||
|     verify_ubloxgnss_data(ugs, 15) | ||||
|     verify_gps_location(gle, 20) | ||||
| 
 | ||||
| 
 | ||||
|   @with_processes(['ubloxd']) | ||||
|   def test_c_ublox_startup(self): | ||||
|     pigeon, pm = pd.create_pigeon() | ||||
|     pd.init_baudrate(pigeon) | ||||
|     pd.initialize_pigeon(pigeon) | ||||
| 
 | ||||
|     ugs = messaging.sub_sock("ubloxGnss", timeout=0.1) | ||||
|     gle = messaging.sub_sock("gpsLocationExternal", timeout=0.1) | ||||
|     pd.run_receiving(pigeon, pm, 10) | ||||
|     verify_ubloxgnss_data(ugs, 10) | ||||
|     verify_gps_location(gle, 10) | ||||
| 
 | ||||
| 
 | ||||
| if __name__ == "__main__": | ||||
|   unittest.main() | ||||
| @ -1,78 +0,0 @@ | ||||
| #!/usr/bin/env python3 | ||||
| import pytest | ||||
| import time | ||||
| import unittest | ||||
| import subprocess as sp | ||||
| 
 | ||||
| from openpilot.common.params import Params | ||||
| import cereal.messaging as messaging | ||||
| from openpilot.selfdrive.manager.process_config import managed_processes | ||||
| 
 | ||||
| 
 | ||||
| def exec_mmcli(cmd): | ||||
|   cmd = "mmcli -m 0 " + cmd | ||||
|   p = sp.Popen(cmd, shell=True, stdout=sp.PIPE, stderr=sp.PIPE) | ||||
|   return p.communicate() | ||||
| 
 | ||||
| 
 | ||||
| def wait_for_location(socket, timeout): | ||||
|   while True: | ||||
|     events = messaging.drain_sock(socket) | ||||
|     for event in events: | ||||
|       if event.gpsLocation.flags % 2: | ||||
|         return False | ||||
| 
 | ||||
|     timeout -= 1 | ||||
|     if timeout <= 0: | ||||
|       return True | ||||
| 
 | ||||
|     time.sleep(0.1) | ||||
|     continue | ||||
| 
 | ||||
| 
 | ||||
| @pytest.mark.tici | ||||
| class TestGPS(unittest.TestCase): | ||||
|   @classmethod | ||||
|   def setUpClass(cls): | ||||
|     ublox_available = Params().get_bool("UbloxAvailable") | ||||
|     if ublox_available: | ||||
|       raise unittest.SkipTest | ||||
| 
 | ||||
|   def test_a_quectel_cold_start(self): | ||||
|     # delete assistance data to enforce cold start for GNSS | ||||
|     # testing shows that this takes up to 20min | ||||
| 
 | ||||
|     _, err = exec_mmcli("--command='AT+QGPSDEL=0'") | ||||
|     assert len(err) == 0, f"GPSDEL failed: {err}" | ||||
| 
 | ||||
|     managed_processes['rawgpsd'].start() | ||||
|     start_time = time.monotonic() | ||||
|     glo = messaging.sub_sock("gpsLocation", timeout=0.1) | ||||
| 
 | ||||
|     timeout = 10*60*3 # 3 minute | ||||
|     timedout = wait_for_location(glo, timeout) | ||||
|     managed_processes['rawgpsd'].stop() | ||||
| 
 | ||||
|     assert timedout is False, "Waiting for location timed out (3min)!" | ||||
| 
 | ||||
|     duration = time.monotonic() - start_time | ||||
|     assert duration < 60, f"Received GPS location {duration}!" | ||||
| 
 | ||||
| 
 | ||||
|   def test_b_quectel_startup(self): | ||||
|     managed_processes['rawgpsd'].start() | ||||
|     start_time = time.monotonic() | ||||
|     glo = messaging.sub_sock("gpsLocation", timeout=0.1) | ||||
| 
 | ||||
|     timeout = 10*60 # 1 minute | ||||
|     timedout = wait_for_location(glo, timeout) | ||||
|     managed_processes['rawgpsd'].stop() | ||||
| 
 | ||||
|     assert timedout is False, "Waiting for location timed out (3min)!" | ||||
| 
 | ||||
|     duration = time.monotonic() - start_time | ||||
|     assert duration < 60, f"Received GPS location {duration}!" | ||||
| 
 | ||||
| 
 | ||||
| if __name__ == "__main__": | ||||
|   unittest.main() | ||||
					Loading…
					
					
				
		Reference in new issue