calibrationd: add function typing (#24018)

old-commit-hash: 9f1c663ae2
taco
Dylan Herman 3 years ago committed by GitHub
parent b24d669861
commit c6f626d116
  1. 33
      selfdrive/locationd/calibrationd.py

@ -8,8 +8,9 @@ and the image input into the neural network is not corrected for roll.
import gc import gc
import os import os
import capnp
import numpy as np import numpy as np
from typing import NoReturn from typing import List, NoReturn, Optional
from cereal import log from cereal import log
import cereal.messaging as messaging import cereal.messaging as messaging
@ -46,11 +47,11 @@ class Calibration:
INVALID = 2 INVALID = 2
def is_calibration_valid(rpy): def is_calibration_valid(rpy: np.ndarray) -> bool:
return (PITCH_LIMITS[0] < rpy[1] < PITCH_LIMITS[1]) and (YAW_LIMITS[0] < rpy[2] < YAW_LIMITS[1]) return (PITCH_LIMITS[0] < rpy[1] < PITCH_LIMITS[1]) and (YAW_LIMITS[0] < rpy[2] < YAW_LIMITS[1])
def sanity_clip(rpy): def sanity_clip(rpy: np.ndarray) -> np.ndarray:
if np.isnan(rpy).any(): if np.isnan(rpy).any():
rpy = RPY_INIT rpy = RPY_INIT
return np.array([rpy[0], return np.array([rpy[0],
@ -58,8 +59,8 @@ def sanity_clip(rpy):
np.clip(rpy[2], YAW_LIMITS[0] - .005, YAW_LIMITS[1] + .005)]) np.clip(rpy[2], YAW_LIMITS[0] - .005, YAW_LIMITS[1] + .005)])
class Calibrator(): class Calibrator:
def __init__(self, param_put=False): def __init__(self, param_put: bool = False):
self.param_put = param_put self.param_put = param_put
# Read saved calibration # Read saved calibration
@ -80,7 +81,7 @@ class Calibrator():
self.reset(rpy_init, valid_blocks) self.reset(rpy_init, valid_blocks)
self.update_status() self.update_status()
def reset(self, rpy_init=RPY_INIT, valid_blocks=0, smooth_from=None): def reset(self, rpy_init: np.ndarray = RPY_INIT, valid_blocks: int = 0, smooth_from: Optional[np.ndarray] = None) -> None:
if not np.isfinite(rpy_init).all(): if not np.isfinite(rpy_init).all():
self.rpy = RPY_INIT.copy() self.rpy = RPY_INIT.copy()
else: else:
@ -95,7 +96,7 @@ class Calibrator():
self.idx = 0 self.idx = 0
self.block_idx = 0 self.block_idx = 0
self.v_ego = 0 self.v_ego = 0.0
if smooth_from is None: if smooth_from is None:
self.old_rpy = RPY_INIT self.old_rpy = RPY_INIT
@ -104,13 +105,13 @@ class Calibrator():
self.old_rpy = smooth_from self.old_rpy = smooth_from
self.old_rpy_weight = 1.0 self.old_rpy_weight = 1.0
def get_valid_idxs(self): def get_valid_idxs(self) -> List[int]:
# exclude current block_idx from validity window # exclude current block_idx from validity window
before_current = list(range(self.block_idx)) before_current = list(range(self.block_idx))
after_current = list(range(min(self.valid_blocks, self.block_idx + 1), self.valid_blocks)) after_current = list(range(min(self.valid_blocks, self.block_idx + 1), self.valid_blocks))
return before_current + after_current return before_current + after_current
def update_status(self): def update_status(self) -> None:
valid_idxs = self.get_valid_idxs() valid_idxs = self.get_valid_idxs()
if valid_idxs: if valid_idxs:
rpys = self.rpys[valid_idxs] rpys = self.rpys[valid_idxs]
@ -137,16 +138,16 @@ class Calibrator():
if self.param_put and write_this_cycle: if self.param_put and write_this_cycle:
put_nonblocking("CalibrationParams", self.get_msg().to_bytes()) put_nonblocking("CalibrationParams", self.get_msg().to_bytes())
def handle_v_ego(self, v_ego): def handle_v_ego(self, v_ego: float) -> None:
self.v_ego = v_ego self.v_ego = v_ego
def get_smooth_rpy(self): def get_smooth_rpy(self) -> np.ndarray:
if self.old_rpy_weight > 0: if self.old_rpy_weight > 0:
return self.old_rpy_weight * self.old_rpy + (1.0 - self.old_rpy_weight) * self.rpy return self.old_rpy_weight * self.old_rpy + (1.0 - self.old_rpy_weight) * self.rpy
else: else:
return self.rpy return self.rpy
def handle_cam_odom(self, trans, rot, trans_std): def handle_cam_odom(self, trans: List[float], rot: List[float], trans_std: List[float]) -> Optional[np.ndarray]:
self.old_rpy_weight = min(0.0, self.old_rpy_weight - 1/SMOOTH_CYCLES) self.old_rpy_weight = min(0.0, self.old_rpy_weight - 1/SMOOTH_CYCLES)
straight_and_fast = ((self.v_ego > MIN_SPEED_FILTER) and (trans[0] > MIN_SPEED_FILTER) and (abs(rot[2]) < MAX_YAW_RATE_FILTER)) straight_and_fast = ((self.v_ego > MIN_SPEED_FILTER) and (trans[0] > MIN_SPEED_FILTER) and (abs(rot[2]) < MAX_YAW_RATE_FILTER))
@ -176,7 +177,7 @@ class Calibrator():
return new_rpy return new_rpy
def get_msg(self): def get_msg(self) -> capnp.lib.capnp._DynamicStructBuilder:
smooth_rpy = self.get_smooth_rpy() smooth_rpy = self.get_smooth_rpy()
extrinsic_matrix = get_view_frame_from_road_frame(0, smooth_rpy[1], smooth_rpy[2], model_height) extrinsic_matrix = get_view_frame_from_road_frame(0, smooth_rpy[1], smooth_rpy[2], model_height)
@ -189,11 +190,11 @@ class Calibrator():
msg.liveCalibration.rpyCalibSpread = self.calib_spread.tolist() msg.liveCalibration.rpyCalibSpread = self.calib_spread.tolist()
return msg return msg
def send_data(self, pm) -> None: def send_data(self, pm: messaging.PubMaster) -> None:
pm.send('liveCalibration', self.get_msg()) pm.send('liveCalibration', self.get_msg())
def calibrationd_thread(sm=None, pm=None) -> NoReturn: def calibrationd_thread(sm: Optional[messaging.SubMaster] = None, pm: Optional[messaging.PubMaster] = None) -> NoReturn:
gc.disable() gc.disable()
set_realtime_priority(1) set_realtime_priority(1)
@ -223,7 +224,7 @@ def calibrationd_thread(sm=None, pm=None) -> NoReturn:
calibrator.send_data(pm) calibrator.send_data(pm)
def main(sm=None, pm=None) -> NoReturn: def main(sm: Optional[messaging.SubMaster] = None, pm: Optional[messaging.PubMaster] = None) -> NoReturn:
calibrationd_thread(sm, pm) calibrationd_thread(sm, pm)

Loading…
Cancel
Save