|  |  |  | #!/usr/bin/env python3
 | 
					
						
							|  |  |  | import os
 | 
					
						
							|  |  |  | import unittest
 | 
					
						
							|  |  |  | import requests
 | 
					
						
							|  |  |  | import threading
 | 
					
						
							|  |  |  | import http.server
 | 
					
						
							|  |  |  | import cereal.messaging as messaging
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | from typing import Any
 | 
					
						
							|  |  |  | from cereal.visionipc import VisionIpcClient, VisionStreamType
 | 
					
						
							|  |  |  | from openpilot.selfdrive.manager.process_config import managed_processes
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | LLK_DECIMATION = 10
 | 
					
						
							|  |  |  | CACHE_PATH = "/data/mbgl-cache-navd.db"
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | LOCATION1 = (32.7174, -117.16277)
 | 
					
						
							|  |  |  | LOCATION2 = (32.7558, -117.2037)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def gen_llk(location=LOCATION1):
 | 
					
						
							|  |  |  |   msg = messaging.new_message('liveLocationKalman')
 | 
					
						
							|  |  |  |   msg.liveLocationKalman.positionGeodetic = {'value': [*location, 0], 'std': [0., 0., 0.], 'valid': True}
 | 
					
						
							|  |  |  |   msg.liveLocationKalman.calibratedOrientationNED = {'value': [0., 0., 0.], 'std': [0., 0., 0.], 'valid': True}
 | 
					
						
							|  |  |  |   msg.liveLocationKalman.status = 'valid'
 | 
					
						
							|  |  |  |   return msg
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class MapBoxInternetDisabledRequestHandler(http.server.BaseHTTPRequestHandler):
 | 
					
						
							|  |  |  |   INTERNET_ACTIVE = True
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def setup(self):
 | 
					
						
							|  |  |  |     if self.INTERNET_ACTIVE:
 | 
					
						
							|  |  |  |       super().setup()
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def handle(self):
 | 
					
						
							|  |  |  |     if self.INTERNET_ACTIVE:
 | 
					
						
							|  |  |  |       super().handle()
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def finish(self):
 | 
					
						
							|  |  |  |     if self.INTERNET_ACTIVE:
 | 
					
						
							|  |  |  |       super().finish()
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def do_GET(self):
 | 
					
						
							|  |  |  |     url = f'https://api.mapbox.com{self.path}'
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     headers = dict(self.headers)
 | 
					
						
							|  |  |  |     headers["Host"] = "api.mapbox.com"
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     r = requests.get(url, headers=headers, timeout=5)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     self.send_response(r.status_code)
 | 
					
						
							|  |  |  |     self.end_headers()
 | 
					
						
							|  |  |  |     self.wfile.write(r.content)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def log_message(self, *args: Any) -> None:
 | 
					
						
							|  |  |  |     return
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def log_error(self, *args: Any) -> None:
 | 
					
						
							|  |  |  |     return
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class MapBoxInternetDisabledServer(threading.Thread):
 | 
					
						
							|  |  |  |   def run(self):
 | 
					
						
							|  |  |  |     self.server = http.server.HTTPServer(("127.0.0.1", 5000), MapBoxInternetDisabledRequestHandler)
 | 
					
						
							|  |  |  |     self.server.serve_forever()
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def stop(self):
 | 
					
						
							|  |  |  |     self.server.shutdown()
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def disable_internet(self):
 | 
					
						
							|  |  |  |     MapBoxInternetDisabledRequestHandler.INTERNET_ACTIVE = False
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def enable_internet(self):
 | 
					
						
							|  |  |  |     MapBoxInternetDisabledRequestHandler.INTERNET_ACTIVE = True
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class TestMapRenderer(unittest.TestCase):
 | 
					
						
							|  |  |  |   server = MapBoxInternetDisabledServer()
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   @classmethod
 | 
					
						
							|  |  |  |   def setUpClass(cls):
 | 
					
						
							|  |  |  |     assert "MAPBOX_TOKEN" in os.environ
 | 
					
						
							|  |  |  |     cls.original_token = os.environ["MAPBOX_TOKEN"]
 | 
					
						
							|  |  |  |     cls.server.start()
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   @classmethod
 | 
					
						
							|  |  |  |   def tearDownClass(cls) -> None:
 | 
					
						
							|  |  |  |     cls.server.stop()
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def setUp(self):
 | 
					
						
							|  |  |  |     self.server.enable_internet()
 | 
					
						
							|  |  |  |     os.environ['MAPS_HOST'] = 'http://localhost:5000'
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     self.sm = messaging.SubMaster(['mapRenderState'])
 | 
					
						
							|  |  |  |     self.pm = messaging.PubMaster(['liveLocationKalman'])
 | 
					
						
							|  |  |  |     self.vipc = VisionIpcClient("navd", VisionStreamType.VISION_STREAM_MAP, True)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     if os.path.exists(CACHE_PATH):
 | 
					
						
							|  |  |  |       os.remove(CACHE_PATH)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def tearDown(self):
 | 
					
						
							|  |  |  |     managed_processes['mapsd'].stop()
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def _setup_test(self):
 | 
					
						
							|  |  |  |     # start + sync up
 | 
					
						
							|  |  |  |     managed_processes['mapsd'].start()
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     assert self.pm.wait_for_readers_to_update("liveLocationKalman", 10)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     assert VisionIpcClient.available_streams("navd", False) == {VisionStreamType.VISION_STREAM_MAP, }
 | 
					
						
							|  |  |  |     assert self.vipc.connect(False)
 | 
					
						
							|  |  |  |     self.vipc.recv()
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def _run_test(self, expect_valid, location=LOCATION1):
 | 
					
						
							|  |  |  |     starting_frame_id = None
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     self.location = location
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # run test
 | 
					
						
							|  |  |  |     prev_frame_id = -1
 | 
					
						
							|  |  |  |     for i in range(30*LLK_DECIMATION):
 | 
					
						
							|  |  |  |       frame_expected = (i+1) % LLK_DECIMATION == 0
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |       if self.sm.logMonoTime['mapRenderState'] == 0:
 | 
					
						
							|  |  |  |         prev_valid = False
 | 
					
						
							|  |  |  |         prev_frame_id = -1
 | 
					
						
							|  |  |  |       else:
 | 
					
						
							|  |  |  |         prev_valid = self.sm.valid['mapRenderState']
 | 
					
						
							|  |  |  |         prev_frame_id = self.sm['mapRenderState'].frameId
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |       if starting_frame_id is None:
 | 
					
						
							|  |  |  |         starting_frame_id = prev_frame_id
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |       llk = gen_llk(self.location)
 | 
					
						
							|  |  |  |       self.pm.send("liveLocationKalman", llk)
 | 
					
						
							|  |  |  |       self.pm.wait_for_readers_to_update("liveLocationKalman", 10)
 | 
					
						
							|  |  |  |       self.sm.update(1000 if frame_expected else 0)
 | 
					
						
							|  |  |  |       assert self.sm.updated['mapRenderState'] == frame_expected, "renderer running at wrong frequency"
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |       if not frame_expected:
 | 
					
						
							|  |  |  |         continue
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |       frames_since_test_start = self.sm['mapRenderState'].frameId - starting_frame_id
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |       # give a few frames to switch from valid to invalid, or vice versa
 | 
					
						
							|  |  |  |       invalid_and_not_previously_valid = (expect_valid and not self.sm.valid['mapRenderState'] and not prev_valid)
 | 
					
						
							|  |  |  |       valid_and_not_previously_invalid = (not expect_valid and self.sm.valid['mapRenderState'] and prev_valid)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |       if (invalid_and_not_previously_valid or valid_and_not_previously_invalid) and frames_since_test_start < 5:
 | 
					
						
							|  |  |  |         continue
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |       # check output
 | 
					
						
							|  |  |  |       assert self.sm.valid['mapRenderState'] == expect_valid
 | 
					
						
							|  |  |  |       assert self.sm['mapRenderState'].frameId == (prev_frame_id + 1)
 | 
					
						
							|  |  |  |       assert self.sm['mapRenderState'].locationMonoTime == llk.logMonoTime
 | 
					
						
							|  |  |  |       if not expect_valid:
 | 
					
						
							|  |  |  |         assert self.sm['mapRenderState'].renderTime == 0.
 | 
					
						
							|  |  |  |       else:
 | 
					
						
							|  |  |  |         assert 0. < self.sm['mapRenderState'].renderTime < 0.1
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |       # check vision ipc output
 | 
					
						
							|  |  |  |       assert self.vipc.recv() is not None
 | 
					
						
							|  |  |  |       assert self.vipc.valid == expect_valid
 | 
					
						
							|  |  |  |       assert self.vipc.timestamp_sof == llk.logMonoTime
 | 
					
						
							|  |  |  |       assert self.vipc.frame_id == self.sm['mapRenderState'].frameId
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def test_with_internet(self):
 | 
					
						
							|  |  |  |     self._setup_test()
 | 
					
						
							|  |  |  |     self._run_test(True)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def test_with_no_internet(self):
 | 
					
						
							|  |  |  |     self.server.disable_internet()
 | 
					
						
							|  |  |  |     self._setup_test()
 | 
					
						
							|  |  |  |     self._run_test(False)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def test_recover_from_no_internet(self):
 | 
					
						
							|  |  |  |     self._setup_test()
 | 
					
						
							|  |  |  |     self._run_test(True)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     self.server.disable_internet()
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # change locations to force mapsd to refetch
 | 
					
						
							|  |  |  |     self._run_test(False, LOCATION2)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     self.server.enable_internet()
 | 
					
						
							|  |  |  |     self._run_test(True, LOCATION2)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     self.location = LOCATION1
 | 
					
						
							|  |  |  |     self._run_test(True, LOCATION2)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | if __name__ == "__main__":
 | 
					
						
							|  |  |  |   unittest.main()
 |