openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

148 lines
5.1 KiB

#!/usr/bin/env python3
import os
import argparse
import threading
import numpy as np
from inputs import UnpluggedError, get_gamepad
from cereal import messaging
from openpilot.common.params import Params
from openpilot.common.realtime import Ratekeeper
from openpilot.system.hardware import HARDWARE
from openpilot.tools.lib.kbhit import KBHit
EXPO = 0.4
class Keyboard:
def __init__(self):
self.kb = KBHit()
self.axis_increment = 0.05 # 5% of full actuation each key press
self.axes_map = {'w': 'gb', 's': 'gb',
'a': 'steer', 'd': 'steer'}
self.axes_values = {'gb': 0., 'steer': 0.}
self.axes_order = ['gb', 'steer']
self.cancel = False
def update(self):
key = self.kb.getch().lower()
self.cancel = False
if key == 'r':
self.axes_values = {ax: 0. for ax in self.axes_values}
elif key == 'c':
self.cancel = True
elif key in self.axes_map:
axis = self.axes_map[key]
incr = self.axis_increment if key in ['w', 'a'] else -self.axis_increment
self.axes_values[axis] = np.clip(self.axes_values[axis] + incr, -1, 1)
else:
return False
return True
class Joystick:
def __init__(self):
# This class supports a PlayStation 5 DualSense controller on the comma 3X
# TODO: find a way to get this from API or detect gamepad/PC, perhaps "inputs" doesn't support it
self.cancel_button = 'BTN_NORTH' # BTN_NORTH=X/triangle
if HARDWARE.get_device_type() == 'pc':
accel_axis = 'ABS_Z'
steer_axis = 'ABS_RX'
# TODO: once the longcontrol API is finalized, we can replace this with outputting gas/brake and steering
self.flip_map = {'ABS_RZ': accel_axis}
else:
accel_axis = 'ABS_RX'
steer_axis = 'ABS_Z'
self.flip_map = {'ABS_RY': accel_axis}
self.min_axis_value = {accel_axis: 0., steer_axis: 0.}
self.max_axis_value = {accel_axis: 255., steer_axis: 255.}
self.axes_values = {accel_axis: 0., steer_axis: 0.}
self.axes_order = [accel_axis, steer_axis]
self.cancel = False
def update(self):
try:
joystick_event = get_gamepad()[0]
except (OSError, UnpluggedError):
self.axes_values = {ax: 0. for ax in self.axes_values}
return False
event = (joystick_event.code, joystick_event.state)
# flip left trigger to negative accel
if event[0] in self.flip_map:
event = (self.flip_map[event[0]], -event[1])
if event[0] == self.cancel_button:
if event[1] == 1:
self.cancel = True
elif event[1] == 0: # state 0 is falling edge
self.cancel = False
elif event[0] in self.axes_values:
self.max_axis_value[event[0]] = max(event[1], self.max_axis_value[event[0]])
self.min_axis_value[event[0]] = min(event[1], self.min_axis_value[event[0]])
norm = -np.interp(event[1], [self.min_axis_value[event[0]], self.max_axis_value[event[0]]], [-1., 1.])
norm = norm if abs(norm) > 0.03 else 0. # center can be noisy, deadzone of 3%
self.axes_values[event[0]] = EXPO * norm ** 3 + (1 - EXPO) * norm # less action near center for fine control
else:
return False
return True
def send_thread(joystick):
pm = messaging.PubMaster(['testJoystick'])
rk = Ratekeeper(100, print_delay_threshold=None)
while True:
if rk.frame % 20 == 0:
print('\n' + ', '.join(f'{name}: {round(v, 3)}' for name, v in joystick.axes_values.items()))
joystick_msg = messaging.new_message('testJoystick')
joystick_msg.valid = True
joystick_msg.testJoystick.axes = [joystick.axes_values[ax] for ax in joystick.axes_order]
pm.send('testJoystick', joystick_msg)
rk.keep_time()
def joystick_control_thread(joystick):
Params().put_bool('JoystickDebugMode', True)
threading.Thread(target=send_thread, args=(joystick,), daemon=True).start()
while True:
joystick.update()
def main():
joystick_control_thread(Joystick())
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Publishes events from your joystick to control your car.\n' +
'openpilot must be offroad before starting joystick_control. This tool supports ' +
'a PlayStation 5 DualSense controller on the comma 3X.',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--keyboard', action='store_true', help='Use your keyboard instead of a joystick')
args = parser.parse_args()
if not Params().get_bool("IsOffroad") and "ZMQ" not in os.environ:
print("The car must be off before running joystick_control.")
exit()
print()
if args.keyboard:
print('Gas/brake control: `W` and `S` keys')
print('Steering control: `A` and `D` keys')
print('Buttons')
print('- `R`: Resets axes')
print('- `C`: Cancel cruise control')
else:
print('Using joystick, make sure to run cereal/messaging/bridge on your device if running over the network!')
print('If not running on a comma device, the mapping may need to be adjusted.')
joystick = Keyboard() if args.keyboard else Joystick()
joystick_control_thread(joystick)