import  math 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  multiprocessing  import  Queue 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  metadrive . component . sensors . base_camera  import  _cuda_enable 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  metadrive . component . map . pg_map  import  MapGenerateMethod 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  openpilot . tools . sim . bridge . common  import  SimulatorBridge 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  openpilot . tools . sim . bridge . metadrive . metadrive_common  import  RGBCameraRoad ,  RGBCameraWide 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  openpilot . tools . sim . bridge . metadrive . metadrive_world  import  MetaDriveWorld 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  openpilot . tools . sim . lib . camerad  import  W ,  H 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  straight_block ( length ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  { 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    " id " :  " S " , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    " pre_block_socket_index " :  0 , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    " length " :  length 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  } 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  curve_block ( length ,  angle = 45 ,  direction = 0 ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  { 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    " id " :  " C " , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    " pre_block_socket_index " :  0 , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    " length " :  length , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    " radius " :  length , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    " angle " :  angle , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    " dir " :  direction 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  } 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								def  create_map ( track_size = 60 ) : 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  curve_len  =  track_size  *  2 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  return  dict ( 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    type = MapGenerateMethod . PG_MAP_FILE , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    lane_num = 2 , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    lane_width = 4.5 , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    config = [ 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      None , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      straight_block ( track_size ) , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      curve_block ( curve_len ,  90 ) , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      straight_block ( track_size ) , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      curve_block ( curve_len ,  90 ) , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      straight_block ( track_size ) , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      curve_block ( curve_len ,  90 ) , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      straight_block ( track_size ) , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      curve_block ( curve_len ,  90 ) , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								class  MetaDriveBridge ( SimulatorBridge ) : 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  TICKS_PER_FRAME  =  5 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  __init__ ( self ,  dual_camera ,  high_quality ,  test_duration = math . inf ,  test_run = False ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    super ( ) . __init__ ( dual_camera ,  high_quality ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . should_render  =  False 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . test_run  =  test_run 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . test_duration  =  test_duration  if  self . test_run  else  math . inf 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  spawn_world ( self ,  queue :  Queue ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    sensors  =  { 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      " rgb_road " :  ( RGBCameraRoad ,  W ,  H ,  ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    } 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  self . dual_camera : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      sensors [ " rgb_wide " ]  =  ( RGBCameraWide ,  W ,  H ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    config  =  dict ( 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      use_render = self . should_render , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      vehicle_config = dict ( 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        enable_reverse = False , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        render_vehicle = False , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        image_source = " rgb_road " , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      sensors = sensors , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      image_on_cuda = _cuda_enable , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      image_observation = True , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      interface_panel = [ ] , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      out_of_route_done = False , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      on_continuous_line_done = False , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      crash_vehicle_done = False , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      crash_object_done = False , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      arrive_dest_done = False , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      traffic_density = 0.0 ,  # traffic is incredibly expensive 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      map_config = create_map ( ) , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      decision_repeat = 1 , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      physics_world_step_size = self . TICKS_PER_FRAME / 100 , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      preload_models = False , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      show_logo = False , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      anisotropic_filtering = False 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    return  MetaDriveWorld ( queue ,  config ,  self . test_duration ,  self . test_run ,  self . dual_camera )