pull/36106/head
Adeeb Shihadeh 2 days ago
parent 4b50fe2a2a
commit a55cd76ead
  1. 308
      system/ubloxd/test_ubloxd.py
  2. 20
      system/ubloxd/tests/print_gps_stats.py
  3. 89
      system/ubloxd/tests/ubloxd.py

@ -1,308 +0,0 @@
#!/usr/bin/env python3
"""
Test suite for the Python ubloxd implementation.
Tests UBLOX message parsing, processing, and cereal message generation.
"""
import struct
from openpilot.system.ubloxd.ubloxd import (
UBXMessageParser, UBXMessageProcessor, UBXMessage,
UBLOX_PREAMBLE1, UBLOX_PREAMBLE2, UBXClass, UBXMessageID
)
class TestUBXMessageParser:
"""Test UBLOX message parser functionality"""
def setup_method(self):
self.parser = UBXMessageParser()
def test_calculate_checksum(self):
"""Test UBLOX checksum calculation"""
# Test message: UBX header + simple payload
msg = bytearray([0xb5, 0x62, 0x01, 0x07, 0x04, 0x00, 0x01, 0x02, 0x03, 0x04])
ck_a, ck_b = self.parser._calculate_checksum(msg, 2, 10)
assert isinstance(ck_a, int)
assert isinstance(ck_b, int)
assert 0 <= ck_a <= 255
assert 0 <= ck_b <= 255
def test_simple_message_parsing(self):
"""Test parsing a complete simple message"""
# Create a simple test message: UBX-NAV-PVT stub
payload = b'\x00' * 92 # Minimum NAV-PVT payload size
msg_data = struct.pack('<BBBBH', UBLOX_PREAMBLE1, UBLOX_PREAMBLE2,
UBXClass.NAV, UBXMessageID.NAV.PVT, len(payload))
msg_data += payload
# Add checksum
ck_a = ck_b = 0
for byte in msg_data[2:]:
ck_a = (ck_a + byte) & 0xFF
ck_b = (ck_b + ck_a) & 0xFF
msg_data += bytes([ck_a, ck_b])
# Parse the message
complete, consumed = self.parser.add_data(123.456, msg_data)
assert complete
assert consumed == len(msg_data)
# Extract the parsed message
parsed_msg = self.parser.parse_message()
assert parsed_msg is not None
assert parsed_msg.msg_class == UBXClass.NAV
assert parsed_msg.msg_id == UBXMessageID.NAV.PVT
assert len(parsed_msg.payload) == 92
assert parsed_msg.checksum_valid
assert parsed_msg.log_time == 123.456
def test_incremental_parsing(self):
"""Test parsing message received in multiple chunks"""
# Create test message
payload = b'\x11\x22\x33\x44'
msg_data = struct.pack('<BBBBH', UBLOX_PREAMBLE1, UBLOX_PREAMBLE2,
0x01, 0x02, len(payload))
msg_data += payload
# Add checksum
ck_a = ck_b = 0
for byte in msg_data[2:]:
ck_a = (ck_a + byte) & 0xFF
ck_b = (ck_b + ck_a) & 0xFF
msg_data += bytes([ck_a, ck_b])
# Send message in chunks
chunk1 = msg_data[:4] # Header partial
chunk2 = msg_data[4:8] # Header complete + payload start
chunk3 = msg_data[8:] # Rest of payload + checksum
# Process chunks
complete1, consumed1 = self.parser.add_data(100.0, chunk1)
assert not complete1
assert consumed1 == len(chunk1)
complete2, consumed2 = self.parser.add_data(100.0, chunk2)
assert not complete2
assert consumed2 == len(chunk2)
complete3, consumed3 = self.parser.add_data(100.0, chunk3)
assert complete3
assert consumed3 == len(chunk3)
# Verify parsed message
parsed_msg = self.parser.parse_message()
assert parsed_msg is not None
assert parsed_msg.payload == payload
def test_corrupted_data_recovery(self):
"""Test recovery from corrupted message data"""
# Send some garbage data followed by valid message
garbage = b'\x00\x11\x22\x33\x44\x55'
# Valid message
payload = b'\xaa\xbb'
msg_data = struct.pack('<BBBBH', UBLOX_PREAMBLE1, UBLOX_PREAMBLE2,
0x01, 0x02, len(payload))
msg_data += payload
# Add checksum
ck_a = ck_b = 0
for byte in msg_data[2:]:
ck_a = (ck_a + byte) & 0xFF
ck_b = (ck_b + ck_a) & 0xFF
msg_data += bytes([ck_a, ck_b])
combined_data = garbage + msg_data
# Parser should skip garbage and find valid message
complete, consumed = self.parser.add_data(100.0, combined_data)
assert complete
parsed_msg = self.parser.parse_message()
assert parsed_msg is not None
assert parsed_msg.payload == payload
def test_reset(self):
"""Test parser reset functionality"""
# Add some data
test_data = b'\xb5\x62\x01\x02'
self.parser.add_data(100.0, test_data)
assert len(self.parser.parse_buffer) > 0
# Reset should clear buffer
self.parser.reset()
assert len(self.parser.parse_buffer) == 0
class TestUBXMessageProcessor:
"""Test UBLOX message processing and cereal event generation"""
def setup_method(self):
self.processor = UBXMessageProcessor()
def test_process_nav_pvt(self, mocker):
"""Test processing of NAV-PVT messages"""
# Mock cereal message
mock_event = mocker.MagicMock()
mock_location = mocker.MagicMock()
mock_event.gpsLocationExternal = mock_location
mock_new_message = mocker.patch('openpilot.system.ubloxd.ubloxd.messaging.new_message',
return_value=mock_event)
# Create NAV-PVT payload with test data
fixType = 3 # 3D fix
numSV = 12 # 12 satellites
lon = int(-122.419416 * 1e7) # San Francisco longitude
lat = int(37.774929 * 1e7) # San Francisco latitude
height = 100000 # 100m above ellipsoid (in mm)
hAcc = 5000 # 5m horizontal accuracy (in mm)
vAcc = 10000 # 10m vertical accuracy (in mm)
gSpeed = 2236 # ground speed ~2.24 m/s (in mm/s)
headMot = 4500000 # 45 degrees heading (in 1e-5 deg)
sAcc = 500 # 0.5 m/s speed accuracy (in mm/s)
headAcc = 1800000 # 18 degree heading accuracy (in 1e-5 deg)
# Pack the payload (first 92 bytes of NAV-PVT)
payload = struct.pack('<IHBBBBBBIiBBBBiiiiIIiiiiiIIHHHHHHBBBB',
123000, # iTOW
2024, 1, 15, 12, 30, 45, # year, month, day, hour, min, sec
0x07, # valid flags
1000, # tAcc
0, # nano
fixType, # fixType (3D)
0x01, # flags
0x00, # flags2
numSV, # numSV
lon, lat, height, 5000, hAcc, vAcc,
0, 0, 0, gSpeed, headMot, sAcc, headAcc, 100,
0, 0, 0, 0, 0, 0, 0, 0, 0) # Reserved fields
# Create UBX message
msg = UBXMessage(
msg_class=UBXClass.NAV,
msg_id=UBXMessageID.NAV.PVT,
payload=payload,
checksum_valid=True,
log_time=100.0
)
# Process message
events = self.processor.process_message(msg)
# Verify results
assert len(events) == 1
mock_new_message.assert_called_once_with('gpsLocationExternal', valid=True)
# Check that location fields were set correctly
assert mock_location.unixTimestampMillis == 100000
assert abs(mock_location.latitude - 37.774929) < 1e-6
assert abs(mock_location.longitude - (-122.419416)) < 1e-6
assert abs(mock_location.altitude - 100.0) < 0.01
assert abs(mock_location.speed - 2.236) < 0.001
assert abs(mock_location.bearingDeg - 45.0) < 0.1
assert abs(mock_location.horizontalAccuracy - 5.0) < 0.01
assert mock_location.hasFix
assert mock_location.satelliteCount == 12
def test_process_invalid_message(self):
"""Test handling of invalid/corrupted messages"""
# Create message with invalid payload (too short for NAV-PVT)
msg = UBXMessage(
msg_class=UBXClass.NAV,
msg_id=UBXMessageID.NAV.PVT,
payload=b'\x00\x01\x02', # Too short
checksum_valid=True,
log_time=100.0
)
# Should handle gracefully without crashing
events = self.processor.process_message(msg)
assert len(events) == 0
def test_process_unknown_message(self):
"""Test handling of unknown message types"""
msg = UBXMessage(
msg_class=0xFF, # Unknown class
msg_id=0xFF, # Unknown ID
payload=b'\x00\x01\x02\x03',
checksum_valid=True,
log_time=100.0
)
# Should handle gracefully
events = self.processor.process_message(msg)
assert len(events) == 0
class TestIntegration:
"""Integration tests for complete parsing pipeline"""
def test_end_to_end_nav_pvt(self, mocker):
"""Test complete NAV-PVT message parsing and processing"""
parser = UBXMessageParser()
processor = UBXMessageProcessor()
# Create realistic NAV-PVT message
payload = struct.pack('<IHBBBBBBIiBBBBiiiiIIiiiiiIIHHHHHHBBBB',
123000, # iTOW
2024, 1, 15, 12, 30, 45, # year, month, day, hour, min, sec
0x07, # valid flags
1000, # tAcc
0, # nano
3, # fixType (3D)
0x01, # flags
0x00, # flags2
10, # numSV
int(-122.0 * 1e7), # lon
int(37.0 * 1e7), # lat
10000, # height (10m)
5000, # hMSL (5m)
2000, # hAcc (2m)
3000, # vAcc (3m)
0, 0, 0, # velN, velE, velD
0, # gSpeed
0, # headMot
0, # sAcc
0, # headAcc
100, # pDOP
0, 0, 0, 0, 0, 0, 0, 0, 0) # reserved
# Build complete message with header and checksum
msg_data = struct.pack('<BBBBH', UBLOX_PREAMBLE1, UBLOX_PREAMBLE2,
UBXClass.NAV, UBXMessageID.NAV.PVT, len(payload))
msg_data += payload
# Calculate and add checksum
ck_a = ck_b = 0
for byte in msg_data[2:]:
ck_a = (ck_a + byte) & 0xFF
ck_b = (ck_b + ck_a) & 0xFF
msg_data += bytes([ck_a, ck_b])
# Parse message
mock_event = mocker.MagicMock()
mock_location = mocker.MagicMock()
mock_event.gpsLocationExternal = mock_location
mock_new_message = mocker.patch('openpilot.system.ubloxd.ubloxd.messaging.new_message',
return_value=mock_event)
complete, consumed = parser.add_data(200.0, msg_data)
assert complete
parsed_msg = parser.parse_message()
assert parsed_msg is not None
assert parsed_msg.checksum_valid
events = processor.process_message(parsed_msg)
assert len(events) == 1
# Verify location was processed correctly
mock_new_message.assert_called_once_with('gpsLocationExternal', valid=True)
assert mock_location.unixTimestampMillis == 200000
assert abs(mock_location.latitude - 37.0) < 0.1
assert abs(mock_location.longitude - (-122.0)) < 0.1

@ -1,20 +0,0 @@
#!/usr/bin/env python3
import time
import cereal.messaging as messaging
if __name__ == "__main__":
sm = messaging.SubMaster(['ubloxGnss', 'gpsLocationExternal'])
while 1:
ug = sm['ubloxGnss']
gle = sm['gpsLocationExternal']
try:
cnos = []
for m in ug.measurementReport.measurements:
cnos.append(m.cno)
print(f"Sats: {ug.measurementReport.numMeas} Accuracy: {gle.horizontalAccuracy:.2f} m cnos", sorted(cnos))
except Exception:
pass
sm.update()
time.sleep(0.1)

@ -1,89 +0,0 @@
#!/usr/bin/env python3
# type: ignore
from openpilot.selfdrive.locationd.test import ublox
import struct
baudrate = 460800
rate = 100 # send new data every 100ms
def configure_ublox(dev):
# configure ports and solution parameters and rate
dev.configure_port(port=ublox.PORT_USB, inMask=1, outMask=1) # enable only UBX on USB
dev.configure_port(port=0, inMask=0, outMask=0) # disable DDC
payload = struct.pack('<BBHIIHHHBB', 1, 0, 0, 2240, baudrate, 1, 1, 0, 0, 0)
dev.configure_poll(ublox.CLASS_CFG, ublox.MSG_CFG_PRT, payload) # enable UART
dev.configure_port(port=4, inMask=0, outMask=0) # disable SPI
dev.configure_poll_port()
dev.configure_poll_port(ublox.PORT_SERIAL1)
dev.configure_poll_port(ublox.PORT_USB)
dev.configure_solution_rate(rate_ms=rate)
# Configure solution
payload = struct.pack('<HBBIIBB4H6BH6B', 5, 4, 3, 0, 0,
0, 0, 0, 0, 0,
0, 0, 0, 0, 0,
0, 0, 0, 0, 0,
0, 0, 0, 0)
dev.configure_poll(ublox.CLASS_CFG, ublox.MSG_CFG_NAV5, payload)
payload = struct.pack('<B3BBB6BBB2BBB2B', 0, 0, 0, 0, 1,
3, 0, 0, 0, 0,
0, 0, 0, 0, 0,
0, 0, 0, 0, 0)
dev.configure_poll(ublox.CLASS_CFG, ublox.MSG_CFG_ODO, payload)
#bits_ITMF_config1 = '10101101011000101010110111111111'
#bits_ITMF_config2 = '00000000000000000110001100011110'
ITMF_config1 = 2908925439
ITMF_config2 = 25374
payload = struct.pack('<II', ITMF_config1, ITMF_config2)
dev.configure_poll(ublox.CLASS_CFG, ublox.MSG_CFG_ITMF, payload)
payload = struct.pack('<HHIBBBBBBBBBBH6BBB2BH4B3BB', 0, (1 << 10), 0, 0, 0,
0, 0, 0, 0, 0, 0,
0, 1, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0,
0, 0, 0, 0)
dev.configure_poll(ublox.CLASS_CFG, ublox.MSG_CFG_NAVX5, payload)
dev.configure_poll(ublox.CLASS_CFG, ublox.MSG_CFG_NAV5)
dev.configure_poll(ublox.CLASS_CFG, ublox.MSG_CFG_NAVX5)
dev.configure_poll(ublox.CLASS_CFG, ublox.MSG_CFG_ODO)
dev.configure_poll(ublox.CLASS_CFG, ublox.MSG_CFG_ITMF)
# Configure RAW, PVT and HW messages to be sent every solution cycle
dev.configure_message_rate(ublox.CLASS_NAV, ublox.MSG_NAV_PVT, 1)
dev.configure_message_rate(ublox.CLASS_RXM, ublox.MSG_RXM_RAW, 1)
dev.configure_message_rate(ublox.CLASS_RXM, ublox.MSG_RXM_SFRBX, 1)
dev.configure_message_rate(ublox.CLASS_MON, ublox.MSG_MON_HW, 1)
dev.configure_message_rate(ublox.CLASS_MON, ublox.MSG_MON_HW2, 1)
dev.configure_message_rate(ublox.CLASS_NAV, ublox.MSG_NAV_SAT, 1)
# Query the backup restore status
print("backup restore polling message (implement custom response handler!):")
dev.configure_poll(0x09, 0x14)
print("if successful, send this to clear the flash:")
dev.send_message(0x09, 0x14, b"\x01\x00\x00\x00")
print("send on stop:")
# Save on shutdown
# Controlled GNSS stop and hot start
payload = struct.pack('<HBB', 0x0000, 0x08, 0x00)
dev.send_message(ublox.CLASS_CFG, ublox.MSG_CFG_RST, payload)
# UBX-UPD-SOS backup
dev.send_message(0x09, 0x14, b"\x00\x00\x00\x00")
if __name__ == "__main__":
class Device:
def write(self, s):
d = '"{}"s'.format(''.join(f'\\x{b:02X}' for b in s))
print(f" if (!send_with_ack({d})) continue;")
dev = ublox.UBlox(Device(), baudrate=baudrate)
configure_ublox(dev)
Loading…
Cancel
Save