#!/usr/bin/env python3 from typing import List from cereal import log, messaging from laika import AstroDog from laika.helpers import ConstellationId from laika.raw_gnss import GNSSMeasurement, calc_pos_fix, correct_measurements, process_measurements, read_raw_ublox def correct_and_pos_fix(processed_measurements: List[GNSSMeasurement], dog: AstroDog): # pos fix needs more than 5 processed_measurements pos_fix = calc_pos_fix(processed_measurements) if len(pos_fix) == 0: return [], [] est_pos = pos_fix[0][:3] corrected = correct_measurements(processed_measurements, est_pos, dog) return calc_pos_fix(corrected), corrected def process_ublox_msg(ublox_msg, dog, ublox_mono_time: int): if ublox_msg.which == 'measurementReport': report = ublox_msg.measurementReport if len(report.measurements) == 0: return None new_meas = read_raw_ublox(report) processed_measurements = process_measurements(new_meas, dog) corrected = correct_and_pos_fix(processed_measurements, dog) pos_fix, _ = corrected # todo send corrected messages instead of processed_measurements. Need fix for when having less than 6 measurements correct_meas_msgs = [create_measurement_msg(m) for m in processed_measurements] # pos fix can be an empty list if not enough correct measurements are available if len(pos_fix) > 0: corrected_pos = pos_fix[0][:3].tolist() else: corrected_pos = [0., 0., 0.] dat = messaging.new_message('gnssMeasurements') dat.gnssMeasurements = { "position": corrected_pos, "ubloxMonoTime": ublox_mono_time, "correctedMeasurements": correct_meas_msgs } return dat def create_measurement_msg(meas: GNSSMeasurement): c = log.GnssMeasurements.CorrectedMeasurement.new_message() c.constellationId = meas.constellation_id.value c.svId = int(meas.prn[1:]) c.glonassFrequency = meas.glonass_freq if meas.constellation_id == ConstellationId.GLONASS else 0 c.pseudorange = float(meas.observables['C1C']) # todo should be observables_final when using corrected measurements c.pseudorangeStd = float(meas.observables_std['C1C']) c.pseudorangeRate = float(meas.observables['D1C']) # todo should be observables_final when using corrected measurements c.pseudorangeRateStd = float(meas.observables_std['D1C']) c.satPos = meas.sat_pos_final.tolist() c.satVel = meas.sat_vel.tolist() return c def main(): dog = AstroDog() sm = messaging.SubMaster(['ubloxGnss']) pm = messaging.PubMaster(['gnssMeasurements']) while True: sm.update() # Todo if no internet available use latest ephemeris if sm.updated['ubloxGnss']: ublox_msg = sm['ubloxGnss'] msg = process_ublox_msg(ublox_msg, dog, sm.logMonoTime['ubloxGnss']) if msg is None: msg = messaging.new_message('gnssMeasurements') pm.send('gnssMeasurements', msg) if __name__ == "__main__": main()