cleanup azure handling (#31034)
	
		
	
				
					
				
			* wip cleanup
* Wip
* fixes
* fix
* cleanup
* keep this for now
* dest
old-commit-hash: 3f0b56b364
			
			
				vw-mqb-aeb
			
			
		
							parent
							
								
									7f09117bf0
								
							
						
					
					
						commit
						09b95b715c
					
				
				 4 changed files with 102 additions and 87 deletions
			
			
		@ -0,0 +1,74 @@ | 
				
			|||||||
 | 
					import os | 
				
			||||||
 | 
					from datetime import datetime, timedelta | 
				
			||||||
 | 
					from functools import lru_cache | 
				
			||||||
 | 
					from pathlib import Path | 
				
			||||||
 | 
					from typing import IO, Union | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					from azure.storage.blob import ContainerClient | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					TOKEN_PATH = Path("/data/azure_token") | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					@lru_cache | 
				
			||||||
 | 
					def get_azure_credential(): | 
				
			||||||
 | 
					  if "AZURE_TOKEN" in os.environ: | 
				
			||||||
 | 
					    return os.environ["AZURE_TOKEN"] | 
				
			||||||
 | 
					  elif TOKEN_PATH.is_file(): | 
				
			||||||
 | 
					    return TOKEN_PATH.read_text().strip() | 
				
			||||||
 | 
					  else: | 
				
			||||||
 | 
					    from azure.identity import AzureCliCredential | 
				
			||||||
 | 
					    return AzureCliCredential() | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					@lru_cache | 
				
			||||||
 | 
					def get_container_sas(account_name: str, container_name: str): | 
				
			||||||
 | 
					  from azure.storage.blob import BlobServiceClient, ContainerSasPermissions, generate_container_sas | 
				
			||||||
 | 
					  start_time = datetime.utcnow() | 
				
			||||||
 | 
					  expiry_time = start_time + timedelta(hours=1) | 
				
			||||||
 | 
					  blob_service = BlobServiceClient( | 
				
			||||||
 | 
					    account_url=f"https://{account_name}.blob.core.windows.net", | 
				
			||||||
 | 
					    credential=get_azure_credential(), | 
				
			||||||
 | 
					  ) | 
				
			||||||
 | 
					  return generate_container_sas( | 
				
			||||||
 | 
					    account_name, | 
				
			||||||
 | 
					    container_name, | 
				
			||||||
 | 
					    user_delegation_key=blob_service.get_user_delegation_key(start_time, expiry_time), | 
				
			||||||
 | 
					    permission=ContainerSasPermissions(read=True, write=True, list=True), | 
				
			||||||
 | 
					    expiry=expiry_time, | 
				
			||||||
 | 
					  ) | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class AzureContainer: | 
				
			||||||
 | 
					  def __init__(self, account, container): | 
				
			||||||
 | 
					    self.ACCOUNT = account | 
				
			||||||
 | 
					    self.CONTAINER = container | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  @property | 
				
			||||||
 | 
					  def ACCOUNT_URL(self) -> str: | 
				
			||||||
 | 
					    return f"https://{self.ACCOUNT}.blob.core.windows.net" | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  @property | 
				
			||||||
 | 
					  def BASE_URL(self) -> str: | 
				
			||||||
 | 
					    return f"{self.ACCOUNT_URL}/{self.CONTAINER}/" | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  def get_client_and_key(self): | 
				
			||||||
 | 
					    client = ContainerClient(self.ACCOUNT_URL, self.CONTAINER, credential=get_azure_credential()) | 
				
			||||||
 | 
					    key = get_container_sas(self.ACCOUNT, self.CONTAINER) | 
				
			||||||
 | 
					    return client, key | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  def get_url(self, route_name: str, segment_num, log_type="rlog") -> str: | 
				
			||||||
 | 
					    ext = "hevc" if log_type.endswith('camera') else "bz2" | 
				
			||||||
 | 
					    return self.BASE_URL + f"{route_name.replace('|', '/')}/{segment_num}/{log_type}.{ext}" | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  def upload_bytes(self, data: Union[bytes, IO], blob_name: str) -> str: | 
				
			||||||
 | 
					    from azure.storage.blob import BlobClient | 
				
			||||||
 | 
					    blob = BlobClient( | 
				
			||||||
 | 
					      account_url=self.ACCOUNT_URL, | 
				
			||||||
 | 
					      container_name=self.CONTAINER, | 
				
			||||||
 | 
					      blob_name=blob_name, | 
				
			||||||
 | 
					      credential=get_azure_credential(), | 
				
			||||||
 | 
					      overwrite=False, | 
				
			||||||
 | 
					    ) | 
				
			||||||
 | 
					    blob.upload_blob(data) | 
				
			||||||
 | 
					    return self.BASE_URL + blob_name | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  def upload_file(self, path: Union[str, os.PathLike], blob_name: str) -> str: | 
				
			||||||
 | 
					    with open(path, "rb") as f: | 
				
			||||||
 | 
					      return self.upload_bytes(f, blob_name) | 
				
			||||||
@ -1,66 +1,12 @@ | 
				
			|||||||
#!/usr/bin/env python3 | 
					from openpilot.tools.lib.openpilotcontainers import OpenpilotCIContainer | 
				
			||||||
import os | 
					 | 
				
			||||||
from datetime import datetime, timedelta | 
					 | 
				
			||||||
from functools import lru_cache | 
					 | 
				
			||||||
from pathlib import Path | 
					 | 
				
			||||||
from typing import IO, Union | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
DATA_CI_ACCOUNT = "commadataci" | 
					def get_url(*args, **kwargs): | 
				
			||||||
DATA_CI_ACCOUNT_URL = f"https://{DATA_CI_ACCOUNT}.blob.core.windows.net" | 
					  return OpenpilotCIContainer.get_url(*args, **kwargs) | 
				
			||||||
OPENPILOT_CI_CONTAINER = "openpilotci" | 
					 | 
				
			||||||
DATA_CI_CONTAINER = "commadataci" | 
					 | 
				
			||||||
BASE_URL = f"{DATA_CI_ACCOUNT_URL}/{OPENPILOT_CI_CONTAINER}/" | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
TOKEN_PATH = Path("/data/azure_token") | 
					def upload_file(*args, **kwargs): | 
				
			||||||
 | 
					  return OpenpilotCIContainer.upload_file(*args, **kwargs) | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def upload_bytes(*args, **kwargs): | 
				
			||||||
 | 
					  return OpenpilotCIContainer.upload_bytes(*args, **kwargs) | 
				
			||||||
 | 
					
 | 
				
			||||||
def get_url(route_name: str, segment_num, log_type="rlog") -> str: | 
					BASE_URL = OpenpilotCIContainer.BASE_URL | 
				
			||||||
  ext = "hevc" if log_type.endswith('camera') else "bz2" | 
					 | 
				
			||||||
  return BASE_URL + f"{route_name.replace('|', '/')}/{segment_num}/{log_type}.{ext}" | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
@lru_cache | 
					 | 
				
			||||||
def get_azure_credential(): | 
					 | 
				
			||||||
  if "AZURE_TOKEN" in os.environ: | 
					 | 
				
			||||||
    return os.environ["AZURE_TOKEN"] | 
					 | 
				
			||||||
  elif TOKEN_PATH.is_file(): | 
					 | 
				
			||||||
    return TOKEN_PATH.read_text().strip() | 
					 | 
				
			||||||
  else: | 
					 | 
				
			||||||
    from azure.identity import AzureCliCredential | 
					 | 
				
			||||||
    return AzureCliCredential() | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
@lru_cache | 
					 | 
				
			||||||
def get_container_sas(account_name: str, container_name: str): | 
					 | 
				
			||||||
  from azure.storage.blob import BlobServiceClient, ContainerSasPermissions, generate_container_sas | 
					 | 
				
			||||||
  start_time = datetime.utcnow() | 
					 | 
				
			||||||
  expiry_time = start_time + timedelta(hours=1) | 
					 | 
				
			||||||
  blob_service = BlobServiceClient( | 
					 | 
				
			||||||
    account_url=f"https://{account_name}.blob.core.windows.net", | 
					 | 
				
			||||||
    credential=get_azure_credential(), | 
					 | 
				
			||||||
  ) | 
					 | 
				
			||||||
  return generate_container_sas( | 
					 | 
				
			||||||
    account_name, | 
					 | 
				
			||||||
    container_name, | 
					 | 
				
			||||||
    user_delegation_key=blob_service.get_user_delegation_key(start_time, expiry_time), | 
					 | 
				
			||||||
    permission=ContainerSasPermissions(read=True, write=True, list=True), | 
					 | 
				
			||||||
    expiry=expiry_time, | 
					 | 
				
			||||||
  ) | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
def upload_bytes(data: Union[bytes, IO], blob_name: str) -> str: | 
					 | 
				
			||||||
  from azure.storage.blob import BlobClient | 
					 | 
				
			||||||
  blob = BlobClient( | 
					 | 
				
			||||||
    account_url=DATA_CI_ACCOUNT_URL, | 
					 | 
				
			||||||
    container_name=OPENPILOT_CI_CONTAINER, | 
					 | 
				
			||||||
    blob_name=blob_name, | 
					 | 
				
			||||||
    credential=get_azure_credential(), | 
					 | 
				
			||||||
    overwrite=False, | 
					 | 
				
			||||||
  ) | 
					 | 
				
			||||||
  blob.upload_blob(data) | 
					 | 
				
			||||||
  return BASE_URL + blob_name | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
def upload_file(path: Union[str, os.PathLike], blob_name: str) -> str: | 
					 | 
				
			||||||
  with open(path, "rb") as f: | 
					 | 
				
			||||||
    return upload_bytes(f, blob_name) | 
					 | 
				
			||||||
 | 
				
			|||||||
@ -0,0 +1,6 @@ | 
				
			|||||||
 | 
					#!/usr/bin/env python3 | 
				
			||||||
 | 
					from openpilot.tools.lib.azure_container import AzureContainer | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					OpenpilotCIContainer = AzureContainer("commadataci", "openpilotci") | 
				
			||||||
 | 
					DataCIContainer = AzureContainer("commadataci", "commadataci") | 
				
			||||||
 | 
					DataProdContainer = AzureContainer("commadata2", "commadata2") | 
				
			||||||
					Loading…
					
					
				
		Reference in new issue