openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

69 lines
2.1 KiB

#!/usr/bin/env python3
import sounddevice as sd
import numpy as np
from cereal import messaging
from common.filter_simple import FirstOrderFilter
from common.realtime import Ratekeeper
from system.hardware import HARDWARE
from system.swaglog import cloudlog
RATE = 10
DT_MIC = 1. / RATE
REFERENCE_SPL = 2 * 10 ** -5 # newtons/m^2
class Mic:
def __init__(self, pm):
self.pm = pm
self.rk = Ratekeeper(RATE)
self.measurements = np.empty(0)
self.spl_filter = FirstOrderFilter(0, 4, DT_MIC, initialized=False)
def update(self):
# self.measurements contains amplitudes from -1 to 1 which we use to
# calculate an uncalibrated sound pressure level
if len(self.measurements) > 0:
# https://www.engineeringtoolbox.com/sound-pressure-d_711.html
sound_pressure = np.sqrt(np.mean(self.measurements ** 2)) # RMS of amplitudes
sound_pressure_level = 20 * np.log10(sound_pressure / REFERENCE_SPL) if sound_pressure > 0 else 0 # dB
if not HARDWARE.is_sound_playing():
self.spl_filter.update(sound_pressure_level)
else:
sound_pressure = 0
sound_pressure_level = 0
self.measurements = np.empty(0)
msg = messaging.new_message('microphone')
msg.microphone.soundPressure = float(sound_pressure)
msg.microphone.soundPressureDb = float(sound_pressure_level)
msg.microphone.filteredSoundPressureDb = float(self.spl_filter.x)
self.pm.send('microphone', msg)
self.rk.keep_time()
def callback(self, indata, frames, time, status):
self.measurements = np.concatenate((self.measurements, indata[:, 0]))
def micd_thread(self, device=None):
if device is None:
device = "sysdefault"
with sd.InputStream(device=device, channels=1, samplerate=44100, callback=self.callback) as stream:
cloudlog.info(f"micd stream started: {stream.samplerate=} {stream.channels=} {stream.dtype=} {stream.device=}")
while True:
self.update()
def main(pm=None, sm=None):
if pm is None:
pm = messaging.PubMaster(['microphone'])
mic = Mic(pm)
mic.micd_thread()
if __name__ == "__main__":
main()