import  ctypes 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  functools 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  multiprocessing 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  numpy  as  np 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  time 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  multiprocessing  import  Pipe ,  Array 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  openpilot . tools . sim . bridge . metadrive . metadrive_process  import  ( metadrive_process ,  metadrive_simulation_state , 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                                                                    metadrive_vehicle_state ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  openpilot . tools . sim . lib . common  import  SimulatorState ,  World 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  openpilot . tools . sim . lib . camerad  import  W ,  H 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								class  MetaDriveWorld ( World ) : 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  __init__ ( self ,  status_q ,  config ,  dual_camera  =  False ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    super ( ) . __init__ ( dual_camera ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . status_q  =  status_q 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . camera_array  =  Array ( ctypes . c_uint8 ,  W * H * 3 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . road_image  =  np . frombuffer ( self . camera_array . get_obj ( ) ,  dtype = np . uint8 ) . reshape ( ( H ,  W ,  3 ) ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . wide_camera_array  =  None 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  dual_camera : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . wide_camera_array  =  Array ( ctypes . c_uint8 ,  W * H * 3 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . wide_road_image  =  np . frombuffer ( self . wide_camera_array . get_obj ( ) ,  dtype = np . uint8 ) . reshape ( ( H ,  W ,  3 ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . controls_send ,  self . controls_recv  =  Pipe ( ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . simulation_state_send ,  self . simulation_state_recv  =  Pipe ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . vehicle_state_send ,  self . vehicle_state_recv  =  Pipe ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . exit_event  =  multiprocessing . Event ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . metadrive_process  =  multiprocessing . Process ( name = " metadrive process " ,  target = 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                              functools . partial ( metadrive_process ,  dual_camera ,  config , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								                                                self . camera_array ,  self . wide_camera_array ,  self . image_lock , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								                                                self . controls_recv ,  self . simulation_state_send , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                                                self . vehicle_state_send ,  self . exit_event ) ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . metadrive_process . start ( ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . status_q . put ( { " status " :  " starting " } ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    print ( " ---------------------------------------------------------- " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    print ( " ---- Spawning Metadrive world, this might take awhile ---- " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    print ( " ---------------------------------------------------------- " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . vehicle_state_recv . recv ( )  # wait for a state message to ensure metadrive is launched 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . status_q . put ( { " status " :  " started " } ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . steer_ratio  =  15 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . vc  =  [ 0.0 , 0.0 ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . reset_time  =  0 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . should_reset  =  False 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  apply_controls ( self ,  steer_angle ,  throttle_out ,  brake_out ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    if  ( time . monotonic ( )  -  self . reset_time )  >  2 : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      self . vc [ 0 ]  =  steer_angle 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  throttle_out : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        self . vc [ 1 ]  =  throttle_out 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      else : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        self . vc [ 1 ]  =  - brake_out 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    else : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . vc [ 0 ]  =  0 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . vc [ 1 ]  =  0 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . controls_send . send ( [ * self . vc ,  self . should_reset ] ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . should_reset  =  False 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  read_state ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    while  self . simulation_state_recv . poll ( 0 ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      md_state :  metadrive_simulation_state  =  self . simulation_state_recv . recv ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  md_state . done : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        self . status_q . put ( { 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          " status " :  " terminating " , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          " reason " :  " done " , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          " done_info " :  md_state . done_info 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        } ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        self . exit_event . set ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  read_sensors ( self ,  state :  SimulatorState ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    while  self . vehicle_state_recv . poll ( 0 ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      md_vehicle :  metadrive_vehicle_state  =  self . vehicle_state_recv . recv ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      state . velocity  =  md_vehicle . velocity 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      state . bearing  =  md_vehicle . bearing 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      state . steering_angle  =  md_vehicle . steering_angle 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      state . gps . from_xy ( md_vehicle . position ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      state . valid  =  True 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  read_cameras ( self ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    pass 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  tick ( self ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    pass 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  reset ( self ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . should_reset  =  True 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  close ( self ,  reason :  str ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . status_q . put ( { 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      " status " :  " terminating " , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      " reason " :  reason , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    } ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . exit_event . set ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . metadrive_process . join ( )