|
|
|
@ -1,33 +1,63 @@ |
|
|
|
|
#!/usr/bin/env python3 |
|
|
|
|
from math import cos, sin |
|
|
|
|
from cereal import car |
|
|
|
|
from common.conversions import Conversions as CV |
|
|
|
|
from opendbc.can.parser import CANParser |
|
|
|
|
from selfdrive.car.ford.values import CANBUS, DBC |
|
|
|
|
from common.conversions import Conversions as CV |
|
|
|
|
from selfdrive.car.ford.values import CANBUS, DBC, RADAR |
|
|
|
|
from selfdrive.car.interfaces import RadarInterfaceBase |
|
|
|
|
|
|
|
|
|
RADAR_MSGS = list(range(0x500, 0x540)) |
|
|
|
|
DELPHI_ESR_RADAR_MSGS = list(range(0x500, 0x540)) |
|
|
|
|
|
|
|
|
|
DELPHI_MRR_RADAR_START_ADDR = 0x120 |
|
|
|
|
DELPHI_MRR_RADAR_MSG_COUNT = 64 |
|
|
|
|
|
|
|
|
|
def _create_radar_can_parser(car_fingerprint): |
|
|
|
|
if DBC[car_fingerprint]['radar'] is None: |
|
|
|
|
return None |
|
|
|
|
|
|
|
|
|
msg_n = len(RADAR_MSGS) |
|
|
|
|
def _create_delphi_esr_radar_can_parser(): |
|
|
|
|
msg_n = len(DELPHI_ESR_RADAR_MSGS) |
|
|
|
|
signals = list(zip(['X_Rel'] * msg_n + ['Angle'] * msg_n + ['V_Rel'] * msg_n, |
|
|
|
|
RADAR_MSGS * 3)) |
|
|
|
|
checks = list(zip(RADAR_MSGS, [20] * msg_n)) |
|
|
|
|
DELPHI_ESR_RADAR_MSGS * 3)) |
|
|
|
|
checks = list(zip(DELPHI_ESR_RADAR_MSGS, [20] * msg_n)) |
|
|
|
|
|
|
|
|
|
return CANParser(RADAR.DELPHI_ESR, signals, checks, CANBUS.radar) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _create_delphi_mrr_radar_can_parser(): |
|
|
|
|
signals = [] |
|
|
|
|
checks = [] |
|
|
|
|
|
|
|
|
|
for i in range(1, DELPHI_MRR_RADAR_MSG_COUNT + 1): |
|
|
|
|
msg = f"MRR_Detection_{i:03d}" |
|
|
|
|
signals += [ |
|
|
|
|
(f"CAN_DET_VALID_LEVEL_{i:02d}", msg), |
|
|
|
|
(f"CAN_DET_AZIMUTH_{i:02d}", msg), |
|
|
|
|
(f"CAN_DET_RANGE_{i:02d}", msg), |
|
|
|
|
(f"CAN_DET_RANGE_RATE_{i:02d}", msg), |
|
|
|
|
(f"CAN_DET_AMPLITUDE_{i:02d}", msg), |
|
|
|
|
(f"CAN_SCAN_INDEX_2LSB_{i:02d}", msg), |
|
|
|
|
] |
|
|
|
|
checks += [(msg, 20)] |
|
|
|
|
|
|
|
|
|
return CANParser(RADAR.DELPHI_MRR, signals, checks, CANBUS.radar) |
|
|
|
|
|
|
|
|
|
return CANParser(DBC[car_fingerprint]['radar'], signals, checks, CANBUS.radar) |
|
|
|
|
|
|
|
|
|
class RadarInterface(RadarInterfaceBase): |
|
|
|
|
def __init__(self, CP): |
|
|
|
|
super().__init__(CP) |
|
|
|
|
self.validCnt = {key: 0 for key in RADAR_MSGS} |
|
|
|
|
self.track_id = 0 |
|
|
|
|
|
|
|
|
|
self.rcp = _create_radar_can_parser(CP.carFingerprint) |
|
|
|
|
self.trigger_msg = 0x53f |
|
|
|
|
self.updated_messages = set() |
|
|
|
|
self.track_id = 0 |
|
|
|
|
self.radar = DBC[CP.carFingerprint]['radar'] |
|
|
|
|
if self.radar is None: |
|
|
|
|
self.rcp = None |
|
|
|
|
elif self.radar == RADAR.DELPHI_ESR: |
|
|
|
|
self.rcp = _create_delphi_esr_radar_can_parser() |
|
|
|
|
self.trigger_msg = DELPHI_ESR_RADAR_MSGS[-1] |
|
|
|
|
self.valid_cnt = {key: 0 for key in DELPHI_ESR_RADAR_MSGS} |
|
|
|
|
elif self.radar == RADAR.DELPHI_MRR: |
|
|
|
|
self.rcp = _create_delphi_mrr_radar_can_parser() |
|
|
|
|
self.trigger_msg = DELPHI_MRR_RADAR_START_ADDR + DELPHI_MRR_RADAR_MSG_COUNT - 1 |
|
|
|
|
else: |
|
|
|
|
raise ValueError(f"Unsupported radar: {self.radar}") |
|
|
|
|
|
|
|
|
|
def update(self, can_strings): |
|
|
|
|
if self.rcp is None: |
|
|
|
@ -45,20 +75,30 @@ class RadarInterface(RadarInterfaceBase): |
|
|
|
|
errors.append("canError") |
|
|
|
|
ret.errors = errors |
|
|
|
|
|
|
|
|
|
if self.radar == RADAR.DELPHI_ESR: |
|
|
|
|
self._update_delphi_esr() |
|
|
|
|
elif self.radar == RADAR.DELPHI_MRR: |
|
|
|
|
self._update_delphi_mrr() |
|
|
|
|
|
|
|
|
|
ret.points = list(self.pts.values()) |
|
|
|
|
self.updated_messages.clear() |
|
|
|
|
return ret |
|
|
|
|
|
|
|
|
|
def _update_delphi_esr(self): |
|
|
|
|
for ii in sorted(self.updated_messages): |
|
|
|
|
cpt = self.rcp.vl[ii] |
|
|
|
|
|
|
|
|
|
if cpt['X_Rel'] > 0.00001: |
|
|
|
|
self.validCnt[ii] = 0 # reset counter |
|
|
|
|
self.valid_cnt[ii] = 0 # reset counter |
|
|
|
|
|
|
|
|
|
if cpt['X_Rel'] > 0.00001: |
|
|
|
|
self.validCnt[ii] += 1 |
|
|
|
|
self.valid_cnt[ii] += 1 |
|
|
|
|
else: |
|
|
|
|
self.validCnt[ii] = max(self.validCnt[ii] - 1, 0) |
|
|
|
|
#print ii, self.validCnt[ii], cpt['VALID'], cpt['X_Rel'], cpt['Angle'] |
|
|
|
|
self.valid_cnt[ii] = max(self.valid_cnt[ii] - 1, 0) |
|
|
|
|
#print ii, self.valid_cnt[ii], cpt['VALID'], cpt['X_Rel'], cpt['Angle'] |
|
|
|
|
|
|
|
|
|
# radar point only valid if there have been enough valid measurements |
|
|
|
|
if self.validCnt[ii] > 0: |
|
|
|
|
if self.valid_cnt[ii] > 0: |
|
|
|
|
if ii not in self.pts: |
|
|
|
|
self.pts[ii] = car.RadarData.RadarPoint.new_message() |
|
|
|
|
self.pts[ii].trackId = self.track_id |
|
|
|
@ -73,6 +113,36 @@ class RadarInterface(RadarInterfaceBase): |
|
|
|
|
if ii in self.pts: |
|
|
|
|
del self.pts[ii] |
|
|
|
|
|
|
|
|
|
ret.points = list(self.pts.values()) |
|
|
|
|
self.updated_messages.clear() |
|
|
|
|
return ret |
|
|
|
|
def _update_delphi_mrr(self): |
|
|
|
|
for ii in range(1, DELPHI_MRR_RADAR_MSG_COUNT + 1): |
|
|
|
|
msg = self.rcp.vl[f"MRR_Detection_{ii:03d}"] |
|
|
|
|
|
|
|
|
|
# SCAN_INDEX rotates through 0..3 on each message |
|
|
|
|
# treat these as separate points |
|
|
|
|
scanIndex = msg[f"CAN_SCAN_INDEX_2LSB_{ii:02d}"] |
|
|
|
|
i = (ii - 1) * 4 + scanIndex |
|
|
|
|
|
|
|
|
|
if i not in self.pts: |
|
|
|
|
self.pts[i] = car.RadarData.RadarPoint.new_message() |
|
|
|
|
self.pts[i].trackId = self.track_id |
|
|
|
|
self.pts[i].aRel = float('nan') |
|
|
|
|
self.pts[i].yvRel = float('nan') |
|
|
|
|
self.track_id += 1 |
|
|
|
|
|
|
|
|
|
valid = bool(msg[f"CAN_DET_VALID_LEVEL_{ii:02d}"]) |
|
|
|
|
amplitude = msg[f"CAN_DET_AMPLITUDE_{ii:02d}"] # dBsm [-64|63] |
|
|
|
|
|
|
|
|
|
if valid and 0 < amplitude <= 15: |
|
|
|
|
azimuth = msg[f"CAN_DET_AZIMUTH_{ii:02d}"] # rad [-3.1416|3.13964] |
|
|
|
|
dist = msg[f"CAN_DET_RANGE_{ii:02d}"] # m [0|255.984] |
|
|
|
|
distRate = msg[f"CAN_DET_RANGE_RATE_{ii:02d}"] # m/s [-128|127.984] |
|
|
|
|
|
|
|
|
|
# *** openpilot radar point *** |
|
|
|
|
self.pts[i].dRel = cos(azimuth) * dist # m from front of car |
|
|
|
|
self.pts[i].yRel = -sin(azimuth) * dist # in car frame's y axis, left is positive |
|
|
|
|
self.pts[i].vRel = distRate # m/s |
|
|
|
|
|
|
|
|
|
self.pts[i].measured = True |
|
|
|
|
|
|
|
|
|
else: |
|
|
|
|
del self.pts[i] |
|
|
|
|