small calibration refactor + tests (#2641)

* calibration tests

* read from capnp

* save using cereal struct

* reset calibration if new car

* car params
old-commit-hash: ee43eb552b
commatwo_master
Adeeb Shihadeh 4 years ago committed by GitHub
parent c6b35839d6
commit 7ec1329d53
  1. 89
      selfdrive/locationd/calibrationd.py
  2. 36
      selfdrive/locationd/test/test_calibrationd.py

@ -7,16 +7,18 @@ and the image input into the neural network is not corrected for roll.
''' '''
import os import os
import capnp
import copy import copy
import json import json
import numpy as np import numpy as np
import cereal.messaging as messaging import cereal.messaging as messaging
from selfdrive.config import Conversions as CV from cereal import car, log
from selfdrive.swaglog import cloudlog
from common.params import Params, put_nonblocking from common.params import Params, put_nonblocking
from common.transformations.model import model_height from common.transformations.model import model_height
from common.transformations.camera import get_view_frame_from_road_frame from common.transformations.camera import get_view_frame_from_road_frame
from common.transformations.orientation import rot_from_euler, euler_from_rot from common.transformations.orientation import rot_from_euler, euler_from_rot
from selfdrive.config import Conversions as CV
from selfdrive.swaglog import cloudlog
MIN_SPEED_FILTER = 15 * CV.MPH_TO_MS MIN_SPEED_FILTER = 15 * CV.MPH_TO_MS
MAX_VEL_ANGLE_STD = np.radians(0.25) MAX_VEL_ANGLE_STD = np.radians(0.25)
@ -59,18 +61,32 @@ class Calibrator():
self.param_put = param_put self.param_put = param_put
# Read saved calibration # Read saved calibration
calibration_params = Params().get("CalibrationParams") params = Params()
calibration_params = params.get("CalibrationParams")
rpy_init = RPY_INIT rpy_init = RPY_INIT
valid_blocks = 0 valid_blocks = 0
cached_params = params.get("CarParamsCache")
if cached_params is not None:
CP = car.CarParams.from_bytes(params.get("CarParams", block=True))
cached_params = car.CarParams.from_bytes(cached_params)
if cached_params.carFingerprint != CP.carFingerprint:
calibration_params = None
if param_put and calibration_params: if param_put and calibration_params:
try: try:
msg = log.Event.from_bytes(calibration_params)
rpy_init = list(msg.liveCalibration.rpyCalib)
valid_blocks = msg.liveCalibration.validBlocks
except (ValueError, capnp.lib.capnp.KjException):
# TODO: remove this after next release
calibration_params = json.loads(calibration_params) calibration_params = json.loads(calibration_params)
rpy_init = calibration_params["calib_radians"] rpy_init = calibration_params["calib_radians"]
valid_blocks = calibration_params['valid_blocks'] valid_blocks = calibration_params['valid_blocks']
except Exception: except Exception:
cloudlog.exception("CalibrationParams file found but error encountered") cloudlog.exception("CalibrationParams file found but error encountered")
self.reset(rpy_init, valid_blocks) self.reset(rpy_init, valid_blocks)
self.update_status() self.update_status()
@ -118,10 +134,7 @@ class Calibrator():
write_this_cycle = (self.idx == 0) and (self.block_idx % (INPUTS_WANTED//5) == 5) write_this_cycle = (self.idx == 0) and (self.block_idx % (INPUTS_WANTED//5) == 5)
if self.param_put and write_this_cycle: if self.param_put and write_this_cycle:
# TODO: this should use the liveCalibration struct from cereal put_nonblocking("CalibrationParams", self.get_msg().to_bytes())
cal_params = {"calib_radians": list(self.rpy),
"valid_blocks": int(self.valid_blocks)}
put_nonblocking("CalibrationParams", json.dumps(cal_params).encode('utf8'))
def handle_v_ego(self, v_ego): def handle_v_ego(self, v_ego):
self.v_ego = v_ego self.v_ego = v_ego
@ -138,42 +151,44 @@ class Calibrator():
straight_and_fast = ((self.v_ego > MIN_SPEED_FILTER) and (trans[0] > MIN_SPEED_FILTER) and (abs(rot[2]) < MAX_YAW_RATE_FILTER)) straight_and_fast = ((self.v_ego > MIN_SPEED_FILTER) and (trans[0] > MIN_SPEED_FILTER) and (abs(rot[2]) < MAX_YAW_RATE_FILTER))
certain_if_calib = ((np.arctan2(trans_std[1], trans[0]) < MAX_VEL_ANGLE_STD) or certain_if_calib = ((np.arctan2(trans_std[1], trans[0]) < MAX_VEL_ANGLE_STD) or
(self.valid_blocks < INPUTS_NEEDED)) (self.valid_blocks < INPUTS_NEEDED))
if straight_and_fast and certain_if_calib:
observed_rpy = np.array([0, if not (straight_and_fast and certain_if_calib):
-np.arctan2(trans[2], trans[0]),
np.arctan2(trans[1], trans[0])])
new_rpy = euler_from_rot(rot_from_euler(self.get_smooth_rpy()).dot(rot_from_euler(observed_rpy)))
new_rpy = sanity_clip(new_rpy)
self.rpys[self.block_idx] = (self.idx*self.rpys[self.block_idx] + (BLOCK_SIZE - self.idx) * new_rpy) / float(BLOCK_SIZE)
self.idx = (self.idx + 1) % BLOCK_SIZE
if self.idx == 0:
self.block_idx += 1
self.valid_blocks = max(self.block_idx, self.valid_blocks)
self.block_idx = self.block_idx % INPUTS_WANTED
if self.valid_blocks > 0:
self.rpy = np.mean(self.rpys[:self.valid_blocks], axis=0)
self.update_status()
return new_rpy
else:
return None return None
def send_data(self, pm): observed_rpy = np.array([0,
-np.arctan2(trans[2], trans[0]),
np.arctan2(trans[1], trans[0])])
new_rpy = euler_from_rot(rot_from_euler(self.get_smooth_rpy()).dot(rot_from_euler(observed_rpy)))
new_rpy = sanity_clip(new_rpy)
self.rpys[self.block_idx] = (self.idx*self.rpys[self.block_idx] + (BLOCK_SIZE - self.idx) * new_rpy) / float(BLOCK_SIZE)
self.idx = (self.idx + 1) % BLOCK_SIZE
if self.idx == 0:
self.block_idx += 1
self.valid_blocks = max(self.block_idx, self.valid_blocks)
self.block_idx = self.block_idx % INPUTS_WANTED
if self.valid_blocks > 0:
self.rpy = np.mean(self.rpys[:self.valid_blocks], axis=0)
self.update_status()
return new_rpy
def get_msg(self):
smooth_rpy = self.get_smooth_rpy() smooth_rpy = self.get_smooth_rpy()
extrinsic_matrix = get_view_frame_from_road_frame(0, smooth_rpy[1], smooth_rpy[2], model_height) extrinsic_matrix = get_view_frame_from_road_frame(0, smooth_rpy[1], smooth_rpy[2], model_height)
cal_send = messaging.new_message('liveCalibration') msg = messaging.new_message('liveCalibration')
cal_send.liveCalibration.validBlocks = self.valid_blocks msg.liveCalibration.validBlocks = self.valid_blocks
cal_send.liveCalibration.calStatus = self.cal_status msg.liveCalibration.calStatus = self.cal_status
cal_send.liveCalibration.calPerc = min(100 * (self.valid_blocks * BLOCK_SIZE + self.idx) // (INPUTS_NEEDED * BLOCK_SIZE), 100) msg.liveCalibration.calPerc = min(100 * (self.valid_blocks * BLOCK_SIZE + self.idx) // (INPUTS_NEEDED * BLOCK_SIZE), 100)
cal_send.liveCalibration.extrinsicMatrix = [float(x) for x in extrinsic_matrix.flatten()] msg.liveCalibration.extrinsicMatrix = [float(x) for x in extrinsic_matrix.flatten()]
cal_send.liveCalibration.rpyCalib = [float(x) for x in smooth_rpy] msg.liveCalibration.rpyCalib = [float(x) for x in smooth_rpy]
cal_send.liveCalibration.rpyCalibSpread = [float(x) for x in self.calib_spread] msg.liveCalibration.rpyCalibSpread = [float(x) for x in self.calib_spread]
return msg
pm.send('liveCalibration', cal_send) def send_data(self, pm):
pm.send('liveCalibration', self.get_msg())
def calibrationd_thread(sm=None, pm=None): def calibrationd_thread(sm=None, pm=None):

@ -0,0 +1,36 @@
#!/usr/bin/env python3
import json
import random
import unittest
import cereal.messaging as messaging
from common.params import Params
from selfdrive.locationd.calibrationd import Calibrator
class TestCalibrationd(unittest.TestCase):
def test_read_saved_params_json(self):
r = [random.random() for _ in range(3)]
b = random.randint(1, 10)
cal_params = {"calib_radians": r,
"valid_blocks": b}
Params().put("CalibrationParams", json.dumps(cal_params).encode('utf8'))
c = Calibrator(param_put=True)
self.assertEqual(r, c.rpy)
self.assertEqual(b, c.valid_blocks)
def test_read_saved_params(self):
msg = messaging.new_message('liveCalibration')
msg.liveCalibration.validBlocks = random.randint(1, 10)
msg.liveCalibration.rpyCalib = [random.random() for _ in range(3)]
Params().put("CalibrationParams", msg.to_bytes())
c = Calibrator(param_put=True)
self.assertEqual(list(msg.liveCalibration.rpyCalib), c.rpy)
self.assertEqual(msg.liveCalibration.validBlocks, c.valid_blocks)
if __name__ == "__main__":
unittest.main()
Loading…
Cancel
Save