openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

176 lines
6.5 KiB

#!/usr/bin/env python3
import os
import sys
import numpy as np
import sympy as sp
from rednose.helpers import TEMPLATE_DIR, load_code, write_code
from rednose.helpers.sympy_helpers import quat_rotate, sympy_into_c, rot_matrix, rotations_from_quats
def generate_residual(K):
x_sym = sp.MatrixSymbol('abr', 3, 1)
poses_sym = sp.MatrixSymbol('poses', 7 * K, 1)
img_pos_sym = sp.MatrixSymbol('img_positions', 2 * K, 1)
alpha, beta, rho = x_sym
to_c = sp.Matrix(rot_matrix(-np.pi / 2, -np.pi / 2, 0))
pos_0 = sp.Matrix(np.array(poses_sym[K * 7 - 7:K * 7 - 4])[:, 0])
q = poses_sym[K * 7 - 4:K * 7]
quat_rot = quat_rotate(*q)
rot_g_to_0 = to_c * quat_rot.T
rows = []
for i in range(K):
pos_i = sp.Matrix(np.array(poses_sym[i * 7:i * 7 + 3])[:, 0])
q = poses_sym[7 * i + 3:7 * i + 7]
quat_rot = quat_rotate(*q)
rot_g_to_i = to_c * quat_rot.T
rot_0_to_i = rot_g_to_i * rot_g_to_0.T
trans_0_to_i = rot_g_to_i * (pos_0 - pos_i)
funct_vec = rot_0_to_i * sp.Matrix([alpha, beta, 1]) + rho * trans_0_to_i
h1, h2, h3 = funct_vec
rows.append(h1 / h3 - img_pos_sym[i * 2 + 0])
rows.append(h2 / h3 - img_pos_sym[i * 2 + 1])
img_pos_residual_sym = sp.Matrix(rows)
# sympy into c
sympy_functions = []
sympy_functions.append(('res_fun', img_pos_residual_sym, [x_sym, poses_sym, img_pos_sym]))
sympy_functions.append(('jac_fun', img_pos_residual_sym.jacobian(x_sym), [x_sym, poses_sym, img_pos_sym]))
return sympy_functions
class LstSqComputer():
name = 'pos_computer'
@staticmethod
def generate_code(generated_dir, K=4):
sympy_functions = generate_residual(K)
header, sympy_code = sympy_into_c(sympy_functions)
code = "\n#include \"rednose/helpers/common_ekf.h\"\n"
code += "\n#define KDIM %d\n" % K
code += "extern \"C\" {\n"
code += sympy_code
code += "\n" + open(os.path.join(TEMPLATE_DIR, "compute_pos.c"), encoding='utf-8').read() + "\n"
code += "}\n"
header += "\nvoid compute_pos(double *to_c, double *in_poses, double *in_img_positions, double *param, double *pos);\n"
filename = f"{LstSqComputer.name}_{K}"
write_code(generated_dir, filename, code, header)
def __init__(self, generated_dir, K=4, MIN_DEPTH=2, MAX_DEPTH=500):
self.to_c = rot_matrix(-np.pi / 2, -np.pi / 2, 0)
self.MAX_DEPTH = MAX_DEPTH
self.MIN_DEPTH = MIN_DEPTH
name = f"{LstSqComputer.name}_{K}"
ffi, lib = load_code(generated_dir, name)
# wrap c functions
def residual_jac(x, poses, img_positions):
out = np.zeros(((K * 2, 3)), dtype=np.float64)
lib.jac_fun(ffi.cast("double *", x.ctypes.data),
ffi.cast("double *", poses.ctypes.data),
ffi.cast("double *", img_positions.ctypes.data),
ffi.cast("double *", out.ctypes.data))
return out
self.residual_jac = residual_jac
def residual(x, poses, img_positions):
out = np.zeros((K * 2), dtype=np.float64)
lib.res_fun(ffi.cast("double *", x.ctypes.data),
ffi.cast("double *", poses.ctypes.data),
ffi.cast("double *", img_positions.ctypes.data),
ffi.cast("double *", out.ctypes.data))
return out
self.residual = residual
def compute_pos_c(poses, img_positions):
pos = np.zeros(3, dtype=np.float64)
param = np.zeros(3, dtype=np.float64)
# Can't be a view for the ctype
img_positions = np.copy(img_positions)
lib.compute_pos(ffi.cast("double *", self.to_c.ctypes.data),
ffi.cast("double *", poses.ctypes.data),
ffi.cast("double *", img_positions.ctypes.data),
ffi.cast("double *", param.ctypes.data),
ffi.cast("double *", pos.ctypes.data))
return pos, param
self.compute_pos_c = compute_pos_c
def compute_pos(self, poses, img_positions, debug=False):
pos, param = self.compute_pos_c(poses, img_positions)
# pos, param = self.compute_pos_python(poses, img_positions)
depth = 1 / param[2]
if debug:
# orient_err_jac = self.orient_error_jac(param, poses, img_positions, np.zeros(3)).reshape((-1,2,3))
jac = self.residual_jac(param, poses, img_positions).reshape((-1, 2, 3))
res = self.residual(param, poses, img_positions).reshape((-1, 2))
return pos, param, res, jac # , orient_err_jac
elif (self.MIN_DEPTH < depth < self.MAX_DEPTH):
return pos
else:
return None
def gauss_newton(self, fun, jac, x, args):
poses, img_positions = args
delta = 1
counter = 0
while abs(np.linalg.norm(delta)) > 1e-4 and counter < 30:
delta = np.linalg.pinv(jac(x, poses, img_positions)).dot(fun(x, poses, img_positions))
x = x - delta
counter += 1
return [x]
def compute_pos_python(self, poses, img_positions, check_quality=False):
import scipy.optimize as opt
# This procedure is also described
# in the MSCKF paper (Mourikis et al. 2007)
x = np.array([img_positions[-1][0],
img_positions[-1][1], 0.1])
res = opt.leastsq(self.residual, x, Dfun=self.residual_jac, args=(poses, img_positions)) # scipy opt
# res = self.gauss_newton(self.residual, self.residual_jac, x, (poses, img_positions)) # diy gauss_newton
alpha, beta, rho = res[0]
rot_0_to_g = (rotations_from_quats(poses[-1, 3:])).dot(self.to_c.T)
return (rot_0_to_g.dot(np.array([alpha, beta, 1]))) / rho + poses[-1, :3]
# EXPERIMENTAL CODE
def unroll_shutter(img_positions, poses, v, rot_rates, ecef_pos):
# only speed correction for now
t_roll = 0.016 # 16ms rolling shutter?
vroll, vpitch, vyaw = rot_rates
A = 0.5 * np.array([[-1, -vroll, -vpitch, -vyaw],
[vroll, 0, vyaw, -vpitch],
[vpitch, -vyaw, 0, vroll],
[vyaw, vpitch, -vroll, 0]])
q_dot = A.dot(poses[-1][3:7])
v = np.append(v, q_dot)
v = np.array([v[0], v[1], v[2], 0, 0, 0, 0])
current_pose = poses[-1] + v * 0.05
poses = np.vstack((current_pose, poses))
dt = -img_positions[:, 1] * t_roll / 0.48
errs = project(poses, ecef_pos) - project(poses + np.atleast_2d(dt).T.dot(np.atleast_2d(v)), ecef_pos)
return img_positions - errs
def project(poses, ecef_pos):
img_positions = np.zeros((len(poses), 2))
for i, p in enumerate(poses):
cam_frame = rotations_from_quats(p[3:]).T.dot(ecef_pos - p[:3])
img_positions[i] = np.array([cam_frame[1] / cam_frame[0], cam_frame[2] / cam_frame[0]])
return img_positions
if __name__ == "__main__":
K = int(sys.argv[1].split("_")[-1])
generated_dir = sys.argv[2]
LstSqComputer.generate_code(generated_dir, K=K)