navd: broadcast sped limit (#24681)

* populate speedLimit field in navInstruction

* show in ui

* ensure coordinate is behind you

* handle unknown

* handle here

* a bit prettier

* US design

* no ui changes in this PR

* parse sign
old-commit-hash: 64ceb64511
taco
Willem Melching 3 years ago committed by GitHub
parent 875961bb62
commit 7b3faad3ee
  1. 14
      selfdrive/navd/helpers.py
  2. 41
      selfdrive/navd/navd.py

@ -2,18 +2,24 @@ from __future__ import annotations
import json import json
import math import math
from typing import Any, Dict, List, Optional, Tuple from typing import Any, Dict, List, Optional, Tuple, Union, cast
from common.conversions import Conversions
from common.numpy_fast import clip from common.numpy_fast import clip
from common.params import Params from common.params import Params
EARTH_MEAN_RADIUS = 6371007.2 EARTH_MEAN_RADIUS = 6371007.2
SPEED_CONVERSIONS = {
'km/h': Conversions.KPH_TO_MS,
'mph': Conversions.MPH_TO_MS,
}
class Coordinate: class Coordinate:
def __init__(self, latitude: float, longitude: float) -> None: def __init__(self, latitude: float, longitude: float) -> None:
self.latitude = latitude self.latitude = latitude
self.longitude = longitude self.longitude = longitude
self.annotations: Dict[str, float] = {}
@classmethod @classmethod
def from_mapbox_tuple(cls, t: Tuple[float, float]) -> Coordinate: def from_mapbox_tuple(cls, t: Tuple[float, float]) -> Coordinate:
@ -116,6 +122,12 @@ def string_to_direction(direction: str) -> str:
return 'none' return 'none'
def maxspeed_to_ms(maxspeed: Dict[str, Union[str, float]]) -> float:
unit = cast(str, maxspeed['unit'])
speed = cast(float, maxspeed['speed'])
return SPEED_CONVERSIONS[unit] * speed
def parse_banner_instructions(instruction: Any, banners: Any, distance_to_maneuver: float = 0.0) -> None: def parse_banner_instructions(instruction: Any, banners: Any, distance_to_maneuver: float = 0.0) -> None:
if not len(banners): if not len(banners):
return return

@ -10,11 +10,11 @@ from cereal import log
from common.api import Api from common.api import Api
from common.params import Params from common.params import Params
from common.realtime import Ratekeeper from common.realtime import Ratekeeper
from selfdrive.swaglog import cloudlog
from selfdrive.navd.helpers import (Coordinate, coordinate_from_param, from selfdrive.navd.helpers import (Coordinate, coordinate_from_param,
distance_along_geometry, distance_along_geometry, maxspeed_to_ms,
minimum_distance, minimum_distance,
parse_banner_instructions) parse_banner_instructions)
from selfdrive.swaglog import cloudlog
REROUTE_DISTANCE = 25 REROUTE_DISTANCE = 25
MANEUVER_TRANSITION_THRESHOLD = 10 MANEUVER_TRANSITION_THRESHOLD = 10
@ -110,7 +110,7 @@ class RouteEngine:
params = { params = {
'access_token': self.mapbox_token, 'access_token': self.mapbox_token,
# 'annotations': 'maxspeed', 'annotations': 'maxspeed',
'geometries': 'geojson', 'geometries': 'geojson',
'overview': 'full', 'overview': 'full',
'steps': 'true', 'steps': 'true',
@ -131,9 +131,25 @@ class RouteEngine:
self.route = r['routes'][0]['legs'][0]['steps'] self.route = r['routes'][0]['legs'][0]['steps']
self.route_geometry = [] self.route_geometry = []
maxspeed_idx = 0
maxspeeds = r['routes'][0]['legs'][0]['annotation']['maxspeed']
# Convert coordinates # Convert coordinates
for step in self.route: for step in self.route:
self.route_geometry.append([Coordinate.from_mapbox_tuple(c) for c in step['geometry']['coordinates']]) coords = []
for c in step['geometry']['coordinates']:
coord = Coordinate.from_mapbox_tuple(c)
# Last step does not have maxspeed
if (maxspeed_idx < len(maxspeeds)) and ('unknown' not in maxspeeds[maxspeed_idx]):
coord.annotations['maxspeed'] = maxspeed_to_ms(maxspeeds[maxspeed_idx])
coords.append(coord)
maxspeed_idx += 1
self.route_geometry.append(coords)
maxspeed_idx -= 1 # Every segment ends with the same coordinate as the start of the next
self.step_idx = 0 self.step_idx = 0
else: else:
@ -179,6 +195,23 @@ class RouteEngine:
msg.navInstruction.timeRemaining = total_time msg.navInstruction.timeRemaining = total_time
msg.navInstruction.timeRemainingTypical = total_time_typical msg.navInstruction.timeRemainingTypical = total_time_typical
# Speed limit
closest_idx, closest = min(enumerate(geometry), key=lambda p: p[1].distance_to(self.last_position))
if closest_idx > 0:
# If we are not past the closest point, show previous
if along_geometry < distance_along_geometry(geometry, geometry[closest_idx]):
closest = geometry[closest_idx - 1]
if 'maxspeed' in closest.annotations:
msg.navInstruction.speedLimit = closest.annotations['maxspeed']
# Speed limit sign type
if 'speedLimitSign' in step:
if step['speedLimitSign'] == 'mutcd':
msg.navInstruction.speedLimitSign = log.NavInstruction.SpeedLimitSign.mutcd
elif step['speedLimitSign'] == 'vienna':
msg.navInstruction.speedLimitSign = log.NavInstruction.SpeedLimitSign.vienna
self.pm.send('navInstruction', msg) self.pm.send('navInstruction', msg)
# Transition to next route segment # Transition to next route segment

Loading…
Cancel
Save