You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							53 lines
						
					
					
						
							2.0 KiB
						
					
					
				
			
		
		
	
	
							53 lines
						
					
					
						
							2.0 KiB
						
					
					
				| import pytest
 | |
| import cereal.messaging as messaging
 | |
| from cereal import car
 | |
| from openpilot.common.params import Params
 | |
| from openpilot.system.manager.process_config import managed_processes
 | |
| 
 | |
| 
 | |
@pytest.mark.skip("tmp disabled")
class TestFeedbackd:
  """Process-level test for the managed ``feedbackd`` process.

  The test publishes ``carState`` (LKAS button events) and ``rawAudioData``
  messages, and observes whether ``audioFeedback`` messages come back,
  depending on the ``RecordAudioFeedback`` param.
  """

  def setup_method(self) -> None:
    """Create the publisher and subscriber used by each test."""
    self.pm = messaging.PubMaster(['carState', 'rawAudioData'])
    self.sm = messaging.SubMaster(['audioFeedback'])

  def _send_lkas_button(self, pressed: bool) -> None:
    """Publish one ``carState`` message carrying a single LKAS button event.

    Args:
      pressed: value of the ``pressed`` field of the button event.
    """
    msg = messaging.new_message('carState')
    msg.carState.canValid = True
    msg.carState.buttonEvents = [{'type': car.CarState.ButtonEvent.Type.lkas, 'pressed': pressed}]
    self.pm.send('carState', msg)

  def _send_audio_data(self, count: int = 5) -> None:
    """Publish ``count`` ``rawAudioData`` messages, polling the subscriber after each.

    Every message carries 1600 zero bytes and a sample rate of 16000.

    Args:
      count: number of audio messages to publish.
    """
    for _ in range(count):
      audio_msg = messaging.new_message('rawAudioData')
      audio_msg.rawAudioData.data = bytes(1600)  # 800 samples of int16
      audio_msg.rawAudioData.sampleRate = 16000
      self.pm.send('rawAudioData', audio_msg)
      # NOTE(review): SubMaster.updated presumably reflects only the most recent
      # update() call, so the assertions in the test observe the state after the
      # last message of a batch -- confirm against cereal.messaging.
      self.sm.update(timeout=100)

  @pytest.mark.parametrize("record_feedback", [False, True])
  def test_audio_feedback(self, record_feedback: bool) -> None:
    """Check ``audioFeedback`` publication across two LKAS press/release cycles.

    After the first cycle, ``audioFeedback`` must be updated only when
    ``RecordAudioFeedback`` is enabled. After the second cycle it must not be
    updated in either case.

    Args:
      record_feedback: value written to the ``RecordAudioFeedback`` param.
    """
    Params().put_bool("RecordAudioFeedback", record_feedback)

    feedbackd = managed_processes["feedbackd"]
    feedbackd.start()
    # Always stop the process, even when an assertion fails; otherwise a
    # leftover feedbackd would leak into the next parametrized case.
    try:
      assert self.pm.wait_for_readers_to_update('carState', timeout=5)
      assert self.pm.wait_for_readers_to_update('rawAudioData', timeout=5)

      # First press/release cycle.
      self._send_lkas_button(pressed=True)
      self._send_audio_data()
      self._send_lkas_button(pressed=False)
      self._send_audio_data()

      if record_feedback:
        assert self.sm.updated['audioFeedback'], "audioFeedback should be published when enabled"
      else:
        assert not self.sm.updated['audioFeedback'], "audioFeedback should not be published when disabled"

      # Second press/release cycle.
      self._send_lkas_button(pressed=True)
      self._send_audio_data()
      self._send_lkas_button(pressed=False)
      self._send_audio_data()

      assert not self.sm.updated['audioFeedback'], "audioFeedback should not be published after second press"
    finally:
      feedbackd.stop()
 | |
| 
 |