openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

112 lines
3.0 KiB

#!/usr/bin/env python3
from cereal import car
from opendbc.can.parser import CANParser
from selfdrive.car.tesla.values import DBC, CANBUS
from selfdrive.car.interfaces import RadarInterfaceBase
RADAR_MSGS_A = list(range(0x310, 0x36E, 3))
RADAR_MSGS_B = list(range(0x311, 0x36F, 3))
NUM_POINTS = len(RADAR_MSGS_A)
def get_radar_can_parser(CP):
# Status messages
signals = [
('RADC_HWFail', 'TeslaRadarSguInfo'),
('RADC_SGUFail', 'TeslaRadarSguInfo'),
('RADC_SensorDirty', 'TeslaRadarSguInfo'),
]
checks = [
('TeslaRadarSguInfo', 10),
]
# Radar tracks. There are also raw point clouds available,
# we don't use those.
for i in range(NUM_POINTS):
msg_id_a = RADAR_MSGS_A[i]
msg_id_b = RADAR_MSGS_B[i]
# There is a bunch more info in the messages,
# but these are the only things actually used in openpilot
signals.extend([
('LongDist', msg_id_a),
('LongSpeed', msg_id_a),
('LatDist', msg_id_a),
('LongAccel', msg_id_a),
('Meas', msg_id_a),
('Tracked', msg_id_a),
('Index', msg_id_a),
('LatSpeed', msg_id_b),
('Index2', msg_id_b),
])
checks.extend([
(msg_id_a, 8),
(msg_id_b, 8),
])
return CANParser(DBC[CP.carFingerprint]['radar'], signals, checks, CANBUS.radar)
class RadarInterface(RadarInterfaceBase):
def __init__(self, CP):
super().__init__(CP)
self.rcp = get_radar_can_parser(CP)
self.updated_messages = set()
self.track_id = 0
self.trigger_msg = RADAR_MSGS_B[-1]
def update(self, can_strings):
if self.rcp is None:
return super().update(None)
values = self.rcp.update_strings(can_strings)
self.updated_messages.update(values)
if self.trigger_msg not in self.updated_messages:
return None
ret = car.RadarData.new_message()
# Errors
errors = []
sgu_info = self.rcp.vl['TeslaRadarSguInfo']
if not self.rcp.can_valid:
errors.append('canError')
if sgu_info['RADC_HWFail'] or sgu_info['RADC_SGUFail'] or sgu_info['RADC_SensorDirty']:
errors.append('fault')
ret.errors = errors
# Radar tracks
for i in range(NUM_POINTS):
msg_a = self.rcp.vl[RADAR_MSGS_A[i]]
msg_b = self.rcp.vl[RADAR_MSGS_B[i]]
# Make sure msg A and B are together
if msg_a['Index'] != msg_b['Index2']:
continue
# Check if it's a valid track
if not msg_a['Tracked']:
if i in self.pts:
del self.pts[i]
continue
# New track!
if i not in self.pts:
self.pts[i] = car.RadarData.RadarPoint.new_message()
self.pts[i].trackId = self.track_id
self.track_id += 1
# Parse track data
self.pts[i].dRel = msg_a['LongDist']
self.pts[i].yRel = msg_a['LatDist']
self.pts[i].vRel = msg_a['LongSpeed']
self.pts[i].aRel = msg_a['LongAccel']
self.pts[i].yvRel = msg_b['LatSpeed']
self.pts[i].measured = bool(msg_a['Meas'])
ret.points = list(self.pts.values())
self.updated_messages.clear()
return ret