openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

120 lines
3.6 KiB

#!/usr/bin/env python3
import argparse
import asyncio
import aiortc
import aiohttp
import cv2
import pygame
from teleoprtc import WebRTCOfferBuilder, StreamingOffer
def pygame_should_quit():
for event in pygame.event.get():
if event.type == pygame.QUIT:
return True
return False
class WebrtcdConnectionProvider:
"""
Connection provider reaching webrtcd server on comma three
"""
def __init__(self, host, port=5001):
self.url = f"http://{host}:{port}/stream"
async def __call__(self, offer: StreamingOffer) -> aiortc.RTCSessionDescription:
async with aiohttp.ClientSession() as session:
body = {'sdp': offer.sdp, 'cameras': offer.video, 'bridge_services_in': [], 'bridge_services_out': []}
async with session.post(self.url, json=body) as resp:
payload = await resp.json()
answer = aiortc.RTCSessionDescription(**payload)
return answer
class FaceDetector:
"""
Simple face detector using opencv
"""
def __init__(self):
self.classifier = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
def detect(self, array):
gray_array = cv2.cvtColor(array, cv2.COLOR_RGB2GRAY)
faces = self.classifier.detectMultiScale(gray_array, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))
return faces
def draw(self, array, faces):
for (x, y, w, h) in faces:
cv2.rectangle(array, (x, y), (x + w, y + h), (0, 255, 0), 2)
return array
async def run_face_detection(stream):
# setup pygame window
pygame.init()
screen_width, screen_height = 1280, 720
screen = pygame.display.set_mode((screen_width, screen_height))
pygame.display.set_caption("Face detection demo")
surface = pygame.Surface((screen_width, screen_height))
# get the driver camera video track from the stream
# generally its better to reuse the track object instead of getting it every time
track = stream.get_incoming_video_track("driver", buffered=False)
# cv2 face detector
detector = FaceDetector()
while stream.is_connected_and_ready and not pygame_should_quit():
try:
# receive frame as pyAV VideoFrame, convert to rgb24 numpy array
frame = await track.recv()
array = frame.to_ndarray(format="rgb24")
# detect faces and draw rects around them
resized_array = cv2.resize(array, (screen_width, screen_height))
faces = detector.detect(resized_array)
detector.draw(resized_array, faces)
# display the image
pygame.surfarray.blit_array(surface, resized_array.swapaxes(0, 1))
screen.blit(surface, (0, 0))
pygame.display.flip()
print("Received frame from", "driver", frame.time)
except aiortc.mediastreams.MediaStreamError:
break
pygame.quit()
await stream.stop()
async def run(args):
# build your own the offer stream
builder = WebRTCOfferBuilder(WebrtcdConnectionProvider(args.host))
# request video stream from drivers camera
builder.offer_to_receive_video_stream("driver")
# add cereal messaging streaming support
builder.add_messaging()
stream = builder.stream()
# start the stream then wait for connection
# server will receive the offer and attempt to fulfill it
await stream.start()
await stream.wait_for_connection()
# all the tracks and channel are ready to be used at this point
assert stream.has_incoming_video_track("driver") and stream.has_messaging_channel()
# run face detection loop on the drivers camera
await run_face_detection(stream)
if __name__=='__main__':
parser = argparse.ArgumentParser()
parser.add_argument("--host", default="localhost", help="Host for webrtcd server")
args = parser.parse_args()
asyncio.run(run(args))