#!/usr/bin/env python3 import os import argparse import threading import numpy as np from inputs import UnpluggedError, get_gamepad from cereal import messaging from openpilot.common.params import Params from openpilot.common.realtime import Ratekeeper from openpilot.system.hardware import HARDWARE from openpilot.tools.lib.kbhit import KBHit EXPO = 0.4 class Keyboard: def __init__(self): self.kb = KBHit() self.axis_increment = 0.05 # 5% of full actuation each key press self.axes_map = {'w': 'gb', 's': 'gb', 'a': 'steer', 'd': 'steer'} self.axes_values = {'gb': 0., 'steer': 0.} self.axes_order = ['gb', 'steer'] self.cancel = False def update(self): key = self.kb.getch().lower() self.cancel = False if key == 'r': self.axes_values = {ax: 0. for ax in self.axes_values} elif key == 'c': self.cancel = True elif key in self.axes_map: axis = self.axes_map[key] incr = self.axis_increment if key in ['w', 'a'] else -self.axis_increment self.axes_values[axis] = float(np.clip(self.axes_values[axis] + incr, -1, 1)) else: return False return True class Joystick: def __init__(self): # This class supports a PlayStation 5 DualSense controller on the comma 3X # TODO: find a way to get this from API or detect gamepad/PC, perhaps "inputs" doesn't support it self.cancel_button = 'BTN_NORTH' # BTN_NORTH=X/triangle if HARDWARE.get_device_type() == 'pc': accel_axis = 'ABS_Z' steer_axis = 'ABS_RX' # TODO: once the longcontrol API is finalized, we can replace this with outputting gas/brake and steering self.flip_map = {'ABS_RZ': accel_axis} else: accel_axis = 'ABS_RX' steer_axis = 'ABS_Z' self.flip_map = {'ABS_RY': accel_axis} self.min_axis_value = {accel_axis: 0., steer_axis: 0.} self.max_axis_value = {accel_axis: 255., steer_axis: 255.} self.axes_values = {accel_axis: 0., steer_axis: 0.} self.axes_order = [accel_axis, steer_axis] self.cancel = False def update(self): try: joystick_event = get_gamepad()[0] except (OSError, UnpluggedError): self.axes_values = {ax: 0. for ax in self.axes_values} return False event = (joystick_event.code, joystick_event.state) # flip left trigger to negative accel if event[0] in self.flip_map: event = (self.flip_map[event[0]], -event[1]) if event[0] == self.cancel_button: if event[1] == 1: self.cancel = True elif event[1] == 0: # state 0 is falling edge self.cancel = False elif event[0] in self.axes_values: self.max_axis_value[event[0]] = max(event[1], self.max_axis_value[event[0]]) self.min_axis_value[event[0]] = min(event[1], self.min_axis_value[event[0]]) norm = -float(np.interp(event[1], [self.min_axis_value[event[0]], self.max_axis_value[event[0]]], [-1., 1.])) norm = norm if abs(norm) > 0.03 else 0. # center can be noisy, deadzone of 3% self.axes_values[event[0]] = EXPO * norm ** 3 + (1 - EXPO) * norm # less action near center for fine control else: return False return True def send_thread(joystick): pm = messaging.PubMaster(['testJoystick']) rk = Ratekeeper(100, print_delay_threshold=None) while True: if rk.frame % 20 == 0: print('\n' + ', '.join(f'{name}: {round(v, 3)}' for name, v in joystick.axes_values.items())) joystick_msg = messaging.new_message('testJoystick') joystick_msg.valid = True joystick_msg.testJoystick.axes = [joystick.axes_values[ax] for ax in joystick.axes_order] pm.send('testJoystick', joystick_msg) rk.keep_time() def joystick_control_thread(joystick): Params().put_bool('JoystickDebugMode', True) threading.Thread(target=send_thread, args=(joystick,), daemon=True).start() while True: joystick.update() def main(): joystick_control_thread(Joystick()) if __name__ == '__main__': parser = argparse.ArgumentParser(description='Publishes events from your joystick to control your car.\n' + 'openpilot must be offroad before starting joystick_control. This tool supports ' + 'a PlayStation 5 DualSense controller on the comma 3X.', formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument('--keyboard', action='store_true', help='Use your keyboard instead of a joystick') args = parser.parse_args() if not Params().get_bool("IsOffroad") and "ZMQ" not in os.environ: print("The car must be off before running joystick_control.") exit() print() if args.keyboard: print('Gas/brake control: `W` and `S` keys') print('Steering control: `A` and `D` keys') print('Buttons') print('- `R`: Resets axes') print('- `C`: Cancel cruise control') else: print('Using joystick, make sure to run cereal/messaging/bridge on your device if running over the network!') print('If not running on a comma device, the mapping may need to be adjusted.') joystick = Keyboard() if args.keyboard else Joystick() joystick_control_thread(joystick)