Create message mocking tools (#31249)

* add mocking tools

* fix map renderer

* use for power draw

* fix those

* whitespace

* rename to services

* fix the rate

* remove
old-commit-hash: 086c509fde
chrysler-long2
Justin Newberry 1 year ago committed by GitHub
parent c999573836
commit 05204fbde3
  1. 51
      common/mock/__init__.py
  2. 20
      common/mock/generators.py
  3. 4
      selfdrive/controls/lib/tests/test_latcontrol.py
  4. 18
      selfdrive/navd/tests/test_map_renderer.py
  5. 6
      selfdrive/ui/tests/test_ui/run.py
  6. 16
      system/hardware/tici/tests/test_power_draw.py

@ -0,0 +1,51 @@
"""
Utilities for generating mock messages for testing.
example in common/tests/test_mock.py
"""
import functools
import threading
from typing import List, Union
from cereal.messaging import PubMaster
from cereal.services import SERVICE_LIST
from openpilot.common.mock.generators import generate_liveLocationKalman
from openpilot.common.realtime import Ratekeeper
MOCK_GENERATOR = {
"liveLocationKalman": generate_liveLocationKalman
}
def generate_messages_loop(services: List[str], done: threading.Event):
pm = PubMaster(services)
rk = Ratekeeper(100)
i = 0
while not done.is_set():
for s in services:
should_send = i % (100/SERVICE_LIST[s].frequency) == 0
if should_send:
message = MOCK_GENERATOR[s]()
pm.send(s, message)
i += 1
rk.keep_time()
def mock_messages(services: Union[List[str], str]):
if isinstance(services, str):
services = [services]
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
done = threading.Event()
t = threading.Thread(target=generate_messages_loop, args=(services, done))
t.start()
try:
return func(*args, **kwargs)
finally:
done.set()
t.join()
return wrapper
return decorator

@ -0,0 +1,20 @@
from cereal import messaging
LOCATION1 = (32.7174, -117.16277)
LOCATION2 = (32.7558, -117.2037)
LLK_DECIMATION = 10
RENDER_FRAMES = 15
DEFAULT_ITERATIONS = RENDER_FRAMES * LLK_DECIMATION
def generate_liveLocationKalman(location=LOCATION1):
msg = messaging.new_message('liveLocationKalman')
msg.liveLocationKalman.positionGeodetic = {'value': [*location, 0], 'std': [0., 0., 0.], 'valid': True}
msg.liveLocationKalman.positionECEF = {'value': [0., 0., 0.], 'std': [0., 0., 0.], 'valid': True}
msg.liveLocationKalman.calibratedOrientationNED = {'value': [0., 0., 0.], 'std': [0., 0., 0.], 'valid': True}
msg.liveLocationKalman.velocityCalibrated = {'value': [0., 0., 0.], 'std': [0., 0., 0.], 'valid': True}
msg.liveLocationKalman.status = 'valid'
msg.liveLocationKalman.gpsOK = True
return msg

@ -12,7 +12,7 @@ from openpilot.selfdrive.controls.lib.latcontrol_pid import LatControlPID
from openpilot.selfdrive.controls.lib.latcontrol_torque import LatControlTorque
from openpilot.selfdrive.controls.lib.latcontrol_angle import LatControlAngle
from openpilot.selfdrive.controls.lib.vehicle_model import VehicleModel
from openpilot.selfdrive.navd.tests.test_map_renderer import gen_llk
from openpilot.common.mock.generators import generate_liveLocationKalman
class TestLatControl(unittest.TestCase):
@ -32,7 +32,7 @@ class TestLatControl(unittest.TestCase):
params = log.LiveParametersData.new_message()
llk = gen_llk()
llk = generate_liveLocationKalman()
for _ in range(1000):
_, _, lac_log = controller.update(True, CS, VM, params, False, 1, llk)

@ -11,30 +11,16 @@ import cereal.messaging as messaging
from typing import Any
from cereal.visionipc import VisionIpcClient, VisionStreamType
from openpilot.common.mock.generators import LLK_DECIMATION, LOCATION1, LOCATION2, generate_liveLocationKalman
from openpilot.selfdrive.test.helpers import with_processes
LLK_DECIMATION = 10
CACHE_PATH = "/data/mbgl-cache-navd.db"
LOCATION1 = (32.7174, -117.16277)
LOCATION2 = (32.7558, -117.2037)
RENDER_FRAMES = 15
DEFAULT_ITERATIONS = RENDER_FRAMES * LLK_DECIMATION
LOCATION1_REPEATED = [LOCATION1] * DEFAULT_ITERATIONS
LOCATION2_REPEATED = [LOCATION2] * DEFAULT_ITERATIONS
def gen_llk(location=LOCATION1):
msg = messaging.new_message('liveLocationKalman')
msg.liveLocationKalman.positionGeodetic = {'value': [*location, 0], 'std': [0., 0., 0.], 'valid': True}
msg.liveLocationKalman.positionECEF = {'value': [0., 0., 0.], 'std': [0., 0., 0.], 'valid': True}
msg.liveLocationKalman.calibratedOrientationNED = {'value': [0., 0., 0.], 'std': [0., 0., 0.], 'valid': True}
msg.liveLocationKalman.velocityCalibrated = {'value': [0., 0., 0.], 'std': [0., 0., 0.], 'valid': True}
msg.liveLocationKalman.status = 'valid'
msg.liveLocationKalman.gpsOK = True
return msg
class MapBoxInternetDisabledRequestHandler(http.server.BaseHTTPRequestHandler):
INTERNET_ACTIVE = True
@ -134,7 +120,7 @@ class TestMapRenderer(unittest.TestCase):
if starting_frame_id is None:
starting_frame_id = prev_frame_id
llk = gen_llk(location)
llk = generate_liveLocationKalman(location)
self.pm.send("liveLocationKalman", llk)
self.pm.wait_for_readers_to_update("liveLocationKalman", 10)
self.sm.update(1000 if frame_expected else 0)

@ -15,10 +15,10 @@ from cereal import messaging, car, log
from cereal.visionipc import VisionIpcServer, VisionStreamType
from cereal.messaging import SubMaster, PubMaster
from openpilot.common.mock import mock_messages
from openpilot.common.params import Params
from openpilot.common.realtime import DT_MDL
from openpilot.common.transformations.camera import tici_f_frame_size
from openpilot.selfdrive.navd.tests.test_map_renderer import gen_llk
from openpilot.selfdrive.test.helpers import with_processes
from openpilot.selfdrive.test.process_replay.vision_meta import meta_from_camera_state
from openpilot.tools.webcam.camera import Camera
@ -94,12 +94,10 @@ def setup_onroad(click, pm: PubMaster):
pm.send(msg.which(), msg)
server.send(cam_meta.stream, IMG_BYTES, cs.frameId, cs.timestampSof, cs.timestampEof)
@mock_messages(['liveLocationKalman'])
def setup_onroad_map(click, pm: PubMaster):
setup_onroad(click, pm)
dat = gen_llk()
pm.send("liveLocationKalman", dat)
click(500, 500)
time.sleep(UI_DELAY) # give time for the map to render

@ -2,7 +2,6 @@
import pytest
import unittest
import time
import threading
import numpy as np
from dataclasses import dataclass
from tabulate import tabulate
@ -10,12 +9,12 @@ from typing import List
import cereal.messaging as messaging
from cereal.services import SERVICE_LIST
from openpilot.common.mock import mock_messages
from openpilot.selfdrive.car.car_helpers import write_car_param
from openpilot.system.hardware import HARDWARE
from openpilot.system.hardware.tici.power_monitor import get_power
from openpilot.selfdrive.manager.process_config import managed_processes
from openpilot.selfdrive.manager.manager import manager_cleanup
from openpilot.selfdrive.navd.tests.test_map_renderer import gen_llk
SAMPLE_TIME = 8 # seconds to sample power
@ -37,14 +36,6 @@ PROCS = [
Proc('navmodeld', 0.05, msgs=['navModel']),
]
def send_llk_msg(done):
# Send liveLocationKalman at 20Hz
pm = messaging.PubMaster(['liveLocationKalman'])
while not done.is_set():
msg = gen_llk()
pm.send('liveLocationKalman', msg)
time.sleep(1/20.)
@pytest.mark.tici
class TestPowerDraw(unittest.TestCase):
@ -60,11 +51,9 @@ class TestPowerDraw(unittest.TestCase):
def tearDown(self):
manager_cleanup()
@mock_messages(['liveLocationKalman'])
def test_camera_procs(self):
baseline = get_power()
done = threading.Event()
thread = threading.Thread(target=send_llk_msg, args=(done,), daemon=True)
thread.start()
prev = baseline
used = {}
@ -82,7 +71,6 @@ class TestPowerDraw(unittest.TestCase):
for msg,sock in socks.items():
msg_counts[msg] = len(messaging.drain_sock_raw(sock))
done.set()
manager_cleanup()
tab = [['process', 'expected (W)', 'measured (W)', '# msgs expected', '# msgs received']]

Loading…
Cancel
Save