#!/usr/bin/env python3 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  os 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  unittest 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								import  requests 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  threading 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  http . server 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  cereal . messaging  as  messaging 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  typing  import  Any 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  cereal . visionipc  import  VisionIpcClient ,  VisionStreamType 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  openpilot . selfdrive . manager . process_config  import  managed_processes 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								LLK_DECIMATION  =  10 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								CACHE_PATH  =  " /data/mbgl-cache-navd.db " 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								LOCATION1  =  ( 32.7174 ,  - 117.16277 ) 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								LOCATION2  =  ( 32.7558 ,  - 117.2037 ) 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  gen_llk ( location = LOCATION1 ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  msg  =  messaging . new_message ( ' liveLocationKalman ' ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  msg . liveLocationKalman . positionGeodetic  =  { ' value ' :  [ * location ,  0 ] ,  ' std ' :  [ 0. ,  0. ,  0. ] ,  ' valid ' :  True } 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  msg . liveLocationKalman . calibratedOrientationNED  =  { ' value ' :  [ 0. ,  0. ,  0. ] ,  ' std ' :  [ 0. ,  0. ,  0. ] ,  ' valid ' :  True } 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  msg . liveLocationKalman . status  =  ' valid ' 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  msg 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								class  MapBoxInternetDisabledRequestHandler ( http . server . BaseHTTPRequestHandler ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  INTERNET_ACTIVE  =  True 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  setup ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  self . INTERNET_ACTIVE : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      super ( ) . setup ( ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  handle ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  self . INTERNET_ACTIVE : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      super ( ) . handle ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  finish ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  self . INTERNET_ACTIVE : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      super ( ) . finish ( ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  do_GET ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    url  =  f ' https://api.mapbox.com { self . path } ' 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    headers  =  dict ( self . headers ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    headers [ " Host " ]  =  " api.mapbox.com " 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    r  =  requests . get ( url ,  headers = headers ,  timeout = 5 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . send_response ( r . status_code ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . end_headers ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . wfile . write ( r . content ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  log_message ( self ,  * args :  Any )  - >  None : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  log_error ( self ,  * args :  Any )  - >  None : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								class  MapBoxInternetDisabledServer ( threading . Thread ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  run ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . server  =  http . server . HTTPServer ( ( " 127.0.0.1 " ,  5000 ) ,  MapBoxInternetDisabledRequestHandler ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . server . serve_forever ( ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  stop ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . server . shutdown ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  disable_internet ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    MapBoxInternetDisabledRequestHandler . INTERNET_ACTIVE  =  False 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  enable_internet ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    MapBoxInternetDisabledRequestHandler . INTERNET_ACTIVE  =  True 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								class  TestMapRenderer ( unittest . TestCase ) : 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  server  =  MapBoxInternetDisabledServer ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  @classmethod 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  setUpClass ( cls ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    assert  " MAPBOX_TOKEN "  in  os . environ 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    cls . original_token  =  os . environ [ " MAPBOX_TOKEN " ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    cls . server . start ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  @classmethod 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  tearDownClass ( cls )  - >  None : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    cls . server . stop ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  setUp ( self ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . server . enable_internet ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    os . environ [ ' MAPS_HOST ' ]  =  ' http://localhost:5000 ' 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . sm  =  messaging . SubMaster ( [ ' mapRenderState ' ] ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . pm  =  messaging . PubMaster ( [ ' liveLocationKalman ' ] ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . vipc  =  VisionIpcClient ( " navd " ,  VisionStreamType . VISION_STREAM_MAP ,  True ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  os . path . exists ( CACHE_PATH ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      os . remove ( CACHE_PATH ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  tearDown ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    managed_processes [ ' mapsd ' ] . stop ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  _setup_test ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # start + sync up 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    managed_processes [ ' mapsd ' ] . start ( ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    assert  self . pm . wait_for_readers_to_update ( " liveLocationKalman " ,  10 ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    assert  VisionIpcClient . available_streams ( " navd " ,  False )  ==  { VisionStreamType . VISION_STREAM_MAP ,  } 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    assert  self . vipc . connect ( False ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . vipc . recv ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  _run_test ( self ,  expect_valid ,  location = LOCATION1 ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    starting_frame_id  =  None 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . location  =  location 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # run test 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    prev_frame_id  =  - 1 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    for  i  in  range ( 30 * LLK_DECIMATION ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      frame_expected  =  ( i + 1 )  %  LLK_DECIMATION  ==  0 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  self . sm . logMonoTime [ ' mapRenderState ' ]  ==  0 : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        prev_valid  =  False 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        prev_frame_id  =  - 1 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      else : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        prev_valid  =  self . sm . valid [ ' mapRenderState ' ] 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        prev_frame_id  =  self . sm [ ' mapRenderState ' ] . frameId 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      if  starting_frame_id  is  None : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        starting_frame_id  =  prev_frame_id 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      llk  =  gen_llk ( self . location ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . pm . send ( " liveLocationKalman " ,  llk ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      self . pm . wait_for_readers_to_update ( " liveLocationKalman " ,  10 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . sm . update ( 1000  if  frame_expected  else  0 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      assert  self . sm . updated [ ' mapRenderState ' ]  ==  frame_expected ,  " renderer running at wrong frequency " 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  not  frame_expected : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        continue 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      frames_since_test_start  =  self . sm [ ' mapRenderState ' ] . frameId  -  starting_frame_id 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      # give a few frames to switch from valid to invalid, or vice versa 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      invalid_and_not_previously_valid  =  ( expect_valid  and  not  self . sm . valid [ ' mapRenderState ' ]  and  not  prev_valid ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      valid_and_not_previously_invalid  =  ( not  expect_valid  and  self . sm . valid [ ' mapRenderState ' ]  and  prev_valid ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  ( invalid_and_not_previously_valid  or  valid_and_not_previously_invalid )  and  frames_since_test_start  <  5 : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        continue 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      # check output 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      assert  self . sm . valid [ ' mapRenderState ' ]  ==  expect_valid 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      assert  self . sm [ ' mapRenderState ' ] . frameId  ==  ( prev_frame_id  +  1 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      assert  self . sm [ ' mapRenderState ' ] . locationMonoTime  ==  llk . logMonoTime 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      if  not  expect_valid : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        assert  self . sm [ ' mapRenderState ' ] . renderTime  ==  0. 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      else : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        assert  0.  <  self . sm [ ' mapRenderState ' ] . renderTime  <  0.1 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      # check vision ipc output 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      assert  self . vipc . recv ( )  is  not  None 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      assert  self . vipc . valid  ==  expect_valid 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      assert  self . vipc . timestamp_sof  ==  llk . logMonoTime 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      assert  self . vipc . frame_id  ==  self . sm [ ' mapRenderState ' ] . frameId 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  test_with_internet ( self ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . _setup_test ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . _run_test ( True ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  test_with_no_internet ( self ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . server . disable_internet ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . _setup_test ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . _run_test ( False ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  test_recover_from_no_internet ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . _setup_test ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . _run_test ( True ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . server . disable_internet ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # change locations to force mapsd to refetch 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . _run_test ( False ,  LOCATION2 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . server . enable_internet ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . _run_test ( True ,  LOCATION2 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . location  =  LOCATION1 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . _run_test ( True ,  LOCATION2 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								if  __name__  ==  " __main__ " : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  unittest . main ( )