You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
102 lines
3.5 KiB
102 lines
3.5 KiB
import ctypes
|
|
import functools
|
|
import multiprocessing
|
|
import numpy as np
|
|
import time
|
|
|
|
from multiprocessing import Pipe, Array
|
|
from openpilot.tools.sim.bridge.metadrive.metadrive_process import (metadrive_process, metadrive_simulation_state,
|
|
metadrive_vehicle_state)
|
|
from openpilot.tools.sim.lib.common import SimulatorState, World
|
|
from openpilot.tools.sim.lib.camerad import W, H
|
|
|
|
|
|
class MetaDriveWorld(World):
    """World backed by a MetaDrive simulation running in a child process.

    The constructor allocates shared camera buffers, opens three pipes
    (controls, simulation state, vehicle state), starts the child process
    and blocks until the child has sent its first vehicle state.

    Lifecycle messages are published on ``status_q`` as dicts with a
    ``"status"`` key: ``"starting"``, ``"started"`` and ``"terminating"``.
    """

    # Seconds after reset() during which apply_controls() sends zero controls.
    RESET_HOLD_SECONDS = 2
    # How often the startup wait checks that the child process is still alive.
    STARTUP_POLL_SECONDS = 0.5

    def __init__(self, status_q, config, dual_camera=False):
        """Start the MetaDrive child process and wait for it to come up.

        Args:
            status_q: queue-like object with ``put``; receives status dicts.
            config: passed through unchanged to ``metadrive_process``.
            dual_camera: when true, a second shared buffer is allocated for
                the wide camera.

        Raises:
            RuntimeError: the child process exited before sending its first
                vehicle state.
        """
        super().__init__(dual_camera)
        self.status_q = status_q

        # Shared-memory frame buffers; the numpy arrays are views onto them.
        self.camera_array = Array(ctypes.c_uint8, W*H*3)
        self.road_image = np.frombuffer(self.camera_array.get_obj(), dtype=np.uint8).reshape((H, W, 3))
        self.wide_camera_array = None
        if dual_camera:
            self.wide_camera_array = Array(ctypes.c_uint8, W*H*3)
            self.wide_road_image = np.frombuffer(self.wide_camera_array.get_obj(), dtype=np.uint8).reshape((H, W, 3))

        self.controls_send, self.controls_recv = Pipe()
        self.simulation_state_send, self.simulation_state_recv = Pipe()
        self.vehicle_state_send, self.vehicle_state_recv = Pipe()

        self.exit_event = multiprocessing.Event()

        # NOTE(review): self.image_lock is not assigned in this class;
        # presumably World.__init__ provides it - confirm.
        self.metadrive_process = multiprocessing.Process(
            name="metadrive process",
            target=functools.partial(metadrive_process, dual_camera, config,
                                     self.camera_array, self.wide_camera_array, self.image_lock,
                                     self.controls_recv, self.simulation_state_send,
                                     self.vehicle_state_send, self.exit_event))

        self.metadrive_process.start()
        self.status_q.put({"status": "starting"})

        print("----------------------------------------------------------")
        print("---- Spawning Metadrive world, this might take awhile ----")
        print("----------------------------------------------------------")

        # wait for a state message to ensure metadrive is launched
        self._wait_for_first_vehicle_state()
        self.status_q.put({"status": "started"})

        self.steer_ratio = 15
        self.vc = [0.0, 0.0]  # [steer, throttle (positive) or brake (negative)]
        self.reset_time = 0   # no reset recorded yet
        self.should_reset = False

    def _wait_for_first_vehicle_state(self):
        """Consume the first vehicle state, raising if the child died first.

        This process keeps both ends of every pipe open, so a plain
        ``recv()`` would never see EOF and would block forever if the child
        crashed during startup. Polling with a timeout lets us notice that.
        """
        while not self.vehicle_state_recv.poll(self.STARTUP_POLL_SECONDS):
            # Re-check the pipe so a message sent just before exit is not lost.
            if not self.metadrive_process.is_alive() and not self.vehicle_state_recv.poll(0):
                raise RuntimeError(
                    f"metadrive process exited during startup (exit code {self.metadrive_process.exitcode})")
        self.vehicle_state_recv.recv()

    def apply_controls(self, steer_angle, throttle_out, brake_out):
        """Send the current controls, plus any pending reset flag, to the child.

        Within ``RESET_HOLD_SECONDS`` of the last ``reset()`` both control
        values are forced to zero. Otherwise the throttle is used when it is
        non-zero, else the negated brake. The message sent is
        ``[steer, throttle_or_brake, should_reset]``, and the reset flag is
        cleared after sending.
        """
        if (time.monotonic() - self.reset_time) > self.RESET_HOLD_SECONDS:
            self.vc[0] = steer_angle
            self.vc[1] = throttle_out if throttle_out else -brake_out
        else:
            self.vc[0] = 0
            self.vc[1] = 0

        self.controls_send.send([*self.vc, self.should_reset])
        self.should_reset = False

    def read_state(self):
        """Drain pending simulation-state messages without blocking.

        For every message whose ``done`` flag is set, a ``"terminating"``
        status (reason ``"done"``, with the message's ``done_info``) is put
        on ``status_q`` and the exit event is set.
        """
        while self.simulation_state_recv.poll(0):
            md_state: metadrive_simulation_state = self.simulation_state_recv.recv()
            if md_state.done:
                self.status_q.put({
                    "status": "terminating",
                    "reason": "done",
                    "done_info": md_state.done_info
                })
                self.exit_event.set()

    def read_sensors(self, state: SimulatorState):
        """Drain pending vehicle-state messages into ``state`` without blocking.

        Each message overwrites velocity, bearing, steering angle and GPS
        position, and marks ``state`` valid; the newest message wins.
        """
        while self.vehicle_state_recv.poll(0):
            md_vehicle: metadrive_vehicle_state = self.vehicle_state_recv.recv()
            state.velocity = md_vehicle.velocity
            state.bearing = md_vehicle.bearing
            state.steering_angle = md_vehicle.steering_angle
            state.gps.from_xy(md_vehicle.position)
            state.valid = True

    def read_cameras(self):
        """No-op in this class."""
        pass

    def tick(self):
        """No-op in this class."""
        pass

    def reset(self):
        """Request a reset and start the post-reset control hold.

        The flag is delivered with the next ``apply_controls()`` call.
        Recording the time here is what makes the hold in
        ``apply_controls()`` take effect; without it ``reset_time`` would
        stay at its initial value and the hold branch could never run.
        """
        self.should_reset = True
        self.reset_time = time.monotonic()

    def close(self, reason: str):
        """Announce termination, signal the child to exit and wait for it.

        Args:
            reason: text reported under ``"reason"`` in the status message.
        """
        self.status_q.put({
            "status": "terminating",
            "reason": reason,
        })
        self.exit_event.set()
        self.metadrive_process.join()
|
|
|