@ -2,16 +2,20 @@
import os
import argparse
import threading
from inputs import get_gamepad
from inputs import UnpluggedError , get_gamepad
import math
import cereal . messaging as messaging
from openpilot . common . realtime import Ratekeeper
from cereal import messaging , car
from openpilot . common . realtime import DT_CTRL , Ratekeeper
from openpilot . common . numpy_fast import interp , clip
from openpilot . common . params import Params
from openpilot . common . swaglog import cloudlog
from openpilot . selfdrive . controls . lib . vehicle_model import VehicleModel
from openpilot . system . hardware import HARDWARE
from openpilot . tools . lib . kbhit import KBHit
JS_EXPO = 0.4
EXPO = 0.4
MAX_LAT_ACCEL = 2.5
class Keyboard :
@ -62,7 +66,12 @@ class Joystick:
self . cancel = False
def update ( self ) :
joystick_event = get_gamepad ( ) [ 0 ]
try :
joystick_event = get_gamepad ( ) [ 0 ]
except ( OSError , UnpluggedError ) :
self . axes_values = { ax : 0. for ax in self . axes_values }
return False
event = ( joystick_event . code , joystick_event . state )
# flip left trigger to negative accel
@ -80,21 +89,56 @@ class Joystick:
norm = - interp ( event [ 1 ] , [ self . min_axis_value [ event [ 0 ] ] , self . max_axis_value [ event [ 0 ] ] ] , [ - 1. , 1. ] )
norm = norm if abs ( norm ) > 0.03 else 0. # center can be noisy, deadzone of 3%
self . axes_values [ event [ 0 ] ] = JS_ EXPO * norm * * 3 + ( 1 - JS_ EXPO) * norm # less action near center for fine control
self . axes_values [ event [ 0 ] ] = EXPO * norm * * 3 + ( 1 - EXPO ) * norm # less action near center for fine control
else :
return False
return True
def send_thread ( joystick ) :
joystick_sock = messaging . pub_sock ( ' testJoystick ' )
params = Params ( )
cloudlog . info ( " joystickd is waiting for CarParams " )
CP = messaging . log_from_bytes ( params . get ( " CarParams " , block = True ) , car . CarParams )
VM = VehicleModel ( CP )
sm = messaging . SubMaster ( [ ' carState ' , ' onroadEvents ' , ' liveParameters ' , ' selfdriveState ' ] , frequency = 1. / DT_CTRL )
pm = messaging . PubMaster ( [ ' carControl ' , ' controlsState ' ] )
rk = Ratekeeper ( 100 , print_delay_threshold = None )
while 1 :
dat = messaging . new_message ( ' testJoystick ' )
dat . testJoystick . axes = [ joystick . axes_values [ a ] for a in joystick . axes_order ]
dat . testJoystick . buttons = [ joystick . cancel ]
joystick_sock . send ( dat . to_bytes ( ) )
print ( ' \n ' + ' , ' . join ( f ' { name } : { round ( v , 3 ) } ' for name , v in joystick . axes_values . items ( ) ) )
sm . update ( 0 )
joystick_axes = [ joystick . axes_values [ a ] for a in joystick . axes_order ]
if rk . frame % 20 == 0 :
print ( ' \n ' + ' , ' . join ( f ' { name } : { round ( v , 3 ) } ' for name , v in joystick . axes_values . items ( ) ) )
cc_msg = messaging . new_message ( ' carControl ' )
cc_msg . valid = True
CC = cc_msg . carControl
CC . enabled = sm [ ' selfdriveState ' ] . enabled
CC . latActive = sm [ ' selfdriveState ' ] . active and not sm [ ' carState ' ] . steerFaultTemporary and not sm [ ' carState ' ] . steerFaultPermanent
CC . longActive = CC . enabled and not any ( e . overrideLongitudinal for e in sm [ ' onroadEvents ' ] ) and CP . openpilotLongitudinalControl
actuators = CC . actuators
if CC . longActive :
actuators . accel = 4.0 * clip ( joystick_axes [ 0 ] , - 1 , 1 )
if CC . latActive :
max_curvature = MAX_LAT_ACCEL / max ( sm [ ' carState ' ] . vEgo * * 2 , 5 )
max_angle = math . degrees ( VM . get_steer_from_curvature ( max_curvature , sm [ ' carState ' ] . vEgo , sm [ ' liveParameters ' ] . roll ) )
actuators . steer = clip ( joystick_axes [ 1 ] , - 1 , 1 )
actuators . steeringAngleDeg , actuators . curvature = actuators . steer * max_angle , actuators . steer * - max_curvature
pm . send ( ' carControl ' , cc_msg )
cs_msg = messaging . new_message ( ' controlsState ' )
cs_msg . valid = True
controlsState = cs_msg . controlsState
controlsState . lateralControlState . init ( ' debugState ' )
pm . send ( ' controlsState ' , cs_msg )
rk . keep_time ( )
@ -105,6 +149,10 @@ def joystick_thread(joystick):
joystick . update ( )
def main ( ) :
joystick_thread ( Joystick ( ) )
if __name__ == ' __main__ ' :
parser = argparse . ArgumentParser ( description = ' Publishes events from your joystick to control your car. \n ' +
' openpilot must be offroad before starting joystickd. This tool supports ' +