You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
			
				
					148 lines
				
				5.1 KiB
			
		
		
			
		
	
	
					148 lines
				
				5.1 KiB
			| 
								 
											1 year ago
										 
									 | 
							
								#!/usr/bin/env python3
							 | 
						||
| 
								 | 
							
								import os
							 | 
						||
| 
								 | 
							
								import argparse
							 | 
						||
| 
								 | 
							
								import threading
							 | 
						||
| 
								 | 
							
								from inputs import UnpluggedError, get_gamepad
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								from cereal import messaging
							 | 
						||
| 
								 | 
							
								from openpilot.common.numpy_fast import interp, clip
							 | 
						||
| 
								 | 
							
								from openpilot.common.params import Params
							 | 
						||
| 
								 | 
							
								from openpilot.common.realtime import Ratekeeper
							 | 
						||
| 
								 | 
							
								from openpilot.system.hardware import HARDWARE
							 | 
						||
| 
								 | 
							
								from openpilot.tools.lib.kbhit import KBHit
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								EXPO = 0.4
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								class Keyboard:
							 | 
						||
| 
								 | 
							
								  def __init__(self):
							 | 
						||
| 
								 | 
							
								    self.kb = KBHit()
							 | 
						||
| 
								 | 
							
								    self.axis_increment = 0.05  # 5% of full actuation each key press
							 | 
						||
| 
								 | 
							
								    self.axes_map = {'w': 'gb', 's': 'gb',
							 | 
						||
| 
								 | 
							
								                     'a': 'steer', 'd': 'steer'}
							 | 
						||
| 
								 | 
							
								    self.axes_values = {'gb': 0., 'steer': 0.}
							 | 
						||
| 
								 | 
							
								    self.axes_order = ['gb', 'steer']
							 | 
						||
| 
								 | 
							
								    self.cancel = False
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  def update(self):
							 | 
						||
| 
								 | 
							
								    key = self.kb.getch().lower()
							 | 
						||
| 
								 | 
							
								    self.cancel = False
							 | 
						||
| 
								 | 
							
								    if key == 'r':
							 | 
						||
| 
								 | 
							
								      self.axes_values = {ax: 0. for ax in self.axes_values}
							 | 
						||
| 
								 | 
							
								    elif key == 'c':
							 | 
						||
| 
								 | 
							
								      self.cancel = True
							 | 
						||
| 
								 | 
							
								    elif key in self.axes_map:
							 | 
						||
| 
								 | 
							
								      axis = self.axes_map[key]
							 | 
						||
| 
								 | 
							
								      incr = self.axis_increment if key in ['w', 'a'] else -self.axis_increment
							 | 
						||
| 
								 | 
							
								      self.axes_values[axis] = clip(self.axes_values[axis] + incr, -1, 1)
							 | 
						||
| 
								 | 
							
								    else:
							 | 
						||
| 
								 | 
							
								      return False
							 | 
						||
| 
								 | 
							
								    return True
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								class Joystick:
							 | 
						||
| 
								 | 
							
								  def __init__(self):
							 | 
						||
| 
								 | 
							
								    # This class supports a PlayStation 5 DualSense controller on the comma 3X
							 | 
						||
| 
								 | 
							
								    # TODO: find a way to get this from API or detect gamepad/PC, perhaps "inputs" doesn't support it
							 | 
						||
| 
								 | 
							
								    self.cancel_button = 'BTN_NORTH'  # BTN_NORTH=X/triangle
							 | 
						||
| 
								 | 
							
								    if HARDWARE.get_device_type() == 'pc':
							 | 
						||
| 
								 | 
							
								      accel_axis = 'ABS_Z'
							 | 
						||
| 
								 | 
							
								      steer_axis = 'ABS_RX'
							 | 
						||
| 
								 | 
							
								      # TODO: once the longcontrol API is finalized, we can replace this with outputting gas/brake and steering
							 | 
						||
| 
								 | 
							
								      self.flip_map = {'ABS_RZ': accel_axis}
							 | 
						||
| 
								 | 
							
								    else:
							 | 
						||
| 
								 | 
							
								      accel_axis = 'ABS_RX'
							 | 
						||
| 
								 | 
							
								      steer_axis = 'ABS_Z'
							 | 
						||
| 
								 | 
							
								      self.flip_map = {'ABS_RY': accel_axis}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    self.min_axis_value = {accel_axis: 0., steer_axis: 0.}
							 | 
						||
| 
								 | 
							
								    self.max_axis_value = {accel_axis: 255., steer_axis: 255.}
							 | 
						||
| 
								 | 
							
								    self.axes_values = {accel_axis: 0., steer_axis: 0.}
							 | 
						||
| 
								 | 
							
								    self.axes_order = [accel_axis, steer_axis]
							 | 
						||
| 
								 | 
							
								    self.cancel = False
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  def update(self):
							 | 
						||
| 
								 | 
							
								    try:
							 | 
						||
| 
								 | 
							
								      joystick_event = get_gamepad()[0]
							 | 
						||
| 
								 | 
							
								    except (OSError, UnpluggedError):
							 | 
						||
| 
								 | 
							
								      self.axes_values = {ax: 0. for ax in self.axes_values}
							 | 
						||
| 
								 | 
							
								      return False
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    event = (joystick_event.code, joystick_event.state)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    # flip left trigger to negative accel
							 | 
						||
| 
								 | 
							
								    if event[0] in self.flip_map:
							 | 
						||
| 
								 | 
							
								      event = (self.flip_map[event[0]], -event[1])
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    if event[0] == self.cancel_button:
							 | 
						||
| 
								 | 
							
								      if event[1] == 1:
							 | 
						||
| 
								 | 
							
								        self.cancel = True
							 | 
						||
| 
								 | 
							
								      elif event[1] == 0:   # state 0 is falling edge
							 | 
						||
| 
								 | 
							
								        self.cancel = False
							 | 
						||
| 
								 | 
							
								    elif event[0] in self.axes_values:
							 | 
						||
| 
								 | 
							
								      self.max_axis_value[event[0]] = max(event[1], self.max_axis_value[event[0]])
							 | 
						||
| 
								 | 
							
								      self.min_axis_value[event[0]] = min(event[1], self.min_axis_value[event[0]])
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								      norm = -interp(event[1], [self.min_axis_value[event[0]], self.max_axis_value[event[0]]], [-1., 1.])
							 | 
						||
| 
								 | 
							
								      norm = norm if abs(norm) > 0.03 else 0.  # center can be noisy, deadzone of 3%
							 | 
						||
| 
								 | 
							
								      self.axes_values[event[0]] = EXPO * norm ** 3 + (1 - EXPO) * norm  # less action near center for fine control
							 | 
						||
| 
								 | 
							
								    else:
							 | 
						||
| 
								 | 
							
								      return False
							 | 
						||
| 
								 | 
							
								    return True
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def send_thread(joystick):
							 | 
						||
| 
								 | 
							
								  pm = messaging.PubMaster(['testJoystick'])
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  rk = Ratekeeper(100, print_delay_threshold=None)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  while True:
							 | 
						||
| 
								 | 
							
								    if rk.frame % 20 == 0:
							 | 
						||
| 
								 | 
							
								      print('\n' + ', '.join(f'{name}: {round(v, 3)}' for name, v in joystick.axes_values.items()))
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    joystick_msg = messaging.new_message('testJoystick')
							 | 
						||
| 
								 | 
							
								    joystick_msg.valid = True
							 | 
						||
| 
								 | 
							
								    joystick_msg.testJoystick.axes = [joystick.axes_values[ax] for ax in joystick.axes_order]
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    pm.send('testJoystick', joystick_msg)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    rk.keep_time()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def joystick_control_thread(joystick):
							 | 
						||
| 
								 | 
							
								  Params().put_bool('JoystickDebugMode', True)
							 | 
						||
| 
								 | 
							
								  threading.Thread(target=send_thread, args=(joystick,), daemon=True).start()
							 | 
						||
| 
								 | 
							
								  while True:
							 | 
						||
| 
								 | 
							
								    joystick.update()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def main():
							 | 
						||
| 
								 | 
							
								  joystick_control_thread(Joystick())
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								if __name__ == '__main__':
							 | 
						||
| 
								 | 
							
								  parser = argparse.ArgumentParser(description='Publishes events from your joystick to control your car.\n' +
							 | 
						||
| 
								 | 
							
								                                               'openpilot must be offroad before starting joystick_control. This tool supports ' +
							 | 
						||
| 
								 | 
							
								                                               'a PlayStation 5 DualSense controller on the comma 3X.',
							 | 
						||
| 
								 | 
							
								                                   formatter_class=argparse.ArgumentDefaultsHelpFormatter)
							 | 
						||
| 
								 | 
							
								  parser.add_argument('--keyboard', action='store_true', help='Use your keyboard instead of a joystick')
							 | 
						||
| 
								 | 
							
								  args = parser.parse_args()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  if not Params().get_bool("IsOffroad") and "ZMQ" not in os.environ:
							 | 
						||
| 
								 | 
							
								    print("The car must be off before running joystick_control.")
							 | 
						||
| 
								 | 
							
								    exit()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  print()
							 | 
						||
| 
								 | 
							
								  if args.keyboard:
							 | 
						||
| 
								 | 
							
								    print('Gas/brake control: `W` and `S` keys')
							 | 
						||
| 
								 | 
							
								    print('Steering control: `A` and `D` keys')
							 | 
						||
| 
								 | 
							
								    print('Buttons')
							 | 
						||
| 
								 | 
							
								    print('- `R`: Resets axes')
							 | 
						||
| 
								 | 
							
								    print('- `C`: Cancel cruise control')
							 | 
						||
| 
								 | 
							
								  else:
							 | 
						||
| 
								 | 
							
								    print('Using joystick, make sure to run cereal/messaging/bridge on your device if running over the network!')
							 | 
						||
| 
								 | 
							
								    print('If not running on a comma device, the mapping may need to be adjusted.')
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  joystick = Keyboard() if args.keyboard else Joystick()
							 | 
						||
| 
								 | 
							
								  joystick_control_thread(joystick)
							 |