|  |  |  | import math
 | 
					
						
							|  |  |  | import time
 | 
					
						
							|  |  |  | import numpy as np
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | from collections import namedtuple
 | 
					
						
							|  |  |  | from panda3d.core import Vec3
 | 
					
						
							|  |  |  | from multiprocessing.connection import Connection
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | from metadrive.engine.core.engine_core import EngineCore
 | 
					
						
							|  |  |  | from metadrive.engine.core.image_buffer import ImageBuffer
 | 
					
						
							|  |  |  | from metadrive.envs.metadrive_env import MetaDriveEnv
 | 
					
						
							|  |  |  | from metadrive.obs.image_obs import ImageObservation
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | from openpilot.common.realtime import Ratekeeper
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | from openpilot.tools.sim.lib.common import vec3
 | 
					
						
							|  |  |  | from openpilot.tools.sim.lib.camerad import W, H
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | C3_POSITION = Vec3(0.0, 0, 1.22)
 | 
					
						
							|  |  |  | C3_HPR = Vec3(0, 0,0)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | metadrive_simulation_state = namedtuple("metadrive_simulation_state", ["running", "done", "done_info"])
 | 
					
						
							|  |  |  | metadrive_vehicle_state = namedtuple("metadrive_vehicle_state", ["velocity", "position", "bearing", "steering_angle"])
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def apply_metadrive_patches(arrive_dest_done=True):
 | 
					
						
							|  |  |  |   # By default, metadrive won't try to use cuda images unless it's used as a sensor for vehicles, so patch that in
 | 
					
						
							|  |  |  |   def add_image_sensor_patched(self, name: str, cls, args):
 | 
					
						
							|  |  |  |     if self.global_config["image_on_cuda"]:# and name == self.global_config["vehicle_config"]["image_source"]:
 | 
					
						
							|  |  |  |         sensor = cls(*args, self, cuda=True)
 | 
					
						
							|  |  |  |     else:
 | 
					
						
							|  |  |  |         sensor = cls(*args, self, cuda=False)
 | 
					
						
							|  |  |  |     assert isinstance(sensor, ImageBuffer), "This API is for adding image sensor"
 | 
					
						
							|  |  |  |     self.sensors[name] = sensor
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   EngineCore.add_image_sensor = add_image_sensor_patched
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   # we aren't going to use the built-in observation stack, so disable it to save time
 | 
					
						
							|  |  |  |   def observe_patched(self, *args, **kwargs):
 | 
					
						
							|  |  |  |     return self.state
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   ImageObservation.observe = observe_patched
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   # disable destination, we want to loop forever
 | 
					
						
							|  |  |  |   def arrive_destination_patch(self, *args, **kwargs):
 | 
					
						
							|  |  |  |     return False
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   if not arrive_dest_done:
 | 
					
						
							|  |  |  |     MetaDriveEnv._is_arrive_destination = arrive_destination_patch
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def metadrive_process(dual_camera: bool, config: dict, camera_array, wide_camera_array, image_lock,
 | 
					
						
							|  |  |  |                       controls_recv: Connection, simulation_state_send: Connection, vehicle_state_send: Connection,
 | 
					
						
							|  |  |  |                       exit_event, op_engaged, test_duration, test_run):
 | 
					
						
							|  |  |  |   arrive_dest_done = config.pop("arrive_dest_done", True)
 | 
					
						
							|  |  |  |   apply_metadrive_patches(arrive_dest_done)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   road_image = np.frombuffer(camera_array.get_obj(), dtype=np.uint8).reshape((H, W, 3))
 | 
					
						
							|  |  |  |   if dual_camera:
 | 
					
						
							|  |  |  |     assert wide_camera_array is not None
 | 
					
						
							|  |  |  |     wide_road_image = np.frombuffer(wide_camera_array.get_obj(), dtype=np.uint8).reshape((H, W, 3))
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   env = MetaDriveEnv(config)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def get_current_lane_info(vehicle):
 | 
					
						
							|  |  |  |     _, lane_info, on_lane = vehicle.navigation._get_current_lane(vehicle)
 | 
					
						
							|  |  |  |     lane_idx = lane_info[2] if lane_info is not None else None
 | 
					
						
							|  |  |  |     return lane_idx, on_lane
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def reset():
 | 
					
						
							|  |  |  |     env.reset()
 | 
					
						
							|  |  |  |     env.vehicle.config["max_speed_km_h"] = 1000
 | 
					
						
							|  |  |  |     lane_idx_prev, _ = get_current_lane_info(env.vehicle)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     simulation_state = metadrive_simulation_state(
 | 
					
						
							|  |  |  |       running=True,
 | 
					
						
							|  |  |  |       done=False,
 | 
					
						
							|  |  |  |       done_info=None,
 | 
					
						
							|  |  |  |     )
 | 
					
						
							|  |  |  |     simulation_state_send.send(simulation_state)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     return lane_idx_prev
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   lane_idx_prev = reset()
 | 
					
						
							|  |  |  |   start_time = None
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def get_cam_as_rgb(cam):
 | 
					
						
							|  |  |  |     cam = env.engine.sensors[cam]
 | 
					
						
							|  |  |  |     cam.get_cam().reparentTo(env.vehicle.origin)
 | 
					
						
							|  |  |  |     cam.get_cam().setPos(C3_POSITION)
 | 
					
						
							|  |  |  |     cam.get_cam().setHpr(C3_HPR)
 | 
					
						
							|  |  |  |     img = cam.perceive(to_float=False)
 | 
					
						
							|  |  |  |     if type(img) != np.ndarray:
 | 
					
						
							|  |  |  |       img = img.get() # convert cupy array to numpy
 | 
					
						
							|  |  |  |     return img
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   rk = Ratekeeper(100, None)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   steer_ratio = 8
 | 
					
						
							|  |  |  |   vc = [0,0]
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   while not exit_event.is_set():
 | 
					
						
							|  |  |  |     vehicle_state = metadrive_vehicle_state(
 | 
					
						
							|  |  |  |       velocity=vec3(x=float(env.vehicle.velocity[0]), y=float(env.vehicle.velocity[1]), z=0),
 | 
					
						
							|  |  |  |       position=env.vehicle.position,
 | 
					
						
							|  |  |  |       bearing=float(math.degrees(env.vehicle.heading_theta)),
 | 
					
						
							|  |  |  |       steering_angle=env.vehicle.steering * env.vehicle.MAX_STEERING
 | 
					
						
							|  |  |  |     )
 | 
					
						
							|  |  |  |     vehicle_state_send.send(vehicle_state)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     if controls_recv.poll(0):
 | 
					
						
							|  |  |  |       while controls_recv.poll(0):
 | 
					
						
							|  |  |  |         steer_angle, gas, should_reset = controls_recv.recv()
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |       steer_metadrive = steer_angle * 1 / (env.vehicle.MAX_STEERING * steer_ratio)
 | 
					
						
							|  |  |  |       steer_metadrive = np.clip(steer_metadrive, -1, 1)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |       vc = [steer_metadrive, gas]
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |       if should_reset:
 | 
					
						
							|  |  |  |         lane_idx_prev = reset()
 | 
					
						
							|  |  |  |         start_time = None
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     is_engaged = op_engaged.is_set()
 | 
					
						
							|  |  |  |     if is_engaged and start_time is None:
 | 
					
						
							|  |  |  |       start_time = time.monotonic()
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     if rk.frame % 5 == 0:
 | 
					
						
							|  |  |  |       _, _, terminated, _, _ = env.step(vc)
 | 
					
						
							|  |  |  |       timeout = True if start_time is not None and time.monotonic() - start_time >= test_duration else False
 | 
					
						
							|  |  |  |       lane_idx_curr, on_lane = get_current_lane_info(env.vehicle)
 | 
					
						
							|  |  |  |       out_of_lane = lane_idx_curr != lane_idx_prev or not on_lane
 | 
					
						
							|  |  |  |       lane_idx_prev = lane_idx_curr
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |       if terminated or ((out_of_lane or timeout) and test_run):
 | 
					
						
							|  |  |  |         if terminated:
 | 
					
						
							|  |  |  |           done_result = env.done_function("default_agent")
 | 
					
						
							|  |  |  |         elif out_of_lane:
 | 
					
						
							|  |  |  |           done_result = (True, {"out_of_lane" : True})
 | 
					
						
							|  |  |  |         elif timeout:
 | 
					
						
							|  |  |  |           done_result = (True, {"timeout" : True})
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         simulation_state = metadrive_simulation_state(
 | 
					
						
							|  |  |  |           running=False,
 | 
					
						
							|  |  |  |           done=done_result[0],
 | 
					
						
							|  |  |  |           done_info=done_result[1],
 | 
					
						
							|  |  |  |         )
 | 
					
						
							|  |  |  |         simulation_state_send.send(simulation_state)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |       if dual_camera:
 | 
					
						
							|  |  |  |         wide_road_image[...] = get_cam_as_rgb("rgb_wide")
 | 
					
						
							|  |  |  |       road_image[...] = get_cam_as_rgb("rgb_road")
 | 
					
						
							|  |  |  |       image_lock.release()
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     rk.keep_time()
 |