openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

176 lines
5.0 KiB

import os
import time
from collections import defaultdict
from selfdrive.can.libdbc_py import libdbc, ffi
class CANParser(object):
def __init__(self, dbc_name, signals, checks=[], bus=0):
self.can_valid = True
self.vl = defaultdict(dict)
self.ts = defaultdict(dict)
sig_names = dict((name, ffi.new("char[]", name)) for name, _, _ in signals)
signal_options_c = ffi.new("SignalParseOptions[]", [
{
'address': sig_address,
'name': sig_names[sig_name],
'default_value': sig_default,
} for sig_name, sig_address, sig_default in signals])
message_options = dict((address, 0) for _, address, _ in signals)
message_options.update(dict(checks))
message_options_c = ffi.new("MessageParseOptions[]", [
{
'address': address,
'check_frequency': freq,
} for address, freq in message_options.iteritems()])
self.can = libdbc.can_init(bus, dbc_name, len(message_options_c), message_options_c,
len(signal_options_c), signal_options_c)
self.p_can_valid = ffi.new("bool*")
value_count = libdbc.can_query(self.can, 0, self.p_can_valid, 0, ffi.NULL)
self.can_values = ffi.new("SignalValue[%d]" % value_count)
self.update_vl(0)
# print "==="
def update_vl(self, sec):
can_values_len = libdbc.can_query(self.can, sec, self.p_can_valid, len(self.can_values), self.can_values)
assert can_values_len <= len(self.can_values)
self.can_valid = self.p_can_valid[0]
# print can_values_len
ret = set()
for i in xrange(can_values_len):
cv = self.can_values[i]
address = cv.address
# print hex(cv.address), ffi.string(cv.name)
name = ffi.string(cv.name)
self.vl[address][name] = cv.value
self.ts[address][name] = cv.ts
ret.add(address)
return ret
def update(self, sec, wait):
libdbc.can_update(self.can, sec, wait)
return self.update_vl(sec)
if __name__ == "__main__":
from common.realtime import sec_since_boot
radar_messages = range(0x430, 0x43A) + range(0x440, 0x446)
# signals = zip(['LONG_DIST'] * 16 + ['NEW_TRACK'] * 16 + ['LAT_DIST'] * 16 +
# ['REL_SPEED'] * 16, radar_messages * 4,
# [255] * 16 + [1] * 16 + [0] * 16 + [0] * 16)
# checks = zip(radar_messages, [20]*16)
# cp = CANParser("acura_ilx_2016_nidec", signals, checks, 1)
# signals = [
# ("XMISSION_SPEED", 0x158, 0), #sig_name, sig_address, default
# ("WHEEL_SPEED_FL", 0x1d0, 0),
# ("WHEEL_SPEED_FR", 0x1d0, 0),
# ("WHEEL_SPEED_RL", 0x1d0, 0),
# ("STEER_ANGLE", 0x14a, 0),
# ("STEER_TORQUE_SENSOR", 0x18f, 0),
# ("GEAR", 0x191, 0),
# ("WHEELS_MOVING", 0x1b0, 1),
# ("DOOR_OPEN_FL", 0x405, 1),
# ("DOOR_OPEN_FR", 0x405, 1),
# ("DOOR_OPEN_RL", 0x405, 1),
# ("DOOR_OPEN_RR", 0x405, 1),
# ("CRUISE_SPEED_PCM", 0x324, 0),
# ("SEATBELT_DRIVER_LAMP", 0x305, 1),
# ("SEATBELT_DRIVER_LATCHED", 0x305, 0),
# ("BRAKE_PRESSED", 0x17c, 0),
# ("CAR_GAS", 0x130, 0),
# ("CRUISE_BUTTONS", 0x296, 0),
# ("ESP_DISABLED", 0x1a4, 1),
# ("HUD_LEAD", 0x30c, 0),
# ("USER_BRAKE", 0x1a4, 0),
# ("STEER_STATUS", 0x18f, 5),
# ("WHEEL_SPEED_RR", 0x1d0, 0),
# ("BRAKE_ERROR_1", 0x1b0, 1),
# ("BRAKE_ERROR_2", 0x1b0, 1),
# ("GEAR_SHIFTER", 0x191, 0),
# ("MAIN_ON", 0x326, 0),
# ("ACC_STATUS", 0x17c, 0),
# ("PEDAL_GAS", 0x17c, 0),
# ("CRUISE_SETTING", 0x296, 0),
# ("LEFT_BLINKER", 0x326, 0),
# ("RIGHT_BLINKER", 0x326, 0),
# ("COUNTER", 0x324, 0),
# ("ENGINE_RPM", 0x17C, 0)
# ]
# checks = [
# (0x14a, 100), # address, frequency
# (0x158, 100),
# (0x17c, 100),
# (0x191, 100),
# (0x1a4, 50),
# (0x326, 10),
# (0x1b0, 50),
# (0x1d0, 50),
# (0x305, 10),
# (0x324, 10),
# (0x405, 3),
# ]
# cp = CANParser("honda_civic_touring_2016_can", signals, checks, 0)
signals = [
# sig_name, sig_address, default
("GEAR", 956, 0x20),
("BRAKE_PRESSED", 548, 0),
("GAS_PEDAL", 705, 0),
("WHEEL_SPEED_FL", 170, 0),
("WHEEL_SPEED_FR", 170, 0),
("WHEEL_SPEED_RL", 170, 0),
("WHEEL_SPEED_RR", 170, 0),
("DOOR_OPEN_FL", 1568, 1),
("DOOR_OPEN_FR", 1568, 1),
("DOOR_OPEN_RL", 1568, 1),
("DOOR_OPEN_RR", 1568, 1),
("SEATBELT_DRIVER_UNLATCHED", 1568, 1),
("TC_DISABLED", 951, 1),
("STEER_ANGLE", 37, 0),
("STEER_FRACTION", 37, 0),
("STEER_RATE", 37, 0),
("GAS_RELEASED", 466, 0),
("CRUISE_STATE", 466, 0),
("MAIN_ON", 467, 0),
("SET_SPEED", 467, 0),
("STEER_TORQUE_DRIVER", 608, 0),
("STEER_TORQUE_EPS", 608, 0),
("TURN_SIGNALS", 1556, 3), # 3 is no blinkers
("LKA_STATE", 610, 0),
]
checks = [
(548, 40),
(705, 33),
(170, 80),
(37, 80),
(466, 33),
(608, 50),
]
cp = CANParser("toyota_rav4_2017_pt", signals, checks, 0)
# print cp.vl
while True:
cp.update(int(sec_since_boot()*1e9), True)
# print cp.vl
print cp.ts
print cp.can_valid
time.sleep(0.01)