|
|
|
@ -22,7 +22,7 @@ C3_HPR = Vec3(0, 0,0) |
|
|
|
|
metadrive_simulation_state = namedtuple("metadrive_simulation_state", ["running", "done", "done_info"]) |
|
|
|
|
metadrive_vehicle_state = namedtuple("metadrive_vehicle_state", ["velocity", "position", "bearing", "steering_angle"]) |
|
|
|
|
|
|
|
|
|
def apply_metadrive_patches(): |
|
|
|
|
def apply_metadrive_patches(arrive_dest_done=True): |
|
|
|
|
# By default, metadrive won't try to use cuda images unless it's used as a sensor for vehicles, so patch that in |
|
|
|
|
def add_image_sensor_patched(self, name: str, cls, args): |
|
|
|
|
if self.global_config["image_on_cuda"]:# and name == self.global_config["vehicle_config"]["image_source"]: |
|
|
|
@ -44,12 +44,14 @@ def apply_metadrive_patches(): |
|
|
|
|
def arrive_destination_patch(self, *args, **kwargs): |
|
|
|
|
return False |
|
|
|
|
|
|
|
|
|
MetaDriveEnv._is_arrive_destination = arrive_destination_patch |
|
|
|
|
if not arrive_dest_done: |
|
|
|
|
MetaDriveEnv._is_arrive_destination = arrive_destination_patch |
|
|
|
|
|
|
|
|
|
def metadrive_process(dual_camera: bool, config: dict, camera_array, wide_camera_array, image_lock, |
|
|
|
|
controls_recv: Connection, simulation_state_send: Connection, vehicle_state_send: Connection, |
|
|
|
|
exit_event): |
|
|
|
|
apply_metadrive_patches() |
|
|
|
|
arrive_dest_done = config.pop("arrive_dest_done", True) |
|
|
|
|
apply_metadrive_patches(arrive_dest_done) |
|
|
|
|
|
|
|
|
|
road_image = np.frombuffer(camera_array.get_obj(), dtype=np.uint8).reshape((H, W, 3)) |
|
|
|
|
if dual_camera: |
|
|
|
|