openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

98 lines
3.0 KiB

5 years ago
#!/usr/bin/env python3
from collections import defaultdict
5 years ago
from cereal import car
from opendbc.can.parser import CANParser
from selfdrive.car.interfaces import RadarInterfaceBase
from selfdrive.car.honda.values import DBC
5 years ago
def _create_nidec_can_parser(car_fingerprint):
5 years ago
radar_messages = [0x400] + list(range(0x430, 0x43A)) + list(range(0x440, 0x446))
messages = [(m, 20) for m in radar_messages]
return CANParser(DBC[car_fingerprint]['radar'], messages, 1)
5 years ago
class RadarInterface(RadarInterfaceBase):
def __init__(self, CP):
super().__init__(CP)
5 years ago
self.track_id = 0
self.radar_fault = False
self.radar_wrong_config = False
self.radar_off_can = CP.radarUnavailable
5 years ago
self.radar_ts = CP.radarTimeStep
self.delay = int(round(0.1 / CP.radarTimeStep)) # 0.1s delay of radar
# Nidec
if self.radar_off_can:
self.rcp = None
else:
self.rcp = _create_nidec_can_parser(CP.carFingerprint)
5 years ago
self.trigger_msg = 0x445
self.updated_values = defaultdict(lambda: defaultdict(list))
5 years ago
def update(self, can_strings):
# in Bosch radar and we are only steering for now, so sleep 0.05s to keep
# radard at 20Hz and return no points
if self.radar_off_can:
return None
5 years ago
addresses = self.rcp.update_strings(can_strings)
for addr in addresses:
vals_dict = self.rcp.vl_all[addr]
for sig_name, vals in vals_dict.items():
self.updated_values[addr][sig_name].extend(vals)
5 years ago
if self.trigger_msg not in self.updated_values:
5 years ago
return None
radar_data = self._radar_msg_from_buffer(self.updated_values, self.rcp.can_valid)
self.updated_values.clear()
return radar_data
5 years ago
def _radar_msg_from_buffer(self, updated_values, can_valid):
5 years ago
ret = car.RadarData.new_message()
for ii in sorted(updated_values):
msgs = updated_values[ii]
n_vals_per_addr = len(list(msgs.values())[0])
cpts = [
{k: v[i] for k, v in msgs.items()}
for i in range(n_vals_per_addr)
]
for cpt in cpts:
if ii == 0x400:
# check for radar faults
self.radar_fault = cpt['RADAR_STATE'] != 0x79
self.radar_wrong_config = cpt['RADAR_STATE'] == 0x69
elif cpt['LONG_DIST'] < 255:
if ii not in self.pts or cpt['NEW_TRACK']:
self.pts[ii] = car.RadarData.RadarPoint.new_message()
self.pts[ii].trackId = self.track_id
self.track_id += 1
self.pts[ii].dRel = cpt['LONG_DIST'] # from front of car
self.pts[ii].yRel = -cpt['LAT_DIST'] # in car frame's y axis, left is positive
self.pts[ii].vRel = cpt['REL_SPEED']
self.pts[ii].aRel = float('nan')
self.pts[ii].yvRel = float('nan')
self.pts[ii].measured = True
else:
if ii in self.pts:
del self.pts[ii]
5 years ago
errors = []
if not can_valid:
5 years ago
errors.append("canError")
if self.radar_fault:
errors.append("fault")
if self.radar_wrong_config:
errors.append("wrongConfig")
ret.errors = errors
ret.points = list(self.pts.values())
return ret