|  |  | @ -10,7 +10,35 @@ from system.swaglog import cloudlog | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  | RATE = 10 |  |  |  | RATE = 10 | 
			
		
	
		
		
			
				
					
					|  |  |  | DT_MIC = 1. / RATE |  |  |  | DT_MIC = 1. / RATE | 
			
		
	
		
		
			
				
					
					|  |  |  | REFERENCE_SPL = 2 * 10 ** -5  # newtons/m^2 |  |  |  | REFERENCE_SPL = 2e-5  # newtons/m^2 | 
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | SAMPLE_RATE = 44100 | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | def calculate_spl(measurements): | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |   # https://www.engineeringtoolbox.com/sound-pressure-d_711.html | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |   sound_pressure = np.sqrt(np.mean(measurements ** 2))  # RMS of amplitudes | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |   if sound_pressure > 0: | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |     sound_pressure_level = 20 * np.log10(sound_pressure / REFERENCE_SPL)  # dB | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |   else: | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |     sound_pressure_level = 0 | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |   return sound_pressure, sound_pressure_level | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | def apply_a_weighting(measurements: np.ndarray) -> np.ndarray: | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |   # Generate a Hanning window of the same length as the audio measurements | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |   hanning_window = np.hanning(len(measurements)) | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |   measurements_windowed = measurements * hanning_window | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |   # Calculate the frequency axis for the signal | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |   freqs = np.fft.fftfreq(measurements_windowed.size, d=1 / SAMPLE_RATE) | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |   # Calculate the A-weighting filter | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |   # https://en.wikipedia.org/wiki/A-weighting | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |   A = 12194 ** 2 * freqs ** 4 / ((freqs ** 2 + 20.6 ** 2) * (freqs ** 2 + 12194 ** 2) * np.sqrt((freqs ** 2 + 107.7 ** 2) * (freqs ** 2 + 737.9 ** 2))) | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |   A /= np.max(A)  # Normalize the filter | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |   # Apply the A-weighting filter to the signal | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |   return np.abs(np.fft.ifft(np.fft.fft(measurements_windowed) * A)) | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  | class Mic: |  |  |  | class Mic: | 
			
		
	
	
		
		
			
				
					|  |  | @ -19,27 +47,35 @@ class Mic: | 
			
		
	
		
		
			
				
					
					|  |  |  |     self.rk = Ratekeeper(RATE) |  |  |  |     self.rk = Ratekeeper(RATE) | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |     self.measurements = np.empty(0) |  |  |  |     self.measurements = np.empty(0) | 
			
		
	
		
		
			
				
					
					|  |  |  |     self.spl_filter = FirstOrderFilter(0, 4, DT_MIC, initialized=False) |  |  |  |     self.spl_filter_weighted = FirstOrderFilter(0, 2.5, DT_MIC, initialized=False) | 
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |   def update(self): |  |  |  |   def update(self): | 
			
		
	
		
		
			
				
					
					|  |  |  |     # self.measurements contains amplitudes from -1 to 1 which we use to |  |  |  |     """ | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |     # calculate an uncalibrated sound pressure level |  |  |  |     Using amplitude measurements, calculate an uncalibrated sound pressure and sound pressure level. | 
			
				
				
			
		
	
		
		
	
		
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |     Then apply A-weighting to the raw amplitudes and run the same calculations again. | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |     Logged A-weighted equivalents are rough approximations of the human-perceived loudness. | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |     """ | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |     if len(self.measurements) > 0: |  |  |  |     if len(self.measurements) > 0: | 
			
		
	
		
		
			
				
					
					|  |  |  |       # https://www.engineeringtoolbox.com/sound-pressure-d_711.html |  |  |  |       sound_pressure, _ = calculate_spl(self.measurements) | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |       sound_pressure = np.sqrt(np.mean(self.measurements ** 2))  # RMS of amplitudes |  |  |  |       measurements_weighted = apply_a_weighting(self.measurements) | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |       sound_pressure_level = 20 * np.log10(sound_pressure / REFERENCE_SPL) if sound_pressure > 0 else 0  # dB |  |  |  |       sound_pressure_weighted, sound_pressure_level_weighted = calculate_spl(measurements_weighted) | 
			
				
				
			
		
	
		
		
	
		
		
	
		
		
	
		
		
			
				
					
					|  |  |  |       if not HARDWARE.is_sound_playing(): |  |  |  |       if not HARDWARE.is_sound_playing(): | 
			
		
	
		
		
			
				
					
					|  |  |  |         self.spl_filter.update(sound_pressure_level) |  |  |  |         self.spl_filter_weighted.update(sound_pressure_level_weighted) | 
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					|  |  |  |     else: |  |  |  |     else: | 
			
		
	
		
		
			
				
					
					|  |  |  |       sound_pressure = 0 |  |  |  |       sound_pressure = 0 | 
			
		
	
		
		
			
				
					
					|  |  |  |       sound_pressure_level = 0 |  |  |  |       sound_pressure_weighted = 0 | 
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |       sound_pressure_level_weighted = 0 | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |     self.measurements = np.empty(0) |  |  |  |     self.measurements = np.empty(0) | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |     msg = messaging.new_message('microphone') |  |  |  |     msg = messaging.new_message('microphone') | 
			
		
	
		
		
			
				
					
					|  |  |  |     msg.microphone.soundPressure = float(sound_pressure) |  |  |  |     msg.microphone.soundPressure = float(sound_pressure) | 
			
		
	
		
		
			
				
					
					|  |  |  |     msg.microphone.soundPressureDb = float(sound_pressure_level) |  |  |  |     msg.microphone.soundPressureWeighted = float(sound_pressure_weighted) | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |     msg.microphone.filteredSoundPressureDb = float(self.spl_filter.x) |  |  |  | 
 | 
			
				
				
			
		
	
		
		
	
		
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |     msg.microphone.soundPressureWeightedDb = float(sound_pressure_level_weighted) | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |     msg.microphone.filteredSoundPressureWeightedDb = float(self.spl_filter_weighted.x) | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |     self.pm.send('microphone', msg) |  |  |  |     self.pm.send('microphone', msg) | 
			
		
	
		
		
			
				
					
					|  |  |  |     self.rk.keep_time() |  |  |  |     self.rk.keep_time() | 
			
		
	
	
		
		
			
				
					|  |  | @ -51,13 +87,13 @@ class Mic: | 
			
		
	
		
		
			
				
					
					|  |  |  |     if device is None: |  |  |  |     if device is None: | 
			
		
	
		
		
			
				
					
					|  |  |  |       device = "sysdefault" |  |  |  |       device = "sysdefault" | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |     with sd.InputStream(device=device, channels=1, samplerate=44100, callback=self.callback) as stream: |  |  |  |     with sd.InputStream(device=device, channels=1, samplerate=SAMPLE_RATE, callback=self.callback) as stream: | 
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					|  |  |  |       cloudlog.info(f"micd stream started: {stream.samplerate=} {stream.channels=} {stream.dtype=} {stream.device=}") |  |  |  |       cloudlog.info(f"micd stream started: {stream.samplerate=} {stream.channels=} {stream.dtype=} {stream.device=}") | 
			
		
	
		
		
			
				
					
					|  |  |  |       while True: |  |  |  |       while True: | 
			
		
	
		
		
			
				
					
					|  |  |  |         self.update() |  |  |  |         self.update() | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  | def main(pm=None, sm=None): |  |  |  | def main(pm=None): | 
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					|  |  |  |   if pm is None: |  |  |  |   if pm is None: | 
			
		
	
		
		
			
				
					
					|  |  |  |     pm = messaging.PubMaster(['microphone']) |  |  |  |     pm = messaging.PubMaster(['microphone']) | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
	
		
		
			
				
					|  |  | 
 |