You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							50 lines
						
					
					
						
							1.2 KiB
						
					
					
				
			
		
		
	
	
							50 lines
						
					
					
						
							1.2 KiB
						
					
					
				| """
 | |
| Utilities for generating mock messages for testing.
 | |
| example in common/tests/test_mock.py
 | |
| """
 | |
| 
 | |
| 
 | |
| import functools
 | |
| import threading
 | |
| from cereal.messaging import PubMaster
 | |
| from cereal.services import SERVICE_LIST
 | |
| from openpilot.common.mock.generators import generate_livePose
 | |
| from openpilot.common.realtime import Ratekeeper
 | |
| 
 | |
| 
 | |
| MOCK_GENERATOR = {
 | |
|   "livePose": generate_livePose
 | |
| }
 | |
| 
 | |
| 
 | |
| def generate_messages_loop(services: list[str], done: threading.Event):
 | |
|   pm = PubMaster(services)
 | |
|   rk = Ratekeeper(100)
 | |
|   i = 0
 | |
|   while not done.is_set():
 | |
|     for s in services:
 | |
|       should_send = i % (100/SERVICE_LIST[s].frequency) == 0
 | |
|       if should_send:
 | |
|         message = MOCK_GENERATOR[s]()
 | |
|         pm.send(s, message)
 | |
|     i += 1
 | |
|     rk.keep_time()
 | |
| 
 | |
| 
 | |
| def mock_messages(services: list[str] | str):
 | |
|   if isinstance(services, str):
 | |
|     services = [services]
 | |
| 
 | |
|   def decorator(func):
 | |
|     @functools.wraps(func)
 | |
|     def wrapper(*args, **kwargs):
 | |
|       done = threading.Event()
 | |
|       t = threading.Thread(target=generate_messages_loop, args=(services, done))
 | |
|       t.start()
 | |
|       try:
 | |
|         return func(*args, **kwargs)
 | |
|       finally:
 | |
|         done.set()
 | |
|         t.join()
 | |
|     return wrapper
 | |
|   return decorator
 | |
| 
 |