@ -6,6 +6,7 @@ import math
import numpy as np
import time
import threading
from cereal import log
from typing import Any
import cereal . messaging as messaging
@ -29,13 +30,15 @@ REPEAT_COUNTER = 5
PRINT_DECIMATION = 100
STEER_RATIO = 15.
pm = messaging . PubMaster ( [ ' roadCameraState ' , ' sensorEvents ' , ' can ' ] )
pm = messaging . PubMaster ( [ ' roadCameraState ' , ' sensorEvents ' , ' can ' , " gpsLocationExternal " ] )
sm = messaging . SubMaster ( [ ' carControl ' , ' controlsState ' ] )
class VehicleState :
def __init__ ( self ) :
self . speed = 0
self . angle = 0
self . bearing_deg = 0.0
self . vel = carla . Vector3D ( )
self . cruise_button = 0
self . is_engaged = False
@ -65,7 +68,8 @@ def cam_callback(image):
pm . send ( ' roadCameraState ' , dat )
frame_id + = 1
def imu_callback ( imu ) :
def imu_callback ( imu , vehicle_state ) :
vehicle_state . bearing_deg = math . degrees ( imu . compass )
dat = messaging . new_message ( ' sensorEvents ' , 2 )
dat . sensorEvents [ 0 ] . sensor = 4
dat . sensorEvents [ 0 ] . type = 0x10
@ -92,13 +96,30 @@ def panda_state_function():
pm . send ( ' pandaState ' , dat )
time . sleep ( 0.5 )
def fake_gps ( ) :
# TODO: read GPS from CARLA
pm = messaging . PubMaster ( [ ' gpsLocationExternal ' ] )
while 1 :
dat = messaging . new_message ( ' gpsLocationExternal ' )
pm . send ( ' gpsLocationExternal ' , dat )
time . sleep ( 0.01 )
def gps_callback ( gps , vehicle_state ) :
dat = messaging . new_message ( ' gpsLocationExternal ' )
# transform vel from carla to NED
# north is -Y in CARLA
velNED = [
- vehicle_state . vel . y , # north/south component of NED is negative when moving south
vehicle_state . vel . x , # positive when moving east, which is x in carla
vehicle_state . vel . z ,
]
dat . gpsLocationExternal = {
" flags " : 1 , # valid fix
" verticalAccuracy " : 1.0 ,
" speedAccuracy " : 0.1 ,
" vNED " : velNED ,
" bearingDeg " : vehicle_state . bearing_deg ,
" latitude " : gps . latitude ,
" longitude " : gps . longitude ,
" altitude " : gps . altitude ,
" source " : log . GpsLocationData . SensorSource . ublox ,
}
pm . send ( ' gpsLocationExternal ' , dat )
def fake_driver_monitoring ( ) :
pm = messaging . PubMaster ( [ ' driverState ' , ' driverMonitoringState ' ] )
@ -175,10 +196,16 @@ def bridge(q):
camera = world . spawn_actor ( blueprint , transform , attach_to = vehicle )
camera . listen ( cam_callback )
vehicle_state = VehicleState ( )
# reenable IMU
imu_bp = blueprint_library . find ( ' sensor.other.imu ' )
imu = world . spawn_actor ( imu_bp , transform , attach_to = vehicle )
imu . listen ( imu_callback )
imu . listen ( lambda imu : imu_callback ( imu , vehicle_state ) )
gps_bp = blueprint_library . find ( ' sensor.other.gnss ' )
gps = world . spawn_actor ( gps_bp , transform , attach_to = vehicle )
gps . listen ( lambda gps : gps_callback ( gps , vehicle_state ) )
def destroy ( ) :
print ( " clean exit " )
@ -188,13 +215,9 @@ def bridge(q):
print ( " done " )
atexit . register ( destroy )
vehicle_state = VehicleState ( )
# launch fake car threads
threading . Thread ( target = panda_state_function ) . start ( )
threading . Thread ( target = fake_driver_monitoring ) . start ( )
threading . Thread ( target = fake_gps ) . start ( )
threading . Thread ( target = can_function_runner , args = ( vehicle_state , ) ) . start ( )
# can loop
@ -324,6 +347,7 @@ def bridge(q):
vel = vehicle . get_velocity ( )
speed = math . sqrt ( vel . x * * 2 + vel . y * * 2 + vel . z * * 2 ) # in m/s
vehicle_state . speed = speed
vehicle_state . vel = vel
vehicle_state . angle = steer_out
vehicle_state . cruise_button = cruise_button
vehicle_state . is_engaged = is_openpilot_engaged