#!/usr/bin/env python import math import time import numpy as np from cereal import car from selfdrive.can.parser import CANParser from selfdrive.car.gm.interface import CanBus from selfdrive.car.gm.values import DBC, CAR RADAR_HEADER_MSG = 1120 SLOT_1_MSG = RADAR_HEADER_MSG + 1 NUM_SLOTS = 20 # Actually it's 0x47f, but can parser only reports # messages that are present in DBC LAST_RADAR_MSG = RADAR_HEADER_MSG + NUM_SLOTS def create_radar_can_parser(canbus, car_fingerprint): dbc_f = DBC[car_fingerprint]['radar'] if car_fingerprint in (CAR.VOLT, CAR.MALIBU, CAR.HOLDEN_ASTRA, CAR.ACADIA, CAR.CADILLAC_ATS): # C1A-ARS3-A by Continental radar_targets = range(SLOT_1_MSG, SLOT_1_MSG + NUM_SLOTS) signals = list(zip(['FLRRNumValidTargets', 'FLRRSnsrBlckd', 'FLRRYawRtPlsblityFlt', 'FLRRHWFltPrsntInt', 'FLRRAntTngFltPrsnt', 'FLRRAlgnFltPrsnt', 'FLRRSnstvFltPrsntInt'] + ['TrkRange'] * NUM_SLOTS + ['TrkRangeRate'] * NUM_SLOTS + ['TrkRangeAccel'] * NUM_SLOTS + ['TrkAzimuth'] * NUM_SLOTS + ['TrkWidth'] * NUM_SLOTS + ['TrkObjectID'] * NUM_SLOTS, [RADAR_HEADER_MSG] * 7 + radar_targets * 6, [0] * 7 + [0.0] * NUM_SLOTS + [0.0] * NUM_SLOTS + [0.0] * NUM_SLOTS + [0.0] * NUM_SLOTS + [0.0] * NUM_SLOTS + [0] * NUM_SLOTS)) checks = [] return CANParser(dbc_f, signals, checks, canbus.obstacle) else: return None class RadarInterface(object): def __init__(self, CP): # radar self.pts = {} self.delay = 0.0 # Delay of radar canbus = CanBus() print "Using %d as obstacle CAN bus ID" % canbus.obstacle self.rcp = create_radar_can_parser(canbus, CP.carFingerprint) self.trigger_msg = LAST_RADAR_MSG self.updated_messages = set() def update(self, can_strings): if self.rcp is None: time.sleep(0.05) # nothing to do return car.RadarData.new_message() vls = self.rcp.update_strings(can_strings) self.updated_messages.update(vls) if self.trigger_msg not in self.updated_messages: return None ret = car.RadarData.new_message() header = self.rcp.vl[RADAR_HEADER_MSG] fault = header['FLRRSnsrBlckd'] or header['FLRRSnstvFltPrsntInt'] or \ header['FLRRYawRtPlsblityFlt'] or header['FLRRHWFltPrsntInt'] or \ header['FLRRAntTngFltPrsnt'] or header['FLRRAlgnFltPrsnt'] errors = [] if not self.rcp.can_valid: errors.append("canError") if fault: errors.append("fault") ret.errors = errors currentTargets = set() num_targets = header['FLRRNumValidTargets'] # Not all radar messages describe targets, # no need to monitor all of the self.rcp.msgs_upd for ii in self.updated_messages: if ii == RADAR_HEADER_MSG: continue if num_targets == 0: break cpt = self.rcp.vl[ii] # Zero distance means it's an empty target slot if cpt['TrkRange'] > 0.0: targetId = cpt['TrkObjectID'] currentTargets.add(targetId) if targetId not in self.pts: self.pts[targetId] = car.RadarData.RadarPoint.new_message() self.pts[targetId].trackId = targetId distance = cpt['TrkRange'] self.pts[targetId].dRel = distance # from front of car # From driver's pov, left is positive deg_to_rad = np.pi/180. self.pts[targetId].yRel = math.sin(deg_to_rad * cpt['TrkAzimuth']) * distance self.pts[targetId].vRel = cpt['TrkRangeRate'] self.pts[targetId].aRel = float('nan') self.pts[targetId].yvRel = float('nan') for oldTarget in self.pts.keys(): if not oldTarget in currentTargets: del self.pts[oldTarget] ret.points = self.pts.values() self.updated_messages.clear() return ret