You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
275 lines
8.4 KiB
275 lines
8.4 KiB
#!/usr/bin/env python
|
|
import os
|
|
import struct
|
|
|
|
import zmq
|
|
import numpy as np
|
|
|
|
from dbcs import DBC_PATH
|
|
|
|
from common.realtime import Ratekeeper
|
|
from common.services import service_list
|
|
|
|
import selfdrive.messaging as messaging
|
|
from selfdrive.config import CruiseButtons
|
|
from selfdrive.car.honda.hondacan import fix
|
|
from selfdrive.car.honda.carstate import get_can_parser
|
|
from selfdrive.boardd.boardd import can_capnp_to_can_list_old, can_capnp_to_can_list, can_list_to_can_capnp
|
|
|
|
from selfdrive.car.honda.can_parser import CANParser
|
|
|
|
|
|
from common.dbc import dbc
|
|
acura = dbc(os.path.join(DBC_PATH, "acura_ilx_2016_can.dbc"))
|
|
|
|
def car_plant(pos, speed, grade, gas, brake):
|
|
# vehicle parameters
|
|
mass = 1700
|
|
aero_cd = 0.3
|
|
force_peak = mass*3.
|
|
force_brake_peak = -mass*10. #1g
|
|
power_peak = 100000 # 100kW
|
|
speed_base = power_peak/force_peak
|
|
rolling_res = 0.01
|
|
g = 9.81
|
|
wheel_r = 0.31
|
|
frontal_area = 2.2
|
|
air_density = 1.225
|
|
gas_to_peak_linear_slope = 3.33
|
|
brake_to_peak_linear_slope = 0.2
|
|
creep_accel_v = [1., 0.]
|
|
creep_accel_bp = [0., 1.5]
|
|
|
|
#*** longitudinal model ***
|
|
# find speed where peak torque meets peak power
|
|
force_brake = brake * force_brake_peak * brake_to_peak_linear_slope
|
|
if speed < speed_base: # torque control
|
|
force_gas = gas * force_peak * gas_to_peak_linear_slope
|
|
else: # power control
|
|
force_gas = gas * power_peak / speed * gas_to_peak_linear_slope
|
|
|
|
force_grade = - grade * mass # positive grade means uphill
|
|
|
|
creep_accel = np.interp(speed, creep_accel_bp, creep_accel_v)
|
|
force_creep = creep_accel * mass
|
|
|
|
force_resistance = -(rolling_res * mass * g + 0.5 * speed**2 * aero_cd * air_density)
|
|
force = force_gas + force_brake + force_resistance + force_grade + force_creep
|
|
acceleration = force / mass
|
|
|
|
# TODO: lateral model
|
|
return speed, acceleration
|
|
|
|
def get_car_can_parser():
|
|
dbc_f = 'acura_ilx_2016_can.dbc'
|
|
signals = [
|
|
("STEER_TORQUE", 0xe4, 0),
|
|
("STEER_TORQUE_REQUEST", 0xe4, 0),
|
|
("COMPUTER_BRAKE", 0x1fa, 0),
|
|
("COMPUTER_BRAKE_REQUEST", 0x1fa, 0),
|
|
("GAS_COMMAND", 0x200, 0),
|
|
]
|
|
checks = [
|
|
(0xe4, 100),
|
|
(0x1fa, 50),
|
|
(0x200, 50),
|
|
]
|
|
return CANParser(dbc_f, signals, checks)
|
|
|
|
|
|
class Plant(object):
|
|
messaging_initialized = False
|
|
|
|
def __init__(self, lead_relevancy=False, rate=100, speed=0.0, distance_lead=2.0):
|
|
self.rate = rate
|
|
self.civic = False
|
|
self.brake_only = False
|
|
|
|
if not Plant.messaging_initialized:
|
|
context = zmq.Context()
|
|
Plant.logcan = messaging.pub_sock(context, service_list['can'].port)
|
|
Plant.sendcan = messaging.sub_sock(context, service_list['sendcan'].port)
|
|
Plant.model = messaging.pub_sock(context, service_list['model'].port)
|
|
Plant.live100 = messaging.sub_sock(context, service_list['live100'].port)
|
|
Plant.messaging_initialized = True
|
|
|
|
self.angle_steer = 0.
|
|
self.gear_choice = 0
|
|
self.speed, self.speed_prev = 0., 0.
|
|
|
|
self.esp_disabled = 0
|
|
self.main_on = 1
|
|
self.user_gas = 0
|
|
self.computer_brake,self.user_brake = 0,0
|
|
self.brake_pressed = 0
|
|
self.distance, self.distance_prev = 0., 0.
|
|
self.speed, self.speed_prev = speed, speed
|
|
self.steer_error, self.brake_error, self.steer_not_allowed = 0, 0, 0
|
|
self.gear_shifter = 4 # D gear
|
|
self.pedal_gas = 0
|
|
self.cruise_setting = 0
|
|
|
|
self.seatbelt, self.door_all_closed = True, True
|
|
self.steer_torque, self.v_cruise, self.acc_status = 0, 0, 0 # v_cruise is reported from can, not the one used for controls
|
|
|
|
self.lead_relevancy = lead_relevancy
|
|
|
|
# lead car
|
|
self.distance_lead, self.distance_lead_prev = distance_lead , distance_lead
|
|
|
|
self.rk = Ratekeeper(rate, print_delay_threshold=100)
|
|
self.ts = 1./rate
|
|
|
|
self.cp = get_car_can_parser()
|
|
|
|
def speed_sensor(self, speed):
|
|
if speed<0.3:
|
|
return 0
|
|
else:
|
|
return speed
|
|
|
|
def current_time(self):
|
|
return float(self.rk.frame) / self.rate
|
|
|
|
def step(self, v_lead=0.0, cruise_buttons=None, grade=0.0):
|
|
# dbc_f, sgs, ivs, msgs, cks_msgs, frqs = initialize_can_struct(self.civic, self.brake_only)
|
|
cp2 = get_can_parser(self.civic, self.brake_only)
|
|
sgs = cp2._sgs
|
|
msgs = cp2._msgs
|
|
cks_msgs = cp2.msgs_ck
|
|
|
|
# ******** get messages sent to the car ********
|
|
can_msgs = []
|
|
for a in messaging.drain_sock(Plant.sendcan):
|
|
can_msgs += can_capnp_to_can_list_old(a.sendcan, [0,2])
|
|
#print can_msgs
|
|
self.cp.update_can(can_msgs)
|
|
|
|
# ******** get live100 messages for plotting ***
|
|
live_msgs = []
|
|
for a in messaging.drain_sock(Plant.live100):
|
|
live_msgs.append(a.live100)
|
|
|
|
|
|
if self.cp.vl[0x1fa]['COMPUTER_BRAKE_REQUEST']:
|
|
brake = self.cp.vl[0x1fa]['COMPUTER_BRAKE']
|
|
else:
|
|
brake = 0.0
|
|
|
|
if self.cp.vl[0x200]['GAS_COMMAND'] > 0:
|
|
gas = self.cp.vl[0x200]['GAS_COMMAND'] / 256.0
|
|
else:
|
|
gas = 0.0
|
|
|
|
if self.cp.vl[0xe4]['STEER_TORQUE_REQUEST']:
|
|
steer_torque = self.cp.vl[0xe4]['STEER_TORQUE']*1.0/0xf00
|
|
else:
|
|
steer_torque = 0.0
|
|
|
|
distance_lead = self.distance_lead_prev + v_lead * self.ts
|
|
|
|
# ******** run the car ********
|
|
speed, acceleration = car_plant(self.distance_prev, self.speed_prev, grade, gas, brake)
|
|
standstill = (speed == 0)
|
|
distance = self.distance_prev + speed * self.ts
|
|
speed = self.speed_prev + self.ts * acceleration
|
|
if speed <= 0:
|
|
speed = 0
|
|
acceleration = 0
|
|
|
|
# ******** lateral ********
|
|
self.angle_steer -= steer_torque
|
|
|
|
# *** radar model ***
|
|
if self.lead_relevancy:
|
|
d_rel = np.maximum(0., self.distance_lead - distance)
|
|
v_rel = v_lead - speed
|
|
else:
|
|
d_rel = 200.
|
|
v_rel = 0.
|
|
a_rel = 0
|
|
lateral_pos_rel = 0.
|
|
|
|
# print at 5hz
|
|
if (self.rk.frame%(self.rate/5)) == 0:
|
|
print "%6.2f m %6.2f m/s %6.2f m/s2 %.2f ang gas: %.2f brake: %.2f steer: %5.2f lead_rel: %6.2f m %6.2f m/s" % (distance, speed, acceleration, self.angle_steer, gas, brake, steer_torque, d_rel, v_rel)
|
|
|
|
# ******** publish the car ********
|
|
vls = [self.speed_sensor(speed), self.speed_sensor(speed), self.speed_sensor(speed), self.speed_sensor(speed),
|
|
self.angle_steer, 0, self.gear_choice, speed!=0,
|
|
0, 0, 0, 0,
|
|
self.v_cruise, not self.seatbelt, self.seatbelt, self.brake_pressed,
|
|
self.user_gas, cruise_buttons, self.esp_disabled, 0,
|
|
self.user_brake, self.steer_error, self.speed_sensor(speed), self.brake_error,
|
|
self.brake_error, self.gear_shifter, self.main_on, self.acc_status,
|
|
self.pedal_gas, self.cruise_setting,
|
|
# left_blinker, right_blinker, counter
|
|
0,0,0,
|
|
# interceptor_gas
|
|
0,0]
|
|
|
|
|
|
# TODO: publish each message at proper frequency
|
|
can_msgs = []
|
|
for msg in set(msgs):
|
|
msg_struct = {}
|
|
indxs = [i for i, x in enumerate(msgs) if msg == msgs[i]]
|
|
for i in indxs:
|
|
msg_struct[sgs[i]] = vls[i]
|
|
if msg in cks_msgs:
|
|
msg_struct["COUNTER"] = self.rk.frame % 4
|
|
|
|
msg_data = acura.encode(msg, msg_struct)
|
|
|
|
if msg in cks_msgs:
|
|
msg_data = fix(msg_data, msg)
|
|
|
|
can_msgs.append([msg, 0, msg_data, 0])
|
|
|
|
# add the radar message
|
|
# TODO: use the DBC
|
|
def to_3_byte(x):
|
|
return struct.pack("!H", int(x)).encode("hex")[1:]
|
|
|
|
def to_3s_byte(x):
|
|
return struct.pack("!h", int(x)).encode("hex")[1:]
|
|
radar_msg = to_3_byte(d_rel*16.0) + \
|
|
to_3_byte(int(lateral_pos_rel*16.0)&0x3ff) + \
|
|
to_3s_byte(int(v_rel*32.0)) + \
|
|
"0f00000"
|
|
can_msgs.append([0x445, 0, radar_msg.decode("hex"), 1])
|
|
Plant.logcan.send(can_list_to_can_capnp(can_msgs).to_bytes())
|
|
|
|
# ******** publish a fake model going straight ********
|
|
md = messaging.new_message()
|
|
md.init('model')
|
|
md.model.frameId = 0
|
|
for x in [md.model.path, md.model.leftLane, md.model.rightLane]:
|
|
x.points = [0.0]*50
|
|
x.prob = 1.0
|
|
x.std = 1.0
|
|
# fake values?
|
|
Plant.model.send(md.to_bytes())
|
|
|
|
|
|
# ******** update prevs ********
|
|
self.speed_prev = speed
|
|
self.distance_prev = distance
|
|
self.distance_lead_prev = distance_lead
|
|
|
|
self.rk.keep_time()
|
|
return (distance, speed, acceleration, distance_lead, brake, gas, steer_torque, live_msgs)
|
|
|
|
# simple engage in standalone mode
|
|
def plant_thread(rate=100):
|
|
plant = Plant(rate)
|
|
while 1:
|
|
if plant.rk.frame%100 >= 20 and plant.rk.frame%100 <= 25:
|
|
cruise_buttons = CruiseButtons.RES_ACCEL
|
|
else:
|
|
cruise_buttons = 0
|
|
plant.step()
|
|
|
|
if __name__ == "__main__":
|
|
plant_thread()
|
|
|
|
|