|
|
@ -1,10 +1,11 @@ |
|
|
|
#!/usr/bin/env python |
|
|
|
#!/usr/bin/env python |
|
|
|
import os |
|
|
|
import os |
|
|
|
import time |
|
|
|
|
|
|
|
import argparse |
|
|
|
import argparse |
|
|
|
|
|
|
|
import threading |
|
|
|
from inputs import get_gamepad |
|
|
|
from inputs import get_gamepad |
|
|
|
|
|
|
|
|
|
|
|
import cereal.messaging as messaging |
|
|
|
import cereal.messaging as messaging |
|
|
|
|
|
|
|
from common.realtime import Ratekeeper |
|
|
|
from common.numpy_fast import interp, clip |
|
|
|
from common.numpy_fast import interp, clip |
|
|
|
from common.params import Params |
|
|
|
from common.params import Params |
|
|
|
from tools.lib.kbhit import KBHit |
|
|
|
from tools.lib.kbhit import KBHit |
|
|
@ -17,6 +18,8 @@ class Keyboard: |
|
|
|
self.axes_map = {'w': 'gb', 's': 'gb', |
|
|
|
self.axes_map = {'w': 'gb', 's': 'gb', |
|
|
|
'a': 'steer', 'd': 'steer'} |
|
|
|
'a': 'steer', 'd': 'steer'} |
|
|
|
self.axes_values = {'gb': 0., 'steer': 0.} |
|
|
|
self.axes_values = {'gb': 0., 'steer': 0.} |
|
|
|
|
|
|
|
self.axes_order = ['gb', 'steer'] |
|
|
|
|
|
|
|
self.cancel = False |
|
|
|
|
|
|
|
|
|
|
|
def update(self): |
|
|
|
def update(self): |
|
|
|
key = self.kb.getch().lower() |
|
|
|
key = self.kb.getch().lower() |
|
|
@ -41,13 +44,17 @@ class Joystick: |
|
|
|
self.max_axis_value = {'ABS_Y': 1023., 'ABS_RZ': 255.} |
|
|
|
self.max_axis_value = {'ABS_Y': 1023., 'ABS_RZ': 255.} |
|
|
|
self.cancel_button = 'BTN_TRIGGER' |
|
|
|
self.cancel_button = 'BTN_TRIGGER' |
|
|
|
self.axes_values = {'ABS_Y': 0., 'ABS_RZ': 0.} # gb, steer |
|
|
|
self.axes_values = {'ABS_Y': 0., 'ABS_RZ': 0.} # gb, steer |
|
|
|
|
|
|
|
self.axes_order = ['ABS_Y', 'ABS_RZ'] |
|
|
|
|
|
|
|
self.cancel = False |
|
|
|
|
|
|
|
|
|
|
|
def update(self): |
|
|
|
def update(self): |
|
|
|
joystick_event = get_gamepad()[0] |
|
|
|
joystick_event = get_gamepad()[0] |
|
|
|
event = (joystick_event.code, joystick_event.state) |
|
|
|
event = (joystick_event.code, joystick_event.state) |
|
|
|
self.cancel = False |
|
|
|
if event[0] == self.cancel_button: |
|
|
|
if event[0] == self.cancel_button and event[1] == 0: # state 0 is falling edge |
|
|
|
if event[1] == 1: |
|
|
|
self.cancel = True |
|
|
|
self.cancel = True |
|
|
|
|
|
|
|
elif event[1] == 0: # state 0 is falling edge |
|
|
|
|
|
|
|
self.cancel = False |
|
|
|
elif event[0] in self.axes_values: |
|
|
|
elif event[0] in self.axes_values: |
|
|
|
self.max_axis_value[event[0]] = max(event[1], self.max_axis_value[event[0]]) |
|
|
|
self.max_axis_value[event[0]] = max(event[1], self.max_axis_value[event[0]]) |
|
|
|
self.min_axis_value[event[0]] = min(event[1], self.min_axis_value[event[0]]) |
|
|
|
self.min_axis_value[event[0]] = min(event[1], self.min_axis_value[event[0]]) |
|
|
@ -59,25 +66,26 @@ class Joystick: |
|
|
|
return True |
|
|
|
return True |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def send_thread(joystick): |
|
|
|
|
|
|
|
joystick_sock = messaging.pub_sock('testJoystick') |
|
|
|
|
|
|
|
rk = Ratekeeper(100, print_delay_threshold=None) |
|
|
|
|
|
|
|
while 1: |
|
|
|
|
|
|
|
dat = messaging.new_message('testJoystick') |
|
|
|
|
|
|
|
dat.testJoystick.axes = [joystick.axes_values[a] for a in joystick.axes_order] |
|
|
|
|
|
|
|
dat.testJoystick.buttons = [joystick.cancel] |
|
|
|
|
|
|
|
joystick_sock.send(dat.to_bytes()) |
|
|
|
|
|
|
|
print('\n' + ', '.join(f'{name}: {round(v, 3)}' for name, v in joystick.axes_values.items())) |
|
|
|
|
|
|
|
if "WEB" in os.environ: |
|
|
|
|
|
|
|
import requests |
|
|
|
|
|
|
|
requests.get("http://"+os.environ["WEB"]+":5000/control/%f/%f" % tuple([joystick.axes_values[a] for a in joystick.axes_order][::-1])) |
|
|
|
|
|
|
|
rk.keep_time() |
|
|
|
|
|
|
|
|
|
|
|
def joystick_thread(use_keyboard): |
|
|
|
def joystick_thread(use_keyboard): |
|
|
|
Params().put_bool('JoystickDebugMode', True) |
|
|
|
Params().put_bool('JoystickDebugMode', True) |
|
|
|
joystick_sock = messaging.pub_sock('testJoystick') |
|
|
|
|
|
|
|
joystick = Keyboard() if use_keyboard else Joystick() |
|
|
|
joystick = Keyboard() if use_keyboard else Joystick() |
|
|
|
|
|
|
|
threading.Thread(target=send_thread, args=(joystick,), daemon=True).start() |
|
|
|
last_update = 0 |
|
|
|
|
|
|
|
while True: |
|
|
|
while True: |
|
|
|
ret = joystick.update() |
|
|
|
joystick.update() |
|
|
|
if ret: |
|
|
|
|
|
|
|
dat = messaging.new_message('testJoystick') |
|
|
|
|
|
|
|
dat.testJoystick.axes = [joystick.axes_values[a] for a in joystick.axes_values] |
|
|
|
|
|
|
|
dat.testJoystick.buttons = [joystick.cancel] |
|
|
|
|
|
|
|
joystick_sock.send(dat.to_bytes()) |
|
|
|
|
|
|
|
print('\n' + ', '.join(f'{name}: {round(v, 3)}' for name, v in joystick.axes_values.items())) |
|
|
|
|
|
|
|
if "WEB" in os.environ and (last_update + 0.02) < time.time(): |
|
|
|
|
|
|
|
import requests |
|
|
|
|
|
|
|
requests.get("http://"+os.environ["WEB"]+":5000/control/%f/%f" % (joystick.axes_values['ABS_RZ'], joystick.axes_values['ABS_Y'])) |
|
|
|
|
|
|
|
last_update = time.time() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__': |
|
|
|
if __name__ == '__main__': |
|
|
|
parser = argparse.ArgumentParser(description='Publishes events from your joystick to control your car.\n' + |
|
|
|
parser = argparse.ArgumentParser(description='Publishes events from your joystick to control your car.\n' + |
|
|
|