|
|
@ -8,9 +8,10 @@ from common.realtime import Ratekeeper |
|
|
|
from system.swaglog import cloudlog |
|
|
|
from system.swaglog import cloudlog |
|
|
|
|
|
|
|
|
|
|
|
RATE = 10 |
|
|
|
RATE = 10 |
|
|
|
DT_MIC = 1. / RATE |
|
|
|
FFT_SAMPLES = 4096 |
|
|
|
REFERENCE_SPL = 2e-5 # newtons/m^2 |
|
|
|
REFERENCE_SPL = 2e-5 # newtons/m^2 |
|
|
|
SAMPLE_RATE = 44100 |
|
|
|
SAMPLE_RATE = 44100 |
|
|
|
|
|
|
|
FILTER_DT = 1. / (SAMPLE_RATE / FFT_SAMPLES) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def calculate_spl(measurements): |
|
|
|
def calculate_spl(measurements): |
|
|
@ -45,41 +46,44 @@ class Mic: |
|
|
|
self.rk = Ratekeeper(RATE) |
|
|
|
self.rk = Ratekeeper(RATE) |
|
|
|
|
|
|
|
|
|
|
|
self.measurements = np.empty(0) |
|
|
|
self.measurements = np.empty(0) |
|
|
|
self.spl_filter_weighted = FirstOrderFilter(0, 2.5, DT_MIC, initialized=False) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def update(self): |
|
|
|
self.sound_pressure = 0 |
|
|
|
""" |
|
|
|
self.sound_pressure_weighted = 0 |
|
|
|
Using amplitude measurements, calculate an uncalibrated sound pressure and sound pressure level. |
|
|
|
self.sound_pressure_level_weighted = 0 |
|
|
|
Then apply A-weighting to the raw amplitudes and run the same calculations again. |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Logged A-weighted equivalents are rough approximations of the human-perceived loudness. |
|
|
|
|
|
|
|
""" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if len(self.measurements) > 0: |
|
|
|
self.spl_filter_weighted = FirstOrderFilter(0, 2.5, FILTER_DT, initialized=False) |
|
|
|
sound_pressure, _ = calculate_spl(self.measurements) |
|
|
|
|
|
|
|
measurements_weighted = apply_a_weighting(self.measurements) |
|
|
|
|
|
|
|
sound_pressure_weighted, sound_pressure_level_weighted = calculate_spl(measurements_weighted) |
|
|
|
|
|
|
|
self.spl_filter_weighted.update(sound_pressure_level_weighted) |
|
|
|
|
|
|
|
else: |
|
|
|
|
|
|
|
sound_pressure = 0 |
|
|
|
|
|
|
|
sound_pressure_weighted = 0 |
|
|
|
|
|
|
|
sound_pressure_level_weighted = 0 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.measurements = np.empty(0) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def update(self): |
|
|
|
msg = messaging.new_message('microphone') |
|
|
|
msg = messaging.new_message('microphone') |
|
|
|
msg.microphone.soundPressure = float(sound_pressure) |
|
|
|
msg.microphone.soundPressure = float(self.sound_pressure) |
|
|
|
msg.microphone.soundPressureWeighted = float(sound_pressure_weighted) |
|
|
|
msg.microphone.soundPressureWeighted = float(self.sound_pressure_weighted) |
|
|
|
|
|
|
|
|
|
|
|
msg.microphone.soundPressureWeightedDb = float(sound_pressure_level_weighted) |
|
|
|
msg.microphone.soundPressureWeightedDb = float(self.sound_pressure_level_weighted) |
|
|
|
msg.microphone.filteredSoundPressureWeightedDb = float(self.spl_filter_weighted.x) |
|
|
|
msg.microphone.filteredSoundPressureWeightedDb = float(self.spl_filter_weighted.x) |
|
|
|
|
|
|
|
|
|
|
|
self.pm.send('microphone', msg) |
|
|
|
self.pm.send('microphone', msg) |
|
|
|
self.rk.keep_time() |
|
|
|
self.rk.keep_time() |
|
|
|
|
|
|
|
|
|
|
|
def callback(self, indata, frames, time, status): |
|
|
|
def callback(self, indata, frames, time, status): |
|
|
|
|
|
|
|
""" |
|
|
|
|
|
|
|
Using amplitude measurements, calculate an uncalibrated sound pressure and sound pressure level. |
|
|
|
|
|
|
|
Then apply A-weighting to the raw amplitudes and run the same calculations again. |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Logged A-weighted equivalents are rough approximations of the human-perceived loudness. |
|
|
|
|
|
|
|
""" |
|
|
|
|
|
|
|
|
|
|
|
self.measurements = np.concatenate((self.measurements, indata[:, 0])) |
|
|
|
self.measurements = np.concatenate((self.measurements, indata[:, 0])) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
while self.measurements.size >= FFT_SAMPLES: |
|
|
|
|
|
|
|
measurements = self.measurements[:FFT_SAMPLES] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.sound_pressure, _ = calculate_spl(measurements) |
|
|
|
|
|
|
|
measurements_weighted = apply_a_weighting(measurements) |
|
|
|
|
|
|
|
self.sound_pressure_weighted, self.sound_pressure_level_weighted = calculate_spl(measurements_weighted) |
|
|
|
|
|
|
|
self.spl_filter_weighted.update(self.sound_pressure_level_weighted) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.measurements = self.measurements[FFT_SAMPLES:] |
|
|
|
|
|
|
|
|
|
|
|
def micd_thread(self, device=None): |
|
|
|
def micd_thread(self, device=None): |
|
|
|
if device is None: |
|
|
|
if device is None: |
|
|
|
device = "sysdefault" |
|
|
|
device = "sysdefault" |
|
|
|