openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

105 lines
2.9 KiB

#!/usr/bin/env python
import os
import zmq
import time
from cereal import car
from selfdrive.can.parser import CANParser
from common.realtime import sec_since_boot
from selfdrive.services import service_list
import selfdrive.messaging as messaging
def _create_nidec_can_parser():
dbc_f = 'acura_ilx_2016_nidec.dbc'
radar_messages = [0x400] + range(0x430, 0x43A) + range(0x440, 0x446)
signals = zip(['RADAR_STATE'] +
['LONG_DIST'] * 16 + ['NEW_TRACK'] * 16 + ['LAT_DIST'] * 16 +
['REL_SPEED'] * 16,
[0x400] + radar_messages[1:] * 4,
[0] + [255] * 16 + [1] * 16 + [0] * 16 + [0] * 16)
checks = zip([0x445], [20])
return CANParser(os.path.splitext(dbc_f)[0], signals, checks, 1)
class RadarInterface(object):
def __init__(self, CP):
# radar
self.pts = {}
self.track_id = 0
self.radar_fault = False
self.radar_wrong_config = False
self.radar_off_can = CP.radarOffCan
self.delay = 0.1 # Delay of radar
# Nidec
self.rcp = _create_nidec_can_parser()
context = zmq.Context()
self.logcan = messaging.sub_sock(context, service_list['can'].port)
def update(self):
canMonoTimes = []
updated_messages = set()
ret = car.RadarState.new_message()
# in Bosch radar and we are only steering for now, so sleep 0.05s to keep
# radard at 20Hz and return no points
if self.radar_off_can:
time.sleep(0.05)
return ret
while 1:
tm = int(sec_since_boot() * 1e9)
updated_messages.update(self.rcp.update(tm, True))
if 0x445 in updated_messages:
break
for ii in updated_messages:
cpt = self.rcp.vl[ii]
if ii == 0x400:
# check for radar faults
self.radar_fault = cpt['RADAR_STATE'] != 0x79
self.radar_wrong_config = cpt['RADAR_STATE'] == 0x69
elif cpt['LONG_DIST'] < 255:
if ii not in self.pts or cpt['NEW_TRACK']:
self.pts[ii] = car.RadarState.RadarPoint.new_message()
self.pts[ii].trackId = self.track_id
self.track_id += 1
self.pts[ii].dRel = cpt['LONG_DIST'] # from front of car
self.pts[ii].yRel = -cpt['LAT_DIST'] # in car frame's y axis, left is positive
self.pts[ii].vRel = cpt['REL_SPEED']
self.pts[ii].aRel = float('nan')
self.pts[ii].yvRel = float('nan')
self.pts[ii].measured = True
else:
if ii in self.pts:
del self.pts[ii]
errors = []
if not self.rcp.can_valid:
errors.append("commIssue")
if self.radar_fault:
errors.append("fault")
if self.radar_wrong_config:
errors.append("wrongConfig")
ret.errors = errors
ret.canMonoTimes = canMonoTimes
ret.points = self.pts.values()
return ret
if __name__ == "__main__":
class CarParams:
radarOffCan = False
RI = RadarInterface(CarParams)
while 1:
ret = RI.update()
print(chr(27) + "[2J")
print(ret)