|
|
|
@ -10,6 +10,7 @@ from system.swaglog import cloudlog |
|
|
|
|
|
|
|
|
|
RATE = 10 |
|
|
|
|
DT_MIC = 1. / RATE |
|
|
|
|
FFT_SAMPLES = 4096 |
|
|
|
|
REFERENCE_SPL = 2e-5 # newtons/m^2 |
|
|
|
|
SAMPLE_RATE = 44100 |
|
|
|
|
|
|
|
|
@ -46,6 +47,11 @@ class Mic: |
|
|
|
|
self.rk = Ratekeeper(RATE) |
|
|
|
|
|
|
|
|
|
self.measurements = np.empty(0) |
|
|
|
|
|
|
|
|
|
self.sound_pressure = 0 |
|
|
|
|
self.sound_pressure_weighted = 0 |
|
|
|
|
self.sound_pressure_level_weighted = 0 |
|
|
|
|
|
|
|
|
|
self.spl_filter_weighted = FirstOrderFilter(0, 2.5, DT_MIC, initialized=False) |
|
|
|
|
|
|
|
|
|
def update(self): |
|
|
|
@ -56,24 +62,25 @@ class Mic: |
|
|
|
|
Logged A-weighted equivalents are rough approximations of the human-perceived loudness. |
|
|
|
|
""" |
|
|
|
|
|
|
|
|
|
if len(self.measurements) > 0: |
|
|
|
|
sound_pressure, _ = calculate_spl(self.measurements) |
|
|
|
|
measurements_weighted = apply_a_weighting(self.measurements) |
|
|
|
|
sound_pressure_weighted, sound_pressure_level_weighted = calculate_spl(measurements_weighted) |
|
|
|
|
if not HARDWARE.is_sound_playing(): |
|
|
|
|
self.spl_filter_weighted.update(sound_pressure_level_weighted) |
|
|
|
|
else: |
|
|
|
|
sound_pressure = 0 |
|
|
|
|
sound_pressure_weighted = 0 |
|
|
|
|
sound_pressure_level_weighted = 0 |
|
|
|
|
# if len(self.measurements) > 0: |
|
|
|
|
# sound_pressure, _ = calculate_spl(self.measurements) |
|
|
|
|
# measurements_weighted = apply_a_weighting(self.measurements) |
|
|
|
|
# sound_pressure_weighted, sound_pressure_level_weighted = calculate_spl(measurements_weighted) |
|
|
|
|
print(self.measurements.size, 'update()') |
|
|
|
|
if not HARDWARE.is_sound_playing(): |
|
|
|
|
self.spl_filter_weighted.update(self.sound_pressure_level_weighted) |
|
|
|
|
# else: |
|
|
|
|
# sound_pressure = 0 |
|
|
|
|
# sound_pressure_weighted = 0 |
|
|
|
|
# sound_pressure_level_weighted = 0 |
|
|
|
|
|
|
|
|
|
self.measurements = np.empty(0) |
|
|
|
|
|
|
|
|
|
msg = messaging.new_message('microphone') |
|
|
|
|
msg.microphone.soundPressure = float(sound_pressure) |
|
|
|
|
msg.microphone.soundPressureWeighted = float(sound_pressure_weighted) |
|
|
|
|
msg.microphone.soundPressure = float(self.sound_pressure) |
|
|
|
|
msg.microphone.soundPressureWeighted = float(self.sound_pressure_weighted) |
|
|
|
|
|
|
|
|
|
msg.microphone.soundPressureWeightedDb = float(sound_pressure_level_weighted) |
|
|
|
|
msg.microphone.soundPressureWeightedDb = float(self.sound_pressure_level_weighted) |
|
|
|
|
msg.microphone.filteredSoundPressureWeightedDb = float(self.spl_filter_weighted.x) |
|
|
|
|
|
|
|
|
|
self.pm.send('microphone', msg) |
|
|
|
@ -81,6 +88,15 @@ class Mic: |
|
|
|
|
|
|
|
|
|
def callback(self, indata, frames, time, status): |
|
|
|
|
self.measurements = np.concatenate((self.measurements, indata[:, 0])) |
|
|
|
|
print(self.measurements.size) |
|
|
|
|
|
|
|
|
|
if self.measurements.size >= FFT_SAMPLES: |
|
|
|
|
measurements = self.measurements[:FFT_SAMPLES] |
|
|
|
|
self.sound_pressure, _ = calculate_spl(measurements) |
|
|
|
|
measurements_weighted = apply_a_weighting(measurements) |
|
|
|
|
self.sound_pressure_weighted, self.sound_pressure_level_weighted = calculate_spl(measurements_weighted) |
|
|
|
|
|
|
|
|
|
self.measurements = self.measurements[FFT_SAMPLES:] |
|
|
|
|
|
|
|
|
|
def micd_thread(self, device=None): |
|
|
|
|
if device is None: |
|
|
|
|