You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							53 lines
						
					
					
						
							2.0 KiB
						
					
					
				
			
		
		
	
	
							53 lines
						
					
					
						
							2.0 KiB
						
					
					
				import pytest
 | 
						|
import cereal.messaging as messaging
 | 
						|
from cereal import car
 | 
						|
from openpilot.common.params import Params
 | 
						|
from openpilot.system.manager.process_config import managed_processes
 | 
						|
 | 
						|
 | 
						|
@pytest.mark.skip("tmp disabled")
 | 
						|
class TestFeedbackd:
 | 
						|
  def setup_method(self):
 | 
						|
    self.pm = messaging.PubMaster(['carState', 'rawAudioData'])
 | 
						|
    self.sm = messaging.SubMaster(['audioFeedback'])
 | 
						|
 | 
						|
  def _send_lkas_button(self, pressed: bool):
 | 
						|
    msg = messaging.new_message('carState')
 | 
						|
    msg.carState.canValid = True
 | 
						|
    msg.carState.buttonEvents = [{'type': car.CarState.ButtonEvent.Type.lkas, 'pressed': pressed}]
 | 
						|
    self.pm.send('carState', msg)
 | 
						|
 | 
						|
  def _send_audio_data(self, count: int = 5):
 | 
						|
    for _ in range(count):
 | 
						|
      audio_msg = messaging.new_message('rawAudioData')
 | 
						|
      audio_msg.rawAudioData.data = bytes(1600)  # 800 samples of int16
 | 
						|
      audio_msg.rawAudioData.sampleRate = 16000
 | 
						|
      self.pm.send('rawAudioData', audio_msg)
 | 
						|
      self.sm.update(timeout=100)
 | 
						|
 | 
						|
  @pytest.mark.parametrize("record_feedback", [False, True])
 | 
						|
  def test_audio_feedback(self, record_feedback):
 | 
						|
    Params().put_bool("RecordAudioFeedback", record_feedback)
 | 
						|
 | 
						|
    managed_processes["feedbackd"].start()
 | 
						|
    assert self.pm.wait_for_readers_to_update('carState', timeout=5)
 | 
						|
    assert self.pm.wait_for_readers_to_update('rawAudioData', timeout=5)
 | 
						|
 | 
						|
    self._send_lkas_button(pressed=True)
 | 
						|
    self._send_audio_data()
 | 
						|
    self._send_lkas_button(pressed=False)
 | 
						|
    self._send_audio_data()
 | 
						|
 | 
						|
    if record_feedback:
 | 
						|
      assert self.sm.updated['audioFeedback'], "audioFeedback should be published when enabled"
 | 
						|
    else:
 | 
						|
      assert not self.sm.updated['audioFeedback'], "audioFeedback should not be published when disabled"
 | 
						|
 | 
						|
    self._send_lkas_button(pressed=True)
 | 
						|
    self._send_audio_data()
 | 
						|
    self._send_lkas_button(pressed=False)
 | 
						|
    self._send_audio_data()
 | 
						|
 | 
						|
    assert not self.sm.updated['audioFeedback'], "audioFeedback should not be published after second press"
 | 
						|
 | 
						|
    managed_processes["feedbackd"].stop()
 | 
						|
 |