openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

143 lines
4.4 KiB

#!/usr/bin/env python3
# simple boardd wrapper that updates the panda first
import os
import usb1
import time
import subprocess
from typing import List, NoReturn
from functools import cmp_to_key
from panda import DEFAULT_FW_FN, DEFAULT_H7_FW_FN, MCU_TYPE_H7, Panda, PandaDFU
from common.basedir import BASEDIR
from common.params import Params
from system.hardware import HARDWARE
from selfdrive.swaglog import cloudlog
def get_expected_signature(panda: Panda) -> bytes:
fn = DEFAULT_H7_FW_FN if (panda.get_mcu_type() == MCU_TYPE_H7) else DEFAULT_FW_FN
try:
return Panda.get_signature_from_firmware(fn)
except Exception:
cloudlog.exception("Error computing expected signature")
return b""
def flash_panda(panda_serial: str) -> Panda:
panda = Panda(panda_serial)
fw_signature = get_expected_signature(panda)
internal_panda = panda.is_internal() and not panda.bootstub
panda_version = "bootstub" if panda.bootstub else panda.get_version()
panda_signature = b"" if panda.bootstub else panda.get_signature()
cloudlog.warning(f"Panda {panda_serial} connected, version: {panda_version}, signature {panda_signature.hex()[:16]}, expected {fw_signature.hex()[:16]}")
if panda.bootstub or panda_signature != fw_signature:
cloudlog.info("Panda firmware out of date, update required")
panda.flash()
cloudlog.info("Done flashing")
if panda.bootstub:
bootstub_version = panda.get_version()
cloudlog.info(f"Flashed firmware not booting, flashing development bootloader. Bootstub version: {bootstub_version}")
if internal_panda:
HARDWARE.recover_internal_panda()
panda.recover(reset=(not internal_panda))
cloudlog.info("Done flashing bootloader")
if panda.bootstub:
cloudlog.info("Panda still not booting, exiting")
raise AssertionError
panda_signature = panda.get_signature()
if panda_signature != fw_signature:
cloudlog.info("Version mismatch after flashing, exiting")
raise AssertionError
return panda
def panda_sort_cmp(a: Panda, b: Panda):
a_type = a.get_type()
b_type = b.get_type()
# make sure the internal one is always first
if a.is_internal() and not b.is_internal():
return -1
if not a.is_internal() and b.is_internal():
return 1
# sort by hardware type
if a_type != b_type:
return a_type < b_type
# last resort: sort by serial number
return a.get_usb_serial() < b.get_usb_serial()
def main() -> NoReturn:
first_run = True
params = Params()
while True:
try:
params.delete("PandaSignatures")
# Flash all Pandas in DFU mode
for p in PandaDFU.list():
cloudlog.info(f"Panda in DFU mode found, flashing recovery {p}")
PandaDFU(p).recover()
time.sleep(1)
panda_serials = Panda.list()
if len(panda_serials) == 0:
if first_run:
cloudlog.info("Resetting internal panda")
HARDWARE.reset_internal_panda()
time.sleep(2) # wait to come back up
continue
cloudlog.info(f"{len(panda_serials)} panda(s) found, connecting - {panda_serials}")
# Flash pandas
pandas: List[Panda] = []
for serial in panda_serials:
pandas.append(flash_panda(serial))
# check health for lost heartbeat
for panda in pandas:
health = panda.health()
if health["heartbeat_lost"]:
params.put_bool("PandaHeartbeatLost", True)
cloudlog.event("heartbeat lost", deviceState=health, serial=panda.get_usb_serial())
if first_run:
cloudlog.info(f"Resetting panda {panda.get_usb_serial()}")
panda.reset()
# sort pandas to have deterministic order
pandas.sort(key=cmp_to_key(panda_sort_cmp))
panda_serials = list(map(lambda p: p.get_usb_serial(), pandas)) # type: ignore
# log panda fw versions
params.put("PandaSignatures", b','.join(p.get_signature() for p in pandas))
# close all pandas
for p in pandas:
p.close()
except (usb1.USBErrorNoDevice, usb1.USBErrorPipe):
# a panda was disconnected while setting everything up. let's try again
cloudlog.exception("Panda USB exception while setting up")
continue
first_run = False
# run boardd with all connected serials as arguments
os.environ['MANAGER_DAEMON'] = 'boardd'
os.chdir(os.path.join(BASEDIR, "selfdrive/boardd"))
subprocess.run(["./boardd", *panda_serials], check=True)
if __name__ == "__main__":
main()