diff --git a/system/micd.py b/system/micd.py index d0c661ee0d..a56140e3b9 100755 --- a/system/micd.py +++ b/system/micd.py @@ -8,9 +8,10 @@ from common.realtime import Ratekeeper from system.swaglog import cloudlog RATE = 10 -DT_MIC = 1. / RATE +FFT_SAMPLES = 4096 REFERENCE_SPL = 2e-5 # newtons/m^2 SAMPLE_RATE = 44100 +FILTER_DT = 1. / (SAMPLE_RATE / FFT_SAMPLES) def calculate_spl(measurements): @@ -45,41 +46,44 @@ class Mic: self.rk = Ratekeeper(RATE) self.measurements = np.empty(0) - self.spl_filter_weighted = FirstOrderFilter(0, 2.5, DT_MIC, initialized=False) - def update(self): - """ - Using amplitude measurements, calculate an uncalibrated sound pressure and sound pressure level. - Then apply A-weighting to the raw amplitudes and run the same calculations again. - - Logged A-weighted equivalents are rough approximations of the human-perceived loudness. - """ + self.sound_pressure = 0 + self.sound_pressure_weighted = 0 + self.sound_pressure_level_weighted = 0 - if len(self.measurements) > 0: - sound_pressure, _ = calculate_spl(self.measurements) - measurements_weighted = apply_a_weighting(self.measurements) - sound_pressure_weighted, sound_pressure_level_weighted = calculate_spl(measurements_weighted) - self.spl_filter_weighted.update(sound_pressure_level_weighted) - else: - sound_pressure = 0 - sound_pressure_weighted = 0 - sound_pressure_level_weighted = 0 - - self.measurements = np.empty(0) + self.spl_filter_weighted = FirstOrderFilter(0, 2.5, FILTER_DT, initialized=False) + def update(self): msg = messaging.new_message('microphone') - msg.microphone.soundPressure = float(sound_pressure) - msg.microphone.soundPressureWeighted = float(sound_pressure_weighted) + msg.microphone.soundPressure = float(self.sound_pressure) + msg.microphone.soundPressureWeighted = float(self.sound_pressure_weighted) - msg.microphone.soundPressureWeightedDb = float(sound_pressure_level_weighted) + msg.microphone.soundPressureWeightedDb = float(self.sound_pressure_level_weighted) msg.microphone.filteredSoundPressureWeightedDb = float(self.spl_filter_weighted.x) self.pm.send('microphone', msg) self.rk.keep_time() def callback(self, indata, frames, time, status): + """ + Using amplitude measurements, calculate an uncalibrated sound pressure and sound pressure level. + Then apply A-weighting to the raw amplitudes and run the same calculations again. + + Logged A-weighted equivalents are rough approximations of the human-perceived loudness. + """ + self.measurements = np.concatenate((self.measurements, indata[:, 0])) + while self.measurements.size >= FFT_SAMPLES: + measurements = self.measurements[:FFT_SAMPLES] + + self.sound_pressure, _ = calculate_spl(measurements) + measurements_weighted = apply_a_weighting(measurements) + self.sound_pressure_weighted, self.sound_pressure_level_weighted = calculate_spl(measurements_weighted) + self.spl_filter_weighted.update(self.sound_pressure_level_weighted) + + self.measurements = self.measurements[FFT_SAMPLES:] + def micd_thread(self, device=None): if device is None: device = "sysdefault"