#!/usr/bin/env python3
import gc
import math
import time
import ctypes
import numpy as np
from pathlib import Path
from typing import Tuple , Dict
from cereal import messaging
from cereal . messaging import PubMaster , SubMaster
from cereal . visionipc import VisionIpcClient , VisionStreamType
from openpilot . system . swaglog import cloudlog
from openpilot . common . params import Params
from openpilot . common . realtime import set_realtime_priority
from openpilot . selfdrive . modeld . constants import IDX_N
from openpilot . selfdrive . modeld . runners import ModelRunner , Runtime
NAV_INPUT_SIZE = 256 * 256
NAV_FEATURE_LEN = 256
NAV_DESIRE_LEN = 32
NAV_OUTPUT_SIZE = 2 * 2 * IDX_N + NAV_DESIRE_LEN + NAV_FEATURE_LEN
MODEL_PATHS = {
ModelRunner . SNPE : Path ( __file__ ) . parent / ' models/navmodel_q.dlc ' ,
ModelRunner . ONNX : Path ( __file__ ) . parent / ' models/navmodel.onnx ' }
class NavModelOutputXY ( ctypes . Structure ) :
_fields_ = [
( " x " , ctypes . c_float ) ,
( " y " , ctypes . c_float ) ]
class NavModelOutputPlan ( ctypes . Structure ) :
_fields_ = [
( " mean " , NavModelOutputXY * IDX_N ) ,
( " std " , NavModelOutputXY * IDX_N ) ]
class NavModelResult ( ctypes . Structure ) :
_fields_ = [
( " plan " , NavModelOutputPlan ) ,
( " desire_pred " , ctypes . c_float * NAV_DESIRE_LEN ) ,
( " features " , ctypes . c_float * NAV_FEATURE_LEN ) ]
class ModelState :
inputs : Dict [ str , np . ndarray ]
output : np . ndarray
model : ModelRunner
def __init__ ( self ) :
assert ctypes . sizeof ( NavModelResult ) == NAV_OUTPUT_SIZE * ctypes . sizeof ( ctypes . c_float )
self . output = np . zeros ( NAV_OUTPUT_SIZE , dtype = np . float32 )
self . inputs = { ' input_img ' : np . zeros ( NAV_INPUT_SIZE , dtype = np . uint8 ) }
self . model = ModelRunner ( MODEL_PATHS , self . output , Runtime . DSP , True , None )
self . model . addInput ( " input_img " , None )
def run ( self , buf : np . ndarray ) - > Tuple [ np . ndarray , float ] :
self . inputs [ ' input_img ' ] [ : ] = buf
t1 = time . perf_counter ( )
self . model . setInputBuffer ( " input_img " , self . inputs [ ' input_img ' ] . view ( np . float32 ) )
self . model . execute ( )
t2 = time . perf_counter ( )
return self . output , t2 - t1
def get_navmodel_packet ( model_output : np . ndarray , valid : bool , frame_id : int , location_ts : int , execution_time : float , dsp_execution_time : float ) :
model_result = ctypes . cast ( model_output . ctypes . data , ctypes . POINTER ( NavModelResult ) ) . contents
msg = messaging . new_message ( ' navModel ' )
msg . valid = valid
msg . navModel . frameId = frame_id
msg . navModel . locationMonoTime = location_ts
msg . navModel . modelExecutionTime = execution_time
msg . navModel . dspExecutionTime = dsp_execution_time
msg . navModel . features = model_result . features [ : ]
msg . navModel . desirePrediction = model_result . desire_pred [ : ]
msg . navModel . position . x = [ p . x for p in model_result . plan . mean ]
msg . navModel . position . y = [ p . y for p in model_result . plan . mean ]
msg . navModel . position . xStd = [ math . exp ( p . x ) for p in model_result . plan . std ]
msg . navModel . position . yStd = [ math . exp ( p . y ) for p in model_result . plan . std ]
return msg
def main ( ) :
gc . disable ( )
set_realtime_priority ( 1 )
# there exists a race condition when two processes try to create a
# SNPE model runner at the same time, wait for dmonitoringmodeld to finish
cloudlog . warning ( " waiting for dmonitoringmodeld to initialize " )
if not Params ( ) . get_bool ( " DmModelInitialized " , True ) :
return
model = ModelState ( )
cloudlog . warning ( " models loaded, navmodeld starting " )
vipc_client = VisionIpcClient ( " navd " , VisionStreamType . VISION_STREAM_MAP , True )
while not vipc_client . connect ( False ) :
time . sleep ( 0.1 )
assert vipc_client . is_connected ( )
cloudlog . warning ( f " connected with buffer size: { vipc_client . buffer_len } " )
sm = SubMaster ( [ " navInstruction " ] )
pm = PubMaster ( [ " navModel " ] )
while True :
buf = vipc_client . recv ( )
if buf is None :
continue
sm . update ( 0 )
t1 = time . perf_counter ( )
model_output , dsp_execution_time = model . run ( buf . data [ : buf . uv_offset ] )
t2 = time . perf_counter ( )
valid = vipc_client . valid and sm . valid [ " navInstruction " ]
pm . send ( " navModel " , get_navmodel_packet ( model_output , valid , vipc_client . frame_id , vipc_client . timestamp_sof , t2 - t1 , dsp_execution_time ) )
if __name__ == " __main__ " :
main ( )