openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

201 lines
4.9 KiB

#!/usr/bin/env python3
# set up wheel
import os, struct, array
from fcntl import ioctl
# Iterate over the joystick devices.
print('Available devices:')
for fn in os.listdir('/dev/input'):
if fn.startswith('js'):
print(' /dev/input/%s' % (fn))
# We'll store the states here.
axis_states = {}
button_states = {}
# These constants were borrowed from linux/input.h
axis_names = {
0x00 : 'x',
0x01 : 'y',
0x02 : 'z',
0x03 : 'rx',
0x04 : 'ry',
0x05 : 'rz',
0x06 : 'trottle',
0x07 : 'rudder',
0x08 : 'wheel',
0x09 : 'gas',
0x0a : 'brake',
0x10 : 'hat0x',
0x11 : 'hat0y',
0x12 : 'hat1x',
0x13 : 'hat1y',
0x14 : 'hat2x',
0x15 : 'hat2y',
0x16 : 'hat3x',
0x17 : 'hat3y',
0x18 : 'pressure',
0x19 : 'distance',
0x1a : 'tilt_x',
0x1b : 'tilt_y',
0x1c : 'tool_width',
0x20 : 'volume',
0x28 : 'misc',
}
button_names = {
0x120 : 'trigger',
0x121 : 'thumb',
0x122 : 'thumb2',
0x123 : 'top',
0x124 : 'top2',
0x125 : 'pinkie',
0x126 : 'base',
0x127 : 'base2',
0x128 : 'base3',
0x129 : 'base4',
0x12a : 'base5',
0x12b : 'base6',
0x12f : 'dead',
0x130 : 'a',
0x131 : 'b',
0x132 : 'c',
0x133 : 'x',
0x134 : 'y',
0x135 : 'z',
0x136 : 'tl',
0x137 : 'tr',
0x138 : 'tl2',
0x139 : 'tr2',
0x13a : 'select',
0x13b : 'start',
0x13c : 'mode',
0x13d : 'thumbl',
0x13e : 'thumbr',
0x220 : 'dpad_up',
0x221 : 'dpad_down',
0x222 : 'dpad_left',
0x223 : 'dpad_right',
# XBox 360 controller uses these codes.
0x2c0 : 'dpad_left',
0x2c1 : 'dpad_right',
0x2c2 : 'dpad_up',
0x2c3 : 'dpad_down',
}
axis_map = []
button_map = []
def wheel_poll_thread():
# Open the joystick device.
fn = '/dev/input/js0'
print('Opening %s...' % fn)
jsdev = open(fn, 'rb')
# Get the device name.
#buf = bytearray(63)
buf = array.array('B', [0] * 64)
ioctl(jsdev, 0x80006a13 + (0x10000 * len(buf)), buf) # JSIOCGNAME(len)
js_name = buf.tobytes().rstrip(b'\x00').decode('utf-8')
print('Device name: %s' % js_name)
# Get number of axes and buttons.
buf = array.array('B', [0])
ioctl(jsdev, 0x80016a11, buf) # JSIOCGAXES
num_axes = buf[0]
buf = array.array('B', [0])
ioctl(jsdev, 0x80016a12, buf) # JSIOCGBUTTONS
num_buttons = buf[0]
# Get the axis map.
buf = array.array('B', [0] * 0x40)
ioctl(jsdev, 0x80406a32, buf) # JSIOCGAXMAP
for axis in buf[:num_axes]:
axis_name = axis_names.get(axis, 'unknown(0x%02x)' % axis)
axis_map.append(axis_name)
axis_states[axis_name] = 0.0
# Get the button map.
buf = array.array('H', [0] * 200)
ioctl(jsdev, 0x80406a34, buf) # JSIOCGBTNMAP
for btn in buf[:num_buttons]:
btn_name = button_names.get(btn, 'unknown(0x%03x)' % btn)
button_map.append(btn_name)
button_states[btn_name] = 0
print('%d axes found: %s' % (num_axes, ', '.join(axis_map)))
print('%d buttons found: %s' % (num_buttons, ', '.join(button_map)))
# Enable FF
import evdev
from evdev import ecodes, InputDevice, ff
device = evdev.list_devices()[0]
evtdev = InputDevice(device)
val = 24000
evtdev.write(ecodes.EV_FF, ecodes.FF_AUTOCENTER, val)
# Set up zmq sock
import zmq
context = zmq.Context()
socket = context.socket(zmq.REQ) # block on send
socket.bind('tcp://127.0.0.1:4444')
while True:
evbuf = jsdev.read(8)
time, value, mtype, number = struct.unpack('IhBB', evbuf)
# print(mtype, number, value)
if mtype & 0x02: # wheel & paddles
axis = axis_map[number]
if axis == "z": # gas
fvalue = value / 32767.0
axis_states[axis] = fvalue
normalized = (1 - fvalue) * 50
socket.send_string(str("throttle_%f" % normalized))
_ = socket.recv()
if axis == "rz": # brake
fvalue = value / 32767.0
axis_states[axis] = fvalue
normalized = (1 - fvalue) * 50
socket.send_string(str("brake_%f" % normalized))
_ = socket.recv()
if axis == "x": # steer angle
fvalue = value / 32767.0
axis_states[axis] = fvalue
normalized = fvalue
socket.send_string(str("steer_%f" % normalized))
_ = socket.recv()
if mtype & 0x01: # buttons
if number in [0,19]: # X
if value == 1: # press down
socket.send_string(str("cruise_down"))
_ = socket.recv()
if number in [3,18]: # triangle
if value == 1: # press down
socket.send_string(str("cruise_up"))
_ = socket.recv()
if number in [1,6]: # square
if value == 1: # press down
socket.send_string(str("cruise_cancel"))
_ = socket.recv()
if number in [10,21]: # R3
if value == 1: # press down
socket.send_string(str("reverse_switch"))
_ = socket.recv()
if __name__ == '__main__':
from multiprocessing import Process
p = Process(target=wheel_poll_thread)
p.start()