|  |  |  | #!/usr/bin/env python3
 | 
					
						
							|  |  |  | import os
 | 
					
						
							|  |  |  | from datetime import datetime, timedelta
 | 
					
						
							|  |  |  | from functools import lru_cache
 | 
					
						
							|  |  |  | from pathlib import Path
 | 
					
						
							|  |  |  | from typing import IO, Union
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | DATA_CI_ACCOUNT = "commadataci"
 | 
					
						
							|  |  |  | DATA_CI_ACCOUNT_URL = f"https://{DATA_CI_ACCOUNT}.blob.core.windows.net"
 | 
					
						
							|  |  |  | OPENPILOT_CI_CONTAINER = "openpilotci"
 | 
					
						
							|  |  |  | DATA_CI_CONTAINER = "commadataci"
 | 
					
						
							|  |  |  | BASE_URL = f"{DATA_CI_ACCOUNT_URL}/{OPENPILOT_CI_CONTAINER}/"
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | TOKEN_PATH = Path("/data/azure_token")
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def get_url(route_name: str, segment_num, log_type="rlog") -> str:
 | 
					
						
							|  |  |  |   ext = "hevc" if log_type.endswith('camera') else "bz2"
 | 
					
						
							|  |  |  |   return BASE_URL + f"{route_name.replace('|', '/')}/{segment_num}/{log_type}.{ext}"
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | @lru_cache
 | 
					
						
							|  |  |  | def get_azure_credential():
 | 
					
						
							|  |  |  |   if "AZURE_TOKEN" in os.environ:
 | 
					
						
							|  |  |  |     return os.environ["AZURE_TOKEN"]
 | 
					
						
							|  |  |  |   elif TOKEN_PATH.is_file():
 | 
					
						
							|  |  |  |     return TOKEN_PATH.read_text().strip()
 | 
					
						
							|  |  |  |   else:
 | 
					
						
							|  |  |  |     from azure.identity import AzureCliCredential
 | 
					
						
							|  |  |  |     return AzureCliCredential()
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | @lru_cache
 | 
					
						
							|  |  |  | def get_container_sas(account_name: str, container_name: str):
 | 
					
						
							|  |  |  |   from azure.storage.blob import BlobServiceClient, ContainerSasPermissions, generate_container_sas
 | 
					
						
							|  |  |  |   start_time = datetime.utcnow()
 | 
					
						
							|  |  |  |   expiry_time = start_time + timedelta(hours=1)
 | 
					
						
							|  |  |  |   blob_service = BlobServiceClient(
 | 
					
						
							|  |  |  |     account_url=f"https://{account_name}.blob.core.windows.net",
 | 
					
						
							|  |  |  |     credential=get_azure_credential(),
 | 
					
						
							|  |  |  |   )
 | 
					
						
							|  |  |  |   return generate_container_sas(
 | 
					
						
							|  |  |  |     account_name,
 | 
					
						
							|  |  |  |     container_name,
 | 
					
						
							|  |  |  |     user_delegation_key=blob_service.get_user_delegation_key(start_time, expiry_time),
 | 
					
						
							|  |  |  |     permission=ContainerSasPermissions(read=True, write=True, list=True),
 | 
					
						
							|  |  |  |     expiry=expiry_time,
 | 
					
						
							|  |  |  |   )
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def upload_bytes(data: Union[bytes, IO], blob_name: str) -> str:
 | 
					
						
							|  |  |  |   from azure.storage.blob import BlobClient
 | 
					
						
							|  |  |  |   blob = BlobClient(
 | 
					
						
							|  |  |  |     account_url=DATA_CI_ACCOUNT_URL,
 | 
					
						
							|  |  |  |     container_name=OPENPILOT_CI_CONTAINER,
 | 
					
						
							|  |  |  |     blob_name=blob_name,
 | 
					
						
							|  |  |  |     credential=get_azure_credential(),
 | 
					
						
							|  |  |  |     overwrite=False,
 | 
					
						
							|  |  |  |   )
 | 
					
						
							|  |  |  |   blob.upload_blob(data)
 | 
					
						
							|  |  |  |   return BASE_URL + blob_name
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def upload_file(path: Union[str, os.PathLike], blob_name: str) -> str:
 | 
					
						
							|  |  |  |   with open(path, "rb") as f:
 | 
					
						
							|  |  |  |     return upload_bytes(f, blob_name)
 |