You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
			
				
					100 lines
				
				3.4 KiB
			
		
		
			
		
	
	
					100 lines
				
				3.4 KiB
			| 
								 
											3 weeks ago
										 
									 | 
							
								#!/usr/bin/env python3
							 | 
						||
| 
								 | 
							
								import math
							 | 
						||
| 
								 | 
							
								from opendbc.can import CANParser
							 | 
						||
| 
								 | 
							
								from opendbc.car import Bus, structs
							 | 
						||
| 
								 | 
							
								from opendbc.car.common.conversions import Conversions as CV
							 | 
						||
| 
								 | 
							
								from opendbc.car.gm.values import DBC, CanBus
							 | 
						||
| 
								 | 
							
								from opendbc.car.interfaces import RadarInterfaceBase
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								RADAR_HEADER_MSG = 1120  # F_LRR_Obj_Header
							 | 
						||
| 
								 | 
							
								CAMERA_DATA_HEADER_MSG = 1056  # F_Vision_Obj_Header
							 | 
						||
| 
								 | 
							
								SLOT_1_MSG = RADAR_HEADER_MSG + 1
							 | 
						||
| 
								 | 
							
								NUM_SLOTS = 20
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								# Actually it's 0x47f, but can parser only reports
							 | 
						||
| 
								 | 
							
								# messages that are present in DBC
							 | 
						||
| 
								 | 
							
								LAST_RADAR_MSG = RADAR_HEADER_MSG + NUM_SLOTS
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def create_radar_can_parser(car_fingerprint):
							 | 
						||
| 
								 | 
							
								  # C1A-ARS3-A by Continental
							 | 
						||
| 
								 | 
							
								  radar_targets = list(range(SLOT_1_MSG, SLOT_1_MSG + NUM_SLOTS))
							 | 
						||
| 
								 | 
							
								  signals = list(zip(['FLRRNumValidTargets',
							 | 
						||
| 
								 | 
							
								                      'FLRRSnsrBlckd', 'FLRRYawRtPlsblityFlt',
							 | 
						||
| 
								 | 
							
								                      'FLRRHWFltPrsntInt', 'FLRRAntTngFltPrsnt',
							 | 
						||
| 
								 | 
							
								                      'FLRRAlgnFltPrsnt', 'FLRRSnstvFltPrsntInt'] +
							 | 
						||
| 
								 | 
							
								                     ['TrkRange'] * NUM_SLOTS + ['TrkRangeRate'] * NUM_SLOTS +
							 | 
						||
| 
								 | 
							
								                     ['TrkRangeAccel'] * NUM_SLOTS + ['TrkAzimuth'] * NUM_SLOTS +
							 | 
						||
| 
								 | 
							
								                     ['TrkWidth'] * NUM_SLOTS + ['TrkObjectID'] * NUM_SLOTS,
							 | 
						||
| 
								 | 
							
								                     [RADAR_HEADER_MSG] * 7 + radar_targets * 6, strict=True))
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  messages = list({(s[1], 14) for s in signals})
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  return CANParser(DBC[car_fingerprint][Bus.radar], messages, CanBus.OBSTACLE)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								class RadarInterface(RadarInterfaceBase):
							 | 
						||
| 
								 | 
							
								  def __init__(self, CP):
							 | 
						||
| 
								 | 
							
								    super().__init__(CP)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    self.rcp = None if CP.radarUnavailable else create_radar_can_parser(CP.carFingerprint)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    self.trigger_msg = LAST_RADAR_MSG
							 | 
						||
| 
								 | 
							
								    self.updated_messages = set()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  def update(self, can_strings):
							 | 
						||
| 
								 | 
							
								    if self.rcp is None:
							 | 
						||
| 
								 | 
							
								      return super().update(None)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    vls = self.rcp.update(can_strings)
							 | 
						||
| 
								 | 
							
								    self.updated_messages.update(vls)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    if self.trigger_msg not in self.updated_messages:
							 | 
						||
| 
								 | 
							
								      return None
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    ret = structs.RadarData()
							 | 
						||
| 
								 | 
							
								    header = self.rcp.vl[RADAR_HEADER_MSG]
							 | 
						||
| 
								 | 
							
								    fault = header['FLRRSnsrBlckd'] or header['FLRRSnstvFltPrsntInt'] or \
							 | 
						||
| 
								 | 
							
								      header['FLRRYawRtPlsblityFlt'] or header['FLRRHWFltPrsntInt'] or \
							 | 
						||
| 
								 | 
							
								      header['FLRRAntTngFltPrsnt'] or header['FLRRAlgnFltPrsnt']
							 | 
						||
| 
								 | 
							
								    if not self.rcp.can_valid:
							 | 
						||
| 
								 | 
							
								      ret.errors.canError = True
							 | 
						||
| 
								 | 
							
								    if fault:
							 | 
						||
| 
								 | 
							
								      ret.errors.radarFault = True
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    currentTargets = set()
							 | 
						||
| 
								 | 
							
								    num_targets = header['FLRRNumValidTargets']
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    # Not all radar messages describe targets,
							 | 
						||
| 
								 | 
							
								    # no need to monitor all of the self.rcp.msgs_upd
							 | 
						||
| 
								 | 
							
								    for ii in self.updated_messages:
							 | 
						||
| 
								 | 
							
								      if ii == RADAR_HEADER_MSG:
							 | 
						||
| 
								 | 
							
								        continue
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								      if num_targets == 0:
							 | 
						||
| 
								 | 
							
								        break
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								      cpt = self.rcp.vl[ii]
							 | 
						||
| 
								 | 
							
								      # Zero distance means it's an empty target slot
							 | 
						||
| 
								 | 
							
								      if cpt['TrkRange'] > 0.0:
							 | 
						||
| 
								 | 
							
								        targetId = cpt['TrkObjectID']
							 | 
						||
| 
								 | 
							
								        currentTargets.add(targetId)
							 | 
						||
| 
								 | 
							
								        if targetId not in self.pts:
							 | 
						||
| 
								 | 
							
								          self.pts[targetId] = structs.RadarData.RadarPoint()
							 | 
						||
| 
								 | 
							
								          self.pts[targetId].trackId = targetId
							 | 
						||
| 
								 | 
							
								        distance = cpt['TrkRange']
							 | 
						||
| 
								 | 
							
								        self.pts[targetId].dRel = distance  # from front of car
							 | 
						||
| 
								 | 
							
								        # From driver's pov, left is positive
							 | 
						||
| 
								 | 
							
								        self.pts[targetId].yRel = math.sin(cpt['TrkAzimuth'] * CV.DEG_TO_RAD) * distance
							 | 
						||
| 
								 | 
							
								        self.pts[targetId].vRel = cpt['TrkRangeRate']
							 | 
						||
| 
								 | 
							
								        self.pts[targetId].aRel = float('nan')
							 | 
						||
| 
								 | 
							
								        self.pts[targetId].yvRel = float('nan')
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    for oldTarget in list(self.pts.keys()):
							 | 
						||
| 
								 | 
							
								      if oldTarget not in currentTargets:
							 | 
						||
| 
								 | 
							
								        del self.pts[oldTarget]
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    ret.points = list(self.pts.values())
							 | 
						||
| 
								 | 
							
								    self.updated_messages.clear()
							 | 
						||
| 
								 | 
							
								    return ret
							 |