@ -1,5 +1,6 @@ 
			
		
	
		
		
			
				
					
					#!/usr/bin/env python3 #!/usr/bin/env python3  
			
		
	
		
		
			
				
					
					from  functools  import  partial ,  wraps import  pytest  
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					from  functools  import  wraps  
			
		
	
		
		
			
				
					
					import  json import  json  
			
		
	
		
		
			
				
					
					import  multiprocessing import  multiprocessing  
			
		
	
		
		
			
				
					
					import  os import  os  
			
		
	
	
		
		
			
				
					
						
						
						
							
								 
						
					 
					@ -8,12 +9,9 @@ import shutil 
			
		
	
		
		
			
				
					
					import  time import  time  
			
		
	
		
		
			
				
					
					import  threading import  threading  
			
		
	
		
		
			
				
					
					import  queue import  queue  
			
		
	
		
		
			
				
					
					import  unittest  
			
		
	
		
		
			
				
					
					from  dataclasses  import  asdict ,  replace from  dataclasses  import  asdict ,  replace  
			
		
	
		
		
			
				
					
					from  datetime  import  datetime ,  timedelta from  datetime  import  datetime ,  timedelta  
			
		
	
		
		
			
				
					
					from  parameterized  import  parameterized  
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					from  unittest  import  mock  
			
		
	
		
		
			
				
					
					from  websocket  import  ABNF from  websocket  import  ABNF  
			
		
	
		
		
			
				
					
					from  websocket . _exceptions  import  WebSocketConnectionClosedException from  websocket . _exceptions  import  WebSocketConnectionClosedException  
			
		
	
		
		
			
				
					
					
 
			
		
	
	
		
		
			
				
					
						
						
						
							
								 
						
					 
					@ -24,7 +22,7 @@ from openpilot.common.timeout import Timeout 
			
		
	
		
		
			
				
					
					from  openpilot . selfdrive . athena  import  athenad from  openpilot . selfdrive . athena  import  athenad  
			
		
	
		
		
			
				
					
					from  openpilot . selfdrive . athena . athenad  import  MAX_RETRY_COUNT ,  dispatcher from  openpilot . selfdrive . athena . athenad  import  MAX_RETRY_COUNT ,  dispatcher  
			
		
	
		
		
			
				
					
					from  openpilot . selfdrive . athena . tests . helpers  import  HTTPRequestHandler ,  MockWebsocket ,  MockApi ,  EchoSocket from  openpilot . selfdrive . athena . tests . helpers  import  HTTPRequestHandler ,  MockWebsocket ,  MockApi ,  EchoSocket  
			
		
	
		
		
			
				
					
					from  openpilot . selfdrive . test . helpers  import  with_ http_serverfrom  openpilot . selfdrive . test . helpers  import  http_server_context   
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					from  openpilot . system . hardware . hw  import  Paths from  openpilot . system . hardware . hw  import  Paths  
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					
 
			
		
	
	
		
		
			
				
					
						
						
						
							
								 
						
					 
					@ -37,10 +35,6 @@ def seed_athena_server(host, port): 
			
		
	
		
		
			
				
					
					      except  requests . exceptions . ConnectionError :        except  requests . exceptions . ConnectionError :   
			
		
	
		
		
			
				
					
					        time . sleep ( 0.1 )          time . sleep ( 0.1 )   
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					with_mock_athena  =  partial ( with_http_server ,  handler = HTTPRequestHandler ,  setup = seed_athena_server )  
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					def  with_upload_handler ( func ) : def  with_upload_handler ( func ) :  
			
		
	
		
		
			
				
					
					  @wraps ( func )    @wraps ( func )   
			
		
	
		
		
			
				
					
					  def  wrapper ( * args ,  * * kwargs ) :    def  wrapper ( * args ,  * * kwargs ) :   
			
		
	
	
		
		
			
				
					
						
						
						
							
								 
						
					 
					@ -54,15 +48,23 @@ def with_upload_handler(func): 
			
		
	
		
		
			
				
					
					      thread . join ( )        thread . join ( )   
			
		
	
		
		
			
				
					
					  return  wrapper    return  wrapper   
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					@pytest . fixture  
			
		
	
		
		
			
				
					
					def  mock_create_connection ( mocker ) :  
			
		
	
		
		
			
				
					
					    return  mocker . patch ( ' openpilot.selfdrive.athena.athenad.create_connection ' )   
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					@pytest . fixture  
			
		
	
		
		
			
				
					
					def  host ( ) :  
			
		
	
		
		
			
				
					
					  with  http_server_context ( handler = HTTPRequestHandler ,  setup = seed_athena_server )  as  ( host ,  port ) :   
			
		
	
		
		
			
				
					
					    yield  f " http:// { host } : { port } "   
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					class  TestAthenadMethods ( unittest . TestCase ) : class  TestAthenadMethods :  
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					  @classmethod    @classmethod   
			
		
	
		
		
			
				
					
					  def  setUpClass ( cls ) :    def  setup_c lass ( cls ) :   
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					    cls . SOCKET_PORT  =  45454      cls . SOCKET_PORT  =  45454   
			
		
	
		
		
			
				
					
					    athenad . Api  =  MockApi      athenad . Api  =  MockApi   
			
		
	
		
		
			
				
					
					    athenad . LOCAL_PORT_WHITELIST  =  { cls . SOCKET_PORT }      athenad . LOCAL_PORT_WHITELIST  =  { cls . SOCKET_PORT }   
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					  def  setUp  ( self ) :    def  setup_method  ( self ) :   
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					    self . default_params  =  {      self . default_params  =  {   
			
		
	
		
		
			
				
					
					      " DongleId " :  " 0000000000000000 " ,        " DongleId " :  " 0000000000000000 " ,   
			
		
	
		
		
			
				
					
					      " GithubSshKeys " :  b " ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC307aE+nuHzTAgaJhzSf5v7ZZQW9gaperjhCmyPyl4PzY7T1mDGenTlVTN7yoVFZ9UfO9oMQqo0n1OwDIiqbIFxqnhrHU0cYfj88rI85m5BEKlNu5RdaVTj1tcbaPpQc5kZEolaI1nDDjzV0lwS7jo5VYDHseiJHlik3HH1SgtdtsuamGR2T80q1SyW+5rHoMOJG73IH2553NnWuikKiuikGHUYBd00K1ilVAK2xSiMWJp55tQfZ0ecr9QjEsJ+J/efL4HqGNXhffxvypCXvbUYAFSddOwXUPo5BTKevpxMtH+2YrkpSjocWA04VnTYFiPG6U4ItKmbLOTFZtPzoez private " ,  # noqa: E501        " GithubSshKeys " :  b " ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC307aE+nuHzTAgaJhzSf5v7ZZQW9gaperjhCmyPyl4PzY7T1mDGenTlVTN7yoVFZ9UfO9oMQqo0n1OwDIiqbIFxqnhrHU0cYfj88rI85m5BEKlNu5RdaVTj1tcbaPpQc5kZEolaI1nDDjzV0lwS7jo5VYDHseiJHlik3HH1SgtdtsuamGR2T80q1SyW+5rHoMOJG73IH2553NnWuikKiuikGHUYBd00K1ilVAK2xSiMWJp55tQfZ0ecr9QjEsJ+J/efL4HqGNXhffxvypCXvbUYAFSddOwXUPo5BTKevpxMtH+2YrkpSjocWA04VnTYFiPG6U4ItKmbLOTFZtPzoez private " ,  # noqa: E501   
			
		
	
	
		
		
			
				
					
						
							
								 
						
						
							
								 
						
						
					 
					@ -109,8 +111,8 @@ class TestAthenadMethods(unittest.TestCase): 
			
		
	
		
		
			
				
					
					  def  test_echo ( self ) :    def  test_echo ( self ) :   
			
		
	
		
		
			
				
					
					    assert  dispatcher [ " echo " ] ( " bob " )  ==  " bob "      assert  dispatcher [ " echo " ] ( " bob " )  ==  " bob "   
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					  def  test_getM essage ( self ) :    def  test_get_m essage ( self ) :   
			
				
				
			
		
	
		
		
			
				
					
					    with  self . assertR aises( TimeoutError )  as  _ :      with  pytest . r aises( TimeoutError )  as  _ :   
			
				
				
			
		
	
		
		
	
		
		
	
		
		
			
				
					
					      dispatcher [ " getMessage " ] ( " controlsState " )        dispatcher [ " getMessage " ] ( " controlsState " )   
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					    end_event  =  multiprocessing . Event ( )      end_event  =  multiprocessing . Event ( )   
			
		
	
	
		
		
			
				
					
						
						
						
							
								 
						
					 
					@ -133,7 +135,7 @@ class TestAthenadMethods(unittest.TestCase): 
			
		
	
		
		
			
				
					
					      end_event . set ( )        end_event . set ( )   
			
		
	
		
		
			
				
					
					      p . join ( )        p . join ( )   
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					  def  test_listDataD irectory ( self ) :    def  test_list_data_d irectory ( self ) :   
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					    route  =  ' 2021-03-29--13-32-47 '      route  =  ' 2021-03-29--13-32-47 '   
			
		
	
		
		
			
				
					
					    segments  =  [ 0 ,  1 ,  2 ,  3 ,  11 ]      segments  =  [ 0 ,  1 ,  2 ,  3 ,  11 ]   
			
		
	
		
		
			
				
					
					
 
			
		
	
	
		
		
			
				
					
						
						
						
							
								 
						
					 
					@ -143,69 +145,66 @@ class TestAthenadMethods(unittest.TestCase): 
			
		
	
		
		
			
				
					
					      self . _create_file ( file )        self . _create_file ( file )   
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					    resp  =  dispatcher [ " listDataDirectory " ] ( )      resp  =  dispatcher [ " listDataDirectory " ] ( )   
			
		
	
		
		
			
				
					
					    self . assertTrue ( resp ,  ' list empty! ' )      assert  resp ,  ' list empty! '   
			
				
				
			
		
	
		
		
			
				
					
					    self . assertCountEqual ( resp ,  files )      assert  len ( resp )  ==  len ( files )   
			
				
				
			
		
	
		
		
	
		
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					    resp  =  dispatcher [ " listDataDirectory " ] ( f ' { route } --123 ' )      resp  =  dispatcher [ " listDataDirectory " ] ( f ' { route } --123 ' )   
			
		
	
		
		
			
				
					
					    self . assertCountEqual ( resp ,  [ ] )      assert  len ( resp )  ==  0   
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					    prefix  =  f ' { route } '      prefix  =  f ' { route } '   
			
		
	
		
		
			
				
					
					    expected  =  filter ( lambda  f :  f . startswith ( prefix ) ,  files )      expected  =  list ( filter ( lambda  f :  f . startswith ( prefix ) ,  files ) )   
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					    resp  =  dispatcher [ " listDataDirectory " ] ( prefix )      resp  =  dispatcher [ " listDataDirectory " ] ( prefix )   
			
		
	
		
		
			
				
					
					    self . assertTrue ( resp ,  ' list empty! ' )      assert  resp ,  ' list empty! '   
			
				
				
			
		
	
		
		
			
				
					
					    self . assertCountEqual ( resp ,  expected )      assert  len ( resp )  ==  len ( expected )   
			
				
				
			
		
	
		
		
	
		
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					    prefix  =  f ' { route } --1 '      prefix  =  f ' { route } --1 '   
			
		
	
		
		
			
				
					
					    expected  =  filter ( lambda  f :  f . startswith ( prefix ) ,  files )      expected  =  list ( filter ( lambda  f :  f . startswith ( prefix ) ,  files ) )   
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					    resp  =  dispatcher [ " listDataDirectory " ] ( prefix )      resp  =  dispatcher [ " listDataDirectory " ] ( prefix )   
			
		
	
		
		
			
				
					
					    self . assertTrue ( resp ,  ' list empty! ' )      assert  resp ,  ' list empty! '   
			
				
				
			
		
	
		
		
			
				
					
					    self . assertCountEqual ( resp ,  expected )      assert  len ( resp )  ==  len ( expected )   
			
				
				
			
		
	
		
		
	
		
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					    prefix  =  f ' { route } --1/ '      prefix  =  f ' { route } --1/ '   
			
		
	
		
		
			
				
					
					    expected  =  filter ( lambda  f :  f . startswith ( prefix ) ,  files )      expected  =  list ( filter ( lambda  f :  f . startswith ( prefix ) ,  files ) )   
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					    resp  =  dispatcher [ " listDataDirectory " ] ( prefix )      resp  =  dispatcher [ " listDataDirectory " ] ( prefix )   
			
		
	
		
		
			
				
					
					    self . assertTrue ( resp ,  ' list empty! ' )      assert  resp ,  ' list empty! '   
			
				
				
			
		
	
		
		
			
				
					
					    self . assertCountEqual ( resp ,  expected )      assert  len ( resp )  ==  len ( expected )   
			
				
				
			
		
	
		
		
	
		
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					    prefix  =  f ' { route } --1/q '      prefix  =  f ' { route } --1/q '   
			
		
	
		
		
			
				
					
					    expected  =  filter ( lambda  f :  f . startswith ( prefix ) ,  files )      expected  =  list ( filter ( lambda  f :  f . startswith ( prefix ) ,  files ) )   
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					    resp  =  dispatcher [ " listDataDirectory " ] ( prefix )      resp  =  dispatcher [ " listDataDirectory " ] ( prefix )   
			
		
	
		
		
			
				
					
					    self . assertTrue ( resp ,  ' list empty! ' )      assert  resp ,  ' list empty! '   
			
				
				
			
		
	
		
		
			
				
					
					    self . assertCountEqual ( resp ,  expected )      assert  len ( resp )  ==  len ( expected )   
			
				
				
			
		
	
		
		
	
		
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					  def  test_strip_bz2_extension ( self ) :    def  test_strip_bz2_extension ( self ) :   
			
		
	
		
		
			
				
					
					    fn  =  self . _create_file ( ' qlog.bz2 ' )      fn  =  self . _create_file ( ' qlog.bz2 ' )   
			
		
	
		
		
			
				
					
					    if  fn . endswith ( ' .bz2 ' ) :      if  fn . endswith ( ' .bz2 ' ) :   
			
		
	
		
		
			
				
					
					      self . assertEqual ( athenad . strip_bz2_extension ( fn ) , fn [ : - 4 ] )        assert  athenad . strip_bz2_extension ( fn )  ==   fn [ : - 4 ]   
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					  @parameterized . expand ( [ ( True , ) ,  ( False , ) ] )    @pytest . mark . parametrize ( " compress " ,  [ True ,  False ] )   
			
				
				
			
		
	
		
		
			
				
					
					  @with_mock_athena    def  test_do_upload ( self ,  host ,  compress ) :   
			
				
				
			
		
	
		
		
			
				
					
					  def  test_do_upload ( self ,  compress ,  host ) :   
			
		
	
		
		
	
		
		
	
		
		
			
				
					
					    # random bytes to ensure rather large object post-compression      # random bytes to ensure rather large object post-compression   
			
		
	
		
		
			
				
					
					    fn  =  self . _create_file ( ' qlog ' ,  data = os . urandom ( 10000  *  1024 ) )      fn  =  self . _create_file ( ' qlog ' ,  data = os . urandom ( 10000  *  1024 ) )   
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					    upload_fn  =  fn  +  ( ' .bz2 '  if  compress  else  ' ' )      upload_fn  =  fn  +  ( ' .bz2 '  if  compress  else  ' ' )   
			
		
	
		
		
			
				
					
					    item  =  athenad . UploadItem ( path = upload_fn ,  url = " http://localhost:1238 " ,  headers = { } ,  created_at = int ( time . time ( ) * 1000 ) ,  id = ' ' )      item  =  athenad . UploadItem ( path = upload_fn ,  url = " http://localhost:1238 " ,  headers = { } ,  created_at = int ( time . time ( ) * 1000 ) ,  id = ' ' )   
			
		
	
		
		
			
				
					
					    with  self . assertR aises( requests . exceptions . ConnectionError ) :      with  pytest . r aises( requests . exceptions . ConnectionError ) :   
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					      athenad . _do_upload ( item )        athenad . _do_upload ( item )   
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					    item  =  athenad . UploadItem ( path = upload_fn ,  url = f " { host } /qlog.bz2 " ,  headers = { } ,  created_at = int ( time . time ( ) * 1000 ) ,  id = ' ' )      item  =  athenad . UploadItem ( path = upload_fn ,  url = f " { host } /qlog.bz2 " ,  headers = { } ,  created_at = int ( time . time ( ) * 1000 ) ,  id = ' ' )   
			
		
	
		
		
			
				
					
					    resp  =  athenad . _do_upload ( item )      resp  =  athenad . _do_upload ( item )   
			
		
	
		
		
			
				
					
					    self . assertEqual ( resp . status_code ,  201 )      assert  resp . status_code  ==  201   
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					  @with_mock_athena    def  test_upload_file_to_url ( self ,  host ) :   
			
				
				
			
		
	
		
		
			
				
					
					  def  test_uploadFileToUrl ( self ,  host ) :   
			
		
	
		
		
	
		
		
			
				
					
					    fn  =  self . _create_file ( ' qlog.bz2 ' )      fn  =  self . _create_file ( ' qlog.bz2 ' )   
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					    resp  =  dispatcher [ " uploadFileToUrl " ] ( " qlog.bz2 " ,  f " { host } /qlog.bz2 " ,  { } )      resp  =  dispatcher [ " uploadFileToUrl " ] ( " qlog.bz2 " ,  f " { host } /qlog.bz2 " ,  { } )   
			
		
	
		
		
			
				
					
					    self . assertEqual ( resp [ ' enqueued ' ] ,  1 )      assert  resp [ ' enqueued ' ]  ==  1   
			
				
				
			
		
	
		
		
			
				
					
					    self . assertNotIn ( ' failed ' ,  resp )      assert  ' failed '  not  in  resp   
			
				
				
			
		
	
		
		
			
				
					
					    self . assertLessEqual ( { " path " :  fn ,  " url " :  f " { host } /qlog.bz2 " ,  " headers " :  { } } . items ( ) ,  resp [ ' items ' ] [ 0 ] . items ( ) )      assert  { " path " :  fn ,  " url " :  f " { host } /qlog.bz2 " ,  " headers " :  { } } . items ( )  < =  resp [ ' items ' ] [ 0 ] . items ( )   
			
				
				
			
		
	
		
		
			
				
					
					    self . assertIsNotNone ( resp [ ' items ' ] [ 0 ] . get ( ' id ' ) )      assert  resp [ ' items ' ] [ 0 ] . get ( ' id ' )  is  not  None   
			
				
				
			
		
	
		
		
			
				
					
					    self . assertEqual ( athenad . upload_queue . qsize ( ) ,  1 )      assert  athenad . upload_queue . qsize ( )  ==  1   
			
				
				
			
		
	
		
		
			
				
					
					
 
			
				
				
			
		
	
		
		
			
				
					
					  @with_mock_athena    def  test_upload_file_to_url_duplicate ( self ,  host ) :   
			
				
				
			
		
	
		
		
			
				
					
					  def  test_uploadFileToUrl_duplicate ( self ,  host ) :   
			
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
			
				
					
					    self . _create_file ( ' qlog.bz2 ' )      self . _create_file ( ' qlog.bz2 ' )   
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					    url1  =  f " { host } /qlog.bz2?sig=sig1 "      url1  =  f " { host } /qlog.bz2?sig=sig1 "   
			
		
	
	
		
		
			
				
					
						
						
						
							
								 
						
					 
					@ -214,14 +213,12 @@ class TestAthenadMethods(unittest.TestCase): 
			
		
	
		
		
			
				
					
					    # Upload same file again, but with different signature      # Upload same file again, but with different signature   
			
		
	
		
		
			
				
					
					    url2  =  f " { host } /qlog.bz2?sig=sig2 "      url2  =  f " { host } /qlog.bz2?sig=sig2 "   
			
		
	
		
		
			
				
					
					    resp  =  dispatcher [ " uploadFileToUrl " ] ( " qlog.bz2 " ,  url2 ,  { } )      resp  =  dispatcher [ " uploadFileToUrl " ] ( " qlog.bz2 " ,  url2 ,  { } )   
			
		
	
		
		
			
				
					
					    self . assertEqual ( resp , { ' enqueued ' :  0 ,  ' items ' :  [ ] } )      assert  resp  == { ' enqueued ' :  0 ,  ' items ' :  [ ] }   
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					  @with_mock_athena    def  test_upload_file_to_url_does_not_exist ( self ,  host ) :   
			
				
				
			
		
	
		
		
			
				
					
					  def  test_uploadFileToUrl_does_not_exist ( self ,  host ) :   
			
		
	
		
		
	
		
		
			
				
					
					    not_exists_resp  =  dispatcher [ " uploadFileToUrl " ] ( " does_not_exist.bz2 " ,  " http://localhost:1238 " ,  { } )      not_exists_resp  =  dispatcher [ " uploadFileToUrl " ] ( " does_not_exist.bz2 " ,  " http://localhost:1238 " ,  { } )   
			
		
	
		
		
			
				
					
					    self . assertEqual ( not_exists_resp , { ' enqueued ' :  0 ,  ' items ' :  [ ] ,  ' failed ' :  [ ' does_not_exist.bz2 ' ] } )      assert  not_exists_resp  == { ' enqueued ' :  0 ,  ' items ' :  [ ] ,  ' failed ' :  [ ' does_not_exist.bz2 ' ] }   
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					  @with_mock_athena   
			
		
	
		
		
			
				
					
					  @with_upload_handler    @with_upload_handler   
			
		
	
		
		
			
				
					
					  def  test_upload_handler ( self ,  host ) :    def  test_upload_handler ( self ,  host ) :   
			
		
	
		
		
			
				
					
					    fn  =  self . _create_file ( ' qlog.bz2 ' )      fn  =  self . _create_file ( ' qlog.bz2 ' )   
			
		
	
	
		
		
			
				
					
						
						
						
							
								 
						
					 
					@ -233,13 +230,12 @@ class TestAthenadMethods(unittest.TestCase): 
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					    # TODO: verify that upload actually succeeded      # TODO: verify that upload actually succeeded   
			
		
	
		
		
			
				
					
					    # TODO: also check that end_event and metered network raises AbortTransferException      # TODO: also check that end_event and metered network raises AbortTransferException   
			
		
	
		
		
			
				
					
					    self . assertEqual ( athenad . upload_queue . qsize ( ) ,  0 )      assert  athenad . upload_queue . qsize ( )  ==  0    
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					  @parameterized . expand ( [ ( 500 ,  True ) ,  ( 412 ,  False ) ] )    @pytest . mark . parametrize ( " status,retry " ,  [ ( 500 , True ) ,  ( 412 , False ) ] )   
			
				
				
			
		
	
		
		
			
				
					
					  @with_mock_athena   
			
		
	
		
		
			
				
					
					  @mock . patch ( ' requests.put ' )   
			
		
	
		
		
	
		
		
			
				
					
					  @with_upload_handler    @with_upload_handler   
			
		
	
		
		
			
				
					
					  def  test_upload_handler_retry ( self ,  status ,  retry ,  mock_put ,  host ) :    def  test_upload_handler_retry ( self ,  mocker ,  host ,  status ,  retry ) :   
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					    mock_put  =  mocker . patch ( ' requests.put ' )   
			
		
	
		
		
			
				
					
					    mock_put . return_value . status_code  =  status      mock_put . return_value . status_code  =  status   
			
		
	
		
		
			
				
					
					    fn  =  self . _create_file ( ' qlog.bz2 ' )      fn  =  self . _create_file ( ' qlog.bz2 ' )   
			
		
	
		
		
			
				
					
					    item  =  athenad . UploadItem ( path = fn ,  url = f " { host } /qlog.bz2 " ,  headers = { } ,  created_at = int ( time . time ( ) * 1000 ) ,  id = ' ' ,  allow_cellular = True )      item  =  athenad . UploadItem ( path = fn ,  url = f " { host } /qlog.bz2 " ,  headers = { } ,  created_at = int ( time . time ( ) * 1000 ) ,  id = ' ' ,  allow_cellular = True )   
			
		
	
	
		
		
			
				
					
						
						
						
							
								 
						
					 
					@ -248,10 +244,10 @@ class TestAthenadMethods(unittest.TestCase): 
			
		
	
		
		
			
				
					
					    self . _wait_for_upload ( )      self . _wait_for_upload ( )   
			
		
	
		
		
			
				
					
					    time . sleep ( 0.1 )      time . sleep ( 0.1 )   
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					    self . assertEqual ( athenad . upload_queue . qsize ( ) ,  1  if  retry  else  0 )      assert  athenad . upload_queue . qsize ( )  ==  (  1  if  retry  else  0 )   
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					    if  retry :      if  retry :   
			
		
	
		
		
			
				
					
					      self . assertEqual ( athenad . upload_queue . get ( ) . retry_count ,  1 )        assert  athenad . upload_queue . get ( ) . retry_count  ==  1    
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					  @with_upload_handler    @with_upload_handler   
			
		
	
		
		
			
				
					
					  def  test_upload_handler_timeout ( self ) :    def  test_upload_handler_timeout ( self ) :   
			
		
	
	
		
		
			
				
					
						
						
						
							
								 
						
					 
					@ -265,33 +261,33 @@ class TestAthenadMethods(unittest.TestCase): 
			
		
	
		
		
			
				
					
					    time . sleep ( 0.1 )      time . sleep ( 0.1 )   
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					    # Check that upload with retry count exceeded is not put back      # Check that upload with retry count exceeded is not put back   
			
		
	
		
		
			
				
					
					    self . assertEqual ( athenad . upload_queue . qsize ( ) ,  0 )      assert  athenad . upload_queue . qsize ( )  ==  0    
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					    athenad . upload_queue . put_nowait ( item )      athenad . upload_queue . put_nowait ( item )   
			
		
	
		
		
			
				
					
					    self . _wait_for_upload ( )      self . _wait_for_upload ( )   
			
		
	
		
		
			
				
					
					    time . sleep ( 0.1 )      time . sleep ( 0.1 )   
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					    # Check that upload item was put back in the queue with incremented retry count      # Check that upload item was put back in the queue with incremented retry count   
			
		
	
		
		
			
				
					
					    self . assertEqual ( athenad . upload_queue . qsize ( ) ,  1 )      assert  athenad . upload_queue . qsize ( )  ==  1    
			
				
				
			
		
	
		
		
			
				
					
					    self . assertEqual ( athenad . upload_queue . get ( ) . retry_count ,  1 )      assert  athenad . upload_queue . get ( ) . retry_count  ==  1    
			
				
				
			
		
	
		
		
	
		
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					  @with_upload_handler    @with_upload_handler   
			
		
	
		
		
			
				
					
					  def  test_cancelU pload ( self ) :    def  test_cancel_u pload ( self ) :   
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					    item  =  athenad . UploadItem ( path = " qlog.bz2 " ,  url = " http://localhost:44444/qlog.bz2 " ,  headers = { } ,      item  =  athenad . UploadItem ( path = " qlog.bz2 " ,  url = " http://localhost:44444/qlog.bz2 " ,  headers = { } ,   
			
		
	
		
		
			
				
					
					                              created_at = int ( time . time ( ) * 1000 ) ,  id = ' id ' ,  allow_cellular = True )                                created_at = int ( time . time ( ) * 1000 ) ,  id = ' id ' ,  allow_cellular = True )   
			
		
	
		
		
			
				
					
					    athenad . upload_queue . put_nowait ( item )      athenad . upload_queue . put_nowait ( item )   
			
		
	
		
		
			
				
					
					    dispatcher [ " cancelUpload " ] ( item . id )      dispatcher [ " cancelUpload " ] ( item . id )   
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					    self . assertIn ( item . id , athenad . cancelled_uploads )      assert  item . id  in athenad . cancelled_uploads   
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					    self . _wait_for_upload ( )      self . _wait_for_upload ( )   
			
		
	
		
		
			
				
					
					    time . sleep ( 0.1 )      time . sleep ( 0.1 )   
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					    self . assertEqual ( athenad . upload_queue . qsize ( ) ,  0 )      assert  athenad . upload_queue . qsize ( )  ==  0    
			
				
				
			
		
	
		
		
			
				
					
					    self . assertEqual ( len ( athenad . cancelled_uploads ) ,  0 )      assert  len ( athenad . cancelled_uploads )  ==  0    
			
				
				
			
		
	
		
		
	
		
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					  @with_upload_handler    @with_upload_handler   
			
		
	
		
		
			
				
					
					  def  test_cancelE xpiry ( self ) :    def  test_cancel_e xpiry ( self ) :   
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					    t_future  =  datetime . now ( )  -  timedelta ( days = 40 )      t_future  =  datetime . now ( )  -  timedelta ( days = 40 )   
			
		
	
		
		
			
				
					
					    ts  =  int ( t_future . strftime ( " %s " ) )  *  1000      ts  =  int ( t_future . strftime ( " %s " ) )  *  1000   
			
		
	
		
		
			
				
					
					
 
			
		
	
	
		
		
			
				
					
						
						
						
							
								 
						
					 
					@ -303,15 +299,14 @@ class TestAthenadMethods(unittest.TestCase): 
			
		
	
		
		
			
				
					
					    self . _wait_for_upload ( )      self . _wait_for_upload ( )   
			
		
	
		
		
			
				
					
					    time . sleep ( 0.1 )      time . sleep ( 0.1 )   
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					    self . assertEqual ( athenad . upload_queue . qsize ( ) ,  0 )      assert  athenad . upload_queue . qsize ( )  ==  0    
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					  def  test_listUploadQueueE mpty ( self ) :    def  test_list_upload_queue_e mpty ( self ) :   
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					    items  =  dispatcher [ " listUploadQueue " ] ( )      items  =  dispatcher [ " listUploadQueue " ] ( )   
			
		
	
		
		
			
				
					
					    self . assertEqual ( len ( items ) ,  0 )      assert  len ( items )  ==  0    
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					  @with_http_server   
			
		
	
		
		
			
				
					
					  @with_upload_handler    @with_upload_handler   
			
		
	
		
		
			
				
					
					  def  test_listUploadQueueC urrent ( self ,  host :  str ) :    def  test_list_upload_queue_c urrent ( self ,  host :  str ) :   
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					    fn  =  self . _create_file ( ' qlog.bz2 ' )      fn  =  self . _create_file ( ' qlog.bz2 ' )   
			
		
	
		
		
			
				
					
					    item  =  athenad . UploadItem ( path = fn ,  url = f " { host } /qlog.bz2 " ,  headers = { } ,  created_at = int ( time . time ( ) * 1000 ) ,  id = ' ' ,  allow_cellular = True )      item  =  athenad . UploadItem ( path = fn ,  url = f " { host } /qlog.bz2 " ,  headers = { } ,  created_at = int ( time . time ( ) * 1000 ) ,  id = ' ' ,  allow_cellular = True )   
			
		
	
		
		
			
				
					
					
 
			
		
	
	
		
		
			
				
					
						
						
						
							
								 
						
					 
					@ -319,22 +314,22 @@ class TestAthenadMethods(unittest.TestCase): 
			
		
	
		
		
			
				
					
					    self . _wait_for_upload ( )      self . _wait_for_upload ( )   
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					    items  =  dispatcher [ " listUploadQueue " ] ( )      items  =  dispatcher [ " listUploadQueue " ] ( )   
			
		
	
		
		
			
				
					
					    self . assertEqual ( len ( items ) ,  1 )      assert  len ( items )  ==  1    
			
				
				
			
		
	
		
		
			
				
					
					    self . assertTrue ( items [ 0 ] [ ' current ' ] )      assert  items [ 0 ] [ ' current ' ]   
			
				
				
			
		
	
		
		
	
		
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					  def  test_listUploadQ ueue ( self ) :    def  test_list_upload_q ueue ( self ) :   
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					    item  =  athenad . UploadItem ( path = " qlog.bz2 " ,  url = " http://localhost:44444/qlog.bz2 " ,  headers = { } ,      item  =  athenad . UploadItem ( path = " qlog.bz2 " ,  url = " http://localhost:44444/qlog.bz2 " ,  headers = { } ,   
			
		
	
		
		
			
				
					
					                              created_at = int ( time . time ( ) * 1000 ) ,  id = ' id ' ,  allow_cellular = True )                                created_at = int ( time . time ( ) * 1000 ) ,  id = ' id ' ,  allow_cellular = True )   
			
		
	
		
		
			
				
					
					    athenad . upload_queue . put_nowait ( item )      athenad . upload_queue . put_nowait ( item )   
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					    items  =  dispatcher [ " listUploadQueue " ] ( )      items  =  dispatcher [ " listUploadQueue " ] ( )   
			
		
	
		
		
			
				
					
					    self . assertEqual ( len ( items ) ,  1 )      assert  len ( items )  ==  1    
			
				
				
			
		
	
		
		
			
				
					
					    self . assertDictEqual ( items [ 0 ] , asdict ( item ) )      assert  items [ 0 ]  ==   asdict ( item )   
			
				
				
			
		
	
		
		
			
				
					
					    self . assertFalse ( items [ 0 ] [ ' current ' ] )      assert  not  items [ 0 ] [ ' current ' ]   
			
				
				
			
		
	
		
		
	
		
		
	
		
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					    athenad . cancelled_uploads . add ( item . id )      athenad . cancelled_uploads . add ( item . id )   
			
		
	
		
		
			
				
					
					    items  =  dispatcher [ " listUploadQueue " ] ( )      items  =  dispatcher [ " listUploadQueue " ] ( )   
			
		
	
		
		
			
				
					
					    self . assertEqual ( len ( items ) ,  0 )      assert  len ( items )  ==  0    
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					  def  test_upload_queue_persistence ( self ) :    def  test_upload_queue_persistence ( self ) :   
			
		
	
		
		
			
				
					
					    item1  =  athenad . UploadItem ( path = " _ " ,  url = " _ " ,  headers = { } ,  created_at = int ( time . time ( ) ) ,  id = ' id1 ' )      item1  =  athenad . UploadItem ( path = " _ " ,  url = " _ " ,  headers = { } ,  created_at = int ( time . time ( ) ) ,  id = ' id1 ' )   
			
		
	
	
		
		
			
				
					
						
						
						
							
								 
						
					 
					@ -353,11 +348,10 @@ class TestAthenadMethods(unittest.TestCase): 
			
		
	
		
		
			
				
					
					    athenad . upload_queue . queue . clear ( )      athenad . upload_queue . queue . clear ( )   
			
		
	
		
		
			
				
					
					    athenad . UploadQueueCache . initialize ( athenad . upload_queue )      athenad . UploadQueueCache . initialize ( athenad . upload_queue )   
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					    self . assertEqual ( athenad . upload_queue . qsize ( ) ,  1 )      assert  athenad . upload_queue . qsize ( )  ==  1    
			
				
				
			
		
	
		
		
			
				
					
					    self . assertDictEqual ( asdict ( athenad . upload_queue . queue [ - 1 ] ) , asdict ( item1 ) )      assert  asdict ( athenad . upload_queue . queue [ - 1 ] )  ==   asdict ( item1 )   
			
				
				
			
		
	
		
		
	
		
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					  @mock . patch ( ' openpilot.selfdrive.athena.athenad.create_connection ' )    def  test_start_local_proxy ( self ,  mock_create_connection ) :   
			
				
				
			
		
	
		
		
			
				
					
					  def  test_startLocalProxy ( self ,  mock_create_connection ) :   
			
		
	
		
		
	
		
		
			
				
					
					    end_event  =  threading . Event ( )      end_event  =  threading . Event ( )   
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					    ws_recv  =  queue . Queue ( )      ws_recv  =  queue . Queue ( )   
			
		
	
	
		
		
			
				
					
						
						
						
							
								 
						
					 
					@ -380,21 +374,21 @@ class TestAthenadMethods(unittest.TestCase): 
			
		
	
		
		
			
				
					
					      ws_recv . put_nowait ( WebSocketConnectionClosedException ( ) )        ws_recv . put_nowait ( WebSocketConnectionClosedException ( ) )   
			
		
	
		
		
			
				
					
					      socket_thread . join ( )        socket_thread . join ( )   
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					  def  test_getSshAuthorizedK eys ( self ) :    def  test_get_ssh_authorized_k eys ( self ) :   
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					    keys  =  dispatcher [ " getSshAuthorizedKeys " ] ( )      keys  =  dispatcher [ " getSshAuthorizedKeys " ] ( )   
			
		
	
		
		
			
				
					
					    self . assertEqual ( keys , self . default_params [ " GithubSshKeys " ] . decode ( ' utf-8 ' ) )      assert  keys  == self . default_params [ " GithubSshKeys " ] . decode ( ' utf-8 ' )   
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					  def  test_getGithubU sername ( self ) :    def  test_get_github_u sername ( self ) :   
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					    keys  =  dispatcher [ " getGithubUsername " ] ( )      keys  =  dispatcher [ " getGithubUsername " ] ( )   
			
		
	
		
		
			
				
					
					    self . assertEqual ( keys , self . default_params [ " GithubUsername " ] . decode ( ' utf-8 ' ) )      assert  keys  == self . default_params [ " GithubUsername " ] . decode ( ' utf-8 ' )   
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					  def  test_getV ersion ( self ) :    def  test_get_v ersion ( self ) :   
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					    resp  =  dispatcher [ " getVersion " ] ( )      resp  =  dispatcher [ " getVersion " ] ( )   
			
		
	
		
		
			
				
					
					    keys  =  [ " version " ,  " remote " ,  " branch " ,  " commit " ]      keys  =  [ " version " ,  " remote " ,  " branch " ,  " commit " ]   
			
		
	
		
		
			
				
					
					    self . assertEqual ( list ( resp . keys ( ) ) ,  keys )      assert  list ( resp . keys ( ) )  ==  keys    
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					    for  k  in  keys :      for  k  in  keys :   
			
		
	
		
		
			
				
					
					      self . assertIsI nstance( resp [ k ] ,  str ,  f " { k }  is not a string " )        assert  isi nstance( resp [ k ] ,  str ) ,  f " { k }  is not a string "   
			
				
				
			
		
	
		
		
			
				
					
					      self . assertTrue ( len ( resp [ k ] )  >  0 ,  f " { k }  has no value " )        assert  len ( resp [ k ] )  >  0 ,  f " { k }  has no value "   
			
				
				
			
		
	
		
		
	
		
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					  def  test_jsonrpc_handler ( self ) :    def  test_jsonrpc_handler ( self ) :   
			
		
	
		
		
			
				
					
					    end_event  =  threading . Event ( )      end_event  =  threading . Event ( )   
			
		
	
	
		
		
			
				
					
						
						
						
							
								 
						
					 
					@ -405,15 +399,15 @@ class TestAthenadMethods(unittest.TestCase): 
			
		
	
		
		
			
				
					
					      # with params        # with params   
			
		
	
		
		
			
				
					
					      athenad . recv_queue . put_nowait ( json . dumps ( { " method " :  " echo " ,  " params " :  [ " hello " ] ,  " jsonrpc " :  " 2.0 " ,  " id " :  0 } ) )        athenad . recv_queue . put_nowait ( json . dumps ( { " method " :  " echo " ,  " params " :  [ " hello " ] ,  " jsonrpc " :  " 2.0 " ,  " id " :  0 } ) )   
			
		
	
		
		
			
				
					
					      resp  =  athenad . send_queue . get ( timeout = 3 )        resp  =  athenad . send_queue . get ( timeout = 3 )   
			
		
	
		
		
			
				
					
					      self . assertDictEqual ( json . loads ( resp ) , { ' result ' :  ' hello ' ,  ' id ' :  0 ,  ' jsonrpc ' :  ' 2.0 ' } )        assert  json . loads ( resp )  ==   { ' result ' :  ' hello ' ,  ' id ' :  0 ,  ' jsonrpc ' :  ' 2.0 ' }   
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					      # without params        # without params   
			
		
	
		
		
			
				
					
					      athenad . recv_queue . put_nowait ( json . dumps ( { " method " :  " getNetworkType " ,  " jsonrpc " :  " 2.0 " ,  " id " :  0 } ) )        athenad . recv_queue . put_nowait ( json . dumps ( { " method " :  " getNetworkType " ,  " jsonrpc " :  " 2.0 " ,  " id " :  0 } ) )   
			
		
	
		
		
			
				
					
					      resp  =  athenad . send_queue . get ( timeout = 3 )        resp  =  athenad . send_queue . get ( timeout = 3 )   
			
		
	
		
		
			
				
					
					      self . assertDictEqual ( json . loads ( resp ) , { ' result ' :  1 ,  ' id ' :  0 ,  ' jsonrpc ' :  ' 2.0 ' } )        assert  json . loads ( resp )  ==   { ' result ' :  1 ,  ' id ' :  0 ,  ' jsonrpc ' :  ' 2.0 ' }   
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					      # log forwarding        # log forwarding   
			
		
	
		
		
			
				
					
					      athenad . recv_queue . put_nowait ( json . dumps ( { ' result ' :  { ' success ' :  1 } ,  ' id ' :  0 ,  ' jsonrpc ' :  ' 2.0 ' } ) )        athenad . recv_queue . put_nowait ( json . dumps ( { ' result ' :  { ' success ' :  1 } ,  ' id ' :  0 ,  ' jsonrpc ' :  ' 2.0 ' } ) )   
			
		
	
		
		
			
				
					
					      resp  =  athenad . log_recv_queue . get ( timeout = 3 )        resp  =  athenad . log_recv_queue . get ( timeout = 3 )   
			
		
	
		
		
			
				
					
					      self . assertDictEqual ( json . loads ( resp ) , { ' result ' :  { ' success ' :  1 } ,  ' id ' :  0 ,  ' jsonrpc ' :  ' 2.0 ' } )        assert  json . loads ( resp )  ==   { ' result ' :  { ' success ' :  1 } ,  ' id ' :  0 ,  ' jsonrpc ' :  ' 2.0 ' }   
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					    finally :      finally :   
			
		
	
		
		
			
				
					
					      end_event . set ( )        end_event . set ( )   
			
		
	
		
		
			
				
					
					      thread . join ( )        thread . join ( )   
			
		
	
	
		
		
			
				
					
						
						
						
							
								 
						
					 
					@ -427,8 +421,4 @@ class TestAthenadMethods(unittest.TestCase): 
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					    # ensure the list is all logs except most recent      # ensure the list is all logs except most recent   
			
		
	
		
		
			
				
					
					    sl  =  athenad . get_logs_to_send_sorted ( )      sl  =  athenad . get_logs_to_send_sorted ( )   
			
		
	
		
		
			
				
					
					    self . assertListEqual ( sl ,  fl [ : - 1 ] )      assert  sl  ==  fl [ : - 1 ]   
			
				
				
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					if  __name__  ==  ' __main__ ' :  
			
		
	
		
		
			
				
					
					  unittest . main ( )