You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							364 lines
						
					
					
						
							11 KiB
						
					
					
				
			
		
		
	
	
							364 lines
						
					
					
						
							11 KiB
						
					
					
				| import math
 | |
| import json
 | |
| import numpy as np
 | |
| from datetime import datetime
 | |
| from common.basedir import BASEDIR
 | |
| from selfdrive.config import Conversions as CV
 | |
| from common.transformations.coordinates import LocalCoord, geodetic2ecef
 | |
| 
 | |
| LOOKAHEAD_TIME = 10.
 | |
| MAPS_LOOKAHEAD_DISTANCE = 50 * LOOKAHEAD_TIME
 | |
| 
 | |
| DEFAULT_SPEEDS_JSON_FILE = BASEDIR + "/selfdrive/mapd/default_speeds.json"
 | |
| DEFAULT_SPEEDS = {}
 | |
| with open(DEFAULT_SPEEDS_JSON_FILE, "rb") as f:
 | |
|   DEFAULT_SPEEDS = json.loads(f.read())
 | |
| 
 | |
| DEFAULT_SPEEDS_BY_REGION_JSON_FILE = BASEDIR + "/selfdrive/mapd/default_speeds_by_region.json"
 | |
| DEFAULT_SPEEDS_BY_REGION = {}
 | |
| with open(DEFAULT_SPEEDS_BY_REGION_JSON_FILE, "rb") as f:
 | |
|   DEFAULT_SPEEDS_BY_REGION = json.loads(f.read())
 | |
| 
 | |
| def circle_through_points(p1, p2, p3):
 | |
|   """Fits a circle through three points
 | |
|   Formulas from: http://www.ambrsoft.com/trigocalc/circle3d.htm"""
 | |
|   x1, y1, _ = p1
 | |
|   x2, y2, _ = p2
 | |
|   x3, y3, _ = p3
 | |
| 
 | |
|   A = x1 * (y2 - y3) - y1 * (x2 - x3) + x2 * y3 - x3 * y2
 | |
|   B = (x1**2 + y1**2) * (y3 - y2) + (x2**2 + y2**2) * (y1 - y3) + (x3**2 + y3**2) * (y2 - y1)
 | |
|   C = (x1**2 + y1**2) * (x2 - x3) + (x2**2 + y2**2) * (x3 - x1) + (x3**2 + y3**2) * (x1 - x2)
 | |
|   D = (x1**2 + y1**2) * (x3 * y2 - x2 * y3) + (x2**2 + y2**2) * (x1 * y3 - x3 * y1) + (x3**2 + y3**2) * (x2 * y1 - x1 * y2)
 | |
| 
 | |
|   return (-B / (2 * A), - C / (2 * A), np.sqrt((B**2 + C**2 - 4 * A * D) / (4 * A**2)))
 | |
| 
 | |
| def parse_speed_unit(max_speed):
 | |
|   """Converts a maxspeed string to m/s based on the unit present in the input.
 | |
|   OpenStreetMap defaults to kph if no unit is present. """
 | |
| 
 | |
|   if not max_speed:
 | |
|     return None
 | |
| 
 | |
|   conversion = CV.KPH_TO_MS
 | |
|   if 'mph' in max_speed:
 | |
|     max_speed = max_speed.replace(' mph', '')
 | |
|     conversion = CV.MPH_TO_MS
 | |
|   try:
 | |
|     return float(max_speed) * conversion
 | |
|   except ValueError:
 | |
|     return None
 | |
| 
 | |
| def parse_speed_tags(tags):
 | |
|   """Parses tags on a way to find the maxspeed string"""
 | |
|   max_speed = None
 | |
| 
 | |
|   if 'maxspeed' in tags:
 | |
|     max_speed = tags['maxspeed']
 | |
| 
 | |
|   if 'maxspeed:conditional' in tags:
 | |
|     try:
 | |
|       max_speed_cond, cond = tags['maxspeed:conditional'].split(' @ ')
 | |
|       cond = cond[1:-1]
 | |
| 
 | |
|       start, end = cond.split('-')
 | |
|       now = datetime.now()  # TODO: Get time and timezone from gps fix so this will work correctly on replays
 | |
|       start = datetime.strptime(start, "%H:%M").replace(year=now.year, month=now.month, day=now.day)
 | |
|       end = datetime.strptime(end, "%H:%M").replace(year=now.year, month=now.month, day=now.day)
 | |
| 
 | |
|       if start <= now <= end:
 | |
|         max_speed = max_speed_cond
 | |
|     except ValueError:
 | |
|       pass
 | |
| 
 | |
|   if not max_speed and 'source:maxspeed' in tags:
 | |
|     max_speed = DEFAULT_SPEEDS.get(tags['source:maxspeed'], None)
 | |
|   if not max_speed and 'maxspeed:type' in tags:
 | |
|     max_speed = DEFAULT_SPEEDS.get(tags['maxspeed:type'], None)
 | |
| 
 | |
|   max_speed = parse_speed_unit(max_speed)
 | |
|   return max_speed
 | |
| 
 | |
| def geocode_maxspeed(tags, location_info):
 | |
|   max_speed = None
 | |
|   try:
 | |
|     geocode_country = location_info.get('country', '')
 | |
|     geocode_region = location_info.get('region', '')
 | |
| 
 | |
|     country_rules = DEFAULT_SPEEDS_BY_REGION.get(geocode_country, {})
 | |
|     country_defaults = country_rules.get('Default', [])
 | |
|     for rule in country_defaults:
 | |
|       rule_valid = all(
 | |
|         tag_name in tags
 | |
|         and tags[tag_name] == value
 | |
|         for tag_name, value in rule['tags'].items()
 | |
|       )
 | |
|       if rule_valid:
 | |
|         max_speed = rule['speed']
 | |
|         break #stop searching country
 | |
| 
 | |
|     region_rules = country_rules.get(geocode_region, [])
 | |
|     for rule in region_rules:
 | |
|       rule_valid = all(
 | |
|         tag_name in tags
 | |
|         and tags[tag_name] == value
 | |
|         for tag_name, value in rule['tags'].items()
 | |
|       )
 | |
|       if rule_valid:
 | |
|         max_speed = rule['speed']
 | |
|         break #stop searching region
 | |
|   except KeyError:
 | |
|     pass
 | |
|   max_speed = parse_speed_unit(max_speed)
 | |
|   return max_speed
 | |
| 
 | |
| class Way:
 | |
|   def __init__(self, way, query_results):
 | |
|     self.id = way.id
 | |
|     self.way = way
 | |
|     self.query_results = query_results
 | |
| 
 | |
|     points = list()
 | |
| 
 | |
|     for node in self.way.get_nodes(resolve_missing=False):
 | |
|       points.append((float(node.lat), float(node.lon), 0.))
 | |
| 
 | |
|     self.points = np.asarray(points)
 | |
| 
 | |
|   @classmethod
 | |
|   def closest(cls, query_results, lat, lon, heading, prev_way=None):
 | |
|     results, tree, real_nodes, node_to_way, location_info = query_results
 | |
| 
 | |
|     cur_pos = geodetic2ecef((lat, lon, 0))
 | |
|     nodes = tree.query_ball_point(cur_pos, 500)
 | |
| 
 | |
|     # If no nodes within 500m, choose closest one
 | |
|     if not nodes:
 | |
|       nodes = [tree.query(cur_pos)[1]]
 | |
| 
 | |
|     ways = []
 | |
|     for n in nodes:
 | |
|       real_node = real_nodes[n]
 | |
|       ways += node_to_way[real_node.id]
 | |
|     ways = set(ways)
 | |
| 
 | |
|     closest_way = None
 | |
|     best_score = None
 | |
|     for way in ways:
 | |
|       way = Way(way, query_results)
 | |
|       points = way.points_in_car_frame(lat, lon, heading)
 | |
| 
 | |
|       on_way = way.on_way(lat, lon, heading, points)
 | |
|       if not on_way:
 | |
|         continue
 | |
| 
 | |
|       # Create mask of points in front and behind
 | |
|       x = points[:, 0]
 | |
|       y = points[:, 1]
 | |
|       angles = np.arctan2(y, x)
 | |
|       front = np.logical_and((-np.pi / 2) < angles,
 | |
|                                 angles < (np.pi / 2))
 | |
|       behind = np.logical_not(front)
 | |
| 
 | |
|       dists = np.linalg.norm(points, axis=1)
 | |
| 
 | |
|       # Get closest point behind the car
 | |
|       dists_behind = np.copy(dists)
 | |
|       dists_behind[front] = np.NaN
 | |
|       closest_behind = points[np.nanargmin(dists_behind)]
 | |
| 
 | |
|       # Get closest point in front of the car
 | |
|       dists_front = np.copy(dists)
 | |
|       dists_front[behind] = np.NaN
 | |
|       closest_front = points[np.nanargmin(dists_front)]
 | |
| 
 | |
|       # fit line: y = a*x + b
 | |
|       x1, y1, _ = closest_behind
 | |
|       x2, y2, _ = closest_front
 | |
|       a = (y2 - y1) / max((x2 - x1), 1e-5)
 | |
|       b = y1 - a * x1
 | |
| 
 | |
|       # With a factor of 60 a 20m offset causes the same error as a 20 degree heading error
 | |
|       # (A 20 degree heading offset results in an a of about 1/3)
 | |
|       score = abs(a) * 60. + abs(b)
 | |
| 
 | |
|       # Prefer same type of road
 | |
|       if prev_way is not None:
 | |
|         if way.way.tags.get('highway', '') == prev_way.way.tags.get('highway', ''):
 | |
|           score *= 0.5
 | |
| 
 | |
|       if closest_way is None or score < best_score:
 | |
|         closest_way = way
 | |
|         best_score = score
 | |
| 
 | |
|     # Normal score is < 5
 | |
|     if best_score > 50:
 | |
|       return None
 | |
| 
 | |
|     return closest_way
 | |
| 
 | |
|   def __str__(self):
 | |
|     return "%s %s" % (self.id, self.way.tags)
 | |
| 
 | |
|   def max_speed(self):
 | |
|     """Extracts the (conditional) speed limit from a way"""
 | |
|     if not self.way:
 | |
|       return None
 | |
| 
 | |
|     max_speed = parse_speed_tags(self.way.tags)
 | |
|     if not max_speed:
 | |
|       location_info = self.query_results[4]
 | |
|       max_speed = geocode_maxspeed(self.way.tags, location_info)
 | |
| 
 | |
|     return max_speed
 | |
| 
 | |
|   def max_speed_ahead(self, current_speed_limit, lat, lon, heading, lookahead):
 | |
|     """Look ahead for a max speed"""
 | |
|     if not self.way:
 | |
|       return None
 | |
| 
 | |
|     speed_ahead = None
 | |
|     speed_ahead_dist = None
 | |
|     lookahead_ways = 5
 | |
|     way = self
 | |
|     for i in range(lookahead_ways):
 | |
|       way_pts = way.points_in_car_frame(lat, lon, heading)
 | |
| 
 | |
|       # Check current lookahead distance
 | |
|       max_dist = np.linalg.norm(way_pts[-1, :])
 | |
| 
 | |
|       if max_dist > 2 * lookahead:
 | |
|         break
 | |
| 
 | |
|       if 'maxspeed' in way.way.tags:
 | |
|         spd = parse_speed_tags(way.way.tags)
 | |
|         if not spd:
 | |
|           location_info = self.query_results[4]
 | |
|           spd = geocode_maxspeed(way.way.tags, location_info)
 | |
|         if spd < current_speed_limit:
 | |
|           speed_ahead = spd
 | |
|           min_dist = np.linalg.norm(way_pts[1, :])
 | |
|           speed_ahead_dist = min_dist
 | |
|           break
 | |
|       # Find next way
 | |
|       way = way.next_way()
 | |
|       if not way:
 | |
|         break
 | |
| 
 | |
|     return speed_ahead, speed_ahead_dist
 | |
| 
 | |
|   def advisory_max_speed(self):
 | |
|     if not self.way:
 | |
|       return None
 | |
| 
 | |
|     tags = self.way.tags
 | |
|     adv_speed = None
 | |
| 
 | |
|     if 'maxspeed:advisory' in tags:
 | |
|       adv_speed = tags['maxspeed:advisory']
 | |
|       adv_speed = parse_speed_unit(adv_speed)
 | |
|     return adv_speed
 | |
| 
 | |
|   def on_way(self, lat, lon, heading, points=None):
 | |
|     if points is None:
 | |
|       points = self.points_in_car_frame(lat, lon, heading)
 | |
|     x = points[:, 0]
 | |
|     return np.min(x) < 0. and np.max(x) > 0.
 | |
| 
 | |
|   def closest_point(self, lat, lon, heading, points=None):
 | |
|     if points is None:
 | |
|       points = self.points_in_car_frame(lat, lon, heading)
 | |
|     i = np.argmin(np.linalg.norm(points, axis=1))
 | |
|     return points[i]
 | |
| 
 | |
|   def distance_to_closest_node(self, lat, lon, heading, points=None):
 | |
|     if points is None:
 | |
|       points = self.points_in_car_frame(lat, lon, heading)
 | |
|     return np.min(np.linalg.norm(points, axis=1))
 | |
| 
 | |
|   def points_in_car_frame(self, lat, lon, heading):
 | |
|     lc = LocalCoord.from_geodetic([lat, lon, 0.])
 | |
| 
 | |
|     # Build rotation matrix
 | |
|     heading = math.radians(-heading + 90)
 | |
|     c, s = np.cos(heading), np.sin(heading)
 | |
|     rot = np.array([[c, s, 0.], [-s, c, 0.], [0., 0., 1.]])
 | |
| 
 | |
|     # Convert to local coordinates
 | |
|     points_carframe = lc.geodetic2ned(self.points).T
 | |
| 
 | |
|     # Rotate with heading of car
 | |
|     points_carframe = np.dot(rot, points_carframe[(1, 0, 2), :]).T
 | |
| 
 | |
|     return points_carframe
 | |
| 
 | |
|   def next_way(self, backwards=False):
 | |
|     results, tree, real_nodes, node_to_way, location_info = self.query_results
 | |
| 
 | |
|     if backwards:
 | |
|       node = self.way.nodes[0]
 | |
|     else:
 | |
|       node = self.way.nodes[-1]
 | |
| 
 | |
|     ways = node_to_way[node.id]
 | |
| 
 | |
|     way = None
 | |
|     try:
 | |
|       # Simple heuristic to find next way
 | |
|       ways = [w for w in ways if w.id != self.id]
 | |
|       ways = [w for w in ways if w.nodes[0] == node]
 | |
| 
 | |
|       # Filter on highway tag
 | |
|       acceptable_tags = list()
 | |
|       cur_tag = self.way.tags['highway']
 | |
|       acceptable_tags.append(cur_tag)
 | |
|       if cur_tag == 'motorway_link':
 | |
|         acceptable_tags.append('motorway')
 | |
|         acceptable_tags.append('trunk')
 | |
|         acceptable_tags.append('primary')
 | |
|       ways = [w for w in ways if w.tags['highway'] in acceptable_tags]
 | |
| 
 | |
|       # Filter on number of lanes
 | |
|       cur_num_lanes = int(self.way.tags['lanes'])
 | |
|       if len(ways) > 1:
 | |
|         ways_same_lanes = [w for w in ways if int(w.tags['lanes']) == cur_num_lanes]
 | |
|         if len(ways_same_lanes) == 1:
 | |
|           ways = ways_same_lanes
 | |
|       if len(ways) > 1:
 | |
|         ways = [w for w in ways if int(w.tags['lanes']) > cur_num_lanes]
 | |
|       if len(ways) == 1:
 | |
|         way = Way(ways[0], self.query_results)
 | |
| 
 | |
|     except (KeyError, ValueError):
 | |
|       pass
 | |
| 
 | |
|     return way
 | |
| 
 | |
|   def get_lookahead(self, lat, lon, heading, lookahead):
 | |
|     pnts = None
 | |
|     way = self
 | |
|     valid = False
 | |
| 
 | |
|     for i in range(5):
 | |
|       # Get new points and append to list
 | |
|       new_pnts = way.points_in_car_frame(lat, lon, heading)
 | |
| 
 | |
|       if pnts is None:
 | |
|         pnts = new_pnts
 | |
|       else:
 | |
|         pnts = np.vstack([pnts, new_pnts])
 | |
| 
 | |
|       # Check current lookahead distance
 | |
|       max_dist = np.linalg.norm(pnts[-1, :])
 | |
|       if max_dist > lookahead:
 | |
|         valid = True
 | |
| 
 | |
|       if max_dist > 2 * lookahead:
 | |
|         break
 | |
| 
 | |
|       # Find next way
 | |
|       way = way.next_way()
 | |
|       if not way:
 | |
|         break
 | |
| 
 | |
|     return pnts, valid
 | |
| 
 |