import math import time import numpy as np from collections import namedtuple from panda3d.core import Vec3 from multiprocessing.connection import Connection from metadrive.engine.core.engine_core import EngineCore from metadrive.engine.core.image_buffer import ImageBuffer from metadrive.envs.metadrive_env import MetaDriveEnv from metadrive.obs.image_obs import ImageObservation from openpilot.common.realtime import Ratekeeper from openpilot.tools.sim.lib.common import vec3 from openpilot.tools.sim.lib.camerad import W, H C3_POSITION = Vec3(0.0, 0, 1.22) C3_HPR = Vec3(0, 0,0) metadrive_simulation_state = namedtuple("metadrive_simulation_state", ["running", "done", "done_info"]) metadrive_vehicle_state = namedtuple("metadrive_vehicle_state", ["velocity", "position", "bearing", "steering_angle"]) def apply_metadrive_patches(arrive_dest_done=True): # By default, metadrive won't try to use cuda images unless it's used as a sensor for vehicles, so patch that in def add_image_sensor_patched(self, name: str, cls, args): if self.global_config["image_on_cuda"]:# and name == self.global_config["vehicle_config"]["image_source"]: sensor = cls(*args, self, cuda=True) else: sensor = cls(*args, self, cuda=False) assert isinstance(sensor, ImageBuffer), "This API is for adding image sensor" self.sensors[name] = sensor EngineCore.add_image_sensor = add_image_sensor_patched # we aren't going to use the built-in observation stack, so disable it to save time def observe_patched(self, *args, **kwargs): return self.state ImageObservation.observe = observe_patched # disable destination, we want to loop forever def arrive_destination_patch(self, *args, **kwargs): return False if not arrive_dest_done: MetaDriveEnv._is_arrive_destination = arrive_destination_patch def metadrive_process(dual_camera: bool, config: dict, camera_array, wide_camera_array, image_lock, controls_recv: Connection, simulation_state_send: Connection, vehicle_state_send: Connection, exit_event, op_engaged, test_duration, test_run): arrive_dest_done = config.pop("arrive_dest_done", True) apply_metadrive_patches(arrive_dest_done) road_image = np.frombuffer(camera_array.get_obj(), dtype=np.uint8).reshape((H, W, 3)) if dual_camera: assert wide_camera_array is not None wide_road_image = np.frombuffer(wide_camera_array.get_obj(), dtype=np.uint8).reshape((H, W, 3)) env = MetaDriveEnv(config) def get_current_lane_info(vehicle): _, lane_info, on_lane = vehicle.navigation._get_current_lane(vehicle) lane_idx = lane_info[2] if lane_info is not None else None return lane_idx, on_lane def reset(): env.reset() env.vehicle.config["max_speed_km_h"] = 1000 lane_idx_prev, _ = get_current_lane_info(env.vehicle) simulation_state = metadrive_simulation_state( running=True, done=False, done_info=None, ) simulation_state_send.send(simulation_state) return lane_idx_prev lane_idx_prev = reset() start_time = None def get_cam_as_rgb(cam): cam = env.engine.sensors[cam] cam.get_cam().reparentTo(env.vehicle.origin) cam.get_cam().setPos(C3_POSITION) cam.get_cam().setHpr(C3_HPR) img = cam.perceive(to_float=False) if not isinstance(img, np.ndarray): img = img.get() # convert cupy array to numpy return img rk = Ratekeeper(100, None) steer_ratio = 8 vc = [0,0] while not exit_event.is_set(): vehicle_state = metadrive_vehicle_state( velocity=vec3(x=float(env.vehicle.velocity[0]), y=float(env.vehicle.velocity[1]), z=0), position=env.vehicle.position, bearing=float(math.degrees(env.vehicle.heading_theta)), steering_angle=env.vehicle.steering * env.vehicle.MAX_STEERING ) vehicle_state_send.send(vehicle_state) if controls_recv.poll(0): while controls_recv.poll(0): steer_angle, gas, should_reset = controls_recv.recv() steer_metadrive = steer_angle * 1 / (env.vehicle.MAX_STEERING * steer_ratio) steer_metadrive = np.clip(steer_metadrive, -1, 1) vc = [steer_metadrive, gas] if should_reset: lane_idx_prev = reset() start_time = None is_engaged = op_engaged.is_set() if is_engaged and start_time is None: start_time = time.monotonic() if rk.frame % 5 == 0: _, _, terminated, _, _ = env.step(vc) timeout = True if start_time is not None and time.monotonic() - start_time >= test_duration else False lane_idx_curr, on_lane = get_current_lane_info(env.vehicle) out_of_lane = lane_idx_curr != lane_idx_prev or not on_lane lane_idx_prev = lane_idx_curr if terminated or ((out_of_lane or timeout) and test_run): if terminated: done_result = env.done_function("default_agent") elif out_of_lane: done_result = (True, {"out_of_lane" : True}) elif timeout: done_result = (True, {"timeout" : True}) simulation_state = metadrive_simulation_state( running=False, done=done_result[0], done_info=done_result[1], ) simulation_state_send.send(simulation_state) if dual_camera: wide_road_image[...] = get_cam_as_rgb("rgb_wide") road_image[...] = get_cam_as_rgb("rgb_road") image_lock.release() rk.keep_time()