|
|
|
@ -3,10 +3,11 @@ import os |
|
|
|
|
import unittest |
|
|
|
|
|
|
|
|
|
import cereal.messaging as messaging |
|
|
|
|
from cereal.visionipc import VisionIpcClient, VisionStreamType |
|
|
|
|
from selfdrive.manager.process_config import managed_processes |
|
|
|
|
|
|
|
|
|
LLK_DECIMATION = 10 |
|
|
|
|
|
|
|
|
|
CACHE_PATH = "/data/mbgl-cache-navd.db" |
|
|
|
|
|
|
|
|
|
def gen_llk(): |
|
|
|
|
msg = messaging.new_message('liveLocationKalman') |
|
|
|
@ -24,6 +25,10 @@ class TestMapRenderer(unittest.TestCase): |
|
|
|
|
def setUp(self): |
|
|
|
|
self.sm = messaging.SubMaster(['mapRenderState']) |
|
|
|
|
self.pm = messaging.PubMaster(['liveLocationKalman']) |
|
|
|
|
self.vipc = VisionIpcClient("navd", VisionStreamType.VISION_STREAM_MAP, True) |
|
|
|
|
|
|
|
|
|
if os.path.exists(CACHE_PATH): |
|
|
|
|
os.remove(CACHE_PATH) |
|
|
|
|
|
|
|
|
|
def tearDown(self): |
|
|
|
|
managed_processes['mapsd'].stop() |
|
|
|
@ -37,6 +42,11 @@ class TestMapRenderer(unittest.TestCase): |
|
|
|
|
if self.sm.updated['mapRenderState']: |
|
|
|
|
break |
|
|
|
|
assert self.sm.updated['mapRenderState'], "renderer didn't start" |
|
|
|
|
assert VisionIpcClient.available_streams("navd", False) == {VisionStreamType.VISION_STREAM_MAP, } |
|
|
|
|
|
|
|
|
|
# connect to vipc |
|
|
|
|
assert self.vipc.connect(False) |
|
|
|
|
self.vipc.recv() |
|
|
|
|
|
|
|
|
|
# run test |
|
|
|
|
for i in range(20*LLK_DECIMATION): |
|
|
|
@ -53,13 +63,29 @@ class TestMapRenderer(unittest.TestCase): |
|
|
|
|
|
|
|
|
|
# check output |
|
|
|
|
assert self.sm.valid['mapRenderState'] == valid |
|
|
|
|
assert 0. < self.sm['mapRenderState'].renderTime < 0.1 |
|
|
|
|
assert self.sm['mapRenderState'].frameId == (prev_frame_id + 1) |
|
|
|
|
assert self.sm['mapRenderState'].locationMonoTime == llk.logMonoTime |
|
|
|
|
if not valid: |
|
|
|
|
assert self.sm['mapRenderState'].renderTime == 0. |
|
|
|
|
else: |
|
|
|
|
assert 0. < self.sm['mapRenderState'].renderTime < 0.1 |
|
|
|
|
|
|
|
|
|
# check vision ipc output |
|
|
|
|
assert self.vipc.recv() is not None |
|
|
|
|
assert self.vipc.valid == valid |
|
|
|
|
assert self.vipc.timestamp_sof == llk.logMonoTime |
|
|
|
|
assert self.vipc.frame_id == self.sm['mapRenderState'].frameId |
|
|
|
|
|
|
|
|
|
def test_with_internet(self): |
|
|
|
|
self._run_test(True) |
|
|
|
|
|
|
|
|
|
def test_with_no_internet(self): |
|
|
|
|
token = os.environ['MAPBOX_TOKEN'] |
|
|
|
|
try: |
|
|
|
|
os.environ['MAPBOX_TOKEN'] = 'invalid_token' |
|
|
|
|
self._run_test(False) |
|
|
|
|
finally: |
|
|
|
|
os.environ['MAPBOX_TOKEN'] = token |
|
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
|
unittest.main() |
|
|
|
|