openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

302 lines
9.0 KiB

#!/usr/bin/env python
import os
import struct
import zmq
import numpy as np
from opendbc import DBC_PATH
from common.realtime import Ratekeeper
import selfdrive.messaging as messaging
from selfdrive.services import service_list
from selfdrive.config import CruiseButtons
from selfdrive.car.honda.hondacan import fix
from selfdrive.car.honda.carstate import get_can_parser
from selfdrive.boardd.boardd import can_capnp_to_can_list, can_list_to_can_capnp
from selfdrive.car.honda.can_parser import CANParser
from cereal import car
from common.dbc import dbc
acura = dbc(os.path.join(DBC_PATH, "acura_ilx_2016_can.dbc"))
def fake_car_params():
ret = car.CarParams.new_message()
# largely copied from honda
ret.carName = "honda"
ret.radarName = "nidec"
ret.carFingerprint = "ACURA ILX 2016 ACURAWATCH PLUS"
ret.enableSteer = True
ret.enableBrake = True
ret.enableGas = True
ret.enableCruise = False
ret.wheelBase = 2.67
ret.steerRatio = 15.3
ret.slipFactor = 0.0014
ret.steerKp, ret.steerKi = 12.0, 1.0
return ret
def car_plant(pos, speed, grade, gas, brake):
# vehicle parameters
mass = 1700
aero_cd = 0.3
force_peak = mass*3.
force_brake_peak = -mass*10. #1g
power_peak = 100000 # 100kW
speed_base = power_peak/force_peak
rolling_res = 0.01
g = 9.81
wheel_r = 0.31
frontal_area = 2.2
air_density = 1.225
gas_to_peak_linear_slope = 3.33
brake_to_peak_linear_slope = 0.2
creep_accel_v = [1., 0.]
creep_accel_bp = [0., 1.5]
#*** longitudinal model ***
# find speed where peak torque meets peak power
force_brake = brake * force_brake_peak * brake_to_peak_linear_slope
if speed < speed_base: # torque control
force_gas = gas * force_peak * gas_to_peak_linear_slope
else: # power control
force_gas = gas * power_peak / speed * gas_to_peak_linear_slope
force_grade = - grade * mass # positive grade means uphill
creep_accel = np.interp(speed, creep_accel_bp, creep_accel_v)
force_creep = creep_accel * mass
force_resistance = -(rolling_res * mass * g + 0.5 * speed**2 * aero_cd * air_density)
force = force_gas + force_brake + force_resistance + force_grade + force_creep
acceleration = force / mass
# TODO: lateral model
return speed, acceleration
def get_car_can_parser():
dbc_f = 'acura_ilx_2016_can.dbc'
signals = [
("STEER_TORQUE", 0xe4, 0),
("STEER_TORQUE_REQUEST", 0xe4, 0),
("COMPUTER_BRAKE", 0x1fa, 0),
("COMPUTER_BRAKE_REQUEST", 0x1fa, 0),
("GAS_COMMAND", 0x200, 0),
]
checks = [
(0xe4, 100),
(0x1fa, 50),
(0x200, 50),
]
return CANParser(dbc_f, signals, checks)
class Plant(object):
messaging_initialized = False
def __init__(self, lead_relevancy=False, rate=100, speed=0.0, distance_lead=2.0):
self.rate = rate
self.civic = False
self.brake_only = False
if not Plant.messaging_initialized:
context = zmq.Context()
Plant.logcan = messaging.pub_sock(context, service_list['can'].port)
Plant.sendcan = messaging.sub_sock(context, service_list['sendcan'].port)
Plant.model = messaging.pub_sock(context, service_list['model'].port)
Plant.live100 = messaging.sub_sock(context, service_list['live100'].port)
Plant.messaging_initialized = True
self.angle_steer = 0.
self.gear_choice = 0
self.speed, self.speed_prev = 0., 0.
self.esp_disabled = 0
self.main_on = 1
self.user_gas = 0
self.computer_brake,self.user_brake = 0,0
self.brake_pressed = 0
self.distance, self.distance_prev = 0., 0.
self.speed, self.speed_prev = speed, speed
self.steer_error, self.brake_error, self.steer_not_allowed = 0, 0, 0
self.gear_shifter = 4 # D gear
self.pedal_gas = 0
self.cruise_setting = 0
self.seatbelt, self.door_all_closed = True, True
self.steer_torque, self.v_cruise, self.acc_status = 0, 0, 0 # v_cruise is reported from can, not the one used for controls
self.lead_relevancy = lead_relevancy
# lead car
self.distance_lead, self.distance_lead_prev = distance_lead , distance_lead
self.rk = Ratekeeper(rate, print_delay_threshold=100)
self.ts = 1./rate
self.cp = get_car_can_parser()
def close(self):
Plant.logcan.close()
Plant.model.close()
def speed_sensor(self, speed):
if speed<0.3:
return 0
else:
return speed
def current_time(self):
return float(self.rk.frame) / self.rate
def step(self, v_lead=0.0, cruise_buttons=None, grade=0.0, publish_model = True):
# dbc_f, sgs, ivs, msgs, cks_msgs, frqs = initialize_can_struct(self.civic, self.brake_only)
cp2 = get_can_parser(fake_car_params())
sgs = cp2._sgs
msgs = cp2._msgs
cks_msgs = cp2.msgs_ck
# ******** get messages sent to the car ********
can_msgs = []
for a in messaging.drain_sock(Plant.sendcan):
can_msgs.extend(can_capnp_to_can_list(a.sendcan, [0,2]))
self.cp.update_can(can_msgs)
# ******** get live100 messages for plotting ***
live_msgs = []
for a in messaging.drain_sock(Plant.live100):
live_msgs.append(a.live100)
if self.cp.vl[0x1fa]['COMPUTER_BRAKE_REQUEST']:
brake = self.cp.vl[0x1fa]['COMPUTER_BRAKE']
else:
brake = 0.0
if self.cp.vl[0x200]['GAS_COMMAND'] > 0:
gas = self.cp.vl[0x200]['GAS_COMMAND'] / 256.0
else:
gas = 0.0
if self.cp.vl[0xe4]['STEER_TORQUE_REQUEST']:
steer_torque = self.cp.vl[0xe4]['STEER_TORQUE']*1.0/0xf00
else:
steer_torque = 0.0
distance_lead = self.distance_lead_prev + v_lead * self.ts
# ******** run the car ********
speed, acceleration = car_plant(self.distance_prev, self.speed_prev, grade, gas, brake)
standstill = (speed == 0)
distance = self.distance_prev + speed * self.ts
speed = self.speed_prev + self.ts * acceleration
if speed <= 0:
speed = 0
acceleration = 0
# ******** lateral ********
self.angle_steer -= (steer_torque/10.0) * self.ts
# *** radar model ***
if self.lead_relevancy:
d_rel = np.maximum(0., distance_lead - distance)
v_rel = v_lead - speed
else:
d_rel = 200.
v_rel = 0.
a_rel = 0
lateral_pos_rel = 0.
# print at 5hz
if (self.rk.frame%(self.rate/5)) == 0:
print "%6.2f m %6.2f m/s %6.2f m/s2 %.2f ang gas: %.2f brake: %.2f steer: %5.2f lead_rel: %6.2f m %6.2f m/s" % (distance, speed, acceleration, self.angle_steer, gas, brake, steer_torque, d_rel, v_rel)
# ******** publish the car ********
vls = [self.speed_sensor(speed), self.speed_sensor(speed), self.speed_sensor(speed), self.speed_sensor(speed),
self.angle_steer, 0, self.gear_choice, speed!=0,
0, 0, 0, 0,
self.v_cruise, not self.seatbelt, self.seatbelt, self.brake_pressed,
self.user_gas, cruise_buttons, self.esp_disabled, 0,
self.user_brake, self.steer_error, self.speed_sensor(speed), self.brake_error,
self.brake_error, self.gear_shifter, self.main_on, self.acc_status,
self.pedal_gas, self.cruise_setting,
# left_blinker, right_blinker, counter
0,0,0,
# interceptor_gas
0,0]
# TODO: publish each message at proper frequency
can_msgs = []
for msg in set(msgs):
msg_struct = {}
indxs = [i for i, x in enumerate(msgs) if msg == msgs[i]]
for i in indxs:
msg_struct[sgs[i]] = vls[i]
if msg in cks_msgs:
msg_struct["COUNTER"] = self.rk.frame % 4
msg_data = acura.encode(msg, msg_struct)
if msg in cks_msgs:
msg_data = fix(msg_data, msg)
can_msgs.append([msg, 0, msg_data, 0])
# add the radar message
# TODO: use the DBC
def to_3_byte(x):
return struct.pack("!H", int(x)).encode("hex")[1:]
def to_3s_byte(x):
return struct.pack("!h", int(x)).encode("hex")[1:]
radar_msg = to_3_byte(d_rel*16.0) + \
to_3_byte(int(lateral_pos_rel*16.0)&0x3ff) + \
to_3s_byte(int(v_rel*32.0)) + \
"0f00000"
can_msgs.append([0x445, 0, radar_msg.decode("hex"), 1])
Plant.logcan.send(can_list_to_can_capnp(can_msgs).to_bytes())
# ******** publish a fake model going straight ********
if publish_model:
md = messaging.new_message()
md.init('model')
md.model.frameId = 0
for x in [md.model.path, md.model.leftLane, md.model.rightLane]:
x.points = [0.0]*50
x.prob = 1.0
x.std = 1.0
# fake values?
Plant.model.send(md.to_bytes())
# ******** update prevs ********
self.speed = speed
self.distance = distance
self.distance_lead = distance_lead
self.speed_prev = speed
self.distance_prev = distance
self.distance_lead_prev = distance_lead
self.rk.keep_time()
return (distance, speed, acceleration, distance_lead, brake, gas, steer_torque, live_msgs)
# simple engage in standalone mode
def plant_thread(rate=100):
plant = Plant(rate)
while 1:
if plant.rk.frame%100 >= 20 and plant.rk.frame%100 <= 25:
cruise_buttons = CruiseButtons.RES_ACCEL
else:
cruise_buttons = 0
plant.step()
if __name__ == "__main__":
plant_thread()