|
|
|
@ -14,9 +14,12 @@ from openpilot.common.realtime import DT_CTRL |
|
|
|
|
from openpilot.selfdrive.boardd.boardd import can_list_to_can_capnp |
|
|
|
|
from openpilot.selfdrive.car.car_helpers import get_car, get_one_can |
|
|
|
|
from openpilot.selfdrive.car.interfaces import CarInterfaceBase |
|
|
|
|
from openpilot.selfdrive.controls.lib.events import Events |
|
|
|
|
|
|
|
|
|
REPLAY = "REPLAY" in os.environ |
|
|
|
|
|
|
|
|
|
EventName = car.CarEvent.EventName |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class CarD: |
|
|
|
|
CI: CarInterfaceBase |
|
|
|
@ -47,9 +50,9 @@ class CarD: |
|
|
|
|
self.CI, self.CP = CI, CI.CP |
|
|
|
|
|
|
|
|
|
# set alternative experiences from parameters |
|
|
|
|
disengage_on_accelerator = self.params.get_bool("DisengageOnAccelerator") |
|
|
|
|
self.disengage_on_accelerator = self.params.get_bool("DisengageOnAccelerator") |
|
|
|
|
self.CP.alternativeExperience = 0 |
|
|
|
|
if not disengage_on_accelerator: |
|
|
|
|
if not self.disengage_on_accelerator: |
|
|
|
|
self.CP.alternativeExperience |= ALTERNATIVE_EXPERIENCE.DISABLE_DISENGAGE_ON_GAS |
|
|
|
|
|
|
|
|
|
openpilot_enabled_toggle = self.params.get_bool("OpenpilotEnabledToggle") |
|
|
|
@ -73,6 +76,9 @@ class CarD: |
|
|
|
|
self.params.put_nonblocking("CarParamsCache", cp_bytes) |
|
|
|
|
self.params.put_nonblocking("CarParamsPersistent", cp_bytes) |
|
|
|
|
|
|
|
|
|
self.CS_prev = car.CarState.new_message() |
|
|
|
|
self.events = Events() |
|
|
|
|
|
|
|
|
|
def initialize(self): |
|
|
|
|
"""Initialize CarInterface, once controls are ready""" |
|
|
|
|
self.CI.init(self.CP, self.can_sock, self.pm.sock['sendcan']) |
|
|
|
@ -100,10 +106,26 @@ class CarD: |
|
|
|
|
if can_rcv_valid and REPLAY: |
|
|
|
|
self.can_log_mono_time = messaging.log_from_bytes(can_strs[0]).logMonoTime |
|
|
|
|
|
|
|
|
|
self.update_events(CS) |
|
|
|
|
self.state_publish(CS) |
|
|
|
|
|
|
|
|
|
CS = CS.as_reader() |
|
|
|
|
self.CS_prev = CS |
|
|
|
|
return CS |
|
|
|
|
|
|
|
|
|
def update_events(self, CS: car.CarState) -> car.CarState: |
|
|
|
|
self.events.clear() |
|
|
|
|
|
|
|
|
|
self.events.add_from_msg(CS.events) |
|
|
|
|
|
|
|
|
|
# Disable on rising edge of accelerator or brake. Also disable on brake when speed > 0 |
|
|
|
|
if (CS.gasPressed and not self.CS_prev.gasPressed and self.disengage_on_accelerator) or \ |
|
|
|
|
(CS.brakePressed and (not self.CS_prev.brakePressed or not CS.standstill)) or \ |
|
|
|
|
(CS.regenBraking and (not self.CS_prev.regenBraking or not CS.standstill)): |
|
|
|
|
self.events.add(EventName.pedalPressed) |
|
|
|
|
|
|
|
|
|
CS.events = self.events.to_msg() |
|
|
|
|
|
|
|
|
|
def state_publish(self, CS: car.CarState): |
|
|
|
|
"""carState and carParams publish loop""" |
|
|
|
|
|
|
|
|
|