openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models. 
				 
			
		 
		
		
		
		
		
		
			You can not select more than 25 topics 
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long. 
		
		
		
	
	
		
		
			
	
	
		
			
				
					
						
							
								
							 
							
								
							 
							
								 
							
							
								""" 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								Utilities for generating mock messages for testing.
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								See the example in common/tests/test_mock.py.
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								""" 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  functools 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  threading 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  cereal . messaging  import  PubMaster 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  cereal . services  import  SERVICE_LIST 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  openpilot . common . mock . generators  import  generate_liveLocationKalman 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  openpilot . common . realtime  import  Ratekeeper 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								# Maps a service name to a zero-argument callable that builds one mock message
								# for that service. Keys must be the exact service names (no padding spaces),
								# because generate_messages_loop looks generators up by service name.
								MOCK_GENERATOR = {
								  "liveLocationKalman": generate_liveLocationKalman,
								}
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								def generate_messages_loop(services: list[str], done: threading.Event):
								  """Publish mock messages for `services` until `done` is set.

								  Each loop iteration is one tick. A service is sent on every tick whose index
								  is a multiple of that service's period, where the period is the loop rate
								  divided by the service's `SERVICE_LIST` frequency, rounded to a whole number
								  of ticks (minimum one tick).

								  Args:
								    services: service names to publish. Each must be a key of SERVICE_LIST
								      and of MOCK_GENERATOR, otherwise a KeyError is raised.
								    done: event that stops the loop once it is set.
								  """
								  loop_rate = 100  # ticks per second; presumably Hz for Ratekeeper -- confirm
								  pm = PubMaster(services)
								  rk = Ratekeeper(loop_rate)
								  # Integer tick period per service, computed once. Comparing the float
								  # `i % (100 / frequency)` with 0 only works when the frequency divides the
								  # loop rate exactly; otherwise the service is sent at i == 0 and never again.
								  send_period = {s: max(1, round(loop_rate / SERVICE_LIST[s].frequency)) for s in services}
								  i = 0
								  while not done.is_set():
								    for s, period in send_period.items():
								      if i % period == 0:
								        message = MOCK_GENERATOR[s]()
								        pm.send(s, message)
								    i += 1
								    rk.keep_time()
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								def  mock_messages ( services :  list [ str ]  |  str ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  if  isinstance ( services ,  str ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    services  =  [ services ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  decorator ( func ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    @functools . wraps ( func ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    def  wrapper ( * args ,  * * kwargs ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      done  =  threading . Event ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      t  =  threading . Thread ( target = generate_messages_loop ,  args = ( services ,  done ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      t . start ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      try : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        return  func ( * args ,  * * kwargs ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      finally : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        done . set ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        t . join ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  wrapper 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  decorator