You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
85 lines
3.0 KiB
85 lines
3.0 KiB
3 years ago
|
#!/usr/bin/env python3
|
||
|
from typing import List
|
||
|
|
||
|
from cereal import log, messaging
|
||
|
from laika import AstroDog
|
||
|
from laika.helpers import UbloxGnssId
|
||
|
from laika.raw_gnss import GNSSMeasurement, calc_pos_fix, correct_measurements, process_measurements, read_raw_ublox
|
||
|
|
||
|
|
||
|
def correct_and_pos_fix(processed_measurements: List[GNSSMeasurement], dog: AstroDog):
    """Compute a position fix, correct the measurements with it, then fix again.

    A first fix is calculated from ``processed_measurements``. Its first three
    components are passed as the estimated position to ``correct_measurements``
    and a second fix is calculated from the corrected measurements.

    Returns a ``(pos_fix, corrected_measurements)`` pair, or ``([], [])`` when
    the first fix comes back empty.
    """
    # A fix is only produced when more than 5 processed measurements are given.
    initial_fix = calc_pos_fix(processed_measurements)

    if len(initial_fix) > 0:
        estimated_position = initial_fix[0][:3]
        corrected = correct_measurements(processed_measurements, estimated_position, dog)
        refined_fix = calc_pos_fix(corrected)
        return refined_fix, corrected

    return [], []
|
||
|
|
||
|
|
||
|
def process_ublox_msg(ublox_msg, dog, ublox_mono_time: int):
    """Turn a ublox measurement report into a 'gnssMeasurements' message.

    Returns ``None`` when ``ublox_msg`` is not a 'measurementReport' or when the
    report carries no measurements. Otherwise returns a new 'gnssMeasurements'
    message holding the position, ``ublox_mono_time`` and one measurement
    message per processed measurement.
    """
    is_measurement_report = ublox_msg.which == 'measurementReport'
    if not is_measurement_report:
        return None

    report = ublox_msg.measurementReport
    if len(report.measurements) == 0:
        return None

    processed_measurements = process_measurements(read_raw_ublox(report), dog)

    # Only the position fix is used here; the corrected measurements are dropped.
    pos_fix, _ = correct_and_pos_fix(processed_measurements, dog)

    # todo send corrected messages instead of processed_measurements.
    # Need fix for when having less than 6 measurements
    measurement_msgs = [create_measurement_msg(m) for m in processed_measurements]

    # pos fix can be an empty list if not enough correct measurements are
    # available; fall back to an all-zero position in that case.
    corrected_pos = pos_fix[0][:3].tolist() if len(pos_fix) > 0 else [0., 0., 0.]

    dat = messaging.new_message('gnssMeasurements')
    dat.gnssMeasurements = {
        "position": corrected_pos,
        "ubloxMonoTime": ublox_mono_time,
        "correctedMeasurements": measurement_msgs
    }
    return dat
|
||
|
|
||
|
|
||
|
def create_measurement_msg(meas: GNSSMeasurement):
    """Build a ``CorrectedMeasurement`` message from a single measurement.

    Copies the constellation id, satellite id, GLONASS frequency, the 'C1C'
    and 'D1C' observables with their standard deviations, and the satellite
    position and velocity from ``meas`` into a new message and returns it.
    """
    out = log.GnssMeasurements.CorrectedMeasurement.new_message()

    # todo never happens will fix in later pr
    gnss_id = meas.ublox_gnss_id if meas.ublox_gnss_id is not None else UbloxGnssId.GPS
    is_glonass = meas.ublox_gnss_id == UbloxGnssId.GLONASS

    out.constellationId = gnss_id.value
    # The satellite id is the prn with its first character stripped.
    out.svId = int(meas.prn[1:])
    out.glonassFrequency = meas.glonass_freq if is_glonass else 0

    # todo should be observables_final when using corrected measurements
    observables = meas.observables
    observables_std = meas.observables_std
    out.pseudorange = float(observables['C1C'])
    out.pseudorangeStd = float(observables_std['C1C'])
    out.pseudorangeRate = float(observables['D1C'])
    out.pseudorangeRateStd = float(observables_std['D1C'])

    out.satPos = meas.sat_pos_final.tolist()
    out.satVel = meas.sat_vel.tolist()
    return out
|
||
|
|
||
|
|
||
|
def main():
    """Forward every 'ubloxGnss' update as a 'gnssMeasurements' message.

    Loops forever: each time 'ubloxGnss' has updated, the message is processed
    and the result published. When processing yields ``None`` an empty
    'gnssMeasurements' message is published instead.
    """
    dog = AstroDog()
    sm = messaging.SubMaster(['ubloxGnss'])
    pm = messaging.PubMaster(['gnssMeasurements'])

    while True:
        sm.update()

        # Todo if no internet available use latest ephemeris
        if not sm.updated['ubloxGnss']:
            continue

        outgoing = process_ublox_msg(sm['ubloxGnss'], dog, sm.logMonoTime['ubloxGnss'])
        if outgoing is None:
            # Nothing could be produced for this update; publish an empty message.
            outgoing = messaging.new_message('gnssMeasurements')
        pm.send('gnssMeasurements', outgoing)
|
||
|
|
||
|
|
||
|
|
||
|
# Run the processing loop only when this file is executed as a script.
if __name__ == "__main__":
    main()
|