You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							74 lines
						
					
					
						
							2.4 KiB
						
					
					
				
			
		
		
	
	
							74 lines
						
					
					
						
							2.4 KiB
						
					
					
				| import os
 | |
| from datetime import datetime, timedelta
 | |
| from functools import lru_cache
 | |
| from pathlib import Path
 | |
| from typing import IO, Union
 | |
| 
 | |
| 
 | |
# On-disk location of a token, consulted when AZURE_TOKEN is not set in the environment.
TOKEN_PATH = Path("/data/azure_token")

@lru_cache
def get_azure_credential():
  """Return the credential used to talk to Azure; the result is memoized.

  Sources are tried in this order:
    1. the AZURE_TOKEN environment variable (returned verbatim),
    2. the contents of TOKEN_PATH with surrounding whitespace stripped,
    3. a freshly constructed azure.identity.AzureCliCredential.

  Because of lru_cache, the lookup runs once per process; later changes to the
  environment or to the token file are not picked up.
  """
  env_token = os.environ.get("AZURE_TOKEN")
  if env_token is not None:
    return env_token

  if TOKEN_PATH.is_file():
    return TOKEN_PATH.read_text().strip()

  # Imported lazily so the azure SDK is only needed when no token is supplied.
  from azure.identity import AzureCliCredential
  return AzureCliCredential()
 | |
| 
 | |
# Generated SAS tokens, keyed by (account_name, container_name) -> (token, expiry_time).
_SAS_CACHE = {}
# A cached token is regenerated once it is this close to its expiry.
_SAS_REFRESH_MARGIN = timedelta(minutes=5)

def get_container_sas(account_name: str, container_name: str):
  """Return a user-delegation SAS token for a container, valid for one hour.

  The token grants read, write and list permissions. Tokens are cached per
  (account_name, container_name), but unlike a plain lru_cache the cache is
  expiry-aware: a token that has expired, or is about to, is regenerated
  instead of being handed out again, so long-running processes keep working.

  Args:
    account_name: storage account name (the "<name>.blob.core.windows.net" prefix).
    container_name: name of the container inside that account.

  Returns:
    The SAS token as produced by azure.storage.blob.generate_container_sas.
  """
  from datetime import timezone

  # Timezone-aware "now"; datetime.utcnow() is naive and deprecated since Python 3.12.
  now = datetime.now(timezone.utc)
  cache_key = (account_name, container_name)
  cached = _SAS_CACHE.get(cache_key)
  if cached is not None:
    token, cached_expiry = cached
    if now < cached_expiry - _SAS_REFRESH_MARGIN:
      return token

  # Imported lazily so the azure SDK is only required when a token is actually generated.
  from azure.storage.blob import BlobServiceClient, ContainerSasPermissions, generate_container_sas

  start_time = now
  expiry_time = start_time + timedelta(hours=1)
  blob_service = BlobServiceClient(
    account_url=f"https://{account_name}.blob.core.windows.net",
    credential=get_azure_credential(),
  )
  token = generate_container_sas(
    account_name,
    container_name,
    user_delegation_key=blob_service.get_user_delegation_key(start_time, expiry_time),
    permission=ContainerSasPermissions(read=True, write=True, list=True),
    expiry=expiry_time,
  )
  _SAS_CACHE[cache_key] = (token, expiry_time)
  return token

# Kept so callers that used the lru_cache API of the previous implementation still work.
get_container_sas.cache_clear = _SAS_CACHE.clear
 | |
| 
 | |
class AzureContainer:
  """Helper for building URLs for, and uploading blobs to, one Azure Blob Storage container."""

  def __init__(self, account, container):
    """Remember the storage account name and the container name.

    Args:
      account: storage account name (the "<name>.blob.core.windows.net" prefix).
      container: name of the container inside that account.
    """
    self.ACCOUNT = account
    self.CONTAINER = container

  @property
  def ACCOUNT_URL(self) -> str:
    """Blob endpoint of the storage account, without a trailing slash."""
    return f"https://{self.ACCOUNT}.blob.core.windows.net"

  @property
  def BASE_URL(self) -> str:
    """URL of the container, with a trailing slash so blob names can be appended."""
    return f"{self.ACCOUNT_URL}/{self.CONTAINER}/"

  def get_client_and_key(self):
    """Return a (ContainerClient, SAS token) pair for this container."""
    # Imported lazily so the azure SDK is only needed when a client is requested.
    from azure.storage.blob import ContainerClient
    client = ContainerClient(self.ACCOUNT_URL, self.CONTAINER, credential=get_azure_credential())
    key = get_container_sas(self.ACCOUNT, self.CONTAINER)
    return client, key

  def get_url(self, route_name: str, segment_num, log_type="rlog") -> str:
    """Build the URL of one file of a route segment.

    Args:
      route_name: route identifier; every '|' in it is replaced by '/'.
      segment_num: segment index, used as a path component.
      log_type: file stem; names ending in 'camera' get the "hevc" extension,
        everything else gets "bz2".

    Returns:
      BASE_URL followed by "<route>/<segment_num>/<log_type>.<ext>".
    """
    ext = "hevc" if log_type.endswith('camera') else "bz2"
    return self.BASE_URL + f"{route_name.replace('|', '/')}/{segment_num}/{log_type}.{ext}"

  def upload_bytes(self, data: Union[bytes, IO], blob_name: str) -> str:
    """Upload bytes or a readable binary stream as blob_name and return the blob's URL.

    The upload never overwrites: per the Azure SDK documentation, upload_blob
    with overwrite=False raises ResourceExistsError if the blob already exists.
    """
    from azure.storage.blob import BlobClient
    blob = BlobClient(
      account_url=self.ACCOUNT_URL,
      container_name=self.CONTAINER,
      blob_name=blob_name,
      credential=get_azure_credential(),
    )
    # `overwrite` is an option of upload_blob, not of the BlobClient constructor,
    # so it is passed where the SDK actually reads it.
    blob.upload_blob(data, overwrite=False)
    return self.BASE_URL + blob_name

  def upload_file(self, path: Union[str, os.PathLike], blob_name: str) -> str:
    """Upload the file at path as blob_name and return the blob's URL."""
    with open(path, "rb") as f:
      return self.upload_bytes(f, blob_name)
 | |
| 
 |