#!/usr/bin/env python from multiprocessing import Queue from metadrive.component.sensors.base_camera import _cuda_enable from metadrive.component.map.pg_map import MapGenerateMethod from openpilot.tools.sim.bridge.common import SimulatorBridge from openpilot.tools.sim.bridge.metadrive.metadrive_common import RGBCameraRoad, RGBCameraWide from openpilot.tools.sim.bridge.metadrive.metadrive_world import MetaDriveWorld from openpilot.tools.sim.lib.camerad import W, H def create_map(): return dict( type=MapGenerateMethod.PG_MAP_FILE, lane_num=2, lane_width=3.5, config=[ { "id": "S", "pre_block_socket_index": 0, "length": 60, }, { "id": "C", "pre_block_socket_index": 0, "length": 60, "radius": 600, "angle": 45, "dir": 0, }, ] ) class MetaDriveBridge(SimulatorBridge): TICKS_PER_FRAME = 5 def __init__(self, world_status_q, dual_camera, high_quality): self.world_status_q = world_status_q self.should_render = False super().__init__(dual_camera, high_quality) def spawn_world(self): sensors = { "rgb_road": (RGBCameraRoad, W, H, ) } if self.dual_camera: sensors["rgb_wide"] = (RGBCameraWide, W, H) config = dict( use_render=self.should_render, vehicle_config=dict( enable_reverse=False, image_source="rgb_road", ), sensors=sensors, image_on_cuda=_cuda_enable, image_observation=True, interface_panel=[], out_of_route_done=True, on_continuous_line_done=True, crash_vehicle_done=True, crash_object_done=True, arrive_dest_done=True, traffic_density=0.0, map_config=create_map(), map_region_size=2048, decision_repeat=1, physics_world_step_size=self.TICKS_PER_FRAME/100, preload_models=False ) return MetaDriveWorld(world_status_q, config, self.dual_camera) if __name__ == "__main__": command_queue: Queue = Queue() world_status_q: Queue = Queue() simulator_bridge = MetaDriveBridge(world_status_q, True, False) simulator_process = simulator_bridge.run(command_queue) while True: world_status = world_status_q.get() print(f"World Status: {str(world_status)}") if world_status["status"] == "terminating": break simulator_process.join()