You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							145 lines
						
					
					
						
							4.1 KiB
						
					
					
				
			
		
		
	
	
							145 lines
						
					
					
						
							4.1 KiB
						
					
					
				| #!/usr/bin/env python3
 | |
| """
 | |
| Usage::
 | |
| 
 | |
|   usage: auth.py [-h] [{google,apple,github,jwt}] [jwt]
 | |
| 
 | |
|   Login to your comma account
 | |
| 
 | |
|   positional arguments:
 | |
|     {google,apple,github,jwt}
 | |
|     jwt
 | |
| 
 | |
|   optional arguments:
 | |
|     -h, --help            show this help message and exit
 | |
| 
 | |
| 
 | |
| Examples::
 | |
| 
 | |
|   ./auth.py  # Log in with google account
 | |
|   ./auth.py github  # Log in with GitHub Account
 | |
|   ./auth.py jwt ey......hw  # Log in with a JWT from https://jwt.comma.ai, for use in CI
 | |
| """
 | |
| 
 | |
| import argparse
 | |
| import sys
 | |
| import pprint
 | |
| import webbrowser
 | |
| from http.server import BaseHTTPRequestHandler, HTTPServer
 | |
| from typing import Any, Dict
 | |
| from urllib.parse import parse_qs, urlencode
 | |
| 
 | |
| from tools.lib.api import APIError, CommaApi, UnauthorizedError
 | |
| from tools.lib.auth_config import set_token, get_token
 | |
| 
 | |
| PORT = 3000
 | |
| 
 | |
| 
 | |
| class ClientRedirectServer(HTTPServer):
 | |
|   query_params: Dict[str, Any] = {}
 | |
| 
 | |
| 
 | |
| class ClientRedirectHandler(BaseHTTPRequestHandler):
 | |
|   def do_GET(self):
 | |
|     if not self.path.startswith('/auth'):
 | |
|       self.send_response(204)
 | |
|       return
 | |
| 
 | |
|     query = self.path.split('?', 1)[-1]
 | |
|     query_parsed = parse_qs(query, keep_blank_values=True)
 | |
|     self.server.query_params = query_parsed
 | |
| 
 | |
|     self.send_response(200)
 | |
|     self.send_header('Content-type', 'text/plain')
 | |
|     self.end_headers()
 | |
|     self.wfile.write(b'Return to the CLI to continue')
 | |
| 
 | |
|   def log_message(self, format, *args):  # pylint: disable=redefined-builtin
 | |
|     pass  # this prevent http server from dumping messages to stdout
 | |
| 
 | |
| 
 | |
| def auth_redirect_link(method):
 | |
|   provider_id = {
 | |
|     'google': 'g',
 | |
|     'apple': 'a',
 | |
|     'github': 'h',
 | |
|   }[method]
 | |
| 
 | |
|   params = {
 | |
|     'redirect_uri': f"https://api.comma.ai/v2/auth/{provider_id}/redirect/",
 | |
|     'state': f'service,localhost:{PORT}',
 | |
|   }
 | |
| 
 | |
|   if method == 'google':
 | |
|     params.update({
 | |
|       'type': 'web_server',
 | |
|       'client_id': '45471411055-ornt4svd2miog6dnopve7qtmh5mnu6id.apps.googleusercontent.com',
 | |
|       'response_type': 'code',
 | |
|       'scope': 'https://www.googleapis.com/auth/userinfo.email',
 | |
|       'prompt': 'select_account',
 | |
|     })
 | |
|     return 'https://accounts.google.com/o/oauth2/auth?' + urlencode(params)
 | |
|   elif method == 'github':
 | |
|     params.update({
 | |
|       'client_id': '28c4ecb54bb7272cb5a4',
 | |
|       'scope': 'read:user',
 | |
|     })
 | |
|     return 'https://github.com/login/oauth/authorize?' + urlencode(params)
 | |
|   elif method == 'apple':
 | |
|     params.update({
 | |
|       'client_id': 'ai.comma.login',
 | |
|       'response_type': 'code',
 | |
|       'response_mode': 'form_post',
 | |
|       'scope': 'name email',
 | |
|     })
 | |
|     return 'https://appleid.apple.com/auth/authorize?' + urlencode(params)
 | |
|   else:
 | |
|     raise NotImplementedError(f"no redirect implemented for method {method}")
 | |
| 
 | |
| 
 | |
| def login(method):
 | |
|   oauth_uri = auth_redirect_link(method)
 | |
| 
 | |
|   web_server = ClientRedirectServer(('localhost', PORT), ClientRedirectHandler)
 | |
|   print(f'To sign in, use your browser and navigate to {oauth_uri}')
 | |
|   webbrowser.open(oauth_uri, new=2)
 | |
| 
 | |
|   while True:
 | |
|     web_server.handle_request()
 | |
|     if 'code' in web_server.query_params:
 | |
|       break
 | |
|     elif 'error' in web_server.query_params:
 | |
|       print('Authentication Error: "{}". Description: "{}" '.format(
 | |
|         web_server.query_params['error'],
 | |
|         web_server.query_params.get('error_description')), file=sys.stderr)
 | |
|       break
 | |
| 
 | |
|   try:
 | |
|     auth_resp = CommaApi().post('v2/auth/', data={'code': web_server.query_params['code'], 'provider': web_server.query_params['provider']})
 | |
|     set_token(auth_resp['access_token'])
 | |
|   except APIError as e:
 | |
|     print(f'Authentication Error: {e}', file=sys.stderr)
 | |
| 
 | |
| 
 | |
| if __name__ == '__main__':
 | |
|   parser = argparse.ArgumentParser(description='Login to your comma account')
 | |
|   parser.add_argument('method', default='google', const='google', nargs='?', choices=['google', 'apple', 'github', 'jwt'])
 | |
|   parser.add_argument('jwt', nargs='?')
 | |
| 
 | |
|   args = parser.parse_args()
 | |
|   if args.method == 'jwt':
 | |
|     if args.jwt is None:
 | |
|       print("method JWT selected, but no JWT was provided")
 | |
|       exit(1)
 | |
| 
 | |
|     set_token(args.jwt)
 | |
|   else:
 | |
|     login(args.method)
 | |
| 
 | |
|   try:
 | |
|     me = CommaApi(token=get_token()).get('/v1/me')
 | |
|     print("Authenticated!")
 | |
|     pprint.pprint(me)
 | |
|   except UnauthorizedError:
 | |
|     print("Got invalid JWT")
 | |
|     exit(1)
 | |
| 
 |