diff --git a/SConstruct b/SConstruct index 43ba4ae749..56788e5842 100644 --- a/SConstruct +++ b/SConstruct @@ -19,6 +19,10 @@ Decider('MD5-timestamp') SetOption('num_jobs', max(1, int(os.cpu_count()/2))) +AddOption('--kaitai', + action='store_true', + help='Regenerate kaitai struct parsers') + AddOption('--asan', action='store_true', help='turn on ASAN') diff --git a/pyproject.toml b/pyproject.toml index c795219a02..dc7245e872 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -250,6 +250,7 @@ exclude = [ "teleoprtc_repo", "third_party", "*.ipynb", + "generated", ] lint.flake8-implicit-str-concat.allow-multiline = false diff --git a/system/ubloxd/.gitignore b/system/ubloxd/.gitignore deleted file mode 100644 index 9ab870da89..0000000000 --- a/system/ubloxd/.gitignore +++ /dev/null @@ -1 +0,0 @@ -generated/ diff --git a/system/ubloxd/SConscript b/system/ubloxd/SConscript index 4e5f043c1d..8ee94de311 100644 --- a/system/ubloxd/SConscript +++ b/system/ubloxd/SConscript @@ -1,12 +1,10 @@ Import('env') -current_dir = Dir('./generated/').srcnode().abspath -python_cmd = f"kaitai-struct-compiler --target python --outdir {current_dir} $SOURCES" -env.Command(File('./generated/ubx.py'), 'ubx.ksy', python_cmd) -env.Command(File('./generated/gps.py'), 'gps.ksy', python_cmd) -env.Command(File('./generated/glonass.py'), 'glonass.ksy', python_cmd) - +# TODO: look into this # kaitai issue: https://github.com/kaitai-io/kaitai_struct/issues/910 - -#if GetOption('extras'): -# env.Program("tests/test_glonass_runner", ['tests/test_glonass_runner.cc', 'tests/test_glonass_kaitai.cc', glonass_obj], LIBS=[loc_libs]) +if GetOption('kaitai'): + current_dir = Dir('./generated/').srcnode().abspath + python_cmd = f"kaitai-struct-compiler --target python --outdir {current_dir} $SOURCES" + env.Command(File('./generated/ubx.py'), 'ubx.ksy', python_cmd) + env.Command(File('./generated/gps.py'), 'gps.ksy', python_cmd) + env.Command(File('./generated/glonass.py'), 'glonass.ksy', python_cmd) diff --git a/system/ubloxd/generated/glonass.py b/system/ubloxd/generated/glonass.py new file mode 100644 index 0000000000..63ff0d656c --- /dev/null +++ b/system/ubloxd/generated/glonass.py @@ -0,0 +1,247 @@ +# This is a generated file! Please edit source .ksy file and use kaitai-struct-compiler to rebuild + +import kaitaistruct +from kaitaistruct import KaitaiStruct, KaitaiStream, BytesIO + + +if getattr(kaitaistruct, 'API_VERSION', (0, 9)) < (0, 9): + raise Exception("Incompatible Kaitai Struct Python API: 0.9 or later is required, but you have %s" % (kaitaistruct.__version__)) + +class Glonass(KaitaiStruct): + def __init__(self, _io, _parent=None, _root=None): + self._io = _io + self._parent = _parent + self._root = _root if _root else self + self._read() + + def _read(self): + self.idle_chip = self._io.read_bits_int_be(1) != 0 + self.string_number = self._io.read_bits_int_be(4) + self._io.align_to_byte() + _on = self.string_number + if _on == 4: + self.data = Glonass.String4(self._io, self, self._root) + elif _on == 1: + self.data = Glonass.String1(self._io, self, self._root) + elif _on == 3: + self.data = Glonass.String3(self._io, self, self._root) + elif _on == 5: + self.data = Glonass.String5(self._io, self, self._root) + elif _on == 2: + self.data = Glonass.String2(self._io, self, self._root) + else: + self.data = Glonass.StringNonImmediate(self._io, self, self._root) + self.hamming_code = self._io.read_bits_int_be(8) + self.pad_1 = self._io.read_bits_int_be(11) + self.superframe_number = self._io.read_bits_int_be(16) + self.pad_2 = self._io.read_bits_int_be(8) + self.frame_number = self._io.read_bits_int_be(8) + + class String4(KaitaiStruct): + def __init__(self, _io, _parent=None, _root=None): + self._io = _io + self._parent = _parent + self._root = _root if _root else self + self._read() + + def _read(self): + self.tau_n_sign = self._io.read_bits_int_be(1) != 0 + self.tau_n_value = self._io.read_bits_int_be(21) + self.delta_tau_n_sign = self._io.read_bits_int_be(1) != 0 + self.delta_tau_n_value = self._io.read_bits_int_be(4) + self.e_n = self._io.read_bits_int_be(5) + self.not_used_1 = self._io.read_bits_int_be(14) + self.p4 = self._io.read_bits_int_be(1) != 0 + self.f_t = self._io.read_bits_int_be(4) + self.not_used_2 = self._io.read_bits_int_be(3) + self.n_t = self._io.read_bits_int_be(11) + self.n = self._io.read_bits_int_be(5) + self.m = self._io.read_bits_int_be(2) + + @property + def tau_n(self): + if hasattr(self, '_m_tau_n'): + return self._m_tau_n + + self._m_tau_n = ((self.tau_n_value * -1) if self.tau_n_sign else self.tau_n_value) + return getattr(self, '_m_tau_n', None) + + @property + def delta_tau_n(self): + if hasattr(self, '_m_delta_tau_n'): + return self._m_delta_tau_n + + self._m_delta_tau_n = ((self.delta_tau_n_value * -1) if self.delta_tau_n_sign else self.delta_tau_n_value) + return getattr(self, '_m_delta_tau_n', None) + + + class StringNonImmediate(KaitaiStruct): + def __init__(self, _io, _parent=None, _root=None): + self._io = _io + self._parent = _parent + self._root = _root if _root else self + self._read() + + def _read(self): + self.data_1 = self._io.read_bits_int_be(64) + self.data_2 = self._io.read_bits_int_be(8) + + + class String5(KaitaiStruct): + def __init__(self, _io, _parent=None, _root=None): + self._io = _io + self._parent = _parent + self._root = _root if _root else self + self._read() + + def _read(self): + self.n_a = self._io.read_bits_int_be(11) + self.tau_c = self._io.read_bits_int_be(32) + self.not_used = self._io.read_bits_int_be(1) != 0 + self.n_4 = self._io.read_bits_int_be(5) + self.tau_gps = self._io.read_bits_int_be(22) + self.l_n = self._io.read_bits_int_be(1) != 0 + + + class String1(KaitaiStruct): + def __init__(self, _io, _parent=None, _root=None): + self._io = _io + self._parent = _parent + self._root = _root if _root else self + self._read() + + def _read(self): + self.not_used = self._io.read_bits_int_be(2) + self.p1 = self._io.read_bits_int_be(2) + self.t_k = self._io.read_bits_int_be(12) + self.x_vel_sign = self._io.read_bits_int_be(1) != 0 + self.x_vel_value = self._io.read_bits_int_be(23) + self.x_accel_sign = self._io.read_bits_int_be(1) != 0 + self.x_accel_value = self._io.read_bits_int_be(4) + self.x_sign = self._io.read_bits_int_be(1) != 0 + self.x_value = self._io.read_bits_int_be(26) + + @property + def x_vel(self): + if hasattr(self, '_m_x_vel'): + return self._m_x_vel + + self._m_x_vel = ((self.x_vel_value * -1) if self.x_vel_sign else self.x_vel_value) + return getattr(self, '_m_x_vel', None) + + @property + def x_accel(self): + if hasattr(self, '_m_x_accel'): + return self._m_x_accel + + self._m_x_accel = ((self.x_accel_value * -1) if self.x_accel_sign else self.x_accel_value) + return getattr(self, '_m_x_accel', None) + + @property + def x(self): + if hasattr(self, '_m_x'): + return self._m_x + + self._m_x = ((self.x_value * -1) if self.x_sign else self.x_value) + return getattr(self, '_m_x', None) + + + class String2(KaitaiStruct): + def __init__(self, _io, _parent=None, _root=None): + self._io = _io + self._parent = _parent + self._root = _root if _root else self + self._read() + + def _read(self): + self.b_n = self._io.read_bits_int_be(3) + self.p2 = self._io.read_bits_int_be(1) != 0 + self.t_b = self._io.read_bits_int_be(7) + self.not_used = self._io.read_bits_int_be(5) + self.y_vel_sign = self._io.read_bits_int_be(1) != 0 + self.y_vel_value = self._io.read_bits_int_be(23) + self.y_accel_sign = self._io.read_bits_int_be(1) != 0 + self.y_accel_value = self._io.read_bits_int_be(4) + self.y_sign = self._io.read_bits_int_be(1) != 0 + self.y_value = self._io.read_bits_int_be(26) + + @property + def y_vel(self): + if hasattr(self, '_m_y_vel'): + return self._m_y_vel + + self._m_y_vel = ((self.y_vel_value * -1) if self.y_vel_sign else self.y_vel_value) + return getattr(self, '_m_y_vel', None) + + @property + def y_accel(self): + if hasattr(self, '_m_y_accel'): + return self._m_y_accel + + self._m_y_accel = ((self.y_accel_value * -1) if self.y_accel_sign else self.y_accel_value) + return getattr(self, '_m_y_accel', None) + + @property + def y(self): + if hasattr(self, '_m_y'): + return self._m_y + + self._m_y = ((self.y_value * -1) if self.y_sign else self.y_value) + return getattr(self, '_m_y', None) + + + class String3(KaitaiStruct): + def __init__(self, _io, _parent=None, _root=None): + self._io = _io + self._parent = _parent + self._root = _root if _root else self + self._read() + + def _read(self): + self.p3 = self._io.read_bits_int_be(1) != 0 + self.gamma_n_sign = self._io.read_bits_int_be(1) != 0 + self.gamma_n_value = self._io.read_bits_int_be(10) + self.not_used = self._io.read_bits_int_be(1) != 0 + self.p = self._io.read_bits_int_be(2) + self.l_n = self._io.read_bits_int_be(1) != 0 + self.z_vel_sign = self._io.read_bits_int_be(1) != 0 + self.z_vel_value = self._io.read_bits_int_be(23) + self.z_accel_sign = self._io.read_bits_int_be(1) != 0 + self.z_accel_value = self._io.read_bits_int_be(4) + self.z_sign = self._io.read_bits_int_be(1) != 0 + self.z_value = self._io.read_bits_int_be(26) + + @property + def gamma_n(self): + if hasattr(self, '_m_gamma_n'): + return self._m_gamma_n + + self._m_gamma_n = ((self.gamma_n_value * -1) if self.gamma_n_sign else self.gamma_n_value) + return getattr(self, '_m_gamma_n', None) + + @property + def z_vel(self): + if hasattr(self, '_m_z_vel'): + return self._m_z_vel + + self._m_z_vel = ((self.z_vel_value * -1) if self.z_vel_sign else self.z_vel_value) + return getattr(self, '_m_z_vel', None) + + @property + def z_accel(self): + if hasattr(self, '_m_z_accel'): + return self._m_z_accel + + self._m_z_accel = ((self.z_accel_value * -1) if self.z_accel_sign else self.z_accel_value) + return getattr(self, '_m_z_accel', None) + + @property + def z(self): + if hasattr(self, '_m_z'): + return self._m_z + + self._m_z = ((self.z_value * -1) if self.z_sign else self.z_value) + return getattr(self, '_m_z', None) + + + diff --git a/system/ubloxd/generated/gps.py b/system/ubloxd/generated/gps.py new file mode 100644 index 0000000000..a999016f3e --- /dev/null +++ b/system/ubloxd/generated/gps.py @@ -0,0 +1,193 @@ +# This is a generated file! Please edit source .ksy file and use kaitai-struct-compiler to rebuild + +import kaitaistruct +from kaitaistruct import KaitaiStruct, KaitaiStream, BytesIO + + +if getattr(kaitaistruct, 'API_VERSION', (0, 9)) < (0, 9): + raise Exception("Incompatible Kaitai Struct Python API: 0.9 or later is required, but you have %s" % (kaitaistruct.__version__)) + +class Gps(KaitaiStruct): + def __init__(self, _io, _parent=None, _root=None): + self._io = _io + self._parent = _parent + self._root = _root if _root else self + self._read() + + def _read(self): + self.tlm = Gps.Tlm(self._io, self, self._root) + self.how = Gps.How(self._io, self, self._root) + _on = self.how.subframe_id + if _on == 1: + self.body = Gps.Subframe1(self._io, self, self._root) + elif _on == 2: + self.body = Gps.Subframe2(self._io, self, self._root) + elif _on == 3: + self.body = Gps.Subframe3(self._io, self, self._root) + elif _on == 4: + self.body = Gps.Subframe4(self._io, self, self._root) + + class Subframe1(KaitaiStruct): + def __init__(self, _io, _parent=None, _root=None): + self._io = _io + self._parent = _parent + self._root = _root if _root else self + self._read() + + def _read(self): + self.week_no = self._io.read_bits_int_be(10) + self.code = self._io.read_bits_int_be(2) + self.sv_accuracy = self._io.read_bits_int_be(4) + self.sv_health = self._io.read_bits_int_be(6) + self.iodc_msb = self._io.read_bits_int_be(2) + self.l2_p_data_flag = self._io.read_bits_int_be(1) != 0 + self.reserved1 = self._io.read_bits_int_be(23) + self.reserved2 = self._io.read_bits_int_be(24) + self.reserved3 = self._io.read_bits_int_be(24) + self.reserved4 = self._io.read_bits_int_be(16) + self._io.align_to_byte() + self.t_gd = self._io.read_s1() + self.iodc_lsb = self._io.read_u1() + self.t_oc = self._io.read_u2be() + self.af_2 = self._io.read_s1() + self.af_1 = self._io.read_s2be() + self.af_0_sign = self._io.read_bits_int_be(1) != 0 + self.af_0_value = self._io.read_bits_int_be(21) + self.reserved5 = self._io.read_bits_int_be(2) + + @property + def af_0(self): + if hasattr(self, '_m_af_0'): + return self._m_af_0 + + self._m_af_0 = ((self.af_0_value - (1 << 21)) if self.af_0_sign else self.af_0_value) + return getattr(self, '_m_af_0', None) + + + class Subframe3(KaitaiStruct): + def __init__(self, _io, _parent=None, _root=None): + self._io = _io + self._parent = _parent + self._root = _root if _root else self + self._read() + + def _read(self): + self.c_ic = self._io.read_s2be() + self.omega_0 = self._io.read_s4be() + self.c_is = self._io.read_s2be() + self.i_0 = self._io.read_s4be() + self.c_rc = self._io.read_s2be() + self.omega = self._io.read_s4be() + self.omega_dot_sign = self._io.read_bits_int_be(1) != 0 + self.omega_dot_value = self._io.read_bits_int_be(23) + self._io.align_to_byte() + self.iode = self._io.read_u1() + self.idot_sign = self._io.read_bits_int_be(1) != 0 + self.idot_value = self._io.read_bits_int_be(13) + self.reserved = self._io.read_bits_int_be(2) + + @property + def omega_dot(self): + if hasattr(self, '_m_omega_dot'): + return self._m_omega_dot + + self._m_omega_dot = ((self.omega_dot_value - (1 << 23)) if self.omega_dot_sign else self.omega_dot_value) + return getattr(self, '_m_omega_dot', None) + + @property + def idot(self): + if hasattr(self, '_m_idot'): + return self._m_idot + + self._m_idot = ((self.idot_value - (1 << 13)) if self.idot_sign else self.idot_value) + return getattr(self, '_m_idot', None) + + + class Subframe4(KaitaiStruct): + def __init__(self, _io, _parent=None, _root=None): + self._io = _io + self._parent = _parent + self._root = _root if _root else self + self._read() + + def _read(self): + self.data_id = self._io.read_bits_int_be(2) + self.page_id = self._io.read_bits_int_be(6) + self._io.align_to_byte() + _on = self.page_id + if _on == 56: + self.body = Gps.Subframe4.IonosphereData(self._io, self, self._root) + + class IonosphereData(KaitaiStruct): + def __init__(self, _io, _parent=None, _root=None): + self._io = _io + self._parent = _parent + self._root = _root if _root else self + self._read() + + def _read(self): + self.a0 = self._io.read_s1() + self.a1 = self._io.read_s1() + self.a2 = self._io.read_s1() + self.a3 = self._io.read_s1() + self.b0 = self._io.read_s1() + self.b1 = self._io.read_s1() + self.b2 = self._io.read_s1() + self.b3 = self._io.read_s1() + + + + class How(KaitaiStruct): + def __init__(self, _io, _parent=None, _root=None): + self._io = _io + self._parent = _parent + self._root = _root if _root else self + self._read() + + def _read(self): + self.tow_count = self._io.read_bits_int_be(17) + self.alert = self._io.read_bits_int_be(1) != 0 + self.anti_spoof = self._io.read_bits_int_be(1) != 0 + self.subframe_id = self._io.read_bits_int_be(3) + self.reserved = self._io.read_bits_int_be(2) + + + class Tlm(KaitaiStruct): + def __init__(self, _io, _parent=None, _root=None): + self._io = _io + self._parent = _parent + self._root = _root if _root else self + self._read() + + def _read(self): + self.preamble = self._io.read_bytes(1) + if not self.preamble == b"\x8B": + raise kaitaistruct.ValidationNotEqualError(b"\x8B", self.preamble, self._io, u"/types/tlm/seq/0") + self.tlm = self._io.read_bits_int_be(14) + self.integrity_status = self._io.read_bits_int_be(1) != 0 + self.reserved = self._io.read_bits_int_be(1) != 0 + + + class Subframe2(KaitaiStruct): + def __init__(self, _io, _parent=None, _root=None): + self._io = _io + self._parent = _parent + self._root = _root if _root else self + self._read() + + def _read(self): + self.iode = self._io.read_u1() + self.c_rs = self._io.read_s2be() + self.delta_n = self._io.read_s2be() + self.m_0 = self._io.read_s4be() + self.c_uc = self._io.read_s2be() + self.e = self._io.read_s4be() + self.c_us = self._io.read_s2be() + self.sqrt_a = self._io.read_u4be() + self.t_oe = self._io.read_u2be() + self.fit_interval_flag = self._io.read_bits_int_be(1) != 0 + self.aoda = self._io.read_bits_int_be(5) + self.reserved = self._io.read_bits_int_be(2) + + + diff --git a/system/ubloxd/generated/ubx.py b/system/ubloxd/generated/ubx.py new file mode 100644 index 0000000000..9946584388 --- /dev/null +++ b/system/ubloxd/generated/ubx.py @@ -0,0 +1,273 @@ +# This is a generated file! Please edit source .ksy file and use kaitai-struct-compiler to rebuild + +import kaitaistruct +from kaitaistruct import KaitaiStruct, KaitaiStream, BytesIO +from enum import Enum + + +if getattr(kaitaistruct, 'API_VERSION', (0, 9)) < (0, 9): + raise Exception("Incompatible Kaitai Struct Python API: 0.9 or later is required, but you have %s" % (kaitaistruct.__version__)) + +class Ubx(KaitaiStruct): + + class GnssType(Enum): + gps = 0 + sbas = 1 + galileo = 2 + beidou = 3 + imes = 4 + qzss = 5 + glonass = 6 + def __init__(self, _io, _parent=None, _root=None): + self._io = _io + self._parent = _parent + self._root = _root if _root else self + self._read() + + def _read(self): + self.magic = self._io.read_bytes(2) + if not self.magic == b"\xB5\x62": + raise kaitaistruct.ValidationNotEqualError(b"\xB5\x62", self.magic, self._io, u"/seq/0") + self.msg_type = self._io.read_u2be() + self.length = self._io.read_u2le() + _on = self.msg_type + if _on == 2569: + self.body = Ubx.MonHw(self._io, self, self._root) + elif _on == 533: + self.body = Ubx.RxmRawx(self._io, self, self._root) + elif _on == 531: + self.body = Ubx.RxmSfrbx(self._io, self, self._root) + elif _on == 309: + self.body = Ubx.NavSat(self._io, self, self._root) + elif _on == 2571: + self.body = Ubx.MonHw2(self._io, self, self._root) + elif _on == 263: + self.body = Ubx.NavPvt(self._io, self, self._root) + + class RxmRawx(KaitaiStruct): + def __init__(self, _io, _parent=None, _root=None): + self._io = _io + self._parent = _parent + self._root = _root if _root else self + self._read() + + def _read(self): + self.rcv_tow = self._io.read_f8le() + self.week = self._io.read_u2le() + self.leap_s = self._io.read_s1() + self.num_meas = self._io.read_u1() + self.rec_stat = self._io.read_u1() + self.reserved1 = self._io.read_bytes(3) + self._raw_meas = [] + self.meas = [] + for i in range(self.num_meas): + self._raw_meas.append(self._io.read_bytes(32)) + _io__raw_meas = KaitaiStream(BytesIO(self._raw_meas[i])) + self.meas.append(Ubx.RxmRawx.Measurement(_io__raw_meas, self, self._root)) + + + class Measurement(KaitaiStruct): + def __init__(self, _io, _parent=None, _root=None): + self._io = _io + self._parent = _parent + self._root = _root if _root else self + self._read() + + def _read(self): + self.pr_mes = self._io.read_f8le() + self.cp_mes = self._io.read_f8le() + self.do_mes = self._io.read_f4le() + self.gnss_id = KaitaiStream.resolve_enum(Ubx.GnssType, self._io.read_u1()) + self.sv_id = self._io.read_u1() + self.reserved2 = self._io.read_bytes(1) + self.freq_id = self._io.read_u1() + self.lock_time = self._io.read_u2le() + self.cno = self._io.read_u1() + self.pr_stdev = self._io.read_u1() + self.cp_stdev = self._io.read_u1() + self.do_stdev = self._io.read_u1() + self.trk_stat = self._io.read_u1() + self.reserved3 = self._io.read_bytes(1) + + + + class RxmSfrbx(KaitaiStruct): + def __init__(self, _io, _parent=None, _root=None): + self._io = _io + self._parent = _parent + self._root = _root if _root else self + self._read() + + def _read(self): + self.gnss_id = KaitaiStream.resolve_enum(Ubx.GnssType, self._io.read_u1()) + self.sv_id = self._io.read_u1() + self.reserved1 = self._io.read_bytes(1) + self.freq_id = self._io.read_u1() + self.num_words = self._io.read_u1() + self.reserved2 = self._io.read_bytes(1) + self.version = self._io.read_u1() + self.reserved3 = self._io.read_bytes(1) + self.body = [] + for i in range(self.num_words): + self.body.append(self._io.read_u4le()) + + + + class NavSat(KaitaiStruct): + def __init__(self, _io, _parent=None, _root=None): + self._io = _io + self._parent = _parent + self._root = _root if _root else self + self._read() + + def _read(self): + self.itow = self._io.read_u4le() + self.version = self._io.read_u1() + self.num_svs = self._io.read_u1() + self.reserved = self._io.read_bytes(2) + self._raw_svs = [] + self.svs = [] + for i in range(self.num_svs): + self._raw_svs.append(self._io.read_bytes(12)) + _io__raw_svs = KaitaiStream(BytesIO(self._raw_svs[i])) + self.svs.append(Ubx.NavSat.Nav(_io__raw_svs, self, self._root)) + + + class Nav(KaitaiStruct): + def __init__(self, _io, _parent=None, _root=None): + self._io = _io + self._parent = _parent + self._root = _root if _root else self + self._read() + + def _read(self): + self.gnss_id = KaitaiStream.resolve_enum(Ubx.GnssType, self._io.read_u1()) + self.sv_id = self._io.read_u1() + self.cno = self._io.read_u1() + self.elev = self._io.read_s1() + self.azim = self._io.read_s2le() + self.pr_res = self._io.read_s2le() + self.flags = self._io.read_u4le() + + + + class NavPvt(KaitaiStruct): + def __init__(self, _io, _parent=None, _root=None): + self._io = _io + self._parent = _parent + self._root = _root if _root else self + self._read() + + def _read(self): + self.i_tow = self._io.read_u4le() + self.year = self._io.read_u2le() + self.month = self._io.read_u1() + self.day = self._io.read_u1() + self.hour = self._io.read_u1() + self.min = self._io.read_u1() + self.sec = self._io.read_u1() + self.valid = self._io.read_u1() + self.t_acc = self._io.read_u4le() + self.nano = self._io.read_s4le() + self.fix_type = self._io.read_u1() + self.flags = self._io.read_u1() + self.flags2 = self._io.read_u1() + self.num_sv = self._io.read_u1() + self.lon = self._io.read_s4le() + self.lat = self._io.read_s4le() + self.height = self._io.read_s4le() + self.h_msl = self._io.read_s4le() + self.h_acc = self._io.read_u4le() + self.v_acc = self._io.read_u4le() + self.vel_n = self._io.read_s4le() + self.vel_e = self._io.read_s4le() + self.vel_d = self._io.read_s4le() + self.g_speed = self._io.read_s4le() + self.head_mot = self._io.read_s4le() + self.s_acc = self._io.read_s4le() + self.head_acc = self._io.read_u4le() + self.p_dop = self._io.read_u2le() + self.flags3 = self._io.read_u1() + self.reserved1 = self._io.read_bytes(5) + self.head_veh = self._io.read_s4le() + self.mag_dec = self._io.read_s2le() + self.mag_acc = self._io.read_u2le() + + + class MonHw2(KaitaiStruct): + + class ConfigSource(Enum): + flash = 102 + otp = 111 + config_pins = 112 + rom = 113 + def __init__(self, _io, _parent=None, _root=None): + self._io = _io + self._parent = _parent + self._root = _root if _root else self + self._read() + + def _read(self): + self.ofs_i = self._io.read_s1() + self.mag_i = self._io.read_u1() + self.ofs_q = self._io.read_s1() + self.mag_q = self._io.read_u1() + self.cfg_source = KaitaiStream.resolve_enum(Ubx.MonHw2.ConfigSource, self._io.read_u1()) + self.reserved1 = self._io.read_bytes(3) + self.low_lev_cfg = self._io.read_u4le() + self.reserved2 = self._io.read_bytes(8) + self.post_status = self._io.read_u4le() + self.reserved3 = self._io.read_bytes(4) + + + class MonHw(KaitaiStruct): + + class AntennaStatus(Enum): + init = 0 + dontknow = 1 + ok = 2 + short = 3 + open = 4 + + class AntennaPower(Enum): + false = 0 + true = 1 + dontknow = 2 + def __init__(self, _io, _parent=None, _root=None): + self._io = _io + self._parent = _parent + self._root = _root if _root else self + self._read() + + def _read(self): + self.pin_sel = self._io.read_u4le() + self.pin_bank = self._io.read_u4le() + self.pin_dir = self._io.read_u4le() + self.pin_val = self._io.read_u4le() + self.noise_per_ms = self._io.read_u2le() + self.agc_cnt = self._io.read_u2le() + self.a_status = KaitaiStream.resolve_enum(Ubx.MonHw.AntennaStatus, self._io.read_u1()) + self.a_power = KaitaiStream.resolve_enum(Ubx.MonHw.AntennaPower, self._io.read_u1()) + self.flags = self._io.read_u1() + self.reserved1 = self._io.read_bytes(1) + self.used_mask = self._io.read_u4le() + self.vp = self._io.read_bytes(17) + self.jam_ind = self._io.read_u1() + self.reserved2 = self._io.read_bytes(2) + self.pin_irq = self._io.read_u4le() + self.pull_h = self._io.read_u4le() + self.pull_l = self._io.read_u4le() + + + @property + def checksum(self): + if hasattr(self, '_m_checksum'): + return self._m_checksum + + _pos = self._io.pos() + self._io.seek((self.length + 6)) + self._m_checksum = self._io.read_u2le() + self._io.seek(_pos) + return getattr(self, '_m_checksum', None) + + diff --git a/system/ubloxd/ubloxd.py b/system/ubloxd/ubloxd.py index aa9cc8f98e..d193e3c157 100755 --- a/system/ubloxd/ubloxd.py +++ b/system/ubloxd/ubloxd.py @@ -217,7 +217,7 @@ class UBXMessageProcessor: def __init__(self): pass - def process_nav_pvt(self, msg: UBXMessage) -> log.Event | None: + def process_nav_pvt(self, msg: UBXMessage): """Process NAV-PVT (Position Velocity Time) message""" if len(msg.payload) < 92: cloudlog.warning("NAV-PVT message too short") @@ -265,7 +265,7 @@ class UBXMessageProcessor: return evt - def process_rxm_rawx(self, msg: UBXMessage) -> log.Event | None: + def process_rxm_rawx(self, msg: UBXMessage): """Process RXM-RAWX (Raw measurement data) message""" if len(msg.payload) < 16: return None @@ -289,7 +289,7 @@ class UBXMessageProcessor: rs.clkReset = bool(recStat & 0x04) return evt - def process_rxm_sfrbx(self, msg: UBXMessage) -> log.Event | None: + def process_rxm_sfrbx(self, msg: UBXMessage): """Process RXM-SFRBX (Subframe buffer) message""" # Create minimal valid ubloxGnss message with ephemeris field evt = messaging.new_message('ubloxGnss', valid=True) @@ -297,7 +297,7 @@ class UBXMessageProcessor: eph.svId = 1 # Dummy value return evt - def process_mon_hw(self, msg: UBXMessage) -> log.Event | None: + def process_mon_hw(self, msg: UBXMessage): """Process MON-HW (Hardware status) message""" # Create minimal valid ubloxGnss message with hwStatus field evt = messaging.new_message('ubloxGnss', valid=True) @@ -305,7 +305,7 @@ class UBXMessageProcessor: hw.noisePerMS = 0 return evt - def process_mon_hw2(self, msg: UBXMessage) -> log.Event | None: + def process_mon_hw2(self, msg: UBXMessage): """Process MON-HW2 (Extended hardware status) message""" # Create minimal valid ubloxGnss message with hwStatus2 field evt = messaging.new_message('ubloxGnss', valid=True) @@ -313,7 +313,7 @@ class UBXMessageProcessor: hw2.ofsI = 0 return evt - def process_nav_sat(self, msg: UBXMessage) -> log.Event | None: + def process_nav_sat(self, msg: UBXMessage): """Process NAV-SAT (Satellite status) message""" # Create minimal valid ubloxGnss message with satReport field evt = messaging.new_message('ubloxGnss', valid=True)