You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
			
				
					144 lines
				
				4.6 KiB
			
		
		
			
		
	
	
					144 lines
				
				4.6 KiB
			| 
								 
											5 years ago
										 
									 | 
							
								#!/usr/bin/env python3
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								import random
							 | 
						||
| 
								 | 
							
								import time
							 | 
						||
| 
								 | 
							
								import unittest
							 | 
						||
| 
								 | 
							
								import numpy as np
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								import cereal.messaging as messaging
							 | 
						||
| 
								 | 
							
								from selfdrive.test.helpers import with_processes
							 | 
						||
| 
								 | 
							
								from selfdrive.camerad.snapshot.visionipc import VisionIPC
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								from common.hardware import EON, TICI
							 | 
						||
| 
								 | 
							
								# only tests for EON and TICI
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								# Length of the frame-packet test in seconds, picked at random at import time.
								TEST_TIMESPAN = random.randint(60, 180)
								# Allowed deviation from a frameId step of exactly 1 between consecutive packets.
								SKIP_FRAME_TOLERANCE = 0
								# Allowed deviation of the total frame count, over the whole test time.
								FRAME_COUNT_TOLERANCE = 1

								FPS_BASELINE = 20
								# Socket name -> expected frames per second.
								CAMERAS = dict(frame=FPS_BASELINE, frontFrame=FPS_BASELINE // 2)

								if TICI:
								  # On TICI the front camera is expected at the baseline rate, plus a wide camera.
								  CAMERAS.update(frontFrame=FPS_BASELINE, wideFrame=FPS_BASELINE)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								class TestCamerad(unittest.TestCase):
								  """Checks of camerad output: snapshot image quality and frame packet continuity.

								  The whole case is skipped unless EON or TICI is set.
								  """

								  @classmethod
								  def setUpClass(cls):
								    """Skip every test in this case when running on neither EON nor TICI."""
								    if not (EON or TICI):
								      raise unittest.SkipTest("camerad tests only run on EON and TICI")

								  def _get_snapshots(self):
								    """Fetch one picture from the default and the front VisionIPC stream.

								    Any exception while connecting or reading is retried after a one second
								    pause, for as long as less than 5 seconds have passed since the start.

								    Returns:
								      A `(pic, fpic)` tuple, or None when no attempt succeeded in time.
								    """
								    ret = None
								    last_err = None
								    start_time = time.time()
								    while time.time() - start_time < 5.0:
								      try:
								        ipc = VisionIPC()
								        pic = ipc.get()
								        del ipc  # drop the first connection before opening the front one

								        ipc_front = VisionIPC(front=True) # need to add another for tici
								        fpic = ipc_front.get()
								        del ipc_front

								        ret = pic, fpic
								        break
								      except Exception as e:
								        # Best-effort retry, but remember the reason so a timeout is explainable.
								        last_err = e
								        time.sleep(1)
								    if ret is None:
								      print("failed to get camera snapshots:", last_err)
								    return ret

								  def _check_snapshot_quality(self):
								    """Take a snapshot pair and assert sharpness (first picture) and exposure (both)."""
								    snapshots = self._get_snapshots()
								    # Fail with a readable message instead of a TypeError when unpacking None.
								    self.assertIsNotNone(snapshots, "could not get camera snapshots over VisionIPC")
								    pic, fpic = snapshots
								    self.assertTrue(self._is_really_sharp(pic))
								    self.assertTrue(self._is_exposure_okay(pic))
								    self.assertTrue(self._is_exposure_okay(fpic))

								  def _numpy_bgr2gray(self, im):
								    """Convert a 3-channel image to a uint8 grayscale image.

								    Channels 0, 1 and 2 of the last axis are weighted by 0.114, 0.587 and 0.299
								    (blue, green, red going by the method name); the sum is clipped to [0, 255].
								    """
								    weighted = im[:,:,0] * 0.114 + im[:,:,1] * 0.587 + im[:,:,2] * 0.299
								    return np.clip(weighted, 0, 255).astype(np.uint8)

								  def _numpy_lap(self, im):
								    """Apply a 4-neighbour Laplacian to a 2-D image, treating the outside as zero.

								    Each output pixel is `left + right + up + down - 4 * centre`, clipped to
								    [0, 255] and returned as uint8 (so negative responses become 0).
								    """
								    # Work in float64 up front: multiplying a uint8 array by the Python int -4
								    # raises OverflowError under NumPy 2 promotion rules.
								    im = np.asarray(im, dtype=np.float64)
								    ret = -4 * im
								    ret[:, 1:] += im[:, :-1]   # left neighbour
								    ret[:, :-1] += im[:, 1:]   # right neighbour
								    ret[1:, :] += im[:-1, :]   # upper neighbour
								    ret[:-1, :] += im[1:, :]   # lower neighbour
								    return np.clip(ret, 0, 255).astype(np.uint8)

								  def _is_really_sharp(self, i, threshold=800, roi_max=np.array([8,6]), roi_xxyy=np.array([1,6,2,3])):
								    """Decide whether an image is sharp inside a region of interest.

								    The grayscale Laplacian of `i` is split into a grid of `roi_max[0]` columns
								    by `roi_max[1]` rows; each cell scores `5 * variance + max`. The image is
								    sharp when more than 90% of the cells in columns `roi_xxyy[0]..roi_xxyy[1]`
								    and rows `roi_xxyy[2]..roi_xxyy[3]` (inclusive) score above `threshold`.
								    The scores of the region are printed.

								    The array defaults are only read, never modified.

								    Returns:
								      True when the image is considered sharp, False otherwise.
								    """
								    gray = self._numpy_bgr2gray(i)
								    x_pitch = gray.shape[1] // roi_max[0]
								    y_pitch = gray.shape[0] // roi_max[1]
								    lap = self._numpy_lap(gray)
								    lap_map = np.zeros((roi_max[1], roi_max[0]))
								    for r in range(lap_map.shape[0]):
								      for c in range(lap_map.shape[1]):
								        cell = lap[r*y_pitch:(r+1)*y_pitch, c*x_pitch:(c+1)*x_pitch]
								        lap_map[r][c] = 5*cell.var() + cell.max()

								    x_first, x_last = roi_xxyy[0], roi_xxyy[1]
								    y_first, y_last = roi_xxyy[2], roi_xxyy[3]
								    roi_scores = lap_map[y_first:y_last+1, x_first:x_last+1]
								    print(roi_scores)
								    needed = (x_last+1-x_first) * (y_last+1-y_first) * 0.9
								    return bool((roi_scores > threshold).sum() > needed)

								  def _is_exposure_okay(self, i, med_ex=np.array([0.2,0.4]), mean_ex=np.array([0.2,0.6])):
								    """Check that the grayscale median and mean of `i` fall inside the given ranges.

								    Both statistics are divided by 256 and printed. The result is truthy when
								    the median lies strictly between `med_ex[0]` and `med_ex[1]` and the mean
								    lies strictly between `mean_ex[0]` and `mean_ex[1]`.
								    """
								    gray = self._numpy_bgr2gray(i)
								    i_median = np.median(gray) / 256
								    i_mean = np.mean(gray) / 256
								    print([i_median, i_mean])
								    return med_ex[0] < i_median < med_ex[1] and mean_ex[0] < i_mean < mean_ex[1]

								  @with_processes(['camerad'])
								  def test_camera_operation(self):
								    """On EON, check snapshot sharpness and exposure twice, 30 seconds apart."""
								    print("checking image outputs")
								    if not EON:
								      # TICI checks are TBD; every other platform is skipped as well.
								      raise unittest.SkipTest("image checks are only implemented for EON")

								    # run checks similar to prov
								    time.sleep(15) # wait for startup and AF
								    self._check_snapshot_quality()

								    time.sleep(30)
								    # check again for consistency
								    self._check_snapshot_quality()

								  @with_processes(['camerad'])
								  def test_frame_packets(self):
								    """Check that every camera publishes consecutive frame ids at its expected rate.

								    For TEST_TIMESPAN + 1 seconds the sockets named in CAMERAS are polled. Each
								    new packet must advance frameId by 1 (within SKIP_FRAME_TOLERANCE), and the
								    total number of frames per camera must match TEST_TIMESPAN times its
								    expected fps (within FRAME_COUNT_TOLERANCE).
								    """
								    print("checking frame pkts continuity")
								    print(TEST_TIMESPAN)

								    sm = messaging.SubMaster(list(CAMERAS))

								    last_frame_id = dict.fromkeys(CAMERAS, None)
								    start_frame_id = dict.fromkeys(CAMERAS, None)
								    start_time_milli = int(round(time.time() * 1000))
								    while int(round(time.time() * 1000)) - start_time_milli < (TEST_TIMESPAN+1) * 1000:
								      sm.update()

								      for camera in CAMERAS:
								        if not sm.updated[camera]:
								          continue
								        frame_id = sm[camera].frameId
								        if start_frame_id[camera] is None:
								          # First packet of this camera: it only sets the reference point.
								          start_frame_id[camera] = last_frame_id[camera] = frame_id
								          continue
								        dfid = frame_id - last_frame_id[camera]
								        self.assertLessEqual(abs(dfid - 1), SKIP_FRAME_TOLERANCE,
								                             "{}: frameId jumped by {}".format(camera, dfid))
								        last_frame_id[camera] = frame_id

								      time.sleep(0.01)

								    for camera in CAMERAS:
								      # A camera that never published would otherwise fail on None - None.
								      self.assertIsNotNone(start_frame_id[camera], "no frames received from {}".format(camera))
								      frame_count = last_frame_id[camera] - start_frame_id[camera]
								      print(camera, frame_count)
								      expected = TEST_TIMESPAN*CAMERAS[camera]
								      self.assertLessEqual(abs(frame_count - expected), FRAME_COUNT_TOLERANCE,
								                           "{}: got {} frames, expected {}".format(camera, frame_count, expected))
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								if __name__ == "__main__":
								  # Executed as a script: hand control to the unittest command-line runner.
								  unittest.main()
							 |