open source driving agent
				
			 
			
		 
		
		
		
		
		
		
			You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
	
	
		
		
			
	
	
		
			
				
					
						
							|  |  |  | """
 | 
					
						
							|  |  |  | Utilities for generating mock messages for testing.
 | 
					
						
							|  |  |  | example in common/tests/test_mock.py
 | 
					
						
							|  |  |  | """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import functools
 | 
					
						
							|  |  |  | import threading
 | 
					
						
							|  |  |  | from cereal.messaging import PubMaster
 | 
					
						
							|  |  |  | from cereal.services import SERVICE_LIST
 | 
					
						
							|  |  |  | from openpilot.common.mock.generators import generate_liveLocationKalman
 | 
					
						
							|  |  |  | from openpilot.common.realtime import Ratekeeper
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | MOCK_GENERATOR = {
 | 
					
						
							|  |  |  |   "liveLocationKalman": generate_liveLocationKalman
 | 
					
						
							|  |  |  | }
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def generate_messages_loop(services: list[str], done: threading.Event):
 | 
					
						
							|  |  |  |   pm = PubMaster(services)
 | 
					
						
							|  |  |  |   rk = Ratekeeper(100)
 | 
					
						
							|  |  |  |   i = 0
 | 
					
						
							|  |  |  |   while not done.is_set():
 | 
					
						
							|  |  |  |     for s in services:
 | 
					
						
							|  |  |  |       should_send = i % (100/SERVICE_LIST[s].frequency) == 0
 | 
					
						
							|  |  |  |       if should_send:
 | 
					
						
							|  |  |  |         message = MOCK_GENERATOR[s]()
 | 
					
						
							|  |  |  |         pm.send(s, message)
 | 
					
						
							|  |  |  |     i += 1
 | 
					
						
							|  |  |  |     rk.keep_time()
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def mock_messages(services: list[str] | str):
 | 
					
						
							|  |  |  |   if isinstance(services, str):
 | 
					
						
							|  |  |  |     services = [services]
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def decorator(func):
 | 
					
						
							|  |  |  |     @functools.wraps(func)
 | 
					
						
							|  |  |  |     def wrapper(*args, **kwargs):
 | 
					
						
							|  |  |  |       done = threading.Event()
 | 
					
						
							|  |  |  |       t = threading.Thread(target=generate_messages_loop, args=(services, done))
 | 
					
						
							|  |  |  |       t.start()
 | 
					
						
							|  |  |  |       try:
 | 
					
						
							|  |  |  |         return func(*args, **kwargs)
 | 
					
						
							|  |  |  |       finally:
 | 
					
						
							|  |  |  |         done.set()
 | 
					
						
							|  |  |  |         t.join()
 | 
					
						
							|  |  |  |     return wrapper
 | 
					
						
							|  |  |  |   return decorator
 |