You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
361 lines
11 KiB
361 lines
11 KiB
import six
|
|
import math
|
|
import json
|
|
import numpy as np
|
|
from datetime import datetime
|
|
from common.basedir import BASEDIR
|
|
from selfdrive.config import Conversions as CV
|
|
from common.transformations.coordinates import LocalCoord, geodetic2ecef
|
|
|
|
LOOKAHEAD_TIME = 10.
|
|
MAPS_LOOKAHEAD_DISTANCE = 50 * LOOKAHEAD_TIME
|
|
|
|
DEFAULT_SPEEDS_JSON_FILE = BASEDIR + "/selfdrive/mapd/default_speeds.json"
|
|
DEFAULT_SPEEDS = {}
|
|
with open(DEFAULT_SPEEDS_JSON_FILE, "rb") as f:
|
|
DEFAULT_SPEEDS = json.loads(f.read())
|
|
|
|
DEFAULT_SPEEDS_BY_REGION_JSON_FILE = BASEDIR + "/selfdrive/mapd/default_speeds_by_region.json"
|
|
DEFAULT_SPEEDS_BY_REGION = {}
|
|
with open(DEFAULT_SPEEDS_BY_REGION_JSON_FILE, "rb") as f:
|
|
DEFAULT_SPEEDS_BY_REGION = json.loads(f.read())
|
|
|
|
def circle_through_points(p1, p2, p3):
|
|
"""Fits a circle through three points
|
|
Formulas from: http://www.ambrsoft.com/trigocalc/circle3d.htm"""
|
|
x1, y1, _ = p1
|
|
x2, y2, _ = p2
|
|
x3, y3, _ = p3
|
|
|
|
A = x1 * (y2 - y3) - y1 * (x2 - x3) + x2 * y3 - x3 * y2
|
|
B = (x1**2 + y1**2) * (y3 - y2) + (x2**2 + y2**2) * (y1 - y3) + (x3**2 + y3**2) * (y2 - y1)
|
|
C = (x1**2 + y1**2) * (x2 - x3) + (x2**2 + y2**2) * (x3 - x1) + (x3**2 + y3**2) * (x1 - x2)
|
|
D = (x1**2 + y1**2) * (x3 * y2 - x2 * y3) + (x2**2 + y2**2) * (x1 * y3 - x3 * y1) + (x3**2 + y3**2) * (x2 * y1 - x1 * y2)
|
|
|
|
return (-B / (2 * A), - C / (2 * A), np.sqrt((B**2 + C**2 - 4 * A * D) / (4 * A**2)))
|
|
|
|
def parse_speed_unit(max_speed):
|
|
"""Converts a maxspeed string to m/s based on the unit present in the input.
|
|
OpenStreetMap defaults to kph if no unit is present. """
|
|
|
|
if not max_speed:
|
|
return None
|
|
|
|
conversion = CV.KPH_TO_MS
|
|
if 'mph' in max_speed:
|
|
max_speed = max_speed.replace(' mph', '')
|
|
conversion = CV.MPH_TO_MS
|
|
try:
|
|
return float(max_speed) * conversion
|
|
except ValueError:
|
|
return None
|
|
|
|
def parse_speed_tags(tags):
|
|
"""Parses tags on a way to find the maxspeed string"""
|
|
max_speed = None
|
|
|
|
if 'maxspeed' in tags:
|
|
max_speed = tags['maxspeed']
|
|
|
|
if 'maxspeed:conditional' in tags:
|
|
try:
|
|
max_speed_cond, cond = tags['maxspeed:conditional'].split(' @ ')
|
|
cond = cond[1:-1]
|
|
|
|
start, end = cond.split('-')
|
|
now = datetime.now() # TODO: Get time and timezone from gps fix so this will work correctly on replays
|
|
start = datetime.strptime(start, "%H:%M").replace(year=now.year, month=now.month, day=now.day)
|
|
end = datetime.strptime(end, "%H:%M").replace(year=now.year, month=now.month, day=now.day)
|
|
|
|
if start <= now <= end:
|
|
max_speed = max_speed_cond
|
|
except ValueError:
|
|
pass
|
|
|
|
if not max_speed and 'source:maxspeed' in tags:
|
|
max_speed = DEFAULT_SPEEDS.get(tags['source:maxspeed'], None)
|
|
if not max_speed and 'maxspeed:type' in tags:
|
|
max_speed = DEFAULT_SPEEDS.get(tags['maxspeed:type'], None)
|
|
|
|
max_speed = parse_speed_unit(max_speed)
|
|
return max_speed
|
|
|
|
def geocode_maxspeed(tags, location_info):
|
|
max_speed = None
|
|
try:
|
|
geocode_country = location_info.get('country', '')
|
|
geocode_region = location_info.get('region', '')
|
|
|
|
country_rules = DEFAULT_SPEEDS_BY_REGION.get(geocode_country, {})
|
|
country_defaults = country_rules.get('Default', [])
|
|
for rule in country_defaults:
|
|
rule_valid = all(
|
|
tag_name in tags
|
|
and tags[tag_name] == value
|
|
for tag_name, value in six.iteritems(rule['tags'])
|
|
)
|
|
if rule_valid:
|
|
max_speed = rule['speed']
|
|
break #stop searching country
|
|
|
|
region_rules = country_rules.get(geocode_region, [])
|
|
for rule in region_rules:
|
|
rule_valid = all(
|
|
tag_name in tags
|
|
and tags[tag_name] == value
|
|
for tag_name, value in six.iteritems(rule['tags'])
|
|
)
|
|
if rule_valid:
|
|
max_speed = rule['speed']
|
|
break #stop searching region
|
|
except KeyError:
|
|
pass
|
|
max_speed = parse_speed_unit(max_speed)
|
|
return max_speed
|
|
|
|
class Way:
|
|
def __init__(self, way, query_results):
|
|
self.id = way.id
|
|
self.way = way
|
|
self.query_results = query_results
|
|
|
|
points = list()
|
|
|
|
for node in self.way.get_nodes(resolve_missing=False):
|
|
points.append((float(node.lat), float(node.lon), 0.))
|
|
|
|
self.points = np.asarray(points)
|
|
|
|
@classmethod
|
|
def closest(cls, query_results, lat, lon, heading, prev_way=None):
|
|
results, tree, real_nodes, node_to_way, location_info = query_results
|
|
|
|
cur_pos = geodetic2ecef((lat, lon, 0))
|
|
nodes = tree.query_ball_point(cur_pos, 500)
|
|
|
|
# If no nodes within 500m, choose closest one
|
|
if not nodes:
|
|
nodes = [tree.query(cur_pos)[1]]
|
|
|
|
ways = []
|
|
for n in nodes:
|
|
real_node = real_nodes[n]
|
|
ways += node_to_way[real_node.id]
|
|
ways = set(ways)
|
|
|
|
closest_way = None
|
|
best_score = None
|
|
for way in ways:
|
|
way = Way(way, query_results)
|
|
points = way.points_in_car_frame(lat, lon, heading)
|
|
|
|
on_way = way.on_way(lat, lon, heading, points)
|
|
if not on_way:
|
|
continue
|
|
|
|
# Create mask of points in front and behind
|
|
x = points[:, 0]
|
|
y = points[:, 1]
|
|
angles = np.arctan2(y, x)
|
|
front = np.logical_and((-np.pi / 2) < angles,
|
|
angles < (np.pi / 2))
|
|
behind = np.logical_not(front)
|
|
|
|
dists = np.linalg.norm(points, axis=1)
|
|
|
|
# Get closest point behind the car
|
|
dists_behind = np.copy(dists)
|
|
dists_behind[front] = np.NaN
|
|
closest_behind = points[np.nanargmin(dists_behind)]
|
|
|
|
# Get closest point in front of the car
|
|
dists_front = np.copy(dists)
|
|
dists_front[behind] = np.NaN
|
|
closest_front = points[np.nanargmin(dists_front)]
|
|
|
|
# fit line: y = a*x + b
|
|
x1, y1, _ = closest_behind
|
|
x2, y2, _ = closest_front
|
|
a = (y2 - y1) / max((x2 - x1), 1e-5)
|
|
b = y1 - a * x1
|
|
|
|
# With a factor of 60 a 20m offset causes the same error as a 20 degree heading error
|
|
# (A 20 degree heading offset results in an a of about 1/3)
|
|
score = abs(a) * 60. + abs(b)
|
|
|
|
# Prefer same type of road
|
|
if prev_way is not None:
|
|
if way.way.tags.get('highway', '') == prev_way.way.tags.get('highway', ''):
|
|
score *= 0.5
|
|
|
|
if closest_way is None or score < best_score:
|
|
closest_way = way
|
|
best_score = score
|
|
|
|
return closest_way
|
|
|
|
def __str__(self):
|
|
return "%s %s" % (self.id, self.way.tags)
|
|
|
|
def max_speed(self):
|
|
"""Extracts the (conditional) speed limit from a way"""
|
|
if not self.way:
|
|
return None
|
|
|
|
max_speed = parse_speed_tags(self.way.tags)
|
|
if not max_speed:
|
|
location_info = self.query_results[4]
|
|
max_speed = geocode_maxspeed(self.way.tags, location_info)
|
|
|
|
return max_speed
|
|
|
|
def max_speed_ahead(self, current_speed_limit, lat, lon, heading, lookahead):
|
|
"""Look ahead for a max speed"""
|
|
if not self.way:
|
|
return None
|
|
|
|
speed_ahead = None
|
|
speed_ahead_dist = None
|
|
lookahead_ways = 5
|
|
way = self
|
|
for i in range(lookahead_ways):
|
|
way_pts = way.points_in_car_frame(lat, lon, heading)
|
|
|
|
# Check current lookahead distance
|
|
max_dist = np.linalg.norm(way_pts[-1, :])
|
|
|
|
if max_dist > 2 * lookahead:
|
|
break
|
|
|
|
if 'maxspeed' in way.way.tags:
|
|
spd = parse_speed_tags(way.way.tags)
|
|
if not spd:
|
|
location_info = self.query_results[4]
|
|
spd = geocode_maxspeed(way.way.tags, location_info)
|
|
if spd < current_speed_limit:
|
|
speed_ahead = spd
|
|
min_dist = np.linalg.norm(way_pts[1, :])
|
|
speed_ahead_dist = min_dist
|
|
break
|
|
# Find next way
|
|
way = way.next_way()
|
|
if not way:
|
|
break
|
|
|
|
return speed_ahead, speed_ahead_dist
|
|
|
|
def advisory_max_speed(self):
|
|
if not self.way:
|
|
return None
|
|
|
|
tags = self.way.tags
|
|
adv_speed = None
|
|
|
|
if 'maxspeed:advisory' in tags:
|
|
adv_speed = tags['maxspeed:advisory']
|
|
adv_speed = parse_speed_unit(adv_speed)
|
|
return adv_speed
|
|
|
|
def on_way(self, lat, lon, heading, points=None):
|
|
if points is None:
|
|
points = self.points_in_car_frame(lat, lon, heading)
|
|
x = points[:, 0]
|
|
return np.min(x) < 0. and np.max(x) > 0.
|
|
|
|
def closest_point(self, lat, lon, heading, points=None):
|
|
if points is None:
|
|
points = self.points_in_car_frame(lat, lon, heading)
|
|
i = np.argmin(np.linalg.norm(points, axis=1))
|
|
return points[i]
|
|
|
|
def distance_to_closest_node(self, lat, lon, heading, points=None):
|
|
if points is None:
|
|
points = self.points_in_car_frame(lat, lon, heading)
|
|
return np.min(np.linalg.norm(points, axis=1))
|
|
|
|
def points_in_car_frame(self, lat, lon, heading):
|
|
lc = LocalCoord.from_geodetic([lat, lon, 0.])
|
|
|
|
# Build rotation matrix
|
|
heading = math.radians(-heading + 90)
|
|
c, s = np.cos(heading), np.sin(heading)
|
|
rot = np.array([[c, s, 0.], [-s, c, 0.], [0., 0., 1.]])
|
|
|
|
# Convert to local coordinates
|
|
points_carframe = lc.geodetic2ned(self.points).T
|
|
|
|
# Rotate with heading of car
|
|
points_carframe = np.dot(rot, points_carframe[(1, 0, 2), :]).T
|
|
|
|
return points_carframe
|
|
|
|
def next_way(self, backwards=False):
|
|
results, tree, real_nodes, node_to_way, location_info = self.query_results
|
|
|
|
if backwards:
|
|
node = self.way.nodes[0]
|
|
else:
|
|
node = self.way.nodes[-1]
|
|
|
|
ways = node_to_way[node.id]
|
|
|
|
way = None
|
|
try:
|
|
# Simple heuristic to find next way
|
|
ways = [w for w in ways if w.id != self.id]
|
|
ways = [w for w in ways if w.nodes[0] == node]
|
|
|
|
# Filter on highway tag
|
|
acceptable_tags = list()
|
|
cur_tag = self.way.tags['highway']
|
|
acceptable_tags.append(cur_tag)
|
|
if cur_tag == 'motorway_link':
|
|
acceptable_tags.append('motorway')
|
|
acceptable_tags.append('trunk')
|
|
acceptable_tags.append('primary')
|
|
ways = [w for w in ways if w.tags['highway'] in acceptable_tags]
|
|
|
|
# Filter on number of lanes
|
|
cur_num_lanes = int(self.way.tags['lanes'])
|
|
if len(ways) > 1:
|
|
ways_same_lanes = [w for w in ways if int(w.tags['lanes']) == cur_num_lanes]
|
|
if len(ways_same_lanes) == 1:
|
|
ways = ways_same_lanes
|
|
if len(ways) > 1:
|
|
ways = [w for w in ways if int(w.tags['lanes']) > cur_num_lanes]
|
|
if len(ways) == 1:
|
|
way = Way(ways[0], self.query_results)
|
|
|
|
except (KeyError, ValueError):
|
|
pass
|
|
|
|
return way
|
|
|
|
def get_lookahead(self, lat, lon, heading, lookahead):
|
|
pnts = None
|
|
way = self
|
|
valid = False
|
|
|
|
for i in range(5):
|
|
# Get new points and append to list
|
|
new_pnts = way.points_in_car_frame(lat, lon, heading)
|
|
|
|
if pnts is None:
|
|
pnts = new_pnts
|
|
else:
|
|
pnts = np.vstack([pnts, new_pnts])
|
|
|
|
# Check current lookahead distance
|
|
max_dist = np.linalg.norm(pnts[-1, :])
|
|
if max_dist > lookahead:
|
|
valid = True
|
|
|
|
if max_dist > 2 * lookahead:
|
|
break
|
|
|
|
# Find next way
|
|
way = way.next_way()
|
|
if not way:
|
|
break
|
|
|
|
return pnts, valid
|
|
|