diff --git a/selfdrive/selfdrived/helpers.py b/selfdrive/selfdrived/helpers.py new file mode 100644 index 0000000000..8b4213fcb8 --- /dev/null +++ b/selfdrive/selfdrived/helpers.py @@ -0,0 +1,53 @@ +import math +from enum import StrEnum, auto + +from cereal import car, messaging +from openpilot.common.realtime import DT_CTRL +from openpilot.selfdrive.locationd.helpers import Pose +from opendbc.car import ACCELERATION_DUE_TO_GRAVITY +from opendbc.car.interfaces import ACCEL_MIN, ACCEL_MAX, ISO_LATERAL_ACCEL + +MIN_EXCESSIVE_ACTUATION_COUNT = int(0.25 / DT_CTRL) +MIN_LATERAL_ENGAGE_BUFFER = int(1 / DT_CTRL) + + +class ExcessiveActuationType(StrEnum): + LONGITUDINAL = auto() + LATERAL = auto() + + +class ExcessiveActuationCheck: + def __init__(self): + self._excessive_counter = 0 + self._engaged_counter = 0 + + def update(self, sm: messaging.SubMaster, CS: car.CarState, calibrated_pose: Pose) -> ExcessiveActuationType | None: + # CS.aEgo can be noisy to bumps in the road, transitioning from standstill, losing traction, etc. + # longitudinal + accel_calibrated = calibrated_pose.acceleration.x + excessive_long_actuation = sm['carControl'].longActive and (accel_calibrated > ACCEL_MAX * 2 or accel_calibrated < ACCEL_MIN * 2) + + # lateral + yaw_rate = calibrated_pose.angular_velocity.yaw + roll = sm['liveParameters'].roll + roll_compensated_lateral_accel = (CS.vEgo * yaw_rate) - (math.sin(roll) * ACCELERATION_DUE_TO_GRAVITY) + + # Prevent false positives after overriding + excessive_lat_actuation = False + self._engaged_counter = self._engaged_counter + 1 if sm['carControl'].latActive and not CS.steeringPressed else 0 + if self._engaged_counter > MIN_LATERAL_ENGAGE_BUFFER: + if abs(roll_compensated_lateral_accel) > ISO_LATERAL_ACCEL * 2: + excessive_lat_actuation = True + + # livePose acceleration can be noisy due to bad mounting or aliased livePose measurements + livepose_valid = abs(CS.aEgo - accel_calibrated) < 2 + self._excessive_counter = self._excessive_counter + 1 if livepose_valid and (excessive_long_actuation or excessive_lat_actuation) else 0 + + excessive_type = None + if self._excessive_counter > MIN_EXCESSIVE_ACTUATION_COUNT: + if excessive_long_actuation: + excessive_type = ExcessiveActuationType.LONGITUDINAL + else: + excessive_type = ExcessiveActuationType.LATERAL + + return excessive_type diff --git a/selfdrive/selfdrived/selfdrived.py b/selfdrive/selfdrived/selfdrived.py index 61b98e9886..89fa36c906 100755 --- a/selfdrive/selfdrived/selfdrived.py +++ b/selfdrive/selfdrived/selfdrived.py @@ -7,7 +7,6 @@ import cereal.messaging as messaging from cereal import car, log from msgq.visionipc import VisionIpcClient, VisionStreamType -from opendbc.car.interfaces import ACCEL_MIN, ACCEL_MAX from openpilot.common.params import Params @@ -18,6 +17,7 @@ from openpilot.common.gps import get_gps_location_service from openpilot.selfdrive.car.car_specific import CarSpecificEvents from openpilot.selfdrive.locationd.helpers import PoseCalibrator, Pose from openpilot.selfdrive.selfdrived.events import Events, ET +from openpilot.selfdrive.selfdrived.helpers import ExcessiveActuationCheck from openpilot.selfdrive.selfdrived.state import StateMachine from openpilot.selfdrive.selfdrived.alertmanager import AlertManager, set_offroad_alert @@ -29,7 +29,6 @@ SIMULATION = "SIMULATION" in os.environ TESTING_CLOSET = "TESTING_CLOSET" in os.environ LONGITUDINAL_PERSONALITY_MAP = {v: k for k, v in log.LongitudinalPersonality.schema.enumerants.items()} -MIN_EXCESSIVE_ACTUATION_COUNT = int(0.25 / DT_CTRL) ThermalStatus = log.DeviceState.ThermalStatus State = log.SelfdriveState.OpenpilotState @@ -43,19 +42,6 @@ SafetyModel = car.CarParams.SafetyModel IGNORED_SAFETY_MODES = (SafetyModel.silent, SafetyModel.noOutput) -def check_excessive_actuation(sm: messaging.SubMaster, CS: car.CarState, calibrated_pose: Pose, counter: int) -> tuple[int, bool]: - # CS.aEgo can be noisy to bumps in the road, transitioning from standstill, losing traction, etc. - accel_calibrated = calibrated_pose.acceleration.x - - # livePose acceleration can be noisy due to bad mounting or aliased livePose measurements - accel_valid = abs(CS.aEgo - accel_calibrated) < 2 - - excessive_actuation = accel_calibrated > ACCEL_MAX * 2 or accel_calibrated < ACCEL_MIN * 2 - counter = counter + 1 if sm['carControl'].longActive and excessive_actuation and accel_valid else 0 - - return counter, counter > MIN_EXCESSIVE_ACTUATION_COUNT - - class SelfdriveD: def __init__(self, CP=None): self.params = Params() @@ -74,6 +60,8 @@ class SelfdriveD: self.pose_calibrator = PoseCalibrator() self.calibrated_pose: Pose | None = None + self.excessive_actuation_check = ExcessiveActuationCheck() + self.excessive_actuation = self.params.get("Offroad_ExcessiveActuation") is not None # Setup sockets self.pm = messaging.PubMaster(['selfdriveState', 'onroadEvents']) @@ -131,8 +119,6 @@ class SelfdriveD: self.experimental_mode = False self.personality = self.params.get("LongitudinalPersonality", return_default=True) self.recalibrating_seen = False - self.excessive_actuation = self.params.get("Offroad_ExcessiveActuation") is not None - self.excessive_actuation_counter = 0 self.state_machine = StateMachine() self.rk = Ratekeeper(100, print_delay_threshold=None) @@ -252,7 +238,7 @@ class SelfdriveD: if self.sm['driverAssistance'].leftLaneDeparture or self.sm['driverAssistance'].rightLaneDeparture: self.events.add(EventName.ldw) - # Check for excessive (longitudinal) actuation + # Check for excessive actuation if self.sm.updated['liveCalibration']: self.pose_calibrator.feed_live_calib(self.sm['liveCalibration']) if self.sm.updated['livePose']: @@ -260,9 +246,9 @@ class SelfdriveD: self.calibrated_pose = self.pose_calibrator.build_calibrated_pose(device_pose) if self.calibrated_pose is not None: - self.excessive_actuation_counter, excessive_actuation = check_excessive_actuation(self.sm, CS, self.calibrated_pose, self.excessive_actuation_counter) - if not self.excessive_actuation and excessive_actuation: - set_offroad_alert("Offroad_ExcessiveActuation", True, extra_text="longitudinal") + excessive_actuation = self.excessive_actuation_check.update(self.sm, CS, self.calibrated_pose) + if not self.excessive_actuation and excessive_actuation is not None: + set_offroad_alert("Offroad_ExcessiveActuation", True, extra_text=str(excessive_actuation)) self.excessive_actuation = True if self.excessive_actuation: