openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

80 lines
2.9 KiB

#!/usr/bin/env python3
import math
import numpy as np
from cereal import messaging, car
from openpilot.common.realtime import DT_CTRL, Ratekeeper
from openpilot.common.params import Params
from openpilot.common.swaglog import cloudlog
from openpilot.selfdrive.controls.lib.vehicle_model import VehicleModel
LongCtrlState = car.CarControl.Actuators.LongControlState
MAX_LAT_ACCEL = 2.5
def joystickd_thread():
params = Params()
cloudlog.info("joystickd is waiting for CarParams")
CP = messaging.log_from_bytes(params.get("CarParams", block=True), car.CarParams)
VM = VehicleModel(CP)
sm = messaging.SubMaster(['carState', 'onroadEvents', 'liveParameters', 'selfdriveState', 'testJoystick'], frequency=1. / DT_CTRL)
pm = messaging.PubMaster(['carControl', 'controlsState'])
rk = Ratekeeper(100, print_delay_threshold=None)
while 1:
sm.update(0)
cc_msg = messaging.new_message('carControl')
cc_msg.valid = True
CC = cc_msg.carControl
CC.enabled = sm['selfdriveState'].enabled
CC.latActive = sm['selfdriveState'].active and not sm['carState'].steerFaultTemporary and not sm['carState'].steerFaultPermanent
CC.longActive = CC.enabled and not any(e.overrideLongitudinal for e in sm['onroadEvents']) and CP.openpilotLongitudinalControl
CC.cruiseControl.cancel = sm['carState'].cruiseState.enabled and (not CC.enabled or not CP.pcmCruise)
CC.hudControl.leadDistanceBars = 2
actuators = CC.actuators
# reset joystick if it hasn't been received in a while
should_reset_joystick = sm.recv_frame['testJoystick'] == 0 or (sm.frame - sm.recv_frame['testJoystick'])*DT_CTRL > 0.2
if not should_reset_joystick:
joystick_axes = sm['testJoystick'].axes
else:
joystick_axes = [0.0, 0.0]
if CC.longActive:
actuators.accel = 4.0 * float(np.clip(joystick_axes[0], -1, 1))
actuators.longControlState = LongCtrlState.pid if sm['carState'].vEgo > CP.vEgoStopping else LongCtrlState.stopping
if CC.latActive:
max_curvature = MAX_LAT_ACCEL / max(sm['carState'].vEgo ** 2, 5)
max_angle = math.degrees(VM.get_steer_from_curvature(max_curvature, sm['carState'].vEgo, sm['liveParameters'].roll))
actuators.steer = float(np.clip(joystick_axes[1], -1, 1))
actuators.steeringAngleDeg, actuators.curvature = actuators.steer * max_angle, actuators.steer * -max_curvature
pm.send('carControl', cc_msg)
cs_msg = messaging.new_message('controlsState')
cs_msg.valid = True
controlsState = cs_msg.controlsState
controlsState.lateralControlState.init('debugState')
lp = sm['liveParameters']
steer_angle_without_offset = math.radians(sm['carState'].steeringAngleDeg - lp.angleOffsetDeg)
controlsState.curvature = -VM.calc_curvature(steer_angle_without_offset, sm['carState'].vEgo, lp.roll)
pm.send('controlsState', cs_msg)
rk.keep_time()
def main():
joystickd_thread()
if __name__ == "__main__":
main()