You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
126 lines
3.7 KiB
126 lines
3.7 KiB
6 years ago
|
#!/usr/bin/env python3
|
||
|
import subprocess
|
||
|
import time
|
||
5 years ago
|
|
||
|
import numpy as np
|
||
6 years ago
|
from PIL import Image
|
||
5 years ago
|
|
||
|
import cereal.messaging as messaging
|
||
1 year ago
|
from msgq.visionipc import VisionIpcClient, VisionStreamType
|
||
2 years ago
|
from openpilot.common.params import Params
|
||
|
from openpilot.common.realtime import DT_MDL
|
||
|
from openpilot.system.hardware import PC
|
||
1 year ago
|
from openpilot.selfdrive.selfdrived.alertmanager import set_offroad_alert
|
||
1 year ago
|
from openpilot.system.manager.process_config import managed_processes
|
||
4 years ago
|
|
||
6 years ago
|
|
||
4 years ago
|
# Camera-state service name -> VisionStreamType handed to VisionIpcClient for that camera.
VISION_STREAMS = {
    "roadCameraState": VisionStreamType.VISION_STREAM_ROAD,
    "driverCameraState": VisionStreamType.VISION_STREAM_DRIVER,
    "wideRoadCameraState": VisionStreamType.VISION_STREAM_WIDE_ROAD,
}
|
||
|
|
||
6 years ago
|
|
||
|
def jpeg_write(fn, dat):
    """Save the image array ``dat`` to ``fn`` in JPEG format.

    ``dat`` is converted with ``Image.fromarray`` and ``fn`` is passed straight
    to ``Image.save``.
    """
    Image.fromarray(dat).save(fn, "JPEG")
|
||
|
|
||
|
|
||
4 years ago
|
def yuv_to_rgb(y, u, v):
    """Convert a luma plane plus two half-resolution chroma planes to RGB.

    ``y`` is the full-size plane; ``u`` and ``v`` must have half as many rows
    and half as many columns as ``y``. Returns a ``uint8`` array of shape
    ``y.shape + (3,)`` with values clipped to the 0..255 range.
    """
    def upsample(plane):
        # Repeat every sample twice along the row, then repeat every row twice,
        # so the chroma plane ends up with the same shape as the luma plane.
        widened = np.repeat(plane, 2).reshape(plane.shape[0], y.shape[1])
        return np.repeat(widened, 2, axis=0).reshape(y.shape)

    # Work in int16 so the chroma planes can be re-centred around zero.
    yuv = np.dstack((y, upsample(u), upsample(v))).astype(np.int16)
    yuv[:, :, 1:] -= 128

    # Rows are the Y, U and V contributions; columns are the R, G and B outputs.
    coeffs = np.array([
        [1.00000, 1.00000, 1.00000],
        [0.00000, -0.39465, 2.03211],
        [1.13983, -0.58060, 0.00000],
    ])
    rgb = np.clip(np.dot(yuv, coeffs), 0, 255)
    return rgb.astype(np.uint8)
|
||
|
|
||
|
|
||
2 years ago
|
def extract_image(buf):
    """Build an RGB image from a frame buffer.

    Reads ``buf.data``, ``buf.uv_offset``, ``buf.stride``, ``buf.width`` and
    ``buf.height``. Bytes before ``uv_offset`` are treated as luma rows of
    ``stride`` bytes; from ``uv_offset`` on, even bytes are taken as U samples
    and odd bytes as V samples. Each plane is cropped to the visible size
    before conversion with ``yuv_to_rgb``.
    """
    stride, width, height = buf.stride, buf.width, buf.height
    uv_start = buf.uv_offset

    def plane(samples, row_len, rows, cols):
        # Lay the samples out in rows of row_len, then drop the row padding.
        return np.array(samples, dtype=np.uint8).reshape((-1, row_len))[:rows, :cols]

    y = plane(buf.data[:uv_start], stride, height, width)
    u = plane(buf.data[uv_start::2], stride // 2, height // 2, width // 2)
    v = plane(buf.data[uv_start + 1::2], stride // 2, height // 2, width // 2)

    return yuv_to_rgb(y, u, v)
|
||
5 years ago
|
|
||
|
|
||
3 years ago
|
def get_snapshots(frame="roadCameraState", front_frame="driverCameraState"):
    """Grab one image from each requested camera stream.

    Args:
        frame: camera-state service name (a key of ``VISION_STREAMS``) for the
            first image, or ``None`` to skip it.
        front_frame: camera-state service name for the second image, or
            ``None`` to skip it.

    Returns:
        A ``(rear, front)`` tuple. Each entry is the result of
        ``extract_image`` for the matching stream, or ``None`` when that
        stream was not requested.
    """
    sockets = [s for s in (frame, front_frame) if s is not None]
    if not sockets:
        # Nothing was requested; without this guard sockets[0] below would
        # raise IndexError.
        return None, None

    sm = messaging.SubMaster(sockets)
    vipc_clients = {s: VisionIpcClient("camerad", VISION_STREAMS[s], True) for s in sockets}

    # wait 4 sec from camerad startup for focus and exposure
    min_frame_id = int(4. / DT_MDL)
    while sm[sockets[0]].frameId < min_frame_id:
        sm.update()

    for client in vipc_clients.values():
        client.connect(True)

    # grab images
    rear, front = None, None
    if frame is not None:
        rear = extract_image(vipc_clients[frame].recv())
    if front_frame is not None:
        front = extract_image(vipc_clients[front_frame].recv())
    return rear, front
|
||
|
|
||
|
|
||
6 years ago
|
def snapshot():
    """Take a snapshot from the wide road camera and, if allowed, the driver camera.

    The snapshot is refused when the "IsOffroad" param is false, when
    "IsTakingSnapshot" is already set, or when a camerad process is already
    running. The driver camera is only captured when the "RecordFront" param
    is true.

    While the snapshot is in progress the "IsTakingSnapshot" param and the
    "Offroad_IsTakingSnapshot" alert are set; both are cleared again on every
    exit path, including exceptions.

    Returns:
        A ``(rear, front)`` tuple of images as returned by ``get_snapshots``.
        Both entries are ``None`` when the snapshot was refused; ``front`` is
        ``None`` when the driver camera is not allowed.
    """
    params = Params()

    if (not params.get_bool("IsOffroad")) or params.get_bool("IsTakingSnapshot"):
        print("Already taking snapshot")
        return None, None

    front_camera_allowed = params.get_bool("RecordFront")
    params.put_bool("IsTakingSnapshot", True)

    # The flag is now set, so everything below runs under a finally that
    # clears it; otherwise an unexpected error would block later snapshots.
    try:
        set_offroad_alert("Offroad_IsTakingSnapshot", True)
        time.sleep(2.0)  # Give hardwared time to read the param, or if just started give camerad time to start

        # Check if camerad is already started
        try:
            subprocess.check_call(["pgrep", "camerad"])
        except subprocess.CalledProcessError:
            pass  # pgrep exited non-zero: no running camerad was found
        else:
            print("Camerad already running")
            return None, None

        try:
            # Allow testing on replay on PC
            if not PC:
                managed_processes['camerad'].start()

            frame = "wideRoadCameraState"
            front_frame = "driverCameraState" if front_camera_allowed else None
            rear, front = get_snapshots(frame, front_frame)
        finally:
            managed_processes['camerad'].stop()
    finally:
        params.put_bool("IsTakingSnapshot", False)
        set_offroad_alert("Offroad_IsTakingSnapshot", False)

    # Never hand back a driver camera image when recording it is not allowed.
    if not front_camera_allowed:
        front = None

    return rear, front
|
||
6 years ago
|
|
||
|
|
||
|
if __name__ == "__main__":
    # Take a snapshot and write the resulting images under /tmp.
    pic, fpic = snapshot()
    if pic is None:
        print("Error taking snapshot")
    else:
        print(pic.shape)
        jpeg_write("/tmp/back.jpg", pic)
        # The second image is only present when snapshot() returned one.
        if fpic is not None:
            jpeg_write("/tmp/front.jpg", fpic)
|