import  os 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  threading 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  time 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								import  uuid 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  unittest 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  openpilot . common . params  import  Params ,  ParamKeyType ,  UnknownKeyName 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								class  TestParams ( unittest . TestCase ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  setUp ( self ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . params  =  Params ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  test_params_put_and_get ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . params . put ( " DongleId " ,  " cb38263377b873ee " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    assert  self . params . get ( " DongleId " )  ==  b " cb38263377b873ee " 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  test_params_non_ascii ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    st  =  b " \xe1 \x90 \xff " 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . params . put ( " CarParams " ,  st ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    assert  self . params . get ( " CarParams " )  ==  st 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  test_params_get_cleared_manager_start ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . params . put ( " CarParams " ,  " test " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . params . put ( " DongleId " ,  " cb38263377b873ee " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    assert  self . params . get ( " CarParams " )  ==  b " test " 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    undefined_param  =  self . params . get_param_path ( uuid . uuid4 ( ) . hex ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    with  open ( undefined_param ,  " w " )  as  f : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      f . write ( " test " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    assert  os . path . isfile ( undefined_param ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . params . clear_all ( ParamKeyType . CLEAR_ON_MANAGER_START ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    assert  self . params . get ( " CarParams " )  is  None 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    assert  self . params . get ( " DongleId " )  is  not  None 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    assert  not  os . path . isfile ( undefined_param ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  test_params_two_things ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . params . put ( " DongleId " ,  " bob " ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . params . put ( " AthenadPid " ,  " 123 " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    assert  self . params . get ( " DongleId " )  ==  b " bob " 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    assert  self . params . get ( " AthenadPid " )  ==  b " 123 " 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  test_params_get_block ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    def  _delayed_writer ( ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      time . sleep ( 0.1 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . params . put ( " CarParams " ,  " test " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    threading . Thread ( target = _delayed_writer ) . start ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    assert  self . params . get ( " CarParams " )  is  None 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    assert  self . params . get ( " CarParams " ,  True )  ==  b " test " 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  test_params_unknown_key_fails ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    with  self . assertRaises ( UnknownKeyName ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . params . get ( " swag " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    with  self . assertRaises ( UnknownKeyName ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . params . get_bool ( " swag " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    with  self . assertRaises ( UnknownKeyName ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . params . put ( " swag " ,  " abc " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    with  self . assertRaises ( UnknownKeyName ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . params . put_bool ( " swag " ,  True ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  test_remove_not_there ( self ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    assert  self . params . get ( " CarParams " )  is  None 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . params . remove ( " CarParams " ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    assert  self . params . get ( " CarParams " )  is  None 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  test_get_bool ( self ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . params . remove ( " IsMetric " ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . assertFalse ( self . params . get_bool ( " IsMetric " ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . params . put_bool ( " IsMetric " ,  True ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . assertTrue ( self . params . get_bool ( " IsMetric " ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . params . put_bool ( " IsMetric " ,  False ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . assertFalse ( self . params . get_bool ( " IsMetric " ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . params . put ( " IsMetric " ,  " 1 " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . assertTrue ( self . params . get_bool ( " IsMetric " ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . params . put ( " IsMetric " ,  " 0 " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . assertFalse ( self . params . get_bool ( " IsMetric " ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  test_put_non_blocking_with_get_block ( self ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    q  =  Params ( ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    def  _delayed_writer ( ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      time . sleep ( 0.1 ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      Params ( ) . put_nonblocking ( " CarParams " ,  " test " ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    threading . Thread ( target = _delayed_writer ) . start ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    assert  q . get ( " CarParams " )  is  None 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    assert  q . get ( " CarParams " ,  True )  ==  b " test " 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  test_put_bool_non_blocking_with_get_block ( self ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    q  =  Params ( ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    def  _delayed_writer ( ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      time . sleep ( 0.1 ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      Params ( ) . put_bool_nonblocking ( " CarParams " ,  True ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    threading . Thread ( target = _delayed_writer ) . start ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    assert  q . get ( " CarParams " )  is  None 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    assert  q . get ( " CarParams " ,  True )  ==  b " 1 " 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  test_params_all_keys ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    keys  =  Params ( ) . all_keys ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # sanity checks 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    assert  len ( keys )  >  20 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    assert  len ( keys )  ==  len ( set ( keys ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    assert  b " CarParams "  in  keys 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								if  __name__  ==  " __main__ " : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  unittest . main ( )