|
|
|
@ -1,6 +1,7 @@ |
|
|
|
|
#!/usr/bin/env python3 |
|
|
|
|
import numpy as np |
|
|
|
|
from functools import cache |
|
|
|
|
import threading |
|
|
|
|
|
|
|
|
|
from cereal import messaging |
|
|
|
|
from openpilot.common.realtime import Ratekeeper |
|
|
|
@ -52,12 +53,18 @@ class Mic: |
|
|
|
|
self.sound_pressure_weighted = 0 |
|
|
|
|
self.sound_pressure_level_weighted = 0 |
|
|
|
|
|
|
|
|
|
self.lock = threading.Lock() |
|
|
|
|
|
|
|
|
|
def update(self): |
|
|
|
|
msg = messaging.new_message('microphone', valid=True) |
|
|
|
|
msg.microphone.soundPressure = float(self.sound_pressure) |
|
|
|
|
msg.microphone.soundPressureWeighted = float(self.sound_pressure_weighted) |
|
|
|
|
with self.lock: |
|
|
|
|
sound_pressure = self.sound_pressure |
|
|
|
|
sound_pressure_weighted = self.sound_pressure_weighted |
|
|
|
|
sound_pressure_level_weighted = self.sound_pressure_level_weighted |
|
|
|
|
|
|
|
|
|
msg.microphone.soundPressureWeightedDb = float(self.sound_pressure_level_weighted) |
|
|
|
|
msg = messaging.new_message('microphone', valid=True) |
|
|
|
|
msg.microphone.soundPressure = float(sound_pressure) |
|
|
|
|
msg.microphone.soundPressureWeighted = float(sound_pressure_weighted) |
|
|
|
|
msg.microphone.soundPressureWeightedDb = float(sound_pressure_level_weighted) |
|
|
|
|
|
|
|
|
|
self.pm.send('microphone', msg) |
|
|
|
|
self.rk.keep_time() |
|
|
|
@ -69,17 +76,17 @@ class Mic: |
|
|
|
|
|
|
|
|
|
Logged A-weighted equivalents are rough approximations of the human-perceived loudness. |
|
|
|
|
""" |
|
|
|
|
with self.lock: |
|
|
|
|
self.measurements = np.concatenate((self.measurements, indata[:, 0])) |
|
|
|
|
|
|
|
|
|
self.measurements = np.concatenate((self.measurements, indata[:, 0])) |
|
|
|
|
|
|
|
|
|
while self.measurements.size >= FFT_SAMPLES: |
|
|
|
|
measurements = self.measurements[:FFT_SAMPLES] |
|
|
|
|
while self.measurements.size >= FFT_SAMPLES: |
|
|
|
|
measurements = self.measurements[:FFT_SAMPLES] |
|
|
|
|
|
|
|
|
|
self.sound_pressure, _ = calculate_spl(measurements) |
|
|
|
|
measurements_weighted = apply_a_weighting(measurements) |
|
|
|
|
self.sound_pressure_weighted, self.sound_pressure_level_weighted = calculate_spl(measurements_weighted) |
|
|
|
|
self.sound_pressure, _ = calculate_spl(measurements) |
|
|
|
|
measurements_weighted = apply_a_weighting(measurements) |
|
|
|
|
self.sound_pressure_weighted, self.sound_pressure_level_weighted = calculate_spl(measurements_weighted) |
|
|
|
|
|
|
|
|
|
self.measurements = self.measurements[FFT_SAMPLES:] |
|
|
|
|
self.measurements = self.measurements[FFT_SAMPLES:] |
|
|
|
|
|
|
|
|
|
@retry(attempts=7, delay=3) |
|
|
|
|
def get_stream(self, sd): |
|
|
|
|