import numpy as np
from common . realtime import sec_since_boot
from selfdrive . controls . lib . drive_helpers import create_event , EventTypes as ET
_DT = 0.01 # update runs at 100Hz
_AWARENESS_TIME = 180 # 3 minutes limit without user touching steering wheels make the car enter a terminal status
_AWARENESS_PRE_TIME = 20. # a first alert is issued 20s before expiration
_AWARENESS_PROMPT_TIME = 5. # a second alert is issued 5s before start decelerating the car
_DISTRACTED_TIME = 6.
_DISTRACTED_PRE_TIME = 4.
_DISTRACTED_PROMPT_TIME = 2.
# measured 1 rad in x FOV. 1152x864 is original image, 160x320 is a right crop for model
_CAMERA_FOV_X = 1. # rad
_CAMERA_FOV_Y = 0.75 # 4/3 aspect ratio
# model output refers to center of cropped image, so need to apply the x displacement offset
_CAMERA_OFFSET_X = 0.3125 #(1152/2 - 0.5*(160*864/320))/1152
_CAMERA_X_CONV = 0.375 # 160*864/320/1152
_PITCH_WEIGHT = 1.5 # pitch matters a lot more
_METRIC_THRESHOLD = 0.4
_PITCH_POS_ALLOWANCE = 0.08 # rad, to not be too sensitive on positive pitch
_DTM = 0.1 # driver monitor runs at 10Hz
_PITCH_NATURAL_OFFSET = 0.1 # people don't seem to look straight when they drive relaxed, rather a bit up
_STD_THRESHOLD = 0.1 # above this standard deviation consider the measurement invalid
_DISTRACTED_FILTER_F = 0.6 # 0.6Hz, 0.25s ts
_DISTRACTED_FILTER_K = 2 * np . pi * _DISTRACTED_FILTER_F * _DTM / ( 1 + 2 * np . pi * _DISTRACTED_FILTER_F * _DTM )
_VARIANCE_FILTER_F = 0.008 # 0.008Hz, 20s ts
_VARIANCE_FILTER_K = 2 * np . pi * _VARIANCE_FILTER_F * _DTM / ( 1 + 2 * np . pi * _VARIANCE_FILTER_F * _DTM )
class MonitorChangeEvent :
NO_CHANGE = 0
PARAM_ON = 1
PARAM_OFF = 2
VARIANCE_HIGH = 3
VARIANCE_LOW = 4
class _DriverPose ( ) :
def __init__ ( self ) :
self . yaw = 0.
self . pitch = 0.
self . roll = 0.
self . yaw_offset = 0.
self . pitch_offset = 0.
def _monitor_hysteresys ( variance_level , monitor_valid_prev ) :
var_thr = 0.63 if monitor_valid_prev else 0.37
return variance_level < var_thr
class DriverStatus ( ) :
def __init__ ( self , monitor_on = False ) :
self . pose = _DriverPose ( )
self . monitor_on = monitor_on
self . monitor_param_on = monitor_on
self . monitor_valid = True # variance needs to be low
self . monitor_change_event = MonitorChangeEvent . NO_CHANGE
self . awareness = 1.
self . driver_distracted = False
self . driver_distraction_level = 0.
self . variance_high = False
self . variance_level = 0.
self . ts_last_check = 0.
self . _set_timers ( )
def _reset_filters ( self ) :
self . driver_distraction_level = 0.
self . variance_level = 0.
self . monitor_valid = True
def _set_timers ( self ) :
if self . monitor_on :
self . threshold_pre = _DISTRACTED_PRE_TIME / _DISTRACTED_TIME
self . threshold_prompt = _DISTRACTED_PROMPT_TIME / _DISTRACTED_TIME
self . step_change = _DT / _DISTRACTED_TIME
else :
self . threshold_pre = _AWARENESS_PRE_TIME / _AWARENESS_TIME
self . threshold_prompt = _AWARENESS_PROMPT_TIME / _AWARENESS_TIME
self . step_change = _DT / _AWARENESS_TIME
def _is_driver_distracted ( self , pose ) :
# to be tuned and to learn the driver's normal pose
yaw_error = pose . yaw - pose . yaw_offset
pitch_error = pose . pitch - pose . pitch_offset - _PITCH_NATURAL_OFFSET
# add positive pitch allowance
if pitch_error > 0. :
pitch_error = max ( pitch_error - _PITCH_POS_ALLOWANCE , 0. )
pitch_error * = _PITCH_WEIGHT
metric = np . sqrt ( yaw_error * * 2 + pitch_error * * 2 )
#print "%02.4f" % np.degrees(pose.pitch), "%02.4f" % np.degrees(pitch_error), "%03.4f" % np.degrees(pose.pitch_offset), metric
return 1 if metric > _METRIC_THRESHOLD else 0
def _monitor_change_check ( self , monitor_param_on_prev , monitor_valid_prev , monitor_on_prev ) :
if not monitor_param_on_prev and self . monitor_param_on :
self . _reset_filters ( )
return MonitorChangeEvent . PARAM_ON
elif monitor_param_on_prev and not self . monitor_param_on :
self . _reset_filters ( )
return MonitorChangeEvent . PARAM_OFF
elif monitor_on_prev and not self . monitor_valid :
return MonitorChangeEvent . VARIANCE_HIGH
elif self . monitor_param_on and not monitor_valid_prev and self . monitor_valid :
return MonitorChangeEvent . VARIANCE_LOW
else :
return MonitorChangeEvent . NO_CHANGE
def get_pose ( self , driver_monitoring , params ) :
self . pose . pitch = driver_monitoring . descriptor [ 0 ]
self . pose . yaw = driver_monitoring . descriptor [ 1 ]
self . pose . roll = driver_monitoring . descriptor [ 2 ]
self . pose . yaw_offset = ( driver_monitoring . descriptor [ 3 ] * _CAMERA_X_CONV + _CAMERA_OFFSET_X ) * _CAMERA_FOV_X
self . pose . pitch_offset = - driver_monitoring . descriptor [ 4 ] * _CAMERA_FOV_Y # positive y is down
self . driver_distracted = self . _is_driver_distracted ( self . pose )
# first order filters
self . driver_distraction_level = ( 1. - _DISTRACTED_FILTER_K ) * self . driver_distraction_level + \
_DISTRACTED_FILTER_K * self . driver_distracted
self . variance_high = driver_monitoring . std > _STD_THRESHOLD
self . variance_level = ( 1. - _VARIANCE_FILTER_K ) * self . variance_level + \
_VARIANCE_FILTER_K * self . variance_high
monitor_param_on_prev = self . monitor_param_on
monitor_valid_prev = self . monitor_valid
monitor_on_prev = self . monitor_on
# don't check for param too often as it's a kernel call
ts = sec_since_boot ( )
if ts - self . ts_last_check > 1. :
self . monitor_param_on = params . get ( " IsDriverMonitoringEnabled " ) == " 1 "
self . ts_last_check = ts
self . monitor_valid = _monitor_hysteresys ( self . variance_level , monitor_valid_prev )
self . monitor_on = self . monitor_valid and self . monitor_param_on
self . monitor_change_event = self . _monitor_change_check ( monitor_param_on_prev , monitor_valid_prev , monitor_on_prev )
self . _set_timers ( )
def update ( self , events , driver_engaged , ctrl_active , standstill ) :
driver_engaged | = ( self . driver_distraction_level < 0.37 and self . monitor_on )
if ( driver_engaged and self . awareness > 0. ) or not ctrl_active :
# always reset if driver is in control (unless we are in red alert state) or op isn't active
self . awareness = 1.
if not ctrl_active :
# hold monitor_level constant when control isn't active
self . variance_level = 0. if self . monitor_valid else 1.
if ( not self . monitor_on or ( self . driver_distraction_level > 0.63 and self . driver_distracted ) ) and \
not ( standstill and self . awareness - self . step_change < = self . threshold_prompt ) :
self . awareness = max ( self . awareness - self . step_change , - 0.1 )
alert = None
if self . awareness < = 0. :
# terminal red alert: disengagement required
alert = ' driverDistracted ' if self . monitor_on else ' driverUnresponsive '
elif self . awareness < = self . threshold_prompt :
# prompt orange alert
alert = ' promptDriverDistracted ' if self . monitor_on else ' promptDriverUnresponsive '
elif self . awareness < = self . threshold_pre :
# pre green alert
alert = ' preDriverDistracted ' if self . monitor_on else ' preDriverUnresponsive '
if alert is not None :
events . append ( create_event ( alert , [ ET . WARNING ] ) )
self . monitor_change_event = MonitorChangeEvent . NO_CHANGE
return events
if __name__ == " __main__ " :
ds = DriverStatus ( True )
ds . driver_distraction_level = 1.
ds . driver_distracted = 1
for i in range ( 1000 ) :
ds . update ( [ ] , False , True , True )
print ( ds . awareness , ds . driver_distracted , ds . driver_distraction_level )
ds . update ( [ ] , True , True , False )
print ( ds . awareness , ds . driver_distracted , ds . driver_distraction_level )