You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							93 lines
						
					
					
						
							2.2 KiB
						
					
					
				
			
		
		
	
	
							93 lines
						
					
					
						
							2.2 KiB
						
					
					
				import os
 | 
						|
os.environ['FAKEUPLOAD'] = "1"
 | 
						|
 | 
						|
from common.testing import phone_only
 | 
						|
from selfdrive.manager import manager_init, manager_prepare
 | 
						|
from selfdrive.manager import start_managed_process, kill_managed_process, get_running
 | 
						|
from functools import wraps
 | 
						|
import time
 | 
						|
 | 
						|
DID_INIT = False
 | 
						|
 | 
						|
# must run first
 | 
						|
@phone_only
 | 
						|
def test_manager_prepare():
 | 
						|
  global DID_INIT
 | 
						|
  manager_init()
 | 
						|
  manager_prepare()
 | 
						|
  DID_INIT = True
 | 
						|
 | 
						|
def with_processes(processes):
 | 
						|
  def wrapper(func):
 | 
						|
    @wraps(func)
 | 
						|
    def wrap():
 | 
						|
      if not DID_INIT:
 | 
						|
        test_manager_prepare()
 | 
						|
 | 
						|
      # start and assert started
 | 
						|
      [start_managed_process(p) for p in processes]
 | 
						|
      assert all(get_running()[name].exitcode is None for name in processes)
 | 
						|
 | 
						|
      # call the function
 | 
						|
      func()
 | 
						|
 | 
						|
      # assert processes are still started
 | 
						|
      assert all(get_running()[name].exitcode is None for name in processes)
 | 
						|
 | 
						|
      # kill and assert all stopped
 | 
						|
      [kill_managed_process(p) for p in processes]
 | 
						|
      assert len(get_running()) == 0
 | 
						|
    return wrap
 | 
						|
  return wrapper
 | 
						|
 | 
						|
#@phone_only
 | 
						|
#@with_processes(['controlsd', 'radard'])
 | 
						|
#def test_controls():
 | 
						|
#  from selfdrive.test.plant.plant import Plant
 | 
						|
#
 | 
						|
#  # start the fake car for 2 seconds
 | 
						|
#  plant = Plant(100)
 | 
						|
#  for i in range(200):
 | 
						|
#    if plant.rk.frame >= 20 and plant.rk.frame <= 25:
 | 
						|
#      cruise_buttons = CruiseButtons.RES_ACCEL
 | 
						|
#      # rolling forward
 | 
						|
#      assert plant.speed > 0
 | 
						|
#    else:
 | 
						|
#      cruise_buttons = 0
 | 
						|
#    plant.step(cruise_buttons = cruise_buttons)
 | 
						|
#  plant.close()
 | 
						|
#
 | 
						|
#  # assert that we stopped
 | 
						|
#  assert plant.speed == 0.0
 | 
						|
 | 
						|
@phone_only
 | 
						|
@with_processes(['loggerd', 'logmessaged', 'tombstoned', 'proclogd', 'logcatd'])
 | 
						|
def test_logging():
 | 
						|
  print "LOGGING IS SET UP"
 | 
						|
  time.sleep(1.0)
 | 
						|
 | 
						|
@phone_only
 | 
						|
@with_processes(['visiond'])
 | 
						|
def test_visiond():
 | 
						|
  print "VISIOND IS SET UP"
 | 
						|
  time.sleep(5.0)
 | 
						|
 | 
						|
@phone_only
 | 
						|
@with_processes(['sensord'])
 | 
						|
def test_sensord():
 | 
						|
  print "SENSORS ARE SET UP"
 | 
						|
  time.sleep(1.0)
 | 
						|
 | 
						|
@phone_only
 | 
						|
@with_processes(['ui'])
 | 
						|
def test_ui():
 | 
						|
  print "RUNNING UI"
 | 
						|
  time.sleep(1.0)
 | 
						|
 | 
						|
# will have one thing to upload if loggerd ran
 | 
						|
# TODO: assert it actually uploaded
 | 
						|
@phone_only
 | 
						|
@with_processes(['uploader'])
 | 
						|
def test_uploader():
 | 
						|
  print "UPLOADER"
 | 
						|
  time.sleep(10.0)
 | 
						|
 |