@ -17,6 +17,7 @@ SELECT_COMPARE_FIELDS = {
' sensors_flag ' : [ ' sensorsOK ' ] ,
}
JUNK_IDX = 100
CONSISTENT_SPIKES_COUNT = 10
class Scenario ( Enum ) :
@ -25,6 +26,8 @@ class Scenario(Enum):
GYRO_SPIKE_MIDWAY = ' gyro_spike_midway '
ACCEL_OFF = ' accel_off '
ACCEL_SPIKE_MIDWAY = ' accel_spike_midway '
SENSOR_TIMING_SPIKE_MIDWAY = ' timing_spikes '
SENSOR_TIMING_CONSISTENT_SPIKES = ' timing_consistent_spikes '
def get_select_fields_data ( logs ) :
@ -43,6 +46,17 @@ def get_select_fields_data(logs):
return data
def modify_logs_midway ( logs , which , count , fn ) :
non_which = [ x for x in logs if x . which ( ) != which ]
which = [ x for x in logs if x . which ( ) == which ]
temps = which [ len ( which ) / / 2 : len ( which ) / / 2 + count ]
for i , temp in enumerate ( temps ) :
temp = temp . as_builder ( )
fn ( temp )
which [ len ( which ) / / 2 + i ] = temp . as_reader ( )
return sorted ( non_which + which , key = lambda x : x . logMonoTime )
def run_scenarios ( scenario , logs ) :
if scenario == Scenario . BASE :
pass
@ -51,23 +65,23 @@ def run_scenarios(scenario, logs):
logs = sorted ( [ x for x in logs if x . which ( ) != ' gyroscope ' ] , key = lambda x : x . logMonoTime )
elif scenario == Scenario . GYRO_SPIKE_MIDWAY :
non_gyro = [ x for x in logs if x . which ( ) not in ' gyroscope ' ]
gyro = [ x for x in logs if x . which ( ) in ' gyroscope ' ]
temp = gyro [ len ( gyro ) / / 2 ] . as_builder ( )
temp . gyroscope . gyroUncalibrated . v [ 0 ] + = 3.0
gyro [ len ( gyro ) / / 2 ] = temp . as_reader ( )
logs = sorted ( non_gyro + gyro , key = lambda x : x . logMonoTime )
def gyro_spike ( msg ) :
msg . gyroscope . gyroUncalibrated . v [ 0 ] + = 3.0
logs = modify_logs_midway ( logs , ' gyroscope ' , 1 , gyro_spike )
elif scenario == Scenario . ACCEL_OFF :
logs = sorted ( [ x for x in logs if x . which ( ) != ' accelerometer ' ] , key = lambda x : x . logMonoTime )
elif scenario == Scenario . ACCEL_SPIKE_MIDWAY :
non_accel = [ x for x in logs if x . which ( ) not in ' accelerometer ' ]
accel = [ x for x in logs if x . which ( ) in ' accelerometer ' ]
temp = accel [ len ( accel ) / / 2 ] . as_builder ( )
temp . accelerometer . acceleration . v [ 0 ] + = 10.0
accel [ len ( accel ) / / 2 ] = temp . as_reader ( )
logs = sorted ( non_accel + accel , key = lambda x : x . logMonoTime )
def acc_spike ( msg ) :
msg . accelerometer . acceleration . v [ 0 ] + = 10.0
logs = modify_logs_midway ( logs , ' accelerometer ' , 1 , acc_spike )
elif scenario == Scenario . SENSOR_TIMING_SPIKE_MIDWAY or scenario == Scenario . SENSOR_TIMING_CONSISTENT_SPIKES :
def timing_spike ( msg ) :
msg . accelerometer . timestamp - = int ( 0.150 * 1e9 )
count = 1 if scenario == Scenario . SENSOR_TIMING_SPIKE_MIDWAY else CONSISTENT_SPIKES_COUNT
logs = modify_logs_midway ( logs , ' accelerometer ' , count , timing_spike )
replayed_logs = replay_process_with_name ( name = ' locationd ' , lr = logs )
return get_select_fields_data ( logs ) , get_select_fields_data ( replayed_logs )
@ -122,7 +136,7 @@ class TestLocationdScenarios:
assert np . allclose ( orig_data [ ' yaw_rate ' ] , replayed_data [ ' yaw_rate ' ] , atol = np . radians ( 0.35 ) )
assert np . allclose ( orig_data [ ' roll ' ] , replayed_data [ ' roll ' ] , atol = np . radians ( 0.55 ) )
assert np . diff ( replayed_data [ ' inputs_flag ' ] ) [ 499 ] == - 1.0
assert np . diff ( replayed_data [ ' inputs_flag ' ] ) [ 696 ] == 1.0
assert np . diff ( replayed_data [ ' inputs_flag ' ] ) [ 704 ] == 1.0
def test_accel_off ( self ) :
"""
@ -146,3 +160,21 @@ class TestLocationdScenarios:
orig_data , replayed_data = run_scenarios ( Scenario . ACCEL_SPIKE_MIDWAY , self . logs )
assert np . allclose ( orig_data [ ' yaw_rate ' ] , replayed_data [ ' yaw_rate ' ] , atol = np . radians ( 0.35 ) )
assert np . allclose ( orig_data [ ' roll ' ] , replayed_data [ ' roll ' ] , atol = np . radians ( 0.55 ) )
def test_single_timing_spike ( self ) :
"""
Test : timing of 150 ms off for the single accelerometer message in the middle of the segment
Expected Result : the message is ignored , and inputsOK is False for that time
"""
orig_data , replayed_data = run_scenarios ( Scenario . SENSOR_TIMING_SPIKE_MIDWAY , self . logs )
assert np . all ( replayed_data [ ' inputs_flag ' ] == orig_data [ ' inputs_flag ' ] )
assert np . all ( replayed_data [ ' sensors_flag ' ] == orig_data [ ' sensors_flag ' ] )
def test_consistent_timing_spikes ( self ) :
"""
Test : consistent timing spikes for N accelerometer messages in the middle of the segment
Expected Result : inputsOK becomes False after N of bad measurements
"""
orig_data , replayed_data = run_scenarios ( Scenario . SENSOR_TIMING_CONSISTENT_SPIKES , self . logs )
assert np . diff ( replayed_data [ ' inputs_flag ' ] ) [ 500 ] == - 1.0
assert np . diff ( replayed_data [ ' inputs_flag ' ] ) [ 787 ] == 1.0