|  |  |  | #!/usr/bin/env python3
 | 
					
						
							|  |  |  | # simple boardd wrapper that updates the panda first
 | 
					
						
							|  |  |  | import os
 | 
					
						
							|  |  |  | import usb1
 | 
					
						
							|  |  |  | import time
 | 
					
						
							|  |  |  | import subprocess
 | 
					
						
							|  |  |  | from typing import List, NoReturn
 | 
					
						
							|  |  |  | from functools import cmp_to_key
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | from panda import Panda, PandaDFU
 | 
					
						
							|  |  |  | from common.basedir import BASEDIR
 | 
					
						
							|  |  |  | from common.params import Params
 | 
					
						
							|  |  |  | from system.hardware import HARDWARE
 | 
					
						
							|  |  |  | from system.swaglog import cloudlog
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def get_expected_signature(panda: Panda) -> bytes:
 | 
					
						
							|  |  |  |   try:
 | 
					
						
							|  |  |  |     return Panda.get_signature_from_firmware(panda.get_mcu_type().config.app_path)
 | 
					
						
							|  |  |  |   except Exception:
 | 
					
						
							|  |  |  |     cloudlog.exception("Error computing expected signature")
 | 
					
						
							|  |  |  |     return b""
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def flash_panda(panda_serial: str) -> Panda:
 | 
					
						
							|  |  |  |   panda = Panda(panda_serial)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   fw_signature = get_expected_signature(panda)
 | 
					
						
							|  |  |  |   internal_panda = panda.is_internal() and not panda.bootstub
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   panda_version = "bootstub" if panda.bootstub else panda.get_version()
 | 
					
						
							|  |  |  |   panda_signature = b"" if panda.bootstub else panda.get_signature()
 | 
					
						
							|  |  |  |   cloudlog.warning(f"Panda {panda_serial} connected, version: {panda_version}, signature {panda_signature.hex()[:16]}, expected {fw_signature.hex()[:16]}")
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   if panda.bootstub or panda_signature != fw_signature:
 | 
					
						
							|  |  |  |     cloudlog.info("Panda firmware out of date, update required")
 | 
					
						
							|  |  |  |     panda.flash()
 | 
					
						
							|  |  |  |     cloudlog.info("Done flashing")
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   if panda.bootstub:
 | 
					
						
							|  |  |  |     bootstub_version = panda.get_version()
 | 
					
						
							|  |  |  |     cloudlog.info(f"Flashed firmware not booting, flashing development bootloader. Bootstub version: {bootstub_version}")
 | 
					
						
							|  |  |  |     if internal_panda:
 | 
					
						
							|  |  |  |       HARDWARE.recover_internal_panda()
 | 
					
						
							|  |  |  |     panda.recover(reset=(not internal_panda))
 | 
					
						
							|  |  |  |     cloudlog.info("Done flashing bootloader")
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   if panda.bootstub:
 | 
					
						
							|  |  |  |     cloudlog.info("Panda still not booting, exiting")
 | 
					
						
							|  |  |  |     raise AssertionError
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   panda_signature = panda.get_signature()
 | 
					
						
							|  |  |  |   if panda_signature != fw_signature:
 | 
					
						
							|  |  |  |     cloudlog.info("Version mismatch after flashing, exiting")
 | 
					
						
							|  |  |  |     raise AssertionError
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   return panda
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def panda_sort_cmp(a: Panda, b: Panda):
 | 
					
						
							|  |  |  |   a_type = a.get_type()
 | 
					
						
							|  |  |  |   b_type = b.get_type()
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   # make sure the internal one is always first
 | 
					
						
							|  |  |  |   if a.is_internal() and not b.is_internal():
 | 
					
						
							|  |  |  |     return -1
 | 
					
						
							|  |  |  |   if not a.is_internal() and b.is_internal():
 | 
					
						
							|  |  |  |     return 1
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   # sort by hardware type
 | 
					
						
							|  |  |  |   if a_type != b_type:
 | 
					
						
							|  |  |  |     return a_type < b_type
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   # last resort: sort by serial number
 | 
					
						
							|  |  |  |   return a.get_usb_serial() < b.get_usb_serial()
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def main() -> NoReturn:
 | 
					
						
							|  |  |  |   first_run = True
 | 
					
						
							|  |  |  |   params = Params()
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   while True:
 | 
					
						
							|  |  |  |     try:
 | 
					
						
							|  |  |  |       params.remove("PandaSignatures")
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |       # Flash all Pandas in DFU mode
 | 
					
						
							|  |  |  |       for serial in PandaDFU.list():
 | 
					
						
							|  |  |  |         cloudlog.info(f"Panda in DFU mode found, flashing recovery {serial}")
 | 
					
						
							|  |  |  |         PandaDFU(serial).recover()
 | 
					
						
							|  |  |  |       time.sleep(1)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |       panda_serials = Panda.list()
 | 
					
						
							|  |  |  |       if len(panda_serials) == 0:
 | 
					
						
							|  |  |  |         if first_run:
 | 
					
						
							|  |  |  |           cloudlog.info("Resetting internal panda")
 | 
					
						
							|  |  |  |           HARDWARE.reset_internal_panda()
 | 
					
						
							|  |  |  |           time.sleep(2)  # wait to come back up
 | 
					
						
							|  |  |  |         continue
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |       cloudlog.info(f"{len(panda_serials)} panda(s) found, connecting - {panda_serials}")
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |       # Flash pandas
 | 
					
						
							|  |  |  |       pandas: List[Panda] = []
 | 
					
						
							|  |  |  |       for serial in panda_serials:
 | 
					
						
							|  |  |  |         pandas.append(flash_panda(serial))
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |       # check health for lost heartbeat
 | 
					
						
							|  |  |  |       for panda in pandas:
 | 
					
						
							|  |  |  |         health = panda.health()
 | 
					
						
							|  |  |  |         if health["heartbeat_lost"]:
 | 
					
						
							|  |  |  |           params.put_bool("PandaHeartbeatLost", True)
 | 
					
						
							|  |  |  |           cloudlog.event("heartbeat lost", deviceState=health, serial=panda.get_usb_serial())
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if first_run:
 | 
					
						
							|  |  |  |           cloudlog.info(f"Resetting panda {panda.get_usb_serial()}")
 | 
					
						
							|  |  |  |           panda.reset()
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |       # sort pandas to have deterministic order
 | 
					
						
							|  |  |  |       pandas.sort(key=cmp_to_key(panda_sort_cmp))
 | 
					
						
							|  |  |  |       panda_serials = list(map(lambda p: p.get_usb_serial(), pandas))  # type: ignore
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |       # log panda fw versions
 | 
					
						
							|  |  |  |       params.put("PandaSignatures", b','.join(p.get_signature() for p in pandas))
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |       # close all pandas
 | 
					
						
							|  |  |  |       for p in pandas:
 | 
					
						
							|  |  |  |         p.close()
 | 
					
						
							|  |  |  |     except (usb1.USBErrorNoDevice, usb1.USBErrorPipe):
 | 
					
						
							|  |  |  |       # a panda was disconnected while setting everything up. let's try again
 | 
					
						
							|  |  |  |       cloudlog.exception("Panda USB exception while setting up")
 | 
					
						
							|  |  |  |       continue
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     first_run = False
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # run boardd with all connected serials as arguments
 | 
					
						
							|  |  |  |     os.environ['MANAGER_DAEMON'] = 'boardd'
 | 
					
						
							|  |  |  |     os.chdir(os.path.join(BASEDIR, "selfdrive/boardd"))
 | 
					
						
							|  |  |  |     subprocess.run(["./boardd", *panda_serials], check=True)
 | 
					
						
							|  |  |  | 
 | 
					
						
# script entry point: only start the flash-and-run loop when executed directly
if __name__ == "__main__":
  main()
 |