import math
import time
import numpy as np
import wave

from typing import Dict, Optional, Tuple

from cereal import car, messaging
from openpilot.common.basedir import BASEDIR
from openpilot.common.realtime import Ratekeeper
from openpilot.system.hardware import PC
from openpilot.system.swaglog import cloudlog

SAMPLE_RATE = 48000
MAX_VOLUME = 1.0
MIN_VOLUME = 0.1
CONTROLS_TIMEOUT = 5 # 5 seconds

AMBIENT_DB = 30 # DB where MIN_VOLUME is applied
DB_SCALE = 30 # AMBIENT_DB + DB_SCALE is where MAX_VOLUME is applied

AudibleAlert = car.CarControl.HUDControl.AudibleAlert


sound_list: Dict[int, Tuple[str, Optional[int], float]] = {
  # AudibleAlert, file name, play count (none for infinite), volume
  AudibleAlert.engage: ("engage.wav", 1, MAX_VOLUME),
  AudibleAlert.disengage: ("disengage.wav", 1, MAX_VOLUME),
  AudibleAlert.refuse: ("refuse.wav", 1, MAX_VOLUME),

  AudibleAlert.prompt: ("prompt.wav", 1, MAX_VOLUME),
  AudibleAlert.promptRepeat: ("prompt.wav", None, MAX_VOLUME),
  AudibleAlert.promptDistracted: ("prompt_distracted.wav", None, MAX_VOLUME),

  AudibleAlert.warningSoft: ("warning_soft.wav", None, MAX_VOLUME),
  AudibleAlert.warningImmediate: ("warning_immediate.wav", None, MAX_VOLUME),
}


def check_controls_timeout_alert(sm):
  """Return True when the controlsState timeout alert should sound.

  The alert is requested when the time elapsed since sm.rcv_time['controlsState']
  (presumably the monotonic receive time of the last message - confirm against
  SubMaster) exceeds CONTROLS_TIMEOUT, the last controlsState had `enabled` set,
  and the timeout was exceeded by less than 10 seconds.
  """
  controls_missing = time.monotonic() - sm.rcv_time['controlsState']

  if controls_missing > CONTROLS_TIMEOUT:
    # only alert for the first 10 seconds past the timeout
    if sm['controlsState'].enabled and (controls_missing - CONTROLS_TIMEOUT) < 10:
      return True

  return False


class Soundd:
  """Plays the audible alert requested by controlsState through an output stream."""

  def __init__(self):
    self.load_sounds()

    self.current_alert = AudibleAlert.none
    self.current_volume = MIN_VOLUME
    # total frames of the current alert handed out so far (keeps counting across loops)
    self.current_sound_frame = 0

    # True while the alert being played was raised by the controlsState timeout
    self.controls_timeout_alert = False

  def load_sounds(self):
    """Load every wav file in sound_list into self.loaded_sounds as float32 samples.

    Each file must be mono, 16-bit and sampled at SAMPLE_RATE; samples are
    scaled by 1/32768.

    Raises:
      AssertionError: if a file does not have the expected format.
    """
    self.loaded_sounds: Dict[int, np.ndarray] = {}

    # Load all sounds
    for sound, (filename, _, _) in sound_list.items():
      # the context manager closes the file handle even if a format check fails
      with wave.open(BASEDIR + "/selfdrive/assets/sounds/" + filename, 'r') as wavefile:
        assert wavefile.getnchannels() == 1
        assert wavefile.getsampwidth() == 2
        assert wavefile.getframerate() == SAMPLE_RATE

        length = wavefile.getnframes()
        self.loaded_sounds[sound] = np.frombuffer(wavefile.readframes(length), dtype=np.int16).astype(np.float32) / (2**16/2)

  def get_sound_data(self, frames):
    """Return `frames` samples of the current alert sound, looping when required.

    Advances self.current_sound_frame by the number of samples taken from the
    sound. Once the alert's play count is exhausted (or when no alert is
    active) the remainder of the buffer is silence.

    Args:
      frames: number of samples to produce.

    Returns:
      float32 array of length `frames`, scaled by self.current_volume.
    """
    ret = np.zeros(frames, dtype=np.float32)

    if self.current_alert != AudibleAlert.none:
      num_loops = sound_list[self.current_alert][1]
      sound_data = self.loaded_sounds[self.current_alert]
      sound_len = len(sound_data)
      written_frames = 0

      while written_frames < frames:
        # Recompute the loop count and read offset on every pass: when a buffer
        # crosses the end of the sound, the next chunk must restart at offset 0
        # (or stop, if the play count is reached) instead of repeating the tail.
        loops, offset = divmod(self.current_sound_frame, sound_len)
        if num_loops is not None and loops >= num_loops:
          break

        frames_to_write = min(sound_len - offset, frames - written_frames)
        ret[written_frames:written_frames+frames_to_write] = sound_data[offset:offset+frames_to_write]
        written_frames += frames_to_write
        self.current_sound_frame += frames_to_write

    return ret * self.current_volume

  def callback(self, data_out: np.ndarray, frames: int, time, status) -> None:
    """Output stream callback: fill channel 0 of data_out with the next samples.

    The `time` parameter is unused here; note that it shadows the `time`
    module inside this method.
    """
    if status:
      cloudlog.warning(f"soundd stream over/underflow: {status}")
    data_out[:frames, 0] = self.get_sound_data(frames)

  def update_alert(self, new_alert):
    """Switch to new_alert, restarting playback from the first frame.

    A different non-none alert replaces the current one immediately. Switching
    to AudibleAlert.none is deferred until the current sound has been played
    through at least once.
    """
    # >= because playback of a finite sound stops at exactly len(sound) frames
    current_alert_played_once = self.current_alert == AudibleAlert.none or self.current_sound_frame >= len(self.loaded_sounds[self.current_alert])
    if self.current_alert != new_alert and (new_alert != AudibleAlert.none or current_alert_played_once):
      self.current_alert = new_alert
      self.current_sound_frame = 0

  def get_audible_alert(self, sm):
    """Select the alert to play from controlsState, or from its timeout."""
    if sm.updated['controlsState']:
      new_alert = sm['controlsState'].alertSound.raw
      self.update_alert(new_alert)
    elif check_controls_timeout_alert(sm):
      self.update_alert(AudibleAlert.warningImmediate)
      self.controls_timeout_alert = True
    elif self.controls_timeout_alert:
      # the timeout alert window has ended: stop the alert raised above
      self.update_alert(AudibleAlert.none)
      self.controls_timeout_alert = False

  def calculate_volume(self, weighted_db):
    """Map a sound level in dB to an output gain.

    The level is mapped linearly so that AMBIENT_DB gives MIN_VOLUME and
    AMBIENT_DB + DB_SCALE gives MAX_VOLUME, clipped to that range, and then
    converted with 10 ** (volume - 1).
    """
    volume = ((weighted_db - AMBIENT_DB) / DB_SCALE) * (MAX_VOLUME - MIN_VOLUME) + MIN_VOLUME
    return math.pow(10, (np.clip(volume, MIN_VOLUME, MAX_VOLUME) - 1))

  def soundd_thread(self):
    """Open the output stream and keep alert and volume updated at 20 Hz.

    Runs until the stream stops being active.

    Raises:
      AssertionError: if the output stream is no longer active.
    """
    # sounddevice must be imported after forking processes
    import sounddevice as sd

    rk = Ratekeeper(20)

    sm = messaging.SubMaster(['controlsState', 'microphone'])

    if PC:
      device = None
    else:
      device = "sdm845-tavil-snd-card: - (hw:0,0)"

    with sd.OutputStream(device=device, channels=1, samplerate=SAMPLE_RATE, callback=self.callback) as stream:
      cloudlog.info(f"soundd stream started: {stream.samplerate=} {stream.channels=} {stream.dtype=} {stream.device=}")
      while True:
        sm.update(0)

        if sm.updated['microphone']:
          self.current_volume = self.calculate_volume(sm["microphone"].filteredSoundPressureWeightedDb)

        self.get_audible_alert(sm)

        rk.keep_time()

        assert stream.active


def main():
  """Create the sound daemon and run its loop."""
  s = Soundd()
  s.soundd_thread()


if __name__ == "__main__":
  main()