|
|
|
@ -1,28 +1,95 @@ |
|
|
|
|
#!/usr/bin/env python3 |
|
|
|
|
import os |
|
|
|
|
import unittest |
|
|
|
|
|
|
|
|
|
import requests |
|
|
|
|
import threading |
|
|
|
|
import http.server |
|
|
|
|
import cereal.messaging as messaging |
|
|
|
|
|
|
|
|
|
from typing import Any |
|
|
|
|
from cereal.visionipc import VisionIpcClient, VisionStreamType |
|
|
|
|
from selfdrive.manager.process_config import managed_processes |
|
|
|
|
|
|
|
|
|
LLK_DECIMATION = 10 |
|
|
|
|
CACHE_PATH = "/data/mbgl-cache-navd.db" |
|
|
|
|
|
|
|
|
|
def gen_llk(): |
|
|
|
|
LOCATION1 = (32.7174, -117.16277) |
|
|
|
|
LOCATION2 = (32.7558, -117.2037) |
|
|
|
|
|
|
|
|
|
def gen_llk(location=LOCATION1): |
|
|
|
|
msg = messaging.new_message('liveLocationKalman') |
|
|
|
|
msg.liveLocationKalman.positionGeodetic = {'value': [32.7174, -117.16277, 0], 'std': [0., 0., 0.], 'valid': True} |
|
|
|
|
msg.liveLocationKalman.positionGeodetic = {'value': [*location, 0], 'std': [0., 0., 0.], 'valid': True} |
|
|
|
|
msg.liveLocationKalman.calibratedOrientationNED = {'value': [0., 0., 0.], 'std': [0., 0., 0.], 'valid': True} |
|
|
|
|
msg.liveLocationKalman.status = 'valid' |
|
|
|
|
return msg |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class MapBoxInternetDisabledRequestHandler(http.server.BaseHTTPRequestHandler): |
|
|
|
|
INTERNET_ACTIVE = True |
|
|
|
|
|
|
|
|
|
def setup(self): |
|
|
|
|
if self.INTERNET_ACTIVE: |
|
|
|
|
super().setup() |
|
|
|
|
|
|
|
|
|
def handle(self): |
|
|
|
|
if self.INTERNET_ACTIVE: |
|
|
|
|
super().handle() |
|
|
|
|
|
|
|
|
|
def finish(self): |
|
|
|
|
if self.INTERNET_ACTIVE: |
|
|
|
|
super().finish() |
|
|
|
|
|
|
|
|
|
def do_GET(self): |
|
|
|
|
url = f'https://api.mapbox.com{self.path}' |
|
|
|
|
|
|
|
|
|
headers = dict(self.headers) |
|
|
|
|
headers["Host"] = "api.mapbox.com" |
|
|
|
|
|
|
|
|
|
r = requests.get(url, headers=headers, timeout=5) |
|
|
|
|
|
|
|
|
|
self.send_response(r.status_code) |
|
|
|
|
self.end_headers() |
|
|
|
|
self.wfile.write(r.content) |
|
|
|
|
|
|
|
|
|
def log_message(self, *args: Any) -> None: |
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
def log_error(self, *args: Any) -> None: |
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class MapBoxInternetDisabledServer(threading.Thread): |
|
|
|
|
def run(self): |
|
|
|
|
self.server = http.server.HTTPServer(("127.0.0.1", 5000), MapBoxInternetDisabledRequestHandler) |
|
|
|
|
self.server.serve_forever() |
|
|
|
|
|
|
|
|
|
def stop(self): |
|
|
|
|
self.server.shutdown() |
|
|
|
|
|
|
|
|
|
def disable_internet(self): |
|
|
|
|
MapBoxInternetDisabledRequestHandler.INTERNET_ACTIVE = False |
|
|
|
|
|
|
|
|
|
def enable_internet(self): |
|
|
|
|
MapBoxInternetDisabledRequestHandler.INTERNET_ACTIVE = True |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestMapRenderer(unittest.TestCase): |
|
|
|
|
server = MapBoxInternetDisabledServer() |
|
|
|
|
|
|
|
|
|
@classmethod |
|
|
|
|
def setUpClass(cls): |
|
|
|
|
assert "MAPBOX_TOKEN" in os.environ |
|
|
|
|
cls.original_token = os.environ["MAPBOX_TOKEN"] |
|
|
|
|
cls.server.start() |
|
|
|
|
|
|
|
|
|
@classmethod |
|
|
|
|
def tearDownClass(cls) -> None: |
|
|
|
|
cls.server.stop() |
|
|
|
|
|
|
|
|
|
def setUp(self): |
|
|
|
|
self.server.enable_internet() |
|
|
|
|
os.environ['MAPS_HOST'] = 'http://localhost:5000' |
|
|
|
|
|
|
|
|
|
self.sm = messaging.SubMaster(['mapRenderState']) |
|
|
|
|
self.pm = messaging.PubMaster(['liveLocationKalman']) |
|
|
|
|
self.vipc = VisionIpcClient("navd", VisionStreamType.VISION_STREAM_MAP, True) |
|
|
|
@ -33,15 +100,22 @@ class TestMapRenderer(unittest.TestCase): |
|
|
|
|
def tearDown(self): |
|
|
|
|
managed_processes['mapsd'].stop() |
|
|
|
|
|
|
|
|
|
def _run_test(self, expect_valid): |
|
|
|
|
def _setup_test(self): |
|
|
|
|
# start + sync up |
|
|
|
|
managed_processes['mapsd'].start() |
|
|
|
|
|
|
|
|
|
assert self.pm.wait_for_readers_to_update("liveLocationKalman", 10) |
|
|
|
|
|
|
|
|
|
assert VisionIpcClient.available_streams("navd", False) == {VisionStreamType.VISION_STREAM_MAP, } |
|
|
|
|
assert self.vipc.connect(False) |
|
|
|
|
self.vipc.recv() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _run_test(self, expect_valid, location=LOCATION1): |
|
|
|
|
starting_frame_id = None |
|
|
|
|
|
|
|
|
|
self.location = location |
|
|
|
|
|
|
|
|
|
# run test |
|
|
|
|
prev_frame_id = -1 |
|
|
|
|
for i in range(30*LLK_DECIMATION): |
|
|
|
@ -51,21 +125,28 @@ class TestMapRenderer(unittest.TestCase): |
|
|
|
|
prev_valid = False |
|
|
|
|
prev_frame_id = -1 |
|
|
|
|
else: |
|
|
|
|
prev_frame_id = self.sm['mapRenderState'].frameId |
|
|
|
|
prev_valid = self.sm.valid['mapRenderState'] |
|
|
|
|
prev_frame_id = self.sm['mapRenderState'].frameId |
|
|
|
|
|
|
|
|
|
if starting_frame_id is None: |
|
|
|
|
starting_frame_id = prev_frame_id |
|
|
|
|
|
|
|
|
|
llk = gen_llk() |
|
|
|
|
llk = gen_llk(self.location) |
|
|
|
|
self.pm.send("liveLocationKalman", llk) |
|
|
|
|
self.pm.wait_for_readers_to_update("liveLocationKalman", 10) |
|
|
|
|
self.sm.update(1000 if frame_expected else 0) |
|
|
|
|
assert self.sm.updated['mapRenderState'] == frame_expected, "renderer running at wrong frequency" |
|
|
|
|
|
|
|
|
|
if not frame_expected: |
|
|
|
|
|
|
|
|
|
continue |
|
|
|
|
|
|
|
|
|
# give a few frames to go valid |
|
|
|
|
if expect_valid and not self.sm.valid['mapRenderState'] and not prev_valid and self.sm['mapRenderState'].frameId < 5: |
|
|
|
|
frames_since_test_start = self.sm['mapRenderState'].frameId - starting_frame_id |
|
|
|
|
|
|
|
|
|
# give a few frames to switch from valid to invalid, or vice versa |
|
|
|
|
invalid_and_not_previously_valid = (expect_valid and not self.sm.valid['mapRenderState'] and not prev_valid) |
|
|
|
|
valid_and_not_previously_invalid = (not expect_valid and self.sm.valid['mapRenderState'] and prev_valid) |
|
|
|
|
|
|
|
|
|
if (invalid_and_not_previously_valid or valid_and_not_previously_invalid) and frames_since_test_start < 5: |
|
|
|
|
continue |
|
|
|
|
|
|
|
|
|
# check output |
|
|
|
@ -84,15 +165,28 @@ class TestMapRenderer(unittest.TestCase): |
|
|
|
|
assert self.vipc.frame_id == self.sm['mapRenderState'].frameId |
|
|
|
|
|
|
|
|
|
def test_with_internet(self): |
|
|
|
|
self._setup_test() |
|
|
|
|
self._run_test(True) |
|
|
|
|
|
|
|
|
|
def test_with_no_internet(self): |
|
|
|
|
token = os.environ['MAPBOX_TOKEN'] |
|
|
|
|
try: |
|
|
|
|
os.environ['MAPBOX_TOKEN'] = 'invalid_token' |
|
|
|
|
self._run_test(False) |
|
|
|
|
finally: |
|
|
|
|
os.environ['MAPBOX_TOKEN'] = token |
|
|
|
|
self.server.disable_internet() |
|
|
|
|
self._setup_test() |
|
|
|
|
self._run_test(False) |
|
|
|
|
|
|
|
|
|
def test_recover_from_no_internet(self): |
|
|
|
|
self._setup_test() |
|
|
|
|
self._run_test(True) |
|
|
|
|
|
|
|
|
|
self.server.disable_internet() |
|
|
|
|
|
|
|
|
|
# change locations to force mapsd to refetch |
|
|
|
|
self._run_test(False, LOCATION2) |
|
|
|
|
|
|
|
|
|
self.server.enable_internet() |
|
|
|
|
self._run_test(True, LOCATION2) |
|
|
|
|
|
|
|
|
|
self.location = LOCATION1 |
|
|
|
|
self._run_test(True, LOCATION2) |
|
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
|
unittest.main() |
|
|
|
|