diff --git a/selfdrive/navd/tests/test_map_renderer.py b/selfdrive/navd/tests/test_map_renderer.py index 934377fed4..64e80f93d3 100755 --- a/selfdrive/navd/tests/test_map_renderer.py +++ b/selfdrive/navd/tests/test_map_renderer.py @@ -1,28 +1,95 @@ #!/usr/bin/env python3 import os import unittest - +import requests +import threading +import http.server import cereal.messaging as messaging + +from typing import Any from cereal.visionipc import VisionIpcClient, VisionStreamType from selfdrive.manager.process_config import managed_processes LLK_DECIMATION = 10 CACHE_PATH = "/data/mbgl-cache-navd.db" -def gen_llk(): +LOCATION1 = (32.7174, -117.16277) +LOCATION2 = (32.7558, -117.2037) + +def gen_llk(location=LOCATION1): msg = messaging.new_message('liveLocationKalman') - msg.liveLocationKalman.positionGeodetic = {'value': [32.7174, -117.16277, 0], 'std': [0., 0., 0.], 'valid': True} + msg.liveLocationKalman.positionGeodetic = {'value': [*location, 0], 'std': [0., 0., 0.], 'valid': True} msg.liveLocationKalman.calibratedOrientationNED = {'value': [0., 0., 0.], 'std': [0., 0., 0.], 'valid': True} msg.liveLocationKalman.status = 'valid' return msg +class MapBoxInternetDisabledRequestHandler(http.server.BaseHTTPRequestHandler): + INTERNET_ACTIVE = True + + def setup(self): + if self.INTERNET_ACTIVE: + super().setup() + + def handle(self): + if self.INTERNET_ACTIVE: + super().handle() + + def finish(self): + if self.INTERNET_ACTIVE: + super().finish() + + def do_GET(self): + url = f'https://api.mapbox.com{self.path}' + + headers = dict(self.headers) + headers["Host"] = "api.mapbox.com" + + r = requests.get(url, headers=headers, timeout=5) + + self.send_response(r.status_code) + self.end_headers() + self.wfile.write(r.content) + + def log_message(self, *args: Any) -> None: + return + + def log_error(self, *args: Any) -> None: + return + + +class MapBoxInternetDisabledServer(threading.Thread): + def run(self): + self.server = http.server.HTTPServer(("127.0.0.1", 5000), MapBoxInternetDisabledRequestHandler) + self.server.serve_forever() + + def stop(self): + self.server.shutdown() + + def disable_internet(self): + MapBoxInternetDisabledRequestHandler.INTERNET_ACTIVE = False + + def enable_internet(self): + MapBoxInternetDisabledRequestHandler.INTERNET_ACTIVE = True + + class TestMapRenderer(unittest.TestCase): + server = MapBoxInternetDisabledServer() + @classmethod def setUpClass(cls): assert "MAPBOX_TOKEN" in os.environ + cls.original_token = os.environ["MAPBOX_TOKEN"] + cls.server.start() + + @classmethod + def tearDownClass(cls) -> None: + cls.server.stop() def setUp(self): + self.server.enable_internet() + os.environ['MAPS_HOST'] = 'http://localhost:5000' + self.sm = messaging.SubMaster(['mapRenderState']) self.pm = messaging.PubMaster(['liveLocationKalman']) self.vipc = VisionIpcClient("navd", VisionStreamType.VISION_STREAM_MAP, True) @@ -33,15 +100,22 @@ class TestMapRenderer(unittest.TestCase): def tearDown(self): managed_processes['mapsd'].stop() - def _run_test(self, expect_valid): + def _setup_test(self): # start + sync up managed_processes['mapsd'].start() + assert self.pm.wait_for_readers_to_update("liveLocationKalman", 10) assert VisionIpcClient.available_streams("navd", False) == {VisionStreamType.VISION_STREAM_MAP, } assert self.vipc.connect(False) self.vipc.recv() + + def _run_test(self, expect_valid, location=LOCATION1): + starting_frame_id = None + + self.location = location + # run test prev_frame_id = -1 for i in range(30*LLK_DECIMATION): @@ -51,21 +125,28 @@ class TestMapRenderer(unittest.TestCase): prev_valid = False prev_frame_id = -1 else: - prev_frame_id = self.sm['mapRenderState'].frameId prev_valid = self.sm.valid['mapRenderState'] + prev_frame_id = self.sm['mapRenderState'].frameId + + if starting_frame_id is None: + starting_frame_id = prev_frame_id - llk = gen_llk() + llk = gen_llk(self.location) self.pm.send("liveLocationKalman", llk) self.pm.wait_for_readers_to_update("liveLocationKalman", 10) self.sm.update(1000 if frame_expected else 0) assert self.sm.updated['mapRenderState'] == frame_expected, "renderer running at wrong frequency" if not frame_expected: - continue - # give a few frames to go valid - if expect_valid and not self.sm.valid['mapRenderState'] and not prev_valid and self.sm['mapRenderState'].frameId < 5: + frames_since_test_start = self.sm['mapRenderState'].frameId - starting_frame_id + + # give a few frames to switch from valid to invalid, or vice versa + invalid_and_not_previously_valid = (expect_valid and not self.sm.valid['mapRenderState'] and not prev_valid) + valid_and_not_previously_invalid = (not expect_valid and self.sm.valid['mapRenderState'] and prev_valid) + + if (invalid_and_not_previously_valid or valid_and_not_previously_invalid) and frames_since_test_start < 5: continue # check output @@ -84,15 +165,28 @@ class TestMapRenderer(unittest.TestCase): assert self.vipc.frame_id == self.sm['mapRenderState'].frameId def test_with_internet(self): + self._setup_test() self._run_test(True) def test_with_no_internet(self): - token = os.environ['MAPBOX_TOKEN'] - try: - os.environ['MAPBOX_TOKEN'] = 'invalid_token' - self._run_test(False) - finally: - os.environ['MAPBOX_TOKEN'] = token + self.server.disable_internet() + self._setup_test() + self._run_test(False) + + def test_recover_from_no_internet(self): + self._setup_test() + self._run_test(True) + + self.server.disable_internet() + + # change locations to force mapsd to refetch + self._run_test(False, LOCATION2) + + self.server.enable_internet() + self._run_test(True, LOCATION2) + + self.location = LOCATION1 + self._run_test(True, LOCATION2) if __name__ == "__main__": unittest.main()