You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							469 lines
						
					
					
						
							20 KiB
						
					
					
				
			
		
		
	
	
							469 lines
						
					
					
						
							20 KiB
						
					
					
				#!/usr/bin/env python3
 | 
						|
import math
 | 
						|
import os
 | 
						|
import time
 | 
						|
import shutil
 | 
						|
from collections import defaultdict
 | 
						|
from concurrent.futures import Future, ProcessPoolExecutor
 | 
						|
from enum import IntEnum
 | 
						|
from typing import List, Optional, Dict, Any
 | 
						|
 | 
						|
import numpy as np
 | 
						|
 | 
						|
from cereal import log, messaging
 | 
						|
from openpilot.common.params import Params, put_nonblocking
 | 
						|
from laika import AstroDog
 | 
						|
from laika.constants import SECS_IN_HR, SECS_IN_MIN
 | 
						|
from laika.downloader import DownloadFailed
 | 
						|
from laika.ephemeris import EphemerisType, GPSEphemeris, GLONASSEphemeris, ephemeris_structs, parse_qcom_ephem
 | 
						|
from laika.gps_time import GPSTime
 | 
						|
from laika.helpers import ConstellationId, get_sv_id
 | 
						|
from laika.raw_gnss import GNSSMeasurement, correct_measurements, process_measurements, read_raw_ublox
 | 
						|
from laika.raw_gnss import gps_time_from_qcom_report, get_measurements_from_qcom_reports
 | 
						|
from laika.opt import calc_pos_fix, get_posfix_sympy_fun, calc_vel_fix, get_velfix_sympy_func
 | 
						|
from openpilot.selfdrive.locationd.models.constants import GENERATED_DIR, ObservationKind
 | 
						|
from openpilot.selfdrive.locationd.models.gnss_kf import GNSSKalman
 | 
						|
from openpilot.selfdrive.locationd.models.gnss_kf import States as GStates
 | 
						|
from openpilot.system.swaglog import cloudlog
 | 
						|
 | 
						|
MAX_TIME_GAP = 10
 | 
						|
EPHEMERIS_CACHE = 'LaikadEphemerisV3'
 | 
						|
DOWNLOADS_CACHE_FOLDER = "/tmp/comma_download_cache/"
 | 
						|
CACHE_VERSION = 0.2
 | 
						|
POS_FIX_RESIDUAL_THRESHOLD = 100.0
 | 
						|
 | 
						|
 | 
						|
class LogEphemerisType(IntEnum):
  """Ephemeris type reported in the gnssMeasurements EphemerisStatus log message."""
  nav = 0                    # broadcast navigation message (from ublox or cache)
  nasaUltraRapid = 1         # NASA ultra-rapid precise orbit product ('igu' files)
  glonassIacUltraRapid = 2   # IAC ultra-rapid product for GLONASS ('Sta' files)
  qcom = 3                   # qcom drSvPoly polynomial ephemeris
 | 
						|
 | 
						|
class EphemerisSource(IntEnum):
  """Where an ephemeris was obtained from, as reported in the log."""
  gnssChip = 0   # decoded directly from the receiver ('qcom'/'ublox' file_name)
  internet = 1   # downloaded product
  cache = 2      # restored from the on-device ephemeris cache
  unknown = 3
 | 
						|
 | 
						|
def get_log_eph_type(ephem):
  """Map a laika ephemeris object onto the LogEphemerisType log enum.

  Raises for precise-orbit files from an unrecognized provider.
  """
  if ephem.eph_type == EphemerisType.NAV:
    return LogEphemerisType.nav
  if ephem.eph_type == EphemerisType.QCOM_POLY:
    return LogEphemerisType.qcom

  # Downloaded precise products: classify by the file-source tag.
  assert ephem.file_epoch is not None
  file_src = ephem.file_source
  if file_src == 'igu':  # example nasa: '2214/igu22144_00.sp3.Z'
    return LogEphemerisType.nasaUltraRapid
  if file_src == 'Sta':  # example nasa: '22166/ultra/Stark_1D_22061518.sp3'
    return LogEphemerisType.glonassIacUltraRapid
  raise Exception(f"Didn't expect file source {file_src}")
 | 
						|
 | 
						|
def get_log_eph_source(ephem):
  """Map an ephemeris' file_name onto the EphemerisSource log enum."""
  if ephem.file_name in ('qcom', 'ublox'):
    return EphemerisSource.gnssChip
  if ephem.file_name == EPHEMERIS_CACHE:
    return EphemerisSource.cache
  return EphemerisSource.internet
 | 
						|
 | 
						|
 | 
						|
class Laikad:
  def __init__(self, valid_const=(ConstellationId.GPS, ConstellationId.GLONASS), auto_fetch_navs=True, auto_update=False,
               valid_ephem_types=(EphemerisType.NAV, EphemerisType.QCOM_POLY),
               save_ephemeris=False, use_qcom=False):
    """
    valid_const: GNSS constellation which can be used
    auto_fetch_navs: If true fetch navs from internet when needed
    auto_update: If true download AstroDog will download all files needed. This can be ephemeris or correction data like ionosphere.
    valid_ephem_types: Valid ephemeris types to be used by AstroDog
    save_ephemeris: If true saves and loads nav and orbit ephemeris to cache.
    use_qcom: If true parse qcom gnss messages instead of ublox ones
    """
    self.astro_dog = AstroDog(valid_const=valid_const, auto_update=auto_update, valid_ephem_types=valid_ephem_types,
                              clear_old_ephemeris=True, cache_dir=DOWNLOADS_CACHE_FOLDER)
    self.gnss_kf = GNSSKalman(GENERATED_DIR, cython=True, erratic_clock=use_qcom)

    self.auto_fetch_navs = auto_fetch_navs
    # Orbit downloads run in a single-worker subprocess; see fetch_navs()
    self.orbit_fetch_executor: Optional[ProcessPoolExecutor] = None
    self.orbit_fetch_future: Optional[Future] = None

    self.last_report_time = GPSTime(0, 0)
    self.last_fetch_navs_t = GPSTime(0, 0)
    self.last_cached_t = GPSTime(0, 0)
    self.save_ephemeris = save_ephemeris
    self.load_cache()

    self.posfix_functions = {constellation: get_posfix_sympy_fun(constellation) for constellation in (ConstellationId.GPS, ConstellationId.GLONASS)}
    self.velfix_function = get_velfix_sympy_func()
    self.last_fix_pos = None
    self.last_fix_t = None
    self.use_qcom = use_qcom
    self.first_log_time = None
    # time to first fix in seconds; stays -1 until the first fix is obtained
    self.ttff = -1
    # assumed receiver-to-log latency in seconds — NOTE(review): per-hardware constants, confirm against measurements
    self.measurement_lag = 0.630 if self.use_qcom else 0.095

    # qcom specific stuff
    self.qcom_reports_received = 4
    self.qcom_reports = []
 | 
						|
 | 
						|
  def load_cache(self):
    """Restore previously cached nav ephemerides from Params into the AstroDog.

    No-op unless save_ephemeris is set; parse failures are logged, not raised.
    """
    if not self.save_ephemeris:
      return

    cache_bytes = Params().get(EPHEMERIS_CACHE)
    if not cache_bytes:
      return

    nav_dict = {}
    try:
      with ephemeris_structs.EphemerisCache.from_bytes(cache_bytes) as ephem_cache:
        glonass_navs = [GLONASSEphemeris(d, file_name=EPHEMERIS_CACHE) for d in ephem_cache.glonassEphemerides]
        gps_navs = [GPSEphemeris(d, file_name=EPHEMERIS_CACHE) for d in ephem_cache.gpsEphemerides]
      # Group all restored ephemerides by satellite prn
      for eph in glonass_navs + gps_navs:
        nav_dict.setdefault(eph.prn, []).append(eph)
      self.astro_dog.add_navs(nav_dict)
    except Exception:
      cloudlog.exception("Error parsing cache")
    cloudlog.debug(
      f"Loaded navs ({sum([len(nav_dict[prn]) for prn in nav_dict.keys()])}). Unique orbit and nav sats: {list(nav_dict.keys())} ")
 | 
						|
 | 
						|
  def cache_ephemeris(self):
    """Persist currently valid nav ephemerides to Params, at most once per minute."""
    if not self.save_ephemeris or (self.last_report_time - self.last_cached_t <= SECS_IN_MIN):
      return

    # TODO this only saves currently valid ephems, when we download future ephems we should save them too
    nav_list: List = sum(self.astro_dog.navs.values(), [])
    valid_navs = [eph for eph in nav_list if eph.valid(self.last_report_time)]
    if valid_navs:
      glonass_data = [eph.data for eph in valid_navs if eph.prn[0] == 'R']
      gps_data = [eph.data for eph in valid_navs if eph.prn[0] == 'G']
      ephem_cache = ephemeris_structs.EphemerisCache(glonassEphemerides=glonass_data, gpsEphemerides=gps_data)
      put_nonblocking(EPHEMERIS_CACHE, ephem_cache.to_bytes())
      cloudlog.debug("Cache saved")
    self.last_cached_t = self.last_report_time
 | 
						|
 | 
						|
  def create_ephem_statuses(self):
    """Build an EphemerisStatus log message for every ephemeris the AstroDog holds."""
    statuses = []
    nav_ephs = sum(self.astro_dog.navs.values(), [])
    poly_ephs = sum(self.astro_dog.qcom_polys.values(), [])
    for eph in nav_ephs + poly_ephs:
      status = log.GnssMeasurements.EphemerisStatus.new_message()
      status.constellationId = ConstellationId.from_rinex_char(eph.prn[0]).value
      status.svId = get_sv_id(eph.prn)
      status.type = get_log_eph_type(eph).value
      status.source = get_log_eph_source(eph).value
      status.tow = eph.epoch.tow
      status.gpsWeek = eph.epoch.week
      statuses.append(status)
    return statuses
 | 
						|
 | 
						|
 | 
						|
  def get_lsq_fix(self, t, measurements):
    """Compute an instantaneous least-squares position/velocity fix.

    Returns (pos, pos_std, vel, vel_std) or None if no solution was found.
    Also returns None when a fix for this exact timestamp already exists.
    """
    if self.last_fix_t is not None and abs(self.last_fix_t - t) <= 0:
      return None

    # GLONASS introduces an extra clock-bias unknown, so one more measurement is required
    has_glonass = any(m.constellation_id == ConstellationId.GLONASS for m in measurements)
    min_measurements = 5 if has_glonass else 4

    position_solution, pr_residuals, pos_std = calc_pos_fix(measurements, self.posfix_functions, min_measurements=min_measurements)
    if len(position_solution) < 3:
      return None
    position_estimate = position_solution[:3]
    position_std = pos_std[:3]

    velocity_solution, prr_residuals, vel_std = calc_vel_fix(measurements, position_estimate, self.velfix_function, min_measurements=min_measurements)
    if len(velocity_solution) < 3:
      return None
    velocity_estimate = velocity_solution[:3]
    velocity_std = vel_std[:3]

    return position_estimate, position_std, velocity_estimate, velocity_std
 | 
						|
 | 
						|
  def is_good_report(self, gnss_msg):
    """Return True if this gnss message is a usable measurement report."""
    which = gnss_msg.which()
    if not self.use_qcom:
      return which == 'measurementReport'

    if which not in ('drMeasurementReport', 'measurementReport'):
      return False
    # TODO: Understand and use remaining unknown constellations
    try:
      if which == 'drMeasurementReport':
        qcom_source = gnss_msg.drMeasurementReport.source
      else:
        qcom_source = gnss_msg.measurementReport.source
      constellation_id = ConstellationId.from_qcom_source(qcom_source)
      good_constellation = constellation_id in (ConstellationId.GPS, ConstellationId.SBAS, ConstellationId.GLONASS)
      report_time = gps_time_from_qcom_report(gnss_msg)
    except NotImplementedError:
      return False
    # Garbage timestamps with week > 32767 are sometimes sent by module.
    # This is an issue with gpsTime and GLONASS time.
    good_week = report_time.week < np.iinfo(np.int16).max
    return good_constellation and good_week
 | 
						|
 | 
						|
  def read_report(self, gnss_msg):
    """Parse a measurement report and return (report_time, new_measurements).

    For qcom, reports arrive one per constellation and are aggregated until a
    full epoch (same number of reports as the previous epoch) is collected;
    until then an empty measurement list is returned.
    """
    if self.use_qcom:
      # QCOM reports are per constellation, so we need to aggregate them
      # Additionally, the pseudoranges are broken in the measurementReports
      # and the doppler filteredSpeed is broken in the drMeasurementReports
      report_time = gps_time_from_qcom_report(gnss_msg)
      if report_time - self.last_report_time == 0:
        # Same epoch: keep collecting constellation reports
        self.qcom_reports.append(gnss_msg)
        self.last_report_time = report_time
      elif report_time - self.last_report_time > 0:
        # New epoch: remember how many reports made up the last one, start fresh
        self.qcom_reports_received = max(1, len(self.qcom_reports))
        self.qcom_reports = [gnss_msg]
        self.last_report_time = report_time
      else:
        # Sometimes DR reports get sent one iteration late (1second), they need to be ignored
        cloudlog.warning(f"Received report with time {report_time} before last report time {self.last_report_time}")

      if len(self.qcom_reports) == self.qcom_reports_received:
        new_meas = get_measurements_from_qcom_reports(self.qcom_reports)
      else:
        new_meas = []
    else:
      report = gnss_msg.measurementReport
      self.last_report_time = GPSTime(report.gpsWeek, report.rcvTow)
      new_meas = read_raw_ublox(report)
    return self.last_report_time, new_meas
 | 
						|
 | 
						|
  def is_ephemeris(self, gnss_msg):
 | 
						|
    if self.use_qcom:
 | 
						|
      return gnss_msg.which() == 'drSvPoly'
 | 
						|
    else:
 | 
						|
      return gnss_msg.which() in ('ephemeris', 'glonassEphemeris')
 | 
						|
 | 
						|
  def read_ephemeris(self, gnss_msg):
    """Parse an ephemeris message (qcom poly or ublox nav) and register it with the AstroDog.

    Parse failures are logged and swallowed; on success the ephemeris cache is refreshed.
    """
    if self.use_qcom:
      try:
        ephem = parse_qcom_ephem(gnss_msg.drSvPoly)
        self.astro_dog.add_qcom_polys({ephem.prn: [ephem]})
      except Exception:
        cloudlog.exception("Error parsing qcom svPoly ephemeris from qcom module")
        return
    else:
      msg_kind = gnss_msg.which()
      if msg_kind == 'ephemeris':
        payload = ephemeris_structs.Ephemeris.new_message(**gnss_msg.ephemeris.to_dict())
        try:
          ephem = GPSEphemeris(payload, file_name='ublox')
        except Exception:
          cloudlog.exception("Error parsing GPS ephemeris from ublox")
          return
      elif msg_kind == 'glonassEphemeris':
        payload = ephemeris_structs.GlonassEphemeris.new_message(**gnss_msg.glonassEphemeris.to_dict())
        try:
          ephem = GLONASSEphemeris(payload, file_name='ublox')
        except Exception:
          cloudlog.exception("Error parsing GLONASS ephemeris from ublox")
          return
      else:
        cloudlog.error(f"Unsupported ephemeris type: {msg_kind}")
        return
      self.astro_dog.add_navs({ephem.prn: [ephem]})
    self.cache_ephemeris()
 | 
						|
 | 
						|
  def process_report(self, new_meas, t):
    """Sanity-filter, process, and correct raw measurements.

    Resets the cached fix when too few measurements could be corrected.
    """
    # Filter measurements with unexpected pseudoranges for GPS and GLONASS satellites
    plausible_meas = [m for m in new_meas if 1e7 < m.observables['C1C'] < 3e7]
    processed_measurements = process_measurements(plausible_meas, self.astro_dog)

    if self.last_fix_pos is None:
      # No LSQ fix yet: fall back to the Kalman position, skip delay correction
      est_pos = self.gnss_kf.x[GStates.ECEF_POS].tolist()
      correct_delay = False
    else:
      est_pos = self.last_fix_pos
      correct_delay = True

    corrected_measurements = correct_measurements(processed_measurements, est_pos, self.astro_dog, correct_delay=correct_delay)
    # If many measurements weren't corrected, position may be garbage, so reset
    if len(processed_measurements) >= 8 and len(corrected_measurements) < 5:
      cloudlog.error("Didn't correct enough measurements, resetting estimate position")
      self.last_fix_pos = None
      self.last_fix_t = None
    return corrected_measurements
 | 
						|
 | 
						|
  def calc_fix(self, t, measurements):
 | 
						|
    instant_fix = self.get_lsq_fix(t, measurements)
 | 
						|
    if instant_fix is None:
 | 
						|
      return None
 | 
						|
    else:
 | 
						|
      position_estimate, position_std, velocity_estimate, velocity_std = instant_fix
 | 
						|
      self.last_fix_t = t
 | 
						|
      self.last_fix_pos = position_estimate
 | 
						|
      self.lat_fix_pos_std = position_std
 | 
						|
      return position_estimate, position_std, velocity_estimate, velocity_std
 | 
						|
 | 
						|
  def process_gnss_msg(self, gnss_msg, gnss_mono_time: int, block=False):
    """Process one raw gnss message and return a populated gnssMeasurements message.

    gnss_mono_time: log monotonic time of the incoming message, nanoseconds.
    block: passed through to fetch_navs to download synchronously (testing).
    """
    out_msg = messaging.new_message("gnssMeasurements")
    t = gnss_mono_time * 1e-9
    # measTime is shifted back by the assumed receiver-to-log latency
    msg_dict: Dict[str, Any] = {"measTime": gnss_mono_time - int(1e9 * self.measurement_lag)}
    if self.first_log_time is None:
      self.first_log_time = 1e-9 * gnss_mono_time
    if self.is_ephemeris(gnss_msg):
      self.read_ephemeris(gnss_msg)
    elif self.is_good_report(gnss_msg):
      report_t, new_meas = self.read_report(gnss_msg)
      if report_t.week > 0:
        if self.auto_fetch_navs:
          self.fetch_navs(report_t, block)

      corrected_measurements = self.process_report(new_meas, t)
      msg_dict['correctedMeasurements'] = [create_measurement_msg(m) for m in corrected_measurements]

      fix = self.calc_fix(t, corrected_measurements)
      measurement_msg = log.LiveLocationKalman.Measurement.new_message
      if fix is not None:
        position_estimate, position_std, velocity_estimate, velocity_std = fix
        # Record time to first fix once, relative to the first log time seen
        if self.ttff <= 0:
          self.ttff = max(1e-3, t - self.first_log_time)
        msg_dict["positionECEF"] = measurement_msg(value=position_estimate, std=position_std.tolist(), valid=bool(self.last_fix_t == t))
        msg_dict["velocityECEF"] = measurement_msg(value=velocity_estimate, std=velocity_std.tolist(), valid=bool(self.last_fix_t == t))

      self.update_localizer(self.last_fix_pos, t, corrected_measurements)
      P_diag = self.gnss_kf.P.diagonal()
      kf_valid = all(self.kf_valid(t))
      msg_dict["kalmanPositionECEF"] = measurement_msg(value=self.gnss_kf.x[GStates.ECEF_POS].tolist(),
                                        std=np.sqrt(P_diag[GStates.ECEF_POS]).tolist(),
                                        valid=kf_valid)
      msg_dict["kalmanVelocityECEF"] = measurement_msg(value=self.gnss_kf.x[GStates.ECEF_VELOCITY].tolist(),
                                        std=np.sqrt(P_diag[GStates.ECEF_VELOCITY]).tolist(),
                                        valid=kf_valid)

    msg_dict['gpsWeek'] = self.last_report_time.week
    msg_dict['gpsTimeOfWeek'] = self.last_report_time.tow
    msg_dict['timeToFirstFix'] = self.ttff
    msg_dict['ephemerisStatuses'] = self.create_ephem_statuses()
    out_msg.gnssMeasurements = msg_dict
    return out_msg
 | 
						|
 | 
						|
  def update_localizer(self, est_pos, t: float, measurements: List[GNSSMeasurement]):
    """Feed measurements into the gnss Kalman filter, reinitializing it first if invalid."""
    # Check time and outputs are valid
    valid = self.kf_valid(t)
    if not all(valid):
      if not valid[0]:  # Filter not initialized
        pass
      elif not valid[1]:
        cloudlog.error("Time gap of over 10s detected, gnss kalman reset")
      elif not valid[2]:
        cloudlog.error("Gnss kalman filter state is nan")
      if est_pos is not None and len(est_pos) > 0:
        cloudlog.info(f"Reset kalman filter with {est_pos}")
        self.init_gnss_localizer(est_pos)
      else:
        # No position estimate to reset with: skip the filter update entirely
        return
    if len(measurements) > 0:
      kf_add_observations(self.gnss_kf, t, measurements)
    else:
      # Ensure gnss filter is updated even with no new measurements
      self.gnss_kf.predict(t)
 | 
						|
 | 
						|
  def kf_valid(self, t: float) -> List[bool]:
    """Validity checks for the gnss Kalman filter: [initialized, no large time gap, finite position]."""
    filter_time = self.gnss_kf.filter.get_filter_time()
    initialized = not math.isnan(filter_time)
    recent = abs(t - filter_time) < MAX_TIME_GAP
    finite_pos = all(np.isfinite(self.gnss_kf.x[GStates.ECEF_POS]))
    return [initialized, recent, finite_pos]
 | 
						|
 | 
						|
  def init_gnss_localizer(self, est_pos):
    """Reinitialize the gnss Kalman filter around an estimated ECEF position."""
    x_initial = np.copy(GNSSKalman.x_initial)
    p_initial_diag = np.copy(np.diagonal(GNSSKalman.P_initial))
    # Seed the position states and widen their covariance (1 km std)
    x_initial[GStates.ECEF_POS] = est_pos
    p_initial_diag[GStates.ECEF_POS] = 1000 ** 2
    self.gnss_kf.init_state(x_initial, covs_diag=p_initial_diag)
 | 
						|
 | 
						|
  def fetch_navs(self, t: GPSTime, block):
    """Fetch nav ephemerides when less than an hour of data remains.

    The download runs in a single-worker subprocess; `block` forces a
    synchronous fetch (testing). Retries are throttled to once per minute.
    """
    # Download new navs if 1 hour of navs data left
    if t + SECS_IN_HR not in self.astro_dog.navs_fetched_times and (abs(t - self.last_fetch_navs_t) > SECS_IN_MIN):
      astro_dog_vars = self.astro_dog.valid_const, self.astro_dog.auto_update, self.astro_dog.valid_ephem_types, self.astro_dog.cache_dir
      ret = None

      if block:  # Used for testing purposes
        ret = get_orbit_data(t, *astro_dog_vars)
      elif self.orbit_fetch_future is None:
        self.orbit_fetch_executor = ProcessPoolExecutor(max_workers=1)
        self.orbit_fetch_future = self.orbit_fetch_executor.submit(get_orbit_data, t, *astro_dog_vars)
      elif self.orbit_fetch_future.done():
        ret = self.orbit_fetch_future.result()
        self.orbit_fetch_executor = self.orbit_fetch_future = None

      if ret is not None:
        if ret[0] is None:
          # Fetch failed: record the attempt time so we back off before retrying
          self.last_fetch_navs_t = ret[2]
        else:
          self.astro_dog.navs, self.astro_dog.navs_fetched_times, self.last_fetch_navs_t = ret
          self.cache_ephemeris()
 | 
						|
 | 
						|
 | 
						|
def get_orbit_data(t: GPSTime, valid_const, auto_update, valid_ephem_types, cache_dir):
  """Download and parse nav ephemerides around time t (run in a subprocess).

  Returns (navs, navs_fetched_times, t) on success, or (None, None, t) when
  the download or parsing fails.
  """
  astro_dog = AstroDog(valid_const=valid_const, auto_update=auto_update, valid_ephem_types=valid_ephem_types, cache_dir=cache_dir)
  cloudlog.info(f"Start to download/parse navs for time {t.as_datetime()}")
  start_time = time.monotonic()
  try:
    astro_dog.get_navs(t)
    cloudlog.info(f"Done parsing navs. Took {time.monotonic() - start_time:.1f}s")
    # Bug fix: iterate .values() — iterating the dict directly counted prn-string lengths
    cloudlog.debug(f"Downloaded navs ({sum([len(v) for v in astro_dog.navs.values()])}): {list(astro_dog.navs.keys())}" +
                   f" With time range: {[f'{start.as_datetime()}, {end.as_datetime()}' for (start,end) in astro_dog.orbit_fetched_times._ranges]}")
    return astro_dog.navs, astro_dog.navs_fetched_times, t
  except (DownloadFailed, RuntimeError, ValueError, IOError) as e:
    cloudlog.warning(f"No orbit data found or parsing failure: {e}")
  return None, None, t
 | 
						|
 | 
						|
 | 
						|
def create_measurement_msg(meas: GNSSMeasurement):
  """Convert a corrected GNSSMeasurement into a CorrectedMeasurement log message."""
  c = log.GnssMeasurements.CorrectedMeasurement.new_message()
  c.constellationId = meas.constellation_id.value
  c.svId = meas.sv_id
  # Frequency slot is only meaningful for GLONASS (FDMA) satellites
  c.glonassFrequency = meas.glonass_freq if meas.constellation_id == ConstellationId.GLONASS else 0
  c.pseudorange = float(meas.observables_final['C1C'])
  c.pseudorangeStd = float(meas.observables_std['C1C'])
  c.pseudorangeRate = float(meas.observables_final['D1C'])
  c.pseudorangeRateStd = float(meas.observables_std['D1C'])
  c.satPos = meas.sat_pos_final.tolist()
  # Bug fix: satVel was assigned twice; the duplicate assignment is removed
  c.satVel = meas.sat_vel.tolist()
  return c
 | 
						|
 | 
						|
def kf_add_observations(gnss_kf: GNSSKalman, t: float, measurements: List[GNSSMeasurement]):
  """Feed pseudorange and pseudorange-rate observations into the gnss Kalman filter."""
  kind_by_constellation = {
    ConstellationId.GPS: ObservationKind.PSEUDORANGE_GPS,
    ConstellationId.GLONASS: ObservationKind.PSEUDORANGE_GLONASS,
  }
  ekf_data = defaultdict(list)
  for meas in measurements:
    meas_arr = meas.as_array()
    kind = kind_by_constellation.get(meas.constellation_id)
    if kind is not None:
      ekf_data[kind].append(meas_arr)
  # Rate observations reuse the same measurement arrays as the pseudoranges
  ekf_data[ObservationKind.PSEUDORANGE_RATE_GPS] = ekf_data[ObservationKind.PSEUDORANGE_GPS]
  ekf_data[ObservationKind.PSEUDORANGE_RATE_GLONASS] = ekf_data[ObservationKind.PSEUDORANGE_GLONASS]
  for kind, data in ekf_data.items():
    if len(data) > 0:
      gnss_kf.predict_and_observe(t, kind, data)
 | 
						|
 | 
						|
 | 
						|
def clear_tmp_cache():
  """Delete and recreate the downloads cache directory."""
  if os.path.exists(DOWNLOADS_CACHE_FOLDER):
    shutil.rmtree(DOWNLOADS_CACHE_FOLDER)
  os.mkdir(DOWNLOADS_CACHE_FOLDER)
 | 
						|
 | 
						|
 | 
						|
def main(sm=None, pm=None):
  """Laikad daemon: subscribe to raw gnss messages and publish gnssMeasurements."""
  #clear_tmp_cache()

  # Select the raw gnss source based on the detected receiver hardware
  use_qcom = not Params().get_bool("UbloxAvailable")
  if use_qcom:
    raw_name = "qcomGnss"
  else:
    raw_name = "ubloxGnss"
  raw_gnss_sock = messaging.sub_sock(raw_name, conflate=False)
  if pm is None:
    pm = messaging.PubMaster(['gnssMeasurements'])

  # disable until set as main gps source, to better analyze startup time
  # TODO ensure low CPU usage before enabling
  use_internet = False  # "LAIKAD_NO_INTERNET" not in os.environ

  replay = "REPLAY" in os.environ
  laikad = Laikad(save_ephemeris=not replay, auto_fetch_navs=use_internet, use_qcom=use_qcom)

  while True:
    for in_msg in messaging.drain_sock(raw_gnss_sock, wait_for_one=True):
      out_msg = laikad.process_gnss_msg(getattr(in_msg, raw_name), in_msg.logMonoTime, replay)
      pm.send('gnssMeasurements', out_msg)
 | 
						|
 | 
						|
 | 
						|
# Script entry point: run the laikad daemon when executed directly.
if __name__ == "__main__":
  main()
 | 
						|
 |