open source driving agent

82 lines
2.7 KiB

import ctypes
import functools
import multiprocessing
import numpy as np
import time
from multiprocessing import Pipe, Array
from openpilot.tools.sim.bridge.metadrive.metadrive_process import metadrive_process, metadrive_state
from openpilot.tools.sim.lib.common import SimulatorState, World
from openpilot.tools.sim.lib.camerad import W, H
class MetaDriveWorld(World):
def __init__(self, config, dual_camera = False):
super().__init__(dual_camera)
self.camera_array = Array(ctypes.c_uint8, W*H*3)
self.road_image = np.frombuffer(self.camera_array.get_obj(), dtype=np.uint8).reshape((H, W, 3))
self.wide_camera_array = None
if dual_camera:
self.wide_camera_array = Array(ctypes.c_uint8, W*H*3)
self.wide_road_image = np.frombuffer(self.wide_camera_array.get_obj(), dtype=np.uint8).reshape((H, W, 3))
self.controls_send, self.controls_recv = Pipe()
self.state_send, self.state_recv = Pipe()
self.exit_event = multiprocessing.Event()
self.metadrive_process = multiprocessing.Process(name="metadrive process", target=
functools.partial(metadrive_process, dual_camera, config,
self.camera_array, self.wide_camera_array, self.image_lock,
self.controls_recv, self.state_send, self.exit_event))
self.metadrive_process.start()
print("----------------------------------------------------------")
print("---- Spawning Metadrive world, this might take awhile ----")
print("----------------------------------------------------------")
self.state_recv.recv() # wait for a state message to ensure metadrive is launched
self.steer_ratio = 15
self.vc = [0.0,0.0]
self.reset_time = 0
self.should_reset = False
def apply_controls(self, steer_angle, throttle_out, brake_out):
if (time.monotonic() - self.reset_time) > 2:
self.vc[0] = steer_angle
if throttle_out:
self.vc[1] = throttle_out
else:
self.vc[1] = -brake_out
else:
self.vc[0] = 0
self.vc[1] = 0
self.controls_send.send([*self.vc, self.should_reset])
self.should_reset = False
def read_sensors(self, state: SimulatorState):
while self.state_recv.poll(0):
md_state: metadrive_state = self.state_recv.recv()
state.velocity = md_state.velocity
state.bearing = md_state.bearing
state.steering_angle = md_state.steering_angle
state.gps.from_xy(md_state.position)
state.valid = True
def read_cameras(self):
pass
def tick(self):
pass
def reset(self):
self.should_reset = True
def close(self):
self.exit_event.set()
self.metadrive_process.join()