#!/usr/bin/env python3
import os
import unittest
import requests
import threading
import http . server
import cereal . messaging as messaging
from typing import Any
from cereal . visionipc import VisionIpcClient , VisionStreamType
from openpilot . selfdrive . manager . process_config import managed_processes
LLK_DECIMATION = 10
CACHE_PATH = " /data/mbgl-cache-navd.db "
LOCATION1 = ( 32.7174 , - 117.16277 )
LOCATION2 = ( 32.7558 , - 117.2037 )
def gen_llk ( location = LOCATION1 ) :
msg = messaging . new_message ( ' liveLocationKalman ' )
msg . liveLocationKalman . positionGeodetic = { ' value ' : [ * location , 0 ] , ' std ' : [ 0. , 0. , 0. ] , ' valid ' : True }
msg . liveLocationKalman . calibratedOrientationNED = { ' value ' : [ 0. , 0. , 0. ] , ' std ' : [ 0. , 0. , 0. ] , ' valid ' : True }
msg . liveLocationKalman . status = ' valid '
return msg
class MapBoxInternetDisabledRequestHandler ( http . server . BaseHTTPRequestHandler ) :
INTERNET_ACTIVE = True
def setup ( self ) :
if self . INTERNET_ACTIVE :
super ( ) . setup ( )
def handle ( self ) :
if self . INTERNET_ACTIVE :
super ( ) . handle ( )
def finish ( self ) :
if self . INTERNET_ACTIVE :
super ( ) . finish ( )
def do_GET ( self ) :
url = f ' https://api.mapbox.com { self . path } '
headers = dict ( self . headers )
headers [ " Host " ] = " api.mapbox.com "
r = requests . get ( url , headers = headers , timeout = 5 )
self . send_response ( r . status_code )
self . end_headers ( )
self . wfile . write ( r . content )
def log_message ( self , * args : Any ) - > None :
return
def log_error ( self , * args : Any ) - > None :
return
class MapBoxInternetDisabledServer ( threading . Thread ) :
def run ( self ) :
self . server = http . server . HTTPServer ( ( " 127.0.0.1 " , 5000 ) , MapBoxInternetDisabledRequestHandler )
self . server . serve_forever ( )
def stop ( self ) :
self . server . shutdown ( )
def disable_internet ( self ) :
MapBoxInternetDisabledRequestHandler . INTERNET_ACTIVE = False
def enable_internet ( self ) :
MapBoxInternetDisabledRequestHandler . INTERNET_ACTIVE = True
class TestMapRenderer ( unittest . TestCase ) :
server = MapBoxInternetDisabledServer ( )
@classmethod
def setUpClass ( cls ) :
assert " MAPBOX_TOKEN " in os . environ
cls . original_token = os . environ [ " MAPBOX_TOKEN " ]
cls . server . start ( )
@classmethod
def tearDownClass ( cls ) - > None :
cls . server . stop ( )
def setUp ( self ) :
self . server . enable_internet ( )
os . environ [ ' MAPS_HOST ' ] = ' http://localhost:5000 '
self . sm = messaging . SubMaster ( [ ' mapRenderState ' ] )
self . pm = messaging . PubMaster ( [ ' liveLocationKalman ' ] )
self . vipc = VisionIpcClient ( " navd " , VisionStreamType . VISION_STREAM_MAP , True )
if os . path . exists ( CACHE_PATH ) :
os . remove ( CACHE_PATH )
def tearDown ( self ) :
managed_processes [ ' mapsd ' ] . stop ( )
def _setup_test ( self ) :
# start + sync up
managed_processes [ ' mapsd ' ] . start ( )
assert self . pm . wait_for_readers_to_update ( " liveLocationKalman " , 10 )
assert VisionIpcClient . available_streams ( " navd " , False ) == { VisionStreamType . VISION_STREAM_MAP , }
assert self . vipc . connect ( False )
self . vipc . recv ( )
def _run_test ( self , expect_valid , location = LOCATION1 ) :
starting_frame_id = None
self . location = location
# run test
prev_frame_id = - 1
for i in range ( 30 * LLK_DECIMATION ) :
frame_expected = ( i + 1 ) % LLK_DECIMATION == 0
if self . sm . logMonoTime [ ' mapRenderState ' ] == 0 :
prev_valid = False
prev_frame_id = - 1
else :
prev_valid = self . sm . valid [ ' mapRenderState ' ]
prev_frame_id = self . sm [ ' mapRenderState ' ] . frameId
if starting_frame_id is None :
starting_frame_id = prev_frame_id
llk = gen_llk ( self . location )
self . pm . send ( " liveLocationKalman " , llk )
self . pm . wait_for_readers_to_update ( " liveLocationKalman " , 10 )
self . sm . update ( 1000 if frame_expected else 0 )
assert self . sm . updated [ ' mapRenderState ' ] == frame_expected , " renderer running at wrong frequency "
if not frame_expected :
continue
frames_since_test_start = self . sm [ ' mapRenderState ' ] . frameId - starting_frame_id
# give a few frames to switch from valid to invalid, or vice versa
invalid_and_not_previously_valid = ( expect_valid and not self . sm . valid [ ' mapRenderState ' ] and not prev_valid )
valid_and_not_previously_invalid = ( not expect_valid and self . sm . valid [ ' mapRenderState ' ] and prev_valid )
if ( invalid_and_not_previously_valid or valid_and_not_previously_invalid ) and frames_since_test_start < 5 :
continue
# check output
assert self . sm . valid [ ' mapRenderState ' ] == expect_valid
assert self . sm [ ' mapRenderState ' ] . frameId == ( prev_frame_id + 1 )
assert self . sm [ ' mapRenderState ' ] . locationMonoTime == llk . logMonoTime
if not expect_valid :
assert self . sm [ ' mapRenderState ' ] . renderTime == 0.
else :
assert 0. < self . sm [ ' mapRenderState ' ] . renderTime < 0.1
# check vision ipc output
assert self . vipc . recv ( ) is not None
assert self . vipc . valid == expect_valid
assert self . vipc . timestamp_sof == llk . logMonoTime
assert self . vipc . frame_id == self . sm [ ' mapRenderState ' ] . frameId
def test_with_internet ( self ) :
self . _setup_test ( )
self . _run_test ( True )
def test_with_no_internet ( self ) :
self . server . disable_internet ( )
self . _setup_test ( )
self . _run_test ( False )
def test_recover_from_no_internet ( self ) :
self . _setup_test ( )
self . _run_test ( True )
self . server . disable_internet ( )
# change locations to force mapsd to refetch
self . _run_test ( False , LOCATION2 )
self . server . enable_internet ( )
self . _run_test ( True , LOCATION2 )
self . location = LOCATION1
self . _run_test ( True , LOCATION2 )
if __name__ == " __main__ " :
unittest . main ( )