You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							372 lines
						
					
					
						
							12 KiB
						
					
					
				
			
		
		
	
	
							372 lines
						
					
					
						
							12 KiB
						
					
					
				#!/usr/bin/env python3
 | 
						|
import json
 | 
						|
import math
 | 
						|
import os
 | 
						|
import threading
 | 
						|
 | 
						|
import requests
 | 
						|
import numpy as np
 | 
						|
 | 
						|
import cereal.messaging as messaging
 | 
						|
from cereal import log
 | 
						|
from openpilot.common.api import Api
 | 
						|
from openpilot.common.params import Params
 | 
						|
from openpilot.common.realtime import Ratekeeper
 | 
						|
from openpilot.common.transformations.coordinates import ecef2geodetic
 | 
						|
from openpilot.selfdrive.navd.helpers import (Coordinate, coordinate_from_param,
 | 
						|
                                    distance_along_geometry, maxspeed_to_ms,
 | 
						|
                                    minimum_distance,
 | 
						|
                                    parse_banner_instructions)
 | 
						|
from openpilot.system.swaglog import cloudlog
 | 
						|
 | 
						|
REROUTE_DISTANCE = 25
 | 
						|
MANEUVER_TRANSITION_THRESHOLD = 10
 | 
						|
VALID_POS_STD = 50.0
 | 
						|
REROUTE_COUNTER_MIN = 3
 | 
						|
 | 
						|
 | 
						|
class RouteEngine:
 | 
						|
  def __init__(self, sm, pm):
 | 
						|
    self.sm = sm
 | 
						|
    self.pm = pm
 | 
						|
 | 
						|
    self.params = Params()
 | 
						|
 | 
						|
    # Get last gps position from params
 | 
						|
    self.last_position = coordinate_from_param("LastGPSPosition", self.params)
 | 
						|
    self.last_bearing = None
 | 
						|
 | 
						|
    self.gps_ok = False
 | 
						|
    self.localizer_valid = False
 | 
						|
 | 
						|
    self.nav_destination = None
 | 
						|
    self.step_idx = None
 | 
						|
    self.route = None
 | 
						|
    self.route_geometry = None
 | 
						|
 | 
						|
    self.recompute_backoff = 0
 | 
						|
    self.recompute_countdown = 0
 | 
						|
 | 
						|
    self.ui_pid = None
 | 
						|
 | 
						|
    self.reroute_counter = 0
 | 
						|
 | 
						|
    if "MAPBOX_TOKEN" in os.environ:
 | 
						|
      self.mapbox_token = os.environ["MAPBOX_TOKEN"]
 | 
						|
      self.mapbox_host = "https://api.mapbox.com"
 | 
						|
    else:
 | 
						|
      try:
 | 
						|
        self.mapbox_token = Api(self.params.get("DongleId", encoding='utf8')).get_token(expiry_hours=4 * 7 * 24)
 | 
						|
      except FileNotFoundError:
 | 
						|
        cloudlog.exception("Failed to generate mapbox token due to missing private key. Ensure device is registered.")
 | 
						|
        self.mapbox_token = ""
 | 
						|
      self.mapbox_host = "https://maps.comma.ai"
 | 
						|
 | 
						|
  def update(self):
 | 
						|
    self.sm.update(0)
 | 
						|
 | 
						|
    if self.sm.updated["managerState"]:
 | 
						|
      ui_pid = [p.pid for p in self.sm["managerState"].processes if p.name == "ui" and p.running]
 | 
						|
      if ui_pid:
 | 
						|
        if self.ui_pid and self.ui_pid != ui_pid[0]:
 | 
						|
          cloudlog.warning("UI restarting, sending route")
 | 
						|
          threading.Timer(5.0, self.send_route).start()
 | 
						|
        self.ui_pid = ui_pid[0]
 | 
						|
 | 
						|
    self.update_location()
 | 
						|
    self.recompute_route()
 | 
						|
    self.send_instruction()
 | 
						|
 | 
						|
  def update_location(self):
 | 
						|
    location = self.sm['liveLocationKalman']
 | 
						|
    laikad = self.sm['gnssMeasurements']
 | 
						|
 | 
						|
    locationd_valid = (location.status == log.LiveLocationKalman.Status.valid) and location.positionGeodetic.valid
 | 
						|
    laikad_valid = laikad.positionECEF.valid and np.linalg.norm(laikad.positionECEF.std) < VALID_POS_STD
 | 
						|
 | 
						|
    self.localizer_valid = locationd_valid or laikad_valid
 | 
						|
    self.gps_ok = location.gpsOK or laikad_valid
 | 
						|
 | 
						|
    if locationd_valid:
 | 
						|
      self.last_bearing = math.degrees(location.calibratedOrientationNED.value[2])
 | 
						|
      self.last_position = Coordinate(location.positionGeodetic.value[0], location.positionGeodetic.value[1])
 | 
						|
    elif laikad_valid:
 | 
						|
      geodetic = ecef2geodetic(laikad.positionECEF.value)
 | 
						|
      self.last_position = Coordinate(geodetic[0], geodetic[1])
 | 
						|
      self.last_bearing = None
 | 
						|
 | 
						|
  def recompute_route(self):
 | 
						|
    if self.last_position is None:
 | 
						|
      return
 | 
						|
 | 
						|
    new_destination = coordinate_from_param("NavDestination", self.params)
 | 
						|
    if new_destination is None:
 | 
						|
      self.clear_route()
 | 
						|
      self.reset_recompute_limits()
 | 
						|
      return
 | 
						|
 | 
						|
    should_recompute = self.should_recompute()
 | 
						|
    if new_destination != self.nav_destination:
 | 
						|
      cloudlog.warning(f"Got new destination from NavDestination param {new_destination}")
 | 
						|
      should_recompute = True
 | 
						|
 | 
						|
    # Don't recompute when GPS drifts in tunnels
 | 
						|
    if not self.gps_ok and self.step_idx is not None:
 | 
						|
      return
 | 
						|
 | 
						|
    if self.recompute_countdown == 0 and should_recompute:
 | 
						|
      self.recompute_countdown = 2**self.recompute_backoff
 | 
						|
      self.recompute_backoff = min(6, self.recompute_backoff + 1)
 | 
						|
      self.calculate_route(new_destination)
 | 
						|
      self.reroute_counter = 0
 | 
						|
    else:
 | 
						|
      self.recompute_countdown = max(0, self.recompute_countdown - 1)
 | 
						|
 | 
						|
  def calculate_route(self, destination):
 | 
						|
    cloudlog.warning(f"Calculating route {self.last_position} -> {destination}")
 | 
						|
    self.nav_destination = destination
 | 
						|
 | 
						|
    lang = self.params.get('LanguageSetting', encoding='utf8')
 | 
						|
    if lang is not None:
 | 
						|
      lang = lang.replace('main_', '')
 | 
						|
 | 
						|
    params = {
 | 
						|
      'access_token': self.mapbox_token,
 | 
						|
      'annotations': 'maxspeed',
 | 
						|
      'geometries': 'geojson',
 | 
						|
      'overview': 'full',
 | 
						|
      'steps': 'true',
 | 
						|
      'banner_instructions': 'true',
 | 
						|
      'alternatives': 'false',
 | 
						|
      'language': lang,
 | 
						|
    }
 | 
						|
 | 
						|
    # TODO: move waypoints into NavDestination param?
 | 
						|
    waypoints = self.params.get('NavDestinationWaypoints', encoding='utf8')
 | 
						|
    waypoint_coords = []
 | 
						|
    if waypoints is not None and len(waypoints) > 0:
 | 
						|
      waypoint_coords = json.loads(waypoints)
 | 
						|
 | 
						|
    coords = [
 | 
						|
      (self.last_position.longitude, self.last_position.latitude),
 | 
						|
      *waypoint_coords,
 | 
						|
      (destination.longitude, destination.latitude)
 | 
						|
    ]
 | 
						|
    params['waypoints'] = f'0;{len(coords)-1}'
 | 
						|
    if self.last_bearing is not None:
 | 
						|
      params['bearings'] = f"{(self.last_bearing + 360) % 360:.0f},90" + (';'*(len(coords)-1))
 | 
						|
 | 
						|
    coords_str = ';'.join([f'{lon},{lat}' for lon, lat in coords])
 | 
						|
    url = self.mapbox_host + '/directions/v5/mapbox/driving-traffic/' + coords_str
 | 
						|
    try:
 | 
						|
      resp = requests.get(url, params=params, timeout=10)
 | 
						|
      if resp.status_code != 200:
 | 
						|
        cloudlog.event("API request failed", status_code=resp.status_code, text=resp.text, error=True)
 | 
						|
      resp.raise_for_status()
 | 
						|
 | 
						|
      r = resp.json()
 | 
						|
      if len(r['routes']):
 | 
						|
        self.route = r['routes'][0]['legs'][0]['steps']
 | 
						|
        self.route_geometry = []
 | 
						|
 | 
						|
        maxspeed_idx = 0
 | 
						|
        maxspeeds = r['routes'][0]['legs'][0]['annotation']['maxspeed']
 | 
						|
 | 
						|
        # Convert coordinates
 | 
						|
        for step in self.route:
 | 
						|
          coords = []
 | 
						|
 | 
						|
          for c in step['geometry']['coordinates']:
 | 
						|
            coord = Coordinate.from_mapbox_tuple(c)
 | 
						|
 | 
						|
            # Last step does not have maxspeed
 | 
						|
            if (maxspeed_idx < len(maxspeeds)):
 | 
						|
              maxspeed = maxspeeds[maxspeed_idx]
 | 
						|
              if ('unknown' not in maxspeed) and ('none' not in maxspeed):
 | 
						|
                coord.annotations['maxspeed'] = maxspeed_to_ms(maxspeed)
 | 
						|
 | 
						|
            coords.append(coord)
 | 
						|
            maxspeed_idx += 1
 | 
						|
 | 
						|
          self.route_geometry.append(coords)
 | 
						|
          maxspeed_idx -= 1  # Every segment ends with the same coordinate as the start of the next
 | 
						|
 | 
						|
        self.step_idx = 0
 | 
						|
      else:
 | 
						|
        cloudlog.warning("Got empty route response")
 | 
						|
        self.clear_route()
 | 
						|
 | 
						|
      # clear waypoints to avoid a re-route including past waypoints
 | 
						|
      # TODO: only clear once we're past a waypoint
 | 
						|
      self.params.remove('NavDestinationWaypoints')
 | 
						|
 | 
						|
    except requests.exceptions.RequestException:
 | 
						|
      cloudlog.exception("failed to get route")
 | 
						|
      self.clear_route()
 | 
						|
 | 
						|
    self.send_route()
 | 
						|
 | 
						|
  def send_instruction(self):
 | 
						|
    msg = messaging.new_message('navInstruction')
 | 
						|
 | 
						|
    if self.step_idx is None:
 | 
						|
      msg.valid = False
 | 
						|
      self.pm.send('navInstruction', msg)
 | 
						|
      return
 | 
						|
 | 
						|
    step = self.route[self.step_idx]
 | 
						|
    geometry = self.route_geometry[self.step_idx]
 | 
						|
    along_geometry = distance_along_geometry(geometry, self.last_position)
 | 
						|
    distance_to_maneuver_along_geometry = step['distance'] - along_geometry
 | 
						|
 | 
						|
    # Banner instructions are for the following maneuver step, don't use empty last step
 | 
						|
    banner_step = step
 | 
						|
    if not len(banner_step['bannerInstructions']) and self.step_idx == len(self.route) - 1:
 | 
						|
      banner_step = self.route[max(self.step_idx - 1, 0)]
 | 
						|
 | 
						|
    # Current instruction
 | 
						|
    msg.navInstruction.maneuverDistance = distance_to_maneuver_along_geometry
 | 
						|
    instruction = parse_banner_instructions(banner_step['bannerInstructions'], distance_to_maneuver_along_geometry)
 | 
						|
    if instruction is not None:
 | 
						|
      for k,v in instruction.items():
 | 
						|
        setattr(msg.navInstruction, k, v)
 | 
						|
 | 
						|
    # All instructions
 | 
						|
    maneuvers = []
 | 
						|
    for i, step_i in enumerate(self.route):
 | 
						|
      if i < self.step_idx:
 | 
						|
        distance_to_maneuver = -sum(self.route[j]['distance'] for j in range(i+1, self.step_idx)) - along_geometry
 | 
						|
      elif i == self.step_idx:
 | 
						|
        distance_to_maneuver = distance_to_maneuver_along_geometry
 | 
						|
      else:
 | 
						|
        distance_to_maneuver = distance_to_maneuver_along_geometry + sum(self.route[j]['distance'] for j in range(self.step_idx+1, i+1))
 | 
						|
 | 
						|
      instruction = parse_banner_instructions(step_i['bannerInstructions'], distance_to_maneuver)
 | 
						|
      if instruction is None:
 | 
						|
        continue
 | 
						|
      maneuver = {'distance': distance_to_maneuver}
 | 
						|
      if 'maneuverType' in instruction:
 | 
						|
        maneuver['type'] = instruction['maneuverType']
 | 
						|
      if 'maneuverModifier' in instruction:
 | 
						|
        maneuver['modifier'] = instruction['maneuverModifier']
 | 
						|
      maneuvers.append(maneuver)
 | 
						|
 | 
						|
    msg.navInstruction.allManeuvers = maneuvers
 | 
						|
 | 
						|
    # Compute total remaining time and distance
 | 
						|
    remaining = 1.0 - along_geometry / max(step['distance'], 1)
 | 
						|
    total_distance = step['distance'] * remaining
 | 
						|
    total_time = step['duration'] * remaining
 | 
						|
 | 
						|
    if step['duration_typical'] is None:
 | 
						|
      total_time_typical = total_time
 | 
						|
    else:
 | 
						|
      total_time_typical = step['duration_typical'] * remaining
 | 
						|
 | 
						|
    # Add up totals for future steps
 | 
						|
    for i in range(self.step_idx + 1, len(self.route)):
 | 
						|
      total_distance += self.route[i]['distance']
 | 
						|
      total_time += self.route[i]['duration']
 | 
						|
      total_time_typical += self.route[i]['duration_typical']
 | 
						|
 | 
						|
    msg.navInstruction.distanceRemaining = total_distance
 | 
						|
    msg.navInstruction.timeRemaining = total_time
 | 
						|
    msg.navInstruction.timeRemainingTypical = total_time_typical
 | 
						|
 | 
						|
    # Speed limit
 | 
						|
    closest_idx, closest = min(enumerate(geometry), key=lambda p: p[1].distance_to(self.last_position))
 | 
						|
    if closest_idx > 0:
 | 
						|
      # If we are not past the closest point, show previous
 | 
						|
      if along_geometry < distance_along_geometry(geometry, geometry[closest_idx]):
 | 
						|
        closest = geometry[closest_idx - 1]
 | 
						|
 | 
						|
    if ('maxspeed' in closest.annotations) and self.localizer_valid:
 | 
						|
      msg.navInstruction.speedLimit = closest.annotations['maxspeed']
 | 
						|
 | 
						|
    # Speed limit sign type
 | 
						|
    if 'speedLimitSign' in step:
 | 
						|
      if step['speedLimitSign'] == 'mutcd':
 | 
						|
        msg.navInstruction.speedLimitSign = log.NavInstruction.SpeedLimitSign.mutcd
 | 
						|
      elif step['speedLimitSign'] == 'vienna':
 | 
						|
        msg.navInstruction.speedLimitSign = log.NavInstruction.SpeedLimitSign.vienna
 | 
						|
 | 
						|
    self.pm.send('navInstruction', msg)
 | 
						|
 | 
						|
    # Transition to next route segment
 | 
						|
    if distance_to_maneuver_along_geometry < -MANEUVER_TRANSITION_THRESHOLD:
 | 
						|
      if self.step_idx + 1 < len(self.route):
 | 
						|
        self.step_idx += 1
 | 
						|
        self.reset_recompute_limits()
 | 
						|
      else:
 | 
						|
        cloudlog.warning("Destination reached")
 | 
						|
 | 
						|
        # Clear route if driving away from destination
 | 
						|
        dist = self.nav_destination.distance_to(self.last_position)
 | 
						|
        if dist > REROUTE_DISTANCE:
 | 
						|
          self.params.remove("NavDestination")
 | 
						|
          self.clear_route()
 | 
						|
 | 
						|
  def send_route(self):
 | 
						|
    coords = []
 | 
						|
 | 
						|
    if self.route is not None:
 | 
						|
      for path in self.route_geometry:
 | 
						|
        coords += [c.as_dict() for c in path]
 | 
						|
 | 
						|
    msg = messaging.new_message('navRoute')
 | 
						|
    msg.navRoute.coordinates = coords
 | 
						|
    self.pm.send('navRoute', msg)
 | 
						|
 | 
						|
  def clear_route(self):
 | 
						|
    self.route = None
 | 
						|
    self.route_geometry = None
 | 
						|
    self.step_idx = None
 | 
						|
    self.nav_destination = None
 | 
						|
 | 
						|
  def reset_recompute_limits(self):
 | 
						|
    self.recompute_backoff = 0
 | 
						|
    self.recompute_countdown = 0
 | 
						|
 | 
						|
  def should_recompute(self):
 | 
						|
    if self.step_idx is None or self.route is None:
 | 
						|
      return True
 | 
						|
 | 
						|
    # Don't recompute in last segment, assume destination is reached
 | 
						|
    if self.step_idx == len(self.route) - 1:
 | 
						|
      return False
 | 
						|
 | 
						|
    # Compute closest distance to all line segments in the current path
 | 
						|
    min_d = REROUTE_DISTANCE + 1
 | 
						|
    path = self.route_geometry[self.step_idx]
 | 
						|
    for i in range(len(path) - 1):
 | 
						|
      a = path[i]
 | 
						|
      b = path[i + 1]
 | 
						|
 | 
						|
      if a.distance_to(b) < 1.0:
 | 
						|
        continue
 | 
						|
 | 
						|
      min_d = min(min_d, minimum_distance(a, b, self.last_position))
 | 
						|
 | 
						|
    if min_d > REROUTE_DISTANCE:
 | 
						|
      self.reroute_counter += 1
 | 
						|
    else:
 | 
						|
      self.reroute_counter = 0
 | 
						|
    return self.reroute_counter > REROUTE_COUNTER_MIN
 | 
						|
    # TODO: Check for going wrong way in segment
 | 
						|
 | 
						|
 | 
						|
def main(sm=None, pm=None):
 | 
						|
  if sm is None:
 | 
						|
    sm = messaging.SubMaster(['liveLocationKalman', 'gnssMeasurements', 'managerState'])
 | 
						|
  if pm is None:
 | 
						|
    pm = messaging.PubMaster(['navInstruction', 'navRoute'])
 | 
						|
 | 
						|
  rk = Ratekeeper(1.0)
 | 
						|
  route_engine = RouteEngine(sm, pm)
 | 
						|
  while True:
 | 
						|
    route_engine.update()
 | 
						|
    rk.keep_time()
 | 
						|
 | 
						|
 | 
						|
if __name__ == "__main__":
 | 
						|
  main()
 | 
						|
 |