#!/usr/bin/env python3
"""Microphone daemon.

Opens a mono input stream, publishes every captured buffer as 16-bit PCM on
'audioData' or 'audioDataNoLog' (chosen by the "RecordAudio" param), and
publishes raw and A-weighted sound pressure figures on 'microphone'.
"""
import threading
from functools import cache

import numpy as np

from cereal import messaging
from openpilot.common.params import Params
from openpilot.common.realtime import Ratekeeper
from openpilot.common.retry import retry
from openpilot.common.swaglog import cloudlog

RATE = 10  # rate handed to the Ratekeeper that paces 'microphone' messages
FFT_SAMPLES = 4096  # samples per sound pressure analysis window
REFERENCE_SPL = 2e-5  # newtons/m^2
SAMPLE_RATE = 16000  # Hz
SAMPLE_BUFFER = 800  # samples per callback: 50ms at SAMPLE_RATE


@cache
def get_a_weighting_filter():
    """Return the A-weighting gain for each of the FFT_SAMPLES FFT bins.

    The gains follow the A-weighting magnitude response
    (https://en.wikipedia.org/wiki/A-weighting) evaluated at the bin
    frequencies of an FFT_SAMPLES-point FFT at SAMPLE_RATE, and are scaled so
    that the largest gain is 1. The array is computed once and cached, so
    callers must not modify it in place.
    """
    freqs = np.fft.fftfreq(FFT_SAMPLES, d=1 / SAMPLE_RATE)
    freqs_sq = freqs ** 2
    numerator = 12194 ** 2 * freqs ** 4
    denominator = (
        (freqs_sq + 20.6 ** 2)
        * (freqs_sq + 12194 ** 2)
        * np.sqrt((freqs_sq + 107.7 ** 2) * (freqs_sq + 737.9 ** 2))
    )
    A = numerator / denominator
    return A / np.max(A)


def calculate_spl(measurements):
    """Return ``(sound_pressure, sound_pressure_level)`` for a block of samples.

    ``sound_pressure`` is the RMS of the amplitudes and
    ``sound_pressure_level`` is that RMS in dB relative to REFERENCE_SPL
    (https://www.engineeringtoolbox.com/sound-pressure-d_711.html).
    The level is reported as 0 when the RMS is not positive, because the
    logarithm is undefined there.
    """
    sound_pressure = np.sqrt(np.mean(measurements ** 2))  # RMS of amplitudes
    if sound_pressure > 0:
        sound_pressure_level = 20 * np.log10(sound_pressure / REFERENCE_SPL)  # dB
    else:
        sound_pressure_level = 0
    return sound_pressure, sound_pressure_level


def apply_a_weighting(measurements: np.ndarray) -> np.ndarray:
    """Return the magnitude of ``measurements`` after A-weighting.

    The samples are tapered with a Hanning window, transformed with an FFT,
    scaled per bin by the A-weighting gains and transformed back.

    ``measurements`` must hold exactly FFT_SAMPLES samples: the gain table
    from get_a_weighting_filter() has that fixed length.
    """
    # Hanning window of the same length as the audio measurements
    measurements_windowed = measurements * np.hanning(len(measurements))

    # Apply the A-weighting filter in the frequency domain
    spectrum = np.fft.fft(measurements_windowed) * get_a_weighting_filter()
    return np.abs(np.fft.ifft(spectrum))


def _to_int16_pcm(samples):
    """Scale float samples by 32767 and return them as an int16 array.

    Scaled values are clipped to the int16 range first, so a sample outside
    [-1.0, 1.0] saturates instead of wrapping around in the integer cast.
    """
    return np.clip(samples * 32767, -32768, 32767).astype(np.int16)


class Mic:
    """Captures microphone audio and publishes it along with loudness figures."""

    def __init__(self):
        self.rk = Ratekeeper(RATE)
        self.pm = messaging.PubMaster(['microphone', 'audioData', 'audioDataNoLog'])
        # Built once here rather than on every audio callback.
        self.params = Params()

        # Samples received from the stream that have not been analysed yet.
        self.measurements = np.empty(0)

        # Results for the most recently analysed window of FFT_SAMPLES samples.
        self.sound_pressure = 0
        self.sound_pressure_weighted = 0
        self.sound_pressure_level_weighted = 0

        # Guards the fields above: callback() writes them and update() reads them.
        self.lock = threading.Lock()

    def update(self):
        """Publish the latest sound pressure figures, then pace the loop."""
        # Copy under the lock so the three values come from the same window.
        with self.lock:
            sound_pressure = self.sound_pressure
            sound_pressure_weighted = self.sound_pressure_weighted
            sound_pressure_level_weighted = self.sound_pressure_level_weighted

        msg = messaging.new_message('microphone', valid=True)
        msg.microphone.soundPressure = float(sound_pressure)
        msg.microphone.soundPressureWeighted = float(sound_pressure_weighted)
        msg.microphone.soundPressureWeightedDb = float(sound_pressure_level_weighted)

        self.pm.send('microphone', msg)
        self.rk.keep_time()

    def callback(self, indata, frames, time, status):
        """
        Using amplitude measurements, calculate an uncalibrated sound pressure and sound pressure level.
        Then apply A-weighting to the raw amplitudes and run the same calculations again.

        Logged A-weighted equivalents are rough approximations of the human-perceived loudness.

        Every buffer is also published as 16-bit PCM on 'audioData' when the
        "RecordAudio" param is set, otherwise on 'audioDataNoLog'.

        Args:
            indata: 2-D sample array; only column 0 is used. Assumed to hold
                float samples nominally within [-1.0, 1.0] (TODO confirm
                against the stream dtype).
            frames: unused.
            time: unused.
            status: unused.
        """
        samples = indata[:, 0]

        with self.lock:
            self.measurements = np.concatenate((self.measurements, samples))

            # Consume every complete window; a partial tail waits for more samples.
            while self.measurements.size >= FFT_SAMPLES:
                window = self.measurements[:FFT_SAMPLES]

                self.sound_pressure, _ = calculate_spl(window)
                window_weighted = apply_a_weighting(window)
                self.sound_pressure_weighted, self.sound_pressure_level_weighted = calculate_spl(window_weighted)

                self.measurements = self.measurements[FFT_SAMPLES:]

        # Publishing touches none of the state guarded by the lock.
        # The param is read on every buffer so a toggle takes effect right away.
        audio_data_service = 'audioData' if self.params.get_bool("RecordAudio") else 'audioDataNoLog'
        msg = messaging.new_message(audio_data_service, valid=True)
        audio_field = getattr(msg, audio_data_service)
        audio_field.sampleRate = SAMPLE_RATE
        audio_field.data = _to_int16_pcm(samples).tobytes()
        self.pm.send(audio_data_service, msg)

    @retry(attempts=7, delay=3)
    def get_stream(self, sd):
        """Return a mono input stream on ``sd`` that feeds self.callback.

        Args:
            sd: the imported ``sounddevice`` module.
        """
        # reload sounddevice to reinitialize portaudio
        sd._terminate()
        sd._initialize()
        return sd.InputStream(channels=1, samplerate=SAMPLE_RATE, callback=self.callback, blocksize=SAMPLE_BUFFER)

    def micd_thread(self):
        """Open the input stream and publish 'microphone' messages forever."""
        # sounddevice must be imported after forking processes
        import sounddevice as sd

        with self.get_stream(sd) as stream:
            cloudlog.info(f"micd stream started: {stream.samplerate=} {stream.channels=} {stream.dtype=} {stream.device=}, {stream.blocksize=}")
            # Stay inside the with block: leaving it would close the stream.
            while True:
                self.update()


def main():
    """Create the microphone daemon and run it."""
    mic = Mic()
    mic.micd_thread()


if __name__ == "__main__":
    main()