| 
						
						
							
								
							
						
						
					 | 
					 | 
					@ -6,6 +6,7 @@ import time | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					import datetime | 
					 | 
					 | 
					 | 
					import datetime | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					import unittest | 
					 | 
					 | 
					 | 
					import unittest | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					import subprocess | 
					 | 
					 | 
					 | 
					import subprocess | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					 | 
					 | 
					 | 
					 | 
					from parameterized import parameterized | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					
 | 
					 | 
					 | 
					 | 
					
 | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					import cereal.messaging as messaging | 
					 | 
					 | 
					 | 
					import cereal.messaging as messaging | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					from openpilot.system.qcomgpsd.qcomgpsd import at_cmd, wait_for_modem | 
					 | 
					 | 
					 | 
					from openpilot.system.qcomgpsd.qcomgpsd import at_cmd, wait_for_modem | 
				
			
			
		
	
	
		
		
			
				
					| 
						
							
								
							
						
						
							
								
							
						
						
					 | 
					 | 
					@ -57,24 +58,24 @@ class TestRawgpsd(unittest.TestCase): | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					    os.system("sudo systemctl restart ModemManager") | 
					 | 
					 | 
					 | 
					    os.system("sudo systemctl restart ModemManager") | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					    assert self._wait_for_output(30) | 
					 | 
					 | 
					 | 
					    assert self._wait_for_output(30) | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					
 | 
					 | 
					 | 
					 | 
					
 | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					  def test_startup_time(self): | 
					 | 
					 | 
					 | 
					  @parameterized.expand([(b,) for b in (True, False)]) | 
				
			
			
				
				
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					    for internet in (True, False): | 
					 | 
					 | 
					 | 
					  def test_startup_time(self, internet): | 
				
			
			
				
				
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					      if not internet: | 
					 | 
					 | 
					 | 
					    if not internet: | 
				
			
			
				
				
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					        os.system("sudo systemctl stop systemd-resolved") | 
					 | 
					 | 
					 | 
					      os.system("sudo systemctl stop systemd-resolved") | 
				
			
			
				
				
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					      with self.subTest(internet=internet): | 
					 | 
					 | 
					 | 
					    with self.subTest(internet=internet): | 
				
			
			
				
				
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					        managed_processes['qcomgpsd'].start() | 
					 | 
					 | 
					 | 
					 | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					        assert self._wait_for_output(7) | 
					 | 
					 | 
					 | 
					 | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					        managed_processes['qcomgpsd'].stop() | 
					 | 
					 | 
					 | 
					 | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					
 | 
					 | 
					 | 
					 | 
					 | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					  def test_turns_off_gnss(self): | 
					 | 
					 | 
					 | 
					 | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					    for s in (0.1, 1, 5): | 
					 | 
					 | 
					 | 
					 | 
				
			
			
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					      managed_processes['qcomgpsd'].start() | 
					 | 
					 | 
					 | 
					      managed_processes['qcomgpsd'].start() | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					      time.sleep(s) | 
					 | 
					 | 
					 | 
					      assert self._wait_for_output(7) | 
				
			
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					      managed_processes['qcomgpsd'].stop() | 
					 | 
					 | 
					 | 
					      managed_processes['qcomgpsd'].stop() | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					
 | 
					 | 
					 | 
					 | 
					
 | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					      ls = subprocess.check_output("mmcli -m any --location-status --output-json", shell=True, encoding='utf-8') | 
					 | 
					 | 
					 | 
					  @parameterized.expand([(t,) for t in (0.1, 1, 5)]) | 
				
			
			
				
				
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					      loc_status = json.loads(ls) | 
					 | 
					 | 
					 | 
					  def test_turns_off_gnss(self, runtime): | 
				
			
			
				
				
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					      assert set(loc_status['modem']['location']['enabled']) <= {'3gpp-lac-ci'} | 
					 | 
					 | 
					 | 
					    managed_processes['qcomgpsd'].start() | 
				
			
			
				
				
			
		
	
		
		
	
		
		
	
		
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					 | 
					 | 
					 | 
					 | 
					    time.sleep(runtime) | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					 | 
					 | 
					 | 
					 | 
					    managed_processes['qcomgpsd'].stop() | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					 | 
					 | 
					 | 
					 | 
					
 | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					 | 
					 | 
					 | 
					 | 
					    ls = subprocess.check_output("mmcli -m any --location-status --output-json", shell=True, encoding='utf-8') | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					 | 
					 | 
					 | 
					 | 
					    loc_status = json.loads(ls) | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					 | 
					 | 
					 | 
					 | 
					    assert set(loc_status['modem']['location']['enabled']) <= {'3gpp-lac-ci'} | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					
 | 
					 | 
					 | 
					 | 
					
 | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					
 | 
					 | 
					 | 
					 | 
					
 | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					  def check_assistance(self, should_be_loaded): | 
					 | 
					 | 
					 | 
					  def check_assistance(self, should_be_loaded): | 
				
			
			
		
	
	
		
		
			
				
					| 
						
							
								
							
						
						
						
					 | 
					 | 
					
  |