diff --git a/selfdrive/monitoring/dmonitoringd.py b/selfdrive/monitoring/dmonitoringd.py index 8321d37347..f11d72266c 100755 --- a/selfdrive/monitoring/dmonitoringd.py +++ b/selfdrive/monitoring/dmonitoringd.py @@ -3,7 +3,7 @@ from cereal import car from common.params import Params import cereal.messaging as messaging from selfdrive.controls.lib.events import Events -from selfdrive.monitoring.driver_monitor import DriverStatus, MAX_TERMINAL_ALERTS, MAX_TERMINAL_DURATION +from selfdrive.monitoring.driver_monitor import DriverStatus from selfdrive.locationd.calibrationd import Calibration @@ -50,7 +50,8 @@ def dmonitoringd_thread(sm=None, pm=None): driver_status.get_pose(sm['driverState'], sm['liveCalibration'].rpyCalib, sm['carState'].vEgo, sm['controlsState'].enabled) # Block engaging after max number of distrations - if driver_status.terminal_alert_cnt >= MAX_TERMINAL_ALERTS or driver_status.terminal_time >= MAX_TERMINAL_DURATION: + if driver_status.terminal_alert_cnt >= driver_status.settings._MAX_TERMINAL_ALERTS or \ + driver_status.terminal_time >= driver_status.settings._MAX_TERMINAL_DURATION: events.add(car.CarEvent.EventName.tooDistracted) # Update events from driver state diff --git a/selfdrive/monitoring/driver_monitor.py b/selfdrive/monitoring/driver_monitor.py index c7e2d147ef..1e45295f3f 100644 --- a/selfdrive/monitoring/driver_monitor.py +++ b/selfdrive/monitoring/driver_monitor.py @@ -15,43 +15,46 @@ EventName = car.CarEvent.EventName # We recommend that you do not change these numbers from the defaults. # ****************************************************************************************** -_AWARENESS_TIME = 35. # passive wheel touch total timeout -_AWARENESS_PRE_TIME_TILL_TERMINAL = 12. -_AWARENESS_PROMPT_TIME_TILL_TERMINAL = 6. -_DISTRACTED_TIME = 11. -_DISTRACTED_PRE_TIME_TILL_TERMINAL = 8. -_DISTRACTED_PROMPT_TIME_TILL_TERMINAL = 6. - -_FACE_THRESHOLD = 0.5 -_PARTIAL_FACE_THRESHOLD = 0.75 if TICI else 0.5 -_EYE_THRESHOLD = 0.5 -_SG_THRESHOLD = 0.5 -_BLINK_THRESHOLD = 0.88 if TICI else 0.5 -_BLINK_THRESHOLD_SLACK = 0.98 if TICI else 0.65 -_BLINK_THRESHOLD_STRICT = 0.88 if TICI else 0.5 -_PITCH_WEIGHT = 1.175 if TICI else 1.35 # pitch matters a lot more -_POSESTD_THRESHOLD = 0.318 if TICI else 0.14 -_E2E_POSE_THRESHOLD = 0.95 if TICI else 0.9 -_E2E_EYES_THRESHOLD = 0.75 -_METRIC_THRESHOLD = 0.5 if TICI else 0.4 -_METRIC_THRESHOLD_SLACK = 0.6875 if TICI else 0.55 -_METRIC_THRESHOLD_STRICT = 0.5 if TICI else 0.4 -_PITCH_POS_ALLOWANCE = 0.12 # rad, to not be too sensitive on positive pitch -_PITCH_NATURAL_OFFSET = 0.02 # people don't seem to look straight when they drive relaxed, rather a bit up -_YAW_NATURAL_OFFSET = 0.08 # people don't seem to look straight when they drive relaxed, rather a bit to the right (center of car) - -_HI_STD_FALLBACK_TIME = 10 # fall back to wheel touch if model is uncertain for a long time -_DISTRACTED_FILTER_TS = 0.25 # 0.6Hz - -_POSE_CALIB_MIN_SPEED = 13 # 30 mph -_POSE_OFFSET_MIN_COUNT = 600 # valid data counts before calibration completes, 1 seg is 600 counts -_POSE_OFFSET_MAX_COUNT = 3600 # stop deweighting new data after 6 min, aka "short term memory" - -_RECOVERY_FACTOR_MAX = 5. # relative to minus step change -_RECOVERY_FACTOR_MIN = 1.25 # relative to minus step change - -MAX_TERMINAL_ALERTS = 3 # not allowed to engage after 3 terminal alerts -MAX_TERMINAL_DURATION = 300 # 30s +class DRIVER_MONITOR_SETTINGS(): + _AWARENESS_TIME = 35. # passive wheeltouch total timeout + _AWARENESS_PRE_TIME_TILL_TERMINAL = 12. + _AWARENESS_PROMPT_TIME_TILL_TERMINAL = 6. + _DISTRACTED_TIME = 11. # active monitoring total timeout + _DISTRACTED_PRE_TIME_TILL_TERMINAL = 8. + _DISTRACTED_PROMPT_TIME_TILL_TERMINAL = 6. + + _FACE_THRESHOLD = 0.5 + _PARTIAL_FACE_THRESHOLD = 0.75 if TICI else 0.5 + _EYE_THRESHOLD = 0.5 + _SG_THRESHOLD = 0.5 + _BLINK_THRESHOLD = 0.88 if TICI else 0.5 + _BLINK_THRESHOLD_SLACK = 0.98 if TICI else 0.65 + _BLINK_THRESHOLD_STRICT = 0.88 if TICI else 0.5 + _PITCH_WEIGHT = 1.175 if TICI else 1.35 # pitch matters a lot more + _POSESTD_THRESHOLD = 0.318 if TICI else 0.14 + _E2E_POSE_THRESHOLD = 0.95 if TICI else 0.9 + _E2E_EYES_THRESHOLD = 0.75 + + _METRIC_THRESHOLD = 0.5 if TICI else 0.4 + _METRIC_THRESHOLD_SLACK = 0.6875 if TICI else 0.55 + _METRIC_THRESHOLD_STRICT = 0.5 if TICI else 0.4 + _PITCH_POS_ALLOWANCE = 0.12 # rad, to not be too sensitive on positive pitch + _PITCH_NATURAL_OFFSET = 0.02 # people don't seem to look straight when they drive relaxed, rather a bit up + _YAW_NATURAL_OFFSET = 0.08 # people don't seem to look straight when they drive relaxed, rather a bit to the right (center of car) + + _HI_STD_FALLBACK_TIME = int(10 / DT_DMON) # fall back to wheel touch if model is uncertain for 10s + _DISTRACTED_FILTER_TS = 0.25 # 0.6Hz + + _POSE_CALIB_MIN_SPEED = 13 # 30 mph + _POSE_OFFSET_MIN_COUNT = int(60 / DT_DMON) # valid data counts before calibration completes, 1min cumulative + _POSE_OFFSET_MAX_COUNT = int(360 / DT_DMON) # stop deweighting new data after 6 min, aka "short term memory" + + _RECOVERY_FACTOR_MAX = 5. # relative to minus step change + _RECOVERY_FACTOR_MIN = 1.25 # relative to minus step change + + _MAX_TERMINAL_ALERTS = 3 # not allowed to engage after 3 terminal alerts + _MAX_TERMINAL_DURATION = int(30 / DT_DMON) # not allowed to engage after 30s of terminal alerts + # model output refers to center of cropped image, so need to apply the x displacement offset RESIZED_FOCAL = 320.0 @@ -81,15 +84,15 @@ def face_orientation_from_net(angles_desc, pos_desc, rpy_calib, is_rhd): return roll_net, pitch, yaw class DriverPose(): - def __init__(self): + def __init__(self, max_trackable): self.yaw = 0. self.pitch = 0. self.roll = 0. self.yaw_std = 0. self.pitch_std = 0. self.roll_std = 0. - self.pitch_offseter = RunningStatFilter(max_trackable=_POSE_OFFSET_MAX_COUNT) - self.yaw_offseter = RunningStatFilter(max_trackable=_POSE_OFFSET_MAX_COUNT) + self.pitch_offseter = RunningStatFilter(max_trackable=max_trackable) + self.yaw_offseter = RunningStatFilter(max_trackable=max_trackable) self.low_std = True self.cfactor = 1. @@ -100,17 +103,20 @@ class DriverBlink(): self.cfactor = 1. class DriverStatus(): - def __init__(self, rhd=False): + def __init__(self, rhd=False, settings=DRIVER_MONITOR_SETTINGS): + # init policy settings + self.settings = settings + + # init driver status self.is_rhd_region = rhd - self.pose = DriverPose() - self.pose_calibrated = self.pose.pitch_offseter.filtered_stat.n > _POSE_OFFSET_MIN_COUNT and \ - self.pose.yaw_offseter.filtered_stat.n > _POSE_OFFSET_MIN_COUNT + self.pose = DriverPose(self.settings._POSE_OFFSET_MAX_COUNT) + self.pose_calibrated = False self.blink = DriverBlink() self.awareness = 1. self.awareness_active = 1. self.awareness_passive = 1. self.driver_distracted = False - self.driver_distraction_filter = FirstOrderFilter(0., _DISTRACTED_FILTER_TS, DT_DMON) + self.driver_distraction_filter = FirstOrderFilter(0., self.settings._DISTRACTED_FILTER_TS, DT_DMON) self.face_detected = False self.face_partial = False self.terminal_alert_cnt = 0 @@ -119,14 +125,15 @@ class DriverStatus(): self.active_monitoring_mode = True self.is_model_uncertain = False self.hi_stds = 0 - self.threshold_prompt = _DISTRACTED_PROMPT_TIME_TILL_TERMINAL / _DISTRACTED_TIME + self.threshold_pre = self.settings._DISTRACTED_PRE_TIME_TILL_TERMINAL / self.settings._DISTRACTED_TIME + self.threshold_prompt = self.settings._DISTRACTED_PROMPT_TIME_TILL_TERMINAL / self.settings._DISTRACTED_TIME self._set_timers(active_monitoring=True) def _set_timers(self, active_monitoring): if self.active_monitoring_mode and self.awareness <= self.threshold_prompt: if active_monitoring: - self.step_change = DT_DMON / _DISTRACTED_TIME + self.step_change = DT_DMON / self.settings._DISTRACTED_TIME else: self.step_change = 0. return # no exploit after orange alert @@ -139,79 +146,85 @@ class DriverStatus(): self.awareness_passive = self.awareness self.awareness = self.awareness_active - self.threshold_pre = _DISTRACTED_PRE_TIME_TILL_TERMINAL / _DISTRACTED_TIME - self.threshold_prompt = _DISTRACTED_PROMPT_TIME_TILL_TERMINAL / _DISTRACTED_TIME - self.step_change = DT_DMON / _DISTRACTED_TIME + self.threshold_pre = self.settings._DISTRACTED_PRE_TIME_TILL_TERMINAL / self.settings._DISTRACTED_TIME + self.threshold_prompt = self.settings._DISTRACTED_PROMPT_TIME_TILL_TERMINAL / self.settings._DISTRACTED_TIME + self.step_change = DT_DMON / self.settings._DISTRACTED_TIME self.active_monitoring_mode = True else: if self.active_monitoring_mode: self.awareness_active = self.awareness self.awareness = self.awareness_passive - self.threshold_pre = _AWARENESS_PRE_TIME_TILL_TERMINAL / _AWARENESS_TIME - self.threshold_prompt = _AWARENESS_PROMPT_TIME_TILL_TERMINAL / _AWARENESS_TIME - self.step_change = DT_DMON / _AWARENESS_TIME + self.threshold_pre = self.settings._AWARENESS_PRE_TIME_TILL_TERMINAL / self.settings._AWARENESS_TIME + self.threshold_prompt = self.settings._AWARENESS_PROMPT_TIME_TILL_TERMINAL / self.settings._AWARENESS_TIME + self.step_change = DT_DMON / self.settings._AWARENESS_TIME self.active_monitoring_mode = False def _is_driver_distracted(self, pose, blink): if not self.pose_calibrated: - pitch_error = pose.pitch - _PITCH_NATURAL_OFFSET - yaw_error = pose.yaw - _YAW_NATURAL_OFFSET + pitch_error = pose.pitch - self.settings._PITCH_NATURAL_OFFSET + yaw_error = pose.yaw - self.settings._YAW_NATURAL_OFFSET else: pitch_error = pose.pitch - self.pose.pitch_offseter.filtered_stat.mean() yaw_error = pose.yaw - self.pose.yaw_offseter.filtered_stat.mean() # positive pitch allowance if pitch_error > 0.: - pitch_error = max(pitch_error - _PITCH_POS_ALLOWANCE, 0.) - pitch_error *= _PITCH_WEIGHT + pitch_error = max(pitch_error - self.settings._PITCH_POS_ALLOWANCE, 0.) + pitch_error *= self.settings._PITCH_WEIGHT pose_metric = sqrt(yaw_error**2 + pitch_error**2) - if pose_metric > _METRIC_THRESHOLD*pose.cfactor: + if pose_metric > self.settings._METRIC_THRESHOLD*pose.cfactor: return DistractedType.BAD_POSE - elif (blink.left_blink + blink.right_blink)*0.5 > _BLINK_THRESHOLD*blink.cfactor: + elif (blink.left_blink + blink.right_blink)*0.5 > self.settings._BLINK_THRESHOLD*blink.cfactor: return DistractedType.BAD_BLINK else: return DistractedType.NOT_DISTRACTED def set_policy(self, model_data): ep = min(model_data.meta.engagedProb, 0.8) / 0.8 - self.pose.cfactor = interp(ep, [0, 0.5, 1], [_METRIC_THRESHOLD_STRICT, _METRIC_THRESHOLD, _METRIC_THRESHOLD_SLACK])/_METRIC_THRESHOLD - self.blink.cfactor = interp(ep, [0, 0.5, 1], [_BLINK_THRESHOLD_STRICT, _BLINK_THRESHOLD, _BLINK_THRESHOLD_SLACK])/_BLINK_THRESHOLD + self.pose.cfactor = interp(ep, [0, 0.5, 1], + [self.settings._METRIC_THRESHOLD_STRICT, + self.settings. _METRIC_THRESHOLD, + self.settings._METRIC_THRESHOLD_SLACK]) / self.settings._METRIC_THRESHOLD + self.blink.cfactor = interp(ep, [0, 0.5, 1], + [self.settings._BLINK_THRESHOLD_STRICT, + self.settings._BLINK_THRESHOLD, + self.settings._BLINK_THRESHOLD_SLACK]) / self.settings._BLINK_THRESHOLD def get_pose(self, driver_state, cal_rpy, car_speed, op_engaged): if not all(len(x) > 0 for x in [driver_state.faceOrientation, driver_state.facePosition, driver_state.faceOrientationStd, driver_state.facePositionStd]): return - self.face_partial = driver_state.partialFace > _PARTIAL_FACE_THRESHOLD - self.face_detected = driver_state.faceProb > _FACE_THRESHOLD or self.face_partial + self.face_partial = driver_state.partialFace > self.settings._PARTIAL_FACE_THRESHOLD + self.face_detected = driver_state.faceProb > self.settings._FACE_THRESHOLD or self.face_partial self.pose.roll, self.pose.pitch, self.pose.yaw = face_orientation_from_net(driver_state.faceOrientation, driver_state.facePosition, cal_rpy, self.is_rhd_region) self.pose.pitch_std = driver_state.faceOrientationStd[0] self.pose.yaw_std = driver_state.faceOrientationStd[1] # self.pose.roll_std = driver_state.faceOrientationStd[2] model_std_max = max(self.pose.pitch_std, self.pose.yaw_std) - self.pose.low_std = model_std_max < _POSESTD_THRESHOLD and not self.face_partial - self.blink.left_blink = driver_state.leftBlinkProb * (driver_state.leftEyeProb > _EYE_THRESHOLD) * (driver_state.sunglassesProb < _SG_THRESHOLD) - self.blink.right_blink = driver_state.rightBlinkProb * (driver_state.rightEyeProb > _EYE_THRESHOLD) * (driver_state.sunglassesProb < _SG_THRESHOLD) - - distracted_normal = (self._is_driver_distracted(self.pose, self.blink) > 0 and - driver_state.faceProb > _FACE_THRESHOLD and self.pose.low_std) - distracted_E2E = ((driver_state.distractedPose > _E2E_POSE_THRESHOLD or driver_state.distractedEyes > _E2E_EYES_THRESHOLD) and - (self.face_detected and not self.face_partial)) + self.pose.low_std = model_std_max < self.settings._POSESTD_THRESHOLD and not self.face_partial + self.blink.left_blink = driver_state.leftBlinkProb * (driver_state.leftEyeProb > self.settings._EYE_THRESHOLD) * (driver_state.sunglassesProb < self.settings._SG_THRESHOLD) + self.blink.right_blink = driver_state.rightBlinkProb * (driver_state.rightEyeProb > self.settings._EYE_THRESHOLD) * (driver_state.sunglassesProb < self.settings._SG_THRESHOLD) + + distracted_normal = self._is_driver_distracted(self.pose, self.blink) > 0 and \ + driver_state.faceProb > self.settings._FACE_THRESHOLD and self.pose.low_std + distracted_E2E = (driver_state.distractedPose > self.settings._E2E_POSE_THRESHOLD or driver_state.distractedEyes > self.settings._E2E_EYES_THRESHOLD) and \ + (self.face_detected and not self.face_partial) self.driver_distracted = distracted_normal or distracted_E2E self.driver_distraction_filter.update(self.driver_distracted) # update offseter # only update when driver is actively driving the car above a certain speed - if self.face_detected and car_speed > _POSE_CALIB_MIN_SPEED and self.pose.low_std and (not op_engaged or not self.driver_distracted): + if self.face_detected and car_speed > self.settings._POSE_CALIB_MIN_SPEED and self.pose.low_std and (not op_engaged or not self.driver_distracted): self.pose.pitch_offseter.push_and_update(self.pose.pitch) self.pose.yaw_offseter.push_and_update(self.pose.yaw) - self.pose_calibrated = self.pose.pitch_offseter.filtered_stat.n > _POSE_OFFSET_MIN_COUNT and \ - self.pose.yaw_offseter.filtered_stat.n > _POSE_OFFSET_MIN_COUNT + self.pose_calibrated = self.pose.pitch_offseter.filtered_stat.n > self.settings._POSE_OFFSET_MIN_COUNT and \ + self.pose.yaw_offseter.filtered_stat.n > self.settings._POSE_OFFSET_MIN_COUNT - self.is_model_uncertain = self.hi_stds * DT_DMON > _HI_STD_FALLBACK_TIME + self.is_model_uncertain = self.hi_stds > self.settings._HI_STD_FALLBACK_TIME self._set_timers(self.face_detected and not self.is_model_uncertain) if self.face_detected and not self.pose.low_std and not self.driver_distracted: self.hi_stds += 1 @@ -231,7 +244,7 @@ class DriverStatus(): if (driver_attentive and self.face_detected and self.pose.low_std and self.awareness > 0): # only restore awareness when paying attention and alert is not red - self.awareness = min(self.awareness + ((_RECOVERY_FACTOR_MAX-_RECOVERY_FACTOR_MIN)*(1.-self.awareness)+_RECOVERY_FACTOR_MIN)*self.step_change, 1.) + self.awareness = min(self.awareness + ((self.settings._RECOVERY_FACTOR_MAX-self.settings._RECOVERY_FACTOR_MIN)*(1.-self.awareness)+self.settings._RECOVERY_FACTOR_MIN)*self.step_change, 1.) if self.awareness == 1.: self.awareness_passive = min(self.awareness_passive + self.step_change, 1.) # don't display alert banner when awareness is recovering and has cleared orange @@ -240,7 +253,7 @@ class DriverStatus(): standstill_exemption = standstill and self.awareness - self.step_change <= self.threshold_prompt certainly_distracted = self.driver_distraction_filter.x > 0.63 and self.driver_distracted and self.face_detected - maybe_distracted = self.hi_stds * DT_DMON > _HI_STD_FALLBACK_TIME or not self.face_detected + maybe_distracted = self.hi_stds > self.settings._HI_STD_FALLBACK_TIME or not self.face_detected if certainly_distracted or maybe_distracted: # should always be counting if distracted unless at standstill and reaching orange if not standstill_exemption: diff --git a/selfdrive/monitoring/test_monitoring.py b/selfdrive/monitoring/test_monitoring.py index 9d660496af..11155a30ce 100755 --- a/selfdrive/monitoring/test_monitoring.py +++ b/selfdrive/monitoring/test_monitoring.py @@ -5,19 +5,16 @@ import numpy as np from cereal import car, log from common.realtime import DT_DMON from selfdrive.controls.lib.events import Events -from selfdrive.monitoring.driver_monitor import DriverStatus, \ - _AWARENESS_TIME, _AWARENESS_PRE_TIME_TILL_TERMINAL, \ - _AWARENESS_PROMPT_TIME_TILL_TERMINAL, _DISTRACTED_TIME, \ - _DISTRACTED_PRE_TIME_TILL_TERMINAL, _DISTRACTED_PROMPT_TIME_TILL_TERMINAL, \ - _POSESTD_THRESHOLD, _HI_STD_FALLBACK_TIME +from selfdrive.monitoring.driver_monitor import DriverStatus, DRIVER_MONITOR_SETTINGS EventName = car.CarEvent.EventName +dm_settings = DRIVER_MONITOR_SETTINGS() -_TEST_TIMESPAN = 120 # seconds -_DISTRACTED_SECONDS_TO_ORANGE = _DISTRACTED_TIME - _DISTRACTED_PROMPT_TIME_TILL_TERMINAL + 1 -_DISTRACTED_SECONDS_TO_RED = _DISTRACTED_TIME + 1 -_INVISIBLE_SECONDS_TO_ORANGE = _AWARENESS_TIME - _AWARENESS_PROMPT_TIME_TILL_TERMINAL + 1 -_INVISIBLE_SECONDS_TO_RED = _AWARENESS_TIME + 1 +TEST_TIMESPAN = 120 # seconds +DISTRACTED_SECONDS_TO_ORANGE = dm_settings._DISTRACTED_TIME - dm_settings._DISTRACTED_PROMPT_TIME_TILL_TERMINAL + 1 +DISTRACTED_SECONDS_TO_RED = dm_settings._DISTRACTED_TIME + 1 +INVISIBLE_SECONDS_TO_ORANGE = dm_settings._AWARENESS_TIME - dm_settings._AWARENESS_PROMPT_TIME_TILL_TERMINAL + 1 +INVISIBLE_SECONDS_TO_RED = dm_settings._AWARENESS_TIME + 1 def make_msg(face_detected, distracted=False, model_uncertain=False): ds = log.DriverState.new_message() @@ -39,21 +36,22 @@ msg_ATTENTIVE = make_msg(True) msg_DISTRACTED = make_msg(True, distracted=True) msg_ATTENTIVE_UNCERTAIN = make_msg(True, model_uncertain=True) msg_DISTRACTED_UNCERTAIN = make_msg(True, distracted=True, model_uncertain=True) -msg_DISTRACTED_BUT_SOMEHOW_UNCERTAIN = make_msg(True, distracted=True, model_uncertain=_POSESTD_THRESHOLD*1.5) +msg_DISTRACTED_BUT_SOMEHOW_UNCERTAIN = make_msg(True, distracted=True, model_uncertain=dm_settings._POSESTD_THRESHOLD*1.5) # driver interaction with car car_interaction_DETECTED = True car_interaction_NOT_DETECTED = False # some common state vectors -always_no_face = [msg_NO_FACE_DETECTED] * int(_TEST_TIMESPAN/DT_DMON) -always_attentive = [msg_ATTENTIVE] * int(_TEST_TIMESPAN/DT_DMON) -always_distracted = [msg_DISTRACTED] * int(_TEST_TIMESPAN/DT_DMON) -always_true = [True] * int(_TEST_TIMESPAN/DT_DMON) -always_false = [False] * int(_TEST_TIMESPAN/DT_DMON) +always_no_face = [msg_NO_FACE_DETECTED] * int(TEST_TIMESPAN / DT_DMON) +always_attentive = [msg_ATTENTIVE] * int(TEST_TIMESPAN / DT_DMON) +always_distracted = [msg_DISTRACTED] * int(TEST_TIMESPAN / DT_DMON) +always_true = [True] * int(TEST_TIMESPAN / DT_DMON) +always_false = [False] * int(TEST_TIMESPAN / DT_DMON) # TODO: this only tests DriverStatus class TestMonitoring(unittest.TestCase): + # pylint: disable=no-member def _run_seq(self, msgs, interaction, engaged, standstill): DS = DriverStatus() events = [] @@ -79,40 +77,40 @@ class TestMonitoring(unittest.TestCase): # engaged, driver is distracted and does nothing def test_fully_distracted_driver(self): events, d_status = self._run_seq(always_distracted, always_false, always_true, always_false) - self.assertEqual(len(events[int((_DISTRACTED_TIME-_DISTRACTED_PRE_TIME_TILL_TERMINAL)/2/DT_DMON)]), 0) - self.assertEqual(events[int((_DISTRACTED_TIME-_DISTRACTED_PRE_TIME_TILL_TERMINAL + - ((_DISTRACTED_PRE_TIME_TILL_TERMINAL-_DISTRACTED_PROMPT_TIME_TILL_TERMINAL)/2))/DT_DMON)].names[0], EventName.preDriverDistracted) - self.assertEqual(events[int((_DISTRACTED_TIME-_DISTRACTED_PROMPT_TIME_TILL_TERMINAL + - ((_DISTRACTED_PROMPT_TIME_TILL_TERMINAL)/2))/DT_DMON)].names[0], EventName.promptDriverDistracted) - self.assertEqual(events[int((_DISTRACTED_TIME + - ((_TEST_TIMESPAN-10-_DISTRACTED_TIME)/2))/DT_DMON)].names[0], EventName.driverDistracted) + self.assertEqual(len(events[int((d_status.settings._DISTRACTED_TIME-d_status.settings._DISTRACTED_PRE_TIME_TILL_TERMINAL)/2/DT_DMON)]), 0) + self.assertEqual(events[int((d_status.settings._DISTRACTED_TIME-d_status.settings._DISTRACTED_PRE_TIME_TILL_TERMINAL + + ((d_status.settings._DISTRACTED_PRE_TIME_TILL_TERMINAL-d_status.settings._DISTRACTED_PROMPT_TIME_TILL_TERMINAL)/2))/DT_DMON)].names[0], EventName.preDriverDistracted) + self.assertEqual(events[int((d_status.settings._DISTRACTED_TIME-d_status.settings._DISTRACTED_PROMPT_TIME_TILL_TERMINAL + + ((d_status.settings._DISTRACTED_PROMPT_TIME_TILL_TERMINAL)/2))/DT_DMON)].names[0], EventName.promptDriverDistracted) + self.assertEqual(events[int((d_status.settings._DISTRACTED_TIME + + ((TEST_TIMESPAN-10-d_status.settings._DISTRACTED_TIME)/2))/DT_DMON)].names[0], EventName.driverDistracted) self.assertIs(type(d_status.awareness), float) # engaged, no face detected the whole time, no action def test_fully_invisible_driver(self): - events = self._run_seq(always_no_face, always_false, always_true, always_false)[0] - self.assertTrue(len(events[int((_AWARENESS_TIME-_AWARENESS_PRE_TIME_TILL_TERMINAL)/2/DT_DMON)]) == 0) - self.assertEqual(events[int((_AWARENESS_TIME-_AWARENESS_PRE_TIME_TILL_TERMINAL + - ((_AWARENESS_PRE_TIME_TILL_TERMINAL-_AWARENESS_PROMPT_TIME_TILL_TERMINAL)/2))/DT_DMON)].names[0], EventName.preDriverUnresponsive) - self.assertEqual(events[int((_AWARENESS_TIME-_AWARENESS_PROMPT_TIME_TILL_TERMINAL + - ((_AWARENESS_PROMPT_TIME_TILL_TERMINAL)/2))/DT_DMON)].names[0], EventName.promptDriverUnresponsive) - self.assertEqual(events[int((_AWARENESS_TIME + - ((_TEST_TIMESPAN-10-_AWARENESS_TIME)/2))/DT_DMON)].names[0], EventName.driverUnresponsive) + events, d_status = self._run_seq(always_no_face, always_false, always_true, always_false) + self.assertTrue(len(events[int((d_status.settings._AWARENESS_TIME-d_status.settings._AWARENESS_PRE_TIME_TILL_TERMINAL)/2/DT_DMON)]) == 0) + self.assertEqual(events[int((d_status.settings._AWARENESS_TIME-d_status.settings._AWARENESS_PRE_TIME_TILL_TERMINAL + + ((d_status.settings._AWARENESS_PRE_TIME_TILL_TERMINAL-d_status.settings._AWARENESS_PROMPT_TIME_TILL_TERMINAL)/2))/DT_DMON)].names[0], EventName.preDriverUnresponsive) + self.assertEqual(events[int((d_status.settings._AWARENESS_TIME-d_status.settings._AWARENESS_PROMPT_TIME_TILL_TERMINAL + + ((d_status.settings._AWARENESS_PROMPT_TIME_TILL_TERMINAL)/2))/DT_DMON)].names[0], EventName.promptDriverUnresponsive) + self.assertEqual(events[int((d_status.settings._AWARENESS_TIME + + ((TEST_TIMESPAN-10-d_status.settings._AWARENESS_TIME)/2))/DT_DMON)].names[0], EventName.driverUnresponsive) # engaged, down to orange, driver pays attention, back to normal; then down to orange, driver touches wheel # - should have short orange recovery time and no green afterwards; should recover rightaway on wheel touch def test_normal_driver(self): - ds_vector = [msg_DISTRACTED] * int(_DISTRACTED_SECONDS_TO_ORANGE/DT_DMON) + \ - [msg_ATTENTIVE] * int(_DISTRACTED_SECONDS_TO_ORANGE/DT_DMON) + \ - [msg_DISTRACTED] * (int(_TEST_TIMESPAN/DT_DMON)-int(_DISTRACTED_SECONDS_TO_ORANGE*2/DT_DMON)) - interaction_vector = [car_interaction_NOT_DETECTED] * int(_DISTRACTED_SECONDS_TO_ORANGE*3/DT_DMON) + \ - [car_interaction_DETECTED] * (int(_TEST_TIMESPAN/DT_DMON)-int(_DISTRACTED_SECONDS_TO_ORANGE*3/DT_DMON)) - events = self._run_seq(ds_vector, interaction_vector, always_true, always_false)[0] - self.assertEqual(len(events[int(_DISTRACTED_SECONDS_TO_ORANGE*0.5/DT_DMON)]), 0) - self.assertEqual(events[int((_DISTRACTED_SECONDS_TO_ORANGE-0.1)/DT_DMON)].names[0], EventName.promptDriverDistracted) - self.assertEqual(len(events[int(_DISTRACTED_SECONDS_TO_ORANGE*1.5/DT_DMON)]), 0) - self.assertEqual(events[int((_DISTRACTED_SECONDS_TO_ORANGE*3-0.1)/DT_DMON)].names[0], EventName.promptDriverDistracted) - self.assertEqual(len(events[int((_DISTRACTED_SECONDS_TO_ORANGE*3+0.1)/DT_DMON)]), 0) + ds_vector = [msg_DISTRACTED] * int(DISTRACTED_SECONDS_TO_ORANGE/DT_DMON) + \ + [msg_ATTENTIVE] * int(DISTRACTED_SECONDS_TO_ORANGE/DT_DMON) + \ + [msg_DISTRACTED] * (int(TEST_TIMESPAN/DT_DMON)-int(DISTRACTED_SECONDS_TO_ORANGE*2/DT_DMON)) + interaction_vector = [car_interaction_NOT_DETECTED] * int(DISTRACTED_SECONDS_TO_ORANGE*3/DT_DMON) + \ + [car_interaction_DETECTED] * (int(TEST_TIMESPAN/DT_DMON)-int(DISTRACTED_SECONDS_TO_ORANGE*3/DT_DMON)) + events, _ = self._run_seq(ds_vector, interaction_vector, always_true, always_false) + self.assertEqual(len(events[int(DISTRACTED_SECONDS_TO_ORANGE*0.5/DT_DMON)]), 0) + self.assertEqual(events[int((DISTRACTED_SECONDS_TO_ORANGE-0.1)/DT_DMON)].names[0], EventName.promptDriverDistracted) + self.assertEqual(len(events[int(DISTRACTED_SECONDS_TO_ORANGE*1.5/DT_DMON)]), 0) + self.assertEqual(events[int((DISTRACTED_SECONDS_TO_ORANGE*3-0.1)/DT_DMON)].names[0], EventName.promptDriverDistracted) + self.assertEqual(len(events[int((DISTRACTED_SECONDS_TO_ORANGE*3+0.1)/DT_DMON)]), 0) # engaged, down to orange, driver dodges camera, then comes back still distracted, down to red, \ # driver dodges, and then touches wheel to no avail, disengages and reengages @@ -122,15 +120,15 @@ class TestMonitoring(unittest.TestCase): ds_vector = always_distracted[:] interaction_vector = always_false[:] op_vector = always_true[:] - ds_vector[int(_DISTRACTED_SECONDS_TO_ORANGE/DT_DMON):int((_DISTRACTED_SECONDS_TO_ORANGE+_invisible_time)/DT_DMON)] = [msg_NO_FACE_DETECTED] * int(_invisible_time/DT_DMON) - ds_vector[int((_DISTRACTED_SECONDS_TO_RED+_invisible_time)/DT_DMON):int((_DISTRACTED_SECONDS_TO_RED+2*_invisible_time)/DT_DMON)] = [msg_NO_FACE_DETECTED] * int(_invisible_time/DT_DMON) - interaction_vector[int((_DISTRACTED_SECONDS_TO_RED+2*_invisible_time+0.5)/DT_DMON):int((_DISTRACTED_SECONDS_TO_RED+2*_invisible_time+1.5)/DT_DMON)] = [True] * int(1/DT_DMON) - op_vector[int((_DISTRACTED_SECONDS_TO_RED+2*_invisible_time+2.5)/DT_DMON):int((_DISTRACTED_SECONDS_TO_RED+2*_invisible_time+3)/DT_DMON)] = [False] * int(0.5/DT_DMON) - events = self._run_seq(ds_vector, interaction_vector, op_vector, always_false)[0] - self.assertEqual(events[int((_DISTRACTED_SECONDS_TO_ORANGE+0.5*_invisible_time)/DT_DMON)].names[0], EventName.promptDriverDistracted) - self.assertEqual(events[int((_DISTRACTED_SECONDS_TO_RED+1.5*_invisible_time)/DT_DMON)].names[0], EventName.driverDistracted) - self.assertEqual(events[int((_DISTRACTED_SECONDS_TO_RED+2*_invisible_time+1.5)/DT_DMON)].names[0], EventName.driverDistracted) - self.assertTrue(len(events[int((_DISTRACTED_SECONDS_TO_RED+2*_invisible_time+3.5)/DT_DMON)]) == 0) + ds_vector[int(DISTRACTED_SECONDS_TO_ORANGE/DT_DMON):int((DISTRACTED_SECONDS_TO_ORANGE+_invisible_time)/DT_DMON)] = [msg_NO_FACE_DETECTED] * int(_invisible_time/DT_DMON) + ds_vector[int((DISTRACTED_SECONDS_TO_RED+_invisible_time)/DT_DMON):int((DISTRACTED_SECONDS_TO_RED+2*_invisible_time)/DT_DMON)] = [msg_NO_FACE_DETECTED] * int(_invisible_time/DT_DMON) + interaction_vector[int((DISTRACTED_SECONDS_TO_RED+2*_invisible_time+0.5)/DT_DMON):int((DISTRACTED_SECONDS_TO_RED+2*_invisible_time+1.5)/DT_DMON)] = [True] * int(1/DT_DMON) + op_vector[int((DISTRACTED_SECONDS_TO_RED+2*_invisible_time+2.5)/DT_DMON):int((DISTRACTED_SECONDS_TO_RED+2*_invisible_time+3)/DT_DMON)] = [False] * int(0.5/DT_DMON) + events, _ = self._run_seq(ds_vector, interaction_vector, op_vector, always_false) + self.assertEqual(events[int((DISTRACTED_SECONDS_TO_ORANGE+0.5*_invisible_time)/DT_DMON)].names[0], EventName.promptDriverDistracted) + self.assertEqual(events[int((DISTRACTED_SECONDS_TO_RED+1.5*_invisible_time)/DT_DMON)].names[0], EventName.driverDistracted) + self.assertEqual(events[int((DISTRACTED_SECONDS_TO_RED+2*_invisible_time+1.5)/DT_DMON)].names[0], EventName.driverDistracted) + self.assertTrue(len(events[int((DISTRACTED_SECONDS_TO_RED+2*_invisible_time+3.5)/DT_DMON)]) == 0) # engaged, invisible driver, down to orange, driver touches wheel; then down to orange again, driver appears # - both actions should clear the alert, but momentary appearence should not @@ -138,18 +136,18 @@ class TestMonitoring(unittest.TestCase): _visible_time = np.random.choice([0.5, 10]) ds_vector = always_no_face[:]*2 interaction_vector = always_false[:]*2 - ds_vector[int((2*_INVISIBLE_SECONDS_TO_ORANGE+1)/DT_DMON):int((2*_INVISIBLE_SECONDS_TO_ORANGE+1+_visible_time)/DT_DMON)] = [msg_ATTENTIVE] * int(_visible_time/DT_DMON) - interaction_vector[int((_INVISIBLE_SECONDS_TO_ORANGE)/DT_DMON):int((_INVISIBLE_SECONDS_TO_ORANGE+1)/DT_DMON)] = [True] * int(1/DT_DMON) - events = self._run_seq(ds_vector, interaction_vector, 2*always_true, 2*always_false)[0] - self.assertTrue(len(events[int(_INVISIBLE_SECONDS_TO_ORANGE*0.5/DT_DMON)]) == 0) - self.assertEqual(events[int((_INVISIBLE_SECONDS_TO_ORANGE-0.1)/DT_DMON)].names[0], EventName.promptDriverUnresponsive) - self.assertTrue(len(events[int((_INVISIBLE_SECONDS_TO_ORANGE+0.1)/DT_DMON)]) == 0) + ds_vector[int((2*INVISIBLE_SECONDS_TO_ORANGE+1)/DT_DMON):int((2*INVISIBLE_SECONDS_TO_ORANGE+1+_visible_time)/DT_DMON)] = [msg_ATTENTIVE] * int(_visible_time/DT_DMON) + interaction_vector[int((INVISIBLE_SECONDS_TO_ORANGE)/DT_DMON):int((INVISIBLE_SECONDS_TO_ORANGE+1)/DT_DMON)] = [True] * int(1/DT_DMON) + events, _ = self._run_seq(ds_vector, interaction_vector, 2*always_true, 2*always_false) + self.assertTrue(len(events[int(INVISIBLE_SECONDS_TO_ORANGE*0.5/DT_DMON)]) == 0) + self.assertEqual(events[int((INVISIBLE_SECONDS_TO_ORANGE-0.1)/DT_DMON)].names[0], EventName.promptDriverUnresponsive) + self.assertTrue(len(events[int((INVISIBLE_SECONDS_TO_ORANGE+0.1)/DT_DMON)]) == 0) if _visible_time == 0.5: - self.assertEqual(events[int((_INVISIBLE_SECONDS_TO_ORANGE*2+1-0.1)/DT_DMON)].names[0], EventName.promptDriverUnresponsive) - self.assertEqual(events[int((_INVISIBLE_SECONDS_TO_ORANGE*2+1+0.1+_visible_time)/DT_DMON)].names[0], EventName.preDriverUnresponsive) + self.assertEqual(events[int((INVISIBLE_SECONDS_TO_ORANGE*2+1-0.1)/DT_DMON)].names[0], EventName.promptDriverUnresponsive) + self.assertEqual(events[int((INVISIBLE_SECONDS_TO_ORANGE*2+1+0.1+_visible_time)/DT_DMON)].names[0], EventName.preDriverUnresponsive) elif _visible_time == 10: - self.assertEqual(events[int((_INVISIBLE_SECONDS_TO_ORANGE*2+1-0.1)/DT_DMON)].names[0], EventName.promptDriverUnresponsive) - self.assertTrue(len(events[int((_INVISIBLE_SECONDS_TO_ORANGE*2+1+0.1+_visible_time)/DT_DMON)]) == 0) + self.assertEqual(events[int((INVISIBLE_SECONDS_TO_ORANGE*2+1-0.1)/DT_DMON)].names[0], EventName.promptDriverUnresponsive) + self.assertTrue(len(events[int((INVISIBLE_SECONDS_TO_ORANGE*2+1+0.1+_visible_time)/DT_DMON)]) == 0) # engaged, invisible driver, down to red, driver appears and then touches wheel, then disengages/reengages # - only disengage will clear the alert @@ -158,21 +156,21 @@ class TestMonitoring(unittest.TestCase): ds_vector = always_no_face[:] interaction_vector = always_false[:] op_vector = always_true[:] - ds_vector[int(_INVISIBLE_SECONDS_TO_RED/DT_DMON):int((_INVISIBLE_SECONDS_TO_RED+_visible_time)/DT_DMON)] = [msg_ATTENTIVE] * int(_visible_time/DT_DMON) - interaction_vector[int((_INVISIBLE_SECONDS_TO_RED+_visible_time)/DT_DMON):int((_INVISIBLE_SECONDS_TO_RED+_visible_time+1)/DT_DMON)] = [True] * int(1/DT_DMON) - op_vector[int((_INVISIBLE_SECONDS_TO_RED+_visible_time+1)/DT_DMON):int((_INVISIBLE_SECONDS_TO_RED+_visible_time+0.5)/DT_DMON)] = [False] * int(0.5/DT_DMON) - events = self._run_seq(ds_vector, interaction_vector, op_vector, always_false)[0] - self.assertTrue(len(events[int(_INVISIBLE_SECONDS_TO_ORANGE*0.5/DT_DMON)]) == 0) - self.assertEqual(events[int((_INVISIBLE_SECONDS_TO_ORANGE-0.1)/DT_DMON)].names[0], EventName.promptDriverUnresponsive) - self.assertEqual(events[int((_INVISIBLE_SECONDS_TO_RED-0.1)/DT_DMON)].names[0], EventName.driverUnresponsive) - self.assertEqual(events[int((_INVISIBLE_SECONDS_TO_RED+0.5*_visible_time)/DT_DMON)].names[0], EventName.driverUnresponsive) - self.assertEqual(events[int((_INVISIBLE_SECONDS_TO_RED+_visible_time+0.5)/DT_DMON)].names[0], EventName.driverUnresponsive) - self.assertTrue(len(events[int((_INVISIBLE_SECONDS_TO_RED+_visible_time+1+0.1)/DT_DMON)]) == 0) + ds_vector[int(INVISIBLE_SECONDS_TO_RED/DT_DMON):int((INVISIBLE_SECONDS_TO_RED+_visible_time)/DT_DMON)] = [msg_ATTENTIVE] * int(_visible_time/DT_DMON) + interaction_vector[int((INVISIBLE_SECONDS_TO_RED+_visible_time)/DT_DMON):int((INVISIBLE_SECONDS_TO_RED+_visible_time+1)/DT_DMON)] = [True] * int(1/DT_DMON) + op_vector[int((INVISIBLE_SECONDS_TO_RED+_visible_time+1)/DT_DMON):int((INVISIBLE_SECONDS_TO_RED+_visible_time+0.5)/DT_DMON)] = [False] * int(0.5/DT_DMON) + events, _ = self._run_seq(ds_vector, interaction_vector, op_vector, always_false) + self.assertTrue(len(events[int(INVISIBLE_SECONDS_TO_ORANGE*0.5/DT_DMON)]) == 0) + self.assertEqual(events[int((INVISIBLE_SECONDS_TO_ORANGE-0.1)/DT_DMON)].names[0], EventName.promptDriverUnresponsive) + self.assertEqual(events[int((INVISIBLE_SECONDS_TO_RED-0.1)/DT_DMON)].names[0], EventName.driverUnresponsive) + self.assertEqual(events[int((INVISIBLE_SECONDS_TO_RED+0.5*_visible_time)/DT_DMON)].names[0], EventName.driverUnresponsive) + self.assertEqual(events[int((INVISIBLE_SECONDS_TO_RED+_visible_time+0.5)/DT_DMON)].names[0], EventName.driverUnresponsive) + self.assertTrue(len(events[int((INVISIBLE_SECONDS_TO_RED+_visible_time+1+0.1)/DT_DMON)]) == 0) # disengaged, always distracted driver # - dm should stay quiet when not engaged def test_pure_dashcam_user(self): - events = self._run_seq(always_distracted, always_false, always_false, always_false)[0] + events, _ = self._run_seq(always_distracted, always_false, always_false, always_false) self.assertTrue(np.sum([len(event) for event in events]) == 0) # engaged, car stops at traffic light, down to orange, no action, then car starts moving @@ -180,21 +178,21 @@ class TestMonitoring(unittest.TestCase): def test_long_traffic_light_victim(self): _redlight_time = 60 # seconds standstill_vector = always_true[:] - standstill_vector[int(_redlight_time/DT_DMON):] = [False] * int((_TEST_TIMESPAN-_redlight_time)/DT_DMON) - events = self._run_seq(always_distracted, always_false, always_true, standstill_vector)[0] - self.assertEqual(events[int((_DISTRACTED_TIME-_DISTRACTED_PRE_TIME_TILL_TERMINAL+1)/DT_DMON)].names[0], EventName.preDriverDistracted) + standstill_vector[int(_redlight_time/DT_DMON):] = [False] * int((TEST_TIMESPAN-_redlight_time)/DT_DMON) + events, d_status = self._run_seq(always_distracted, always_false, always_true, standstill_vector) + self.assertEqual(events[int((d_status.settings._DISTRACTED_TIME-d_status.settings._DISTRACTED_PRE_TIME_TILL_TERMINAL+1)/DT_DMON)].names[0], EventName.preDriverDistracted) self.assertEqual(events[int((_redlight_time-0.1)/DT_DMON)].names[0], EventName.preDriverDistracted) self.assertEqual(events[int((_redlight_time+0.5)/DT_DMON)].names[0], EventName.promptDriverDistracted) # engaged, model is somehow uncertain and driver is distracted # - should fall back to wheel touch after uncertain alert def test_somehow_indecisive_model(self): - ds_vector = [msg_DISTRACTED_BUT_SOMEHOW_UNCERTAIN] * int(_TEST_TIMESPAN/DT_DMON) + ds_vector = [msg_DISTRACTED_BUT_SOMEHOW_UNCERTAIN] * int(TEST_TIMESPAN/DT_DMON) interaction_vector = always_false[:] - events = self._run_seq(ds_vector, interaction_vector, always_true, always_false)[0] - self.assertTrue(EventName.preDriverUnresponsive in events[int((_INVISIBLE_SECONDS_TO_ORANGE-1+_HI_STD_FALLBACK_TIME-0.1)/DT_DMON)].names) - self.assertTrue(EventName.promptDriverUnresponsive in events[int((_INVISIBLE_SECONDS_TO_ORANGE-1+_HI_STD_FALLBACK_TIME+0.1)/DT_DMON)].names) - self.assertTrue(EventName.driverUnresponsive in events[int((_INVISIBLE_SECONDS_TO_RED-1+_HI_STD_FALLBACK_TIME+0.1)/DT_DMON)].names) + events, d_status = self._run_seq(ds_vector, interaction_vector, always_true, always_false) + self.assertTrue(EventName.preDriverUnresponsive in events[int((INVISIBLE_SECONDS_TO_ORANGE-1+DT_DMON*d_status.settings._HI_STD_FALLBACK_TIME-0.1)/DT_DMON)].names) + self.assertTrue(EventName.promptDriverUnresponsive in events[int((INVISIBLE_SECONDS_TO_ORANGE-1+DT_DMON*d_status.settings._HI_STD_FALLBACK_TIME+0.1)/DT_DMON)].names) + self.assertTrue(EventName.driverUnresponsive in events[int((INVISIBLE_SECONDS_TO_RED-1+DT_DMON*d_status.settings._HI_STD_FALLBACK_TIME+0.1)/DT_DMON)].names) if __name__ == "__main__":