You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							364 lines
						
					
					
						
							11 KiB
						
					
					
				
			
		
		
	
	
							364 lines
						
					
					
						
							11 KiB
						
					
					
				import math
 | 
						|
import json
 | 
						|
import numpy as np
 | 
						|
from datetime import datetime
 | 
						|
from common.basedir import BASEDIR
 | 
						|
from selfdrive.config import Conversions as CV
 | 
						|
from common.transformations.coordinates import LocalCoord, geodetic2ecef
 | 
						|
 | 
						|
# Base lookahead horizon; MAPS_LOOKAHEAD_DISTANCE is derived from it below.
# NOTE(review): presumably seconds, with 50 being a nominal speed in m/s - confirm.
LOOKAHEAD_TIME = 10.
MAPS_LOOKAHEAD_DISTANCE = 50 * LOOKAHEAD_TIME

# Table looked up by parse_speed_tags() with the value of the
# 'source:maxspeed' / 'maxspeed:type' tags. Loaded once at import time.
DEFAULT_SPEEDS_JSON_FILE = BASEDIR + "/selfdrive/mapd/default_speeds.json"
with open(DEFAULT_SPEEDS_JSON_FILE, "rb") as f:
  DEFAULT_SPEEDS = json.load(f)

# Per-country rule table used by geocode_maxspeed(): each country maps
# 'Default' and region names to lists of {'tags': ..., 'speed': ...} rules.
DEFAULT_SPEEDS_BY_REGION_JSON_FILE = BASEDIR + "/selfdrive/mapd/default_speeds_by_region.json"
with open(DEFAULT_SPEEDS_BY_REGION_JSON_FILE, "rb") as f:
  DEFAULT_SPEEDS_BY_REGION = json.load(f)
 | 
						|
 | 
						|
def circle_through_points(p1, p2, p3):
  """Fits a circle through three points.

  Each point is a 3-element sequence; only the first two components (x, y)
  are used, the third is ignored.

  Formulas from: http://www.ambrsoft.com/trigocalc/circle3d.htm

  Returns a tuple (center_x, center_y, radius).

  Note: for collinear (or coincident) points A is zero and the divisions
  below are undefined (ZeroDivisionError for Python floats, inf/nan for
  numpy scalars). Callers must handle that case.
  """
  x1, y1, _ = p1
  x2, y2, _ = p2
  x3, y3, _ = p3

  # Squared distance of each point from the origin, shared by B, C and D
  n1 = x1**2 + y1**2
  n2 = x2**2 + y2**2
  n3 = x3**2 + y3**2

  A = x1 * (y2 - y3) - y1 * (x2 - x3) + x2 * y3 - x3 * y2
  B = n1 * (y3 - y2) + n2 * (y1 - y3) + n3 * (y2 - y1)
  C = n1 * (x2 - x3) + n2 * (x3 - x1) + n3 * (x1 - x2)
  D = n1 * (x3 * y2 - x2 * y3) + n2 * (x1 * y3 - x3 * y1) + n3 * (x2 * y1 - x1 * y2)

  center_x = -B / (2 * A)
  center_y = -C / (2 * A)
  radius = np.sqrt((B**2 + C**2 - 4 * A * D) / (4 * A**2))
  return (center_x, center_y, radius)
 | 
						|
 | 
						|
def parse_speed_unit(max_speed):
  """Converts a maxspeed string to m/s based on the unit present in the input.

  OpenStreetMap defaults to kph if no unit is present.

  Returns None when the input is empty/None or the numeric part cannot be
  parsed as a float.
  """
  if not max_speed:
    return None

  conversion = CV.KPH_TO_MS
  if 'mph' in max_speed:
    # Strip the unit whether or not a space separates it from the number
    # ("30 mph" and "30mph"); float() ignores the leftover whitespace.
    max_speed = max_speed.replace('mph', '')
    conversion = CV.MPH_TO_MS

  try:
    return float(max_speed) * conversion
  except ValueError:
    return None
 | 
						|
 | 
						|
def parse_speed_tags(tags):
  """Parses tags on a way to find the maxspeed, converted to m/s.

  Lookup order:
    1. 'maxspeed'
    2. 'maxspeed:conditional' of the form "<speed> @ (HH:MM-HH:MM)"; it
       overrides 'maxspeed' while the local time is inside the window.
       Windows that wrap past midnight (e.g. 22:00-06:00) are supported.
       Conditions in any other format are ignored.
    3. DEFAULT_SPEEDS entry for 'source:maxspeed', then 'maxspeed:type',
       used only when nothing was found above.

  Returns the speed in m/s, or None when no usable value is found.
  """
  max_speed = tags.get('maxspeed')

  if 'maxspeed:conditional' in tags:
    try:
      max_speed_cond, cond = tags['maxspeed:conditional'].split(' @ ')
      cond = cond[1:-1]  # drop the surrounding parentheses

      start, end = cond.split('-')
      now = datetime.now().time()  # TODO: Get time and timezone from gps fix so this will work correctly on replays
      start = datetime.strptime(start, "%H:%M").time()
      end = datetime.strptime(end, "%H:%M").time()

      if start <= end:
        active = start <= now <= end
      else:
        # Window wraps past midnight: active late in the day or early in the next
        active = now >= start or now <= end

      if active:
        max_speed = max_speed_cond
    except ValueError:
      # Unsupported condition syntax (several conditions, weekdays, ...): ignore it
      pass

  if not max_speed and 'source:maxspeed' in tags:
    max_speed = DEFAULT_SPEEDS.get(tags['source:maxspeed'], None)
  if not max_speed and 'maxspeed:type' in tags:
    max_speed = DEFAULT_SPEEDS.get(tags['maxspeed:type'], None)

  return parse_speed_unit(max_speed)
 | 
						|
 | 
						|
def _first_matching_speed(rules, tags, default):
  """Returns the 'speed' of the first rule whose 'tags' all match, else default."""
  for rule in rules:
    rule_valid = all(
      tag_name in tags and tags[tag_name] == value
      for tag_name, value in rule['tags'].items()
    )
    if rule_valid:
      return rule['speed']
  return default


def geocode_maxspeed(tags, location_info):
  """Looks up a default speed limit from the geocoded country/region rules.

  tags: tag mapping of the way.
  location_info: mapping with optional 'country' and 'region' keys; an
    empty or None value yields None.

  The country's 'Default' rules are searched first, then the rules of the
  region; a matching region rule overrides the country default. A rule
  missing its 'tags' or 'speed' key stops the search and keeps whatever was
  found so far.

  Returns the speed in m/s, or None when no rule matches or the matched
  speed cannot be parsed.
  """
  if not location_info:
    return None

  max_speed = None
  try:
    country_rules = DEFAULT_SPEEDS_BY_REGION.get(location_info.get('country', ''), {})
    max_speed = _first_matching_speed(country_rules.get('Default', []), tags, max_speed)

    region_rules = country_rules.get(location_info.get('region', ''), [])
    max_speed = _first_matching_speed(region_rules, tags, max_speed)
  except KeyError:
    pass

  return parse_speed_unit(max_speed)
 | 
						|
 | 
						|
class Way:
  """A single map way plus the query results needed to find its neighbours.

  Attributes:
    id: identifier copied from the wrapped way object.
    way: the wrapped way object; it must expose id, tags, nodes and
      get_nodes(resolve_missing=...).
    query_results: 5-tuple (results, tree, real_nodes, node_to_way,
      location_info) shared by all ways of one map query.
    points: numpy array with one (lat, lon, 0.) row per node of the way.
  """

  def __init__(self, way, query_results):
    self.id = way.id
    self.way = way
    self.query_results = query_results

    # Height is not used, so every node is placed at 0.
    self.points = np.asarray([(float(node.lat), float(node.lon), 0.)
                              for node in self.way.get_nodes(resolve_missing=False)])

  @classmethod
  def closest(cls, query_results, lat, lon, heading, prev_way=None):
    """Returns the way that best matches the given position and heading.

    Candidates are all ways touching a node within 500m of the position (or
    the single nearest node when there is none). Each candidate that
    straddles the car (see on_way) is scored from the line through its
    closest node behind and its closest node in front of the car; the lowest
    score wins. Ways with the same 'highway' tag as prev_way get half the
    score.

    Returns None when no candidate qualifies or the best score exceeds 50.
    """
    _, tree, real_nodes, node_to_way, _ = query_results

    cur_pos = geodetic2ecef((lat, lon, 0))
    nodes = tree.query_ball_point(cur_pos, 500)

    # If no nodes within 500m, choose closest one
    if not nodes:
      nodes = [tree.query(cur_pos)[1]]

    ways = []
    for n in nodes:
      ways += node_to_way[real_nodes[n].id]
    ways = set(ways)

    closest_way = None
    best_score = None
    for candidate in ways:
      way = Way(candidate, query_results)
      points = way.points_in_car_frame(lat, lon, heading)

      if not way.on_way(lat, lon, heading, points):
        continue

      # Create mask of points in front and behind
      angles = np.arctan2(points[:, 1], points[:, 0])
      front = np.logical_and((-np.pi / 2) < angles, angles < (np.pi / 2))
      behind = np.logical_not(front)

      dists = np.linalg.norm(points, axis=1)

      # Get closest point behind the car
      dists_behind = np.copy(dists)
      dists_behind[front] = np.nan
      closest_behind = points[np.nanargmin(dists_behind)]

      # Get closest point in front of the car
      dists_front = np.copy(dists)
      dists_front[behind] = np.nan
      closest_front = points[np.nanargmin(dists_front)]

      # fit line: y = a*x + b
      x1, y1, _ = closest_behind
      x2, y2, _ = closest_front
      a = (y2 - y1) / max((x2 - x1), 1e-5)
      b = y1 - a * x1

      # With a factor of 60 a 20m offset causes the same error as a 20 degree heading error
      # (A 20 degree heading offset results in an a of about 1/3)
      score = abs(a) * 60. + abs(b)

      # Prefer same type of road
      if prev_way is not None:
        if way.way.tags.get('highway', '') == prev_way.way.tags.get('highway', ''):
          score *= 0.5

      if closest_way is None or score < best_score:
        closest_way = way
        best_score = score

    # Normal score is < 5. Also covers the case where no candidate was on
    # the way, in which best_score is still None and must not be compared.
    if closest_way is None or best_score > 50:
      return None

    return closest_way

  def __str__(self):
    return "%s %s" % (self.id, self.way.tags)

  def max_speed(self):
    """Extracts the (conditional) speed limit from a way.

    Falls back to the geocoded country/region defaults when the tags give no
    usable value. Returns m/s or None.
    """
    if not self.way:
      return None

    max_speed = parse_speed_tags(self.way.tags)
    if not max_speed:
      location_info = self.query_results[4]
      max_speed = geocode_maxspeed(self.way.tags, location_info)

    return max_speed

  def max_speed_ahead(self, current_speed_limit, lat, lon, heading, lookahead):
    """Look ahead for a max speed lower than the current one.

    Walks up to 5 ways (this one, then successive next_way() results) and
    stops at the first way tagged with 'maxspeed' whose limit is below
    current_speed_limit, or once a way ends more than 2 * lookahead away.

    Returns (speed_ahead, speed_ahead_dist); both are None when nothing
    lower was found.
    NOTE(review): a bare None (not a tuple) is returned when self.way is
    falsy; kept as-is because callers may rely on it.
    """
    if not self.way:
      return None

    speed_ahead = None
    speed_ahead_dist = None
    lookahead_ways = 5
    way = self
    for _ in range(lookahead_ways):
      way_pts = way.points_in_car_frame(lat, lon, heading)

      # Check current lookahead distance
      max_dist = np.linalg.norm(way_pts[-1, :])

      if max_dist > 2 * lookahead:
        break

      if 'maxspeed' in way.way.tags:
        spd = parse_speed_tags(way.way.tags)
        if not spd:
          location_info = self.query_results[4]
          spd = geocode_maxspeed(way.way.tags, location_info)
        # spd stays None when neither source yields a parsable speed
        if spd is not None and spd < current_speed_limit:
          speed_ahead = spd
          # NOTE(review): distance is measured to the way's second node (index 1)
          speed_ahead_dist = np.linalg.norm(way_pts[1, :])
          break

      # Find next way
      way = way.next_way()
      if not way:
        break

    return speed_ahead, speed_ahead_dist

  def advisory_max_speed(self):
    """Returns the 'maxspeed:advisory' tag converted to m/s, or None."""
    if not self.way:
      return None

    tags = self.way.tags
    if 'maxspeed:advisory' not in tags:
      return None
    return parse_speed_unit(tags['maxspeed:advisory'])

  def on_way(self, lat, lon, heading, points=None):
    """Returns True when the way has nodes on both sides of the car along x.

    points: optional precomputed result of points_in_car_frame().
    """
    if points is None:
      points = self.points_in_car_frame(lat, lon, heading)
    x = points[:, 0]
    return np.min(x) < 0. and np.max(x) > 0.

  def closest_point(self, lat, lon, heading, points=None):
    """Returns the car-frame node with the smallest distance to the origin."""
    if points is None:
      points = self.points_in_car_frame(lat, lon, heading)
    i = np.argmin(np.linalg.norm(points, axis=1))
    return points[i]

  def distance_to_closest_node(self, lat, lon, heading, points=None):
    """Returns the distance from the car-frame origin to the nearest node."""
    if points is None:
      points = self.points_in_car_frame(lat, lon, heading)
    return np.min(np.linalg.norm(points, axis=1))

  def points_in_car_frame(self, lat, lon, heading):
    """Returns the way's nodes in a local frame centred at (lat, lon) and
    rotated by the heading (degrees). One row of three values per node."""
    lc = LocalCoord.from_geodetic([lat, lon, 0.])

    # Build rotation matrix
    yaw = math.radians(-heading + 90)
    c, s = np.cos(yaw), np.sin(yaw)
    rot = np.array([[c, s, 0.], [-s, c, 0.], [0., 0., 1.]])

    # Convert to local coordinates
    points_ned = lc.geodetic2ned(self.points).T

    # Rotate with heading of car (first two axes are swapped before rotating)
    return np.dot(rot, points_ned[(1, 0, 2), :]).T

  def next_way(self, backwards=False):
    """Returns the way continuing from this one, or None if ambiguous.

    Candidates are the other ways that start at this way's last node (first
    node when backwards is True), restricted to the same 'highway' tag
    (a 'motorway_link' may also continue onto 'motorway', 'trunk' or
    'primary'). Several candidates are narrowed down by lane count. Any
    missing or non-numeric tag encountered along the way yields None.
    """
    node_to_way = self.query_results[3]

    node = self.way.nodes[0] if backwards else self.way.nodes[-1]
    ways = node_to_way[node.id]

    way = None
    try:
      # Simple heuristic to find next way
      ways = [w for w in ways if w.id != self.id]
      ways = [w for w in ways if w.nodes[0] == node]

      # Filter on highway tag
      cur_tag = self.way.tags['highway']
      acceptable_tags = [cur_tag]
      if cur_tag == 'motorway_link':
        acceptable_tags += ['motorway', 'trunk', 'primary']
      ways = [w for w in ways if w.tags['highway'] in acceptable_tags]

      # Filter on number of lanes
      # NOTE(review): a missing 'lanes' tag on the current way aborts here
      # even when only one candidate is left.
      cur_num_lanes = int(self.way.tags['lanes'])
      if len(ways) > 1:
        ways_same_lanes = [w for w in ways if int(w.tags['lanes']) == cur_num_lanes]
        if len(ways_same_lanes) == 1:
          ways = ways_same_lanes
      if len(ways) > 1:
        ways = [w for w in ways if int(w.tags['lanes']) > cur_num_lanes]
      if len(ways) == 1:
        way = Way(ways[0], self.query_results)

    except (KeyError, ValueError):
      pass

    return way

  def get_lookahead(self, lat, lon, heading, lookahead):
    """Collects car-frame points of this way and up to 4 following ways.

    Returns (pnts, valid): pnts stacks the points of every visited way and
    valid is True once the last collected point lies further than lookahead
    from the car. Collection stops beyond 2 * lookahead or when next_way()
    finds no successor.
    """
    pnts = None
    way = self
    valid = False

    for _ in range(5):
      # Get new points and append to list
      new_pnts = way.points_in_car_frame(lat, lon, heading)
      pnts = new_pnts if pnts is None else np.vstack([pnts, new_pnts])

      # Check current lookahead distance
      max_dist = np.linalg.norm(pnts[-1, :])
      if max_dist > lookahead:
        valid = True

      if max_dist > 2 * lookahead:
        break

      # Find next way
      way = way.next_way()
      if not way:
        break

    return pnts, valid
 | 
						|
 |