You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							139 lines
						
					
					
						
							4.0 KiB
						
					
					
				
			
		
		
	
	
							139 lines
						
					
					
						
							4.0 KiB
						
					
					
				#!/usr/bin/env python3
 | 
						|
import os
 | 
						|
import subprocess
 | 
						|
from typing import List, Optional
 | 
						|
from functools import lru_cache
 | 
						|
 | 
						|
from common.basedir import BASEDIR
 | 
						|
from selfdrive.swaglog import cloudlog
 | 
						|
 | 
						|
TESTED_BRANCHES = ['devel', 'release2-staging', 'release3-staging', 'dashcam-staging', 'release2', 'release3', 'dashcam']
 | 
						|
 | 
						|
training_version: bytes = b"0.2.0"
 | 
						|
terms_version: bytes = b"2"
 | 
						|
 | 
						|
 | 
						|
def cache(user_function, /):
 | 
						|
  return lru_cache(maxsize=None)(user_function)
 | 
						|
 | 
						|
 | 
						|
def run_cmd(cmd: List[str]) -> str:
 | 
						|
  return subprocess.check_output(cmd, encoding='utf8').strip()
 | 
						|
 | 
						|
 | 
						|
def run_cmd_default(cmd: List[str], default: Optional[str] = None) -> Optional[str]:
 | 
						|
  try:
 | 
						|
    return run_cmd(cmd)
 | 
						|
  except subprocess.CalledProcessError:
 | 
						|
    return default
 | 
						|
 | 
						|
 | 
						|
@cache
 | 
						|
def get_commit(branch: str = "HEAD", default: Optional[str] = None) -> Optional[str]:
 | 
						|
  return run_cmd_default(["git", "rev-parse", branch], default=default)
 | 
						|
 | 
						|
 | 
						|
@cache
 | 
						|
def get_short_branch(default: Optional[str] = None) -> Optional[str]:
 | 
						|
  return run_cmd_default(["git", "rev-parse", "--abbrev-ref", "HEAD"], default=default)
 | 
						|
 | 
						|
 | 
						|
@cache
 | 
						|
def get_branch(default: Optional[str] = None) -> Optional[str]:
 | 
						|
  return run_cmd_default(["git", "rev-parse", "--abbrev-ref", "--symbolic-full-name", "@{u}"], default=default)
 | 
						|
 | 
						|
 | 
						|
@cache
 | 
						|
def get_origin(default: Optional[str] = None) -> Optional[str]:
 | 
						|
  try:
 | 
						|
    local_branch = run_cmd(["git", "name-rev", "--name-only", "HEAD"])
 | 
						|
    tracking_remote = run_cmd(["git", "config", "branch." + local_branch + ".remote"])
 | 
						|
    return run_cmd(["git", "config", "remote." + tracking_remote + ".url"])
 | 
						|
  except subprocess.CalledProcessError:  # Not on a branch, fallback
 | 
						|
    return run_cmd_default(["git", "config", "--get", "remote.origin.url"], default=default)
 | 
						|
 | 
						|
 | 
						|
@cache
 | 
						|
def get_normalized_origin(default: Optional[str] = None) -> Optional[str]:
 | 
						|
  origin = get_origin()
 | 
						|
 | 
						|
  if origin is None:
 | 
						|
    return default
 | 
						|
 | 
						|
  return origin.replace("git@", "", 1) \
 | 
						|
               .replace(".git", "", 1) \
 | 
						|
               .replace("https://", "", 1) \
 | 
						|
               .replace(":", "/", 1)
 | 
						|
 | 
						|
 | 
						|
@cache
 | 
						|
def get_version() -> str:
 | 
						|
  with open(os.path.join(os.path.dirname(os.path.abspath(__file__)), "common", "version.h")) as _versionf:
 | 
						|
    version = _versionf.read().split('"')[1]
 | 
						|
  return version
 | 
						|
 | 
						|
@cache
 | 
						|
def get_short_version() -> str:
 | 
						|
  return get_version().split('-')[0]
 | 
						|
 | 
						|
@cache
 | 
						|
def is_prebuilt() -> bool:
 | 
						|
  return os.path.exists(os.path.join(BASEDIR, 'prebuilt'))
 | 
						|
 | 
						|
 | 
						|
@cache
 | 
						|
def is_comma_remote() -> bool:
 | 
						|
  # note to fork maintainers, this is used for release metrics. please do not
 | 
						|
  # touch this to get rid of the orange startup alert. there's better ways to do that
 | 
						|
  origin = get_origin()
 | 
						|
  if origin is None:
 | 
						|
    return False
 | 
						|
 | 
						|
  return origin.startswith('git@github.com:commaai') or origin.startswith('https://github.com/commaai')
 | 
						|
 | 
						|
 | 
						|
@cache
 | 
						|
def is_tested_branch() -> bool:
 | 
						|
  return get_short_branch() in TESTED_BRANCHES
 | 
						|
 | 
						|
 | 
						|
@cache
 | 
						|
def is_dirty() -> bool:
 | 
						|
  origin = get_origin()
 | 
						|
  branch = get_branch()
 | 
						|
  if (origin is None) or (branch is None):
 | 
						|
    return True
 | 
						|
 | 
						|
  dirty = False
 | 
						|
  try:
 | 
						|
    # Actually check dirty files
 | 
						|
    if not is_prebuilt():
 | 
						|
      # This is needed otherwise touched files might show up as modified
 | 
						|
      try:
 | 
						|
        subprocess.check_call(["git", "update-index", "--refresh"])
 | 
						|
      except subprocess.CalledProcessError:
 | 
						|
        pass
 | 
						|
 | 
						|
      dirty = (subprocess.call(["git", "diff-index", "--quiet", branch, "--"]) != 0)
 | 
						|
  except subprocess.CalledProcessError:
 | 
						|
    cloudlog.exception("git subprocess failed while checking dirty")
 | 
						|
    dirty = True
 | 
						|
 | 
						|
  return dirty
 | 
						|
 | 
						|
 | 
						|
if __name__ == "__main__":
 | 
						|
  from common.params import Params
 | 
						|
 | 
						|
  params = Params()
 | 
						|
  params.put("TermsVersion", terms_version)
 | 
						|
  params.put("TrainingVersion", training_version)
 | 
						|
 | 
						|
  print(f"Dirty: {is_dirty()}")
 | 
						|
  print(f"Version: {get_version()}")
 | 
						|
  print(f"Short version: {get_short_version()}")
 | 
						|
  print(f"Origin: {get_origin()}")
 | 
						|
  print(f"Normalized origin: {get_normalized_origin()}")
 | 
						|
  print(f"Branch: {get_branch()}")
 | 
						|
  print(f"Short branch: {get_short_branch()}")
 | 
						|
  print(f"Prebuilt: {is_prebuilt()}")
 | 
						|
 |