You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							45 lines
						
					
					
						
							1.7 KiB
						
					
					
				
			
		
		
	
	
							45 lines
						
					
					
						
							1.7 KiB
						
					
					
				| #!/usr/bin/env python3
 | |
| 
 | |
| import unittest
 | |
| 
 | |
| from parameterized import parameterized
 | |
| 
 | |
| from openpilot.selfdrive.test.process_replay.regen import regen_segment, DummyFrameReader
 | |
| from openpilot.selfdrive.test.process_replay.process_replay import check_openpilot_enabled
 | |
| from openpilot.tools.lib.openpilotci import get_url
 | |
| from openpilot.tools.lib.logreader import LogReader
 | |
| from openpilot.tools.lib.framereader import FrameReader
 | |
| 
 | |
| TESTED_SEGMENTS = [
 | |
|   ("PRIUS_C2", "0982d79ebb0de295|2021-01-04--17-13-21--13"), # TOYOTA.TOYOTA_PRIUS:     NEO, pandaStateDEPRECATED, no peripheralState, sensorEventsDEPRECATED
 | |
|   # Enable these once regen on CI becomes faster or use them for different tests running controlsd in isolation
 | |
|   # ("MAZDA_C3", "bd6a637565e91581|2021-10-30--15-14-53--4"),  # MAZDA.CX9_2021:        TICI, incomplete managerState
 | |
|   # ("FORD_C3", "54827bf84c38b14f|2023-01-26--21-59-07--4"),   # FORD.BRONCO_SPORT_MK1: TICI
 | |
| ]
 | |
| 
 | |
| 
 | |
| def ci_setup_data_readers(route, sidx):
 | |
|   lr = LogReader(get_url(route, sidx, "rlog"))
 | |
|   frs = {
 | |
|     'roadCameraState': FrameReader(get_url(route, sidx, "fcamera")),
 | |
|     'driverCameraState': DummyFrameReader.zero_dcamera()
 | |
|   }
 | |
|   if next((True for m in lr if m.which() == "wideRoadCameraState"), False):
 | |
|     frs["wideRoadCameraState"] = FrameReader(get_url(route, sidx, "ecamera"))
 | |
| 
 | |
|   return lr, frs
 | |
| 
 | |
| 
 | |
| class TestRegen(unittest.TestCase):
 | |
|   @parameterized.expand(TESTED_SEGMENTS)
 | |
|   def test_engaged(self, case_name, segment):
 | |
|     route, sidx = segment.rsplit("--", 1)
 | |
|     lr, frs = ci_setup_data_readers(route, sidx)
 | |
|     output_logs = regen_segment(lr, frs, disable_tqdm=True)
 | |
| 
 | |
|     engaged = check_openpilot_enabled(output_logs)
 | |
|     self.assertTrue(engaged, f"openpilot not engaged in {case_name}")
 | |
| 
 | |
| 
 | |
| if __name__=='__main__':
 | |
|   unittest.main()
 | |
| 
 |