You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
208 lines
7.4 KiB
208 lines
7.4 KiB
import numpy as np
|
|
|
|
import common.filters as filters
|
|
from selfdrive.controls.lib.latcontrol import calc_curvature
|
|
|
|
|
|
# Calibration Status
|
|
class CalibStatus(object):
|
|
INCOMPLETE = 0
|
|
VALID = 1
|
|
INVALID = 2
|
|
|
|
|
|
def line_intersection(line1, line2, no_int_sub = [0,0]):
|
|
xdiff = (line1[0][0] - line1[1][0], line2[0][0] - line2[1][0])
|
|
ydiff = (line1[0][1] - line1[1][1], line2[0][1] - line2[1][1])
|
|
|
|
def det(a, b):
|
|
return a[0] * b[1] - a[1] * b[0]
|
|
|
|
div = det(xdiff, ydiff)
|
|
if div == 0:
|
|
# since we are in float domain, this should really never happen
|
|
return no_int_sub
|
|
|
|
d = (det(*line1), det(*line2))
|
|
x = det(d, xdiff) / div
|
|
y = det(d, ydiff) / div
|
|
return [x, y]
|
|
|
|
def points_inside_hit_box(pts, box):
|
|
"""Determine which points lie inside a box.
|
|
|
|
Inputs:
|
|
pts: An nx2 array of points to hit test.
|
|
box: An array [[x_left, y_top], [x_right, y_bottom]] describing a box to
|
|
use for hit testing.
|
|
Returns:
|
|
A logical array with true for every member of pts inside box.
|
|
"""
|
|
hits = np.all(np.logical_and(pts > box[0, :], pts < box[1, :]), axis=1)
|
|
return hits
|
|
|
|
def warp_points(pt_s, warp_matrix):
|
|
# pt_s are the source points, nx2 array.
|
|
pt_d = np.dot(warp_matrix[:, :2], pt_s.T) + warp_matrix[:, 2][:, np.newaxis]
|
|
|
|
# divide by third dimension for representation in image space.
|
|
return (pt_d[:2, :] / pt_d[2, :]).T
|
|
|
|
class ViewCalibrator(object):
|
|
def __init__(self, box_size, big_box_size, vp_r, warp_matrix_start, vp_f=None, cal_cycle=0, cal_status=0):
|
|
self.calibration_threshold = 3000
|
|
self.box_size = box_size
|
|
self.big_box_size = big_box_size
|
|
|
|
self.warp_matrix_start = warp_matrix_start
|
|
self.vp_r = list(vp_r)
|
|
|
|
if vp_f is None:
|
|
self.vp_f = list(vp_r)
|
|
else:
|
|
self.vp_f = list(vp_f)
|
|
|
|
# slow filter fot the vanishing point
|
|
vp_fr = 0.005 # Hz, slow filter
|
|
self.dt = 0.05 # camera runs at 20Hz
|
|
|
|
self.update_warp_matrix()
|
|
|
|
self.vp_x_filter = filters.FirstOrderLowpassFilter(vp_fr, self.dt, self.vp_f[0])
|
|
self.vp_y_filter = filters.FirstOrderLowpassFilter(vp_fr, self.dt, self.vp_f[1])
|
|
|
|
self.cal_cycle = cal_cycle
|
|
self.cal_status = cal_status
|
|
self.cal_perc = int(np.minimum(self.cal_cycle*100./self.calibration_threshold, 100))
|
|
|
|
def vanishing_point_process(self, old_ps, new_ps, v_ego, steer_angle, VP):
|
|
# correct diffs by yaw rate
|
|
cam_fov = 23.06*np.pi/180. # deg
|
|
curvature = calc_curvature(v_ego, steer_angle, VP)
|
|
yaw_rate = curvature * v_ego
|
|
hor_angle_shift = yaw_rate * self.dt * self.box_size[0] / cam_fov
|
|
old_ps += [hor_angle_shift, 0] # old points have moved in the image due to yaw rate
|
|
|
|
pos_ps = [None]*len(new_ps)
|
|
for ii in range(len(old_ps)):
|
|
xo = old_ps[ii][0]
|
|
yo = old_ps[ii][1]
|
|
yn = new_ps[ii][1]
|
|
|
|
# don't consider points with low flow in y
|
|
if abs(yn - yo) > 1:
|
|
if xo > (self.vp_f[0] + 20):
|
|
pos_ps[ii] = 'r' # right lane point
|
|
elif xo < (self.vp_f[0] - 20):
|
|
pos_ps[ii] = 'l' # left lane point
|
|
|
|
# intersect all the right lines with the left lines
|
|
idxs_l = [i for i, x in enumerate(pos_ps) if x == 'l']
|
|
idxs_r = [i for i, x in enumerate(pos_ps) if x == 'r']
|
|
|
|
old_ps_l, new_ps_l = old_ps[idxs_l], new_ps[idxs_l]
|
|
old_ps_r, new_ps_r = old_ps[idxs_r], new_ps[idxs_r]
|
|
# return None if there is one side with no lines, the speed is low or the steer angle is high
|
|
if len(old_ps_l) == 0 or len(old_ps_r) == 0 or v_ego < 20 or abs(steer_angle) > 5:
|
|
return None
|
|
|
|
int_ps = [[None] * len(old_ps_r)] * len(old_ps_l)
|
|
for ll in range(len(old_ps_l)):
|
|
for rr in range(len(old_ps_r)):
|
|
old_p_l, old_p_r, new_p_l, new_p_r = old_ps_l[ll], old_ps_r[
|
|
rr], new_ps_l[ll], new_ps_r[rr]
|
|
line_l = [[old_p_l[0], old_p_l[1]], [new_p_l[0], new_p_l[1]]]
|
|
line_r = [[old_p_r[0], old_p_r[1]], [new_p_r[0], new_p_r[1]]]
|
|
int_ps[ll][rr] = line_intersection(
|
|
line_l, line_r, no_int_sub=self.vp_f)
|
|
# saturate outliers that are too far from the estimated vp
|
|
int_ps[ll][rr][0] = np.clip(int_ps[ll][rr][0], self.vp_f[0] - 20, self.vp_f[0] + 20)
|
|
int_ps[ll][rr][1] = np.clip(int_ps[ll][rr][1], self.vp_f[1] - 30, self.vp_f[1] + 30)
|
|
vp = np.mean(np.mean(np.array(int_ps), axis=0), axis=0)
|
|
|
|
return vp
|
|
|
|
def calibration_validity(self):
|
|
# this function sanity checks that the small box is contained in the big box.
|
|
# otherwise the warp function will generate black spots on the small box
|
|
cp = np.asarray([[0, 0],
|
|
[self.box_size[0], 0],
|
|
[self.box_size[0], self.box_size[1]],
|
|
[0, self.box_size[1]]])
|
|
|
|
cpw = warp_points(cp, self.warp_matrix)
|
|
|
|
# pixel margin for validity hysteresys:
|
|
# - if calibration is good, keep it good until small box is inside the big box
|
|
# - if calibration isn't good, then make it good again if small box is in big box with margin
|
|
margin_px = 0 if self.cal_status == CalibStatus.VALID else 5
|
|
big_hit_box = np.asarray(
|
|
[[margin_px, margin_px],
|
|
[self.big_box_size[0], self.big_box_size[1] - margin_px]])
|
|
|
|
cpw_outside_big_box = np.logical_not(points_inside_hit_box(cpw, big_hit_box))
|
|
return not np.any(cpw_outside_big_box)
|
|
|
|
|
|
def get_calibration_hit_box(self):
|
|
"""Returns an axis-aligned hit box in canonical image space.
|
|
Points which do not fall within this box should not be used for
|
|
calibration.
|
|
|
|
Returns:
|
|
An array [[x_left, y_top], [x_right, y_bottom]] describing a box inside
|
|
which all calibration points should lie.
|
|
"""
|
|
# We mainly care about feature from lanes, so removed points from sky.
|
|
y_filter = 50.
|
|
return np.asarray([[0, y_filter], [self.box_size[0], self.box_size[1]]])
|
|
|
|
|
|
def update_warp_matrix(self):
|
|
translation_matrix = np.asarray(
|
|
[[1, 0, self.vp_f[0] - self.vp_r[0]],
|
|
[0, 1, self.vp_f[1] - self.vp_r[1]],
|
|
[0, 0, 1]])
|
|
self.warp_matrix = np.dot(translation_matrix, self.warp_matrix_start)
|
|
self.warp_matrix_inv = np.linalg.inv(self.warp_matrix)
|
|
|
|
def calibration(self, p0, p1, st, v_ego, steer_angle, VP):
|
|
# convert to np array first thing
|
|
p0 = np.asarray(p0)
|
|
p1 = np.asarray(p1)
|
|
st = np.asarray(st)
|
|
|
|
p0 = p0.reshape((-1,2))
|
|
p1 = p1.reshape((-1,2))
|
|
|
|
# filter out pts with bad status
|
|
p0 = p0[st==1]
|
|
p1 = p1[st==1]
|
|
|
|
calib_hit_box = self.get_calibration_hit_box()
|
|
# remove all the points outside the small box and above the horizon line
|
|
good_idxs = points_inside_hit_box(
|
|
warp_points(p0, self.warp_matrix_inv), calib_hit_box)
|
|
p0 = p0[good_idxs]
|
|
p1 = p1[good_idxs]
|
|
|
|
# print("unwarped points: {}".format(warp_points(p0, self.warp_matrix_inv)))
|
|
# print("good_idxs {}:".format(good_idxs))
|
|
|
|
# get instantaneous vp
|
|
vp = self.vanishing_point_process(p0, p1, v_ego, steer_angle, VP)
|
|
|
|
if vp is not None:
|
|
# filter the vanishing point
|
|
self.vp_f = [self.vp_x_filter(vp[0]), self.vp_y_filter(vp[1])]
|
|
self.cal_cycle += 1
|
|
|
|
if not self.calibration_validity():
|
|
self.cal_status = CalibStatus.INVALID
|
|
else:
|
|
# 10 minutes @5Hz TODO: make this threshold function of convergency speed
|
|
self.cal_status = CalibStatus.VALID
|
|
#self.cal_status = CalibStatus.VALID if self.cal_cycle > self.calibration_threshold else CalibStatus.INCOMPLETE
|
|
self.cal_perc = int(np.minimum(self.cal_cycle*100./self.calibration_threshold, 100))
|
|
|
|
self.update_warp_matrix()
|
|
|