Laikad: the basics for ublox msg processing (#24359)
* Add laikad that receives ublox messages and publishes corrected measurements and position fix * types * cleanup * laikad version 1 with uncorrected measurements * push * Fix glonass frequency and delete redundant test * Update after cereal and cleanup * Add test, fix laikad and remove process replay for now * update laika * add hatanaka to packages. Used to decompress orbit data * Fix pippull/24404/head^2
parent
df8f024e19
commit
b64fe6e339
5 changed files with 170 additions and 3 deletions
@ -1 +1 @@ |
||||
Subproject commit 226adc655e1488474468a97ab4a7705aad7e5837 |
||||
Subproject commit 1fbc6780d5184efd5ccf4518a01fe947ffbb4ba0 |
@ -0,0 +1,84 @@ |
||||
#!/usr/bin/env python3 |
||||
from typing import List |
||||
|
||||
from cereal import log, messaging |
||||
from laika import AstroDog |
||||
from laika.helpers import UbloxGnssId |
||||
from laika.raw_gnss import GNSSMeasurement, calc_pos_fix, correct_measurements, process_measurements, read_raw_ublox |
||||
|
||||
|
||||
def correct_and_pos_fix(processed_measurements: List[GNSSMeasurement], dog: AstroDog): |
||||
# pos fix needs more than 5 processed_measurements |
||||
pos_fix = calc_pos_fix(processed_measurements) |
||||
|
||||
if len(pos_fix) == 0: |
||||
return [], [] |
||||
est_pos = pos_fix[0][:3] |
||||
corrected = correct_measurements(processed_measurements, est_pos, dog) |
||||
return calc_pos_fix(corrected), corrected |
||||
|
||||
|
||||
def process_ublox_msg(ublox_msg, dog, ublox_mono_time: int): |
||||
if ublox_msg.which == 'measurementReport': |
||||
report = ublox_msg.measurementReport |
||||
if len(report.measurements) == 0: |
||||
return None |
||||
new_meas = read_raw_ublox(report) |
||||
processed_measurements = process_measurements(new_meas, dog) |
||||
|
||||
corrected = correct_and_pos_fix(processed_measurements, dog) |
||||
pos_fix, _ = corrected |
||||
# todo send corrected messages instead of processed_measurements. Need fix for when having less than 6 measurements |
||||
correct_meas_msgs = [create_measurement_msg(m) for m in processed_measurements] |
||||
# pos fix can be an empty list if not enough correct measurements are available |
||||
if len(pos_fix) > 0: |
||||
corrected_pos = pos_fix[0][:3].tolist() |
||||
else: |
||||
corrected_pos = [0., 0., 0.] |
||||
dat = messaging.new_message('gnssMeasurements') |
||||
dat.gnssMeasurements = { |
||||
"position": corrected_pos, |
||||
"ubloxMonoTime": ublox_mono_time, |
||||
"correctedMeasurements": correct_meas_msgs |
||||
} |
||||
return dat |
||||
|
||||
|
||||
def create_measurement_msg(meas: GNSSMeasurement): |
||||
c = log.GnssMeasurements.CorrectedMeasurement.new_message() |
||||
ublox_gnss_id = meas.ublox_gnss_id |
||||
if ublox_gnss_id is None: |
||||
# todo never happens will fix in later pr |
||||
ublox_gnss_id = UbloxGnssId.GPS |
||||
c.constellationId = ublox_gnss_id.value |
||||
c.svId = int(meas.prn[1:]) |
||||
c.glonassFrequency = meas.glonass_freq if meas.ublox_gnss_id == UbloxGnssId.GLONASS else 0 |
||||
c.pseudorange = float(meas.observables['C1C']) # todo should be observables_final when using corrected measurements |
||||
c.pseudorangeStd = float(meas.observables_std['C1C']) |
||||
c.pseudorangeRate = float(meas.observables['D1C']) # todo should be observables_final when using corrected measurements |
||||
c.pseudorangeRateStd = float(meas.observables_std['D1C']) |
||||
c.satPos = meas.sat_pos_final.tolist() |
||||
c.satVel = meas.sat_vel.tolist() |
||||
return c |
||||
|
||||
|
||||
def main(): |
||||
dog = AstroDog() |
||||
sm = messaging.SubMaster(['ubloxGnss']) |
||||
pm = messaging.PubMaster(['gnssMeasurements']) |
||||
|
||||
while True: |
||||
sm.update() |
||||
|
||||
# Todo if no internet available use latest ephemeris |
||||
if sm.updated['ubloxGnss']: |
||||
ublox_msg = sm['ubloxGnss'] |
||||
msg = process_ublox_msg(ublox_msg, dog, sm.logMonoTime['ubloxGnss']) |
||||
if msg is None: |
||||
msg = messaging.new_message('gnssMeasurements') |
||||
pm.send('gnssMeasurements', msg) |
||||
|
||||
|
||||
|
||||
if __name__ == "__main__": |
||||
main() |
@ -0,0 +1,23 @@ |
||||
#!/usr/bin/env python3 |
||||
import unittest |
||||
from datetime import datetime |
||||
|
||||
from laika.helpers import UbloxGnssId |
||||
|
||||
from laika.gps_time import GPSTime |
||||
from laika.raw_gnss import GNSSMeasurement |
||||
from selfdrive.locationd.laikad import create_measurement_msg |
||||
|
||||
|
||||
class TestLaikad(unittest.TestCase): |
||||
|
||||
def test_create_msg_without_errors(self): |
||||
gpstime = GPSTime.from_datetime(datetime.now()) |
||||
meas = GNSSMeasurement('G01', gpstime.week, gpstime.tow, {'C1C': 0., 'D1C': 0.}, {'C1C': 0., 'D1C': 0.}, ublox_gnss_id=UbloxGnssId.GPS) |
||||
msg = create_measurement_msg(meas) |
||||
|
||||
self.assertEqual(msg.constellationId, 'gps') |
||||
|
||||
|
||||
if __name__ == "__main__": |
||||
unittest.main() |
Loading…
Reference in new issue