|  |  | @ -19,12 +19,12 @@ GOOD_SIGNAL = bool(int(os.getenv("GOOD_SIGNAL", '0'))) | 
			
		
	
		
		
			
				
					
					|  |  |  | class TestRawgpsd(unittest.TestCase): |  |  |  | class TestRawgpsd(unittest.TestCase): | 
			
		
	
		
		
			
				
					
					|  |  |  |   @classmethod |  |  |  |   @classmethod | 
			
		
	
		
		
			
				
					
					|  |  |  |   def setUpClass(cls): |  |  |  |   def setUpClass(cls): | 
			
		
	
		
		
			
				
					
					|  |  |  |     os.system("sudo systemctl restart systemd-resolved") |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |     os.system("sudo systemctl restart ModemManager lte") |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |     wait_for_modem() |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |     if not TICI: |  |  |  |     if not TICI: | 
			
		
	
		
		
			
				
					
					|  |  |  |       raise unittest.SkipTest |  |  |  |       raise unittest.SkipTest | 
			
		
	
		
		
			
				
					
					|  |  |  |     cls.sm = messaging.SubMaster(['qcomGnss', 'gpsLocation', 'gnssMeasurements']) |  |  |  | 
 | 
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |     os.system("sudo systemctl start systemd-resolved") | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |     os.system("sudo systemctl restart ModemManager lte") | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |     wait_for_modem() | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |   @classmethod |  |  |  |   @classmethod | 
			
		
	
		
		
			
				
					
					|  |  |  |   def tearDownClass(cls): |  |  |  |   def tearDownClass(cls): | 
			
		
	
	
		
		
			
				
					|  |  | @ -34,40 +34,44 @@ class TestRawgpsd(unittest.TestCase): | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |   def setUp(self): |  |  |  |   def setUp(self): | 
			
		
	
		
		
			
				
					
					|  |  |  |     at_cmd("AT+QGPSDEL=0") |  |  |  |     at_cmd("AT+QGPSDEL=0") | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |     self.sm = messaging.SubMaster(['qcomGnss', 'gpsLocation', 'gnssMeasurements']) | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |   def tearDown(self): |  |  |  |   def tearDown(self): | 
			
		
	
		
		
			
				
					
					|  |  |  |     managed_processes['rawgpsd'].stop() |  |  |  |     managed_processes['rawgpsd'].stop() | 
			
		
	
		
		
			
				
					
					|  |  |  |     os.system("sudo systemctl restart systemd-resolved") |  |  |  |     os.system("sudo systemctl restart systemd-resolved") | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |   def _wait_for_output(self, t=10): |  |  |  |   def _wait_for_output(self, t): | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |     time.sleep(t) |  |  |  |     dt = 0.1 | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |     self.sm.update() |  |  |  |     for _ in range(t*int(1/dt)): | 
			
				
				
			
		
	
		
		
	
		
		
	
		
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |       self.sm.update(0) | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |       if self.sm.updated['qcomGnss']: | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |         break | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |       time.sleep(dt) | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |     return self.sm.updated['qcomGnss'] | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |   def test_no_crash_double_command(self): |  |  |  |   def test_no_crash_double_command(self): | 
			
		
	
		
		
			
				
					
					|  |  |  |     at_cmd("AT+QGPSDEL=0") |  |  |  |     at_cmd("AT+QGPSDEL=0") | 
			
		
	
		
		
			
				
					
					|  |  |  |     at_cmd("AT+QGPSDEL=0") |  |  |  |     at_cmd("AT+QGPSDEL=0") | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |   def test_wait_for_modem(self): |  |  |  |   def test_wait_for_modem(self): | 
			
		
	
		
		
			
				
					
					|  |  |  |     os.system("sudo systemctl stop ModemManager lte") |  |  |  |     os.system("sudo systemctl stop ModemManager") | 
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					|  |  |  |     managed_processes['rawgpsd'].start() |  |  |  |     managed_processes['rawgpsd'].start() | 
			
		
	
		
		
			
				
					
					|  |  |  |     self._wait_for_output(10) |  |  |  |     assert not self._wait_for_output(5) | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |     assert not self.sm.updated['qcomGnss'] |  |  |  |  | 
			
		
	
		
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |     os.system("sudo systemctl restart ModemManager lte") |  |  |  |     os.system("sudo systemctl restart ModemManager") | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |     self._wait_for_output(30) |  |  |  |     assert self._wait_for_output(30) | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |     assert self.sm.updated['qcomGnss'] |  |  |  |  | 
			
		
	
		
		
	
		
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |   def test_startup_time(self): |  |  |  |   def test_startup_time(self): | 
			
		
	
		
		
			
				
					
					|  |  |  |     for i in range(2): |  |  |  |     for internet in (True, False): | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |       if i == 1: |  |  |  |       if not internet: | 
			
				
				
			
		
	
		
		
	
		
		
	
		
		
			
				
					
					|  |  |  |         os.system("sudo systemctl stop systemd-resolved") |  |  |  |         os.system("sudo systemctl stop systemd-resolved") | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |       with self.subTest(internet=internet): | 
			
		
	
		
		
			
				
					
					|  |  |  |         managed_processes['rawgpsd'].start() |  |  |  |         managed_processes['rawgpsd'].start() | 
			
		
	
		
		
			
				
					
					|  |  |  |       self._wait_for_output(10) |  |  |  |         assert self._wait_for_output(10) | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |       assert self.sm.updated['qcomGnss'] |  |  |  |  | 
			
		
	
		
		
	
		
		
			
				
					
					|  |  |  |         managed_processes['rawgpsd'].stop() |  |  |  |         managed_processes['rawgpsd'].stop() | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |   def test_turns_off_gnss(self): |  |  |  |   def test_turns_off_gnss(self): | 
			
		
	
		
		
			
				
					
					|  |  |  |     for s in (0.1, 0.5, 1, 5): |  |  |  |     for s in (0.1, 1, 5): | 
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					|  |  |  |       managed_processes['rawgpsd'].start() |  |  |  |       managed_processes['rawgpsd'].start() | 
			
		
	
		
		
			
				
					
					|  |  |  |       time.sleep(s) |  |  |  |       time.sleep(s) | 
			
		
	
		
		
			
				
					
					|  |  |  |       managed_processes['rawgpsd'].stop() |  |  |  |       managed_processes['rawgpsd'].stop() | 
			
		
	
	
		
		
			
				
					|  |  | @ -95,8 +99,7 @@ class TestRawgpsd(unittest.TestCase): | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |   def test_assistance_loading(self): |  |  |  |   def test_assistance_loading(self): | 
			
		
	
		
		
			
				
					
					|  |  |  |     managed_processes['rawgpsd'].start() |  |  |  |     managed_processes['rawgpsd'].start() | 
			
		
	
		
		
			
				
					
					|  |  |  |     self._wait_for_output(10) |  |  |  |     assert self._wait_for_output(10) | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |     assert self.sm.updated['qcomGnss'] |  |  |  |  | 
			
		
	
		
		
	
		
		
			
				
					
					|  |  |  |     managed_processes['rawgpsd'].stop() |  |  |  |     managed_processes['rawgpsd'].stop() | 
			
		
	
		
		
			
				
					
					|  |  |  |     self.check_assistance(True) |  |  |  |     self.check_assistance(True) | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
	
		
		
			
				
					|  |  | @ -104,8 +107,7 @@ class TestRawgpsd(unittest.TestCase): | 
			
		
	
		
		
			
				
					
					|  |  |  |     os.system("sudo systemctl stop systemd-resolved") |  |  |  |     os.system("sudo systemctl stop systemd-resolved") | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |     managed_processes['rawgpsd'].start() |  |  |  |     managed_processes['rawgpsd'].start() | 
			
		
	
		
		
			
				
					
					|  |  |  |     self._wait_for_output(10) |  |  |  |     assert self._wait_for_output(10) | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |     assert self.sm.updated['qcomGnss'] |  |  |  |  | 
			
		
	
		
		
	
		
		
			
				
					
					|  |  |  |     managed_processes['rawgpsd'].stop() |  |  |  |     managed_processes['rawgpsd'].stop() | 
			
		
	
		
		
			
				
					
					|  |  |  |     self.check_assistance(False) |  |  |  |     self.check_assistance(False) | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
	
		
		
			
				
					|  |  | @ -115,8 +117,9 @@ class TestRawgpsd(unittest.TestCase): | 
			
		
	
		
		
			
				
					
					|  |  |  |     managed_processes['rawgpsd'].start() |  |  |  |     managed_processes['rawgpsd'].start() | 
			
		
	
		
		
			
				
					
					|  |  |  |     self._wait_for_output(17) |  |  |  |     self._wait_for_output(17) | 
			
		
	
		
		
			
				
					
					|  |  |  |     assert self.sm.updated['qcomGnss'] |  |  |  |     assert self.sm.updated['qcomGnss'] | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |     os.system("sudo systemctl restart systemd-resolved") |  |  |  |     os.system("sudo systemctl restart systemd-resolved") | 
			
		
	
		
		
			
				
					
					|  |  |  |     self._wait_for_output(15) |  |  |  |     time.sleep(15) | 
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					|  |  |  |     managed_processes['rawgpsd'].stop() |  |  |  |     managed_processes['rawgpsd'].stop() | 
			
		
	
		
		
			
				
					
					|  |  |  |     self.check_assistance(True) |  |  |  |     self.check_assistance(True) | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
	
		
		
			
				
					|  |  | 
 |