openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

74 lines
2.3 KiB

import ctypes
import functools
import multiprocessing
import numpy as np
import time
from multiprocessing import Pipe, Array
from openpilot.tools.sim.bridge.metadrive.metadrive_process import metadrive_process, metadrive_state
from openpilot.tools.sim.lib.common import SimulatorState, World
from openpilot.tools.sim.lib.camerad import W, H
class MetaDriveWorld(World):
def __init__(self, config, dual_camera = False):
super().__init__(dual_camera)
self.camera_array = Array(ctypes.c_uint8, W*H*3)
self.road_image = np.frombuffer(self.camera_array.get_obj(), dtype=np.uint8).reshape((H, W, 3))
self.controls_send, self.controls_recv = Pipe()
self.state_send, self.state_recv = Pipe()
self.exit_event = multiprocessing.Event()
self.metadrive_process = multiprocessing.Process(name="metadrive process", target=
functools.partial(metadrive_process, dual_camera, config,
self.camera_array, self.controls_recv, self.state_send, self.exit_event))
self.metadrive_process.start()
print("----------------------------------------------------------")
print("---- Spawning Metadrive world, this might take awhile ----")
print("----------------------------------------------------------")
self.state_recv.recv() # wait for a state message to ensure metadrive is launched
self.steer_ratio = 15
self.vc = [0.0,0.0]
self.reset_time = 0
self.should_reset = False
def apply_controls(self, steer_angle, throttle_out, brake_out):
if (time.monotonic() - self.reset_time) > 5:
self.vc[0] = steer_angle
if throttle_out:
self.vc[1] = throttle_out/10
else:
self.vc[1] = -brake_out
else:
self.vc[0] = 0
self.vc[1] = 0
self.controls_send.send([*self.vc, self.should_reset])
def read_sensors(self, state: SimulatorState):
while self.state_recv.poll(0):
md_state: metadrive_state = self.state_recv.recv()
state.velocity = md_state.velocity
state.bearing = md_state.bearing
state.steering_angle = md_state.steering_angle
state.gps.from_xy(md_state.position)
state.valid = True
def read_cameras(self):
pass
def tick(self):
pass
def reset(self):
self.should_reset = True
def close(self):
self.exit_event.set()
self.metadrive_process.join()