openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

154 lines
5.3 KiB

import math
import time
import numpy as np
from collections import namedtuple
from panda3d.core import Vec3
from multiprocessing.connection import Connection
from metadrive.engine.core.engine_core import EngineCore
from metadrive.engine.core.image_buffer import ImageBuffer
from metadrive.envs.metadrive_env import MetaDriveEnv
from metadrive.obs.image_obs import ImageObservation
from openpilot.common.realtime import Ratekeeper
from openpilot.tools.sim.lib.common import vec3
from openpilot.tools.sim.lib.camerad import W, H
C3_POSITION = Vec3(0.0, 0, 1.22)
C3_HPR = Vec3(0, 0,0)
metadrive_simulation_state = namedtuple("metadrive_simulation_state", ["running", "done", "done_info"])
metadrive_vehicle_state = namedtuple("metadrive_vehicle_state", ["velocity", "position", "bearing", "steering_angle"])
def apply_metadrive_patches(arrive_dest_done=True):
# By default, metadrive won't try to use cuda images unless it's used as a sensor for vehicles, so patch that in
def add_image_sensor_patched(self, name: str, cls, args):
if self.global_config["image_on_cuda"]:# and name == self.global_config["vehicle_config"]["image_source"]:
sensor = cls(*args, self, cuda=True)
else:
sensor = cls(*args, self, cuda=False)
assert isinstance(sensor, ImageBuffer), "This API is for adding image sensor"
self.sensors[name] = sensor
EngineCore.add_image_sensor = add_image_sensor_patched
# we aren't going to use the built-in observation stack, so disable it to save time
def observe_patched(self, *args, **kwargs):
return self.state
ImageObservation.observe = observe_patched
# disable destination, we want to loop forever
def arrive_destination_patch(self, *args, **kwargs):
return False
if not arrive_dest_done:
MetaDriveEnv._is_arrive_destination = arrive_destination_patch
def metadrive_process(dual_camera: bool, config: dict, camera_array, wide_camera_array, image_lock,
controls_recv: Connection, simulation_state_send: Connection, vehicle_state_send: Connection,
exit_event, op_engaged, test_duration, test_run):
arrive_dest_done = config.pop("arrive_dest_done", True)
apply_metadrive_patches(arrive_dest_done)
road_image = np.frombuffer(camera_array.get_obj(), dtype=np.uint8).reshape((H, W, 3))
if dual_camera:
assert wide_camera_array is not None
wide_road_image = np.frombuffer(wide_camera_array.get_obj(), dtype=np.uint8).reshape((H, W, 3))
env = MetaDriveEnv(config)
def get_current_lane_info(vehicle):
_, lane_info, on_lane = vehicle.navigation._get_current_lane(vehicle)
lane_idx = lane_info[2] if lane_info is not None else None
return lane_idx, on_lane
def reset():
env.reset()
env.vehicle.config["max_speed_km_h"] = 1000
lane_idx_prev, _ = get_current_lane_info(env.vehicle)
simulation_state = metadrive_simulation_state(
running=True,
done=False,
done_info=None,
)
simulation_state_send.send(simulation_state)
return lane_idx_prev
lane_idx_prev = reset()
start_time = None
def get_cam_as_rgb(cam):
cam = env.engine.sensors[cam]
cam.get_cam().reparentTo(env.vehicle.origin)
cam.get_cam().setPos(C3_POSITION)
cam.get_cam().setHpr(C3_HPR)
img = cam.perceive(clip=False)
if type(img) != np.ndarray:
img = img.get() # convert cupy array to numpy
return img
rk = Ratekeeper(100, None)
steer_ratio = 8
vc = [0,0]
while not exit_event.is_set():
vehicle_state = metadrive_vehicle_state(
velocity=vec3(x=float(env.vehicle.velocity[0]), y=float(env.vehicle.velocity[1]), z=0),
position=env.vehicle.position,
bearing=float(math.degrees(env.vehicle.heading_theta)),
steering_angle=env.vehicle.steering * env.vehicle.MAX_STEERING
)
vehicle_state_send.send(vehicle_state)
if controls_recv.poll(0):
while controls_recv.poll(0):
steer_angle, gas, should_reset = controls_recv.recv()
steer_metadrive = steer_angle * 1 / (env.vehicle.MAX_STEERING * steer_ratio)
steer_metadrive = np.clip(steer_metadrive, -1, 1)
vc = [steer_metadrive, gas]
if should_reset:
lane_idx_prev = reset()
start_time = None
is_engaged = op_engaged.is_set()
if is_engaged and start_time is None:
start_time = time.monotonic()
if rk.frame % 5 == 0:
_, _, terminated, _, _ = env.step(vc)
timeout = True if start_time is not None and time.monotonic() - start_time >= test_duration else False
lane_idx_curr, on_lane = get_current_lane_info(env.vehicle)
out_of_lane = lane_idx_curr != lane_idx_prev or not on_lane
lane_idx_prev = lane_idx_curr
if terminated or ((out_of_lane or timeout) and test_run):
if terminated:
done_result = env.done_function("default_agent")
elif out_of_lane:
done_result = (True, {"out_of_lane" : True})
elif timeout:
done_result = (True, {"timeout" : True})
simulation_state = metadrive_simulation_state(
running=False,
done=done_result[0],
done_info=done_result[1],
)
simulation_state_send.send(simulation_state)
if dual_camera:
wide_road_image[...] = get_cam_as_rgb("rgb_wide")
road_image[...] = get_cam_as_rgb("rgb_road")
image_lock.release()
rk.keep_time()