You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							72 lines
						
					
					
						
							2.6 KiB
						
					
					
				
			
		
		
	
	
							72 lines
						
					
					
						
							2.6 KiB
						
					
					
				#!/usr/bin/env python3
 | 
						|
import time
 | 
						|
import unittest
 | 
						|
import numpy as np
 | 
						|
 | 
						|
import cereal.messaging as messaging
 | 
						|
from cereal.visionipc.visionipc_pyx import VisionIpcServer, VisionStreamType  # pylint: disable=no-name-in-module, import-error
 | 
						|
from common.transformations.camera import tici_f_frame_size
 | 
						|
from common.realtime import DT_MDL
 | 
						|
from selfdrive.manager.process_config import managed_processes
 | 
						|
 | 
						|
 | 
						|
VIPC_STREAM = {"roadCameraState": VisionStreamType.VISION_STREAM_ROAD, "driverCameraState": VisionStreamType.VISION_STREAM_DRIVER,
 | 
						|
               "wideRoadCameraState": VisionStreamType.VISION_STREAM_WIDE_ROAD}
 | 
						|
 | 
						|
IMG = np.zeros(int(tici_f_frame_size[0]*tici_f_frame_size[1]*(3/2)), dtype=np.uint8)
 | 
						|
IMG_BYTES = IMG.flatten().tobytes()
 | 
						|
 | 
						|
class TestModeld(unittest.TestCase):
 | 
						|
 | 
						|
  def setUp(self):
 | 
						|
    self.vipc_server = VisionIpcServer("camerad")
 | 
						|
    self.vipc_server.create_buffers(VisionStreamType.VISION_STREAM_ROAD, 40, False, *tici_f_frame_size)
 | 
						|
    self.vipc_server.create_buffers(VisionStreamType.VISION_STREAM_DRIVER, 40, False, *tici_f_frame_size)
 | 
						|
    self.vipc_server.create_buffers(VisionStreamType.VISION_STREAM_WIDE_ROAD, 40, False, *tici_f_frame_size)
 | 
						|
    self.vipc_server.start_listener()
 | 
						|
 | 
						|
    self.sm = messaging.SubMaster(['modelV2', 'cameraOdometry'])
 | 
						|
    self.pm = messaging.PubMaster(['roadCameraState', 'wideRoadCameraState', 'driverCameraState', 'liveCalibration', 'lateralPlan'])
 | 
						|
 | 
						|
    managed_processes['modeld'].start()
 | 
						|
    time.sleep(0.2)
 | 
						|
    self.sm.update(1000)
 | 
						|
 | 
						|
  def tearDown(self):
 | 
						|
    managed_processes['modeld'].stop()
 | 
						|
    del self.vipc_server
 | 
						|
 | 
						|
  def test_modeld(self):
 | 
						|
    for n in range(1, 500):
 | 
						|
      for cam in ('roadCameraState', 'wideRoadCameraState'):
 | 
						|
        msg = messaging.new_message(cam)
 | 
						|
        cs = getattr(msg, cam)
 | 
						|
        cs.frameId = n
 | 
						|
        cs.timestampSof = int((n * DT_MDL) * 1e9)
 | 
						|
        cs.timestampEof = int(cs.timestampSof + (DT_MDL * 1e9))
 | 
						|
 | 
						|
        self.pm.send(msg.which(), msg)
 | 
						|
        self.vipc_server.send(VIPC_STREAM[msg.which()], IMG_BYTES, cs.frameId,
 | 
						|
                              cs.timestampSof, cs.timestampEof)
 | 
						|
 | 
						|
      self.sm.update(5000)
 | 
						|
      if self.sm['modelV2'].frameId != self.sm['cameraOdometry'].frameId:
 | 
						|
        self.sm.update(1000)
 | 
						|
 | 
						|
      mdl = self.sm['modelV2']
 | 
						|
      self.assertEqual(mdl.frameId, n)
 | 
						|
      self.assertEqual(mdl.frameIdExtra, n)
 | 
						|
      self.assertEqual(mdl.timestampEof, cs.timestampEof)
 | 
						|
      self.assertEqual(mdl.frameAge, 0)
 | 
						|
      self.assertEqual(mdl.frameDropPerc, 0)
 | 
						|
 | 
						|
      odo = self.sm['cameraOdometry']
 | 
						|
      self.assertEqual(odo.frameId, n)
 | 
						|
      self.assertEqual(odo.timestampEof, cs.timestampEof)
 | 
						|
 | 
						|
  def test_skipped_frames(self):
 | 
						|
    pass
 | 
						|
 | 
						|
 | 
						|
if __name__ == "__main__":
 | 
						|
  unittest.main()
 | 
						|
 |