GPS test station first unittests (#25950)

* init gps test

* gps test v1

* add static signal gen script

* update readme

* remove LD_PRELOAD by using rpath, update values after testing

* remove LD_PRELOAD

* update fuzzy testing

* address comments

* cleanup

Co-authored-by: Kurt Nistelberger <kurt.nistelberger@gmail.com>
old-commit-hash: 54d667aa15
taco
Kurt Nistelberger 3 years ago committed by GitHub
parent 52e592ddc1
commit 76f9a1487c
  1. 95
      selfdrive/sensord/pigeond.py
  2. 14
      tools/gpstest/README.md
  3. 142
      tools/gpstest/fuzzy_testing.py
  4. 8
      tools/gpstest/gpstest.sh
  5. 13
      tools/gpstest/patches/limeGPS/inc_ephem_array_size.patch
  6. 11
      tools/gpstest/patches/limeGPS/makefile.patch
  7. 13
      tools/gpstest/patches/limeSuite/mcu_error.patch
  8. 13
      tools/gpstest/patches/limeSuite/reference_print.patch
  9. 48
      tools/gpstest/remote_checker.py
  10. 83
      tools/gpstest/run_static_gps_signal.py
  11. 6
      tools/gpstest/setup.sh
  12. 140
      tools/gpstest/test_gps.py

@ -7,7 +7,7 @@ import struct
import requests
import urllib.parse
from datetime import datetime
from typing import List, Optional
from typing import List, Optional, Tuple
from cereal import messaging
from common.params import Params
@ -34,7 +34,6 @@ def set_power(enabled: bool) -> None:
gpio_set(GPIO.UBLOX_PWR_EN, enabled)
gpio_set(GPIO.UBLOX_RST_N, enabled)
def add_ubx_checksum(msg: bytes) -> bytes:
A = B = 0
for b in msg[2:]:
@ -115,35 +114,70 @@ class TTYPigeon():
raise TimeoutError('No response from ublox')
time.sleep(0.001)
def reset_device(self) -> bool:
# deleting the backup does not always work on first try (mostly on second try)
for _ in range(5):
# device cold start
self.send(b"\xb5\x62\x06\x04\x04\x00\xff\xff\x00\x00\x0c\x5d")
time.sleep(1) # wait for cold start
init_baudrate(self)
# clear configuration
self.send_with_ack(b"\xb5\x62\x06\x09\x0d\x00\x00\x00\x1f\x1f\x00\x00\x00\x00\x00\x00\x00\x00\x17\x71\x5b")
# clear flash memory (almanac backup)
self.send_with_ack(b"\xB5\x62\x09\x14\x04\x00\x01\x00\x00\x00\x22\xf0")
# try restoring backup to verify it got deleted
self.send(b"\xB5\x62\x09\x14\x00\x00\x1D\x60")
# 1: failed to restore, 2: could restore, 3: no backup
status = self.wait_for_backup_restore_status()
if status == 1 or status == 3:
return True
return False
def initialize_pigeon(pigeon: TTYPigeon) -> bool:
# try initializing a few times
for _ in range(10):
try:
def init_baudrate(pigeon: TTYPigeon):
# ublox default setting on startup is 9600 baudrate
pigeon.set_baud(9600)
# up baud rate
# $PUBX,41,1,0007,0003,460800,0*15\r\n
pigeon.send(b"\x24\x50\x55\x42\x58\x2C\x34\x31\x2C\x31\x2C\x30\x30\x30\x37\x2C\x30\x30\x30\x33\x2C\x34\x36\x30\x38\x30\x30\x2C\x30\x2A\x31\x35\x0D\x0A")
time.sleep(0.1)
pigeon.set_baud(460800)
# other configuration messages
pigeon.send_with_ack(b"\xB5\x62\x06\x00\x14\x00\x03\xFF\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x01\x00\x00\x00\x00\x00\x1E\x7F")
pigeon.send_with_ack(b"\xB5\x62\x06\x00\x14\x00\x00\xFF\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x19\x35")
pigeon.send_with_ack(b"\xB5\x62\x06\x00\x14\x00\x01\x00\x00\x00\xC0\x08\x00\x00\x00\x08\x07\x00\x01\x00\x01\x00\x00\x00\x00\x00\xF4\x80")
pigeon.send_with_ack(b"\xB5\x62\x06\x00\x14\x00\x04\xFF\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x1D\x85")
pigeon.send_with_ack(b"\xB5\x62\x06\x00\x00\x00\x06\x18")
pigeon.send_with_ack(b"\xB5\x62\x06\x00\x01\x00\x01\x08\x22")
pigeon.send_with_ack(b"\xB5\x62\x06\x00\x01\x00\x03\x0A\x24")
def initialize_pigeon(pigeon: TTYPigeon) -> bool:
# try initializing a few times
for _ in range(10):
try:
# setup port config
pigeon.send_with_ack(b"\xb5\x62\x06\x00\x14\x00\x03\xFF\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x01\x00\x00\x00\x00\x00\x1E\x7F")
pigeon.send_with_ack(b"\xb5\x62\x06\x00\x14\x00\x00\xFF\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x19\x35")
pigeon.send_with_ack(b"\xb5\x62\x06\x00\x14\x00\x01\x00\x00\x00\xC0\x08\x00\x00\x00\x08\x07\x00\x01\x00\x01\x00\x00\x00\x00\x00\xF4\x80")
pigeon.send_with_ack(b"\xb5\x62\x06\x00\x14\x00\x04\xFF\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x1D\x85")
pigeon.send_with_ack(b"\xb5\x62\x06\x00\x00\x00\x06\x18")
pigeon.send_with_ack(b"\xb5\x62\x06\x00\x01\x00\x01\x08\x22")
pigeon.send_with_ack(b"\xb5\x62\x06\x00\x01\x00\x03\x0A\x24")
# UBX-CFG-RATE (0x06 0x08)
pigeon.send_with_ack(b"\xB5\x62\x06\x08\x06\x00\x64\x00\x01\x00\x00\x00\x79\x10")
# UBX-CFG-NAV5 (0x06 0x24)
pigeon.send_with_ack(b"\xB5\x62\x06\x24\x24\x00\x05\x00\x04\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x5A\x63")
# UBX-CFG-ODO (0x06 0x1E)
pigeon.send_with_ack(b"\xB5\x62\x06\x1E\x14\x00\x00\x00\x00\x00\x01\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x3C\x37")
pigeon.send_with_ack(b"\xB5\x62\x06\x39\x08\x00\xFF\xAD\x62\xAD\x1E\x63\x00\x00\x83\x0C")
pigeon.send_with_ack(b"\xB5\x62\x06\x23\x28\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x56\x24")
# UBX-CFG-NAV5 (0x06 0x24)
pigeon.send_with_ack(b"\xB5\x62\x06\x24\x00\x00\x2A\x84")
pigeon.send_with_ack(b"\xB5\x62\x06\x23\x00\x00\x29\x81")
pigeon.send_with_ack(b"\xB5\x62\x06\x1E\x00\x00\x24\x72")
pigeon.send_with_ack(b"\xB5\x62\x06\x39\x00\x00\x3F\xC3")
# UBX-CFG-MSG (set message rate)
pigeon.send_with_ack(b"\xB5\x62\x06\x01\x03\x00\x01\x07\x01\x13\x51")
pigeon.send_with_ack(b"\xB5\x62\x06\x01\x03\x00\x02\x15\x01\x22\x70")
pigeon.send_with_ack(b"\xB5\x62\x06\x01\x03\x00\x02\x13\x01\x20\x6C")
@ -224,13 +258,11 @@ def deinitialize_and_exit(pigeon: Optional[TTYPigeon]):
set_power(False)
sys.exit(0)
def main():
assert TICI, "unsupported hardware for pigeond"
def create_pigeon() -> Tuple[TTYPigeon, messaging.PubMaster]:
pigeon = None
# register exit handler
pigeon = None
signal.signal(signal.SIGINT, lambda sig, frame: deinitialize_and_exit(pigeon))
pm = messaging.PubMaster(['ubloxRaw'])
# power cycle ublox
@ -240,15 +272,20 @@ def main():
time.sleep(0.5)
pigeon = TTYPigeon()
r = initialize_pigeon(pigeon)
Params().put_bool("UbloxAvailable", r)
return pigeon, pm
# start receiving data
while True:
def run_receiving(pigeon: TTYPigeon, pm: messaging.PubMaster, duration: int = 0):
start_time = time.monotonic()
def end_condition():
return True if duration == 0 else time.monotonic() - start_time < duration
while end_condition():
dat = pigeon.receive()
if len(dat) > 0:
if dat[0] == 0x00:
cloudlog.warning("received invalid data from ublox, re-initing!")
init_baudrate(pigeon)
initialize_pigeon(pigeon)
continue
@ -260,5 +297,17 @@ def main():
# prevent locking up a CPU core if ublox disconnects
time.sleep(0.001)
def main():
assert TICI, "unsupported hardware for pigeond"
pigeon, pm = create_pigeon()
init_baudrate(pigeon)
r = initialize_pigeon(pigeon)
Params().put_bool("UbloxAvailable", r)
# start receiving data
run_receiving(pigeon, pm)
if __name__ == "__main__":
main()

@ -1,20 +1,18 @@
# GPS test setup
Testing the GPS receiver using GPS spoofing. At the moment only
static location relpay is supported.
# Usage
```
# replaying a static location
./gpstest.sh -e <ephemeris file> -s <static location>
# replaying a prerecorded route (NMEA cvs file)
./gpstest.sh -e <ephemeris file> -d <dynamic location>
# on host, start gps signal simulation
./run_static_lime.py
```
If `-e` is not provided the latest ephemeris file will be downloaded from
`run_static_lime.py` downloads the latest ephemeris file from
https://cddis.nasa.gov/archive/gnss/data/daily/20xx/brdc/.
(TODO: add auto downloader)
# Hardware Setup
# Hardware Setup
* [LimeSDR USB](https://wiki.myriadrf.org/LimeSDR-USB)
* Asus AX58BT antenna

@ -0,0 +1,142 @@
#!/usr/bin/env python3
import sys
import time
import random
import datetime as dt
import subprocess as sp
import multiprocessing
import threading
from typing import Tuple, Any
from laika.downloader import download_nav
from laika.gps_time import GPSTime
from laika.helpers import ConstellationId
cache_dir = '/tmp/gpstest/'
def download_rinex():
# TODO: check if there is a better way to get the full brdc file for LimeGPS
gps_time = GPSTime.from_datetime(dt.datetime.utcnow())
utc_time = dt.datetime.utcnow() - dt.timedelta(1)
gps_time = GPSTime.from_datetime(dt.datetime(utc_time.year, utc_time.month, utc_time.day))
return download_nav(gps_time, cache_dir, ConstellationId.GPS)
def exec_LimeGPS_bin(rinex_file: str, location: str, duration: int):
# this functions should never return, cause return means, timeout is
# reached or it crashed
try:
cmd = ["LimeGPS/LimeGPS", "-e", rinex_file, "-l", location]
sp.check_output(cmd, timeout=duration)
except sp.TimeoutExpired:
print("LimeGPS timeout reached!")
except Exception as e:
print(f"LimeGPS crashed: {str(e)}")
def run_lime_gps(rinex_file: str, location: str, duration: int):
print(f"LimeGPS {location} {duration}")
p = multiprocessing.Process(target=exec_LimeGPS_bin,
args=(rinex_file, location, duration))
p.start()
return p
def get_random_coords(lat, lon) -> Tuple[int, int]:
# jump around the world
# max values, lat: -90 to 90, lon: -180 to 180
lat_add = random.random()*20 + 10
lon_add = random.random()*20 + 20
lat = ((lat + lat_add + 90) % 180) - 90
lon = ((lon + lon_add + 180) % 360) - 180
return round(lat, 5), round(lon, 5)
def get_continuous_coords(lat, lon) -> Tuple[int, int]:
# continuously move around the world
lat_add = random.random()*0.01
lon_add = random.random()*0.01
lat = ((lat + lat_add + 90) % 180) - 90
lon = ((lon + lon_add + 180) % 360) - 180
return round(lat, 5), round(lon, 5)
rc_p: Any = None
def exec_remote_checker(lat, lon, duration):
global rc_p
# TODO: good enough for testing
remote_cmd = "export PYTHONPATH=/data/pythonpath:/data/pythonpath/pyextra && "
remote_cmd += "cd /data/openpilot && "
remote_cmd += f"timeout {duration} /usr/local/pyenv/shims/python tools/gpstest/remote_checker.py "
remote_cmd += f"{lat} {lon}"
ssh_cmd = ['ssh', '-i', '/home/batman/openpilot/xx/phone/key/id_rsa',
'comma@192.168.60.130']
ssh_cmd += [remote_cmd]
rc_p = sp.Popen(ssh_cmd, stdout=sp.PIPE)
rc_p.wait()
rc_output = rc_p.stdout.read()
print(f"Checker Result: {rc_output.strip().decode('utf-8')}")
def run_remote_checker(spoof_proc, lat, lon, duration) -> bool:
checker_thread = threading.Thread(target=exec_remote_checker, args=(lat, lon, duration))
checker_thread.start()
tcnt = 0
while True:
if not checker_thread.is_alive():
# assume this only happens when the signal got matched
return True
# the spoofing process has a timeout, kill checker if reached
if not spoof_proc.is_alive():
rc_p.kill()
# spoofing process died, assume timeout
print("Spoofing process timeout")
return False
print(f"Time elapsed: {tcnt}[s]", end = "\r")
time.sleep(1)
tcnt += 1
def main():
continuous_mode = False
if len(sys.argv) == 2 and sys.argv[1] == '-c':
print("Continuous Mode!")
continuous_mode = True
rinex_file = download_rinex()
duration = 60*3 # max runtime in seconds
lat, lon = get_random_coords(47.2020, 15.7403)
while True:
# spoof random location
spoof_proc = run_lime_gps(rinex_file, f"{lat},{lon},100", duration)
start_time = time.monotonic()
# remote checker runs blocking
if not run_remote_checker(spoof_proc, lat, lon, duration):
# location could not be matched by ublox module
pass
end_time = time.monotonic()
spoof_proc.terminate()
# -1 to count process startup
print(f"Time to get Signal: {round(end_time - start_time - 1, 4)}")
if continuous_mode:
lat, lon = get_continuous_coords(lat, lon)
else:
lat, lon = get_random_coords(lat, lon)
if __name__ == "__main__":
main()

@ -1,8 +0,0 @@
#!/bin/bash
LimeGPS_BIN=LimeGPS/LimeGPS
if test -f "$LimeGPS_BIN"; then
LD_PRELOAD=LimeSuite/builddir/src/libLimeSuite.so $LimeGPS_BIN $@
else
echo "LimeGPS binary not found, run 'setup.sh' first"
fi

@ -0,0 +1,13 @@
diff --git a/gpssim.h b/gpssim.h
index c30b227..2ae0802 100644
--- a/gpssim.h
+++ b/gpssim.h
@@ -75,7 +75,7 @@
#define SC08 (8)
#define SC16 (16)
-#define EPHEM_ARRAY_SIZE (13) // for daily GPS broadcast ephemers file (brdc)
+#define EPHEM_ARRAY_SIZE (20) // for daily GPS broadcast ephemers file (brdc)
/*! \brief Structure representing GPS time */
typedef struct

@ -0,0 +1,11 @@
diff --git a/makefile b/makefile
index 51bfabf..d0ea1eb 100644
--- a/makefile
+++ b/makefile
@@ -1,5 +1,4 @@
CC=gcc -O2 -Wall
all: limegps.c gpssim.c
- $(CC) -o LimeGPS limegps.c gpssim.c -lm -lpthread -lLimeSuite
-
+ $(CC) -o LimeGPS limegps.c gpssim.c -lm -lpthread -lLimeSuite -I../LimeSuite/src -L../LimeSuite/builddir/src -Wl,-rpath="$(PWD)/../LimeSuite/builddir/src"

@ -0,0 +1,13 @@
diff --git a/src/lms7002m/LMS7002M_RxTxCalibrations.cpp b/src/lms7002m/LMS7002M_RxTxCalibrations.cpp
index 41a37044..ac29c6b6 100644
--- a/src/lms7002m/LMS7002M_RxTxCalibrations.cpp
+++ b/src/lms7002m/LMS7002M_RxTxCalibrations.cpp
@@ -254,7 +254,7 @@ int LMS7002M::CalibrateTx(float_type bandwidth_Hz, bool useExtLoopback)
mcuControl->RunProcedure(useExtLoopback ? MCU_FUNCTION_CALIBRATE_TX_EXTLOOPB : MCU_FUNCTION_CALIBRATE_TX);
status = mcuControl->WaitForMCU(1000);
if(status != MCU_BD::MCU_NO_ERROR)
- return ReportError(EINVAL, "Tx Calibration: MCU error %i (%s)", status, MCU_BD::MCUStatusMessage(status));
+ return -1; //ReportError(EINVAL, "Tx Calibration: MCU error %i (%s)", status, MCU_BD::MCUStatusMessage(status));
}
//sync registers to cache

@ -0,0 +1,13 @@
diff --git a/src/FPGA_common/FPGA_common.cpp b/src/FPGA_common/FPGA_common.cpp
index 4e81f33e..7381c475 100644
--- a/src/FPGA_common/FPGA_common.cpp
+++ b/src/FPGA_common/FPGA_common.cpp
@@ -946,7 +946,7 @@ double FPGA::DetectRefClk(double fx3Clk)
if (i == 0)
return -1;
- lime::info("Reference clock %1.2f MHz", clkTbl[i - 1] / 1e6);
+ //lime::info("Reference clock %1.2f MHz", clkTbl[i - 1] / 1e6);
return clkTbl[i - 1];
}

@ -0,0 +1,48 @@
#!/usr/bin/env python3
import sys
import time
from typing import List
import cereal.messaging as messaging
from selfdrive.manager.process_config import managed_processes
DELTA = 0.001
# assume running openpilot for now
procs: List[str] = []#"ubloxd", "pigeond"]
def main():
if len(sys.argv) < 3:
print("args: <latitude> <longitude>")
return
sol_lat = float(sys.argv[1])
sol_lon = float(sys.argv[2])
for p in procs:
managed_processes[p].start()
time.sleep(0.5) # give time to startup
gps_sock = messaging.sub_sock('gpsLocationExternal', timeout=0.1)
# analyze until the location changed
while True:
events = messaging.drain_sock(gps_sock)
for e in events:
lat = e.gpsLocationExternal.latitude
lon = e.gpsLocationExternal.longitude
if abs(lat - sol_lat) < DELTA and abs(lon - sol_lon) < DELTA:
print("MATCH")
return
for p in procs:
if not managed_processes[p].proc.is_alive():
print(f"ERROR: '{p}' died")
return
if __name__ == "__main__":
main()
for p in procs:
managed_processes[p].stop()

@ -0,0 +1,83 @@
#!/usr/bin/env python3
import os
import sys
import random
import datetime as dt
import subprocess as sp
from typing import Tuple
from laika.downloader import download_nav
from laika.gps_time import GPSTime
from laika.helpers import ConstellationId
cache_dir = '/tmp/gpstest/'
def download_rinex():
# TODO: check if there is a better way to get the full brdc file for LimeGPS
gps_time = GPSTime.from_datetime(dt.datetime.utcnow())
utc_time = dt.datetime.utcnow() - dt.timedelta(1)
gps_time = GPSTime.from_datetime(dt.datetime(utc_time.year, utc_time.month, utc_time.day))
return download_nav(gps_time, cache_dir, ConstellationId.GPS)
def get_random_coords(lat, lon) -> Tuple[int, int]:
# jump around the world
# max values, lat: -90 to 90, lon: -180 to 180
lat_add = random.random()*20 + 10
lon_add = random.random()*20 + 20
lat = ((lat + lat_add + 90) % 180) - 90
lon = ((lon + lon_add + 180) % 360) - 180
return round(lat, 5), round(lon, 5)
def check_availability() -> bool:
cmd = ["LimeSuite/builddir/LimeUtil/LimeUtil", "--find"]
output = sp.check_output(cmd)
if output.strip() == b"":
return False
print(f"Device: {output.strip().decode('utf-8')}")
return True
def main():
if not os.path.exists('LimeGPS'):
print("LimeGPS not found run 'setup.sh' first")
return
if not os.path.exists('LimeSuite'):
print("LimeSuite not found run 'setup.sh' first")
return
if not check_availability():
print("No limeSDR device found!")
return
rinex_file = download_rinex()
lat, lon = get_random_coords(47.2020, 15.7403)
if len(sys.argv) == 3:
lat = float(sys.argv[1])
lon = float(sys.argv[2])
try:
print(f"starting LimeGPS, Location: {lat},{lon}")
cmd = ["LimeGPS/LimeGPS", "-e", rinex_file, "-l", f"{lat},{lon},100"]
sp.check_output(cmd, stderr=sp.PIPE)
except KeyboardInterrupt:
print("stopping LimeGPS")
except Exception as e:
out_stderr = e.stderr.decode('utf-8')# pylint:disable=no-member
if "Device is busy." in out_stderr:
print("GPS simulation is already running, Device is busy!")
return
print(f"LimeGPS crashed: {str(e)}")
print(f"stderr:\n{e.stderr.decode('utf-8')}")# pylint:disable=no-member
if __name__ == "__main__":
main()

@ -9,8 +9,9 @@ if [ ! -d LimeSuite ]; then
cd LimeSuite
# checkout latest version which has firmware updates available
git checkout v20.10.0
git apply ../patches/limeSuite/*
mkdir builddir && cd builddir
cmake ..
cmake -DCMAKE_BUILD_TYPE=Release ..
make -j4
cd ../..
fi
@ -18,8 +19,7 @@ fi
if [ ! -d LimeGPS ]; then
git clone https://github.com/osqzss/LimeGPS.git
cd LimeGPS
sed -i 's/LimeSuite/LimeSuite -I..\/LimeSuite\/src -L..\/LimeSuite\/builddir\/src/' makefile
git apply ../patches/limeGPS/*
make
cd ..
fi

@ -0,0 +1,140 @@
#!/usr/bin/env python3
import time
import unittest
import struct
import numpy as np
import cereal.messaging as messaging
import selfdrive.sensord.pigeond as pd
from system.hardware import TICI
from selfdrive.test.helpers import with_processes
def read_events(service, duration_sec):
service_sock = messaging.sub_sock(service, timeout=0.1)
start_time_sec = time.monotonic()
events = []
while time.monotonic() - start_time_sec < duration_sec:
events += messaging.drain_sock(service_sock)
time.sleep(0.1)
assert len(events) != 0, f"No '{service}'events collected!"
return events
def verify_ubloxgnss_data(socket: messaging.SubSocket):
start_time = 0
end_time = 0
events = messaging.drain_sock(socket)
assert len(events) != 0, "no ublxGnss measurements"
for event in events:
if event.ubloxGnss.which() != "measurementReport":
continue
if start_time == 0:
start_time = event.logMonoTime
if event.ubloxGnss.measurementReport.numMeas != 0:
end_time = event.logMonoTime
break
assert end_time != 0, "no ublox measurements received!"
ttfm = (end_time - start_time)/1e9
assert ttfm < 35, f"Time to first measurement > 35s, {ttfm}"
# check for satellite count in measurements
sat_count = []
end_id = events.index(event)# pylint:disable=undefined-loop-variable
for event in events[end_id:]:
if event.ubloxGnss.which() == "measurementReport":
sat_count.append(event.ubloxGnss.measurementReport.numMeas)
num_sat = int(sum(sat_count)/len(sat_count))
assert num_sat > 8, f"Not enough satellites {num_sat} (TestBox setup!)"
def verify_gps_location(socket: messaging.SubSocket):
buf_lon = [0]*10
buf_lat = [0]*10
buf_i = 0
events = messaging.drain_sock(socket)
assert len(events) != 0, "no gpsLocationExternal measurements"
start_time = events[0].logMonoTime
end_time = 0
for event in events:
buf_lon[buf_i % 10] = event.gpsLocationExternal.longitude
buf_lat[buf_i % 10] = event.gpsLocationExternal.latitude
buf_i += 1
if buf_i < 9:
continue
if any([lat == 0 or lon == 0 for lat,lon in zip(buf_lat, buf_lon)]):
continue
if np.std(buf_lon) < 1e-5 and np.std(buf_lat) < 1e-5:
end_time = event.logMonoTime
break
assert end_time != 0, "GPS location never converged!"
ttfl = (end_time - start_time)/1e9
assert ttfl < 40, f"Time to first location > 40s, {ttfl}"
hacc = events[-1].gpsLocationExternal.accuracy
vacc = events[-1].gpsLocationExternal.verticalAccuracy
assert hacc < 15, f"Horizontal accuracy too high, {hacc}"
assert vacc < 43, f"Vertical accuracy too high, {vacc}"
def verify_time_to_first_fix(pigeon):
# get time to first fix from nav status message
nav_status = b""
while True:
pigeon.send(b"\xb5\x62\x01\x03\x00\x00\x04\x0d")
nav_status = pigeon.receive()
if nav_status[:4] == b"\xb5\x62\x01\x03":
break
values = struct.unpack("<HHHIBBBBIIH", nav_status[:24])
ttff = values[8]/1000
# srms = values[9]/1000
assert ttff < 40, f"Time to first fix > 40s, {ttff}"
class TestGPS(unittest.TestCase):
@classmethod
def setUpClass(cls):
if not TICI:
raise unittest.SkipTest
def tearDown(self):
pd.set_power(False)
@with_processes(['ubloxd'])
def test_ublox_reset(self):
pigeon, pm = pd.create_pigeon()
pd.init_baudrate(pigeon)
assert pigeon.reset_device(), "Could not reset device!"
pd.initialize_pigeon(pigeon)
ugs = messaging.sub_sock("ubloxGnss", timeout=0.1)
gle = messaging.sub_sock("gpsLocationExternal", timeout=0.1)
# receive some messages (restart after cold start takes up to 30seconds)
pd.run_receiving(pigeon, pm, 40)
verify_ubloxgnss_data(ugs)
verify_gps_location(gle)
# skip for now, this might hang for a while
#verify_time_to_first_fix(pigeon)
if __name__ == "__main__":
unittest.main()
Loading…
Cancel
Save