import ctypes
import functools
import multiprocessing
import numpy as np
import time
from multiprocessing import Pipe , Array
from openpilot . tools . sim . bridge . metadrive . metadrive_process import ( metadrive_process , metadrive_simulation_state ,
metadrive_vehicle_state )
from openpilot . tools . sim . lib . common import SimulatorState , World
from openpilot . tools . sim . lib . camerad import W , H
class MetaDriveWorld ( World ) :
def __init__ ( self , status_q , config , dual_camera = False ) :
super ( ) . __init__ ( dual_camera )
self . status_q = status_q
self . camera_array = Array ( ctypes . c_uint8 , W * H * 3 )
self . road_image = np . frombuffer ( self . camera_array . get_obj ( ) , dtype = np . uint8 ) . reshape ( ( H , W , 3 ) )
self . wide_camera_array = None
if dual_camera :
self . wide_camera_array = Array ( ctypes . c_uint8 , W * H * 3 )
self . wide_road_image = np . frombuffer ( self . wide_camera_array . get_obj ( ) , dtype = np . uint8 ) . reshape ( ( H , W , 3 ) )
self . controls_send , self . controls_recv = Pipe ( )
self . simulation_state_send , self . simulation_state_recv = Pipe ( )
self . vehicle_state_send , self . vehicle_state_recv = Pipe ( )
self . exit_event = multiprocessing . Event ( )
self . metadrive_process = multiprocessing . Process ( name = " metadrive process " , target =
functools . partial ( metadrive_process , dual_camera , config ,
self . camera_array , self . wide_camera_array , self . image_lock ,
self . controls_recv , self . simulation_state_send ,
self . vehicle_state_send , self . exit_event ) )
self . metadrive_process . start ( )
self . status_q . put ( { " status " : " starting " } )
print ( " ---------------------------------------------------------- " )
print ( " ---- Spawning Metadrive world, this might take awhile ---- " )
print ( " ---------------------------------------------------------- " )
self . vehicle_state_recv . recv ( ) # wait for a state message to ensure metadrive is launched
self . status_q . put ( { " status " : " started " } )
self . steer_ratio = 15
self . vc = [ 0.0 , 0.0 ]
self . reset_time = 0
self . should_reset = False
def apply_controls ( self , steer_angle , throttle_out , brake_out ) :
if ( time . monotonic ( ) - self . reset_time ) > 2 :
self . vc [ 0 ] = steer_angle
if throttle_out :
self . vc [ 1 ] = throttle_out
else :
self . vc [ 1 ] = - brake_out
else :
self . vc [ 0 ] = 0
self . vc [ 1 ] = 0
self . controls_send . send ( [ * self . vc , self . should_reset ] )
self . should_reset = False
def read_state ( self ) :
while self . simulation_state_recv . poll ( 0 ) :
md_state : metadrive_simulation_state = self . simulation_state_recv . recv ( )
if md_state . done :
self . status_q . put ( {
" status " : " terminating " ,
" reason " : " done " ,
" done_info " : md_state . done_info
} )
self . exit_event . set ( )
def read_sensors ( self , state : SimulatorState ) :
while self . vehicle_state_recv . poll ( 0 ) :
md_vehicle : metadrive_vehicle_state = self . vehicle_state_recv . recv ( )
state . velocity = md_vehicle . velocity
state . bearing = md_vehicle . bearing
state . steering_angle = md_vehicle . steering_angle
state . gps . from_xy ( md_vehicle . position )
state . valid = True
def read_cameras ( self ) :
pass
def tick ( self ) :
pass
def reset ( self ) :
self . should_reset = True
def close ( self , reason : str ) :
self . status_q . put ( {
" status " : " terminating " ,
" reason " : reason ,
} )
self . exit_event . set ( )
self . metadrive_process . join ( )