openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

143 lines
4.6 KiB

#!/usr/bin/env python3
import random
import time
import unittest
import numpy as np
import cereal.messaging as messaging
from selfdrive.test.helpers import with_processes
from selfdrive.camerad.snapshot.visionipc import VisionIPC
# only tests for EON and TICI
from selfdrive.hardware import EON, TICI
TEST_TIMESPAN = random.randint(60, 180) # seconds
SKIP_FRAME_TOLERANCE = 0
FRAME_COUNT_TOLERANCE = 1 # over the whole test time
FPS_BASELINE = 20
CAMERAS = {
"frame": FPS_BASELINE,
"frontFrame": FPS_BASELINE // 2,
}
if TICI:
CAMERAS["frontFrame"] = FPS_BASELINE
CAMERAS["wideFrame"] = FPS_BASELINE
class TestCamerad(unittest.TestCase):
@classmethod
def setUpClass(cls):
if not (EON or TICI):
raise unittest.SkipTest
def _get_snapshots(self):
ret = None
start_time = time.time()
while time.time() - start_time < 5.0:
try:
ipc = VisionIPC()
pic = ipc.get()
del ipc
ipc_front = VisionIPC(front=True) # need to add another for tici
fpic = ipc_front.get()
del ipc_front
ret = pic, fpic
break
except Exception:
time.sleep(1)
return ret
def _numpy_bgr2gray(self, im):
ret = np.clip(im[:,:,0] * 0.114 + im[:,:,1] * 0.587 + im[:,:,2] * 0.299, 0, 255).astype(np.uint8)
return ret
def _numpy_lap(self, im):
ret = np.zeros(im.shape)
ret += -4 * im
ret += np.concatenate([np.zeros((im.shape[0],1)),im[:,:-1]], axis=1)
ret += np.concatenate([im[:,1:],np.zeros((im.shape[0],1))], axis=1)
ret += np.concatenate([np.zeros((1,im.shape[1])),im[:-1,:]], axis=0)
ret += np.concatenate([im[1:,:],np.zeros((1,im.shape[1]))], axis=0)
ret = np.clip(ret, 0, 255).astype(np.uint8)
return ret
def _is_really_sharp(self, i, threshold=800, roi_max=np.array([8,6]), roi_xxyy=np.array([1,6,2,3])):
i = self._numpy_bgr2gray(i)
x_pitch = i.shape[1] // roi_max[0]
y_pitch = i.shape[0] // roi_max[1]
lap = self._numpy_lap(i)
lap_map = np.zeros((roi_max[1], roi_max[0]))
for r in range(lap_map.shape[0]):
for c in range(lap_map.shape[1]):
selected_lap = lap[r*y_pitch:(r+1)*y_pitch, c*x_pitch:(c+1)*x_pitch]
lap_map[r][c] = 5*selected_lap.var() + selected_lap.max()
print(lap_map[roi_xxyy[2]:roi_xxyy[3]+1,roi_xxyy[0]:roi_xxyy[1]+1])
if (lap_map[roi_xxyy[2]:roi_xxyy[3]+1,roi_xxyy[0]:roi_xxyy[1]+1] > threshold).sum() > \
(roi_xxyy[1]+1-roi_xxyy[0]) * (roi_xxyy[3]+1-roi_xxyy[2]) * 0.9:
return True
else:
return False
def _is_exposure_okay(self, i, med_ex=np.array([0.2,0.4]), mean_ex=np.array([0.2,0.6])):
i = self._numpy_bgr2gray(i)
i_median = np.median(i) / 256
i_mean = np.mean(i) / 256
print([i_median, i_mean])
return med_ex[0] < i_median < med_ex[1] and mean_ex[0] < i_mean < mean_ex[1]
@with_processes(['camerad'])
def test_camera_operation(self):
print("checking image outputs")
if EON:
# run checks similar to prov
time.sleep(15) # wait for startup and AF
pic, fpic = self._get_snapshots()
self.assertTrue(self._is_really_sharp(pic))
self.assertTrue(self._is_exposure_okay(pic))
self.assertTrue(self._is_exposure_okay(fpic))
time.sleep(30)
# check again for consistency
pic, fpic = self._get_snapshots()
self.assertTrue(self._is_really_sharp(pic))
self.assertTrue(self._is_exposure_okay(pic))
self.assertTrue(self._is_exposure_okay(fpic))
elif TICI:
raise unittest.SkipTest # TBD
else:
raise unittest.SkipTest
@with_processes(['camerad'])
def test_frame_packets(self):
print("checking frame pkts continuity")
print(TEST_TIMESPAN)
sm = messaging.SubMaster([socket_name for socket_name in CAMERAS])
last_frame_id = dict.fromkeys(CAMERAS, None)
start_frame_id = dict.fromkeys(CAMERAS, None)
start_time_milli = int(round(time.time() * 1000))
while int(round(time.time() * 1000)) - start_time_milli < (TEST_TIMESPAN+1) * 1000:
sm.update()
for camera in CAMERAS:
if sm.updated[camera]:
if start_frame_id[camera] is None:
start_frame_id[camera] = last_frame_id[camera] = sm[camera].frameId
continue
dfid = sm[camera].frameId - last_frame_id[camera]
self.assertTrue(abs(dfid - 1) <= SKIP_FRAME_TOLERANCE)
last_frame_id[camera] = sm[camera].frameId
time.sleep(0.01)
for camera in CAMERAS:
print(camera, (last_frame_id[camera] - start_frame_id[camera]))
self.assertTrue(abs((last_frame_id[camera] - start_frame_id[camera]) - TEST_TIMESPAN*CAMERAS[camera]) <= FRAME_COUNT_TOLERANCE)
if __name__ == "__main__":
unittest.main()