You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
			
				
					124 lines
				
				4.0 KiB
			
		
		
			
		
	
	
					124 lines
				
				4.0 KiB
			| 
											3 years ago
										 | #!/usr/bin/env python3
 | ||
| 
											2 years ago
										 | import os
 | ||
| 
											2 years ago
										 | import pytest
 | ||
| 
											3 years ago
										 | import json
 | ||
|  | import time
 | ||
| 
											2 years ago
										 | import datetime
 | ||
| 
											3 years ago
										 | import unittest
 | ||
|  | import subprocess
 | ||
|  | 
 | ||
|  | import cereal.messaging as messaging
 | ||
| 
											2 years ago
										 | from openpilot.system.qcomgpsd.qcomgpsd import at_cmd, wait_for_modem
 | ||
| 
											2 years ago
										 | from openpilot.selfdrive.manager.process_config import managed_processes
 | ||
| 
											3 years ago
										 | 
 | ||
| 
											2 years ago
										 | GOOD_SIGNAL = bool(int(os.getenv("GOOD_SIGNAL", '0')))
 | ||
|  | 
 | ||
|  | 
 | ||
| 
											2 years ago
										 | @pytest.mark.tici
 | ||
| 
											3 years ago
										 | class TestRawgpsd(unittest.TestCase):
 | ||
|  |   @classmethod
 | ||
|  |   def setUpClass(cls):
 | ||
| 
											2 years ago
										 |     os.system("sudo systemctl start systemd-resolved")
 | ||
|  |     os.system("sudo systemctl restart ModemManager lte")
 | ||
|  |     wait_for_modem()
 | ||
| 
											2 years ago
										 | 
 | ||
|  |   @classmethod
 | ||
|  |   def tearDownClass(cls):
 | ||
| 
											2 years ago
										 |     managed_processes['qcomgpsd'].stop()
 | ||
| 
											2 years ago
										 |     os.system("sudo systemctl restart systemd-resolved")
 | ||
|  |     os.system("sudo systemctl restart ModemManager lte")
 | ||
| 
											3 years ago
										 | 
 | ||
| 
											2 years ago
										 |   def setUp(self):
 | ||
|  |     at_cmd("AT+QGPSDEL=0")
 | ||
| 
											2 years ago
										 |     self.sm = messaging.SubMaster(['qcomGnss', 'gpsLocation', 'gnssMeasurements'])
 | ||
| 
											2 years ago
										 | 
 | ||
| 
											3 years ago
										 |   def tearDown(self):
 | ||
| 
											2 years ago
										 |     managed_processes['qcomgpsd'].stop()
 | ||
| 
											2 years ago
										 |     os.system("sudo systemctl restart systemd-resolved")
 | ||
| 
											3 years ago
										 | 
 | ||
| 
											2 years ago
										 |   def _wait_for_output(self, t):
 | ||
|  |     dt = 0.1
 | ||
|  |     for _ in range(t*int(1/dt)):
 | ||
|  |       self.sm.update(0)
 | ||
|  |       if self.sm.updated['qcomGnss']:
 | ||
|  |         break
 | ||
|  |       time.sleep(dt)
 | ||
|  |     return self.sm.updated['qcomGnss']
 | ||
| 
											2 years ago
										 | 
 | ||
|  |   def test_no_crash_double_command(self):
 | ||
|  |     at_cmd("AT+QGPSDEL=0")
 | ||
|  |     at_cmd("AT+QGPSDEL=0")
 | ||
| 
											2 years ago
										 | 
 | ||
|  |   def test_wait_for_modem(self):
 | ||
| 
											2 years ago
										 |     os.system("sudo systemctl stop ModemManager")
 | ||
| 
											2 years ago
										 |     managed_processes['qcomgpsd'].start()
 | ||
| 
											2 years ago
										 |     assert not self._wait_for_output(5)
 | ||
| 
											2 years ago
										 | 
 | ||
| 
											2 years ago
										 |     os.system("sudo systemctl restart ModemManager")
 | ||
|  |     assert self._wait_for_output(30)
 | ||
| 
											2 years ago
										 | 
 | ||
| 
											3 years ago
										 |   def test_startup_time(self):
 | ||
| 
											2 years ago
										 |     for internet in (True, False):
 | ||
|  |       if not internet:
 | ||
| 
											2 years ago
										 |         os.system("sudo systemctl stop systemd-resolved")
 | ||
| 
											2 years ago
										 |       with self.subTest(internet=internet):
 | ||
| 
											2 years ago
										 |         managed_processes['qcomgpsd'].start()
 | ||
| 
											2 years ago
										 |         assert self._wait_for_output(7)
 | ||
| 
											2 years ago
										 |         managed_processes['qcomgpsd'].stop()
 | ||
| 
											3 years ago
										 | 
 | ||
|  |   def test_turns_off_gnss(self):
 | ||
| 
											2 years ago
										 |     for s in (0.1, 1, 5):
 | ||
| 
											2 years ago
										 |       managed_processes['qcomgpsd'].start()
 | ||
| 
											3 years ago
										 |       time.sleep(s)
 | ||
| 
											2 years ago
										 |       managed_processes['qcomgpsd'].stop()
 | ||
| 
											3 years ago
										 | 
 | ||
| 
											3 years ago
										 |       ls = subprocess.check_output("mmcli -m any --location-status --output-json", shell=True, encoding='utf-8')
 | ||
| 
											3 years ago
										 |       loc_status = json.loads(ls)
 | ||
|  |       assert set(loc_status['modem']['location']['enabled']) <= {'3gpp-lac-ci'}
 | ||
|  | 
 | ||
| 
											2 years ago
										 | 
 | ||
| 
											2 years ago
										 |   def check_assistance(self, should_be_loaded):
 | ||
| 
											2 years ago
										 |     # after QGPSDEL: '+QGPSXTRADATA: 0,"1980/01/05,19:00:00"'
 | ||
|  |     # after loading: '+QGPSXTRADATA: 10080,"2023/06/24,19:00:00"'
 | ||
|  |     out = at_cmd("AT+QGPSXTRADATA?")
 | ||
|  |     out = out.split("+QGPSXTRADATA:")[1].split("'")[0].strip()
 | ||
| 
											2 years ago
										 |     valid_duration, injected_time_str = out.split(",", 1)
 | ||
| 
											2 years ago
										 |     if should_be_loaded:
 | ||
|  |       assert valid_duration == "10080"  # should be max time
 | ||
|  |       injected_time = datetime.datetime.strptime(injected_time_str.replace("\"", ""), "%Y/%m/%d,%H:%M:%S")
 | ||
|  |       self.assertLess(abs((datetime.datetime.utcnow() - injected_time).total_seconds()), 60*60*12)
 | ||
|  |     else:
 | ||
|  |       valid_duration, injected_time_str = out.split(",", 1)
 | ||
|  |       injected_time_str = injected_time_str.replace('\"', '').replace('\'', '')
 | ||
|  |       assert injected_time_str[:] == '1980/01/05,19:00:00'[:]
 | ||
|  |       assert valid_duration == '0'
 | ||
|  | 
 | ||
|  |   def test_assistance_loading(self):
 | ||
| 
											2 years ago
										 |     managed_processes['qcomgpsd'].start()
 | ||
| 
											2 years ago
										 |     assert self._wait_for_output(10)
 | ||
| 
											2 years ago
										 |     managed_processes['qcomgpsd'].stop()
 | ||
| 
											2 years ago
										 |     self.check_assistance(True)
 | ||
|  | 
 | ||
|  |   def test_no_assistance_loading(self):
 | ||
|  |     os.system("sudo systemctl stop systemd-resolved")
 | ||
|  | 
 | ||
| 
											2 years ago
										 |     managed_processes['qcomgpsd'].start()
 | ||
| 
											2 years ago
										 |     assert self._wait_for_output(10)
 | ||
| 
											2 years ago
										 |     managed_processes['qcomgpsd'].stop()
 | ||
| 
											2 years ago
										 |     self.check_assistance(False)
 | ||
|  | 
 | ||
|  |   def test_late_assistance_loading(self):
 | ||
|  |     os.system("sudo systemctl stop systemd-resolved")
 | ||
|  | 
 | ||
| 
											2 years ago
										 |     managed_processes['qcomgpsd'].start()
 | ||
| 
											2 years ago
										 |     self._wait_for_output(17)
 | ||
|  |     assert self.sm.updated['qcomGnss']
 | ||
| 
											2 years ago
										 | 
 | ||
| 
											2 years ago
										 |     os.system("sudo systemctl restart systemd-resolved")
 | ||
| 
											2 years ago
										 |     time.sleep(15)
 | ||
| 
											2 years ago
										 |     managed_processes['qcomgpsd'].stop()
 | ||
| 
											2 years ago
										 |     self.check_assistance(True)
 | ||
| 
											2 years ago
										 | 
 | ||
| 
											3 years ago
										 | if __name__ == "__main__":
 | ||
| 
											2 years ago
										 |   unittest.main(failfast=True)
 |