#!/usr/bin/env python3
import math
import capnp
import calendar
import numpy as np
from collections import defaultdict
from dataclasses import dataclass
from cereal import log
from cereal import messaging
from openpilot . system . ubloxd . generated . ubx import Ubx
from openpilot . system . ubloxd . generated . gps import Gps
from openpilot . system . ubloxd . generated . glonass import Glonass
SECS_IN_MIN = 60
SECS_IN_HR = 60 * SECS_IN_MIN
SECS_IN_DAY = 24 * SECS_IN_HR
SECS_IN_WEEK = 7 * SECS_IN_DAY
class UbxFramer :
PREAMBLE1 = 0xB5
PREAMBLE2 = 0x62
HEADER_SIZE = 6
CHECKSUM_SIZE = 2
def __init__ ( self ) - > None :
self . buf = bytearray ( )
self . last_log_time = 0.0
def reset ( self ) - > None :
self . buf . clear ( )
@staticmethod
def _checksum_ok ( frame : bytes ) - > bool :
ck_a = 0
ck_b = 0
for b in frame [ 2 : - 2 ] :
ck_a = ( ck_a + b ) & 0xFF
ck_b = ( ck_b + ck_a ) & 0xFF
return ck_a == frame [ - 2 ] and ck_b == frame [ - 1 ]
def add_data ( self , log_time : float , incoming : bytes ) - > list [ bytes ] :
self . last_log_time = log_time
out : list [ bytes ] = [ ]
if not incoming :
return out
self . buf + = incoming
while True :
# find preamble
if len ( self . buf ) < 2 :
break
start = self . buf . find ( b " \xB5 \x62 " )
if start < 0 :
# no preamble in buffer
self . buf . clear ( )
break
if start > 0 :
# drop garbage before preamble
self . buf = self . buf [ start : ]
if len ( self . buf ) < self . HEADER_SIZE :
break
length_le = int . from_bytes ( self . buf [ 4 : 6 ] , ' little ' , signed = False )
total_len = self . HEADER_SIZE + length_le + self . CHECKSUM_SIZE
if len ( self . buf ) < total_len :
break
candidate = bytes ( self . buf [ : total_len ] )
if self . _checksum_ok ( candidate ) :
out . append ( candidate )
# consume this frame
self . buf = self . buf [ total_len : ]
else :
# drop first byte and retry
self . buf = self . buf [ 1 : ]
return out
def _bit ( b : int , shift : int ) - > bool :
return ( b & ( 1 << shift ) ) != 0
@dataclass
class EphemerisCaches :
gps_subframes : defaultdict [ int , dict [ int , bytes ] ]
glonass_strings : defaultdict [ int , dict [ int , bytes ] ]
glonass_string_times : defaultdict [ int , dict [ int , float ] ]
glonass_string_superframes : defaultdict [ int , dict [ int , int ] ]
class UbloxMsgParser :
gpsPi = 3.1415926535898
# user range accuracy in meters
glonass_URA_lookup : dict [ int , float ] = {
0 : 1 , 1 : 2 , 2 : 2.5 , 3 : 4 , 4 : 5 , 5 : 7 ,
6 : 10 , 7 : 12 , 8 : 14 , 9 : 16 , 10 : 32 ,
11 : 64 , 12 : 128 , 13 : 256 , 14 : 512 , 15 : 1024 ,
}
def __init__ ( self ) - > None :
self . framer = UbxFramer ( )
self . caches = EphemerisCaches (
gps_subframes = defaultdict ( dict ) ,
glonass_strings = defaultdict ( dict ) ,
glonass_string_times = defaultdict ( dict ) ,
glonass_string_superframes = defaultdict ( dict ) ,
)
# Message generation entry point
def parse_frame ( self , frame : bytes ) - > tuple [ str , capnp . lib . capnp . _DynamicStructBuilder ] | None :
# Quick header parse
msg_type = int . from_bytes ( frame [ 2 : 4 ] , ' big ' )
payload = frame [ 6 : - 2 ]
if msg_type == 0x0107 :
body = Ubx . NavPvt . from_bytes ( payload )
return self . _gen_nav_pvt ( body )
if msg_type == 0x0213 :
# Manually parse RXM-SFRBX to avoid Kaitai EOF on some frames
if len ( payload ) < 8 :
return None
gnss_id = payload [ 0 ]
sv_id = payload [ 1 ]
freq_id = payload [ 3 ]
num_words = payload [ 4 ]
exp = 8 + 4 * num_words
if exp != len ( payload ) :
return None
words : list [ int ] = [ ]
off = 8
for _ in range ( num_words ) :
words . append ( int . from_bytes ( payload [ off : off + 4 ] , ' little ' ) )
off + = 4
class _SfrbxView :
def __init__ ( self , gid : int , sid : int , fid : int , body : list [ int ] ) :
self . gnss_id = Ubx . GnssType ( gid )
self . sv_id = sid
self . freq_id = fid
self . body = body
view = _SfrbxView ( gnss_id , sv_id , freq_id , words )
return self . _gen_rxm_sfrbx ( view )
if msg_type == 0x0215 :
body = Ubx . RxmRawx . from_bytes ( payload )
return self . _gen_rxm_rawx ( body )
if msg_type == 0x0A09 :
body = Ubx . MonHw . from_bytes ( payload )
return self . _gen_mon_hw ( body )
if msg_type == 0x0A0B :
body = Ubx . MonHw2 . from_bytes ( payload )
return self . _gen_mon_hw2 ( body )
if msg_type == 0x0135 :
body = Ubx . NavSat . from_bytes ( payload )
return self . _gen_nav_sat ( body )
return None
# NAV-PVT -> gpsLocationExternal
def _gen_nav_pvt ( self , msg : Ubx . NavPvt ) - > tuple [ str , capnp . lib . capnp . _DynamicStructBuilder ] :
dat = messaging . new_message ( ' gpsLocationExternal ' , valid = True )
gps = dat . gpsLocationExternal
gps . source = log . GpsLocationData . SensorSource . ublox
gps . flags = msg . flags
gps . hasFix = ( msg . flags % 2 ) == 1
gps . latitude = msg . lat * 1e-07
gps . longitude = msg . lon * 1e-07
gps . altitude = msg . height * 1e-03
gps . speed = msg . g_speed * 1e-03
gps . bearingDeg = msg . head_mot * 1e-5
gps . horizontalAccuracy = msg . h_acc * 1e-03
gps . satelliteCount = msg . num_sv
# build UTC timestamp millis (NAV-PVT is in UTC)
# tolerate invalid or unset date values like C++ timegm
try :
utc_tt = calendar . timegm ( ( msg . year , msg . month , msg . day , msg . hour , msg . min , msg . sec , 0 , 0 , 0 ) )
except Exception :
utc_tt = 0
gps . unixTimestampMillis = int ( utc_tt * 1e3 + ( msg . nano * 1e-6 ) )
# match C++ float32 rounding semantics exactly
gps . vNED = [
float ( np . float32 ( msg . vel_n ) * np . float32 ( 1e-03 ) ) ,
float ( np . float32 ( msg . vel_e ) * np . float32 ( 1e-03 ) ) ,
float ( np . float32 ( msg . vel_d ) * np . float32 ( 1e-03 ) ) ,
]
gps . verticalAccuracy = msg . v_acc * 1e-03
gps . speedAccuracy = msg . s_acc * 1e-03
gps . bearingAccuracyDeg = msg . head_acc * 1e-05
return ( ' gpsLocationExternal ' , dat )
# RXM-SFRBX dispatch to GPS or GLONASS ephemeris
def _gen_rxm_sfrbx ( self , msg ) - > tuple [ str , capnp . lib . capnp . _DynamicStructBuilder ] | None :
if msg . gnss_id == Ubx . GnssType . gps :
return self . _parse_gps_ephemeris ( msg )
if msg . gnss_id == Ubx . GnssType . glonass :
return self . _parse_glonass_ephemeris ( msg )
return None
def _parse_gps_ephemeris ( self , msg : Ubx . RxmSfrbx ) - > tuple [ str , capnp . lib . capnp . _DynamicStructBuilder ] | None :
# body is list of 10 words; convert to 30-byte subframe (strip parity/padding)
body = msg . body
if len ( body ) != 10 :
return None
subframe_data = bytearray ( )
for word in body :
word >> = 6
subframe_data . append ( ( word >> 16 ) & 0xFF )
subframe_data . append ( ( word >> 8 ) & 0xFF )
subframe_data . append ( word & 0xFF )
sf = Gps . from_bytes ( bytes ( subframe_data ) )
subframe_id = sf . how . subframe_id
if subframe_id < 1 or subframe_id > 3 :
return None
self . caches . gps_subframes [ msg . sv_id ] [ subframe_id ] = bytes ( subframe_data )
if len ( self . caches . gps_subframes [ msg . sv_id ] ) != 3 :
return None
dat = messaging . new_message ( ' ubloxGnss ' , valid = True )
eph = dat . ubloxGnss . init ( ' ephemeris ' )
eph . svId = msg . sv_id
iode_s2 = 0
iode_s3 = 0
iodc_lsb = 0
week = 0
# Subframe 1
sf1 = Gps . from_bytes ( self . caches . gps_subframes [ msg . sv_id ] [ 1 ] )
s1 = sf1 . body
assert isinstance ( s1 , Gps . Subframe1 )
week = s1 . week_no
week + = 1024
if week < 1877 :
week + = 1024
eph . tgd = s1 . t_gd * math . pow ( 2 , - 31 )
eph . toc = s1 . t_oc * math . pow ( 2 , 4 )
eph . af2 = s1 . af_2 * math . pow ( 2 , - 55 )
eph . af1 = s1 . af_1 * math . pow ( 2 , - 43 )
eph . af0 = s1 . af_0 * math . pow ( 2 , - 31 )
eph . svHealth = s1 . sv_health
eph . towCount = sf1 . how . tow_count
iodc_lsb = s1 . iodc_lsb
# Subframe 2
sf2 = Gps . from_bytes ( self . caches . gps_subframes [ msg . sv_id ] [ 2 ] )
s2 = sf2 . body
assert isinstance ( s2 , Gps . Subframe2 )
if s2 . t_oe == 0 and sf2 . how . tow_count * 6 > = ( SECS_IN_WEEK - 2 * SECS_IN_HR ) :
week + = 1
eph . crs = s2 . c_rs * math . pow ( 2 , - 5 )
eph . deltaN = s2 . delta_n * math . pow ( 2 , - 43 ) * self . gpsPi
eph . m0 = s2 . m_0 * math . pow ( 2 , - 31 ) * self . gpsPi
eph . cuc = s2 . c_uc * math . pow ( 2 , - 29 )
eph . ecc = s2 . e * math . pow ( 2 , - 33 )
eph . cus = s2 . c_us * math . pow ( 2 , - 29 )
eph . a = math . pow ( s2 . sqrt_a * math . pow ( 2 , - 19 ) , 2.0 )
eph . toe = s2 . t_oe * math . pow ( 2 , 4 )
iode_s2 = s2 . iode
# Subframe 3
sf3 = Gps . from_bytes ( self . caches . gps_subframes [ msg . sv_id ] [ 3 ] )
s3 = sf3 . body
assert isinstance ( s3 , Gps . Subframe3 )
eph . cic = s3 . c_ic * math . pow ( 2 , - 29 )
eph . omega0 = s3 . omega_0 * math . pow ( 2 , - 31 ) * self . gpsPi
eph . cis = s3 . c_is * math . pow ( 2 , - 29 )
eph . i0 = s3 . i_0 * math . pow ( 2 , - 31 ) * self . gpsPi
eph . crc = s3 . c_rc * math . pow ( 2 , - 5 )
eph . omega = s3 . omega * math . pow ( 2 , - 31 ) * self . gpsPi
eph . omegaDot = s3 . omega_dot * math . pow ( 2 , - 43 ) * self . gpsPi
eph . iode = s3 . iode
eph . iDot = s3 . idot * math . pow ( 2 , - 43 ) * self . gpsPi
iode_s3 = s3 . iode
eph . toeWeek = week
eph . tocWeek = week
# clear cache for this SV
self . caches . gps_subframes [ msg . sv_id ] . clear ( )
if not ( iodc_lsb == iode_s2 == iode_s3 ) :
return None
return ( ' ubloxGnss ' , dat )
def _parse_glonass_ephemeris ( self , msg : Ubx . RxmSfrbx ) - > tuple [ str , capnp . lib . capnp . _DynamicStructBuilder ] | None :
# words are 4 bytes each; Glonass parser expects 16 bytes (string)
body = msg . body
if len ( body ) != 4 :
return None
string_bytes = bytearray ( )
for word in body :
for i in ( 3 , 2 , 1 , 0 ) :
string_bytes . append ( ( word >> ( 8 * i ) ) & 0xFF )
gl = Glonass . from_bytes ( bytes ( string_bytes ) )
string_number = gl . string_number
if string_number < 1 or string_number > 5 or gl . idle_chip :
return None
# correlate by superframe and timing, similar to C++ logic
freq_id = msg . freq_id
superframe_unknown = False
needs_clear = False
for i in range ( 1 , 6 ) :
if i not in self . caches . glonass_strings [ freq_id ] :
continue
sf_prev = self . caches . glonass_string_superframes [ freq_id ] . get ( i , 0 )
if sf_prev == 0 or gl . superframe_number == 0 :
superframe_unknown = True
elif sf_prev != gl . superframe_number :
needs_clear = True
if superframe_unknown :
prev_time = self . caches . glonass_string_times [ freq_id ] . get ( i , 0.0 )
if abs ( ( prev_time - 2.0 * i ) - ( self . framer . last_log_time - 2.0 * string_number ) ) > 10 :
needs_clear = True
if needs_clear :
self . caches . glonass_strings [ freq_id ] . clear ( )
self . caches . glonass_string_superframes [ freq_id ] . clear ( )
self . caches . glonass_string_times [ freq_id ] . clear ( )
self . caches . glonass_strings [ freq_id ] [ string_number ] = bytes ( string_bytes )
self . caches . glonass_string_superframes [ freq_id ] [ string_number ] = gl . superframe_number
self . caches . glonass_string_times [ freq_id ] [ string_number ] = self . framer . last_log_time
if msg . sv_id == 255 :
# unknown SV id
return None
if len ( self . caches . glonass_strings [ freq_id ] ) != 5 :
return None
dat = messaging . new_message ( ' ubloxGnss ' , valid = True )
eph = dat . ubloxGnss . init ( ' glonassEphemeris ' )
eph . svId = msg . sv_id
eph . freqNum = msg . freq_id - 7
current_day = 0
tk = 0
# string 1
try :
s1 = Glonass . from_bytes ( self . caches . glonass_strings [ freq_id ] [ 1 ] ) . data
except Exception :
return None
assert isinstance ( s1 , Glonass . String1 )
eph . p1 = int ( s1 . p1 )
tk = int ( s1 . t_k )
eph . tkDEPRECATED = tk
eph . xVel = float ( s1 . x_vel ) * math . pow ( 2 , - 20 )
eph . xAccel = float ( s1 . x_accel ) * math . pow ( 2 , - 30 )
eph . x = float ( s1 . x ) * math . pow ( 2 , - 11 )
# string 2
try :
s2 = Glonass . from_bytes ( self . caches . glonass_strings [ freq_id ] [ 2 ] ) . data
except Exception :
return None
assert isinstance ( s2 , Glonass . String2 )
eph . svHealth = int ( s2 . b_n >> 2 )
eph . p2 = int ( s2 . p2 )
eph . tb = int ( s2 . t_b )
eph . yVel = float ( s2 . y_vel ) * math . pow ( 2 , - 20 )
eph . yAccel = float ( s2 . y_accel ) * math . pow ( 2 , - 30 )
eph . y = float ( s2 . y ) * math . pow ( 2 , - 11 )
# string 3
try :
s3 = Glonass . from_bytes ( self . caches . glonass_strings [ freq_id ] [ 3 ] ) . data
except Exception :
return None
assert isinstance ( s3 , Glonass . String3 )
eph . p3 = int ( s3 . p3 )
eph . gammaN = float ( s3 . gamma_n ) * math . pow ( 2 , - 40 )
eph . svHealth = int ( eph . svHealth | ( 1 if s3 . l_n else 0 ) )
eph . zVel = float ( s3 . z_vel ) * math . pow ( 2 , - 20 )
eph . zAccel = float ( s3 . z_accel ) * math . pow ( 2 , - 30 )
eph . z = float ( s3 . z ) * math . pow ( 2 , - 11 )
# string 4
try :
s4 = Glonass . from_bytes ( self . caches . glonass_strings [ freq_id ] [ 4 ] ) . data
except Exception :
return None
assert isinstance ( s4 , Glonass . String4 )
current_day = int ( s4 . n_t )
eph . nt = current_day
eph . tauN = float ( s4 . tau_n ) * math . pow ( 2 , - 30 )
eph . deltaTauN = float ( s4 . delta_tau_n ) * math . pow ( 2 , - 30 )
eph . age = int ( s4 . e_n )
eph . p4 = int ( s4 . p4 )
eph . svURA = float ( self . glonass_URA_lookup . get ( int ( s4 . f_t ) , 0.0 ) )
# consistency check: SV slot number
# if it doesn't match, keep going but note mismatch (no logging here)
eph . svType = int ( s4 . m )
# string 5
try :
s5 = Glonass . from_bytes ( self . caches . glonass_strings [ freq_id ] [ 5 ] ) . data
except Exception :
return None
assert isinstance ( s5 , Glonass . String5 )
eph . n4 = int ( s5 . n_4 )
tk_seconds = int ( SECS_IN_HR * ( ( tk >> 7 ) & 0x1F ) + SECS_IN_MIN * ( ( tk >> 1 ) & 0x3F ) + ( tk & 0x1 ) * 30 )
eph . tkSeconds = tk_seconds
self . caches . glonass_strings [ freq_id ] . clear ( )
return ( ' ubloxGnss ' , dat )
def _gen_rxm_rawx ( self , msg : Ubx . RxmRawx ) - > tuple [ str , capnp . lib . capnp . _DynamicStructBuilder ] :
dat = messaging . new_message ( ' ubloxGnss ' , valid = True )
mr = dat . ubloxGnss . init ( ' measurementReport ' )
mr . rcvTow = msg . rcv_tow
mr . gpsWeek = msg . week
mr . leapSeconds = msg . leap_s
mb = mr . init ( ' measurements ' , msg . num_meas )
for i , m in enumerate ( msg . meas ) :
mb [ i ] . svId = m . sv_id
mb [ i ] . pseudorange = m . pr_mes
mb [ i ] . carrierCycles = m . cp_mes
mb [ i ] . doppler = m . do_mes
mb [ i ] . gnssId = int ( m . gnss_id . value )
mb [ i ] . glonassFrequencyIndex = m . freq_id
mb [ i ] . locktime = m . lock_time
mb [ i ] . cno = m . cno
mb [ i ] . pseudorangeStdev = 0.01 * ( math . pow ( 2 , ( m . pr_stdev & 15 ) ) )
mb [ i ] . carrierPhaseStdev = 0.004 * ( m . cp_stdev & 15 )
mb [ i ] . dopplerStdev = 0.002 * ( math . pow ( 2 , ( m . do_stdev & 15 ) ) )
ts = mb [ i ] . init ( ' trackingStatus ' )
trk = m . trk_stat
ts . pseudorangeValid = _bit ( trk , 0 )
ts . carrierPhaseValid = _bit ( trk , 1 )
ts . halfCycleValid = _bit ( trk , 2 )
ts . halfCycleSubtracted = _bit ( trk , 3 )
mr . numMeas = msg . num_meas
rs = mr . init ( ' receiverStatus ' )
rs . leapSecValid = _bit ( msg . rec_stat , 0 )
rs . clkReset = _bit ( msg . rec_stat , 2 )
return ( ' ubloxGnss ' , dat )
def _gen_nav_sat ( self , msg : Ubx . NavSat ) - > tuple [ str , capnp . lib . capnp . _DynamicStructBuilder ] :
dat = messaging . new_message ( ' ubloxGnss ' , valid = True )
sr = dat . ubloxGnss . init ( ' satReport ' )
sr . iTow = msg . itow
svs = sr . init ( ' svs ' , msg . num_svs )
for i , s in enumerate ( msg . svs ) :
svs [ i ] . svId = s . sv_id
svs [ i ] . gnssId = int ( s . gnss_id . value )
svs [ i ] . flagsBitfield = s . flags
svs [ i ] . cno = s . cno
svs [ i ] . elevationDeg = s . elev
svs [ i ] . azimuthDeg = s . azim
svs [ i ] . pseudorangeResidual = s . pr_res * 0.1
return ( ' ubloxGnss ' , dat )
def _gen_mon_hw ( self , msg : Ubx . MonHw ) - > tuple [ str , capnp . lib . capnp . _DynamicStructBuilder ] :
dat = messaging . new_message ( ' ubloxGnss ' , valid = True )
hw = dat . ubloxGnss . init ( ' hwStatus ' )
hw . noisePerMS = msg . noise_per_ms
hw . flags = msg . flags
hw . agcCnt = msg . agc_cnt
hw . aStatus = int ( msg . a_status . value )
hw . aPower = int ( msg . a_power . value )
hw . jamInd = msg . jam_ind
return ( ' ubloxGnss ' , dat )
def _gen_mon_hw2 ( self , msg : Ubx . MonHw2 ) - > tuple [ str , capnp . lib . capnp . _DynamicStructBuilder ] :
dat = messaging . new_message ( ' ubloxGnss ' , valid = True )
hw = dat . ubloxGnss . init ( ' hwStatus2 ' )
hw . ofsI = msg . ofs_i
hw . magI = msg . mag_i
hw . ofsQ = msg . ofs_q
hw . magQ = msg . mag_q
# Map Ubx enum to cereal enum {undefined=0, rom=1, otp=2, configpins=3, flash=4}
cfg_map = {
Ubx . MonHw2 . ConfigSource . rom : 1 ,
Ubx . MonHw2 . ConfigSource . otp : 2 ,
Ubx . MonHw2 . ConfigSource . config_pins : 3 ,
Ubx . MonHw2 . ConfigSource . flash : 4 ,
}
hw . cfgSource = cfg_map . get ( msg . cfg_source , 0 )
hw . lowLevCfg = msg . low_lev_cfg
hw . postStatus = msg . post_status
return ( ' ubloxGnss ' , dat )
def main ( ) :
parser = UbloxMsgParser ( )
pm = messaging . PubMaster ( [ ' ubloxGnss ' , ' gpsLocationExternal ' ] )
sock = messaging . sub_sock ( ' ubloxRaw ' , timeout = 100 , conflate = False )
while True :
msg = messaging . recv_one ( sock )
if msg is None :
continue
data = bytes ( msg . ubloxRaw )
log_time = msg . logMonoTime * 1e-9
frames = parser . framer . add_data ( log_time , data )
for frame in frames :
try :
res = parser . parse_frame ( frame )
except Exception :
continue
if not res :
continue
service , dat = res
pm . send ( service , dat )
if __name__ == ' __main__ ' :
main ( )