|  |  | @ -22,7 +22,7 @@ C3_HPR = Vec3(0, 0,0) | 
			
		
	
		
		
			
				
					
					|  |  |  | metadrive_simulation_state = namedtuple("metadrive_simulation_state", ["running", "done", "done_info"]) |  |  |  | metadrive_simulation_state = namedtuple("metadrive_simulation_state", ["running", "done", "done_info"]) | 
			
		
	
		
		
			
				
					
					|  |  |  | metadrive_vehicle_state = namedtuple("metadrive_vehicle_state", ["velocity", "position", "bearing", "steering_angle"]) |  |  |  | metadrive_vehicle_state = namedtuple("metadrive_vehicle_state", ["velocity", "position", "bearing", "steering_angle"]) | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  | def apply_metadrive_patches(): |  |  |  | def apply_metadrive_patches(arrive_dest_done=True): | 
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					|  |  |  |   # By default, metadrive won't try to use cuda images unless it's used as a sensor for vehicles, so patch that in |  |  |  |   # By default, metadrive won't try to use cuda images unless it's used as a sensor for vehicles, so patch that in | 
			
		
	
		
		
			
				
					
					|  |  |  |   def add_image_sensor_patched(self, name: str, cls, args): |  |  |  |   def add_image_sensor_patched(self, name: str, cls, args): | 
			
		
	
		
		
			
				
					
					|  |  |  |     if self.global_config["image_on_cuda"]:# and name == self.global_config["vehicle_config"]["image_source"]: |  |  |  |     if self.global_config["image_on_cuda"]:# and name == self.global_config["vehicle_config"]["image_source"]: | 
			
		
	
	
		
		
			
				
					|  |  | @ -44,12 +44,14 @@ def apply_metadrive_patches(): | 
			
		
	
		
		
			
				
					
					|  |  |  |   def arrive_destination_patch(self, *args, **kwargs): |  |  |  |   def arrive_destination_patch(self, *args, **kwargs): | 
			
		
	
		
		
			
				
					
					|  |  |  |     return False |  |  |  |     return False | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |   MetaDriveEnv._is_arrive_destination = arrive_destination_patch |  |  |  |   if not arrive_dest_done: | 
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |     MetaDriveEnv._is_arrive_destination = arrive_destination_patch | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  | def metadrive_process(dual_camera: bool, config: dict, camera_array, wide_camera_array, image_lock, |  |  |  | def metadrive_process(dual_camera: bool, config: dict, camera_array, wide_camera_array, image_lock, | 
			
		
	
		
		
			
				
					
					|  |  |  |                       controls_recv: Connection, simulation_state_send: Connection, vehicle_state_send: Connection, |  |  |  |                       controls_recv: Connection, simulation_state_send: Connection, vehicle_state_send: Connection, | 
			
		
	
		
		
			
				
					
					|  |  |  |                       exit_event): |  |  |  |                       exit_event): | 
			
		
	
		
		
			
				
					
					|  |  |  |   apply_metadrive_patches() |  |  |  |   arrive_dest_done = config.pop("arrive_dest_done", True) | 
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |   apply_metadrive_patches(arrive_dest_done) | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |   road_image = np.frombuffer(camera_array.get_obj(), dtype=np.uint8).reshape((H, W, 3)) |  |  |  |   road_image = np.frombuffer(camera_array.get_obj(), dtype=np.uint8).reshape((H, W, 3)) | 
			
		
	
		
		
			
				
					
					|  |  |  |   if dual_camera: |  |  |  |   if dual_camera: | 
			
		
	
	
		
		
			
				
					|  |  | 
 |