You cannot select more than 25 topics.
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							124 lines
						
					
					
						
							4.0 KiB
						
					
					
				
			
		
		
	
	
							124 lines
						
					
					
						
							4.0 KiB
						
					
					
				| #!/usr/bin/env python3
 | |
| import os
 | |
| import pytest
 | |
| import json
 | |
| import time
 | |
| import datetime
 | |
| import unittest
 | |
| import subprocess
 | |
| 
 | |
| import cereal.messaging as messaging
 | |
| from openpilot.system.qcomgpsd.qcomgpsd import at_cmd, wait_for_modem
 | |
| from openpilot.selfdrive.manager.process_config import managed_processes
 | |
| 
 | |
# Boolean flag read from the GOOD_SIGNAL environment variable; any non-zero
# integer enables it, unset defaults to off.
GOOD_SIGNAL = int(os.environ.get("GOOD_SIGNAL", "0")) != 0
 | |
| 
 | |
| 
 | |
@pytest.mark.tici
class TestRawgpsd(unittest.TestCase):
  """On-device tests for qcomgpsd startup, shutdown and assistance loading.

  The tests drive the real modem (AT commands, ModemManager, mmcli) and stop
  or restart systemd-resolved to emulate missing internet access, so they are
  only meaningful on hardware selected by the ``tici`` pytest marker.
  """

  @classmethod
  def setUpClass(cls):
    # bring name resolution and the modem stack up before any test runs
    os.system("sudo systemctl start systemd-resolved")
    os.system("sudo systemctl restart ModemManager lte")
    wait_for_modem()

  @classmethod
  def tearDownClass(cls):
    # leave the device with qcomgpsd stopped and the services restarted
    managed_processes['qcomgpsd'].stop()
    os.system("sudo systemctl restart systemd-resolved")
    os.system("sudo systemctl restart ModemManager lte")

  def setUp(self):
    # per the reply samples in check_assistance, QGPSDEL resets the XTRA data
    at_cmd("AT+QGPSDEL=0")
    self.sm = messaging.SubMaster(['qcomGnss', 'gpsLocation', 'gnssMeasurements'])

  def tearDown(self):
    # always stop the daemon and restore name resolution, even after a failure
    managed_processes['qcomgpsd'].stop()
    os.system("sudo systemctl restart systemd-resolved")

  def _wait_for_output(self, t):
    """Poll for a ``qcomGnss`` message for up to roughly ``t`` seconds.

    Returns the final ``self.sm.updated['qcomGnss']`` flag: truthy as soon as
    a message was seen, falsy if the polling budget ran out.
    """
    dt = 0.1
    # round() keeps the iteration count exact for integer t and also accepts
    # fractional timeouts, which t*int(1/dt) could not feed to range()
    for _ in range(round(t / dt)):
      self.sm.update(0)
      if self.sm.updated['qcomGnss']:
        break
      time.sleep(dt)
    return self.sm.updated['qcomGnss']

  def test_no_crash_double_command(self):
    # issuing the same AT command twice in a row must not raise
    at_cmd("AT+QGPSDEL=0")
    at_cmd("AT+QGPSDEL=0")

  def test_wait_for_modem(self):
    # no output is expected while ModemManager is stopped ...
    os.system("sudo systemctl stop ModemManager")
    managed_processes['qcomgpsd'].start()
    assert not self._wait_for_output(5)

    # ... and output must show up once ModemManager is back
    os.system("sudo systemctl restart ModemManager")
    assert self._wait_for_output(30)

  def test_startup_time(self):
    for internet in (True, False):
      if not internet:
        os.system("sudo systemctl stop systemd-resolved")
      with self.subTest(internet=internet):
        managed_processes['qcomgpsd'].start()
        try:
          assert self._wait_for_output(7)
        finally:
          # unittest lets the loop continue after a failing subTest, so stop
          # the process here rather than leak it into the next iteration
          managed_processes['qcomgpsd'].stop()

  def test_turns_off_gnss(self):
    for s in (0.1, 1, 5):
      with self.subTest(runtime=s):
        managed_processes['qcomgpsd'].start()
        time.sleep(s)
        managed_processes['qcomgpsd'].stop()

        # fixed argv list: no shell is needed to run a constant command
        ls = subprocess.check_output(
          ["mmcli", "-m", "any", "--location-status", "--output-json"],
          encoding='utf-8',
        )
        loc_status = json.loads(ls)
        # after stopping, nothing beyond '3gpp-lac-ci' may remain enabled
        assert set(loc_status['modem']['location']['enabled']) <= {'3gpp-lac-ci'}

  def check_assistance(self, should_be_loaded):
    """Assert the assistance-data state reported by ``AT+QGPSXTRADATA?``.

    should_be_loaded: when true, require the maximum validity duration and an
    injection time within 12 hours of now; otherwise require the cleared state
    (duration ``0`` and the 1980 default timestamp).
    """
    # after QGPSDEL: '+QGPSXTRADATA: 0,"1980/01/05,19:00:00"'
    # after loading: '+QGPSXTRADATA: 10080,"2023/06/24,19:00:00"'
    out = at_cmd("AT+QGPSXTRADATA?")
    # fail with a readable message instead of an IndexError on an odd reply
    self.assertIn("+QGPSXTRADATA:", out)
    payload = out.split("+QGPSXTRADATA:")[1].split("'")[0].strip()
    valid_duration, injected_time_str = payload.split(",", 1)
    injected_time_str = injected_time_str.replace('"', '')

    if should_be_loaded:
      self.assertEqual(valid_duration, "10080")  # should be max time
      # the original compared against utcnow(), i.e. it treats the modem
      # timestamp as UTC; keep that assumption with timezone-aware values
      injected_time = datetime.datetime.strptime(injected_time_str, "%Y/%m/%d,%H:%M:%S")
      injected_time = injected_time.replace(tzinfo=datetime.timezone.utc)
      now = datetime.datetime.now(datetime.timezone.utc)
      self.assertLess(abs((now - injected_time).total_seconds()), 60*60*12)
    else:
      self.assertEqual(injected_time_str, '1980/01/05,19:00:00')
      self.assertEqual(valid_duration, '0')

  def test_assistance_loading(self):
    managed_processes['qcomgpsd'].start()
    assert self._wait_for_output(10)
    managed_processes['qcomgpsd'].stop()
    self.check_assistance(True)

  def test_no_assistance_loading(self):
    # without name resolution no assistance data should get loaded
    os.system("sudo systemctl stop systemd-resolved")

    managed_processes['qcomgpsd'].start()
    assert self._wait_for_output(10)
    managed_processes['qcomgpsd'].stop()
    self.check_assistance(False)

  def test_late_assistance_loading(self):
    # start without name resolution, then restore it while the daemon runs
    os.system("sudo systemctl stop systemd-resolved")

    managed_processes['qcomgpsd'].start()
    assert self._wait_for_output(17)

    os.system("sudo systemctl restart systemd-resolved")
    time.sleep(15)
    managed_processes['qcomgpsd'].stop()
    self.check_assistance(True)
 | |
| 
 | |
if __name__ == "__main__":
  # direct execution: run the suite and stop at the first failing test
  unittest.main(failfast=True)
 | |
| 
 |