#!/usr/bin/env python3
"""Tests for the map renderer process (``mapsd``).

A local HTTP server stands in for the Mapbox host (via ``MAPS_HOST``) so the
tests can switch "internet" on and off while feeding ``liveLocationKalman``
messages and checking the ``mapRenderState`` / VisionIPC output.
"""
import time
import numpy as np
import os
import pytest
import unittest
import requests
import threading
import http.server
import cereal.messaging as messaging

from typing import Any
from cereal.visionipc import VisionIpcClient, VisionStreamType
from openpilot.selfdrive.test.helpers import with_processes

# The tests expect one rendered frame per LLK_DECIMATION location messages.
LLK_DECIMATION = 10
CACHE_PATH = "/data/mbgl-cache-navd.db"

LOCATION1 = (32.7174, -117.16277)
LOCATION2 = (32.7558, -117.2037)

RENDER_FRAMES = 15
DEFAULT_ITERATIONS = RENDER_FRAMES * LLK_DECIMATION
LOCATION1_REPEATED = [LOCATION1] * DEFAULT_ITERATIONS
LOCATION2_REPEATED = [LOCATION2] * DEFAULT_ITERATIONS


def gen_llk(location=LOCATION1):
    """Build a valid ``liveLocationKalman`` message positioned at *location*.

    Args:
        location: two-element sequence unpacked into the first two entries of
            ``positionGeodetic.value``; the third entry is set to 0.

    Returns:
        The new message with zeroed orientation/std fields and status 'valid'.
    """
    msg = messaging.new_message('liveLocationKalman')
    msg.liveLocationKalman.positionGeodetic = {
        'value': [*location, 0], 'std': [0., 0., 0.], 'valid': True,
    }
    msg.liveLocationKalman.calibratedOrientationNED = {
        'value': [0., 0., 0.], 'std': [0., 0., 0.], 'valid': True,
    }
    msg.liveLocationKalman.status = 'valid'
    return msg


class MapBoxInternetDisabledRequestHandler(http.server.BaseHTTPRequestHandler):
    """Forward GET requests to api.mapbox.com, or fail them when "offline"."""

    # Class-level switch shared by every handler instance; toggled through
    # MapBoxInternetDisabledServer.enable_internet / disable_internet.
    INTERNET_ACTIVE = True

    def do_GET(self):
        """Answer 500 when the switch is off, otherwise relay the upstream reply."""
        if not self.INTERNET_ACTIVE:
            self.send_response(500)
            self.end_headers()
            return

        url = f'https://api.mapbox.com{self.path}'

        # Reuse the client's headers but point Host at the real upstream.
        headers = dict(self.headers)
        headers["Host"] = "api.mapbox.com"

        r = requests.get(url, headers=headers, timeout=5)

        self.send_response(r.status_code)
        self.end_headers()
        self.wfile.write(r.content)

    def log_message(self, *args: Any) -> None:
        """Suppress per-request logging."""
        return

    def log_error(self, *args: Any) -> None:
        """Suppress error logging."""
        return


class MapBoxInternetDisabledServer(threading.Thread):
    """Thread serving MapBoxInternetDisabledRequestHandler on a free local port.

    The listening socket is created in the constructor, so ``port`` is valid
    (and connections are queued by the OS) as soon as the object exists;
    callers do not need to sleep after ``start()``.
    """

    server: http.server.HTTPServer
    port: int

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        # Port 0 lets the OS choose a free port; read the chosen one back.
        self.server = http.server.HTTPServer(("127.0.0.1", 0), MapBoxInternetDisabledRequestHandler)
        self.port = self.server.server_port

    def run(self):
        """Serve requests until ``stop()`` is called."""
        self.server.serve_forever()

    def stop(self):
        """Stop the serve loop, close the listening socket and join the thread.

        Must be called after ``start()``: ``shutdown()`` waits for the serve
        loop to exit.
        """
        self.server.shutdown()
        self.server.server_close()
        self.join()

    def disable_internet(self):
        """Make every subsequent GET fail with a 500 response."""
        MapBoxInternetDisabledRequestHandler.INTERNET_ACTIVE = False

    def enable_internet(self):
        """Make subsequent GETs get forwarded upstream again."""
        MapBoxInternetDisabledRequestHandler.INTERNET_ACTIVE = True


class TestMapRenderer(unittest.TestCase):
    """Checks ``mapsd`` output with and without upstream connectivity."""

    server: MapBoxInternetDisabledServer

    @classmethod
    def setUpClass(cls):
        """Require MAPBOX_TOKEN and start the local proxy server."""
        assert "MAPBOX_TOKEN" in os.environ
        cls.original_token = os.environ["MAPBOX_TOKEN"]
        # The server binds its socket on construction, so no startup wait is needed.
        cls.server = MapBoxInternetDisabledServer()
        cls.server.start()

    @classmethod
    def tearDownClass(cls) -> None:
        """Shut the local proxy server down."""
        cls.server.stop()

    def setUp(self):
        """Point MAPS_HOST at the proxy, open messaging/VisionIPC, drop the cache."""
        self.server.enable_internet()
        os.environ['MAPS_HOST'] = f'http://localhost:{self.server.port}'

        self.sm = messaging.SubMaster(['mapRenderState'])
        self.pm = messaging.PubMaster(['liveLocationKalman'])
        self.vipc = VisionIpcClient("navd", VisionStreamType.VISION_STREAM_MAP, True)

        # Remove the cache file so each test starts without it.
        if os.path.exists(CACHE_PATH):
            os.remove(CACHE_PATH)

    def _setup_test(self):
        """Wait for a liveLocationKalman reader, then connect to the map stream."""
        assert self.pm.wait_for_readers_to_update("liveLocationKalman", 10)

        time.sleep(0.5)

        assert VisionIpcClient.available_streams("navd", False) == {VisionStreamType.VISION_STREAM_MAP, }
        assert self.vipc.connect(False)
        self.vipc.recv()

    def _run_test(self, expect_valid, locations=LOCATION1_REPEATED):
        """Publish one location message per entry and verify the rendered frames.

        Args:
            expect_valid: whether ``mapRenderState`` and the VisionIPC frame
                are expected to be valid.
            locations: positions to publish, one message each (not mutated).

        Returns:
            List of ``renderTime`` values collected from checked valid frames.

        Raises:
            AssertionError: on wrong update cadence, wrong frame contents, or
                fewer than RENDER_FRAMES frames since the start of the run.
        """
        starting_frame_id = None
        # Bound up front so the final assert fails cleanly (instead of raising
        # UnboundLocalError) when no frame was ever expected.
        frames_since_test_start = 0

        render_times = []

        # run test
        prev_frame_id = -1
        for i, location in enumerate(locations):
            frame_expected = (i + 1) % LLK_DECIMATION == 0

            # logMonoTime == 0 means no mapRenderState has been received yet.
            if self.sm.logMonoTime['mapRenderState'] == 0:
                prev_valid = False
                prev_frame_id = -1
            else:
                prev_valid = self.sm.valid['mapRenderState']
                prev_frame_id = self.sm['mapRenderState'].frameId

            if starting_frame_id is None:
                starting_frame_id = prev_frame_id

            llk = gen_llk(location)
            self.pm.send("liveLocationKalman", llk)
            self.pm.wait_for_readers_to_update("liveLocationKalman", 10)
            # Only wait (up to 1000 ms) when a frame is due; otherwise just poll.
            self.sm.update(1000 if frame_expected else 0)
            assert self.sm.updated['mapRenderState'] == frame_expected, "renderer running at wrong frequency"

            if not frame_expected:
                continue

            frames_since_test_start = self.sm['mapRenderState'].frameId - starting_frame_id

            # give a few frames to switch from valid to invalid, or vice versa
            invalid_and_not_previously_valid = (expect_valid and not self.sm.valid['mapRenderState'] and not prev_valid)
            valid_and_not_previously_invalid = (not expect_valid and self.sm.valid['mapRenderState'] and prev_valid)

            if (invalid_and_not_previously_valid or valid_and_not_previously_invalid) and frames_since_test_start < 5:
                continue

            # check output
            assert self.sm.valid['mapRenderState'] == expect_valid
            assert self.sm['mapRenderState'].frameId == (prev_frame_id + 1)
            assert self.sm['mapRenderState'].locationMonoTime == llk.logMonoTime
            if not expect_valid:
                assert self.sm['mapRenderState'].renderTime == 0.
            else:
                assert 0. < self.sm['mapRenderState'].renderTime < 0.1
                render_times.append(self.sm['mapRenderState'].renderTime)

            # check vision ipc output
            assert self.vipc.recv() is not None
            assert self.vipc.valid == expect_valid
            assert self.vipc.timestamp_sof == llk.logMonoTime
            assert self.vipc.frame_id == self.sm['mapRenderState'].frameId

        assert frames_since_test_start >= RENDER_FRAMES

        return render_times

    @with_processes(["mapsd"])
    def test_with_internet(self):
        """Frames are valid while the proxy forwards requests."""
        self._setup_test()
        self._run_test(True)

    @with_processes(["mapsd"])
    def test_with_no_internet(self):
        """Frames are invalid while the proxy rejects every request."""
        self.server.disable_internet()
        self._setup_test()
        self._run_test(False)

    @with_processes(["mapsd"])
    @pytest.mark.skip(reason="slow, flaky, and unlikely to break")
    def test_recover_from_no_internet(self):
        """Output goes valid -> invalid -> valid as connectivity is toggled."""
        self._setup_test()
        self._run_test(True)

        self.server.disable_internet()

        # change locations to force mapsd to refetch
        self._run_test(False, LOCATION2_REPEATED)

        self.server.enable_internet()
        self._run_test(True, LOCATION2_REPEATED)

    @with_processes(["mapsd"])
    @pytest.mark.tici
    def test_render_time_distribution(self):
        """Render-time statistics over a long route stay near nominal values."""
        self._setup_test()
        # from location1 -> location2 and back
        locations = np.array([*np.linspace(LOCATION1, LOCATION2, 2000),
                              *np.linspace(LOCATION2, LOCATION1, 2000)]).tolist()

        render_times = self._run_test(True, locations)

        _min = np.min(render_times)
        _max = np.max(render_times)
        _mean = np.mean(render_times)
        _median = np.median(render_times)
        _stddev = np.std(render_times)

        print(f"Stats: min: {_min}, max: {_max}, mean: {_mean}, median: {_median}, stddev: {_stddev}, count: {len(render_times)}")

        def assert_stat(stat, nominal, tol=0.3):
            """Assert nominal / (1 + tol) < stat < nominal * (1 + tol)."""
            tol = (nominal / (1 + tol)), (nominal * (1 + tol))
            self.assertTrue(tol[0] < stat < tol[1], f"{stat} not in tolerance {tol}")

        assert_stat(_mean, 0.030)
        assert_stat(_median, 0.027)
        assert_stat(_stddev, 0.0078)

        self.assertLess(_max, 0.065)
        self.assertGreater(_min, 0.015)


if __name__ == "__main__":
    unittest.main()