|  |  |  | #!/usr/bin/env python3
 | 
					
						
							|  |  |  | import numpy as np
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | from cereal import messaging
 | 
					
						
							|  |  |  | from openpilot.common.realtime import Ratekeeper
 | 
					
						
							|  |  |  | from openpilot.common.retry import retry
 | 
					
						
							|  |  |  | from openpilot.common.swaglog import cloudlog
 | 
					
						
							|  |  |  | from openpilot.system.hardware import TICI
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | RATE = 10
 | 
					
						
							|  |  |  | FFT_SAMPLES = 4096
 | 
					
						
							|  |  |  | REFERENCE_SPL = 2e-5  # newtons/m^2
 | 
					
						
							|  |  |  | SAMPLE_RATE = 44100
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | @retry(attempts=7, delay=3)
 | 
					
						
							|  |  |  | def wait_for_devices(sd):
 | 
					
						
							|  |  |  |   # reload sounddevice to reinitialize portaudio
 | 
					
						
							|  |  |  |   sd._terminate()
 | 
					
						
							|  |  |  |   sd._initialize()
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   devices = sd.query_devices()
 | 
					
						
							|  |  |  |   cloudlog.info(f"sounddevice available devices: {list(devices)}")
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   assert len(devices) > 0
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def calculate_spl(measurements):
 | 
					
						
							|  |  |  |   # https://www.engineeringtoolbox.com/sound-pressure-d_711.html
 | 
					
						
							|  |  |  |   sound_pressure = np.sqrt(np.mean(measurements ** 2))  # RMS of amplitudes
 | 
					
						
							|  |  |  |   if sound_pressure > 0:
 | 
					
						
							|  |  |  |     sound_pressure_level = 20 * np.log10(sound_pressure / REFERENCE_SPL)  # dB
 | 
					
						
							|  |  |  |   else:
 | 
					
						
							|  |  |  |     sound_pressure_level = 0
 | 
					
						
							|  |  |  |   return sound_pressure, sound_pressure_level
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def apply_a_weighting(measurements: np.ndarray) -> np.ndarray:
 | 
					
						
							|  |  |  |   # Generate a Hanning window of the same length as the audio measurements
 | 
					
						
							|  |  |  |   measurements_windowed = measurements * np.hanning(len(measurements))
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   # Calculate the frequency axis for the signal
 | 
					
						
							|  |  |  |   freqs = np.fft.fftfreq(measurements_windowed.size, d=1 / SAMPLE_RATE)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   # Calculate the A-weighting filter
 | 
					
						
							|  |  |  |   # https://en.wikipedia.org/wiki/A-weighting
 | 
					
						
							|  |  |  |   A = 12194 ** 2 * freqs ** 4 / ((freqs ** 2 + 20.6 ** 2) * (freqs ** 2 + 12194 ** 2) * np.sqrt((freqs ** 2 + 107.7 ** 2) * (freqs ** 2 + 737.9 ** 2)))
 | 
					
						
							|  |  |  |   A /= np.max(A)  # Normalize the filter
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   # Apply the A-weighting filter to the signal
 | 
					
						
							|  |  |  |   return np.abs(np.fft.ifft(np.fft.fft(measurements_windowed) * A))
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class Mic:
 | 
					
						
							|  |  |  |   def __init__(self):
 | 
					
						
							|  |  |  |     self.rk = Ratekeeper(RATE)
 | 
					
						
							|  |  |  |     self.pm = messaging.PubMaster(['microphone'])
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     self.measurements = np.empty(0)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     self.sound_pressure = 0
 | 
					
						
							|  |  |  |     self.sound_pressure_weighted = 0
 | 
					
						
							|  |  |  |     self.sound_pressure_level_weighted = 0
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def update(self):
 | 
					
						
							|  |  |  |     msg = messaging.new_message('microphone', valid=True)
 | 
					
						
							|  |  |  |     msg.microphone.soundPressure = float(self.sound_pressure)
 | 
					
						
							|  |  |  |     msg.microphone.soundPressureWeighted = float(self.sound_pressure_weighted)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     msg.microphone.soundPressureWeightedDb = float(self.sound_pressure_level_weighted)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     self.pm.send('microphone', msg)
 | 
					
						
							|  |  |  |     self.rk.keep_time()
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def callback(self, indata, frames, time, status):
 | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     Using amplitude measurements, calculate an uncalibrated sound pressure and sound pressure level.
 | 
					
						
							|  |  |  |     Then apply A-weighting to the raw amplitudes and run the same calculations again.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     Logged A-weighted equivalents are rough approximations of the human-perceived loudness.
 | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     self.measurements = np.concatenate((self.measurements, indata[:, 0]))
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     while self.measurements.size >= FFT_SAMPLES:
 | 
					
						
							|  |  |  |       measurements = self.measurements[:FFT_SAMPLES]
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |       self.sound_pressure, _ = calculate_spl(measurements)
 | 
					
						
							|  |  |  |       measurements_weighted = apply_a_weighting(measurements)
 | 
					
						
							|  |  |  |       self.sound_pressure_weighted, self.sound_pressure_level_weighted = calculate_spl(measurements_weighted)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |       self.measurements = self.measurements[FFT_SAMPLES:]
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def micd_thread(self):
 | 
					
						
							|  |  |  |     # sounddevice must be imported after forking processes
 | 
					
						
							|  |  |  |     import sounddevice as sd
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     if TICI:
 | 
					
						
							|  |  |  |       wait_for_devices(sd) # wait for alsa to be initialized on device
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     with sd.InputStream(channels=1, samplerate=SAMPLE_RATE, callback=self.callback)  as stream:
 | 
					
						
							|  |  |  |       cloudlog.info(f"micd stream started: {stream.samplerate=} {stream.channels=} {stream.dtype=} {stream.device=}")
 | 
					
						
							|  |  |  |       while True:
 | 
					
						
							|  |  |  |         self.update()
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def main():
 | 
					
						
							|  |  |  |   mic = Mic()
 | 
					
						
							|  |  |  |   mic.micd_thread()
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | if __name__ == "__main__":
 | 
					
						
							|  |  |  |   main()
 |