@ -2,10 +2,13 @@ import sys
import termios
import time
from multiprocessing import Queue
from termios import ( BRKINT , CS8 , CSIZE , ECHO , ICANON , ICRNL , IEXTEN , INPCK ,
ISTRIP , IXON , PARENB , VMIN , VTIME )
from typing import NoReturn
from openpilot . tools . sim . bridge . common import QueueMessage , control_cmd_gen
# Indexes for termios list.
IFLAG = 0
OFLAG = 1
@ -52,35 +55,35 @@ def getch() -> str:
def print_keyboard_help ( ) :
print ( f " Keyboard Commands: \n { KEYBOARD_HELP } " )
def keyboard_poll_thread ( q : ' Queue[str ] ' ) :
def keyboard_poll_thread ( q : ' Queue[QueueMessage ] ' ) :
print_keyboard_help ( )
while True :
c = getch ( )
if c == ' 1 ' :
q . put ( " cruise_up " )
q . put ( control_cmd_gen ( " cruise_up " ) )
elif c == ' 2 ' :
q . put ( " cruise_down " )
q . put ( control_cmd_gen ( " cruise_down " ) )
elif c == ' 3 ' :
q . put ( " cruise_cancel " )
q . put ( control_cmd_gen ( " cruise_cancel " ) )
elif c == ' w ' :
q . put ( " throttle_ %f " % 1.0 )
q . put ( control_cmd_gen ( f " throttle_ { 1.0 } " ) )
elif c == ' a ' :
q . put ( " steer_ %f " % - 0.15 )
q . put ( control_cmd_gen ( f " steer_ { - 0.15 } " ) )
elif c == ' s ' :
q . put ( " brake_ %f " % 1.0 )
q . put ( control_cmd_gen ( f " brake_ { 1.0 } " ) )
elif c == ' d ' :
q . put ( " steer_ %f " % 0.15 )
q . put ( control_cmd_gen ( f " steer_ { 0.15 } " ) )
elif c == ' z ' :
q . put ( " blinker_left " )
q . put ( control_cmd_gen ( " blinker_left " ) )
elif c == ' x ' :
q . put ( " blinker_right " )
q . put ( control_cmd_gen ( " blinker_right " ) )
elif c == ' i ' :
q . put ( " ignition " )
q . put ( control_cmd_gen ( " ignition " ) )
elif c == ' r ' :
q . put ( " reset " )
q . put ( control_cmd_gen ( " reset " ) )
elif c == ' q ' :
q . put ( " quit " )
q . put ( control_cmd_gen ( " quit " ) )
break
else :
print_keyboard_help ( )
@ -92,7 +95,7 @@ def test(q: 'Queue[str]') -> NoReturn:
if __name__ == ' __main__ ' :
from multiprocessing import Process , Queue
q : Queue [ str ] = Queue ( )
q : ' Queue[QueueMessage] ' = Queue ( )
p = Process ( target = test , args = ( q , ) )
p . daemon = True
p . start ( )