adding --local-partial-types mypy opt (#30491)

old-commit-hash: 5b2bcf6bf2
chrysler-long2
Logan Lasiter 1 year ago committed by GitHub
parent 54346eccee
commit 4f1ed42285
  1. 2
      .pre-commit-config.yaml
  2. 2
      scripts/code_stats.py
  3. 7
      tools/bodyteleop/web.py
  4. 20
      tools/sim/lib/manual_ctrl.py
  5. 4
      tools/sim/tests/test_carla_bridge.py

@ -38,7 +38,7 @@ repos:
entry: mypy entry: mypy
language: system language: system
types: [python] types: [python]
args: ['--explicit-package-bases'] args: ['--explicit-package-bases', '--local-partial-types']
exclude: '^(third_party/)|(cereal/)|(opendbc/)|(panda/)|(rednose/)|(rednose_repo/)|(tinygrad/)|(tinygrad_repo/)|(xx/)' exclude: '^(third_party/)|(cereal/)|(opendbc/)|(panda/)|(rednose/)|(rednose_repo/)|(tinygrad/)|(tinygrad_repo/)|(xx/)'
- repo: https://github.com/astral-sh/ruff-pre-commit - repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.1.5 rev: v0.1.5

@ -13,7 +13,7 @@ for d in ["cereal", "common", "scripts", "selfdrive", "tools"]:
if f.endswith(".py"): if f.endswith(".py"):
pyf.append(os.path.join(root, f)) pyf.append(os.path.join(root, f))
imps = set() imps: set[str] = set()
class Analyzer(ast.NodeVisitor): class Analyzer(ast.NodeVisitor):
def visit_Import(self, node): def visit_Import(self, node):

@ -18,11 +18,14 @@ import cereal.messaging as messaging
from openpilot.common.basedir import BASEDIR from openpilot.common.basedir import BASEDIR
from openpilot.tools.bodyteleop.bodyav import BodyMic, WebClientSpeaker, force_codec, play_sound, MediaBlackhole, EncodedBodyVideo from openpilot.tools.bodyteleop.bodyav import BodyMic, WebClientSpeaker, force_codec, play_sound, MediaBlackhole, EncodedBodyVideo
from typing import Optional
logger = logging.getLogger("pc") logger = logging.getLogger("pc")
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
pcs = set() pcs: set[RTCPeerConnection] = set()
pm, sm = None, None pm: Optional[messaging.PubMaster] = None
sm: Optional[messaging.SubMaster] = None
TELEOPDIR = f"{BASEDIR}/tools/bodyteleop" TELEOPDIR = f"{BASEDIR}/tools/bodyteleop"

@ -4,7 +4,7 @@ import array
import os import os
import struct import struct
from fcntl import ioctl from fcntl import ioctl
from typing import NoReturn from typing import NoReturn, Dict, List
# Iterate over the joystick devices. # Iterate over the joystick devices.
print('Available devices:') print('Available devices:')
@ -13,8 +13,8 @@ for fn in os.listdir('/dev/input'):
print(f' /dev/input/{fn}') print(f' /dev/input/{fn}')
# We'll store the states here. # We'll store the states here.
axis_states = {} axis_states: Dict[str, float] = {}
button_states = {} button_states: Dict[str, float] = {}
# These constants were borrowed from linux/input.h # These constants were borrowed from linux/input.h
axis_names = { axis_names = {
@ -88,8 +88,8 @@ button_names = {
0x2c3 : 'dpad_down', 0x2c3 : 'dpad_down',
} }
axis_map = [] axis_name_list: List[str] = []
button_map = [] button_name_list: List[str] = []
def wheel_poll_thread(q: 'Queue[str]') -> NoReturn: def wheel_poll_thread(q: 'Queue[str]') -> NoReturn:
# Open the joystick device. # Open the joystick device.
@ -119,7 +119,7 @@ def wheel_poll_thread(q: 'Queue[str]') -> NoReturn:
for _axis in buf[:num_axes]: for _axis in buf[:num_axes]:
axis_name = axis_names.get(_axis, f'unknown(0x{_axis:02x})') axis_name = axis_names.get(_axis, f'unknown(0x{_axis:02x})')
axis_map.append(axis_name) axis_name_list.append(axis_name)
axis_states[axis_name] = 0.0 axis_states[axis_name] = 0.0
# Get the button map. # Get the button map.
@ -128,11 +128,11 @@ def wheel_poll_thread(q: 'Queue[str]') -> NoReturn:
for btn in buf[:num_buttons]: for btn in buf[:num_buttons]:
btn_name = button_names.get(btn, f'unknown(0x{btn:03x})') btn_name = button_names.get(btn, f'unknown(0x{btn:03x})')
button_map.append(btn_name) button_name_list.append(btn_name)
button_states[btn_name] = 0 button_states[btn_name] = 0
print('%d axes found: %s' % (num_axes, ', '.join(axis_map))) print('%d axes found: %s' % (num_axes, ', '.join(axis_name_list)))
print('%d buttons found: %s' % (num_buttons, ', '.join(button_map))) print('%d buttons found: %s' % (num_buttons, ', '.join(button_name_list)))
# Enable FF # Enable FF
import evdev import evdev
@ -147,7 +147,7 @@ def wheel_poll_thread(q: 'Queue[str]') -> NoReturn:
value, mtype, number = struct.unpack('4xhBB', evbuf) value, mtype, number = struct.unpack('4xhBB', evbuf)
# print(mtype, number, value) # print(mtype, number, value)
if mtype & 0x02: # wheel & paddles if mtype & 0x02: # wheel & paddles
axis = axis_map[number] axis = axis_name_list[number]
if axis == "z": # gas if axis == "z": # gas
fvalue = value / 32767.0 fvalue = value / 32767.0

@ -2,18 +2,20 @@
import subprocess import subprocess
import time import time
import unittest import unittest
from subprocess import Popen
from openpilot.selfdrive.manager.helpers import unblock_stdout from openpilot.selfdrive.manager.helpers import unblock_stdout
from openpilot.tools.sim.run_bridge import parse_args from openpilot.tools.sim.run_bridge import parse_args
from openpilot.tools.sim.bridge.carla.carla_bridge import CarlaBridge from openpilot.tools.sim.bridge.carla.carla_bridge import CarlaBridge
from openpilot.tools.sim.tests.test_sim_bridge import SIM_DIR, TestSimBridgeBase from openpilot.tools.sim.tests.test_sim_bridge import SIM_DIR, TestSimBridgeBase
from typing import Optional
class TestCarlaBridge(TestSimBridgeBase): class TestCarlaBridge(TestSimBridgeBase):
""" """
Tests need Carla simulator to run Tests need Carla simulator to run
""" """
carla_process = None carla_process: Optional[Popen] = None
def setUp(self): def setUp(self):
super().setUp() super().setUp()

Loading…
Cancel
Save