#!/usr/bin/env python3
import os
import argparse
import threading
from inputs import get_gamepad
import cereal . messaging as messaging
from openpilot . common . realtime import Ratekeeper
from openpilot . common . numpy_fast import interp , clip
from openpilot . common . params import Params
from openpilot . system . hardware import HARDWARE
from openpilot . tools . lib . kbhit import KBHit
JS_EXPO = 0.4
class Keyboard :
def __init__ ( self ) :
self . kb = KBHit ( )
self . axis_increment = 0.05 # 5% of full actuation each key press
self . axes_map = { ' w ' : ' gb ' , ' s ' : ' gb ' ,
' a ' : ' steer ' , ' d ' : ' steer ' }
self . axes_values = { ' gb ' : 0. , ' steer ' : 0. }
self . axes_order = [ ' gb ' , ' steer ' ]
self . cancel = False
def update ( self ) :
key = self . kb . getch ( ) . lower ( )
self . cancel = False
if key == ' r ' :
self . axes_values = { ax : 0. for ax in self . axes_values }
elif key == ' c ' :
self . cancel = True
elif key in self . axes_map :
axis = self . axes_map [ key ]
incr = self . axis_increment if key in [ ' w ' , ' a ' ] else - self . axis_increment
self . axes_values [ axis ] = clip ( self . axes_values [ axis ] + incr , - 1 , 1 )
else :
return False
return True
class Joystick :
def __init__ ( self ) :
# This class supports a PlayStation 5 DualSense controller on the comma 3X
# TODO: find a way to get this from API or detect gamepad/PC, perhaps "inputs" doesn't support it
self . cancel_button = ' BTN_NORTH ' # BTN_NORTH=X/triangle
if HARDWARE . get_device_type ( ) == ' pc ' :
accel_axis = ' ABS_Z '
steer_axis = ' ABS_RX '
# TODO: once the longcontrol API is finalized, we can replace this with outputting gas/brake and steering
self . flip_map = { ' ABS_RZ ' : accel_axis }
else :
accel_axis = ' ABS_RX '
steer_axis = ' ABS_Z '
self . flip_map = { ' ABS_RY ' : accel_axis }
self . min_axis_value = { accel_axis : 0. , steer_axis : 0. }
self . max_axis_value = { accel_axis : 255. , steer_axis : 255. }
self . axes_values = { accel_axis : 0. , steer_axis : 0. }
self . axes_order = [ accel_axis , steer_axis ]
self . cancel = False
def update ( self ) :
joystick_event = get_gamepad ( ) [ 0 ]
event = ( joystick_event . code , joystick_event . state )
# flip left trigger to negative accel
if event [ 0 ] in self . flip_map :
event = ( self . flip_map [ event [ 0 ] ] , - event [ 1 ] )
if event [ 0 ] == self . cancel_button :
if event [ 1 ] == 1 :
self . cancel = True
elif event [ 1 ] == 0 : # state 0 is falling edge
self . cancel = False
elif event [ 0 ] in self . axes_values :
self . max_axis_value [ event [ 0 ] ] = max ( event [ 1 ] , self . max_axis_value [ event [ 0 ] ] )
self . min_axis_value [ event [ 0 ] ] = min ( event [ 1 ] , self . min_axis_value [ event [ 0 ] ] )
norm = - interp ( event [ 1 ] , [ self . min_axis_value [ event [ 0 ] ] , self . max_axis_value [ event [ 0 ] ] ] , [ - 1. , 1. ] )
norm = norm if abs ( norm ) > 0.03 else 0. # center can be noisy, deadzone of 3%
self . axes_values [ event [ 0 ] ] = JS_EXPO * norm * * 3 + ( 1 - JS_EXPO ) * norm # less action near center for fine control
else :
return False
return True
def send_thread ( joystick ) :
joystick_sock = messaging . pub_sock ( ' testJoystick ' )
rk = Ratekeeper ( 100 , print_delay_threshold = None )
while 1 :
dat = messaging . new_message ( ' testJoystick ' )
dat . testJoystick . axes = [ joystick . axes_values [ a ] for a in joystick . axes_order ]
dat . testJoystick . buttons = [ joystick . cancel ]
joystick_sock . send ( dat . to_bytes ( ) )
print ( ' \n ' + ' , ' . join ( f ' { name } : { round ( v , 3 ) } ' for name , v in joystick . axes_values . items ( ) ) )
rk . keep_time ( )
def joystick_thread ( joystick ) :
Params ( ) . put_bool ( ' JoystickDebugMode ' , True )
threading . Thread ( target = send_thread , args = ( joystick , ) , daemon = True ) . start ( )
while True :
joystick . update ( )
if __name__ == ' __main__ ' :
parser = argparse . ArgumentParser ( description = ' Publishes events from your joystick to control your car. \n ' +
' openpilot must be offroad before starting joystickd. This tool supports ' +
' a PlayStation 5 DualSense controller on the comma 3X. ' ,
formatter_class = argparse . ArgumentDefaultsHelpFormatter )
parser . add_argument ( ' --keyboard ' , action = ' store_true ' , help = ' Use your keyboard instead of a joystick ' )
args = parser . parse_args ( )
if not Params ( ) . get_bool ( " IsOffroad " ) and " ZMQ " not in os . environ :
print ( " The car must be off before running joystickd. " )
exit ( )
print ( )
if args . keyboard :
print ( ' Gas/brake control: `W` and `S` keys ' )
print ( ' Steering control: `A` and `D` keys ' )
print ( ' Buttons ' )
print ( ' - `R`: Resets axes ' )
print ( ' - `C`: Cancel cruise control ' )
else :
print ( ' Using joystick, make sure to run cereal/messaging/bridge on your device if running over the network! ' )
print ( ' If not running on a comma device, the mapping may need to be adjusted. ' )
joystick = Keyboard ( ) if args . keyboard else Joystick ( )
joystick_thread ( joystick )