|
|
@ -33,12 +33,17 @@ class CarInterfaceBase(ABC): |
|
|
|
self.low_speed_alert = False |
|
|
|
self.low_speed_alert = False |
|
|
|
self.silent_steer_warning = True |
|
|
|
self.silent_steer_warning = True |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.CS = None |
|
|
|
|
|
|
|
self.can_parsers = [] |
|
|
|
if CarState is not None: |
|
|
|
if CarState is not None: |
|
|
|
self.CS = CarState(CP) |
|
|
|
self.CS = CarState(CP) |
|
|
|
|
|
|
|
|
|
|
|
self.cp = self.CS.get_can_parser(CP) |
|
|
|
self.cp = self.CS.get_can_parser(CP) |
|
|
|
self.cp_cam = self.CS.get_cam_can_parser(CP) |
|
|
|
self.cp_cam = self.CS.get_cam_can_parser(CP) |
|
|
|
|
|
|
|
self.cp_adas = self.CS.get_adas_can_parser(CP) |
|
|
|
self.cp_body = self.CS.get_body_can_parser(CP) |
|
|
|
self.cp_body = self.CS.get_body_can_parser(CP) |
|
|
|
self.cp_loopback = self.CS.get_loopback_can_parser(CP) |
|
|
|
self.cp_loopback = self.CS.get_loopback_can_parser(CP) |
|
|
|
|
|
|
|
self.can_parsers = [self.cp, self.cp_cam, self.cp_adas, self.cp_body, self.cp_loopback] |
|
|
|
|
|
|
|
|
|
|
|
self.CC = None |
|
|
|
self.CC = None |
|
|
|
if CarController is not None: |
|
|
|
if CarController is not None: |
|
|
@ -100,9 +105,28 @@ class CarInterfaceBase(ABC): |
|
|
|
return ret |
|
|
|
return ret |
|
|
|
|
|
|
|
|
|
|
|
@abstractmethod |
|
|
|
@abstractmethod |
|
|
|
def update(self, c: car.CarControl, can_strings: List[bytes]) -> car.CarState: |
|
|
|
def _update(self, c: car.CarControl) -> car.CarState: |
|
|
|
pass |
|
|
|
pass |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def update(self, c: car.CarControl, can_strings: List[bytes]) -> car.CarState: |
|
|
|
|
|
|
|
# parse can |
|
|
|
|
|
|
|
for cp in self.can_parsers: |
|
|
|
|
|
|
|
if cp is not None: |
|
|
|
|
|
|
|
cp.update_strings(can_strings) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# get CarState |
|
|
|
|
|
|
|
ret = self._update(c) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
ret.canValid = all(cp.can_valid for cp in self.can_parsers if cp is not None) |
|
|
|
|
|
|
|
ret.canTimeout = any(cp.bus_timeout for cp in self.can_parsers if cp is not None) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# copy back for next iteration |
|
|
|
|
|
|
|
reader = ret.as_reader() |
|
|
|
|
|
|
|
if self.CS is not None: |
|
|
|
|
|
|
|
self.CS.out = reader |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return reader |
|
|
|
|
|
|
|
|
|
|
|
@abstractmethod |
|
|
|
@abstractmethod |
|
|
|
def apply(self, c: car.CarControl) -> Tuple[car.CarControl.Actuators, List[bytes]]: |
|
|
|
def apply(self, c: car.CarControl) -> Tuple[car.CarControl.Actuators, List[bytes]]: |
|
|
|
pass |
|
|
|
pass |
|
|
@ -254,6 +278,10 @@ class CarStateBase(ABC): |
|
|
|
def get_cam_can_parser(CP): |
|
|
|
def get_cam_can_parser(CP): |
|
|
|
return None |
|
|
|
return None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@staticmethod |
|
|
|
|
|
|
|
def get_adas_can_parser(CP): |
|
|
|
|
|
|
|
return None |
|
|
|
|
|
|
|
|
|
|
|
@staticmethod |
|
|
|
@staticmethod |
|
|
|
def get_body_can_parser(CP): |
|
|
|
def get_body_can_parser(CP): |
|
|
|
return None |
|
|
|
return None |
|
|
|