laikad: add laikad startup tests (#27207)
	
		
	
				
					
				
			* add laikad startup test * move debug files to xx * add laikad startup and ephemeris test --------- Co-authored-by: Kurt Nistelberger <kurt.nistelberger@gmail.com>pull/214/head
							parent
							
								
									d6516059bd
								
							
						
					
					
						commit
						74187463da
					
				
				 1 changed files with 105 additions and 0 deletions
			
			
		@ -0,0 +1,105 @@ | 
				
			||||
#!/usr/bin/env python3 | 
				
			||||
import os | 
				
			||||
import time | 
				
			||||
import unittest | 
				
			||||
 | 
				
			||||
import cereal.messaging as messaging | 
				
			||||
import selfdrive.sensord.pigeond as pd | 
				
			||||
 | 
				
			||||
from common.params import Params | 
				
			||||
from system.hardware import TICI | 
				
			||||
from selfdrive.manager.process_config import managed_processes | 
				
			||||
from selfdrive.test.helpers import with_processes | 
				
			||||
 | 
				
			||||
 | 
				
			||||
def wait_for_location(sm, timeout, con=10): | 
				
			||||
  cons_meas = 0 | 
				
			||||
  start_time = time.monotonic() | 
				
			||||
  while (time.monotonic() - start_time) < timeout: | 
				
			||||
    sm.update() | 
				
			||||
    if not sm.updated["gnssMeasurements"]: | 
				
			||||
      continue | 
				
			||||
 | 
				
			||||
    msg = sm["gnssMeasurements"] | 
				
			||||
    cons_meas = (cons_meas + 1) if 'positionECEF' in msg.to_dict() else 0 | 
				
			||||
    if cons_meas >= con: | 
				
			||||
      return True | 
				
			||||
  return False | 
				
			||||
 | 
				
			||||
 | 
				
			||||
class TestLaikad(unittest.TestCase): | 
				
			||||
  @classmethod | 
				
			||||
  def setUpClass(self): | 
				
			||||
    if not TICI: | 
				
			||||
      raise unittest.SkipTest | 
				
			||||
 | 
				
			||||
    ublox_available = Params().get_bool("UbloxAvailable") | 
				
			||||
    if not ublox_available: | 
				
			||||
      raise unittest.SkipTest | 
				
			||||
 | 
				
			||||
  def setUp(self): | 
				
			||||
    # ensure laikad cold start | 
				
			||||
    Params().remove("LaikadEphemeris") | 
				
			||||
    os.environ["LAIKAD_NO_INTERNET"] = "1" | 
				
			||||
    managed_processes['laikad'].start() | 
				
			||||
 | 
				
			||||
  def tearDown(self): | 
				
			||||
    managed_processes['laikad'].stop() | 
				
			||||
 | 
				
			||||
 | 
				
			||||
  @with_processes(['pigeond', 'ubloxd']) | 
				
			||||
  def test_laikad_cold_start(self): | 
				
			||||
    time.sleep(5) | 
				
			||||
 | 
				
			||||
    start_time = time.monotonic() | 
				
			||||
    sm = messaging.SubMaster(["gnssMeasurements"]) | 
				
			||||
 | 
				
			||||
    success = wait_for_location(sm, 60*2, con=10) | 
				
			||||
    duration = time.monotonic() - start_time | 
				
			||||
 | 
				
			||||
    assert success, "Waiting for location timed out (2min)!" | 
				
			||||
    assert duration < 60, f"Received Location {duration}!" | 
				
			||||
 | 
				
			||||
 | 
				
			||||
  @with_processes(['ubloxd']) | 
				
			||||
  def test_laikad_ublox_reset_start(self): | 
				
			||||
    time.sleep(2) | 
				
			||||
 | 
				
			||||
    pigeon, pm = pd.create_pigeon() | 
				
			||||
    pd.init_baudrate(pigeon) | 
				
			||||
    assert pigeon.reset_device(), "Could not reset device!" | 
				
			||||
 | 
				
			||||
    laikad_sock = messaging.sub_sock("gnssMeasurements", timeout=0.1) | 
				
			||||
    ublox_gnss_sock = messaging.sub_sock("ubloxGnss", timeout=0.1) | 
				
			||||
 | 
				
			||||
    pd.init_baudrate(pigeon) | 
				
			||||
    pd.initialize_pigeon(pigeon) | 
				
			||||
    pd.run_receiving(pigeon, pm, 180) | 
				
			||||
 | 
				
			||||
    ublox_msgs = messaging.drain_sock(ublox_gnss_sock) | 
				
			||||
    laikad_msgs = messaging.drain_sock(laikad_sock) | 
				
			||||
 | 
				
			||||
    gps_ephem_cnt = 0 | 
				
			||||
    glonass_ephem_cnt = 0 | 
				
			||||
    for um in ublox_msgs: | 
				
			||||
      if um.ubloxGnss.which() == 'ephemeris': | 
				
			||||
        gps_ephem_cnt += 1 | 
				
			||||
      elif um.ubloxGnss.which() == 'glonassEphemeris': | 
				
			||||
        glonass_ephem_cnt += 1 | 
				
			||||
 | 
				
			||||
    assert gps_ephem_cnt > 0, "NO gps ephemeris collected!" | 
				
			||||
    assert glonass_ephem_cnt > 0, "NO glonass ephemeris collected!" | 
				
			||||
 | 
				
			||||
    pos_meas = 0 | 
				
			||||
    duration = -1 | 
				
			||||
    for lm in laikad_msgs: | 
				
			||||
      pos_meas = (pos_meas + 1) if 'positionECEF' in lm.gnssMeasurements.to_dict() else 0 | 
				
			||||
      if pos_meas > 5: | 
				
			||||
        duration = (lm.logMonoTime - laikad_msgs[0].logMonoTime)*1e-9 | 
				
			||||
        break | 
				
			||||
 | 
				
			||||
    assert pos_meas > 5, "NOT enough positions at end of read!" | 
				
			||||
    assert duration < 120, "Laikad took too long to get a Position!" | 
				
			||||
 | 
				
			||||
if __name__ == "__main__": | 
				
			||||
  unittest.main() | 
				
			||||
					Loading…
					
					
				
		Reference in new issue