You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							98 lines
						
					
					
						
							2.7 KiB
						
					
					
				
			
		
		
	
	
							98 lines
						
					
					
						
							2.7 KiB
						
					
					
				| #!/usr/bin/env python3
 | |
| import os
 | |
| import signal
 | |
| import subprocess
 | |
| import time
 | |
| 
 | |
| import numpy as np
 | |
| from PIL import Image
 | |
| 
 | |
| import cereal.messaging as messaging
 | |
| from common.basedir import BASEDIR
 | |
| from common.params import Params
 | |
| from common.transformations.camera import eon_f_frame_size, eon_d_frame_size, leon_d_frame_size, tici_f_frame_size
 | |
| from selfdrive.hardware import TICI
 | |
| from selfdrive.controls.lib.alertmanager import set_offroad_alert
 | |
| 
 | |
| 
 | |
| def jpeg_write(fn, dat):
 | |
|   img = Image.fromarray(dat)
 | |
|   img.save(fn, "JPEG")
 | |
| 
 | |
| 
 | |
| def extract_image(dat, frame_sizes):
 | |
|   img = np.frombuffer(dat, dtype=np.uint8)
 | |
|   w, h = frame_sizes[len(img) // 3]
 | |
|   b = img[::3].reshape(h, w)
 | |
|   g = img[1::3].reshape(h, w)
 | |
|   r = img[2::3].reshape(h, w)
 | |
|   return np.dstack([r, g, b])
 | |
| 
 | |
| 
 | |
| def get_snapshots(frame="roadCameraState", front_frame="driverCameraState"):
 | |
|   frame_sizes = [eon_f_frame_size, eon_d_frame_size, leon_d_frame_size, tici_f_frame_size]
 | |
|   frame_sizes = {w * h: (w, h) for (w, h) in frame_sizes}
 | |
| 
 | |
|   sm = messaging.SubMaster([frame, front_frame])
 | |
|   while min(sm.logMonoTime.values()) == 0:
 | |
|     sm.update()
 | |
| 
 | |
|   rear = extract_image(sm[frame].image, frame_sizes)
 | |
|   front = extract_image(sm[front_frame].image, frame_sizes)
 | |
|   return rear, front
 | |
| 
 | |
| 
 | |
| def snapshot():
 | |
|   params = Params()
 | |
|   front_camera_allowed = int(params.get("RecordFront"))
 | |
| 
 | |
|   if params.get("IsOffroad") != b"1" or params.get("IsTakingSnapshot") == b"1":
 | |
|     print("Already taking snapshot")
 | |
|     return None, None
 | |
| 
 | |
|   params.put("IsTakingSnapshot", "1")
 | |
|   set_offroad_alert("Offroad_IsTakingSnapshot", True)
 | |
|   time.sleep(2.0)  # Give thermald time to read the param, or if just started give camerad time to start
 | |
| 
 | |
|   # Check if camerad is already started
 | |
|   try:
 | |
|     subprocess.check_call(["pgrep", "camerad"])
 | |
| 
 | |
|     print("Camerad already running")
 | |
|     params.put("IsTakingSnapshot", "0")
 | |
|     params.delete("Offroad_IsTakingSnapshot")
 | |
|     return None, None
 | |
|   except subprocess.CalledProcessError:
 | |
|     pass
 | |
| 
 | |
|   env = os.environ.copy()
 | |
|   env["SEND_ROAD"] = "1"
 | |
|   env["SEND_WIDE_ROAD"] = "1"
 | |
|   env["SEND_DRIVER"] = "1"
 | |
|   proc = subprocess.Popen(os.path.join(BASEDIR, "selfdrive/camerad/camerad"),
 | |
|                           cwd=os.path.join(BASEDIR, "selfdrive/camerad"), env=env)
 | |
|   time.sleep(3.0)
 | |
| 
 | |
|   frame = "wideRoadCameraState" if TICI else "roadCameraState"
 | |
|   rear, front = get_snapshots(frame)
 | |
| 
 | |
|   proc.send_signal(signal.SIGINT)
 | |
|   proc.communicate()
 | |
| 
 | |
|   params.put("IsTakingSnapshot", "0")
 | |
|   set_offroad_alert("Offroad_IsTakingSnapshot", False)
 | |
| 
 | |
|   if not front_camera_allowed:
 | |
|     front = None
 | |
| 
 | |
|   return rear, front
 | |
| 
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|   pic, fpic = snapshot()
 | |
|   if pic is not None:
 | |
|     print(pic.shape)
 | |
|     jpeg_write("/tmp/back.jpg", pic)
 | |
|     jpeg_write("/tmp/front.jpg", fpic)
 | |
|   else:
 | |
|     print("Error taking snapshot")
 | |
| 
 |